Tuesday, March 19, 2019

Faster File Distribution with HDFS and S3

In the Hadoop world there is almost always more than one way to accomplish a task. I prefer the platform because it's very unlikely I'm ever backed into a corner when working on a solution.

Hadoop has the ability to decouple storage from compute. The various distributed storage solutions supported all come with their own set of strong points and trade-offs. I often find myself needing to copy data back and forth between HDFS on AWS EMR and AWS S3 for performance reasons.

S3 is a great place to keep a master dataset as it can be shared among many clusters without affecting the performance of any one of them; it also comes with 11 9s of durability, making it one of the most unlikely places for data to go missing or become corrupt.

HDFS is where I find the best performance when running queries. If the workload will take long enough it's worth the time to copy a given dataset off of S3 and onto HDFS; any derivative results can then be transferred back onto S3 before the EMR cluster is terminated.

In this post I'll examine a number of different methods for copying data off of S3 and onto HDFS and see which is the fastest.

AWS EMR, Up & Running

To start, I'll launch an 11-node EMR cluster. I'll use the m3.xlarge instance type with 1 master node, 5 core nodes (these will make up the HDFS cluster) and 5 task nodes (these will run MapReduce jobs). I'm using spot pricing, which often reduces the cost of the instances by 75-80% depending on market conditions. Both the EMR cluster and the S3 bucket are located in Ireland.

$ aws emr create-cluster \
    --applications Name=Hadoop \
                   Name=Hive \
                   Name=Presto \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "AvailabilityZone": "eu-west-1c",
        "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
        "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
    --enable-debugging \
    --instance-groups '[{
        "InstanceCount": 5,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "Name": "Core - 2"
    },{
        "InstanceCount": 5,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "TASK",
        "InstanceType": "m3.xlarge",
        "Name": "Task - 3"
    },{
        "InstanceCount": 1,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "Name": "Master - 1"
    }]' \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.21.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

After a few minutes the cluster has been launched and bootstrapped and I'm able to SSH in.
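
If you'd rather not poll the EMR console, the CLI can report the cluster's state as well. This is a minimal sketch; the cluster id below is a placeholder for whatever id the create-cluster call returned.

$ aws emr describe-cluster \
    --cluster-id j-XXXXXXXXXXXXX \
    --region eu-west-1 \
    --query 'Cluster.Status.State' \
    --output text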

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-155-42-195.eu-west-1.compute.amazonaws.com

The five core nodes each have 68.95 GB of capacity, giving 344.75 GB of capacity across the HDFS cluster.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'

Configured Capacity: 370168258560 (344.75 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)

The dataset I'll be using in this benchmark is a data dump I've produced of 1.1 billion taxi trips conducted in New York City over a six year period. The Billion Taxi Rides in Redshift blog post goes into detail on how I put this dataset together. This dataset is approximately 86 GB in ORC format spread across 56 files. The typical ORC file is ~1.6 GB in size.

I'll create a filename manifest that I'll use for various operations below. I'll exclude the S3 URL prefix as these names will also be used to address files on HDFS.

000000_0 000001_0 000002_0 000003_0 000004_0 000005_0 000006_0 000007_0 000008_0 000009_0 000010_0 000011_0 000012_0 000013_0 000014_0 000015_0 000016_0 000017_0 000018_0 000019_0 000020_0 000021_0 000022_0 000023_0 000024_0 000025_0 000026_0 000027_0 000028_0 000029_0 000030_0 000031_0 000032_0 000033_0 000034_0 000035_0 000036_0 000037_0 000038_0 000039_0 000040_0 000041_0 000042_0 000043_0 000044_0 000045_0 000046_0 000047_0 000048_0 000049_0 000050_0 000051_0 000052_0 000053_0 000054_0 000055_0 
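
For reference, a manifest like this can be produced by listing the bucket's orc/ prefix and keeping just the key names. This is a sketch of one way to do it rather than the exact command I ran; <bucket> is a placeholder throughout this post.

$ aws s3 ls s3://<bucket>/orc/ \
    | awk '{print $4}' \
    | grep -v '^$' \
    > files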

I'll adjust the AWS CLI's configuration to allow for up to 100 concurrent requests at any one time.

$ aws configure set \
    default.s3.max_concurrent_requests \
    100
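
That command persists the setting in ~/.aws/config. Assuming the default profile is in use, the file should end up containing something like the following.

[default]
s3 =
    max_concurrent_requests = 100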

The disk space on the master node cannot hold the entire 86 GB of ORC files, so I'll download each file, import it onto HDFS and then remove it, one file at a time. This will keep enough working disk space free on the master node.

$ hdfs dfs -mkdir /orc

$ time (for FILE in `cat files`; do
            aws s3 cp s3://<bucket>/orc/$FILE ./
            hdfs dfs -copyFromLocal $FILE /orc/
            rm $FILE
        done)

The above completed in 15 minutes and 57 seconds.
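
If local disk on the master node were tighter still, both CLIs can stream via stdin and stdout, which avoids staging each file locally at all. I haven't benchmarked this variant; it's a sketch of the idea.

$ time (for FILE in `cat files`; do
            aws s3 cp s3://<bucket>/orc/$FILE - \
                | hdfs dfs -put - /orc/$FILE
        done)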

The HDFS CLI runs on the JVM, which comes with a fair amount of start-up overhead. In my HDFS CLI benchmark I found that the alternative CLI gohdfs, which is written in Go and doesn't use the JVM, could save a lot of start-up time. Below I've run the same operation using gohdfs.

$ wget -c -O gohdfs.tar.gz \
    https://github.com/colinmarc/hdfs/releases/download/v2.0.0/gohdfs-v2.0.0-linux-amd64.tar.gz
$ tar xvf gohdfs.tar.gz

I'll first clear the previously downloaded dataset off HDFS so there is enough space on the cluster going forward. With triple replication, 86 GB becomes 258 GB on disk, and there is only 344.75 GB of HDFS capacity in total.
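
The 3x figure comes from the cluster's replication factor. The factor in effect, along with the space a directory actually consumes, can be checked with stock HDFS commands.

$ hdfs getconf -confKey dfs.replication   # replication factor in effect
$ hdfs dfs -du -s -h /orc                 # logical size and space consumed with replication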

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (for FILE in `cat files`; do
            aws s3 cp s3://<bucket>/orc/$FILE ./
            gohdfs-v2.0.0-linux-amd64/hdfs put \
                $FILE \
                hdfs://ip-10-10-207-160.eu-west-1.compute.internal:8020/orc/
            rm $FILE
        done)

The above took 27 minutes and 40 seconds. I wasn't expecting this client to be almost twice as slow as the HDFS CLI. S3 has given me consistent performance when I've run other tools multiple times, so I suspect either the code behind the put functionality could be optimised or there is a more appropriate endpoint for copying multi-gigabyte files onto HDFS. As of this writing I can't find support in gohdfs for copying from S3 to HDFS directly without resorting to a filesystem FUSE.

The HDFS CLI does support copying from S3 to HDFS directly. Below I'll copy the 56 ORC files to HDFS straight from S3. I'll set the concurrent process limit to 8.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (cat files \
            | xargs -n 1 \
                    -P 8 \
                    -I % \
                    hdfs dfs -cp s3://<bucket>/orc/% /orc/)

The above took 14 minutes and 17 seconds. There wasn't much of an improvement over simply copying the files down one at a time and uploading them to HDFS.

I'll try the above command again but set the concurrency limit to 16 processes. Note that this is running on the master node, which has 15 GB of RAM, and the following will use up what little memory capacity is left on the machine.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (cat files \
            | xargs -n 1 \
                    -P 16 \
                    -I % \
                    hdfs dfs -cp s3://<bucket>/orc/% /orc/)

The above took 14 minutes and 36 seconds. Again, a very similar time despite the higher concurrency limit. The effective transfer rate was ~98.9 MB/s off of S3 (roughly 86 GB in just under 15 minutes). HDFS is configured for triple redundancy, but I'd expect a lot more throughput to be available on a cluster of this size.

DistCp (distributed copy) is bundled with Hadoop and uses MapReduce to copy files in a distributed manner. It can work with HDFS, AWS S3, Azure Blob Storage and Google Cloud Storage. It breaks the downloading and importing up across the task nodes, so all five of those machines can work on a single job rather than the master node being the one machine doing all of the downloading and importing onto HDFS.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (hadoop distcp s3://<bucket>/orc/* /orc)

The above completed in 6 minutes and 16 seconds. A huge improvement over the previous methods.
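
DistCp picks its own mapper count, but the -m flag caps it if you want to experiment. The value of 56 below, one mapper per ORC file, is an assumption of mine and not something I timed for this post.

$ time (hadoop distcp -m 56 s3://<bucket>/orc/* /orc)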

On AWS EMR, there is a tool called S3DistCp that aims to provide the functionality of Hadoop's DistCp but in a fashion optimised for S3. Like DistCp, it uses MapReduce for executing its operations.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (s3-dist-cp \
            --src=s3://<bucket>/orc/ \
            --dest=hdfs:///orc/)

The above completed in 5 minutes and 59 seconds. This gives an effective throughput of ~241 MB/s off of S3. There wasn't a huge performance increase over DistCp and I suspect neither tool can greatly out-perform the other.

I did come across settings for increasing the chunk size from 128 MB to 1 GB, which would be useful for larger files. However, enough tooling in the Hadoop ecosystem suffers from ballooning memory requirements with files over 2 GB that it is very rare to see files larger than this in any sensibly-deployed production environment. S3 usually has low connection setup latency, so I can't see the default chunk size being a huge overhead.
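
One such setting is S3DistCp's --multipartUploadChunkSize flag, which takes a value in MiB and defaults to 128. I haven't timed this variant for this post; the sketch below simply shows the syntax.

$ s3-dist-cp \
    --src=s3://<bucket>/orc/ \
    --dest=hdfs:///orc/ \
    --multipartUploadChunkSize=1000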

With the above, it's now clear that both DistCp and S3DistCp can leverage a cluster's task nodes to import data from S3 onto HDFS quickly. I'm going to see how well these tools scale with a 21-node m3.xlarge cluster. This cluster will have 1 master node, 10 core nodes and 10 task nodes.

$ aws emr create-cluster \
    --applications Name=Hadoop \
                   Name=Hive \
                   Name=Presto \
    --auto-scaling-role EMR_AutoScaling_DefaultRole \
    --ebs-root-volume-size 10 \
    --ec2-attributes '{
        "KeyName": "emr",
        "InstanceProfile": "EMR_EC2_DefaultRole",
        "AvailabilityZone": "eu-west-1c",
        "EmrManagedSlaveSecurityGroup": "sg-89cd3eff",
        "EmrManagedMasterSecurityGroup": "sg-d4cc3fa2"}' \
    --enable-debugging \
    --instance-groups '[{
        "InstanceCount": 1,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "Name": "Master - 1"
    },{
        "InstanceCount": 10,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "Name": "Core - 2"
    },{
        "InstanceCount": 10,
        "BidPrice": "OnDemandPrice",
        "InstanceGroupType": "TASK",
        "InstanceType": "m3.xlarge",
        "Name": "Task - 3"
    }]' \
    --log-uri 's3n://aws-logs-591231097547-eu-west-1/elasticmapreduce/' \
    --name 'My cluster' \
    --region eu-west-1 \
    --release-label emr-5.21.0 \
    --scale-down-behavior TERMINATE_AT_TASK_COMPLETION \
    --service-role EMR_DefaultRole \
    --termination-protected

With the new EMR cluster up and running I can SSH into it.

$ ssh -i ~/.ssh/emr.pem \
    hadoop@ec2-54-78-53-9.eu-west-1.compute.amazonaws.com

Each core node on the HDFS cluster still has 68.95 GB of capacity but the ten machines combined create 689.49 GB of HDFS storage capacity.

$ hdfs dfsadmin -report \
    | grep 'Configured Capacity'

Configured Capacity: 740336517120 (689.49 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)
Configured Capacity: 74033651712 (68.95 GB)

I'll run S3DistCp first.

$ hdfs dfs -mkdir /orc

$ time (s3-dist-cp \
            --src=s3://<bucket>/orc/ \
            --dest=hdfs:///orc/)

The above completed in 4 minutes and 56 seconds. This is an improvement over the 11-node cluster but not the 2x improvement I was expecting.

Below is DistCp running on the 21-node cluster.

$ hdfs dfs -rm -r -skipTrash /orc
$ hdfs dfs -mkdir /orc

$ time (hadoop distcp s3://<bucket>/orc/* /orc)

The above completed in 4 minutes and 44 seconds.

The performance ratio between these two tools is more or less consistent across cluster sizes. It's a shame neither showed linear scaling with twice the number of core and task nodes.

Here is a recap of the transfer times seen in this post.

Duration | Transfer Method
-------- | -------------------------------
27m40s   | gohdfs, Sequentially
15m57s   | HDFS DFS CLI, Sequentially
14m36s   | HDFS DFS CLI, Concurrently x 16
14m17s   | HDFS DFS CLI, Concurrently x 8
6m16s    | Hadoop DistCp, 11-Node Cluster
5m59s    | S3DistCp, 11-Node Cluster
4m56s    | S3DistCp, 21-Node Cluster
4m44s    | Hadoop DistCp, 21-Node Cluster

Why Use HDFS At All?

S3 is excellent for durability and doesn't suffer performance-wise if you have one cluster or ten clusters pointed at it. S3 also works as well as HDFS when appending records to a dataset. Both of the following queries will run without issue.

$ presto-cli \
    --schema default \
    --catalog hive

INSERT INTO trips_hdfs SELECT * FROM trips_hdfs LIMIT 10;

INSERT INTO trips_s3 SELECT * FROM trips_s3 LIMIT 10;

I've heard arguments that S3 is as fast as HDFS but I've never witnessed this in my time with both technologies. Below I'll run a benchmark on the 1.1 billion taxi trips. I'll have one table using S3-backed data and another table using HDFS-backed data. This was run on the 11-node cluster. I ran each query multiple times and recorded the fastest times.

This is the HDFS-backed table.

CREATE EXTERNAL TABLE trips_hdfs (
    trip_id               INT,
    vendor_id             STRING,
    pickup_datetime       TIMESTAMP,
    dropoff_datetime      TIMESTAMP,
    store_and_fwd_flag    STRING,
    rate_code_id          SMALLINT,
    pickup_longitude      DOUBLE,
    pickup_latitude       DOUBLE,
    dropoff_longitude     DOUBLE,
    dropoff_latitude      DOUBLE,
    passenger_count       SMALLINT,
    trip_distance         DOUBLE,
    fare_amount           DOUBLE,
    extra                 DOUBLE,
    mta_tax               DOUBLE,
    tip_amount            DOUBLE,
    tolls_amount          DOUBLE,
    ehail_fee             DOUBLE,
    improvement_surcharge DOUBLE,
    total_amount          DOUBLE,
    payment_type          STRING,
    trip_type             SMALLINT,
    pickup                STRING,
    dropoff               STRING,
    cab_type              STRING,
    precipitation         SMALLINT,
    snow_depth            SMALLINT,
    snowfall              SMALLINT,
    max_temperature       SMALLINT,
    min_temperature       SMALLINT,
    average_wind_speed    SMALLINT,
    pickup_nyct2010_gid   SMALLINT,
    pickup_ctlabel        STRING,
    pickup_borocode       SMALLINT,
    pickup_boroname       STRING,
    pickup_ct2010         STRING,
    pickup_boroct2010     STRING,
    pickup_cdeligibil     STRING,
    pickup_ntacode        STRING,
    pickup_ntaname        STRING,
    pickup_puma           STRING,
    dropoff_nyct2010_gid  SMALLINT,
    dropoff_ctlabel       STRING,
    dropoff_borocode      SMALLINT,
    dropoff_boroname      STRING,
    dropoff_ct2010        STRING,
    dropoff_boroct2010    STRING,
    dropoff_cdeligibil    STRING,
    dropoff_ntacode       STRING,
    dropoff_ntaname       STRING,
    dropoff_puma          STRING
) STORED AS orc
  LOCATION '/orc/';

This is the S3-backed table.

CREATE EXTERNAL TABLE trips_s3 (
    trip_id               INT,
    vendor_id             STRING,
    pickup_datetime       TIMESTAMP,
    dropoff_datetime      TIMESTAMP,
    store_and_fwd_flag    STRING,
    rate_code_id          SMALLINT,
    pickup_longitude      DOUBLE,
    pickup_latitude       DOUBLE,
    dropoff_longitude     DOUBLE,
    dropoff_latitude      DOUBLE,
    passenger_count       SMALLINT,
    trip_distance         DOUBLE,
    fare_amount           DOUBLE,
    extra                 DOUBLE,
    mta_tax               DOUBLE,
    tip_amount            DOUBLE,
    tolls_amount          DOUBLE,
    ehail_fee             DOUBLE,
    improvement_surcharge DOUBLE,
    total_amount          DOUBLE,
    payment_type          STRING,
    trip_type             SMALLINT,
    pickup                STRING,
    dropoff               STRING,
    cab_type              STRING,
    precipitation         SMALLINT,
    snow_depth            SMALLINT,
    snowfall              SMALLINT,
    max_temperature       SMALLINT,
    min_temperature       SMALLINT,
    average_wind_speed    SMALLINT,
    pickup_nyct2010_gid   SMALLINT,
    pickup_ctlabel        STRING,
    pickup_borocode       SMALLINT,
    pickup_boroname       STRING,
    pickup_ct2010         STRING,
    pickup_boroct2010     STRING,
    pickup_cdeligibil     STRING,
    pickup_ntacode        STRING,
    pickup_ntaname        STRING,
    pickup_puma           STRING,
    dropoff_nyct2010_gid  SMALLINT,
    dropoff_ctlabel       STRING,
    dropoff_borocode      SMALLINT,
    dropoff_boroname      STRING,
    dropoff_ct2010        STRING,
    dropoff_boroct2010    STRING,
    dropoff_cdeligibil    STRING,
    dropoff_ntacode       STRING,
    dropoff_ntaname       STRING,
    dropoff_puma          STRING
) STORED AS orc
  LOCATION 's3://<bucket>/orc/';

I'll use Presto to run the benchmarks.

$ presto-cli \
    --schema default \
    --catalog hive

The following four queries were run on the HDFS-backed table.

The following completed in 6.77 seconds.

SELECT cab_type,
       count(*)
FROM trips_hdfs
GROUP BY cab_type;

The following completed in 10.97 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_hdfs
GROUP BY passenger_count;

The following completed in 13.38 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_hdfs
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 19.82 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_hdfs
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips DESC;

The following four queries were run on the S3-backed table.

The following completed in 10.82 seconds.

SELECT cab_type,
       count(*)
FROM trips_s3
GROUP BY cab_type;

The following completed in 14.73 seconds.

SELECT passenger_count,
       avg(total_amount)
FROM trips_s3
GROUP BY passenger_count;

The following completed in 19.19 seconds.

SELECT passenger_count,
       year(pickup_datetime),
       count(*)
FROM trips_s3
GROUP BY passenger_count,
         year(pickup_datetime);

The following completed in 24.61 seconds.

SELECT passenger_count,
       year(pickup_datetime) trip_year,
       round(trip_distance),
       count(*) trips
FROM trips_s3
GROUP BY passenger_count,
         year(pickup_datetime),
         round(trip_distance)
ORDER BY trip_year,
         trips DESC;

This is a recap of the query times above.

HDFS on AWS EMR | AWS S3 | Speed Up | Query
--------------- | ------ | -------- | -------
6.77s           | 10.82s | 1.6x     | Query 1
10.97s          | 14.73s | 1.34x    | Query 2
13.38s          | 19.19s | 1.43x    | Query 3
19.82s          | 24.61s | 1.24x    | Query 4

The HDFS-backed queries were anywhere from 1.24x to 1.6x faster than the S3-backed queries.
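
Once the querying is done and before the cluster is terminated, the same tooling works in reverse for landing any derivative results back onto S3 for safekeeping. This is a sketch; the /results/ paths below are hypothetical.

$ s3-dist-cp \
    --src=hdfs:///results/ \
    --dest=s3://<bucket>/results/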

