1. StreamNative Platform
  2. Connect

Connect to Pulsar clusters using RabbitMQ client

StreamNative Platform brings native AMQP protocol support to Pulsar brokers using the AMQP on Pulsar (AoP) protocol handler. Therefore, you can migrate your existing AMQP applications and services to Apache Pulsar without modifying the code.

In this document, we use the RabbitMQ client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.

Prerequisites

Procedures

To connect to a Pulsar cluster using the RabbitMQ client, follow these steps.

  1. Create a Pulsar namespace and set the retention policy for the namespace.

    This example shows how to use the pulsar-admin CLI tool to create a Pulsar namespace named vhost1 and set the retention size and retention time to 100M and 2 days, respectively.

    bin/pulsar-admin namespaces create -b 1 public/vhost1
    bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
    
  2. Create a Maven project and add a dependency to the RabbitMQ client.

    # add the RabbitMQ client dependency in your project
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    
  3. Connect to AoP using the RabbitMQ client.

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    
    public class main {
      public static void main(String[] args) throws Exception
      {
        // create connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");                     # --- [1]
        connectionFactory.setHost("BROKER_EXTERNAL_SERVICE");           # --- [2]
        connectionFactory.setPort(5673);                                # --- [3]
    
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
    
        String exchange = "ex";
        String queue = "qu";
    
        // declare exchange
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
    
        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, queue);
    
        // publish some messages
        for (int i = 0; i < 100; i++) {
          channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
        }
    
        // consume messages
        CountDownLatch countDownLatch = new CountDownLatch(100);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("receive msg: " + new String(body));
            countDownLatch.countDown();
          }
        });
        countDownLatch.await();
    
        // release resource
        channel.close();
        connection.close();
      }
    }
    
    • [1] connectionFactory.setVirtualHost: represent the Pulsar namespace for AoP.
    • [2] connectionFactory.setHost: represent the Pulsar broker external service.
    • [3] connectionFactory.setPort: represent the port ID for AoP. It is set to port 5673.
Previous
MQTT - Mosquitto