MapReduce

Notes on MapReduce job deployment (reading MySQL data into HDFS) and on compiling a custom Hive UDF into the Hive source.

Posted by PsycheLee on 2015-09-08

MapReduce

Deploying an MR Job

Reading data from MySQL and writing it to HDFS

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// DeptWritable and FileUtils are project classes: DeptWritable must implement both
// Writable and DBWritable; FileUtils.getFileSystem() returns the HDFS FileSystem.
public class MySQLDBInputFormatDriver {

    public static void main(String[] args) throws Exception {
        String output = args[0];

        Configuration configuration = new Configuration();
        // JDBC connection information for the source MySQL database
        DBConfiguration.configureDB(configuration,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop001:3306/lxl",
                "root",
                "123456"
        );

        Job job = Job.getInstance(configuration);

        job.setJarByClass(MySQLDBInputFormatDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // Table to read, optional WHERE condition, ORDER BY column, and the columns to fetch
        String[] fieldNames = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", "", "deptno", fieldNames);

        // Delete the output directory if it already exists
        FileSystem fileSystem = FileUtils.getFileSystem();
        Path out = new Path(output);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        FileOutputFormat.setOutputPath(job, out);

        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    // Map-only job: pass each DeptWritable record straight through to the output
    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {

        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }

}
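
The driver above assumes a DeptWritable value class that implements both Writable (for Hadoop serialization) and DBWritable (so DBInputFormat can map a JDBC row onto it). A minimal sketch, assuming deptno is an INT and dname/loc are strings in the dept table (the actual project class may differ):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DeptWritable implements Writable, DBWritable {

    private int deptno;
    private String dname;
    private String loc;

    public DeptWritable() { }

    // DBWritable: populate the fields from one JDBC result row
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.deptno = rs.getInt("deptno");
        this.dname = rs.getString("dname");
        this.loc = rs.getString("loc");
    }

    // DBWritable: only needed when writing back to the database
    @Override
    public void write(PreparedStatement ps) throws SQLException {
        ps.setInt(1, deptno);
        ps.setString(2, dname);
        ps.setString(3, loc);
    }

    // Writable: serialization between the mapper and the output
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeUTF(loc);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.loc = in.readUTF();
    }

    // What ends up in the HDFS output files (TextOutputFormat calls toString)
    @Override
    public String toString() {
        return deptno + "\t" + dname + "\t" + loc;
    }
}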

Key points:

  1. DBInputFormat.setInput specifies the table to read, the fields, the filter condition, and the ordering column.

  2. Package the application locally.

  3. Upload the local jar, then run it on the server:

    hadoop jar /home/bigdata/lib/hdfs-1.0-SNAPSHOT.jar com.lxl.mr_04.MySQLDBInputFormatDriver hdfs://hadoop001:9000/data6

    The output directory can be passed in as an argument.

  4. If the job fails with a ClassNotFoundException because the MySQL driver jar cannot be found:

    Solutions:

    Option 1: upload the MySQL driver jar to $HADOOP_HOME/share/hadoop/yarn/ on the server:

[bigdata@hadoop001 yarn]$ ll
total 7184
...
-rw-r--r-- 1 bigdata bigdata 8982 Jun 3 2019 hadoop-yarn-server-tests-2.6.0-cdh5.16.2.jar
-rw-r--r-- 1 bigdata bigdata 39677 Jun 3 2019 hadoop-yarn-server-web-proxy-2.6.0-cdh5.16.2.jar
drwxr-xr-x 2 bigdata bigdata 4096 Jun 3 2019 lib
-rw-r--r-- 1 bigdata bigdata 1007502 Mar 13 18:49 mysql-connector-java-5.1.47.jar
...

Option 2: package all dependencies into the jar when building (a fat jar), then upload it to the server and run it.

Add the Maven assembly plugin to the pom:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <!-- bind to the packaging phase -->
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
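
With the plugin bound to the package phase as above, running mvn clean package -DskipTests should also produce a fat jar under target/, normally named with a -jar-with-dependencies suffix (presumably hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar for this project); upload and run that jar with hadoop jar so the MySQL driver is already on the classpath.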

Option 3: use ToolRunner together with the -libjars option.

**Key points:**

1. Override the run method and obtain the Configuration via getConf(); do not create a new one.

**2. Use the following command:**

hadoop jar application.jar \
mainClass \
-libjars "thirdpartlib_1.jar,thirdpartlib_2.jar ..." \
-files "file1, file2 ..." \
userparam1, userparam2 ...

The "-libjars" and "-files" options must appear after "hadoop jar application.jar mainClass" and before the user parameters ("userparam1, userparam2 ..."); if they come after the user parameters they are treated as user parameters themselves. ToolRunner's GenericOptionsParser consumes these generic options and passes only the remaining arguments to run(), which is why args[0] inside run() is still the output path.

hadoop jar /home/bigdata/lib/hdfs-1.0-SNAPSHOT.jar \
com.lxl.mr_04.MySQLDBInputFormatDriver \
-libjars /home/bigdata/lib/mysql-connector-java-5.1.47.jar \
hdfs://hadoop001:9000/data6/dept2

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MySQLDBInputFormatDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MySQLDBInputFormatDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String output = args[0];
        // Must come from getConf(): ToolRunner has already applied -libjars/-files to it
        Configuration configuration = getConf();
        DBConfiguration.configureDB(configuration,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop001:3306/lxl",
                "root",
                "123456"
        );

        Job job = Job.getInstance(configuration);

        job.setJarByClass(MySQLDBInputFormatDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // Alternative to -libjars: ship the MySQL driver jar from HDFS
        //job.addFileToClassPath(new Path("hdfs://hadoop001:9000/hadoop/mysql-connector-java-5.1.47.jar"));
        String[] fieldNames = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", "", "deptno", fieldNames);

        // Delete the output directory if it already exists
        FileSystem fileSystem = FileUtils.getFileSystem();
        Path out = new Path(output);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        FileOutputFormat.setOutputPath(job, out);

        // Return the exit code instead of calling System.exit(), so ToolRunner can propagate it
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {

        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }

}

Registering a Hive UDF in the source code and recompiling

Hive version: hive-1.1.0-cdh5.16.2

Write the UDF

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ruozedata.bigdata</groupId>
    <artifactId>hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>hive</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <hive.version>1.1.0-cdh5.16.2</hive.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>

    <repositories>
        <!-- Aliyun mirror -->
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <!-- Note: the Cloudera repository is required for CDH artifacts -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-contrib</artifactId>
            <version>${hive.version}</version>
        </dependency>
    </dependencies>

</project>

Create the custom function class

Right-click the package name: New -> Java Class, and create a UDFHello.java class.

package com.lxl.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * UDF development steps:
 * 1) extend UDF
 * 2) implement one or more evaluate methods
 **/
@Description(name = "say_hello_doc",
        value = "_FUNC_(str) - Returns str with BIGDATA: BIGDATA- str",
        extended = "Example:\n " +
                "> SELECT _FUNC_('JIM') FROM src LIMIT 1;\n" + " 'Hello: BIGDATA-JIM'")
public class UDFHello extends UDF {

    public String evaluate(String ename) {
        return "Hello: BIGDATA-" + ename;
    }

    public String evaluate(String ename, String grade) {
        return "Hello: BIGDATA-" + grade + " : " + ename;
    }

}
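
Before wiring the class into the Hive source, the evaluate overloads can be sanity-checked with a plain Java main (a hypothetical throwaway check, not part of the original project):

package com.lxl.udf;

public class UDFHelloQuickCheck {
    public static void main(String[] args) {
        UDFHello udf = new UDFHello();
        System.out.println(udf.evaluate("JIM"));        // Hello: BIGDATA-JIM
        System.out.println(udf.evaluate("JIM", "A1"));  // Hello: BIGDATA-A1 : JIM
    }
}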

Note: in the Apache Hive version, the string passed to extended must not be broken across lines manually; line breaks only take effect when written as \n, otherwise they are ignored.

Download the source

Upload the hive-1.1.0-cdh5.16.2-src.tar.gz source tarball to the server and extract it.

Extract into the current directory:
[hadoop@hadoop001 sourcecode]$ tar -zxvf hive-1.1.0-cdh5.16.2-src.tar.gz

Add UDFHello.java to the source tree

  1. Upload UDFHello.java to the /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/ql/src/java/org/apache/hadoop/hive/ql/udf directory.

  2. Change the package declaration in UDFHello.java from package com.lxl.udf; to package org.apache.hadoop.hive.ql.udf;

  3. Modify FunctionRegistry.java to register the UDFHello class:
    3.1 Import the class:
    import org.apache.hadoop.hive.ql.udf.UDFHello;

    3.2 Add the registration inside the static block: system.registerUDF("helloudf", UDFHello.class, false);
         For example:
               static {
                       system.registerGenericUDF("concat", GenericUDFConcat.class);
                       system.registerUDF("helloudf", UDFHello.class, false);
                       ...
                    }


Compile

  1. Enter the hive-1.1.0-cdh5.16.2 directory and compile the source:

    [hadoop@hadoop001 sourcecode]$ cd hive-1.1.0-cdh5.16.2
    [hadoop@hadoop001 hive-1.1.0-cdh5.16.2]$ mvn clean package -DskipTests -Phadoop-2 -Pdist
  2. A successful build produces a tarball: /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin.tar.gz

Deploy hive-1.1.0-cdh5.16.2

  1. Either extract and install the newly built apache-hive-1.1.0-cdh5.16.2-bin.tar.gz directly, or

  2. Replace only the hive-exec-1.1.0-cdh5.16.2.jar in the existing installation.

    Locate hive-exec-1.1.0-cdh5.16.2.jar in the build output and use it to replace the hive-exec-1.1.0-cdh5.16.2.jar of the Hive deployment currently in use (on hadoop-2.6.0-cdh5.16.2).
    Newly built jar: /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin/apache-hive-1.1.0-cdh5.16.2-bin/lib/
    Jar currently in use: /home/bigdata/app/hive-1.1.0-cdh5.16.2/lib/hive-exec-1.1.0-cdh5.16.2.jar
    Switch to the Hive directory currently in use:

[hadoop@hadoop001 ~]$ cd ~/app/hive-1.1.0-cdh5.16.2/
[hadoop@hadoop001 hive-1.1.0-cdh5.16.2]$ pwd
/home/hadoop/app/hive-1.1.0-cdh5.16.2
[hadoop@hadoop001 hive-1.1.0-cdh5.16.2]$
Back up the jar currently in use (under /home/hadoop/app/hive-1.1.0-cdh5.16.2/lib/):
[hadoop@hadoop001 lib]$ mv hive-exec-1.1.0-cdh5.16.2.jar hive-exec-1.1.0-cdh5.16.2.jar.bak
Copy the newly built jar into the original Hive deployment:
[hadoop@hadoop001 lib]$ cp /home/hadoop/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin/apache-hive-1.1.0-cdh5.16.2-bin/lib/hive-exec-1.1.0-cdh5.16.2.jar ./
Check the hive-exec-1.1.0-cdh5.16.2.jar files:
[hadoop@hadoop001 lib]$ ll hive-exec-1.1.0-cdh5.16.2.*
-rw-rw-r--. 1 hadoop hadoop 19274963 Jul 5 17:44 hive-exec-1.1.0-cdh5.16.2.jar
-rw-r--r--. 1 hadoop hadoop 19272159 Mar 24 2016 hive-exec-1.1.0-cdh5.16.2.jar.bak

Restart

Restart Hive and verify (use the extended form to see the @Description examples):
hive> desc function helloudf;
hive> desc function extended helloudf;