请问在 Python 的事件系统中,如何可以通过事件通知立刻终结一个正在运行的子线程?

2021-02-22 10:43:16 +08:00
 seventhbible

大家好,最近在完善手头上一个基于事件系统的 GUI 。

现在遇到一个问题,就是当我在执行一个按钮点击事件的时候,实际会开一个子线程进行业务逻辑的处理,这个处理过程可能会比较长,并且中间可能会出现一些不符合预期的情况发生,当发生这种情况时,我希望会借由发送一个错误的事件通知 EventManager,然后调用 listener 的方法来立刻终结这个对应的错误线程(因为之前已经发生了错误了,后续逻辑代码继续执行没有意义了)

代码如下,大佬们直接复制运行即可观察。如能解答,万分感谢!

from queue import Queue, Empty
from threading import *
from tkinter import *
import time
from tkinter import ttk

EVENT_TYPE_1 = "Count"
EVENT_TYPE_2 = "Error"
MAX_NUMBER = 10
CUR_NUMBER = 0


class event_manager:
    def __init__(self):
        self._eventQueue = Queue()
        self._thread = Thread(target=self.Run, daemon=True)
        self._handlers = {}
        self._active = False

    def Start(self):
        self._active = True
        self._thread.start()

    def Run(self):
        while self._active is True:
            try:
                event = self._eventQueue.get(block=True, timeout=1)
                self.Process(event)
            except Empty:
                pass

    def Process(self, event):
        if event.type in self._handlers:
            for handler in self._handlers[event.type]:
                handler()
        else:
            pass

    def Stop(self):
        self._active = False
        self._thread.join()

    def addEventListenter(self, type_, handler):
        try:
            handlerList = self._handlers[type_]

        except KeyError:
            handlerList = []
            self._handlers[type_] = handlerList

        if handler not in handlerList:
            handlerList.append(handler)

    def removeEventListenter(self, type_, handler):
        try:
            handlerList = self._handlers[type_]
            if handler in handlerList:
                handlerList.remove(handler)
            if not handlerList:
                del self._handlers[type_]
        except KeyError:
            pass

    def sendEvent(self, event):
        self._eventQueue.put(event)


class Event:
    def __init__(self, event_event_name, cur_done_task, type_=None):
        self.type = type_
        self._event_name = event_event_name
        self._curDoneTask = cur_done_task


class EventSource:
    def __init__(self, event_name, event_mgr, max_number, type):
        self._event_name = event_name
        self._event_manager = event_mgr
        self._type = type
        self._max_number = max_number

    def count(self):
        global CUR_NUMBER
        for i in range(self._max_number):
            CUR_NUMBER = i + 1

            if CUR_NUMBER == 4:  # 在业务逻辑线程中增加检测环节,如果发生错误就会发送错误事件,希望可以立刻终结当前的线程,不执行后续的代码
                print("************ detect error occurred , this thread should be terminated immediately !")
                errorEvent = Event("error", CUR_NUMBER, type_=EVENT_TYPE_2)
                self._event_manager.sendEvent(errorEvent)

            print(
                "************ main thread start:now start process {} - count : {}".format(self._event_name, CUR_NUMBER))
            event = Event("test", CUR_NUMBER, type_=self._type)
            self._event_manager.sendEvent(event)
            time.sleep(1)


class GUIListener(Tk):
    def __init__(self):
        super(GUIListener, self).__init__()

        self.title("Progress GUI")
        self.geometry("1200x805+600+100")
        self.config(bg="#535353")
        self.resizable(True, True)
        self.taskThread = None

        self.progressBar = ttk.Progressbar(master=self, orient=HORIZONTAL, maximum=MAX_NUMBER, length=300)
        self.progressBar.pack()
        self.button = ttk.Button(self, text="Run", command=lambda: self.button_function(MAX_NUMBER))
        self.button.pack()

    def update_progress_value(self):
        print("************Sub thread start: detect progress bar value is now...{}".format(self.progressBar['value']))
        self.progressBar['value'] = CUR_NUMBER
        self.progressBar.update_idletasks()
        print("************Sub thread start: update progress bar value to...{}".format(CUR_NUMBER))

    def button_function(self, max_number):
        # 在正式开始执行逻辑子线程之前,确实可以提前做一些判断,来决定是否满足条件,开始接下来的逻辑子线程,但是这个不在本次讨论范围内
        es = EventSource("eventSource", eventMgr, max_number, EVENT_TYPE_1)
        self.taskThread = Thread(target=es.count, daemon=True).start()  # 这里开始执行实际的业务逻辑子线程

    def terminate_error_thread(self):  # 这个方法在 GUIListener 接受到事件源发出的错误逻辑时被唤起,用来立刻终结正在执行事件源的线程,不做后续无用的代码逻辑处理
        pass
        # TODO: but how to implement this method ?


if __name__ == '__main__':
    gui = GUIListener()

    eventMgr = event_manager()
    eventMgr.addEventListenter(EVENT_TYPE_1, gui.update_progress_value)
    eventMgr.addEventListenter(EVENT_TYPE_2, gui.terminate_error_thread)

    eventMgr.Start()

    gui.mainloop()

顺便一提,希望得到的结果是这样的

************ main thread start:now start process eventSource - count : 1
************Sub thread start: detect progress bar value is now...0.0
************Sub thread start: update progress bar value to...1
************ main thread start:now start process eventSource - count : 2
************Sub thread start: detect progress bar value is now...1
************Sub thread start: update progress bar value to...2
************ main thread start:now start process eventSource - count : 3
************Sub thread start: detect progress bar value is now...2
************Sub thread start: update progress bar value to...3
************ detect error occurred , this thread should be terminated immediately !

到这里就应该自然停止。

1902 次点击
所在节点    Python
14 条回复
ch2
2021-02-22 11:49:59 +08:00
seventhbible
2021-02-22 13:37:21 +08:00
emmmmm,看了一下楼上大佬的链接,大意是我需要重新定义一个继承了线程的类,然后重写这个类下的 stop 方法(用一个布尔值的开关来控制 thread 的 run 方法)。
在我的代码示例中,就是每次发送出错误事件的时候,通知修改这个布尔值开关 [设为全局变量] 变为 False 然后自然使得接下来的线程自动跳出 run 方法?
不好意思,我接触 python 时间还不够长久,有些地方理解力还不是很强,如果有说错请指正。
todd7zhang
2021-02-22 14:07:42 +08:00
子线程处理逻辑,中间可能会有异常,然后子线程发事件给 manager,然后让 manager 来结束这个子线程?
如果是这样的话,为啥不是子线程中间出错了,自己退出不就行了?
seventhbible
2021-02-22 15:23:36 +08:00
@todd7zhang 理想情况下我是希望可以借助发送 event 来终结当前的子线程,因为 event 可以带出来错误的各种信息。对后续处理会很有帮助。
no1xsyzy
2021-02-22 15:28:08 +08:00
你可以直接 eventMgr.Stop() 来停止
不要在 GUIListener 里写 event_manager 的停止逻辑

- eventMgr.addEventListenter(EVENT_TYPE_2, gui.terminate_error_thread)
+ eventMgr.addEventListenter(EVENT_TYPE_2, eventMgr.Stop)

但会造成内存泄漏,请用 weakref 替换这一 call
seventhbible
2021-02-22 15:34:56 +08:00
@no1xsyzy 抱歉小弟我才疏学浅,这里的 weakref 是如何替换?
imn1
2021-02-22 15:51:43 +08:00
代码就不阅读了,没空去研究逻辑

简单的做法,就是主线程立一个 flag,子线程读取这个 flag,变更就跳出循环(每次循环判断),跳出后重置 flag,结束子线程,要确保其他控件的事件可以更改这个 flag,GUI 做这个不太难

粗略看,你的代码是在主线程 loop ?应该放到子线程,这样主线程才能接收其他事件
no1xsyzy
2021-02-22 15:59:05 +08:00
@imn1 他这不是主线程 loop,主线程在 gui

@seventhbible 你这里 eventMgr 在 globals 里,不好调,也不想帮你大修,你随便地看一下官方的 weakref 实现吧(不是指 C 实现,而是有了 _weakref.ref 之后如何实现其他的工具)。
当然,因为它在 globals 里面,估计不修也没事儿。我说的内存泄漏是 eventMgr 循环引用自身。

any( hanlder.__self__ is eventMgr for handler in eventMgr._handlers[EVENT_TYPE_2] )
seventhbible
2021-02-22 16:45:32 +08:00
感谢大佬们的回复,可能是一下子知识出现断层了,我先补一下其他知识。如果不懂再问。。。
ec0
2021-02-22 17:01:56 +08:00
子线程自己退出,退出前发送 event

比如在 count 函数中

if CUR_NUMBER == 4:
(缩进)errorEvent = Event("error", CUR_NUMBER, type_=EVENT_TYPE_2)
(缩进)self._event_manager.sendEvent(errorEvent)
(缩进)return

也就是说 event 只是传递消息,线程的终结交给子线程自己
seventhbible
2021-02-22 17:44:17 +08:00
@ec0 对!这也是一种方法,感谢大佬回复。但是如果我需要将这个封装成一个传参的通用方法 check_error 的话,从结构上来说它应该属于哪里呢?
seventhbible
2021-02-22 17:51:54 +08:00
@ec0 而且这样的话线程并不会自己结束,会无限循环这个 for 循环,从 1 到 3
seventhbible
2021-02-23 10:47:37 +08:00
@ec0 确实这个方法可以,我刚刚说错了。。。但是请问有没有一种统一的方法由错误事件唤起一个通用的方法来退出特定的子线程?因为可能我处理不同子线程的逻辑业务都会很多,每个逻辑业务的判断错误条件五花八门,如果可以的话,我希望只要子线程出现异常,就统一发送错误信息,交由事件管理器唤起一个统一的方法来退出这个子线程。
seventhbible
2021-02-25 15:24:09 +08:00
最后报告一下,是我的思考方式错误了。应该使用 try except 来判断异常错误,最后让线程自动退出关闭的。主动杀掉正在运行的线程这种操作并不合理。。。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/754999

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX