V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
qazwsxkevin
V2EX  ›  WebSocket

请教, websockets 模块起服务, websockets.serve 的方法问题。

  •  
  •   qazwsxkevin · 188 天前 · 252 次点击
    这是一个创建于 188 天前的主题,其中的信息可能已经有所发展或是发生改变。
    import datetime
    import random
    import string
    import asyncio
    import time
    
    import websockets
    
    from multiprocessing import Manager
    from concurrent import futures
    
    # 忽略警告
    import warnings
    warnings.filterwarnings("ignore")
    
    strLen = 30
    
    def putmsg(que, gvar):
        cc = 0
        while True:
            cc += 1
    
            if gvar['flag'] == True:
                break
    
            ranStr = ''
            for s in range(strLen):
                ranStr = ranStr + random.choice(string.ascii_letters + string.digits)
    
            # slTime = random.uniform(0.01,0.2)
    
            logStr = str(cc) + ' ' + "{:.2f}".format(slTime) + ' ' + str(datetime.datetime.now().replace(microsecond=0)) + ' ' + ranStr
            # print(logStr)
    		
            # test
            # print(cc, '#', que.qsize())
    
            que.put(logStr)
    
            # time.sleep(slTime)
            time.sleep(1.5)
    
    def wsock(queu, gvar):
        loop = asyncio.get_event_loop()
        async def stoploop():
            loop.stop()
    
        # Maintain a list of connected clients
        connected_clients = set()
    
        async def register(websocket):
            # Add a new client to the list
            connected_clients.add(websocket)
            print('connected_clients.add(websocket)')
    
        async def unregister(websocket):
            # Remove a client from the list
            connected_clients.remove(websocket)
            print('connected_clients.remove(websocket)')
    
        async def broadcast(message):
            # Send a message to all connected clients
            if connected_clients:
                await asyncio.gather(*(client.send(message) for client in connected_clients))
    
        async def echo(websocket, que=queu):
            await register(websocket)
    
            while True:
                if queu.qsize():
                    msgStr = queu.get()
                    if connected_clients:
                        try:
                            await asyncio.gather(*(client.send(msgStr) for client in connected_clients))
                        except Exception as e:
                            print(e) # echo:received 1001 (going away); then sent 1001 (going away)
                            break
                else:
                    asyncio.sleep(0.3)
    
    
            async for message in websocket:
                # Broadcast the received message to all clients
                if message == 'stop':
                    gvar['flag'] = True
                    await stoploop()
                await broadcast(message)
    
            await unregister(websocket)
    
    
        start_server = websockets.serve(echo, "172.17.0.2", 25299)
    
        asyncio.set_event_loop(loop)
        # loop.create_task(start_server)
        asyncio.get_event_loop().run_until_complete(start_server)
        asyncio.get_event_loop().run_forever()
    
    if __name__ == '__main__':
        # 队列
        msgQue = Manager().Queue()
        # 全局变量
        glovar = Manager().dict()
    
        # 启停开关
        glovar['flag'] = False
    
        # 处理进程
        proc = futures.ProcessPoolExecutor(max_workers=2)
    
        wsockRet = proc.submit(wsock, msgQue, glovar)
        putmsgRet = proc.submit(putmsg, msgQue, glovar)
    

    问题是 websocket.serve 使用 echo 方法作为 handle ,
    只有在 websocket 接口有事件的时候,才会调用 echo 进行处理,(被动式)

    echo 的被动方法,队列里的日志越来越多,
    想有一个永久循环,如果有 client(s),send 取出的队列内容,没有 client ,取出就 pass 了,
    websocket.serve 被动调用不适合这个场合,看官方也没有更好的提示,
    请教大家这里怎么换个方式实现呢?

    2 条回复    2023-10-24 16:48:19 +08:00
    julyclyde
        1
    julyclyde  
       187 天前
    看了一下这个库的说明,感觉好奇怪
    为什么把“处理”和“IO”合在一个库里面啊
    qazwsxkevin
        2
    qazwsxkevin  
    OP
       187 天前
    @julyclyde #1 是啊,官网上异步和线程的范例逻辑思想,是做了很高的包装,要用在其它场景,很难适合,打算过几天有时间再去仔细地看看 python websocket 官网的 API renference ,看看能不能粒度化用在我的场景上。。。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   951 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 28ms · UTC 20:46 · PVG 04:46 · LAX 13:46 · JFK 16:46
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.