Add targets_input field to azure_blob_storage input

Jeffail committed Apr 10, 2024
1 parent bb359ee commit 3f4afe2

Showing 5 changed files with 237 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file.
- Field `auto_replay_nacks` added to all inputs that traditionally automatically retry nacked messages as a toggle for this behaviour.
- New `retry` processor.
- New `noop` cache.
- Field `targets_input` added to the `azure_blob_storage` input.

### Fixed

6 changes: 6 additions & 0 deletions internal/docs/format_yaml.go
@@ -725,6 +725,8 @@ func (f FieldSpec) ToYAML(recurse bool) (*yaml.Node, error) {
}
return &node, nil
}

_, isCore := f.Type.IsCoreComponent()
if f.Kind == KindArray || f.Kind == Kind2DArray {
s := []any{}
if err := node.Encode(s); err != nil {
@@ -738,6 +740,10 @@ if err := node.Encode(s); err != nil {
if err := node.Encode(s); err != nil {
return nil, err
}
} else if isCore {
if err := node.Encode(nil); err != nil {
return nil, err
}
} else {
if len(f.Examples) > 0 {
if err := node.Encode(f.Examples[0]); err != nil {
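A minimal sketch of the mechanism behind the new isCore branch, assuming the gopkg.in/yaml.v3 package that the *yaml.Node type here comes from: fields whose type is itself a core component (such as the new targets_input, whose value is a whole input config) now encode as an explicit null in generated YAML stubs rather than an example or zero value.

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

func main() {
	var node yaml.Node
	// Encoding nil yields a !!null scalar, so a component-typed field
	// renders in a generated config stub as e.g. `targets_input: null`
	// instead of a dummy example value.
	if err := node.Encode(nil); err != nil {
		panic(err)
	}

	out, err := yaml.Marshal(&node)
	if err != nil {
		panic(err)
	}
	fmt.Print(string(out)) // prints: null
}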
171 changes: 160 additions & 11 deletions internal/impl/azure/input_blob_storage.go
@@ -8,11 +8,13 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Jeffail/gabs/v2"

"github.com/benthosdev/benthos/v4/internal/codec"
"github.com/benthosdev/benthos/v4/internal/codec/interop"
@@ -26,13 +28,15 @@ const (
bsiFieldContainer = "container"
bsiFieldPrefix = "prefix"
bsiFieldDeleteObjects = "delete_objects"
bsiFieldTargetsInput = "targets_input"
)

type bsiConfig struct {
client *azblob.Client
Container string
Prefix string
DeleteObjects bool
FileReader *service.OwnedInput
Codec interop.FallbackReaderCodec
}

@@ -57,6 +61,11 @@ func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error
if conf.DeleteObjects, err = pConf.FieldBool(bsiFieldDeleteObjects); err != nil {
return
}
if pConf.Contains(bsiFieldTargetsInput) {
if conf.FileReader, err = pConf.FieldInput(bsiFieldTargetsInput); err != nil {
return
}
}
return
}

@@ -79,7 +88,11 @@ If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+`
## Downloading Large Files
When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a `+"[`codec`](#codec)"+` can be specified that determines how to break the input into smaller individual messages.
When downloading large files it's often necessary to process it in streamed parts in order to avoid loading the entire file in memory at a given time. In order to do this a `+"[`scanner`](#scanner)"+` can be specified that determines how to break the input into smaller individual messages.
## Streaming New Files
By default this input will consume all files found within the target container and will then gracefully terminate. This is referred to as a "batch" mode of operation. However, it's possible to instead configure a container as [an Event Grid source](https://learn.microsoft.com/en-gb/azure/event-grid/event-schema-blob-storage) and then use this as a `+"[`targets_input`](#targetsinput)"+`, in which case new files are consumed as they're uploaded and Benthos will continue listening for and downloading files as they arrive. This is referred to as a "streamed" mode of operation.
## Metadata
@@ -109,6 +122,34 @@ You can access these metadata fields using [function interpolation](/docs/config
Description("Whether to delete downloaded objects from the blob once they are processed.").
Advanced().
Default(false),
service.NewInputField(bsiFieldTargetsInput).
Description("EXPERIMENTAL: An optional source of download targets, configured as a [regular Benthos input](/docs/components/inputs/about). Each message yielded by this input should be a single structured object containing a field `name`, which represents the blob to be downloaded.").
Optional().
Version("4.27.0").
Example(map[string]any{
"mqtt": map[string]any{
"urls": []any{
"example.westeurope-1.ts.eventgrid.azure.net:8883",
},
"topics": []any{
"some-topic",
},
},
"processors": []any{
map[string]any{
"unarchive": map[string]any{
"format": "json_array",
},
},
map[string]any{
"mapping": `if this.eventType == "Microsoft.Storage.BlobCreated" {
root.name = this.data.url.parse_url().path.trim_prefix("/foocontainer/")
} else {
root = deleted()
}`,
},
},
}),
)
}
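For reference, the Bloblang mapping in the example above recovers the blob name from an Event Grid BlobCreated payload by stripping the container segment off the URL path. A rough Go equivalent of that step, keeping "foocontainer" as the same illustrative placeholder (the helper name is hypothetical):

package main

import (
	"fmt"
	"net/url"
	"strings"
)

// blobNameFromEventURL mirrors parse_url().path.trim_prefix("/foocontainer/"):
// it takes the event's data.url and returns the blob name within the container.
func blobNameFromEventURL(raw string) (string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", err
	}
	return strings.TrimPrefix(u.Path, "/foocontainer/"), nil
}

func main() {
	name, err := blobNameFromEventURL("https://example.blob.core.windows.net/foocontainer/dir/file.txt")
	if err != nil {
		panic(err)
	}
	fmt.Println(name) // dir/file.txt
}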

@@ -120,11 +161,15 @@ func init() {
return nil, err
}

rdr, err := newAzureBlobStorage(conf, res.Logger())
if err != nil {
var rdr service.BatchInput
if rdr, err = newAzureBlobStorage(conf, res.Logger()); err != nil {
return nil, err
}
return service.AutoRetryNacksBatched(rdr), nil

if conf.FileReader == nil {
rdr = service.AutoRetryNacksBatched(rdr)
}
return rdr, nil
})
if err != nil {
panic(err)
@@ -179,13 +224,117 @@ type azurePendingObject struct {
scanner interop.FallbackReaderStream
}

type azureTargetReader struct {
type azureTargetReader interface {
Pop(ctx context.Context) (*azureObjectTarget, error)
Close(context.Context) error
}

func newAzureTargetReader(ctx context.Context, logger *service.Logger, conf bsiConfig) (azureTargetReader, error) {
if conf.FileReader == nil {
return newAzureTargetBatchReader(ctx, conf)
}
return &azureTargetStreamReader{
input: conf.FileReader,
log: logger,
}, nil
}

//------------------------------------------------------------------------------

type azureTargetStreamReader struct {
pending []*azureObjectTarget
input *service.OwnedInput
log *service.Logger
}

func (a *azureTargetStreamReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
if len(a.pending) > 0 {
t := a.pending[0]
a.pending = a.pending[1:]
return t, nil
}

for {
next, ackFn, err := a.input.ReadBatch(ctx)
if err != nil {
if errors.Is(err, service.ErrEndOfInput) {
return nil, io.EOF
}
return nil, err
}

var pendingAcks int32
var nackOnce sync.Once
for _, msg := range next {
mStructured, err := msg.AsStructured()
if err != nil {
a.log.With("error", err).Error("Failed to extract structured object from targets input message")
continue
}

name, _ := gabs.Wrap(mStructured).S("name").Data().(string)
if name == "" {
a.log.Warn("Targets input yielded a message that did not contain a `name` field")
continue
}

pendingAcks++

var ackOnce sync.Once
a.pending = append(a.pending, &azureObjectTarget{
key: name,
ackFn: func(ctx context.Context, err error) (aerr error) {
if err != nil {
nackOnce.Do(func() {
// Prevent future acks from triggering a delete.
atomic.StoreInt32(&pendingAcks, -1)

// It's possible that this is called for one message
// at the _exact_ same time as another is acked, but
// if the acked message triggers a full ack of the
// origin message then even though it shouldn't be
// possible, it's also harmless.
aerr = ackFn(ctx, err)
})
} else {
ackOnce.Do(func() {
if atomic.AddInt32(&pendingAcks, -1) == 0 {
aerr = ackFn(ctx, nil)
}
})
}
return
},
})
}

if len(a.pending) > 0 {
t := a.pending[0]
a.pending = a.pending[1:]
return t, nil
} else {
// Ack the messages even though we didn't extract any valid names.
_ = ackFn(ctx, nil)
}
}
}

func (a *azureTargetStreamReader) Close(ctx context.Context) error {
for _, p := range a.pending {
_ = p.ackFn(ctx, errors.New("shutting down"))
}
return a.input.Close(ctx)
}

//------------------------------------------------------------------------------

type azureTargetBatchReader struct {
pending []*azureObjectTarget
conf bsiConfig
pager *runtime.Pager[azblob.ListBlobsFlatResponse]
}

func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetReader, error) {
func newAzureTargetBatchReader(ctx context.Context, conf bsiConfig) (*azureTargetBatchReader, error) {
var maxResults int32 = 100
params := &azblob.ListBlobsFlatOptions{
MaxResults: &maxResults,
@@ -194,7 +343,7 @@ func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetRead
params.Prefix = &conf.Prefix
}
pager := conf.client.NewListBlobsFlatPager(conf.Container, params)
staticKeys := azureTargetReader{conf: conf}
staticKeys := azureTargetBatchReader{conf: conf}
if pager.More() {
page, err := pager.NextPage(ctx)
if err != nil {
Expand All @@ -210,7 +359,7 @@ func newAzureTargetReader(ctx context.Context, conf bsiConfig) (*azureTargetRead
return &staticKeys, nil
}

func (s *azureTargetReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
func (s *azureTargetBatchReader) Pop(ctx context.Context) (*azureObjectTarget, error) {
if len(s.pending) == 0 && s.pager.More() {
s.pending = nil
page, err := s.pager.NextPage(ctx)
@@ -230,7 +379,7 @@ func (s *azureTargetReader) Pop(ctx context.Context) (*azureObjectTarget, error)
return obj, nil
}

func (s azureTargetReader) Close(context.Context) error {
func (s azureTargetBatchReader) Close(context.Context) error {
return nil
}

Expand All @@ -240,7 +389,7 @@ type azureBlobStorage struct {
conf bsiConfig

objectScannerCtor interop.FallbackReaderCodec
keyReader *azureTargetReader
keyReader azureTargetReader

objectMut sync.Mutex
object *azurePendingObject
@@ -259,7 +408,7 @@ func newAzureBlobStorage(conf bsiConfig, log *service.Logger) (*azureBlobStorage

func (a *azureBlobStorage) Connect(ctx context.Context) error {
var err error
a.keyReader, err = newAzureTargetReader(ctx, a.conf)
a.keyReader, err = newAzureTargetReader(ctx, a.log, a.conf)
return err
}

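The ack wrapper built inside Pop is the subtle part of the streamed reader: one upstream message can fan out into many blob targets, the origin message should only be acked once every target has been processed, and the first failure must nack it immediately while disarming the remaining acks. A self-contained sketch of that pattern, using illustrative names that are not part of the Benthos API:

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// fanOutAck wraps a single origin ack with n child acks: the origin is
// acked once all n children succeed, and nacked at most once on the first
// child error, after which the countdown is disarmed so it never reaches zero.
func fanOutAck(n int, origin func(context.Context, error) error) []func(context.Context, error) error {
	pending := int32(n)
	var nackOnce sync.Once
	acks := make([]func(context.Context, error) error, n)
	for i := range acks {
		var ackOnce sync.Once
		acks[i] = func(ctx context.Context, err error) (aerr error) {
			if err != nil {
				nackOnce.Do(func() {
					atomic.StoreInt32(&pending, -1) // disarm remaining acks
					aerr = origin(ctx, err)
				})
				return
			}
			ackOnce.Do(func() {
				if atomic.AddInt32(&pending, -1) == 0 {
					aerr = origin(ctx, nil)
				}
			})
			return
		}
	}
	return acks
}

func main() {
	origin := func(_ context.Context, err error) error {
		fmt.Println("origin resolved, err:", err)
		return nil
	}
	ctx := context.Background()
	acks := fanOutAck(3, origin)
	_ = acks[0](ctx, nil)                           // two targets still pending
	_ = acks[1](ctx, errors.New("download failed")) // nacks the origin now
	_ = acks[2](ctx, nil)                           // no-op: countdown disarmed
}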
35 changes: 35 additions & 0 deletions internal/impl/azure/integration_test.go
@@ -102,6 +102,41 @@ input:
)
})

t.Run("blob_storage_streamed", func(t *testing.T) {
template := `
output:
azure_blob_storage:
blob_type: BLOCK
container: $VAR1-$ID
max_in_flight: 1
path: $VAR2/${!count("$ID")}.txt
public_access_level: PRIVATE
storage_connection_string: $VAR3
input:
azure_blob_storage:
container: $VAR1-$ID
prefix: $VAR2
storage_connection_string: $VAR3
targets_input:
azure_blob_storage:
container: $VAR1-$ID
prefix: $VAR2
storage_connection_string: $VAR3
processors:
- mapping: 'root.name = @blob_storage_key'
`
integration.StreamTests(
integration.StreamTestOpenCloseIsolated(),
integration.StreamTestStreamIsolated(10),
).Run(
t, template,
integration.StreamTestOptVarOne(dummyContainer),
integration.StreamTestOptVarTwo(dummyPrefix),
integration.StreamTestOptVarThree(connString),
)
})

t.Run("blob_storage_append", func(t *testing.T) {
template := `
output: