- Operating StreamNative Platform
- Protocols
Configure AoP
AMQP on Pulsar (AoP) is a protocol handler developed by StreamNative to natively support the AMQP protocol on the Pulsar broker. By adding the AoP protocol handler to your existing Pulsar cluster, you can now migrate your existing AMQP applications and services to Pulsar without modifying the code.
Enable AoP
This section describes how to enable AoP on StreamNative Platform.
Prerequisites for enabling AoP
sn-platform
chart: 1.6.0 or higherpulsar-operator
chart: 0.11.6 or higher
Procedure
To enable AoP within a Kubernetes cluster, you can set broker.aop.enabled
and broker.aop.proxyEnabled
to true
in the values.yaml
YAML file as follows and use the helm upgrade
command to update the resource.
Enable AoP.
broker: aop: enabled: true proxyEnabled: true
Apply the new configuration.
helm upgrade -f /path/to/your/values.yaml <release_name> streamnative/sn-platform -n <k8s_namespace>
Connect to your Pulsar clusters using the RabbitMQ client
This section uses the RabbitMQ client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.
Prerequisites
- Install StreamNative Platform and run a Pulsar cluster. For details, see deploy StreamNative Platform.
- Enable AoP. For details, see enable AoP.
- Get the Web service URL of your Pulsar cluster or broker service. For details, see prepare to connect to Pulsar clusters.
Steps
To connect to a Pulsar cluster using the RabbitMQ client, follow these steps.
Create a Pulsar namespace and set the retention policy for the namespace.
This example shows how to use the pulsar-admin CLI tool to create a Pulsar namespace named
vhost1
and set the retention size and retention time to 100M and 2 days, respectively.bin/pulsar-admin namespaces create -b 1 public/vhost1 bin/pulsar-admin namespaces set-retention -s 100M -t 2d public/vhost1
Create a Maven project and add a dependency to the RabbitMQ client.
# add the RabbitMQ client dependency in your project <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency>
Connect to AoP using the RabbitMQ client.
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; public class main { public static void main(String[] args) throws Exception { // create connection ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setVirtualHost("vhost1"); # --- [1] connectionFactory.setHost("BROKER_EXTERNAL_SERVICE"); # --- [2] connectionFactory.setPort(5673); # --- [3] Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); String exchange = "ex"; String queue = "qu"; // declare exchange channel.exchangeDeclare(exchange, BuiltinExchangeType.FANOUT, true, false, false, null); // queue declare and bind channel.queueDeclare(queue, true, false, false, null); channel.queueBind(queue, exchange, queue); // publish some messages for (int i = 0; i < 100; i++) { channel.basicPublish(exchange, "", null, ("hello - " + i).getBytes()); } // consume messages CountDownLatch countDownLatch = new CountDownLatch(100); channel.basicConsume(queue, true, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("receive msg: " + new String(body)); countDownLatch.countDown(); } }); countDownLatch.await(); // release resource channel.close(); connection.close(); } }
- [1]
connectionFactory.setVirtualHost
: represent the Pulsar namespace for AoP. - [2]
connectionFactory.setHost
: represent the Pulsar broker external service. - [3]
connectionFactory.setPort
: represent the port ID for AoP. It is set to port5673
.
- [1]