ActiveMQ Sink
ActiveMQ Connector integrates Apache Pulsar with Apache ActiveMQ.
Authored by
Support type
Apache License 2.0

The ActiveMQ sink connector pulls messages from Pulsar topics and persist messages to ActiveMQ.


git clone
cd pulsar-io-activemq/
mvn clean install -DskipTests
cp target/pulsar-io-activemq-0.0.1.nar $PULSAR_HOME/pulsar-io-activemq-0.0.1.nar


The configuration of the ActiveMQ sink connector has the following properties.

ActiveMQ sink connector configuration

protocolStringtrue"tcp"The ActiveMQ protocol.
hostStringtrue" " (empty string)The ActiveMQ host.
portinttrue5672The ActiveMQ port.
usernameStringfalse" " (empty string)The username used to authenticate to ActiveMQ.
passwordStringfalse" " (empty string)The password used to authenticate to ActiveMQ.
queueNameStringfalse" " (empty string)The ActiveMQ queue name that messages should be read from or written to.
topicNameStringfalse" " (empty string)The ActiveMQ topic name that messages should be read from or written to.
activeMessageTypeStringfalse0The ActiveMQ message simple class name.

Configure ActiveMQ sink connector

Before using the ActiveMQ sink connector, you need to create a configuration file through one of the following methods.

  • JSON

        "tenant": "public",
        "namespace": "default",
        "name": "activemq-sink",
        "inputs": ["user-op-queue-topic"],
        "archive": "connectors/pulsar-io-activemq-0.0.1.nar",
        "parallelism": 1,
            "protocol": "tcp",
            "host": "localhost",
            "port": "61616",
            "username": "admin",
            "password": "admin",
            "queueName": "user-op-queue-pulsar"
  • YAML

    tenant: "public"
    namespace: "default"
    name: "activemq-sink"
      - "user-op-queue-topic"
    archive: "connectors/pulsar-io-activemq-0.0.1.nar"
    parallelism: 1
        protocol: "tcp"
        host: "localhost"
        port: "61616"
        username: "admin"
        password: "admin"
        queueName: "user-op-queue-pulsar"


  1. Prepare ActiveMQ service.

    docker pull rmohr/activemq
    docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
  2. Put the pulsar-io-activemq-0.0.1.nar in the pulsar connectors catalog.

    cp pulsar-io-activemq-0.0.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-0.0.1.nar
  3. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone
  4. Run ActiveMQ sink locally.

    $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
  5. Send Pulsar messages.

    $PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10
  6. Consume ActiveMQ messages.

    Use the test method receiveMessage of the class to consume ActiveMQ messages.

    private void receiveMessage() throws JMSException, InterruptedException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue("user-op-queue-pulsar");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof ActiveMQTextMessage) {
                    try {
                        System.out.println("get message ----------------- ");
                        System.out.println("receive: " + ((ActiveMQTextMessage) message).getText());
                    } catch (JMSException e) {