MQTT Hook
@pt/hooks/modules/network 提供了一个响应式的MQTT客户端Hook,支持Vue组件生命周期管理。
依赖文件下载
请先下载 mqttws31.js 文件,并将其放置到项目的 public/js 目录下。
下载地址:点击下载 mqttws31.js
注意: 文件路径必须为 public/js/mqttws31.js,否则MQTT功能无法正常使用。
基本使用
typescript
import { useMqtt } from '@pt/hooks/modules/network';
export default {
setup() {
const {
mqttClient,
isConnected,
error,
startMqtt,
sendMessage,
disconnect
} = useMqtt({
mqttConfig: {
vhost: 'mqtt',
password: 'your-password',
webUri: 'ws://your-server:9080',
username: 'your-username',
useSSL: false // 根据weburl来决定是 true 或 false, 假如 weburl为“ws://mqtt.example.com:9080”,则useSSL为false,假如 weburl为“wss://mqtt.example.com:9080”,则useSSL为true
},
satoken: 'your-token',
basePath: import.meta.env.BASE_URL
});
const handleMqttMessageMap: Record<string, (data: unknown) => void> = {
// 举例
restoreConnection: (data: any) => {},
};
// 启动连接
await startMqtt(
'my/topic',
(topic, message) => {
console.log('mqtt-chat-message:', message);
const msg = JSON.parse(message.toString());
if (callback) {
callback(msg);
}
try {
let sessionContent = JSON.parse(msg.sessionContent);
console.log(sessionContent);
// 检查必要的字段是否存在
if (!sessionContent.type) {
throw new Error('推送的消息没有类型,请按标准格式推送...');
}
// 检查类型是否在映射表中
const handler = handleMqttMessageMap[sessionContent.type];
if (!handler) {
console.warn(`未知的消息类型: ${sessionContent.type}`);
return;
}
// 执行对应的处理函数
handler(sessionContent.data);
} catch (error) {
console.error('处理MQTT消息时出错:', error);
}
}
);
// 发送消息
if (isConnected.value) {
sendMessage('Hello', 'my/topic');
}
return {
isConnected,
error,
sendMessage,
disconnect
};
}
}API 参考
Hook返回值
| 名称 | 类型 | 描述 |
|---|---|---|
| mqttClient | Ref<MQTT | null> | MQTT客户端实例 |
| isConnected | Ref<boolean> | 连接状态 |
| error | Ref<Error | undefined> | 错误信息 |
| startMqtt | (topic: string | string[], callback: Function, config?: MqttOptions) => Promise<void> | 启动MQTT连接 |
| sendMessage | (message: string, topic: string) => void | 发送消息 |
| disconnect | () => void | 断开连接 |
高级用法
错误处理
typescript
const { startMqtt, error } = useMqtt();
watch(error, (newError) => {
if (newError) {
console.error('MQTT错误:', newError);
}
});
try {
await startMqtt('my/topic', callback);
} catch (err) {
console.error('连接失败:', err);
}动态重连
typescript
const { startMqtt, disconnect, isConnected } = useMqtt();
watch(isConnected, async (connected) => {
if (!connected) {
console.log('连接断开,尝试重连...');
try {
await startMqtt('my/topic', callback);
} catch (err) {
console.error('重连失败:', err);
}
}
});完整示例
useMqtt
消息监听
<template>
<div class="mqtt-test-container p-6">
<a-card title="MQTT Hook 测试" class="mb-4">
<template #extra>
<a-badge :status="isConnected ? 'success' : 'error'" :text="isConnected ? '已连接' : '未连接'" />
</template>
<!-- 配置区域 -->
<a-row :gutter="[16, 16]" class="mb-4">
<a-col :span="18">
<a-input
v-model:value="testConfig.webUri"
placeholder="请输入 webUri"
addonBefore="webUri"
/>
</a-col>
<a-col :span="6" class="flex items-center">
<span class="mr-6px">useSSL</span> <a-switch v-model:checked="testConfig.useSSL" checked-children="开" un-checked-children="关" />
</a-col>
</a-row>
<a-row :gutter="[16, 16]" class="mb-4">
<a-col :span="12">
<a-input
v-model:value="testConfig.threadId"
placeholder="请输入 Thread ID"
addonBefore="Thread ID"
/>
</a-col>
<a-col :span="12">
<a-input
v-model:value="testConfig.token"
placeholder="请输入 SA Token"
addonBefore="Token"
/>
</a-col>
</a-row>
<!-- 操作按钮区域 -->
<a-space class="mb-4">
<a-button
type="primary"
@click="handleConnect"
:loading="connecting"
:disabled="!testConfig.threadId || !testConfig.token || !testConfig.webUri"
>
<template #icon><wifi-outlined /></template>
连接 MQTT
</a-button>
<a-button
@click="handleDisconnect"
:disabled="!isConnected"
danger
>
<template #icon><disconnect-outlined /></template>
断开连接
</a-button>
<a-button @click="clearMessages">
<template #icon><clear-outlined /></template>
清空消息
</a-button>
<a-button @click="toggleAutoScroll" :type="autoScroll ? 'primary' : 'default'">
<template #icon><vertical-align-bottom-outlined /></template>
{{ autoScroll ? '关闭' : '开启' }}自动滚动
</a-button>
</a-space>
<!-- 状态信息 -->
<a-descriptions :column="2" size="small" class="mb-4" bordered>
<a-descriptions-item label="连接状态">
<a-tag :color="isConnected ? 'green' : 'red'">
{{ isConnected ? '已连接' : '未连接' }}
</a-tag>
</a-descriptions-item>
<a-descriptions-item label="订阅主题">
<a-tag color="blue">{{ currentTopic || '未订阅' }}</a-tag>
</a-descriptions-item>
<a-descriptions-item label="接收消息数">
<a-statistic :value="messageCount" />
</a-descriptions-item>
<a-descriptions-item label="错误次数">
<a-statistic :value="errorCount" :value-style="{ color: errorCount > 0 ? '#cf1322' : '#3f8600' }" />
</a-descriptions-item>
</a-descriptions>
<!-- 错误信息 -->
<a-alert
v-if="error"
:message="error.message"
type="error"
show-icon
closable
@close="error = null"
class="mb-4"
/>
</a-card>
<!-- 消息日志区域 -->
<a-card title="消息日志" size="small">
<template #extra>
<a-space>
<a-tag color="processing">实时日志</a-tag>
<a-switch
v-model:checked="showDetails"
checked-children="详细"
un-checked-children="简洁"
/>
</a-space>
</template>
<div ref="messageContainer" class="message-container">
<div v-if="messages.length === 0" class="empty-state">
<a-empty description="暂无消息">
<template #image>
<message-outlined style="font-size: 48px; color: #d9d9d9;" />
</template>
</a-empty>
</div>
<div v-else>
<div
v-for="(message, index) in messages"
:key="index"
class="message-item"
:class="getMessageClass(message.type)"
>
<div class="message-header">
<a-space>
<a-tag :color="getMessageTypeColor(message.type)">
{{ message.type }}
</a-tag>
<span class="message-time">{{ message.timestamp }}</span>
<a-button
v-if="message.type === 'message' && message.data"
size="small"
type="link"
@click="toggleMessageDetail(index)"
>
{{ message.showDetail ? '收起' : '展开' }}详情
</a-button>
</a-space>
</div>
<div class="message-content">
<template v-if="message.type === 'info' || message.type === 'error'">
<pre class="message-text">{{ message.content }}</pre>
</template>
<template v-else-if="message.type === 'message'">
<div class="mqtt-message">
<div class="topic-info">
<strong>主题:</strong> {{ message.topic }}
</div>
<div v-if="showDetails || message.showDetail" class="message-detail">
<a-divider>原始消息</a-divider>
<pre class="raw-message">{{ message.rawMessage }}</pre>
<template v-if="message.data">
<a-divider>解析后数据</a-divider>
<div class="parsed-data">
<div class="data-item">
<strong>消息类型:</strong>
<a-tag>{{ message.data.sessionContent?.type || '未知' }}</a-tag>
</div>
<div class="data-item">
<strong>会话内容:</strong>
<pre class="session-content">{{ formatJson(message.data.sessionContent) }}</pre>
</div>
</div>
</template>
</div>
<div v-else class="message-summary">
消息类型: <a-tag>{{ message.data?.sessionContent?.type || '未知' }}</a-tag>
<a-button size="small" type="link" @click="toggleMessageDetail(index)">
查看详情
</a-button>
</div>
</div>
</template>
</div>
</div>
</div>
</div>
</a-card>
<!-- 测试工具区域 -->
<a-card title="测试工具" class="mt-4" size="small">
<a-row :gutter="16">
<a-col :span="12">
<h4>模拟消息发送</h4>
<a-form layout="vertical">
<a-form-item label="消息类型">
<a-select v-model:value="mockMessage.type" placeholder="选择消息类型">
<a-select-option value="restoreConnection">恢复连接</a-select-option>
<a-select-option value="newMessage">新消息</a-select-option>
<a-select-option value="statusUpdate">状态更新</a-select-option>
<a-select-option value="custom">自定义</a-select-option>
</a-select>
</a-form-item>
<a-form-item label="消息数据">
<a-textarea
v-model:value="mockMessage.data"
:rows="4"
placeholder="请输入JSON格式的消息数据"
/>
</a-form-item>
<a-button type="primary" @click="sendMockMessage" :disabled="!isConnected">
发送测试消息
</a-button>
</a-form>
</a-col>
<a-col :span="12">
<h4>连接统计</h4>
<a-statistic-countdown
title="连接时长"
:value="connectionStartTime + 1000 * 60 * 60 * 24"
v-if="isConnected && connectionStartTime"
/>
<a-descriptions :column="1" size="small">
<a-descriptions-item label="总接收消息">{{ messageCount }}</a-descriptions-item>
<a-descriptions-item label="成功处理">{{ successCount }}</a-descriptions-item>
<a-descriptions-item label="处理失败">{{ errorCount }}</a-descriptions-item>
<a-descriptions-item label="处理成功率">
{{ messageCount > 0 ? ((successCount / messageCount) * 100).toFixed(1) : 0 }}%
</a-descriptions-item>
</a-descriptions>
</a-col>
</a-row>
</a-card>
<p class="">
👉 注: 该测试组件仅用于演示 useMqtt Hook 的基本功能和消息处理流程,实际使用中请根据具体需求进行调整和优化。此外,日志可在浏览器控制台查看。
</p>
</div>
</template>
<script setup lang="ts" name="MqttTest">
import { ref, reactive, nextTick, onMounted } from 'vue'
import { message as antMessage } from 'ant-design-vue'
import { useMqtt } from '@pt/hooks' // 根据实际路径调整
import {
WifiOutlined,
DisconnectOutlined,
ClearOutlined,
VerticalAlignBottomOutlined,
MessageOutlined
} from '@ant-design/icons-vue'
// 测试配置
const testConfig = reactive({
webUri: 'ws://bcapi-dev1.e-tudou.com:9080', // 默认测试地址
threadId: '0728853c-eeb5-4708-802d-fdd6af6e8e6e', // 默认测试ID
token: '0c60ae2e-5012-417c-8d21-35088ec00785', // 默认测试token
useSSL: false
})
// MQTT实例 - 在组件setup阶段初始化
const { startMqtt, disconnect, isConnected, error } = useMqtt({
mqttConfig: {
vhost: "mqtt",
password: "tudou-agentos",
webUri: testConfig.webUri,
username: "tudou-agentos",
useSSL: testConfig.useSSL
},
satoken: testConfig.token,
basePath: "/webkit-pro/"
})
// 状态管理
const connecting = ref(false)
const messageContainer = ref<HTMLElement>()
const autoScroll = ref(true)
const showDetails = ref(false)
// 统计数据
const messageCount = ref(0)
const successCount = ref(0)
const errorCount = ref(0)
const connectionStartTime = ref<number | null>(null)
const currentTopic = ref('')
// 消息日志
interface LogMessage {
type: 'info' | 'error' | 'message'
content?: string
topic?: string
rawMessage?: string
data?: any
timestamp: string
showDetail?: boolean
}
const messages = ref<LogMessage[]>([])
// 模拟消息
const mockMessage = reactive({
type: 'restoreConnection',
data: JSON.stringify({
sessionId: 'test-session',
status: 'active'
}, null, 2)
})
// MQTT消息处理映射
const handleMqttMessageMap: Record<string, (data: unknown) => void> = {
restoreConnection: (data: any) => {
addLogMessage('info', `恢复连接处理: ${JSON.stringify(data)}`)
successCount.value++
},
newMessage: (data: any) => {
addLogMessage('info', `新消息处理: ${JSON.stringify(data)}`)
successCount.value++
},
statusUpdate: (data: any) => {
addLogMessage('info', `状态更新处理: ${JSON.stringify(data)}`)
successCount.value++
}
}
// 添加日志消息
const addLogMessage = (type: LogMessage['type'], content: string, topic?: string, rawMessage?: string, data?: any) => {
messages.value.push({
type,
content,
topic,
rawMessage,
data,
timestamp: new Date().toLocaleTimeString(),
showDetail: false
})
if (autoScroll.value) {
nextTick(() => {
if (messageContainer.value) {
messageContainer.value.scrollTop = messageContainer.value.scrollHeight
}
})
}
}
// 监听MQTT消息
const listenMqttMessage = async (threadId: string, callback?: (msg: any) => void) => {
addLogMessage('info', `开始连接MQTT,主题: MQTT/chat/${threadId}`)
try {
// 使用当前的 testConfig 配置重新连接
await startMqtt(`MQTT/chat/${threadId}`, (_topic: any, message: any) => {
messageCount.value++
const rawMsg = message.toString()
addLogMessage('message', '', _topic, rawMsg)
if (callback) {
callback(message)
}
try {
const msg = JSON.parse(rawMsg)
const sessionContent = JSON.parse(msg.sessionContent)
// 更新最后一条消息的解析数据
if (messages.value.length > 0) {
const lastMessage = messages.value[messages.value.length - 1]
if (lastMessage.type === 'message') {
lastMessage.data = { ...msg, sessionContent }
}
}
// 检查必要的字段
if (!sessionContent.type) {
throw new Error("推送的消息没有类型,请按标准格式推送...")
}
// 检查类型是否在映射表中
const handler = handleMqttMessageMap[sessionContent.type]
if (!handler) {
addLogMessage('error', `未知的消息类型: ${sessionContent.type}`)
errorCount.value++
return
}
// 执行对应的处理函数
handler(sessionContent.data)
} catch (err) {
const errorMsg = `处理MQTT消息时出错: ${err instanceof Error ? err.message : String(err)}`
addLogMessage('error', errorMsg)
errorCount.value++
}
}, {
mqttConfig: {
vhost: "mqtt",
password: "tudou-agentos",
webUri: testConfig.webUri,
username: "tudou-agentos",
useSSL: testConfig.useSSL
},
satoken: testConfig.token,
basePath: "/webkit-pro/"
})
connectionStartTime.value = Date.now()
currentTopic.value = `MQTT/chat/${threadId}`
addLogMessage('info', 'MQTT连接成功')
} catch (err) {
const errorMsg = `MQTT连接失败: ${err instanceof Error ? err.message : String(err)}`
addLogMessage('error', errorMsg)
throw err
}
}
// 连接处理
const handleConnect = async () => {
if (!testConfig.threadId || !testConfig.token || !testConfig.webUri) {
antMessage.warning('请填写完整的配置信息')
return
}
connecting.value = true
try {
await listenMqttMessage(testConfig.threadId)
} catch (err) {
antMessage.error(`连接失败: ${err instanceof Error ? err.message : String(err)}`)
} finally {
connecting.value = false
}
}
// 断开连接处理
const handleDisconnect = async () => {
try {
await disconnect()
connectionStartTime.value = null
currentTopic.value = ''
addLogMessage('info', 'MQTT连接已断开')
antMessage.success('MQTT连接已断开')
} catch (err) {
antMessage.error(`断开连接失败: ${err instanceof Error ? err.message : String(err)}`)
}
}
// 清空消息
const clearMessages = () => {
messages.value = []
messageCount.value = 0
successCount.value = 0
errorCount.value = 0
antMessage.success('消息日志已清空')
}
// 切换自动滚动
const toggleAutoScroll = () => {
autoScroll.value = !autoScroll.value
antMessage.info(`自动滚动已${autoScroll.value ? '开启' : '关闭'}`)
}
// 切换消息详情显示
const toggleMessageDetail = (index: number) => {
messages.value[index].showDetail = !messages.value[index].showDetail
}
// 获取消息类型颜色
const getMessageTypeColor = (type: string) => {
switch (type) {
case 'info': return 'blue'
case 'error': return 'red'
case 'message': return 'green'
default: return 'default'
}
}
// 获取消息样式类
const getMessageClass = (type: string) => {
return `message-${type}`
}
// 格式化JSON
const formatJson = (obj: any) => {
return JSON.stringify(obj, null, 2)
}
// 发送模拟消息
const sendMockMessage = () => {
try {
const messageData = {
sessionContent: JSON.stringify({
type: mockMessage.type,
data: JSON.parse(mockMessage.data)
})
}
// 模拟接收到消息(实际使用中这会通过MQTT推送)
const mockRawMessage = JSON.stringify(messageData)
// 手动触发消息处理流程
messageCount.value++
addLogMessage('message', '', currentTopic.value, mockRawMessage, {
...messageData,
sessionContent: JSON.parse(messageData.sessionContent)
})
// 执行处理逻辑
const sessionContent = JSON.parse(messageData.sessionContent)
const handler = handleMqttMessageMap[sessionContent.type]
if (handler) {
handler(sessionContent.data)
} else {
addLogMessage('error', `未知的消息类型: ${sessionContent.type}`)
errorCount.value++
}
antMessage.success('模拟消息发送成功')
} catch (err) {
antMessage.error(`发送模拟消息失败: ${err instanceof Error ? err.message : String(err)}`)
}
}
</script>
<style lang="less" scoped>
.mqtt-test-container {
max-width: 1200px;
margin: 0 auto;
.message-container {
max-height: 500px;
overflow-y: auto;
border: 1px solid #f0f0f0;
border-radius: 6px;
padding: 16px;
background: #fafafa;
.empty-state {
text-align: center;
padding: 40px 0;
}
.message-item {
margin-bottom: 12px;
padding: 12px;
border-radius: 6px;
border-left: 4px solid #d9d9d9;
background: white;
&.message-info {
border-left-color: #1890ff;
}
&.message-error {
border-left-color: #ff4d4f;
background: #fff2f0;
}
&.message-message {
border-left-color: #52c41a;
}
.message-header {
margin-bottom: 8px;
.message-time {
color: #999;
font-size: 12px;
}
}
.message-content {
.message-text {
margin: 0;
padding: 8px;
background: #f5f5f5;
border-radius: 4px;
font-size: 12px;
white-space: pre-wrap;
word-break: break-all;
}
.mqtt-message {
.topic-info {
margin-bottom: 8px;
font-size: 12px;
color: #666;
}
.message-detail {
.raw-message, .session-content {
margin: 0;
padding: 8px;
background: #f5f5f5;
border-radius: 4px;
font-size: 11px;
white-space: pre-wrap;
word-break: break-all;
max-height: 200px;
overflow-y: auto;
}
.parsed-data {
.data-item {
margin-bottom: 8px;
strong {
display: inline-block;
min-width: 80px;
}
}
}
}
.message-summary {
font-size: 12px;
color: #666;
}
}
}
}
}
}
// UnoCSS 工具类补充
.p-6 { padding: 1.5rem; }
.mb-4 { margin-bottom: 1rem; }
.mt-4 { margin-top: 1rem; }
</style>