title: Big Data Provincial Competition
date: 2022-10-22 12:02:15
categories:
- bigdata
tags:
- bigdata
- hadoop
comments: false
Big Data Provincial Competition
- Environment variables (add only the ones you need; they can be set in any of several places)
- vi /etc/profile
- vi ~/.bash_profile
- vi /etc/profile.d/my_env.sh (a file you create yourself)
#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
#HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export PATH=$PATH:$ZOOKEEPER_HOME/bin
#HIVE_HOME
export HIVE_HOME=/opt/module/hive-3.1.2
export PATH=$PATH:$HIVE_HOME/bin
#SPARK
export SPARK_HOME=/opt/module/spark-3.0.3
export PATH=$PATH:$SPARK_HOME/bin
#SCALA_HOME
export SCALA_HOME=/opt/module/scala-2.12.6
export PATH=$PATH:$SCALA_HOME/bin
#FLINK_HOME
export FLINK_HOME=/opt/module/flink-1.14.0
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
#HBASE_HOME
export HBASE_HOME=/opt/module/hbase-2.4.8
export PATH=$PATH:$HBASE_HOME/bin
#SQOOP_HOME
export SQOOP_HOME=/opt/module/sqoop-1.4.7
export PATH=$PATH:$SQOOP_HOME/bin
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka_2.12
export PATH=$PATH:$KAFKA_HOME/bin
#FLUME_HOME
export FLUME_HOME=/opt/module/flume-1.9.0
export PATH=$PATH:$FLUME_HOME/bin
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
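After editing, reload the file so the variables take effect in the current shell (repeat on every node where you added them):
source /etc/profile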
一、Virtual Machine Preparation
1. Switch to the root user
su root
2. Configure a static IP
- vi /etc/sysconfig/network-scripts/ifcfg-ens33
BOOTPROTO="static"
ONBOOT="yes"
IPADDR=192.168.204.136
GATEWAY=192.168.204.2
DNS1=192.168.204.2
- Restart the network:
systemctl restart network
3. Set the hostname
hostnamectl --static set-hostname master
4. Configure hostname mappings in /etc/hosts
- vi /etc/hosts
192.168.204.136 master
192.168.204.137 slave01
192.168.204.138 slave02
5. On the Windows host, edit C:\Windows\System32\drivers\etc\hosts and add the same mappings
192.168.204.136 master
192.168.204.137 slave01
192.168.204.138 slave02
6. Disable the firewall
systemctl stop firewalld
systemctl disable firewalld
7. Grant sudo privileges
- vi /etc/sudoers
zkpk ALL=(ALL) NOPASSWD:ALL
8. Passwordless SSH login
cd ~/.ssh (if the directory does not exist, run ssh localhost once to create it)
ssh-keygen -t rsa
ssh-copy-id master
ssh-copy-id slave01
ssh-copy-id slave02
Perform the above on all three nodes; a quick check follows below.
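A minimal check that passwordless login works (run from master; no password prompt should appear):
ssh slave01 hostname
ssh slave02 hostname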
9. Remove the pre-installed Java and MariaDB packages
rpm -qa | grep java
rpm -qa | grep mariadb
rpm -e --nodeps <package names from the output above>
二、JDK and Hadoop
(jdk-1.8, hadoop-3.1.3)
1. Extract the JDK
tar -xzvf jdk-8u212-linux-x64.tar.gz -C /opt/module
2. Extract Hadoop
tar -xzvf hadoop-3.1.3.tar.gz -C /opt/module
3. Edit the Hadoop configuration files
cd /opt/module/hadoop-3.1.3/etc/hadoop
- vi core-site.xml
<!-- Address of the NameNode in HDFS -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://bigdata1:9000</value>
</property>
<!-- Directory where Hadoop stores files generated at runtime. The default is /tmp/hadoop-$user, which may be wiped on reboot, so it must be configured explicitly. -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/module/hadoop-3.1.3/data/tmp</value>
</property>
- vi hdfs-site.xml
<!-- HDFS replication factor; optional, since the default is already 3 -->
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<!-- HTTP address and port of the SecondaryNameNode. In this plan bigdata2 hosts the SecondaryNameNode; if not configured, it starts on the same node as the NameNode. -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>bigdata2:50090</value>
</property>
- vi workers
master
slave01
slave02
- vi yarn-site.xml
<!-- How reducers fetch data -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- Address of the YARN ResourceManager -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>bigdata3</value>
</property>
- vi mapred-site.xml
<!-- Run MapReduce on YARN -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!-- JobHistory server address -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>bigdata1:10020</value>
</property>
<!-- JobHistory server web UI address -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>bigdata1:19888</value>
</property>
4. Format the NameNode
hdfs namenode -format
5. Start Hadoop
On bigdata1: start-dfs.sh
On bigdata3: start-yarn.sh
Start the history server: mr-jobhistory-daemon.sh start historyserver
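A rough check that the daemons are up, assuming the role layout configured above (NameNode on bigdata1, SecondaryNameNode on bigdata2, ResourceManager on bigdata3); run on each node:
jps
# bigdata1: NameNode, DataNode, NodeManager, JobHistoryServer
# bigdata2: SecondaryNameNode, DataNode, NodeManager
# bigdata3: ResourceManager, DataNode, NodeManager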
三、ZooKeeper Cluster Deployment
1. Extract and install
Extract:
[root@bigdata1 software]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/
[root@bigdata1 module]# mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
2. Configure the server ID
[root@bigdata1 zookeeper-3.5.7]# mkdir zkData
[root@bigdata1 zookeeper-3.5.7]# cd zkData
[root@bigdata1 zkData]# vim myid
1
3. Distribute ZooKeeper to the other two nodes
Copy zookeeper-3.5.7 to the other two nodes:
[root@bigdata1 module]# scp -r /opt/module/zookeeper-3.5.7 bigdata2:/opt/module
[root@bigdata1 module]# scp -r /opt/module/zookeeper-3.5.7 bigdata3:/opt/module
Edit myid on the other two nodes (file: /opt/module/zookeeper-3.5.7/zkData/myid):
[root@bigdata2 zkData]# vim myid
2
[root@bigdata3 zkData]# vim myid
3
4. Configure zoo.cfg (/opt/module/zookeeper-3.5.7/conf)
1. Copy the sample file:
[root@bigdata1 conf]# cp zoo_sample.cfg zoo.cfg
2. Edit zoo.cfg:
[root@bigdata1 conf]# vim zoo.cfg
# Change the data storage directory
dataDir=/opt/module/zookeeper-3.5.7/zkData
# Add the cluster servers
server.1=bigdata1:2888:3888
server.2=bigdata2:2888:3888
server.3=bigdata3:2888:3888
5. Start the ZooKeeper cluster
(1) Start ZooKeeper on each node (run on all three)
[root@bigdata1 zookeeper-3.5.7]# zkServer.sh start
[root@bigdata2 zookeeper-3.5.7]# zkServer.sh start
[root@bigdata3 zookeeper-3.5.7]# zkServer.sh start
# Check the status (on all three nodes)
zkServer.sh status
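If the cluster formed correctly, zkServer.sh status should report Mode: leader on one node and Mode: follower on the other two. A further sanity check is to connect with the CLI:
zkCli.sh -server bigdata1:2181
ls /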
四、Hive
1. Check for and remove existing MySQL/MariaDB packages
rpm -qa | grep mysql
rpm -qa | grep mariadb
rpm -qa | grep mariadb | xargs sudo rpm -e --nodeps
2. Extract Hive and rename the directory
tar -zxvf apache-hive-3.1.2-bin.tar.gz -C /opt/module
mv apache-hive-3.1.2-bin hive-3.1.2
3. Copy the MySQL JDBC driver into lib and disable the conflicting SLF4J binding
mv mysql-connector-java-5.1.27-bin.jar /opt/module/hive-3.1.2/lib/
mv $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.jar $HIVE_HOME/lib/log4j-slf4j-impl-2.10.0.bak
4. Rename hive-env.sh.template under /opt/module/hive-3.1.2/conf to hive-env.sh
vi hive-env.sh
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export HIVE_CONF_DIR=/opt/module/hive-3.1.2/conf
5. Create a logs directory
mkdir logs
6. Rename hive-log4j2.properties.template and set the log directory
mv hive-log4j2.properties.template hive-log4j2.properties
vi hive-log4j2.properties
property.hive.log.dir=/opt/module/hive-3.1.2/logs
7. Create hive-site.xml under /opt/module/hive-3.1.2/conf
- hive-site.xml
<configuration>
<!-- JDBC connection URL -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://bigdata1:3306/metastore?useSSL=false</value>
</property>
<!-- JDBC driver class -->
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
</property>
<!-- JDBC username -->
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
</property>
<!-- JDBC password -->
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>123456</value>
</property>
<!-- Hive's default working directory on HDFS -->
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value>
</property>
<!-- HiveServer2 thrift port (the port JDBC clients connect to) -->
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
</property>
<!-- Host that HiveServer2 binds to -->
<property>
<name>hive.server2.thrift.bind.host</name>
<value>bigdata1</value>
</property>
<!-- Metastore URI to connect to -->
<property>
<name>hive.metastore.uris</name>
<value>thrift://bigdata1:9083</value>
</property>
<!-- Metastore event DB notification API auth -->
<property>
<name>hive.metastore.event.db.notification.api.auth</name>
<value>false</value>
</property>
<!-- Hive metastore schema version verification -->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<!-- HiveServer2 HA parameter; enabling it speeds up HiveServer2 startup -->
<property>
<name>hive.server2.active.passive.ha.enable</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.header</name>
<value>true</value>
</property>
<property>
<name>hive.cli.print.current.db</name>
<value>true</value>
</property>
</configuration>
Grant group write permission on the warehouse directory:
bin/hdfs dfs -chmod g+w /user/hive/warehouse
8. vi hive-log4j.properties
hive.log.dir=/opt/module/hive-3.1.2/logs
9. Create the metastore database
mysql> create database metastore;
mysql> quit;
10. Initialize the Hive metastore schema
schematool -initSchema -dbType mysql -verbose
# Start the metastore and HiveServer2 services
hive --service metastore > /dev/null 2>&1 &
hive --service hiveserver2 > /dev/null 2>&1 &
11. Start Hive
# Create a logs directory for the log files:
[root@bigdata1 hive-3.1.2]# mkdir logs
# Start:
nohup hive --service metastore>/opt/module/hive-3.1.2/logs/metastore.log 2>&1 &
nohup hive --service hiveserver2>/opt/module/hive-3.1.2/logs/hive2.log 2>&1 &
# Check:
[root@bigdata1 hive-3.1.2]# ps -aux|grep hive
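A quick connectivity test once both services are running (a sketch, assuming the HiveServer2 host/port and root user configured in hive-site.xml above):
beeline -u jdbc:hive2://bigdata1:10000 -n root
# then, inside beeline:
show databases;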
五、HBase (distribute to all nodes)
1. Extract
tar -zxvf hbase-2.4.8-bin.tar.gz -C /opt/module
2. Edit the configuration files
cd conf/
- vi hbase-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
export HBASE_MANAGES_ZK=false
- vi hbase-site.xml
<property>
<name>hbase.rootdir</name>
<value>hdfs://bigdata1:9000/HBase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- Changed after 0.98: earlier versions had no .port property and the default port was 60000 -->
<property>
<name>hbase.master.port</name>
<value>16000</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name> <value>bigdata1:2181,bigdata2:2181,bigdata3:2181</value>
</property>
<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/opt/module/zookeeper-3.5.7/zkData</value>
</property>
- vi regionservers
master
slave01
slave02
Symlink the Hadoop configuration files into HBase:
ln -s /opt/module/hadoop-3.1.3/etc/hadoop/core-site.xml /opt/module/hbase-2.4.8/conf/core-site.xml
ln -s /opt/module/hadoop-3.1.3/etc/hadoop/hdfs-site.xml /opt/module/hbase-2.4.8/conf/hdfs-site.xml
3. Start
bin/start-hbase.sh
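To verify the cluster, open the HBase shell (a minimal sketch; the table name 'test' is just an example):
hbase shell
status
create 'test','cf'
list
exit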
六、Sqoop
1. Extract
tar -zxf sqoop-1.4.6.bin__hadoop-2.0.4-alpha.tar.gz -C /opt/module/
2. Edit the configuration file
cd conf/
- vi sqoop-env.sh
export HADOOP_COMMON_HOME=/opt/module/hadoop-3.1.3
export HADOOP_MAPRED_HOME=/opt/module/hadoop-3.1.3
export HIVE_HOME=/opt/module/hive-3.1.2
export ZOOKEEPER_HOME=/opt/module/zookeeper-3.5.7
export ZOOCFGDIR=/opt/module/zookeeper-3.5.7
export HBASE_HOME=/opt/module/hbase-2.4.8
- Copy the JDBC driver into Sqoop's lib directory, e.g.:
cp mysql-connector-java-5.1.27-bin.jar /opt/module/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/lib/
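A simple connectivity test for Sqoop against the MySQL server (a sketch; the root/123456 credentials are assumed to match hive-site.xml above, adjust if yours differ):
sqoop list-databases --connect jdbc:mysql://bigdata1:3306/ --username root --password 123456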
七、Kafka (distribute to all nodes)
1. Extract and rename
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
mv /opt/module/kafka_2.11-0.11.0.0/ /opt/module/kafka
2. Create a logs directory under /opt/module/kafka
mkdir logs
3. Edit the configuration file
cd config/
- vi server.properties
# Globally unique broker ID; must not be duplicated
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Directory where Kafka stores its log segments
log.dirs=/opt/module/kafka/logs
log.retention.hours=168
# ZooKeeper connection string
zookeeper.connect=bigdata1:2181,bigdata2:2181,bigdata3:2181/kafka
On bigdata2 and bigdata3, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively.
4. Start / stop
bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-stop.sh
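A quick smoke test with a throwaway topic (a sketch for this 0.11-era layout, which still uses --zookeeper for topic management; newer Kafka releases use --bootstrap-server instead; note the /kafka chroot configured above):
bin/kafka-topics.sh --zookeeper bigdata1:2181/kafka --create --topic test --partitions 3 --replication-factor 2
bin/kafka-console-producer.sh --broker-list bigdata1:9092 --topic test
bin/kafka-console-consumer.sh --bootstrap-server bigdata1:9092 --topic test --from-beginning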
八、Flume
1. Rename the extracted directory to flume
mv apache-flume-1.7.0-bin flume
2. Edit flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
3. Start
Log collection agent:
nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &
Log consumption agent (same launch command; point --conf-file at the consumer agent's configuration file and write to a separate log file):
nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &
avro source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
exec source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.shell = /bin/bash -c
Spooling Directory source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /opt/module/testdir
a1.sources.r1.fileHeader = true
a1.sources.r1.fileSuffix = .COMPLETED
# Ignore (do not upload) files ending in .tmp
a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
Taildir source
# Source type is TAILDIR (case-insensitive here)
a1.sources.r1.type = taildir
# File that stores the last tailed position of each file
a1.sources.r1.positionFile = /home/hadoop/hui/taildir_position.json
# Tail file groups, separated by spaces
a1.sources.r1.filegroups = f1 f2
# Absolute path for each group
a1.sources.r1.filegroups.f1 = /home/hadoop/hui/test1/hehe.txt
# '.' matches any single character except newline; '*' matches the preceding expression zero or more times. messages.* would also work here.
a1.sources.r1.filegroups.f2 = /home/hadoop/hui/test2/.*
Kafka source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource //Kafka source type
a1.sources.r1.batchSize = 5000 //number of events written to the channel per batch
a1.sources.r1.batchDurationMillis = 2000 //maximum time a batch can stay open (ms)
a1.sources.r1.kafka.bootstrap.servers = localhost:9092 //Kafka cluster address
a1.sources.r1.kafka.topics = test1, test2 //multiple topics separated by commas
a1.sources.r1.kafka.consumer.group.id = custom.g.id //consumer group ID
netcat source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
hdfs sink
a3.sinks.k3.type = hdfs
a3.sinks.k3.hdfs.path = hdfs://hadoop102:9000/flume/upload/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k3.hdfs.filePrefix = log-
# Whether to roll directories based on time
a3.sinks.k3.hdfs.round = true
# How many time units before a new directory is created
a3.sinks.k3.hdfs.roundValue = 1
# Time unit for rounding
a3.sinks.k3.hdfs.roundUnit = hour
# Number of events to accumulate before flushing to HDFS
a3.sinks.k3.hdfs.batchSize = 100
# Roll a new file every N seconds
a3.sinks.k3.hdfs.rollInterval = 60
# Roll the file at roughly 128 MB
a3.sinks.k3.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k3.hdfs.rollCount = 0
# Use the local timestamp
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# File type (compression is supported)
a3.sinks.k3.hdfs.fileType = DataStream
logger sink
a1.sinks.k1.type = logger
avro sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
kafka sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink //Kafka sink type
a1.sinks.k1.kafka.topic = mytopic //topic name
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092 //Kafka cluster address
a1.sinks.k1.kafka.flumeBatchSize = 20 //number of events taken from the channel per batch
a1.sinks.k1.kafka.producer.acks = 1 //1: a successful write to the leader counts as sent (0: no acknowledgement; -1/all: every replica must acknowledge)
a1.sinks.k1.kafka.producer.linger.ms = 1 //how long the producer waits before sending a batch (ms)
file_roll sink (local filesystem)
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
file channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
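The snippets above are building blocks; a minimal runnable agent wires one source, one channel and one sink together. A sketch using the netcat source and logger sink from above (the file name netcat-logger.conf is hypothetical):
# /opt/module/flume/conf/netcat-logger.conf
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = logger
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
Run it and send a test line from another terminal:
flume-ng agent --conf /opt/module/flume/conf --conf-file /opt/module/flume/conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
nc localhost 6666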
一、One source, multiple sinks (replicating)
# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Replicate the data flow to all channels
a1.sources.r1.selector.type = replicating
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/module/hive/logs/hive.log
a1.sources.r1.shell = /bin/bash -c
# Describe the sink
# The avro sink here acts as a data sender
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
二、Failover (two sinks; if one dies, the other keeps working)
# Name the components on this agent
a1.sources = r1
a1.channels = c1
a1.sinkgroups = g1
a1.sinks = k1 k2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop102
a1.sinks.k2.port = 4142
# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
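To try the failover example, the two avro sinks need downstream agents listening on hadoop102:4141 and 4142; the agent itself is launched the same way as any other (the file name failover.conf is hypothetical):
flume-ng agent --conf /opt/module/flume/conf --conf-file /opt/module/flume/conf/failover.conf --name a1 -Dflume.root.logger=INFO,console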
九、Spark
Standalone mode
- Extract a Spark package and rename the extracted directory to spark-standalone
[root@bigdata1 software]# tar -zxvf spark-3.0.3-bin-hadoop3.2.tgz -C /opt/module/
[root@bigdata1 module]# mv spark-3.0.3-bin-hadoop3.2/ spark-standalone
- Configure the cluster worker nodes
[root@bigdata1 conf]# mv slaves.template slaves
[root@bigdata1 conf]# vim slaves
bigdata1
bigdata2
bigdata3
- Edit spark-env.sh and set the master node
[root@bigdata1 conf]# mv spark-env.sh.template spark-env.sh
[root@bigdata1 conf]# vim spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
SPARK_MASTER_HOST=bigdata1
SPARK_MASTER_PORT=7077
- Distribute the spark-standalone package to the other machines
(this creates the spark-standalone directory on the other machines)
[root@bigdata1 spark-standalone]# scp -r /opt/module/spark-standalone/ bigdata2:/opt/module
[root@bigdata1 spark-standalone]# scp -r /opt/module/spark-standalone/ bigdata3:/opt/module
- Start the Spark cluster (note the directory you run it from)
[root@bigdata1 spark-standalone]# sbin/start-all.sh
starting org.apache.spark.deploy.master.Master, logging to /opt/module/spark-standalone/logs/spark-root-org.apache.spark.deploy.master.Master-1-bigdata1.out
bigdata1: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-bigdata1.out
bigdata3: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-bigdata3.out
bigdata2: starting org.apache.spark.deploy.worker.Worker, logging to /opt/module/spark-standalone/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-bigdata2.out
# jps: check for the Master process (bigdata1) and Worker processes (bigdata1, bigdata2, bigdata3)
6. Official SparkPi example
[root@bigdata1 spark-standalone]# bin/spark-submit \
> --class org.apache.spark.examples.SparkPi \
> --master spark://bigdata1:7077 \
> ./examples/jars/spark-examples_2.12-3.0.3.jar \
> 10
Result (the exact value varies; any value indicates success):
Pi is roughly 3.1408591408591406
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://bigdata1:7077 \
--executor-memory 2G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.12-3.0.3.jar \
10
Result:
Pi is roughly 3.1415351415351416
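Another quick check of the standalone cluster is an interactive shell attached to the master (a sketch; the computation is just an example):
bin/spark-shell --master spark://bigdata1:7077
scala> sc.parallelize(1 to 100).sum()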
Yarn mode
- Extract a separate copy of Spark for the Spark on Yarn mode
[root@bigdata1 software]# tar -zxvf spark-3.0.3-bin-hadoop3.2.tgz -C /opt/module/
[root@bigdata1 module]# mv spark-3.0.3-bin-hadoop3.2/ spark-yarn
2. Configure environment variables
#SPARK_HOME
export SPARK_HOME=/opt/module/spark-yarn
export PATH=$PATH:$SPARK_HOME/bin
3. Edit the Hadoop configuration (/opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml)
# On machines with limited memory, add the following so that jobs are not killed unexpectedly during execution:
# Edit /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml and add:
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>
4. Distribute the configuration file
[root@bigdata1 hadoop]# scp -r /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml bigdata2:/opt/module/hadoop-3.1.3/etc/hadoop
[root@bigdata1 hadoop]# scp -r /opt/module/hadoop-3.1.3/etc/hadoop/yarn-site.xml bigdata3:/opt/module/hadoop-3.1.3/etc/hadoop
5. Edit spark-env.sh
[root@bigdata1 conf]# mv spark-env.sh.template spark-env.sh
[root@bigdata1 conf]# vim spark-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_212
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
HADOOP_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
6. Rename slaves.template to slaves and list the worker nodes
master
slave01
slave02
7. Restart Hadoop
[root@bigdata1 opt]# start-dfs.sh
[root@bigdata1 opt]# start-yarn.sh
8. SparkPi example
[root@bigdata1 spark-yarn]# spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
./examples/jars/spark-examples_2.12-3.0.3.jar \
10
Result:
Pi is roughly 3.142211142211142
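A variant that is sometimes required: submitting in cluster deploy mode, where the driver runs inside YARN and the Pi result appears in the driver's YARN logs rather than on the console (a sketch using the same example jar):
spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.3.jar \
10
yarn logs -applicationId <application id from the submit output>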
十、Flink
Standalone mode
- Extract and install Flink
[root@bigdata1 software]# tar -zxvf flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/
[root@bigdata1 module]# mv flink-1.14.0/ flink-standalone
- Edit flink-conf.yaml under conf (cd /opt/module/flink-standalone/conf)
jobmanager.rpc.address: bigdata1
- Edit workers (same directory)
[root@test conf]# vim workers
bigdata1
bigdata2
bigdata3
- Distribute to the other cluster nodes
[root@bigdata1 module]# scp -r /opt/module/flink-standalone/ bigdata2:/opt/module
[root@bigdata1 module]# scp -r /opt/module/flink-standalone/ bigdata3:/opt/module
- Start the Flink cluster (run from /opt/module/flink-standalone/)
bin/start-cluster.sh
- Open http://bigdata1:8081 in a browser to monitor and manage the Flink cluster and its jobs.
Note: bigdata1 is the host IP, and 8081 should be the port mapped on the host.
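A quick test job on the standalone cluster (run from /opt/module/flink-standalone/; a sketch using the bundled example, whose output ends up in the TaskManager logs visible in the web UI):
bin/flink run examples/streaming/WordCount.jar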
- Stop the Flink cluster (run from /opt/module/flink-standalone/):
bin/stop-cluster.sh
Flink on Yarn mode
- Extract and install
[root@bigdata1 software]# tar -zxvf /opt/software/flink-1.14.0-bin-scala_2.12.tgz -C /opt/module/
[root@bigdata1 module]# mv flink-1.14.0/ flink-yarn
2. Configure environment variables (/etc/profile)
# Configure HADOOP_CLASSPATH; skip this if it was already configured earlier.
#export HADOOP_CLASSPATH=`hadoop classpath`
#FLINK_HOME
export FLINK_HOME=/opt/module/flink-yarn
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
3. Start the Hadoop cluster
Flink on Yarn mode runs on top of the Hadoop cluster's YARN.
Session Cluster
In Session-Cluster mode the cluster is started first and jobs are submitted to it afterwards. A block of resources is requested from YARN and then stays fixed; if the resources are used up, the next job cannot be submitted until one of the running jobs finishes and releases resources. All jobs share the Dispatcher and ResourceManager and share resources, which suits small jobs with short execution times.
A Flink cluster is initialized inside YARN with the requested resources, and all later jobs are submitted to it. This Flink cluster stays resident in the YARN cluster unless it is stopped manually.
- Start the Hadoop cluster.
- Start a yarn-session:
bin/yarn-session.sh -d
Check whether it started successfully (from any directory):
yarn application -list
If "Flink session cluster" appears in the output, the startup succeeded.
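Once the session is up, jobs can be submitted to it with a plain flink run (a sketch using the bundled example jar):
flink run /opt/module/flink-yarn/examples/streaming/WordCount.jar
The job then shows up under the session's application in yarn application -list and in the Flink web UI.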
Per Job Cluster
In Per-Job mode each job gets its own cluster: every submission requests resources from YARN according to its own needs and runs until the job finishes, and one job's failure does not affect the submission or execution of the next. Each job has its own Dispatcher and ResourceManager and accepts resource requests on demand, which suits large, long-running jobs.
Every submission creates a new Flink cluster; jobs are independent of each other and easy to manage, and the cluster disappears once its job completes.
- Start the Hadoop cluster. Confirm with jps that it is running; if not, execute:
start-all.sh
- This mode does not start a yarn-session; the job is submitted directly.
- Test:
yum install -y nc
nc -lk 22222
Open another terminal and run:
flink run -m yarn-cluster /opt/module/flink-yarn/examples/streaming/SocketWindowWordCount.jar --hostname bigdata1 --port 22222
Check the YARN applications:
yarn application -list
If "Flink per-job cluster" appears, the job is running. Success!