Executing Flink SQL in HUE

Published: 2024-01-08 16:08:50, Author: 粒子先生

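Problems addressed

  • Fix the execution errors HUE runs into when operating on Hive tables with large data volumes.
  • Give developers and data administrators a convenient interactive Flink SQL query tool that simplifies SQL validation and so improves productivity.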

Features

  • Flink SQL reads and writes Kafka
  • Flink SQL reads and writes HDFS
  • Flink SQL joins
  • Flink SQL aggregations
  • Flink SQL reads and writes Hive tables through HiveCatalog

```sql
-- Flink SQL: read/write Kafka
CREATE TABLE `kafka_test_01` (
  `caseId` string,
  `sourceUrl` string
) with (
  'connector' = 'kafka',
  'topic' = 'kafka_test_01',
  'properties.group.id' = 'kafka_test_01',
  'properties.bootstrap.servers' = '172.31.3.10:32058',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- write one JSON message to the topic
insert into kafka_test_01 values('a', 'b');

-- read it back from the earliest offset (an unbounded streaming query)
select * from kafka_test_01;
```
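
With `'format' = 'json'`, each row goes to the topic as a JSON object keyed by column name. The standard-library sketch below mirrors that mapping for `kafka_test_01`, which can help when producing or checking test messages by hand. The helper names are hypothetical, and the compact separators are an assumption about Flink's exact byte output. Absent keys map to `None`, matching the JSON format's default of reading missing fields as NULL (`json.fail-on-missing-field` defaults to false).

```python
import json

# Column order taken from the kafka_test_01 DDL.
COLUMNS = ("caseId", "sourceUrl")


def to_json_record(row):
    """Serialize one row as a compact JSON object keyed by column name."""
    if len(row) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} values, got {len(row)}")
    return json.dumps(dict(zip(COLUMNS, row)), separators=(",", ":"))


def from_json_record(text):
    """Parse a message value into a tuple in column order; absent keys become None."""
    obj = json.loads(text)
    return tuple(obj.get(name) for name in COLUMNS)


if __name__ == "__main__":
    message = to_json_record(("a", "b"))
    print(message)                              # {"caseId":"a","sourceUrl":"b"}
    print(from_json_record(message))            # ('a', 'b')
    print(from_json_record('{"caseId":"x"}'))   # ('x', None)
```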

```sql
-- Flink SQL: read/write HDFS
drop table if exists hdfs_pt_test;

CREATE TABLE hdfs_pt_test (
  uniqid1 STRING,
  uniqid2 STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector'='filesystem',
  'path'='hdfs://172.31.6.30:9000/dw/ods/hdfs_pt_test',
  'format'='json',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

-- the last value ('c') fills the partition column dt
insert into hdfs_pt_test values('a', 'b', 'c');
select * from hdfs_pt_test where dt = 'c';
```
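
The filesystem connector lays partitions out Hive-style, so the row `('a', 'b', 'c')` should land under `.../hdfs_pt_test/dt=c/`. With the `success-file` commit policy, a marker file (named `_SUCCESS` by default) is written into the partition directory when the partition is committed. With the default `process-time` trigger, that happens only once `sink.partition-commit.delay` (here `1 h`) has passed since the partition was created, and in streaming mode the commit is also tied to completed checkpoints, so it can come later still. The sketch below models only the paths and the delay rule; the helper names are hypothetical and the boundary behavior (`>=`) is an assumption.

```python
from datetime import datetime, timedelta

BASE_PATH = "hdfs://172.31.6.30:9000/dw/ods/hdfs_pt_test"
COMMIT_DELAY = timedelta(hours=1)  # 'sink.partition-commit.delay' = '1 h'
SUCCESS_FILE = "_SUCCESS"          # default success-file name, assumed unchanged


def partition_dir(base, spec):
    """Hive-style partition path, e.g. base/dt=c (keys in partition-column order)."""
    return base + "".join(f"/{key}={value}" for key, value in spec.items())


def success_marker(base, spec):
    """Path of the marker the 'success-file' policy writes when the partition commits."""
    return f"{partition_dir(base, spec)}/{SUCCESS_FILE}"


def commit_due(created_at, now, delay=COMMIT_DELAY):
    """process-time trigger (modeled): committable once `delay` has elapsed since creation."""
    return now - created_at >= delay


if __name__ == "__main__":
    spec = {"dt": "c"}
    print(partition_dir(BASE_PATH, spec))
    print(success_marker(BASE_PATH, spec))
    created = datetime(2024, 1, 8, 16, 0)
    print(commit_due(created, created + timedelta(minutes=30)))  # False
    print(commit_due(created, created + timedelta(hours=2)))     # True
```

So a missing `_SUCCESS` file shortly after the insert is expected behavior rather than a failure.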

```sql
-- Flink SQL: JOIN verification
CREATE TABLE hdfs_join01_test (
  uniqid STRING,
  name STRING
) WITH (
  'connector'='filesystem',
  'path'='hdfs://172.31.6.30:9000/dw/ods/hdfs_pt_join01_test',
  'format'='json',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);
insert into hdfs_join01_test values('1', 'aaa');
insert into hdfs_join01_test values('2', 'bbb');
insert into hdfs_join01_test values('3', 'ccc');
select * from hdfs_join01_test;

CREATE TABLE hdfs_join02_test (
  uniqid STRING,
  age STRING
) WITH (
  'connector'='filesystem',
  'path'='hdfs://172.31.6.30:9000/dw/ods/hdfs_pt_join02_test',
  'format'='json',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);
insert into hdfs_join02_test values('1', '18');
insert into hdfs_join02_test values('2', '20');
select * from hdfs_join02_test;

-- expected: rows for uniqid 1 and 2 only; uniqid 3 has no match in hdfs_join02_test
SELECT a.uniqid, a.name, b.age FROM hdfs_join01_test a INNER JOIN hdfs_join02_test b ON a.uniqid = b.uniqid;
```
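
To judge whether HUE returns the right rows, it helps to have a reference answer. SQLite from Python's standard library can stand in for this small batch join; it is not Flink, so only the final rows are comparable, not execution behavior. `ORDER BY` is added because Flink does not guarantee result order. The data and table names are copied from the statements above; the helper name is hypothetical.

```python
import sqlite3

JOIN01 = [("1", "aaa"), ("2", "bbb"), ("3", "ccc")]
JOIN02 = [("1", "18"), ("2", "20")]

JOIN_SQL = (
    "SELECT a.uniqid, a.name, b.age "
    "FROM hdfs_join01_test a INNER JOIN hdfs_join02_test b ON a.uniqid = b.uniqid "
    "ORDER BY a.uniqid"
)


def expected_join_rows():
    """Run the same INNER JOIN on an in-memory SQLite copy of the test data."""
    con = sqlite3.connect(":memory:")
    try:
        con.execute("CREATE TABLE hdfs_join01_test (uniqid TEXT, name TEXT)")
        con.execute("CREATE TABLE hdfs_join02_test (uniqid TEXT, age TEXT)")
        con.executemany("INSERT INTO hdfs_join01_test VALUES (?, ?)", JOIN01)
        con.executemany("INSERT INTO hdfs_join02_test VALUES (?, ?)", JOIN02)
        return con.execute(JOIN_SQL).fetchall()
    finally:
        con.close()


if __name__ == "__main__":
    print(expected_join_rows())  # [('1', 'aaa', '18'), ('2', 'bbb', '20')]
```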

```sql
-- Flink SQL: GROUP BY verification
DROP TABLE IF EXISTS hdfs_group01_test;

-- `value` is a reserved keyword in Flink SQL, hence the backticks
CREATE TABLE hdfs_group01_test (
  name STRING,
  `value` INTEGER
) WITH (
  'connector'='filesystem',
  'path'='hdfs://172.31.6.30:9000/dw/ods/hdfs_group01_test',
  'format'='json',
  'sink.partition-commit.delay'='1 h',
  'sink.partition-commit.policy.kind'='success-file'
);

insert into hdfs_group01_test values('aaa', 100);
insert into hdfs_group01_test values('aaa', 100);
insert into hdfs_group01_test values('bbb', 100);
insert into hdfs_group01_test values('ccc', 600);

-- expected final result: aaa 200, bbb 100, ccc 600; count(*) returns 4
SELECT name, SUM(`value`) as v FROM hdfs_group01_test GROUP BY name;
select count(*) from hdfs_group01_test;
```
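
When this query runs in streaming mode, a non-windowed `GROUP BY` does not emit one final row per key; it emits a changelog that updates a key's sum whenever a new row for it arrives. Flink marks such rows `+I` (insert), `-U` (retract the old value) and `+U` (emit the new value), and skips emission when the aggregate does not change. The sketch below simulates that for the four inserted rows, in insertion order (the real read order from HDFS is not guaranteed), then folds the changelog back into the final table. It illustrates changelog semantics only and is not Flink code; a client that displays every changelog row as-is would show `aaa` twice, first with 100 and then with 200.

```python
ROWS = [("aaa", 100), ("aaa", 100), ("bbb", 100), ("ccc", 600)]


def sum_changelog(rows):
    """Changelog of SUM(value) GROUP BY name: +I for a new key, -U/+U when its sum changes."""
    sums = {}
    log = []
    for name, value in rows:
        if name not in sums:
            sums[name] = value
            log.append(("+I", name, value))
            continue
        old = sums[name]
        new = old + value
        if new == old:
            continue  # unchanged aggregate: nothing is emitted
        sums[name] = new
        log.append(("-U", name, old))
        log.append(("+U", name, new))
    return log


def materialize(log):
    """Apply a changelog to an empty table keyed by name and return the final state."""
    table = {}
    for kind, name, value in log:
        if kind in ("+I", "+U"):
            table[name] = value
        elif kind in ("-U", "-D"):
            table.pop(name, None)
    return table


if __name__ == "__main__":
    log = sum_changelog(ROWS)
    for entry in log:
        print(entry)
    print(materialize(log))  # {'aaa': 200, 'bbb': 100, 'ccc': 600}
    print(len(ROWS))         # what count(*) should return: 4
```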

```sql
-- Flink SQL: read/write Hive tables through HiveCatalog
show catalogs;
use catalog myhive;
show databases;
use dw;
show tables;

select * from aquaticanimal;
insert into dwd_cpws_general_result01 select * from dwd_cpws_general where province='100000' limit 100;
insert into dwd_pil_ps_test01 select * from dwd_pil_ps;
```

Supported statements

| Statement | Description |
|---|---|
| `SHOW CATALOGS` | List all registered catalogs |
| `SHOW DATABASES` | List all databases in the current catalog |
| `SHOW TABLES` | List all tables and views in the current database of the current catalog |
| `SHOW VIEWS` | List all views in the current database of the current catalog |
| `SHOW FUNCTIONS` | List all functions |
| `SHOW MODULES` | List all modules |
| `USE CATALOG catalog_name` | Set the catalog with the given name as the current catalog |
| `USE database_name` | Set the database with the given name as the current database of the current catalog |
| `CREATE TABLE table_name ...` | Create a table with a DDL statement |
| `DROP TABLE table_name` | Drop the table with the given name |
| `ALTER TABLE table_name` | Alter the table with the given name |
| `CREATE DATABASE database_name ...` | Create a database with the given name in the current catalog |
| `DROP DATABASE database_name ...` | Drop the database with the given name |
| `ALTER DATABASE database_name ...` | Alter the database with the given name |
| `CREATE VIEW view_name AS ...` | Add a view, defined by a SELECT statement, to the current session |
| `DROP VIEW view_name ...` | Drop the view with the given name |
| `SET xx=yy` | Set the session property `xx` to the value `yy` |
| `SET` | List all session properties |
| `RESET ALL` | Reset all session properties set by the SET command |
| `DESCRIBE table_name` | Show the schema of a table |
| `EXPLAIN PLAN FOR ...` | Show a string-based explanation of the AST and execution plan of the given statement |
| `SELECT ...` | Submit a Flink SELECT job |
| `INSERT INTO ...` | Submit a Flink INSERT INTO job |
| `INSERT OVERWRITE ...` | Submit a Flink INSERT OVERWRITE job |
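
The table splits into two groups: `SELECT`, `INSERT INTO` and `INSERT OVERWRITE` submit a Flink job, while the rest are catalog, session or planning commands that do not. A hypothetical helper that splits a script like the examples above into statements and flags the job-submitting ones could look like this. It is deliberately naive: it splits on every `;`, cuts `--` comments without understanding string literals, collapses whitespace, and recognizes only the leading keywords listed in the table, so a `WITH ... SELECT` query would not be flagged.

```python
JOB_PREFIXES = ("SELECT", "INSERT INTO", "INSERT OVERWRITE")


def strip_line_comments(sql):
    """Drop '--' comments (naive: a '--' inside a string literal is cut as well)."""
    return "\n".join(line.split("--", 1)[0] for line in sql.splitlines())


def split_statements(script):
    """Split a script on ';' into non-empty, whitespace-normalized statements."""
    cleaned = strip_line_comments(script)
    return [" ".join(part.split()) for part in cleaned.split(";") if part.strip()]


def submits_job(statement):
    """True if the statement starts with a job-submitting keyword from the table."""
    normalized = " ".join(strip_line_comments(statement).split()).upper()
    return normalized.startswith(JOB_PREFIXES)


if __name__ == "__main__":
    script = """
    -- Flink SQL: GROUP BY verification
    DROP TABLE IF EXISTS hdfs_group01_test;
    insert into hdfs_group01_test values('aaa', 100);
    SELECT name, SUM(`value`) as v FROM hdfs_group01_test GROUP BY name;
    """
    for stmt in split_statements(script):
        print(submits_job(stmt), stmt)
```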

Architecture diagram

Installation

Install HUE and configure its connections to Hive, Flink, and TiDB.
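
The configuration itself is not shown here. As a sketch, assuming HUE's connector layout in `hue.ini` (interpreters listed under `[notebook]` and `[[interpreters]]`, with the Flink interpreter pointing at a running flink-sql-gateway through a JSON `options` string), the Hive and Flink entries can be rendered as follows. The gateway address `127.0.0.1:8083` is a placeholder, the helper names are hypothetical, and TiDB is left out because its settings are not given.

```python
import json


def interpreter_section(key, name, interface, options=None):
    """Render one [[[key]]] interpreter entry; options become a single-quoted JSON string."""
    lines = [f"    [[[{key}]]]", f"      name={name}", f"      interface={interface}"]
    if options is not None:
        lines.append(f"      options='{json.dumps(options)}'")
    return "\n".join(lines)


def notebook_block(sections):
    """Wrap interpreter entries in the [notebook] / [[interpreters]] sections."""
    return "\n".join(["[notebook]", "  [[interpreters]]", *sections])


if __name__ == "__main__":
    print(notebook_block([
        interpreter_section("hive", "Hive", "hiveserver2"),
        # Placeholder URL: replace with the real flink-sql-gateway host and port.
        interpreter_section("flink", "Flink", "flink", {"url": "http://127.0.0.1:8083"}),
    ]))
```

Using `json.dumps` for `options` keeps the quoting valid if more keys are added later; the printed Flink entry ends with the line `options='{"url": "http://127.0.0.1:8083"}'`.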

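Notes

  • The Flink runtime must bundle the required third-party JARs, including the Hadoop and Hive dependencies. Hive has to be rebuilt from source to resolve a Guava version conflict.
  • Modify the flink-sql-gateway source code to cap the maximum parallelism used when reading and writing Hive tables through the catalog.
  • HUE's Flink SQL integration currently handles aggregations poorly, and the syntax differs in places; this needs further investigation.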
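
The gateway patch itself is not shown. The idea behind the parallelism cap can be sketched as: infer one source subtask per input split, then clamp the result to a configured maximum so a large Hive table cannot request an unbounded number of subtasks. Flink's Hive connector exposes a comparable built-in setting, `table.exec.hive.infer-source-parallelism.max`. The helper below is a hypothetical illustration, not the patched gateway code.

```python
def capped_source_parallelism(num_splits, max_parallelism):
    """One subtask per input split, clamped to the range [1, max_parallelism]."""
    if max_parallelism < 1:
        raise ValueError("max_parallelism must be at least 1")
    return max(1, min(num_splits, max_parallelism))


if __name__ == "__main__":
    print(capped_source_parallelism(5000, 100))  # 100: a large table is capped
    print(capped_source_parallelism(8, 100))     # 8: a small table keeps one subtask per split
    print(capped_source_parallelism(0, 100))     # 1: empty input still gets one subtask
```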

Other experiments

Connecting HUE to TiDB

Execution environment

http://hue.dw.jcinfo.com/hue/editor/?type=flink

http://172.31.3.13:31741/#/overview

http://hdp0.dw.jcinfo.com:50070/dfshealth.html#tab-overview

http://metadata.jcinfo.com/#/login?redirect=%2Fredirect%2Fdatamodel%2Findex

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/sql/queries.html#%E8%81%9A%E5%90%88