HDFS_API

This is about HDFS

Posted by PsycheLee on 2015-09-08

HDFS API

Basic environment

JDK 1.8

Local Windows setup

  1. Create a Hadoop directory anywhere on Windows, create a bin directory inside it, and drop the downloaded hadoop.dll into that bin directory.
  2. Configure the HADOOP_HOME environment variable (alternatively, it can be set from code, as sketched below).
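If changing environment variables is inconvenient, the same effect can usually be achieved from code before any Hadoop class is used; a minimal sketch (the directory path is an assumption):

// Hypothetical: point Hadoop at the local directory from code instead of
// (or in addition to) the HADOOP_HOME environment variable.
System.setProperty("hadoop.home.dir", "D:\\hadoop");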

Dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version> <!-- 2.6.0-cdh5.16.2 -->
    </dependency>

    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13</version>
    </dependency>
</dependencies>

FileSystem

1. Basic usage

mkdir

@Test
public void mkdir() throws Exception {
    Configuration configuration = new Configuration();

    URI uri = new URI("hdfs://ruozedata001:8020");

    // Get the FileSystem client object, impersonating the "hadoop" user
    FileSystem fileSystem = FileSystem.get(uri, configuration, "hadoop");

    // Perform the operation against the file system
    Path path = new Path("/hdfsapi");
    final boolean isSucceeded = fileSystem.mkdirs(path);

    // Assert
    Assert.assertTrue(isSucceeded);

    // Release resources
    fileSystem.close();
}

copyFromLocal

@Test
public void copyFromLocal() throws Exception {

    System.setProperty("HADOOP_USER_NAME", "bigdata");
    Configuration configuration = new Configuration();
    URI uri = new URI("hdfs://hadoop001:9000");
    FileSystem fileSystem = FileSystem.get(uri, configuration);

    // Upload a local file to HDFS
    Path src = new Path("data/wc.txt");
    Path dst = new Path("/hadoop/data/");
    fileSystem.copyFromLocalFile(src, dst);

    fileSystem.close();
}

Error

org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /hadoop/data/wc.txt could only be replicated to 0 nodes instead of minReplication (=1).  There are 1 datanode(s) running and 1 node(s) are excluded in this operation.
at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1723)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3508)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:694)
....

Solution

  • Server side: hdfs-site.xml

    [bigdata@hadoop001 hadoop]$ vi hdfs-site.xml
    <property>
        <name>dfs.datanode.use.datanode.hostname</name>
        <value>true</value>
    </property>
  • Client side

    configuration.set("dfs.client.use.datanode.hostname", "true");
    configuration.set("dfs.replication", "1");

    Hadoop configuration files

    * Defaults:
    * core-default.xml
    * hdfs-default.xml
    * mapred-default.xml
    * yarn-default.xml
    *
    * Server side:
    * core-site.xml
    * hdfs-site.xml
    * mapred-site.xml
    * yarn-site.xml
    *
    * Client side:
    * hdfs-site.xml
    *
    * Precedence when loading: client > server > defaults

copyToLocal

/**
 * Download a file from HDFS to the local file system
 * @param src  source path on HDFS
 * @param dest destination path on the local file system
 * @throws Exception
 */
public static void get(String src, String dest) throws Exception {
    FileSystem fileSystem = getFileSystem();
    // 1. HDFS source path
    Path inputPath = new Path(src);
    // 2. Local destination path
    Path outputPath = new Path(dest);
    // delSrc = false (keep the source), useRawLocalFileSystem = true (no .crc checksum file)
    fileSystem.copyToLocalFile(false, inputPath, outputPath, true);
    fileSystem.close();
}

Utility class

public class FileUtils {
    /**
     * Get a FileSystem instance
     *
     * @return the FileSystem, or null if it could not be created
     */
    static FileSystem getFileSystem() {
        System.setProperty(HADOOP_USER_NAME, "bigdata");
        Properties properties = ParamUtil.getProperties();
        Configuration configuration = new Configuration();
        configuration.set("dfs.client.use.datanode.hostname", "true");
        configuration.set("dfs.replication", "1");

        try {
            URI uri = new URI(properties.getProperty(HDFS_URI));
            return FileSystem.get(uri, configuration);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Delete the target path (recursively) if it exists
     * @param configuration Hadoop configuration
     * @param output        path to delete
     */
    public static void deleteTarget(Configuration configuration, String output) {
        try {
            FileSystem fileSystem = FileSystem.get(configuration);

            boolean exists = fileSystem.exists(new Path(output));
            if (exists) {
                fileSystem.delete(new Path(output), true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2. Unit testing

  • Keep the unit-test package name identical to the production code package name

  • For individual test methods (see the lifecycle sketch below)

    • @Before runs initialization before each test
    • @After releases resources after each test
  • For the whole class, use static methods with

    • @BeforeClass
    • @AfterClass
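A minimal lifecycle sketch (hypothetical test class; the HDFS URI and user are assumptions taken from the examples above):

public class HdfsApiTest {

    private static FileSystem fileSystem;

    // Runs once for the whole class: open the FileSystem client
    @BeforeClass
    public static void setUpClass() throws Exception {
        Configuration configuration = new Configuration();
        fileSystem = FileSystem.get(new URI("hdfs://hadoop001:9000"), configuration, "bigdata");
    }

    @Test
    public void mkdir() throws Exception {
        Assert.assertTrue(fileSystem.mkdirs(new Path("/hdfsapi/test")));
    }

    // Runs once after all tests: release the client
    @AfterClass
    public static void tearDownClass() throws Exception {
        fileSystem.close();
    }
}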

MapReduce

Overview

  • A distributed computing framework: fault tolerance, RPC communication and so on are handled for you, so you can focus on the business logic

  • Map: split one task into many smaller ones

  • Reduce: aggregate the results of the split tasks

  • Scalability: scale out by adding machines and other resources

  • Good fit: offline batch processing

  • Not a good fit: real-time computation, iterative computation

The wordcount job

WordCount

Shuffle: records are distributed according to the hash of their key.

Records with the same key are sent to the same reduce task.
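This is what Hadoop's default HashPartitioner does, shown here in essence:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask off the sign bit, then take the key's hash modulo the number of reduce tasks
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}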

Run the wordcount job and check the processes:

1020 YarnChild
1023 YarnChild
9021 MRAppMaster

MRAppMaster: the application master (the main program)

YarnChild: corresponds to a Task

MapTask and ReduceTask each run as a separate process [one of the reasons MR is slow]

Three core pieces

  • Mapper
  • Reducer
  • Driver
Mapper
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the line content
        final String line = value.toString().toLowerCase();

        // Split on the separator
        final String[] splits = line.split(",");

        // Emit (word, 1)
        for (String word : splits) {
            context.write(new Text(word), ONE);
        }
    }
}
  • Mapper<LongWritable, Text, Text, IntWritable>
    • LongWritable, Text: the input key is the byte offset of the line, the input value is one line of data
    • Text: the output key type (each word after splitting)
    • IntWritable: the output value type (the 1)

In the Mapper, map() runs once per line of data.

setup() and cleanup() run once per task (here, one file is one task).
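A hypothetical sketch of where setup() and cleanup() fit: setup() runs once before the first map() call of a task, cleanup() runs once after the last one.

public static class LineCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private long lines;

    @Override
    protected void setup(Context context) {
        lines = 0; // per-task initialization
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        lines++; // called once per input line
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // emit one record per task
        context.write(new Text("lines"), new IntWritable((int) lines));
    }
}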

Reducer
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * Sorting: by default keys arrive in lexicographic order
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Aggregate per key (sum the counts)
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }

        context.write(key, new IntWritable(count));
    }
}
  • Reducer<Text, IntWritable, Text, IntWritable>

    • Text, IntWritable: the input key and value types

      [must match the Mapper's output types]

    • Text: the output key type (the word)

    • IntWritable: the output value type, the accumulated count

In the Reducer, reduce() runs once per key.

setup() and cleanup() run once per task (here, one file is one task).

Driver
public class WordCountDriver {

    public static void main(String[] args) throws Exception {
        String input = "data/wc.txt";
        String output = "out";

        final Configuration configuration = new Configuration();

        // 1. Get the Job object
        final Job job = Job.getInstance(configuration);

        // Delete the output directory if it exists
        FileUtils.deleteTarget(configuration, output);

        // 2. Set the main class
        job.setJarByClass(WordCountDriver.class);

        // 3. Set the Mapper and Reducer
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5. Set the key and value types of the Reducer output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit the job
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

Template method pattern

Define the skeleton of an algorithm (in an interface/trait/abstract class) and leave the concrete steps to subclasses.

For example:

The run method in Mapper

public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    } finally {
        cleanup(context);
    }
}
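The same pattern outside Hadoop, as a minimal hypothetical sketch: the abstract class fixes the order of the steps, subclasses only fill in the steps.

abstract class Task {

    // The template method: fixes the skeleton of the algorithm.
    public final void run() {
        setup();
        try {
            process();
        } finally {
            cleanup();
        }
    }

    // Steps implemented (or overridden) by subclasses.
    protected void setup() { }
    protected abstract void process();
    protected void cleanup() { }
}

class PrintTask extends Task {
    @Override
    protected void process() {
        System.out.println("doing the work");
    }
}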

Serialization

Hadoop's built-in serialization types

boolean ==> BooleanWritable

String ==> Text

int ==> IntWritable

double ==> DoubleWritable

Serialization and deserialization

Serialization: in-memory object ==> byte array, for network transfer and persistence

Deserialization: byte array ==> in-memory object

How is serialization done?

SerDe: Hive's serialization mechanism

In Hadoop: the Writable methods write() and readFields()

Custom serializable object

public class Access implements Writable {

    private String phone;
    private long up;
    private long down;
    private long sum;

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phone);
        out.writeLong(up);
        out.writeLong(down);
        out.writeLong(sum);
    }

    // Deserialization: read the fields in the same order and with the same types as write()
    @Override
    public void readFields(DataInput in) throws IOException {
        phone = in.readUTF();
        up = in.readLong();
        down = in.readLong();
        sum = in.readLong();
    }

    @Override
    public String toString() {
        return "Access{" +
                "phone='" + phone + '\'' +
                ", up=" + up +
                ", down=" + down +
                ", sum=" + sum +
                '}';
    }

    // getters and setters omitted
}

Notes:

  1. Implement the Writable interface.
  2. Override readFields and write; watch the field order and types.

     **The fields must be handled in exactly the same order in write() and readFields()** (see the round-trip check below).

  3. Overriding toString is recommended.
  4. Adding a no-arg constructor by hand is recommended [optional].
  5. Keys are sorted.
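A quick round-trip check (hypothetical test snippet, assuming the Access class above with its getters/setters) makes the ordering requirement concrete:

// Serialize: object ==> byte array
Access original = new Access();
original.setPhone("13800000000");
original.setUp(100L);
original.setDown(200L);
original.setSum(300L);

ByteArrayOutputStream baos = new ByteArrayOutputStream();
original.write(new DataOutputStream(baos));

// Deserialize: byte array ==> object (same field order as write())
Access copy = new Access();
copy.readFields(new DataInputStream(new ByteArrayInputStream(baos.toByteArray())));

Assert.assertEquals(original.toString(), copy.toString());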
public class AccessDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/access.log";
        String output = "out";

        Configuration configuration = new Configuration();

        // 1. Get the Job object
        Job job = Job.getInstance(configuration);

        // Delete the output directory if it exists
        FileUtils.deleteTarget(configuration, output);

        // 2. Set the main class
        job.setJarByClass(AccessDriver.class);

        // 3. Set the Mapper and Reducer
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // 4. Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Access.class);

        // 5. Set the key and value types of the Reducer output
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Access.class);

        // 6. Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        // 7. Submit the job
        final boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Access> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            final String phone = splits[1];                        // 2nd field: phone number
            long up = Long.parseLong(splits[splits.length - 3]);   // 3rd field from the end: upstream traffic
            long down = Long.parseLong(splits[splits.length - 2]); // 2nd field from the end: downstream traffic

            final Access access = new Access();
            access.setPhone(phone);
            access.setUp(up);
            access.setDown(down);
            access.setSum(up + down);

            context.write(new Text(phone), access);
        }
    }

    public static class MyReducer extends Reducer<Text, Access, NullWritable, Access> {
        @Override
        protected void reduce(Text phone, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;

            for (Access access : values) {
                ups += access.getUp();
                downs += access.getDown();
            }

            final Access access = new Access();
            access.setPhone(phone.toString());
            access.setUp(ups);
            access.setDown(downs);
            access.setSum(ups + downs);

            context.write(NullWritable.get(), access);
        }
    }
}

InputFormat

Reads the input data of an MR job.

An InputSplit is logical; by default one InputSplit corresponds to one block.
File ==> 1..N blocks (a physical concept).
Each InputSplit is assigned to a MapTask (a process) for execution.

Example: a 179M file is split into 6 InputSplits.
Input ==> InputSplit(s) ==> Mapper ==> Shuffle ==> Reducer ==> Output

179M processed by one MapTask vs. processed by 6 MapTasks:

Idea: in distributed computing, work with no dependencies between its parts is processed in parallel, which gives better performance [provided there are enough resources].

==> Parallelism: the number of InputSplits determines the parallelism of the MapTasks [how many processes run at the same time].

InputFormat
RecordReader ==> InputSplit ==> MapTask
                                ==> setup
                                ==> map
                                ==> cleanup

e.g.:

block size: 128M
200M ==> 2 blocks: 128M + the remainder
     ==> 2 MapTasks, processed in parallel

N MapTasks ==> N JVMs (JVM reuse is possible)

For a MapReduce job:
At execution time there is a splitting phase (Input ==> InputSplit).
InputSplit ==> MapTask
By default: InputSplit size == block size.
It is possible to adjust the InputSplit size, but it is rarely done.
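If you do need to change it, the split size can be nudged through FileInputFormat; a minimal sketch (the 32M/64M figures are just examples):

// splitSize = max(minSize, min(maxSize, blockSize))
FileInputFormat.setMinInputSplitSize(job, 32 * 1024 * 1024L);
FileInputFormat.setMaxInputSplitSize(job, 64 * 1024 * 1024L);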

FileInputFormat
    CombineFileInputFormat  merges small files
    KeyValueTextInputFormat reads each line as a key/value pair
    NLineInputFormat        splits by number of lines
    SequenceFileInputFormat binary
    TextInputFormat         plain text (split by line)

Job submission flow

(Diagram of the job submission flow omitted)

job.waitForCompletion {
    submit() {
        connect() {
            // 1) local  2) YARN
            return new Cluster(getConfiguration());
        }

        JobSubmitter submitter // the job submitter
        submitter.submitJobInternal(Job.this, cluster) { // *****
            // 1. Does the output directory already exist?
            // 2. Is the output configured at all?
            checkSpecs(job);
            // submitJobDir holds the job-related information
            Path submitJobDir = new Path(jobStagingArea, jobId.toString());
            // copy the job resources
            copyAndConfigureFiles(job, submitJobDir);

            // job.xml holds the job configuration
            Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
            // compute the number of maps (InputSplits)
            int maps = writeSplits(job, submitJobDir); {
                minSize = max(1, 0) = 1
                maxSize = Long.MAX_VALUE

                blockSize = 32M   // local file system block size
                // compute splitSize
                splitSize = computeSplitSize(blockSize, minSize, maxSize); {
                    Math.max(minSize, Math.min(maxSize, blockSize));
                    max(1, min(Long.MAX_VALUE, 32M))
                    max(1, 32M) = 32M
                }
                // SPLIT_SLOP = 1.1: keep splitting while ((double) bytesRemaining) / splitSize > SPLIT_SLOP
            }
            // write the configuration file
            writeConf(conf, submitJobFile);
        }
    }
}
  • Can the input be split?

    isSplitable = true

    Consideration: when compression is used in production, can the compressed file still be split?

    Splittable: BZip2

    Not splittable: Gzip, Snappy

  • InputFormat's methods:

public abstract List<InputSplit> getSplits(JobContext context)
        throws IOException, InterruptedException;

public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                                                      TaskAttemptContext context)
        throws IOException, InterruptedException;

DBInputFormat reads data from a database table

  1. Add the MySQL dependency

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
  2. Create a custom class that implements the DBWritable and Writable interfaces

    • DBWritable for the database read/write operations
    • Writable for serialization
    public class DeptWritable implements DBWritable, Writable {

        private int deptno;
        private String dname;
        private String loc;

        // Write the fields to the SQL statement (DBWritable)
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setInt(1, deptno);
            statement.setString(2, dname);
            statement.setString(3, loc);
        }

        // Read the fields from the result set (DBWritable)
        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            this.deptno = resultSet.getInt(1);
            this.dname = resultSet.getString(2);
            this.loc = resultSet.getString(3);
        }

        // Hadoop serialization (Writable)
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeInt(deptno);
            out.writeUTF(dname);
            out.writeUTF(loc);
        }

        // Hadoop deserialization (Writable)
        @Override
        public void readFields(DataInput in) throws IOException {
            this.deptno = in.readInt();
            this.dname = in.readUTF();
            this.loc = in.readUTF();
        }

        @Override
        public String toString() {
            return deptno + "\t" + dname + "\t" + loc;
        }

        public DeptWritable() {
        }

        public DeptWritable(int deptno, String dname, String loc) {
            this.deptno = deptno;
            this.dname = dname;
            this.loc = loc;
        }

        // getters and setters omitted
    }
  3. Get a database connection, read the table, and write the data out to a file

    public class MySQLDBInputFormatDriver {

        public static void main(String[] args) throws Exception {
            String output = "out";

            Configuration configuration = new Configuration();
            // Set the database connection information through DBConfiguration
            DBConfiguration.configureDB(configuration,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://hadoop001:3306/lxl",
                    "root",
                    "123456"
            );

            Job job = Job.getInstance(configuration);

            FileUtils.deleteTarget(configuration, output);

            job.setJarByClass(MySQLDBInputFormatDriver.class);

            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(DeptWritable.class);

            // Set the table, fields, filter condition and order-by field to read
            String[] fieldNames = {"deptno", "dname", "loc"};
            DBInputFormat.setInput(job, DeptWritable.class, "dept", "deptno>10", "dname", fieldNames);
            FileOutputFormat.setOutputPath(job, new Path(output));

            final boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }

        public static class MyMapper extends Mapper<LongWritable, DeptWritable, NullWritable, DeptWritable> {

            @Override
            protected void map(LongWritable key, DeptWritable value, Context context) throws IOException, InterruptedException {
                context.write(NullWritable.get(), value);
            }
        }
    }

KeyValueTextInputFormat

Requirement: count how many lines share the same first token.

Settings in the driver

final Configuration configuration = new Configuration();
// Must be set before the Job is created, otherwise it does not take effect
configuration.set(KEY_VALUE_SEPERATOR, ",");

final Job job = Job.getInstance(configuration);
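The job also has to be told to use this input format; a minimal sketch:

// Without this, the default TextInputFormat is used and the mapper's key type will not match.
job.setInputFormatClass(KeyValueTextInputFormat.class);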

The mapper's input key and value types are both Text

public static class MyMapper extends Mapper<Text, Text, Text, IntWritable> {
    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        context.write(key, ONE);
    }
}

reducer

public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }

        context.write(key, new IntWritable(count));
    }
}

Problem encountered

(error screenshot omitted)

Fix: declare the Mapper and Reducer inner classes as static so the framework can instantiate them.

Partitioner

  1. Custom partitioner
public class AccessPartitioner extends Partitioner<Text, Access> {

    @Override
    public int getPartition(Text text, Access access, int numPartitions) {
        final String telephone = text.toString();
        if (telephone.startsWith("13")) {
            return 0;
        } else if (telephone.startsWith("15")) {
            return 1;
        } else {
            return 2;
        }
    }
}

The partitioner's generic types must match the map output key and value types.

  2. Driver settings

    job.setPartitionerClass(AccessPartitioner.class);
    job.setNumReduceTasks(3); // the default is 1

    • If the number of reduce tasks > the number of partitions, extra empty output files are produced
    • If the number of reduce tasks < the number of partitions, the job fails (with the default single reduce task, setting a partitioner has no effect)
    • The number of reduce tasks determines the number of output files
  3. Mapper

    public static class MyMapper extends Mapper<LongWritable, Text, Text, Access> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            final String phone = splits[1];                        // 2nd field: phone number
            long up = Long.parseLong(splits[splits.length - 3]);   // 3rd field from the end: upstream traffic
            long down = Long.parseLong(splits[splits.length - 2]); // 2nd field from the end: downstream traffic

            final Access access = new Access();
            access.setPhone(phone);
            access.setUp(up);
            access.setDown(down);
            access.setSum(up + down);

            context.write(new Text(phone), access);
        }
    }
  4. Reducer

    public static class MyReducer extends Reducer<Text, Access, NullWritable, Access> {
        @Override
        protected void reduce(Text phone, Iterable<Access> values, Context context) throws IOException, InterruptedException {
            long ups = 0;
            long downs = 0;

            for (Access access : values) {
                ups += access.getUp();
                downs += access.getDown();
            }

            final Access access = new Access();
            access.setPhone(phone.toString());
            access.setUp(ups);
            access.setDown(downs);
            access.setSum(ups + downs);

            context.write(NullWritable.get(), access);
        }
    }

Combiner (pre-aggregation)

  • Runs on the MapTask side

  • Reduces the amount of data transferred over the network

  • Its parent class is Reducer

  • Cannot be used for computing averages

    WordCount counters without a combiner

    Map-Reduce Framework
    Map input records=3
    Map output records=7
    Map output bytes=61
    Map output materialized bytes=81
    Input split bytes=108
    Combine input records=0
    Combine output records=0
    ...

    After setting the combiner on the job

    // Set the combiner; its logic is the same as the reduce logic, so WordCountReducer is reused
    job.setCombinerClass(WordCountReducer.class);

    Counters with the combiner

    Map-Reduce Framework
    Map input records=3
    Map output records=7
    Map output bytes=61
    Map output materialized bytes=49
    Input split bytes=108
    Combine input records=7
    Combine output records=4
    ...

Sort

The custom class implements the WritableComparable interface

public interface WritableComparable<T> extends Writable, Comparable<T> {
}

public class Access implements WritableComparable<Access> {
    // serialization methods omitted...

    // Custom sort order (ascending by sum)
    @Override
    public int compareTo(Access o) {
        return this.getSum() > o.getSum() ? 1 : -1;
    }
}

Total (global) ordering: a single output file, like ORDER BY.

Partitioned ordering: each reducer's output is sorted, but there is no global order, like SORT BY.
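A sketch of the difference in the driver (hypothetical settings):

// Global ordering: a single reducer produces one fully sorted file (like ORDER BY).
job.setNumReduceTasks(1);

// Partitioned ordering: each of the N reducers writes its own sorted file (like SORT BY).
job.setNumReduceTasks(3);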

distinct

Which distinct words appear? Only the KEY matters.

Mapper

public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        final String line = value.toString().toLowerCase();
        final String[] splits = line.split(",");

        for (String word : splits) {
            context.write(new Text(word), NullWritable.get());
        }
    }
}

Reducer

public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

group by

From the emp data: how many employees does each department have?

public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        final String line = value.toString().toLowerCase();
        final String[] splits = line.split("\t");

        // Defensive check: only emit well-formed rows
        if (splits.length == 8) {
            // splits[7] is the deptno column
            context.write(new Text(splits[7].trim()), ONE);
        }
    }
}
public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Aggregate per key (sum the counts)
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }

        context.write(key, new IntWritable(count));
    }
}

Join

  • Reduce Join / Common Join / Shuffle Join all go through the shuffle

  • Example: emp e join dept d on e.deptno = d.deptno;

    Mapper
        the join condition becomes the key
        emp:  (20, Info) tag=e
        dept: (20, Info) tag=d
        data coming from different "tables" gets a tag

    Shuffle:
        records with the same key are sent to the same reducer
        records with different keys may also end up in the same reducer

    Reducer
        (20, (Info, Info)) use the tag to tell which table each Info came from

Reduce Join
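The Mapper and Reducer below exchange an Info bean that carries fields from both tables plus a flag used as the tag. The class itself is not shown in the original; a minimal sketch, assuming the field names and getters/setters used below:

// Hypothetical sketch of the Info bean used by the join examples.
public class Info implements Writable {

    private int empno;
    private String ename;
    private int deptno;
    private String dname;
    private int flag; // tag: 1 = emp, 2 = dept

    public Info() { }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        empno = in.readInt();
        ename = in.readUTF();
        deptno = in.readInt();
        dname = in.readUTF();
        flag = in.readInt();
    }

    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + dname;
    }

    // getters and setters omitted (getEmpno/setEmpno, getEname/setEname, ...)
}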

Mapper

public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {
    String name;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Remember which file this task is reading (emp or dept)
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        name = fileSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String[] splits = value.toString().split("\t");

        // Within the same map, tell whether this line comes from emp or from dept
        if (name.contains("emp")) { // emp
            final int empno = Integer.parseInt(splits[0].trim());
            final String ename = splits[1].trim();
            final int deptno = Integer.parseInt(splits[7].trim());

            Info info = new Info();
            info.setEmpno(empno);
            info.setEname(ename);
            info.setDeptno(deptno);
            info.setDname("");
            info.setFlag(1);

            context.write(new IntWritable(deptno), info);
        } else { // dept
            Info info = new Info();
            final int deptno = Integer.parseInt(splits[0].trim());
            final String dname = splits[1].trim();

            info.setDeptno(deptno);
            info.setDname(dname);
            info.setFlag(2);
            info.setEmpno(0);
            info.setEname("");

            context.write(new IntWritable(deptno), info);
        }
    }
}

Reducer

public static class MyReducer extends Reducer<IntWritable, Info, Info, NullWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException {

        List<Info> infos = new ArrayList<>();

        String dname = "";

        // This is where the join actually happens
        for (Info info : values) {
            if (info.getFlag() == 1) { // emp
                Info tmp = new Info();
                tmp.setEmpno(info.getEmpno());
                tmp.setEname(info.getEname());
                tmp.setDeptno(info.getDeptno());
                infos.add(tmp);
            } else { // dept
                dname = info.getDname();
            }
        }

        for (Info bean : infos) {
            bean.setDname(dname);
            context.write(bean, NullWritable.get());
        }
    }
}

Log

Reduce input groups=4
Reduce shuffle bytes=511
Reduce input records=18
Reduce output records=14
Spilled Records=36
Shuffled Maps =2
Failed Shuffles=0

Map Join / Broadcast Join

One side of the join is a small table.
The join is done on the Map side; in other words, there is no Reducer, which means there is no Shuffle.
How it works:
    the small table is loaded into a cache
    while reading each row of the big table, the join is done directly against the cache
Drawback: the small table cannot be too large, because it has to fit into the cache.

driver

...
// 5. Set the key and value types of the Reducer output

job.setNumReduceTasks(0); // explicitly tell the MR framework there is no reducer

// Put the small table into the distributed cache
job.addCacheFile(new URI("data/join/dept.txt"));

...

Mapper

public static class MyMapper extends Mapper<LongWritable, Text, Info, NullWritable> {

    Map<String, String> cacheMap = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Load the small table from the distributed cache
        final String path = context.getCacheFiles()[0].getPath();

        final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));

        String line;
        while (StringUtils.isNotEmpty(line = reader.readLine())) {
            final String[] splits = line.split("\t");
            // deptno, dname
            cacheMap.put(splits[0], splits[1]);
        }

        IOUtils.closeStream(reader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        final String[] splits = value.toString().split("\t");
        final int empno = Integer.parseInt(splits[0].trim());
        final String ename = splits[1].trim();
        final int deptno = Integer.parseInt(splits[7].trim());

        Info info = new Info();
        info.setEmpno(empno);
        info.setEname(ename);
        info.setDeptno(deptno);

        // Join against the cached dept table while reading the big emp table
        info.setDname(cacheMap.get(deptno + ""));

        context.write(info, NullWritable.get());
    }
}

Log

Map-Reduce Framework
Map input records=14
Map output records=14
Input split bytes=114
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0