启动环境
start-dfs.sh    # 启动 HDFS 文件系统;如果已经启动,可以不执行
start-yarn.sh   # 启动 YARN
mapred --daemon start historyserver   # 以守护进程方式启动历史记录服务器,用来查看 MapReduce 作业的运行详情
构建jar包
请参考基础版中的构建过程
所需代码在本文章结尾
网页端上传数据
命令行运行程序
#切换到jar包所在目录
cd ~/IdeaProjects/Matrix/out/artifacts/MatrixMultiply
# 最后三个参数为 m n l,表示两个矩阵的规模分别为 m*n 和 n*l
hadoop jar MatrixMultiply.jar com.company.MatrixMultiply 10 8 6
hadoop jar ./MatrixMultiply.jar com.company.MatrixMultiply 10 8 6
网页端查看结果
详细过程(10*8 与 8*6 规模)
注:此辅助工具同样是本人使用 Qt 开发的
代码
package com.company;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Matrix multiplication on MapReduce (command-line driver).
 *
 * <p>Before the multiplication, an extra MapReduce pass pre-processes both input matrices:
 * every entry is compared with the integer mean of its in-bounds 8-neighbourhood, and when
 * they differ by more than {@code threshold} the entry is replaced by that mean.
 *
 * <p>Entries missing from the input are treated as 0 by the multiplication step, so the
 * input files may list only the non-zero entries (the author describes the approach as
 * suited to sparse matrices).
 *
 * <ul>
 *   <li>Hadoop version: 3.1.3.</li>
 *   <li>Input: two matrix files in HDFS, {@code /input/left_matrix} and
 *       {@code /input/right_matrix}.</li>
 *   <li>Intermediate: the pre-processed matrices are written to {@code /input_inter/*}.</li>
 *   <li>Output: the product matrix is written to {@code /output/*}.</li>
 *   <li>Usage: {@code hadoop jar MatrixMultiply.jar com.company.MatrixMultiply <m> <n> <l>}
 *       where the left matrix is m*n and the right matrix is n*l; place the two matrix files
 *       at the input paths above.</li>
 *   <li>The author also wrote a separate helper tool that generates the two random matrices
 *       and reproduces both MapReduce passes with a conventional algorithm for verification.</li>
 * </ul>
 *
 * <p>Matrix file format: one entry per line, {@code i,j<TAB>value}, i.e. 1-based row index,
 * comma, 1-based column index, a single tab character, then the integer value. Example
 * entries: {@code 1,1 -> 10}, {@code 1,2 -> 5}, {@code 2,1 -> 5}, {@code 2,2 -> 10}.
 *
 * @author qianqiu
 * @version 2.2
 */
public class MatrixMultiply {
    // Default dimensions: left matrix A is m*n, right matrix B is n*l, product is m*l.
    // The real values come from the command line. They are shipped to the tasks through the
    // job Configuration, because map/reduce tasks may run in other JVMs where these statics
    // keep their initial values.
    private static int LeftMatrixL = 10; // m: rows of the left matrix
    private static int MiddleMatrixL = 8; // n: columns of the left matrix == rows of the right matrix
    private static int RightMatrixL = 6; // l: columns of the right matrix
    // Pre-processing threshold: an entry is replaced by its neighbourhood mean when
    // |entry - mean| > threshold.
    private static final int threshold = 3;

    private static final String left_matrix = "left_matrix"; // left matrix file name (no extension)
    private static final String right_matrix = "right_matrix"; // right matrix file name
    private static final String Delimeter = ","; // separates the coordinates inside a key

    // Configuration keys that carry the dimensions from the driver to every task.
    private static final String CONF_LEFT_ROWS = "matrixmultiply.left.rows";
    private static final String CONF_MIDDLE = "matrixmultiply.middle";
    private static final String CONF_RIGHT_COLS = "matrixmultiply.right.cols";

    private static final String HDFS_URI = "hdfs://localhost:9000";
    // How long the driver sleeps between job-status polls.
    private static final long POLL_INTERVAL_MS = 500L;
    // Counter for input records skipped because their coordinates are unusable.
    private static final String COUNTER_GROUP = "MatrixMultiply";
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";
    // Suffix that marks an entry's own (original) value in the pre-processing shuffle.
    private static final String SELF_SUFFIX = "#it";

    /**
     * Program entry point: runs the pre-processing job, then the multiplication job.
     *
     * <p>Exits with status 2 on invalid arguments and with status 1 if any job fails.
     *
     * @param args generic Hadoop options followed by {@code <m> <n> <l>}
     * @throws Exception if HDFS access or job submission fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_URI); // HDFS address
        // KeyValueLineRecordReader uses only the first character of this string as the
        // separator; tab is also its default.
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        // Read every input line as a (key, value) pair split at the separator.
        conf.set("mapreduce.job.inputformat.class",
                "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat");
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        // The three dimension arguments m n l are required.
        if (otherArgs.length < 3) {
            printUsageAndExit();
        }
        try {
            LeftMatrixL = Integer.parseInt(otherArgs[0].trim());
            MiddleMatrixL = Integer.parseInt(otherArgs[1].trim());
            RightMatrixL = Integer.parseInt(otherArgs[2].trim());
        } catch (NumberFormatException e) {
            printUsageAndExit();
        }
        if (LeftMatrixL < 1 || MiddleMatrixL < 1 || RightMatrixL < 1) {
            printUsageAndExit();
        }
        // Must happen before Job.getInstance(), which takes a copy of conf.
        conf.setInt(CONF_LEFT_ROWS, LeftMatrixL);
        conf.setInt(CONF_MIDDLE, MiddleMatrixL);
        conf.setInt(CONF_RIGHT_COLS, RightMatrixL);

        // Remove the final and intermediate output directories left by previous runs.
        Path outputPath = new Path(HDFS_URI + "/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);
        Path interPath = new Path(HDFS_URI + "/input_inter");
        interPath.getFileSystem(conf).delete(interPath, true);

        /* job1: pre-processing */
        Job job1 = Job.getInstance(conf, "PreProcess");
        job1.setJarByClass(MatrixMultiply.class);
        job1.setMapperClass(MatrixMultiply.MyMapper1.class);
        job1.setReducerClass(MatrixMultiply.MyReducer1.class);
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(Text.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);
        FileInputFormat.addInputPath(job1, new Path(HDFS_URI + "/input/*"));
        FileOutputFormat.setOutputPath(job1, interPath);

        /* job2: matrix multiplication */
        Job job2 = Job.getInstance(conf, "MartrixMultiply");
        job2.setJarByClass(MatrixMultiply.class);
        job2.setMapperClass(MatrixMultiply.MyMapper2.class);
        // No combiner: the partial values cannot be merged on the map side.
        job2.setReducerClass(MatrixMultiply.MyReducer2.class);
        job2.setMapOutputKeyClass(Text.class);
        job2.setMapOutputValueClass(Text.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(IntWritable.class);
        // Every file produced by job1 is an input of job2.
        FileInputFormat.setInputPaths(job2, new Path(HDFS_URI + "/input_inter/*"));
        FileOutputFormat.setOutputPath(job2, outputPath);
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);
        // job2 may only start once job1 has completed.
        ctrljob2.addDependingJob(ctrljob1);

        JobControl jobCtrl = new JobControl("MatrixMultiply_Pro");
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        // JobControl is a Runnable that drives the jobs on its own thread.
        Thread thread = new Thread(jobCtrl);
        thread.start();
        // Sleep between polls so the driver does not spin a CPU core.
        while (!jobCtrl.allFinished()) {
            Thread.sleep(POLL_INTERVAL_MS);
        }
        System.out.println(jobCtrl.getSuccessfulJobList());
        boolean anyFailed = !jobCtrl.getFailedJobList().isEmpty();
        if (anyFailed) {
            System.err.println("Failed jobs: " + jobCtrl.getFailedJobList());
        }
        jobCtrl.stop();
        if (anyFailed) {
            System.exit(1);
        }
    }

    /** Prints the command-line usage to stderr and exits with status 2. */
    private static void printUsageAndExit() {
        System.err.println("Usage: MatrixMultiply <m> <n> <l>");
        System.exit(2);
    }

    /**
     * Pre-processing map. For every entry (i,j) of either matrix it emits:
     *
     * <ul>
     *   <li>{@code <tag>#value} to each in-bounds 8-neighbour key {@code <tag>,r,c};</li>
     *   <li>{@code <tag>#value#it} to its own key {@code <tag>,i,j}.</li>
     * </ul>
     *
     * <p>Here {@code tag} is {@code a} for the left matrix and {@code b} for the right one.
     */
    public static class MyMapper1 extends Mapper<Text, Text, Text, Text> {
        private String flag; // input file name: decides left vs right matrix
        private int leftRows;
        private int middle;
        private int rightCols;

        /** Records the input file name and reads the matrix dimensions from the job config. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
            Configuration conf = context.getConfiguration();
            leftRows = conf.getInt(CONF_LEFT_ROWS, LeftMatrixL);
            middle = conf.getInt(CONF_MIDDLE, MiddleMatrixL);
            rightCols = conf.getInt(CONF_RIGHT_COLS, RightMatrixL);
        }

        /**
         * Fans one matrix entry out to its neighbourhood.
         *
         * @param key     {@code i,j} coordinates of the entry
         * @param value   the entry's value
         * @param context receives the {@code <tag>,r,c -> tagged value} pairs
         */
        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] locationList = key.toString().split(Delimeter);
            if (locationList.length < 2) return; // malformed coordinates
            String elevalue = value.toString().trim();
            if (elevalue.isEmpty()) return; // missing value

            String tag;
            int rows;
            int cols;
            if (flag.equals(left_matrix)) {
                tag = "a";
                rows = leftRows;
                cols = middle;
            } else if (flag.equals(right_matrix)) {
                tag = "b";
                rows = middle;
                cols = rightCols;
            } else {
                return; // not one of the two matrix files
            }

            int row;
            int col;
            try {
                row = Integer.parseInt(locationList[0].trim());
                col = Integer.parseInt(locationList[1].trim());
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }
            // Entries outside the declared matrix would produce keys the multiplication
            // step cannot index.
            if (row < 1 || row > rows || col < 1 || col > cols) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            // Clamping the 3x3 window to the matrix yields 3 neighbours at corners,
            // 5 on edges and 8 in the interior.
            Text neighbourValue = new Text(tag + "#" + elevalue);
            for (int r = Math.max(1, row - 1); r <= Math.min(rows, row + 1); r++) {
                for (int c = Math.max(1, col - 1); c <= Math.min(cols, col + 1); c++) {
                    if (r == row && c == col) continue;
                    context.write(new Text(tag + Delimeter + r + Delimeter + c), neighbourValue);
                }
            }
            context.write(new Text(tag + Delimeter + row + Delimeter + col),
                    new Text(tag + "#" + elevalue + SELF_SUFFIX));
        }
    }

    /**
     * Pre-processing reduce. For one position it computes the integer mean of the
     * neighbour values and keeps the original value unless the two differ by more than
     * {@code threshold}.
     *
     * <p>The result is written as {@code i,j -> value} to the {@code left_matrix} or
     * {@code right_matrix} named output, which becomes the next job's input.
     */
    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {
        private MultipleOutputs<Text, Text> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<Text, Text>(context); // writes to per-matrix files
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        /**
         * @param key     {@code <tag>,i,j} where tag is {@code a} (left) or {@code b} (right)
         * @param values  neighbour values {@code <tag>#v} plus, if present, {@code <tag>#v#it}
         * @param context unused directly; output goes through {@link MultipleOutputs}
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] keyList = key.toString().split(Delimeter);
            String mode = keyList[0]; // "a" = left matrix, otherwise right matrix
            int oldvalue = 0; // stays 0 when the position was absent from the input
            long neighbourSum = 0; // long: avoids overflow when summing up to 8 ints
            int neighbourCount = 0;
            for (Text tempV : values) {
                String val = tempV.toString();
                String[] valList = val.split("#");
                if (val.endsWith(SELF_SUFFIX)) {
                    oldvalue = Integer.parseInt(valList[1]);
                } else if (val.startsWith("a#") || val.startsWith("b#")) {
                    neighbourSum += Integer.parseInt(valList[1]);
                    neighbourCount++;
                }
            }

            int outInt;
            if (neighbourCount == 0) {
                // No neighbours (e.g. a 1x1 matrix): nothing to compare against.
                outInt = oldvalue;
            } else {
                int mean = (int) (neighbourSum / neighbourCount); // truncating integer mean
                outInt = Math.abs((long) oldvalue - mean) > threshold ? mean : oldvalue;
            }

            Text outValue = new Text(String.valueOf(outInt));
            Text coords = new Text(keyList[1] + Delimeter + keyList[2]);
            if (mode.equals("a")) {
                mos.write(coords, outValue, left_matrix);
            } else {
                mos.write(coords, outValue, right_matrix);
            }
        }
    }

    /**
     * Multiplication map. Each entry is emitted once per product cell that needs it:
     *
     * <ul>
     *   <li>A left entry (i,k) goes to every cell (i,1..l) as {@code a#k#value}.</li>
     *   <li>A right entry (k,j) goes to every cell (1..m,j) as {@code b#k#value}.</li>
     * </ul>
     */
    public static class MyMapper2 extends Mapper<Text, Text, Text, Text> {
        private String flag; // input file name, e.g. "left_matrix-r-00000"
        private int leftRows;
        private int rightCols;

        /** Records the input file name and reads the matrix dimensions from the job config. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
            Configuration conf = context.getConfiguration();
            leftRows = conf.getInt(CONF_LEFT_ROWS, LeftMatrixL);
            rightCols = conf.getInt(CONF_RIGHT_COLS, RightMatrixL);
        }

        /**
         * @param key     {@code i,j} coordinates of a pre-processed entry
         * @param value   the entry's value
         * @param context receives {@code productRow,productCol -> a#k#v | b#k#v}
         */
        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context)
                throws IOException, InterruptedException {
            String[] locationList = key.toString().split(Delimeter);
            if (locationList.length < 2) return; // malformed coordinates
            String elevalue = value.toString();
            if (elevalue.isEmpty()) return; // missing value
            String rowindex = locationList[0];
            String colindex = locationList[1];
            // startsWith: MultipleOutputs appends "-r-NNNNN" to the base file name.
            if (flag.startsWith(left_matrix)) {
                // The k index travels in the value because the key only identifies the cell.
                Text tagged = new Text("a#" + colindex + "#" + elevalue);
                for (int i = 1; i <= rightCols; i++) {
                    context.write(new Text(rowindex + Delimeter + i), tagged);
                }
            } else if (flag.startsWith(right_matrix)) {
                Text tagged = new Text("b#" + rowindex + "#" + elevalue);
                for (int i = 1; i <= leftRows; i++) {
                    context.write(new Text(i + Delimeter + colindex), tagged);
                }
            }
        }
    }

    /**
     * Multiplication reduce. For product cell (i,j) it computes {@code sum_k A[i][k] * B[k][j]};
     * any k missing from the values counts as 0.
     */
    public static class MyReducer2 extends Reducer<Text, Text, Text, IntWritable> {
        private final IntWritable outValue = new IntWritable();
        private int middle;

        /** Reads the shared dimension n from the job config. */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            middle = context.getConfiguration().getInt(CONF_MIDDLE, MiddleMatrixL);
        }

        /**
         * @param key     {@code i,j} coordinates of the product cell
         * @param values  up to 2n tagged values {@code a#k#v} / {@code b#k#v}
         * @param context receives {@code i,j -> value}
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Java zero-initialises arrays, so absent entries contribute 0.
            int[] valA = new int[middle];
            int[] valB = new int[middle];
            for (Text tempV : values) {
                String val = tempV.toString();
                boolean fromLeft = val.startsWith("a#");
                if (!fromLeft && !val.startsWith("b#")) continue;
                String[] valList = val.split("#");
                int k = Integer.parseInt(valList[1]) - 1;
                if (k < 0 || k >= middle) {
                    context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                    continue;
                }
                (fromLeft ? valA : valB)[k] = Integer.parseInt(valList[2]);
            }
            int result = 0;
            for (int i = 0; i < middle; i++) {
                result += valA[i] * valB[i];
            }
            outValue.set(result);
            context.write(key, outValue);
        }
    }
}