The ActiveMQ sink connector pulls messages from Pulsar topics and persists them to ActiveMQ clusters.
Installation
To install the ActiveMQ sink connector, follow these steps.
Download the NAR package of the ActiveMQ sink connector from here.
Put the NAR package pulsar-io-activemq-2.5.1.nar in the Pulsar connectors directory.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
Run the ActiveMQ sink connector locally.
$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
Configuration
The configuration of the ActiveMQ sink connector has the following properties.
ActiveMQ sink connector configuration
Name | Type | Required | Default | Description |
---|---|---|---|---|
protocol | String | true | "tcp" | ActiveMQ protocol |
host | String | true | " " (empty string) | ActiveMQ host |
port | int | true | 5672 | ActiveMQ port |
username | String | false | " " (empty string) | Username used to authenticate to ActiveMQ |
password | String | false | " " (empty string) | Password used to authenticate to ActiveMQ |
queueName | String | false | " " (empty string) | ActiveMQ queue name that messages should be read from or written to |
topicName | String | false | " " (empty string) | ActiveMQ topic name that messages should be read from or written to |
activeMessageType | String | false | 0 | ActiveMQ message simple class name |
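The required properties in the table can be read as a small validation contract. The following is a minimal sketch under stated assumptions, not the connector's actual code: the class `ActiveMqSinkConfigSketch` and its methods `validate` and `brokerUrl` are hypothetical names introduced for illustration. It checks the three properties marked as required and combines them into a broker URL of the form `protocol://host:port`, which is the shape of the URL used by the consumer demo in the Usage section.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector: checks the properties the
// table marks as required and assembles a broker URL from them.
final class ActiveMqSinkConfigSketch {

    // Properties marked "Required: true" in the table above.
    private static final List<String> REQUIRED = List.of("protocol", "host", "port");

    // Throws IllegalArgumentException if a required property is missing or blank.
    static void validate(Map<String, Object> configs) {
        for (String key : REQUIRED) {
            Object value = configs.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                throw new IllegalArgumentException("Missing required property: " + key);
            }
        }
    }

    // Builds "<protocol>://<host>:<port>". The port may arrive as a number or
    // as a string, because the sample configuration files quote it.
    static String brokerUrl(Map<String, Object> configs) {
        validate(configs);
        int port = Integer.parseInt(configs.get("port").toString().trim());
        return configs.get("protocol") + "://" + configs.get("host") + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = Map.of(
                "protocol", "tcp",
                "host", "localhost",
                "port", "61616",
                "queueName", "user-op-queue-pulsar");
        System.out.println(brokerUrl(configs)); // prints tcp://localhost:61616
    }
}
```

Parsing the port instead of concatenating it directly means a non-numeric value fails early with a `NumberFormatException` rather than producing an unusable URL.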
Configure ActiveMQ sink connector
Before using the ActiveMQ sink connector, you need to create a configuration file through one of the following methods.
JSON
{
    "tenant": "public",
    "namespace": "default",
    "name": "activemq-sink",
    "inputs": ["user-op-queue-topic"],
    "archive": "connectors/pulsar-io-activemq-2.5.1.nar",
    "parallelism": 1,
    "configs": {
        "protocol": "tcp",
        "host": "localhost",
        "port": "61616",
        "username": "admin",
        "password": "admin",
        "queueName": "user-op-queue-pulsar"
    }
}
YAML
tenant: "public"
namespace: "default"
name: "activemq-sink"
inputs:
  - "user-op-queue-topic"
archive: "connectors/pulsar-io-activemq-2.5.1.nar"
parallelism: 1
configs:
  protocol: "tcp"
  host: "localhost"
  port: "61616"
  username: "admin"
  password: "admin"
  queueName: "user-op-queue-pulsar"
Usage
This section describes how to use the ActiveMQ sink connector to move messages from Pulsar topics to ActiveMQ clusters.
Prepare ActiveMQ service.
docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
Put the pulsar-io-activemq-2.5.1.nar in the Pulsar connectors directory.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
Run the ActiveMQ sink connector locally.
$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
Send Pulsar messages.
$PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10
Consume ActiveMQ messages.
Use the test method receiveMessage of the class org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo to consume ActiveMQ messages.
public void receiveMessage() throws JMSException, InterruptedException {
    // Connect to the broker started by the Docker container above.
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

    // @Cleanup is Lombok's annotation; it closes the resource when the method exits.
    @Cleanup
    Connection connection = connectionFactory.createConnection();
    connection.start();

    @Cleanup
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // The queue name must match queueName in the sink configuration.
    Destination destination = session.createQueue("user-op-queue-pulsar");

    @Cleanup
    MessageConsumer consumer = session.createConsumer(destination);

    // Wait for the 10 messages produced in the previous step.
    CountDownLatch countDownLatch = new CountDownLatch(10);

    consumer.setMessageListener(message -> {
        if (message instanceof ActiveMQTextMessage) {
            try {
                System.out.println("get message ----------------- ");
                System.out.println("receive: " + ((ActiveMQTextMessage) message).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
            countDownLatch.countDown();
        }
    });

    // Block until all expected messages have been received.
    countDownLatch.await();
}
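The receiveMessage demo blocks on a CountDownLatch so that the method does not return, and the connection is not closed, before the listener has seen all ten messages. The sketch below isolates that pattern using only the Java standard library. It is an illustration under stated assumptions, not connector code: the class `LatchWaitSketch` and its `receive` method are hypothetical, the JMS listener is replaced by a plain `Consumer<String>` fed from a background thread, and the timeout on `await` is an addition that the original demo does not have.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the JMS demo: a listener counts down a latch
// while the calling thread waits for the expected number of messages.
final class LatchWaitSketch {

    static List<String> receive(int expected, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(expected);

        // Plays the role of the MessageListener: record the text, then count down.
        Consumer<String> listener = text -> {
            received.add(text);
            latch.countDown();
        };

        // Simulates the broker delivering messages on a separate thread.
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < expected; i++) {
                deliveryThread.submit(() -> listener.accept("hello"));
            }
            // Unlike the demo, give up after a timeout instead of waiting forever.
            if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException(
                        "Timed out with " + latch.getCount() + " messages outstanding");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        } finally {
            deliveryThread.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receive(10, 5).size()); // prints 10
    }
}
```

A bounded wait is worth considering for the real consumer as well: if fewer than ten messages reach the queue, an unbounded `await()` never returns.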