V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
V2EX 提问指南
GuoGuang
V2EX  ›  问与答

FlinkSql 求助,有偿 100!

  •  
  •   GuoGuang · 2023-03-29 17:07:27 +08:00 · 786 次点击
    这是一个创建于 409 天前的主题,其中的信息可能已经有所发展或是发生改变。
    帮朋友请教一下社区大佬们,详情请加 vx:bGl3czE5OTU=



    TABLE_QTY 是个库存表,库存表是 7 个字段作为的联合主键,PRIMARY KEY ( goodsid ,goodsdetailid ,sid,bid,lid,goodsstatusid )
    库存表中 status 在业务高峰期会频繁变动
    TABLE_B INNER JOIN 库存表,bid 与 库存表中联合主键的 bid 一一对应
    TABLE_L INNER JOIN 库存表,lid 与 库存表中联合主键的 lid 一一对应
    TABLE_S INNER JOIN 库存表,sid 与 库存表中联合主键的 sid 一一对应



    -- 目前遇到几种情况
    -------------------------------------------情况一----------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------
    情况一、先根据 TABLE_QTY 筛选 where status 条件后,根据五个主键做 group by ,取库存 sum ( qty )
    拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。

    -- 简写 flinkSql:

    INSERT INTO SINK values(***)
    select qty.goodsqty from
    ( select sum(goodsqty) as goodsqty from TABLE_QTY where status =0
    group by goodsid,sid,bid,lid,goodsstatusid
    ) qty
    INNER JOIN TABLE_B b on qty.bid = b.bid
    INNER JOIN TABLE_L l on qty.lid = b.lid
    INNER JOIN TABLE_S s on qty.sid = b.sid

    现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
    当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
    source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
    -----------------------------------------------------------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------





    -------------------------------------------情况二----------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------
    情况二、先根据 TABLE_QTY 筛选 where 条件后,
    拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
    最后再 根据五个主键做 group by ,取库存 sum ( qty )
    -- 简写 flinkSql:
    INSERT INTO SINK values(***)

    select sum(goodsqty) from TABLE_QTY
    ( select * from TABLE_QTY TABLE_QTY where status =0
    ) qty
    INNER JOIN TABLE_B b on qty.bid = b.bid
    INNER JOIN TABLE_L l on qty.lid = b.lid
    INNER JOIN TABLE_S s on qty.sid = b.sid
    group by goodsid,sid,bid,lid,goodsstatusid

    现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
    当 stream 增量数据过来后,会出现 source 表的 TABLE_QTY 的行记录已经删除了,sink 表的行记录没有删除。
    source 表的 TABLE_QTY 的行记录修改了,sink 表的行记录未修改
    -----------------------------------------------------------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------





    -------------------------------------------情况三----------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------
    情况三、先根据 TABLE_QTY 筛选 where status 条件,
    拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
    不再做 group by
    -- 简写 flinkSql:
    INSERT INTO SINK values(***)

    select * from TABLE_QTY qty where status =0
    INNER JOIN TABLE_B b on qty.bid = b.bid
    INNER JOIN TABLE_L l on qty.lid = b.lid
    INNER JOIN TABLE_S s on qty.sid = b.sid


    现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
    业务高峰期,stream 增量也出现了不一致的情况,感觉是 status 状态的频繁变更导致的。
    -----------------------------------------------------------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------



    -------------------------------------------情况四----------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------
    情况四、先根据 TABLE_QTY 不筛选 where status 条件,
    拿到结果集关联 INNER JOIN TABLE_B 、INNER JOIN TABLE_L 、INNER JOIN TABLE_S 。
    不再做 group by
    -- 简写 flinkSql:
    INSERT INTO SINK values(***)
    select * from TABLE_QTY qty
    INNER JOIN TABLE_B b on qty.bid = b.bid
    INNER JOIN TABLE_L l on qty.lid = b.lid
    INNER JOIN TABLE_S s on qty.sid = b.sid


    现象: 初始化后 sink 表的 goodsqty 库存数量与 source 表的 TABLE_QTY 数据能一致。
    业务高峰期,stream 增量数据也可以一致。
    -----------------------------------------------------------------------------------------------------------
    -----------------------------------------------------------------------------------------------------------

    为什么添加 group by 后数据不准?,任务并行度设置的是 20 。为何剔除了 group by 添加了 where 条件了也不准?
    flinksql 也设置 SET 'table.exec.state.ttl' = '0 ms';
    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2748 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 03:58 · PVG 11:58 · LAX 20:58 · JFK 23:58
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.