Hadoop and MapReduce
This is a summary relating Hadoop and MapReduce on the Workstations at ETP. The full cluster consists of two part, the Hadoop File System, HDFS, and the management of jobs or processes on this data. The given Hadoop framework is a combination of both. That means, once the cluster is started, one uses a single interface for managing data and jobs on the cluster.
- 1 HDFS
- 2 MapReduce
- 3 Usage on ETP Workstations
- 3.1 Starting the cluster
- 3.2 Sending data to the cluster
- 3.3 Sending a job to the cluster
- 3.4 Monitoring the cluster
- 4 Examples and HowTo's
- 5 Usefull Links
The Hadoop File System provides a reliable storage system on everyday hard drives. It is an open source derivation of the Google File System, GFS. Google wanted to lower the cost for hard drives by replacing high quality server disk by normal hard drives, because there wasn't much effort in lifetime and reliability even for very expensive hard drives. The idea is quite simple. The backup mechanism of raid mechanisms is transferred to a network of disks. Instead of storing a data only on one machine in the network, the file is distributed on 3 or more hard drives. By this, the failure of a single machine doesn't effect the hole cluster. The central machine for the HDFS is the namenode. The namenode distributes the file on the cluster with the desired replications and keeps track of the files. If the namenode fails, the data on the cluster will be lost. As this is a very crucial point, the namenode itself has a backup machine, the secondary-namenode. All machines in the cluster, which are only storing the data send from the namenode, are called datanodes.
MapReduce is a framework for processing large sets of data. The Name is inspired by the map and reduce functions often used in functional programming. A single MapReduce job runs trough several phases, which is quite well illustrated in the following picture:
An inputwriter reads data from the file system an sends it to map process. There the data will be somehow processed and intermediate results are produced in form a key-value-pairs. All these intermediate results are collected untill all data was processed by a map process. Then the results were send to reduce processes, which will create output files on the file system. It is possible to used a combiner between the map and reduce phase on the intermediate result. The combiner will sort the output of the mapper on the same machine, before results were send to a reducer to avoid massive network traffic. In this framework, at least a mapper is needed, which accepts data from the file system and produces key-value-pairs. The reducer can be disabled or simply do nothing on these pairs. Also, the intermediate results are gathered and sorted by the key before send to a reducer. Similarly to the HDFS, the central machine in the MapReduce framework is the jobtracker. The jobtracker gatheres the input data, splits them and sends the splits to tasktracker, which will process the data as given above. The jobtracker also collects and sorts the intermediate results and the output of the reducer.
Usage on ETP Workstations
Starting the cluster
The hadoop framework is already installed and configured on the workstations at the ETP in Garching. If you look at /project/etp/etphadoop/hadoop-scripte-1.2.1/, you will find several bash-scripts. Usally, the cluster is running and you don't have to start it yourself.
start-cluster.sh and stop-cluster.sh
What you read is what you get: executing these scripts will start or stop the cluster with the all the configs files in /project/etp/etphadoop/hadoop-conf/ The scripts start-cluster-local-config.sh and stop-cluster-local-config.sh will start the cluster without the option of an global config directory. By default, the local config directory will be /scratch-local/etphadoop/hadoop-1.2.1/conf. With starting the cluster, you will become the superuser of the cluster.
This script is used to distribute the global config directory /project/etp/etphadoop/hadoop-conf-1.2.1/ to all datanodes in the cluster. It will first remove the old config-files locally and then copy from this global. It might be useful, although the cluster was started global to have the same config files in the local standard config directory, which is /scratch-local/etphadoop/hadoop-1.2.1/conf/
This script will erase all temporary datas on all datanodes. Use this script after you reformat the namenode. After a (re)-formation of the namenode, the namenode labels itself with a certain Version-Number (After a reformation, it's new one). A Reformation does not effect the datanodes, so on all the datanodes, the old version number is still saved. Due to unknown reasons, the new namenode cannot start the datanodes with a different (new) Version-Number. Now, two work-arounds are possible. Edit the versionnumber on all datanodes, or simply delete the VERSION files, because it will be recreated on the first start of the cluster. For the deleting of the hadoop related tmp-files on all datanodes at ETP, use this script.
This new script will help you setting up a script for the cluster. For each streaming job you want to submit to the cluster, send a line like the following to the script
<yourmappper>;<yourreducer>;<yourMinSplitSize>;<yourInputfiles>;<yourOutputDir> | ./job-submission.sh -a "-D extra Option for your job submission"
The script will separate the incomming line by the semicolons, add your additional option (if needed) and create a script "tobeexec.sh", which you can directly execute on the cluster. For that, copy "tobeexec.sh" to the namenodes local hadoop directory and execute it. It is possible to send multiple lines to job-submission.sh, which results in a script thats starts multiple jobs consecutively.
This script adds the Hadoop local bin directory to your $PATH-Variable. After this, your able to use the Hadoop commands, e.g. checking the HDFS:
:~$ hadoop fsck /user FSCK started by <username> from /10.153.232.70 for path /user at Thu Jun 20 15:38:41 CEST 2013 .................................................................................................... .................................................................................................... .................................................................................................... .................................................................................................... ..............Status: HEALTHY Total size: 591027624671 B Total dirs: 14 Total files: 414 Total blocks (validated): 412 (avg. block size 1434533069 B) Minimally replicated blocks: 412 (100.0 %) Over-replicated blocks: 2 (0.4854369 %) Under-replicated blocks: 0 (0.0 %) Mis-replicated blocks: 0 (0.0 %) Default replication factor: 2 Average block replication: 2.0242717 Corrupt blocks: 0 Missing replicas: 0 (0.0 %) Number of data-nodes: 28 Number of racks: 1 FSCK ended at Thu Jun 20 15:38:41 CEST 2013 in 29 milliseconds The filesystem under path '/user' is HEALTHY
Sending data to the cluster
If you used the global config, gar-ws-etp79 is the namenode of your cluster. Also by starting the cluster, you are by default the superuser on this cluster.
# If You haven't yet created a directory for your data on the HDFS, use /user/<Your.Username> hadoop dfs mkdir /user/<Your.Username> hadoop dfs -copyFromLocal <sourceDir> <DestDir>
As you can see, hadoop is your interface and with the option "dfs -<yourcommand>" you send commands regarding the HDFS. You can use nearly every file system command you know from your linux enviroment. Also, as a member of the usergroup ls-schaile, you should be able to create your own directory and store your data there.
Sending a job to the cluster
To run your Java MapReduce Programm on the cluster, simply type:
hadoop jar <yourJava.jar>
If you want to execute any type of executable, you can use Hadoop Streaming or RootOnHadoop. The Hadoop Streaming is a given jar, which will set up an stream environment you know from your Linux. RootOnHadoop is an implementation, where you're able to process binary data files with any kind of executable.
Using Hadoop Streaming
hadoop jar /scratch-local/etphadoop/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar \\ -input <HDFSPathtoyourInputdir>\\ -output <HDFSPathToYourOutputDir>\\ -file <LocalPathToYourMapper>\\ -mapper <LocalPathToYourMapper>\\ -file <LocalPathToYourReducer>\\ -reducer <LocalPathToYourReducer>
The Streaming environment provides a stdout/stdin for your own executables. Your mapper reads data from stdin, processes them and send the key-value-pairs to stdout. The same for your reducer. The Streaming enviromment will take care of the rest. You can use the script job-submission.sh to avoid useless typing.
If you use the RootOnHadoop package, you need to specify the main class in the package cern.root.hadoop
hadoop jar RootOnHadoop.jar cern.root.hadoop.RootOnHadoop \\ -in <input files on hdfs> \\ -out <output dir on hdfs> \\ -map <local path to mapper script> \\ -reduce <local path to reducer script>
This implementation checks, where the input files are stored and creates a local copy for your script, if needed. Then to full path to the file will be given as first and only command line argument to your script. To the name of the output directory, a simple timestamp is added. By that, you can start the same script or line again without changing the output directory name. In the hadoop directory is a directory MapRed, where a simple mapper and reducer script is given:
#!/bin/bash #Map.sh echo "" echo "Working on:" echo $1 echo "" echo "Access test:" ls -lh $1 echo "" jacksum -a md5 $1 > test.out
This Mapper script prints the given file name stored in $1 and calculates a md5-checksum. You can use the Reduce.sh script, which simply checks on the result files from the mapper, of you don't need a reduce phase.
Monitoring the cluster
Hadoop provides a web interface, where you can monitor your work on the cluster.
You can also acces the datanodes and tasktracker. The Port for the datanode will be 50080, the Port for tasktracker 50060.
Examples and HowTo's
In the directory /project/etp/etphadoop/examples you find all the examples and files described here. If you have problems with the cluster or these examples, contact me
Word Count Example
The Word Count Example is the standard example for Hadoop and provided here in Java or here in python with Usage of Streaming. In the directory /project/etp/etphadoop/examples/WordCount you find two scripts for sending a WordCount job in Java or python. The Java WordCount example uses the examples jar given from Hadoop, the python examples uses Hadoop Streaming. The job mapper will parse through all the text files in the given input directory on HDFS an send a key-value-pair with the word as the key and always 1 as the value. Hadoop will send pairs with the same key always to the same reducer. The reducer then simply sums up all the keys he gets.
In the directory /project/etp/etphadoop/examples/RootOnHadoop you find RootOnHadoop.jar, which is needed for sending a root job to the Hadoop cluster. The general function is described above. You can rebuild the Jar-package with the build.sh script, but usually it is not necessary.
:/project/etp/etphadoop/examples/RootOnHadoop$ ./build.sh ....building.... :/project/etp/etphadoop/examples/RootOnHadoop$ example-run.sh
The example script will start a simple Hadoop Job, where a single root file is checked and analyzed. You can change the example-run.sh for running your own root analysis on this file. Change the variables for the mapper, reducer, data input and output to your needs. Keep in mind, that the data must be uploaded to the HDFS first like described here. The RootOnHadoop software will start the map script with the filename as first and only command line parameter, like:
You can simply use $1 in your map script to use the given file name. If you specified a directory as input, all the files in the directory will be processed. You can use wildcards, e.g. *.root for all root files in a directory. After your map script has finished, the RootOnHadoop software will search for output of your script and uploads it to your HDFS output directory. Rename your result files to a file with a ".out" extension (e.g. analyses_1337_with_specials.$ID.out) and the RootOnHadoop software will take care. Also the stdout of your script will be saved on the HDFS in file $map_number.map.out in the specified output directory.
Root Analyses using dcache and Hadoop Streaming
In the directory /project/etp/etphadoop/examples/Streaming you find a script (example-run.sh) for sending a streaming job to the cluster. The job will process data from dcache with a given root Analyses. It will use the mapper "runlocal_dcap_streaming.sh" and the reducer "runlocal_reduce_dcap_streaming.sh". Hadoop Streaming gives you a stdin/stdout you know from your Linux. In this case, the Hadoop job reads only a list of filenames on dcache and sends it via stdin to the mapper. The mapper is reading the filename from stdin and starting the root script and sends some information to the web front end, the reporter:
while read line; do echo "reporter:status:Starting analysis">&2 root -l -n -b -q -x "/project/etp/etphadoop/examples/Analysis_HiggsTrigger_v2.0.3/runD3PDSelectorDCAP.C(\"$line#physics\")" 1>&2 echo "reporter:counter:Analyse Counter,Number of analyzed files,1">&2 ... done
With the reporter, you can send status information or heartbeats from your job, otherwise after 10 minutes, your job will be killed because of a timeout. After the root analyses, the mapper uploads the results to the HDFS and sends a key with the filename on the hdfs to the reducer. The reducer collects all keys and simply merges the root files with hadd. Like in the RootOnHadoop example above, you can change the example-run.sh for your own purpose by changing the variable for the mapper, reducer, data input and output. But instead of a directory with data on the HDFS, only a simply filelist has to be provided as input. You can obtain the filenames by reading from stdin like this:
while read filename; do echo $filename #this variable $filename contains the filename done
Also, you have to upload your results (e.g. output.root) to the HDFS yourself. Trying to copy a non existing file to the HDFS will provoke an error and possibly stopping your script and by that crashing your job. You can handle this e.g. like this:
if [ -f output.root ] then hadoop fs -put output.root $HDFS_DIR/$ID.$COUNTER.output.root fi
You can check the example filelist either in /project/etp/etphadoop/examples/Streaming or in the example input directory on the HDFS:
#This filelist contains 350 filelinks! hadoop dfs -cat /user/examples/Streaming/filelist | head