文章目錄
16 MQTT協議分析應用開發
16.1 mqtt協議介紹
16.1.1 概述
16.1.2 特點
16.1.3 應用
16.2 mqtt協議報文格式組成
16.2.1 mqtt控制報文結構
16.2.2 mqtt固定報頭
16.2.3 mqtt控制報文類型
16.2.4 標記
16.2.5 剩余長度
16.2.5.1 示例
16.2.6 可變報頭
16.2.7 有效載荷
16.3 報文分析
16.3.1 CONNECT-連接服務端
16.3.1.1 connect固定報頭
16.3.1.2 協議名字節組成
16.3.1.3 協議級別
16.3.1.4 連接標記
16.3.1.5 保持連接
16.3.1.6 客戶端標識符
16.3.1.7 遺囑主題
16.3.1.8 遺囑消息
16.3.1.9 用戶名和密碼
16.3.10.1 wirshark抓包分析connect報文
16.3.10.2 c語言構造mqtt connect報文
16.3.2 CONNACK-確認連接請求
16.3.2.1 固定報頭
16.3.2.2 可變報頭
16.3.2.3 CONNACK報文wireshark抓包分析
16.3.2.4 c語言構造connect ack報文
16.3.3 PUBLISH-發布消息
16.3.3.1 固定報頭
16.3.3.2 抓包分析PUBLISH報文
16.3.3.3 構造publish 報文
16.3.4 PUBREC-發布收到
16.3.4.1 固定報頭
16.3.4.2 PUBREC抓包報文
16.3.4.3 c語言構造pubrec報文
16.3.5 PUBREL-發布釋放
16.3.5.1 固定報頭
16.3.5.2 PUBREL抓包報文
16.3.5.3 c語言構造pubrel報文
16.3.6 PUBCOMP-發布完成
16.3.6.1 固定報頭
16.3.6.2 PUBCOMP抓包報文
16.3.6.3 c語言構造pubcom報文
16.3.7 PINGREQ-心跳請求
16.3.7.1 固定報頭
16.3.7.2 PINGREQ 抓包報文
16.3.7.3 c語言構造pingreq報文
16.3.8 PINGRESP – 心跳響應
16.3.8.1 固定報頭
16.3.8.2 PINGRESP 抓包報文
16.3.8.3 c語言構造pinpresp報文
16.3.9 DISCONNECT –斷開連接
16.3.9.1 固定報頭
16.3.9.2 DISCONNECT 抓包報文
16.3.9.3 c語言構造disconnect報文
16.3.10 SUBSCRIBE-訂閱主題
16.3.10.1 固定報頭
16.3.10.2 SUBSCRIBE報文抓包
16.3.10.3 c語言構造subscribe報文
16.3.11 SUBACK –訂閱確認
16.3.11.1 固定報頭
3.11.2 SUBACK抓包報文
16.3.11.3 c語言構造suback報文
16.3.12 UNSUBSCRIBE –取消訂閱
16.3.12.1 固定報頭
16.3.12.2 UNSUBSCRIBE抓包報文
16.3.12.3 c語言構造unsubscribe報文
16.3.13 UNSUBACK –取消訂閱確認
16.3.13.1 固定報頭
16.3.12.2 UNSUBSCRIBE ACK抓包報文
16.3.12.3 c語言構造unsubscribe報文
16.3.14 服務端與客戶端交互操作過程
16.3.14.1 編譯
16.3.14.2 執行
16 MQTT協議分析應用開發
16.1 mqtt協議介紹
16.1.1 概述
MQTT是一個客戶端服務端架構的發布/訂閱模式的消息傳輸協議。它的設計思想是輕巧、開放、簡單、規范,易于實現。這些特點使得它對很多場景來說都是很好的選擇,特別是對于受限的環境如機器與機器的通信(M2M)以及物聯網環境(IoT)。
16.1.2 特點
a) 開放消息協議,簡單易實現
b) 發布訂閱模式,一對多消息發布
c) 基于TCP/IP網絡連接
d) 1字節固定報頭,2字節心跳報文,報文結構緊湊
e) 消息QoS支持,可靠傳輸保證
16.1.3 應用
MQTT協議廣泛應用于物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。
a) 物聯網M2M通信,物聯網大數據采集
b) Android消息推送,WEB消息推送
c) 移動即時消息,例如Facebook Messenger
d) 智能硬件、智能家具、智能電器
e) 車聯網通信,電動車站樁采集
f) 智慧城市、遠程醫療、遠程教育
g) 電力、石油與能源等行業市場
16.2 mqtt協議報文格式組成
16.2.1 mqtt控制報文結構
MQTT 協議通過交換預定義的 MQTT 控制報文來通信。 這一節描述這些報文的格式。MQTT 控制報文由三部分組成,如下圖:
圖2.1 mqtt報文組成
16.2.2 mqtt固定報頭
每個 MQTT 控制報文都包含一個固定報頭, 固定報頭的格式如下圖:
圖2.2 mqtt固定報頭
16.2.3 mqtt控制報文類型
位置: 第 1 個字節, 二進制位 7-4,表示為 4 位無符號值。
MQTT 控制報文的類型:如下表:
Reserved | 0 | 禁止 | 保留 |
CONNECT | 1 | 客戶端到服務端 | 客戶端請求連接服務端 |
CONNACK | 2 | 服務端到客戶端 | 連接報文確認 |
PUBLISH | 3 | 兩個方向都允許 | 發布消息 |
PUBACK | 4 | 兩個方向都允許 | QoS 1消息發布收到確認 |
PUBREC | 5 | 兩個方向都允許 | 發布收到(保證交付第一步) |
PUBREL | 6 | 兩個方向都允許 | 發布釋放(保證交付第二步) |
PUBCOMP | 7 | 兩個方向都允許 | QoS 2消息發布完成(保證交互第三步) |
SUBSCRIBE | 8 | 客戶端到服務端 | 客戶端訂閱請求 |
SUBACK | 9 | 服務端到客戶端 | 訂閱請求報文確認 |
UNSUBSCRIBE | 10 | 客戶端到服務端 | 客戶端取消訂閱請求 |
UNSUBACK | 11 | 服務端到客戶端 | 取消訂閱報文確認 |
PINGREQ | 12 | 客戶端到服務端 | 心跳請求 |
PINGRESP | 13 | 服務端到客戶端 | 心跳響應 |
DISCONNECT | 14 | 客戶端到服務端 | 客戶端斷開連接 |
Reserved | 15 | 禁止 | 保留 |
名字 | 值 | 報文流動方向 | 描述 |
---|
16.2.4 標記
固定報頭第 1 個字節的剩余的 4 位 [3-0]包含每個 MQTT 控制報文類型特定的標志 。標記位說明如下表所示:
CONNECT | Reserved | 0 | 0 | 0 | 0 |
CONNACK | Reserved | 0 | 0 | 0 | 0 |
PUBLISH | Used in MQTT 3.1.1 | DUP1 | QoS2 | QoS2 | RETAIN3 |
PUBACK | Reserved | 0 | 0 | 0 | 0 |
PUBREC | Reserved | 0 | 0 | 0 | 0 |
PUBREL | Reserved | 0 | 0 | 1 | 0 |
PUBCOMP | Reserved | 0 | 0 | 0 | 0 |
SUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
SUBACK | Reserved | 0 | 0 | 0 | 0 |
UNSUBSCRIBE | Reserved | 0 | 0 | 1 | 0 |
UNSUBACK | Reserved | 0 | 0 | 0 | 0 |
PINGREQ | Reserved | 0 | 0 | 0 | 0 |
PINGRESP | Reserved | 0 | 0 | 0 | 0 |
DISCONNECT | Reserved | 0 | 0 | 0 | 0 |
控制報文 | 固定報頭標志 | Bit 3 | Bit 2 | Bit 1 | Bit 0 |
---|
DUP1 =控制報文的重復分發標志
QoS2 = PUBLISH 報文的服務質量等級
RETAIN3 = PUBLISH 報文的保留標志
16.2.5 剩余長度
位置:從第二個字節開始。剩余長度( Remaining Length) 表示當前報文剩余部分的字節數, 包括可變報頭和負載的數據。 剩余長度不包括用于編碼剩余長度字段本身的字節數。
圖2.3 剩余長度包含的報文范圍
剩余長度字段使用一個變長度編碼方案, 對小于 128 的值它使用單字節編碼。 更大的值按下面的方式處理。低 7 位有效位用于編碼數據,最高有效位用于指示是否有更多的字節。 因此每個字節可以編碼 128 個數值和一個延續位( continuation bit) 。 剩余長度字段最大 4 個字節。
例如, 十進制數 64 會被編碼為一個字節, 數值是 64, 十六進制表示為 0x40,。十進制數字321(=65+2*128)被編碼為兩個字節, 最低有效位在前。 第一個字節是 65+128=193。 注意最高位為
1 表示后面至少還有一個字節。 第二個字節是 2。
16.2.5.1 示例
123456 = 964 x 128 + 64 964 = 7x128 + 68 7 < 128 也就是123456 = (7 x 128 + 68)x128 + 64 展開:64 + 68 x128 + 7x128x128 第一字節:64 | 0x80 = x (0x80=0x1000 0000或上最高位表示是否還有更多的字節) 第二字節:68 | 0x80 = y (0x80=0x1000 0000或上最高位表示是否還有更多的字節) 第三字節:7=z c語言表示:unsigned char len_byte[4] = { 64 | 128 , 68 | 128, 7 , 0 } 反過來,如果要算出123456 x-128 + (y-128)*128 + z x 128 x 128
把剩余長度轉換成字節表示:
把字節轉換成剩余長度表示:
16.2.6 可變報頭
某些 MQTT 控制報文包含一個可變報頭部分。 它在固定報頭和負載之間??勺儓箢^的內容根據報文類型的不同而不同。報文標識符是可變報頭一種,可變報頭的報文標識符( Packet Identifier) 字段存在于在多個類型的報文里。
報文標識符類型如下圖:
圖2.4 報文標識符
很多控制報文的可變報頭部分包含一個兩字節的報文標識符字段。 這些報文是 PUBLISH( QoS>0 時) ,PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK, UNSUBSCIBE,UNSUBACK,如下表所示:
CONNECT | 不需要 |
CONNACK | 不需要 |
PUBLISH | 需要(如果QoS > 0) |
PUBACK | 需要 |
PUBREC | 需要 |
PUBREL | 需要 |
PUBCOMP | 需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 需要 |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
控制報文 | 報文標識符字段 |
---|
客戶端和服務端彼此獨立地分配報文標識符。 因此,客戶端服務端組合使用相同的報文標識符可以實現并發的消息交換。
例如,當client發送一個packet Identifier =0x1234的報文給server時,server的回復報文packet identifier 必須是0x1234,Packet identifier 從1開始遞增,到達65535時,又從1開始計算。
圖2.5 需要 Packet Identifier 的報文類型交互示意圖
16.2.7 有效載荷
某些 MQTT 控制報文在報文的最后部分包含一個有效載荷,帶有有效載荷報文類型如下表所示:
CONNECT | 需要 |
CONNACK | 不需要 |
PUBLISH | 可選 |
PUBACK | 不需要 |
PUBREC | 不需要 |
PUBREL | 不需要 |
PUBCOMP | 不需要 |
SUBSCRIBE | 需要 |
SUBACK | 需要 |
UNSUBSCRIBE | 需要 |
UNSUBACK | 不需要 |
PINGREQ | 不需要 |
PINGRESP | 不需要 |
DISCONNECT | 不需要 |
控制報文 | 有效載荷 |
---|
16.3 報文分析
16.3.1 CONNECT-連接服務端
客戶端到服務端的網絡連接建立(完成三次握手)后,客戶端發送給服務端的第一個報文必須是 CONNECT 報文。
圖3.1 三次握手與mqtt connect交互過程
在一個網絡連接上,客戶端只能發送一次 CONNECT 報文。服務端必須將客戶端發送的第二個 CONNECT報文當作協議違規處理并斷開客戶端的連接。
有效載荷包含一個或多個編碼的字段。 包括客戶端的唯一標識符, Will 主題, Will 消息, 用戶名和密碼。 除了客戶端標識之外, 其它的字段都是可選的, 基于標志位來決定可變報頭中是否需要包含這些字段。
圖3.2 connect報文組成
16.3.1.1 connect固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | Mqtt報文類型(1) | Reserved(保留位) | ||||||
0 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | |
Byte2~n | 剩余長度 |
表格3.1
16.3.1.2 協議名字節組成
說明 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
協議名 | |||||||||
Byte1 | 協議名長度MSB(0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
Byte2 | 協議名長度LSB(4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
Byte3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
Byte4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
Byte5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
Byte6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
數據包檢測工具, 例如防火墻, 可以使用協議名來識別 MQTT 流量。
16.3.1.3 協議級別
說明 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
協議級別 | |||||||||
Byte7 | Level(4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
客戶端用 8 位的無符號值表示協議的修訂版本。對于 3.1.1 版協議,協議級別字段的值是 4(0x04)。如果發現不支持的協議級別,服務端必須給發送一個返回碼為 0x01(不支持的協議級別)的CONNACK 報文響應CONNECT 報文, 然后斷開客戶端的連接。
16.3.1.4 連接標記
用戶名標記 | 用戶密碼標記 | Will retain | Will qos | Will flag | 清除會話 | reserved | ||
Byte 8 | x | x | x | x | x | x | 0 | |
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
---|
bit1清除會話
一般來說, 客戶端連接時總是將清理會話標志設置為 0 或 1, 并且不交替使用兩種值。 這個選擇取決于具體的應用。 清理會話標志設置為 1 的客戶端不會收到舊的應用消息, 而且在每次連接成功后都需要重新訂閱任何相關的主題。清理會話標志設置為 0 的客戶端會收到所有在它連接斷開期間發布的 QoS 1 和 QoS 2 級別的消息。因此, 要確保不丟失連接斷開期間的消息, 需要使用 QoS 1 或QoS 2 級別,同時將清理會話標志設置為 0。
Bit2遺囑標志
遺囑標志(Will Flag) 被設置為 1,表示如果連接請求被接受了, 遺囑(Will Message) 消息必須被存儲在服務端并且與這個網絡連接關聯。之后網絡連接關閉時,服務端必須發布這個遺囑消息, 除非服務端收到DISCONNECT 報文時刪除了這個遺囑消息。
Bit3和 bit4遺囑 QoS
這兩位用于指定發布遺囑消息時使用的服務質量等級, 如果遺囑標志被設置為 0, 遺囑 QoS 也必須設置為 0(0x00),如果遺囑標志被設置為 1, 遺囑 QoS 的值可以等于 0(0x00), 1(0x01), 2(0x02), 它的值不能等于 3。
Bit5遺囑保留
如果遺囑消息被發布時需要保留,需要指定這一位的值, 如果遺囑標志被設置為 0, 遺囑保留(Will Retain) 標志也必須設置為 0 。
如果遺囑標志被設置為 1:
· 如果遺囑保留被設置為 0, 服務端必須將遺囑消息當作非保留消息發布 。
· 如果遺囑保留被設置為 1, 服務端必須將遺囑消息當作保留消息發布。
Bit7用戶名標志
如果用戶名(User Name) 標志被設置為 0, 有效載荷中不能包含用戶名字段。
如果用戶名(User Name) 標志被設置為 1, 有效載荷中必須包含用戶名字段。
Bit6用戶名密碼標記
如果密碼(Password) 標志被設置為 0, 有效載荷中不能包含密碼字段 。
如果密碼(Password) 標志被設置為 1, 有效載荷中必須包含密碼字段 。
如果用戶名標志被設置為 0, 密碼標志也必須設置為 0 。
16.3.1.5 保持連接
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte9 | 保持連接 Keep Alive MSB | |||||||
Byte10 | 保持連接 Keep Alive LSB |
a) 保持連接(Keep Alive) 是一個以秒為單位的時間間隔,表示為一個 16 位的字,它是指在客戶端傳輸完成。
b) 一個控制報文的時刻到發送下一個報文的時刻, 兩者之間允許空閑的最大時間間隔。 客戶端負責保證控制。
c) 報文發送的時間間隔不超過保持連接的值。 如果沒有任何其它的控制報文可以發送, 客戶端必須發送一個PINGREQ 報文。
d) 不管保持連接的值是多少,客戶端任何時候都可以發送 PINGREQ 報文,并且使用 PINGRESP 報文判斷網絡和服務端的活動狀態。
e) 如果保持連接的值非零,并且服務端在一點五倍的保持連接時間內沒有收到客戶端的控制報文, 它必須斷開客戶端的網絡連接, 認為網絡連接已斷開。
f) 客戶端發送了 PINGREQ 報文之后, 如果在合理的時間內仍沒有收到 PINGRESP 報文, 它應該關閉到服務端的網絡連接。
g) 保持連接的值為零表示關閉保持連接功能。 這意味著,服務端不需要因為客戶端不活躍而斷開連接。 注意:不管保持連接的值是多少, 任何時候,只要服務端認為客戶端是不活躍或無響應的, 可以斷開客戶端的連接。
16.3.1.6 客戶端標識符
服務端使用客戶端標識符 (ClientId) 識別客戶端。 連接服務端的每個客戶端都有唯一的客戶端標識符(ClientId) 。客戶端和服務端都必須使用 ClientId 識別兩者之間的 MQTT 會話相關的狀態, 客戶端標識符 (ClientId) 必須存在而且必須是 CONNECT 報文有效載荷的第一個字段,客戶端標識符必須是UTF-8 編碼字符串。
16.3.1.7 遺囑主題
如果遺囑標志被設置為 1, 有效載荷的下一個字段是遺囑主題(Will Topic) 。 遺囑主題必須是 UTF-8 編碼字符串。
16.3.1.8 遺囑消息
如果遺囑標志被設置為 1, 有效載荷的下一個字段是遺囑消息。 遺囑消息定義了將被發布到遺囑主題的應用消息。
16.3.1.9 用戶名和密碼
如果用戶名( User Name) 標志被設置為 1, 有效載荷的下一個字段就是它。 用戶名必須是定義的UTF-8 編碼字符串。服務端可以將它用于身份驗證和授權。
如果密碼( Password) 標志被設置為 1, 有效載荷的下一個字段就是它。密碼字段包含一個兩字節的長度字段, 長度表示二進制數據的字節數( 不包含長度字段本身占用的兩個字節),后面跟著 0 到 65535 字節的二進制數據。
圖3.2 用戶名和密碼在connect報文中的組成
16.3.10.1 wirshark抓包分析connect報文
從抓包可知,從上到下分別是固定報頭,可變報頭,連接標記,保持連接,用戶名,用名密碼,其中沒有遺囑相關消息字段,與3.1.1節分析的固定報頭組成分析一致。
圖 3.3使用wireshark抓包分析connect報文組成格式
16.3.10.2 c語言構造mqtt connect報文
static uint8_t client_id[512] = {"mqtt_client"}; static uint8_t user_name[512] = {"mqtt"}; static uint8_t passwd[512] = {"12345678"}; #define KEEP_ALIVE 20 int mqtt_connect(int sockfd) { uint8 flags = 0x00; uint8 *packet = NULL; uint16 packet_length = 0; uint16 clientidlen = strlen(client_id); uint16 usernamelen = strlen(user_name); uint16 passwordlen = strlen(passwd); uint16 payload_len = clientidlen + 2; // Variable header uint8 var_header[10] = { 0x00,0x04,/*len*/ 0x4d,0x51,0x54,0x54,/*mqtt*/ 0x04,/*協議版本*/}; uint8 fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length uint8 remainLen = 0; uint8 *fixed_header = NULL; uint16 offset = 0; // Preparing the flags if(usernamelen) { /*用戶名長度(可選)*/ payload_len += usernamelen + 2; flags |= MQTT_USERNAME_FLAG;/*或上用戶名標記*/ } if(passwordlen) { /*用戶密碼(可選)*/ payload_len += passwordlen + 2; flags |= MQTT_PASSWORD_FLAG;/*用戶密碼標記位*/ } flags |= MQTT_CLEAN_SESSION; var_header[7] = flags;/*連接標記*/ var_header[8] = KEEP_ALIVE>>8;/*保持連接字段,占用兩個字節*/ var_header[9] = KEEP_ALIVE&0xFF; remainLen = sizeof(var_header)+payload_len; /*剩余長度,也就是可變報頭加上負載的長度*/ if (remainLen > 127) { fixedHeaderSize++;// add an additional byte for Remaining Length } fixed_header = (uint8 *)malloc(fixedHeaderSize); /*固定報頭*/ // Message Type *fixed_header = MQTT_MSG_CONNECT;/*報文類型,connect*/ if (remainLen <= 127) {// Remaining Length,剩余長度計算,可變長編碼 *(fixed_header+1) = remainLen; } else { // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes *(fixed_header+1) = remainLen % 128; *(fixed_header+1) = *(fixed_header+1) | 0x80; // second byte is number of 128s *(fixed_header+2) = remainLen / 128; } packet_length = fixedHeaderSize+sizeof(var_header)+payload_len;/*固定報頭+可變報頭+負載長度*/ packet = (uint8 *)malloc(packet_length);/*分配內存*/ memset(packet, 0, packet_length); memcpy(packet, fixed_header, fixedHeaderSize);/*填充固定報頭*/ free(fixed_header); offset += fixedHeaderSize; memcpy(packet+offset, var_header, sizeof(var_header));/*填充可變報頭*/ offset += sizeof(var_header); packet[offset++] = clientidlen>>8;// Client ID - UTF encoded,填充clientid長度+clientid packet[offset++] = clientidlen&0xFF; memcpy(packet+offset, client_id, clientidlen); offset += clientidlen; if(usernamelen) {// Username - UTF encoded,填充用戶名+用戶名長度 packet[offset++] = usernamelen>>8; packet[offset++] = usernamelen&0xFF; memcpy(packet+offset, user_name, usernamelen); offset += usernamelen; } if(passwordlen) {// Password - UTF encoded,填充用戶密碼+用戶名密碼長度 packet[offset++] = passwordlen>>8; packet[offset++] = passwordlen&0xFF; memcpy(packet+offset, passwd, passwordlen); offset += passwordlen; } // Send the packet if (client_send(sockfd,packet, packet_length) < 0){ free(packet); return -1; } free(packet); return 1; }
16.3.2 CONNACK-確認連接請求
服務端發送 CONNACK 報文響應從客戶端收到的 CONNECT 報文。服務端發送給客戶端的第一個報文必須是 CONNACK。
16.3.2.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | MQTT 控制報文類型 (2) | Reserved 保留位 | ||||||
0 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 | |||||||
0 | 0 | 0 | 0 | 0 | 0 | 1 | 0 |
剩余長度字段表示可變報頭的長度。 對于 CONNACK 報文這個值等于 2。
16.3.2.2 可變報頭
描述 | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
連接確認標記 | 保留位 | SP1 | |||||||
Byte1 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | X | |
連接返回碼 | |||||||||
Byte2 | x | x | x | x | x | x | x | x |
Byte1,Bit0連接確認標志
位 7-1 是保留位且必須設置為 0,
對于bit0,如果服務端收到一個 CleanSession 為 0 的連接, 當前會話標志的值取決于服務端是否已經保存了 ClientId對應客戶端的會話狀態。 如果服務端已經保存了會話狀態, 它必須將 CONNACK 報文中的當前會話標志設置為 1 。 如果服務端沒有已保存的會話狀態, 它必須將 CONNACK 報文中的當前會話設置為 0。 還需要將 CONNACK 報文中的返回碼設置為 0。
連接返回碼
如果服務端發送了一個包含非零返回碼的 CONNACK 報文, 那么它必須關
閉網絡連接。
0 | 0x00 | 連接已被服務端接受 |
1 | 0x01 | 服務端不支持客戶端請求的協議版本 |
2 | 0x02 | 客戶端標識符是正確的 UTF-8 編碼, 但服務 端不允許使用 |
3 | 0x03 | 網絡連接已建立, 但 MQTT 服務不可用 |
4 | 0x04 | 用戶名或密碼的數據格式無效 |
5 | 0x05 | 客戶端未被授權連接到此服務器 |
6-255 | 保留 | |
值 | 返回碼響應 | 描述 |
---|
CONNACK沒有有效載荷。
16.3.2.3 CONNACK報文wireshark抓包分析
圖3.4 CONNACK 抓包報文
16.3.2.4 c語言構造connect ack報文
void mqtt_connect_ack(int sockfd) { uint8_t cmd[]={ 0x20/*報文類型*/, 0x02/*剩余長度*/ ,0x00,0x00/*最后兩個字節可變報頭表示返回狀態碼*/ }; send_msg(sockfd,cmd,sizeof(cmd)); socket_record_t *socket_record = look_up_by_sokfd(sockfd); if(socket_record==NULL){ return; } socket_record->is_connect=0x01; }
16.3.3 PUBLISH-發布消息
PUBLISH 控制報文是指從客戶端向服務端或者服務端向客戶端傳輸一個應用消息。
圖 3.5 publish報文組成格式
16.3.3.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(3) | dup | Qos等級 | RETAIN | ||||
0 | 0 | 1 | 1 | x | x | x | x | |
Byte2 | 剩余長度 |
Bit3 dup
如果 DUP 標志被設置為 0, 表示這是客戶端或服務端第一次請求發送這個 PUBLISH 報文。 如果 DUP 標志被設置為 1,表示這可能是一個早前報文請求的重發??蛻舳嘶蚍斩苏埱笾匕l一個 PUBLISH 報文時, 必須將 DUP 標志設置為 1.。 對于 QoS0 的消息, DUP 標志必須設置為 0。
Bit1和bit2 qos等級
0 | 0 | 0 | 最多分發一次 |
1 | 0 | 1 | 至少分發一次 |
2 | 1 | 0 | 只分發一次 |
- | 1 | 1 | 保留不使用 |
Qos值 | bit2 | bit1 | 描述 |
---|
qos由發送端決定,發送端發送什么qos的消息,接收端就回復什么qos的消息。
不同qos等級mqtt報文交互流程
Bit0保留標記位
一般設置為0。
剩余長度
等于可變報頭的長度加上有效載荷的長度。
可變報頭
可變報頭按順序包含主題名和標識符。主題,用于識別有效載荷數據應該被發布到哪一個信息通道,標識符,只有當 QoS 等級是 1 或 2 時,報文標識符( Packet Identifier) 字段才能出現在 PUBLISH 報文中。
16.3.3.2 抓包分析PUBLISH報文
圖 3.6 PUBLISH 抓包報文
16.3.3.3 構造publish 報文
int mqtt_publish_with_qos(int sockfd,const char* topic, const char* msg, uint16 msgl, uint8 retain, uint8 qos, uint16* message_id) { socket_record_t *socket_record = look_up_by_sokfd(sockfd); if(NULL == socket_record){ return -1; } DEBUG_INFO("sockfd:%d",socket_record->sockfd); uint16 topiclen = strlen(topic); uint16 msglen = msgl; uint8 *var_header = NULL; // Topic size (2 bytes), utf-encoded topic uint8 *fixed_header = NULL; uint8 fixedHeaderSize = 0,var_headerSize = 0; // Default size = one byte Message Type + one byte Remaining Length uint16 remainLen = 0; uint8 *packet = NULL; uint16 packet_length = 0; uint8 qos_flag = MQTT_QOS0_FLAG; /*qos標記*/ uint8 qos_size = 0; // No QoS included if(qos == 1) { qos_size = 2; // 2 bytes for QoS qos_flag = MQTT_QOS1_FLAG; } else if(qos == 2) { qos_size = 2; // 2 bytes for QoS qos_flag = MQTT_QOS2_FLAG; } // Variable header var_headerSize = topiclen/*主題內容*/+2/*主題長度占用兩字節*/+qos_size/*標識符*/; var_header = (uint8 *)malloc(var_headerSize); memset(var_header, 0, var_headerSize); *var_header = topiclen>>8; *(var_header+1) = topiclen&0xFF; memcpy(var_header+2, topic, topiclen); if(qos_size) {//qos1和qos2的報文需要填充標識符,有點像tcp的seq socket_record->publish_seq++; if(socket_record->publish_seq == 0){ //unsigned short 表示范圍0~65535,標識符必須是非零整數 socket_record->publish_seq = 1; } var_header[topiclen+2] = (socket_record->publish_seq & 0xff00)>>8; var_header[topiclen+3] = socket_record->publish_seq & 0x00ff; if(message_id) { *message_id = socket_record->publish_seq; } } fixedHeaderSize = 2; // Default size = one byte Message Type + one byte Remaining Length remainLen = var_headerSize+msglen; if (remainLen > 127) {/*剩余長度*/ fixedHeaderSize++; // add an additional byte for Remaining Length } fixed_header = (uint8 *)malloc(fixedHeaderSize);/*固定報頭+剩余長度*/ // Message Type, DUP flag, QoS level, Retain *fixed_header = MQTT_MSG_PUBLISH | qos_flag;/*報文類型和qos標記*/ if(retain) { *fixed_header |= MQTT_RETAIN_FLAG;/*是否保留*/ } // Remaining Length,剩余長度 if (remainLen <= 127) { *(fixed_header+1) = remainLen; } else { // first byte is remainder (mod) of 128, then set the MSB to indicate more bytes *(fixed_header+1) = remainLen % 128; *(fixed_header+1) = *(fixed_header+1) | 0x80; // second byte is number of 128s *(fixed_header+2) = remainLen / 128; } packet_length = fixedHeaderSize+var_headerSize+msglen;/*固定報頭+可變報頭+負載長度*/ packet = (uint8 *)malloc(packet_length); memset(packet, 0, packet_length); memcpy(packet, fixed_header, fixedHeaderSize);/*填充固定報頭*/ memcpy(packet+fixedHeaderSize, var_header, var_headerSize);/*填充可變報頭*/ memcpy(packet+fixedHeaderSize+var_headerSize, msg, msglen);/*負載*/ free(var_header); free(fixed_header); send_msg(sockfd,packet , packet_length); free(packet); return 1; }
16.3.4 PUBREC-發布收到
PUBREC 報文是對 QoS 等級 2 的 PUBLISH 報文的響應。它是 QoS 2 等級協議交換的第二個報文。
16.3.4.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(5) | 保留位 | ||||||
0 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 |
剩余長度
表示可變報頭的長度。 對 PUBREC 報文它的值等于 2。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
有效載荷
PUBREC 報文沒有有效載荷。
16.3.4.2 PUBREC抓包報文
圖 3.7 PUBREC抓包報文圖示
16.3.4.3 c語言構造pubrec報文
//如果是PUBREC報文,head_type=0x50 void mqtt_qos2_pubrec(int sockfd , unsigned char *data,unsigned char head_type) { uint16 msg_id = mqtt_parse_msg_id(data);/*報文標識符,回復報文和接受報文的標識符必須一樣*/ unsigned char qos2_pubrec_respon[]={head_type/*固定報頭*/,0x02/*剩余長度*/, (msg_id&0xff00)>>8 , msg_id&0x00ff/*最后兩個字節是報文標識符*/}; send_msg(sockfd,qos2_pubrec_respon,sizeof(qos2_pubrec_respon)); }
16.3.5 PUBREL-發布釋放
PUBREL 報文是對 PUBREC 報文的響應。 它是 QoS 2 等級協議交換的第三個報文。
16.3.5.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(6) | 保留位 | ||||||
0 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 |
剩余長度
表示可變報頭的長度。 對 PUBREL 報文它的值等于 2。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
有效載荷
PUBREL 報文沒有有效載荷。
16.3.5.2 PUBREL抓包報文
圖 3.8 PUBREL抓包報文圖示
16.3.5.3 c語言構造pubrel報文
//head_type=0x62 void mqtt_qos2_pubrel(int sockfd , unsigned char *data,unsigned char head_type) { uint16 msg_id = mqtt_parse_msg_id(data); unsigned char qos2_pubrel_respon[]={head_type/*報文類型*/,0x02/*剩余長度*/, (msg_id & 0xff00)>>8 , msg_id & 0x00ff/*最后兩個字節是報文標識符*/}; send_msg(sockfd,qos2_pubrel_respon,sizeof(qos2_pubrel_respon)); }
16.3.6 PUBCOMP-發布完成
PUBCOMP 報文是對 PUBREL 報文的響應。 它是 QoS 2 等級協議交換的第四個也是最后一個報文。
16.3.6.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(7) | 保留位 | ||||||
0 | 1 | 1 | 1 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 |
剩余長度
表示可變報頭的長度。 對 PUBCOMP 報文它的值等于 2。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
有效載荷
PUBCOMP 報文沒有有效載荷。
16.3.6.2 PUBCOMP抓包報文
圖 3.9 PUBCOMP抓包報文圖示
16.3.6.3 c語言構造pubcom報文
//head_type=0x70 void mqtt_qos2_pubcomp(int sockfd , unsigned char *data,unsigned char head_type) { uint16 msg_id = mqtt_parse_msg_id(data);/*報文標識符*/ unsigned char qos2_pubcomp_respon[]={head_type/*報文類型*/,0x02/*剩余長度*/, (msg_id & 0xff00)>>8 , msg_id & 0x00ff/*最后兩個字節報文標識符*/}; send_msg(sockfd,qos2_pubcomp_respon,sizeof(qos2_pubcomp_respon)); }
16.3.7 PINGREQ-心跳請求
客戶端發送 PINGREQ 報文給服務端的。用于:
a) 在沒有任何其它控制報文從客戶端發給服務的時,告知服務端客戶端還活著。
b) 請求服務端發送 響應確認它還活著。
c) 使用網絡以確認網絡連接沒有斷開。
16.3.7.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(12) | 保留位 | ||||||
1 | 1 | 0 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
可變報頭
報文沒有可變報頭。
有效載荷
PINGREQ 報文沒有有效載荷。
16.3.7.2 PINGREQ 抓包報文
圖 3.10 PINGREQ抓包報文圖示
16.3.7.3 c語言構造pingreq報文
int mqtt_ping(int sockfd) { uint8 packet[] = {MQTT_MSG_PINGREQ/*報文類型*/,0x00/*剩余長度*/}; int ret = send_msg(sockfd,packet, sizeof(packet)); return ret; }
16.3.8 PINGRESP – 心跳響應
服務端發送 PINGRESP 報文響應客戶端的 PINGREQ 報文。 表示服務端還活著。
16.3.8.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(13) | 保留位 | ||||||
1 | 1 | 0 | 1 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
可變報頭
報文沒有可變報頭。
有效載荷
PINGRESP 報文沒有有效載荷。
16.3.8.2 PINGRESP 抓包報文
圖 3.11 PINGRESP抓包報文圖示
16.3.8.3 c語言構造pinpresp報文
void mqtt_ping_req_reply(int sockfd) { uint8_t cmd[]={0xd0/*報文類型*/, 0x00/*剩余長度*/}; send_msg(sockfd,cmd,sizeof(cmd)); }
16.3.9 DISCONNECT –斷開連接
DISCONNECT 報文是客戶端發給服務端的最后一個控制報文。表示客戶端正常斷開連接。
16.3.9.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(14) | 保留位 | ||||||
1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
可變報頭
DISCONNECT報文沒有可變報頭。
有效載荷
DISCONNECT 報文沒有有效載荷。
16.3.9.2 DISCONNECT 抓包報文
圖 3.12 DISCONNECT抓包報文圖示
16.3.9.3 c語言構造disconnect報文
int mqtt_disconnect(int sockfd) { uint8 packet[] = {MQTT_MSG_DISCONNECT/*報文類型*/,0x00/*剩余長度*/}; int ret = client_send(sockfd,packet, sizeof(packet)); DEBUG_INFO("ret=%d",ret); return ret; }
16.3.10 SUBSCRIBE-訂閱主題
客戶端向服務端發送 SUBSCRIBE 報文用于創建一個或多個訂閱。 每個訂閱注冊客戶端關心的一個或多個主題。 為了將應用消息轉發給與那些訂閱匹配的主題, 服務端發送 PUBLISH 報文給客戶端。 SUBSCRIBE報文也(為每個訂閱)指定了最大的 QoS 等級, 服務端根據這個發送應用消息給客戶端。
圖 3.13 訂閱主題報文組成格式
16.3.10.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(8) | 保留位 | ||||||
1 | 1 | 1 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
SUBSCRIBE 控制報固定報頭的第 3,2,1,0 位是保留位, 必須分別設置為 0,0,1,0,服務端必須將其它的任何值都當做是不合法的并關閉網絡連接。
剩余長度字段
等于可變報頭的長度( 2 字節) 加上有效載荷的長度。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
服務端收到客戶端發送的一個 SUBSCRIBE 報文時, 必須使用 SUBACK 報文響應。SUBACK 報文必須和等待確認的 SUBSCRIBE 報文有相同的報文標識符。
有效載荷
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
主題 | ||||||||
Byte1 | 主題長度MSB | |||||||
Byte2 | 主題長度LSB | |||||||
Byte3~n | 主題 | |||||||
服務質量 | ||||||||
保留 | qos等級 | |||||||
ByteN+1 | 0 | 0 | 0 | 0 | 0 | 0 | x | x |
QoS 不等于 0,1 或 2, 服務端必須認為 SUBSCRIBE 報文是不合法的并關閉網絡連接。
16.3.10.2 SUBSCRIBE報文抓包
圖 3.14 訂閱主題抓包報文
16.3.10.3 c語言構造subscribe報文
static uint16 su_seq = 1; int mqtt_subscribe_theme(int sockfd,char *Theme , uint8_t Qos) { su_seq++;//報文標識符 if(su_seq == 0){ su_seq = 1; } uint16_t MessageId = su_seq; uint8_t cmd[1024]={0}; //報文標示符長度2 + 主題長度位占用2字節+主題內容+qos標識 int data_length = 2+2+strlen(Theme)+1; int playload_len = strlen(Theme); uint8_t len_byte[4] ={0x00 , 0x00 ,0x00 ,0x00}; uint8_t byte_num = length_trans_byte_form(data_length , len_byte);/*把剩余長度轉換成變長編碼*/ cmd[0] = 0x82; memcpy(&cmd[1] , len_byte , byte_num); cmd[1+byte_num]=(MessageId & 0xff00) >> 8 ; cmd[1+byte_num+1] = MessageId & 0x00ff; cmd[1+byte_num+1+1] = (playload_len & 0xff00) >> 8; cmd[1+byte_num+1+1+1] = playload_len & 0x00ff; memcpy(&cmd[1+byte_num+1+1+1+1] , Theme , playload_len); cmd[1+byte_num+1+1+1+1+playload_len] = Qos; client_send(sockfd,cmd, 1+byte_num+1+1+1+1+playload_len+1); }
16.3.11 SUBACK –訂閱確認
服務端發送 SUBACK 報文給客戶端,用于確認它已收到并且正在處理 SUBSCRIBE 報文。
16.3.11.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(9) | 保留位 | ||||||
1 | 0 | 0 | 1 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
剩余長度字段
等于可變報頭的長度加上有效載荷的長度。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
可變報頭包含等待確認的 SUBSCRIBE 報文的報文標識符。
3.11.2 SUBACK抓包報文
圖 3.15訂閱主題ack抓包報文
16.3.11.3 c語言構造suback報文
void mqtt_subscribe_ack(int sockfd,const uint8 *buf) { uint16 msg_id = mqtt_parse_msg_id(buf);/*提取報文標識符*/ uint8 qos = MQTTParseMessageQos(buf);/*提取報文qos*/ uint8 cmd[]={0x90,0x03/*剩余長度*/, (msg_id & 0xff00) >> 8, msg_id & 0x00ff,qos}; send_msg(sockfd,cmd,sizeof(cmd)); }
16.3.12 UNSUBSCRIBE –取消訂閱
客戶端發送 UNSUBSCRIBE 報文給服務端, 用于取消訂閱主題。
圖 3.16取消訂閱主題報文結構
16.3.12.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(10) | 保留位 | ||||||
1 | 0 | 1 | 0 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
UNSUBSCRIBE 報文固定報頭的第 3,2,1,0 位是保留位且必須分別設置為 0,0,1,0。 服務端必須認為任何其它的值都是不合法的并關閉網絡連接。
剩余長度字段
等于可變報頭的長度,加上有效載荷的長度。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
可變報頭包含一個報文標識符。
有效載荷
7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 | |
主題1 | ||||||||
Byte1 | 主題長度MSB | |||||||
Byte2 | 主題長度LSB | |||||||
Byte3~n | 主題 | |||||||
主題2 |
UNSUBSCRIBE 報文的有效載荷必須至少包含一個消息過濾器。 沒有有效載荷的 UNSUBSCRIBE 報文是違反協議的。
16.3.12.2 UNSUBSCRIBE抓包報文
圖 3.17取消訂閱主題抓包報文
16.3.12.3 c語言構造unsubscribe報文
static uint16 un_seq = 1; int mqtt_unsubscribe_theme(int sockfd,const char* topic) { un_seq++; if(un_seq == 0){ un_seq = 1; } uint16_t MessageId = un_seq; uint8_t cmd[1024]={0}; //報文標示符長度2 + 主題長度位占用2字節+主題內容+qos標識 int data_length = 2+2+strlen(topic)+1; int playload_len = strlen(topic); uint8_t len_byte[4] ={0x00 , 0x00 ,0x00 ,0x00}; uint8_t byte_num = length_trans_byte_form(data_length , len_byte);/*剩余長度轉換成變長編碼*/ cmd[0] = 0xa2; memcpy(&cmd[1] , len_byte , byte_num); cmd[1+byte_num]=(MessageId & 0xff00) >> 8 ; cmd[1+byte_num+1] = MessageId & 0x00ff; cmd[1+byte_num+1+1] = (playload_len & 0xff00) >> 8; cmd[1+byte_num+1+1+1] = playload_len & 0x00ff; memcpy(&cmd[1+byte_num+1+1+1+1] , topic , playload_len); client_send(sockfd,cmd,1+byte_num+1+1+1+1+playload_len+1); return 1; }
16.3.13 UNSUBACK –取消訂閱確認
服務端發送 UNSUBACK 報文給客戶端用于確認收到 UNSUBSCRIBE 報文。
圖 3.18取消訂閱主題ack報文組成
16.3.13.1 固定報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte 1 | MQTT報文類型(11) | 保留位 | ||||||
1 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | |
Byte2 | 剩余長度 0 |
剩余長度字段
表示可變報頭的長度, 對 UNSUBACK 報文這個值等于 2。
可變報頭
bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
Byte1 | 報文標識符MSB | |||||||
Byte2 | 報文標識符LSB |
可變報頭包含等待確認的 UNSUBSCRIBE 報文的報文標識符。
16.3.12.2 UNSUBSCRIBE ACK抓包報文
圖 3.19取消訂閱主題ACK抓包報文
16.3.12.3 c語言構造unsubscribe報文
void mqtt_unsubscribe_ack(int sockfd,const uint8 *buf) { uint16 msg_id = mqtt_parse_msg_id(buf); uint8 cmd[]={0xb0,0x02/*剩余長度*/,(msg_id & 0xff00) >> 8, msg_id & 0x00ff/*最后兩個字節是報文標識符*/}; send_msg(sockfd,cmd,sizeof(cmd)); }
16.3.14 服務端與客戶端交互操作過程
16.3.14.1 編譯
編譯client之前先在代碼中指定server ip
進入client目錄,直接make即可
進入server目錄,直接make即可
16.3.14.2 執行
先運行server
再運行client
Client操作流程
在server端查看
審核編輯黃昊宇
-
Linux
+關注
關注
87文章
11342瀏覽量
210152 -
MQTT
+關注
關注
5文章
653瀏覽量
22634
發布評論請先 登錄
相關推薦
評論