getMessage() . " in " . $e->getFile() . " on line " . $e->getLine()); });

// Create the WebSocket server.
$server = new Server("0.0.0.0", 9501);

// Open the database connection.
// NOTE(review): this connection is opened before $server->start(); if the
// server runs more than one worker process they would all inherit this same
// connection. Confirm the worker configuration, or open the connection per
// worker (e.g. in a workerStart handler).
$db = createDbConnection();

// In-memory table of live connections, keyed by connection fd.
$table = new Table(1024);
$table->column('fd', Table::TYPE_INT);
$table->column('uid', Table::TYPE_INT);
$table->column('type', Table::TYPE_INT); // 1 = user, 2 = technician
$table->create();

$server->table = $table;
$server->db = $db; // attach the database connection to the server object

// Connection handshake: authenticate the client and register it in the table.
$server->on('open', function (Server $server, $request) {
    // The query string is absent when the client connects without parameters.
    $params = getQueryParams($request->server['query_string'] ?? '');

    // Validate the client type first: it is cheap and decides which session
    // table verifyToken() has to query.
    $rawType = $params['type'] ?? '';
    if ($rawType !== '1' && $rawType !== '2') {
        error_log("无效的用户类型: {$rawType}");
        $server->close($request->fd);
        return;
    }
    $userType = (int) $rawType;

    // Authenticate the token; reject the connection when it is not valid.
    $user = verifyToken($server->db, $params['token'] ?? '', $userType);
    if (!$user) {
        $server->close($request->fd);
        return;
    }

    // Remember who is behind this fd.
    $server->table->set((string) $request->fd, [
        'fd' => $request->fd,
        'uid' => $user['id'],
        'type' => $userType,
    ]);

    echo "客户端 {$request->fd} 已连接 (UID: {$user['id']}, 类型: {$userType})\n";
});

// Incoming frames: JSON objects carrying an "action" of "send" or "read".
$server->on('message', function (Server $server, $frame) {
    $data = json_decode($frame->data, true);

    // Malformed JSON, a non-object payload or a missing action cannot be dispatched.
    if (!is_array($data) || !isset($data['action'])) {
        error_log("无效的消息格式: FD={$frame->fd}");
        return;
    }

    // Look up the authenticated identity registered for this fd in the open handler.
    $senderInfo = $server->table->get((string) $frame->fd);
    if (!$senderInfo) {
        error_log("无法获取发送者信息: FD={$frame->fd}");
        return;
    }

    switch ($data['action']) {
        case 'send':
            // Both fields are required and must be plain values, not nested structures.
            if (
                !isset($data['conversation_id'], $data['content'])
                || !is_scalar($data['conversation_id'])
                || !is_scalar($data['content'])
            ) {
                error_log("消息缺少必要字段: " . json_encode($data));
                return;
            }

            // getConversationById() logs the precise reason when it returns false.
            $conversation = getConversationById($server->db, $data['conversation_id']);
            if (!$conversation) {
                return;
            }

            // Only the two parties of a conversation may post into it.
            if (!isConversationParticipant($conversation, $senderInfo)) {
                error_log("发送者不属于该会话: FD={$frame->fd}, conversation_id={$data['conversation_id']}");
                return;
            }

            // The receiver is the other party of the conversation.
            if ($senderInfo['type'] == 1) {
                // Sender is a user, so the receiver is the technician.
                $receiverId = $conversation['tech_id'];
                $receiverType = 2;
            } else {
                // Sender is a technician, so the receiver is the user.
                $receiverId = $conversation['user_id'];
                $receiverType = 1;
            }

            // 1. Persist the message.
            $messageId = saveMessage($server->db, [
                'sender_id' => $senderInfo['uid'],
                'sender_type' => $senderInfo['type'],
                'receiver_id' => $receiverId,
                'receiver_type' => $receiverType,
                'content' => $data['content'],
                'order_id' => $data['order_id'] ?? 0,
                'conversation_id' => $data['conversation_id'],
            ]);

            // saveMessage() returns 0 on failure; do not deliver a message that was never stored.
            if (!$messageId) {
                error_log("消息保存失败,已取消投递: conversation_id={$data['conversation_id']}");
                return;
            }

            // 2. Build the payload pushed to both parties.
            $message = [
                'action' => 'new',
                'data' => [
                    'id' => $messageId,
                    'content' => $data['content'],
                    'sender_type' => $senderInfo['type'],
                    'create_time' => date('Y-m-d H:i:s'),
                    'conversation_id' => $data['conversation_id'],
                ],
            ];

            // 3. Deliver to every live connection of the receiver.
            sendToUser($server, $receiverId, $receiverType, $message);

            // 4. Echo back to the sender so its own view stays in sync.
            $server->push($frame->fd, json_encode($message));
            break;

        case 'read':
            if (!isset($data['conversation_id']) || !is_scalar($data['conversation_id'])) {
                error_log("标记已读缺少必要字段: " . json_encode($data));
                return;
            }

            // getConversationById() logs the precise reason when it returns false.
            $conversation = getConversationById($server->db, $data['conversation_id']);
            if (!$conversation) {
                return;
            }

            if (!isConversationParticipant($conversation, $senderInfo)) {
                error_log("发送者不属于该会话: FD={$frame->fd}, conversation_id={$data['conversation_id']}");
                return;
            }

            // Use the authenticated identity of this connection rather than a
            // client-supplied user_id, so a client can only mark its own
            // received messages as read.
            markMessagesAsRead(
                $server->db,
                $data['conversation_id'],
                $senderInfo['uid'],
                $senderInfo['type']
            );
            break;

        default:
            error_log("未知的消息动作: FD={$frame->fd}");
            break;
    }
});

// Connection closed: forget the fd.
$server->on('close', function ($server, $fd) {
    $server->table->del((string) $fd);
    echo "客户端 {$fd} 已断开连接\n";
});

/**
 * Parse a URL query string into a flat map of HTML-escaped string values.
 *
 * Array-valued parameters (e.g. "token[]=x") are dropped: nothing in this
 * server reads nested parameters, and htmlspecialchars() only accepts strings.
 *
 * NOTE(review): the values are HTML-escaped although they are used as lookup
 * keys, not rendered as HTML; a token containing &, <, > or quotes will be
 * altered by this. Kept for backward compatibility.
 *
 * @param string|null $queryString Raw query string without the leading "?".
 * @return array Map of parameter name to escaped string value.
 */
function getQueryParams($queryString): array
{
    $parsed = [];
    parse_str((string) $queryString, $parsed);

    $params = [];
    foreach ($parsed as $key => $value) {
        if (is_string($value)) {
            $params[$key] = htmlspecialchars($value, ENT_QUOTES, 'UTF-8');
        }
    }

    return $params;
}

/**
 * Open the MySQL connection used by the server.
 *
 * Each setting can be overridden through an environment variable
 * (CHAT_DB_HOST, CHAT_DB_NAME, CHAT_DB_USER, CHAT_DB_PASSWORD); the built-in
 * values are used when a variable is not set. The script terminates when the
 * connection cannot be established.
 *
 * @return PDO Connection that throws on errors, fetches associative arrays
 *             and uses native prepared statements.
 */
function createDbConnection()
{
    // SECURITY: credentials should come from the environment; the literals
    // below are only fallbacks kept for backward compatibility and should be
    // removed (and the password rotated) once the environment is configured.
    $host = getenv('CHAT_DB_HOST') ?: '127.0.0.1';
    $dbname = getenv('CHAT_DB_NAME') ?: 'anmo';
    $username = getenv('CHAT_DB_USER') ?: 'anmo';
    $password = getenv('CHAT_DB_PASSWORD') ?: 'fmyrGXBYijbmSMbi';

    try {
        $db = new PDO("mysql:host=$host;dbname=$dbname;charset=utf8mb4", $username, $password);
        $db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
        $db->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
        $db->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
        return $db;
    } catch (PDOException $e) {
        die("数据库连接失败: " . $e->getMessage());
    }
}

/**
 * Authenticate a session token and load the account it belongs to.
 *
 * Type 1 (user) is looked up in ls_user_session / ls_user, type 2
 * (technician) in ls_coach_user_session / ls_coach.
 *
 * @param PDO        $db       Database connection.
 * @param string     $token    Session token supplied by the client.
 * @param int|string $userType 1 for a user, 2 for a technician.
 * @return array|false The account row, or false when the token is empty,
 *                     unknown or expired, the type is invalid, the account
 *                     row is missing, or a database error occurs.
 */
function verifyToken(PDO $db, $token, $userType)
{
    // Per-type lookup: the session query, the session column that points at
    // the account, and the account query.
    $sources = [
        1 => [
            'label' => '用户',
            'session_sql' => 'SELECT * FROM ls_user_session WHERE token = :token',
            'owner_column' => 'user_id',
            'account_sql' => 'SELECT * FROM ls_user WHERE id = :owner_id',
        ],
        2 => [
            'label' => '技师',
            'session_sql' => 'SELECT * FROM ls_coach_user_session WHERE token = :token',
            'owner_column' => 'coach_user_id',
            'account_sql' => 'SELECT c.* FROM ls_coach c WHERE c.coach_user_id = :owner_id',
        ],
    ];

    try {
        if (empty($token)) {
            throw new Exception("Token不能为空");
        }

        $typeKey = filter_var($userType, FILTER_VALIDATE_INT);
        if ($typeKey === false || !isset($sources[$typeKey])) {
            throw new Exception("无效的用户类型: {$userType}");
        }
        $source = $sources[$typeKey];

        // Never write a full session token to the log; a short prefix is
        // enough to correlate entries.
        $tokenHint = substr((string) $token, 0, 6) . '***';

        $stmt = $db->prepare($source['session_sql']);
        $stmt->execute([':token' => $token]);
        $session = $stmt->fetch();

        if (!$session) {
            error_log("{$source['label']}会话不存在: token={$tokenHint}");
            return false;
        }

        // Reject sessions whose expiry timestamp is in the past.
        if ($session['expire_time'] < time()) {
            error_log("{$source['label']}会话已过期: token={$tokenHint}");
            return false;
        }

        $stmt = $db->prepare($source['account_sql']);
        $stmt->execute([':owner_id' => $session[$source['owner_column']]]);

        return $stmt->fetch();
    } catch (Exception $e) {
        error_log("Token验证错误: " . $e->getMessage());
        return false;
    }
}

/**
 * Load a conversation row from ls_chat_conversation by its id.
 *
 * @param PDO        $db             Database connection.
 * @param int|string $conversationId Conversation id.
 * @return array|false The conversation row, or false when it does not exist
 *                     or the query fails (the reason is logged).
 */
function getConversationById(PDO $db, $conversationId)
{
    try {
        $stmt = $db->prepare("
            SELECT * FROM ls_chat_conversation
            WHERE id = :conversation_id
        ");
        $stmt->execute([':conversation_id' => $conversationId]);
        $conversation = $stmt->fetch();

        if (!$conversation) {
            error_log("会话不存在: conversation_id={$conversationId}");
            return false;
        }

        return $conversation;
    } catch (Exception $e) {
        error_log("获取会话信息失败: " . $e->getMessage());
        return false;
    }
}

/**
 * Tell whether the connection described by $senderInfo is one of the two
 * parties of $conversation.
 *
 * A type-1 connection is matched against the conversation's user_id, any
 * other type against its tech_id.
 *
 * @param array $conversation Row from ls_chat_conversation.
 * @param array $senderInfo   Connection table row with 'uid' and 'type'.
 * @return bool
 */
function isConversationParticipant(array $conversation, array $senderInfo): bool
{
    $ownerColumn = (int) $senderInfo['type'] === 1 ? 'user_id' : 'tech_id';

    return isset($conversation[$ownerColumn])
        && (int) $conversation[$ownerColumn] === (int) $senderInfo['uid'];
}

/**
 * Insert a chat message into ls_chat_message.
 *
 * Required keys: conversation_id, sender_id, sender_type, receiver_id,
 * receiver_type, content. Optional keys: message_type (defaults to 1, text)
 * and order_id (defaults to 0). The message is stored as unread.
 *
 * @param PDO   $db   Database connection.
 * @param array $data Message fields as listed above.
 * @return string|int The id of the inserted row, or 0 on failure (logged).
 */
function saveMessage(PDO $db, $data)
{
    try {
        // Every column without a default must be supplied by the caller.
        if (!isset(
            $data['sender_id'],
            $data['sender_type'],
            $data['receiver_id'],
            $data['receiver_type'],
            $data['conversation_id'],
            $data['content']
        )) {
            throw new Exception("缺少必要的消息字段");
        }

        $stmt = $db->prepare("
            INSERT INTO ls_chat_message
            (conversation_id, sender_id, sender_type, receiver_id, receiver_type, content, message_type, read_status, order_id)
            VALUES
            (:conversation_id, :sender_id, :sender_type, :receiver_id, :receiver_type, :content, :message_type, :read_status, :order_id)
        ");

        $stmt->execute([
            ':conversation_id' => $data['conversation_id'],
            ':sender_id' => $data['sender_id'],
            ':sender_type' => $data['sender_type'],
            ':receiver_id' => $data['receiver_id'],
            ':receiver_type' => $data['receiver_type'],
            ':content' => $data['content'],
            ':message_type' => $data['message_type'] ?? 1, // text message by default
            ':read_status' => 0, // new messages start unread
            ':order_id' => $data['order_id'] ?? 0,
        ]);

        return $db->lastInsertId();
    } catch (Exception $e) {
        error_log("保存消息失败: " . $e->getMessage());
        return 0;
    }
}

/**
 * Mark the messages a receiver got in a conversation as read and reset the
 * conversation's unread counter, inside one transaction.
 *
 * @param PDO             $db             Database connection.
 * @param int|string      $conversationId Conversation id.
 * @param int|string      $userId         Id of the receiver whose messages are marked.
 * @param int|string|null $receiverType   When given, only messages addressed to this
 *                                        receiver type are marked; null keeps the
 *                                        previous behaviour of matching on id alone.
 * @return bool True on success, false on invalid arguments or database error (logged).
 */
function markMessagesAsRead(PDO $db, $conversationId, $userId, $receiverType = null)
{
    try {
        if (empty($conversationId) || empty($userId)) {
            throw new Exception("无效的会话ID或用户ID");
        }

        $db->beginTransaction();

        // 1. Mark the receiver's messages as read.
        $sql = "
            UPDATE ls_chat_message
            SET read_status = 1
            WHERE conversation_id = :conversation_id
            AND receiver_id = :receiver_id
        ";
        $bindings = [
            ':conversation_id' => $conversationId,
            ':receiver_id' => $userId,
        ];
        // User ids and technician ids are separate id spaces, so the id alone
        // can match the other party's messages too.
        if ($receiverType !== null) {
            $sql .= " AND receiver_type = :receiver_type";
            $bindings[':receiver_type'] = $receiverType;
        }
        $db->prepare($sql)->execute($bindings);

        // 2. Reset the conversation's unread counter.
        $stmt = $db->prepare("
            UPDATE ls_chat_conversation
            SET unread_count = 0
            WHERE id = :conversation_id
        ");
        $stmt->execute([':conversation_id' => $conversationId]);

        $db->commit();
        return true;
    } catch (Exception $e) {
        // The exception may have been thrown before the transaction started
        // (validation, or beginTransaction() itself); rolling back then would
        // raise a second exception.
        if ($db->inTransaction()) {
            $db->rollBack();
        }
        error_log("标记消息已读失败: " . $e->getMessage());
        return false;
    }
}

/**
 * Push a message to every live connection that belongs to the given account.
 *
 * @param Server     $server  WebSocket server holding the connection table.
 * @param int|string $uid     Account id of the receiver.
 * @param int|string $type    Receiver type (1 = user, 2 = technician).
 * @param array      $message Payload; it is JSON-encoded before sending.
 * @return void
 */
function sendToUser($server, $uid, $type, $message)
{
    // Encode once; the payload is identical for every connection.
    $payload = json_encode($message);

    foreach ($server->table as $row) {
        if ((int) $row['uid'] !== (int) $uid || (int) $row['type'] !== (int) $type) {
            continue;
        }

        // A table row can outlive its connection until the close handler
        // runs; skip fds that are no longer established WebSocket connections.
        if (!$server->isEstablished($row['fd'])) {
            continue;
        }

        try {
            $server->push($row['fd'], $payload);
        } catch (Exception $e) {
            error_log("发送消息失败: " . $e->getMessage());
        }
    }
}

// Start the server.
echo "启动 WebSocket 服务器在 ws://0.0.0.0:9501\n";
$server->start();