Skip to content

MQTT Hook

@pt/hooks/modules/network 提供了一个响应式的MQTT客户端Hook,支持Vue组件生命周期管理。

依赖文件下载

请先下载 mqttws31.js 文件,并将其放置到项目的 public/js 目录下。
下载地址:点击下载 mqttws31.js
注意: 文件路径必须为 public/js/mqttws31.js,否则MQTT功能无法正常使用。

基本使用

typescript
import { useMqtt } from '@pt/hooks/modules/network';

export default {
  setup() {
    const { 
      mqttClient,
      isConnected,
      error,
      startMqtt,
      sendMessage,
      disconnect 
    } = useMqtt({
      mqttConfig: {
        vhost: 'mqtt',
        password: 'your-password',
        webUri: 'ws://your-server:9080',
        username: 'your-username',
        useSSL: false // 根据weburl来决定是 true 或 false, 假如 weburl为“ws://mqtt.example.com:9080”,则useSSL为false,假如 weburl为“wss://mqtt.example.com:9080”,则useSSL为true
      },
      satoken: 'your-token',
      basePath: import.meta.env.BASE_URL
    });

    const handleMqttMessageMap: Record<string, (data: unknown) => void> = {
      // 举例
      restoreConnection: (data: any) => {},
    };

    // 启动连接
    await startMqtt(
      'my/topic',
      (topic, message) => {
        console.log('mqtt-chat-message:', message);
        const msg = JSON.parse(message.toString());
        if (callback) {
          callback(msg);
        }
        try {
          let sessionContent = JSON.parse(msg.sessionContent);
          console.log(sessionContent);

          // 检查必要的字段是否存在
          if (!sessionContent.type) {
            throw new Error('推送的消息没有类型,请按标准格式推送...');
          }

          // 检查类型是否在映射表中
          const handler = handleMqttMessageMap[sessionContent.type];
          if (!handler) {
            console.warn(`未知的消息类型: ${sessionContent.type}`);
            return;
          }

          // 执行对应的处理函数
          handler(sessionContent.data);
        } catch (error) {
          console.error('处理MQTT消息时出错:', error);
        }
      }
    );

    // 发送消息
    if (isConnected.value) {
      sendMessage('Hello', 'my/topic');
    }

    return {
      isConnected,
      error,
      sendMessage,
      disconnect
    };
  }
}

API 参考

Hook返回值

名称类型描述
mqttClientRef<MQTT | null>MQTT客户端实例
isConnectedRef<boolean>连接状态
errorRef<Error | undefined>错误信息
startMqtt(topic: string | string[], callback: Function, config?: MqttOptions) => Promise<void>启动MQTT连接
sendMessage(message: string, topic: string) => void发送消息
disconnect() => void断开连接

高级用法

错误处理

typescript
const { startMqtt, error } = useMqtt();

watch(error, (newError) => {
  if (newError) {
    console.error('MQTT错误:', newError);
  }
});

try {
  await startMqtt('my/topic', callback);
} catch (err) {
  console.error('连接失败:', err);
}

动态重连

typescript
const { startMqtt, disconnect, isConnected } = useMqtt();

watch(isConnected, async (connected) => {
  if (!connected) {
    console.log('连接断开,尝试重连...');
    try {
      await startMqtt('my/topic', callback);
    } catch (err) {
      console.error('重连失败:', err);
    }
  }
});

完整示例

useMqtt
消息监听
<template>
    <div class="mqtt-test-container p-6">
      <a-card title="MQTT Hook 测试" class="mb-4">
        <template #extra>
          <a-badge :status="isConnected ? 'success' : 'error'" :text="isConnected ? '已连接' : '未连接'" />
        </template>

        <!-- 配置区域 -->
        <a-row :gutter="[16, 16]" class="mb-4">
          <a-col :span="18">
            <a-input
              v-model:value="testConfig.webUri"
              placeholder="请输入 webUri"
              addonBefore="webUri"
            />
          </a-col>
          <a-col :span="6" class="flex items-center">
            <span class="mr-6px">useSSL</span> <a-switch v-model:checked="testConfig.useSSL" checked-children="开" un-checked-children="关" />
          </a-col>
        </a-row>
        <a-row :gutter="[16, 16]" class="mb-4">
          <a-col :span="12">
            <a-input
              v-model:value="testConfig.threadId"
              placeholder="请输入 Thread ID"
              addonBefore="Thread ID"
            />
          </a-col>
          <a-col :span="12">
            <a-input
              v-model:value="testConfig.token"
              placeholder="请输入 SA Token"
              addonBefore="Token"
            />
          </a-col>
        </a-row>

        <!-- 操作按钮区域 -->
        <a-space class="mb-4">
          <a-button
            type="primary"
            @click="handleConnect"
            :loading="connecting"
            :disabled="!testConfig.threadId || !testConfig.token || !testConfig.webUri"
          >
            <template #icon><wifi-outlined /></template>
            连接 MQTT
          </a-button>

          <a-button
            @click="handleDisconnect"
            :disabled="!isConnected"
            danger
          >
            <template #icon><disconnect-outlined /></template>
            断开连接
          </a-button>

          <a-button @click="clearMessages">
            <template #icon><clear-outlined /></template>
            清空消息
          </a-button>

          <a-button @click="toggleAutoScroll" :type="autoScroll ? 'primary' : 'default'">
            <template #icon><vertical-align-bottom-outlined /></template>
            {{ autoScroll ? '关闭' : '开启' }}自动滚动
          </a-button>
        </a-space>

        <!-- 状态信息 -->
        <a-descriptions :column="2" size="small" class="mb-4" bordered>
          <a-descriptions-item label="连接状态">
            <a-tag :color="isConnected ? 'green' : 'red'">
              {{ isConnected ? '已连接' : '未连接' }}
            </a-tag>
          </a-descriptions-item>
          <a-descriptions-item label="订阅主题">
            <a-tag color="blue">{{ currentTopic || '未订阅' }}</a-tag>
          </a-descriptions-item>
          <a-descriptions-item label="接收消息数">
            <a-statistic :value="messageCount" />
          </a-descriptions-item>
          <a-descriptions-item label="错误次数">
            <a-statistic :value="errorCount" :value-style="{ color: errorCount > 0 ? '#cf1322' : '#3f8600' }" />
          </a-descriptions-item>
        </a-descriptions>

        <!-- 错误信息 -->
        <a-alert
          v-if="error"
          :message="error.message"
          type="error"
          show-icon
          closable
          @close="error = null"
          class="mb-4"
        />
      </a-card>

      <!-- 消息日志区域 -->
      <a-card title="消息日志" size="small">
        <template #extra>
          <a-space>
            <a-tag color="processing">实时日志</a-tag>
            <a-switch
              v-model:checked="showDetails"
              checked-children="详细"
              un-checked-children="简洁"
            />
          </a-space>
        </template>

        <div ref="messageContainer" class="message-container">
          <div v-if="messages.length === 0" class="empty-state">
            <a-empty description="暂无消息">
              <template #image>
                <message-outlined style="font-size: 48px; color: #d9d9d9;" />
              </template>
            </a-empty>
          </div>

          <div v-else>
            <div
              v-for="(message, index) in messages"
              :key="index"
              class="message-item"
              :class="getMessageClass(message.type)"
            >
              <div class="message-header">
                <a-space>
                  <a-tag :color="getMessageTypeColor(message.type)">
                    {{ message.type }}
                  </a-tag>
                  <span class="message-time">{{ message.timestamp }}</span>
                  <a-button
                    v-if="message.type === 'message' && message.data"
                    size="small"
                    type="link"
                    @click="toggleMessageDetail(index)"
                  >
                    {{ message.showDetail ? '收起' : '展开' }}详情
                  </a-button>
                </a-space>
              </div>

              <div class="message-content">
                <template v-if="message.type === 'info' || message.type === 'error'">
                  <pre class="message-text">{{ message.content }}</pre>
                </template>

                <template v-else-if="message.type === 'message'">
                  <div class="mqtt-message">
                    <div class="topic-info">
                      <strong>主题:</strong> {{ message.topic }}
                    </div>

                    <div v-if="showDetails || message.showDetail" class="message-detail">
                      <a-divider>原始消息</a-divider>
                      <pre class="raw-message">{{ message.rawMessage }}</pre>

                      <template v-if="message.data">
                        <a-divider>解析后数据</a-divider>
                        <div class="parsed-data">
                          <div class="data-item">
                            <strong>消息类型:</strong>
                            <a-tag>{{ message.data.sessionContent?.type || '未知' }}</a-tag>
                          </div>
                          <div class="data-item">
                            <strong>会话内容:</strong>
                            <pre class="session-content">{{ formatJson(message.data.sessionContent) }}</pre>
                          </div>
                        </div>
                      </template>
                    </div>

                    <div v-else class="message-summary">
                      消息类型: <a-tag>{{ message.data?.sessionContent?.type || '未知' }}</a-tag>
                      <a-button size="small" type="link" @click="toggleMessageDetail(index)">
                        查看详情
                      </a-button>
                    </div>
                  </div>
                </template>
              </div>
            </div>
          </div>
        </div>
      </a-card>

      <!-- 测试工具区域 -->
      <a-card title="测试工具" class="mt-4" size="small">
        <a-row :gutter="16">
          <a-col :span="12">
            <h4>模拟消息发送</h4>
            <a-form layout="vertical">
              <a-form-item label="消息类型">
                <a-select v-model:value="mockMessage.type" placeholder="选择消息类型">
                  <a-select-option value="restoreConnection">恢复连接</a-select-option>
                  <a-select-option value="newMessage">新消息</a-select-option>
                  <a-select-option value="statusUpdate">状态更新</a-select-option>
                  <a-select-option value="custom">自定义</a-select-option>
                </a-select>
              </a-form-item>

              <a-form-item label="消息数据">
                <a-textarea
                  v-model:value="mockMessage.data"
                  :rows="4"
                  placeholder="请输入JSON格式的消息数据"
                />
              </a-form-item>

              <a-button type="primary" @click="sendMockMessage" :disabled="!isConnected">
                发送测试消息
              </a-button>
            </a-form>
          </a-col>

          <a-col :span="12">
            <h4>连接统计</h4>
            <a-statistic-countdown
              title="连接时长"
              :value="connectionStartTime + 1000 * 60 * 60 * 24"
              v-if="isConnected && connectionStartTime"
            />
            <a-descriptions :column="1" size="small">
              <a-descriptions-item label="总接收消息">{{ messageCount }}</a-descriptions-item>
              <a-descriptions-item label="成功处理">{{ successCount }}</a-descriptions-item>
              <a-descriptions-item label="处理失败">{{ errorCount }}</a-descriptions-item>
              <a-descriptions-item label="处理成功率">
                {{ messageCount > 0 ? ((successCount / messageCount) * 100).toFixed(1) : 0 }}%
              </a-descriptions-item>
            </a-descriptions>
          </a-col>
        </a-row>
      </a-card>
      <p class="">
        👉 注: 该测试组件仅用于演示 useMqtt Hook 的基本功能和消息处理流程,实际使用中请根据具体需求进行调整和优化。此外,日志可在浏览器控制台查看。
      </p>
    </div>
  </template>

  <script setup lang="ts" name="MqttTest">
  import { ref, reactive, nextTick, onMounted } from 'vue'
  import { message as antMessage } from 'ant-design-vue'
  import { useMqtt } from '@pt/hooks' // 根据实际路径调整
  import {
    WifiOutlined,
    DisconnectOutlined,
    ClearOutlined,
    VerticalAlignBottomOutlined,
    MessageOutlined
  } from '@ant-design/icons-vue'

  // 测试配置
  const testConfig = reactive({
    webUri: 'ws://bcapi-dev1.e-tudou.com:9080', // 默认测试地址
    threadId: '0728853c-eeb5-4708-802d-fdd6af6e8e6e', // 默认测试ID
    token: '0c60ae2e-5012-417c-8d21-35088ec00785', // 默认测试token
    useSSL: false
  })

  // MQTT实例 - 在组件setup阶段初始化
  const { startMqtt, disconnect, isConnected, error } = useMqtt({
    mqttConfig: {
      vhost: "mqtt",
      password: "tudou-agentos",
      webUri: testConfig.webUri,
      username: "tudou-agentos",
      useSSL: testConfig.useSSL
    },
    satoken: testConfig.token,
    basePath: "/webkit-pro/"
  })

  // 状态管理
  const connecting = ref(false)
  const messageContainer = ref<HTMLElement>()
  const autoScroll = ref(true)
  const showDetails = ref(false)

  // 统计数据
  const messageCount = ref(0)
  const successCount = ref(0)
  const errorCount = ref(0)
  const connectionStartTime = ref<number | null>(null)
  const currentTopic = ref('')

  // 消息日志
  interface LogMessage {
    type: 'info' | 'error' | 'message'
    content?: string
    topic?: string
    rawMessage?: string
    data?: any
    timestamp: string
    showDetail?: boolean
  }

  const messages = ref<LogMessage[]>([])

  // 模拟消息
  const mockMessage = reactive({
    type: 'restoreConnection',
    data: JSON.stringify({
      sessionId: 'test-session',
      status: 'active'
    }, null, 2)
  })

  // MQTT消息处理映射
  const handleMqttMessageMap: Record<string, (data: unknown) => void> = {
    restoreConnection: (data: any) => {
      addLogMessage('info', `恢复连接处理: ${JSON.stringify(data)}`)
      successCount.value++
    },
    newMessage: (data: any) => {
      addLogMessage('info', `新消息处理: ${JSON.stringify(data)}`)
      successCount.value++
    },
    statusUpdate: (data: any) => {
      addLogMessage('info', `状态更新处理: ${JSON.stringify(data)}`)
      successCount.value++
    }
  }

  // 添加日志消息
  const addLogMessage = (type: LogMessage['type'], content: string, topic?: string, rawMessage?: string, data?: any) => {
    messages.value.push({
      type,
      content,
      topic,
      rawMessage,
      data,
      timestamp: new Date().toLocaleTimeString(),
      showDetail: false
    })

    if (autoScroll.value) {
      nextTick(() => {
        if (messageContainer.value) {
          messageContainer.value.scrollTop = messageContainer.value.scrollHeight
        }
      })
    }
  }

  // 监听MQTT消息
  const listenMqttMessage = async (threadId: string, callback?: (msg: any) => void) => {
    addLogMessage('info', `开始连接MQTT,主题: MQTT/chat/${threadId}`)

    try {
      // 使用当前的 testConfig 配置重新连接
      await startMqtt(`MQTT/chat/${threadId}`, (_topic: any, message: any) => {
        messageCount.value++
        const rawMsg = message.toString()

        addLogMessage('message', '', _topic, rawMsg)

        if (callback) {
          callback(message)
        }

        try {
          const msg = JSON.parse(rawMsg)
          const sessionContent = JSON.parse(msg.sessionContent)

          // 更新最后一条消息的解析数据
          if (messages.value.length > 0) {
            const lastMessage = messages.value[messages.value.length - 1]
            if (lastMessage.type === 'message') {
              lastMessage.data = { ...msg, sessionContent }
            }
          }

          // 检查必要的字段
          if (!sessionContent.type) {
            throw new Error("推送的消息没有类型,请按标准格式推送...")
          }

          // 检查类型是否在映射表中
          const handler = handleMqttMessageMap[sessionContent.type]
          if (!handler) {
            addLogMessage('error', `未知的消息类型: ${sessionContent.type}`)
            errorCount.value++
            return
          }

          // 执行对应的处理函数
          handler(sessionContent.data)

        } catch (err) {
          const errorMsg = `处理MQTT消息时出错: ${err instanceof Error ? err.message : String(err)}`
          addLogMessage('error', errorMsg)
          errorCount.value++
        }
      }, {
        mqttConfig: {
          vhost: "mqtt",
          password: "tudou-agentos",
          webUri: testConfig.webUri,
          username: "tudou-agentos",
          useSSL: testConfig.useSSL
        },
        satoken: testConfig.token,
        basePath: "/webkit-pro/"
      })

      connectionStartTime.value = Date.now()
      currentTopic.value = `MQTT/chat/${threadId}`
      addLogMessage('info', 'MQTT连接成功')

    } catch (err) {
      const errorMsg = `MQTT连接失败: ${err instanceof Error ? err.message : String(err)}`
      addLogMessage('error', errorMsg)
      throw err
    }
  }

  // 连接处理
  const handleConnect = async () => {
    if (!testConfig.threadId || !testConfig.token || !testConfig.webUri) {
      antMessage.warning('请填写完整的配置信息')
      return
    }

    connecting.value = true

    try {
      await listenMqttMessage(testConfig.threadId)
    } catch (err) {
      antMessage.error(`连接失败: ${err instanceof Error ? err.message : String(err)}`)
    } finally {
      connecting.value = false
    }
  }

  // 断开连接处理
  const handleDisconnect = async () => {
    try {
      await disconnect()
      connectionStartTime.value = null
      currentTopic.value = ''
      addLogMessage('info', 'MQTT连接已断开')
      antMessage.success('MQTT连接已断开')
    } catch (err) {
      antMessage.error(`断开连接失败: ${err instanceof Error ? err.message : String(err)}`)
    }
  }

  // 清空消息
  const clearMessages = () => {
    messages.value = []
    messageCount.value = 0
    successCount.value = 0
    errorCount.value = 0
    antMessage.success('消息日志已清空')
  }

  // 切换自动滚动
  const toggleAutoScroll = () => {
    autoScroll.value = !autoScroll.value
    antMessage.info(`自动滚动已${autoScroll.value ? '开启' : '关闭'}`)
  }

  // 切换消息详情显示
  const toggleMessageDetail = (index: number) => {
    messages.value[index].showDetail = !messages.value[index].showDetail
  }

  // 获取消息类型颜色
  const getMessageTypeColor = (type: string) => {
    switch (type) {
      case 'info': return 'blue'
      case 'error': return 'red'
      case 'message': return 'green'
      default: return 'default'
    }
  }

  // 获取消息样式类
  const getMessageClass = (type: string) => {
    return `message-${type}`
  }

  // 格式化JSON
  const formatJson = (obj: any) => {
    return JSON.stringify(obj, null, 2)
  }

  // 发送模拟消息
  const sendMockMessage = () => {
    try {
      const messageData = {
        sessionContent: JSON.stringify({
          type: mockMessage.type,
          data: JSON.parse(mockMessage.data)
        })
      }

      // 模拟接收到消息(实际使用中这会通过MQTT推送)
      const mockRawMessage = JSON.stringify(messageData)

      // 手动触发消息处理流程
      messageCount.value++
      addLogMessage('message', '', currentTopic.value, mockRawMessage, {
        ...messageData,
        sessionContent: JSON.parse(messageData.sessionContent)
      })

      // 执行处理逻辑
      const sessionContent = JSON.parse(messageData.sessionContent)
      const handler = handleMqttMessageMap[sessionContent.type]
      if (handler) {
        handler(sessionContent.data)
      } else {
        addLogMessage('error', `未知的消息类型: ${sessionContent.type}`)
        errorCount.value++
      }

      antMessage.success('模拟消息发送成功')

    } catch (err) {
      antMessage.error(`发送模拟消息失败: ${err instanceof Error ? err.message : String(err)}`)
    }
  }
  </script>

  <style lang="less" scoped>
  .mqtt-test-container {
    max-width: 1200px;
    margin: 0 auto;

    .message-container {
      max-height: 500px;
      overflow-y: auto;
      border: 1px solid #f0f0f0;
      border-radius: 6px;
      padding: 16px;
      background: #fafafa;

      .empty-state {
        text-align: center;
        padding: 40px 0;
      }

      .message-item {
        margin-bottom: 12px;
        padding: 12px;
        border-radius: 6px;
        border-left: 4px solid #d9d9d9;
        background: white;

        &.message-info {
          border-left-color: #1890ff;
        }

        &.message-error {
          border-left-color: #ff4d4f;
          background: #fff2f0;
        }

        &.message-message {
          border-left-color: #52c41a;
        }

        .message-header {
          margin-bottom: 8px;

          .message-time {
            color: #999;
            font-size: 12px;
          }
        }

        .message-content {
          .message-text {
            margin: 0;
            padding: 8px;
            background: #f5f5f5;
            border-radius: 4px;
            font-size: 12px;
            white-space: pre-wrap;
            word-break: break-all;
          }

          .mqtt-message {
            .topic-info {
              margin-bottom: 8px;
              font-size: 12px;
              color: #666;
            }

            .message-detail {
              .raw-message, .session-content {
                margin: 0;
                padding: 8px;
                background: #f5f5f5;
                border-radius: 4px;
                font-size: 11px;
                white-space: pre-wrap;
                word-break: break-all;
                max-height: 200px;
                overflow-y: auto;
              }

              .parsed-data {
                .data-item {
                  margin-bottom: 8px;

                  strong {
                    display: inline-block;
                    min-width: 80px;
                  }
                }
              }
            }

            .message-summary {
              font-size: 12px;
              color: #666;
            }
          }
        }
      }
    }
  }

  // UnoCSS 工具类补充
  .p-6 { padding: 1.5rem; }
  .mb-4 { margin-bottom: 1rem; }
  .mt-4 { margin-top: 1rem; }
  </style>

Released under the MIT License.