Swoole可通过TCP服务器实现MQTT协议解析,核心包括处理CONNECT、PUBLISH、SUBSCRIBE等报文,管理客户端订阅关系与消息转发,需手动解析变长头部与主题长度,支持PINGREQ心跳与连接状态维护,配合mosquitto工具测试基础通信,适用于轻量级物联网场景,但生产环境需扩展QoS、TLS、持久化等机制。

Swoole 是一个强大的 PHP 扩展,支持异步、并发和长连接网络编程,非常适合用来构建自定义协议的服务器,比如 MQTT 服务。虽然 Swoole 不直接内置 MQTT 协议解析,但你可以基于其 TCP 或 WebSocket 服务器功能,手动实现一个轻量级的 MQTT 服务器。
MQTT 是一种基于发布/订阅模式的轻量级消息协议,常用于物联网场景。它使用二进制报文格式,通过 TCP 传输。关键报文类型包括:CONNECT、PUBLISH、SUBSCRIBE、PINGREQ、DISCONNECT 等。
要实现简单 MQTT 服务器,你至少需要:
下面是一个基于 Swoole 实现的极简 MQTT 服务器骨架:
$server = new Swoole\Server('0.0.0.0', 1883);
// 存储客户端订阅的主题,格式:topic => [fd1, fd2, ...]
$subscribers = [];
$server->on('connect', function ($serv, $fd) {
echo "Client: {$fd} connected.\n";
});
$server->on('receive', function ($serv, $fd, $reactorId, $data) use (&$subscribers) {
if (strlen($data) < 2) return;
$firstByte = ord($data[0]);
$msgType = ($firstByte & 0xF0) >> 4;
// 解析剩余长度(MQTT 变长编码)
$i = 1;
$remainingLength = 0;
$multiplier = 1;
do {
if ($i >= strlen($data)) return;
$byte = ord($data[$i]);
$remainingLength += ($byte & 127) * $multiplier;
$multiplier *= 128;
$i++;
} while (($byte & 128) > 0);
$payload = substr($data, $i, $remainingLength);
switch ($msgType) {
case 1: // CONNECT
handleConnect($serv, $fd, $payload);
break;
case 3: // PUBLISH
handlePublish($serv, $payload, $subscribers);
break;
case 8: // SUBSCRIBE
handleSubscribe($serv, $fd, $payload, $subscribers);
break;
case 12: // PINGREQ
$serv->send($fd, chr(0xC0) . chr(0x00)); // 返回 PINGRESP
break;
case 14: // DISCONNECT
$serv->close($fd);
break;
}
});
$server->on('close', function ($serv, $fd) use (&$subscribers) {
// 清理该客户端的订阅
foreach ($subscribers as $topic => $clients) {
$subscribers[$topic] = array_filter($clients, function ($clientFd) use ($fd) {
return $clientFd !== $fd;
});
}
echo "Client: {$fd} closed.\n";
});
$server->start();
function handleConnect($serv, $fd, $payload) {
// 跳过协议名(通常是'MQTT')
$offset = 0;
$protocolNameLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
$offset += 2 + $protocolNameLen;
$protocolLevel = $payload[$offset];
$offset += 1;
$connectFlags = ord($payload[$offset]);
$offset += 1;
$keepAlive = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
$offset += 2;
// 解析 Client ID
$clientIdLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
$offset += 2;
$clientId = substr($payload, $offset, $clientIdLen);
$offset += $clientIdLen;
echo "Client ID: {$clientId} connected.\n";
// 发送 CONNACK (连接确认)
$connack = chr(0x20) . chr(0x02) . chr(0x00) . chr(0x00); // 0x00 表示连接成功
$serv->send($fd, $connack);
}
function handleSubscribe($serv, $fd, $payload, &$subscribers) {
$packetId = (ord($payload[0]) << 8) | ord($payload[1]);
$offset = 2;
$topics = [];
while ($offset < strlen($payload)) {
$topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
$offset += 2;
$topic = substr($payload, $offset, $topicLen);
$offset += $topicLen;
$qos = $payload[$offset] & 0x03;
$offset += 1;
$topics[] = $topic;
if (!isset($subscribers[$topic])) {
$subscribers[$topic] = [];
}
$subscribers[$topic][] = $fd;
}
// 返回 SUBACK
$suback = chr(0x90) . chr(count($topics) + 2) . chr($packetId >> 8) . chr($packetId & 0xFF);
foreach ($topics as $t) {
$suback .= chr(0x01); // 返回 QoS 1
}
$serv->send($fd, $suback);
}
function handlePublish($serv, $payload, &$subscribers) {
$offset = 0;
$topicLen = (ord($payload[$offset]) << 8) | ord($payload[$offset + 1]);
$offset += 2;
$topic = substr($payload, $offset, $topicLen);
$offset += $topicLen;
$message = substr($payload, $offset);
// 查找订阅了该主题的客户端并发送消息
if (isset($subscribers[$topic])) {
$pubMsg = chr(0x30) . pack('C', 2 + $topicLen + strlen($message))
. chr($topicLen >> 8) . chr($topicLen & 0xFF)
. $topic . $message;
foreach ($subscribers[$topic] as $clientFd) {
$serv->send($clientFd, $pubMsg);
}
}
}使用 mosquitto_pub 和 mosquitto_sub 工具测试:
mosquitto_sub -h 127.0.0.1 -p 1883 -t 'test/topic'mosquitto_pub -h 127.0.0.1 -p 1883 -t 'test/topic' -m 'Hello Swoole MQTT'如果一切正常,订阅端会收到消息。
这个实现非常基础,仅用于学习。生产环境需考虑:
也可以考虑基于开源项目如 EMQX 或使用 Swoole 配合 Workerman + GatewayWorker 构建更复杂的 MQTT 代理。
基本上就这些。用 Swoole 写 MQTT 服务器核心是解析二进制协议并管理连接状态,难点在协议细节处理。
以上就是Swoole如何实现一个简单的MQTT服务器的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号