Exercise: Flink reads data.txt (a JSON file), parses it, and writes the results to MySQL

Published 2024-01-02 19:30:02 · Author: sunny123456


 

bean 

// Entity class: one row in the MySQL table

package bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Account {
    // account info
    private String 账号信息;
    // price
    private String 售价;
    // popularity
    private String 热度;
    // publish time
    private String 发布时间;
    // title
    private String 标题;
    // number of images
    private String 图片数量;
}
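Because the fields are named in Chinese, Lombok's @Data generates accessors with Chinese names as well (get账号信息(), set售价(), ...), and the SQLSink below depends on them. A quick sanity check, as a sketch (the class name AccountCheck is hypothetical, not part of the original project):

package bean;

// Sketch: confirm Lombok generated the expected accessors for the Chinese field names.
// AccountCheck is a hypothetical helper, not part of the original project.
public class AccountCheck {
    public static void main(String[] args) {
        Account a = new Account();            // from @NoArgsConstructor
        a.set账号信息("[等级73] demo");        // setter generated by @Data
        System.out.println(a.get账号信息());   // getter generated by @Data
        System.out.println(a);                // @Data also generates toString()
    }
}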

 

sink

// FileSink: writes the cleansed data to a local file

package sink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.io.File;
import java.io.FileOutputStream;
import java.nio.charset.StandardCharsets;

public class FileSink extends RichSinkFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // Write next to the compiled classes so the output lands on the classpath.
        String path = FileSink.class.getClassLoader().getResource("").getPath() + "OutFile.txt";
        File file = new File(path);
        if (!file.exists()) {
            file.createNewFile();
        }
        // Append mode plus a trailing newline: invoke() runs once per record,
        // and a plain FileOutputStream would overwrite the file each time.
        FileOutputStream fos = new FileOutputStream(file, true);
        fos.write((value + "\n").getBytes(StandardCharsets.UTF_8));
        fos.close();
        System.out.println("FileSink> " + value);
        System.out.println(file.getAbsolutePath());
    }
}
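The hand-rolled sink above is fine for an exercise; for reference, Flink 1.13 also ships a built-in row-format file sink. A minimal sketch (the output directory /tmp/outfile and the class name are assumptions):

package sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Sketch: Flink's built-in row-format file sink; "/tmp/outfile" is an assumed output directory.
public class BuiltInFileSinkExample {
    public static StreamingFileSink<String> build() {
        return StreamingFileSink
                .forRowFormat(new Path("/tmp/outfile"), new SimpleStringEncoder<String>("UTF-8"))
                .build();
    }
}

It would be attached the same way as the custom sink: str.addSink(BuiltInFileSinkExample.build());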

// MySQL sink: saves parsed records to MySQL

package sink;

import bean.Account;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Custom sink: extend RichSinkFunction.
public class SQLSink extends RichSinkFunction<Account> {
    // Instance fields, not static: static state is neither serialized with the
    // job graph nor safe to share between parallel sink instances.
    private Connection conn;
    private PreparedStatement pre;
    private final String database;
    private String sql;

    public SQLSink(String database, String sql) {
        this.database = database;
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/" + database + "?useUnicode=true&characterEncoding=UTF-8",
                "root", "root");
        conn.setAutoCommit(true);
        // Create the target table once per sink instance, not once per record.
        String createTableSql = "create table if not exists account (info varchar(255), price varchar(255),"
                + " hot varchar(255), `time` varchar(255), title varchar(255), pic_nums varchar(255))"
                + " DEFAULT CHARSET=utf8";
        PreparedStatement create = conn.prepareStatement(createTableSql);
        create.execute();
        create.close();
        // Prepare the insert statement once and reuse it for every record.
        sql = "insert into account values (?,?,?,?,?,?)";
        pre = conn.prepareStatement(sql);
    }

    @Override
    public void invoke(Account value, Context context) throws Exception {
        // Skip records that carry no account info.
        if (value.get账号信息() != null) {
            pre.setString(1, value.get账号信息());
            pre.setString(2, value.get售价());
            pre.setString(3, value.get热度());
            pre.setString(4, value.get发布时间());
            pre.setString(5, value.get标题());
            pre.setString(6, value.get图片数量());
            pre.execute();
        }
    }

    @Override
    public void close() throws Exception {
        if (pre != null) pre.close();
        if (conn != null) conn.close();
    }
}
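Since the pom below already includes flink-connector-jdbc for Flink 1.13, the same insert can also be written with the official JdbcSink instead of a custom RichSinkFunction. A minimal sketch (connection settings copied from SQLSink above; JdbcSinkExample is a hypothetical name):

package sink;

import bean.Account;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Sketch: the same insert via Flink's official JDBC connector (flink-connector-jdbc 1.13).
public class JdbcSinkExample {
    public static SinkFunction<Account> build(String database) {
        return JdbcSink.sink(
                "insert into account values (?,?,?,?,?,?)",
                (ps, a) -> {                       // JdbcStatementBuilder<Account>
                    ps.setString(1, a.get账号信息());
                    ps.setString(2, a.get售价());
                    ps.setString(3, a.get热度());
                    ps.setString(4, a.get发布时间());
                    ps.setString(5, a.get标题());
                    ps.setString(6, a.get图片数量());
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/" + database
                                + "?useUnicode=true&characterEncoding=UTF-8")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("root")
                        .build());
    }
}

Note that JdbcSink does not create the table; with this variant the table would have to exist beforehand.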

 

flink test

// Step 1: cleanse the raw data into a uniform format

package test;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import sink.FileSink;

public class Data_Shuffle {
    public static void main(String[] args) {
        // Read data.txt (a JSON file) from the classpath and push each line
        // through FileSink; every {...} element is one record.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        DataStreamSource<String> str = env.readTextFile(
                Data_Shuffle.class.getClassLoader().getResource("JSON_DATA.txt").getPath());
        str.print("str");
        str.addSink(new FileSink());
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
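One caveat: readTextFile emits the file line by line, while the data.txt shown below is pretty-printed across many lines, so the per-line JSON.parseObject in Flink_To_MySQL only works when each complete document occupies a single line. A sketch of a variant that side-steps this by reading the whole file as one stream element (assumes the file fits in memory; the class name is hypothetical):

package test;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: read the whole JSON file as ONE element so multi-line (pretty-printed)
// JSON still parses downstream. Assumes the file fits comfortably in memory.
public class Data_Shuffle_WholeFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String path = Data_Shuffle_WholeFile.class.getClassLoader().getResource("JSON_DATA.txt").getPath();
        String json = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        DataStreamSource<String> str = env.fromElements(json);
        str.addSink(new sink.FileSink());
        env.execute();
    }
}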

// Step 2: save the parsed records to MySQL

package test;

import bean.Account;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import sink.SQLSink;

public class Flink_To_MySQL {
    public static void main(String[] args) {
        // Parse the cleansed file and write each record to MySQL. Each input line
        // must hold one complete JSON document of the form {"items": [...]}.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        String path = Flink_To_MySQL.class.getClassLoader().getResource("OutFile.txt").getPath();
        System.out.println(path);
        DataStreamSource<String> newStr = env.readTextFile(path);
        newStr.print("newStr");
        SingleOutputStreamOperator<Account> items = newStr.flatMap(new FlatMapFunction<String, Account>() {
            @Override
            public void flatMap(String s, Collector<Account> collector) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                JSONArray items = jsonObject.getJSONArray("items");
                for (Object item : items) {
                    Account account = JSON.parseObject(item.toString(), Account.class);
                    System.out.println(account);
                    collector.collect(account);
                }
            }
        });
        items.addSink(new SQLSink("aaa", ""));
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
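Run Data_Shuffle first so that OutFile.txt exists on the classpath, then run Flink_To_MySQL. To sanity-check the fastjson mapping from the Chinese JSON keys to the Account fields outside of Flink, a minimal sketch (ParseCheck is a hypothetical class):

package test;

import bean.Account;
import com.alibaba.fastjson.JSON;

// Sketch: verify that fastjson maps the Chinese JSON keys onto the matching Account fields.
public class ParseCheck {
    public static void main(String[] args) {
        String item = "{\"账号信息\":\"demo\",\"售价\":\"$ 1.00\",\"热度\":\"1\","
                + "\"发布时间\":\"2022-04-23 14:01:36.766\",\"标题\":\"t\",\"图片数量\":\"2图\"}";
        Account account = JSON.parseObject(item, Account.class);
        System.out.println(account);   // Lombok @Data toString() shows all fields
    }
}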

data.txt

{
"items":[
{
"账号信息":"[等级73 1-10级 无QQ好友] 便宜出",
"售价":"$ 140.00",
"热度":112,
"发布时间":"2022-04-23 14:01:36.766",
"标题":"穿越火线CFV电信区V湖南电信一区",
"图片数量":"2图"
},
{
"账号信息":"[等级39 11-20级 有QQ好友] 爆破四防王者魅影, 王心觉醒",
"售价":"$ 232.00",
"热度":288,
"发布时间":"2022-04-23 14:01:36.823",
"标题":"穿越火线CFV网通区V河北一区",
"图片数量":"2图"
},
{
"账号信息":"[等级100 11-20级 无QQ好友] cf北部超值账号",
"售价":"$ 580.00",
"热度":233,
"发布时间":"2022-04-23 14:01:36.933",
"标题":"穿越火线CFV网通区V山东一区",
"图片数量":"12图"
}
]
}

 

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>test4-8</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink.version>1.13.0</flink.version>
        <hadoop.version>3.1.3</hadoop.version>
        <scala.version>2.12</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
        <!--flink-java-core-stream-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
        <!--jedis-->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.3.0</version>
        </dependency>
        <!--fastjson-->
       <dependency>
           <groupId>com.alibaba</groupId>
           <artifactId>fastjson</artifactId>
           <version>1.2.60</version>
       </dependency>
      <!--flink SQL table api-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--cep-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--csv-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--sink kafka-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--sink hadoop hdfs-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
            <version>1.4.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!--sink mysql-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_${scala.version}</artifactId>
            <version>1.9.2</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
<!--sink to HBase-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hbase_${scala.version}</artifactId>
            <version>1.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.3</version>
        </dependency>
<!--Guava: core Java libraries widely used across Google projects-->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>30.1.1-jre</version>
        </dependency>
        <!--jdbc sink ClickHouse (with jackson exclusions)-->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
             <version>${flink.version}</version>
         </dependency>
         <dependency>
             <groupId>ru.yandex.clickhouse</groupId>
             <artifactId>clickhouse-jdbc</artifactId>
             <version>0.2.4</version>
             <exclusions>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
                 <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-core</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
        <!--Flink Redis connector-->
         <dependency>
             <groupId>org.apache.bahir</groupId>
             <artifactId>flink-connector-redis_2.11</artifactId>
             <version>1.0</version>
         </dependency>
        <!--sink es-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch7_${scala.version}</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>
</project>

 

Original post: https://www.cnblogs.com/chang09/p/16416409.html