请教如何在 concurrent.futures 子进程里,无冲突地使用整个程序的全局队列?

2020-10-23 01:40:43 +08:00
 qazwsxkevin
from concurrent import futures
from multiprocessing import Manager


# 此函数无限循环,消费掉 sqltextqueen
def procSQL(sqltextqueen):
    while True
        while not sqltextqueen:
            inserttext = sqltextqueen.get()
            mysqldb =xxxx
            mysqldb.insert(inserttext)
            #略
        time.sleep(3)


if __name__ == '__main__':

	MainSQLProcess = futures.ProcessPoolExecutor(max_workers=1)
  
       # 全局 SQL 队列
        manager = Manager()
	SQLQueen = manager.queue
	MainSQLProcessRet = MainSQLProcess.sumit(procSQL,SQLQueen)
	

################ another.py ##################

	TaskProcess = {}
	TaskProcessRet = {}
	
	# 提交任务
	TaskinfoA = {
	    'TYPE': 'CPS'
	    'countt': countt,
	    'Rget': stRget,
	    'DLoption': DLoption,
	    'DTO': mtinfo.get('DTO'),
	    'errFlag': errFlag
	    # 字典内容引用的一些值,有些是从函数外几层的函数传过来的,距离 main()已经好 N 层了
	}
	
	
	TaskProcess['A'] = futures.ProcessPoolExecutor(max_workers=1)
	TaskProcess['B'] = futures.ProcessPoolExecutor(max_workers=1)
	# 交给进程池
	TaskRet['A'] = TaskProcess['A'].submit(ProcessSuit,TaskinfoA)
	TaskRet['B'] = TaskProcess['B'].submit(ProcessSuit,TaskinfoB)

ProcessSuit 函数里,产生的一些 SQL 语句,希望能及时送到全局队列里去消费,而不是通过 concurrent.futures 的回调 函数一层一层地往回 main 送到才处理...

我尝试了把 SQLQueen 引用在

	TaskinfoA = {
	    'TYPE': 'CPS'
	    'countt': countt,
	    'Rget': stRget,
	    'DLoption': DLoption,
	    'DTO': mtinfo.get('DTO'),
	    'errFlag': errFlag,
	    'SQLQueen' : SQLQueen    # <-----
	}

是不行的,貌似是有个 concurrent.futures pick 锁什么的 请教如何可以让各个子进程,都能不冲突地,实时送到全局的队列里,各自进程都可以 put,get,但是又不冲突?

408 次点击
所在节点    Python
1 条回复
qazwsxkevin
2020-10-25 00:25:56 +08:00
最后重新改写了大部分函数
再用了 Manager().Queue() 作为队列
事件算是解决了。。。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/717655

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX