Flink与mysql结合

发布时间 2023-04-01 16:31:26作者: 曹军

  在流式计算中,不是有时候需要和mysql进行结合做一些处理。

 

1.调用其他方法进行

  

 

 

2.更快的处理

  使用guava本地缓存

  对msql的操作是new对象过来

    private final static RuleService ruleService = new RuleService();

    final static Cache<Long, Map<Long,CustomerVerifyConfig>> verifyConfigCache = CacheBuilder.newBuilder()
            //设置cache的初始大小为10,要合理设置该值
            .initialCapacity(1000)
            //设置并发数为5,即同一时间最多只能有5个线程往cache执行写入操作
            .concurrencyLevel(5)
            //设置cache中的数据在写入之后的存活时间为10秒
            .expireAfterWrite(5, TimeUnit.MINUTES)
            //设置缓存最大容量为9999,超过100之后就会按照LRU最近虽少使用算法来移除缓存项
            .maximumSize(9999)
            //构建cache实例
            .build();


    public static CustomerVerifyConfig getVerifyConfigByCustomerId(Long customerId){
        Map<Long,CustomerVerifyConfig> customerVerifyConfigMap = verifyConfigCache.getIfPresent(CUSTOMER_VERIFY_CONFIG_KEY);
        if (customerVerifyConfigMap != null){
            return customerVerifyConfigMap.get(customerId);
        } else {
            List<CustomerVerifyConfig> customerVerifyConfigList = ruleService.queryCustomerVerifyConfig();
            if (CollectionUtils.isEmpty(customerVerifyConfigList)){
                verifyConfigCache.put(CUSTOMER_VERIFY_CONFIG_KEY, new HashMap<>());
                return null;
            } else {
                Map<Long, CustomerVerifyConfig> configMap = customerVerifyConfigList.stream().collect(Collectors.toMap(CustomerVerifyConfig::getCustomerId, Function.identity(), (k1, k2) -> k2));
                verifyConfigCache.put(CUSTOMER_VERIFY_CONFIG_KEY, configMap);
                return configMap.get(customerId);
            }
        }
    }

 

3.引入依赖

        <!--数据库连接池-->
        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
            <version>4.0.3</version>
        </dependency>


        <!--MYSQL驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>

 

4.配置

application.properties

## 数据库配置
spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.driverClassName = com.mysql.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/ssm?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.datasource.username = root
spring.datasource.password = root
##  Hikari 连接池配置 ------ 详细配置请访问:https://github.com/brettwooldridge/HikariCP
## 最小空闲连接数量
spring.datasource.hikari.minimum-idle=5
## 空闲连接存活最大时间,默认600000(10分钟)
spring.datasource.hikari.idle-timeout=180000
## 连接池最大连接数,默认是10
spring.datasource.hikari.maximum-pool-size=10
## 此属性控制从池返回的连接的默认自动提交行为,默认值:true
spring.datasource.hikari.auto-commit=true
## 连接池名称
spring.datasource.hikari.pool-name=MyHikariCP
## 此属性控制池中连接的最长生命周期,值0表示无限生命周期,默认1800000即30分钟
spring.datasource.hikari.max-lifetime=1800000
## 数据库连接超时时间,默认30秒,即30000
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.connection-test-query=SELECT 1
spring.datasource.hikari.pool-name=hikariXXXXDbPool

 

Manager配置

@Slf4j
public class HikariPoolManager {

    private static final HikariDataSource dataSource;

    static {
        Config load = ConfigFactory.load();
        dataSource = new HikariDataSource();
        dataSource.setDriverClassName(load.getString("datasource.driver-class-name"));
        dataSource.setJdbcUrl(load.getString("datasource.url"));
        dataSource.setUsername(load.getString("datasource.username"));
        dataSource.setPassword(load.getString("datasource.password"));
        dataSource.setMinimumIdle(load.getInt("datasource.hikari.minimum-idle"));
        dataSource.setMaximumPoolSize(load.getInt("datasource.hikari.maximum-pool-size"));
        dataSource.setAutoCommit(load.getBoolean("datasource.hikari.auto-commit"));
        dataSource.setIdleTimeout(load.getInt("datasource.hikari.idle-timeout"));
        dataSource.setPoolName(load.getString("datasource.hikari.pool-name"));
        dataSource.setMaxLifetime(load.getInt("datasource.hikari.max-lifetime"));
        dataSource.setConnectionTimeout(load.getInt("datasource.hikari.connection-timeout"));
        dataSource.setConnectionTestQuery(load.getString("datasource.hikari.connection-test-query"));
    }

    public static Connection getConnection() {
        try {
            return dataSource.getConnection();
        } catch (SQLException e) {
            log.error("连接数据库失败", e);
        }
        return null;
    }

    public static void close(Connection con) {
        try {
            if (con != null) {
                con.close();
            }
        } catch (Exception e) {
            log.error("=======>close Mysql connection cause error,es={}", e.getMessage());
        }

    }

}

 

5.使用查询

    public List<CustomerVerifyConfig> queryCustomerVerifyConfig(){
        List<CustomerVerifyConfig> customerVerifyConfigList = new ArrayList<>();
        // 查询
        Connection connection = HikariPoolManager.getConnection();
        String sql = "select customer_id, offline_switch from bme_verify_db.t_customer_verify_config";
        log.info("getCustomerVerifyConfig sql == {}", sql);
        try {
            PreparedStatement preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery();
            while (resultSet.next()) {
                CustomerVerifyConfig customerVerifyConfig = new CustomerVerifyConfig();
                customerVerifyConfig.setCustomerId(resultSet.getLong(1));
                customerVerifyConfig.setOfflineSwitch(resultSet.getInt(2));
            
                customerVerifyConfigList.add(customerVerifyConfig);
            }

        } catch (Exception e) {
            log.error("获取核查配置异常", e);
        } finally {
            HikariPoolManager.close(connection);
        }
        return customerVerifyConfigList;
    }