V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
90928yao
V2EX  ›  程序员

请教一个 flink ontimer 触发的问题

  •  
  •   90928yao · 2021-03-23 20:04:20 +08:00 · 742 次点击
    这是一个创建于 1145 天前的主题,其中的信息可能已经有所发展或是发生改变。

    为什么一注册定时器,下面的定时器立马触发,有点费解

    我的理解 不是基于我时间时间的时间戳+我自己设置的时间 到了这个点才触发吗

    ps:代码复制下 直接可以跑

    public class TwoStreamsJoin {
    
       
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            KeyedStream<OrderEvent, String> orderStream = env
                    .fromElements(
                            new OrderEvent("order_1", "pay", 1616497937786L))
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<OrderEvent>(Time.minutes(10)) {
                        @Override
                        public long extractTimestamp(OrderEvent element) {
                            return element.eventTime;
                        }
                    })
                    .keyBy(r -> r.orderId);
    
            KeyedStream<PayEvent, String> payStream = env
                    .fromElements(
                            new PayEvent("order_2", "weixin", 1616497937786L)
                    )
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<PayEvent>(Time.minutes(5)) {
                        @Override
                        public long extractTimestamp(PayEvent element) {
                            return element.eventTime;
                        }
                    })
                    .keyBy(r -> r.orderId);
    
            SingleOutputStreamOperator<String> result = orderStream
                    .connect(payStream)
                    .process(new CoProcessFunction<OrderEvent, PayEvent, String>() {
                        private ValueState<OrderEvent> orderState;
                        private ValueState<PayEvent> payState;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            super.open(parameters);
                            orderState = getRuntimeContext().getState(
                                    new ValueStateDescriptor<OrderEvent>("order", OrderEvent.class)
                            );
                            payState = getRuntimeContext().getState(
                                    new ValueStateDescriptor<PayEvent>("pay", PayEvent.class)
                            );
                        }
    
                        @Override
                        public void processElement1(OrderEvent orderEvent, Context context, Collector<String> collector) throws Exception {
                            PayEvent pay = payState.value();
                            if (pay != null) {
                                payState.clear();
                                collector.collect("order id " + orderEvent.orderId + " matched success");
                            } else {
                                orderState.update(orderEvent);
                                context.timerService().registerEventTimeTimer(orderEvent.eventTime + 500000L);
                            }
                        }
    
                        @Override
                        public void processElement2(PayEvent payEvent, Context context, Collector<String> collector) throws Exception {
                            OrderEvent order = orderState.value();
                            if (order != null) {
                                orderState.clear();
                                collector.collect("order id" + payEvent.orderId + " matched success");
                            } else {
                                payState.update(payEvent);
                                context.timerService().registerEventTimeTimer(payEvent.eventTime + 500000L);
                            }
                        }
    
                        @Override
                        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                            System.out.println("@@@@@"+ ctx.timerService().currentWatermark());
                            if (orderState.value() != null) {
                               System.out.println("触发了 timer");
                            }
                            if (payState.value() != null) {
                                System.out.println("触发了 timer");
                            }
                        }
                    });
    
            result.print();
            
    
            env.execute();
        }
    
    
        public static class OrderEvent {
            public String orderId;
            public String eventType;
            public Long eventTime;
            public Long timestamp;
    
            public OrderEvent(String orderId, String eventType, Long eventTime) {
                this.orderId = orderId;
                this.eventType = eventType;
                this.eventTime = eventTime;
                this.timestamp = eventTime;
            }
    
            public OrderEvent() { }
        }
    
    
        public static class PayEvent {
            public String orderId;
            public String eventType;
            public Long eventTime;
            public Long timestamp;
            public PayEvent(String orderId, String eventType, Long eventTime) {
                this.orderId = orderId;
                this.eventType = eventType;
                this.eventTime = eventTime;
                this.timestamp = eventTime;
    
            }
    
            public PayEvent() {
            }
    
        }
    }
    
    
    
    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2786 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 11:32 · PVG 19:32 · LAX 04:32 · JFK 07:32
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.