这是一个创建于 409 天前的主题,其中的信息可能已经有所发展或是发生改变。
帮朋友请教一下社区大佬们,详情请加 vx:bGl3czE5OTU=
TABLE_QTY 是个库存表,库存表是 7 个字段作为的联合主键,PRIMARY KEY ( goodsid ,goodsdetailid ,sid,bid,lid,goodsstatusid )
库存表中 status 在业务高峰期会频繁变动
TABLE_B INNER JOIN 库存表,bid 与 库存表中联合主键的 bid 一一对应
TABLE_L INNER JOIN 库存表,lid 与 库存表中联合主键的 lid 一一对应
TABLE_S INNER JOIN 库存表,sid 与 库存表中联合主键的 sid 一一对应
-- 目前遇到几种情况
-------------------------------------------情况一----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况一、先根据 TABLE_QTY 筛选 where status 条件后,根据五个主键做 group by ,取库存 sum ( qty )
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
-- 简写 flinkSql:
INSERT INTO SINK values(***)
select qty.goodsqty from
( select sum(goodsqty) as goodsqty from TABLE_QTY where status =0
group by goodsid,sid,bid,lid,goodsstatusid
) qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid
现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
-------------------------------------------情况二----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况二、先根据 TABLE_QTY 筛选 where 条件后,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
最后再 根据五个主键做 group by ,取库存 sum ( qty )
-- 简写 flinkSql:
INSERT INTO SINK values(***)
select sum(goodsqty) from TABLE_QTY
( select * from TABLE_QTY TABLE_QTY where status =0
) qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid
group by goodsid,sid,bid,lid,goodsstatusid
现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
-------------------------------------------情况三----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况三、先根据 TABLE_QTY 筛选 where status 条件,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
不再做 group by
-- 简写 flinkSql:
INSERT INTO SINK values(***)
select * from TABLE_QTY qty where status =0
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid
现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
业务高峰期,stream 增量也出现了不一致的情况,感觉是 status 状态的频繁变更导致的。
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
-------------------------------------------情况四----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况四、先根据 TABLE_QTY 不筛选 where status 条件,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
不再做 group by
-- 简写 flinkSql:
INSERT INTO SINK values(***)
select * from TABLE_QTY qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid
现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
业务高峰期,stream 增量数据也可以一致。
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
为什么添加 group by 后数据不准?,任务并行度设置的是 20 。为何剔除了 group by 添加了 where 条件了也不准?
flinksql 也设置 SET 'table.exec.state.ttl' = '0 ms';