
当我们在python中处理websocket连接并期望通过回调函数(如on_ticks)接收异步数据时,一个常见的误区是忽略了程序主线程的生命周期。websocket库通常会在后台启动一个线程或利用一个事件循环来监听和处理传入的数据。这些后台操作需要主程序保持运行状态,以便事件循环能够持续调度和执行回调函数。
在提供的代码示例中,breeze.ws_connect() 建立WebSocket连接,breeze.on_ticks = on_ticks 设置回调函数,然后 breeze.subscribe_feeds() 发送订阅请求。然而,在这些操作之后,程序立即执行了 breeze.ws_disconnect() 并到达了 handle 方法的末尾。如果 ws_connect() 是非阻塞的(即它不会暂停主线程的执行),那么主线程在完成所有代码后会立即退出。当主线程退出时,整个Python进程也会终止,这包括了后台可能正在运行的WebSocket监听线程或事件循环,导致任何后续的on_ticks回调都无法被触发。
本地环境之所以可能正常工作,有时是因为运行环境(例如交互式Python shell)会隐式地保持主线程活跃,或者在某些平台/库版本下,ws_connect() 的行为有所不同,使得后台进程有足够的时间接收并处理数据。但在虚拟环境或作为Django管理命令运行时,程序的执行流程通常更为严格,主线程一旦完成其任务便会立即退出。
为了解决这个问题,我们需要确保主线程在WebSocket连接期间保持活跃,从而允许后台事件循环持续运行并触发on_ticks回调。
最简单直接的方法是使用 input() 函数来阻塞主线程,直到用户手动输入并按下回车键。这在开发或测试场景中非常方便。
立即学习“Python免费学习笔记(深入)”;
import time
from breezeconnect import BreezeConnect
from django.core.management.base import BaseCommand
from typing import Any
class Command(BaseCommand):
help = "Connects to Breeze WebSocket and subscribes to ticks."
def handle(self, *args: Any, **options: Any):
api_key = "YOUR_API_KEY" # 替换为你的API Key
api_secret = "YOUR_API_SECRET" # 替换为你的API Secret
session_token = "YOUR_SESSION_TOKEN" # 替换为你的Session Token
print("Connecting to Breeze...")
breeze = BreezeConnect(api_key=api_key)
breeze.generate_session(api_secret=api_secret, session_token=session_token)
breeze.ws_connect()
print("WebSocket connected successfully")
def on_ticks(ticks):
"""
处理接收到的行情数据回调。
"""
print("Ticks: {}".format(ticks))
breeze.on_ticks = on_ticks
breeze.subscribe_feeds(
exchange_code="NFO",
stock_code="ADAENT",
product_type="options",
expiry_date="28-Dec-2023",
strike_price="3000",
right="Call",
get_exchange_quotes=True,
get_market_depth=False
)
print("Subscribed to ADAENT options")
# 关键:阻塞主线程,等待回调发生
try:
input('Press Enter to disconnect and exit...')
except KeyboardInterrupt:
print("KeyboardInterrupt detected, disconnecting...")
finally:
breeze.ws_disconnect()
print("Disconnected from WebSocket")
说明: 在 input() 语句处,程序会暂停执行,直到用户按下回车键。在此期间,WebSocket连接保持活跃,on_ticks回调函数可以正常接收并打印数据。当用户输入后,程序才会继续执行 ws_disconnect()。
在生产环境中,我们通常不希望程序等待用户输入。更常见的方法是使用一个无限循环,结合 time.sleep() 来定期检查或简单地保持主线程活跃,并提供一种优雅的退出机制(例如通过 KeyboardInterrupt)。
import time
import signal # 用于处理信号,如Ctrl+C
import sys
from breezeconnect import BreezeConnect
from django.core.management.base import BaseCommand
from typing import Any
# 定义一个标志,用于控制循环退出
running = True
def signal_handler(sig, frame):
"""
处理Ctrl+C信号,设置退出标志。
"""
global running
print("\nCtrl+C detected, initiating graceful shutdown...")
running = False
class Command(BaseCommand):
help = "Connects to Breeze WebSocket and subscribes to ticks."
def handle(self, *args: Any, **options: Any):
global running
running = True # 重置运行标志,以防多次调用handle
# 注册信号处理器
signal.signal(signal.SIGINT, signal_handler)
api_key = "YOUR_API_KEY" # 替换为你的API Key
api_secret = "YOUR_API_SECRET" # 替换为你的API Secret
session_token = "YOUR_SESSION_TOKEN" # 替换为你的Session Token
print("Connecting to Breeze...")
breeze = BreezeConnect(api_key=api_key)
breeze.generate_session(api_secret=api_secret, session_token=session_token)
breeze.ws_connect()
print("WebSocket connected successfully")
def on_ticks(ticks):
"""
处理接收到的行情数据回调。
"""
print("Ticks: {}".format(ticks))
breeze.on_ticks = on_ticks
breeze.subscribe_feeds(
exchange_code="NFO",
stock_code="ADAENT",
product_type="options",
expiry_date="28-Dec-2023",
strike_price="3000",
right="Call",
get_exchange_quotes=True,
get_market_depth=False
)
print("Subscribed to ADAENT options")
# 关键:通过循环保持主线程活跃
try:
while running:
time.sleep(1) # 每秒检查一次退出标志,并保持主线程活跃
except Exception as e:
print(f"An error occurred: {e}")
finally:
print("Attempting to disconnect from WebSocket...")
breeze.ws_disconnect()
print("Disconnected from WebSocket")
sys.exit(0) # 确保程序优雅退出
说明:
某些高级WebSocket客户端库会提供自己的事件循环管理方法,例如 run_forever() 或 loop.run_until_complete()(在使用 asyncio 时)。如果 BreezeConnect 库提供了类似阻塞主线程直到连接关闭或特定事件发生的方法,那将是最佳实践。然而,根据当前的问题描述和解决方案,BreezeConnect 的 ws_connect() 似乎是非阻塞的,因此上述两种手动阻塞主线程的方法更为适用。
WebSocket回调函数在Python虚拟环境中不执行,通常是由于主线程在建立连接和订阅后立即退出,导致后台的事件循环或监听线程被终止。解决此问题的关键在于通过 input()、while True 循环结合 time.sleep(),或利用库提供的阻塞方法来确保主线程持续活跃,从而允许异步回调机制正常工作。在生产环境中,采用循环等待并结合信号处理实现优雅退出是更健壮的选择。理解主线程的生命周期对于开发任何涉及异步操作的Python应用都至关重要。
以上就是解决Python虚拟环境中On-Tick回调不触发的根源:主线程管理策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号