请教一个 Flink onTimer 触发的问题

2021-03-23 20:04:20 +08:00
 90928yao

为什么我一注册定时器,下面的 onTimer 就立刻被触发?有点费解。

我的理解:不是应该等到“事件时间的时间戳 + 我自己设置的时长”这个时间点,定时器才会触发吗?

PS:下面的代码复制下来可以直接运行。

/**
 * Demo job that joins an order stream with a payment stream on the order id.
 *
 * <p>Whichever side arrives first is buffered in keyed state and an event-time
 * timer is registered {@link #MATCH_TIMEOUT_MS} after the event's timestamp.
 * If the counterpart arrives before that, a "matched" message is emitted;
 * otherwise the timer fires and the buffered event is dropped.
 *
 * <p><b>Why the timers appear to fire immediately in this demo:</b> both inputs
 * are bounded {@code fromElements} sources. As soon as a bounded source is
 * exhausted, Flink emits a final watermark of {@code Long.MAX_VALUE}. Once both
 * inputs have finished, the operator's event-time clock jumps to
 * {@code Long.MAX_VALUE}, which is past every registered timer, so all pending
 * event-time timers fire. The watermark printed in {@code onTimer} is therefore
 * expected to be {@code 9223372036854775807}. With an unbounded source (Kafka,
 * socket, ...) the timer only fires once the watermark really passes
 * {@code eventTime + MATCH_TIMEOUT_MS}.
 */
public class TwoStreamsJoin {

    /** Event-time span (in milliseconds) an unmatched event waits for its counterpart. */
    private static final long MATCH_TIMEOUT_MS = 500000L;

    /**
     * Builds and runs the join pipeline.
     *
     * @param args unused command-line arguments
     * @throws Exception if the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Bounded source: finishes right after emitting its single element, which
        // makes Flink send a final Long.MAX_VALUE watermark downstream.
        KeyedStream<OrderEvent, String> orderStream = env
                .fromElements(
                        new OrderEvent("order_1", "pay", 1616497937786L))
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.minutes(10)) {
                    @Override
                    public long extractTimestamp(OrderEvent element) {
                        return element.eventTime;
                    }
                })
                .keyBy(r -> r.orderId);

        // Note: the sample payment uses a different key ("order_2"), so the two
        // sample events never match and both end up waiting for their timers.
        KeyedStream<PayEvent, String> payStream = env
                .fromElements(
                        new PayEvent("order_2", "weixin", 1616497937786L)
                )
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PayEvent>(Time.minutes(5)) {
                    @Override
                    public long extractTimestamp(PayEvent element) {
                        return element.eventTime;
                    }
                })
                .keyBy(r -> r.orderId);

        SingleOutputStreamOperator<String> result = orderStream
                .connect(payStream)
                .process(new CoProcessFunction<OrderEvent, PayEvent, String>() {
                    // Per-key buffer for an order that has not seen its payment yet.
                    private ValueState<OrderEvent> orderState;
                    // Per-key buffer for a payment that has not seen its order yet.
                    private ValueState<PayEvent> payState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        orderState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("order", OrderEvent.class)
                        );
                        payState = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("pay", PayEvent.class)
                        );
                    }

                    @Override
                    public void processElement1(OrderEvent orderEvent, Context context, Collector<String> collector) throws Exception {
                        PayEvent pay = payState.value();
                        if (pay != null) {
                            // Matched: drop the buffered payment and cancel its timeout
                            // timer so it does not fire later for an already-joined pair.
                            payState.clear();
                            context.timerService().deleteEventTimeTimer(pay.eventTime + MATCH_TIMEOUT_MS);
                            collector.collect("order id " + orderEvent.orderId + " matched success");
                        } else {
                            orderState.update(orderEvent);
                            context.timerService().registerEventTimeTimer(orderEvent.eventTime + MATCH_TIMEOUT_MS);
                        }
                    }

                    @Override
                    public void processElement2(PayEvent payEvent, Context context, Collector<String> collector) throws Exception {
                        OrderEvent order = orderState.value();
                        if (order != null) {
                            // Matched: drop the buffered order and cancel its timeout timer.
                            orderState.clear();
                            context.timerService().deleteEventTimeTimer(order.eventTime + MATCH_TIMEOUT_MS);
                            collector.collect("order id " + payEvent.orderId + " matched success");
                        } else {
                            payState.update(payEvent);
                            context.timerService().registerEventTimeTimer(payEvent.eventTime + MATCH_TIMEOUT_MS);
                        }
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        // With bounded inputs this prints Long.MAX_VALUE: the timer fires
                        // because the end-of-input watermark passed it, not because the
                        // timeout elapsed in wall-clock time.
                        System.out.println("@@@@@"+ ctx.timerService().currentWatermark());

                        // Only expire an event whose own deadline has been reached, so a
                        // stale timer cannot evict a newer buffered event for the same key.
                        OrderEvent order = orderState.value();
                        if (order != null && order.eventTime + MATCH_TIMEOUT_MS <= timestamp) {
                            System.out.println("触发了 timer");
                            // Release the state; otherwise unmatched events leak forever.
                            orderState.clear();
                        }
                        PayEvent pay = payState.value();
                        if (pay != null && pay.eventTime + MATCH_TIMEOUT_MS <= timestamp) {
                            System.out.println("触发了 timer");
                            payState.clear();
                        }
                    }
                });

        result.print();

        env.execute();
    }


    /**
     * Order-side event. Kept as a class with public fields and a public no-arg
     * constructor, as the original was written.
     */
    public static class OrderEvent {
        public String orderId;
        public String eventType;
        /** Event time in epoch milliseconds; used as the record timestamp. */
        public Long eventTime;
        /** Set to the same value as {@link #eventTime} by the constructor. */
        public Long timestamp;

        public OrderEvent(String orderId, String eventType, Long eventTime) {
            this.orderId = orderId;
            this.eventType = eventType;
            this.eventTime = eventTime;
            this.timestamp = eventTime;
        }

        public OrderEvent() { }
    }


    /**
     * Payment-side event. Kept as a class with public fields and a public no-arg
     * constructor, as the original was written.
     */
    public static class PayEvent {
        public String orderId;
        public String eventType;
        /** Event time in epoch milliseconds; used as the record timestamp. */
        public Long eventTime;
        /** Set to the same value as {@link #eventTime} by the constructor. */
        public Long timestamp;

        public PayEvent(String orderId, String eventType, Long eventTime) {
            this.orderId = orderId;
            this.eventType = eventType;
            this.eventTime = eventTime;
            this.timestamp = eventTime;
        }

        public PayEvent() {
        }

    }
}


750 次点击
所在节点    程序员
0 条回复

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/764398

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX