One of my primary responsibilities at my current job is ownership of the ETL pipeline that brings in the data upon which we run our business. Every day it processes hundreds of gigabytes of data, cleaning and normalizing it, and outputting it in several different forms.
For a few years I have been using a Hadoop-based mrjob pipeline on top of AWS EMR. It replaced a hugely expensive (and, I kid you not, Redshift-based) pipeline that I didn't write. It's been very reliable: for the last year or two the only failures have been when AWS's own systems had issues, something I can't do anything about. Despite this reliability, it hasn't been perfect. The biggest problem is that it's expensive to run, for a few reasons:
- EMR adds a 25% upcharge on all resources used. It is reasonable for AWS to charge something, because EMR is a useful service with added value; in my opinion, though, 25% is too high for what it does. AWS already gets paid plenty because you're using their other services underneath it (mainly EC2 virtual machines and EBS block storage), so the extra mark-up feels like a big cash-grab. It is, of course, entirely possible to run Hadoop on AWS without using EMR, but it's a hassle, and it's likely that AWS has figured out that 25% is the inflection point between too expensive and not worth the hassle.
- Hadoop isn't the most efficient way of doing things. Newer tools, notably Spark, have surpassed it in both speed and features. I originally used Hadoop because I wasn't happy with how much more memory Spark needed than Hadoop for a similar operation, but over time I became less and less satisfied with my inability to speed up certain parts of the process.
- AWS offers spot instances, which are virtual machines that are significantly cheaper than on-demand instances. The difference is that on-demand nodes are yours for as long as you want them, while spot instances can be taken away at any time with only two minutes' warning. One of Hadoop's killer features is that if one or more worker nodes goes away during a MapReduce cycle (for whatever reason, including spot reclamation), it can usually recover and redo any lost results. However, the EMR pipeline used a multi-step MapReduce process, and unfortunately Hadoop cannot recover lost results from earlier, fully completed cycles. This meant the pipeline had to keep enough on-demand instances to hold the data that needed to survive across cycles, which raised the cost considerably compared to an all-spot run.
Earlier this year I decided it was time to start looking at how to rewrite the pipeline to dramatically lower costs. I saw two possible ways forward:
- Choose a modern, high-performance tool like Spark/PySpark or Dask that would still run on EMR but hopefully would be much faster.
- Abandon EMR entirely (and its 25% surcharge) and write something that could run completely on spot instances and use S3 for storage, which is four to five times cheaper than EBS.
After some thought, I decided that the second option was the better choice. If I could figure it out, it offered the best possible outcome. The pipeline runs once per day, meaning that it has 24 hours to finish before the next run needs to start. Ultimately, high performance was less important than lowering costs.
I have been using Polars quite a bit and have been (mostly) impressed with its speed and functionality. It is written in Rust, one of the fastest programming languages, and its Python-facing API is a first-class member of the project. Rust has an ever-growing library of packages that I've found to be high-quality and well documented (in contrast to my experience with *cough* R packages). The crucial difference between Polars and Hadoop/Spark/Dask is that Polars runs on only one node at a time (it can and does use all the CPU cores), while the others can run on multiple nodes. If I could figure out how to slice the pipeline into chunks that would work on separate instances, I believed I could use Polars in place of Hadoop.
Jumping to the end of the story: I was able to convert the pipeline to Polars, to great success. I use a simple pattern for each step. An orchestration process builds a list of work, which is submitted to SQS. An EC2 spot fleet is created, which launches workers that consume the work; the input and output of each work item are stored on S3. The workers send success or failure messages to a callback queue on SQS, which the orchestration process monitors. If a spot node goes away mid-task, the interrupted work item becomes visible on the queue again and is picked up by a different node. Once the work for a step is done (i.e. every work item has generated a callback), the orchestration process kills the spot fleet and continues to the next step (or sends an error message for a human to figure out).
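The recovery behavior above falls out of SQS's visibility timeout: a received message is hidden from other consumers until it is deleted, and reappears if the consumer never deletes it. Here is a minimal, self-contained simulation of the pattern — the real pipeline would use boto3 against actual SQS queues; `FakeQueue`, the message shapes, and the timings are purely illustrative:

```python
# In-memory stand-in for SQS demonstrating the work-queue recovery pattern:
# received messages become invisible until deleted, or until their
# visibility timeout lapses, at which point another worker can take them.
import time

class FakeQueue:
    def __init__(self, visibility_timeout=0.1):
        self.messages = []          # each entry is [body, visible_at]
        self.timeout = visibility_timeout

    def send(self, body):
        self.messages.append([body, 0.0])

    def receive(self):
        now = time.monotonic()
        for entry in self.messages:
            if entry[1] <= now:
                entry[1] = now + self.timeout   # hide the in-flight message
                return entry
        return None                              # nothing currently visible

    def delete(self, entry):
        self.messages.remove(entry)

work = FakeQueue()
callbacks = FakeQueue()
work.send({"id": 1})

# Worker A receives the item, then "dies" (a reclaimed spot node):
# it never deletes the message.
lost = work.receive()
assert work.receive() is None       # invisible while in flight
time.sleep(0.15)                    # visibility timeout lapses

# Worker B picks the same item back up, finishes it, and reports back.
entry = work.receive()
callbacks.send({"id": entry[0]["id"], "status": "success"})
work.delete(entry)
```

The orchestration process only has to count callbacks; it never needs to know which worker (or which attempt) produced each result.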
The bottom line is that the cost has dropped by roughly 85%, primarily due to the following reasons:
- Polars has the concept of LazyFrames: data objects that are not materialized in memory, and on which no computation is performed, until Polars is told to do so. Operations and filters can be applied to them, and Polars does the work in parallel, with efficiency tricks such as predicate pushdown, without loading the whole dataset into memory at once. The combination of sink_parquet with PartitionByKey is effectively a MapReduce operation, and it is much faster than Hadoop on similar hardware.
- AWS has "regions" and "availability zones" (AZs), the physical locations where cloud compute happens. Each AZ is a distinct data center (or a group of nearby data centers) within a region. When running an EMR job, you are restricted to a single AZ, largely because AWS charges for cross-AZ data transfer and EMR jobs are very loquacious across the network; there is also increased network latency between AZs. Running an EMR job across multiple AZs would hugely impact performance and cost. Because the new pipeline reads from and writes to S3, and there are no cross-AZ charges for accessing S3 within a region, it doesn't matter which AZ the workers run in. This means the spot fleets can target every AZ in the region, unlike EMR.
- When launching an EC2 fleet, you must specify one or more launch templates, which describe how to launch each instance in terms of OS and installed software. EC2 offers x86 instances with Intel and AMD processors, and ARM instances with AWS Graviton processors. Conveniently, the pipeline doesn't require any processor-specific features, so I created two AMIs, one each for x86 and ARM, which allows the spot fleet to target any and all of the Intel, AMD, and Graviton instance types.
- The pipeline's requirements for each step basically come down to the number of CPU cores and the amount of RAM, more or less of each depending on what the step is doing. The upshot of all of the above is that for a given step, all the pipeline cares about is the resources of a node, not what kind of node it is. Of course, not all instances are the same speed, but the cost of an instance is roughly proportional to its speed, so it all works out. This means that for a given step, across all AZs and EC2 instance types, there can be over 100 distinct resource combinations to pick from, which all but guarantees spot availability at all times.
- The pipeline uses a fair number of user-defined functions (UDFs). Polars supports UDFs written in Python, Numba, and Rust (via PyO3). By using the latter two, essentially all of the inner loops and heavy computation in the pipeline happen in compiled code. This, in my opinion, is a really nice way of doing things: let Python handle moving data around and the high-level logic, and run all the heavy computation in compiled code.
Overall, I'm very pleased with the results of this work. The goal was to save money, and it has done that. I wasn't expecting 85% savings (I'm not sure what I was hoping for), but I feel quite good about it.