Loading Data

Snowflake Streaming

The SnowcatCloud loader streams OpenSnowcat events into Snowflake in real time through the Snowpipe Streaming API. Scalable, cost-efficient, and reliable.

Features

  • Load JSON or TSV
  • Automatic Schema evolution (JSON only)
  • Out-of-the-box monitoring
  • Kafka or Kinesis ingestion

SnowcatCloud's Snowflake Streaming Loader requires a commercial license for use.

SnowcatCloud's Snowflake Streaming Loader supports two INPUT_FORMAT types — TSV (OpenSnowcat Enriched TSV) and JSON (FlattenedJson) — both of which are fully supported by the OpenSnowcat Enricher.

Running

docker run \
  --mount=type=bind,source=/opensnowcat/config,destination=/config \
  --env STREAM_TYPE="kafka|kinesis" \
  snowcatcloud/snowcatcloud-snowflake-streaming-loader:latest \
  --config=/config/loader.hocon
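For example, a Kinesis deployment might look like the following. The mount path and image tag match the command above; the AWS credential environment variables are the standard SDK variables, and it is an assumption that your deployment supplies credentials this way rather than via an instance or task role:

```shell
# Sketch: run the loader against Kinesis, passing AWS credentials
# through the standard SDK environment variables (assumption: adjust
# to your own credential setup, e.g. IAM roles on ECS/EC2).
docker run \
  --mount=type=bind,source=/opensnowcat/config,destination=/config \
  --env STREAM_TYPE="kinesis" \
  --env AWS_REGION="us-west-2" \
  --env AWS_ACCESS_KEY_ID="..." \
  --env AWS_SECRET_ACCESS_KEY="..." \
  snowcatcloud/snowcatcloud-snowflake-streaming-loader:latest \
  --config=/config/loader.hocon
```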

Loader Configuration

Kafka

{
  // customerId and serviceId will be available as dimensions on Cloudwatch
  // for monitoring.
  "customerId": "snowcatcloud"  // A unique string identifying your org
  "serviceId": "daf952f5"       // A unique reference for this service
  "region": "us-west-2"         // The AWS region of the SSM parameter

  "input": {
    "service":"kafka"
    "format": "json"                // JSON or TSV

    "kafka": {
        "topic": "enriched-good"    // Enriched topic
        "groupId": "igroup"         // Ingestion group
        "bootstrapServers":"kafka1:9092" // Kafka bootstrap servers (host:port)
        "batchSizeThreshold":500    // Max number of messages before flushing
        "flushIntervalMs":5000      // Max time before flushing
        "sessionTimeoutMs":45000    // Timeout for detecting consumer failures
        "heartbeatIntervalMs":15000 // How often the consumer sends heartbeats. 
    }
  }

  "output": {
      "account": "xxxxxxxx"         // "https://xxxxxx.snowflakecomputing.com"
      "user": "snowcatcloud"        // Snowflake Username
      "warehouse":"COMPUTE_WH"      // For schema changes
      "database": "RAW_DATA"        // Database
      "schema": "SNOWCATCLOUD"      // Schema
      "table":  "EVENTS"            // Events good or bad table (see SQL)
      "privateKey": "snowflake.key" // AWS SSM Parameter
  }
}

Kinesis
{
  // customerId and serviceId will be available as dimensions on Cloudwatch
  // for monitoring.
  "customerId": "snowcatcloud"  // A unique string identifying your org
  "serviceId": "daf952f5"       // A unique reference for this service
  "region": "us-west-2"         // The AWS region of the SSM parameter

  "input": {
    "service":"kinesis"
    "format": "json"                // JSON or TSV

    "kinesis": {
        "stream": "enriched-good"   // Enriched kinesis stream
        "startPosition":"LATEST"    // LATEST or TRIM_HORIZON
        "batchSizeThreshold":500    // Max number of messages before flushing
        "flushIntervalMs":5000      // Max time before flushing
        "maxRecordsPerFetch":500    // Max number of messages to fetch
    }
  }

  "output": {
      "account": "xxxxxxxx"         // "https://xxxxxx.snowflakecomputing.com"
      "user": "snowcatcloud"        // Snowflake Username
      "warehouse":"COMPUTE_WH"      // For schema changes
      "database": "RAW_DATA"        // Database
      "schema": "SNOWCATCLOUD"      // Schema
      "table":  "EVENTS"            // Events good or bad table (see SQL)
      "privateKey": "snowflake.key" // AWS SSM Parameter
  }
}
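The privateKey field refers to an AWS SSM parameter holding the Snowflake private key used for key-pair authentication. One way to set this up is sketched below. The parameter name, region, and username match the sample config above; it is an assumption that the loader expects an unencrypted PKCS#8 PEM in the SSM parameter, so adjust to the loader's actual requirements:

```shell
# Sketch: create a key pair for Snowflake key-pair authentication and
# store the private key in SSM under the name used in the sample config.
# (Assumption: the loader reads an unencrypted PKCS#8 PEM from SSM.)

# 1. Generate an unencrypted PKCS#8 private key and derive its public key.
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -nocrypt -out rsa_key.p8
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

# 2. Store the private key as an encrypted SecureString SSM parameter.
aws ssm put-parameter \
  --name "snowflake.key" \
  --type SecureString \
  --value file://rsa_key.p8 \
  --region us-west-2

# 3. In Snowflake, register the public key on the loader's user (paste the
#    key body from rsa_key.pub without the PEM header/footer lines):
# ALTER USER snowcatcloud SET RSA_PUBLIC_KEY='MIIBIjAN...';
```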