Merge pull request #2926 from redpanda-data/snowpipe
snowflake: introduce new output based on snowpipe streaming
rockwotj authored Oct 23, 2024
2 parents f6c024a + 8dba019 commit 98da6ae
Showing 29 changed files with 6,222 additions and 46 deletions.
357 changes: 357 additions & 0 deletions docs/modules/components/pages/outputs/snowflake_streaming.adoc
@@ -0,0 +1,357 @@
= snowflake_streaming
:type: output
:status: experimental
:categories: ["Services"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Ingest data into Snowflake using Snowpipe Streaming.
Introduced in version 4.39.0.
[tabs]
======
Common::
+
--
```yml
# Common config fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: AAAAAAA-AAAAAAA # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
    max_in_flight: 64
```
--
Advanced::
+
--
```yml
# All config fields, showing default values
output:
  label: ""
  snowflake_streaming:
    account: AAAAAAA-AAAAAAA # No default (required)
    user: "" # No default (required)
    role: ACCOUNTADMIN # No default (required)
    database: "" # No default (required)
    schema: "" # No default (required)
    table: "" # No default (required)
    private_key: "" # No default (optional)
    private_key_file: "" # No default (optional)
    private_key_pass: "" # No default (optional)
    mapping: "" # No default (optional)
    batching:
      count: 0
      byte_size: 0
      period: ""
      check: ""
      processors: [] # No default (optional)
    max_in_flight: 64
    channel_prefix: "" # No default (optional)
```
--
======
Ingest data into Snowflake using Snowpipe Streaming.
[%header,format=dsv]
|===
Snowflake column type:Allowed format in Redpanda Connect
CHAR, VARCHAR:string
BINARY:[]byte
NUMBER:any numeric type, string
FLOAT:any numeric type
BOOLEAN:bool, any numeric type, or a string parsable by `strconv.ParseBool`
TIME, DATE, TIMESTAMP:Unix timestamps or RFC 3339 timestamps with nanosecond precision
VARIANT, ARRAY, OBJECT:any data type, converted into JSON
GEOGRAPHY, GEOMETRY:not supported
|===
For TIMESTAMP, TIME, and DATE columns, you can parse different string formats using a Bloblang `mapping`.
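As an illustration, a `mapping` along these lines could parse a custom date string before it is written to a DATE or TIMESTAMP column. This is only a sketch: the `order_date` field name is hypothetical, and it assumes the Bloblang `ts_strptime` method is available.

```yml
# Hypothetical example: parse "DD/MM/YYYY" strings for a DATE column
mapping: |
  root = this
  root.order_date = this.order_date.ts_strptime("%d/%m/%Y")
```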
Authentication can be configured using a https://docs.snowflake.com/en/user-guide/key-pair-auth[RSA Key Pair^].
There are https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#limitations[limitations^] of what data types can be loaded into Snowflake using this method.
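As a sketch, a key pair based configuration might look like the following. The user, role, database, schema, table, and key path are all illustrative, and the key passphrase is pulled from an environment variable rather than stored inline.

```yml
output:
  snowflake_streaming:
    account: AAAAAAA-AAAAAAA
    user: INGEST_USER                         # illustrative user name
    role: INGEST_ROLE                         # illustrative role with the required privileges
    database: MY_DB
    schema: PUBLIC
    table: EVENTS
    private_key_file: /secrets/rsa_key.p8     # PEM encoded .p8 private key
    private_key_pass: "${SNOWFLAKE_KEY_PASS}" # only needed if the key is encrypted
```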
== Performance
This output benefits from sending multiple messages in flight in parallel for improved performance. You can tune the max number of in flight messages (or message batches) with the field `max_in_flight`.
This output benefits from sending messages as a batch for improved performance. Batches can be formed at both the input and output level. You can find out more xref:configuration:batching.adoc[in this doc].
It is recommended that each batch results in at least 16 MiB of compressed output being written to Snowflake.
You can monitor the output batch size using the `snowflake_compressed_output_size_bytes` metric.
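As a rough illustration, a batching policy along the following lines favours large batches while still flushing periodically. The numbers are illustrative starting points and should be tuned against the `snowflake_compressed_output_size_bytes` metric.

```yml
batching:
  byte_size: 50000000 # accumulate roughly 50 MB of raw input per batch
  period: 30s         # but never hold an incomplete batch for longer than 30 seconds
```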
== Fields
=== `account`
Account name, which is the same as the https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#where-are-account-identifiers-used[Account Identifier^].
However, when using an https://docs.snowflake.com/en/user-guide/admin-account-identifier.html#using-an-account-locator-as-an-identifier[Account Locator^],
the Account Identifier is formatted as `<account_locator>.<region_id>.<cloud>` and this field needs to be
populated using the `<account_locator>` part.
*Type*: `string`
```yml
# Examples
account: AAAAAAA-AAAAAAA
```
=== `user`
The user to run the Snowpipe Stream as. See https://docs.snowflake.com/en/user-guide/admin-user-management[Snowflake Documentation^] on how to create a user.
*Type*: `string`
=== `role`
The role for the `user` field. The role must have the https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview#required-access-privileges[required privileges^] to call the Snowpipe Streaming APIs. See https://docs.snowflake.com/en/user-guide/admin-user-management#user-roles[Snowflake Documentation^] for more information about roles.
*Type*: `string`
```yml
# Examples
role: ACCOUNTADMIN
```
=== `database`
The Snowflake database to ingest data into.
*Type*: `string`
=== `schema`
The Snowflake schema to ingest data into.
*Type*: `string`
=== `table`
The Snowflake table to ingest data into.
*Type*: `string`
=== `private_key`
The PEM encoded private RSA key to use for authenticating with Snowflake. Either this or `private_key_file` must be specified.
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
=== `private_key_file`
The file to load the private RSA key from. This should be a `.p8` PEM encoded file. Either this or `private_key` must be specified.
*Type*: `string`
=== `private_key_pass`
The RSA key passphrase if the RSA key is encrypted.
[CAUTION]
====
This field contains sensitive information that usually shouldn't be added to a config directly, read our xref:configuration:secrets.adoc[secrets page for more info].
====
*Type*: `string`
=== `mapping`
A Bloblang mapping to execute on each message.
*Type*: `string`
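For example, a mapping such as the following could reshape incoming messages so that field names line up with the target table's columns. The field and column names here are hypothetical.

```yml
mapping: |
  root = {}
  root.USER_ID    = this.user.id
  root.EVENT_TYPE = this.event
  root.CREATED_AT = this.created_at
```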
=== `batching`
Allows you to configure a xref:configuration:batching.adoc[batching policy].
*Type*: `object`
```yml
# Examples
batching:
  byte_size: 5000
  count: 0
  period: 1s

batching:
  count: 10
  period: 1s

batching:
  check: this.contains("END BATCH")
  count: 0
  period: 1m
```
=== `batching.count`
The number of messages at which the batch should be flushed. A value of `0` disables count-based batching.
*Type*: `int`
*Default*: `0`
=== `batching.byte_size`
The number of bytes at which the batch should be flushed. A value of `0` disables size-based batching.
*Type*: `int`
*Default*: `0`
=== `batching.period`
A period in which an incomplete batch should be flushed regardless of its size.
*Type*: `string`
*Default*: `""`
```yml
# Examples
period: 1s
period: 1m
period: 500ms
```
=== `batching.check`
A xref:guides:bloblang/about.adoc[Bloblang query] that should return a boolean value indicating whether a message should end a batch.
*Type*: `string`
*Default*: `""`
```yml
# Examples
check: this.type == "end_of_transaction"
```
=== `batching.processors`
A list of xref:components:processors/about.adoc[processors] to apply to a batch as it is flushed. This allows you to aggregate and archive the batch however you see fit. Please note that all resulting messages are flushed as a single batch, therefore splitting the batch into smaller batches using these processors is a no-op.
*Type*: `array`
```yml
# Examples
processors:
  - archive:
      format: concatenate

processors:
  - archive:
      format: lines

processors:
  - archive:
      format: json_array
```
=== `max_in_flight`
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
*Type*: `int`
*Default*: `64`
=== `channel_prefix`
The prefix to use when creating a channel name.
Duplicate channel names will result in errors and prevent multiple instances of Redpanda Connect from writing at the same time.
By default, a channel name is generated based on the table FQN, so there will only be a single stream per table.
At most `max_in_flight` channels will be opened.
NOTE: There is a limit of 10,000 streams per table. If you need more than 10,000 streams, reach out to Snowflake support.
*Type*: `string`
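As a sketch, each Redpanda Connect instance could be given a unique prefix, for example from an environment variable, so that channel names never collide. The `HOSTNAME` variable here is an assumption about the deployment environment.

```yml
channel_prefix: "connect-${HOSTNAME}" # hypothetical per-instance prefix
```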