MapReduce矩阵乘法(命令行运行jar)

启动环境

start-dfs.sh //启动hdfs文件系统,如果已经启动,可以不执行
start-yarn.sh //启动yarn
sudo mapred historyserver start//启动历史记录服务器,用来查看mapreduce运行详情

构建jar包

请参考基础版中的构建过程
所需代码在本文章结尾

网页端上传数据

file
file
file
file
file
file

命令行运行程序

#切换到jar包所在目录
cd ~/IdeaProjects/Matrix/out/artifacts/MatrixMultiply
#最后三个参数为m n l ,表示两个矩阵的规模分别为m*n n*l
hadoop jar MatrixMultiply.jar com.company.MatrixMultiply 10 8 6
hadoop jar ./MatrixMultiply.jar com.company.MatrixMultiply 10 8 6

网页端查看结果

详细过程(108 86规模)
注:此辅助工具也为本人使用qt开发
file
file
file
file

代码

package com.company;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;

/**
 * MapReduce下的矩阵乘法(命令行运行)
 * 在做矩阵乘法前还增加了一轮预处理的MapReduce过程
 * 预处理:矩阵中的每个值由他周围的值决定,如果根据周围值计算出来的当前位置值与实际值相差大于阈值,则用计算出来的值替换掉实际值
 * 本算法特点,适合于稀疏矩阵
 *
 * @author qianqiu
 * @version 2.2
 * @hadoop版本 3.1.3
 * @输入 hdfs中的两个矩阵文件,所处路径/input/left_matrix,/input/right_matrix
 * @中间值 预处理后的矩阵在 /input_inter/*
 * @输出 hdfs中的结果矩阵,所处路径/output/*
 * @使用说明 只需 命令行 中修改矩阵规模相关的三个参数LeftMatrixL RightMatrixL MiddleMatrixL,并将所需的两个矩阵文件存放到hdfs中的指定路径中
 * @其他帮助 我自己写了一个辅助软件, 用于生成本程序所需的两个随机矩阵,并用常规算法实现了两轮MapReduce所做的事情,以便对数据进行验证。
 * @矩阵文件格式 每行记录矩阵中的一个值,具体形式为i,j value分别表示 行号,列号,值
 * 示例如下
 * 1,1  10
 * 1,2  5
 * 2,1  5
 * 2,2  10
 */

public class MatrixMultiply {
    //全局控制参数,如果你用了我的辅助软件,你只需要修改这些参数即可
    //假设矩阵A为m*n 矩阵B为n*l,则结果矩阵为m*l
    private static int LeftMatrixL = 10;   // 左矩阵的m(行数)
    private static int MiddleMatrixL = 8;   // 左矩阵的n(列数),右矩阵的n(行数)
    private static int RightMatrixL = 6;  // 右矩阵的l(列数)
    private static final int threshold = 3;// 预处理时,修改矩阵的阈值,表示当计算值与原始值相差超过多少时,用计算值替换原始值。
    //其他全局控制参数
    private static final String left_matrix = "left_matrix"; // 左矩阵文件名,注意没后缀
    private static final String right_matrix = "right_matrix"; // 右矩阵文件名
    private static final String Delimeter = ",";// 分界符,用于分割坐标

    /**
     * 矩阵乘法进阶版,程序入口
     *
     * @param args 如果本程序通过命令行启动,则附加的参数会传给args
     * @throws Exception 抛出所有异常
     */
    public static void main(String[] args) throws Exception {
        // 使用 conf 处理配置信息
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");//设置hdfs地址
        // 分隔符只是一个byte类型的数据,即便传入的是个字符串,只会取字符串的第一个字符作为分割符。默认的分隔符其实就是\t
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        //设置输入格式为key value类型
        conf.set("mapreduce.job.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat");

        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        //输入输出加起来最少两个参数
        if (otherArgs.length < 3) {
            System.err.println("Usage: MatrixMultiply <m> <n> <l>");
            System.exit(2);
        }

        LeftMatrixL = Integer.parseInt(otherArgs[0]);
        MiddleMatrixL = Integer.parseInt(otherArgs[1]);
        RightMatrixL = Integer.parseInt(otherArgs[2]);
        // 删除中间过程和结果的输出目录
        Path outputPath = new Path("hdfs://localhost:9000/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);
        outputPath = new Path("hdfs://localhost:9000/input_inter");
        outputPath.getFileSystem(conf).delete(outputPath, true);

        /*job1 预处理*/
        Job job1 = Job.getInstance(conf, "PreProcess");
        job1.setJarByClass(MatrixMultiply.class);
        job1.setMapperClass(MatrixMultiply.MyMapper1.class);
        job1.setReducerClass(MatrixMultiply.MyReducer1.class);
        job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key
        job1.setMapOutputValueClass(Text.class);//map阶段的输出的value
        job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key
        job1.setOutputValueClass(Text.class);//reduce阶段的输出的value

        //加入控制容器
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);
        //job1的输入输出文件路径
        FileInputFormat.addInputPath(job1, new Path("hdfs://localhost:9000/input/*"));
        FileOutputFormat.setOutputPath(job1, new Path("hdfs://localhost:9000/input_inter"));

        /*job2 矩阵乘法*/
        // 根据配置信息初始化 job2,并指定job的名字
        Job job2 = Job.getInstance(conf, "MartrixMultiply");
        // 设置job调用的类,这里就是本类
        job2.setJarByClass(MatrixMultiply.class);
        // 设置map过程调用的类
        job2.setMapperClass(MatrixMultiply.MyMapper2.class);
        // 设置合并过程调用的类(可选,我们所用的方法无法在map阶段合并,故不设置)
        // 设置reduce过程调用的类
        job2.setReducerClass(MatrixMultiply.MyReducer2.class);
        // 设置map输出键的类型为文本类型
        job2.setMapOutputKeyClass(Text.class);
        // 设置map输出值的类型为文本类型
        job2.setMapOutputValueClass(Text.class);
        // 设置reduce输出键的类型为文本类型
        job2.setOutputKeyClass(Text.class);
        // 设置reduce输出值的类型为整数类型
        job2.setOutputValueClass(IntWritable.class);

        // 指定job2要处理的输入数据存放的路径,这里指定了一个文件夹,表示文件夹中所有文件均作为输入
        FileInputFormat.setInputPaths(job2, new Path("hdfs://localhost:9000/input_inter/*"));
        // 指定job2输出结构存放路径(输出结构有多个文件,所以这里的路径为文件夹)
        FileOutputFormat.setOutputPath(job2, new Path("hdfs://localhost:9000/output"));

        //加入控制容器
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        // 设置多个作业直接的依赖关系
        // 意思为job2的启动,依赖于job1作业的完成
        ctrljob2.addDependingJob(ctrljob1);

        // 主控制容器,控制上面的总的两个子容器中的作业
        JobControl jobCtrl = new JobControl("MatrixMultiply_Pro");

        // 添加到总的JobControl里,进行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        // 在线程中启动
        Thread thread = new Thread(jobCtrl);
        thread.start();
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }

    }

    /**
     * 预处理:map过程调用的类,重写map方法
     */
    public static class MyMapper1 extends Mapper<Text, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location = key.toString();// i,j
            String text = value.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            String[] locationList = location.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (locationList.length < 2) return;  // 如果坐标切割后少于两部分,说明输入数据异常
            if (text == null || text.equals("")) return;//如果value为空,也表示异常
            String rowindex = locationList[0]; // i
            String colindex = locationList[1]; // j
            String elevalue = text; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.equals(left_matrix)) {//左矩阵的值
                if (rowindex.equals("1") || rowindex.equals(String.valueOf(LeftMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(MiddleMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + "2" + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + "2" + Delimeter + "2"), new Text("a#" + elevalue));

                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + "2" + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + "2" + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL - 1) + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL - 1) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(LeftMatrixL) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    } else if (colindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                    }

                } else {//派送到周围8个
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a" + Delimeter + rowindex + Delimeter + colindex), new Text("a#" + elevalue + "#it"));
                }
            } else if (flag.equals(right_matrix)) {//右矩阵的值

                if (rowindex.equals("1") || rowindex.equals(String.valueOf(MiddleMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(RightMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + 2 + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + 2 + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + 2 + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + 2 + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL - 1) + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL - 1) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(MiddleMatrixL) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    } else if (colindex.equals(String.valueOf(RightMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                    }
                } else {//派送到周围8个
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b" + Delimeter + rowindex + Delimeter + colindex), new Text("b#" + elevalue + "#it"));
                }
            }
        }
    }

    /**
     * 预处理:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs(context);//用于输出到不同文件
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location = key.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            ArrayList<Integer> evalues = new ArrayList<Integer>();
            String mode = "";
            int oldvalue = 0;
            String[] keyList = location.split(Delimeter);
            mode = keyList[0];//快速确定当前处理的是左矩阵还是右矩阵

            //取出当前位置的值,以及当前位置周围一圈的值
            for (Text tempV : values) { // a#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if (val.endsWith("it")) {
                    oldvalue = Integer.parseInt(valList[1]);
                } else if (val.startsWith("a#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                } else if (val.startsWith("b#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                }
            }
            //用当前位置一圈的数计算当前值
            int result = 0;
            for (int i = 0; i < evalues.size(); i++) {
                result += evalues.get(i);
            }
            result = result / evalues.size();//当前点周围求均值算出来的
            Text outValue = new Text();//最终矩阵当前点的值
            if (Math.abs(oldvalue - result) > threshold)//如果误差大于允许值,则用新值
            {
                outValue.set(String.valueOf(result));
            } else {//否则,使用旧值
                outValue.set(String.valueOf(oldvalue));
            }
            Text text = new Text();
            text.set(keyList[1] + Delimeter + keyList[2]);//优化后的key,表示坐标

            if (mode.equals("a"))//根据reduce过程获得的不同矩阵的值,写到不同文件中,作为下一个MapReduce过程输入
            {
                mos.write(text, outValue, left_matrix);
            } else {
                mos.write(text, outValue, right_matrix);
            }
        }
    }

    /**
     * 矩阵乘法:map过程调用的类,重写map方法
     */
    public static class MyMapper2 extends Mapper<Text, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        /**
         * map 过程作用:将<行号,一行的内容> 转换为 <结果矩阵某个值的坐标,辅助信息+值>
         * 注意,输入文本的每行调用一次此map函数!对左右矩阵的处理均写在这里面,即此map是一个对于所有输入数据的行是通用的
         *
         * @param key     某行行号
         * @param value   某行内容
         * @param context 用于存放经过map过程后的输出键值 key value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location = key.toString();// i,j  如果运行正确,则为1,1,表示矩阵第一行第一列的值
            String text = value.toString();//如果运行正确,则text可能为10 表示值为10
            String[] locationList = location.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (locationList.length < 2) return;  // 如果坐标切割后少于两部分,说明输入数据异常
            if (text == null || text.equals("")) return;//如果value为空,也表示异常
            String rowindex = locationList[0]; // i
            String colindex = locationList[1]; // j
            String elevalue = text; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.startsWith(left_matrix)) {//左矩阵的行
                //左矩阵的每个值在矩阵乘法中要被使用l次(右矩阵列数),用来计算结果矩阵(m*l)m行中某一行的所有值(l列l个),故这里分发出l个
                for (int i = 1; i <= RightMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是左矩阵的某一行第几列
                    context.write(new Text(rowindex + Delimeter + i), new Text("a#" + colindex + "#" + elevalue));
                }
            } else if (flag.startsWith(right_matrix)) {//右矩阵的行
                //右矩阵的每个值在矩阵乘法中要被使用m次(左矩阵行数),用来计算结果矩阵(m*l)l行中某一列的所有值(m行m个),故这里分发出m个
                for (int i = 1; i <= LeftMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是右矩阵的某一列第几行
                    context.write(new Text(i + Delimeter + colindex), new Text("b#" + rowindex + "#" + elevalue));
                }
            }
        }
    }

    /**
     * 矩阵乘法:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer2 extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        /**
         * @param key     结果矩阵某一点的坐标
         * @param values  计算某一点所需的2l个值
         * @param context 存放经过reduce过程后的输出键值
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //初始化计算结果矩阵某一点所需要的数据,即左矩阵的某一行,右矩阵的某一列
            int[] valA = new int[MiddleMatrixL];
            int[] valB = new int[MiddleMatrixL];
            for (int i = 0; i < MiddleMatrixL; i++) {
                valA[i] = 0;
                valB[i] = 0;
            }
            //解析values列表,根据map过程添加的辅助信息,确定两两相乘所需的数
            for (Text tempV : values) { // b#3#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if (val.startsWith("a#")) {
                    valA[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                } else if (val.startsWith("b#")) {
                    valB[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                }
            }
            //两两相乘再相加,计算得到结果矩阵某一坐标的值
            int result = 0;
            for (int i = 0; i < MiddleMatrixL; i++) {
                result += valA[i] * valB[i];
            }
            //以key value的形式输出 结果矩阵的某一点坐标,值
            outValue.set(result);
            context.write(key, outValue);
        }
    }
}