求解协程+多进程的正确使用姿势

2018-08-06 17:00:59 +08:00
 lieh222

用 asyncio 做了一个 UDP 传输性能测试工具,目前单进程服务端性能不够,流量大的时候处理不过来,服务端用的 asyncio.DatagramProtocol,怎么变成多进程的呢?试试了一下抢占式的写法,运行报错了,运行起来也只有一个进程工作,上代码


import asyncio
import time
import socket
from multiprocessing import Process

loop = asyncio.get_event_loop()
size = 0

class ServerProtocol(asyncio.DatagramProtocol):

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr, args=None):
        global size
        data = data.decode()
        message = data
        index = data.find('\n')
        if index > 0:
            filename = data[0:index]
            data = data[index+1::]
            size += len(data)
        task = self.WriteToFile(filename, data)
        asyncio.run_coroutine_threadsafe(task, loop)

    async def WriteToFile(self, f, data):
        await asyncio.sleep(1)
        return True

async def print_size():
    global size
    while True:
        await asyncio.sleep(1)
        print(size)

def start(sock):
    listen = loop.create_datagram_endpoint(
        ServerProtocol,
        sock = sock
    )
    transport, protocol = loop.run_until_complete(listen)

    task = print_size()
    asyncio.run_coroutine_threadsafe(task, loop)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    transport.close()
    loop.close()
    sock.close()

if __name__ == '__main__':
    print("Starting UDP server")
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(('0.0.0.0', 9873))
    for i in range(1):
        t = Process(target=start, args=(sock,))
        t.deamon = True
        t.start()
    start(sock)

报错信息

Exception in callback BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)
handle: <Handle BaseSelectorEventLoop._add_reader(6, <bound method..., bufsize=0>>>)>
Traceback (most recent call last):
  File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/python36/lib/python3.6/selectors.py", line 191, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/python36/lib/python3.6/asyncio/events.py", line 145, in _run
    self._callback(*self._args)
  File "/usr/local/python36/lib/python3.6/asyncio/selector_events.py", line 267, in _add_reader
    (handle, None))
  File "/usr/local/python36/lib/python3.6/selectors.py", line 412, in register
    self._epoll.register(key.fd, epoll_events)
FileExistsError: [Errno 17] File exists
3469 次点击
所在节点    Python
3 条回复
lieh222
2018-08-07 08:39:57 +08:00
这。。。咋没人呢
lolizeppelin
2018-08-07 09:00:04 +08:00
把 fork 搞明白

一多进城就 multiprocessing
有没有想过要看 multiprocessing 的源码?
lieh222
2018-08-07 10:50:13 +08:00
@lolizeppelin 感谢回复,已经解决,因为多进程中共用了一个事件循环,add_reader 重复注册了 socket.fileno,所以报错了,用 new_event_loop 解决,与 fork、mutiprocessing 的用法和是否看了源码没有关系,另外我认为像 fork,mutiprocessing 这种系统接口函数和系统接口的高级封装直接用就是了,除非遇到相关必须要解决的问题和抱着学习的目的,没有必要看这种源码

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/477331

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX