page contents

Python 3.15 这3个线程工具救了多少新手?生成器多线程再也不崩溃了

这三个工具叫 serialize_iterator、synchronized_iterator 和 concurrent_tee。名字看着吓人,但用起来比你加锁的方案简单十倍。看完这篇,你再也不用为生成器多线程崩溃发愁了。

attachments-2026-07-ZE7IVxKx6a47130ba5645.png

你有没有遇到过这种情况——

你写了一个生成器函数,用它来逐条处理数据。效果很好,单线程跑起来没问题。然后你想提速,加了两条线程一起跑,结果还没跑两秒就炸了:

ValueError: generator already executing

你一脸懵:什么是"generator already executing"?我没重复调用啊,就是两个线程一起读同一个生成器而已。

然后你开始百度,搜到一堆英文论坛在讨论"generator is not thread-safe",有人说加锁有人说不加锁有人说用队列。你试了加锁,锁加在哪儿?生成器又不是普通函数,你锁了 next() 调用还是锁了 yield?折腾半小时,代码更乱了。

这不是你的问题。Python 的生成器在设计之初就不支持多线程并发访问,这是语言层面的限制,不是你代码写得不好。但 Python 3.15 终于给了你一把钥匙——三个专门解决这个问题的工具。

这三个工具叫 serialize_iteratorsynchronized_iterator  concurrent_tee。名字看着吓人,但用起来比你加锁的方案简单十倍。看完这篇,你再也不用为生成器多线程崩溃发愁了。

一、生成器为什么在多线程下会崩溃?

先搞清楚原因,后面看解决方案才更有感觉。

Python 的生成器是一种特殊的迭代器。它不像 list 那样把所有数据放在内存里等着你来取,而是你每次调用 next(),它才""一个值出来,吐完就暂停(yield),等下一次 next() 再继续。

这个暂停-继续机制,依赖一个叫"执行帧"的东西。生成器记住自己停在了哪里,下次被唤醒时从暂停点继续往下跑。

问题来了:一个执行帧同一时间只能被一个线程占用。如果线程 A 正在执行 next() 让生成器从 yield 恢复,线程 B 也来调用 next(),生成器发现"我已经在跑着了",直接抛出 ValueError

import threading

 

def counter(n):

    for i in range(n):

        yield i

 

gen = counter(10)

 

# 两个线程同时调用 next(gen)

def consume():

    try:

        while True:

            print(next(gen))

    except StopIteration:

        pass

 

t1 = threading.Thread(target=consume)

t2 = threading.Thread(target=consume)

t1.start()

t2.start()

# 很快就会抛出 ValueError

你可能觉得:"那我加个锁不就行了?"——确实可以,但你自己加锁有三个坑:

1. 锁加在哪儿? 你需要锁住整个 next() 调用链,不是简单加一行 threading.Lock() 就完事。

2. 生成器还有 send()throw()close(),这些也得锁,漏一个就炸。

3. 每个生成器都要自己的锁,你得手动管理锁的创建和销毁,代码量翻倍。

所以 Python 3.15 直接帮你干了这件事,而且干得更干净。

二、serialize_iterator:给生成器套上一层"防护罩"

serialize_iterator 是最基础的工具。它的作用很简单:给你的迭代器套上一层内部锁,让多个线程轮流访问,谁也不能同时挤进去。

用法极其简单,一行代码搞定:

import threading

 

def counter(n):

    for i in range(n):

        yield i

 

# 原来的生成器,多线程会崩溃

# gen = counter(10)

 

# serialize_iterator 包装,多线程安全

gen = threading.serialize_iterator(counter(10))

 

def consume(name):

    for item in gen:

        print(f"{name} got {item}")

 

t1 = threading.Thread(target=consume, args=("A",))

t2 = threading.Thread(target=consume, args=("B",))

t1.start()

t2.start()

t1.join()

t2.join()

# 每个数字只被打印一次,不会崩溃

注意一个关键细节:每个值只会被一个线程拿到。不是两个线程各拿一份,而是像切蛋糕一样瓜分——线程 A 拿了 0,线程 B 拿了 1,线程 A 又拿了 2,以此类推。

这种"瓜分"模式非常适合任务分发场景:比如你有1000URL要爬,两个线程各爬一部分,谁也不重复。以前你得用队列 Queue 来做这件事,现在用 serialize_iterator 包一下生成器就行,代码少一半。

还有一个好处:serialize_iterator 不仅锁住了 next(),还锁住了 send()throw()close()。你不用操心"还有哪个方法没锁"Python 全帮你锁了。

三、synchronized_iterator concurrent_tee:更聪明的两个兄弟

serialize_iterator 是手动包装,每次调用都要写 threading.serialize_iterator(...)。如果你有一个生成器函数经常被多线程使用,每次手动包也太烦了。

于是 Python 3.15 给了你第二个工具:synchronized_iterator,它是个装饰器,直接贴在函数头上就行。

import threading

 

@threading.synchronized_iterator

def counter(n):

    for i in range(n):

        yield i

 

# 调用时就自动线程安全了,不用手动包装

gen = counter(10)

 

def consume(name):

    for item in gen:

        print(f"{name} got {item}")

 

t1 = threading.Thread(target=consume, args=("A",))

t2 = threading.Thread(target=consume, args=("B",))

t1.start()

t2.start()

t1.join()

t2.join()

效果和 serialize_iterator 一样——每个值只给一个线程,但写法更优雅。你只要在函数定义的时候加一行装饰器,以后所有调用点自动线程安全,不用到处写包装代码。

但如果你想让每个线程都拿到完整的所有值(不是瓜分,而是广播),那就得用第三个工具了。

concurrent_tee itertools.tee() 的线程安全版。tee() 的作用是把一个迭代器拆成几个独立的副本,每个副本都能从头到尾读完所有值。但 itertools.tee() 不支持多线程——你两个线程各拿一个副本并发跑,底层共享缓冲区会打架。

import threading

 

def squares(n):

    for x in range(n):

        yield x * x

 

source = squares(5)

# 拆成2个线程安全的独立副本

left, right = threading.concurrent_tee(source)

 

def consume(name, it):

    for item in it:

        print(f"{name} got {item}")

 

t1 = threading.Thread(target=consume, args=("A", left))

t2 = threading.Thread(target=consume, args=("B", right))

t1.start()

t2.start()

t1.join()

t2.join()

# A B 都能看到完整的 0,1,4,9,16

concurrent_tee serialize_iterator 的区别,我用一个比喻帮你记住:

serialize_iterator = 切蛋糕。一块蛋糕,大家各切一块,切完就没有了。适合"任务分发"

concurrent_tee = 复印。一份文件,每个线程复印一份,各看各的。适合"广播通知"

举个实际场景:你有一个生成器实时推送股价变动。两个线程,一个是日志线程要记录每一笔变动,一个是策略线程要根据变动做决策。两者都需要完整的数据流,不能互相抢。这就是 concurrent_tee 的用武之地。

四、3个新手容易踩的坑

工具好用,但有几个坑提前知道能省不少时间:

1serialize_iterator 包装后还是同一个迭代器

serialize_iterator 返回的是包装器,底层还是同一个生成器。这意味着所有线程共享同一个数据源,值被消费了就没了。如果你不小心在某个线程里用 for 循环跑完了整个迭代器,其他线程拿到的是空迭代器。

gen = threading.serialize_iterator(counter(10))

 

# 线程A先跑完了所有值

for item in gen:

    print(item)

 

# 线程B再来,gen 已经空了

# for item in gen: ...  # StopIteration!

如果你需要每个线程都拿到完整数据,用 concurrent_tee,不是 serialize_iterator

2concurrent_tee 会缓冲数据,别用在无限生成器上

concurrent_tee 需要把值保存在缓冲区里,直到所有副本都消费完。如果你的生成器是无限的(比如 while True: yield ...),缓冲区会越来越大,最终内存炸掉。

def infinite_stream():

    while True:

        yield random.randint(0, 100)

 

# 千万别这么做!

# left, right = threading.concurrent_tee(infinite_stream())

# 缓冲区无限增长,内存迟早爆

无限生成器用 serialize_iterator 就行——因为它不缓冲,值拿走就没了。

3synchronized_iterator 只能装饰生成器函数,不能装饰普通迭代器

synchronized_iterator 是装饰器,它只能用在定义生成器的函数上。如果你已经有一个 list range 对象,不能给它贴装饰器——请用 serialize_iterator 手动包装。

# 错误:不能装饰普通迭代器

# @threading.synchronized_iterator

# my_list = [1, 2, 3]  # 这不是函数,报错

 

# 正确:手动包装

safe_list = threading.serialize_iterator(iter([1, 2, 3]))

五、总结:三句话记住这三个工具

这篇文章讲的东西很简单,但它解决了一个困扰新手多年的真实痛点。

以前你想在多线程里用生成器,要么崩溃要么手动加锁。加锁的代码又丑又容易漏,每次写完都心虚。Python 3.15 直接给了你三个工具,一行代码搞定。

最后,三句话帮你记住:

1. serialize_iterator = 切蛋糕。多个线程瓜分同一个生成器,每个值只给一个人。一行包装,告别 ValueError

2. synchronized_iterator = 装饰器版的 serialize_iterator。贴在函数头上,以后所有调用自动线程安全。

3. concurrent_tee = 复印。每个线程拿到完整数据副本,适合广播场景。但别用在无限生成器上。

说到底,Python 3.15 做了一件很"新手友好"的事:你不需要懂锁的原理,不需要知道生成器的执行帧,只需要一行代码,问题就解决了。这才是编程工具该有的样子——把复杂的事情藏起来,让你专注写业务逻辑。

你有没有在多线程里用过生成器?踩过什么坑?评论区聊聊,说不定你的经历能帮到更多人。

 更多相关技术内容咨询欢迎前往并持续关注好学星城论坛了解详情。

想高效系统的学习Python编程语言,推荐大家关注一个微信公众号:Python编程学习圈。每天分享行业资讯、技术干货供大家阅读,关注即可免费领取整套Python入门到进阶的学习资料以及教程,感兴趣的小伙伴赶紧行动起来吧。

attachments-2022-05-rLS4AIF8628ee5f3b7e12.jpg

你可能感兴趣的文章

相关问题

0 条评论

请先 登录 后评论
Pack
Pack

2187 篇文章

作家榜 »

  1. 轩辕小不懂 2403 文章
  2. 小柒 2228 文章
  3. Pack 2187 文章
  4. Nen 576 文章
  5. 王昭君 209 文章
  6. 文双 71 文章
  7. 小威 64 文章
  8. Cara 36 文章