Flink实验

发布时间 2023-12-28 11:37:09作者: 代不动码

 

题目:

实验八

姓名

 

日期12.8

实验环境:1Ubuntu18.04(或Ubuntu16.04)。

2IntelliJ IDEA

3Flink1.9.1

 

实验内容与完成情况:1)使用IntelliJ IDEA工具开发WordCount程序

Linux系统中安装IntelliJ IDEA,然后使用IntelliJ IDEA工具开发WordCount程序,并打包成JAR文件,提交到Flink中运行。

 

 

 

 

 

2)数据流词频统计

使用Linux系统自带的NC程序模拟生成数据流,不断产生单词并发送出去。编写Flink程序对NC程序发来的单词进行实时处理,计算词频,并把词频统计结果输出。要求首先在IntelliJ IDEA中开发和调试程序,然后,再打成JAR包部署到Flink中运行。
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class WordCount {
    public static void main(String[] args) throws Exception {
    //定义socket的端口号
        int port;
        try {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            port = parameterTool.getInt("port");
        } catch (Exception e) {
            System.err.println("指定port参数,默认值为9000");
            port = 9000;
        }
//获取运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入的数据
        DataStreamSource<String> text = env.socketTextStream("node1", port, "\n");
//计算数据
        DataStream<WordWithCount> windowCount = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
                String[] splits = value.split("\\s");
                for (String word : splits) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
                })//打平操作,把每行的单词转为<word,count>类型的数据
                .keyBy("word")//针对相同的word数据进行分组
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定计算数据的窗口大小和滑动窗口大小
                .sum("count");
//把数据打印到控制台
        windowCount.print()
                .setParallelism(1);//使用一个并行度
//注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
        env.execute("streaming word count");
    }
    /**
     * 主要为了存储单词以及单词出现的次数
     */
    public static class WordWithCount {
        public String word;
        public long count;
        public WordWithCount() {
        }
        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }
        @Override
        public String toString() {
            return "WordWithCount{" +
                    "word='" + word + '\'' +
                    ", count=" + count +
                    '}';
        }
    }
}

 

 

出现的问题:1.Idea中编写代码时,遇到无法找到依赖库和配置不正确的问题。2.maven打包出现问题

 

解决方案(列出遇到的问题和解决办法,列出没有解决的问题):1.网上查找资料,正确导包2.查阅资料解决