MapReduce
MR Job Deployment
Reading data from MySQL and writing it to HDFS
```java
public class MySQLDBInputFormatDriver {

    public static void main(String[] args) throws Exception {
        String output = args[0];

        Configuration configuration = new Configuration();
        DBConfiguration.configureDB(configuration,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop001:3306/lxl",
                "root",
                "123456"
        );

        Job job = Job.getInstance(configuration);
        job.setJarByClass(MySQLDBInputFormatDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        String[] fieldNames = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", "", "deptno", fieldNames);

        FileSystem fileSystem = FileUtils.getFileSystem();
        Path out = new Path(output);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        FileOutputFormat.setOutputPath(job, new Path(output));

        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {
        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}
```
Key point:
DBInputFormat.setInput specifies the table to read, the fields, the filter condition, and the ordering column.
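The driver passes DeptWritable.class as the input class, but that class is not shown in this post. Below is a minimal sketch of what it might look like, with the deptno/dname/loc fields assumed from fieldNames; the real implementation may differ:

```java
// Sketch only: a value class usable with DBInputFormat must implement both
// Writable (MapReduce serialization) and DBWritable (JDBC read/write).
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class DeptWritable implements Writable, DBWritable {
    private int deptno;
    private String dname;
    private String loc;

    public DeptWritable() { }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeUTF(loc);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.loc = in.readUTF();
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, deptno);
        statement.setString(2, dname);
        statement.setString(3, loc);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.deptno = resultSet.getInt("deptno");
        this.dname = resultSet.getString("dname");
        this.loc = resultSet.getString("loc");
    }

    @Override
    public String toString() {
        // toString() determines what gets written to the output files
        return deptno + "\t" + dname + "\t" + loc;
    }
}
```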
Package the local application
Upload the local jar, then run it on the server:
```bash
hadoop jar /home/bigdata/lib/hdfs-1.0-SNAPSHOT.jar com.lxl.mr_04.MySQLDBInputFormatDriver hdfs://hadoop001:9000/data6
```
The output directory can be passed in as a parameter.
Error: ClassNotFoundException -- the MySQL driver class cannot be found.
Solutions:
Option 1: upload the MySQL driver jar to the $HADOOP_HOME/share/hadoop/yarn/ directory on the server, for example:
```bash
[bigdata@hadoop001 yarn]$ ll
total 7184
...
-rw-r--r-- 1 bigdata bigdata    8982 Jun  3  2019 hadoop-yarn-server-tests-2.6.0-cdh5.16.2.jar
-rw-r--r-- 1 bigdata bigdata   39677 Jun  3  2019 hadoop-yarn-server-web-proxy-2.6.0-cdh5.16.2.jar
drwxr-xr-x 2 bigdata bigdata    4096 Jun  3  2019 lib
-rw-r--r-- 1 bigdata bigdata 1007502 Mar 13 18:49 mysql-connector-java-5.1.47.jar
...
```
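The copy itself might look like the following (assuming, as in the run commands in this post, that the driver jar was first uploaded to /home/bigdata/lib):

```bash
# Put the MySQL driver into the YARN library directory, which is on the default
# classpath used by MapReduce jobs
cp /home/bigdata/lib/mysql-connector-java-5.1.47.jar $HADOOP_HOME/share/hadoop/yarn/
```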
Option 2: bundle all dependencies into the jar when packaging, then upload it to the server and run it.
Add the Maven assembly plugin to the pom:
```xml
<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <!-- bind to the packaging phase -->
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
```
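With this plugin, mvn clean package attaches an extra artifact ending in -jar-with-dependencies.jar that already contains the MySQL driver. A sketch of building and running it follows; the exact fat-jar name is assumed from the hdfs-1.0-SNAPSHOT artifact used earlier:

```bash
# Build; the assembly plugin adds *-jar-with-dependencies.jar to target/
mvn clean package -DskipTests

# Hypothetical run with the fat jar (file name assumed)
hadoop jar /home/bigdata/lib/hdfs-1.0-SNAPSHOT-jar-with-dependencies.jar \
    com.lxl.mr_04.MySQLDBInputFormatDriver \
    hdfs://hadoop001:9000/data6
```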
Option 3: use ToolRunner and the -libjars option
**Key points:**
1. Override the run method and obtain the Configuration via getConf() rather than creating a new one, because ToolRunner (through GenericOptionsParser) parses -libjars/-files into that Configuration.
**2. Use the command:**
```bash
hadoop jar application.jar \
    mainClass \
    -libjars "thirdpartlib_1.jar,thirdpartlib_2.jar ..." \
    -files "file1, file2 ..." \
    userparam1, userparam2 ...
```
"-libjars" and "-files" must come after "hadoop jar application.jar" (and the main class) and before the user parameters "userparam1, userparam2 ..."! If they come after the user parameters, they will be treated as user parameters themselves.
```bash
hadoop jar /home/bigdata/lib/hdfs-1.0-SNAPSHOT.jar \
    com.lxl.mr_04.MySQLDBInputFormatDriver \
    -libjars /home/bigdata/lib/mysql-connector-java-5.1.47.jar \
    hdfs://hadoop001:9000/data6/dept2
```
```java
public class MySQLDBInputFormatDriver extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new MySQLDBInputFormatDriver(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] args) throws Exception {
        String output = args[0];

        // Must use getConf() here: ToolRunner/GenericOptionsParser has already
        // parsed -libjars and -files into this Configuration.
        Configuration configuration = getConf();
        DBConfiguration.configureDB(configuration,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop001:3306/lxl",
                "root",
                "123456"
        );

        Job job = Job.getInstance(configuration);
        job.setJarByClass(MySQLDBInputFormatDriver.class);

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        //job.addFileToClassPath(new Path("hdfs://hadoop001:9000/hadoop/mysql-connector-java-5.1.47.jar"));

        String[] fieldNames = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", "", "deptno", fieldNames);

        FileSystem fileSystem = FileUtils.getFileSystem();
        Path out = new Path(output);
        if (fileSystem.exists(out)) {
            fileSystem.delete(out, true);
        }

        FileOutputFormat.setOutputPath(job, new Path(output));

        // Return the exit code instead of calling System.exit() inside run()
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {
        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}
```
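Once the job completes successfully, the exported rows can be checked on HDFS, for example (path taken from the run command above):

```bash
# List and print the job output; output file names such as part-r-00000 depend on
# whether a reduce phase runs, so part-* covers both cases
hdfs dfs -ls hdfs://hadoop001:9000/data6/dept2
hdfs dfs -cat hdfs://hadoop001:9000/data6/dept2/part-*
```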
Registering a Hive UDF in the source code and compiling it
Hive version: hive-1.1.0-cdh5.16.2
Write the UDF
pom.xml
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.ruozedata.bigdata</groupId>
    <artifactId>hive</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>hive</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <hive.version>1.1.0-cdh5.16.2</hive.version>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>

    <repositories>
        <!-- Aliyun mirror -->
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <!-- Note: the cloudera repository is required for CDH versions -->
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Hive -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-contrib</artifactId>
            <version>${hive.version}</version>
        </dependency>
    </dependencies>
</project>
```
Create the custom function class
Right-click the package name: New -> Java Class, and create a UDFHello.java class.
```java
package com.lxl.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;

/**
 * UDF development steps:
 * 1) extend UDF
 * 2) implement one or more evaluate methods
 **/
@Description(name = "say_hello_doc",
        value = "_FUNC_(str) - Returns str with BIGDATA: BIGDATA- str",
        extended = "Example:\n "
                + "> SELECT _FUNC_('JIM') FROM src LIMIT 1;\n"
                + " 'Hello: BIGDATA-JIM'")
public class UDFHello extends UDF {

    public String evaluate(String ename) {
        return "Hello: BIGDATA-" + ename;
    }

    public String evaluate(String ename, String grade) {
        return "Hello: BIGDATA-" + grade + " : " + ename;
    }
}
```
Note: in the Apache version, the string in extended cannot be wrapped by manual line breaks; only \n produces a line break, otherwise it will not take effect.
Download the source code
Upload the hive-1.1.0-cdh5.16.2-src.tar.gz source tarball to the server and extract it.
Extract it into the current directory:

```bash
[hadoop@hadoop001 sourcecode]$ tar -zxvf hive-1.1.0-cdh5.16.2-src.tar.gz
```
Add the UDFHello.java class to the source tree
Upload UDFHello.java to the /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/ql/src/java/org/apache/hadoop/hive/ql/udf directory.
Change the package of UDFHello.java: replace package com.lxl.udf; with package org.apache.hadoop.hive.ql.udf;
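After the move and the package change, the top of the relocated file should look roughly like this (imports carried over from the original class):

```java
// ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHello.java
package org.apache.hadoop.hive.ql.udf;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
```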
Modify FunctionRegistry.java to register the UDFHello class:
3.3.1 Import the UDFHello class:

```java
import org.apache.hadoop.hive.ql.udf.UDFHello;
```

3.3.2 In the static block, add: system.registerUDF("helloudf", UDFHello.class, false);
As follows:

```java
static {
    system.registerGenericUDF("concat", GenericUDFConcat.class);
    system.registerUDF("helloudf", UDFHello.class, false);
    // ...
}
```
Compile
Enter the hive-1.1.0-cdh5.16.2 directory and build the source:
```bash
[hadoop@hadoop001 sourcecode]$ cd hive-1.1.0-cdh5.16.2
[hadoop@hadoop001 hive-1.1.0-cdh5.16.2]$ mvn clean package -DskipTests -Phadoop-2 -Pdist
```
After a successful build, a tarball is generated at /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin.tar.gz
Deploy hive-1.1.0-cdh5.16.2
Either extract and install the compiled apache-hive-1.1.0-cdh5.16.2-bin.tar.gz directly,
or replace the hive-exec-1.1.0-cdh5.16.2.jar package:
Find hive-exec-1.1.0-cdh5.16.2.jar in the compiled output and use it to replace the hive-exec-1.1.0-cdh5.16.2.jar of the currently deployed hive-1.1.0-cdh5.16.2.
Path of the compiled hive-exec-1.1.0-cdh5.16.2.jar: /home/bigdata/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin/apache-hive-1.1.0-cdh5.16.2-bin/lib/
Path of the hive-exec-1.1.0-cdh5.16.2.jar currently in use: /home/bigdata/app/hive-1.1.0-cdh5.16.2/lib/hive-exec-1.1.0-cdh5.16.2.jar
Switch to the currently deployed Hive directory:
```bash
[hadoop@hadoop001 ~]$ cd ~/app/hive-1.1.0-cdh5.16.2/
[hadoop@hadoop001 hive-1.1.0-cdh5.16.2]$ pwd
/home/hadoop/app/hive-1.1.0-cdh5.16.2

# Back up the jar currently in use (in /home/hadoop/app/hive-1.1.0-cdh5.16.2/lib/):
[hadoop@hadoop001 lib]$ mv hive-exec-1.1.0-cdh5.16.2.jar hive-exec-1.1.0-cdh5.16.2.jar.bak

# Copy the rebuilt jar into the existing Hive deployment:
[hadoop@hadoop001 lib]$ cp /home/hadoop/sourcecode/hive-1.1.0-cdh5.16.2/packaging/target/apache-hive-1.1.0-cdh5.16.2-bin/apache-hive-1.1.0-cdh5.16.2-bin/lib/hive-exec-1.1.0-cdh5.16.2.jar ./

# Check the hive-exec-1.1.0-cdh5.16.2.jar files:
[hadoop@hadoop001 lib]$ ll hive-exec-1.1.0-cdh5.16.2.*
-rw-rw-r--. 1 hadoop hadoop 19274963 Jul  5 17:44 hive-exec-1.1.0-cdh5.16.2.jar
-rw-r--r--. 1 hadoop hadoop 19272159 Mar 24  2016 hive-exec-1.1.0-cdh5.16.2.jar.bak
```
Restart
Restart Hive and verify:

```sql
hive> desc function helloudf;
```
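If the registration took effect, the extended help text from @Description is also visible, and the function can be smoke-tested directly. A hedged example follows; the expected result shown in the comment is simply what the evaluate method above returns:

```sql
-- Show the help text registered via the @Description annotation
hive> desc function extended helloudf;

-- Quick smoke test (SELECT without a FROM clause works on Hive 0.13+)
hive> select helloudf('JIM');
-- expected result: Hello: BIGDATA-JIM
```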