Hadoop快速入门

发布时间 2023-12-16 15:04:46作者: 戴莫先生Study平台

Hadoop快速入门

一、大数据思维

  • 分而治之
所谓“分而治之”,就是把一个复杂的算法问题按一定的“分解”方法分为等价的规模较小的若干部分,然后逐个分别找出各部分的解,再把各部分的解组成整个问题的解。

传统的计算都是基于内存去完成的,但是内存是有限的,数据量太大,导致无法在较短时间内迅速解决,也就是说数据太大,无法一次性装入内存。于是就出现了系统分布式处理的概念。利用网络中的所有计算机进行并行计算,大大提高了大数据的处理效率。
  • 查重
读取文件的每一行,然后模仿哈希表算出每一行的 HashCode;
根据 HashCode 对 1000 取模,也就是将文件根据 HashCode 值分为 1000 份,每份大小大约 1GB。这样就可以将 HashCode 相同的行存储到同一个文件中;
每份文件中的数据再利用 HashSet 来判断是否重复。
  • 排序
在单机且可用内存只有1G的情况下,如何对 1T 的数据进行排序?
外排序。传统的排序算法一般指内排序算法,针对的是数据可以一次全部载入内存中的情况。但是面对海量数据,即数据不可能一次全部载入内存,需要用到外排序的方法。外排序采用的就是分而治之(分块的方法),首先将数据分块,对块内数据按选择一种高效的内排序策略进行排序。然后采用归并排序的思想对于所有的块进行排序,得到所有数据的一个有序序列。

设置一个阈值(500M),按行读取数据,如果读取数据量超过阈值就产生一个新文件。大略算一下一共切分成 2048 个文件。现在的情况是:文件内部和文件外部全部无序。然后:

使用归并算法对每个文件进行排序,让文件内部有序。
然后使用归并让多个文件合并到一起。

二、Hadoop生态圈

狭义上来说,hadoop就是单独指代hadoop这个软件,
广义上来说,hadoop指代大数据的一个生态圈,包括很多其他的软件

image-20231215192955288

  • hadoop官网
https://hadoop.apache.org/
https://archive.apache.org/dist/hadoop/common/hadoop-3.3.4/
  • hadoop软件模块
Hadoop Common:Hadoop 体系最底层的一个模块,是其他模块的基础设施。
Hadoop Distributed File System (HDFS™):Hadoop 分布式文件系统,是Hadoop的基石。负责存储数据。
Hadoop YARN:Hadoop 作业调度和资源管理框架。负责资源调度。
Hadoop MapReduce:Hadoop 基于 YARN 的大数据并行处理程序。负责计算。

三、分布式系统文件系统架构

hdfs: (hadoop distributed file system) 分布式文件系统,将我们的数据存放在多台电脑上存储,分布式文件系统有很多,hdfs是mapreduce计算的基础。

  • 文件切分思想
文件存放在一个磁盘上效率肯定是低的,读取效率低,如果文件特别大会超出单机的存储范围。文件在磁盘真实存储文件的抽象概念,数组可以进行拆分和组装,源文件不会受到影响。

数据存储的原理:不管文件的大小,所有的文件都是由字节数组构成,如果我们要切分文件,就是将一个字节数组分成多份,我们将切分后的数据拼接在一起,数据可以继续使用,我们需要根据数据的偏移量将他们重新拼接在一起。
  • Block拆分
数据是被拆分为默认大小为128MB一个数据块进行存储,存储在每一个datanode中。
数据块的个数 =Ceil( 文件大小 / 每个块的大小)
HDFS中一旦文件被存储,数据不允许被修改
修改会影响偏移量
修改会导致数据倾斜
修改数据会导致蝴蝶效益
但是可以被追加,但是不推荐
追加设置需要手动打开
一般HDFS存储的都是历史数据。所以将来Hadoop的mr都用来进行离线数据的处理
块的大小一旦文件上传之后就不允许被修改
  • Block数据安全
备份存储数据,备份的数据肯定不能存放在一个节点上,备份的数据要小于等于节点数量,每个数据块会有3个副本,相同副本是不会存在同一个节点上。
  • Block的管理效率

需要专门给节点进行分工:

  • 存储:DataNode 记录:NameNode 日志:QJM

image-20231215195049661

zookeeper选举出hadoop中的active namenode和standby namenode,当搭建好集群的时候,格式化主备节点的时候,ANN和SNN都会会默认创建 初始化fsimage,为了保证数据,standbyNN会通过JournalNode获取active最新的edits日志文件元数据,通过元数据与fsimage合成数据,实现数据同步,使用数据高可用,保证了数据的安全!NameNode不断与DataNode保存心跳,存储数据的元数据(位置、权限、数据大小、文件与Block的映射关系等)。

  • HDFS特点:

优点:高容错性、运行在廉价的机器上、适合批处理、适合大数据的处理、流式数据访问、

缺点:不擅长低延迟数据访问、不擅长小文件的分区、不擅长并发写入、文件随意修改

四、hadoop基础架构

  • NameNode

image-20231215200521484

DataNode保存数据文件的block块文件,每一个文件在存入HDFS中,会被切分成一个个默认大小是128MB的数据块,存放在不同的DataNode中,NameNode保存文件的元数据block信息(文件的归属、文件的权限、文件的大小时间),但是block信息不会持久化,需要每次开启集群的时候DN上报。

NN与DN保持心跳机制,三秒钟发送一次,如果DN超过三秒没有心跳,就认为DN出现异常。如果DN超过10分钟+30秒没有心跳,那么NN会将当前DN存储的数据转存到其他节点。

性能:namenode为了效率,将所有的操作都在内存中完成,NameNode不会和磁盘进行任何的数据交换,因此会出现数据保存在内存中,掉电容易丢失数据。

  • DataNode

存放的是文件的数据信息和验证文件完整性的校验信息,数据会存放在硬盘上,1M=1条元数据 1G=1条元数据,NameNode非常排斥存储小文件,一般小文件在存储之前需要进行压缩。

启动时:汇报之前先验证block文件是否被损坏,向NameNode汇报当前DN上block的信息。

运行时:向NameNode保持心跳机制,客户可以向DN读写数据。

当客户端读写数据的时候,首先去NN查询file与block与dn的映射,然后客户端直接与dn建立连接,然后读写数据

五、Hadoop-HA架构

image-20231215202608744

当主节点出现异常的时候,集群直接将备用节点切换成主节点 要求备用节点马上就要工作 主备节点内存几乎同步 有独立的线程对主备节点进行监控健康状态 需要有一定的选举机制,帮助我们确定主从关系 我们需要实时存储日志的中间件。

image-20231215202821459

在初始化hadoop集群时,Active NameNode和Standby NameNode会生成fsimage_000000000,然后Active NameNode会定期向QJM集群发送最新的EditLog文件,StandBy NameNode定时从JournalNode集群中同步EditLog文件,然后通过EditLog与fsImage合成最新的数据,最终实现了StandbyNN与ActiveNN最新数据同步!

JournalNode只允许单个NameNode成为作者。在故障转移期间,将变为活动状态的NameNode将承担写入JournalNodes的角色,这将有效地防止另一个NameNode继续处于活动状态,从而使新的Active节点可以安全地进行故障转移。

  • ZKFC(zookeeper故障转移器)

image-20231215203908054

Failover Controller(故障转移控制器),对NameNode的主备切换进行总体控制,能及时检测NameNode的健康状况。

在主NameNode故障时借助Zookeeper实现自动的主备选举和切换,为了防止因为NN的GC失败导致心跳受影响,ZKFC作为一个daemon进程从NN分离出来!

  • 脑裂(brain-split)

定义:脑裂是Hadoop2.X版本后出现的全新问题,实际运行过程中很有可能出现两个namenode同时服务于整个集群的情况,这种情况称 之为脑裂。

脑裂通常发生在主从namenode切换时,由于ActiveNameNode的网络延迟、设备故障等问题,另一个NameNode会认为活跃的 NameNode成为失效状态,此时StandbyNameNode会转换成活跃状态,此时集群中将会出现两个活跃的namenode。因此,可能出 现的因素有网络延迟、心跳故障、设备故障等。

脑裂问题的解决方案是隔离(Fencing):

1、第三方共享存储:任一时刻,只有一个 NN 可以写入;

2、DataNode:需要保证只有一个 NN 发出与管理数据副本有关的命令;

3、Client需要保证同一时刻只有一个 NN 能够对 Client 的请求发出正确的响应。

六、HDFS读写流程

  • 写入操作
客户端项Hdfs发送请求写数据请求,fs通过rpc调用namenode的create方法,NN接收到请求后先检查是否有足够的空间权限等条件创建这个文件,或者这个路径是否已经存在。如果有,NN会针对这个文件创建一个空的Entry对象,并返回成功状态给DFS,如果没有就抛出异常,发送错误信息给客户端,DFS收到成功状态后,会创建一个对象FSDataOutputStream的对象给客户端使用,客户端需要向NN询问第一个block存放的位置,此时NN会通过机架感知策略获取对应存放的DN节点,客户端将文件按照block块切分数据并以流的方式存到缓冲中,客户端再将缓存中的数据放入packet,一个packet放满之后添加到dataqueue。DataStreamer开始从DataQueue队列上取出一个packet,通过FSDataOutputStream 发送到 Pipeline。每当Dataqueue取出一个packet,就会把这个packet加入AckQueue中,典型的生产者消费者!取出的packet会被发送到对应一个DataNode中并将其写入缓存中,写入缓存完成后将Packet发往第二个DN节点,以此类推,等到最后一个节点写完之后将ACk返回到上一个节点直至第一个节点将ACk返回给客户端!
  • 读取数据操作
客户端向DFS发送请求,申请读取某一个文件,DFS去NN询问此文件信息,检查文件是否存在具有权限,DFS创建FSInputStream,客户通过这个对象读取数据,客户端会获取第一个block的信息,以及其对应所在的几个DN节点,客户端会获取第一个Block信息,以及其对应所在的几个DN节点,客户端会通过就近原则与最近的一个DN,通过FSInputStream读取数据,以此类推直到最后一个Block块。

七、HDFS集群搭建

前提已经安装好zookeeper集群

image-20231215211023212

到hadoop官网下载hadoop-3.3.4.tar.gz压缩包上传到服务器中!:http://hadoop.apache.org/

tar -zxvf hadoop-3.3.4.tar.gz -C /usr/local/hadoop/
rm hadoop-3.3.4.tar.gz -rf

进入hadoop项目中的etc文件夹中修改 hadoop-env.sh文件

export JAVA_HOME=/usr/local/java/jdk1.8.0_381
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_JOURNALNODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

在etc目录下修改core-site.xml文件(在内)

<configuration>
<!-- 设置 NameNode 节点的 URI(包括协议、主机名称、端口号),用于 NameNode 与 DataNode 之
间的通讯 -->
 <property>
 <name>fs.defaultFS</name>
 <value>hdfs://hdfs-zwf</value>
 </property>
 <!-- 设置 Hadoop 运行时临时文件的存放位置,比如 HDFS 的 NameNode 数据默认都存放这个目>录下 -->
 <property>
 <name>hadoop.tmp.dir</name>
 <value>/var/hadoop/temp</value>
 </property>
 <!-- 设置在 Web 界面访问数据时使用的用户名 -->
 <property>
 <name>hadoop.http.staticuser.user</name>
 <value>root</value>
 </property>
 <!-- 设置 HA,需要一组 ZK 地址,以逗号分隔。被 ZKFailoverController 使用于自动失效备援
 failover -->
 <property>
 <name>ha.zookeeper.quorum</name>
 <value>node1:2181,master:2181,node2:2181</value>
 </property>
 <!-- 该参数表示可以通过 httpfs 接口访问 HDFS 的 IP 地址限制 -->
 <!-- 配置 root(超级用户) 允许通过 httpfs 方式访问 HDFS 的主机名、域名 -->
 <property>
 <name>hadoop.proxyuser.root.hosts</name>
 <value>*</value>
 </property>
 <!-- 通过 httpfs 接口访问的用户获得的群组身份 -->
 <!-- 配置允许通过 httpfs 方式访问的客户端的用户组 -->
 <property>
 <name>hadoop.proxyuser.root.groups</name>
 <value>*</value>
 </property>
</configuration>                                  

hadoop.tmp.dir 属性的默认值为 /tmp/hadoop-${user.name} ,NameNode 会将 HDFS 的元数据存储在这个 /tmp 目录下,如果操 作系统重启了,系统会清空 /tmp,导致 NameNode 元数据丢失,是个非常严重的问题,所以我们需要修改此路径。

hadoop.http.staticuser.user=dr.who 默认值是一个不真实存在的用户,此用户权限非常小,不能访问不同用户的数据,但是这 保证了数据的安全。也可以设置为 hdfs 和 hadoop 等具有较高权限的用户,但会导致能够登陆网页界面的人会看到其它用户的数据。 实际设置请综合考虑,如无特殊需求,使用默认值就好。

修改etc目录下的hdfs-site.xml文件

<configuration>
<!-- 设置 nameservices 列表(联邦列表),逗号分隔 -->
 <property>
 <name>dfs.nameservices</name>
 <value>hdfs-zwf</value>
 </property>
 <!-- 设置一个 NameNode 列表。hdfs-zwf 是指具体的 nameservice 名称,通常就是 dfs.nameservices 中配置的。值是预备配置的 NameNode的ID,ID是自己取的,不重复就可以,例如 nn1,nn2 -->
 <property>
 <name>dfs.ha.namenodes.hdfs-zwf</name>
 <value>nn1,nn2</value>
 </property>
 <!-- 设置 NameNode 的 RPC 地址和端口 -->
 <property>
 <name>dfs.namenode.rpc-address.hdfs-zwf.nn1</name>
 <value>node1:8020</value>
 </property>
 <!-- 设置 NameNode 的 RPC 地址和端口 -->
 <property>
 <name>dfs.namenode.rpc-address.hdfs-zwf.nn2</name>
 <value>master:8020</value>
 </property>
 <!-- 设置 NameNode 的 HTTP 地址和端口 -->
 <property>
 <name>dfs.namenode.http-address.hdfs-zwf.nn1</name>
 <value>node1:9870</value>
 </property>
 <!-- 设置 NameNode 的 HTTP 地址和端口 -->
 <property>
 <name>dfs.namenode.http-address.hdfs-zwf.nn2</name>
 <value>master:9870</value>
 </property>
 <!-- 设置 QJM 共享存储系统服务器。在多个 NameNode 中共享存储目录,用于存放 edits 文件>,该目录由 Active 写,Standby 读 -->
 <property>
 <name>dfs.namenode.shared.edits.dir</name>
 <value>qjournal://node1:8485;master:8485;node2:8485/hdfs-zwf</value>
<!-- 设置 journalnode 用于存放 edits 日志的目录,默认值为 /tmp/hadoop/dfs/journalnode -->
 <property>
 <name>dfs.journalnode.edits.dir</name>
 <value>/var/hadoop/editslog</value>
 </property>
 <!-- 设置客户端连接 Active NameNode 所用的代理类 -->
 <property>
 <name>dfs.client.failover.proxy.provider.hdfs-zwf</name>
 <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
 </property>
 <!-- 设置 HDFS-HA 功能的防脑裂方法。可以是内建的方法(例如 shell 和 sshfence)或者用户自
定义的方法 -->
 <property>
 <name>dfs.ha.fencing.methods</name>
 <value>sshfence</value>
 <value>shell(true)</value>
 </property>
 <!-- 设置失效转移时使用的秘钥文件 -->
 <property>
 <name>dfs.ha.fencing.ssh.private-key-files</name>
 <value>/root/.ssh/id_rsa</value>
 </property>
 <!-- 设置故障转移功能是否开启,建议开启 -->
 <property>
 <name>dfs.ha.automatic-failover.enabled</name>
 <value>true</value>
 </property>
 <!-- 设置 HDFS 默认的数据块副本数。可以在创建文件时指定,如果创建时未指定,则使用默认>值 -->
 <property>
 <name>dfs.replication</name>
 <value>2</value>
</property>
</configuration>

在${HADOOP_HOME}/etc/hadoop/下修改workers文件目录

node1
master
node2

把配置好的hadoop分发到其他节点中

rsync -av /usr/local/hadoop/hadoop-3.3.4 root@master:/usr/local/hadoop/
rsync -av /usr/local/hadoop/hadoop-3.3.4 root@node2:/usr/local/hadoop/

修改/etc/profile文件(三个节点都要配置)

export HADOOP_HOME=/usr/local/hadoop/hadoop-3.3.4
export PATH=${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin:$PATH

使用source /etc/profile让配置文件生效!

  • 启动(三台都要启动)
zkServer.sh start
zkServer.sh status
  • JournalNode(三台机器需要执行)
hdfs --daemon start journalnode
  • 最后格式化NameNode等相关服务并启动集群
# 格式化 node1 的 namenode(第一次配置的情况下使用)
[root@node1 ~]# hdfs namenode -format
# 启动 node1 的 namenode
[root@node1 ~]# hdfs --daemon start namenode
# master 节点同步镜像数据
[root@master ~]# hdfs namenode -bootstrapStandby
# 格式化 zkfc(第一次配置的情况下使用)
[root@node1 ~]# hdfs zkfc -formatZK
# 启动 HDFS
[root@node1 ~]# start-dfs.sh

后期只需要先启动zookeeper然后再启动HDFS即可

  • 浏览器访问
http://node1:9870/
http://master:9870/

image-20231215215719112

测试文件上传

hdfs dfs -mkdir -p /test #创建目录
hdfs dfs -put hadoop-3.3.4.tar.gz /test
hdfs dfs -D dfs.blocksize=134217728 -put hadoop-3.3.4.tar.gz /test  #指定Block大小

image-20231215215952887

  • 关闭
stop-dfs.sh #关闭HDFS
zkServer.sh stop #三台机器都需要执行 关闭zookeeper服务

 环境搭建成功后 shutdown -h now 关机拍摄快照。

八、hadoop集群命令

在linux中输入shell命令

1 hadoop fs -ls <path>
列出指定目录下的内容,支持pattern匹配。输出格式如filename(full path)<r n>size.n代表备份数。
2 hadoop fs -lsr <path>
递归列出该路径下所有子目录信息
3 hadoop fs -du<path>
显示目录中所有文件大小,或者指定一个文件时,显示此文件大小
4 hadoop fs -dus<path>
显示文件大小 相当于 linux的du -sb s代表显示只显示总计,列出最后的和 b代表显示文件大小时以byte为单位
5 hadoop fs -mv <src> <dst>
将目标文件移动到指定路径下,当src为多个文件,dst必须为目录
6 hadoop fs -cp <src> <dst>
拷贝文件到目标位置,src为多个文件时,dst必须是个目录
7 hadoop fs -rm [skipTrash] <src>
删除匹配pattern的指定文件
8 hadoop fs -rmr [skipTrash] <src>
递归删除文件目录及文件
9 hadoop fs -rmi [skipTrash] <src>
为了避免误删数据,加了一个确认
10 hadoop fs -put <> ... <dst>
从本地系统拷贝到dfs中
11 hadoop fs -copyFromLocal<localsrc>...<dst>
从本地系统拷贝到dfs中,与-put一样
12 hadoop fs -moveFromLocal <localsrc>...<dst>
从本地系统拷贝文件到dfs中,拷贝完删除源文件
13 hadoop fs -get [-ignoreCrc] [-crc] <src> <localdst>
从dfs中拷贝文件到本地系统,文件匹配pattern,若是多个文件,dst必须是个目录
14 hadoop fs -getmerge <src> <localdst>
从dfs中拷贝多个文件合并排序为一个文件到本地文件系统
15 hadoop fs -cat <src>
输出文件内容
16 hadoop fs -copyTolocal [-ignoreCre] [-crc] <src> <localdst>
与 -get一致
17 hadoop fs -mkdir <path>
在指定位置创建目录
18 hadoop fs -setrep [-R] [-w] <rep> <path/file>
设置文件的备份级别,-R标志控制是否递归设置子目录及文件
19 hadoop fs -chmod [-R] <MODE[,MODE]...|OCTALMODE>PATH
修改文件权限, -R递归修改 mode为a+r,g-w,+rwx ,octalmode为755
20 hadoop fs -chown [-R] [OWNER][:[GROUP]] PATH
递归修改文件所有者和组
21 hadoop fs -count[q] <path>
  • Hdfs dfs命令
-mkdir 创建目录 hdfs dfs -mkdir [-p] < paths>

-ls 查看目录下内容,包括文件名,权限,所有者,大小和修改时间 hdfs dfs -ls [-R] < args>

-put 将本地文件或目录上传到HDFS中的路径 hdfs dfs -put < localsrc> … < dst>

-get 将文件或目录从HDFS中的路径拷贝到本地文件路径 hdfs dfs -get [-ignoreCrc] [-crc] < src> <localdst>

选项:-ignorecrc选项复制CRC校验失败的文件。-crc选项复制文件和CRC。

-du 显示给定目录中包含的文件和目录的大小或文件的长度,用字节大小表示。 hdfs dfs -du [-s] [-h] URI [URI …] 选项:-s选项将显示

文件长度的汇总摘要,而不是单个文件。-h选项将以“人可读”的方式格式化文件大小(例如64.0m而不是67108864);第一列标示该目录下总文件大小,第二列标示该目录下所有文件在集群上的总存储大小和你的副本数相关(第二列内容=文件大小*副本数),第三列标示你查询的目录
-dus 显示文件长度的摘要。 hdfs dfs -dus < args> 注意:不推荐使用此命令。而是使用hdfs dfs -du -s。
-mv 在HDFS文件系统中,将文件或目录从HDFS的源路径移动到目标路径。不允许跨文件系统移动文件。
-cp 在HDFS文件系统中,将文件或目录复制到目标路径下 hdfs dfs -cp [-f] [-p | -p [topax] ] URI [ URI …] < dest> 选项:-f
选项覆盖已经存在的目标。-p选项将保留文件属性[topx](时间戳,所有权,权限,ACL,XAttr)。如果指定了-p且没有arg,则保留时间戳,所有权和权
限。如果指定了-pa,则还保留权限,因为ACL是一组超级权限。确定是否保留原始命名空间扩展属性与-p标志无关。
-copyFromLocal 从本地复制文件到hdfs文件系统(与-put命令相似)hdfs dfs -copyFromLocal < localsrc> URI 选项:如果目标已存在,
则-f选项将覆盖目标。
-copyToLocal 复制hdfs文件系统中的文件到本地 (与-get命令相似) hdfs dfs -copyToLocal [-ignorecrc] [-crc] URI <localdst>
-rm 删除一个文件或目录 hdfs dfs -rm [-f] [-r|-R] [-skipTrash] URI [URI …] 选项:如果文件不存在,-f选项将不显示诊断消息或
修改退出状态以反映错误。-R选项以递归方式删除目录及其下的任何内容。-r选项等效于-R。-skipTrash选项将绕过垃圾桶(如果已启用),并立即删除指定
的文件。当需要从超配额目录中删除文件时,这非常有用。
-cat 显示文件内容到标准输出上。 hdfs dfs -cat URI [URI …]
-text 获取源文件并以文本格式输出文件。允许的格式为zip和TextRecordInputStream。 hdfs dfs -text
-touchz 创建一个零长度的文件。 hdfs dfs -touchz URI [URI …]
-stat 显示文件所占块数(%b),文件名(%n),块大小(%n),复制数(%r),修改时间(%y%Y) hdfs dfs -stat URI [URI …]
-tail 显示文件的最后1kb内容到标准输出 hdfs dfs -tail [-f] URI 选项: -f选项将在文件增长时输出附加数据,如在Unix中一样。
-count 统计与指定文件模式匹配的路径下的目录,文件和字节数 hdfs dfs -count [-q] [-h] < paths>
-getmerge 将源目录和目标文件作为输入,并将src中的文件连接到目标本地文件(把两个文件的内容合并起来) 

hdfs dfs -getmerge < src> <localdst> [addnl] 注:合并后的文件位于当前目录,不在hdfs中,是本地文件

-grep 从hdfs上过滤包含某个字符的行内容 hdfs dfs -cat < srcpath> | grep 过滤字段

-chown hdfs上文件权限修改 hadoop fs -chown [-R] [OWNER][:[GROUP]] URI [URI ]#修改文件的所有者 例如:hdfs dfs -chown -R
Administrator:Administrator /user/

-distcp 最常用在集群之间的拷贝:hadoop distcp hdfs://master1:8020/foo/bar hdfs://master2:8020/bar/foo

九、配置windows中hadoop的环境变量

  • 解压 hadoop-3.3.4.zip,将解压后的文件夹存放到自己软件目录,例如:D:\hadoop\hadoop-3.3.4。
  • 将 D:\hadoop\hadoop-3.3.4\bin 目录下的 winutils.exe 和 hadoop.dll 文件拷贝 C:\Windows\System32 目录下。
  • 将 Hadoop 添加到环境变量

HADOOP_HOME --> D:\hadoop\hadoop-3.3.4

HADOOP_USER_NAME --> root

Path --> %HADOOP_HOME%\bin;%HADOOP_HOME%\sbin;

JDK 的版本也设置的和服务器版本一致

修改当前 Window 的 hosts(C:\Windows\System32\drivers\etc\hosts)文件,添加以下内容:

192.168.147.110 node1
192.168.147.120 master
192.168.147.130 node2

十、 java访问Hadoop

打开IDEA,创建一个普通MAVEN项目。拷贝hadoop的以下配置文件至项目resources目录中,(如果配置了windows中hadoop环境变量,不需要拷贝配置文件!)

core-site.xml
hdfs-site.xml
mapred-site.xml
yarn-site.xml
log4j.properties
  • pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.yjxxt</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Hadoop 版本 -->
        <hadoop.version>3.3.4</hadoop.version>
        <!-- commons-io 版本 -->
        <commons-io.version>2.11.0</commons-io.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${commons-io.version}</version>
        </dependency>
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>
        <dependency>
            <groupId>org.junit.jupiter</groupId>
            <artifactId>junit-jupiter-api</artifactId>
            <version>5.9.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
  • 文件上传和下载代码(java代码)
package com.zwf.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.JsonToken;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author Mr Zeng
 * @version 1.0
 * @date 2023-10-28 11:51
 */
@DisplayName("HDFS 测试类")
public class FsUpOrDownload {
    FileSystem fs=null;

    @BeforeEach
    public void hdfsInit() throws IOException, URISyntaxException, InterruptedException {
        //加载配置文件
        Configuration configuration=new Configuration();
        //获取文件系统
        fs=FileSystem.get(new URI("hdfs://192.168.147.110:8020"),configuration,"root");
    }

     @DisplayName("文件上传与下载")
     @Test
    public void  testUpload() throws IOException {
          //需要上传文件的目录
        Path srcPath=new Path("C:\\Users\\Administrator\\Desktop\\test.txt");
           //上传hadoop目录位置
        Path destPath=new Path("/test/reduce.txt");
        fs.copyFromLocalFile(srcPath,destPath);
         System.out.println("文件上传成功!");

    }

    @DisplayName("文件下载")
    @Test
    public void  testDownload() throws IOException {
        //需要上传文件的目录
        Path localPath=new Path("C:\\Users\\Administrator\\Desktop");
        //上传hadoop目录位置
        Path hdfsPath=new Path("/test/reduce.txt");
        fs.copyToLocalFile(false,hdfsPath,localPath,true);
        System.out.println("文件下载成功!");
        fs.close();
    }

}

十一、Hadoop3.x新特性

HDFS默认情况下,Block的备份系数是3,一个原始数据块和其他2个副本。
其中2个副本所需要的存储开销各站100%,这样使得200%的存储开销
正常操作中很少访问具有低IO活动的冷数据集的副本,但是仍然消耗与原始数据集相同的资源量。

EC技术

EC(擦除编码)和HDFS的整合可以保持与提供存储效率相同的容错。
     HDFS:一个副本系数为3,要复制文件的6个块,需要消耗6*3=18个块的磁盘空间
     EC:6个数据块,3个奇偶校验块
擦除编码需要在执行远程读取时,对数据重建带来额外的开销,因此他通常用于存储不太频繁访问的数据

NameNode

在Hadoop3中允许用户运行多个备用的NameNode。例如,通过配置三个NameNode(1个Active NameNode和2个Standby NameNode)和5个JournalNodes节点,此时集群可以容忍2个NameNode节点故障。

image-20231215222104209

image-20231215222116837

image-20231215222128266

十二、MapReduce设计思想

分而治之、

抽象模型:Input、Split、Map、Shuffle、Reduce、Output。 Input:读取数据。 Split:对数据进行粗粒度切分。 Map:对数据进行细粒度切分。 Shuffle:洗牌。将各个 MapTask 结果合并输出到 Reduce。 Reduce:对 Shuffle 进行汇总并输出到指定存储。 Output:HDFS、Hive、Spark、Flume……

统一架构

离线框架

计算向数据靠拢

顺序处理数据、随机访问数据

失效被认为是常态:MapReduce 集群中使用大量的低端服务器(Google 目前在全球共使用百万台以上的服务器节点),因此,节点硬 件失效和软件出错是常态。因而:一个良好设计、具有容错性的并行计算系统不能因为节点失效而影响计算服务的质量,任何节点失效 都不应当导致结果的不一致或不确定性;任何一个节点失效时,其它节点要能够无缝接管失效节点的计算任务;当失效节点恢复后应能 自动无缝加入集群,而不需要管理员人工进行系统配置。MapReduce 并行计算软件框架使用了多种有效的机制,如节点自动重启技术, 使集群和计算框架具有对付节点失效的健壮性,能有效处理失效节点的检测和恢复。

  • 排序算法
快速排序(Quick Sort)是从冒泡排序算法演变而来的,实际上是在冒泡排序基础上的递归分治法。快速排序在每一轮挑选一个基准元素,并让其他比它大的元素移动到数列的一边,比它小的元素移动到数列的另一边,从而把数列拆解成了两个部分。快速排序又分为:Hoare 法(左右指针法)、前后指针法、挖坑法。

左右指针法

左右指针法算法步骤:定义一个 Begin 指向第一个元素,定义一个 End 指向最后一个元素。令第一个元素为 Key,Begin 向后找大于 Key的值,End 向前找小于 Key 的值,找到之后把 Begin 跟 End 位置的值交换,直到 Begin 的索引大于等于 End 的索引时结束,然后将 Key 和End 指针值交换。再将 Key 的左右两边重复上述操作,最终有序。

image-20231215222754204

挖坑法

image-20231215222821152

image-20231215222836375

  • 前后指针法

image-20231215222923509

  • 归并排序
归并排序算法步骤:
将原序列二等分,二等分之后的子序列继续二等分;
直到每个子序列只有一个元素时,停止拆分;
再按照分割的顺序进行归并。

image-20231215223016056

十三、MapReduce计算流程

8bb7b95af733390e04b3126103ee24b 4e3ff641de1f2a12282cc8341836f78

向Yarn集群提交一个任务,首先把要计算的数据文件切分为多个block块,默认每个block块大小是128MB大小,为了防止数据块导致数据计算时出现数据倾斜,每个block块会再次被切分成split切片,每个split切片默认大小是128MB,然后数据以kv方式读取内容到maptask中,首先载入环形数据缓冲区,默认100MB,达到80%后开始溢写,溢写期间会产生很多小文件,然后每个分区把溢写出的小文件进行快速排序和归并,然后ReduceTask拉取每个分区快排归并好的文件,进行预聚合计算操作,一个ReduceTask对应一个分区,最终把预聚合好的数据写入HDFS/mysql,或者不满意再次从HDFS上进行新一轮MapReducer计算操作。

从MapTask到ReduceTask整个过程都是shuffle阶段,会产生大量的落盘操作,影响整个计算性能。

十四、搭建Yarn集群

  • 配置${HADOOP_HOME}/etc/hadoop/hadoop-env.sh文件
export JAVA_HOME=/usr/local/java/jdk1.8.0_381
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_JOURNALNODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export YARN_RESOURCEMANAGER_USER=root
  • 配置${HADOOP_HOME}/etc/hadoop/mapred-site.xml
<configuration>
<!-- 设置执行 MapReduce 任务的运行时框架为 YARN -->
 <property>
 <name>mapreduce.framework.name</name>
 <value>yarn</value>
 </property>
 <!-- 设置 MapReduce JobHistory 服务器的地址 -->
 <property>
 <name>mapreduce.jobhistory.address</name>
 <value>node1:10020</value>
 </property>
 <!-- 设置 MapReduce JobHistory 服务器的 Web 地址 -->
 <property>
 <name>mapreduce.jobhistory.webapp.address</name>
 <value>node1:19888</value>
 </property>
 <!-- 设置已经运行完的 Hadoop 作业记录的存放路径(HDFS 文件系统中的目录),默认是
      ${yarn.app.mapreduce.am.staging-dir}/history/done -->
 <property>
 <name>mapreduce.jobhistory.done-dir</name>
 <value>/history/done</value>
 </property>
<!-- 设置正在运行中的 Hadoop 作业记录的存放路径(HDFS 文件系统中的目录),默认是
      ${yarn.app.mapreduce.am.staging-dir}/history/done_intermediate -->
 <property>
 <name>mapreduce.jobhistory.intermediate-done-dir</name>
 <value>/history/done_intermediate</value>
 </property>
 <!-- 设置需要加载的 jar 包和环境配置 -->
 <property>
 <name>mapreduce.application.classpath</name>
 <value>
 /usr/local/hadoop/hadoop-3.3.4/etc/hadoop,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/common/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/common/lib/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/hdfs/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/hdfs/lib/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/mapreduce/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/mapreduce/lib/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/yarn/*,
 /usr/local/hadoop/hadoop-3.3.4/share/hadoop/yarn/lib/*
 </value>
 </property>
</configuration>
  • 配置${HADOOP_HOME}/etc/hadoop/yarn-site.xml
<configuration>
<!-- 提交 MapReduce 作业的 staging 目录(HDFS 文件系统中的目录),默认是 /tmp/hadoop-yarn/staging -->
 <property>
 <name>yarn.app.mapreduce.am.staging-dir</name>
 <value>/tmp/hadoop-yarn/staging</value>
 </property>
  <!-- 设置开启 ResourceManager 高可用 -->
 <property>
 <name>yarn.resourcemanager.ha.enabled</name>
 <value>true</value>
 </property>
 <!-- 设置 ResourceManager 的集群 ID -->
 <property>
 <name>yarn.resourcemanager.cluster-id</name>
 <value>yarn-zwf</value>
 </property>
 <!-- 设置 ResourceManager 节点的名字 -->
 <property>
 <name>yarn.resourcemanager.ha.rm-ids</name>
 <value>rm1,rm2</value>
 </property>
 <!-- 设置 ResourceManager 服务器的地址 -->
 <property>
 <name>yarn.resourcemanager.hostname.rm1</name>
 <value>node1</value>
 </property>
 <!-- 设置 ResourceManager 服务器的地址 -->
 <property>
 <name>yarn.resourcemanager.hostname.rm2</name>
 <value>node2</value>
 </property>
 <!-- 设置 ResourceManager 服务器的 Web 地址 -->
 <property>
 <name>yarn.resourcemanager.webapp.address.rm1</name>
 <value>node1:8088</value>
 </property>
 <!-- 设置 ResourceManager 服务器的 Web 地址 -->
 <property>
 <name>yarn.resourcemanager.webapp.address.rm2</name>
 <value>node2:8088</value>
 </property>
<!-- 设置 YARN 的 ZK 集群地址,以逗号分隔 -->
 <property>
 <name>yarn.resourcemanager.zk-address</name>
 <value>node1:2181,master:2181,node2:2181</value>
 </property>
 <!-- 设置 NodeManager 列表,以逗号分割 -->
 <!--提示:如果 yarn.nodemanager.aux-services 选项配置为 spark_shuffle ,需要拷贝 $SPARK_HOME/yarn/spark-x.y.z-yarnshuffle.jar 到 $HADOOP_HOME/share/hadoop/yarn/lib 目录。
-->
 <property>
 <name>yarn.nodemanager.aux-services</name>
 <value>mapreduce_shuffle</value>
 </property>
 <!-- 设置开启日志聚合,日志聚合会收集每个容器的日志,并在应用程序完成后将这些日志移动到文
件系统,例如 HDFS -->
 <property>
 <name>yarn.log-aggregation-enable</name>
 <value>true</value>
 </property>
 <!-- 设置聚合日志的保留时间 -->
 <property>
 <name>yarn.log-aggregation.retain-seconds</name>
  <value>640800</value>
 </property>
 <!-- 设置是否启用自动恢复,如果为 true 则必须指定 yarn.resourcemanager.store.class -->
 <property>
 <name>yarn.resourcemanager.recovery.enabled</name>
 <value>true</value>
 </property>
 <!-- 设置 ResourceManager 的状态信息存储在 ZooKeeper 集群 -->
 <property>
 <name>yarn.resourcemanager.store.class</name>
 <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
 </property>
<!-- 设置是否对容器强制执行物理内存限制 -->
 <!-- 是否启动一个线程检查每个任务正在使用的物理内存量,如果任务超出分配值,则将其直接杀掉
,默认为 true -->
<property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
 <value>false</value>
 </property>
 <!-- 设置是否对容器强制执行虚拟内存限制 -->
 <!-- 是否启动一个线程检查每个任务正在使用的虚拟内存量,如果任务超出分配值,则将其直接杀掉
,默认为 true -->
 <property>
 <name>yarn.nodemanager.vmem-check-enabled</name>
 <value>false</value>
 </property>
 <!-- 设置容器的虚拟内存限制,虚拟内存与物理内存之间的比率。作用:在物理内存不够用的情况下
,如果占用了大量虚拟内
      存并且超过了一定阈值,那么就认为当前集群的性能比较差 -->
 <property>
 <name>yarn.nodemanager.vmem-pmem-ratio</name>
 <value>2.1</value>
 </property>
 <!-- 配置 JobHistory -->
 <property>
 <name>yarn.log.server.url</name>
 <value>http://node1:19888/jobhistory/logs</value>
 </property>
<property>
   <name>yarn.scheduler.minimum-allocation-mb</name>
   <value>1024</value>
   <description>default value is 1024</description>
</property>
</configuration>
  • 拷贝配置好文件到其他hadoop节点上
rsync -av /usr/local/hadoop/hadoop-3.3.4/etc/hadoop/yarn-site.xml root@master:/usr/local/hadoop/hadoop-3.3.4/etc/hadoop/

rsync -av /usr/local/hadoop/hadoop-3.3.4/etc/hadoop/mapred-site.xml root@master:/usr/local/hadoop/hadoop-3.3.4/etc/hadoop/

rsync -av /usr/local/hadoop/hadoop-3.3.4/etc/hadoop/mapred-site.xml root@master:/usr/local/hadoop/hadoop-3.3.4/etc/hadoop/hadoop-env.sh

  • 启动
zkServer.sh start
zkServer.sh status
start-dfs.sh
start-yarn.sh
mapred --daemon start historyserver

后期只需要先启动 ZooKeeper 然后启动 Hadoop(start-all.sh)再启动 JobHistory 即可。

  • 关闭
[root@node01 hadoop]# mapred --daemon stop historyserver
[root@node01 hadoop]# stop-all.sh
zkServer.sh stop

十五、MapReduce案例

  • WordCount案例

新建java Maven项目,把hadoop etc目录下的配置文件:core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml、log4j.properties 5个配置文件复制到resource目录下。(windows上配置了hadoop环境不需要复制配置文件!)

  • pom.xml
   <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.3.4</hadoop.version>
    </properties>



<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.24</version>
        </dependency>
  • job端代码
package com.zwf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;


/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-29 21:01
 */
public class WordCountJob {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //加载配置文件  加载默认配置文件
        Configuration configuration = new Configuration();
        //创建作业
        Job job=Job.getInstance(configuration);
        //设置作业主类
        job.setJarByClass(WordCountJob.class);
        //设置作业名称
        job.setJobName("zwf-wordcount"+System.currentTimeMillis());
        //设置Reduce的数据量  也就是分区数量
        job.setNumReduceTasks(2);
        //设置hdfs读取路径(需要计算的数据从哪里读)
        FileInputFormat.setInputPaths(job,new Path("hdfs://node1:8020/test/reduce.txt"));
        //计算后的数据输出到哪
        FileOutputFormat.setOutputPath(job,new Path("hdfs://node1:8020/test/result"+job.getJobName()));
        //设置Map输出的key和value值  String int
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //设置Map和Reduce的处理类
        job.setMapperClass(MapperDeal.class);
        job.setReducerClass(ReduceDeal.class);
        job.waitForCompletion(true);

    }

}
  • reducerTask代码
package com.zwf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-29 22:08
 */
/**
 * LongWritable:读取切片的key值  偏移量
 * Text: 切片的value值  内容
 * Text:写入Reduce的key值
 * IntWritable:写入reduce的value值
 */
public class ReduceDeal extends Reducer<Text, IntWritable,Text,IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
       //声明计数器
        int count=0;
        for (IntWritable v:values){
            count+=v.get();
        }
        //写出数据
        context.write(key,new IntWritable(count));
    }
}
  • MapperTask端代码
package com.zwf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-29 22:08
 */

/**
 * LongWritable:读取切片的key值  偏移量
 * Text: 切片的value值  内容
 * Text:写入Reduce的key值
 * IntWritable:写入reduce的value值
 */
public class MapperDeal extends Mapper<LongWritable,Text,Text,IntWritable> {

       //定义MapTask输入的value值
     private  IntWritable one=new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
          //以空格切割
        String[] line=value.toString().split("\\s");
        System.out.println("内容--->"+value);
          //切分字符串   写出reduce数据
        for (String world:line){
             context.write(new Text(world),one);
        }
    }
}
  • 求每个城市每个月温度最高的前三天

自定义分组器

package com.zwf.weatherjob1;

import com.zwf.weater.Weather;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

import java.time.format.DateTimeFormatter;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-30 14:23
 */
public class GroupComparator extends WritableComparator {
    //传入泛型  实例化
    public GroupComparator() {
        super(Weather.class,true);
    }

    /**
     *   先比较省  再比较区  再比较年月  自定义规并
     * @param a the first object to be compared.
     * @param b the second object to be compared.
     * @return
     */
    //根据省市年月进行分组
    @Override
    public int compare(Object a, Object b) {
        //省城市不会合并  只有时间合并  return 0表示归并
      Weather a1=(Weather) a;
      Weather b1=(Weather) b;
         //负数是左对象属性值大于右对象属性值  正数反之  0是相等
        int result = a1.getProvince().compareTo(b1.getProvince());
        if (result!=0){
            return result;
        }
        result=a1.getCity().compareTo(b1.getCity());
        if (result!=0){
            return result;
        }

        return DateTimeFormatter.ofPattern("yyyyMM").format(a1.getReportTime())
                .compareTo(DateTimeFormatter.ofPattern("yyyyMM").format(b1.getReportTime()));
    }
}

自定义分区器

package com.zwf.weatherjob1;

import com.zwf.weater.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import java.time.format.DateTimeFormatter;
import java.util.Objects;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-30 9:22
 */
public class WeaterPartitioner extends Partitioner<Weather, IntWritable> {
    @Override
    public int getPartition(Weather weather, IntWritable intWritable, int i) {
        // 自定义的分区 Key:省区年月
        int hashCode = Objects.hash(weather.getProvince(), weather.getCity(),
                DateTimeFormatter.ofPattern("yyyyMM").format(weather.getReportTime()));
        return Math.abs(hashCode) % i;
    }
}

Mapper端代码

package com.zwf.weatherjob1;

import com.zwf.weater.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-30 14:49
 */
public class MapperTask extends Mapper<LongWritable, Text, Weather, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Weather, IntWritable>.Context context) throws IOException, InterruptedException {
        //先要  key.get()==0 第一行
        //数据是一行一行的读
        String[] split = value.toString().split(",");
        //每行数据长度为11  为完整数据
       // 187,广东,南沙区,440115,雨,25,东北,≤3,96,1/6/2020 14:52:21,1/6/2020 15:00:03
            if(split.length==11){
                    Weather weather = new Weather();
                    weather.setId(Integer.parseInt(split[0]));
                    weather.setProvince(split[1]);
                    weather.setCity(split[2]);
                    weather.setAreaCode(Integer.parseInt(split[3]));
                    weather.setWea(split[4]);
                    weather.setTemperature(Integer.parseInt(split[5]));
                    weather.setWindDirection(split[6]);
                    weather.setHumidity(Integer.parseInt(split[8]));
                    //字符串转为LocalDateTime  格式要一致
                    weather.setReportTime(LocalDateTime.parse(split[9],DateTimeFormatter.ofPattern("d/M/yyyy HH:mm:ss")));
                     //LocalDateTime转字符串
                     weather.setYmd(LocalDateTime.parse(split[9],DateTimeFormatter.ofPattern("d/M/yyyy HH:mm:ss")).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
                     weather.setCreateTime(LocalDateTime.parse(split[10],DateTimeFormatter.ofPattern("d/M/yyyy HH:mm:ss")));
                    context.write(weather,new IntWritable(Integer.parseInt(split[5])));
            }


    }
}
  • ReducerTask端代码
package com.zwf.weatherjob1;

import com.zwf.weater.Weather;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-30 14:49
 */
public class ReduceTask extends Reducer<Weather, IntWritable, Text, NullWritable> {
    @Override
    protected void reduce(Weather key, Iterable<IntWritable> values, Reducer<Weather, IntWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
       //会读取所有的对象值  每一个对象值hashcode都不相同  温度会倒叙排列
        Set<String> weather=new HashSet<>();
        //打印3行最高温度的记录并对字符串进行去重
        for (IntWritable tempor:values){
           StringBuffer buffer=new StringBuffer();
           buffer.append(key.getProvince()).append(key.getCity()).append(key.getYmd());
           if(!weather.contains(buffer.toString())) {
               weather.add(buffer.toString());
               //字符串去重
               buffer.append("[温度:").append(tempor).append("]");
//               System.out.println(buffer);
               //写出数据
               context.write(new Text(buffer.toString()),NullWritable.get());
           }
              //取温度每个城市最高温度每天前三的三个
            if (weather.size()==3){
                return;
            }
            System.out.println(key+"=="+tempor);
        }
        System.out.println("============");
    }
}
  • weather天气实体类
package com.zwf.weater;

import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/**
 * 天气信息实体类(自定义比较器)
 */
@NoArgsConstructor
@Data
public class Weather implements WritableComparable<Weather> {

    private Integer id;
    private String province;  //省份
    private String city;   //城市
    private Integer areaCode;  //地区编号
    private String wea;  //气候
    private Integer temperature;  //温度
    private String windDirection;  //风向
   // private String windPower;
    private Integer humidity; //湿度
    private LocalDateTime reportTime;  //发布时间
    private String ymd;  //年月日
    private LocalDateTime createTime;  //创建时间

    /**
     * 按省、区、日期(年月)、温度进行比较
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(Weather o) {
        if (o == null) {  //升序
            return 1;
        }
        // 先比较省
        int result = this.getProvince().compareTo(o.getProvince());
        if (result != 0) {
            return result;
        }
        // 再比较区
        result = this.getCity().compareTo(o.getCity());
        if (result != 0) {
            return result;
        }
        // 再比较年月
        result = DateTimeFormatter.ofPattern("yyyyMMdd").format(this.getReportTime())
                .compareTo(DateTimeFormatter.ofPattern("yyyyMMdd").format(o.getReportTime()));
        if (result != 0) {
            return result;
        }

        // 再比较温度,负数是倒叙  从大到小
        return this.getTemperature().compareTo(o.getTemperature()) * -1;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.getId());
        out.writeUTF(this.getProvince());
        out.writeUTF(this.getCity());
        out.writeInt(this.getAreaCode());
        out.writeUTF(this.getWea());
        out.writeInt(this.getTemperature());
        out.writeUTF(this.getWindDirection());
       // out.writeUTF(this.getWindPower());
        out.writeUTF(this.getYmd());
        out.writeInt(this.getHumidity());
        out.writeLong(this.getReportTime().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
        out.writeLong(this.getCreateTime().toInstant(ZoneOffset.ofHours(8)).toEpochMilli());
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.setId(in.readInt());
        this.setProvince(in.readUTF());
        this.setCity(in.readUTF());
        this.setAreaCode(in.readInt());
        this.setWea(in.readUTF());
        this.setTemperature(in.readInt());
        this.setWindDirection(in.readUTF());
       // this.setWindPower(in.readUTF());
        this.setYmd(in.readUTF());
        this.setHumidity(in.readInt());
        this.setReportTime(LocalDateTime.ofEpochSecond(in.readLong() / 1000, 0, ZoneOffset.ofHours(8)));
        this.setCreateTime(LocalDateTime.ofEpochSecond(in.readLong() / 1000, 0, ZoneOffset.ofHours(8)));
    }

    @Override
    public String toString() {
        return  id +
                "," + province  +
                "," + city +
                "," + areaCode +
                "," + wea  +
                "," + temperature +
                "," + windDirection  +
                "," + humidity +
                "," + reportTime +
                "," + ymd +
                "," + createTime ;
    }
}

  • job类
package com.zwf.weatherjob1;

import com.zwf.ReduceDeal;
import com.zwf.weater.Weather;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.UUID;

/**
 * @author MrZeng
 * @version 1.0
 * @date 2023-10-30 14:06
 */
public class WeatherDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        //加载默认配置
        Configuration config=new Configuration();
        //获取作业对象
        Job job=Job.getInstance(config);
        //获取远程服务器的文件
        FileInputFormat.setInputPaths(job,new Path("hdfs://master:8020/test/weather.txt"));
        //设置输出文件路径和名字
        FileOutputFormat.setOutputPath(job,new Path("hdfs://master:8020/test/result"+ UUID.randomUUID().toString().substring(0,5)));
        //设置自定义分区类
//job.setPartitionerClass(WeaterPartitioner.class);
        //设置工作任务名
        job.setJobName("weather-"+System.currentTimeMillis());
        //设置mapTask
        job.setMapperClass(MapperTask.class);
        //设置reduceTask
        job.setReducerClass(ReduceTask.class);
        //设置自定义分组类
        job.setGroupingComparatorClass(GroupComparator.class);
        //设置分区数
        job.setNumReduceTasks(3);
        //设置MapTask输出key数据类型
        job.setMapOutputKeyClass(Weather.class);
        //设置ReduceTask输出value数据类型
        job.setMapOutputValueClass(IntWritable.class);
        //交给集群处理
        job.waitForCompletion(true);

    }
}

十六、MapReducer压缩

 在时下大数据环境中,虽然机器性能好,节点多,但是并不代表我们的数据不需要做任何的压缩就开始处理。所以在某些情况下,我们 还是需要对数据做压缩处理。压缩技术能够有效减少存储系统的读写字节数,提高网络带宽和磁盘空间的效率。

注意:压缩特性运用得当能提高性能,但运用不当也可能降低性能。

image-20231216143158673

image-20231216143134550

  • 压缩实践

第一种方法:配置文件开启压缩功能,配置core-site.xml和mapred-site.xml文件

<!-- 可用于压缩/解压缩的编解码器,用逗号分隔列表 -->
<propery>
<name>io.compression.codecs</name>
<value>
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.BZip2Codec,
com.hadoop.compression.lzo.LzopCodec,
org.apache.hadoop.io.compress.Lz4Codec,
org.apache.hadoop.io.compress.SnappyCodec,
org.apache.hadoop.io.compress.ZStandardCodec
</value>
</propery>

配置文件开启压缩功能,配置mapred-site.xml文件

<!-- 开启 Mapper 输出压缩 -->
<propery>
<name>mapreduce.map.output.compress</name>
<value>true</value>
</propery>
<!-- 设置 Mapper 输出压缩的压缩方式 -->
<propery>
<name>mapreduce.map.output.compress.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
</propery>
<!-- 开启 Reducer 输出压缩 -->
<propery>
<name>mapreduce.output.fileoutputformat.compress</name>
<value>true</value>
</propery>
<!-- 设置 Reducer 输出压缩的压缩方式 -->
<propery>
<name>mapreduce.output.fileoutputformat.compress.codec</name>
<value>org.apache.hadoop.io.compress.BZip2Codec</value>
</propery>
<!-- SequenceFiles 输出可以使用的压缩类型:NONE、RECORD 或者 BLOCK -->
<!-- 如果作业输出被压缩为 SequenceFiles,该属性用来控制使用的压缩格式。默认为 RECORD,即针对每条记录进行压缩,如果将其改为 BLOCK,将针
对一组记录进行压缩,这是推荐的压缩策略,因为它的压缩效率更高。 -->
<propery>
<name>mapreduce.output.fileoutputformat.compress.type</name>
<value>BLOCK</value>
</propery>

第二种方法:代码开启压缩配置

public class WeatherCompressJob {
	public static void main(String[] args) {
		try {
			// 加载配置文件
		Configuration configuration = new Configuration(true);
		// 本地模式运行
		// configuration.set("mapreduce.framework.name", "local");
			// 开启 Mapper 输出压缩
		configuration.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
		configuration.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, SnappyCodec.class, CompressionCodec.class);
			// 创建作业
		Job job = Job.getInstance(configuration);
			// 设置作业主类
		job.setJarByClass(WeatherCompressJob.class);
				// 设置作业名称
			job.setJobName("yjx-weather-compress-" + System.currentTimeMillis());
			// 设置 Reduce 的数量
			job.setNumReduceTasks(2);
			// 设置数据的输入路径(需要计算的数据从哪里读)
			FileInputFormat.setInputPaths(job, new Path("/yjx/weather.csv"));
				// 设置数据的输出路径(计算后的数据输出到哪里)
	FileOutputFormat.setOutputPath(job, new Path("/yjx/result/" + job.getJobName()));
			// 设置 Map 的输出的 Key 和 Value 的类型
			job.setMapOutputKeyClass(Text.class);
			job.setMapOutputValueClass(IntWritable.class);
				// 设置 Map 和 Reduce 的处理类
			job.setMapperClass(WeatherMapper.class);
			job.setReducerClass(WeatherReducer.class);
				// 开启 Reducer 输出压缩
			FileOutputFormat.setCompressOutput(job, true);
			FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
				// 将作业提交到集群并等待完成
			job.waitForCompletion(true);
		} catch (IOException | InterruptedException | ClassNotFoundException e) {
				e.printStackTrace();
		}
	}
}

打成jar包上传到服务器运行

hadoop  jar  XXX.jar  com.yjxxt.mapred.weather.compress.WeatherCompressJob

十七、MR优化

hadoop优势:可构建在廉价机器上、高容错性、适合批处理、适合存储大文件

  • 小文件优化

从存储方面来说:Hadoop 存储的每个文件都会在 NameNode上记录元数据,如果同样大小的文件,文件很小的话,就会产生很多元数据文件,造成 NameNode 的压力;

从读取方面来说:同样大小的文件分为很多小文件的话,会增加磁盘寻址次数,降低性能

从计算方面来说:我们知道一个 MapTask 默认处理一个分片或者一个文件,如果 MapTask 的启动时间比数据处理的时间还要长,那么就会造成低性能。而且在 Map 端溢写磁盘的时候每一个 MapTask 最终会产生 Reduce 数量个数的中间结果,如果 MapTask 数量特别多,就 会造成临时文件很多,造成 Reduce 拉取数据的时候增加磁盘的 IO。

优化方案:1、不在hdfs上存储小文件。 2、更换小文件存储格式或者压缩格式。3、小文件已经存储HDFS的话,在读取数据时使用自定义组合器实现在拉取文件前进行小文件预聚合!

  • 数据倾斜

优化:1、不使用默认hash分区算法 2、在数据键上加随机值或者加盐,打散数据,使数据均匀分布在不同的分区。 3、可以增加 Reduce 的 memory 和 vcore,提高性能解决问题。 4、可以增加 Reduce 的数量来分摊压力。(增加分区数)

  • 推测执行

 如果不是数据倾斜带来的问题,而是节点服务有问题造成某些 Map 和 Reduce 执行缓慢呢?可以使用推测执行,你跑的慢,我们可以找 个其他节点重启一样的任务进行竞争,谁快以谁为准。推测执行默认关闭!

  • 推测执行相关参数:
# 是否启用 MapTask 推测执行,默认为 true
mapreduce.map.speculative=true
# 是否启用 ReduceTask 推测执行,默认为 true
mapreduce.reduce.speculative=true
# 推测任务占当前正在运行的任务数的比例,默认为 0.1
mapreduce.job.speculative.speculative-cap-running-tasks=0.1;
# 推测任务占全部要处理任务数的比例,默认为 0.01
mapreduce.job.speculative.speculative-cap-total-tasks=0.01
# 最少允许同时运行的推测任务数量,默认为 10
mapreduce.job.speculative.minimum-allowed-tasks=10;
# 本次推测没有任务下发,执行下一次推测任务的等待时间,默认为 1000(ms)
mapreduce.job.speculative.retry-after-no-speculate=1000;
# 本次推测有任务下发,执行下一次推测任务的等待时间,默认为 15000(ms)
mapreduce.job.speculative.retry-after-speculate=15000;
# 标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认为 1.0
mapreduce.job.speculative.slowtaskthreshold=1.0;
  • MapReduce 执行流程优化

在map端调整切片大小,可以通过 mapreduce.client.submit.file.replication 参数修改副本数量.

调整每个mapTask资源数,来提升MR处理性能。

可以根据机器的配置和数据量来设置这两个参数,当内存足够时,增大mapreduce.task.io.sort.mb=500 会提高溢写的过程,而且 会减少中间结果的文件数量。

image-20231216145317934

当文件溢写完后,Map 会对这些文件进行 Merge 合并,默认每次合并 10 个溢写的文件,由参数 mapreduce.task.io.sort.factor=50 进行设置。这样可以减少合并的次数,提高合并的并行度,降低对磁盘操作的次数。

在 Reduce 拉取数据之前,我们可以使用 Combiner 实现 Map-Side 的预聚合(不影响最终结果的情况下),如果自定义了 Combiner,此 时会根据 Combiner 定义的函数对 map 方法的结果进行合并,这样可以减少数据的传输,降低磁盘和网络 IO,提升性能。

开启压缩后,数据会被压缩写入磁盘,Reduce 读的是压缩数据所以需要解压,在实际经验中 Hive 在 Hadoop 的运行的瓶颈一般都是 IO 而不是 CPU,压缩一般可以 10 倍的减少 IO 操作。

Map 流程完成之后,会通过运行一个 HTTP Server 暴露自身,供 Reduce 端获取数据。这里用来响应 Reduce 数据请求的线程数量是可以 配置的,通过 mapreduce.shuffle.max.threads 属性进行配置,默认为 0,表示当前机器内核数量的两倍。

提高ReducerTask拉取数据的容错性

image-20231216145605145

可以通过参数设置合理的 Reduce 数量( mapreduce.job.reduces 参数控制),以及通过参数设置每个 Reduce 的资源。

image-20231216145640008

 Reduce 在 Copy 的过程中默认使用 5 个( mapreduce.reduce.shuffle.parallelcopies 参数控制)并行度进行数据复制,可以将其 调大例如 100。

调节内存缓冲区溢写大小,Copy 过来的数据会先放入内存缓冲区中,然后当使用内存达到一定量的时候才 Spill 磁盘。这里的缓冲区大小要比 Map 端的更为灵活, 它基于 JVM 的 Heap Size 进行设置。该内存大小不像 Map 一样可以通过 mapreduce.task.io.sort.mb 来设置,而是通过另外一个参数 mapreduce.reduce.shuffle.input.buffer.percent (默认为 0.7)进行设置。

同 Map 一样,当文件溢写完后,Reduce 会对这些文件进行 Merge 合并。合并因子默认为 10,由参数 mapreduce.task.io.sort.factor 进行设置。

修改参数 mapreduce.reduce.input.buffer.percent ,默认为 0.0,表示不开启缓存,直接从磁盘读。当该值大于 0 的时候,会保 留指定比例的内存用于缓存(缓冲区 -> 读缓存 -> Reduce),从而提升计算的速度。