# The Snowpipe Streaming SDK: Channels, Offset Tokens, and Exactly-Once Ingest

For most of Snowflake's life, "loading data" meant the same dance: write your rows into files, drop the files on a stage, and let `COPY` or file-based Snowpipe ingest them. It's a great model for batch — and a terrible one for an event stream, where you either buffer rows into files (and accept minutes of latency plus a swarm of tiny files) or fight the small-files problem forever. **Snowpipe Streaming** deletes the files entirely: a client SDK opens a connection to a table and appends rows over the network, and Snowflake makes them queryable within seconds. No stage, no `COPY`, no file management. This is the developer's-eye view of that SDK — the concepts that make it exactly-once and the patterns that make it fast.

Two concepts carry the whole model, so I'll put them up front: a **channel** is a streaming connection to one table and the unit of ordering and parallelism; an **offset token** is a marker you attach to your data so that after any crash you can ask Snowflake "what's the last thing you committed?" and resume exactly there. Channels give you throughput; offset tokens give you exactly-once. Everything else is detail.

## Why files were the bottleneck

```mermaid
graph TD
    subgraph FILE["File-based Snowpipe"]
        P1["Producer buffers rows"] --> F1["Write files to a stage"]
        F1 --> F2["Snowpipe detects + COPY"]
        F2 --> T1[("Table — minutes later,many small files")]
    end
    subgraph STREAM["Snowpipe Streaming"]
        P2["Producer (SDK)"] --> CH["open channel to table"]
        CH --> RR["insert rows over network"]
        RR --> T2[("Table — seconds later,no files, columnar server-side")]
    end
          
```

The shift. File-based Snowpipe makes you serialize rows into files on a stage, then waits for detection and a COPY — minutes of latency and a small-files problem to manage. Snowpipe Streaming's SDK opens a channel and appends rows directly over the network; Snowflake handles the columnar conversion server-side and the rows are queryable in seconds. For a continuous event stream, the file step was always the wrong abstraction.

## Channels: the unit of ordering and parallelism

A **channel** is a logical, ordered connection from one client to one table. Rows you insert on a channel are committed in order, and a channel is owned by exactly one writer at a time. This gives you two things at once. **Ordering**: within a channel, order is preserved — so if you map one source partition (a Kafka partition, a shard, a device) to one channel, that source's events stay in order. **Parallelism**: you scale throughput by opening *many* channels — to the same table or across tables — and writing to them concurrently, the same way you'd use many partitions. The design question for any Snowpipe Streaming app is "what's my channel key?" — usually it mirrors your upstream partitioning so ordering and parallelism line up.

## Offset tokens: exactly-once, by reconciliation

This is the concept people most often skip and most often regret skipping. Each channel persists, on the Snowflake side, the **latest offset token** it has durably committed. An offset token is just a string *you* supply that marks a position in your source (a Kafka offset, a sequence number, a file+line). The contract is simple and powerful: when you (re)open a channel, you can read back the last committed token, and you replay your source *from just after it*. So:

- If your producer crashes after inserting rows but before you recorded success, the rows may or may not have committed — but on restart you ask the channel for its latest committed token, and you know exactly where to resume.

- Because you resume from the committed position, you never skip rows (no loss) and never re-insert already-committed rows (no duplicates) — **exactly-once**, achieved by reconciliation rather than by distributed transactions.

```mermaid
graph TD
    START["Producer (re)starts"]
    GET["Open channel,read latest committed offset token"]
    SEEK["Seek source to token + 1"]
    SEND["insert rows with new offset tokens"]
    POLL["Poll: latest committed token advances"]
    CRASH["Crash anytime?"]
    START --> GET --> SEEK --> SEND --> POLL
    POLL --> SEND
    CRASH -.->|"resume safely"| GET
          
```

The exactly-once loop. On every start (including after a crash), the producer reads the channel's latest committed offset token, seeks its source to just past that point, and resumes inserting — tagging rows with new tokens as it goes. Because the resume point is always the durably-committed token, no row is skipped or duplicated. The offset token is the entire mechanism; without it you get at-least-once and must dedup downstream.

## Writing rows: the Python client

The SDK started on the JVM (Java/Scala) and the ecosystem has since added a **Python** client and a **Go** path, so you can produce from the runtime your services already use. The shape is the same everywhere: authenticate, open a client, open a channel to `db.schema.table`, insert rows (each tagged with an offset token), and check the committed offset. In Python:

```python
# Snowpipe Streaming — append rows directly to a table, exactly-once via offset tokens
client = StreamingIngestClient(account=ACCOUNT, user=USER, private_key=KEY)
channel = client.open_channel(
    db="CLINICAL", schema="BRONZE", table="RAW_EVENTS",
    channel_name="device-shard-07",          # one channel per source partition
)

# on (re)start: resume from what Snowflake durably committed
last = channel.get_latest_committed_offset_token()   # e.g. "kafka:part7:120455" or None
source.seek_after(last)

for batch in source.read_batches():
    rows = [{"event_id": e.id, "payload": e.json, "ts": e.ts} for e in batch]
    channel.insert_rows(rows, offset_token=f"kafka:part7:{batch.last_offset}")

# commit is async; confirm durability before advancing the source bookmark
channel.wait_for_commit(timeout=30)
```

The Go usage follows the same lifecycle — open a channel, insert rows with offset tokens, poll the committed offset — typically against the high-performance REST ingestion endpoint:

```text
// Go (sketch): same channel + offset-token lifecycle
ch, _ := client.OpenChannel(ctx, "CLINICAL", "BRONZE", "RAW_EVENTS", "device-shard-07")
last, _ := ch.LatestCommittedOffsetToken(ctx)        // resume point
src.SeekAfter(last)
for batch := range src.Batches() {
    ch.InsertRows(ctx, batch.Rows, batch.LastOffsetToken())   // tag with offset token
}
ch.WaitForCommit(ctx, 30*time.Second)
```

**If you ignore offset tokens, you have at-least-once — and you'll learn that the hard way during your first restart.** Snowpipe Streaming's exactly-once is not automatic; it's a contract you participate in by (a) tagging inserts with a monotonic offset token that maps to your source position and (b) reading the committed token on startup and resuming from it. Skip either half and a crash will either drop rows you thought committed or replay rows that already did, and you'll be back to deduping downstream. Equally important: an insert returning from the client does *not* mean the row is durable — commit is asynchronous, so you must confirm the committed offset has advanced before you move your source bookmark forward. The most common Snowpipe Streaming bug I see is advancing the source offset on insert-return instead of on commit-confirm, which silently loses data on the next crash.

## The high-performance architecture

The newer, rewritten Snowpipe Streaming (the high-performance architecture that reached GA in the mid-2020s) is worth understanding because it changes the throughput envelope. The earlier design had per-account throughput limits and a JVM-centric client; the rewrite decoupled ingestion scaling from those limits and pushed far higher per-table throughput (Snowflake has cited figures on the order of multiple GB/s per table), with server-side conversion to Snowflake's columnar format and a REST-based interface that makes non-JVM clients (Python, Go) first-class. The practical upshot: streaming ingestion is no longer a niche, latency-only feature you tiptoe around — it can be your *primary* high-volume load path, with volume-based pricing rather than warehouse-time. (For where this sits in the wider real-time Snowflake story, see [real-time Snowflake on AWS](snowflake-realtime-aws).)

## Throughput tuning and patterns

- **Batch rows per insert.** Inserting one row per call wastes the network round-trip; accumulate a batch (hundreds to thousands of rows) and insert together. Throughput lives in batch size, latency in how long you wait to fill a batch — tune the trade-off to your freshness need.

- **Channel-per-partition for parallelism + ordering.** Map each upstream partition/shard to its own channel so you get concurrency *and* per-partition ordering; more channels = more parallel throughput.

- **Design offset tokens to be monotonic and source-aligned.** A good token (e.g. `topic:partition:offset`) makes resume trivial and unambiguous; a token that doesn't map cleanly to a replayable source position defeats the whole mechanism.

- **Handle per-row errors.** A malformed row shouldn't sink a batch — the SDK surfaces row-level results so you can route bad rows to a dead-letter path and keep the channel flowing, the same discipline as a [Kafka consumer](kafka-production-pipeline-patterns).

- **Land raw, transform in-warehouse.** Insert into a raw landing table (VARIANT for semi-structured) and let Dynamic Tables / Streams + Tasks refine — keep the producer simple and the parsing in Snowflake.

### You may not need to write the producer at all

A pragmatic note: if your source is Kafka, the **Snowflake Kafka connector** can run in Snowpipe Streaming mode and do all of this for you — channels, offset tokens, exactly-once — without you writing SDK code. Reach for the SDK directly when you have a custom event source (a webhook receiver, a device gateway, an app emitting events) that isn't already in Kafka. Don't hand-build a producer for data that's already sitting in a topic a connector can drain.

## When file-based loading is still the right call

Streaming isn't a universal replacement, and choosing it for the wrong workload wastes money and effort. The honest split:

|  | Snowpipe Streaming (SDK) | File-based (COPY / Snowpipe) |
| --- | --- | --- |
| Latency | Seconds | Minutes (Snowpipe) / batch (COPY) |
| Shape of data | Continuous row/event streams | Files that already exist, large batches |
| Exactly-once | Yes, via offset tokens (you implement) | File-level dedup |
| Pricing model | Volume-based (data ingested) | Per-file compute / warehouse time |
| Best for | Real-time events, CDC, IoT, app telemetry | Big historical backfills, periodic bulk loads, existing file drops |

**Use streaming for the continuous trickle, files for the big batch — and don't force either.** A one-time 50 TB historical backfill belongs in `COPY` from staged files (cheaper, built for bulk); a never-ending feed of clinical events or clickstream belongs in Snowpipe Streaming (seconds of latency, no small-files mess). The anti-pattern in both directions: micro-batching a real-time stream into files just to use `COPY` (you re-create the small-files and latency problems streaming solved), or hammering the streaming SDK with a massive one-shot backfill (slower and pricier than a bulk file load). Match the tool to the data's *shape over time*: a trickle or a flood.

## What to carry away

Snowpipe Streaming lets a client SDK append rows directly into Snowflake tables — no files, no `COPY`, queryable in seconds — and its model rests on two concepts. A **channel** is a streaming connection to one table that preserves order and is the unit you multiply for parallelism (one channel per source partition is the workhorse pattern). An **offset token** is the marker you attach to data so that, on any restart, you read the channel's latest committed token and resume from exactly there — which is how you get exactly-once by reconciliation, and the part you must implement rather than assume.

The high-performance architecture turned this from a latency-only niche into a primary high-throughput load path with Python and Go clients and volume-based pricing. Tune throughput by batching rows and adding channels, design offset tokens that map cleanly to a replayable source position, confirm commits before advancing your bookmark, and let the Kafka connector do the work when your data is already in Kafka. And keep the boundary honest: streaming for the continuous trickle, file-based `COPY` for the big batch. Get the channel-and-token model right and you have ingestion that's both real-time and correct — which, for the pipelines that feed live dashboards and models, is exactly the combination that's hard to fake. For an end-to-end clinical use of this, see [real-time FHIR into Snowflake](fhir-streaming-snowpipe-dynamic-tables).
