Overview
MQTT-on-Pulsar (MoP) is a protocol handler that adds native MQTT protocol support to Apache Pulsar.
Installation
To install the MoP protocol handler, follow these steps:
Download the NAR package of the MoP protocol handler from here.
Install the MoP protocol handler.
Set the configuration of the MoP protocol handler.
Add the following properties and set their values in the Pulsar configuration file, such as conf/broker.conf or conf/standalone.conf.
Property | Recommended value | Default value |
---|---|---|
messagingProtocols | mqtt | null |
protocolHandlerDirectory | Location of MoP NAR file | ./protocols |
Example
    messagingProtocols=mqtt
    protocolHandlerDirectory=./protocols
Set the MQTT server listeners.
Note
The hostname in the MQTT server listeners should be the same as the Pulsar broker's advertisedAddress.
Example
    mqttListeners=mqtt://127.0.0.1:1883
    advertisedAddress=127.0.0.1
Load the MoP protocol handler. After installing the MoP protocol handler on the Pulsar broker, restart the broker to load it.
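The hostname rule from the note above can be checked before the broker is restarted. The following is a minimal sketch, not part of MoP: the class `ListenerCheck` and its method are hypothetical names, and the code assumes that `mqttListeners` may hold several comma-separated URIs.

```java
import java.net.URI;
import java.util.Properties;

// Hypothetical helper, not part of MoP.
class ListenerCheck {

    // Returns true when every mqttListeners entry uses advertisedAddress as its host.
    // Assumption: multiple listeners, if present, are separated by commas.
    static boolean listenersMatchAdvertisedAddress(Properties conf) {
        String advertised = conf.getProperty("advertisedAddress");
        String listeners = conf.getProperty("mqttListeners");
        if (advertised == null || listeners == null) {
            return false;
        }
        for (String entry : listeners.split(",")) {
            String host = URI.create(entry.trim()).getHost();
            if (!advertised.trim().equals(host)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.setProperty("mqttListeners", "mqtt://127.0.0.1:1883");
        conf.setProperty("advertisedAddress", "127.0.0.1");
        System.out.println(listenersMatchAdvertisedAddress(conf));
    }
}
```

The comparison is a plain string match, so `localhost` and `127.0.0.1` are treated as different hosts.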
Configuration
The following table lists the configuration properties available for the MoP protocol handler.
Property | Recommended value | Default value |
---|---|---|
messagingProtocols | mqtt | N/A |
protocolHandlerDirectory | Location of MoP NAR file | ./protocols |
mqttListeners | mqtt://127.0.0.1:1883 | N/A |
advertisedAddress | 127.0.0.1 | N/A |
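One way to catch a missing property before restarting the broker is to load the configuration and look for the keys from the table. The sketch below assumes the configuration uses `key=value` lines that `java.util.Properties` can parse. The class `MopConfigCheck` is a hypothetical name, and treating the three properties listed with default N/A as required is an assumption made for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of MoP.
class MopConfigCheck {

    // Assumption: properties listed with default N/A must be set explicitly.
    static final List<String> REQUIRED =
            Arrays.asList("messagingProtocols", "mqttListeners", "advertisedAddress");

    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // advertisedAddress is left out on purpose.
        String conf = "messagingProtocols=mqtt\n"
                + "protocolHandlerDirectory=./protocols\n"
                + "mqttListeners=mqtt://127.0.0.1:1883\n";
        Properties props = new Properties();
        props.load(new StringReader(conf));
        System.out.println(missingKeys(props));
    }
}
```

To check a real file, the same `Properties` object can be loaded from `Files.newBufferedReader(Paths.get("conf/broker.conf"))` instead of the inline string.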
Usage
Use Pulsar Proxy
To use the Pulsar Proxy, follow these steps. For detailed steps, refer to Deploy a cluster on bare metal.
Prepare a ZooKeeper cluster.
Initialize the cluster metadata.
Prepare a BookKeeper cluster.
Copy the pulsar-protocol-handler-mqtt-${version}.nar file to the $PULSAR_HOME/protocols directory.
Start the Pulsar broker.
Here is an example of the Pulsar broker configuration.
    messagingProtocols=mqtt
    protocolHandlerDirectory=./protocols
    brokerServicePort=6651
    mqttListeners=mqtt://127.0.0.1:1883
    advertisedAddress=127.0.0.1
    mqttProxyEnable=true
    mqttProxyPort=5682
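Once the broker is running with this configuration, a quick sanity check is to see whether the MQTT listener port and the MQTT proxy port accept TCP connections. The sketch below only opens and closes a plain socket and does not speak MQTT, so it can confirm that something is listening but not that the handler works. The class `PortProbe` and the 2-second timeout are illustrative assumptions; the ports come from the example configuration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of MoP.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers refused connections and timeouts alike.
            return false;
        }
    }

    public static void main(String[] args) {
        // Ports taken from mqttListeners and mqttProxyPort in the example configuration.
        System.out.println("MQTT listener (1883): " + isListening("127.0.0.1", 1883, 2000));
        System.out.println("MQTT proxy (5682): " + isListening("127.0.0.1", 5682, 2000));
    }
}
```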
Verify MoP with FuseSource MQTT client
Many MQTT clients can be used to verify the MoP protocol handler, such as MQTTBox and MQTT Toolbox. You can choose either a CLI tool or a graphical tool.
The following example shows how to verify the MoP protocol handler with FuseSource MqttClient.
Add Maven dependency.
    <dependency>
      <groupId>org.fusesource.mqtt-client</groupId>
      <artifactId>mqtt-client</artifactId>
      <version>1.16</version>
    </dependency>
Publish and consume messages.
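    import java.nio.charset.StandardCharsets;
    import org.fusesource.mqtt.client.BlockingConnection;
    import org.fusesource.mqtt.client.MQTT;
    import org.fusesource.mqtt.client.Message;
    import org.fusesource.mqtt.client.QoS;
    import org.fusesource.mqtt.client.Topic;

    // Connect to the MQTT listener configured in mqttListeners.
    MQTT mqtt = new MQTT();
    mqtt.setHost("127.0.0.1", 1883);
    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();

    // Subscribe to the topic.
    Topic[] topics = { new Topic("persistent://public/default/my-topic", QoS.AT_LEAST_ONCE) };
    connection.subscribe(topics);

    // Publish a message.
    connection.publish("persistent://public/default/my-topic", "Hello MOP!".getBytes(StandardCharsets.UTF_8), QoS.AT_LEAST_ONCE, false);

    // Receive the message, acknowledge it, and close the connection.
    Message received = connection.receive();
    received.ack();
    connection.disconnect();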