FlinkSql 求助,有偿 100!

2023-03-29 17:07:27 +08:00
 GuoGuang
帮朋友请教一下社区大佬们,详情请加 vx:bGl3czE5OTU=



TABLE_QTY 是个库存表,库存表是 7 个字段作为的联合主键,PRIMARY KEY ( goodsid ,goodsdetailid ,sid,bid,lid,goodsstatusid )
库存表中 status 在业务高峰期会频繁变动
TABLE_B INNER JOIN 库存表,bid 与 库存表中联合主键的 bid 一一对应
TABLE_L INNER JOIN 库存表,lid 与 库存表中联合主键的 lid 一一对应
TABLE_S INNER JOIN 库存表,sid 与 库存表中联合主键的 sid 一一对应



-- 目前遇到几种情况
-------------------------------------------情况一----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况一、先根据 TABLE_QTY 筛选 where status 条件后,根据五个主键做 group by ,取库存 sum ( qty )
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。

-- 简写 flinkSql:

INSERT INTO SINK values(***)
select qty.goodsqty from
( select sum(goodsqty) as goodsqty from TABLE_QTY where status =0
group by goodsid,sid,bid,lid,goodsstatusid
) qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid

现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------





-------------------------------------------情况二----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况二、先根据 TABLE_QTY 筛选 where 条件后,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
最后再 根据五个主键做 group by ,取库存 sum ( qty )
-- 简写 flinkSql:
INSERT INTO SINK values(***)

select sum(goodsqty) from TABLE_QTY
( select * from TABLE_QTY TABLE_QTY where status =0
) qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid
group by goodsid,sid,bid,lid,goodsstatusid

现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------





-------------------------------------------情况三----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况三、先根据 TABLE_QTY 筛选 where status 条件,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
不再做 group by
-- 简写 flinkSql:
INSERT INTO SINK values(***)

select * from TABLE_QTY qty where status =0
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid


现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
业务高峰期,stream 增量也出现了不一致的情况,感觉是 status 状态的频繁变更导致的。
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------



-------------------------------------------情况四----------------------------------------------------------
-----------------------------------------------------------------------------------------------------------
情况四、先根据 TABLE_QTY 不筛选 where status 条件,
拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
不再做 group by
-- 简写 flinkSql:
INSERT INTO SINK values(***)
select * from TABLE_QTY qty
INNER JOIN TABLE_B b on qty.bid = b.bid
INNER JOIN TABLE_L l on qty.lid = b.lid
INNER JOIN TABLE_S s on qty.sid = b.sid


现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
业务高峰期,stream 增量数据也可以一致。
-----------------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------------

为什么添加 group by 后数据不准?,任务并行度设置的是 20 。为何剔除了 group by 添加了 where 条件了也不准?
flinksql 也设置 SET 'table.exec.state.ttl' = '0 ms';
788 次点击
所在节点    问与答
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/928207

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX