HDFS API
Basic environment
JDK 1.8
Local Windows setup
On Windows, create any directory for Hadoop, create a bin directory inside it, and drop the downloaded hadoop.dll into that bin directory
Configure the HADOOP_HOME environment variable to point at that directory
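If you would rather not touch the system-wide environment variables, the same directory can also be supplied per JVM through the hadoop.home.dir system property before any HDFS code runs (a minimal sketch; the path is only an example):

```java
// Set the Hadoop home for this JVM only; must run before the first FileSystem/Job call.
// The path is an example of the directory created above (the one containing bin\hadoop.dll).
System.setProperty("hadoop.home.dir", "D:\\hadoop");
```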
Dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version> <!-- 2.6.0-cdh5.16.2 -->
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
</dependencies>
```
FileSystem
1. Basic usage
mkdir
```java
@Test
public void mkdir() throws Exception {
    Configuration configuration = new Configuration();
    URI uri = new URI("hdfs://ruozedata001:8020");

    // Get the FileSystem client object, impersonating the "hadoop" user
    FileSystem fileSystem = FileSystem.get(uri, configuration, "hadoop");

    // Perform the operation against the file system
    Path path = new Path("/hdfsapi");
    final boolean isSucceeded = fileSystem.mkdirs(path);

    // Assert
    Assert.assertTrue(isSucceeded);

    // Release resources
    fileSystem.close();
}
```
copyFromLocal
```java
@Test
public void copyFromLocal() throws Exception {
    System.setProperty("HADOOP_USER_NAME", "bigdata");

    Configuration configuration = new Configuration();
    URI uri = new URI("hdfs://hadoop001:9000");
    FileSystem fileSystem = FileSystem.get(uri, configuration);

    Path src = new Path("data/wc.txt");
    Path dst = new Path("/hadoop/data/");
    fileSystem.copyFromLocalFile(src, dst);

    fileSystem.close();
}
```
Error
```
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /hadoop/data/wc.txt could only be replicated to 0 nodes instead of minReplication (=1).
There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
    at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1723)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3508)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
    ...
```
Solution
Server side: hdfs-site.xml
```xml
[bigdata@hadoop001 hadoop]$ vi hdfs-site.xml

<property>
    <name>dfs.datanode.use.datanode.hostname</name>
    <value>true</value>
</property>
```
Client side
```java
configuration.set("dfs.client.use.datanode.hostname", "true");
configuration.set("dfs.replication", "1");
```
Hadoop configuration files

```
Defaults:
    core-default.xml
    hdfs-default.xml
    mapred-default.xml
    yarn-default.xml

Server side:
    core-site.xml
    hdfs-site.xml
    mapred-site.xml
    yarn-site.xml

Client side:
    hdfs-site.xml

Load precedence: client > server > defaults
```
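In code this precedence simply means the last writer wins: explicit set() calls override client-side resource files, which override the defaults. A minimal sketch (the overridden value is only an example):

```java
// Default resources are loaded automatically from the classpath
Configuration configuration = new Configuration();

// Client-side resource file on the classpath, overrides the defaults
configuration.addResource("hdfs-site.xml");

// Explicit set() in code: highest precedence
configuration.set("dfs.replication", "1");
```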
copyToLocal
```java
/**
 * Download a file from HDFS to the local file system
 * @param src  source path on HDFS
 * @param dest destination path on the local file system
 * @throws Exception
 */
public static void get(String src, String dest) throws Exception {
    FileSystem fileSystem = getFileSystem();
    // 1. HDFS path
    Path inputPath = new Path(src);
    // 2. Local path
    Path outputPath = new Path(dest);
    // delSrc = false, useRawLocalFileSystem = true (avoids writing local .crc checksum files)
    fileSystem.copyToLocalFile(false, inputPath, outputPath, true);
    fileSystem.close();
}
```
Utility class
```java
public class FileUtils {

    /**
     * Get a FileSystem instance
     */
    static FileSystem getFileSystem() {
        System.setProperty(HADOOP_USER_NAME, "bigdata");
        Properties properties = ParamUtil.getProperties();
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("dfs.replication", "1");
        try {
            URI uri = new URI(properties.getProperty(HDFS_URI));
            FileSystem fileSystem = FileSystem.get(uri, configuration);
            return fileSystem;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Delete the target output directory if it exists
     * @param configuration
     * @param output
     */
    public static void deleteTarget(Configuration configuration, String output) {
        try {
            FileSystem fileSystem = FileSystem.get(configuration);
            boolean exists = fileSystem.exists(new Path(output));
            if (exists) {
                fileSystem.delete(new Path(output), true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
2. Unit testing
Keep the test class in the same package name as the code under test
For the methods in the test class:
@Before: initialization (build the client)
@After: release resources
State shared by the whole test class can be declared static (see the skeleton below)
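A minimal skeleton of that structure, assuming the same URI and user as in the examples above (the class name is illustrative):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class HdfsApiTest {

    private FileSystem fileSystem;

    @Before
    public void setUp() throws Exception {
        // Initialization: build the client before each test method
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("dfs.replication", "1");
        fileSystem = FileSystem.get(new URI("hdfs://hadoop001:9000"), configuration, "bigdata");
    }

    @Test
    public void mkdir() throws Exception {
        Assert.assertTrue(fileSystem.mkdirs(new Path("/hdfsapi/test")));
    }

    @After
    public void tearDown() throws Exception {
        // Release resources
        if (fileSystem != null) {
            fileSystem.close();
        }
    }
}
```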
MapReduce
Overview
The wordcount job
Shuffle: records are distributed by the hash of the key
Records with the same key are sent to the same reduce task
Run the wc job and check the processes:
```
1020 YarnChild
1023 YarnChild
9021 MRAppMaster
```
MRAppMaster: the application master (the job's main program)
YarnChild: one per Task
Both MapTask and ReduceTask run as separate processes (one reason MR is slow)
The three core components
Mapper
```java
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the line content
        final String line = value.toString().toLowerCase();

        // Split on the separator
        final String[] splits = line.split(",");

        // Emit (word, 1) pairs
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }
}
```
Mapper<LongWritable, Text, Text, IntWritable>
LongWritable, Text: the input key is the byte offset of the line, the input value is the line itself
Text: the output key type (each split-out word)
IntWritable: the output value type (the constant 1)
map() is called once per input record (one line of data)
setup() and cleanup() run once per map task (with these small demo files, one file is one split, i.e. one task); a sketch of overriding them follows below
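A small sketch of how setup()/cleanup() are typically overridden (the record counter and the reused Text object are illustrative additions, not part of the wordcount code above):

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SetupCleanupMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final IntWritable ONE = new IntWritable(1);
    private final Text outKey = new Text();   // reused across map() calls instead of re-allocating
    private long records = 0;

    @Override
    protected void setup(Context context) {
        // Runs once per map task, before the first map() call:
        // open connections, load side data, etc.
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        records++;
        for (String word : value.toString().toLowerCase().split(",")) {
            outKey.set(word);
            context.write(outKey, ONE);
        }
    }

    @Override
    protected void cleanup(Context context) {
        // Runs once per map task, after the last map() call:
        // release resources, flush buffers, etc.
        System.out.println("records processed by this task: " + records);
    }
}
```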
Reducer
```java
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sorting: by default the keys arrive in dictionary (lexicographic) order
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Aggregate the values for each key (sum the counts)
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
reduce() in the Reducer is called once per key
setup() and cleanup() run once per reduce task
Driver
```java
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        String input = "data/wc.txt";
        String output = "out";

        final Configuration configuration = new Configuration();

        // 1. Get the Job object
        final Job job = Job.getInstance(configuration);

        // Delete the output directory if it already exists
        FileUtils.deleteTarget(configuration, output);

        // 2. Set the driver class
        job.setJarByClass(WordCountDriver.class);

        // 3. Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key and value types of the Reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit the job
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```
Template method pattern
Define the skeleton of an algorithm (in an interface/trait/abstract class) and leave the concrete steps to subclasses
Example:
The run() method in Mapper
```java
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
```
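The same idea outside Hadoop, as a tiny sketch (the class and method names are made up for illustration): the abstract class fixes the order of the steps, and subclasses only implement them.

```java
// Template method pattern: run() is the fixed skeleton, the steps are abstract hooks.
abstract class Pipeline {

    // The template method: final, so subclasses cannot change the order of the steps
    public final void run() {
        open();
        try {
            process();
        } finally {
            close();
        }
    }

    protected abstract void open();
    protected abstract void process();
    protected abstract void close();
}
```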
Serialization
Hadoop's built-in serialization types
boolean ==> BooleanWritable
String ==> Text
int ==> IntWritable
double ==> DoubleWritable
Serialization and deserialization
Serialization: in-memory object ==> byte array, for network transfer and data persistence
Deserialization: byte array ==> in-memory object
How serialization is implemented
SerDe: Hive's serialization/deserialization mechanism
Methods on the Writable interface: write() and readFields()
Custom serializable object
```java
public class Access implements Writable {

    private String phone;
    private long up;
    private long down;
    private long sum;

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    // Deserialization
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }

    @Override
    public String toString() {
        return "Access{" +
                "phone='" + phone + '\'' +
                ", up=" + up +
                ", down=" + down +
                ", sum=" + sum +
                '}';
    }

    // getters and setters omitted
}
```
Notes:
Implement the Writable interface
Override readFields() and write(); pay attention to field order and types
The field order in write() and readFields() must be exactly the same
Override toString() (recommended)
Add a no-arg constructor by hand (recommended, optional)
The map output keys are sorted before they reach the reducer
```java
public class AccessDriver {

    public static void main(String[] args) throws Exception {
        String input = "data/access.log";
        String output = "out";

        Configuration configuration = new Configuration();

        // 1. Get the Job object
        Job job = Job.getInstance(configuration);

        // Delete the output directory if it already exists
        FileUtils.deleteTarget(configuration, output);

        // 2. Set the driver class
        job.setJarByClass(AccessDriver.class);

        // 3. Set the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        // 5. Set the key and value types of the Reducer output
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit the job
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Access> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            final String phone = splits[1];                          // 2nd field: phone number
            long up = Long.parseLong(splits[splits.length - 3]);     // 3rd field from the end: upstream traffic
            long down = Long.parseLong(splits[splits.length - 2]);   // 2nd field from the end: downstream traffic

            final Access access = new Access();
            access.setPhone(phone);
            access.setUp(up);
            access.setDown(down);
            access.setSum(up + down);

            context.write(new Text(phone), access);
        }
    }

    public static class MyReducer extends Reducer<Text, Access, NullWritable, Access> {
        @Override
        protected void reduce(Text phone, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;
            for (Access access : values) {
                ups += access.getUp();
                downs += access.getDown();
            }

            final Access access = new Access();
            access.setPhone(phone.toString());
            access.setUp(ups);
            access.setDown(downs);
            access.setSum(ups + downs);

            context.write(NullWritable.get(), access);
        }
    }
}
```
Reading the input data of an MR job
InputSplit is logical; by default one InputSplit corresponds to one block
File ==> 1..N blocks (a physical concept)
Each InputSplit is assigned to one MapTask (a process) for execution
e.g. a 179M file is split into 6 InputSplits (with the local 32M block size)
Input ==> InputSplit(s) ==> Mapper ==> Shuffle ==> Reducer ==> Output
179M processed by 1 MapTask vs. by 6 MapTasks
Idea: distributed computing; the splits have no dependencies on each other, so processing them in parallel is faster (provided there are enough resources)
==> Parallelism: the number of InputSplits determines the MapTask parallelism (how many processes run at the same time)
InputFormat
For each InputSplit, the InputFormat creates a RecordReader that feeds one MapTask
==> setup
==> map
==> cleanup
e.g.:
block size: 128M
200M ==> 2 splits: one full 128M block + the remaining 72M
==> 2 MapTasks, processed in parallel
N MapTasks ==> N JVMs (can be reduced with JVM reuse); the split arithmetic is sketched below
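A sketch of the split arithmetic behind those numbers, following the computeSplitSize()/SPLIT_SLOP logic shown in the job-submission trace further below (sizes are the 200M/128M example):

```java
public class SplitMath {
    public static void main(String[] args) {
        long minSize = 1;
        long maxSize = Long.MAX_VALUE;
        long blockSize = 128L * 1024 * 1024;   // 128M
        long fileSize  = 200L * 1024 * 1024;   // 200M

        // splitSize = max(minSize, min(maxSize, blockSize)) = 128M
        long splitSize = Math.max(minSize, Math.min(maxSize, blockSize));

        // Keep cutting full splits while the remainder is more than 1.1 * splitSize (SPLIT_SLOP)
        double SPLIT_SLOP = 1.1;
        int splits = 0;
        long bytesRemaining = fileSize;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            splits++;
            bytesRemaining -= splitSize;
        }
        if (bytesRemaining != 0) {
            splits++;                          // the last, smaller split (72M here)
        }

        System.out.println("splits = " + splits);   // prints 2
    }
}
```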
For a MapReduce job:
When it runs there is a splitting step (Input ==> InputSplit)
InputSplit ==> MapTask
By default: InputSplit size == block size
Adjusting the InputSplit size is possible, but rarely done
FileInputFormat
CombineFileInputFormat: merges small files into fewer splits
KeyValueTextInputFormat: reads each line as a key/value pair
NLineInputFormat: splits by a fixed number of lines
SequenceFileInputFormat: binary sequence files
TextInputFormat: plain text, one record per line (the default); switching between them is shown in the sketch below
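Switching the InputFormat is a driver-side setting; a minimal sketch assuming the job object from the drivers above (the line count and split size are only examples):

```java
// Use NLineInputFormat instead of the default TextInputFormat
job.setInputFormatClass(NLineInputFormat.class);
NLineInputFormat.setNumLinesPerSplit(job, 1000);   // each split covers 1000 input lines

// Or pack many small files into fewer splits
// job.setInputFormatClass(CombineTextInputFormat.class);
// CombineTextInputFormat.setMaxInputSplitSize(job, 4 * 1024 * 1024);   // 4M per split
```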
Job submission flow
```java
job.waitForCompletion {
    submit() {
        connect() {
            // 1) Local  2) YARN
            return new Cluster(getConfiguration());
        }

        JobSubmitter submitter                              // the job submitter
        submitter.submitJobInternal(Job.this, cluster) {    // *****
            // 1. Does the output directory already exist?
            // 2. Is the output configured at all?
            checkSpecs(job);

            // submitJobDir holds the job-related information
            Path submitJobDir = new Path(jobStagingArea, jobId.toString());

            // Copy jars/files to the staging directory
            copyAndConfigureFiles(job, submitJobDir);

            // job.xml keeps the job's configuration
            Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

            // Compute the number of maps (InputSplits)
            int maps = writeSplits(job, submitJobDir); {
                minSize   = max(1, 0) = 1
                maxSize   = Long.MAX_VALUE
                blockSize = 32M                             // local mode block size

                // Compute splitSize
                splitSize = computeSplitSize(blockSize, minSize, maxSize); {
                    Math.max(minSize, Math.min(maxSize, blockSize));
                    max(1, min(Long.MAX_VALUE, 32M))
                    max(1, 32M) = 32M
                }

                // SPLIT_SLOP = 1.1
                ((double) bytesRemaining) / splitSize > SPLIT_SLOP
            }

            // Write the configuration file
            writeConf(conf, submitJobFile);
        }
    }
}
```
Can the input be split?
isSplitable = true
Consideration: when compression is used in production, can the compressed file still be split?
Splittable: BZip2
Not splittable: Gzip, Snappy (a way to check this is sketched below)
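A sketch of how this can be checked in code: a codec is splittable only if it implements SplittableCompressionCodec (the file name is an example; the codec is resolved from the suffix):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;

public class SplittableCheck {
    public static void main(String[] args) {
        CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());

        // The codec is looked up from the file suffix (.bz2, .gz, ...)
        CompressionCodec codec = factory.getCodec(new Path("data/wc.txt.bz2"));

        // true for BZip2Codec, false for GzipCodec / SnappyCodec
        boolean splittable = codec instanceof SplittableCompressionCodec;
        System.out.println(codec.getClass().getSimpleName() + " splittable = " + splittable);
    }
}
```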
Methods on InputFormat:
```java
public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
```
Add the MySQL dependency (to read from MySQL with DBInputFormat)
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
```
The custom class implements both the DBWritable and Writable interfaces
DBWritable: for reading/writing the database
Writable: for Hadoop serialization
```java
public class DeptWritable implements DBWritable, Writable {

    private int deptno;
    private String dname;
    private String loc;

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, deptno);
        statement.setString(2, dname);
        statement.setString(3, loc);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.deptno = resultSet.getInt(1);
        this.dname = resultSet.getString(2);
        this.loc = resultSet.getString(3);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeUTF(loc);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.loc = in.readUTF();
    }

    @Override
    public String toString() {
        return deptno + "\t" + dname + "\t" + loc;
    }

    public DeptWritable() {
    }

    public DeptWritable(int deptno, String dname, String loc) {
        this.deptno = deptno;
        this.dname = dname;
        this.loc = loc;
    }

    // getters and setters omitted
}
```
Configure the database connection, read the table and write the rows to a file
```java
public class MySQLDBInputFormatDriver {

    public static void main(String[] args) throws Exception {
        String output = "out";

        Configuration configuration = new Configuration();

        // Set the database connection info via DBConfiguration
        DBConfiguration.configureDB(configuration,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://hadoop001:3306/lxl",
                "root", "123456");

        Job job = Job.getInstance(configuration);
        FileUtils.deleteTarget(configuration, output);

        job.setJarByClass(MySQLDBInputFormatDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(DeptWritable.class);

        // Set the table, fields, filter condition and order-by field to read
        String[] fieldNames = {"deptno", "dname", "loc"};
        DBInputFormat.setInput(job, DeptWritable.class, "dept", "deptno>10", "dname", fieldNames);

        FileOutputFormat.setOutputPath(job, new Path(output));

        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {
        @Override
        protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
            context.write(NullWritable.get(), value);
        }
    }
}
```
KeyValueTextInputFormat
Requirement: count how many lines share the same first token
Driver settings
```java
final Configuration configuration = new Configuration();

// Must be set before the Job object is created, otherwise it does not take effect
configuration.set(KEY_VALUE_SEPERATOR, ",");

final Job job = Job.getInstance(configuration);
```
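KEY_VALUE_SEPERATOR here is the constant defined in KeyValueLineRecordReader; the driver also has to switch the input format. A sketch of the remaining settings, assuming the same job object:

```java
// KEY_VALUE_SEPERATOR is KeyValueLineRecordReader.KEY_VALUE_SEPERATOR
// ("mapreduce.input.keyvaluelinerecordreader.key.value.separator")

// Switch the input format so each line is parsed into (key, value) at the first separator
job.setInputFormatClass(KeyValueTextInputFormat.class);
```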
The mapper's input key and value types are both Text
```java
public static class MyMapper extends Mapper<Text, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, ONE);
    }
}
```
reducer
```java
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
Problem encountered: the framework could not instantiate the Mapper/Reducer when they were non-static inner classes (reflection needs an accessible no-arg constructor)
Solution: declare the inner Mapper and Reducer classes as static
Partitioner
Custom partitioner
```java
public class AccessPartitioner extends Partitioner<Text, Access> {
    @Override
    public int getPartition(Text text, Access access, int numPartitions) {
        final String telephone = text.toString();
        if (telephone.startsWith("13")) {
            return 0;
        } else if (telephone.startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}
```
The partitioner's generic types must match the map output key/value types
Driver settings
```java
job.setPartitionerClass(AccessPartitioner.class);
job.setNumReduceTasks(3);   // the default is 1
```
If the number of reduce tasks > the number of partitions, the extra reducers produce empty output files
If the number of reduce tasks < the number of partitions, the job fails (exception: with the default single reduce task, the partitioner setting simply has no effect)
The number of reduce tasks determines the number of output files
Mapper
```java
public static class MyMapper extends Mapper<LongWritable, Text, Text, Access> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");
        final String phone = splits[1];                          // 2nd field: phone number
        long up = Long.parseLong(splits[splits.length - 3]);     // 3rd field from the end: upstream traffic
        long down = Long.parseLong(splits[splits.length - 2]);   // 2nd field from the end: downstream traffic

        final Access access = new Access();
        access.setPhone(phone);
        access.setUp(up);
        access.setDown(down);
        access.setSum(up + down);

        context.write(new Text(phone), access);
    }
}
```
Reducer
```java
public static class MyReducer extends Reducer<Text, Access, NullWritable, Access> {
    @Override
    protected void reduce(Text phone, Iterable<Access> values, Context context) throws IOException, InterruptedException {
        long ups = 0;
        long downs = 0;
        for (Access access : values) {
            ups += access.getUp();
            downs += access.getDown();
        }

        final Access access = new Access();
        access.setPhone(phone.toString());
        access.setUp(ups);
        access.setDown(downs);
        access.setSum(ups + downs);

        context.write(NullWritable.get(), access);
    }
}
```
Combiner (pre-aggregation)
Runs on the MapTask side
Reduces the amount of data transferred over the network
Its parent class is Reducer
Cannot be used for averages (averaging the per-map averages again does not give the overall average)
Wordcount counters without a combiner
```
Map-Reduce Framework
    Map input records=3
    Map output records=7
    Map output bytes=61
    Map output materialized bytes=81
    Input split bytes=108
    Combine input records=0
    Combine output records=0
    ...
```
After setting the combiner on the job
```java
// Set the combiner; the logic is the same as the reduce logic, so reuse WordCountReducer
job.setCombinerClass(WordCountReducer.class);
```
Counters with the combiner
```
Map-Reduce Framework
    Map input records=3
    Map output records=7
    Map output bytes=61
    Map output materialized bytes=49
    Input split bytes=108
    Combine input records=7
    Combine output records=4
    ...
```
Sort
The custom class implements the WritableComparable interface
```java
public interface WritableComparable<T> extends Writable, Comparable<T> {
}

public class Access implements WritableComparable<Access> {

    // serialization methods omitted ...

    // Custom sort: ascending by sum
    @Override
    public int compareTo(Access o) {
        return this.getSum() > o.getSum() ? 1 : -1;
    }
}
```
Total (global) sort: a single output file, like ORDER BY
Per-partition sort: each reducer's output is sorted, but not globally, like SORT BY
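In the driver the difference boils down to the reducer count; a minimal sketch assuming a job configured as in the drivers above:

```java
// Total sort (ORDER BY): a single reducer, therefore a single, globally sorted output file
job.setNumReduceTasks(1);

// Per-partition sort (SORT BY): several reducers, each output file sorted on its own
// job.setNumReduceTasks(3);
```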
distinct
Which distinct words appear: only the KEY matters
Mapper
```java
public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String line = value.toString().toLowerCase();
        final String[] splits = line.split(",");
        for (String word : splits) {
            context.write(new Text(word), NullWritable.get());
        }
    }
}
```
Reducer
```java
public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}
```
group by
From the emp data: how many employees are in each department
```java
public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String line = value.toString().toLowerCase();
        final String[] splits = line.split("\t");
        // Defensive check: only process well-formed rows
        if (splits.length == 8) {
            context.write(new Text(splits[7].trim()), ONE);
        }
    }
}
```
```java
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Aggregate the counts for each key
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}
```
Join
Reduce Join / Common Join / Shuffle Join all go through the shuffle
Example: emp e join dept d on e.deptno = d.deptno;
Mapper
The join condition is used as the key
emp:  (20, Info) tag=e
dept: (20, Info) tag=d
Records from the different "tables" are tagged with their origin
Shuffle:
Records with the same key go to the same reducer
Different keys can also land in the same reducer (a reducer handles every key hashed to its partition)
Reducer
(20, (Info, Info)): use the tag to tell which table each Info came from
Reduce Join
Mapper
```java
public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {

    String name;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        name = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");

        // Within the same map(), use the file name to tell whether this row comes from emp or dept
        if (name.contains("emp")) {
            // emp
            final int empno = Integer.parseInt(splits[0].trim());
            final String ename = splits[1].trim();
            final int deptno = Integer.parseInt(splits[7].trim());

            Info info = new Info();
            info.setEmpno(empno);
            info.setEname(ename);
            info.setDeptno(deptno);
            info.setDname("");
            info.setFlag(1);

            context.write(new IntWritable(deptno), info);
        } else {
            // dept
            Info info = new Info();
            final int deptno = Integer.parseInt(splits[0].trim());
            final String dname = splits[1].trim();
            info.setDeptno(deptno);
            info.setDname(dname);
            info.setFlag(2);
            info.setEmpno(0);
            info.setEname("");

            context.write(new IntWritable(deptno), info);
        }
    }
}
```
Reducer
```java
public static class MyReducer extends Reducer<IntWritable, Info, Info, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
        List<Info> infos = new ArrayList<>();
        String dname = "";

        // The actual join happens here
        for (Info info : values) {
            if (info.getFlag() == 1) {
                // emp side: copy the fields, because the framework reuses the Info object across iterations
                Info tmp = new Info();
                tmp.setEmpno(info.getEmpno());
                tmp.setEname(info.getEname());
                tmp.setDeptno(info.getDeptno());
                infos.add(tmp);
            } else {
                // dept side
                dname = info.getDname();
            }
        }

        for (Info bean : infos) {
            bean.setDname(dname);
            context.write(bean, NullWritable.get());
        }
    }
}
```
Counters
```
Reduce input groups=4
Reduce shuffle bytes=511
Reduce input records=18
Reduce output records=14
Spilled Records=36
Shuffled Maps =2
Failed Shuffles=0
```
Map Join / Broadcast Join
One side is a small table
The join is done on the map side; in other words there is no reducer, which means there is no shuffle
How it works:
Load the small table into a cache
While reading each row of the big table, join it directly against the cache
Drawback: the small table cannot be too large, because it has to fit into the cache (memory)
driver
```java
...

// 5. Reducer stage: none is needed
job.setNumReduceTasks(0);   // explicitly tell the MR framework there is no reducer

// Put the small table into the distributed cache
job.addCacheFile(new URI("data/join/dept.txt"));

...
```
Mapper
```java
public static class MyMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

    Map<String, String> cacheMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Read the small table from the distributed cache
        final String path = context.getCacheFiles()[0].getPath();
        final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            final String[] splits = line.split("\t");
            // deptno -> dname
            cacheMap.put(splits[0], splits[1]);
        }
        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] splits = value.toString().split("\t");
        final int empno = Integer.parseInt(splits[0].trim());
        final String ename = splits[1].trim();
        final int deptno = Integer.parseInt(splits[7].trim());

        Info info = new Info();
        info.setEmpno(empno);
        info.setEname(ename);
        info.setDeptno(deptno);
        // Join against the cached dept table
        info.setDname(cacheMap.get(deptno + ""));

        context.write(info, NullWritable.get());
    }
}
```
Counters
```
Map-Reduce Framework
    Map input records=14
    Map output records=14
    Input split bytes=114
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
```