Loading Data
Snowflake Streaming
SnowcatCloud's loader streams OpenSnowcat events into Snowflake in real time through the Snowpipe Streaming API. Scalable, cost-efficient, and reliable.
Features
- Load JSON or TSV
- Automatic schema evolution (JSON only)
- Out-of-the-box monitoring
- Kafka or Kinesis ingestion
SnowcatCloud's Snowflake Streaming Loader requires a commercial license for use.
SnowcatCloud's Snowflake Streaming Loader supports two INPUT_FORMAT types: TSV (OpenSnowcat Enriched TSV) and JSON (FlattenedJson), both of which are fully supported by the OpenSnowcat Enricher.
Running
docker run \
  --mount=type=bind,source=/opensnowcat/config,destination=/config \
  --env STREAM_TYPE="kafka|kinesis" \
  snowcatcloud/snowcatcloud-snowflake-streaming-loader:latest \
  --config=/config/loader.hocon
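STREAM_TYPE takes a single value per deployment. As an illustration, a Kinesis-sourced run might look like the following; the AWS_REGION variable is an assumption (the container needs AWS credentials and a region from its environment, an instance profile, or mounted credentials), and all paths are illustrative:

```shell
docker run \
  --mount=type=bind,source=/opensnowcat/config,destination=/config \
  --env STREAM_TYPE="kinesis" \
  --env AWS_REGION="us-west-2" \
  snowcatcloud/snowcatcloud-snowflake-streaming-loader:latest \
  --config=/config/loader.hocon
```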
Loader Configuration
{
  // customerId and serviceId will be available as dimensions on CloudWatch
  // for monitoring.
  "customerId": "snowcatcloud"    // A unique string identifying your org
  "serviceId": "daf952f5"         // A unique reference for this service
  "region": "us-west-2"           // Region for the AWS SSM parameter
  "input": {
    "service": "kafka"            // "kafka" or "kinesis"
    "format": "json"              // JSON or TSV
    "kafka": {
      "topic": "enriched-good"            // Enriched events topic
      "groupId": "igroup"                 // Ingestion consumer group
      "bootstrapServers": "localhost:9092" // Kafka bootstrap servers (comma-separated host:port)
      "batchSizeThreshold": 500           // Max number of messages before flushing
      "flushIntervalMs": 5000             // Max time before flushing
      "sessionTimeoutMs": 45000           // Timeout for detecting consumer failures
      "heartbeatIntervalMs": 15000        // How often the consumer sends heartbeats
    }
  }
  "output": {
    "account": "xxxxxxxx"         // Account identifier: https://<account>.snowflakecomputing.com
    "user": "snowcatcloud"        // Snowflake username
    "warehouse": "COMPUTE_WH"     // Used for schema changes
    "database": "RAW_DATA"        // Target database
    "schema": "SNOWCATCLOUD"      // Target schema
    "table": "EVENTS"             // Events good or bad table (see SQL)
    "privateKey": "snowflake.key" // AWS SSM parameter holding the private key
  }
}
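The privateKey setting names an AWS SSM parameter rather than a local file, which points at Snowflake key-pair authentication. Under that assumption, an unencrypted PKCS#8 key can be generated with openssl (file names below are illustrative); the contents of rsa_key.p8 would then be stored under the configured SSM parameter name (for example with aws ssm put-parameter --type SecureString), and the public key registered on the Snowflake user via ALTER USER ... SET RSA_PUBLIC_KEY:

```shell
# Generate a 2048-bit RSA key and convert it to unencrypted PKCS#8,
# the format Snowflake expects for key-pair authentication.
openssl genrsa -out rsa_key.pem 2048
openssl pkcs8 -topk8 -inform PEM -in rsa_key.pem -out rsa_key.p8 -nocrypt

# Derive the public key to register on the Snowflake user.
openssl rsa -in rsa_key.pem -pubout -out rsa_key.pub
```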