网站首页 文章专栏 物联网之—MQTT协议
物联网之—MQTT协议
编辑时间:2022-03-16 17:19:51 作者:lindingding 浏览量:269

       最近在做海上平台压裂数据远传的项目,由于海上移动通信网络质量不佳,我们不得不选择卫星网络,在考虑弱网络条件下,我们选择了MQTT协议作为海上采集端和陆地数据存储、处理端的通信协议,本文将讲解该协议相关原理、优缺点、应用场景及使用方法等。

      物联网是新一代信息技术的重要组成部分,也是“信息化”时代的重要发展阶段。其英文名称是:“Internet of things(IoT)”。顾名思义,物联网就是物物相连的互联网。这有两层意思:其一,物联网的核心和基础仍然是互联网,是在互联网基础上的延伸和扩展的网络;其二,其用户端延伸和扩展到了任何物品与物品之间,进行信息交换和通信,也就是物物相息。物联网通过智能感知、识别技术与普适计算等通信感知技术,广泛应用于网络的融合中,也因此被称为继计算机、互联网之后世界信息产业发展的第三次浪潮。

       而在物联网的应用上,对于信息传输,MQTT是一种再合适不过的协议工具了。


一、MQTT简介

   MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的轻量级协议,该协议构建于TCP/IP协议之上,MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

  MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。 

二、特性 

      MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

  (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

  (2)对负载内容屏蔽的消息传输。

  (3)使用TCP/IP提供网络连接。

    主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

  (4)有三种消息发布服务质量:

  “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

  “至少一次”,确保消息到达,但消息重复可能会发生。

  “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。 

  (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

  这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

三、实现方式  

       实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

    MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

  (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

四、MQTT的搭建(ubuntu)

        //MQTT

       1、安装mosquitto

            apt-get install mosquitto

       2、查看运行状态

           service mosquitto status

       3、安装mosquitto-clients

           apt install mosquitto-clients

       4、测试

           打开一个终端,订阅主题

           mosquitto_sub -h 192.168.1.102 -t "mqtt" -v

              【-h】指定要连接的MQTT服务器

              【-t】订阅主题,此处为mqtt 

              【-v】打印更多的调试信息

          再打开一个终端,发布主题

          mosquitto_pub -h 192.168.1.102 -t "mqtt" -m "Hello Stonegeek"

              【-h】指定要连接的MQTT服务器 

              【-t】向指定主题推送消息 

              【-m】指定消息内容


image.png

image.png


五、MQTT的权限配置

       前面我们基于Mosquitto服务器已经搭建成功了,但是默认是允许匿名用户登录,对于正式上线的项目则是需要进行用户认证(当然,用户一般都会与数据库映射,不过在这里我们就会直接将用户写入配置文件中)

  1、Mosquitto服务器的配置文件为/etc/mosquitto/mosquitto.conf,关于用户认证的方式和读取的配置都在这个文件中进行

  配置文件参数说明:

IDallow_anonymouspassword_fileacl_fileresult
1True(默认)

允许匿名方式登录
2Falsepassword_file
开启用户验证机制
3Falsepassword_fileacl_file开启用户验证机制,但访问控制不起作用
4Truepassword_fileacl_file用户名及密码不为空,将自动进行用户验证且受到访问控制的限制;用户名及密码为空,将不进行用户验证且受到访问控制的限制
5False

无法启动服务

        2、修改配置文件:

        vi /etc/mosquitto/mosquitto.conf

 1646903876(1).jpg

        3、添加用户信息

        mosquitto_passwd -c /etc/mosquitto/pwfile lmc

image.png

          命令解释: -c 创建一个用户、/etc/mosquitto/pwfile.example 是将用户创建到 pwfile.example  文件中、admin 是用户名。 

        同样连续会提示连续输入两次密码。注意第二次创建用户时不用加 -c 如果加 -c 会把第一次创建的用户覆盖。

        至此两个用户创建成功,此时如果查看 pwfile.example 文件会发现其中多了两个用户。

         4、添加Topic和用户的关系

image.png

        5、重启服务

image.png

        6、账号密码,发布/订阅

image.png

       主题不对,也无法收到(配置文件acl中控制)

image.png

image.png

       我们修改配置文件acl

image.png

image.png

image.png

      可以看出,只有dev主题的才能收到。

六、MQTT的代码实现(go、java)

MQTT.png

接收端API

ConnectCommand为连接操作类,可以设置相应属性。

  1. setClientId()

    设置客户身份唯一标识

  2. setServer()

    设置建立连接的域名或者服务器ip

  3. setPort

    设置端口号

  4. setUserNameAndPassword

    设置连接认证的用户名和密码

  5. setKeepAlive

    设置保持长连接ping的频率,单位为秒,建议100

  6. setTimeout

    设置操作超时时间。

  7. setCleanSession

    设置cleansession,若为true,当 disconnect 时,会移除这个 client 所有的 subscriptions.

  8. setSsl

    建立ssl长连接,若没有设置的话,默认为tcp长连接。

  9. setLastWill

    设置遗愿消息,即当设备断开连接时会主动pub的消息。

  10. setTraceEnabled

    是否打印日志,默认false

  11. setTraceCallback

    监听日志回调,需要setTraceEnabled(true)


发送端API

  1. setMessage

    设置消息内容

  2. setQos

    设置qos,决定消息到达次数。

  3. setTopic

    设置消息主题

  4. setRetained

    服务器是否保存消息

代码实现

1、java 实现

____________________publish_____________________

package www.mqtt.com;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttCallback;

/**
 * 服务器向多个客户端推送主题,即不同客户端可向服务端订阅相同的主题
 */
public class ServerMQTT {
    //tcp://MQTT安装的服务器地址:MQTT定义的端口号
    public static final String HOST = "tcp://47.94.126.xx:1883";
    //定义一个主题
    public static final String TOPIC = "dev";
    //定义MQTT的ID,可以在MQTT服务配置中指定
    private static final String clientid = "server01";

    private MqttClient client;
    private MqttTopic topic11;
    private String userName = "lmc";
    private String passWord = "123456";
    private int timeout = 10;
    private int keepAliveInterval = 20;
    private MqttMessage message;
    /**
     * 构造函数
     * @throws MqttException
     */
    public ServerMQTT() throws MqttException {
        // MemoryPersistence设置clientid的保存形式,默认为以内存保存
        client = new MqttClient(HOST, clientid, new MemoryPersistence());
        connect();
    }

    public void connect() {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(false);
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        // 设置超时时间
        options.setConnectionTimeout(timeout);
        // 设置会话心跳时间
        options.setKeepAliveInterval(keepAliveInterval);
        try {
            client.setCallback(new PushCallback());
            client.connect(options);
            topic11 = client.getTopic(TOPIC);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     *
     * @param topic
     * @param message
     * @throws MqttPersistenceException
     * @throws MqttException
     */
    public void publish(MqttTopic topic,MqttMessage message) throws MqttPersistenceException,
        MqttException {
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            System.out.println("message is published completely by lxy ! " + token.isComplete());
            System.out.println(topic + " " + message);
    }

    /**
     *  启动入口
     * @param args
     * @throws MqttException
     */
    public static void main(String[] args) throws MqttException {
        ServerMQTT  server = new ServerMQTT();

        server.message = new MqttMessage();
        //消息到达次数
        server.message.setQos(1);
        //发布方式
        /*
        1.当消息发布到MQTT服务器时,我们需要保留最新的消息到服务器上,以免订阅时丢失上一次最新的消息(最新的,不是所有的);
          当订阅消费端服务器重新连接MQTT服务器时,总能拿到该主题最新消息,这个时候我们需要把retained设置为true;
        2.当消息发布到MQ服务器时,我们不需要保留最新的消息到服务器上,设置为false。
        */
        server.message.setRetained(true);
        server.message.setPayload("Hello lxy! I am testting!".getBytes());
        server.publish(server.topic11, server.message);
        System.out.println(server.message.isRetained() + "------ratained状态");
    }
}

____________________subscribe ___________________

package www.mqtt.com;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.MqttException;

public class ClientMQTT {
    public static final String Host = "tcp://47.94.126.xx:1883";
    public static final String Topic = "dev";
    public static final String clientId = "client1";
    private MqttClient client;
    private MqttConnectOptions options;
    private String userName = "lmc";
    private String passWord = "123456";

    // 可以将定时任务与线程池功能结合使用
    private ScheduledExecutorService scheduler;

    public void start() {
        try {
            // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(Host, clientId, new MemoryPersistence());
            // MQTT的连接设置
            options = new MqttConnectOptions();
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.setCleanSession(true);
            // 设置连接的用户名
            options.setUserName(userName);
            // 设置连接的密码
            options.setPassword(passWord.toCharArray());
            // 设置超时时间 单位为秒
            options.setConnectionTimeout(10);
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.setKeepAliveInterval(20);
            // 设置回调
            client.setCallback(new PushCallback());
            MqttTopic topic = client.getTopic(Topic);
            //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
            options.setWill(topic, "close".getBytes(), 2, true);

            client.connect(options);
            //订阅消息
            int[] Qos  = {1};
            String[] topic1 = {Topic};
            client.subscribe(topic1, Qos);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws MqttException {
        ClientMQTT client  = new ClientMQTT();
        client.start();
    }
}

________________________callback____________________________

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.eclipse.paho.client.mqttv3;

public class MqttMessage {
    private boolean mutable = true;
    private byte[] payload;
    private int qos = 1;
    private boolean retained = false;
    private boolean dup = false;
    private int messageId;

    public static void validateQos(int qos) {
        if (qos < 0 || qos > 2) {
            throw new IllegalArgumentException();
        }
    }

    public MqttMessage() {
        this.setPayload(new byte[0]);
    }

    public MqttMessage(byte[] payload) {
        this.setPayload(payload);
    }

    public byte[] getPayload() {
        return this.payload;
    }

    public void clearPayload() {
        this.checkMutable();
        this.payload = new byte[0];
    }

    public void setPayload(byte[] payload) {
        this.checkMutable();
        if (payload == null) {
            throw new NullPointerException();
        } else {
            this.payload = payload;
        }
    }

    public boolean isRetained() {
        return this.retained;
    }

    public void setRetained(boolean retained) {
        this.checkMutable();
        this.retained = retained;
    }

    public int getQos() {
        return this.qos;
    }

    public void setQos(int qos) {
        this.checkMutable();
        validateQos(qos);
        this.qos = qos;
    }

    public String toString() {
        return new String(this.payload);
    }

    protected void setMutable(boolean mutable) {
        this.mutable = mutable;
    }

    protected void checkMutable() throws IllegalStateException {
        if (!this.mutable) {
            throw new IllegalStateException();
        }
    }

    protected void setDuplicate(boolean dup) {
        this.dup = dup;
    }

    public boolean isDuplicate() {
        return this.dup;
    }

    public void setId(int messageId) {
        this.messageId = messageId;
    }

    public int getId() {
        return this.messageId;
    }
}

image.png

image.png

2、go实现

____________________publish_____________________

package main

import (
   "fmt"
   MQTT "github.com/eclipse/paho.mqtt.golang"
   "os"
   "time"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
   fmt.Printf("topic: %s\n", msg.Topic())
   fmt.Printf("msg: %s\n", msg.Payload())
}

var (
   topic string = "dev"
   qos uint8 = 1
   userName string = "lmc"
   passWord string = "123456"
)

func main() {
   //create a ClientOptions struct setting the broker address, clientid, turn
   //off trace output and set the default message handler
   opts := MQTT.NewClientOptions().AddBroker("tcp://47.94.14.169:1883")
   opts.SetClientID("go-simple")
   opts.SetDefaultPublishHandler(nil)
   opts.Username = userName
   opts.Password = passWord
   //create and start a client using the above ClientOptions
   c := MQTT.NewClient(opts)
   if token := c.Connect(); token.Wait() && token.Error() != nil {
      panic(token.Error())
   }

   //subscribe to the topic "dev" and request messages to be delivered
   //at a maximum qos of zero, wait for the receipt to confirm the subscription
   /*if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
      fmt.Println(token.Error())
      os.Exit(1)
   }*/

   fmt.Println("I have connected to the mqtt server or broker to publish! ")

   //Publish 5 messages to dev at qos 1 and wait for the receipt
   //from the server after sending each message
   for i := 0; i < 5; i++ {
      text := fmt.Sprintf("this is msg #%d!" + "\n", i)
      fmt.Print(text)
      token := c.Publish(topic, 0, false, text)
      token.Wait()
   }

   time.Sleep(3 * time.Second)
   //unsubscribe from dev
   if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
      fmt.Println(token.Error())
      os.Exit(1)
   }

   //shut the connection
   c.Disconnect(250)
}

____________________subscribe_____________________

package main

import (
   "fmt"
   MQTT "github.com/eclipse/paho.mqtt.golang"
   "os"
   "time"
)

//define a function for the default message handler
var f MQTT.MessageHandler = func(client MQTT.Client, msg MQTT.Message) {
   fmt.Printf("topic: %s\n", msg.Topic())
   fmt.Printf("msg: %s\n", msg.Payload())
}

var (
   topic string = "dev"
   qos uint8 = 1
   userName string = "lmc"
   passWord string = "123456"
)

func main() {
   //create a ClientOptions struct setting the broker address, clientid, turn
   //off trace output and set the default message handler
   opts := MQTT.NewClientOptions().AddBroker("tcp://47.94.14.169:1883")
   opts.SetClientID("go-simple1")
   opts.SetDefaultPublishHandler(f)
   opts.Username = userName
   opts.Password = passWord
   opts.KeepAlive = 20
   //create and start a client using the above ClientOptions
   c := MQTT.NewClient(opts)
   if token := c.Connect(); token.Wait() && token.Error() != nil {
      panic(token.Error())
   }

   //subscribe to the topic "dev" and request messages to be delivered
   //at a maximum qos of zero, wait for the receipt to confirm the subscription
   if token := c.Subscribe(topic, 0, nil); token.Wait() && token.Error() != nil {
      fmt.Println(token.Error())
      os.Exit(1)
   }

   fmt.Println("I have connected to the mqtt server or broker to subscribe! ")

   ////Publish 5 messages to dev at qos 1 and wait for the receipt
   ////from the server after sending each message
   //for i := 0; i < 5; i++ {
   // text := fmt.Sprintf("this is msg #%d!" + "\n", i)
   // fmt.Print(text)
   // token := c.Publish(topic, 0, false, text)
   // token.Wait()
   //}
   //
   time.Sleep(120*time.Second)
   //unsubscribe from dev
   if token := c.Unsubscribe(topic); token.Wait() && token.Error() != nil {
      fmt.Println(token.Error())
      os.Exit(1)
   }

   fmt.Println("I have disconnected to the mqtt server or broker to exit! ")
   //shut the connection
   c.Disconnect(250)
   return
}

image.png

image.png 


来说两句吧
最新评论