A simple Hadoop with Z2 sample

This sample is an adaptation of the classic WordCount sample to the Z2 context. It shows how Hadoop can be used from within Z2 and, in particular, how to write Map/Reduce jobs in that context.

Note #1: This sample is made to be run on Linux or Mac OS. It is supposedly possible to run Hadoop on Windows, but we have not been able to adapt the sample for that yet. A machine with 8 GB of RAM should be sufficient.
Note #2: For your convenience, everything in this sample assumes you use Eclipse. Eclipse is of course not a prerequisite for running the software, but for now it makes everything much more integrated. Please have Eclipse ready and the Eclipsoid plug-in installed. See How to install Eclipsoid.

This sample is provided via the repository z2-samples.hadoop-basic.

Prerequisites

This sample makes use of the Hadoop add-on, which is based on Cloudera's CDH4 distribution of Hadoop. As client access is version dependent, so is the sample. To simplify this for you, there is a pre-configured CDH4 distribution available from this site. Apart from its development-style configuration (i.e. no security), this is the way we prefer to install Hadoop and friends anyway: just one root installation folder, one OS user, one log folder, etc.

Please follow the procedure described here: Install prepacked CDH4.

For use with this sample, it is most convenient if you clone and configure the CDH4 install next to your Eclipse workspace and the sample repository clone.

Z2 has the following Java version requirements:

Z2 Version    | Min. Java version required | Max. Java version supported
2.1 - 2.3.1   | Java 6                     | Java 7
2.4 - master  | Java 8                     | Java 8

Note: Most samples suggest using the master branch. You may choose another version branch (please check the respective repository).
Make sure you have a corresponding Java Development Kit (JDK) or Java Runtime Environment (JRE) installed. If in doubt, go to Download Java SE.

Note: Running v2.1-v2.3.1 on Java 8 is supported by specifying

com.zfabrik.java.level=7

(or 6, if that is your desired compilation language level) in <home>/run/bin/runtime.properties. This makes sure the Java compiler version detection does not fall back to a lower level.

Setting up the sample

From here on, the sample is run like all samples, that is, following How to run a sample.

Assuming everything (including the z2 core and the CDH4 setup) is under install and your workspace is in install/workspace, please clone z2-samples.hadoop-basic under install as well. Either from the command line as

cd install
git clone -b master http://git.z2-environment.net/z2-samples.hadoop-basic

or from within Eclipse using the Git repositories view (but make sure the folder is right next to your z2-base.core clone).

You should have an Eclipse workspace and next to it z2-samples.hadoop-basic, z2-samples.cdh4-base, and z2-base.core. Import all projects into your workspace.

We assume that you followed the steps in Install prepacked CDH4 and Hadoop is running (we do not need HBase in this case).

Running the sample

Start Z2.

Use the Eclipse launcher or start from the command line. The first time, this will take a short moment. Once Z2 is up, we first want to write a file into the Hadoop file system whose contents we will later split into words and count their occurrences.

Loading data

If you want to load a file you already have at hand, use the "copyFromLocal" operation to copy it to /hadoop-wordcount/input. E.g. if the file is called myfile.txt, go into the CDH4 install and run

. ./env.sh 
hadoop fs -mkdir /hadoop-wordcount
hadoop fs -copyFromLocal myfile.txt /hadoop-wordcount/input

(the env.sh call is only required once per shell session).
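To verify the upload, you can list the target directory (in the same shell session):

hadoop fs -ls /hadoop-wordcount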

Alternatively, there is a z2Unit test (see How to z2Unit) that you can invoke to generate some input. As that is interesting in its own right, here is how it is done.

You should already have all the projects, in particular com.zfabrik.samples.hadoop-basic.wordcount, in your workspace. Otherwise, import them from the repository you cloned previously.

Use Eclipsoid to resolve all required compile dependencies (Alt-R or click on the right Z in the toolbar), if you have not done so already.

Look for the type WriteWordsFile (Ctrl+Shift+T).

The method writeWordsFile will write a file of 100 million words in lines containing between 1 and 9 words each (but you can change that, of course). Invoke it by right-clicking and "Run as / JUnit test". If you want to play around with the settings, simply change the code, synchronize Z2 (Alt-Y or click on the left Z in the toolbar), and rerun.

The interesting piece about this code is how it connects to HDFS:

...
    @Test
    public void writeWordsFile() throws Exception {
        // look up the Hadoop client configuration from the Z2 component and connect to HDFS
        FileSystem fs = FileSystem.get(IComponentsLookup.INSTANCE.lookup(WordCountMRJob.CONFIG, Configuration.class));
        // start from a clean slate: delete any previous input and recreate the parent directory
        fs.delete(WordCountMRJob.INPUT_PATH, true);
        fs.mkdirs(WordCountMRJob.INPUT_PATH.getParent());
...

Here, the actual connection configuration, one of Hadoop's XML configuration files, is looked up from a Z2 component called com.zfabrik.samples.hadoop-basic.wordcount/nosql. The component type for that is defined by the Hadoop integration module com.zfabrik.hadoop of the Hadoop add-on.

The purpose of this is to separate the client configuration information from the implementation that uses it. We will see another application of that below.
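Just to complete the picture, the rest of writeWordsFile essentially boils down to writing random lines of words into WordCountMRJob.INPUT_PATH. The following is only an illustrative sketch continuing the fragment above (word list, line layout, and counts are made up here and are not the sample's actual code; it uses java.util.Random and java.io.PrintWriter):

// continues the fragment above; fs is the FileSystem obtained via the component lookup
String[] words = { "alpha", "beta", "gamma", "delta" };
Random rnd = new Random();
try (PrintWriter out = new PrintWriter(fs.create(WordCountMRJob.INPUT_PATH))) {
    long written = 0;
    while (written < 100000000L) {                  // about 100 million words in total
        int wordsInLine = 1 + rnd.nextInt(9);       // between 1 and 9 words per line
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < wordsInLine; i++) {
            line.append(words[rnd.nextInt(words.length)]).append(' ');
        }
        out.println(line.toString().trim());
        written += wordsInLine;
    }
}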

Now we assume you have the input file uploaded or generated in HDFS, and we turn to a Map/Reduce job that counts the number of occurrences of individual words.

Running the WordCount Map/Reduce Job

There are two ways of doing that.

The generic, interactive method is to open http://localhost:8080/z_hadoop (log in with z*/z by default) and schedule or run the job com.zfabrik.samples.hadoop-basic.wordcount/wordcount with the remote connectivity config above. If you choose schedule, the web site will not wait for the job to complete; otherwise it will wait for the job and keep displaying its progress. As an alternative to watching the job progress from there, you can go to Yarn's ResourceManager at http://localhost:8088.

Once the job has completed, the results are in HDFS at /hadoop-wordcount/output. In the shell where CDH4 was installed, run

hadoop fs -cat /hadoop-wordcount/output/*
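Since the job writes Text keys and IntWritable values through TextOutputFormat, each output line is a tab-separated word/count pair, roughly like this (illustrative values only):

alpha   24917
beta    25102
gamma   24988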

To make things more interesting, there is another way to run the job: programmatically from a z2Unit test. Look for the type CountWords (Ctrl+Shift+T) and "Run as / JUnit test". This will wait for the job, logging its progress and finally its results to the Z2 console.

Here are the relevant code fragments:

    @Test
    public void countWords() throws Exception {
        // get the config
        Configuration c = getConfiguration();

        // prepare the fs.

        // <taken out>

        // get the job configurator and configure it
        IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup("com.zfabrik.samples.hadoop-basic.wordcount/wordcount",IJobConfigurator.class);
        jc.configure(c);
        // submit the job
        Job j = jc.submit();

        // wait for it to complete and log progress

        // <taken out>

    }

The general principle is the following: when you need to run a Map/Reduce job from your application, which in our experience is actually the typical case, you proceed as follows (a condensed sketch follows the list):

  1. Do anything you need to prepare before the execution.
  2. Get the client config.
  3. Retrieve the "Job Main class" (see Hadoop add-on, IMapReduceJob).
  4. Call configure to retrieve a configured Job object.
  5. Submit the job.
  6. If you need to, wait for the job to finish.
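Condensing the CountWords fragment above, the whole flow might look like the following sketch. It assumes the same component names used throughout this sample; waiting via Hadoop's standard Job.waitForCompletion is just one way to do step 6, in place of the sample's own progress logging:

// 1./2. prepare whatever is needed and get the client configuration
Configuration c = IComponentsLookup.INSTANCE.lookup(
    "com.zfabrik.samples.hadoop-basic.wordcount/nosql", Configuration.class);

// 3./4. retrieve the job component and let it configure a Job instance
IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup(
    "com.zfabrik.samples.hadoop-basic.wordcount/wordcount", IJobConfigurator.class);
jc.configure(c);

// 5. submit the job
Job j = jc.submit();

// 6. if needed, wait for completion (true = print progress to the log)
if (!j.waitForCompletion(true)) {
    throw new IllegalStateException("word count job failed");
}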

Let's have a look at the job's main class.

The job implementation

The WordCount M/R job is implemented in WordCountMRJob.

Here are the relevant code fragments:

In its configure method, the job sets up all the relevant job configuration given a client configuration. This is pretty much as always in Hadoop, with the difference that you do not specify task classes (map, combine, reduce). Instead, Z2 sets those to generic implementations that make sure the actual implementations run in the right context.

public Job configure(Configuration configuration) throws Exception {
    // create the job instance 
    Job job = Job.getInstance(configuration, name);

    // configure all the input and output types
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    // and where stuff is coming from and where it is going in the end
    FileInputFormat.setInputPaths(job, INPUT_PATH);
    FileOutputFormat.setOutputPath(job, OUTPUT_PATH);

    // if the output already exists, delete it
    FileSystem fs = FileSystem.get(configuration);
    fs.delete(WordCountMRJob.OUTPUT_PATH, true);

    // do not set mapper, reducer, or combiner classes, as that is done by the Hadoop integration
    return job;
}

All the rest is really just plumbing.

    /*
     * During the life cycle of the job, these questions will be asked:
     */
    @Override
    public Reducer<Text, IntWritable, Text, IntWritable> getCombiner(Configuration configuration) { return new WordCountReducer(); }
    @Override
    public Mapper<LongWritable, Text, Text, IntWritable> getMapper(Configuration configuration) { return new WordCountMapper(); }
    @Override
    public Reducer<Text, IntWritable, Text, IntWritable> getReducer(Configuration configuration) { return new WordCountReducer(); }
    @Override
    public boolean hasCombiner() { return true; }
    @Override
    public boolean hasMapper() { return true; }
    @Override
    public boolean hasReducer() { return true; }

The mapper implementation WordCountMapper and the reducer implementation WordCountReducer just do what the word count sample always does (sketched after the following list):

  1. When reading a line of text, the mapper splits it into words and emits (<word>,1) for every word.
  2. The combiner and reducer get a sequence of counts per word, (<word>, (<count_i>)_i), and emit (<word>, sum_i <count_i>).
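For reference, here is a minimal sketch of what such implementations typically look like, using the type parameters from the job interface above. This is generic word count code, not necessarily the exact classes shipped with the sample:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// shown together here for brevity; in the project these are separate classes
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the line into words and emit (<word>,1) for every word
        for (String w : value.toString().split("\\s+")) {
            if (!w.isEmpty()) {
                word.set(w);
                context.write(word, ONE);
            }
        }
    }
}

class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // sum up all counts emitted for this word and emit (<word>, sum)
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}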

The whole example is of course not practically usable, and it says very little about how to use the data structures. The sample Sample-hbase-mail-digester is much more interesting in those respects.

Summary

In real-world applications, Map/Reduce jobs are just one part of the application scenario. In particular, they usually need access to domain types, if not to other application services and even other databases. That is one element of the Hadoop integration: providing first-class application component support.

Secondly, jobs may be triggered by application state changes or, for example, time-based events that are evaluated by an application. That is why it is so important to be able to trigger jobs programmatically, much more so than manually from the command line (as useful as that may be for demos and testing). That is the other part of the Hadoop integration: providing an abstraction for programmatic job execution that respects modularity and keeps connectivity configuration abstracted away.

Please check out Hadoop add-on to learn more about what is happening behind the scenes.