Sample-hadoop-basic » History » Version 20

Henning Blohm, 22.09.2015 21:08

h1. A simple Hadoop with Z2 sample

This sample adapts the classic WordCount sample to the Z2 context. It shows how Hadoop can be used from within Z2 and, in particular, how to write Map/Reduce jobs in that context.

*Note #1:* This sample is meant to be run on Linux or Mac OS. Hadoop can reportedly be run on Windows as well, but we have not yet been able to adapt the sample to it. A machine with 8GB of RAM should be sufficient.
*Note #2:* For your convenience, everything in this sample assumes that you use Eclipse. Eclipse is not a prerequisite for running the software, but it makes everything much more integrated for now. Please have Eclipse ready and Eclipsoid installed. See [[How to install Eclipsoid]].

This sample is provided via the repository "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic.

h2. Prerequisites

This sample makes use of the [[Hadoop add-on]], which is based on Cloudera's CDH4 distribution of Hadoop. As client access is version dependent, so is the sample. To simplify things for you, a pre-configured CDH4 distribution is available from this site. Apart from its development-style configuration (i.e. no security), this is the way we prefer to install Hadoop and friends anyway: just one root installation folder, one OS user, one log folder, etc.

Please follow the procedure described in [[Install prepacked CDH4]].

For use with this sample, it is most convenient to clone and configure the CDH4 installation next to your Eclipse workspace and the sample repository clone.

{{include(Java Version Requirements)}}

h2. Setting up the sample

From here on, the sample is run like all other samples, that is, by following [[How to run a sample]].

Assuming everything (including the z2 core and the CDH4 setup) is under *install* and your workspace is in *install/workspace*, please clone "z2-samples-hadoop-basic":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic under *install* as well, either from the command line:

<pre><code class="ruby">
cd install
git clone -b master http://git.z2-environment.net/z2-samples.hadoop-basic
</code></pre>

or from within Eclipse using the Git Repositories view (but make sure the folder ends up right next to your z2-base.core clone).

You should now have an Eclipse workspace and, next to it, *z2-samples.hadoop-basic*, *z2-samples.cdh4-base*, and *z2-base.core*. Import all projects into your workspace.

We assume that you followed the steps in [[Install prepacked CDH4]] and that Hadoop is running (we do not need HBase in this case).

h2. Running the sample

h3. Starting Z2

Use the Eclipse launcher or start Z2 from the command line. The first start will take a moment. Once Z2 is up, we first want to write a file into the Hadoop file system (HDFS); later on, we will split its content into words and count their occurrences.

h3. Loading data

If you want to load a file you already have at hand, use the "copyFromLocal" operation to copy it to */hadoop-wordcount/input*. E.g., if the file is called *myfile.txt*, go into the CDH4 installation folder and run

<pre><code class="ruby">
. ./env.sh
hadoop fs -mkdir /hadoop-wordcount
hadoop fs -copyFromLocal myfile.txt /hadoop-wordcount/input
</code></pre>

(the env.sh call is only required once per shell session).

Alternatively, there is a z2Unit test (see [[How to z2Unit]]) that you can invoke to generate some input. As that is interesting in its own right, here is how that is done.

You should have all the projects, in particular *com.zfabrik.samples.hadoop-basic.wordcount*, in your workspace already. Otherwise, import them from the repository you cloned previously.

Use Eclipsoid to resolve all required compile dependencies (Alt-R or click on the right Z in the toolbar), if you have not done so already.

Look for the type *WriteWordsFile* (Ctrl+Shift+T).

The method *writeWordsFile* writes a file of 100 million words in lines containing between 1 and 9 words each (you can change that, of course). Invoke it by right-clicking and choosing "Run as / JUnit test". If you want to play around with the settings, simply change the code, synchronize Z2 (Alt-Y or click on the left Z in the toolbar) and rerun.
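
For illustration, here is a plain-Java sketch of a generator that produces input of the same shape. It is not the sample's *WriteWordsFile* code: the class name @WordsFileSketch@ and its small vocabulary are made up, it writes to any @java.io.Writer@ rather than to HDFS, and it uses a seeded @java.util.Random@ so that runs are reproducible.

<pre><code class="java">
import java.io.IOException;
import java.io.Writer;
import java.util.Random;

// Hypothetical stand-in for the sample's WriteWordsFile test: writes
// totalWords words, in lines of 1 to 9 words each, to any Writer.
final class WordsFileSketch {
    // Assumption: a small fixed vocabulary; the real sample may use other words.
    private static final String[] VOCABULARY = {
        "alpha", "beta", "gamma", "delta", "epsilon", "zeta", "eta", "theta"
    };

    static void writeWords(Writer out, long totalWords, long seed) throws IOException {
        Random random = new Random(seed);
        long written = 0;
        while (written < totalWords) {
            // between 1 and 9 words on this line, but never more than are left
            int lineLength = (int) Math.min(1 + random.nextInt(9), totalWords - written);
            StringBuilder line = new StringBuilder();
            for (int i = 0; i < lineLength; i++) {
                if (i > 0) {
                    line.append(' ');
                }
                line.append(VOCABULARY[random.nextInt(VOCABULARY.length)]);
            }
            out.write(line.append('\n').toString());
            written += lineLength;
        }
        out.flush();
    }
}
</code></pre>

For example, @WordsFileSketch.writeWords(writer, 100_000_000L, 42L)@ would produce as many words as the sample does; in a real HDFS setting, the writer would presumably wrap an output stream opened on the target file.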

The interesting part of this code is how it connects to HDFS:

<pre><code class="java">
...
	@Test
	public void writeWordsFile() throws Exception {
		FileSystem fs = FileSystem.get(IComponentsLookup.INSTANCE.lookup(WordCountMRJob.CONFIG, Configuration.class));
		fs.delete(WordCountMRJob.INPUT_PATH, true);
		fs.mkdirs(WordCountMRJob.INPUT_PATH.getParent());
...
</code></pre>

Here, the actual connection configuration, one of Hadoop's XML configuration files, is looked up from a Z2 component called "com.zfabrik.samples.hadoop-basic.wordcount/nosql":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/show/com.zfabrik.samples.hadoop-basic.wordcount/nosql. The component type for that is defined by the Hadoop integration module *com.zfabrik.hadoop* of the [[Hadoop add-on]].

The purpose of this is to separate the client configuration from the implementation that uses it. We will see another application of this below.
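
The separation principle can be pictured in plain Java. This is a hypothetical illustration, not Z2's API: @ConfigRegistry@, @MapConfigRegistry@ and @InputWriter@ are made-up names, and a map of @java.util.Properties@ stands in for the Hadoop @Configuration@ that the sample obtains through @IComponentsLookup@. The using code only knows a component name; which cluster that name points to (here, via the standard Hadoop property key @fs.defaultFS@) is decided elsewhere.

<pre><code class="java">
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: resolves a component name to a client configuration.
interface ConfigRegistry {
    Properties lookup(String componentName);
}

// Map-backed implementation; in Z2, the configuration would come from a component.
final class MapConfigRegistry implements ConfigRegistry {
    private final Map<String, Properties> configs = new ConcurrentHashMap<>();

    void register(String componentName, Properties config) {
        configs.put(componentName, config);
    }

    @Override
    public Properties lookup(String componentName) {
        Properties config = configs.get(componentName);
        if (config == null) {
            throw new IllegalArgumentException("No configuration for " + componentName);
        }
        return config;
    }
}

// Using code: depends on a component name only, never on host names or ports.
final class InputWriter {
    static final String CONFIG = "com.zfabrik.samples.hadoop-basic.wordcount/nosql";

    static String describeTarget(ConfigRegistry registry) {
        return registry.lookup(CONFIG).getProperty("fs.defaultFS");
    }
}
</code></pre>

Pointing the code at another cluster then only means registering a different configuration under the same name; @InputWriter@ itself stays unchanged.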

So now we assume you have the input file uploaded or generated in HDFS, and we turn to a Map/Reduce job that counts the number of occurrences of each word.

h3. Running the WordCount Map/Reduce Job

There are two ways of doing that.

The generic, interactive method is to open http://localhost:8080/z_hadoop (log in as z* with password z by default) and schedule or run the job *com.zfabrik.samples.hadoop-basic.wordcount/wordcount* with the remote connectivity configuration above. If you choose to schedule, the web page will not wait for the job to complete; otherwise it waits for the job and keeps displaying its progress. As an alternative to watching the job progress there, you can use YARN's ResourceManager web UI at http://localhost:8088.

Once the job has completed, the results are in HDFS at */hadoop-wordcount/output*. In the shell of your CDH4 installation (with env.sh sourced), run

<pre><code class="ruby">
hadoop fs -cat /hadoop-wordcount/output/*
</code></pre>
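
With the default @TextOutputFormat@, each output line holds one key and its value, separated by a tab character. If you want to post-process the result in Java, a parser could look like the following sketch. The class name is hypothetical, and it reads the text you would get from @hadoop fs -cat@ rather than accessing HDFS itself.

<pre><code class="java">
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.Map;
import java.util.TreeMap;

// Parses word count output in TextOutputFormat's default layout: word TAB count.
final class WordCountOutputParser {
    static Map<String, Long> parse(Reader input) throws IOException {
        Map<String, Long> counts = new TreeMap<>();
        try (BufferedReader reader = new BufferedReader(input)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isEmpty()) {
                    continue; // tolerate blank lines, e.g. between concatenated files
                }
                int tab = line.lastIndexOf('\t');
                if (tab < 0) {
                    throw new IOException("Malformed line: " + line);
                }
                String word = line.substring(0, tab);
                long count = Long.parseLong(line.substring(tab + 1).trim());
                counts.put(word, count);
            }
        }
        return counts;
    }
}
</code></pre>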

To make things more interesting, there is another way to run the job: programmatically from a z2Unit test. Look for the type "CountWords":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.test/com/zfabrik/samples/hadoop_basic/test/CountWords.java (Ctrl+Shift+T) and choose "Run as / JUnit test". The test waits for the job and logs its progress, and finally its results, to the Z2 console.

Here are the relevant code fragments:

<pre><code class="java">
	@Test
	public void countWords() throws Exception {
		// get the config
		Configuration c = getConfiguration();

		// prepare the fs.

		// <taken out>

		// get the job configurator and configure it
		IJobConfigurator jc = IComponentsLookup.INSTANCE.lookup("com.zfabrik.samples.hadoop-basic.wordcount/wordcount", IJobConfigurator.class);
		jc.configure(c);
		// submit the job
		Job j = jc.submit();

		// wait for it to complete and log progress

		// <taken out>

	}
</code></pre>

The general principle is as follows. When you need to run a Map/Reduce job from your application, which in our experience is actually the typical case, you proceed like this:

# Do anything you need to prepare before the execution.
# Get the client configuration.
# Retrieve the "Job Main class" (see [[Hadoop add-on]], "IMapReduceJob":http://www.z2-environment.net/javadoc/com.zfabrik.hadoop!2Fjava/api/com/zfabrik/hadoop/job/IMapReduceJob.html).
# Call configure to retrieve a configured Job object.
# Submit the job.
# If you need to, wait for the job to finish.
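
To illustrate only the control flow of these steps, here is a self-contained sketch in plain Java. The interfaces @JobConfigurator@ and @JobHandle@ and the classes @SimulatedJob@ and @JobRunner@ are hypothetical stand-ins, not the [[Hadoop add-on]] API, and the "job" is simulated by a background task instead of running on YARN. In the real sample, the configurator is looked up via @IComponentsLookup@ and @submit()@ returns a Hadoop @Job@.

<pre><code class="java">
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a submitted job.
interface JobHandle {
    boolean isComplete();
    float progress();
}

// Hypothetical stand-in for a job configurator looked up by component name.
interface JobConfigurator {
    void configure(Map<String, String> clientConfig);
    JobHandle submit();
}

// Simulated job: performs a few steps of "work" on a background thread.
final class SimulatedJob implements JobConfigurator {
    private static final int STEPS = 5;
    private Map<String, String> config;

    @Override
    public void configure(Map<String, String> clientConfig) {
        this.config = Map.copyOf(clientConfig);
    }

    @Override
    public JobHandle submit() {
        if (config == null) {
            throw new IllegalStateException("configure() must be called before submit()");
        }
        AtomicInteger done = new AtomicInteger();
        CompletableFuture<Void> run = CompletableFuture.runAsync(() -> {
            for (int i = 0; i < STEPS; i++) {
                try {
                    Thread.sleep(10); // pretend to process part of the input
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                done.incrementAndGet();
            }
        });
        return new JobHandle() {
            @Override public boolean isComplete() { return run.isDone(); }
            @Override public float progress() { return done.get() / (float) STEPS; }
        };
    }
}

// Application side: configure, submit, and optionally wait.
final class JobRunner {
    static JobHandle runAndWait(JobConfigurator configurator, Map<String, String> clientConfig,
                                long timeoutMillis) throws InterruptedException {
        // (preparing inputs is application specific and omitted here)
        configurator.configure(clientConfig);  // configure with the client configuration
        JobHandle job = configurator.submit(); // submit the job
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!job.isComplete()) {            // wait, reporting progress
            if (System.currentTimeMillis() > deadline) {
                throw new IllegalStateException("Job did not complete in time");
            }
            System.out.printf("progress: %.0f%%%n", job.progress() * 100);
            Thread.sleep(20);
        }
        return job;
    }
}
</code></pre>

Keeping submission and waiting separate mirrors the scheduling option of the web UI: an application that does not need the result right away can simply skip the waiting loop.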

Let's have a look at the job's main class.

h2. The job implementation

The WordCount M/R job is implemented in "WordCountMRJob":http://redmine.z2-environment.net/projects/z2-samples/repository/z2-samples-hadoop-basic/revisions/master/entry/com.zfabrik.samples.hadoop-basic.wordcount/java/src.impl/com/zfabrik/samples/hadoop_basic/impl/WordCountMRJob.java.

Here are the relevant code fragments:

In its <code>configure</code> method, the job sets up all the relevant job configuration based on a given client configuration. This works much as usual in Hadoop, except that you do not specify the task classes (map, combine, reduce). Instead, Z2 sets those to generic implementations that make sure the actual implementations run in the right context.

<pre><code class="java">
public Job configure(Configuration configuration) throws Exception {
	// create the job instance
	Job job = Job.getInstance(configuration, name);

	// configure all the input and output types
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	job.setInputFormatClass(TextInputFormat.class);
	job.setOutputFormatClass(TextOutputFormat.class);

	// and where stuff is coming from and where it is going in the end
	FileInputFormat.setInputPaths(job, INPUT_PATH);
	FileOutputFormat.setOutputPath(job, OUTPUT_PATH);

	// if the output already exists, delete it
	// (Hadoop refuses to run a job whose output directory already exists)
	FileSystem fs = FileSystem.get(configuration);
	fs.delete(WordCountMRJob.OUTPUT_PATH, true);

	// do not set mapper, reducer, or combiner classes, as that is done by the Hadoop integration
	return job;
}
</code></pre>

All the rest is really just plumbing.

<pre><code class="java">
	/*
	 * During the life cycle of the Job these questions will be asked:
	 */
	@Override
	public Reducer<Text, IntWritable, Text, IntWritable> getCombiner(Configuration configuration) { return new WordCountReducer(); }
	@Override
	public Mapper<LongWritable, Text, Text, IntWritable> getMapper(Configuration configuration) { return new WordCountMapper(); }
	@Override
	public Reducer<Text, IntWritable, Text, IntWritable> getReducer(Configuration configuration) { return new WordCountReducer(); }
	@Override
	public boolean hasCombiner() { return true; }
	@Override
	public boolean hasMapper() { return true; }
	@Override
	public boolean hasReducer() { return true; }
</code></pre>
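
How a generic task can get its actual implementation by asking these questions may be pictured as a delegation pattern. The following is a simplified, hypothetical sketch, not the add-on's implementation: @SimpleMapper@ and @JobDefinition@ are made-up, Hadoop-free interfaces, a map stands in for the Z2 component lookup, and we assume that "the right context" means, for example, the job implementation's class loader set as the thread context class loader.

<pre><code class="java">
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified mapper contract (Hadoop's real Mapper API differs).
interface SimpleMapper {
    void map(String line, BiConsumer<String, Integer> emit);
}

// Hypothetical job definition answering the "life cycle questions".
interface JobDefinition {
    boolean hasMapper();
    SimpleMapper getMapper();
}

// Generic mapper configured with the framework instead of the real class.
// At run time it resolves the job by name and delegates to the job's mapper.
final class GenericDelegatingMapper implements SimpleMapper {
    private final Map<String, JobDefinition> jobs; // stand-in for a component lookup
    private final String jobName;

    GenericDelegatingMapper(Map<String, JobDefinition> jobs, String jobName) {
        this.jobs = jobs;
        this.jobName = jobName;
    }

    @Override
    public void map(String line, BiConsumer<String, Integer> emit) {
        JobDefinition job = jobs.get(jobName);
        if (job == null || !job.hasMapper()) {
            throw new IllegalStateException("No mapper for job " + jobName);
        }
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        // Assumption: "right context" = the job implementation's class loader.
        current.setContextClassLoader(job.getClass().getClassLoader());
        try {
            job.getMapper().map(line, emit);
        } finally {
            current.setContextClassLoader(previous); // always restore
        }
    }
}
</code></pre>

The point of the indirection is that Hadoop only ever needs to know the generic class, while the job's own classes stay inside its module.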

The mapper implementation *WordCountMapper* and the reducer implementation *WordCountReducer* do exactly what the word count sample always does (the combiner is simply another *WordCountReducer* instance):

# When reading a line of text, the mapper splits it into words and emits @(<word>, 1)@ for every word.
# The combiner and the reducer receive a sequence of counts per word, @(<word>, (count_1, ..., count_n))@, and emit @(<word>, count_1 + ... + count_n)@.
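
This data flow can be simulated in memory, which may help when reasoning about what the combiner and reducer see. The sketch below is not the sample's *WordCountMapper* or *WordCountReducer*; it uses no Hadoop types, and the class name @WordCountSimulation@ and the representation of input splits as lists of lines are assumptions for illustration.

<pre><code class="java">
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of the word count data flow (no Hadoop involved).
final class WordCountSimulation {

    // Map: one input line -> (word, 1) for every word.
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) { // an empty line yields a single empty token
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Combine/reduce: (word, (c1, ..., cn)) -> (word, c1 + ... + cn).
    static Map<String, Integer> sumByWord(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            sums.merge(pair.getKey(), pair.getValue(), Integer::sum);
        }
        return sums;
    }

    // Map and combine per input split, then reduce all combined outputs.
    static Map<String, Integer> run(List<List<String>> splits) {
        List<Map.Entry<String, Integer>> combined = new ArrayList<>();
        for (List<String> split : splits) {
            List<Map.Entry<String, Integer>> mapped = new ArrayList<>();
            for (String line : split) {
                mapped.addAll(map(line));
            }
            combined.addAll(sumByWord(mapped).entrySet()); // combiner output of this split
        }
        return sumByWord(combined); // reducer
    }
}
</code></pre>

Summing is associative and commutative, which is what allows the same reducer class to serve as the combiner: partial sums per split add up to the same totals.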

The whole example is, of course, of little practical use, and it says very little about working with real data structures. The sample [[Sample-hbase-mail-digester]] is much more interesting in those respects.

h2. Summary

In real-world applications, Map/Reduce jobs are just one part of the application scenario. In particular, they usually need access to domain types, often to other application services, and even to other databases. That is one element of the Hadoop integration: providing first-class application component support.

Secondly, jobs may be triggered by application state changes or, for example, by time-based events that an application evaluates. That is why it is so important to be able to trigger jobs programmatically, much more so than manually from the command line (useful as that may be for demos and testing). That is the other part of the Hadoop integration: providing an abstraction for programmatic job execution that respects modularity and keeps the connectivity configuration abstract.
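
A time-based trigger of this kind can be sketched with the JDK's @ScheduledExecutorService@. The class name @PeriodicJobTrigger@ is hypothetical, and the @Runnable@ passed in stands for the application's own submission code (for example, the configure and submit steps described earlier); nothing here is part of the [[Hadoop add-on]].

<pre><code class="java">
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical time-based trigger: periodically invokes a job submission callback.
final class PeriodicJobTrigger implements AutoCloseable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // submitJob stands in for "look up the configurator, configure, submit".
    ScheduledFuture<?> schedule(Runnable submitJob, long initialDelay, long period, TimeUnit unit) {
        return scheduler.scheduleAtFixedRate(() -> {
            try {
                submitJob.run();
            } catch (RuntimeException e) {
                // keep the schedule alive: an uncaught exception would cancel further runs
                System.err.println("Job submission failed: " + e);
            }
        }, initialDelay, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
</code></pre>

Catching exceptions inside the task matters here: with @scheduleAtFixedRate@, an exception thrown by one execution suppresses all subsequent executions of that task.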

Please check out [[Hadoop add-on]] to learn more about what is happening behind the scenes.