Project

General

Profile

Sample-hadoop-basic » History » Revision 17

Revision 16 (Udo Offermann, 21.09.2012 16:59) → Revision 17/20 (Henning Blohm, 03.01.2014 22:22)

h1. A simple Hadoop with Z2 sample 

 This sample is an adaptation of the classical Wordcount sample in the Z2 context. This sample is supposed to show you how Hadoop can be used from within Z2 and in particular how to write Map/Reduce jobs in that context.  

 *Note #1:* This sample is made to be run on Linux or Mac-OS. Supposedly it is possible to run Hadoop on Windows. Sorry, but we have not been able to adapt the sample yet. A machine with 8GB of RAM should be sufficient. 
 *Note #2:* For your convenience everything in this sample assumes you use Eclipse. As such, that is of course no prerequisite to running the software, but it just makes everything much more integrated for now. Please have Eclipse ready and the Eclipsoid installed. See [[How to install Eclipsoid]].  

 This sample is provided via the repository "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic. 

 h2. Prerequisites 

 This sample makes use of the [[Hadoop add-on]] that is based on Cloudera's CDH4 distribution of Hadoop. As client access is version dependent, so is the sample. In order to simplify this for you, there is a pre-configured CDH4 distribution available to you from this site. Apart from its development style configuration (i.e. no security), this is anyway the way we prefer to install Hadoop and friends: Just one root installation folder, one OS user, one log folder etc. 

 Please follow the procedure described here: [[Install prepacked CDH4]]. 

 To use with this sample, it is most convenient, if you clone and configure the CDH4 install next to your Eclipse workspace and the sample repository clone. 

 h2. Setting up the sample 

 From here on, the sample is run like all samples, that is, following [[How to run a sample]]. 

 Assuming everything (including the z2 core and the CDH4 setup) is under *install* and your workspace is in *install/workspace* please clone "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic under *install* as well. Either from the command line as  

 <pre><code class="ruby"> 
 cd install 
 git clone -b master http://git.z2-environment.net/z2-samples.hadoop-basic 
 </code></pre> 

 or from within Eclipse using the Git repositories view (but make sure the folder is right next to your z2-base.core clone). 

 You should have an Eclipse workspace and next to it *z2-samples.hadoop-basic*, *z2-samples.cdh4-base*, and *z2-base.core*. Import all projects into your workspace. 

 We assume that you followed the steps in [[Install prepacked CDH4]] and Hadoop is running (we do not need HBase in this case). 

 h2. Running the sample 

 h3. Starting Z2.  

 Use the Eclipse launcher or start from the command line. The first time this will take a short moment. When up, we want to first write a file into the Hadoop file system that we are going to split into words and count their occurences later on.  

 h3. Loading data 

 If you want to load some file you already have at hand, use the "copyFromLocal" operation to copy it into */hadoop-wordcount/input*. E.g. if the file is called *myfile.txt* go into the CDH4 install and run 

 <pre><code class="ruby"> 
 . ./env.sh  
 hadoop fs -mkdir /hadoop-wordcount 
 hadoop fs -copyFromLocal myfile.txt /hadoop-wordcount/input 
 </code></pre> 

 (the env.sh call is only required once per shell session). 

 Alternatively there is a z2Unit test (see [[How to z2Unit]]) that you can invoke to generate some input. As that is interesting on its own right, here is how that is done. 

 You should have all the projects, in particular *com.zfabrik.samples.hadoop-basic.wordcount* already in your workspace. Otherwise import them from the repository you cloned previously. 

 Use Eclipsoid to resolve all required compile dependency (Alt-R or click on the right Z in the toolbar), if you have not done so already. 

 Look for the type *WriteWordsFile* (Ctrl+Shift+T).  

 The method *writeWordsFile* will write a file of 100 million words in lines containing between 1 and 9 words each (but you can change that of course). Invoke it by right-clicking and "Run as / JUnit test". If you want to play around with the settings, simply change the code, synchronize Z2 (Alt-Y or click on the left Z in the toolbar) and rerun. 

 The interesting piece about this code is how it is connecting to HFDS: 

 <pre><code class="java"> 
 ... 
	 @Test 
	 public void writeWordsFile() throws Exception { 
		 FileSystem fs = FileSystem.get(IComponentsLookup.INSTANCE.lookup(WordCountMRJob.CONFIG, Configuration.class)); 
		 fs.delete(WordCountMRJob.INPUT_PATH, true); 
		 fs.mkdirs(WordCountMRJob.INPUT_PATH.getParent()); 
 ... 
 </code></pre> 

 Here, the actual connection configuration, one of Hadoop's XML configuration files, is looked up from a Z2 component called "com.zfabrik.samples.hadoop-basic.wordcount/nosql":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/show/com.zfabrik.samples.hadoop-basic.wordcount/nosql. The component type for that is defined by the Hadoop integration module *com.zfabrik.hadoop* of the [[Hadoop add-on]].  

 The purpose of this is to separate the client configuration information from the using implementation. We will see another application of that below. 

 So now we assume you have the input file uploaded or generated in HDFS and we turn to a Map/Reduce job that counts the number of occurances of single words. 

 h3. Running the WordCount Map/Reduce Job 

 There is two ways of doing that.  

 The generic, interactive, method is to open http://localhost:8080/z_hadoop use (z*/z by default) and schedule or run the job *com.zfabrik.samples.hadoop-basic.wordcount/wordcount* with the remote connectivity config above. If you choose schedule, the web site will not wait for the job completion, otherwise it will wait for the job and keep displaying it progress. Alternatively to watching the job progress from there, you can go to Yarn's Nodemanager at http://localhost:8088. 

 Once the job has completed, the results are HDFS at */hadoop-wordcount/output*. On the shell where CDH4 was installed run  

 <pre><code class="ruby"> 
 hadoop fs -cat /hadoop-wordcount/output/* 
 </code></pre> 

 To make things more interesting, there is another method to run the Job: Programmatically from a z2Unit test. Look for the type "CountWords":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.test/com/zfabrik/samples/hadoop_basic/test/CountWords.java (Ctrl-Shift-T) and "Run as / JUnit test". This will wait for the job and log its progress and finally its results to the Z2 console.  

 Here's the relevant code fragments: 

 <pre><code class="java"> 
	 @Test 
	 public void countWords() throws Exception { 
                 // get the config 
                 Configuration c = getConfiguration(); 

		 // prepare the fs. 

		 // <taken out> 
		
		 // get the job configurator and configure it 
		 IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup("com.zfabrik.samples.hadoop-basic.wordcount/wordcount",IJobConfigurator.class); 
		 jc.configure(c); 
		 // submit the job 
		 Job j = jc.submit(); 
		
		 // wait for it to complete and log progress 

		 // <taken out> 

	 } 
 </code></pre> 

 The general principle is the following: When you need to run a Map/Reduce job from your application, which is actually the typical case in our experience, you proceed as follows: 

 # Do anything you need to prepare before the execution. 
 # Get the client config 
 # Retrieve the "Job Main class" (see [[Hadoop add-on]], "IMapReduceJob":http://www.z2-environment.net/javadoc/com.zfabrik.hadoop!2Fjava/api/com/zfabrik/hadoop/job/IMapReduceJob.html). 
 # Call configure to retrieve a configured Job object 
 # Submit the job. 
 # If you need to, wait for the job to finish. 

 Let's have a look at the job's main class. 

 h2. The job implementation 

 The WordCount M/R job is implemented in "WordCountMRJob":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.impl/com/zfabrik/samples/hadoop_basic/impl/WordCountMRJob.java.  

 Here are the relevant code fragments: 

 In its <code>configure</code> method, the job sets all the relevant job config given a client configuration. This is pretty much as always in Hadoop, with the difference that you do not specify task classes (map, combine, reduce). Instead, Z2 will set those to generic implementations that make sure the actual implementations run in the right context. 

 <pre><code class="java"> 
 public Job configure(Configuration configuration) throws Exception { 
	 // create the job instance  
	 Job job = Job.getInstance(configuration, name); 

	 // configure all the input and output types 
	 job.setOutputKeyClass(Text.class); 
	 job.setOutputValueClass(IntWritable.class); 
	 job.setInputFormatClass(TextInputFormat.class); 
	 job.setOutputFormatClass(TextOutputFormat.class); 

	 // and where stuff is coming from and where it is going in the end 
	 FileInputFormat.setInputPaths(job, INPUT_PATH); 
	 FileOutputFormat.setOutputPath(job, OUTPUT_PATH); 
		
	 // if the output already exists, delete it 
	 FileSystem fs = FileSystem.get(configuration); 
	 fs.delete(WordCountMRJob.OUTPUT_PATH, true); 
		
	 // do not set mapper, reducer, or combiner classes, as that is done by the Hadoop integration 
	 return job; 
 } 
 </code></pre> 

 All the rest is really just plumbing. 

 <pre><code class="java"> 
	 /* 
	  * During the life cycle of the Job these questions will be asked:  
	  */ 
	 @Override 
	 public Reducer<Text, IntWritable, Text, IntWritable> getCombiner(Configuration configuration) { return new WordCountReducer();  	 } 
	 @Override 
	 public Mapper<LongWritable, Text, Text, IntWritable> getMapper(Configuration configuration) {return new WordCountMapper();} 
	 @Override 
	 public Reducer<Text, IntWritable, Text, IntWritable> getReducer(Configuration configuration) { return new WordCountReducer(); } 
	 @Override 
	 public boolean hasCombiner() { return true; } 
	 @Override 
	 public boolean hasMapper() { return true; } 
	 @Override 
	 public boolean hasReducer() { return true; }; 
 </code></pre> 

 and the mapper implementation WordCountMapper and the reducer implementation WordCountReducer are just doing what the word count sample always does: 

 # When reading a line of text, the mapper splits it into words and emits (<word>,1) for every word. 
 # The combiner and reducer get a sequence of counts per word, (<word>,(<count_i>)_i) and emit (<word>, sum(<count_i>,i)) 

 The whole example is of course not practically usable and in terms of using the data structures it tells very little. The sample [[Sample-hbase-full-stack-TBD]] is much more interesting in those respects. 

 h2. Summary 

 In real world applications Map/Reduce jobs are just one part of the application scenario. In particular they do usually need access to domain types, if not other application services and even access to other databases. That is one element of the Hadoop integration: Provide first-class application component support. 

 Secondly, jobs may get triggered based on application state changes or - for example - time based events that are evaluated by an application. That's why it is so important to be able to trigger jobs programmatically - much more so than manually from the command line (as much as that may be useful for demos and testing). That is the other part of the Hadoop integration: Provide an abstraction to programmatic job execution that respects modularity and abstraction of connectivity configuration. 

 Please check out [[Hadoop add-on]] to learn more about what is happening behind the scenes.