Kingbase ES 自定义聚合函数浅析

发布时间 2023-09-18 14:27:02作者: KINGBASE研究院

文章概要:

基于前面的博文《Kingbase ES 自定义聚合函数和一次改写案例》这篇文章,我们只考虑了自定义聚合函数非并行的情况,
因此,本篇文章将着重解析一下使用PLPGSQL编写并行聚合函数,同时对比了非并行聚合函数的运行效果。

一,KES自定义聚合函数入门解析

如同前面的文章,KES能支持的聚合函数(PLPGSQL)语法罗列如下:

CREATE AGGREGATE name ( [ argmode ] [ argname ] arg_data_type [ , ... ] ) (  
  SFUNC = sfunc,    ---迭代函数,每行数据迭代调用计算结果
  STYPE = state_data_type    ----聚合函数状态值的数据类型
  [ , SSPACE = state_data_size ]  
  [ , FINALFUNC = ffunc ]   ----每组的最终计算函数,可选
  [ , FINALFUNC_EXTRA ]  
  [ , COMBINEFUNC = combinefunc ]   ---部分聚合,COMBINEFUNC函数,并行聚合需要该函数
  [ , SERIALFUNC = serialfunc ]  
  [ , DESERIALFUNC = deserialfunc ]  
  [ , INITCOND = initial_condition ]  ---INITCOND是第一次调用SFUNC给第一个参数的传值,可以不写。
  [ , MSFUNC = msfunc ]  
  [ , MINVFUNC = minvfunc ]  
  [ , MSTYPE = mstate_data_type ]  
  [ , MSSPACE = mstate_data_size ]  
  [ , MFINALFUNC = mffunc ]  
  [ , MFINALFUNC_EXTRA ]  
  [ , MINITCOND = minitial_condition ]  
  [ , SORTOP = sort_operator ]  
  [ , PARALLEL = { SAFE | RESTRICTED | UNSAFE } ]   ----SAFE并行聚合
)  

其中重点需要讲的是:

聚合函数是每组独立计算的,比如一张teachers表,按照age字段进行聚合(GROUP BY age),那么就会分4组,4组分别内部进行计算。

--teachers表如下:
 teacher_id | teacher_name | age |  sal  | gender | title  | position | department
------------+--------------+-----+----------+--------+----------+------------+------------
  10001001 | 陈思宇    | 46 | 15689.00 | 男   | 特级教师 | 校长    | 校长室
  10001002 | 文强     | 44 | 29942.00 | 男   | 特级教师 | 副校长   | 校长室
  10001003 | 吴玲     | 41 | 29142.00 | 女   | 高级教师 | 办公室主任 | 办公室
  10001004 | 章丽     | 41 | 28242.00 | 女   | 高级教师 | 教务处主任 | 教务处
  10001005 | 张志东    | 41 | 28242.00 | 男   | 高级教师 | 财务处主任 | 财务处
  10001006 | 熊浩宇    | 49 | 28356.00 | 女   | 一级教师 | 招生办主任 | 招生办
  10001007 | 朱雯     | 49 | 24016.00 | 女   | 一级教师 | 招生办助理 | 招生办
  10001008 | 张志强    | 49 | 23964.00 | 女   | 一级教师 | 财务处助理 | 财务处
  10001009 | 朱国斌    | 49 | 21974.00 | 男   | 二级教师 | 财务处助理 | 财务处

1,非并行自定义聚合函数

1),SFUNC迭代函数,假设自定义SFUNC函数如下:

CREATE OR REPLACE FUNCTION YOUR_SFUNC_NAME (numeric, numeric, numeric......)
RETURNS numeric    --返回的数据类型就是STYPE
as
begin
    .......(省略)
end

这个函数就是每行数据的迭代函数

参数一:$1, 上一次迭代的计算结果;
参数二:$2, YOUR_AGGREGATE_NAME的第一个参数(如果聚合函数传入的是表的列,则表示当前行数据)
参数三:$3, YOUR_AGGREGATE_NAME的第二个参数(如果是个固定值,比如数值2,则每次传入2)
参数四:$4, YOUR_AGGREGATE_NAME的第三个参数

...........

2),FINALFUNC最终函数,假设自定义FINALFUNC函数如下:

CREATE OR REPLACE FUNCTION YOUR_FINALFUNC_NAME (numeric)    ---只有一个参数,SFUNC函数返回的是numeric,所以这里是numeric
RETURNS numeric   
as
begin
    .......(省略)
end

通过YOUR_SFUNC_NAME函数将每组计算完后,,最后调用一次,也就是说最后还可以进行一次规则计算。

3),YOUR_AGGREGATE_NAME聚合函数
聚合函数的定义会引用到上述YOUR_SFUNC_NAME和YOUR_FINALFUNC_NAME函数,最终达成特定目标的计算。

一个典型的非并行举例说明;

CREATE AGGREGATE YOUR_AGGREGATE_NAME(numeric, numeric)    ---接受两个参数,会作为SFUNC函数的后两个参数
(
        INITCOND = xxxx,    ---INITCOND是第一次调用YOUR_SFUNC_NAME函数,给第一个参数的传值xxxx,可以不写。
        STYPE = numeric,     ---聚合函数返回的数据类型numeric(仅举例示范)
        SFUNC = YOUR_SFUNC_NAME,    ---每组的自定义迭代函数
        FINALFUNC = YOUR_FINALFUNC_NAME   ---每组的最终函数
);

按照上面这个聚合函数的定义,一个简单的非并行聚合函数,运行流程基本流程就是:
A,使用迭代SFUNC函数对每组数据进行迭代计算
B,每组数据计算完成后,使用FINALFUNC最终计算函数将每组的最后一次的SFUNC结果再自定义计算一次,获得最终结果

简单写一个流程图就是:
sfunc( internal-state, next-data-values ) ---> next-internal-state --(必须)
--反复调用sfunc函数,最后获得next-internal-state最后状态值
ffunc( next-internal-state ) ---> final-aggregate-value --(可选)
--每组都最后调用一次ffunc函数,根据sfunc函数提供的next-internal-state最后状态值,传入ffunc函数,
--最终计算获得final-aggregate-value 值

一个具体的例子1-taxi聚合函数:

t_taxi表的背景:
模拟保存了路程数据,记录了司机的两单数据,第一单ID=1记录了三段路程,需要合并计算。
价格计算规则是:起步价3.5,且每公里2.2,最后每单向上取整。
第一单的价格就应该是: 3.5+3.4x2.2+5.3x2.2+2.9x2.2=29.02,向上取整就是30

CREATE TABLE t_taxi(trip_id int, km numeric);

insert into t_taxi values (1, 3.4);
insert into t_taxi values (1, 5.3);
insert into t_taxi values (1, 2.9);
insert into t_taxi values (2, 9.3);
insert into t_taxi values (2, 1.6);
insert into t_taxi values (2, 4.3);

 trip_id | km  
---------+-----
       1 | 3.4
       1 | 5.3
       1 | 2.9
       2 | 9.3
       2 | 1.6
       2 | 4.3
————————————————

CREATE OR REPLACE FUNCTION taxi_accum (numeric, numeric, numeric)
RETURNS numeric AS
$$
BEGIN
    RAISE NOTICE 'prev:[%] curr:(%) outer:(%) return:(%)', $1, $2, $3, $1 + $2 * $3;
    RETURN $1 + $2 * $3;
END;
$$
LANGUAGE 'plpgsql';


CREATE OR REPLACE FUNCTION taxi_final (numeric)
RETURNS numeric AS
$$
BEGIN
    RAISE NOTICE 'final:(%) return:(%)', $1, round($1 + 5, -1);
    RETURN round($1 + 5, -1);
END;
$$
LANGUAGE 'plpgsql';

CREATE AGGREGATE taxi(numeric, numeric)
(
        SFUNC = taxi_accum,
        STYPE = numeric,
        FINALFUNC = taxi_final,
        INITCOND = 3.50
);

--测试
test=# SELECT  trip_id, taxi(km, 2.20), 3.50 + sum(km)*2.2 AS manual FROM t_taxi GROUP BY trip_id;
test-# /
NOTICE:  prev:[3.50] curr:(3.4) outer:(2.20) return:(10.980)
NOTICE:  prev:[10.980] curr:(5.3) outer:(2.20) return:(22.640)
NOTICE:  prev:[22.640] curr:(2.9) outer:(2.20) return:(29.020)--对trip_id为1的组进行SFUNC函数迭代计算
NOTICE:  prev:[3.50] curr:(9.3) outer:(2.20) return:(23.960)
NOTICE:  prev:[23.960] curr:(1.6) outer:(2.20) return:(27.480)
NOTICE:  prev:[27.480] curr:(4.3) outer:(2.20) return:(36.940)--对trip_id为2的组进行SFUNC函数迭代计算
NOTICE:  final:(29.020) return:(30)--对trip_id为1的组将SFUNC函数返回的结果(参数第一个值)进行最终计算
NOTICE:  final:(36.940) return:(40)--对trip_id为2的组将SFUNC函数返回的结果(参数第一个值)进行最终计算
 trip_id | taxi | manual
---------+------+--------
       1 |   30 |  29.02
       2 |   40 |  36.94
(2 rows)

Time: 1.775 ms

--可以看到基本的聚合函数(非并行)流程就是
--1,使用SFUNC函数对每组数据进行迭代计算
--2,每组数据计算完成后,使用FINALFUNC将每组的SFUNC结果

2,并行自定义聚合函数

4),COMBINEFUNC部分聚合函数(可选,但是并行必须),并行需要:

combinefunc函数是可选的,如果提供,则必须被声明为有两个 state_data_type 参数并且返回一个state_data_type 值的函数。

一个combinefunc函数包含在输入值某个子集上的聚集结果,它会产生一个新的 state_data_type来表示在两个输入集上的集合结果。
这个函数可以被看做是一个sfunc,和在输入行上一行一行迭代计算不同,这个函数是把部分聚合状态值聚集到另一个部分聚合状态值上。

如下例子:

$1就是多次部分聚合值(他的初始值就是INITCOND),$2就是一次新的部分聚合值,

CREATE OR REPLACE FUNCTION YOUR_COMBINEFUNC_NAME (numeric,numeric)    
RETURNS numeric   
as
begin
    .......(省略)
end

5),PARALLEL并行参数

如果希望自定义聚合函数能并行运行,PARALLEL设置为SAFE是必要的。

所以如果希望自定义聚合函数能并行运行,则可以使用如下基本定义形式:

CREATE AGGREGATE YOUR_AGGREGATE_NAME(numeric, numeric)    ---接受两个参数,会作为SFUNC函数的后两个参数
(
        INITCOND = xxxx,    ---INITCOND是第一次调用YOUR_SFUNC_NAME函数,给第一个参数的传值xxxx,可以不写。
        STYPE = numeric,     ---聚合函数返回的数据类型numeric(仅举例示范)
        SFUNC = YOUR_SFUNC_NAME,    ---每组的自定义迭代函数
        FINALFUNC = YOUR_FINALFUNC_NAME,   ---每组的小结函数
        COMBINEFUNC = YOUR_COMBINEFUNC_NAME,   ---并行聚合必须写该函数,可选项
        PARALLEL = SAFE   ---并行聚合必须有该参数,可选项
);

一个很容易理解的并行聚合例子2-sum2函数

创建自定义聚合函数sum2(就是sum()函数的功能),

create or replace function func_numadd(NUMERIC,NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'pre:(%),cur:(%)',$1,$2;
  return $1+$2; 
END  ;
$$ language plpgsql strict parallel safe;  

create or replace function func_numadd_combin(NUMERIC,NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'combin pre:(%),cur:(%)',$1,$2;
  return $1+$2; 
END  ;
$$ language plpgsql parallel safe;  

create or replace function func_numadd_final(NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'final cur:(%)',$1;
  return $1; 
END  ;
$$ language plpgsql strict parallel safe;  

drop aggregate sum2 (NUMERIC);
create aggregate sum2 (NUMERIC) 
(
    SFUNC = func_numadd, 
    STYPE = NUMERIC, 
    FINALFUNC = func_numadd_final,
    COMBINEFUNC = func_numadd_combin, 
    PARALLEL = safe,
    INITCOND = 0
);   

---配置并发参数

set max_parallel_workers=4;   
set max_parallel_workers_per_gather =4;   
set parallel_setup_cost =0;   
set parallel_tuple_cost =0;   
alter table t_taxi set (parallel_workers =4);  

--创建表和数据

CREATE TABLE t_taxi(trip_id int, km numeric);

delete from t_taxi;

declare
    rr numeric(5,1);
begin
    for k in 1..4 loop
        for i in 1..1000 loop
            SELECT random()*10 into rr ;
            insert into t_taxi values (k, rr);
        end loop;
    end loop;    
end;


--查看数据
test=# select * from t_taxi;
test-# /
 trip_id | km
---------+-----
       1 | 9.9
       1 | 1.1
       1 | 6.2
       1 | 2.4
       1 | 0.5
       1 | 4.9
       1 | 6.3
       1 | 5.1
       1 | 0.0
       1 | 4.8
       2 | 5.0
       2 | 1.9
       2 | 0.3
       2 | 4.3
       2 | 0.7
       2 | 3.0
       2 | 0.9
       2 | 7.0
       2 | 0.8
       2 | 3.8
       3 | 1.8
       3 | 2.9
       3 | 6.2
       3 | 0.5
       3 | 1.1
       3 | 5.0
       3 | 4.9
       3 | 7.7
       3 | 2.5
       3 | 2.5
       4 | 4.8
       4 | 2.7
       4 | 6.5
       4 | 2.7
       4 | 2.8
       4 | 0.2
       4 | 8.2
       4 | 7.6
       4 | 0.0
       4 | 4.0
(40 rows)

Time: 7.798 ms

---基本的正确性验证:运行结果和sum()函数一样

test=# select sum2(km) from t_taxi ;
test-# /
 sum2
-------
 143.5
(1 row)

Time: 7.432 ms
test=# select sum(km) from t_taxi ;
test-# /
  sum
-------
 143.5
(1 row)

Time: 7.820 ms

---不进行分组的聚合函数

test=# explain (analyze,verbose,timing,costs,buffers)   select sum2(km) from t_taxi ;
test-# /
NOTICE:  pre:(0),cur:(9.9)
NOTICE:  pre:(9.9),cur:(1.1)
NOTICE:  pre:(11.0),cur:(6.2)
NOTICE:  pre:(17.2),cur:(2.4)
NOTICE:  pre:(19.6),cur:(0.5)
NOTICE:  pre:(20.1),cur:(4.9)
NOTICE:  pre:(25.0),cur:(6.3)
NOTICE:  pre:(31.3),cur:(5.1)
NOTICE:  pre:(36.4),cur:(0.0)
NOTICE:  pre:(36.4),cur:(4.8)
NOTICE:  pre:(41.2),cur:(5.0)
NOTICE:  pre:(46.2),cur:(1.9)
NOTICE:  pre:(48.1),cur:(0.3)
NOTICE:  pre:(48.4),cur:(4.3)
NOTICE:  pre:(52.7),cur:(0.7)
NOTICE:  pre:(53.4),cur:(3.0)
NOTICE:  pre:(56.4),cur:(0.9)
NOTICE:  pre:(57.3),cur:(7.0)
NOTICE:  pre:(64.3),cur:(0.8)
NOTICE:  pre:(65.1),cur:(3.8)
NOTICE:  pre:(68.9),cur:(1.8)
NOTICE:  pre:(70.7),cur:(2.9)
NOTICE:  pre:(73.6),cur:(6.2)
NOTICE:  pre:(79.8),cur:(0.5)
NOTICE:  pre:(80.3),cur:(1.1)
NOTICE:  pre:(81.4),cur:(5.0)
NOTICE:  pre:(86.4),cur:(4.9)
NOTICE:  pre:(91.3),cur:(7.7)
NOTICE:  pre:(99.0),cur:(2.5)
NOTICE:  pre:(101.5),cur:(2.5)
NOTICE:  pre:(104.0),cur:(4.8)
NOTICE:  pre:(108.8),cur:(2.7)
NOTICE:  pre:(111.5),cur:(6.5)
NOTICE:  pre:(118.0),cur:(2.7)
NOTICE:  pre:(120.7),cur:(2.8)
NOTICE:  pre:(123.5),cur:(0.2)
NOTICE:  pre:(123.7),cur:(8.2)
NOTICE:  pre:(131.9),cur:(7.6)
NOTICE:  pre:(139.5),cur:(0.0)
NOTICE:  pre:(139.5),cur:(4.0)
NOTICE:  combin pre:(0),cur:(143.5)
NOTICE:  combin pre:(143.5),cur:(0)
NOTICE:  combin pre:(143.5),cur:(0)
NOTICE:  combin pre:(143.5),cur:(0)
NOTICE:  combin pre:(143.5),cur:(0)
NOTICE:  final cur:(143.5)
                                                             QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=25.86..25.87 rows=1 width=32) (actual time=7.074..7.147 rows=1 loops=1)
   Output: sum2(km)
   Buffers: shared hit=22
   ->  Gather  (cost=24.60..24.61 rows=4 width=32) (actual time=0.543..6.986 rows=5 loops=1)
         Output: (PARTIAL sum2(km))
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=22
         --combinefunc函数完成的聚合并行(显示为Partial Aggregate) 
         ->  Partial Aggregate  (cost=24.60..24.61 rows=1 width=32) (actual time=0.067..0.068 rows=1 loops=5)
               Output: PARTIAL sum2(km)
               Buffers: shared hit=22
               Worker 0: actual time=0.007..0.007 rows=1 loops=1
               Worker 1: actual time=0.007..0.007 rows=1 loops=1
               Worker 2: actual time=0.008..0.009 rows=1 loops=1
               Worker 3: actual time=0.007..0.007 rows=1 loops=1
               --并行扫描
               ->  Parallel Seq Scan on public.t_taxi  (cost=0.00..22.10 rows=10 width=6) (actual time=0.005..0.006 rows=8 loops=5)
                     Output: trip_id, km
                     Buffers: shared hit=22
                     Worker 0: actual time=0.001..0.001 rows=0 loops=1
                     Worker 1: actual time=0.001..0.001 rows=0 loops=1
                     Worker 2: actual time=0.001..0.001 rows=0 loops=1
                     Worker 3: actual time=0.001..0.001 rows=0 loops=1
 Planning Time: 0.057 ms
 Execution Time: 7.175 ms
(24 rows)

Time: 7.741 ms

---进行分组的聚合函数

test=# explain (analyze,verbose,timing,costs,buffers) select trip_id,sum2(km) from t_taxi group by trip_id;
test-# /
NOTICE:  pre:(0),cur:(9.9)
NOTICE:  pre:(9.9),cur:(1.1)
NOTICE:  pre:(11.0),cur:(6.2)
NOTICE:  pre:(17.2),cur:(2.4)
NOTICE:  pre:(19.6),cur:(0.5)
NOTICE:  pre:(20.1),cur:(4.9)
NOTICE:  pre:(25.0),cur:(6.3)
NOTICE:  pre:(31.3),cur:(5.1)
NOTICE:  pre:(36.4),cur:(0.0)
NOTICE:  pre:(36.4),cur:(4.8)
NOTICE:  combin pre:(0),cur:(41.2)
NOTICE:  pre:(0),cur:(5.0)
NOTICE:  pre:(5.0),cur:(1.9)
NOTICE:  pre:(6.9),cur:(0.3)
NOTICE:  pre:(7.2),cur:(4.3)
NOTICE:  pre:(11.5),cur:(0.7)
NOTICE:  pre:(12.2),cur:(3.0)
NOTICE:  pre:(15.2),cur:(0.9)
NOTICE:  pre:(16.1),cur:(7.0)
NOTICE:  pre:(23.1),cur:(0.8)
NOTICE:  pre:(23.9),cur:(3.8)
NOTICE:  final cur:(41.2)
NOTICE:  combin pre:(0),cur:(27.7)
NOTICE:  pre:(0),cur:(1.8)
NOTICE:  pre:(1.8),cur:(2.9)
NOTICE:  pre:(4.7),cur:(6.2)
NOTICE:  pre:(10.9),cur:(0.5)
NOTICE:  pre:(11.4),cur:(1.1)
NOTICE:  pre:(12.5),cur:(5.0)
NOTICE:  pre:(17.5),cur:(4.9)
NOTICE:  pre:(22.4),cur:(7.7)
NOTICE:  pre:(30.1),cur:(2.5)
NOTICE:  pre:(32.6),cur:(2.5)
NOTICE:  final cur:(27.7)
NOTICE:  combin pre:(0),cur:(35.1)
NOTICE:  pre:(0),cur:(4.8)
NOTICE:  pre:(4.8),cur:(2.7)
NOTICE:  pre:(7.5),cur:(6.5)
NOTICE:  pre:(14.0),cur:(2.7)
NOTICE:  pre:(16.7),cur:(2.8)
NOTICE:  pre:(19.5),cur:(0.2)
NOTICE:  pre:(19.7),cur:(8.2)
NOTICE:  pre:(27.9),cur:(7.6)
NOTICE:  pre:(35.5),cur:(0.0)
NOTICE:  pre:(35.5),cur:(4.0)
NOTICE:  final cur:(35.1)
NOTICE:  combin pre:(0),cur:(39.5)
NOTICE:  final cur:(39.5)

SFUNC函数对每组进行迭代计算


                                                            QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=22.32..30.22 rows=4 width=36) (actual time=7.463..7.753 rows=4 loops=1)
   Output: trip_id, sum2(km)
   Group Key: t_taxi.trip_id
   Buffers: shared hit=138
   ->  Gather Merge  (cost=22.32..25.14 rows=16 width=36) (actual time=7.243..7.588 rows=4 loops=1)
         Output: trip_id, (PARTIAL sum2(km))
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=138
         --combinefunc函数完成的聚合并行(显示为Partial Aggregate) 
         ->  Partial GroupAggregate  (cost=22.27..24.86 rows=4 width=36) (actual time=0.111..0.156 rows=1 loops=5)
               Output: trip_id, PARTIAL sum2(km)
               Group Key: t_taxi.trip_id
               Buffers: shared hit=138
               Worker 0: actual time=0.101..0.101 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 1: actual time=0.095..0.095 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 2: actual time=0.096..0.096 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 3: actual time=0.118..0.119 rows=0 loops=1
                 Buffers: shared hit=29
               --排序
               ->  Sort  (cost=22.27..22.29 rows=10 width=10) (actual time=0.087..0.088 rows=8 loops=5)
                     Output: trip_id, km
                     Sort Key: t_taxi.trip_id
                     Sort Method: quicksort  Memory: 26kB
                     Worker 0:  Sort Method: quicksort  Memory: 25kB
                     Worker 1:  Sort Method: quicksort  Memory: 25kB
                     Worker 2:  Sort Method: quicksort  Memory: 25kB
                     Worker 3:  Sort Method: quicksort  Memory: 25kB
                     Buffers: shared hit=138
                     Worker 0: actual time=0.099..0.099 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 1: actual time=0.093..0.093 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 2: actual time=0.094..0.094 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 3: actual time=0.116..0.116 rows=0 loops=1
                       Buffers: shared hit=29
                       --并行扫描
                     ->  Parallel Seq Scan on public.t_taxi  (cost=0.00..22.10 rows=10 width=10) (actual time=0.005..0.006 rows=8 loops=5)
                           Output: trip_id, km
                           Buffers: shared hit=22
                           Worker 0: actual time=0.001..0.001 rows=0 loops=1
                           Worker 1: actual time=0.001..0.001 rows=0 loops=1
                           Worker 2: actual time=0.001..0.001 rows=0 loops=1
                           Worker 3: actual time=0.001..0.001 rows=0 loops=1
 Planning Time: 0.059 ms
 Execution Time: 7.792 ms
(47 rows)

Time: 8.844 ms

从这里我们就可以得出(可以把t_taxi的数据再弄多一点就更直观),整体的流程就是:

1,自定义聚合函数把每组数据分成了多个小部分,每个小部分调用SFUNC函数进行迭代计算

2,每个小部分SFUNC函数最终迭代计算得到的和值将传递给COMBINEFUNC函数,再次进行计算

3,最后每组调用FINALFUNC函数获取每组的最终结果。

并行时COMBINEFUNC是否是必须的?

drop aggregate sum2 (NUMERIC);
create aggregate sum2 (NUMERIC) 
(
    SFUNC = func_numadd, 
    STYPE = NUMERIC, 
    FINALFUNC = func_numadd_final,
    --COMBINEFUNC = func_numadd_combin,    --注释掉COMBINEFUNC函数
    PARALLEL = safe,
    INITCOND = 0
);   


test=# explain (analyze,verbose,timing,costs,buffers) select trip_id,sum2(km) from t_taxi group by trip_id;
test-# /
                                                          QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=32.20..33.24 rows=4 width=36) (actual time=10.156..10.512 rows=4 loops=1)
   Output: trip_id, sum2(km)
   Group Key: t_taxi.trip_id
   Buffers: shared hit=22
   ->  Gather  (cost=0.00..22.10 rows=40 width=10) (actual time=1.280..10.088 rows=40 loops=1)
         Output: trip_id, km
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=22
         --仅做了并行扫描
         ->  Parallel Seq Scan on public.t_taxi  (cost=0.00..22.10 rows=10 width=10) (actual time=0.006..0.007 rows=8 loops=5)
               Output: trip_id, km
               Buffers: shared hit=22
               Worker 0: actual time=0.001..0.001 rows=0 loops=1
               Worker 1: actual time=0.001..0.001 rows=0 loops=1
               Worker 2: actual time=0.001..0.001 rows=0 loops=1
               Worker 3: actual time=0.001..0.001 rows=0 loops=1
 Planning Time: 0.043 ms
 Execution Time: 10.540 ms
(18 rows)

Time: 11.107 ms

仅并行扫描,未未并行聚合,显然确实是必须的。PARALLEL = safe也同理是必须配置的,如下

drop aggregate sum2 (NUMERIC);
create aggregate sum2 (NUMERIC) 
(
    SFUNC = func_numadd, 
    STYPE = NUMERIC, 
    FINALFUNC = func_numadd_final,
    COMBINEFUNC = func_numadd_combin, 
    --PARALLEL = safe,
    INITCOND = 0
);   

test=# explain (analyze,verbose,timing,costs,buffers) select trip_id,sum2(km) from t_taxi group by trip_id;

                                                   QUERY PLAN
-----------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=32.50..33.54 rows=4 width=36) (actual time=0.336..0.356 rows=4 loops=1)
   Output: trip_id, sum2(km)
   Group Key: t_taxi.trip_id
   Buffers: shared hit=22
   ->  Seq Scan on public.t_taxi  (cost=0.00..22.40 rows=40 width=10) (actual time=0.019..0.024 rows=40 loops=1)
         Output: trip_id, km
         Buffers: shared hit=22
 Planning Time: 0.037 ms
 Execution Time: 0.378 ms
(9 rows)

Time: 1.623 ms

既不并行扫描,也未并行聚合。

那自定义函数必须配置parallel safe吗?

create or replace function func_numadd(NUMERIC,NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'pre:(%),cur:(%)',$1,$2;
  return $1+$2; 
END  ;
$$ language plpgsql ;      --没有配置parallel safe

create or replace function func_numadd_combin(NUMERIC,NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'combin pre:(%),cur:(%)',$1,$2;
  return $1+$2; 
END  ;
$$ language plpgsql  ;  

create or replace function func_numadd_final(NUMERIC) 
returns NUMERIC 
as 
$$  
BEGIN
  raise notice 'final cur:(%)',$1;
  return $1; 
END  ;
$$ language plpgsql  ;  


drop aggregate sum2 (NUMERIC);
create aggregate sum2 (NUMERIC) 
(
    SFUNC = func_numadd, 
    STYPE = NUMERIC, 
    FINALFUNC = func_numadd_final,
    COMBINEFUNC = func_numadd_combin, 
    PARALLEL = safe,
    INITCOND = 0
);  


test=# explain (analyze,verbose,timing,costs,buffers) select trip_id,sum2(km) from t_taxi group by trip_id;
test-# /
                                                                QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=22.32..30.22 rows=4 width=36) (actual time=7.393..7.685 rows=4 loops=1)
   Output: trip_id, sum2(km)
   Group Key: t_taxi.trip_id
   Buffers: shared hit=138
   ->  Gather Merge  (cost=22.32..25.14 rows=16 width=36) (actual time=7.126..7.435 rows=4 loops=1)
         Output: trip_id, (PARTIAL sum2(km))
         Workers Planned: 4
         Workers Launched: 4
         Buffers: shared hit=138
         --部分聚合
         ->  Partial GroupAggregate  (cost=22.27..24.86 rows=4 width=36) (actual time=0.112..0.157 rows=1 loops=5)
               Output: trip_id, PARTIAL sum2(km)
               Group Key: t_taxi.trip_id
               Buffers: shared hit=138
               Worker 0: actual time=0.101..0.101 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 1: actual time=0.118..0.118 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 2: actual time=0.091..0.092 rows=0 loops=1
                 Buffers: shared hit=29
               Worker 3: actual time=0.099..0.100 rows=0 loops=1
                 Buffers: shared hit=29
                 --排序
               ->  Sort  (cost=22.27..22.29 rows=10 width=10) (actual time=0.086..0.087 rows=8 loops=5)
                     Output: trip_id, km
                     Sort Key: t_taxi.trip_id
                     Sort Method: quicksort  Memory: 26kB
                     Worker 0:  Sort Method: quicksort  Memory: 25kB
                     Worker 1:  Sort Method: quicksort  Memory: 25kB
                     Worker 2:  Sort Method: quicksort  Memory: 25kB
                     Worker 3:  Sort Method: quicksort  Memory: 25kB
                     Buffers: shared hit=138
                     Worker 0: actual time=0.099..0.099 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 1: actual time=0.116..0.116 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 2: actual time=0.090..0.090 rows=0 loops=1
                       Buffers: shared hit=29
                     Worker 3: actual time=0.097..0.097 rows=0 loops=1
                       Buffers: shared hit=29
                       --并行扫描
                     ->  Parallel Seq Scan on public.t_taxi  (cost=0.00..22.10 rows=10 width=10) (actual time=0.005..0.005 rows=8 loops=5)
                           Output: trip_id, km
                           Buffers: shared hit=22
                           Worker 0: actual time=0.001..0.001 rows=0 loops=1
                           Worker 1: actual time=0.001..0.001 rows=0 loops=1
                           Worker 2: actual time=0.001..0.001 rows=0 loops=1
                           Worker 3: actual time=0.001..0.001 rows=0 loops=1
 Planning Time: 0.048 ms
 Execution Time: 7.716 ms
(47 rows)

Time: 8.372 ms
test=#

显然不是,并行真正起作用的是聚合函数里的PARALLEL = safe,

二,重新思考自定义聚合函数taxi

梳理了并行聚合的流程和例子后,结合非并行聚合章节给出的例子taxi聚合函数,我们能添加COMBINEFUNC把它实现成并行的聚合函数吗?笔者思考之后认为可以,不过函数得改造一下实现细节。

--非并行时的迭代函数
CREATE OR REPLACE FUNCTION taxi_accum (numeric, numeric, numeric)
RETURNS numeric AS
$$
BEGIN
    RAISE NOTICE 'prev:[%] curr:(%) outer:(%) return:(%)', $1, $2, $3, $1 + $2 * $3;
    RETURN $1 + $2 * $3;
END;
$$
LANGUAGE 'plpgsql' strict parallel safe;

比如执行的SELECT语句是:

SELECT trip_id, taxi(km, 2.20), 3.50 + sum(km)2.2 AS manual FROM t_taxi GROUP BY trip_id;

那么taxi_accum函数对于同组的每一行数据都调用一次,参数二和三就是聚合函数taxi2传进来的两个参数

  • 参数一:上次迭代计算的结果(初始值就是INITCOND=3.5);
  • 参数二:当前行数据(聚合函数传入的是表的km列,则表示当前行数据);
  • 参数三:执行时传进去的数据(根据select语句来决定,数值是2.2,则每次传入2.2);

由于每次进行部分聚合时,参数1(即$1)都会有3.5这个初始值,这肯定不行(结合部分聚合函数COMBINEFUNC会偏差越来越大),应该为0才符合预期,不过好在我们又最终计算函数FINALFUNC,所以可以把3.5这个起步价放到FINALFUNC函数中实现。

部分聚合函数只能有两个参数且是迭代过程中的部分聚合值,则可以进行相加动作。如此具体并行改造实现如下(函数的名字重命名):

drop table t_taxi;
CREATE TABLE t_taxi(trip_id int, km numeric);
--t_taxi表的背景:
--模拟保存了路程数据,记录了司机的两单数据,第一单ID=1记录了三段路程,需要合并计算。
--价格计算规则是:起步价3.5,且每公里2.2。
declare
    rr numeric(5,1);
begin
    for k in 1..4 loop
        for i in 1..100 loop
            SELECT random()*10 into rr ;
            insert into t_taxi values (k, rr);
        end loop;
    end loop;    
end;

test=# select count(*) from t_taxi;
test-# /
 count
-------
  40000
(1 row)


CREATE OR REPLACE FUNCTION taxi_accum2 (numeric, numeric, numeric)
RETURNS numeric AS
$$
BEGIN
    --RAISE NOTICE 'prev:[%] curr:(%) outer:(%) return:(%)', $1, $2, $3, $1 + $2 * $3;
    RETURN $1 + $2 * $3;
END;
$$
LANGUAGE 'plpgsql' strict parallel safe;

CREATE OR REPLACE FUNCTION taxi_combin2 (numeric, numeric)
RETURNS numeric AS
$$
BEGIN
    --RAISE NOTICE 'combin: pre(%), cur (%), return (%)', $1, $2, $1 + $2;
    RETURN $1 + $2;
END;
$$
LANGUAGE 'plpgsql' strict parallel safe;

CREATE OR REPLACE FUNCTION taxi_final2 (numeric)
RETURNS numeric AS
$$
BEGIN
    --RAISE NOTICE 'final:(%) return:(%)', $1, round($1 + 5, -1);
    RETURN round($1 +3.5 + 5, -1);   --向上取整时,
END;
$$
LANGUAGE 'plpgsql' strict parallel safe;


CREATE AGGREGATE taxi2(numeric, numeric)
(
        SFUNC = taxi_accum2,
        STYPE = numeric,
        COMBINEFUNC = taxi_combin2,   ---并行聚合需要写该函数,可选
        PARALLEL = SAFE,   ---并行聚合,可选
        FINALFUNC = taxi_final2,
        INITCOND = 0    ---初始值不再是3.5,当道最终函数里来实现
);

--强制并行

set max_parallel_workers=4;   
set max_parallel_workers_per_gather =4;   
set parallel_setup_cost =0;   
set parallel_tuple_cost =0;   
alter table t_taxi set (parallel_workers =4);  

运行结果对比:

原非并行聚合结果:
test=# SELECT  trip_id, 3.5+taxi2(km, 2.20), 3.50 + sum(km)*2.2 AS manual FROM t_taxi GROUP BY trip_id;
test-# /
 trip_id | taxi | manual
---------+------+---------
       1 | 1020 | 1010.88
       3 | 1190 | 1183.14
       4 | 1110 | 1106.36
       2 | 1130 | 1122.42
(4 rows)

并行聚合运行结果:
test=# SELECT  trip_id, taxi2(km, 2.20) as taxi2, 3.50 + sum(km)*2.2 AS manual FROM t_taxi GROUP BY trip_id;
test-# /
 trip_id | taxi2 | manual
---------+-------+---------
       1 |  1020 | 1010.88
       2 |  1130 | 1122.42
       3 |  1190 | 1183.14
       4 |  1110 | 1106.36
(4 rows)

---增加大量数据
declare
    rr numeric(5,1);
begin
    for k in 1..4 loop
        for i in 1..100000 loop
            SELECT random()*10 into rr ;
            insert into t_taxi values (k, rr);
        end loop;
    end loop;    
end;

---测试并行和非并行的运行时间

test=# SELECT  trip_id, taxi(km, 2.20), 3.50 + sum(km)*2.2 AS manual FROM t_taxi GROUP BY trip_id;
test-# /
 trip_id |  taxi   |   manual
---------+---------+------------
       1 | 1211030 | 1211026.06
       3 | 1211530 | 1211520.18
       4 | 1211230 | 1211224.06
       2 | 1212780 | 1212775.28
(4 rows)

Time: 597.179 ms
test=# SELECT  trip_id, taxi2(km, 2.20) as taxi2, 3.50 + sum(km)*2.2 AS manual FROM t_taxi GROUP BY trip_id;
test-# /
 trip_id |  taxi2  |   manual
---------+---------+------------
       1 | 1211030 | 1211026.06
       2 | 1212780 | 1212775.28
       3 | 1211530 | 1211520.18
       4 | 1211230 | 1211224.06
(4 rows)

Time: 200.219 ms

可以看出改造成功,且当前这个数据规模来看,并行要快一倍左右(多次运行也基本一样)

三,小结自定义聚合函数

基于使用plpgsql编写聚合函数,小结一般用法如下:

1,非并行聚合函数

--一般定义的结构如下
CREATE AGGREGATE YOUR_AGGREGATE_NAME(numeric, numeric)    ---接受两个参数,会作为SFUNC函数的后两个参数
(
        INITCOND = xxxx,    ---INITCOND是第一次调用YOUR_SFUNC_NAME函数,给第一个参数的传值xxxx,可选项。
        STYPE = numeric,     ---聚合函数返回的数据类型numeric(仅举例示范)
        SFUNC = YOUR_SFUNC_NAME,    ---每组的自定义迭代函数
        FINALFUNC = YOUR_FINALFUNC_NAME   ---每组的最终函数
);

按照上面这个聚合函数的定义,一个简单的非并行聚合函数,运行流程基本流程就是:

A,使用迭代SFUNC函数对每组数据进行迭代计算

B,每组数据计算完成后,使用FINALFUNC最终计算函数将每组的最后一次的SFUNC结果再自定义计算一次,获得最终结果

2,并行聚合函数

--一般定义的结构如下
CREATE AGGREGATE YOUR_AGGREGATE_NAME(numeric, numeric)    ---接受两个参数,会作为SFUNC函数的后两个参数
(
        INITCOND = xxxx,    ---INITCOND是第一次调用YOUR_SFUNC_NAME函数,给第一个参数的传值xxxx,
        STYPE = numeric,     ---聚合函数返回的数据类型numeric(仅举例示范)
        SFUNC = YOUR_SFUNC_NAME,    ---每组的自定义迭代函数
        FINALFUNC = YOUR_FINALFUNC_NAME,   ---每组的最终计算函数,对于并行来说非必选
        COMBINEFUNC = YOUR_COMBINEFUNC_NAME,   ---并行聚合必须写该函数
        PARALLEL = SAFE   ---并行聚合必须有该参数
);

运行流程如下:

A,自定义聚合函数把每组数据分成了多个小部分,每个小部分调用SFUNC函数进行迭代计算

B,每个小部分SFUNC函数最终迭代计算得到的和值将传递给COMBINEFUNC函数,再次进行计算

C,最后每组调用FINALFUNC函数获取每组的最终结果。

其他:

1)COMBINEFUNC函数则必须被声明为有两个 state_data_type 参数,并且返回一个state_data_type 值的函数。

2)COMBINEFUNC函数和PARALLEL参数是并行聚合必须