增加技术端与客户端聊天
This commit is contained in:
387
server/app/socket/ChatServer.php
Normal file
387
server/app/socket/ChatServer.php
Normal file
@@ -0,0 +1,387 @@
|
||||
<?php
|
||||
// /app/socket/ChatServer.php
|
||||
use Swoole\WebSocket\Server;
|
||||
use Swoole\Table;
|
||||
|
||||
// 设置错误报告
|
||||
ini_set('display_errors', 1);
|
||||
ini_set('display_startup_errors', 1);
|
||||
error_reporting(E_ALL);
|
||||
|
||||
// 设置自定义错误处理
|
||||
set_error_handler(function($errno, $errstr, $errfile, $errline) {
|
||||
error_log("PHP错误: [{$errno}] {$errstr} in {$errfile} on line {$errline}");
|
||||
return true;
|
||||
});
|
||||
|
||||
// 设置异常处理
|
||||
set_exception_handler(function($e) {
|
||||
error_log("未捕获异常: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
|
||||
});
|
||||
|
||||
// 创建 WebSocket 服务器
|
||||
$server = new Server("0.0.0.0", 9501);
|
||||
|
||||
// 初始化数据库连接
|
||||
$db = createDbConnection();
|
||||
|
||||
// 创建连接表
|
||||
$table = new Table(1024);
|
||||
$table->column('fd', Table::TYPE_INT);
|
||||
$table->column('uid', Table::TYPE_INT);
|
||||
$table->column('type', Table::TYPE_INT); // 1-用户 2-技师
|
||||
$table->create();
|
||||
|
||||
$server->table = $table;
|
||||
$server->db = $db; // 将数据库连接附加到服务器对象
|
||||
|
||||
// 连接处理
|
||||
$server->on('open', function (Server $server, $request) {
|
||||
$params = getQueryParams($request->server['query_string']);
|
||||
|
||||
// 获取用户类型
|
||||
$userType = $params['type'] ?? 0;
|
||||
|
||||
// 用户验证逻辑
|
||||
if (!$user = verifyToken($server->db, $params['token'] ?? '', $userType)) {
|
||||
$server->close($request->fd);
|
||||
return;
|
||||
}
|
||||
|
||||
// 确保类型参数存在
|
||||
if (!in_array($userType, [1, 2])) {
|
||||
error_log("无效的用户类型: {$userType}");
|
||||
$server->close($request->fd);
|
||||
return;
|
||||
}
|
||||
|
||||
// 存储连接信息
|
||||
$info = [
|
||||
'fd' => $request->fd,
|
||||
'uid' => $user['id'],
|
||||
'type' => $userType
|
||||
];
|
||||
|
||||
$server->table->set($request->fd, $info);
|
||||
|
||||
echo "客户端 {$request->fd} 已连接 (UID: {$user['id']}, 类型: {$userType})\n";
|
||||
});
|
||||
|
||||
// 消息处理
|
||||
$server->on('message', function (Server $server, $frame) {
|
||||
$data = json_decode($frame->data, true);
|
||||
|
||||
// 获取发送者信息
|
||||
$senderInfo = $server->table->get($frame->fd);
|
||||
if (!$senderInfo) {
|
||||
error_log("无法获取发送者信息: FD={$frame->fd}");
|
||||
return;
|
||||
}
|
||||
|
||||
switch ($data['action']) {
|
||||
case 'send':
|
||||
// 确保所有必要字段都存在
|
||||
if (!isset($data['conversation_id'], $data['content'])) {
|
||||
error_log("消息缺少必要字段: " . json_encode($data));
|
||||
return;
|
||||
}
|
||||
|
||||
// 根据会话ID获取会话信息
|
||||
$conversation = getConversationById($server->db, $data['conversation_id']);
|
||||
if (!$conversation) {
|
||||
error_log("会话不存在: conversation_id={$data['conversation_id']}");
|
||||
return;
|
||||
}
|
||||
|
||||
// 确定接收者信息
|
||||
if ($senderInfo['type'] == 1) { // 发送者是用户
|
||||
$receiverId = $conversation['tech_id'];
|
||||
$receiverType = 2; // 技师
|
||||
} else { // 发送者是技师
|
||||
$receiverId = $conversation['user_id'];
|
||||
$receiverType = 1; // 用户
|
||||
}
|
||||
|
||||
// 1. 存储到数据库
|
||||
$messageData = [
|
||||
'sender_id' => $senderInfo['uid'],
|
||||
'sender_type' => $senderInfo['type'],
|
||||
'receiver_id' => $receiverId,
|
||||
'receiver_type' => $receiverType,
|
||||
'content' => $data['content'],
|
||||
'order_id' => $data['order_id'] ?? 0,
|
||||
'conversation_id' => $data['conversation_id']
|
||||
];
|
||||
|
||||
$messageId = saveMessage($server->db, $messageData);
|
||||
|
||||
// 2. 构建消息体
|
||||
$message = [
|
||||
'action' => 'new',
|
||||
'data' => [
|
||||
'id' => $messageId,
|
||||
'content' => $data['content'],
|
||||
'sender_type' => $senderInfo['type'],
|
||||
'create_time' => date('Y-m-d H:i:s'),
|
||||
'conversation_id' => $data['conversation_id'],
|
||||
]
|
||||
];
|
||||
|
||||
// 3. 找到接收者发送消息
|
||||
sendToUser($server, $receiverId, $receiverType, $message);
|
||||
|
||||
// 4. 也发给自己(保证消息同步)
|
||||
$server->push($frame->fd, json_encode($message));
|
||||
break;
|
||||
|
||||
case 'read':
|
||||
// 确保所有必要字段都存在
|
||||
if (!isset($data['conversation_id'], $data['user_id'])) {
|
||||
error_log("标记已读缺少必要字段: " . json_encode($data));
|
||||
return;
|
||||
}
|
||||
|
||||
// 标记消息为已读
|
||||
markMessagesAsRead($server->db, $data['conversation_id'], $data['user_id']);
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
// 关闭连接
|
||||
$server->on('close', function ($server, $fd) {
|
||||
$server->table->del($fd);
|
||||
echo "客户端 {$fd} 已断开连接\n";
|
||||
});
|
||||
|
||||
/**
|
||||
* 解析查询字符串
|
||||
*/
|
||||
function getQueryParams($queryString) {
|
||||
$params = [];
|
||||
parse_str($queryString, $params);
|
||||
|
||||
// 过滤特殊字符
|
||||
foreach ($params as $key => $value) {
|
||||
$params[$key] = htmlspecialchars($value, ENT_QUOTES, 'UTF-8');
|
||||
}
|
||||
|
||||
return $params;
|
||||
}
|
||||
|
||||
/**
|
||||
* 创建数据库连接
|
||||
*/
|
||||
function createDbConnection() {
|
||||
$host = '127.0.0.1';
|
||||
$dbname = 'anmo';
|
||||
$username = 'anmo';
|
||||
$password = 'fmyrGXBYijbmSMbi';
|
||||
|
||||
try {
|
||||
$db = new PDO("mysql:host=$host;dbname=$dbname;charset=utf8mb4", $username, $password);
|
||||
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
|
||||
$db->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
|
||||
$db->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
|
||||
return $db;
|
||||
} catch (PDOException $e) {
|
||||
die("数据库连接失败: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证 token
|
||||
*/
|
||||
function verifyToken(PDO $db, $token, $userType) {
|
||||
try {
|
||||
if (empty($token)) {
|
||||
throw new Exception("Token不能为空");
|
||||
}
|
||||
|
||||
// 根据用户类型选择不同的验证逻辑
|
||||
switch ($userType) {
|
||||
case 1: // 普通用户
|
||||
$stmt = $db->prepare("SELECT * FROM ls_user_session WHERE token = :token");
|
||||
$stmt->bindParam(':token', $token);
|
||||
$stmt->execute();
|
||||
$session = $stmt->fetch();
|
||||
|
||||
if (!$session) {
|
||||
error_log("用户会话不存在: token={$token}");
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查会话是否过期
|
||||
if ($session['expire_time'] < time()) {
|
||||
error_log("用户会话已过期: token={$token}");
|
||||
return false;
|
||||
}
|
||||
|
||||
$stmt = $db->prepare("SELECT * FROM ls_user WHERE id = :user_id");
|
||||
$stmt->bindParam(':user_id', $session['user_id']);
|
||||
$stmt->execute();
|
||||
return $stmt->fetch();
|
||||
|
||||
case 2: // 技师
|
||||
$stmt = $db->prepare("SELECT * FROM ls_coach_user_session WHERE token = :token");
|
||||
$stmt->bindParam(':token', $token);
|
||||
$stmt->execute();
|
||||
$session = $stmt->fetch();
|
||||
|
||||
if (!$session) {
|
||||
error_log("技师会话不存在: token={$token}");
|
||||
return false;
|
||||
}
|
||||
|
||||
// 检查会话是否过期
|
||||
if ($session['expire_time'] < time()) {
|
||||
error_log("技师会话已过期: token={$token}");
|
||||
return false;
|
||||
}
|
||||
|
||||
$stmt = $db->prepare("
|
||||
SELECT c.*
|
||||
FROM ls_coach c
|
||||
WHERE c.coach_user_id = :coach_user_id
|
||||
");
|
||||
$stmt->bindParam(':coach_user_id', $session['coach_user_id']);
|
||||
$stmt->execute();
|
||||
return $stmt->fetch();
|
||||
|
||||
default:
|
||||
throw new Exception("无效的用户类型: {$userType}");
|
||||
}
|
||||
} catch (Exception $e) {
|
||||
error_log("Token验证错误: " . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据ID获取会话信息
|
||||
*/
|
||||
function getConversationById(PDO $db, $conversationId) {
|
||||
try {
|
||||
$stmt = $db->prepare("
|
||||
SELECT *
|
||||
FROM ls_chat_conversation
|
||||
WHERE id = :conversation_id
|
||||
");
|
||||
$stmt->bindParam(':conversation_id', $conversationId);
|
||||
$stmt->execute();
|
||||
$conversation = $stmt->fetch();
|
||||
|
||||
if (!$conversation) {
|
||||
error_log("会话不存在: conversation_id={$conversationId}");
|
||||
return false;
|
||||
}
|
||||
|
||||
return $conversation;
|
||||
} catch (Exception $e) {
|
||||
error_log("获取会话信息失败: " . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存消息到数据库
|
||||
*/
|
||||
function saveMessage(PDO $db, $data) {
|
||||
try {
|
||||
// 确保所有必要字段都存在
|
||||
if (!isset($data['sender_id'], $data['sender_type'], $data['receiver_id'], $data['receiver_type'], $data['conversation_id'])) {
|
||||
throw new Exception("缺少必要的消息字段");
|
||||
}
|
||||
|
||||
$stmt = $db->prepare("
|
||||
INSERT INTO ls_chat_message
|
||||
(conversation_id, sender_id, sender_type, receiver_id, receiver_type, content, message_type, read_status, order_id)
|
||||
VALUES (:conversation_id, :sender_id, :sender_type, :receiver_id, :receiver_type, :content, :message_type, :read_status, :order_id)
|
||||
");
|
||||
|
||||
$messageType = $data['message_type'] ?? 1; // 默认为文本消息
|
||||
$readStatus = 0; // 默认为未读
|
||||
$orderId = $data['order_id'] ?? 0;
|
||||
|
||||
$stmt->bindParam(':conversation_id', $data['conversation_id']);
|
||||
$stmt->bindParam(':sender_id', $data['sender_id']);
|
||||
$stmt->bindParam(':sender_type', $data['sender_type']);
|
||||
$stmt->bindParam(':receiver_id', $data['receiver_id']);
|
||||
$stmt->bindParam(':receiver_type', $data['receiver_type']);
|
||||
$stmt->bindParam(':content', $data['content']);
|
||||
$stmt->bindParam(':message_type', $messageType);
|
||||
$stmt->bindParam(':read_status', $readStatus);
|
||||
$stmt->bindParam(':order_id', $orderId);
|
||||
|
||||
$stmt->execute();
|
||||
|
||||
return $db->lastInsertId();
|
||||
} catch (Exception $e) {
|
||||
error_log("保存消息失败: " . $e->getMessage());
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 标记消息为已读
|
||||
*/
|
||||
function markMessagesAsRead(PDO $db, $conversationId, $userId) {
|
||||
try {
|
||||
// 确保参数有效
|
||||
if (empty($conversationId) || empty($userId)) {
|
||||
throw new Exception("无效的会话ID或用户ID");
|
||||
}
|
||||
|
||||
// 开始事务
|
||||
$db->beginTransaction();
|
||||
|
||||
// 1. 标记消息为已读
|
||||
$stmt = $db->prepare("
|
||||
UPDATE ls_chat_message
|
||||
SET read_status = 1
|
||||
WHERE conversation_id = :conversation_id
|
||||
AND receiver_id = :receiver_id
|
||||
");
|
||||
|
||||
$stmt->bindParam(':conversation_id', $conversationId);
|
||||
$stmt->bindParam(':receiver_id', $userId);
|
||||
$stmt->execute();
|
||||
|
||||
// 2. 重置会话未读数
|
||||
$stmt = $db->prepare("
|
||||
UPDATE ls_chat_conversation
|
||||
SET unread_count = 0
|
||||
WHERE id = :conversation_id
|
||||
");
|
||||
$stmt->bindParam(':conversation_id', $conversationId);
|
||||
$stmt->execute();
|
||||
|
||||
// 提交事务
|
||||
$db->commit();
|
||||
|
||||
return true;
|
||||
} catch (Exception $e) {
|
||||
// 回滚事务
|
||||
$db->rollBack();
|
||||
error_log("标记消息已读失败: " . $e->getMessage());
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 向用户发送消息
|
||||
*/
|
||||
function sendToUser($server, $uid, $type, $message) {
|
||||
foreach ($server->table as $row) {
|
||||
if ($row['uid'] == $uid && $row['type'] == $type) {
|
||||
try {
|
||||
$server->push($row['fd'], json_encode($message));
|
||||
} catch (Exception $e) {
|
||||
error_log("发送消息失败: " . $e->getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 启动服务器
|
||||
echo "启动 WebSocket 服务器在 ws://0.0.0.0:9501\n";
|
||||
$server->start();
|
||||
Reference in New Issue
Block a user