Python asyncio 下载爬虫没法停止的一个问题

2019-05-09 11:08:43 +08:00
 zckun

一个礼拜前粗略的学习了下 asyncio + aiohttp 实现异步爬虫,三天前为了联手写了一个 ins 下载爬虫。 爬虫思路草图:用文字描述把: 首先我参考了 aiohttp 官方的爬虫例子,官方爬虫例子在这:crawl.py,我的思路是这样的

1.获取 user id ; user id 是必须的,所以我把这个写成了一个方法在创建类的时候直接调用

2.获取页的数据;因为没发说是第几页,这个请求需要三个参数,一个是 user id,一个是一次获取到的数量,第三个是 end_cursor 用来获取下一页

3.解析数据;获取到的数据是 json 格式的,我需要获取两个东西,第一个是图片链接,第二个是 end_cursor,用来获取下一页

4.处理 url ;这个方法遍历 urls 调用 download 方法下载

5.下载;用到了 aiohttp 和 aiofiles,没有异常、下载完后我用 asyncio.Task(self.get_display_urls(end_cursor))回到了获取页数据的方法,以此循环,当然获取页数据的方法有判读 end_cursor 是否为空,直接 loop.stop()

为了实现抓取所有图片,我没有使用 run_until_complete,因为它只获取了一次就停了,我就是用的 run_forever

全部代码如下:

import aiohttp
import asyncio
import aiofiles
import aioredis
import re
import json
import os
import signal
import time
import logging


class Instagram(object):

    def __init__(self, username, loop, depth=0, maxtasks=200):
        """
        :param username: 用户名
        :param loop:
        :param depth: 下载页面数量
        :param maxtasks: 最大并发限制
        """
        self.down_tasks = set()
        self.down_todo = set()
        self.down_busy = set()
        self.down_done = {}

        self.loop = loop
        self.sem = asyncio.Semaphore(maxtasks, loop=loop)

        self.username = username
        self.max_page = depth if depth >= 1 else -1

        self.ROOT_URL = 'https://www.instagram.com/'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.131 Safari/537.36',
            'Cookie': 'rur=ATN; mid=XM2K8QAEAAGy8fiEf1b2T05Pssas; fbm_124024574287414=base_domain=.instagram.com; fbsr_124024574287414=ns7o0TqnERhbPihnN390KYuDdI7xVM2vgUunMZT4URY.eyJjb2RlIjoiQVFESlVpaVhaSFNwWnBTZ2VGUE1nUGlfUXlsdElpRG9vOHJDdHB3Qm14Q25rNUx6YnJsNHdBX1JRVnowaDREU3J4ZzFGTWVHWHdlWFlhVGxuVi0yMk84ZXdlUVBNWTg5bVF6MFg5RG40b3psSEozTGk4WW40N1lPeFQzdE0yQUNJWkg5SWh1VmhpRHBoaXZ4ZXNMM3dhc2hMcHdQQ2RkSDZWR2FQMlR1QVM4V3U1SElGTERWaEpfYzl3akstem94TFl3QWRESE9wSjNwcDlhTjVhcXFBWGlWM0lfNTducGZ0cmpCWlFLd2xUZzlYZjBEbUlFdmR5RTBsMng3OEY0RkJ6Q1NtNWEzQ2RISTRYckVqNXB6LWVrYjRyNHRza05HOUhHUmZSaXAwS0hya1VqQ3l4T3YwNDBEU2txOHI4MGJvZG9GU3o4THFHelpSckZ4dldVMjNUWGhkZ2d6MTEzbHNfVnN5T1V5X01EUHZlSHVtUkQ5bXJ1V01ObGUxOFBuV2hvIiwidXNlcl9pZCI6IjEwMDAyNDA3NTU3MTE2NyIsImFsZ29yaXRobSI6IkhNQUMtU0hBMjU2IiwiaXNzdWVkX2F0IjoxNTU3MDQ0NTE0fQ; csrftoken=2JzdvnHL9iMuxbV7KiJcASk8RlKuYWAQ; shbid=2545; shbts=1557044558.2494695; ds_user_id=5561946202; sessionid=5561946202%3AwE5Vb00lI1bmIb%3A23; urlgen="{"2001:19f0:7001:1e1d:5400:1ff:fef7:67fd": 20473}:1hND0O:dQodCbp0SM_24vfenOyhBT-Curk"'
        }
        self.proxy = "http://localhost:8001"
        t = asyncio.ensure_future(self.init(), loop=loop)
        loop.run_until_complete(t)

    async def run(self):
        """
        :return:
        """
        await self.init()
        t = asyncio.ensure_future(self.addurls(), loop=self.loop)
        while self.down_busy:
            await asyncio.sleep(1, loop=self.loop)
        await t
        self.loop.close()

    async def init(self):
        """
        初始化必要参数:user id
        :return:
        """
        print('[init] 初始化参数...')
        shared_data = await self.get_shared_data()
        if not shared_data:
            print('!!!!!!!')
            exit(0)
        self.user_id = re.findall('"logging_page_id":.?"profilePage_(.*?)"', shared_data)[0]

    async def _http_request(self, url, **kwargs):
        """
        http 请求
        :param url: 请求链接
        :param kwargs: 链接参数
        :return: 网页 response
        """
        params = dict()
        if kwargs:
            for k, v in kwargs.items():
                params.update(v)
        async with self.sem:
            async with aiohttp.ClientSession() as session:
                try:
                    async with session.get(url, timeout=10, proxy=self.proxy, headers=self.headers,
                                                params=params) as response:
                        html = (await response.read()).decode('utf-8', 'replace')
                        return html
                except Exception as exc:
                    logging.warning("[_http_request] 异常: {}".format(exc))

    async def get_shared_data(self):
        """
        获取 shared data
        :return:
        """
        html = await self._http_request(self.ROOT_URL + self.username)
        if html:
            shared_data = html.split("window._sharedData = ")[1].split(";</script>")[0]
            return shared_data

    def get_ends_cursor(self, html):
        """

        :param html:
        :return:
        """
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                end_cursor = edge_media['page_info']['end_cursor']
                has_next_page = edge_media['page_info']['has_next_page']
                if has_next_page:
                    return end_cursor
                return ''

    async def get_display_url(self, max=50, end_cursor=""):
        """
        解析 display url
        :param max: 单次获取图片总量
        :param end_cursor: end_cursor 是获取下一页的参数
        :return: 包含{max}数量的图片链接列表
        """
        pic_params = {
            'query_hash': 'f2405b236d85e8296cf30347c9f08c2a',
            'variables': '{{"id":"{0}","first":{1},"after":"{2}"}}'.format(self.user_id, max, end_cursor),
        }
        pic_url = self.ROOT_URL + 'graphql/query/'
        html = await self._http_request(pic_url, parms=pic_params)
        if html:
            edge_media = json.loads(html)['data']['user']['edge_owner_to_timeline_media']
            edges = edge_media['edges']
            if edges:
                display_urls = []
                for edge in edges:
                    display_urls.append(edge['node']['display_url'])
                return display_urls, self.get_ends_cursor(html)

    async def download(self, url):
        """
        下载到本地
        :param url:
        :return:
        """
        print('processing:', url)
        # try:
            # async with self.sem: //如果使用 Semaphore 会卡住。。。虽然不会报错
        self.down_todo.remove(url)
        self.down_busy.add(url)
        path = './instagram/' + self.username
        if not os.path.exists(path):
            os.makedirs(path)

        filename = path + '/' + url.split('?')[0].split('/')[-1]
        print('start download:', url)
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(url, headers=self.headers, proxy=self.proxy) as resp:
                    if resp.status == 200:
                        f = await aiofiles.open(filename, 'wb')
                        await f.write(await resp.read())
                        await f.close()
                        await asyncio.Task(self.addurls(self.end_cursor))
                    resp.close()
                    self.down_done[url] = True
            except Exception as exc:
                logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))
                self.down_done[url] = False

        self.down_busy.remove(url)
        print(len(self.down_done), 'completed tasks,', len(self.down_tasks),
              'still pending, todo', len(self.down_todo))
        # 这个判断根本没有任何用,不会调用,直接卡住
        if self.end_cursor is False:
            print('下载完 la')
            self.loop.close()

        # except Exception as exc:
        #     logging.error('[download]下载异常,异常:{}\n 链接:{}'.format(repr(exc), url))

    async def add_down_urls(self, urls):
        print('[add_down_urls] 开始下载,数量:', len(urls))
        async with asyncio.Semaphore()
        for url in urls:
            self.down_todo.add(url)
            await self.sem.acquire()
            task = asyncio.ensure_future(self.download(url), loop=self.loop)
            task.add_done_callback(lambda t: self.sem.release())
            task.add_done_callback(self.down_tasks.remove)
            self.down_tasks.add(task)

    async def addurls(self, end_cursor=""):
        """
        :param end_cursor: 当前页面的标示 base64 加密,用于加载下一页,如果没有下一页改参数为 Fasle
        :return:
        """
        print("\n\n 开始获取下一页,end_cursor:", end_cursor)

        display_urls, self.end_cursor = await self.get_display_url(end_cursor=end_cursor)
        await self.add_down_urls(display_urls)
        if not self.end_cursor:
            return


'''
流程:
run() --> addurls() --> add_own_urls() --> download()
             ^                                |
             |                                |
             <-------<-----<--------<-------<--
'''


if __name__ == '__main__':
    start = time.time()
    loop = asyncio.get_event_loop()
    ins = Instagram('taeri__taeri', loop)
    future = asyncio.ensure_future(ins.addurls(), loop=loop)
    try:
        loop.add_signal_handler(signal.SIGINT, loop.stop)
    except RuntimeError:
        pass
    loop.run_forever()
    # loop.run_until_complete(future)
    # for i in future.result():
    #     print(">>>>", i)
    # ins.main()
    end = time.time()
    print('耗时:', end - start)

我遇到的问题是不使用 Semaphore 的情况下一开始是疯狂下载,也的确是下载成功了,然后就直接卡住,也不停,就一直卡住(原谅我使用卡住这个词),希望能帮忙看一下错在哪,谢谢了

2211 次点击
所在节点    Python
10 条回复
zckun
2019-05-09 11:09:23 +08:00
草图在这。。。为了找画图工具用了半个多小时 https://github.com/ZCKun/d/blob/master/a.png
zckun
2019-05-09 11:11:55 +08:00
ins 是新号,所以 cookie 没去掉就算了
zckun
2019-05-09 11:15:11 +08:00
不能重新编辑么。。代码有些错误忘了删除,希望别介意
zckun
2019-05-09 14:08:25 +08:00
emmmm
CSM
2019-05-09 14:15:46 +08:00
你好,研究了下你的代码,发现一个小问题

# 这个判断根本没有任何用,不会调用,直接卡住
if self.end_cursor is False:

这个是因为之前没有下一页的时候 end_cursor 是 '' 空字符串,而不是 False。


另外就是我觉得你的架构上有问题,这个问题是经典的生产者-消费者模型,请求并解析出图片链接作为生产者,然后启动多个消费者来下载这些链接就行了。我重构了一下你的代码,具体可见 https://gist.github.com/cshuaimin/4cf8d769b88e93fc805ceefb9af8c1f4
CSM
2019-05-09 14:25:25 +08:00
还有就是可以看到在 _http_request 方法里为每一个请求都生成了一个 ClientSession,这样太浪费了,建议只用一个 session。doc:

Session encapsulates a connection pool (connector instance) and supports keepalives by default. Unless you are connecting to a large, unknown number of different servers over the lifetime of your application, it is suggested you use a single session for the lifetime of your application to benefit from connection pooling.

https://docs.aiohttp.org/en/stable/client_reference.html
zckun
2019-05-09 14:49:42 +08:00
@GSM 之前 get_end_cursor 方法获取不到的话是返回 False 的,,忘了改了
zckun
2019-05-09 14:50:01 +08:00
@CSM 谢谢你,我看一下
shawndev
2019-05-09 15:06:32 +08:00
GvR 有一个 500lines 项目你可以参考一下。另外楼上说的很对,这是典型的生产者消费者模式,另外似乎没有考虑去重、重定向和超时重试?
zckun
2019-05-09 16:05:44 +08:00
@shawndev 知道了,以前学 java 的时候学过,但是已经忘得差不多了,我去复习,谢谢

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/562445

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX