tornado 拿到 gen.return 异步返回的结果后,没有在 yield 的地方恢复继续执行

2017-08-29 13:04:31 +08:00
 mactec

请教大伙一个问题,我调试了半天,发现 gen.return 返回异步结果后,程序不是接着 yield 的地方执行,而是又跳到 ioloop,tornado 初学,文档较少,希望大家能帮忙指出错误在哪,感激 部分程序如下


@gen.coroutine
def _fetch_and_extract(self, task):
    self.processing_task_number += 1
    self.processing_task_set.add(task)
    if self.processing_task_number != self.processing_task_set.size():ioloop.IOLoop.instance().stop()
    self.logger.info("%s: start to fetch and extract" % self.worker_name)
    if check_task(task):
        try:
            spider = load_object(task["spider"])
        except Exception, e:
            handle_fail_task(task,"load %s object failed" % (task["spider"]),self.process_fail_task_queue, self.wait_for_process_task_queue)
            self.logger.error("%s: load object failed.path:%s, exception:%s" % (self.worker_name, task["spider"], e))
        else:
            spider_object = spider(self.processed_url_set, self.wait_for_process_task_queue)
            fetch_start_time = datetime.datetime.now()
            resp = yield spider_object.fetch(task["request"])
            self.logger.debug("got resp already ”)#已打印
	    #这里已经拿到异步返回的 resp 了,但是代码没有恢复接着往下走,而是直接回到 loop 去取 task 了,但是这时候 task 里空了,所以之后就一直提示任务空
	    #相当于只抓了美团 api 的 city 列表,后面 extract、再添加 task 都没有执行,我 debug 时候比较奇怪这点,按道理异步返回 resp 了应该接着之前代码的位置继续执行 不是么
            if resp == None or resp.error != None:
                handle_fail_task(task, "fetch request: %s failed,code:%d error:%s" % (task["request"], resp.code, resp.error), self.process_fail_task_queue, self.wait_for_process_task_queue)
                self.logger.error("%s: fetch request: %s failed, error: %s" % (self.worker_name, task["request"], resp))
            else:
                content_length = resp.headers['Content-Length'] if resp.headers.has_key('Content-Length') else None
                last_modified = resp.headers['Last-Modified'] if resp.headers.has_key("Last-Modified") else None
                fetch_time = datetime.datetime.now() - fetch_start_time
                extract_start_time = datetime.datetime.now()
                status = yield spider_object.extract(resp, **dict(task["kwargs"])
2545 次点击
所在节点    Python
3 条回复
mactec
2017-08-29 13:08:04 +08:00
mark 下,如果有答案了再来更新
bluesky139
2017-08-30 08:18:03 +08:00
肉眼表示没看出什么问题,你在那句打印出来 log 的地方以下每隔一行打 log 出来看看呢,或者能否精简个可独立运行的简陋版本出来
mactec
2017-09-02 17:13:01 +08:00
@bluesky139
'''
class Worker(object):
def __init__(self,str):
self._name = str
self._installed = False
def install(self):
if self._installed:
print "%s: no need to start again.worker has been installed!" % self._name
else:
self._installed = True
ioloop.IOLoop.instance().add_callback(self.test)
print "%s: worker is installed" % self._name
@gen.coroutine
def fetch(self,url):
req = HTTPRequest(url,connect_timeout=3,request_timeout=5)
client = httpclient.AsyncHTTPClient()
resp = yield gen.Task(client.fetch, req)
raise gen.Return(resp)

@gen.coroutine
def exact(self,resp):
yield gen.sleep(5)
raise gen.Return(10)

@gen.coroutine
def test(self):
if self._installed:
ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
str = self._name
task_url = u'http://api.caipiao.163.com/missNumber_trend.html?gameEn=kuai3'
resp = yield self.fetch(task_url)
print "%s get resp already at %s" %(str,datetime.datetime.now())
staus = yield self.exact(resp)
print "callback!!!%s status returned %d at %s" %(str,staus,datetime.datetime.now())


if __name__ == '__main__':
arrs = ['aaa','bbb']
for arr in arrs:
worker = Worker(arr)
worker.install()
ioloop.IOLoop.instance().start()

'''

谢谢,还在找原因
请问 ioloop.IOLoop.instance().add_timeout(datetime.timedelta(microseconds=5000),self.test)
这个能让 test 方法定期执行么,测试结果这个间隔没有用

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/386587

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX