# Real-Time FHIR into Snowflake: Subscriptions, Snowpipe Streaming, Dynamic Tables

A nightly batch is fine for a population-health report and useless for a sepsis-risk model. When a clinician wants the dashboard to reflect a lab result that posted four minutes ago, the gap between "the EHR knows" and "the warehouse knows" has to shrink from hours to seconds — and in healthcare, the data crossing that gap is **FHIR**, the HL7 interoperability standard. I've built this path more than once, and the modern shape of it is clean: a FHIR Subscription pushes each new resource the moment it's created, an ingestion service feeds it (optionally through Kafka) into **Snowpipe Streaming**, and Snowflake's own **Streams, Tasks, and Dynamic Tables** refine raw FHIR JSON into typed clinical facts. This is that reference architecture, end to end, with the decisions that actually matter.

The mental model before the parts: **FHIR resources arrive as semi-structured events, land raw in Snowflake within seconds, and are refined in-warehouse into a clinical model** — push-based ingestion plus declarative in-database transformation, with no nightly extract anywhere in sight.

## The source: FHIR Subscriptions

A **FHIR Subscription** is the standard's push mechanism. You register interest in a kind of change (for example, "notify me whenever a new `Observation` with a particular code is created for any patient"), and the FHIR server delivers a notification when a matching resource changes. The modern R5 model is *topic-based*: a `SubscriptionTopic` defines the triggering criteria, and subscribers attach to it. That scales far better than each subscriber crafting bespoke filters, and R4 and R4B servers can offer the same model through the Subscriptions R5 Backport implementation guide. Delivery is typically a **rest-hook** (the server POSTs to your webhook URL). The payload can be configured as `full-resource`, `id-only`, or `empty`; with the last two, you fetch the changed resources yourself.

This matters because it's the difference between *polling* the FHIR API on a timer (load on the EHR, latency bounded by the interval, and you re-pull unchanged data) and being *pushed* exactly the changes as they happen. Subscriptions are how you get clinical events at the speed clinical decisions need them.
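To make the topic-based model concrete, here is a minimal sketch that builds the body you would POST to a FHIR server to register an R5-style rest-hook Subscription. It assumes R5 `Subscription` element names (`topic`, `channelType`, `endpoint`, `content`, `filterBy`). The topic URL, endpoint, and helper name are hypothetical. Whether `code` is an accepted filter depends on what the server's `SubscriptionTopic` declares, and R4 servers using the backport IG express some of these fields as extensions instead.

```python
import json

# Hypothetical values: substitute your server's SubscriptionTopic canonical URL
# and your receiver's public HTTPS endpoint.
TOPIC_URL = "https://example.org/fhir/SubscriptionTopic/new-lab-observation"
ENDPOINT = "https://ingest.example.org/fhir/notify"


def build_rest_hook_subscription(topic_url: str, endpoint: str, loinc_code: str) -> dict:
    """Build an R5-style, topic-based Subscription with rest-hook delivery."""
    return {
        "resourceType": "Subscription",
        "status": "requested",  # the server activates it after a successful handshake
        "topic": topic_url,     # which SubscriptionTopic this attaches to
        "channelType": {
            "system": "http://terminology.hl7.org/CodeSystem/subscription-channel-type",
            "code": "rest-hook",
        },
        "endpoint": endpoint,
        "content": "full-resource",  # alternatives: "id-only", "empty"
        "contentType": "application/fhir+json",
        # Narrow the topic to a single lab code (only if the topic allows this filter).
        "filterBy": [
            {"filterParameter": "code", "value": f"http://loinc.org|{loinc_code}"}
        ],
    }


if __name__ == "__main__":
    # 2160-0 is the LOINC code for serum/plasma creatinine, used here as an example.
    print(json.dumps(build_rest_hook_subscription(TOPIC_URL, ENDPOINT, "2160-0"), indent=2))
```

Choosing `full-resource` is what lets the receiver land resources without a second round trip to the EHR.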

## The end-to-end pipeline

```mermaid
graph TD
    EHR["FHIR server / EHR"]
    SUB["FHIR Subscription<br/>(topic-based, rest-hook)"]
    RCV["Ingestion service<br/>(webhook receiver)"]
    KAFKA["Kafka (optional buffer)<br/>decouple, replay, fan-out"]
    SP["Snowpipe Streaming<br/>(row-level, sub-10s)"]
    BRONZE[("BRONZE: raw_fhir<br/>VARIANT + load metadata")]
    XFORM["Streams + Tasks<br/>OR Dynamic Tables"]
    SILVER[("SILVER: typed resources<br/>observations, encounters")]
    GOLD[("GOLD: patient marts,<br/>risk features")]
    CONSUME["Dashboards / ML / alerts"]
    EHR --> SUB --> RCV
    RCV --> KAFKA --> SP
    RCV -.->|"or direct"| SP
    SP --> BRONZE --> XFORM --> SILVER --> GOLD --> CONSUME
```

The full path. A FHIR Subscription pushes changed resources to a webhook receiver, which writes them — directly, or through a Kafka buffer — to Snowpipe Streaming, landing each resource as a raw VARIANT row in bronze within seconds. From there it's pure Snowflake: Streams + Tasks or Dynamic Tables parse the FHIR JSON into typed silver tables (one per resource type) and then into gold patient marts and model features. The dotted line is the real decision — whether Kafka belongs between the receiver and Snowpipe at all.
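The webhook receiver is the one piece of custom code on this path. Below is a stdlib-only sketch of its core contract: accept the POSTed notification Bundle, hand it off unparsed, and acknowledge only after the handoff succeeds. It is not production code (no TLS, authentication, or handshake validation). `OUTBOX` and `make_server` are hypothetical names, and the queue stands in for a Kafka producer or a Snowpipe Streaming client. It also assumes the FHIR server treats a non-2xx response as a failed delivery and retries, which varies by server.

```python
import json
import queue
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

# Stand-in for the next hop (Kafka producer or Snowpipe Streaming client).
# Bounded, so a stalled downstream pushes back instead of growing without limit.
OUTBOX: "queue.Queue[dict]" = queue.Queue(maxsize=10_000)


class NotificationHandler(BaseHTTPRequestHandler):
    """Accept FHIR rest-hook POSTs and hand each Bundle off unparsed."""

    def _reply(self, status: int) -> None:
        self.send_response(status)
        self.send_header("Content-Length", "0")
        self.end_headers()

    def do_POST(self) -> None:
        length = int(self.headers.get("Content-Length", 0))
        try:
            bundle = json.loads(self.rfile.read(length))
        except ValueError:  # covers JSONDecodeError and undecodable bytes
            self._reply(400)
            return
        if not isinstance(bundle, dict) or bundle.get("resourceType") != "Bundle":
            self._reply(400)
            return
        try:
            OUTBOX.put(bundle, timeout=1.0)
        except queue.Full:
            # Non-2xx so the FHIR server can retry, rather than a silent drop.
            self._reply(503)
            return
        self._reply(200)  # acknowledge only after the handoff succeeded

    def log_message(self, format: str, *args) -> None:
        pass  # silence per-request logging in this sketch


def make_server(port: int = 8080) -> ThreadingHTTPServer:
    return ThreadingHTTPServer(("127.0.0.1", port), NotificationHandler)
```

An in-memory queue is not durable. If this process dies after returning 200, that notification is gone. That gap is exactly what Kafka, or an immediately committed Snowpipe Streaming write, closes.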

## Landing raw FHIR with Snowpipe Streaming

The ingestion target is a bronze table that stores the FHIR resource as a **VARIANT** — Snowflake's semi-structured type — plus load metadata. You do *not* try to parse FHIR at ingest time; FHIR resources are deeply nested, extension-laden, and evolve, so landing them whole and parsing in-warehouse keeps ingestion simple and replayable.

```sql
-- BRONZE: raw FHIR resources land here as VARIANT, one row per notification
CREATE OR REPLACE TABLE bronze.raw_fhir (
    resource        VARIANT,        -- the full FHIR resource JSON
    resource_type   STRING,         -- Observation, Encounter, ...
    resource_id     STRING,         -- FHIR logical id
    version_id      STRING,         -- meta.versionId
    last_updated    TIMESTAMP_NTZ,  -- meta.lastUpdated
    _ingested_at    TIMESTAMP_NTZ DEFAULT current_timestamp(),
    _offset_token   STRING          -- for exactly-once dedup
);
```
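"Don't parse at ingest" still leaves a little envelope work for the receiver: lifting the type, id, and `meta` fields into their own `bronze.raw_fhir` columns while the resource itself stays whole. Here is a minimal sketch, assuming an R5 `subscription-notification` Bundle whose first entry is a `SubscriptionStatus` (R4 backport servers send `Parameters` there instead). The function name is hypothetical.

```python
from typing import Any, Dict, List

# Entries describing the notification itself rather than a clinical resource.
_NOTIFICATION_META_TYPES = {"SubscriptionStatus", "Parameters"}


def bundle_to_bronze_rows(bundle: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Map a notification Bundle to rows shaped like bronze.raw_fhir.

    Only envelope fields are lifted out; the resource lands whole in the VARIANT.
    """
    rows = []
    for entry in bundle.get("entry", []):
        resource = entry.get("resource")
        if not resource:
            continue  # id-only notifications: fetch entry["fullUrl"] separately
        if resource.get("resourceType") in _NOTIFICATION_META_TYPES:
            continue  # status/handshake/heartbeat metadata, not clinical data
        meta = resource.get("meta", {})
        rows.append(
            {
                "resource": resource,
                "resource_type": resource.get("resourceType"),
                "resource_id": resource.get("id"),
                "version_id": meta.get("versionId"),
                "last_updated": meta.get("lastUpdated"),
            }
        )
    return rows
```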

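**Snowpipe Streaming** writes rows into this table with low latency (seconds, not the minutes of file-based Snowpipe). Critically for clinical data, it supports **exactly-once** ingestion through per-channel offset tokens. Each batch is written with an offset token, and Snowflake persists the latest committed token for the channel. On a restart, the client reads that token back and resumes from the next offset in its source, without duplicating or dropping rows. That guarantee holds only if the source can replay from a given offset: a Kafka partition can, but a bare webhook receiver holding notifications in memory cannot. It also covers ingest-level gaps and duplicates, not a FHIR server delivering the same resource twice, which the silver layer handles. I cover the client side of this in depth in the [Snowpipe Streaming SDK](snowpipe-streaming-sdk) piece; here the point is that the bronze table receives a faithful stream of raw resources. (For the broader real-time Snowflake context, see [real-time Snowflake on AWS](snowflake-realtime-aws).)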
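The offset-token contract is easier to see in a toy model than in prose. The sketch below is an in-memory simulation, not the SDK. `FakeChannel` and `ingest` are hypothetical, offsets are ints rather than the SDK's string tokens, and commit is made explicit where the real client flushes asynchronously. It mirrors the idea behind the Java SDK's `insertRows(rows, offsetToken)` and `getLatestCommittedOffsetToken()`: the client trusts the channel's committed token, never its own lost cursor.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class FakeChannel:
    """In-memory stand-in for one Snowpipe Streaming channel (hypothetical)."""

    committed_rows: List[str] = field(default_factory=list)
    committed_offset: Optional[int] = None  # ~ getLatestCommittedOffsetToken()
    pending_rows: List[str] = field(default_factory=list)
    pending_offset: Optional[int] = None

    def insert_rows(self, rows: List[str], offset_token: int) -> None:
        # Buffered by the client; not durable yet.
        self.pending_rows.extend(rows)
        self.pending_offset = offset_token

    def commit(self) -> None:
        # In this fake, rows and their offset token become durable together.
        self.committed_rows.extend(self.pending_rows)
        if self.pending_offset is not None:
            self.committed_offset = self.pending_offset
        self.pending_rows, self.pending_offset = [], None

    def lose_uncommitted(self) -> None:
        # What a client crash does to anything not yet committed.
        self.pending_rows, self.pending_offset = [], None


def ingest(channel: FakeChannel, source: List[str], batch_size: int = 2,
           crash_at: Optional[int] = None) -> None:
    """Resume a replayable source from just past the channel's committed offset."""
    start = 0 if channel.committed_offset is None else channel.committed_offset + 1
    for i in range(start, len(source), batch_size):
        batch = source[i:i + batch_size]
        channel.insert_rows(batch, offset_token=i + len(batch) - 1)
        if i == crash_at:
            channel.lose_uncommitted()
            raise RuntimeError("client crashed before commit")
        channel.commit()
```

If the client crashes mid-batch, the rerun starts exactly where the committed token says, so the lost batch is replayed once. Restarting from zero would duplicate the rows already committed.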

## The Kafka question: when it earns its place

**Kafka between the webhook and Snowpipe is sometimes essential and sometimes pure overhead — decide on purpose, not by reflex.** Going *direct* (webhook receiver → Snowpipe Streaming) is simpler, cheaper, and lower-latency, and it's the right default for a single FHIR source feeding one warehouse. Add [Kafka](kafka-production-pipeline-patterns) when you have a concrete reason: you need a durable buffer so a Snowflake hiccup or maintenance window doesn't drop notifications the FHIR server won't redeliver; you need to *fan out* the same clinical events to several consumers (the warehouse, a real-time alerting service, an ML feature pipeline); you need replay to rebuild downstream from history; or the notification volume is bursty enough that you want backpressure decoupling between an unpredictable source and steady ingestion. If none of those apply, Kafka is a cluster to operate for no benefit. The honest test: name the specific failure or requirement Kafka solves for *this* pipeline — if you can't, go direct.
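Fan-out and replay both come from one property: a log with an independent read position per consumer. The toy below is not the Kafka client API. `TopicLog` and the group names are hypothetical, and commits are simplified to "commit on read". It shows why a second consumer or a rebuild doesn't disturb the warehouse feed, and why the direct path has no equivalent.

```python
from collections import defaultdict
from typing import Dict, List


class TopicLog:
    """Toy append-only log with per-consumer-group offsets (Kafka-like semantics)."""

    def __init__(self) -> None:
        self.records: List[str] = []
        self.offsets: Dict[str, int] = defaultdict(int)  # group -> next offset to read

    def append(self, record: str) -> None:
        self.records.append(record)

    def poll(self, group: str, max_records: int = 100) -> List[str]:
        start = self.offsets[group]
        batch = self.records[start:start + max_records]
        self.offsets[group] = start + len(batch)  # simplified: commit on read
        return batch

    def seek(self, group: str, offset: int) -> None:
        """Replay: rewind one group without affecting any other."""
        self.offsets[group] = offset
```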

## Refining FHIR in-warehouse: Streams + Tasks, or Dynamic Tables

Once raw resources are in bronze, the refinement is pure Snowflake, and you have two idioms. Both turn the raw VARIANT into typed, queryable clinical tables; they differ in whether you write the orchestration or declare the result.

### Parsing the FHIR JSON

FHIR's nesting is handled with Snowflake's semi-structured access — dot/bracket navigation and `LATERAL FLATTEN` for repeating elements. An `Observation`, for instance, carries its code, value, subject reference, and effective time at known JSON paths:

```sql
-- SILVER: typed observations parsed from the raw VARIANT
CREATE OR REPLACE DYNAMIC TABLE silver.observations
  TARGET_LAG = '1 minute'
  WAREHOUSE = transform_wh
AS
SELECT
    resource:id::string                                   AS observation_id,
    resource:meta.versionId::string                       AS version_id,
    resource:subject.reference::string                    AS patient_ref,
    resource:code.coding[0].code::string                  AS loinc_code,        -- assumes the first coding is LOINC
    resource:code.coding[0].display::string               AS observation_name,
    resource:valueQuantity.value::float                   AS value_num,
    resource:valueQuantity.unit::string                   AS unit,
    resource:effectiveDateTime::timestamp_tz              AS effective_at,      -- keep the source UTC offset
    last_updated
FROM bronze.raw_fhir
WHERE resource_type = 'Observation'
QUALIFY row_number() OVER (PARTITION BY resource:id::string
                           ORDER BY last_updated DESC, _ingested_at DESC) = 1;   -- keep latest version
```

That `DYNAMIC TABLE` with a `TARGET_LAG` is the declarative option. You state the query and how fresh the result must be, and Snowflake keeps it refreshed as bronze changes: incrementally when the query supports incremental refresh, otherwise by full refresh. There is no orchestration code, and silver stays near-real-time. The imperative alternative uses a **Stream** (change capture on bronze) plus a **Task** (scheduled or triggered transform) when you need procedural control, multi-statement logic, or explicit ordering.

|  | Streams + Tasks | Dynamic Tables |
| --- | --- | --- |
| Model | Imperative — you write the transform & schedule | Declarative — state the query + target lag |
| Change tracking | Stream = CDC offset on the source | Managed by Snowflake automatically |
| Best for | Multi-step logic, calls to procedures/UDFs, fine control | Straightforward SQL transforms that must stay fresh |
| Operational burden | You own the DAG and error handling | Snowflake maintains it; less to manage |

A Stream is conceptually the same idea as [log-based CDC](debezium-cdc) — it exposes exactly the rows that changed since you last consumed it — which is what lets a Task process only new resources rather than rescanning bronze. My default for FHIR refinement is Dynamic Tables for the straightforward type-and-flatten layers (they're declarative and stay fresh on their own) and Streams + Tasks for the steps that need procedural logic, such as resolving patient references across resource types or invoking a tokenization UDF.
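The property that makes Streams + Tasks safe is that a stream's offset advances only when the consuming transaction commits. A failed task run therefore leaves the same changes waiting for the next run. The toy below models that for an append-only stream. `ToyStream` is hypothetical; in Snowflake the real pieces would be something like `CREATE STREAM raw_fhir_changes ON TABLE bronze.raw_fhir APPEND_ONLY = TRUE` plus a task gated on `WHEN SYSTEM$STREAM_HAS_DATA('raw_fhir_changes')` (names illustrative).

```python
from typing import Callable, List


class ToyStream:
    """Toy model of an append-only Snowflake stream over a bronze table.

    The offset moves only after the transform succeeds (i.e., the consuming
    transaction commits), so a failed run can simply be retried.
    """

    def __init__(self, table: List[dict]) -> None:
        self.table = table  # shared, append-only list of rows
        self.offset = 0

    def changes(self) -> List[dict]:
        return self.table[self.offset:]

    def consume(self, transform: Callable[[List[dict]], object]) -> object:
        pending = self.changes()
        end = self.offset + len(pending)  # snapshot: later appends wait for the next run
        result = transform(pending)       # if this raises, the offset is unchanged
        self.offset = end
        return result
```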

## Idempotency and ordering: the clinical-correctness essentials

Two properties are non-negotiable when the data is clinical. **Idempotency**: a FHIR resource has a stable logical `id` and a `meta.versionId` / `meta.lastUpdated`, so even if a notification is delivered twice (webhooks retry, Kafka is at-least-once, Snowpipe restarts), you dedup on id and keep the latest version — the `QUALIFY row_number()` above is exactly this, applied at the silver layer. **Ordering**: an `Observation` can be created, then amended or corrected minutes later; your model must reflect the *latest* version, not whichever arrived last, which is why you sort by `lastUpdated` rather than ingestion time. Out-of-order and duplicate delivery are the normal case in push-based pipelines, not the exception — design for them from the start.
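The same rule the `QUALIFY` applies in silver can be sketched in application code, for example in a receiver-side cache or a test harness. Exact redeliveries are dropped on (`id`, `versionId`), and the winner is chosen by `meta.lastUpdated`, never by arrival order. One subtlety worth showing: `lastUpdated` strings can carry different UTC offsets, so they must be parsed into timezone-aware instants before comparing, because plain string comparison can order them wrongly. The function names are hypothetical.

```python
from datetime import datetime
from typing import Dict, Iterable


def _parse_instant(value: str) -> datetime:
    # FHIR instants end in "Z" or an offset; fromisoformat before 3.11 rejects "Z".
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def latest_versions(notifications: Iterable[dict]) -> Dict[str, dict]:
    """Collapse duplicate and out-of-order deliveries to one current resource per id."""
    seen = set()
    latest: Dict[str, dict] = {}
    for res in notifications:  # arrival order, deliberately not trusted
        key = (res["id"], res["meta"]["versionId"])
        if key in seen:
            continue  # exact redelivery of a version already processed
        seen.add(key)
        current = latest.get(res["id"])
        if current is None or (
            _parse_instant(res["meta"]["lastUpdated"])
            > _parse_instant(current["meta"]["lastUpdated"])
        ):
            latest[res["id"]] = res
    return latest
```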

## PHI: this is regulated data the whole way down

**Treat the pipeline as PHI-bearing at every hop and bake the controls in, don't bolt them on.** The raw FHIR resource is loaded with identifiers, so: tokenize or mask direct identifiers as early as the silver layer (a Snowflake masking policy or a tokenization UDF), keep the bronze raw zone tightly access-controlled and audited, encrypt in transit on the webhook and Kafka hops, and tag PHI columns so governance policies apply by classification rather than by hand. If you're routing through Kafka, that topic carries PHI too — same encryption and access discipline as the warehouse. The techniques are the subject of [PII, tokenization, and privacy-preserving analytics](pii-tokenization-privacy-analytics), and the governance posture is the one I describe for [regulated AI in healthcare](regulated-ai-healthcare): the point is that "real-time" never means "skip the controls" — a fast pipeline that leaks PHI is a breach that happens faster.
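One common way to tokenize direct identifiers while keeping them joinable is a keyed, deterministic hash (HMAC). The same patient reference always maps to the same token, so silver and gold joins still work. Without the key, though, nobody can recompute a token from a guessed identifier. This is a sketch of that one technique, not necessarily what a given tokenization UDF does. The function name, namespace scheme, and key are hypothetical, and in practice the key would come from a secrets manager. Note that deterministic tokens are pseudonyms, so the tokenized data is still governed as regulated data.

```python
import hashlib
import hmac


def tokenize(value: str, key: bytes, namespace: str) -> str:
    """Deterministic, keyed token for a direct identifier.

    The namespace keeps identical values from different identifier systems
    (for example MRN vs. member id) from colliding on the same token.
    """
    message = f"{namespace}:{value}".encode("utf-8")
    digest = hmac.new(key, message, hashlib.sha256).hexdigest()
    return f"tok_{digest[:32]}"  # 128 bits of the HMAC, hex-encoded
```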

## Honest trade-offs

A few realities to set expectations. This is **near-real-time, not real-time**: Snowpipe Streaming lands data in seconds and Dynamic Tables refresh on a target lag, so end-to-end you're looking at seconds-to-a-minute, which is right for dashboards, alerting, and feature freshness but not for sub-second control loops. **FHIR parsing is genuine work**: resources are deeply nested with optional extensions and code systems, so the silver layer that maps FHIR to a clean clinical model is where most of the engineering effort lives — budget for it. And **cost is continuous**: streaming ingestion plus always-maintained Dynamic Tables means compute runs continuously rather than in a nightly burst, so size the transform warehouse and target lags deliberately — a one-minute lag costs more than a fifteen-minute one, and not every table needs to be that fresh.
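To see why the silver mapping takes real effort, consider what the silver `Observation` query earlier in this piece assumes: the first `coding` is LOINC, the value is a `valueQuantity`, and the time is an `effectiveDateTime`. Real feeds break all three assumptions. A local code often comes first, values can be strings or codes, and `effective[x]` can be a `Period`. The Python sketch below shows the kind of logic the silver layer needs. It covers only a few of the `value[x]` and `effective[x]` choice types FHIR defines, and the function names are hypothetical. In Snowflake, the equivalent is a `LATERAL FLATTEN` over `code.coding` filtered on `system`, plus `CASE`/`COALESCE` across the choice-type paths.

```python
from typing import Any, Dict, Optional

LOINC = "http://loinc.org"


def pick_coding(concept: Dict[str, Any], system: str) -> Optional[Dict[str, Any]]:
    """Return the first coding from the given code system (FLATTEN + filter)."""
    for coding in concept.get("coding", []):
        if coding.get("system") == system:
            return coding
    return None


def observation_value(obs: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize a few value[x] choice types into one shape."""
    if "valueQuantity" in obs:
        q = obs["valueQuantity"]
        return {"kind": "quantity", "num": q.get("value"), "unit": q.get("unit")}
    if "valueCodeableConcept" in obs:
        return {"kind": "coded", "text": obs["valueCodeableConcept"].get("text")}
    if "valueString" in obs:
        return {"kind": "string", "text": obs["valueString"]}
    reason = obs.get("dataAbsentReason") or {}
    return {"kind": "absent", "reason": reason.get("text")}


def effective_start(obs: Dict[str, Any]) -> Optional[str]:
    """effective[x]: a dateTime or instant, or the start of a Period."""
    if "effectiveDateTime" in obs:
        return obs["effectiveDateTime"]
    if "effectiveInstant" in obs:
        return obs["effectiveInstant"]
    if "effectivePeriod" in obs:
        return obs["effectivePeriod"].get("start")
    return None
```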

## What to carry away

Streaming FHIR into Snowflake has a clean modern shape: FHIR Subscriptions push changed resources to a webhook receiver; Snowpipe Streaming lands them — directly, or through a Kafka buffer when you have a concrete need for durability, fan-out, or replay — as raw VARIANT rows in bronze within seconds, with exactly-once via offset tokens; and Snowflake's own Streams + Tasks or Dynamic Tables refine that raw JSON into typed silver clinical tables and gold patient marts, no external transform engine required.

The decisions that determine whether it works in production: go direct unless you can name what Kafka solves; prefer Dynamic Tables for declarative type-and-flatten layers and Streams + Tasks where you need procedural control; dedup on FHIR `id` + `versionId` and order by `lastUpdated` because duplicate and out-of-order delivery are normal; and treat the data as PHI at every hop, applying masking and access controls from the silver layer down. Get those right and a lab result that posts at 9:04 is in the risk model by 9:05 — which, in clinical analytics, is the entire point.
