
在处理诸如从大型文件中读取url并进行网络请求等i/o密集型任务时,并发编程是提升效率的关键。python的threading模块和queue.queue提供了构建并发系统的基础工具。然而,如果不正确地使用这些工具,尤其是在涉及有界队列(queue(maxsize=...))时,很容易陷入死锁或资源管理不当的困境。
在原始问题中,用户尝试使用queue.Queue(maxsize=10)来限制队列的大小,但在填充队列时,脚本却陷入了停滞。这正是典型的生产者-消费者死锁问题。
让我们分析一下原始代码的结构:
class UrlConverter:
def load(self, filename: str):
# ...
queue = Queue(maxsize=10) # 设定了最大容量
with open(urls_file_path, 'r', encoding="utf-8") as txt_file:
for line in txt_file:
line = line.strip()
queue.put(line) # 在这里尝试填充队列
return queue
# ...
def main():
url_converter = UrlConverter()
urls_queue = url_converter.load('urls.txt') # 生产者在这里一次性填充队列
fetcher_threads.execute(urls_queue) # 消费者(线程)在这里才开始从队列取数据问题出在UrlConverter.load方法中。当queue = Queue(maxsize=10)被初始化后,for line in txt_file: queue.put(line)循环会尝试将所有URL一次性放入队列。一旦队列达到其最大容量(例如10个),queue.put(line)方法就会阻塞,等待队列中有空位。
然而,此时并没有任何消费者线程正在从队列中取出数据。FetcherThreads.execute方法,即消费者逻辑,只有在url_converter.load完全执行完毕并返回队列后才会开始运行。这种顺序导致了死锁:生产者在等待消费者释放空间,而消费者尚未启动。
立即学习“Python免费学习笔记(深入)”;
如果maxsize未指定(即队列无界),queue.put将永远不会阻塞,所有URL会被一次性加载到内存中。对于小型文件这没有问题,但对于大型文件,这可能导致内存耗尽。
要解决上述问题,我们需要采用经典的“生产者-消费者”模式。在这种模式中:
关键在于,生产者和消费者必须能够并发运行。生产者在填充队列的同时,消费者也应能从队列中取出并处理数据。当队列满时,生产者应暂停;当队列空时,消费者应暂停,直到有新的数据可用。queue.Queue本身提供了这种同步机制,但手动管理线程和其生命周期会增加复杂性。
Python标准库提供了更高级的抽象来处理这类并发模式,大大简化了线程和队列的管理。multiprocessing.pool.ThreadPool是threading模块的更高级封装,它提供了一个线程池,可以方便地将任务分发给多个工作线程。对于I/O密集型任务(如网络请求),ThreadPool通常是比手动管理线程更优的选择,因为它能有效利用I/O等待时间。
该方法的核心组件包括:
以下是使用multiprocessing.pool.ThreadPool重构后的代码,它解决了原始问题中的死锁和效率问题:
from multiprocessing.pool import ThreadPool
import requests
from pathlib import Path
import time
# 辅助函数:生成示例urls.txt文件
def create_sample_urls_file(filename="urls.txt"):
urls_content = """
https://en.wikipedia.org/wiki/Sea-level_rise
https://en.wikipedia.org/wiki/Sequoia_National_Park
https://en.wikipedia.org/wiki/Serengeti
https://en.wikipedia.org/wiki/Sierra_Nevada_(Utah)
https://en.wikipedia.org/wiki/Sonoran_Desert
https://en.wikipedia.org/wiki/Steppe
https://en.wikipedia.org/wiki/Swiss_Alps
https://en.wikipedia.org/wiki/Taiga
https://en.wikipedia.org/wiki/Tatra_Mountains
https://en.wikipedia.org/wiki/Temperate_rainforest
https://en.wikipedia.org/wiki/Tropical_rainforest
https://en.wikipedia.org/wiki/Tundra
https://en.wikipedia.org/wiki/Ural_Mountains
https://en.wikipedia.org/wiki/Wetland
https://en.wikipedia.org/wiki/Wildlife_conservation
https://en.wikipedia.org/wiki/Salt_marsh
https://en.wikipedia.org/wiki/Savanna
https://en.wikipedia.org/wiki/Scandinavian_Mountains
https://en.wikipedia.org/wiki/Subarctic_tundra
https://en.wikipedia.org/wiki/Stream_(freshwater)
"""
file_path = Path(__file__).parent / Path(filename)
if not file_path.exists():
file_path.write_text(urls_content.strip(), encoding="utf-8")
print(f"创建了示例文件: {filename}")
else:
print(f"文件 {filename} 已存在,跳过创建。")
# 生成器函数:惰性地从文件中读取URL
def get_urls(file_name):
urls_file_path = str(Path(__file__).parent / Path(file_name))
try:
with open(urls_file_path, 'r', encoding="utf-8") as f_in:
for url in map(str.strip, f_in):
if url: # 过滤掉空行
yield url
except FileNotFoundError:
print(f"错误: 文件 '{file_name}' 未找到。请确保文件存在。")
return # 返回空生成器
# 工作函数:处理单个URL任务
def process_url(url):
try:
# 模拟网络请求,并设置超时以防止长时间阻塞
response = requests.get(url, timeout=10)
return url, response.status_code
except requests.exceptions.Timeout:
return url, "Error: Request timed out"
except requests.exceptions.RequestException as e:
return url, f"Error: {e}"
except Exception as e:
return url, f"Unexpected Error: {e}"
if __name__ == "__main__":
# 确保urls.txt文件存在
create_sample_urls_file("urls.txt")
num_workers = 5 # 设定线程池的大小,例如5个工作线程
print(f"开始使用 {num_workers} 个线程处理URL任务...")
start_time = time.time()
# 使用ThreadPool上下文管理器,确保线程池正确关闭
with ThreadPool(processes=num_workers) as pool:
# imap_unordered 惰性地从 get_urls 获取任务,并将它们分发给线程池中的工作线程。
# 结果会以任务完成的顺序返回,而不是输入的顺序。
for url, result in pool.imap_unordered(process_url, get_urls("urls.txt")):
print(f"处理完成: {url} -> {result}")
end_time = time.time()
print(f"\n所有URL任务处理完毕。总耗时: {end_time - start_time:.2f} 秒。")代码解析:
multiprocessing模块提供了两种主要的进程/线程池:
对于本教程中的URL抓取任务,由于其主要瓶颈在于网络I/O等待,ThreadPool是更合适的选择,因为它提供了轻量级的并发,且能有效利用I/O等待时间。
以上就是Python多线程并发:利用ThreadPool高效处理大规模任务队列的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号