
V2EX is full of experts, so I'm asking a small Flink question here

  whirly · 2019-09-19 17:19:30 +08:00 · 1349 views

    Help wanted: in Flink 1.9 SQL, how do I run CEP on the result of joining two tables?

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
    
            // Build the order data
            DataStream<Order> ordersData = env.fromCollection(Arrays.asList(
                    new Order("001", "iphone", new Timestamp(1545800002000L)),
                    new Order("002", "mac", new Timestamp(1545800003000L)),
                    new Order("003", "book", new Timestamp(1545800004000L)),
                    new Order("004", "cup", new Timestamp(1545800018000L))
            ))
                    .assignTimestampsAndWatermarks(new OrderTimestampExtractor());
    
            // Build the payment table
            DataStream<Payment> paymentData = env.fromCollection(Arrays.asList(
                    new Payment("001", "alipay", new Timestamp(1545803501000L)),
                    new Payment("002", "card", new Timestamp(1545803602000L)),
                    new Payment("003", "card", new Timestamp(1545803610000L)),
                    new Payment("004", "alipay", new Timestamp(1545803611000L))
            ))
                    .assignTimestampsAndWatermarks(new PaymentTimestampExtractor());
    
            tEnv.registerDataStream("t_order", ordersData, "orderId, productName, orderTime");
            tEnv.registerDataStream("t_payment", paymentData, "orderId, payType, payTime");
    
            // Join the two tables
            String sqlQuery = "SELECT o.orderId as orderId, o.productName as productName, \n" +
                    "p.payType as payType, o.orderTime as orderTime, \n" +
                    "cast(payTime as timestamp) as payTime\n" +
                    "FROM t_order AS o \n" +
                    "JOIN t_payment AS p \n" +
                    "ON o.orderId = p.orderId AND\n" +
                    "\tp.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR";
            Table queryResult = tEnv.sqlQuery(sqlQuery);
            tEnv.registerTable("TemporalJoinResult", queryResult);
    
    
            String cepSQL = "select *, MATCH_ROWTIME() as rowtime from TemporalJoinResult\n" +
                    "\tMATCH_RECOGNIZE (\n" +
                    "\tORDER BY rowtime\n" +
                    "      MEASURES\n" +
                    "        A.orderId AS orderId,\n" +
                    "        A.productName AS productName,\n" +
                    "        A.orderTime AS orderTime,\n" +
                    "\t\tB.payTime AS payTime\n" +
                    "\tONE ROW PER MATCH \n" +
                    "\tAFTER MATCH SKIP PAST LAST ROW\n" +
                    "    PATTERN (A B)\n" +
                    "    DEFINE\n" +
                    "        A AS payType = 'alipay',\n" +
                    "        B AS productName = 'iphone'\n" +
                    "\t) as T";
    
            Table cepResult = tEnv.sqlQuery(cepSQL);
            tEnv.toAppendStream(cepResult, Row.class).print();
    
            env.execute();
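
For reference, the join condition `p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR` can be checked by hand against the sample data. The sketch below replays that condition in plain Java, without Flink, using the epoch-millisecond values from the snippet. The class `JoinWindowCheck` and its methods are hypothetical names introduced only for this illustration. It only does the arithmetic and does not reproduce Flink's join semantics.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the original post: replays the join
// condition on the sample data with plain Java, no Flink required.
final class JoinWindowCheck {
    static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Mirrors: p.payTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '1' HOUR
    // (BETWEEN is inclusive on both ends).
    static boolean withinWindow(long orderTimeMs, long payTimeMs) {
        return payTimeMs >= orderTimeMs && payTimeMs <= orderTimeMs + ONE_HOUR_MS;
    }

    // Pairs each order with the payment that has the same orderId
    // (same array index here) and keeps the ids that satisfy the window.
    static List<String> matchedOrderIds() {
        String[] ids = {"001", "002", "003", "004"};
        long[] orderTimes = {1545800002000L, 1545800003000L, 1545800004000L, 1545800018000L};
        long[] payTimes = {1545803501000L, 1545803602000L, 1545803610000L, 1545803611000L};
        List<String> matched = new ArrayList<>();
        for (int i = 0; i < ids.length; i++) {
            if (withinWindow(orderTimes[i], payTimes[i])) {
                matched.add(ids[i]);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(matchedOrderIds()); // prints [001, 002, 004]
    }
}
```

Order 003 is paid 3606 seconds after it was placed, just over the one-hour bound, so by this arithmetic only three of the four orders reach the pattern-matching step.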
    

    I can't figure out how to specify ORDER BY rowtime inside MATCH_RECOGNIZE. Any help would be appreciated.
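
One possible direction, offered as an unverified sketch rather than a confirmed fix: the Flink documentation describes `ORDER BY` in `MATCH_RECOGNIZE` as expecting a time attribute of the input table as its first sort key, and `MATCH_ROWTIME()` as a function used inside `MEASURES`. Under that reading, the streams would be registered with a `.rowtime` suffix on the time field, and the pattern query would order by `orderTime` instead of a column named `rowtime` that the join result does not have. The helper below only builds the strings, so it runs without Flink. The class `MatchRecognizeSketch`, its methods, and the output column `matchTime` are hypothetical names. Whether `orderTime` stays a rowtime attribute after the join in Flink 1.9 is an assumption that was not tested here.

```java
// Hypothetical helper, not from the original post. It only assembles strings,
// so it runs without Flink on the classpath.
final class MatchRecognizeSketch {

    // Assumption: appending ".rowtime" to a field in the field expression
    // marks it as the event-time attribute when the stream is registered.
    static String orderFields() {
        return "orderId, productName, orderTime.rowtime";
    }

    static String paymentFields() {
        return "orderId, payType, payTime.rowtime";
    }

    // Builds the pattern query with ORDER BY on the given time attribute and
    // MATCH_ROWTIME() placed inside MEASURES instead of the outer SELECT.
    static String cepSql(String timeAttribute) {
        return String.join("\n",
                "SELECT * FROM TemporalJoinResult",
                "MATCH_RECOGNIZE (",
                "  ORDER BY " + timeAttribute,
                "  MEASURES",
                "    A.orderId AS orderId,",
                "    A.productName AS productName,",
                "    A.orderTime AS orderTime,",
                "    B.payTime AS payTime,",
                "    MATCH_ROWTIME() AS matchTime",
                "  ONE ROW PER MATCH",
                "  AFTER MATCH SKIP PAST LAST ROW",
                "  PATTERN (A B)",
                "  DEFINE",
                "    A AS payType = 'alipay',",
                "    B AS productName = 'iphone'",
                ") AS T");
    }

    public static void main(String[] args) {
        System.out.println(orderFields());
        System.out.println(paymentFields());
        System.out.println(cepSql("orderTime"));
    }
}
```

In the original snippet these strings would be passed to `tEnv.registerDataStream(...)` and `tEnv.sqlQuery(...)` in place of the current field lists and `cepSQL`.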

    No replies yet.