
This article takes a close look at a common problem in Python: when making IO-bound web API calls, a multiprocess approach can end up slower than a single process. It analyzes the overhead of process creation and inter-process communication (IPC), clarifies the characteristics of IO-bound tasks, and gives detailed guidance on using `multiprocessing.Pool` to manage processes more efficiently, as well as on multithreading and asynchronous IO as more suitable alternatives, emphasizing the role of `requests.Session` in connection reuse.
In Python, attempts to speed up IO-bound work such as web API calls with multiple processes sometimes turn out slower than simply running in a single process. This usually comes down to a few key factors:

- Process creation overhead: starting an OS process has a fixed cost that can rival or exceed the duration of a single API call (see the timing sketch below).
- Inter-process communication (IPC) overhead: results returned through a multiprocessing.Queue must be pickled, written to a pipe, and unpickled on the other side.
- The nature of IO-bound tasks: they spend most of their time waiting on the network rather than computing, so extra processes add overhead without adding useful parallel CPU work.
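To make the first factor concrete, here is a minimal timing sketch (my addition; exact numbers vary widely by OS, Python version, and start method) showing the fixed cost of merely spawning and joining processes that do nothing:

from multiprocessing import Process
import time

def noop():
    pass  # does no work, so any elapsed time is pure process overhead

if __name__ == '__main__':
    t_s = time.time()
    procs = [Process(target=noop) for _ in range(20)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    # Typically a noticeable fraction of a second to several seconds,
    # even though the workers do nothing at all
    print(f"20 no-op processes took {time.time() - t_s:.2f} seconds")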
The original multiprocessing code creates and manages processes by hand, as shown below:
from multiprocessing import Process, Queue
import requests
import time

def pull_data(row, q):
    url_api = '*web api*'  # actual API address
    post_json = {'data': row}  # example payload structure
    try:
        x = requests.post(url_api, json=post_json)
        q.put(x.json())
    except requests.exceptions.RequestException as e:
        print(f"Error pulling data for {row}: {e}")
        q.put(None)  # put None (or another error marker) on the queue

rows = ['SN1', 'SN2', 'SN3', 'SN4', 'SN5', 'SN6']  # example data

# Simulate processing in batches of three
# Note: on Windows/macOS (spawn start method) this module-level code
# would also need an if __name__ == '__main__' guard
for i in range(0, len(rows), 3):
    jobs = []
    json_f = []
    q = Queue()
    t_s = time.time()
    # Create and start the processes by hand
    if 0 <= i < len(rows):
        p1 = Process(target=pull_data, args=(rows[i], q))
        jobs.append(p1)
        p1.start()
    if 1 <= i + 1 < len(rows):
        p2 = Process(target=pull_data, args=(rows[i + 1], q))
        jobs.append(p2)
        p2.start()
    if 2 <= i + 2 < len(rows):
        p3 = Process(target=pull_data, args=(rows[i + 2], q))
        jobs.append(p3)
        p3.start()
    for proc in jobs:
        proc.join()  # wait for every process in the batch to finish
    t_e = time.time()
    while not q.empty():
        result = q.get()
        if result is not None:
            json_f.append(result)
print(f"\nBatch query completed in {format(t_e - t_s, '.2f')} seconds. Results: {len(json_f)}")这段代码的问题在于:
multiprocessing.Pool offers a more efficient and more concise way to manage worker processes: a fixed number of processes is created once and reused across tasks, so the cost of creating them is amortized.
from multiprocessing import Pool
import requests
import time

# Optimized pull_data: no Queue parameter needed, results are simply returned
def pull_data(row):
    url_api = '*web api*'  # actual API address
    post_json = {'data': row}  # example payload structure
    try:
        # In real code, consider a requests.Session per worker to reuse connections
        # worker_session = requests.Session()
        # x = worker_session.post(url_api, json=post_json)
        x = requests.post(url_api, json=post_json)
        return x.json(), row  # return the result with the original row for tracking
    except requests.exceptions.RequestException as e:
        print(f"Error pulling data for {row}: {e}")
        return None, row  # on error, return None with the original row

def database_test():
    rows = [f'SN{i}' for i in range(1, 21)]  # example data, assume 20 serial numbers
    t_s = time.time()
    # Let Pool manage the worker processes.
    # The processes argument sets how many workers run at once;
    # tune it to your CPU core count and the nature of the task.
    with Pool(processes=5) as pool:
        # pool.map blocks until all tasks finish and returns results in input order
        results = pool.map(pull_data, rows)
    t_e = time.time()
    print(f"\nTotal query completed in {t_e - t_s:.2f} seconds.")
    successful_results = [res for res, row in results if res is not None]
    print(f"Successfully retrieved {len(successful_results)} results.")
    # Print a few sample results
    # for res, row in results[:3]:
    #     print(f"Row {row} result: {res}")

if __name__ == '__main__':
    database_test()

For IO-bound tasks, there are concurrency models that fit even better than multiprocessing.Pool:
Multithreading (the threading module, typically via concurrent.futures.ThreadPoolExecutor): Although Python's GIL prevents threads from running CPU-bound work in parallel, for IO-bound tasks such as network requests the GIL is released while a thread waits on IO, allowing other threads to run. Multithreading can therefore overlap IO operations effectively, and threads are far cheaper to create and switch between than processes.
from concurrent.futures import ThreadPoolExecutor
import requests
import time

def pull_data_thread(row, session):
    url_api = '*web api*'
    post_json = {'data': row}
    try:
        x = session.post(url_api, json=post_json)
        return x.json(), row
    except requests.exceptions.RequestException as e:
        print(f"Error pulling data for {row}: {e}")
        return None, row

def database_test_threaded():
    rows = [f'SN{i}' for i in range(1, 21)]
    t_s = time.time()
    results = []
    # Reuse connections with requests.Session -- essential for web API requests
    with requests.Session() as session:
        with ThreadPoolExecutor(max_workers=10) as executor:  # thread counts can go higher than process counts
            # Submit the tasks, passing in the shared session object
            futures = [executor.submit(pull_data_thread, row, session) for row in rows]
            for future in futures:
                results.append(future.result())
    t_e = time.time()
    print(f"\nTotal threaded query completed in {t_e - t_s:.2f} seconds.")
    successful_results = [res for res, row in results if res is not None]
    print(f"Successfully retrieved {len(successful_results)} results.")

if __name__ == '__main__':
    database_test_threaded()

Key point: in multithreaded code, using requests.Session matters. A Session lets requests reuse the underlying TCP connections instead of opening a new one per request, which can improve performance significantly. (Note that requests does not document Session as fully thread-safe; sharing one for simple request patterns like this is common practice, but heavier use may warrant one session per thread.)
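To isolate the effect of connection reuse, here is a minimal sketch (my addition; the URL is a placeholder, and the gap depends on network latency, TLS, and server keep-alive support):

import requests
import time

URL = 'https://example.com/'  # placeholder endpoint; substitute a real API

t_s = time.time()
for _ in range(10):
    requests.get(URL)  # opens a fresh connection (TCP + TLS handshake) per request
print(f"Without Session: {time.time() - t_s:.2f} seconds")

t_s = time.time()
with requests.Session() as session:
    for _ in range(10):
        session.get(URL)  # the underlying connection is kept alive and reused
print(f"With Session:    {time.time() - t_s:.2f} seconds")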
Asynchronous IO (the asyncio module with aiohttp): for the highest IO concurrency, asyncio is Python's modern answer. A single thread and an event loop manage large numbers of concurrent IO operations, with no thread-switching or process-creation overhead. It requires an HTTP client library that supports asyncio, such as aiohttp.
import asyncio
import aiohttp
import time

async def pull_data_async(row, session):
    url_api = '*web api*'
    post_json = {'data': row}
    try:
        async with session.post(url_api, json=post_json) as response:
            return await response.json(), row
    except aiohttp.ClientError as e:
        print(f"Error pulling data for {row}: {e}")
        return None, row

async def database_test_async():
    rows = [f'SN{i}' for i in range(1, 21)]
    t_s = time.time()
    # aiohttp.ClientSession handles connection reuse for asynchronous IO
    async with aiohttp.ClientSession() as session:
        tasks = [pull_data_async(row, session) for row in rows]
        results = await asyncio.gather(*tasks)
    t_e = time.time()
    print(f"\nTotal async query completed in {t_e - t_s:.2f} seconds.")
    successful_results = [res for res, row in results if res is not None]
    print(f"Successfully retrieved {len(successful_results)} results.")

if __name__ == '__main__':
    asyncio.run(database_test_async())

Key point: aiohttp.ClientSession is the standard way to make HTTP requests and reuse connections in an asyncio program.
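One practical extension (my addition, not from the original article): asyncio.gather fires every request at once, so with hundreds of rows you may want to cap how many are in flight. A minimal sketch reusing the pull_data_async coroutine defined above:

import asyncio
import aiohttp

async def bounded_pull(row, session, sem):
    async with sem:  # at most `limit` requests run concurrently
        return await pull_data_async(row, session)

async def database_test_bounded(rows, limit=10):
    sem = asyncio.Semaphore(limit)
    async with aiohttp.ClientSession() as session:
        tasks = [bounded_pull(row, session, sem) for row in rows]
        return await asyncio.gather(*tasks)

# Usage: asyncio.run(database_test_bounded([f'SN{i}' for i in range(1, 201)]))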
When handling IO-bound work such as web API calls in Python, using multiprocessing.Process directly can backfire because of process creation and IPC overhead. To optimize performance, we should:

- Prefer multiprocessing.Pool over hand-managed Process objects, so a fixed set of workers is created once and reused.
- For IO-bound tasks, reach for multithreading (ThreadPoolExecutor) or asynchronous IO (asyncio with aiohttp), which avoid process overhead altogether.
- Reuse connections with requests.Session (or aiohttp.ClientSession) instead of opening a new connection per request.
- Tune the number of workers, threads, or concurrent coroutines to the task type rather than assuming more is faster.
By choosing the right concurrency model and following these practices, you can significantly improve the efficiency of a Python application that makes large numbers of web API requests.