Setting Hadoop Node Types on AWS EMR

Amazon Web Services (AWS) offers dozens (if not over one hundred) different services. I probably use about a dozen of them regularly, including Elastic Map Reduce (EMR) which is their platform for running big-data things like Hadoop and Spark.

EMR runs your work on EC2 instances, and you can pick which kind(s) you want when the job starts. You can also pick the "lifecycle" of these instances. This means you can pick some instances to run as "on-demand" where the instance is yours (barring a hardware failure), and other instances to run as "spot" which costs much less than on-demand but AWS can take away the instance at any time.

Luckily, Hadoop and Spark are designed to work on unreliable hardware, and if previously done work is unavailable (e.g. because the instance that did it is no longer running), Hadoop and Spark can re-run that work again. This means that you can use a mix of on-demand and spot instances that, as long as AWS doesn't take away too many spot instances, will run the job for lower cost than otherwise.

A big issue with running Hadoop on spot instances is that multi-stage Hadoop jobs save some data between stages that can't be redone. This data is stored in HDFS, which is where Hadoop stores (semi)permanent data. Because we don't want this data going away, we need to run HDFS on on-demand instances, and not run it on spot instances. Hadoop handles this by having two kind of "worker" instances: "CORE" instances that run HDFS and have the important data, and "TASK" types that do not run HDFS and store easily reproduced data. Both types share in the computational workload, the difference is what kind of data is allowed to be stored on them. It makes sense, then, to confine "TASK" instances to spot nodes.

The trick is to configure Hadoop such that the instances themselves know what kind of instance they are. Figuring this out was harder than it should have been because AWS EMR doesn't auto-configure nodes to work this way; the user needs to configure the job to do this including running scripts on the instances themselves.

I like to run my Hadoop jobs using mrjob which makes development and running Hadoop with Python easier. I assume that this can be done outside of mrjob, but its up to the reader to figure out how to do that.

There are three parts to this. The first two are two Python scripts that are run on the EC2 instances, and the third is modifying the mrjob configuration file. The Python scripts should be uploaded to a S3 bucket because they will be downloaded to each Hadoop node (see the bootstrap actions below).

With the changes below, you should be able to run a multi-step Hadoop job on AWS EMR using spot nodes and not lose any intermediate work. Good luck!

make_node_labels.py

This script tells yarn what kind of instance types are available. This only needs to run once.

#!/usr/bin/python3
import subprocess
import time

def run(cmd):
    proc = subprocess.Popen(cmd,
        stdout = subprocess.PIPE,
        stderr = subprocess.PIPE,
    )
    stdout, stderr = proc.communicate()

    return proc.returncode, stdout, stderr

if __name__ == '__main__':

    # Wait for the yarn stuff to be installed
    code, out, err = run(['which', 'yarn'])
    while code == 1:
        time.sleep(5)
        code, out, err = run(['which', 'yarn'])

    # Now we wait for things to be configured
    time.sleep(60)

    # Now set the node label types
    code, out, err = run(["yarn",
        "rmadmin",
        "-addToClusterNodeLabels",
        '"CORE(exclusive=false),TASK(exclusive=false)"'])

get_node_label.py

This script tells Hadoop what kind of instance this is. It is called by Hadoop and run as many times as needed.

#!/usr/bin/python3
import json
k='/mnt/var/lib/info/extraInstanceData.json'
with open(k) as f:
    response = json.load(f)
    print("NODE_PARTITION:", response['instanceRole'].upper())

mrjob.conf

This is not a complete mrjob configuration file. It shows the essential parts needed for setting up CORE/TASK nodes. You will need to fill in the rest for your specific situation.

runners:
  emr:

    instance_fleets:
    - InstanceFleetType: MASTER
      TargetOnDemandCapacity: 1
      InstanceTypeConfigs:
      - InstanceType: (smallish instance type)
        WeightedCapacity: 1
    - InstanceFleetType: CORE
      # Some nodes are launched on-demand which prevents the whole job from
      # dying if spot nodes are yanked
      TargetOnDemandCapacity: NNN (count of on-demand cores)
      InstanceTypeConfigs:
      - InstanceType: (bigger instance type)
        BidPriceAsPercentageOfOnDemandPrice: 100
        WeightedCapacity: (core count per instance)
    - InstanceFleetType: TASK
      # TASK means no HDFS is stored so loss of a node won't lose data
      # that can't be recovered relatively easily
      TargetOnDemandCapacity: 0
      TargetSpotCapacity: MMM (count of spot cores)
      LaunchSpecifications:
        SpotSpecification:
          TimeoutDurationMinutes: 60
          TimeoutAction: SWITCH_TO_ON_DEMAND
      InstanceTypeConfigs:
      - InstanceType: (bigger instance type)
        BidPriceAsPercentageOfOnDemandPrice: 100
        WeightedCapacity: (core count per instance)
      - InstanceType: (alternative instance type)
        BidPriceAsPercentageOfOnDemandPrice: 100
        WeightedCapacity: (core count per instance)

    bootstrap:
    # Download the Python scripts to the instance
    - /usr/bin/aws s3 cp s3://bucket/get_node_label.py /home/hadoop/
    - chmod a+x /home/hadoop/get_node_label.py
    - /usr/bin/aws s3 cp s3://bucket/make_node_labels.py /home/hadoop/
    - chmod a+x /home/hadoop/make_node_labels.py
    # nohup runs this until it quits on its own
    - nohup /home/hadoop/make_node_labels.py &

    emr_configurations:
      - Classification: yarn-site
        Properties:
          yarn.node-labels.enabled: true
          yarn.node-labels.am.default-node-label-expression: CORE
          yarn.node-labels.configuration-type: distributed
          yarn.nodemanager.node-labels.provider: script
          yarn.nodemanager.node-labels.provider.script.path: /home/hadoop/get_node_label.py
more ...

Pages

  • About Stephen Skory
  • Solar Water Heater