Hadoop Learning Notes, Part 1

Overview: installing Hadoop, downloading a dataset, and a MapReduce example.

The WordCount example that ships with Hadoop illustrates the execution flow of a Hadoop application well; see the official documentation for the full code.

The following figure, from Mike Pluta, shows the data and process flow of WordCount:

[Figure: HadoopDataAndProcessFlowOfWordCount]

1. Installing Hadoop

1.1 Download and Install
1. Download the appropriate Hadoop release.
2. Unpack it to a local directory, e.g. /opt/hadoop.
3. Add the environment variables to /etc/profile.
4. Verify that the installation succeeded.

# Configure the Hadoop environment variables
export HADOOP_INSTALL=/opt/hadoop/hadoop-3.0.0-alpha4
export PATH=$PATH:$HADOOP_INSTALL/bin:$HADOOP_INSTALL/sbin

# Reload the profile, then verify the installation
$ source /etc/profile
$ hadoop version
Hadoop 3.0.0-alpha4
Source code repository https://git-wip-us.apache.org/repos/asf/hadoop.git -r e324cf8a2a6e55e996414ff281fee757f09d8172
Compiled by andrew on 2017-06-30T01:52Z
Compiled with protoc 2.5.0
From source with checksum 74491a36456845ab59719bc761659d3
This command was run using /opt/hadoop/hadoop-3.0.0-alpha4/share/hadoop/common/hadoop-common-3.0.0-alpha4.jar

1.2 Installation Modes
See the installation and configuration documentation. Hadoop can be set up in one of three modes:
1. Standalone (local) mode
2. Pseudo-distributed mode (see the configuration sketch below)
3. Fully distributed mode
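
For reference, pseudo-distributed operation boils down to two small XML files; the following is a minimal sketch along the lines of the official single-node setup guide (the HDFS URI and replication factor shown are the usual single-node values, adjust to your environment):

<!-- etc/hadoop/core-site.xml -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>

<!-- etc/hadoop/hdfs-site.xml -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>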


2. Downloading the Dataset

2.1 Download the Dataset
The following shell script, adapted from the web, fetches the NOAA NCDC weather dataset year by year:

#!/bin/bash

# $1 is the directory in which to store the downloads,
# e.g. /home/jay/hadoop/dataset/
cd "$1" || exit 1

# Fetch one directory per year from the NCDC FTP server
for i in $(seq 1901 2017)
do
    wget -r -np -nH --cut-dirs=3 "ftp://ftp.ncdc.noaa.gov/pub/data/noaa/$i/"
done
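
Assuming the script is saved as download.sh (the name is illustrative), pass the target directory as the first argument:

$ bash download.sh /home/jay/hadoop/dataset/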

After it finishes, there is one directory per year:

$ ls
1901 1902 1903 1904 1905 1906 1907 1908 1909 1910 1911 1912 ...

2.2 Clean the Data

Each year's directory holds many gzipped per-station files; the script below concatenates their decompressed contents into a single YEAR.txt per year:

#!/bin/bash

# $1 is the directory where the downloaded files are stored
echo "$1"
cd "$1" || exit 1

for file in ./*
do
    if test -f "$file"
    then
        echo "$file" "is a file"
    else
        fn=$(basename "$file")
        echo "$fn" "is a directory"
        # Concatenate every gzipped station file of this year into YEAR.txt
        for fl in "$file"/*
        do
            if test -f "$fl"
            then
                dats=$(basename "$fl")
                echo "unzip $dats under $fn"
                gunzip -c "$fl" >> "${fn}.txt"
            fi
        done
    fi
done
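
Invoke it the same way, with the download directory as the argument (again, the script name is illustrative):

$ bash merge.sh /home/jay/hadoop/dataset/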

After decompression the merged files are quite large:

$ du -ah
1.2M ./1920.txt
204M ./1941.txt
868K ./1901.txt
872K ./1902.txt
868K ./1903.txt
868K ./1904.txt
868K ./1905.txt
728K ./1906.txt
728K ./1907.txt
876K ./1908.txt
1004K ./1909.txt
1016K ./1910.txt
1.1M ./1911.txt
1004K ./1912.txt
1.2M ./1913.txt
1.2M ./1914.txt
1.2M ./1915.txt
304K ./1916.txt
1.2M ./1917.txt
1.2M ./1918.txt
1.1M ./1919.txt
1.2M ./1921.txt
1.2M ./1922.txt
1.1M ./1923.txt
1.0M ./1924.txt
1.1M ./1925.txt
1.5M ./1926.txt
676K ./1927.txt
1.2M ./1928.txt
5.3M ./1929.txt
15M ./1930.txt
42M ./1931.txt
67M ./1932.txt
71M ./1933.txt
73M ./1934.txt
99M ./1935.txt
123M ./1936.txt
150M ./1937.txt
129M ./1938.txt
131M ./1939.txt
314M ./1942.txt
602M ./1943.txt
772M ./1944.txt
989M ./1945.txt
640M ./1946.txt
654M ./1947.txt
1.5G ./1948.txt
1.4G ./1949.txt
919M ./1950.txt
2.8G ./1952.txt
3.0G ./1953.txt
3.0G ./1954.txt
2.7G ./1955.txt
467M ./1956.txt
500M ./1959.txt
3.0G ./1960.txt
1.8G ./1961.txt
654M ./1964.txt
3.4G ./1983.txt
2.7G ./1990.txt
2.9G ./2010.txt
35G .

2.3 Finding the Yearly Temperature Extremes with an awk Script

#!/bin/bash

# $1 is the directory containing the merged YEAR.txt files,
# e.g. /home/jay/hadoop/temperature/input/
cd "$1" || exit 1
echo "Year" "Max" "Min"

for file in ./*.txt
do
    name=$(basename "$file")
    echo -e "${name%%.*} \c"
    # In the NCDC record format, the air temperature occupies characters 88-92
    # (sign plus four digits, in tenths of a degree Celsius), the quality code
    # is character 93, and 9999 marks a missing reading.
    awk 'BEGIN { max = -9999; min = 9999 }
         {
             temp = substr($0, 88, 5) + 0
             quality = substr($0, 93, 1)
             if (temp != 9999 && quality ~ /[01459]/) {
                 if (temp > max) max = temp
                 if (temp < min) min = temp
             }
         }
         END { print max "\t" min }' "$name"
done

The results:

$ time bash maxTemp.sh /home/jay/hadoop/temperature/input/
Year Max Min
1901 317 -333
1902 244 -328
1903 289 -306
1904 256 -294
1905 283 -328
1906 294 -250
1907 283 -350
1908 289 -378
1909 278 -378
1910 294 -372
1911 306 -378
1912 322 -411
1913 300 -372
1914 333 -378
1915 294 -411
1916 278 -289
1917 317 -478
1918 322 -450
1919 378 -428
1920 294 -344
1921 283 -417
1922 278 -400
1923 294 -394
1924 294 -456
1925 317 -378
1926 51199 -422
1927 489 -400
1928 378 -239
1929 328 -400
1930 400 -311
1931 461 -450
1932 489 -500
1933 489 -489
1934 478 -489
1935 478 -517
1936 550 -544
1937 478 -428
1950 494 -639
1960 428 -589

real 0m2.406s
user 0m2.060s
sys 0m0.232s


3. A MapReduce Example

The program below analyzes the temperature data above and finds the maximum temperature for each year. It uses the legacy org.apache.hadoop.mapred API; each of the three public classes lives in its own file under the package com.hadp.

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

// Driver: configures the job and submits it.
public class MaxTemperature {
    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max Temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(TemperatureMapper.class);
        conf.setReducerClass(TemperatureReducer.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        JobClient.runJob(conf);
    }
}


/**
 * Mapper: parses one NCDC record per line and emits (year, temperature).
 */
public class TemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        String year = line.substring(15, 19);

        int airTemperature;
        // The temperature starts at 0-based offset 87; skip a leading '+'
        // sign before parsing.
        if (line.charAt(87) == '+') {
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }

        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            output.collect(new Text(year), new IntWritable(airTemperature));
        }
    }
}


/**
 * Reducer: keeps the maximum temperature seen for each year.
 */
public class TemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
                       OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
            maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
    }
}
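
The listing above uses the legacy org.apache.hadoop.mapred API. On Hadoop 3.x the newer org.apache.hadoop.mapreduce API is generally preferred; the following is a minimal sketch of the same job written against the new API (the class names here are illustrative), with the mapper and reducer as nested classes:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewApiMaxTemperature {

    // Same parsing logic as TemperatureMapper above, new-API signature.
    public static class MaxMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final int MISSING = 9999;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            String year = line.substring(15, 19);
            int airTemperature;
            if (line.charAt(87) == '+') {
                airTemperature = Integer.parseInt(line.substring(88, 92));
            } else {
                airTemperature = Integer.parseInt(line.substring(87, 92));
            }
            String quality = line.substring(92, 93);
            if (airTemperature != MISSING && quality.matches("[01459]")) {
                context.write(new Text(year), new IntWritable(airTemperature));
            }
        }
    }

    // Same max logic as TemperatureReducer above, new-API signature.
    public static class MaxReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "Max Temperature");
        job.setJarByClass(NewApiMaxTemperature.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxMapper.class);
        job.setReducerClass(MaxReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}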

Then build and package with mvn clean install, and run it with hadoop:

# Package with Maven
$ mvn clean install

# The input/ directory holds the input data
$ ls input/

# Run via the hadoop command, with the jar on the classpath
$ export HADOOP_CLASSPATH=target/hadp-0.0.1-SNAPSHOT.jar
$ hadoop com.hadp.MaxTemperature input/ out/

# Or run it with the hadoop jar command
$ time hadoop jar hadp-0.0.1-SNAPSHOT.jar com.hadp.MaxTemperature input/ out/
...

2017-10-24 23:27:37,305 INFO mapreduce.Job: Counters: 30
    File System Counters
        FILE: Number of bytes read=58414808714
        FILE: Number of bytes written=3649044055
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
    Map-Reduce Framework
        Map input records=6725106
        Map output records=6360545
        Map output bytes=57244905
        Map output materialized bytes=69966397
        Input split bytes=6700
        Combine input records=0
        Combine output records=0
        Reduce input groups=39
        Reduce shuffle bytes=69966397
        Reduce input records=6360545
        Reduce output records=39
        Spilled Records=12721090
        Shuffled Maps =67
        Failed Shuffles=0
        Merged Map outputs=67
        GC time elapsed (ms)=1413
        Total committed heap usage (bytes)=97800683520
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=1140691646
    File Output Format Counters
        Bytes Written=365

real 0m16.391s
user 0m21.824s
sys 0m1.288s
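
One thing worth noting in the counters: Combine input records=0, so all 6,360,545 map output records were shuffled to the reducer. Since taking a maximum is both associative and commutative, the reducer class can double as a combiner and pre-aggregate per-map maxima before the shuffle; in the old-API driver above that is a one-line addition:

conf.setCombinerClass(TemperatureReducer.class);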