flink初识

发布时间 2023-07-06 19:29:58作者: 小不点丶

  一、flink:apache开源的一款流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。

  

  二、Flink是一个计算框架和分布式的计算处理引擎,基于对流(实时、无界)和批(离散、有界)数据进行有状态的计算,它可以通过集群以内存进行任意规模的数据计算。

  • 高吞吐、低延迟、高性能
  • 支持带有事件的窗口(window)操作
  • 支持有状态的计算
  • 内存计算
  • 迭代计算

   

  三、应用场景

  Apache Flink 功能强大,支持开发和运行多种不同种类的应用程序。它的主要特性包括:批流一体化、精密的状态管理、事件时间支持以及精确一次的状态一致性保障等。Flink 不仅可以运行在包括 YARN、 Mesos、Kubernetes 在内的多种资源管理框架上,还支持在裸机集群上独立部署。在启用高可用选项的情况下,它不存在单点失效问题。事实证明,Flink 已经可以扩展到数千核心,其状态可以达到 TB 级别,且仍能保持高吞吐、低延迟的特性。世界各地有很多要求严苛的流处理应用都运行在 Flink 之上。

  四、组成:

  flink 主要分为2个部分:jobmanager、taskmanager。

  jobmanager:主要是处理作业。

  taskmanager:通过任务槽执行具体的任务。

  当然。除了2个主要的,还需要resourcemanager(资源管理器)主要协调任务的和资源的调度过程。

  五、flink 链路流程:

  

  1、source:数据来源

  2、transform:数据转换,分析处理过程。

  3、sink:结果输出下沉。

  五、本地数据库采集开发测试:

  1)maven依赖(具体的都是根据需要进行依赖即可):

   <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.16.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.16.2</version>
        </dependency>
        <!-- client -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.16.2</version>
        </dependency>
        <!-- jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.16.2</version>
        </dependency>
        <!-- mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
        </dependency>
    </dependencies>

  2)demo

public class Demo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source
        DataStreamSource<Row> input = env.createInput(JdbcInputFormat
                .buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true")
                .setUsername("root")
                .setPassword("root")
                .setQuery("select id from user")
                .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO))
                .finish());


        //transform
        SingleOutputStreamOperator<String> operator = input.map(new MapFunction<Row, String>() {
            public String map(Row row) throws Exception {
                return String.valueOf(row.getField(0));
            }
        });

        //sink
        operator.addSink(JdbcSink.sink(
                "INSERT INTO test(id) values(?)",
                new JdbcStatementBuilder<String>() {
                    public void accept(PreparedStatement ps, String id) throws SQLException {
                        ps.setString(1, id);
                    }
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf8&useSSL=false&serverTimezone=GMT%2B8&allowMultiQueries=true")
                        .withUsername("root")
                        .withPassword("root")
                        .build()
        ));
        env.execute("demo");
    }
}

  六、集群开发测试:

  1)集群搭建

version: '3'
services:
  xbd-flink-job:
    image: flink:1.16.2
    container_name: xbd-flink-job
    restart: always
    privileged: true
    ports:
      - 8081:8081
    environment:
      - TZ=Asia/Shanghai
      - JOB_MANAGER_RPC_ADDRESS=xbd-flink-job
    command: jobmanager

  xbd-flink-task:
    image: flink:1.16.2
    container_name: xbd-flink-task
    restart: always
    privileged: true
    environment:
      - TZ=Asia/Shanghai
      - JOB_MANAGER_RPC_ADDRESS=xbd-flink-job
    command: taskmanager
    depends_on:
      - xbd-flink-job

  

  2)打包测试:

  a、maven(主要为了打成jar,包含依赖)

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

  b、执行、提交:

   c、结果查看

   七、官网:https://flink.apache.org/zh/