
本文旨在帮助开发者解决在使用 Jupyter Notebook API 通过 WebSocket 连接执行代码时遇到的 "socket is already closed" 错误。我们将分析错误原因,并提供通过重新连接 WebSocket 并确保消息格式正确来解决此问题的方案,确保代码能够顺利执行并接收到服务器的响应。
在使用 Jupyter Notebook API 通过 WebSocket 连接执行代码时,遇到 "socket is already closed" 错误通常表明 WebSocket 连接在接收到服务器响应之前意外关闭。这可能是由于多种原因造成的,例如网络问题、服务器端错误或客户端代码中的逻辑错误。以下提供一种解决方案,通过重新建立连接以及确保消息格式正确来解决此问题。
以下步骤描述了如何重新连接 WebSocket,并确保发送到 Jupyter Notebook 服务器的消息格式正确,从而解决 "socket is already closed" 错误。
错误分析:
从原始问题描述可以看出,错误发生在 ws.recv() 尝试接收服务器响应时。这暗示着 WebSocket 连接可能在发送执行请求后,但在收到响应前关闭了。Jupyter Notebook 服务器的日志也显示了一些警告信息,例如 "No session ID specified" 和 "No channel specified",这表明客户端发送的请求可能缺少必要的参数。
重新连接 WebSocket:
在循环接收消息之前,如果检测到连接关闭,应该重新建立 WebSocket 连接。这可以通过将 create_connection 放在一个 try...except 块中,并在捕获到 WebSocketConnectionClosedException 异常时重新调用它来实现。
import json
from websocket import create_connection, WebSocketConnectionClosedException
import time
def execute_code(kernel_id, session_id, code, headers):
ws_url = f"ws://127.0.0.1:8888/api/kernels/{kernel_id}/channels?session_id={session_id}"
ws = create_connection(ws_url, header=headers)
ws.send(json.dumps(send_execute_request(code)))
try:
while True:
rsp = json.loads(ws.recv())
msg_type = rsp["msg_type"]
# 处理不同类型的消息,例如 'execute_result', 'stream', 'error' 等
if msg_type == 'execute_result':
# 处理执行结果
print("Execute Result:", rsp["content"]["data"])
break # 结束循环,因为我们已经得到了执行结果
elif msg_type == 'stream':
# 处理输出流(stdout/stderr)
print("Stream Output:", rsp["content"]["text"])
elif msg_type == 'error':
# 处理错误信息
print("Error:", rsp["content"]["ename"], rsp["content"]["evalue"])
break # 结束循环,因为发生了错误
except WebSocketConnectionClosedException as e:
print(f"WebSocket connection closed: {e}")
# 在这里可以选择重新连接,或者抛出异常,取决于你的应用逻辑
# 例如:
# ws = create_connection(ws_url, header=headers) # 尝试重新连接
raise # 抛出异常,向上层处理
finally:
ws.close()修正消息格式:
Jupyter Notebook 服务器的日志表明,请求可能缺少 session ID 和 channel 信息。确保在创建 WebSocket 连接时,URL 中包含了正确的 session_id 参数。此外,检查发送到 WebSocket 的消息格式是否符合 Jupyter Notebook API 的要求。 特别注意时间戳格式,需要包含时区信息。
import datetime
import uuid
def send_execute_request(code):
msg_id = str(uuid.uuid1())
session_id = str(uuid.uuid1()) # You can generate a new session ID for each request
now = datetime.datetime.now(datetime.timezone.utc).isoformat() # Include timezone information
msg = {
"header": {
"msg_id": msg_id,
"username": "test",
"session": session_id,
"data": now,
"msg_type": "execute_request",
"version": "5.0"
},
"parent_header": {
"msg_id": msg_id,
"username": "test",
"session": session_id,
"data": now,
"msg_type": "execute_request",
"version": "5.0"
},
"metadata": {},
"content": {
"code": code,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False
},
"buffers": [],
"channel": "shell" # Explicitly specify the channel
}
return msg处理服务器响应:
修改后的代码示例中,execute_code 函数现在会处理不同类型的服务器响应(execute_result,stream,error)。请根据你的应用需求修改这些处理逻辑。
完整示例
import requests
import json
from websocket import create_connection, WebSocketConnectionClosedException
import datetime
import uuid
base = "http://127.0.0.1:8888" # 替换为你的 Jupyter Notebook 地址
headers = {"Authorization": "Token your_token"} # 替换为你的 token
def create_session(file_name):
url = base + '/api/sessions'
params = '{"path":"%s","type":"notebook","name":"","kernel":{"id":null,"name":"env37"}}' % file_name
response = requests.post(url, headers=headers, data=params)
session = json.loads(response.text)
return session
def get_notebook_content(notebook_path):
url = base + '/api/contents' + notebook_path
response = requests.get(url, headers=headers)
file = json.loads(response.text)
code = [c['source'] for c in file['content']['cells'] if len(c['source']) > 0]
return code
def send_execute_request(code):
msg_id = str(uuid.uuid1())
session_id = str(uuid.uuid1()) # You can generate a new session ID for each request
now = datetime.datetime.now(datetime.timezone.utc).isoformat() # Include timezone information
msg = {
"header": {
"msg_id": msg_id,
"username": "test",
"session": session_id,
"data": now,
"msg_type": "execute_request",
"version": "5.0"
},
"parent_header": {
"msg_id": msg_id,
"username": "test",
"session": session_id,
"data": now,
"msg_type": "execute_request",
"version": "5.0"
},
"metadata": {},
"content": {
"code": code,
"silent": False,
"store_history": True,
"user_expressions": {},
"allow_stdin": False
},
"buffers": [],
"channel": "shell" # Explicitly specify the channel
}
return msg
def execute_code(kernel_id, session_id, code, headers):
ws_url = f"ws://127.0.0.1:8888/api/kernels/{kernel_id}/channels?session_id={session_id}"
ws = create_connection(ws_url, header=headers)
ws.send(json.dumps(send_execute_request(code)))
try:
while True:
rsp = json.loads(ws.recv())
msg_type = rsp["msg_type"]
# 处理不同类型的消息,例如 'execute_result', 'stream', 'error' 等
if msg_type == 'execute_result':
# 处理执行结果
print("Execute Result:", rsp["content"]["data"])
break # 结束循环,因为我们已经得到了执行结果
elif msg_type == 'stream':
# 处理输出流(stdout/stderr)
print("Stream Output:", rsp["content"]["text"])
elif msg_type == 'error':
# 处理错误信息
print("Error:", rsp["content"]["ename"], rsp["content"]["evalue"])
break # 结束循环,因为发生了错误
except WebSocketConnectionClosedException as e:
print(f"WebSocket connection closed: {e}")
# 在这里可以选择重新连接,或者抛出异常,取决于你的应用逻辑
# 例如:
# ws = create_connection(ws_url, header=headers) # 尝试重新连接
raise # 抛出异常,向上层处理
finally:
ws.close()
# Example usage:
file_name = "example2.ipynb" # 替换为你的 notebook 文件名
notebook_path = "/" + file_name
session = create_session(file_name)
kernel = session["kernel"]
kernel_id = kernel["id"]
session_id = session["id"]
code = get_notebook_content(notebook_path)
for c in code:
try:
execute_code(kernel_id, session_id, c, headers)
except WebSocketConnectionClosedException:
print(f"Failed to execute code: {c}")
# Handle reconnection or error as needed通过重新连接 WebSocket 并确保消息格式正确,可以有效地解决 Jupyter Notebook API 中的 "socket is already closed" 错误。 此外,需要完善错误处理机制,以便在出现问题时能够及时发现并解决。
以上就是解决 Jupyter Notebook WebSocket 连接关闭错误的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号