首页   注册   登录
V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
V2EX  ›  分享创造

使用生成器把 Kafka 写入速度提高 1000 倍

  •  1
     
  •   locktionc · 8 天前 · 1913 次点击

    原文地址:使用生成器把 Kafka 写入速度提高 1000 倍

    疑惑

    多年以前,当我刚刚开始学习 Python 协程的时候,我看到绝大多数的文章都举了一个生产者-消费者的例子,用来表示在生产者内部可以随时调用消费者,达到和多线程相同的效果。这里凭记忆简单还原一下当年我看到的代码:

    import time
    
    
    def consumer():
        product = None
        while True:
            if product is not None:
                print('consumer: {}'.format(product))
            product = yield None
    
    
    def producer():
        c = consumer()
        next(c)
        for i in range(10):
            c.send(i)
    
    start = time.time()
    producer()
    end = time.time()
    print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒')
    

    运行效果如下图所示。

    这些文章的说法,就像统一好了口径一样,说这样写可以减少线程切换开销,从而大大提高程序的运行效率。但是当年我始终想不明白,这种写法与直接调用函数有什么区别,如下图所示。

    直到后来我需要操作 Kafka 的时候,我明白了使用 yield 的好处。

    探索

    为了便于理解,我会把实际场景做一些简化,以方便说明事件的产生发展和解决过程。事件的起因是我需要把一些信息写入到 Kafka 中,我的代码一开始是这样的:

    import time
    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'test']
    
    
    def consumer(product):
        with topic.get_producer(delivery_reports=True) as producer:
            producer.produce(str(product).encode())
    
    
    def feed():
        for i in range(10):
            consumer(i)
    
    
    start = time.time()
    feed()
    end = time.time()
    print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒')
    

    这段代码的运行效果如下图所示。

    写入 10 条数据需要 100 秒,这样的龟速显然是有问题的。问题就出在这一句代码:

    with topic.get_producer(delivery_reports=True) as producer
    

    获得 Kafka 生产者对象是一个非常耗费时间的过程,每获取一次都需要 10 秒钟才能完成。所以写入 10 个数据就获取十次生产者对象。这消耗的 100 秒主要就是在获取生产者对象,而真正写入数据的时间短到可以忽略不计。

    由于生产者对象是可以复用的,于是我对代码作了一些修改:

    import time
    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'test']
    products = []
    
    
    def consumer(product_list):
        with topic.get_producer(delivery_reports=True) as producer:
            for product in product_list:
                producer.produce(str(product).encode())
    
    
    def feed():
        for i in range(10):
            products.append(i)
        consumer(products)
    
    
    start = time.time()
    feed()
    end = time.time()
    print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒')
    

    首先把所有数据存放在一个列表中,最后再一次性给 consumer 函数。在一个 Kafka 生产者对象中展开列表,再把数据一条一条塞入 Kafka。这样由于只需要获取一次生产者对象,所以需要耗费的时间大大缩短,如下图所示。

    这种写法在数据量小的时候是没有问题的,但数据量一旦大起来,如果全部先放在一个列表里面的话,服务器内存就爆了。

    于是我又修改了代码。每 100 条数据保存一次,并清空暂存的列表:

    import time
    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'test']
    
    
    def consumer(product_list):
        with topic.get_producer(delivery_reports=True) as producer:
            for product in product_list:
                producer.produce(str(product).encode())
    
    
    def feed():
        products = []
        for i in range(1003):
            products.append(i)
            if len(products) >= 100:
                consumer(products)
                products = []
    
        if products:
            consumer(products)
    
    
    start = time.time()
    feed()
    end = time.time()
    print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒')
    

    由于最后一轮循环可能无法凑够 100 条数据,所以feed函数里面,循环结束以后还需要判断products列表是否为空,如果不为空,还要再消费一次。这样的写法,在上面这段代码中,一共 1003 条数据,每 100 条数据获取一次生产者对象,那么需要获取 11 次生产者对象,耗时至少为 110 秒。

    显然,要解决这个问题,最直接的办法就是减少获取 Kafka 生产者对象的次数并最大限度复用生产者对象。如果读者举一反三的能力比较强,那么根据开关文件的两种写法:

    # 写法一
    with open('test.txt', 'w', encoding='utf-8') as f:
        f.write('xxx')
        
    # 写法二
    f = open('test.txt', 'w', encoding='utf-8')
    f.write('xxx')
    f.close()
    

    可以推测出获取 Kafka 生产者对象的另一种写法:

    # 写法二
    producer = topic.get_producer(delivery_reports=True)
    producer.produce(b'xxxx')
    producer.close()
    

    这样一来,只要获取一次生产者对象并把它作为全局变量就可以一直使用了。

    然而,pykafka 的官方文档中使用的是第一种写法,通过上下文管理器with来获得生产者对象。暂且不论第二种方式是否会报错,只从写法上来说,第二种方式必需要手动关闭对象。开发者经常会出现开了忘记关的情况,从而导致很多问题。而且如果中间出现了异常,使用上下文管理器的第一种方式会自动关闭生产者对象,但第二种方式仍然需要开发者手动关闭。

    函数 VS 生成器

    但是如果使用第一种方式,怎么能在一个上下文里面接收生产者传进来的数据呢?这个时候才是 yield 派上用场的时候。

    首先需要明白,使用 yield 以后,函数就变成了一个生成器。生成器与普通函数的不同之处可以通过下面两段代码来进行说明:

    def funciton(i):
        print('进入')
        print(i)
        print('结束')
    
    for i in range(5):
        funciton(i)
    
    

    运行效果如下图所示。

    函数在被调用的时候,函数会从里面的第一行代码一直运行到某个return或者函数的最后一行才会退出。

    而生成器可以从中间开始运行,从中间跳出。例如下面的代码:

    def generator():
        print('进入')
        i = None
        while True:
            if i is not None:
                print(i)
            print('跳出')
            i = yield None
    
    g = generator()
    next(g)
    for i in range(5):
        g.send(i)
    

    运行效果如下图所示。

    从图中可以看到,进入只打印了一次。代码运行到i = yield None后就跳到外面,外面的数据可以通过g.send(i)的形式传进生成器,生成器内部拿到外面传进来的数据以后继续执行下一轮while循环,打印出被传进来的内容,然后到i = yield None的时候又跳出。如此反复。

    所以回到最开始的 Kafka 问题。如果把with topic.get_producer(delivery_reports=True) as producer写在上面这一段代码的print('进入')这个位置上,那岂不是只需要获取一次 Kafka 生产者对象,然后就可以一直使用了?

    根据这个逻辑,设计如下代码:

    import time
    from pykafka import KafkaClient
    
    client = KafkaClient(hosts="127.0.0.1:9092")
    topic = client.topics[b'test']
    
    
    def consumer():
        with topic.get_producer(delivery_reports=True) as producer:
            print('init finished..')
            next_data = ''
            while True:
                if next_data:
                    producer.produce(str(next_data).encode())
                next_data = yield True
    
    
    def feed():
        c = consumer()
        next(c)
        for i in range(1000):
            c.send(i)
    
    start = time.time()
    feed()
    end = time.time()
    print(f'直到把所有数据塞入 Kafka,一共耗时:{end - start}秒')
    

    这一次直接插入 1000 条数据,总共只需要 10 秒钟,相比于每插入一次都获取一次 Kafka 生产者对象的方法,效率提高了 1000 倍。运行效果如下图所示。

    后记

    读者如果仔细对比第一段代码和最后一段代码,就会发现他们本质上是一回事。但是第一段代码,也就是网上很多人讲 yield 的时候举的生产者-消费者的例子之所以会让人觉得毫无用处,就在于他们的消费者几乎就是秒运行,这样看不出和函数调用的差别。而我最后这一段代码,它的消费者分成两个部分,第一部分是获取 Kafka 生产者对象,这个过程非常耗时;第二部分是把数据通过 Kafka 生产者对象插入 Kafka,这一部分运行速度极快。在这种情况下,使用生成器把这个消费者代码分开,让耗时长的部分只运行一次,让耗时短的反复运行,这样就能体现出生成器的优势。

    12 回复  |  直到 2018-04-15 21:14:10 +08:00
        1
    ligyxy   8 天前   ♥ 1
    1. 速度慢是因为它写错了,和生成器没有关系
    2. 根据这篇文章,confluent_kafka 有更好的效率 http://activisiongamescience.github.io/2016/06/15/Kafka-Client-Benchmarking/
    3. 既然你提到生成器,aiokafka 也可以了解一下
        2
    kingname   8 天前 via iPhone
    @ligyxy 这篇文章的目的是列举一个更合适的 yield 应用场景。kafka 只是一个例子而已。
        3
    lusi1990   8 天前 via Android
    mark 回去了解一下
        4
    laxenade   8 天前
    脑洞:如果你把 with ... as producer 放在 feed()里,然后把 producer 传进 consumer()会怎么样,理论上也能解决问题吧。(就单线程而言)
        5
    locktionc   8 天前
    @laxenade 是这样的,解决方法有很多种。我觉得我这篇文章的标题没有取好。Kafka 只是一个例子而已。结果大家都去关心 Kafka 去了。
        6
    mengzx   8 天前 via Android
    mark
        7
    freshpassport   8 天前
    谢谢分享
        8
    KIDJourney   7 天前
    。。。。这明显是代码问题。
        9
    KIDJourney   7 天前
    我理解把 bug 修好不算优化。
        10
    dbdd   6 天前 via iPhone   ♥ 1
    想起了

    通过去掉代码中预先写好的 sleep 把程序速度提高 1000 倍
        11
    locktionc   6 天前
    @KIDJourney 是我的标题取的不好。我只是想写 yield 的一个应用场景。Kafka 只是一个例子而已。这篇文章不是讲 Kafka 优化的。
        12
    KIDJourney   6 天前
    @locktionc 你要解决的无非是 kafka client init 时间过长,你用生成器无非是绕了个圈子解决了这个问题。
    关于   ·   FAQ   ·   API   ·   我们的愿景   ·   广告投放   ·   鸣谢   ·   1798 人在线   最高记录 3541   ·  
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.0 · 19ms · UTC 10:34 · PVG 18:34 · LAX 03:34 · JFK 06:34
    ♥ Do have faith in what you're doing.
    沪ICP备16043287号-1