In the spring of 2021 I was working on a provincial analytics platform, and a deceptively simple question landed on my desk: for every confirmed COVID-19 case in British Columbia, which hospital is nearest, and how far is it? Public-health planners wanted it to model load on emergency departments and to reason about access in the rural interior, where "nearest" can mean a two-hour drive. Sounds like one line of SQL. It is not. You have on the order of a few hundred thousand case points (anonymized to locality centroids), a couple hundred hospital and health-facility points, and the brutal arithmetic of a naive solution: compare every case to every facility and you've signed up for tens of millions of distance calculations — and that's the easy version, before anyone asks for the nearest three facilities or wants it re-run nightly. This is a k-nearest-neighbour spatial join, and doing it well on Apache Spark is a genuinely interesting problem.
This is the story of how I built it on Spark 3.1 running on AWS EMR, which geospatial extension I picked and why, and the spatial-partitioning and cluster-tuning work that took it from "times out" to "done before my coffee's cold."
Why a spatial join is not a normal join
A normal join matches rows on equality: a.id = b.id. Spark is brilliant at this — it hashes both sides on the key, shuffles matching keys to the same partition, and joins locally. The whole machine is built around equality of a key.
A spatial join has no such key. The predicate is geometric — "within distance d", "contains", "intersects", "is the closest of all candidates." There is nothing to hash. Two points that are 100 metres apart have latitude/longitude values that look nothing alike to a hash function, so Spark's default toolkit gives you exactly one honest option: the Cartesian product. Cross-join every case against every facility, compute the distance for each pair, and keep the minimum per case. With 300,000 cases and 200 facilities that's 60 million distance computations — survivable, but it scales as the product of both sides, so the moment someone hands you all of Canada it falls over. And a Cartesian join in Spark means shuffling and materializing that whole product, which is where the out-of-memory errors live.
The entire craft of geospatial-at-scale is about avoiding the Cartesian product — using the one thing geometry gives you that hashing throws away: locality. Points that are near each other in the world should be near each other in the cluster, on the same partition, so each task only ever compares things that could plausibly be close.
The geospatial extension landscape in 2021
Spark has no native geometry type, so you reach for an extension. In mid-2021 these were the real options, and I evaluated all of them before committing.
| Library | What it is | State in 2021 | Best for |
|---|---|---|---|
| Apache Sedona (formerly GeoSpark) | Spatial RDD + Spatial SQL on Spark; spatial partitioning, R-tree/quad-tree indexes, KNN and range/distance joins | Just entered the Apache incubator; 1.0.0 released early 2021. Active, broad geometry support | General-purpose spatial joins at scale — the default choice |
| GeoMesa | Spatio-temporal indexing layer over big-data stores (Accumulo, HBase, Cassandra) with a Spark module | Mature, powerful, but oriented around its own datastore + space-filling-curve indexes | Persistent spatio-temporal databases and time-series geospatial |
| Esri GIS Tools for Hadoop | Esri's geometry library + Hive/Spark bindings (the esri-geometry-api) | Solid geometry primitives, but the Hadoop/Hive-era tooling felt dated; thinner Spark-SQL story | Shops already standardized on Esri/ArcGIS who need format compatibility |
| Magellan | Spark-native geospatial with a clever query-optimizer integration | Promising design but effectively stalled — little activity, lagging Spark versions | Hard to recommend by 2021; I ruled it out on maintenance risk |
| Uber H3 | Hierarchical hexagonal grid; a point-to-cell indexing scheme, not a join engine | Stable, language bindings everywhere; a building block rather than a framework | Bucketing, aggregation, and approximate proximity via cell adjacency |
I chose Apache Sedona. Two reasons. First, it gives you a real spatial-join engine: it partitions data spatially, builds local spatial indexes, and runs distance/KNN joins without you hand-rolling the partitioning. Second, it was clearly where the community momentum was heading — GeoSpark renaming to Sedona and joining Apache was the signal that it would be the maintained, standard choice (which is exactly how it played out). Esri and GeoMesa are excellent in their lanes, but neither lane was mine. Magellan I ruled out because betting a production pipeline on a stalled library is how you inherit an unmaintainable mess. H3 I kept in my back pocket as a complement, not a replacement — more on that below.
A note on naming. If you read code or Stack Overflow answers from this era you'll see GeoSpark and Sedona used interchangeably, and imports under both org.datasyslab.geospark and org.apache.sedona. It's the same project mid-rename. New work should target the Sedona packages.
How a distributed KNN join actually works
The trick that makes a spatial join tractable is the same one that makes any Spark join tractable: get the rows that need to meet onto the same partition, then do cheap local work. For a spatial join that means three stages — partition by space, index within each partition, then join only co-located candidates.
graph TD
A["Cases (~300k points)
+ Facilities (~200 points)"]
B["1. Spatial partitioning
(KDB-tree / quad-tree)
nearby points to same partition"]
C["2. Build local index
(R-tree per partition)
on the facility side"]
D["3. Per-partition KNN
each case queries the
local R-tree, not all facilities"]
E["Boundary handling
expand search radius across
adjacent partitions"]
F["Result: case to nearest
facility + distance"]
A --> B --> C --> D --> E --> F
The three moves that turn an O(cases × facilities) Cartesian product into something near-linear: partition by location so comparisons stay local, build an R-tree per partition so each lookup is logarithmic instead of a full scan, and carefully handle partition boundaries so a case near an edge can still find a facility that landed in the neighbouring partition. Sedona orchestrates all three; the boundary handling is the part naive home-grown solutions always get wrong.
Spatial partitioning is the heart of it. Sedona samples the data, builds a tree (a KDB-tree or quad-tree) that carves the map into regions each holding roughly equal numbers of points, and repartitions both datasets into those regions. Now a task working on the Vancouver region holds Vancouver cases and Vancouver facilities — and never wastes a cycle comparing a Vancouver case to a Prince George hospital.
Local indexing handles the within-partition work. Inside each partition Sedona builds an R-tree over the facility points. An R-tree is a balanced tree of nested bounding rectangles; a nearest-neighbour query descends it in roughly logarithmic time instead of scanning every facility. So within a partition holding, say, 30 facilities and 40,000 cases, each case does a handful of comparisons, not 30 — and definitely not 200.
Boundary handling is the subtle, essential bit. A case sitting right at the edge of a partition might have its true nearest facility just across the line, in the next partition. A correct KNN join must expand the search beyond the home partition until it can prove it has found the real nearest neighbour. This is precisely the logic you do not want to write yourself, and it's the strongest argument for using a real library over a clever GROUP BY.
Writing the join with Sedona
Once you understand the model, the code is almost anticlimactic. Register Sedona's types and functions on the Spark session, load both datasets, turn lat/long into geometries, and let Spatial SQL do the work. Here's the shape of it in PySpark.
from sedona.register import SedonaRegistrator
from sedona.utils import SedonaKryoRegistrator, KryoSerializer
spark = (SparkSession.builder
.appName("bc-nearest-hospital")
.config("spark.serializer", KryoSerializer.getName)
.config("spark.kryo.registrator", SedonaKryoRegistrator.getName)
.getOrCreate())
SedonaRegistrator.registerAll(spark) # adds ST_* functions + spatial join rules
cases = (spark.read.parquet("s3://.../cases/")
.selectExpr("case_id", "ST_Point(CAST(lon AS Decimal(24,20)), CAST(lat AS Decimal(24,20))) AS geom"))
facilities = (spark.read.parquet("s3://.../facilities/")
.selectExpr("facility_id", "name", "ST_Point(lon, lat) AS geom"))
cases.createOrReplaceTempView("cases")
facilities.createOrReplaceTempView("facilities")
For a pure "within X kilometres" question, a distance join is the cleanest expression — Sedona recognizes the ST_Distance predicate and triggers the spatial join strategy rather than a Cartesian product:
-- facilities within ~10 km of each case (degrees ~ 0.09 at this latitude)
SELECT c.case_id, f.facility_id, ST_Distance(c.geom, f.geom) AS dist_deg
FROM cases c, facilities f
WHERE ST_Distance(c.geom, f.geom) < 0.09
But "within 10 km" isn't the question — "the single nearest, however far" is, and in the BC interior the nearest hospital can be well beyond any fixed radius. For true KNN I leaned on Sedona's spatial-join + a windowed rank, computing distance only for the already-localized candidate pairs and keeping the closest per case:
WITH ranked AS (
SELECT c.case_id, f.facility_id, f.name,
ST_DistanceSphere(c.geom, f.geom) AS metres,
ROW_NUMBER() OVER (PARTITION BY c.case_id
ORDER BY ST_DistanceSphere(c.geom, f.geom)) AS rn
FROM cases c, facilities f
WHERE ST_Distance(c.geom, f.geom) < 1.0 -- generous candidate radius in degrees
)
SELECT case_id, facility_id, name, metres
FROM ranked
WHERE rn = 1
Two things worth calling out. The WHERE ST_Distance < 1.0 isn't the answer — it's a candidate filter that lets the spatial join prune the cross product down to plausible pairs; the ROW_NUMBER then picks the true nearest among them. And distance comes in two flavours: planar ST_Distance (fast, in degrees, fine for ranking nearby points) versus ST_DistanceSphere (great-circle metres, what you report to humans). I rank with the cheap one and report the accurate one.
The trap that cost me an afternoon: latitude is not longitude. A degree of latitude is ~111 km everywhere, but a degree of longitude shrinks as you go north — at BC's ~50°N it's only ~72 km. So a "1 degree" candidate box is not square on the ground, and a fixed-radius filter that's generous near the US border can be too tight up north. For correctness I either work in metres with ST_DistanceSphere or reproject to a metre-based BC-appropriate CRS (BC Albers, EPSG:3005) before doing planar math. Mixing degrees and metres is the single most common geospatial bug, and it fails silently — wrong answers, no error.
Where H3 earned its place
I said I kept Uber's H3 in my back pocket. Here's where it came out. For the cases where I needed approximate, blisteringly fast bucketing — "roughly how many cases sit near each facility's catchment" for a dashboard, not a precise distance — H3 is perfect. H3 indexes every point to a hexagonal cell ID at a chosen resolution, so a spatial proximity question collapses into a plain equality join and group-by on the cell ID. No spatial partitioning, no R-tree, no boundary logic — just a string key Spark already knows how to shuffle.
# approximate bucketing: assign each point an H3 cell, then it's a normal join
from h3 import geo_to_h3
to_cell = udf(lambda lat, lon: geo_to_h3(lat, lon, 7), StringType()) # res 7 ~ 5 km hexes
cases_h = cases_raw.withColumn("h3", to_cell("lat", "lon"))
fac_h = fac_raw.withColumn("h3", to_cell("lat", "lon"))
# now a hash join on h3 + neighbour cells, no Cartesian product
The mental model I settled on: H3 for approximate aggregation, Sedona for exact distance. H3 trades precision (everything in a cell is "the same place") for the ability to use Spark's native, cheap equality join. When the planners needed exact metres to the nearest ED, Sedona; when they needed a fast heatmap, H3. Using the right one for each question is most of the performance battle.
Tuning the EMR cluster for this workload
The algorithm gets you correctness and rough scalability; the cluster config gets you the runtime. This is a join-and-shuffle workload with one big side and one tiny side, plus index-building that's memory-hungry, so the tuning is specific.
Right-size the cluster and the executors
I ran on memory-optimized instances (r5 family) because spatial indexing and the boundary search hold a lot of geometry objects in memory. The mistake I see most often is one fat executor per node hoarding all the cores; the JVM garbage-collects poorly at huge heaps and a single slow task stalls the stage. I went the other way: several medium executors per node — 5 cores each, ~18–20 GB — which keeps GC pauses short and gives the scheduler more, smaller tasks to balance.
spark.executor.cores = 5
spark.executor.memory = 18g
spark.executor.memoryOverhead = 3g ; geometry/JNI buffers live off-heap
spark.serializer = org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions = 400 ; tuned to total cores, not the 200 default
Fix the partition count and the skew
The default spark.sql.shuffle.partitions = 200 is almost never right. Too few and each task is huge and spills; too many and you drown in scheduling overhead. I sized it to a small multiple of total executor cores so every core stays busy without tiny-task overhead.
Skew is the bigger enemy here, and it's intrinsic to geography. Spatially partition a map of BC and the Lower Mainland — Vancouver — holds a massive share of the cases while a partition over the northern interior holds almost none. That's textbook data skew: one or two tasks run for minutes while the rest finished long ago. Sedona's tree-based partitioner helps because it splits dense regions more finely (that's the point of a KDB-tree — equal counts, not equal area), but I still watched the Spark UI for a long-tail task and, when I found one, increased the partitioning granularity so dense urban regions got carved into more, smaller pieces.
Broadcast the small side. With only ~200 facilities, the facility dataset is tiny — kilobytes. For some variants of the query the fastest path isn't a spatial-partitioned join at all: broadcast the facilities to every executor so each case partition has the full facility set locally and can build one R-tree per partition with zero shuffle of the big side. Measure both. For a small-fixed-set "nearest facility" problem, broadcast + local index often beats a full spatial repartition, because you skip shuffling 300k points entirely.
Read less, and read it columnar
Half of "slow Spark" is really "slow I/O." I stored both datasets as Parquet on S3 and projected only the columns the join needs — case_id, lat, lon — so Spark never reads the dozens of other case attributes. On EMR I enabled the EMRFS S3-optimized committer to avoid the rename storms that the default Hadoop commit protocol causes on S3 (S3 has no atomic rename, so the naive committer copies every output file — brutal on a large write). Reading narrow and writing through the right committer shaved more wall-clock time than any algorithmic tweak.
Let the catalyst help — and know its limits
Spark 3.0 brought Adaptive Query Execution, and on EMR's Spark 3.1 I turned it on (spark.sql.adaptive.enabled=true). AQE coalesces too-small shuffle partitions and re-optimizes joins at runtime from real statistics — genuinely helpful for the non-spatial parts of the pipeline (the joins back to case demographics, the aggregations). But be clear-eyed: AQE optimizes Spark's relational plan; it doesn't understand ST_Distance, so it can't rescue a query that fell back to a Cartesian product. The spatial-join win comes from Sedona's planner rules and your partitioning, not from AQE. AQE cleans up around the edges.
Best practices I'd hand to the next team
- Never let it become a Cartesian product. Check the physical plan (
.explain()) for aBroadcastNestedLoopJoinor Cartesian operator on the spatial predicate — if it's there, yourST_*predicate isn't triggering the spatial strategy, and no cluster size will save you. - Pick one coordinate system and commit. Decide degrees-for-ranking vs metres-for-reporting (or reproject to EPSG:3005) and write it down. Most geospatial bugs are unit/CRS mismatches that fail silently.
- Watch for geographic skew in the Spark UI. Cities create hot partitions. A tree-based spatial partitioner mitigates it; raise granularity until the longest task isn't 10× the median.
- Broadcast tiny reference sets. Hospitals, stores, depots — a few hundred points belong in a broadcast, not a shuffle.
- Use H3 for approximate, Sedona for exact. Don't pay for precise distance when a hex-cell group-by answers the actual question.
- Store columnar, project early, commit safely. Parquet + column projection + the S3-optimized committer on EMR. I/O is half the battle.
What to carry away
A spatial join is hard for one reason: there's no equality key to hash on, so the default is a Cartesian product that scales as the product of both sides. Every technique that follows — spatial partitioning to keep nearby points co-located, per-partition R-tree indexes for logarithmic lookups, careful boundary handling for points near a partition edge — exists to dodge that product and exploit locality instead. Apache Sedona packages all of it and was, in 2021, the clear maintained choice over GeoMesa (datastore-oriented), Esri's Hadoop-era tooling, and the stalled Magellan, with Uber H3 as the right complement for approximate, equality-joinable bucketing.
The cluster tuning mattered as much as the algorithm: memory-optimized instances, several medium executors instead of one giant one, a shuffle-partition count sized to cores, broadcasting the tiny facility set, and ruthless column projection on Parquet. Get the spatial strategy to fire, kill the skew, and read less — and a problem that started as 60 million blind distance calculations finishes in minutes, telling a public-health planner exactly how far each community sits from care.