Documentation Index
Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
Use this file to discover all available pages before exploring further.
The ActiveMQ source connector receives messages from ActiveMQ clusters and writes messages to Pulsar topics.
Installation
git clone https://github.com/streamnative/pulsar-io-activemq.git
cd pulsar-io-activemq/
mvn clean install -DskipTests
cp target/pulsar-io-activemq-0.0.1.nar $PULSAR_HOME/pulsar-io-activemq-0.0.1.nar
Configuration
The configuration of the ActiveMQ source connector has the following properties.
ActiveMQ source connector configuration
| Name | Type | Required | Sensitive | Default | Description |
|---|
protocol | String | true | false | ”tcp” | The ActiveMQ protocol. |
host | String | true | false | ” ” (empty string) | The ActiveMQ host. |
port | int | true | false | 5672 | The ActiveMQ port. |
username | String | false | true | ” ” (empty string) | The username used to authenticate to ActiveMQ. |
password | String | false | true | ” ” (empty string) | The password used to authenticate to ActiveMQ. |
queueName | String | false | false | ” ” (empty string) | The ActiveMQ queue name that messages should be read from or written to. |
topicName | String | false | false | ” ” (empty string) | The ActiveMQ topic name that messages should be read from or written to. |
Before using the ActiveMQ source connector, you need to create a configuration file through one of the following methods.
-
JSON
{
"tenant": "public",
"namespace": "default",
"name": "activemq-source",
"topicName": "user-op-queue-topic",
"archive": "connectors/pulsar-io-activemq-2.5.1.nar",
"parallelism": 1,
"configs": {
"protocol": "tcp",
"host": "localhost",
"port": "61616",
"username": "admin",
"password": "admin",
"queueName": "user-op-queue"
}
}
-
YAML
tenant: "public"
namespace: "default"
name: "activemq-source"
topicName: "user-op-queue-topic"
archive: "connectors/pulsar-io-activemq-2.5.1.nar"
parallelism: 1
configs:
protocol: "tcp"
host: "localhost"
port: "61616"
username: "admin"
password: "admin"
queueName: "user-op-queue"
-
Prepare ActiveMQ service.
docker pull rmohr/activemq
docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
-
Put the
pulsar-io-activemq-2.5.1.nar in the pulsar connectors catalog.
cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
-
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
-
Run ActiveMQ source locally.
$PULSAR_HOME/bin/pulsar-admin source localrun --source-config-file activemq-source-config.yaml
-
Consume Pulsar messages.
bin/pulsar-client consume -s "sub-products" public/default/user-op-queue-topic -n 0
-
Send ActiveMQ messages.
Use the test method
sendMessage of the class org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo
to send ActiveMQ messages.
@Test
private void sendMessage() throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
@Cleanup
Connection connection = connectionFactory.createConnection();
connection.start();
@Cleanup
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue("user-op-queue");
@Cleanup
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for (int i = 0; i < 10; i++) {
String msgContent = "Hello ActiveMQ - " + i;
ActiveMQTextMessage message = new ActiveMQTextMessage();
message.setText(msgContent);
producer.send(message);
}
}