Python 多线程+队列的问题

2017-02-03 16:24:26 +08:00
 simple221
# -*- coding: UTF-8 -*-
import re
import sys
import threading
import traceback
import random
import time
import subprocess
import shlex
try:
    import Queue            # Python 2
except ImportError:
    import queue as Queue   # Python 3
class NoResultsPending(Exception):
    """Raised by ThreadPool.poll() when no submitted request is awaiting a result."""

class NoWorkersAvailable(Exception):
    """Raised by a blocking ThreadPool.poll() when the pool has no worker threads."""
def _handle_thread_exception(request, exc_info):
    """Default exception callback: print the traceback described by *exc_info*.

    request  -- the work request that failed; accepted only so the function
                matches the ``exc_callback(request, exc_info)`` signature.
    exc_info -- sequence unpacked into ``traceback.print_exception``.
    """
    # The request itself is not inspected; only the traceback is reported.
    traceback.print_exception(*exc_info)
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Build one WorkRequest per entry of *args_list*, all calling *callable_*.

    Each entry is either a tuple, whose first two elements are used as the
    positional-argument sequence and the keyword-argument mapping, or any
    other object, which is passed as the single positional argument.

    callback and exc_callback are forwarded unchanged to every request.
    Returns the list of created requests in the order of *args_list*.
    """
    built = []
    for entry in args_list:
        if isinstance(entry, tuple):
            positional, keyword = entry[0], entry[1]
        else:
            # A bare value becomes the only positional argument.
            positional, keyword = [entry], None
        built.append(
            WorkRequest(callable_, positional, keyword, callback=callback,
                exc_callback=exc_callback)
        )
    return built
class WorkerThread(threading.Thread):
    """Daemon thread that executes work requests taken from a shared queue.

    The thread starts itself at the end of ``__init__``.  For every request
    it pulls from *requests_queue* it calls
    ``request.callable(*request.args, **request.kwds)`` and puts
    ``(request, result)`` on *results_queue*.  If the call raises, it sets
    ``request.exception = True`` and puts ``(request, sys.exc_info())``
    instead.
    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Store the queues and start the thread.

        requests_queue -- queue the thread reads requests from.
        results_queue  -- queue the thread writes (request, result) pairs to.
        poll_timeout   -- seconds to block on an empty request queue before
                          re-checking whether the thread has been dismissed.
        kwds           -- forwarded to ``threading.Thread.__init__``.
        """
        threading.Thread.__init__(self, **kwds)
        # Attribute form replaces the deprecated setDaemon() call.
        self.daemon = True
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Process requests until dismiss() has been called."""
        while not self._dismissed.is_set():
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                # Nothing to do yet; loop so the dismissed flag is re-checked.
                continue
            except Exception:
                # Unexpected queue failure: report it instead of silently
                # swallowing it, then keep the worker alive as before.
                traceback.print_exc()
                continue

            if self._dismissed.is_set():
                # Dismissed while waiting: hand the request back untouched.
                self._requests_queue.put(request)
                break

            try:
                result = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, result))
            except BaseException:
                # Deliberately catch-all (same reach as the former bare
                # ``except:``): every request must yield a result entry,
                # otherwise code waiting on the results queue never sees it.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Ask the thread to exit once its current request (if any) is done."""
        self._dismissed.set()

class WorkRequest:
    """A unit of work: a callable plus its arguments and result callbacks.

    Attributes set by ``__init__``: requestID, exception (initially False),
    callback, exc_callback, callable, args and kwds.
    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Record the callable, its arguments and the callbacks.

        requestID -- optional hashable value; its hash becomes the request id.
                     When omitted, ``id(self)`` is used.
        Raises TypeError if *requestID* is given but not hashable.
        """
        if requestID is not None:
            try:
                request_key = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        else:
            request_key = id(self)
        self.requestID = request_key

        self.callable = callable_
        # Falsy argument containers are replaced by fresh empty ones.
        self.args = args if args else []
        self.kwds = kwds if kwds else {}
        self.callback = callback
        self.exc_callback = exc_callback
        self.exception = False

    def __str__(self):
        """Return a one-line description of the request and its state."""
        template = "<WorkRequest funname=%s id=%s args=%s kwargs=%s exception=%s>"
        fields = (self.callable, self.requestID, self.args, self.kwds,
                  self.exception)
        return template % fields

class ThreadPool:
    """Pool of worker threads fed from one request queue.

    Requests are submitted with putRequest(); their results are collected
    from the results queue by poll() / wait(), which invoke the callbacks
    stored on each request.
    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Create the two queues and start *num_workers* worker threads.

        q_size / resq_size -- maximum sizes passed to the request and result
                              queues.
        poll_timeout       -- forwarded to every worker thread.
        """
        self.workers = []
        self.dismissedWorkers = []
        self.workRequests = {}
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add *num_workers* new worker threads to the pool."""
        for _ in range(num_workers):
            worker = WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout)
            self.workers.append(worker)

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell up to *num_workers* workers to quit after their current task.

        With do_join true the dismissed workers are joined immediately;
        otherwise they are remembered in ``dismissedWorkers`` for a later
        joinAllDismissedWorkers() call.
        """
        leaving = []
        how_many = min(num_workers, len(self.workers))
        for _ in range(how_many):
            worker = self.workers.pop()
            worker.dismiss()
            leaving.append(worker)

        if not do_join:
            self.dismissedWorkers.extend(leaving)
            return
        for worker in leaving:
            worker.join()

    def joinAllDismissedWorkers(self):
        """Join every previously dismissed worker and forget about them."""
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Queue *request* for execution and register it under its id.

        block and timeout are passed to the request queue's put().
        """
        assert isinstance(request, WorkRequest)
        # A request that already carries an exception flag must not be reused.
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Dispatch available results to the callbacks of their requests.

        Raises NoResultsPending when no registered request is outstanding,
        and NoWorkersAvailable when *block* is true but the pool has no
        workers.  Without *block*, returns as soon as the results queue is
        empty.
        """
        while True:
            if not self.workRequests:
                # Every registered request has been answered.
                raise NoResultsPending
            elif block and not self.workers:
                # Blocking would never end: nobody is left to produce results.
                raise NoWorkersAvailable
            try:
                request, result = self._results_queue.get(block=block)
                # Failed requests go to the exception callback, if one is set.
                if request.exception and request.exc_callback:
                    request.exc_callback(request, result)
                # Everything not routed to an exception callback goes to the
                # regular callback, if one is set.
                if request.callback and (not request.exception
                                         or not request.exc_callback):
                    request.callback(request, result)
                del self.workRequests[request.requestID]
            except Queue.Empty:
                break

    def wait(self):
        """Block until poll() reports that no results are pending."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                return
def func1(domain):
    """Run ``tracert`` against *domain* and return its captured standard output.

    domain -- host name or address handed to ``tracert`` as its only argument.

    Returns whatever ``communicate()`` yields for stdout, or None when the
    command could not be started (for example when ``tracert`` does not
    exist on this system).  Previously such a failure was swallowed and the
    function then crashed with UnboundLocalError on the return statement.
    """
    # Pass the arguments as a list so the domain is never re-parsed as a
    # command line.
    command = ['tracert', str(domain)]
    try:
        proc = subprocess.Popen(command, stdout=subprocess.PIPE)
        outroute1, _ = proc.communicate()
    except (OSError, ValueError):
        # Report the failure instead of hiding it, then signal "no output".
        traceback.print_exc()
        return None
    return outroute1
def do_something(data):
    """Sleep for ``data * 5`` seconds, then return a random value scaled by *data*.

    The return value is ``random.random() * data`` rounded to 5 decimals.
    """
    delay = data * 5
    time.sleep(delay)
    return round(random.random() * data, 5)
# NOTE: a ``global`` statement at module scope has no effect in Python; these
# two lines only document intent.  ``getrnum`` is not referenced anywhere else
# in the visible part of this file.
global requests
global getrnum
def callfunction(mytaskpool,funname,data):
    """Submit one request per entry of *data* and drain the pool per batch.

    mytaskpool -- pool that receives the requests (putRequest / poll /
                  createWorkers / dismissWorkers are called on it).
    funname    -- callable executed by the workers.
    data       -- argument list in the format accepted by makeRequests().

    Uses the module globals ``iIndex`` (requests submitted since the last
    drain), ``iwhile`` (loop counter) and ``SimuRunCount`` (batch size).
    Once at least ``SimuRunCount`` requests have been submitted, the pool is
    polled every 0.5 s until no results are pending (or Ctrl-C is pressed),
    after which both counters are reset to 0.
    """
    global SimuRunCount, iIndex, iwhile
    requests = makeRequests(funname, data, print_result, handle_exception)
    # Read the name from the callable itself; parsing its repr with a regex
    # raised IndexError for anything not printed as "<function X at ...>".
    funNamet = getattr(funname, '__name__', repr(funname))
    for req in requests:
        iwhile = iwhile + 1
        iIndex = iIndex + 1
        mytaskpool.putRequest(req)
        print("Work request #funName %s #params %s #id %s added." % (funNamet, req.args, req.requestID))
    # ">=" rather than "==": a single call that submits more requests than
    # the batch size would otherwise step over the threshold and never
    # trigger a drain again.
    if iIndex >= SimuRunCount:
        while True:
            try:
                time.sleep(0.5)
                mytaskpool.poll()
                if iwhile == SimuRunCount:
                    # First pass after a full batch: add another batch of workers.
                    mytaskpool.createWorkers(SimuRunCount)
                if iwhile == 20:
                    mytaskpool.dismissWorkers(2)
                iwhile += 1
            except KeyboardInterrupt:
                print("**** Interrupted!")
                break
            except NoResultsPending:
                # Every submitted request has delivered its result.
                break
        iIndex = 0
        iwhile = 0
def getreuslt(getnum):
    """Remove and return up to *getnum* entries from the front of ``finshedarrylist``.

    Returns an empty list when *getnum* is not greater than zero.  When
    fewer than *getnum* entries are stored, all of them are returned.
    """
    if not getnum > 0:
        return []
    # Never take more than what is currently stored.
    take = min(getnum, len(finshedarrylist))
    getresult = finshedarrylist[:take]
    del finshedarrylist[:take]
    return getresult


def print_result(request, result):
    """Result callback: print a finished request and record it.

    request -- the completed work request; its ``callable``, ``args`` and
               ``requestID`` attributes are read.
    result  -- value returned by the request's callable.

    Appends a dict with the keys 'request', 'params', 'funname' and
    'result' to the module-level ``finshedarrylist``.
    """
    # Take the name from the callable itself.  The former greedy regex over
    # str(request) cut at the last "at" in the whole string (so an argument
    # such as 'data.com' corrupted the name) and raised IndexError for
    # callables that are not plain functions.
    target = request.callable
    funName = getattr(target, '__name__', repr(target))
    # Same flattening of the argument tuple as before: drop commas and parens.
    argslist = str(request.args).replace(',', '').replace('(', '').replace(')', '')
    finshExecArray = {
        'request': request.requestID,
        "params": argslist,
        "funname": funName,
        'result': result,
    }
    print("**** Result from request #%s:%s:%s:%s" % (request.requestID,funName,argslist, result))
    finshedarrylist.append(finshExecArray)
# this will be called when an exception occurs within a thread
# this example exception handler does little more than the default handler
def handle_exception(request, exc_info):
    """Exception callback: report a failed request.

    request  -- the work request whose callable raised.
    exc_info -- expected to be a tuple describing the exception.

    Prints the request id together with *exc_info*.  If *exc_info* is not a
    tuple, prints both arguments and raises SystemExit.
    """
    if isinstance(exc_info, tuple):
        print("**** Exception occured in request #%s: %s" % \
              (request.requestID, exc_info))
        return
    # Not the expected tuple: something is seriously wrong, so give up.
    print(request)
    print(exc_info)
    raise SystemExit
# Batch size: number of initial workers and number of submitted requests
# after which callfunction() drains the pool.
SimuRunCount = 2
# Result records appended by print_result() and consumed by getreuslt().
finshedarrylist=[]
if __name__ == '__main__':
    # Counters shared with callfunction() through ``global`` declarations.
    iIndex = 0
    iwhile = 0
    requests = None
    FinishArray=[]
    mytaskpool = ThreadPool(SimuRunCount)
    data1 = [((3,), {}), ((5,), {})]
    callfunction(mytaskpool, do_something, data1)
    dataurl = [(('www.lessnet.cn',), {})]
    callfunction(mytaskpool, func1,dataurl)
    data = [((6,), {})]
    callfunction(mytaskpool, do_something, data)
    data = [((7,), {})]
    callfunction(mytaskpool, do_something, data)
    # The last call above leaves an incomplete batch that callfunction()
    # never polls; collect it here so its result is not lost when the
    # daemon workers are killed at interpreter exit.
    try:
        mytaskpool.wait()
    except NoWorkersAvailable:
        print("**** No workers left to finish the pending requests.")
    # print() with a single argument behaves the same on Python 2 and 3;
    # the former ``print x`` statements were a SyntaxError on Python 3.
    print(finshedarrylist)
    getreusltlist = getreuslt(SimuRunCount)
    print(getreusltlist)
    print(finshedarrylist)
    if mytaskpool.dismissedWorkers:
        # Announce the join before blocking on it.
        print("Joining all dismissed worker threads...")
        mytaskpool.joinAllDismissedWorkers()

目前只能等设定数量的线程全部完成后才会执行下一轮。请问如何修改,才能在(例如)完成 5 个线程之后,继续把等待中的任务放入执行队列?

1747 次点击
所在节点    Python
3 条回复
wwqgtxx
2017-02-03 19:12:17 +08:00
直接用 concurrent.futures 类库不就行了,还有你这个排版…
simple221
2017-02-16 17:53:15 +08:00
:-D 我直接复制这段代码过来的,然后就成这样了。
Livid
2017-03-04 16:01:55 +08:00
@simple221 帮你编辑了一下, V2EX 是支持 Markdown 代码高亮的。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/337888

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX