Project

General

Profile

Sample-hadoop-basic » History » Version 13

Henning Blohm, 20.09.2012 18:10

1 1 Henning Blohm
h1. A simple Hadoop with Z2 sample
2
3 2 Henning Blohm
This sample is an adaptation of the classical Wordcount sample in the Z2 context. This sample is supposed to show you how Hadoop can be used from within Z2 and in particular how to write Map/Reduce jobs in that context. 
4
5 9 Henning Blohm
*Note #1:* This sample is made to be run on Linux or Mac-OS. Supposedly it is possible to run Hadoop on Windows. Sorry, but we have not been able to adapt the sample yet. A machine with 8GB of RAM should be sufficient.
6 4 Henning Blohm
*Note #2:* For your convenience everything in this sample assumes you use Eclipse. That as such is of course no prerequisite to running the software, but it just makes everything much more integrated for now. Please have Eclipse ready and the Eclipsoid installed. See [[How to install Eclipsoid]]. 
7 1 Henning Blohm
8 4 Henning Blohm
This sample is provided by the repository "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic.
9 3 Henning Blohm
10 2 Henning Blohm
h2. Prerequisites
11
12 1 Henning Blohm
This sample makes use of the [[Hadoop add-on]] that is based on Cloudera's CDH4 distribution of Hadoop. As client access is version dependent, so is the sample. In order to simplify this for you, there is a pre-configured CDH4 distribution available to you from this site. Apart from its development style configuration (i.e. no security), this is anyway the way we prefer to install Hadoop and friends: Just one root installation folder, one OS user, one log folder etc.
13
14 9 Henning Blohm
Please follow the procedure described here: [[Install prepacked CDH4]].
15 3 Henning Blohm
16
To use with this sample, it is most convenient, if you clone and configure the CDH4 install next to your Eclipse workspace and the sample repository clone.
17
18 1 Henning Blohm
h2. Setting up the sample
19
20 4 Henning Blohm
From here on, the sample is run like all samples, that is, following [[How to run a sample]].
21
22
Assuming everything (including the CDH4 setup) is under *install* and your workspace in in *install/workspace* please clone "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic under *install* as well. Either from the command line as 
23
24
<pre><code class="ruby">
25
cd install
26
git clone -b master http://git.z2-environment.net/z2-samples.hadoop-basic
27
</code></pre>
28
29
or from within Eclipse using the Git repositories view (but make sure the folder is right next to your z2-base.core clone).
30
31
You should have an Eclipse workspace and next to it *z2-samples.hadoop-basic*, *z2-samples.cdh4-base*, and *z2-base.core*. Import all projects into your workspace.
32
33 10 Henning Blohm
We assume that you followed the steps in [[Install prepacked CDH4]] and Hadoop is running (we do not need HBase in this case).
34 5 Henning Blohm
35
h2. Running the sample
36
37 11 Henning Blohm
h3. Starting Z2. 
38 5 Henning Blohm
39 11 Henning Blohm
Use the Eclipse launcher or start from the command line. The first time this will take a short moment. When up, we want to first write a file into the Hadoop file system that we are going to split into words and count their occurences later on. 
40 1 Henning Blohm
41 11 Henning Blohm
h3. Loading data
42
43 5 Henning Blohm
If you want to load some file you already have at hand, use the "copyFromLocal" operation to copy it into */hadoop-wordcount/input*. E.g. if the file is called *myfile.txt* go into the CDH4 install and run
44
45
<pre><code class="ruby">
46
. ./env.sh 
47
hadoop fs -copyFromLocal myfile.txt /hadoop-wordcount/input
48
</code></pre>
49
50
(the env.sh call is only required once per shell session).
51
52
Alternatively there is a z2Unit test (see [[How to z2Unit]]) that you can invoke to generate some input. As that is interesting on its own right, here is how that is done.
53
54
You should already have all the projects, in particular *com.zfabrik.samples.hadoop-basic.wordcount* already in your workspace. Otherwise import them from the repository you cloned previously.
55
56
You Eclipsoid to resolve all required compile dependency (Alt-R or click on the right Z in the toolbar), if you have not done so already.
57
58
Look for the type *WriteWordsFile* (Ctrl+Shift+T). 
59
60
The method *writeWordsFile* will write a file of 100 million words in lines containing between 1 and 9 words each (but you can change that of course). Invoke it by right-clicking and "Run as / JUnit test". If you want to play around with the settings, simply change than, synchronize z2 (Alt-Y or click on the left Z in the toolbar) and rerun.
61
62
The interesting piece about this code is how it is connecting to HFDS:
63
64
<pre><code class="java">
65 6 Henning Blohm
...
66
	@Test
67
	public void writeWordsFile() throws Exception {
68
		FileSystem fs = FileSystem.get(IComponentsLookup.INSTANCE.lookup(WordCountMRJob.CONFIG, Configuration.class));
69
		fs.delete(WordCountMRJob.INPUT_PATH, true);
70
		fs.mkdirs(WordCountMRJob.INPUT_PATH.getParent());
71
...
72 1 Henning Blohm
</code></pre>
73 6 Henning Blohm
74 8 Henning Blohm
Here, the actual connection configuration, one of Hadoop's XML configuration files, is looked up from a Z2 component called "com.zfabrik.samples.hadoop-basic.wordcount/nosql":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/show/com.zfabrik.samples.hadoop-basic.wordcount/nosql. The component type for that is defined by the Hadoop integration module "com.zfabrik.hadoop" of the [[Hadoop add on]]. 
75 1 Henning Blohm
76
The purpose of this is to separate the client configuration information from the using implementation. We will see another application of that below.
77
78 11 Henning Blohm
So now we assume you have the input file uploaded or generated in HDFS and we turn to a Map/Reduce job that counts the number of occurances of single words.
79
80
h3. Running the WordCount Map/Reduce Job
81
82
There is two ways of doing that. 
83
84
The generic, interactive, method is to open http://localhost:8080/z_hadoop use (z*/z by default) and schedule or run the job *com.zfabrik.samples.hadoop-basic.wordcount/wordcount* with the remote connectivity config above. If you choose schedule, the web site will not wait for the job completion, otherwise it will wait for the job and keep displaying it progress. Alternatively to watching the job progress from there, you can go to Yarn's Nodemanager at http://localhost:8088.
85
86
Once the job has completed, the results are HDFS at */hadoop-wordcount/output*. On the shell where CDH4 was installed run 
87
88
<pre><code class="ruby">
89
hadoop fs -cat /hadoop-wordcount/output/*
90
</code></pre>
91
92
To make things more interesting, there is another method to run the Job: Programmatically from a z2Unit test. Look for the type "CountWords":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.test/com/zfabrik/samples/hadoop_basic/test/CountWords.java (Ctrl-Shift-T) and "Run as / JUnit test". This will wait for the job and log its progress and finally its results to the Z2 console. 
93
94
Here's the relevant code fragments:
95
96
<pre><code class="java">
97
	@Test
98
	public void countWords() throws Exception {
99
                // get the config
100
                Configuration c = getConfiguration();
101
102
		// prepare the fs.
103
104
		// <taken out>
105
		
106
		// get the job configurator and configure it
107
		IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup("com.zfabrik.samples.hadoop-basic.wordcount/wordcount",IJobConfigurator.class);
108
		jc.configure(c);
109
		// submit the job
110
		Job j = jc.submit();
111
		
112
		// wait for it to complete and log progress
113
114
		// <taken out>
115
116
	}
117
</code></pre>
118
119
The general principle is the following: When you need to run a Map/Reduce job from your application, which is actually the typical case in our experience, you proceed as follows:
120
121
# Do anything you need to prepare before the execution.
122
# Get the client config
123 12 Henning Blohm
# Retrieve the "Job Main class" (see [[Hadoop add-on]], "IMapReduceJob":http://www.z2-environment.net/javadoc/com.zfabrik.hadoop!2Fjava/api/com/zfabrik/hadoop/job/IMapReduceJob.html).
124 11 Henning Blohm
# Call configure to retrieve a configured Job object
125
# Submit the job.
126 1 Henning Blohm
# If you need to, wait for the job to finish.
127 12 Henning Blohm
128
Let's have a look at the job's main class.
129
130
h2. The job implementation
131
132
The WordCount M/R job is implemented in "WordCountMRJob":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.impl/com/zfabrik/samples/hadoop_basic/impl/WordCountMRJob.java. 
133
134
Here are the relevant code fragments:
135 13 Henning Blohm
136
In its <code>configure</code> method, the job sets all the relevant job config given a client configuration. This is pretty much as always in Hadoop, with the difference that you do not specify task classes (map, combine, reduce). Instead, Z2 will set those to generic implementations that make sure the actual implementations run in the right context.
137
138
<pre><code class="java">
139
public Job configure(Configuration configuration) throws Exception {
140
	// create the job instance 
141
	Job job = Job.getInstance(configuration, name);
142
143
	// configure all the input and output types
144
	job.setOutputKeyClass(Text.class);
145
	job.setOutputValueClass(IntWritable.class);
146
	job.setInputFormatClass(TextInputFormat.class);
147
	job.setOutputFormatClass(TextOutputFormat.class);
148
149
	// and where stuff is coming from and where it is going in the end
150
	FileInputFormat.setInputPaths(job, INPUT_PATH);
151
	FileOutputFormat.setOutputPath(job, OUTPUT_PATH);
152
		
153
	// if the output already exists, delete it
154
	FileSystem fs = FileSystem.get(configuration);
155
	fs.delete(WordCountMRJob.OUTPUT_PATH, true);
156
		
157
	// do not set mapper, reducer, or combiner classes, as that is done by the Hadoop integration
158
	return job;
159
}
160
</code></pre>
161
162
All the rest is really just plumbing.
163
164
<pre><code class="java">
165
	/*
166
	 * During the life cycle of the Job these questions will be asked: 
167
	 */
168
	@Override
169
	public Reducer<Text, IntWritable, Text, IntWritable> getCombiner(Configuration configuration) { return new WordCountReducer(); 	}
170
	@Override
171
	public Mapper<LongWritable, Text, Text, IntWritable> getMapper(Configuration configuration) {return new WordCountMapper();}
172
	@Override
173
	public Reducer<Text, IntWritable, Text, IntWritable> getReducer(Configuration configuration) { return new WordCountReducer(); }
174
	@Override
175
	public boolean hasCombiner() { return true; }
176
	@Override
177
	public boolean hasMapper() { return true; }
178
	@Override
179
	public boolean hasReducer() { return true; };
180
</code></pre>
181
182
and the mapper implementation WordCountMapper and the reducer implementation WordCountReducer are just doing what the word count sample always does:
183
184
# When reading a line of text, the mapper splits it into words and emits (<word>,1) for every word.
185
# The combiner and reducer get a sequence of counts per word, (<word>,(<count_i>)_i) and emit (<word>, sum(<count_i>,i))
186
187
The whole example is of course not practically usable and in terms of using the data structures it tells very little. The sample [[Sample-hbase-full-stack-TBD]] is much more interesting in those respects.
188
189
h2. Summary
190
191
In real world applications Map/Reduce jobs are just one part of the application scenario. In particular they do usually need access to domain types, if not other application services and even access to other databases. That is one element of the Hadoop integration: Provide first-class application component support.
192
193
Secondly, jobs may get triggered based on application state changes or - for example - time based events that are evaluated by an application. That's why it is so important to be able to trigger jobs programmatically - much more so than manually from the command line (as much as that may be useful for demos and testing). That is the other part of the Hadoop integration: Provide an abstraction to programmatic job execution that respects modularity and abstraction of connectivity configuration.