1. Operating StreamNative Platform
  2. Protocols

Configure AoP

AMQP on Pulsar (AoP) is a protocol handler developed by StreamNative to natively support the AMQP protocol on the Pulsar broker. By adding the AoP protocol handler to your existing Pulsar cluster, you can now migrate your existing AMQP applications and services to Pulsar without modifying the code.

Enable AoP

This section describes how to enable AoP on StreamNative Platform.

Prerequisites for enabling AoP

  • sn-platform chart: 1.6.0 or higher
  • pulsar-operator chart: 0.11.6 or higher

Procedure

To enable AoP within a Kubernetes cluster, you can set broker.aop.enabled and broker.aop.proxyEnabled to true in the values.yaml YAML file as follows and use the helm upgrade command to update the resource.

  1. Enable AoP.

    broker:
      aop:
        enabled: true
        proxyEnabled: true
    
  2. Apply the new configuration.

    helm upgrade -f /path/to/your/values.yaml <release_name> streamnative/sn-platform -n <k8s_namespace>
    

Connect to your Pulsar clusters using the RabbitMQ client

This section uses the RabbitMQ client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.

Prerequisites

Steps

To connect to a Pulsar cluster using the RabbitMQ client, follow these steps.

  1. Create a Pulsar namespace and set the retention policy for the namespace.

    This example shows how to use the pulsar-admin CLI tool to create a Pulsar namespace named vhost1 and set the retention size and retention time to 100M and 2 days, respectively.

    bin/pulsar-admin namespaces create -b 1 public/vhost1
    bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
    
  2. Create a Maven project and add a dependency to the RabbitMQ client.

    # add the RabbitMQ client dependency in your project
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
    </dependency>
    
  3. Connect to AoP using the RabbitMQ client.

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;
    
    public class main {
      public static void main(String[] args) throws Exception
      {
        // create connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setVirtualHost("vhost1");                     # --- [1]
        connectionFactory.setHost("BROKER_EXTERNAL_SERVICE");           # --- [2]
        connectionFactory.setPort(5673);                                # --- [3]
    
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
    
        String exchange = "ex";
        String queue = "qu";
    
        // declare exchange
        channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null);
    
        // queue declare and bind
        channel.queueDeclare(queue, true, false, false, null);
        channel.queueBind(queue, exchange, queue);
    
        // publish some messages
        for (int i = 0; i < 100; i++) {
          channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes());
        }
    
        // consume messages
        CountDownLatch countDownLatch = new CountDownLatch(100);
        channel.basicConsume(queue, true, new DefaultConsumer(channel) {
          @Override
          public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("receive msg: " + new String(body));
            countDownLatch.countDown();
          }
        });
        countDownLatch.await();
    
        // release resource
        channel.close();
        connection.close();
      }
    }
    
    • [1] connectionFactory.setVirtualHost: represent the Pulsar namespace for AoP.
    • [2] connectionFactory.setHost: represent the Pulsar broker external service.
    • [3] connectionFactory.setPort: represent the port ID for AoP. It is set to port 5673.
Previous
Configure MQTT on Pulsar