The ActiveMQ sink connector pulls messages from Pulsar topics and persist messages to ActiveMQ.
Installation
git clone https://github.com/streamnative/pulsar-io-activemq.git
cd pulsar-io-activemq/
mvn clean install -DskipTests
cp target/pulsar-io-activemq-0.0.1.nar $PULSAR_HOME/pulsar-io-activemq-0.0.1.nar
Configuration
The configuration of the ActiveMQ sink connector has the following properties.
ActiveMQ sink connector configuration
Name | Type | Required | Sensitive | Default | Description |
---|
protocol | String | true | false | ”tcp” | The ActiveMQ protocol. |
host | String | true | false | ” ” (empty string) | The ActiveMQ host. |
port | int | true | false | 5672 | The ActiveMQ port. |
username | String | false | true | ” ” (empty string) | The username used to authenticate to ActiveMQ. |
password | String | false | true | ” ” (empty string) | The password used to authenticate to ActiveMQ. |
queueName | String | false | false | ” ” (empty string) | The ActiveMQ queue name that messages should be read from or written to. |
topicName | String | false | false | ” ” (empty string) | The ActiveMQ topic name that messages should be read from or written to. |
activeMessageType | String | false | false | 0 | The ActiveMQ message simple class name. |
Before using the ActiveMQ sink connector, you need to create a configuration file through one of the following methods.
-
JSON
{
"tenant": "public",
"namespace": "default",
"name": "activemq-sink",
"inputs": ["user-op-queue-topic"],
"archive": "connectors/pulsar-io-activemq-2.5.1.nar",
"parallelism": 1,
"configs":
{
"protocol": "tcp",
"host": "localhost",
"port": "61616",
"username": "admin",
"password": "admin",
"queueName": "user-op-queue-pulsar"
}
}
-
YAML
tenant: "public"
namespace: "default"
name: "activemq-sink"
inputs:
- "user-op-queue-topic"
archive: "connectors/pulsar-io-activemq-2.5.1.nar"
parallelism: 1
configs:
protocol: "tcp"
host: "localhost"
port: "61616"
username: "admin"
password: "admin"
queueName: "user-op-queue-pulsar"
Usage
-
Prepare ActiveMQ service.
docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
-
Put the pulsar-io-activemq-2.5.1.nar
in the pulsar connectors catalog.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
-
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
-
Run ActiveMQ sink locally.
$PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
-
Send Pulsar messages.
$PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10
-
Consume ActiveMQ messages.
Use the test method receiveMessage
of the class org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo
to consume ActiveMQ messages.
@Test
private void receiveMessage() throws JMSException, InterruptedException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
@Cleanup
Connection connection = connectionFactory.createConnection();
connection.start();
@Cleanup
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("user-op-queue-pulsar");
@Cleanup
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (message instanceof ActiveMQTextMessage) {
try {
System.out.println("get message ----------------- ");
System.out.println("receive: " + ((ActiveMQTextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
}