sink
ActiveMQ Sink
The ActiveMQ sink connector pulls messages from Pulsar topics and persist messages to ActiveMQ clusters.
Authored by
ASF
Support type
StreamNative
License
Apache License 2.0

The ActiveMQ sink connector pulls messages from Pulsar topics and persist messages to ActiveMQ clusters.

Installation

To install the ActiveMQ sink connector, follow these steps.

  1. Download the NAR package of the ActiveMQ sink connector from here.

  2. Put the NAR package pulsar-io-activemq-2.5.1.nar in the pulsar connectors catalog.

    cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
    
  3. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  4. Run the ActiveMQ sink connector locally.

    $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
    

Configuration

The configuration of the ActiveMQ sink connector has the following properties.

ActiveMQ sink connector configuration

NameTypeRequiredDefaultDescription
protocolStringtrue"tcp"ActiveMQ protocol
hostStringtrue" " (empty string)ActiveMQ host
portinttrue5672ActiveMQ port
usernameStringfalse" " (empty string)Username used to authenticate to ActiveMQ
passwordStringfalse" " (empty string)Password used to authenticate to ActiveMQ
queueNameStringfalse" " (empty string)ActiveMQ queue name that messages should be read from or written to
topicNameStringfalse" " (empty string)ActiveMQ topic name that messages should be read from or written to
activeMessageTypeStringfalse0ActiveMQ message simple class name

Configure ActiveMQ sink connector

Before using the ActiveMQ sink connector, you need to create a configuration file through one of the following methods.

  • JSON

    {
        "tenant": "public",
        "namespace": "default",
        "name": "activemq-sink",
        "inputs": ["user-op-queue-topic"],
        "archive": "connectors/pulsar-io-activemq-2.5.1.nar",
        "parallelism": 1,
        "configs":
        {
            "protocol": "tcp",
            "host": "localhost",
            "port": "61616",
            "username": "admin",
            "password": "admin",
            "queueName": "user-op-queue-pulsar"
        }
    }
    
  • YAML

    tenant: "public"
    namespace: "default"
    name: "activemq-sink"
    inputs: 
      - "user-op-queue-topic"
    archive: "connectors/pulsar-io-activemq-2.5.1.nar"
    parallelism: 1
    
    configs:
        protocol: "tcp"
        host: "localhost"
        port: "61616"
        username: "admin"
        password: "admin"
        queueName: "user-op-queue-pulsar"
    

Usage

This section describes how to use the ActiveMQ sink connector to pull messages from Pulsar topics to ActiveMQ clusters.

  1. Prepare ActiveMQ service.

    docker pull rmohr/activemq
    docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
    
  2. Put the pulsar-io-activemq-2.5.1.nar in the pulsar connectors catalog.

    cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
    
  3. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
    
  4. Run ActiveMQ sink locally.

    $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
    
  5. Send Pulsar messages.

    $PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10
    
  6. Consume ActiveMQ messages.

    Use the test method receiveMessage of the class org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo to consume ActiveMQ messages.

    public void receiveMessage() throws JMSException, InterruptedException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    
        @Cleanup
        Connection connection = connectionFactory.createConnection();
        connection.start();
    
        @Cleanup
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("user-op-queue-pulsar");
        
        @Cleanup
        MessageConsumer consumer = session.createConsumer(destination);
        CountDownLatch countDownLatch = new CountDownLatch(10);
        consumer.setMessageListener(message -> {
            if (message instanceof ActiveMQTextMessage) {
                try {
                    System.out.println("get message ----------------- ");
                    System.out.println("receive: " + ((ActiveMQTextMessage) message).getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
    }