Joining in Apache Beam

Jun 7, 2021

helio-dilolwa

When working with data, you'll almost always hit a situation where you need to combine and transform multiple datasets into a single one.

This sort of transformation is straightforward in analytics frameworks that make whole-column operations easy, as the SQL and Spark examples below show.

SQL Join Example

select order_date, order_amount
from customers
join orders
   on customers.customer_id = orders.customer_id

PySpark Join Example

joined_df = orders_df.join(
    customers_df, 
    orders_df.customer_id == customers_df.customer_id, 
    how='left',
)

What happens if we want to do this in a framework which emphasizes row-level transformations? Enter Apache Beam.

Apache Beam is a relatively new framework based on the MapReduce and Java Streams paradigms. The idea is to take a wide set of data, apply successive transformations that filter it or extract meaningful values, and finally aggregate or reduce the result into something usable further down the pipeline.
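To make that shape concrete, here's a minimal sketch of a pipeline (assuming apache-beam is installed) that filters, transforms, and then reduces a small collection:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | beam.Create([1, 2, 3, 4, 5])       # source data
        | beam.Filter(lambda x: x % 2 == 0)  # keep even values
        | beam.Map(lambda x: x * 10)         # row-level transform
        | beam.CombineGlobally(sum)          # reduce to a single value
        | beam.Map(print)                    # prints 60
    )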

The problem with this design is that it becomes hard to apply transforms across multiple streams at the column level, i.e., joining.

As of today, Apache Beam has no stable way of joining by keys, so how do we do this?

First, note that joins are very similar to grouping. For example, if I wanted to join on an id column, I would need all matching ids to be grouped together: if a particular row had an id of 1, I'd expect every other row with an id of 1 to be clumped with it. That way, it's easy to extract all fields that share an id of 1.
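In plain Python terms, that grouping step might look like the following sketch, which collects values into a dict of lists keyed by id:

from collections import defaultdict

rows = [
    {'id': 1, 'data': 'dat_1'},
    {'id': 2, 'data': 'dat_4'},
    {'id': 1, 'data': 'dat_2'},
]

grouped = defaultdict(list)
for row in rows:
    grouped[row['id']].append(row['data'])

# grouped == {1: ['dat_1', 'dat_2'], 2: ['dat_4']}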

Now, the problem is that grouping doesn't necessarily combine all our rows into a single row with multiple fields. Most of the time, the result looks like this:

[
    {'id': 1, 'data': ['dat_1', 'dat_2', 'dat_3']},
    {'id': 2, 'data': ['dat_4', 'dat_5']},
]

Beam's grouping works the same way. We must unpack these values ourselves and combine them into a single record.

Let’s go over an example with two sub-collections.

First, we create two different collections. Both collections have an id_col which we will be using as a join key.

Sub-Collection 1

{'id_col': 1, 'val_1': 4.5}
{'id_col': 2, 'val_1': 23.7}
{'id_col': 3, 'val_1': 684.34}
{'id_col': 4, 'val_1': 896.0}

Sub-Collection 2

{'id_col': 1, 'val_2': 351.0}
{'id_col': 2, 'val_2': 64.23}
{'id_col': 3, 'val_2': -5489.0}
{'id_col': 4, 'val_2': 894.2}

What we want is a combined collection structure that looks like this:

{'id_col': 1, 'val_1': 4.5, 'val_2': 351.0}
{'id_col': 2, 'val_1': 23.7, 'val_2': 64.23}
{'id_col': 3, 'val_1': 684.34, 'val_2': -5489.0}
{'id_col': 4, 'val_1': 896.0, 'val_2': 894.2}

Now that we know our goal, we need to complete these steps:

  1. Create and set the joinable key
  2. Group the data based on the key
  3. Unpack the grouped data into a singular set

Creating and Setting Keys

Creating a key that both collections can join on is as simple as separating the key value from the rest of the row. Instead of representing a row as itself, we now have:

row = (row['key'], row)

Separating out the row key like this puts each row into the (key, value) shape that Beam's GroupByKey expects.
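In Beam, this keying step is just a Map transform. A minimal sketch, assuming our key field is named id_col as in the example below:

keyed = collection | 'Create joinable key' >> beam.Map(lambda row: (row['id_col'], row))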

Grouping By Key

Now that we have our keys set, we can call Beam's grouping transform directly.

merge = [sub_pipe_1, sub_pipe_2] | beam.CoGroupByKey()

We use CoGroupByKey instead of GroupByKey in this example because we are joining multiple input collections rather than a single one.

Also note that we can join more than two input streams this way. CoGroupByKey accepts any iterable of keyed collections, including a list, a tuple, or a dictionary of named collections.
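For instance, passing a dictionary of named collections is a common pattern, and the grouped values then come back as a dict keyed by those same names. A sketch, assuming a hypothetical third keyed collection sub_pipe_3:

merge = (
    {'first': sub_pipe_1, 'second': sub_pipe_2, 'third': sub_pipe_3}
    | beam.CoGroupByKey()
)
# each element now looks like:
# (key, {'first': [...], 'second': [...], 'third': [...]})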

Unpacking Grouped Values

Now that we have finished grouping, each element of our dataset is a tuple of the key and one list of matching rows per input collection:

(1, ([{'id_col': 1, 'val_1': 4.5}], [{'id_col': 1, 'val_2': 351.0}]))
(2, ([{'id_col': 2, 'val_1': 23.7}], [{'id_col': 2, 'val_2': 64.23}]))

We can break this tuple apart with a standard Python unpacking assignment:

id_col, grouped = row

id_col    grouped
------    -------------------------------------------------------------
1         ([{'id_col': 1, 'val_1': 4.5}], [{'id_col': 1, 'val_2': 351.0}])
2         ([{'id_col': 2, 'val_1': 23.7}], [{'id_col': 2, 'val_2': 64.23}])

This structure lets us split the grouped sub-dictionaries apart and merge them into one unified dictionary using standard Python dictionary operations.

def unpack(row):
    id_col, group = row
    res = {'id_col': id_col}
    for gp in group:
        # update our resulting dict with all other vals
        res.update({k: v for k, v in gp[0].items() if k != 'id_col'})
    return res
We can safely assume that row[0] is the id_col, since we set it up that way at the start as the joinable key. Keys can also be composed of multiple values, provided you ensure the composite key is built the same way in every collection being grouped.
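As a sketch of a composite key, assuming a hypothetical date field alongside id_col in every row:

# hypothetical: rows only join when both id_col and date match;
# every input collection must be keyed with the same structure
keyed = collection | beam.Map(lambda row: ((row['id_col'], row['date']), row))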

The unpack function (or something like it) flattens the nested dictionary structure into a single dictionary containing the unique fields from each Beam collection.

This step will finally give us the format that we wanted.

{'id_col': 1, 'val_1': 4.5, 'val_2': 351.0}
{'id_col': 2, 'val_1': 23.7, 'val_2': 64.23}
{'id_col': 3, 'val_1': 684.34, 'val_2': -5489.0}
{'id_col': 4, 'val_1': 896.0, 'val_2': 894.2}

Ready To Try It?

Try running and analyzing the code structure below.

Full Code

import apache_beam as beam

with beam.Pipeline() as p:
    sub_col_1 = (
        p
        | 'Create collection 1' >> beam.Create([
            {'id_col': 1, 'val_1': 4.5},
            {'id_col': 2, 'val_1': 23.7},
            {'id_col': 3, 'val_1': 684.34},
            {'id_col': 4, 'val_1': 896.0},
        ])
        | 'Create sub collection 1 joinable key' >> beam.Map(lambda row: (row['id_col'], row))
    )
    sub_col_2 = (
        p
        | 'Create collection 2' >> beam.Create([
            {'id_col': 1, 'val_2': 351.0},
            {'id_col': 2, 'val_2': 64.23},
            {'id_col': 3, 'val_2': -5489.0},
            {'id_col': 4, 'val_2': 894.2},
        ])
        | 'Create sub collection 2 joinable key' >> beam.Map(lambda row: (row['id_col'], row))
    )
    
    # group these collections by id
    merge = [sub_col_1, sub_col_2] | beam.CoGroupByKey()
    
    # unpack merge contents to flattened dict
    def unpack(row):
        id_col, group = row
        res = {'id_col': id_col}
        for gp in group:
            # update our resulting dict with all other vals
            res.update({k: v for k, v in gp[0].items() if k != 'id_col'})
        return res
    
    (
        merge
        | 'unpack values' >> beam.Map(unpack)
        | 'print final result' >> beam.Map(print)
    )
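One caveat worth noting: if a key is missing from one of the collections, CoGroupByKey still emits that key, but with an empty list for the missing side, and unpack as written would fail on gp[0]. A more defensive sketch iterates over every matched record instead of indexing the first:

def unpack_all(row):
    id_col, group = row
    res = {'id_col': id_col}
    for values in group:       # one list per input collection
        for record in values:  # empty if the key was absent from that collection
            res.update({k: v for k, v in record.items() if k != 'id_col'})
    return res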