diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57764e7f9e..fef6a43a71 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
 - Field `auto_replay_nacks` added to all inputs that traditionally automatically retry nacked messages as a toggle for this behaviour.
 - New `retry` processor.
 - New `noop` cache.
+- Field `targets_input` added to the `azure_blob_storage` input.
 
 ### Fixed
 
diff --git a/internal/docs/format_yaml.go b/internal/docs/format_yaml.go
index 824c3ff86c..8bc4eb642f 100644
--- a/internal/docs/format_yaml.go
+++ b/internal/docs/format_yaml.go
@@ -725,6 +725,8 @@ func (f FieldSpec) ToYAML(recurse bool) (*yaml.Node, error) {
 		}
 		return &node, nil
 	}
+
+	_, isCore := f.Type.IsCoreComponent()
 	if f.Kind == KindArray || f.Kind == Kind2DArray {
 		s := []any{}
 		if err := node.Encode(s); err != nil {
@@ -738,6 +740,10 @@ func (f FieldSpec) ToYAML(recurse bool) (*yaml.Node, error) {
 		if err := node.Encode(s); err != nil {
 			return nil, err
 		}
+	} else if isCore {
+		if err := node.Encode(nil); err != nil {
+			return nil, err
+		}
 	} else {
 		if len(f.Examples) > 0 {
 			if err := node.Encode(f.Examples[0]); err != nil {
diff --git a/internal/impl/azure/input_blob_storage.go b/internal/impl/azure/input_blob_storage.go
index 3172ebfef3..a93e6c5f9f 100644
--- a/internal/impl/azure/input_blob_storage.go
+++ b/internal/impl/azure/input_blob_storage.go
@@ -8,11 +8,13 @@ import (
 	"net/http"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+	"github.com/Jeffail/gabs/v2"
 
 	"github.com/benthosdev/benthos/v4/internal/codec"
 	"github.com/benthosdev/benthos/v4/internal/codec/interop"
@@ -26,6 +28,7 @@ const (
 	bsiFieldContainer     = "container"
 	bsiFieldPrefix        = "prefix"
 	bsiFieldDeleteObjects = "delete_objects"
+	bsiFieldTargetsInput  = "targets_input"
 )
 
 type bsiConfig struct {
@@ -33,6 +36,7 @@ type bsiConfig struct {
 	Container     string
 	Prefix        string
 	DeleteObjects bool
+	FileReader    *service.OwnedInput
 	Codec         interop.FallbackReaderCodec
 }
 
@@ -57,6 +61,11 @@ func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error
 	if conf.DeleteObjects, err = pConf.FieldBool(bsiFieldDeleteObjects); err != nil {
 		return
 	}
+	if pConf.Contains(bsiFieldTargetsInput) {
+		if conf.FileReader, err = pConf.FieldInput(bsiFieldTargetsInput); err != nil {
+			return
+		}
+	}
 	return
 }
 
@@ -79,7 +88,11 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+`
 
 ## Downloading Large Files
 
-When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a `+"[`codec`](#codec)"+` can be specified that determines how to break the input into smaller individual messages.
+When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a `+"[`scanner`](#scanner)"+` can be specified that determines how to break the input into smaller individual messages.
+
+## Streaming New Files
+
+By default this input will consume all files found within the target container and will then gracefully terminate. This is referred to as a "batch" mode of operation. However, it's possible to instead configure a container as [an Event Grid source](https://learn.microsoft.com/en-gb/azure/event-grid/event-schema-blob-storage) and then use this as a `+"[`targets_input`](#targetsinput)"+`, in which case new files are consumed as they're uploaded and Benthos will continue listening for and downloading files as they arrive. This is referred to as a "streamed" mode of operation.
 
 ## Metadata
 
@@ -109,6 +122,34 @@ You can access these metadata fields using [function interpolation](/docs/config
 				Description("Whether to delete downloaded objects from the blob once they are processed.").
 				Advanced().
 				Default(false),
+			service.NewInputField(bsiFieldTargetsInput).
+				Description("EXPERIMENTAL: An optional source of download targets, configured as a [regular Benthos input](/docs/components/inputs/about). Each message yielded by this input should be a single structured object containing a field `name`, which represents the blob to be downloaded.").
+				Optional().
+				Version("4.27.0").
+				Example(map[string]any{
+					"mqtt": map[string]any{
+						"urls": []any{
+							"example.westeurope-1.ts.eventgrid.azure.net:8883",
+						},
+						"topics": []any{
+							"some-topic",
+						},
+					},
+					"processors": []any{
+						map[string]any{
+							"unarchive": map[string]any{
+								"format": "json_array",
+							},
+						},
+						map[string]any{
+							"mapping": `if this.eventType == "Microsoft.Storage.BlobCreated" {
+  root.name = this.data.url.parse_url().path.trim_prefix("/foocontainer/")
+} else {
+  root = deleted()
+}`,
+						},
+					},
+				}),
 		)
 }
 
@@ -120,11 +161,15 @@ func init() {
 				return nil, err
 			}
 
-			rdr, err := newAzureBlobStorage(conf, res.Logger())
-			if err != nil {
+			var rdr service.BatchInput
+			if rdr, err = newAzureBlobStorage(conf, res.Logger()); err != nil {
 				return nil, err
 			}
-			return service.AutoRetryNacksBatched(rdr), nil
+
+			if conf.FileReader == nil {
+				rdr = service.AutoRetryNacksBatched(rdr)
+			}
+			return rdr, nil
 		})
 	if err != nil {
 		panic(err)
@@ -179,13 +224,117 @@ type azurePendingObject struct {
 	scanner   interop.FallbackReaderStream
 }
 
-type azureTargetReader struct {
+type azureTargetReader interface {
+	Pop(ctx context.Context) (*azureObjectTarget, error)
+	Close(context.Context) error
+}
+
+func newAzureTargetReader(ctx context.Context, logger *service.Logger, conf bsiConfig) (azureTargetReader, error) {
+	if conf.FileReader == nil {
+		return newAzureTargetBatchReader(ctx, conf)
+	}
+	return &azureTargetStreamReader{
+		input: conf.FileReader,
+		log:   logger,
+	}, nil
+}
+
+//------------------------------------------------------------------------------
+
+type azureTargetStreamReader struct {
+	pending []*azureObjectTarget
+	input   *service.OwnedInput
+	log     *service.Logger
+}
+
+func (a *azureTargetStreamReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
+	if len(a.pending) > 0 {
+		t := a.pending[0]
+		a.pending = a.pending[1:]
+		return t, nil
+	}
+
+	for {
+		next, ackFn, err := a.input.ReadBatch(ctx)
+		if err != nil {
+			if errors.Is(err, service.ErrEndOfInput) {
+				return nil, io.EOF
+			}
+			return nil, err
+		}
+
+		var pendingAcks int32
+		var nackOnce sync.Once
+		for _, msg := range next {
+			mStructured, err := msg.AsStructured()
+			if err != nil {
+				a.log.With("error", err).Error("Failed to extract structured object from targets input message")
+				continue
+			}
+
+			name, _ := gabs.Wrap(mStructured).S("name").Data().(string)
+			if name == "" {
+				a.log.Warn("Targets input yielded a message that did not contain a `name` field")
+				continue
+			}
+
+			pendingAcks++
+
+			var ackOnce sync.Once
+			a.pending = append(a.pending, &azureObjectTarget{
+				key: name,
+				ackFn: func(ctx context.Context, err error) (aerr error) {
+					if err != nil {
+						nackOnce.Do(func() {
+							// Prevent future acks from triggering a delete.
+							atomic.StoreInt32(&pendingAcks, -1)
+
+							// It's possible that this is called for one message
+							// at the _exact_ same time as another is acked, but
+							// if the acked message triggers a full ack of the
+							// origin message then even though it shouldn't be
+							// possible, it's also harmless.
+							aerr = ackFn(ctx, err)
+						})
+					} else {
+						ackOnce.Do(func() {
+							if atomic.AddInt32(&pendingAcks, -1) == 0 {
+								aerr = ackFn(ctx, nil)
+							}
+						})
+					}
+					return
+				},
+			})
+		}
+
+		if len(a.pending) > 0 {
+			t := a.pending[0]
+			a.pending = a.pending[1:]
+			return t, nil
+		} else {
+			// Ack the messages even though we didn't extract any valid names.
+			_ = ackFn(ctx, nil)
+		}
+	}
+}
+
+func (a *azureTargetStreamReader) Close(ctx context.Context) error {
+	for _, p := range a.pending {
+		_ = p.ackFn(ctx, errors.New("shutting down"))
+	}
+	return a.input.Close(ctx)
+}
+
+//------------------------------------------------------------------------------
+
+type azureTargetBatchReader struct {
 	pending []*azureObjectTarget
 	conf    bsiConfig
 	pager   *runtime.Pager[azblob.ListBlobsFlatResponse]
 }
 
-func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetReader, error) {
+func newAzureTargetBatchReader(ctx context.Context, conf bsiConfig) (*azureTargetBatchReader, error) {
 	var maxResults int32 = 100
 	params := &azblob.ListBlobsFlatOptions{
 		MaxResults: &maxResults,
@@ -194,7 +343,7 @@ func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetRead
 		params.Prefix = &conf.Prefix
 	}
 	pager := conf.client.NewListBlobsFlatPager(conf.Container, params)
-	staticKeys := azureTargetReader{conf: conf}
+	staticKeys := azureTargetBatchReader{conf: conf}
 	if pager.More() {
 		page, err := pager.NextPage(ctx)
 		if err != nil {
@@ -210,7 +359,7 @@ func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetRead
 	return &staticKeys, nil
 }
 
-func (s *azureTargetReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
+func (s *azureTargetBatchReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
 	if len(s.pending) == 0 && s.pager.More() {
 		s.pending = nil
 		page, err := s.pager.NextPage(ctx)
@@ -230,7 +379,7 @@ func (s *azureTargetReader) Pop(ctx context.Context) (*azureObjectTarget, error)
 	return obj, nil
 }
 
-func (s azureTargetReader) Close(context.Context) error {
+func (s azureTargetBatchReader) Close(context.Context) error {
 	return nil
 }
 
@@ -240,7 +389,7 @@ type azureBlobStorage struct {
 	conf bsiConfig
 
 	objectScannerCtor interop.FallbackReaderCodec
-	keyReader         *azureTargetReader
+	keyReader         azureTargetReader
 
 	objectMut sync.Mutex
 	object    *azurePendingObject
@@ -259,7 +408,7 @@ func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage
 
 func (a *azureBlobStorage) Connect(ctx context.Context) error {
 	var err error
-	a.keyReader, err = newAzureTargetReader(ctx, a.conf)
+	a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)
 	return err
 }
 
diff --git a/internal/impl/azure/integration_test.go b/internal/impl/azure/integration_test.go
index 8891982bd0..fbf39e2a90 100644
--- a/internal/impl/azure/integration_test.go
+++ b/internal/impl/azure/integration_test.go
@@ -102,6 +102,41 @@ input:
 		)
 	})
 
+	t.Run("blob_storage_streamed", func(t *testing.T) {
+		template := `
+output:
+  azure_blob_storage:
+    blob_type: BLOCK
+    container: $VAR1-$ID
+    max_in_flight: 1
+    path: $VAR2/${!count("$ID")}.txt
+    public_access_level: PRIVATE
+    storage_connection_string: $VAR3
+
+input:
+  azure_blob_storage:
+    container: $VAR1-$ID
+    prefix: $VAR2
+    storage_connection_string: $VAR3
+    targets_input:
+      azure_blob_storage:
+        container: $VAR1-$ID
+        prefix: $VAR2
+        storage_connection_string: $VAR3
+        processors:
+          - mapping: 'root.name = @blob_storage_key'
+`
+		integration.StreamTests(
+			integration.StreamTestOpenCloseIsolated(),
+			integration.StreamTestStreamIsolated(10),
+		).Run(
+			t, template,
+			integration.StreamTestOptVarOne(dummyContainer),
+			integration.StreamTestOptVarTwo(dummyPrefix),
+			integration.StreamTestOptVarThree(connString),
+		)
+	})
+
 	t.Run("blob_storage_append", func(t *testing.T) {
 		template := `
 output:
diff --git a/website/docs/components/inputs/azure_blob_storage.md b/website/docs/components/inputs/azure_blob_storage.md
index acf25e8542..c6f4914049 100644
--- a/website/docs/components/inputs/azure_blob_storage.md
+++ b/website/docs/components/inputs/azure_blob_storage.md
@@ -43,6 +43,7 @@ input:
     prefix: ""
     scanner:
       to_the_end: {}
+    targets_input: null # No default (optional)
 ```
 
 </TabItem>
@@ -62,6 +63,7 @@ input:
     scanner:
       to_the_end: {}
     delete_objects: false
+    targets_input: null # No default (optional)
 ```
 
 </TabItem>
@@ -80,7 +82,11 @@ If the `storage_connection_string` does not contain the `AccountName` parameter,
 
 ## Downloading Large Files
 
-When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a [`codec`](#codec) can be specified that determines how to break the input into smaller individual messages.
+When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a [`scanner`](#scanner) can be specified that determines how to break the input into smaller individual messages.
+
+## Streaming New Files
+
+By default this input will consume all files found within the target container and will then gracefully terminate. This is referred to as a "batch" mode of operation. However, it's possible to instead configure a container as [an Event Grid source](https://learn.microsoft.com/en-gb/azure/event-grid/event-schema-blob-storage) and then use this as a [`targets_input`](#targetsinput), in which case new files are consumed as they're uploaded and Benthos will continue listening for and downloading files as they arrive. This is referred to as a "streamed" mode of operation.
 
 ## Metadata
 
@@ -164,4 +170,32 @@ Whether to delete downloaded objects from the blob once they are processed.
 
 Type: `bool`
 Default: `false`
 
+### `targets_input`
+
+EXPERIMENTAL: An optional source of download targets, configured as a [regular Benthos input](/docs/components/inputs/about). Each message yielded by this input should be a single structured object containing a field `name`, which represents the blob to be downloaded.
+
+
+Type: `input`
+Requires version 4.27.0 or newer
+
+```yml
+# Examples
+
+targets_input:
+  mqtt:
+    topics:
+      - some-topic
+    urls:
+      - example.westeurope-1.ts.eventgrid.azure.net:8883
+  processors:
+    - unarchive:
+        format: json_array
+    - mapping: |-
+        if this.eventType == "Microsoft.Storage.BlobCreated" {
+          root.name = this.data.url.parse_url().path.trim_prefix("/foocontainer/")
+        } else {
+          root = deleted()
+        }
+```
+
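A note on the acknowledgement fan-out in the new `azureTargetStreamReader`: one batch read from `targets_input` can yield several download targets, and the origin batch should only be acked once every derived target has been processed, while a single failure should nack it straight away. The standalone sketch below reproduces that counting pattern in isolation; it is illustrative only — `ackFunc` and `fanOutAcks` are invented names and are not part of this diff or of the Benthos API.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// ackFunc mirrors the shape of a batch acknowledgement callback.
type ackFunc func(ctx context.Context, err error) error

// fanOutAcks returns n child ack functions sharing a single origin ack: the
// origin is acked once all n children ack successfully, and nacked as soon as
// any single child nacks.
func fanOutAcks(n int32, origin ackFunc) []ackFunc {
	remaining := n
	var nackOnce sync.Once

	children := make([]ackFunc, n)
	for i := range children {
		var ackOnce sync.Once
		children[i] = func(ctx context.Context, err error) (aerr error) {
			if err != nil {
				nackOnce.Do(func() {
					// Poison the counter so later successful acks can never
					// bring it to zero and ack the origin after a nack.
					atomic.StoreInt32(&remaining, -1)
					aerr = origin(ctx, err)
				})
				return
			}
			ackOnce.Do(func() {
				if atomic.AddInt32(&remaining, -1) == 0 {
					aerr = origin(ctx, nil)
				}
			})
			return
		}
	}
	return children
}

func main() {
	origin := func(ctx context.Context, err error) error {
		fmt.Println("origin resolved with:", err)
		return nil
	}
	ctx := context.Background()

	acks := fanOutAcks(3, origin)
	_ = acks[0](ctx, nil) // nothing yet
	_ = acks[1](ctx, nil) // nothing yet
	_ = acks[2](ctx, nil) // origin resolved with: <nil>

	nacks := fanOutAcks(2, origin)
	_ = nacks[0](ctx, errors.New("download failed")) // origin resolved with: download failed
	_ = nacks[1](ctx, nil)                           // ignored; origin already nacked
}
```

The poisoned counter (set to -1 on the first nack) is what stops a late successful ack from re-acking an origin batch that was already nacked — the same role the `pendingAcks` counter plays in the stream reader's `Pop` implementation above.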