2026-06-22

Improving A Naive ETL Approach With Simple Domain Knowledge

The Context

It's 2021 and I'm working with a small team at a pharma client to productionize the ETL pipeline for a knowledge graph. We've trudged through lines of untyped Python, untangled the web of intermediate outputs whose state seemed to change with each pass, and finally have it running end-to-end on PySpark and Airflow.

The client knew the original codebase worked and also acknowledged it was a big bowl of spaghetti (hence the productionization work). At this point we turned our attention to opportunities for optimization. We knew there were more than a few as we had taken pains to match the original intended ETL logic to minimize output drift. Broadly speaking, the pipeline could be divided into 5-6 distinct stages. One end-to-end run of all stages took nearly 48 hours, depending on how much new data was processed.

The Problem

Most of the pipeline's wall clock time (36 of the ~48 hours) was spent in a single bottleneck stage: determining which resolved entities were related to each other through the documents they both appeared in. Conceptually this required comparing each entity with every other entity, so N entities meant N×N comparisons, and a cross join was the closest match to the original logic. In practice, however, the data contained on the order of 10^7 entities, implying roughly 10^14 comparisons!
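As a rough back-of-envelope check of that scale, the arithmetic can be written out in a few lines of Python. The inputs are only the approximate figures quoted above, and the implied throughput assumes the whole 36 hours went into comparisons, which is a simplification.

```python
# Back-of-envelope scale estimate using the approximate figures from the text.
n_entities = 10**7                 # order-of-magnitude entity count
n_comparisons = n_entities ** 2    # a cross join emits every ordered pair, self-pairs included
stage_seconds = 36 * 60 * 60       # ~36 hours spent in the bottleneck stage

# Assumption: all of the stage time was spent comparing pairs.
implied_rate = n_comparisons / stage_seconds

print(f"{n_comparisons:.0e} comparisons")
print(f"{implied_rate:.1e} comparisons per second implied")
```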

A Naive Approach

from pyspark.sql import functions as F

# Each entity ID has a list of the documents it appears in
overlap = (
    nodes.select(
        F.col("entity_id").alias("entity_id_1"),
        F.col("doc_ids").alias("doc_ids_1")
    )
    # Cartesian product (!)
    .crossJoin(
        nodes.select(
            F.col("entity_id").alias("entity_id_2"),
            F.col("doc_ids").alias("doc_ids_2")
        )
    )
    # See you in 36 hours...
    .withColumn(
        "n_shared_docs",
        F.size(F.array_intersect("doc_ids_1", "doc_ids_2"))
    )
)

Simple Domain Knowledge

The documents in this case were scientific abstracts. We knew that no single abstract could reference every one of the 10^7 entities in our graph. In other words, the pairwise comparison matrix was likely sparse. Intuitively, this meant the naive implementation was performing at least N−1 (and at most N^2−N) calls to F.array_intersect() that would return no intersection at all.

Knowing this, wouldn't it be great if we only ran the comparisons that would actually yield an intersection?
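To make the sparsity concrete, here is a minimal sketch in plain Python rather than PySpark. The `toy_nodes` mapping and its entity and document IDs are made up for illustration and are not from the client dataset. It mimics the cross join by enumerating every ordered pair, then counts how many of those comparisons find nothing.

```python
from itertools import product

# Hypothetical toy data: entity_id -> set of document ids it appears in.
toy_nodes = {
    "e1": {"d1", "d2"},
    "e2": {"d2"},
    "e3": {"d3"},
    "e4": {"d4"},
    "e5": {"d4", "d5"},
}

# Mimic the cross join: every ordered pair of entities, including self-pairs.
all_pairs = list(product(toy_nodes, repeat=2))

# Pairs whose document sets do not intersect at all.
empty_pairs = [(a, b) for a, b in all_pairs if not (toy_nodes[a] & toy_nodes[b])]

print(f"{len(empty_pairs)} of {len(all_pairs)} comparisons find no shared document")
```

Even on five entities most of the comparisons are wasted, and the wasted share should only grow as the number of entities rises while each document keeps mentioning just a handful of them.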

An Informed Approach

A simple way around this is to explode out every entity → document relation, inner join on document, then group by entity pairs. This avoids the need for a cross join and only counts shared documents for entity pairs that actually share them.

overlap = (
    nodes.select(
        F.col("entity_id").alias("entity_id_1"),
        # One row per (entity, document) pair
        F.explode("doc_ids").alias("d_id")
    )
    # Only pairs up entities that share at least one document
    .join(
        nodes.select(
            F.col("entity_id").alias("entity_id_2"),
            F.explode("doc_ids").alias("d_id")
        ),
        how="inner",
        on="d_id"
    )
    # Group-by & count now implicitly handle array intersect
    .groupBy("entity_id_1", "entity_id_2")
    # Count the joined rows per pair and name the result column
    .agg(F.count("d_id").alias("n_shared_docs"))
)

The above completed the same overlap calculation in just 46 minutes on our dataset, down from roughly 36 hours (a speedup of about 47x). A reduction of that size implies the naive approach was running far more comparisons than necessary.
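As a sanity check on the idea, the two strategies can be compared on toy data in plain Python. This is a sketch under stated assumptions, not the pipeline code: `toy_nodes`, `naive_overlap`, and `informed_overlap` are hypothetical names, the data is made up, and each document list is assumed to contain no duplicates.

```python
from collections import Counter, defaultdict
from itertools import product

# Hypothetical toy data: entity_id -> list of document ids (no duplicates per list).
toy_nodes = {
    "e1": ["d1", "d2"],
    "e2": ["d2"],
    "e3": ["d3"],
    "e4": ["d4"],
    "e5": ["d4", "d5"],
}

def naive_overlap(nodes):
    """Cross-join analogue: intersect every ordered pair, keep non-zero counts."""
    counts = {}
    for a, b in product(nodes, repeat=2):
        shared = len(set(nodes[a]) & set(nodes[b]))
        if shared:
            counts[(a, b)] = shared
    return counts

def informed_overlap(nodes):
    """Explode / join / group-by analogue built on an inverted index."""
    # "Explode": one (document, entity) row per membership, bucketed by document.
    entities_by_doc = defaultdict(list)
    for entity, docs in nodes.items():
        for doc in docs:
            entities_by_doc[doc].append(entity)
    # "Inner join on document", then "group by pair and count".
    counts = Counter()
    for entities in entities_by_doc.values():
        for pair in product(entities, repeat=2):
            counts[pair] += 1
    return dict(counts)

# Both strategies agree on every pair that shares at least one document.
assert naive_overlap(toy_nodes) == informed_overlap(toy_nodes)
```

Two caveats apply when carrying this over to Spark. Pairs with no shared documents simply do not appear in the join output, whereas the cross join emitted them with a count of zero. And F.array_intersect() returns a de-duplicated array, so if an entity's document list could contain repeats, the lists would need de-duplicating before the explode for the counts to match.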

Hindsight and Costs

Is this optimization obvious in hindsight? Of course. The informed approach is simple and leans on very basic domain knowledge. In my experience, naive approaches like this are common and well-intentioned; they lack a larger perspective, often through no fault of their authors. In this particular case, each piece of the pipeline was migrated and kept faithful to the original prototype. Only once it was fully assembled did the bottleneck become clear.