protocol-handler
MoP
MoP brings native MQTT protocol support to Pulsar by introducing an MQTT protocol handler on Pulsar brokers.
Authored by: StreamNative
Support type: StreamNative
License: Apache License 2.0

Overview

MQTT-on-Pulsar (MoP) was developed to support the MQTT protocol natively on Apache Pulsar.

Installation

To install the MoP protocol handler, follow these steps:

  1. Download the NAR package of the MoP protocol handler from here.

  2. Install the MoP protocol handler.

    1. Set the configuration of the MoP protocol handler.

      Add the following properties and set their values in the Pulsar configuration file, such as conf/broker.conf or conf/standalone.conf.

      Property                  Recommended value         Default value
      messagingProtocols        mqtt                      null
      protocolHandlerDirectory  Location of MoP NAR file  ./protocols

      Example

      messagingProtocols=mqtt
      protocolHandlerDirectory=./protocols
      
    2. Set the MQTT server listeners.

      Note

      The hostname in the MQTT server listeners should be the same one as the Pulsar broker's advertisedAddress.

      Example

      mqttListeners=mqtt://127.0.0.1:1883
      advertisedAddress=127.0.0.1
      
  3. Load the MoP protocol handler. After installing the MoP protocol handler on the Pulsar broker, restart the broker so that it loads the handler.
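The note in step 2 says the hostname in mqttListeners should match the broker's advertisedAddress. The following is a minimal sketch of how that rule could be checked before restarting the broker. The class MopConfigCheck and its methods are hypothetical helpers, not part of MoP or Pulsar, and the comma-separated handling of multiple listeners is an assumption.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not part of MoP: checks the hostname rule from the note above.
final class MopConfigCheck {

    // Parses key=value text in the style of conf/broker.conf.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // Returns true when every listener host equals advertisedAddress.
    // Assumption: multiple listeners, if present, are separated by commas.
    static boolean listenersMatchAdvertisedAddress(Properties props) {
        String listeners = props.getProperty("mqttListeners");
        String advertised = props.getProperty("advertisedAddress");
        if (listeners == null || advertised == null) {
            return false;
        }
        for (String listener : listeners.split(",")) {
            String host = URI.create(listener.trim()).getHost();
            if (!advertised.trim().equals(host)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties props = parse(String.join("\n",
                "messagingProtocols=mqtt",
                "protocolHandlerDirectory=./protocols",
                "mqttListeners=mqtt://127.0.0.1:1883",
                "advertisedAddress=127.0.0.1"));
        System.out.println("listeners match advertisedAddress: "
                + listenersMatchAdvertisedAddress(props));
    }
}
```

The check only compares strings, so a listener written as localhost and an advertisedAddress of 127.0.0.1 would be reported as a mismatch even though both may resolve to the same interface.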

Configuration

The following table lists configurations available for the MoP protocol handler.

Property                  Recommended value         Default value
messagingProtocols        mqtt                      N/A
protocolHandlerDirectory  Location of MoP NAR file  ./protocols
mqttListeners             mqtt://127.0.0.1:1883     N/A
advertisedAddress         127.0.0.1                 N/A

Usage

Use Pulsar Proxy

To use the Pulsar Proxy, follow the steps below. For details on each step, refer to Deploy a cluster on bare metal.

  1. Prepare a ZooKeeper cluster.

  2. Initialize the cluster metadata.

  3. Prepare a BookKeeper cluster.

  4. Copy the pulsar-protocol-handler-mqtt-${version}.nar to the $PULSAR_HOME/protocols directory.

  5. Start the Pulsar broker.

    Here is an example of the Pulsar broker configuration.

    messagingProtocols=mqtt
    protocolHandlerDirectory=./protocols
    brokerServicePort=6651
    mqttListeners=mqtt://127.0.0.1:1883
    advertisedAddress=127.0.0.1
    
    mqttProxyEnable=true
    mqttProxyPort=5682
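Once the broker is running with the configuration above, a quick way to see whether the MQTT listener (port 1883) and the MQTT proxy (port 5682) are accepting connections is a plain TCP probe. The sketch below uses only the Java standard library. MopPortCheck is a hypothetical helper, and a successful TCP connect shows only that something is listening on the port, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of MoP: probes whether a TCP port accepts connections.
final class MopPortCheck {

    // Returns true if a TCP connection to host:port succeeds within the timeout.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from the example configuration: mqttListeners and mqttProxyPort.
        int[] ports = {1883, 5682};
        for (int port : ports) {
            System.out.println("port " + port + " listening: "
                    + isListening("127.0.0.1", port, 1000));
        }
    }
}
```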
    

Verify MoP with FuseSource MQTT client

Many MQTT clients can be used to verify the MoP protocol handler, such as MQTTBox and MQTT Toolbox. You can use either a CLI tool or a GUI tool.

The following example shows how to verify the MoP protocol handler with FuseSource MqttClient.

  1. Add Maven dependency.

    <dependency>
        <groupId>org.fusesource.mqtt-client</groupId>
        <artifactId>mqtt-client</artifactId>
        <version>1.16</version>
    </dependency>
    
  2. Publish and consume messages.

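    import org.fusesource.mqtt.client.BlockingConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.Message;
    import org.fusesource.mqtt.client.QoS;
    import org.fusesource.mqtt.client.Topic;

    // connect to the address configured in mqttListeners
    MQTT mqtt = new MQTT();
    mqtt.setHost("127.0.0.1", 1883);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();

    // subscribe to the topic
    Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
    connection.subscribe(topics);

    // publish message
    connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(), QoS.AT_LEAST_ONCE, false);

    // receive message, then acknowledge it
    Message received = connection.receive();
    received.ack();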