MapReduce矩阵乘法(基础)

前提环境

ubuntu 20.04 LTS 中文版 下载地址https://cn.ubuntu.com/download/desktop
hadoop 3.1.3(包含hdfs和mapreduce组件) 下载地址https://archive.apache.org/dist/hadoop/common/hadoop-3.1.3/hadoop-3.1.3.tar.gz

如果你想使用虚拟机,那你的主机(如果是轻薄本的话)配置最少16G,cpu6核12逻辑处理器,9代i7以上,分给虚拟机至少12G,否则建议双系统,虚拟机真的会卡到你心态崩溃的!!!
游戏本则推荐虚拟机
原因,在linux里开idea真的很吃配置,由于我们使用的是ubuntu带桌面版,开机就占2,3G,hadoop环境一开,内存占用就到4G左右,idea再一开就5G多了,内存就用的差不多了。根本没有内存再去跑程序。

Hadoop基本介绍

以下表格参考其他资料并加入个人理解,不确保完全正确,但仍请大家认真看一遍。后续教程均依赖于此理解

Hadoop模式 HDFS特点 MapReduce特点
单机模式 Hadoop 默认模式为非分布式模式(本地模式),无需进行其他配置即可运行(其实啥也没开)。非分布式没有启动hdfs文件系统 MapReduce作业为单线程,使用linux本地文件系统。方便进行调试
伪分布式 启用了HDFS文件系统,节点既作为 NameNode 也作为 DataNode(即多线程方式模拟hdfs集群) 读取的是 HDFS 中的文件。伪分布式运行 MapReduce 作业的方式跟单机模式相同,仍为单线程,方便进行调试
伪分布式+yarn(几乎就是分布式) 启用了HDFS文件系统,hdfs仍是单机多线程,但是启用了用于集群管理的yarn,MapReduce以多线程方式运行,可以查看MapReduce运行时间。已经和分布式很相似了(仅仅差在机器数量,只需再配置ssh用于机器间通信并配置/etc/hosts和workers文件就是分布式了) 读取的是 HDFS 中的文件。但为多线程,不方便进行调试。但可通过yarn看到单个map与reduce所费时间
分布式 不用使用多线程模拟集群,而是本身就在不同的机器上 多进程,多线程,不易调试

请注意,hadoop是个大的概念,我们这里说的伪分布式
指的是hadoop生态中hdfs组件是伪分布式的(hdfs采用多线程模拟分布式,故称为伪分布式),MapReduce访问的是hdfs文件系统中的文件。

我们课设中使用伪分布式来进行开发和调试,为什么不使用单机模式呢?因为最终还是要部署到分布式中获取运行数据,用伪分布式更方便过渡到最终环境。否则代码还需做较多调整
最后才使用伪分布式+yarn来模拟分布式的运行,获取具体Map/reduce过程运行时间的数据

单机/伪分布式模式(单线程)开发与调试

搭建伪分布式环境

基础环境配置

1.添加hadoop用户

sudo useradd -m hadoop -s /bin/bash #新建用户
sudo passwd hadoop #设置用户密码
sudo adduser hadoop sudo #赋予sudo权限

2.更新软件源的软件列表
sudo apt-get update
3.添加本机免密码ssh登录

cd ~/.ssh/                     # 若没有该目录,请先执行一次ssh localhost
ssh-keygen -t rsa              # 会有提示,都按回车就可以
cat ./id_rsa.pub >> ./authorized_keys  # 加入授权

4.安装jdk8

sudo apt list|grep jdk # 在软件源列表中查找可安装的jdk列表,找出含1.8或者8字样的名称
sudo apt install -y java-1.8.0-openjdk.x86_64 # java-1.8.0-openjdk.x86_64如果不存在,则替换成上一步查找到的其他类似的名字

java -version # 查看当前使用的java版本,用于验证是否安装成功

安装hadoop

hadoop3.1.3 下载地址https://archive.apache.org/dist/hadoop/common/hadoop-3.1.3/hadoop-3.1.3.tar.gz

sudo tar -zxf ~/下载/hadoop-3.1.3.tar.gz -C /usr/local    # 解压到/usr/local中
cd /usr/local/
sudo mv ./hadoop-3.1.3/ ./hadoop            # 将文件夹名改为hadoop
sudo chown -R hadoop ./hadoop       # 修改文件权限,赋予hadoop用户对此文件的权限

配置hadoop为伪分布式(即开启hdfs)

gedit ./etc/hadoop/core-site.xml
修改成如下内容

<configuration>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>file:/usr/local/hadoop/tmp</value>
        <description>Abase for other temporary directories.</description>
    </property>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>
</configuration>

gedit ./etc/hadoop/hdfs-site.xml
修改成如下内容

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/name</value>
    </property>
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>file:/usr/local/hadoop/tmp/dfs/data</value>
    </property>
</configuration>

格式化名称节点

cd /usr/local/hadoop
./bin/hdfs namenode -format

关闭hdfs权限验证

cd /usr/local/hadoop/etc/hadoop
sudo vim hdfs-site.xml
添加红圈所示内容
file
wq 保存
注意,需要重启hdfs才会生效

file

启动hdfs

cd /usr/local/hadoop
./sbin/start-dfs.sh

网页端访问hdfs

http://localhost:9870/ hdfs文件系统
远比命令行好用的多
这里附赠修改文件权限的命令,放开所有权限,方便调试
sudo hadoop fs -chmod 777 /input

设置网页端访问所用的用户

如果不配置,则会有下图所示错误
file

配置方法:
修改/usr/local/hadoop/etc/hadoop目录下的core-site.xml

<!-- 配置 HDFS 网页登录使用的静态用户为 hdfs -->
<property>
    <name>hadoop.http.staticuser.user</name>
    <value>用户名</value>
</property>

结果如下图
file

将hadoop加入环境变量

否则每次必须cd /usr/local/hadoop到这个目录,才能用
./sbin/start-dfs.sh这些命令
vim ~/.bashrc
添加如下内容
file
source ~/.bashrc //执行此命令,让修改生效

创建/input目录

hadoop dfs -mkdir /input

使用idea开发与调试mapreduce程序

下载安装idea

新建工程并修改文件名

file
file
file
右键修改文件名为MatrixMultiply
右键文件->refactor->rename->修改为MatrixMultiply
file

添加代码所依赖的库文件

点击左上角File->Project Structure
再如图点击+号
file
添加下图所示的库文件
file
注:这些库文件中既包含了基础版课设所需库文件,也包含了进阶版课设所需的库文件。
注意不要偷懒一次加入整个目录,加的过多也会报错的~

设计算法与实现代码

将如下内容复制到MatrixMultiply.java文件中

package com.company;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;//hadoop独有的配置文件管理系统,处理配置信息
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce下的矩阵乘法(采用简单相乘法,map过程负责将文件内容转化为键值对,每个reduce过程根据 shuffle传递过来的左矩阵一行和右矩阵的一列的值 计算出结果矩阵一个点的值)
 * 本算法特点,适合于稀疏矩阵
 * @author qianqiu
 * @version 1.0
 * @hadoop版本 3.1.3
 * @输入 hdfs中的两个矩阵文件,所处路径/input/left_martrix,/input/right_martrix
 * @输出 hdfs中的结果矩阵,所处路径/output/*
 * @使用说明 只需修改三个参数LeftMatrixL RightMatrixL MiddleMatrixL,并将所需的两个矩阵文件存放到hdfs中的指定路径中
 * @矩阵文件格式 每行记录矩阵中的一个值,具体形式为i,j,value分别表示 行号,列号,值
 * 示例如下
 * 1,1,10
 * 1,2,5
 * 2,1,5
 * 2,2,10
 */

public class MatrixMultiply {
    //假设矩阵A为m*n 矩阵B为n*l
    private static final int LeftMatrixL = 4;   // 左矩阵的m(行数)
    private static final int RightMatrixL = 2;  // 右矩阵的l(列数)
    private static final int MiddleMatrixL = 3;   // 左矩阵的n(列数),右矩阵的n(行数)
    private static String left_martrix = "left_matrix"; // 左矩阵文件名,注意没带后缀
    private static String right_martrix = "right_matrix"; // 右矩阵文件名
    private static String Delimeter = ",";// 分界符

    /**
     * 矩阵乘法,程序入口
     * @param args 如果本程序通过命令行启动,则附加的参数会传给args
     * @throws Exception 抛出所有异常
     */
    public static void main(String[] args) throws Exception {

        // 使用 conf 处理配置信息
        // 详细介绍网址 https://blog.csdn.net/nfbing/article/details/19562189
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");//设置hdfs地址
        // GenericOptionsParser 解析命令行参数,来处理配置
        // 详细介绍网址 https://blog.csdn.net/xin15200793067/article/details/12623797
        new GenericOptionsParser(conf, args);//根据命令行参数,配置conf

        // 删除输出目录
        Path outputPath = new Path("hdfs://localhost:9000/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);

        // 根据配置信息初始化 job,并指定job的名字
        // 详细介绍网址 https://blog.csdn.net/wohaqiyi/article/details/85316531
        Job job = Job.getInstance(conf, "MartrixMultiply");
        // 设置job调用的类,这里就是本类
        job.setJarByClass(MatrixMultiply.class);
        // 设置map过程调用的类
        job.setMapperClass(MatrixMultiply.MyMapper.class);
        // 设置合并过程调用的类(可选,我们所用的方法无法在map阶段合并,故不设置)
        // 设置reduce过程调用的类
        job.setReducerClass(MatrixMultiply.MyReducer.class);

        // 设置map输出键的类型为文本类型
        job.setMapOutputKeyClass(Text.class);
        // 设置map输出值的类型为文本类型
        job.setMapOutputValueClass(Text.class);
        // 设置reduce输出键的类型为文本类型
        job.setOutputKeyClass(Text.class);
        // 设置reduce输出值的类型为整数类型
        job.setOutputValueClass(IntWritable.class);

        // 指定要处理的输入数据存放的路径,这里指定了一个文件夹,表示文件夹中所有文件均作为输入
        FileInputFormat.setInputPaths(job, new Path("hdfs://localhost:9000/input"));
        // 指定输出结构存放路径(输出结构有多个文件,所以这里的路径为文件夹)
        FileOutputFormat.setOutputPath(job, new Path("hdfs://localhost:9000/output"));
        // 开始执行,并等待执行完毕
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    /**
     * map过程调用的类,重写map方法
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag; // 用于判断是左矩阵,还是右矩阵  左为left_martrix  右为right_martrix

        //对输入文件进行处理,获取到文件名
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit)context.getInputSplit();
            flag = split.getPath().getName();
        }

        /**
         * map 过程作用:将<行号,一行的内容> 转换为 <结果矩阵某个值的坐标,辅助信息+值>
         * 注意,输入文本的每行调用一次此map函数!对左右矩阵的处理均写在这里面,即此map是一个对于所有输入数据的行是通用的
         * @param key 某行行号
         * @param value 某行内容
         * @param context 用于存放经过map过程后的输出键值 key value
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
            String text = value.toString();//如果运行正确,则text可能为1,1,10 表示矩阵第一行第一列的值为10
            if(text==null||text.equals("")) return;
            String[] lineList = text.split(Delimeter);//用分界符,分割text,正常应得到三部分
            if (lineList.length < 3) return;  // 如果line切割后少于三部分,说明输入数据异常
            String rowindex = lineList[0]; // i
            String colindex = lineList[1]; // j
            String elevalue = lineList[2]; // value

            //通过判断刚才获取到的输入文件名是否与预设值相同,来决定不同的处理方式
            if(flag.equals(left_martrix)){//左矩阵的行
                //左矩阵的每个值在矩阵乘法中要被使用l次(右矩阵列数),用来计算结果矩阵(m*l)m行中某一行的所有值(l列l个),故这里分发出l个
                for(int i=1;i<=RightMatrixL;i++){
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是左矩阵的某一行第几列
                    context.write(new Text(rowindex+Delimeter+i),new Text("a#"+colindex+"#"+elevalue));
                }
            }else if(flag.equals(right_martrix)){//右矩阵的行
                //右矩阵的每个值在矩阵乘法中要被使用m次(左矩阵行数),用来计算结果矩阵(m*l)l行中某一列的所有值(m行m个),故这里分发出m个
                for(int i=1;i<=LeftMatrixL;i++){
                    //key为结果矩阵的坐标,在shuffe阶段,相同key的value被传递给同一个reduce(即计算结果矩阵某一个点所需的2l个值),key在shuffle完后即失去作用,所以为了知道2l个值的两两对应关系,故在value中加入辅助信息,表明是右矩阵的某一列第几行
                    context.write(new Text(i+Delimeter+colindex),new Text("b#"+rowindex+"#"+elevalue));
                }
            }
        }
    }
    /**
     * reduce过程调用的类,重写reduce方法
     */
    public static class MyReducer extends Reducer<Text, Text, Text, IntWritable> {
        private IntWritable outValue = new IntWritable();

        /**
         *
         * @param key 结果矩阵某一点的坐标
         * @param values 计算某一点所需的2l个值
         * @param context 存放经过reduce过程后的输出键值
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        public void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            //初始化计算结果矩阵某一点所需要的数据,即左矩阵的某一行,右矩阵的某一列
            int[] valA = new int[MiddleMatrixL];
            int[] valB = new int[MiddleMatrixL];
            for(int i=0;i<MiddleMatrixL;i++){
                valA[i] = 0;
                valB[i] = 0;
            }
            //解析values列表,根据map过程添加的辅助信息,确定两两相乘所需的数
            for(Text tempV:values){ // b#3#9
                String val = tempV.toString();
                String[] valList = val.split("#");
                if(val.startsWith("a#")){
                    valA[Integer.parseInt(valList[1])-1] = Integer.parseInt(valList[2]);
                }else if(val.startsWith("b#")){
                    valB[Integer.parseInt(valList[1])-1] = Integer.parseInt(valList[2]);
                }
            }
            //两两相乘再相加,计算得到结果矩阵某一坐标的值
            int result = 0;
            for(int i=0;i<MiddleMatrixL;i++){
                result += valA[i]*valB[i];
            }
            //以key value的形式输出 结果矩阵的某一点坐标,值
            outValue.set(result);
            context.write(key,outValue);
        }
    }
}

参考来源
其他参考代码1
其他参考代码2
其他参考代码3
多种算法介绍

准备运行所需数据

新建文件left_matrix 注意没有后缀名,用记事本打开,添加如下内容
1,1,1
2,1,1
3,1,1
4,1,1
1,2,2
2,2,2
3,2,2
4,2,2
1,3,3
2,3,3
3,3,3
4,3,3
新建right_matrix,内容如下
1,1,2
2,1,2
3,1,2
1,2,1
2,2,1
3,2,1

打开http://localhost:9870/
在/下新建input目录,并点击上传按钮上传新建的两个文件
注意上传可能会提示失败(等待5s左右其实就已经上传好了),如果刷新后出现两个文件,说明其实是成功了,继续下一步

调试与运行

之后即可运行,并正常打断点调试

查看运行后的结果

hadoop dfs -cat /output/*

注意每次运行前需要删除/output文件。如果懒,可以在代码中Job job = Job.getInstance(conf, "word count");之前加入

        /* 删除输出目录 */
        Path outputPath = new Path("hdfs://localhost:9000/output");
        outputPath.getFileSystem(conf).delete(outputPath, true);

以后就不需要手动删除以前运行的结果了

看到这,你就可以去开发调试属于你的MapReduce算法了,后面的讲述如何查看MapRedue算法运行的时间信息

部署到集群中(部署后可以查看时间信息)

启用yarn监视job详细信息

需要hadoop分布式的部分配置

修改以下两个文件

Master是集群的名称节点主机名,我们伪分布式可以写成localhost,将以下所有Master修改为localhost

4)修改文件mapred-site.xml
“/usr/local/hadoop/etc/hadoop”目录下有一个mapred-site.xml.template,需要修改文件名称,把它重命名为mapred-site.xml,然后,把mapred-site.xml文件配置成如下内容:

<configuration>
        <property>
                <name>mapreduce.framework.name</name>
                <value>yarn</value>
        </property>
        <property>
                <name>mapreduce.jobhistory.address</name>
                <value>localhost:10020</value>
        </property>
        <property>
                <name>mapreduce.jobhistory.webapp.address</name>
                <value>localhost:19888</value>
        </property>
        <property>
                <name>yarn.app.mapreduce.am.env</name>
                <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
        </property>
        <property>
                <name>mapreduce.map.env</name>
                <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
        </property>
        <property>
                <name>mapreduce.reduce.env</name>
                <value>HADOOP_MAPRED_HOME=/usr/local/hadoop</value>
        </property>
</configuration>

(5)修改文件 yarn-site.xml
请把yarn-site.xml文件配置成如下内容:
其中Master改成你自己的主机名

<configuration>
        <property>
                <name>yarn.resourcemanager.hostname</name>
                <value>Master</value>
        </property>
        <property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
        </property>
</configuration>

启动yarn

start-dfs.sh //启动hdfs文件系统,如果已经启动,可以不执行
start-yarn.sh //启动yarn

启动历史记录服务器,依赖于yarn

sudo mapred historyserver start

之后就可以重新运行MapReduce程序了

在localhost:19888 可以网页端查看yarn
可能需要替换成本机ip地址,本机ip可以通过ifconfig看到,wlo表示无线网卡,inet后的192.168.3.29即为ip地址
file

使用idea部署

打包成jar包

依次点击工具栏Build->build Artifacts->Build即可
如果为暗的,则File->ProjectStructure->Artifacts
新建配置
file
file
配置详情
file
点击OK
file
生成的jar包如图所示
file

这样做完之后就发现可以使用build了
实际上此步做完,就可以直接命令行上传到集群中,然后运行。
但是为了提高效率(不是偷懒!),下面教如何直接在idea里面部署到集群中

idea直接部署

1.在conf.set("fs.defaultFS", "hdfs://localhost:9000");之后添加
conf.set("mapreduce.job.jar", "/home/qianqiu/IdeaProjects/MatrixMultiply/out/artifacts/MatrixMultiply_jar/MatrixMultiply.jar");//部署到集群中运行
注意第二个参数,路径需要修改成你自己的
2.将/usr/local/hadoop/etc/hadoop中的core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml拷贝到工程的src目录里(必须是这个目录)

之后再次点击运行,即在集群中运行,此时可以在yarn中查看job详细信息
localhost:19888

注意这样做之后无法进行断点调试,如需要调试,需再切换回单机模式(取消以上两步的操作即可,无需关闭yarn)

进阶版教程

多了一个预处理环节
详细教程

实操教程(如果hadoop环境已配好,可以直接看这里)

hdfs+yarn+historyserver均已配置好
前面已经介绍如何在idea里完成全部开发和运行操作,而下面链接则着重介绍如何在命令行运行,并在网页端查看结果,这样可以关闭idea,节省虚拟机大量性能。
注意,实操使用的是基础版+进阶版代码(两轮MapReduce),你也可以删除掉进阶版部分代码

详细教程

基础版参考资料

Hadoop3.1.3安装教程_单机/伪分布式配置_Hadoop3.1.3/Ubuntu18.04(16.04)
Hadoop基础—MapReduce的几种运行模式(方便调试)
Hadoop之——MapReduce job的几种运行模式
MapReduce实现大矩阵乘法