MapReduce矩阵乘法(进阶)

预备知识

多个MapReduce之间的嵌套
Hadoop 多文件输出MultipleOutputFormat
[]()

设计算法

package com.company;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;

/**
 * MapReduce下的矩阵乘法
 * 在做矩阵乘法前还增加了一轮预处理的MapReduce过程
 * 预处理:矩阵中的每个值由他周围的值决定,如果根据周围值计算出来的当前位置值与实际值相差大于阈值,则用计算出来的值替换掉实际值
 * 本算法特点,适合于稀疏矩阵
 *
 * @author qianqiu
 * @version 2.0
 * @hadoop版本 3.1.3
 * @输入 hdfs中的两个矩阵文件,所处路径/input/left_matrix,/input/right_matrix
 * @中间值 预处理后的矩阵在 /input_inter/*
 * @输出 hdfs中的结果矩阵,所处路径/output/*
 * @使用说明 只需修改矩阵规模相关的三个参数LeftMatrixL RightMatrixL MiddleMatrixL,并将所需的两个矩阵文件存放到hdfs中的指定路径中
 * @其他帮助 我自己写了一个辅助软件,用于生成本程序所需的两个随机矩阵,并用常规算法实现了两轮MapReduce所做的事情,以便对数据进行验证。
 * @矩阵文件格式 每行记录矩阵中的一个值,具体形式为i,j,value分别表示 行号,列号,值
 * 示例如下
 * 1,1,10
 * 1,2,5
 * 2,1,5
 * 2,2,10
 */

public class MatrixMultiply {
    //全局控制参数,如果你用了我的辅助软件,你只需要修改这些参数即可
    //假设矩阵A为m*n 矩阵B为n*l,则结果矩阵为m*l
    private static final int LeftMatrixL = 200;   // 左矩阵的m(行数)
    private static final int MiddleMatrixL = 100;   // 左矩阵的n(列数),右矩阵的n(行数)
    private static final int RightMatrixL = 150;  // 右矩阵的l(列数)
    private static final int threshold = 3;// 预处理时,修改矩阵的阈值,表示当计算值与原始值相差超过多少时,用计算值替换原始值。
    //其他全局控制参数
    private static final String left_matrix = "left_matrix"; // 左矩阵文件名,注意没后缀
    private static final String right_matrix = "right_matrix"; // 右矩阵文件名
    private static final String Delimeter = ",";// 分界符,用于分割坐标

    /**
     * 矩阵乘法进阶版,程序入口
     *
     * @param args 如果本程序通过命令行启动,则附加的参数会传给args
     * @throws Exception 抛出所有异常
     */
    public static void main(String[] args) throws Exception {
        // 使用 conf 处理配置信息
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");//设置hdfs地址
        conf.set("mapreduce.job.jar", "/home/qianqiu/IdeaProjects/Matrix/out/artifacts/MatrixMultiply/MatrixMultiply.jar");//部署到集群中运行
        new GenericOptionsParser(conf, args);//根据命令行参数,配置conf
        // 删除中间过程和结果的输出目录
        Path outputPath = new Path("hdfs://localhost:9000/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);
        outputPath = new Path("hdfs://localhost:9000/input_inter");
        outputPath.getFileSystem(conf).delete(outputPath, true);

        /*job1 预处理*/
        Job job1 = Job.getInstance(conf, "PreProcess");
        job1.setJarByClass(MatrixMultiply.class);
        job1.setMapperClass(MatrixMultiply.MyMapper1.class);
        job1.setReducerClass(MatrixMultiply.MyReducer1.class);
        job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key
        job1.setMapOutputValueClass(Text.class);//map阶段的输出的value
        job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key
        job1.setOutputValueClass(Text.class);//reduce阶段的输出的value

        //加入控制容器
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);
        //job1的输入输出文件路径
        FileInputFormat.addInputPath(job1, new Path("hdfs://localhost:9000/input/*"));
        FileOutputFormat.setOutputPath(job1, new Path("hdfs://localhost:9000/input_inter"));

        /*job2 矩阵乘法*/
        // 根据配置信息初始化 job2,并指定job的名字
        Job job2 = Job.getInstance(conf, "MartrixMultiply");
        // 设置job调用的类,这里就是本类
        job2.setJarByClass(MatrixMultiply.class);
        // 设置map过程调用的类
        job2.setMapperClass(MatrixMultiply.MyMapper2.class);
        // 设置合并过程调用的类(可选,我们所用的方法无法在map阶段合并,故不设置)
        // 设置reduce过程调用的类
        job2.setReducerClass(MatrixMultiply.MyReducer2.class);
        // 设置map输出键的类型为文本类型
        job2.setMapOutputKeyClass(Text.class);
        // 设置map输出值的类型为文本类型
        job2.setMapOutputValueClass(Text.class);
        // 设置reduce输出键的类型为文本类型
        job2.setOutputKeyClass(Text.class);
        // 设置reduce输出值的类型为整数类型
        job2.setOutputValueClass(IntWritable.class);

        // 指定job2要处理的输入数据存放的路径,这里指定了一个文件夹,表示文件夹中所有文件均作为输入
        FileInputFormat.setInputPaths(job2, new Path("hdfs://localhost:9000/input_inter/*"));
        // 指定job2输出结构存放路径(输出结构有多个文件,所以这里的路径为文件夹)
        FileOutputFormat.setOutputPath(job2, new Path("hdfs://localhost:9000/output"));

        //加入控制容器
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        // 设置多个作业直接的依赖关系
        // 意思为job2的启动,依赖于job1作业的完成
        ctrljob2.addDependingJob(ctrljob1);

        // 主控制容器,控制上面的总的两个子容器中的作业
        JobControl jobCtrl = new JobControl("MatrixMultiply_Pro");

        // 添加到总的JobControl里,进行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        // 在线程中启动
        Thread thread = new Thread(jobCtrl);
        thread.start();
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }

    }

    /**
     * 预处理:map过程调用的类,重写map方法
     */
    public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String text = value.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            if (text == null || text.equals("")) return;
            String[] lineList = text.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (lineList.length < 3) return;  // 如果line切割后少于三部分,说明输入数据异常
            String rowindex = lineList[0]; // i
            String colindex = lineList[1]; // j
            String elevalue = lineList[2]; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.equals(left_matrix)) {//左矩阵的值
                if (rowindex.equals("1") || rowindex.equals(String.valueOf(LeftMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(MiddleMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + "2"), new Text("a#" + elevalue));

                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (colindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    }

                } else {//派送到周围8个
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                }
            } else if (flag.equals(right_matrix)) {//右矩阵的值

                if (rowindex.equals("1") || rowindex.equals(String.valueOf(MiddleMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(RightMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (colindex.equals(String.valueOf(RightMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    }
                } else {//派送到周围8个
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                }
            }
        }
    }

    /**
     * 预处理:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs(context);//用于输出到不同文件
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location = key.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            ArrayList<Integer> evalues = new ArrayList<Integer>();
            String mode="";
            int oldvalue=0;
            String[] keyList = location.split(Delimeter);
            mode=keyList[0];//快速确定当前处理的是左矩阵还是右矩阵

            //取出当前位置的值,以及当前位置周围一圈的值
            for (Text tempV : values) { // a#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if(val.endsWith("it")) {
                    oldvalue=Integer.parseInt(valList[1]);
                }else if (val.startsWith("a#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                } else if (val.startsWith("b#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                }
            }
            //用当前位置一圈的数计算当前值
            int result = 0;
            for (int i = 0; i < evalues.size(); i++) {
                result += evalues.get(i);
            }
            result=result/evalues.size();//当前点周围求均值算出来的
            Text outValue = new Text();//最终矩阵当前点的值
            if(Math.abs(oldvalue-result)> threshold)//如果误差大于允许值,则用新值
            {
                outValue.set(keyList[1]+Delimeter+keyList[2]+Delimeter+result);
            }
            else{//否则,使用旧值
                outValue.set(keyList[1]+Delimeter+keyList[2]+Delimeter+oldvalue);
            }
            Text text = new Text();
            text.set("");//此处我们让key为空,结果直接存放到value中,是由于一开始对mapreduce读写了解不深,
            // 第二轮的输入数据没有规定好,导致这里输出只能这样凑合,以符合第二轮的输入

            if (mode.equals("a"))//根据reduce过程获得的不同矩阵的值,写到不同文件中,作为下一个MapReduce过程输入
            {
                mos.write(text, outValue,left_matrix);
            }else{
                mos.write(text, outValue,right_matrix);
            }
        }
    }

    /**
     * 矩阵乘法:map过程调用的类,重写map方法
     */
    public static class MyMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        /**
         * map 过程作用:将<行号,一行的内容> 转换为 <结果矩阵某个值的坐标,辅助信息+值>
         * 注意,输入文本的每行调用一次此map函数!对左右矩阵的处理均写在这里面,即此map是一个对于所有输入数据的行是通用的
         *
         * @param key     某行行号
         * @param value   某行内容
         * @param context 用于存放经过map过程后的输出键值 key value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String text = value.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            if (text == null || text.equals("")) return;
            String[] lineList = text.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (lineList.length < 3) return;  // 如果line切割后少于三部分,说明输入数据异常
            String rowindex = lineList[0].trim(); // i
            String colindex = lineList[1]; // j
            String elevalue = lineList[2]; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.startsWith(left_matrix)) {//左矩阵的行
                //左矩阵的每个值在矩阵乘法中要被使用l次(右矩阵列数),用来计算结果矩阵(m*l)m行中某一行的所有值(l列l个),故这里分发出l个
                for (int i = 1; i <= RightMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是左矩阵的某一行第几列
                    context.write(new Text(rowindex + Delimeter + i), new Text("a#" + colindex + "#" + elevalue));
                }
            } else if (flag.startsWith(right_matrix)) {//右矩阵的行
                //右矩阵的每个值在矩阵乘法中要被使用m次(左矩阵行数),用来计算结果矩阵(m*l)l行中某一列的所有值(m行m个),故这里分发出m个
                for (int i = 1; i <= LeftMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是右矩阵的某一列第几行
                    context.write(new Text(i + Delimeter + colindex), new Text("b#" + rowindex + "#" + elevalue));
                }
            }
        }
    }

    /**
     * 矩阵乘法:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer2 extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        /**
         * @param key     结果矩阵某一点的坐标
         * @param values  计算某一点所需的2l个值
         * @param context 存放经过reduce过程后的输出键值
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //初始化计算结果矩阵某一点所需要的数据,即左矩阵的某一行,右矩阵的某一列
            int[] valA = new int[MiddleMatrixL];
            int[] valB = new int[MiddleMatrixL];
            for (int i = 0; i < MiddleMatrixL; i++) {
                valA[i] = 0;
                valB[i] = 0;
            }
            //解析values列表,根据map过程添加的辅助信息,确定两两相乘所需的数
            for (Text tempV : values) { // b#3#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if (val.startsWith("a#")) {
                    valA[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                } else if (val.startsWith("b#")) {
                    valB[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                }
            }
            //两两相乘再相加,计算得到结果矩阵某一坐标的值
            int result = 0;
            for (int i = 0; i < MiddleMatrixL; i++) {
                result += valA[i] * valB[i];
            }
            //以key value的形式输出 结果矩阵的某一点坐标,值
            outValue.set(result);
            context.write(key, outValue);
        }
    }
}

存在的问题

输入输出格式

由于先写的第二轮矩阵乘法的算法,导致第二轮的输入格式未规定好。
以为输入只能是 行号 行内容 的形式。
第一轮的输出格式为了符号第二轮的输入,将key置为空,仅用value。这样子不够美观,且第二轮输入时需要先进行化简处理将前面的空格去掉,影响效率

通过查阅
MapReduce的常见输入格式之KeyValueTextInputFormat
发现这个可以指定的,输入采用行内容中的 key value的形式

大矩阵会报错:Container is running beyond memory limits

法一:
禁用 Virtual Memory Limit Checking
这样YARN将简单的忽略限制,修改yarn-site.xml:

   
  yarn.nodemanager.vmem-check-enabled   
  false   
 Whether virtual memory limits will be enforced for containers.   
      

The default for this setting is true.
法二:
增加虚拟内存和物理内存的比率,修改yarn-site.xml:

   
  yarn.nodemanager.vmem-pmem-ratio  
  5   
  Ratio between virtual memory to physical memory when setting memory limits for containers. Container allocations are expressed in terms of physical memory, and virtual memory usage is allowed to exceed this allocation by this ratio.    
      

未设置时默认是2.1
法三,增加分配的物理内存(这样被分配的虚拟内存也会增多)
假设每台机器有 48 GB 内存. 保留 8GB 供操作系统使用,给YARM分配 40GB
设置容器最小分配的内存 (yarn.scheduler.minimum-allocation-mb) = 2 GB.
如果想要 4 GB for Map task Containers, and 8 GB for Reduce tasks Containers.
在 mapred-site.xml中设置单个容器分配的内存(可用上限)

注意请按照法二的格式添加如下值

 mapreduce.map.memory.mb: 4096   
 mapreduce.reduce.memory.mb: 8192    

每一个容器将为Map.Reduce任务运行 JVMs(java虚拟机) 。 JVM heap size 应该设置成小于容器大小
同样在 mapred-site.xml中设置
注意请按照法二的格式添加如下值

 mapreduce.map.java.opts: -Xmx3072m  
 mapreduce.reduce.java.opts: -Xmx6144m   

最后不要忘记重启yarn

优化后代码

package com.company;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;

/**
 * MapReduce下的矩阵乘法
 * 在做矩阵乘法前还增加了一轮预处理的MapReduce过程
 * 预处理:矩阵中的每个值由他周围的值决定,如果根据周围值计算出来的当前位置值与实际值相差大于阈值,则用计算出来的值替换掉实际值
 * 本算法特点,适合于稀疏矩阵
 *
 * @author qianqiu
 * @version 2.1
 * @hadoop版本 3.1.3
 * @输入 hdfs中的两个矩阵文件,所处路径/input/left_matrix,/input/right_matrix
 * @中间值 预处理后的矩阵在 /input_inter/*
 * @输出 hdfs中的结果矩阵,所处路径/output/*
 * @使用说明 只需修改矩阵规模相关的三个参数LeftMatrixL RightMatrixL MiddleMatrixL,并将所需的两个矩阵文件存放到hdfs中的指定路径中
 * @其他帮助 我自己写了一个辅助软件,用于生成本程序所需的两个随机矩阵,并用常规算法实现了两轮MapReduce所做的事情,以便对数据进行验证。
 * @矩阵文件格式 每行记录矩阵中的一个值,具体形式为i,j value分别表示 行号,列号,值
 * 示例如下
 * 1,1  10
 * 1,2  5
 * 2,1  5
 * 2,2  10
 */

public class MatrixMultiply {
    //全局控制参数,如果你用了我的辅助软件,你只需要修改这些参数即可
    //假设矩阵A为m*n 矩阵B为n*l,则结果矩阵为m*l
    private static final int LeftMatrixL = 10;   // 左矩阵的m(行数)
    private static final int MiddleMatrixL = 8;   // 左矩阵的n(列数),右矩阵的n(行数)
    private static final int RightMatrixL = 6;  // 右矩阵的l(列数)
    private static final int threshold = 3;// 预处理时,修改矩阵的阈值,表示当计算值与原始值相差超过多少时,用计算值替换原始值。
    //其他全局控制参数
    private static final String left_matrix = "left_matrix"; // 左矩阵文件名,注意没后缀
    private static final String right_matrix = "right_matrix"; // 右矩阵文件名
    private static final String Delimeter = ",";// 分界符,用于分割坐标

    /**
     * 矩阵乘法进阶版,程序入口
     *
     * @param args 如果本程序通过命令行启动,则附加的参数会传给args
     * @throws Exception 抛出所有异常
     */
    public static void main(String[] args) throws Exception {
        // 使用 conf 处理配置信息
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");//设置hdfs地址
        conf.set("mapreduce.job.jar", "/home/qianqiu/IdeaProjects/Matrix/out/artifacts/MatrixMultiply/MatrixMultiply.jar");//部署到集群中运行
        // 分隔符只是一个byte类型的数据,即便传入的是个字符串,只会取字符串的第一个字符作为分割符。默认的分隔符其实就是\t
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "\t");
        //设置输入格式为key value类型
        conf.set("mapreduce.job.inputformat.class", "org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat");

        new GenericOptionsParser(conf, args);//根据命令行参数,配置conf
        // 删除中间过程和结果的输出目录
        Path outputPath = new Path("hdfs://localhost:9000/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);
        outputPath = new Path("hdfs://localhost:9000/input_inter");
        outputPath.getFileSystem(conf).delete(outputPath, true);

        /*job1 预处理*/
        Job job1 = Job.getInstance(conf, "PreProcess");
        job1.setJarByClass(MatrixMultiply.class);
        job1.setMapperClass(MatrixMultiply.MyMapper1.class);
        job1.setReducerClass(MatrixMultiply.MyReducer1.class);
        job1.setMapOutputKeyClass(Text.class);//map阶段的输出的key
        job1.setMapOutputValueClass(Text.class);//map阶段的输出的value
        job1.setOutputKeyClass(Text.class);//reduce阶段的输出的key
        job1.setOutputValueClass(Text.class);//reduce阶段的输出的value

        //加入控制容器
        ControlledJob ctrljob1 = new ControlledJob(conf);
        ctrljob1.setJob(job1);
        //job1的输入输出文件路径
        FileInputFormat.addInputPath(job1, new Path("hdfs://localhost:9000/input/*"));
        FileOutputFormat.setOutputPath(job1, new Path("hdfs://localhost:9000/input_inter"));

        /*job2 矩阵乘法*/
        // 根据配置信息初始化 job2,并指定job的名字
        Job job2 = Job.getInstance(conf, "MartrixMultiply");
        // 设置job调用的类,这里就是本类
        job2.setJarByClass(MatrixMultiply.class);
        // 设置map过程调用的类
        job2.setMapperClass(MatrixMultiply.MyMapper2.class);
        // 设置合并过程调用的类(可选,我们所用的方法无法在map阶段合并,故不设置)
        // 设置reduce过程调用的类
        job2.setReducerClass(MatrixMultiply.MyReducer2.class);
        // 设置map输出键的类型为文本类型
        job2.setMapOutputKeyClass(Text.class);
        // 设置map输出值的类型为文本类型
        job2.setMapOutputValueClass(Text.class);
        // 设置reduce输出键的类型为文本类型
        job2.setOutputKeyClass(Text.class);
        // 设置reduce输出值的类型为整数类型
        job2.setOutputValueClass(IntWritable.class);

        // 指定job2要处理的输入数据存放的路径,这里指定了一个文件夹,表示文件夹中所有文件均作为输入
        FileInputFormat.setInputPaths(job2, new Path("hdfs://localhost:9000/input_inter/*"));
        // 指定job2输出结构存放路径(输出结构有多个文件,所以这里的路径为文件夹)
        FileOutputFormat.setOutputPath(job2, new Path("hdfs://localhost:9000/output"));

        //加入控制容器
        ControlledJob ctrljob2 = new ControlledJob(conf);
        ctrljob2.setJob(job2);

        // 设置多个作业直接的依赖关系
        // 意思为job2的启动,依赖于job1作业的完成
        ctrljob2.addDependingJob(ctrljob1);

        // 主控制容器,控制上面的总的两个子容器中的作业
        JobControl jobCtrl = new JobControl("MatrixMultiply_Pro");

        // 添加到总的JobControl里,进行控制
        jobCtrl.addJob(ctrljob1);
        jobCtrl.addJob(ctrljob2);

        // 在线程中启动
        Thread thread = new Thread(jobCtrl);
        thread.start();
        while (true) {
            if (jobCtrl.allFinished()) {
                System.out.println(jobCtrl.getSuccessfulJobList());
                jobCtrl.stop();
                break;
            }
        }

    }

    /**
     * 预处理:map过程调用的类,重写map方法
     */
    public static class MyMapper1 extends Mapper<Text, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location=key.toString();// i,j
            String text = value.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            String[] locationList = location.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (locationList.length < 2) return;  // 如果坐标切割后少于两部分,说明输入数据异常
            if (text == null || text.equals("")) return;//如果value为空,也表示异常
            String rowindex = locationList[0]; // i
            String colindex = locationList[1]; // j
            String elevalue = text; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.equals(left_matrix)) {//左矩阵的值
                if (rowindex.equals("1") || rowindex.equals(String.valueOf(LeftMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(MiddleMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + "2"), new Text("a#" + elevalue));

                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+"2" + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + "1"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL) + Delimeter + "2"), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL)) && colindex.equals(String.valueOf(MiddleMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL - 1) + Delimeter + String.valueOf(MiddleMatrixL)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(LeftMatrixL) + Delimeter + String.valueOf(MiddleMatrixL - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(LeftMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    } else if (colindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                        context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                    }

                } else {//派送到周围8个
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("a#" + elevalue));
                    context.write(new Text("a"+Delimeter+rowindex + Delimeter + colindex), new Text("a#" + elevalue+"#it"));
                }
            } else if (flag.equals(right_matrix)) {//右矩阵的值

                if (rowindex.equals("1") || rowindex.equals(String.valueOf(MiddleMatrixL)) || colindex.equals("1") || colindex.equals(String.valueOf(RightMatrixL))) {//四条边上
                    if (rowindex.equals("1") && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals("1") && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+2 + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals("1")) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + 1), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL) + Delimeter + 2), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL)) && colindex.equals(String.valueOf(RightMatrixL))) {
                        //被派送到周围三个
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL - 1) + Delimeter + String.valueOf(RightMatrixL)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(MiddleMatrixL) + Delimeter + String.valueOf(RightMatrixL - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals("1")) {//派送到周围5个,最上面一行
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (rowindex.equals(String.valueOf(MiddleMatrixL))) {//派送到周围5个,最下面一行
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (colindex.equals("1")) {//派送到周围5个,最左边一列
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    } else if (colindex.equals(String.valueOf(RightMatrixL))) {//派送到周围5个,最右边一列
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + colindex), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                        context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                    }
                } else {//派送到周围8个
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) - 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) - 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex))), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+String.valueOf(Integer.parseInt(rowindex) + 1) + Delimeter + String.valueOf(Integer.parseInt(colindex) + 1)), new Text("b#" + elevalue));
                    context.write(new Text("b"+Delimeter+rowindex + Delimeter + colindex), new Text("b#" + elevalue+"#it"));
                }
            }
        }
    }

    /**
     * 预处理:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer1 extends Reducer<Text, Text, Text, Text> {

        private MultipleOutputs<Text, Text> mos;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs(context);//用于输出到不同文件
        }
        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }

        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location = key.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            ArrayList<Integer> evalues = new ArrayList<Integer>();
            String mode="";
            int oldvalue=0;
            String[] keyList = location.split(Delimeter);
            mode=keyList[0];//快速确定当前处理的是左矩阵还是右矩阵

            //取出当前位置的值,以及当前位置周围一圈的值
            for (Text tempV : values) { // a#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if(val.endsWith("it")) {
                    oldvalue=Integer.parseInt(valList[1]);
                }else if (val.startsWith("a#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                } else if (val.startsWith("b#")) {
                    evalues.add(Integer.parseInt(valList[1]));
                }
            }
            //用当前位置一圈的数计算当前值
            int result = 0;
            for (int i = 0; i < evalues.size(); i++) {
                result += evalues.get(i);
            }
            result=result/evalues.size();//当前点周围求均值算出来的
            Text outValue = new Text();//最终矩阵当前点的值
            if(Math.abs(oldvalue-result)> threshold)//如果误差大于允许值,则用新值
            {
                outValue.set(String.valueOf(result));
            }
            else{//否则,使用旧值
                outValue.set(String.valueOf(oldvalue));
            }
            Text text = new Text();
            text.set(keyList[1]+Delimeter+keyList[2]);//优化后的key,表示坐标

            if (mode.equals("a"))//根据reduce过程获得的不同矩阵的值,写到不同文件中,作为下一个MapReduce过程输入
            {
                mos.write(text, outValue,left_matrix);
            }else{
                mos.write(text, outValue,right_matrix);
            }
        }
    }

    /**
     * 矩阵乘法:map过程调用的类,重写map方法
     */
    public static class MyMapper2 extends Mapper<Text, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();
        }

        /**
         * map 过程作用:将<行号,一行的内容> 转换为 <结果矩阵某个值的坐标,辅助信息+值>
         * 注意,输入文本的每行调用一次此map函数!对左右矩阵的处理均写在这里面,即此map是一个对于所有输入数据的行是通用的
         *
         * @param key     某行行号
         * @param value   某行内容
         * @param context 用于存放经过map过程后的输出键值 key value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void map(Text key, Text value, Mapper<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String location=key.toString();// i,j  如果运行正确,则为1,1,表示矩阵第一行第一列的值
            String text = value.toString();//如果运行正确,则text可能为10 表示值为10
            String[] locationList = location.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (locationList.length < 2) return;  // 如果坐标切割后少于两部分,说明输入数据异常
            if (text == null || text.equals("")) return;//如果value为空,也表示异常
            String rowindex = locationList[0]; // i
            String colindex = locationList[1]; // j
            String elevalue = text; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if (flag.startsWith(left_matrix)) {//左矩阵的行
                //左矩阵的每个值在矩阵乘法中要被使用l次(右矩阵列数),用来计算结果矩阵(m*l)m行中某一行的所有值(l列l个),故这里分发出l个
                for (int i = 1; i <= RightMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是左矩阵的某一行第几列
                    context.write(new Text(rowindex + Delimeter + i), new Text("a#" + colindex + "#" + elevalue));
                }
            } else if (flag.startsWith(right_matrix)) {//右矩阵的行
                //右矩阵的每个值在矩阵乘法中要被使用m次(左矩阵行数),用来计算结果矩阵(m*l)l行中某一列的所有值(m行m个),故这里分发出m个
                for (int i = 1; i <= LeftMatrixL; i++) {
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是右矩阵的某一列第几行
                    context.write(new Text(i + Delimeter + colindex), new Text("b#" + rowindex + "#" + elevalue));
                }
            }
        }
    }

    /**
     * 矩阵乘法:reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer2 extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        /**
         * @param key     结果矩阵某一点的坐标
         * @param values  计算某一点所需的2l个值
         * @param context 存放经过reduce过程后的输出键值
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //初始化计算结果矩阵某一点所需要的数据,即左矩阵的某一行,右矩阵的某一列
            int[] valA = new int[MiddleMatrixL];
            int[] valB = new int[MiddleMatrixL];
            for (int i = 0; i < MiddleMatrixL; i++) {
                valA[i] = 0;
                valB[i] = 0;
            }
            //解析values列表,根据map过程添加的辅助信息,确定两两相乘所需的数
            for (Text tempV : values) { // b#3#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if (val.startsWith("a#")) {
                    valA[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                } else if (val.startsWith("b#")) {
                    valB[Integer.parseInt(valList[1]) - 1] = Integer.parseInt(valList[2]);
                }
            }
            //两两相乘再相加,计算得到结果矩阵某一坐标的值
            int result = 0;
            for (int i = 0; i < MiddleMatrixL; i++) {
                result += valA[i] * valB[i];
            }
            //以key value的形式输出 结果矩阵的某一点坐标,值
            outValue.set(result);
            context.write(key, outValue);
        }
    }
}

再探讨

实际上本算法在效率上是存在问题的
Spark 大规模稀疏矩阵乘法
【Scala-spark.mlib】本地矩阵乘法计算效率比较(稠密稀疏哪家强?)
简单相乘法,可行论文
Spark速度比MapReduce快,不仅是内存计算