391 lines
12 KiB
PHP
391 lines
12 KiB
PHP
<?php
|
|
// /app/socket/ChatServer.php
|
|
use Swoole\WebSocket\Server;
|
|
use Swoole\Table;
|
|
|
|
// 设置错误报告
|
|
ini_set('display_errors', 1);
|
|
ini_set('display_startup_errors', 1);
|
|
error_reporting(E_ALL);
|
|
|
|
// 设置自定义错误处理
|
|
set_error_handler(function($errno, $errstr, $errfile, $errline) {
|
|
error_log("PHP错误: [{$errno}] {$errstr} in {$errfile} on line {$errline}");
|
|
return true;
|
|
});
|
|
|
|
// 设置异常处理
|
|
set_exception_handler(function($e) {
|
|
error_log("未捕获异常: " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
|
|
});
|
|
|
|
// 创建 WebSocket 服务器
|
|
$server = new Server("0.0.0.0", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
|
|
$server->set([
|
|
'ssl_cert_file' => '/www/server/panel/vhost/cert/laowoanmo.heibaokeji.com/fullchain.pem',
|
|
'ssl_key_file' => '/www/server/panel/vhost/cert/laowoanmo.heibaokeji.com/privkey.pem',
|
|
]);
|
|
|
|
// 初始化数据库连接
|
|
$db = createDbConnection();
|
|
|
|
// 创建连接表
|
|
$table = new Table(1024);
|
|
$table->column('fd', Table::TYPE_INT);
|
|
$table->column('uid', Table::TYPE_INT);
|
|
$table->column('type', Table::TYPE_INT); // 1-用户 2-技师
|
|
$table->create();
|
|
|
|
$server->table = $table;
|
|
$server->db = $db; // 将数据库连接附加到服务器对象
|
|
|
|
// 连接处理
|
|
$server->on('open', function (Server $server, $request) {
|
|
$params = getQueryParams($request->server['query_string']);
|
|
|
|
// 获取用户类型
|
|
$userType = $params['type'] ?? 0;
|
|
|
|
// 用户验证逻辑
|
|
if (!$user = verifyToken($server->db, $params['token'] ?? '', $userType)) {
|
|
$server->close($request->fd);
|
|
return;
|
|
}
|
|
|
|
// 确保类型参数存在
|
|
if (!in_array($userType, [1, 2])) {
|
|
error_log("无效的用户类型: {$userType}");
|
|
$server->close($request->fd);
|
|
return;
|
|
}
|
|
|
|
// 存储连接信息
|
|
$info = [
|
|
'fd' => $request->fd,
|
|
'uid' => $user['id'],
|
|
'type' => $userType
|
|
];
|
|
|
|
$server->table->set($request->fd, $info);
|
|
|
|
echo "客户端 {$request->fd} 已连接 (UID: {$user['id']}, 类型: {$userType})\n";
|
|
});
|
|
|
|
// 消息处理
|
|
$server->on('message', function (Server $server, $frame) {
|
|
$data = json_decode($frame->data, true);
|
|
|
|
// 获取发送者信息
|
|
$senderInfo = $server->table->get($frame->fd);
|
|
if (!$senderInfo) {
|
|
error_log("无法获取发送者信息: FD={$frame->fd}");
|
|
return;
|
|
}
|
|
|
|
switch ($data['action']) {
|
|
case 'send':
|
|
// 确保所有必要字段都存在
|
|
if (!isset($data['conversation_id'], $data['content'])) {
|
|
error_log("消息缺少必要字段: " . json_encode($data));
|
|
return;
|
|
}
|
|
|
|
// 根据会话ID获取会话信息
|
|
$conversation = getConversationById($server->db, $data['conversation_id']);
|
|
if (!$conversation) {
|
|
error_log("会话不存在: conversation_id={$data['conversation_id']}");
|
|
return;
|
|
}
|
|
|
|
// 确定接收者信息
|
|
if ($senderInfo['type'] == 1) { // 发送者是用户
|
|
$receiverId = $conversation['tech_id'];
|
|
$receiverType = 2; // 技师
|
|
} else { // 发送者是技师
|
|
$receiverId = $conversation['user_id'];
|
|
$receiverType = 1; // 用户
|
|
}
|
|
|
|
// 1. 存储到数据库
|
|
$messageData = [
|
|
'sender_id' => $senderInfo['uid'],
|
|
'sender_type' => $senderInfo['type'],
|
|
'receiver_id' => $receiverId,
|
|
'receiver_type' => $receiverType,
|
|
'content' => $data['content'],
|
|
'order_id' => $data['order_id'] ?? 0,
|
|
'conversation_id' => $data['conversation_id']
|
|
];
|
|
|
|
$messageId = saveMessage($server->db, $messageData);
|
|
|
|
// 2. 构建消息体
|
|
$message = [
|
|
'action' => 'new',
|
|
'data' => [
|
|
'id' => $messageId,
|
|
'content' => $data['content'],
|
|
'sender_type' => $senderInfo['type'],
|
|
'create_time' => date('Y-m-d H:i:s'),
|
|
'conversation_id' => $data['conversation_id'],
|
|
]
|
|
];
|
|
|
|
// 3. 找到接收者发送消息
|
|
sendToUser($server, $receiverId, $receiverType, $message);
|
|
|
|
// 4. 也发给自己(保证消息同步)
|
|
$server->push($frame->fd, json_encode($message));
|
|
break;
|
|
|
|
case 'read':
|
|
// 确保所有必要字段都存在
|
|
if (!isset($data['conversation_id'], $data['user_id'])) {
|
|
error_log("标记已读缺少必要字段: " . json_encode($data));
|
|
return;
|
|
}
|
|
|
|
// 标记消息为已读
|
|
markMessagesAsRead($server->db, $data['conversation_id'], $data['user_id']);
|
|
break;
|
|
}
|
|
});
|
|
|
|
// 关闭连接
|
|
$server->on('close', function ($server, $fd) {
|
|
$server->table->del($fd);
|
|
echo "客户端 {$fd} 已断开连接\n";
|
|
});
|
|
|
|
/**
|
|
* 解析查询字符串
|
|
*/
|
|
function getQueryParams($queryString) {
|
|
$params = [];
|
|
parse_str($queryString, $params);
|
|
|
|
// 过滤特殊字符
|
|
foreach ($params as $key => $value) {
|
|
$params[$key] = htmlspecialchars($value, ENT_QUOTES, 'UTF-8');
|
|
}
|
|
|
|
return $params;
|
|
}
|
|
|
|
/**
|
|
* 创建数据库连接
|
|
*/
|
|
function createDbConnection() {
|
|
$host = '127.0.0.1';
|
|
$dbname = 'anmo';
|
|
$username = 'anmo';
|
|
$password = 'fmyrGXBYijbmSMbi';
|
|
|
|
try {
|
|
$db = new PDO("mysql:host=$host;dbname=$dbname;charset=utf8mb4", $username, $password);
|
|
$db->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
|
|
$db->setAttribute(PDO::ATTR_DEFAULT_FETCH_MODE, PDO::FETCH_ASSOC);
|
|
$db->setAttribute(PDO::ATTR_EMULATE_PREPARES, false);
|
|
return $db;
|
|
} catch (PDOException $e) {
|
|
die("数据库连接失败: " . $e->getMessage());
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 验证 token
|
|
*/
|
|
function verifyToken(PDO $db, $token, $userType) {
|
|
try {
|
|
if (empty($token)) {
|
|
throw new Exception("Token不能为空");
|
|
}
|
|
|
|
// 根据用户类型选择不同的验证逻辑
|
|
switch ($userType) {
|
|
case 1: // 普通用户
|
|
$stmt = $db->prepare("SELECT * FROM ls_user_session WHERE token = :token");
|
|
$stmt->bindParam(':token', $token);
|
|
$stmt->execute();
|
|
$session = $stmt->fetch();
|
|
|
|
if (!$session) {
|
|
error_log("用户会话不存在: token={$token}");
|
|
return false;
|
|
}
|
|
|
|
// 检查会话是否过期
|
|
if ($session['expire_time'] < time()) {
|
|
error_log("用户会话已过期: token={$token}");
|
|
return false;
|
|
}
|
|
|
|
$stmt = $db->prepare("SELECT * FROM ls_user WHERE id = :user_id");
|
|
$stmt->bindParam(':user_id', $session['user_id']);
|
|
$stmt->execute();
|
|
return $stmt->fetch();
|
|
|
|
case 2: // 技师
|
|
$stmt = $db->prepare("SELECT * FROM ls_coach_user_session WHERE token = :token");
|
|
$stmt->bindParam(':token', $token);
|
|
$stmt->execute();
|
|
$session = $stmt->fetch();
|
|
|
|
if (!$session) {
|
|
error_log("技师会话不存在: token={$token}");
|
|
return false;
|
|
}
|
|
|
|
// 检查会话是否过期
|
|
if ($session['expire_time'] < time()) {
|
|
error_log("技师会话已过期: token={$token}");
|
|
return false;
|
|
}
|
|
|
|
$stmt = $db->prepare("
|
|
SELECT c.*
|
|
FROM ls_coach c
|
|
WHERE c.coach_user_id = :coach_user_id
|
|
");
|
|
$stmt->bindParam(':coach_user_id', $session['coach_user_id']);
|
|
$stmt->execute();
|
|
return $stmt->fetch();
|
|
|
|
default:
|
|
throw new Exception("无效的用户类型: {$userType}");
|
|
}
|
|
} catch (Exception $e) {
|
|
error_log("Token验证错误: " . $e->getMessage());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 根据ID获取会话信息
|
|
*/
|
|
function getConversationById(PDO $db, $conversationId) {
|
|
try {
|
|
$stmt = $db->prepare("
|
|
SELECT *
|
|
FROM ls_chat_conversation
|
|
WHERE id = :conversation_id
|
|
");
|
|
$stmt->bindParam(':conversation_id', $conversationId);
|
|
$stmt->execute();
|
|
$conversation = $stmt->fetch();
|
|
|
|
if (!$conversation) {
|
|
error_log("会话不存在: conversation_id={$conversationId}");
|
|
return false;
|
|
}
|
|
|
|
return $conversation;
|
|
} catch (Exception $e) {
|
|
error_log("获取会话信息失败: " . $e->getMessage());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 保存消息到数据库
|
|
*/
|
|
function saveMessage(PDO $db, $data) {
|
|
try {
|
|
// 确保所有必要字段都存在
|
|
if (!isset($data['sender_id'], $data['sender_type'], $data['receiver_id'], $data['receiver_type'], $data['conversation_id'])) {
|
|
throw new Exception("缺少必要的消息字段");
|
|
}
|
|
|
|
$stmt = $db->prepare("
|
|
INSERT INTO ls_chat_message
|
|
(conversation_id, sender_id, sender_type, receiver_id, receiver_type, content, message_type, read_status, order_id)
|
|
VALUES (:conversation_id, :sender_id, :sender_type, :receiver_id, :receiver_type, :content, :message_type, :read_status, :order_id)
|
|
");
|
|
|
|
$messageType = $data['message_type'] ?? 1; // 默认为文本消息
|
|
$readStatus = 0; // 默认为未读
|
|
$orderId = $data['order_id'] ?? 0;
|
|
|
|
$stmt->bindParam(':conversation_id', $data['conversation_id']);
|
|
$stmt->bindParam(':sender_id', $data['sender_id']);
|
|
$stmt->bindParam(':sender_type', $data['sender_type']);
|
|
$stmt->bindParam(':receiver_id', $data['receiver_id']);
|
|
$stmt->bindParam(':receiver_type', $data['receiver_type']);
|
|
$stmt->bindParam(':content', $data['content']);
|
|
$stmt->bindParam(':message_type', $messageType);
|
|
$stmt->bindParam(':read_status', $readStatus);
|
|
$stmt->bindParam(':order_id', $orderId);
|
|
|
|
$stmt->execute();
|
|
|
|
return $db->lastInsertId();
|
|
} catch (Exception $e) {
|
|
error_log("保存消息失败: " . $e->getMessage());
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 标记消息为已读
|
|
*/
|
|
function markMessagesAsRead(PDO $db, $conversationId, $userId) {
|
|
try {
|
|
// 确保参数有效
|
|
if (empty($conversationId) || empty($userId)) {
|
|
throw new Exception("无效的会话ID或用户ID");
|
|
}
|
|
|
|
// 开始事务
|
|
$db->beginTransaction();
|
|
|
|
// 1. 标记消息为已读
|
|
$stmt = $db->prepare("
|
|
UPDATE ls_chat_message
|
|
SET read_status = 1
|
|
WHERE conversation_id = :conversation_id
|
|
AND receiver_id = :receiver_id
|
|
");
|
|
|
|
$stmt->bindParam(':conversation_id', $conversationId);
|
|
$stmt->bindParam(':receiver_id', $userId);
|
|
$stmt->execute();
|
|
|
|
// 2. 重置会话未读数
|
|
$stmt = $db->prepare("
|
|
UPDATE ls_chat_conversation
|
|
SET unread_count = 0
|
|
WHERE id = :conversation_id
|
|
");
|
|
$stmt->bindParam(':conversation_id', $conversationId);
|
|
$stmt->execute();
|
|
|
|
// 提交事务
|
|
$db->commit();
|
|
|
|
return true;
|
|
} catch (Exception $e) {
|
|
// 回滚事务
|
|
$db->rollBack();
|
|
error_log("标记消息已读失败: " . $e->getMessage());
|
|
return false;
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 向用户发送消息
|
|
*/
|
|
function sendToUser($server, $uid, $type, $message) {
|
|
foreach ($server->table as $row) {
|
|
if ($row['uid'] == $uid && $row['type'] == $type) {
|
|
try {
|
|
$server->push($row['fd'], json_encode($message));
|
|
} catch (Exception $e) {
|
|
error_log("发送消息失败: " . $e->getMessage());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 启动服务器
|
|
echo "启动 WebSocket 服务器在 ws://0.0.0.0:9501\n";
|
|
$server->start(); |