
本文将介绍如何在 Jupyter Notebook 中实现一个简单的并行任务队列,它允许你在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这个实现方案的核心是使用 concurrent.futures.ThreadPoolExecutor 来管理线程池,以及使用 ipywidgets.Output 来捕获和显示子线程的输出。
import sys
import asyncio
import concurrent.futures
import ipywidgets
threadpool = concurrent.futures.ThreadPoolExecutor(4)
def run(fn, *args, **kwds):
"run fn in threadpool"
out = ipywidgets.Output()
def print(*args, file=sys.stdout):
line = ' '.join(map(str, args)) + '\n'
if file is sys.stderr:
out.append_stderr(line)
else:
out.append_stdout(line)
def done(fut: asyncio.Future):
try:
result = fut.result()
except asyncio.CancelledError:
print("cancelled", fut, file=sys.stderr)
except Exception:
print("failed", fut, file=sys.stderr)
else:
print("completed", fut)
async def go():
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
threadpool,
lambda: fn(print, *args, **kwds),
)
task = asyncio.create_task(go())
task.add_done_callback(done)
return out上述代码定义了一个 run 函数,它接受一个函数 fn 和任意数量的参数 *args 和关键字参数 **kwds。run 函数的主要功能是:
使用示例
以下是一个使用 run 函数的示例:
import time
def cpu_bound(print, dt, fail=False):
for i in range(10):
time.sleep(dt)
print(i, time.time())
if fail:
1 / 0
return "done"
run(cpu_bound, 0.1)在这个例子中,cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数,一个时间间隔 dt,以及一个可选的 fail 参数。该函数循环 10 次,每次休眠 dt 秒,并使用传入的 print 函数打印当前迭代次数和时间戳。如果 fail 参数为 True,则会引发一个 ZeroDivisionError 异常。
通过调用 run(cpu_bound, 0.1),cpu_bound 函数将在后台线程中执行,而不会阻塞 Notebook。输出将显示在与 run 函数调用相关的 ipywidgets.Output 对象中。
处理异常
run 函数还能够处理在线程中发生的异常。例如:
run(cpu_bound, 0.5, fail=True)
在这个例子中,cpu_bound 函数将引发一个 ZeroDivisionError 异常。done 回调函数将捕获这个异常,并将其输出到 ipywidgets.Output 对象中。
注意事项
总结
本文介绍了一种在 Jupyter Notebook 中实现并行任务队列的简单方法。通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,你可以在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这种方法对于交互式数据分析和原型设计非常有用。
以上就是使用 Jupyter Notebook 实现并行任务队列的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号