更新 http_handler.py

This commit is contained in:
2025-04-16 22:53:36 +08:00
parent 01da0dc601
commit a362333ad6

View File

@@ -15,7 +15,7 @@ import json
import time
import traceback
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from port_config import SERVER_HOST, SERVER_PORT
from log_config import get_logger, initialize_logging
@@ -26,7 +26,8 @@ logger = get_logger("HTTPHandler")
# Global variables
_server_instance = None
_server_running = False
_clients = []
_clients = [] # List of (handler, client_id) tuples
_clients_lock = threading.RLock() # Lock for thread-safe client list operations
_event_callbacks = {}
class MCPHTTPHandler(BaseHTTPRequestHandler):
@@ -44,28 +45,42 @@ class MCPHTTPHandler(BaseHTTPRequestHandler):
# Support both root path and /events path for SSE
if ('text/event-stream' in accept_header and
(self.path == '/' or self.path == '/events')):
logger.info(f"Handling SSE request from {self.client_address[0]}:{self.client_address[1]}")
logger.info(f"Received SSE request from {self.client_address[0]}:{self.client_address[1]}")
self._handle_sse_request()
else:
logger.info(f"Handling regular request for path: {self.path}")
logger.info(f"Received regular request for path: {self.path}")
self._handle_regular_request()
except Exception as e:
logger.error(f"Error handling GET request: {e}")
logger.error(f"Error occurred while handling GET request: {e}")
logger.error(traceback.format_exc())
self._send_error(500, str(e))
def do_POST(self):
"""Handle POST requests"""
try:
logger.debug(f"Received POST request for path: {self.path}")
logger.debug(f"Headers: {self.headers}")
# Handle POST request
self._handle_regular_request()
except Exception as e:
logger.error(f"Error occurred while handling POST request: {e}")
logger.error(traceback.format_exc())
self._send_error(500, str(e))
def _handle_sse_request(self):
"""Handle Server-Sent Events (SSE) request"""
client_id = f"client-{int(time.time())}"
try:
# Add client to list
client_id = f"client-{int(time.time())}"
_clients.append((self, client_id))
with _clients_lock:
_clients.append((self, client_id))
logger.info(f"New SSE client connected: {client_id}")
# Send SSE headers
self.send_response(200)
self.send_header('Content-Type', 'text/event-stream')
self.send_header('Cache-Control', 'no-cache')
self.send_header('Cache-Control', 'no-cache, no-transform')
self.send_header('Connection', 'keep-alive')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
@@ -73,12 +88,25 @@ class MCPHTTPHandler(BaseHTTPRequestHandler):
self.send_header('X-Accel-Buffering', 'no') # Disable Nginx buffering
self.end_headers()
# Send initial comment to keep connection alive
self.wfile.write(": ping\n\n".encode('utf-8'))
self.wfile.flush()
logger.debug(f"Sent initial ping to client {client_id}")
# Force immediate flush of headers
if hasattr(self.wfile, 'flush'):
self.wfile.flush()
# Send connection event
# Send initial comment to keep connection alive
logger.debug(f"Sending initial ping to client {client_id}")
try:
# Send multiple initial comments to ensure connection is established
for _ in range(3):
self.wfile.write(": ping\n\n".encode('utf-8'))
if hasattr(self.wfile, 'flush'):
self.wfile.flush()
time.sleep(0.1) # Brief delay to ensure data is sent
logger.debug(f"Sent initial pings to client {client_id}")
except Exception as e:
logger.error(f"Error sending initial ping: {e}")
raise
# Send connection event immediately
connection_data = {
"status": "connected",
"client_id": client_id,
@@ -89,46 +117,142 @@ class MCPHTTPHandler(BaseHTTPRequestHandler):
"protocol": "SSE"
}
# Send connection event
self._send_sse_event("connection", connection_data)
logger.info(f"Sent connection event to client {client_id}")
# Send ready event
# Send ready event immediately after
ready_data = {
"status": "ready",
"client_id": client_id,
"timestamp": int(time.time() * 1000)
}
self._send_sse_event("ready", ready_data)
logger.info(f"Sent ready event to client {client_id}")
# Keep connection alive with heartbeats
heartbeat_count = 0
# Send initial scene info
try:
# import server module dynamically
import importlib
import server
importlib.reload(server)
scene_info = server.get_scene_info()
self._send_sse_event("scene_info", scene_info)
logger.info(f"Sent initial scene info to client {client_id}")
except Exception as e:
logger.warning(f"Could not send initial scene info: {e}")
logger.debug(traceback.format_exc())
# Start a thread to keep the connection alive
thread = threading.Thread(target=self._sse_connection_thread, args=(client_id,))
thread.daemon = True
thread.start()
logger.info(f"SSE connection thread started for client {client_id}")
# Keep connection open until client disconnects
# This will block until client disconnects
while True:
try:
# Send heartbeat every 5 seconds
time.sleep(5)
heartbeat_data = {
"timestamp": int(time.time() * 1000),
"client_id": client_id
}
self._send_sse_event("heartbeat", heartbeat_data)
heartbeat_count += 1
if heartbeat_count % 12 == 0: # Log every minute (12 * 5 seconds)
logger.debug(f"Sent {heartbeat_count} heartbeats to client {client_id}")
except Exception as e:
logger.error(f"Error in SSE loop: {e}")
logger.debug(traceback.format_exc())
break
time.sleep(1)
except (BrokenPipeError, ConnectionResetError) as e:
# Client disconnected
logger.info(f"Client {client_id} disconnected: {e}")
except Exception as e:
logger.error(f"Error handling SSE request: {e}")
logger.error(traceback.format_exc())
finally:
# Remove client from list
if (self, client_id) in _clients:
_clients.remove((self, client_id))
logger.info(f"SSE client disconnected: {client_id}")
with _clients_lock:
for i, (client, cid) in enumerate(_clients):
if cid == client_id:
_clients.pop(i)
break
logger.info(f"Client {client_id} removed from clients list")
def _sse_connection_thread(self, client_id):
"""SSE connection thread"""
try:
# Keep connection alive with periodic pings
last_ping_time = time.time()
last_heartbeat_time = time.time()
connection_active = True
while connection_active:
try:
# get current time
current_time = time.time()
# send ping every 5 seconds
if current_time - last_ping_time > 5:
# check if socket is still valid
if hasattr(self.wfile, '_sock') and self.wfile._sock is not None:
try:
# send ping comment
self.wfile.write(": ping\n\n".encode('utf-8'))
if hasattr(self.wfile, 'flush'):
self.wfile.flush()
last_ping_time = current_time
logger.debug(f"Sent ping to client {client_id}")
except (BrokenPipeError, ConnectionResetError) as e:
logger.info(f"Client {client_id} disconnected during ping: {e}")
connection_active = False
break
except OSError as e:
if e.errno == 10038: # Socket operation on non-socket
logger.info(f"Client {client_id} socket closed")
connection_active = False
break
else:
logger.error(f"OSError in SSE thread: {e}")
connection_active = False
break
# send heartbeat event every 30 seconds
if current_time - last_heartbeat_time > 30:
# send heartbeat event
heartbeat_data = {
"status": "alive",
"timestamp": int(current_time * 1000)
}
success = self._send_sse_event("heartbeat", heartbeat_data)
if success:
last_heartbeat_time = current_time
logger.debug(f"Sent heartbeat event to client {client_id}")
else:
logger.warning(f"Failed to send heartbeat to client {client_id}")
connection_active = False
break
# Sleep briefly to avoid high CPU usage
time.sleep(0.5)
except (BrokenPipeError, ConnectionResetError) as e:
logger.info(f"Client {client_id} disconnected: {e}")
connection_active = False
break
except OSError as e:
if e.errno == 10038: # Socket operation on non-socket
logger.info(f"Client {client_id} socket closed")
connection_active = False
break
else:
logger.error(f"OSError in SSE thread: {e}")
connection_active = False
break
except Exception as e:
logger.error(f"Unexpected error in SSE thread: {e}")
logger.error(traceback.format_exc())
connection_active = False
break
except Exception as e:
logger.error(f"Error in SSE thread: {e}")
logger.error(traceback.format_exc())
finally:
# Always remove client from list when connection is closed
with _clients_lock:
for i, (client, cid) in enumerate(_clients):
if cid == client_id:
_clients.pop(i)
break
logger.info(f"SSE client disconnected: {client_id}")
def _send_sse_event(self, event_type, data):
"""Send SSE event"""
@@ -142,13 +266,21 @@ class MCPHTTPHandler(BaseHTTPRequestHandler):
# Send event
self.wfile.write(event.encode('utf-8'))
self.wfile.flush()
if hasattr(self.wfile, 'flush'):
self.wfile.flush()
logger.debug(f"Sent SSE event: {event_type}")
except ConnectionResetError as e:
logger.error(f"Error sending SSE event: {e}")
# Don't raise the exception, just log it
# This prevents the SSE loop from crashing when a client disconnects
return False
except Exception as e:
logger.error(f"Error sending SSE event: {e}")
logger.error(traceback.format_exc())
raise
return False
return True
def _handle_regular_request(self):
"""Handle regular HTTP request"""
@@ -167,6 +299,59 @@ class MCPHTTPHandler(BaseHTTPRequestHandler):
"port": SERVER_PORT
}
logger.debug(f"Sending server info: {response_data}")
elif self.path.startswith("/command"):
# Execute Maya command
try:
# Check if this is a POST request
if self.command == "POST":
# Get command from request body
content_length = int(self.headers.get('Content-Length', 0))
if content_length > 0:
body = self.rfile.read(content_length).decode('utf-8')
try:
command_data = json.loads(body)
command = command_data.get('command', '')
if command:
logger.info(f"Executing command: {command}")
# Import Maya commands
import maya.cmds as cmds
# Execute command
try:
result = eval(f"cmds.{command}")
response_data = {
"status": "success",
"result": result
}
logger.info(f"Command executed successfully: {command}")
except Exception as e:
logger.error(f"Error executing command: {e}")
logger.error(traceback.format_exc())
status_code = 500
response_data = {
"status": "error",
"error": "Command execution failed",
"details": str(e)
}
else:
status_code = 400
response_data = {"error": "No command specified"}
except json.JSONDecodeError:
status_code = 400
response_data = {"error": "Invalid JSON in request body"}
else:
status_code = 400
response_data = {"error": "Empty request body"}
else:
status_code = 405
response_data = {"error": "Method not allowed, use POST for commands"}
except Exception as e:
logger.error(f"Error handling command request: {e}")
logger.error(traceback.format_exc())
status_code = 500
response_data = {"error": "Internal server error", "details": str(e)}
elif self.path == "/scene":
# Get scene info
try:
@@ -275,9 +460,9 @@ class MCPHTTPServer:
def start(self):
"""Start server"""
try:
# Create server
# Create server - use ThreadingHTTPServer instead of HTTPServer
print(f"Attempting to start HTTP server on {SERVER_HOST}:{self.port}")
self.server = HTTPServer((SERVER_HOST, self.port), MCPHTTPHandler)
self.server = ThreadingHTTPServer((SERVER_HOST, self.port), MCPHTTPHandler)
# Get actual port used
_, self.port = self.server.server_address
@@ -390,10 +575,11 @@ def is_http_server_running():
def broadcast_event(event_type, data):
"""Broadcast event to all connected clients"""
try:
for handler, client_id in _clients:
try:
handler._send_sse_event(event_type, data)
except Exception as e:
logger.error(f"Error broadcasting event to client {client_id}: {e}")
with _clients_lock:
for handler, client_id in _clients:
try:
handler._send_sse_event(event_type, data)
except Exception as e:
logger.error(f"Error broadcasting event to client {client_id}: {e}")
except Exception as e:
logger.error(f"Error broadcasting event: {e}")