使用 Jupyter Notebook 实现并行任务队列

霞舞
发布: 2025-08-17 22:24:16
原创
549人浏览过

使用 jupyter notebook 实现并行任务队列

使用 Jupyter Notebook 实现并行任务队列

本文将介绍如何在 Jupyter Notebook 中实现一个简单的并行任务队列,它允许你在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这个实现方案的核心是使用 concurrent.futures.ThreadPoolExecutor 来管理线程池,以及使用 ipywidgets.Output 来捕获和显示子线程的输出。

import sys
import asyncio
import concurrent.futures

import ipywidgets

threadpool = concurrent.futures.ThreadPoolExecutor(4)

def run(fn, *args, **kwds):
    "run fn in threadpool"
    out = ipywidgets.Output()

    def print(*args, file=sys.stdout):
        line = ' '.join(map(str, args)) + '\n'
        if file is sys.stderr:
            out.append_stderr(line)
        else:
            out.append_stdout(line)

    def done(fut: asyncio.Future):
        try:
            result = fut.result()
        except asyncio.CancelledError:
            print("cancelled", fut, file=sys.stderr)
        except Exception:
            print("failed", fut, file=sys.stderr)
        else:
            print("completed", fut)

    async def go():
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            threadpool,
            lambda: fn(print, *args, **kwds),
        )

    task = asyncio.create_task(go())
    task.add_done_callback(done)

    return out
登录后复制

上述代码定义了一个 run 函数,它接受一个函数 fn 和任意数量的参数 *args 和关键字参数 **kwds。run 函数的主要功能是:

  1. 创建一个 ipywidgets.Output 对象 out: 这个对象用于捕获和显示在线程中执行的函数的输出。
  2. 定义一个自定义的 print 函数: 这个函数将输出重定向到 out 对象。ipywidgets.Output 似乎依赖于 asyncio 的一些机制,才能正确地将 "real" print 输出到正确的位置。
  3. 定义一个 done 回调函数: 这个函数在线程中的函数执行完成后被调用,用于处理结果或异常。
  4. 定义一个异步函数 go: 这个函数使用 asyncio.loop.run_in_executor 在线程池中执行传入的函数 fn。
  5. 创建一个 asyncio 任务 task: 这个任务执行异步函数 go。
  6. 添加 done 回调函数到 task: 当任务完成时,done 函数会被调用。
  7. 返回 out 对象: 这个对象可以在 Notebook 中显示线程的输出。

使用示例

以下是一个使用 run 函数的示例:

import time

def cpu_bound(print, dt, fail=False):
    for i in range(10):
        time.sleep(dt)
        print(i, time.time())
    if fail:
        1 / 0
    return "done"

run(cpu_bound, 0.1)
登录后复制

在这个例子中,cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数,一个时间间隔 dt,以及一个可选的 fail 参数。该函数循环 10 次,每次休眠 dt 秒,并使用传入的 print 函数打印当前迭代次数和时间戳。如果 fail 参数为 True,则会引发一个 ZeroDivisionError 异常。

通过调用 run(cpu_bound, 0.1),cpu_bound 函数将在后台线程中执行,而不会阻塞 Notebook。输出将显示在与 run 函数调用相关的 ipywidgets.Output 对象中。

处理异常

行者AI
行者AI

行者AI绘图创作,唤醒新的灵感,创造更多可能

行者AI 100
查看详情 行者AI

run 函数还能够处理在线程中发生的异常。例如:

run(cpu_bound, 0.5, fail=True)
登录后复制

在这个例子中,cpu_bound 函数将引发一个 ZeroDivisionError 异常。done 回调函数将捕获这个异常,并将其输出到 ipywidgets.Output 对象中。

注意事项

  • concurrent.futures.ThreadPoolExecutor 使用线程,适合于 I/O 密集型任务。对于 CPU 密集型任务,可以考虑使用 concurrent.futures.ProcessPoolExecutor 来利用多核 CPU。
  • 在线程中执行的函数必须是可序列化的。这意味着它们不能依赖于在主线程中定义的局部变量或对象。
  • 使用 ipywidgets.Output 可以方便地捕获和显示线程的输出,但它可能会引入一些性能开销。

总结

本文介绍了一种在 Jupyter Notebook 中实现并行任务队列的简单方法。通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,你可以在后台运行耗时的函数,而不会阻塞 Notebook 的交互。这种方法对于交互式数据分析和原型设计非常有用。

以上就是使用 Jupyter Notebook 实现并行任务队列的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号