> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

> ActiveMQ Connector integrates Apache Pulsar with Apache ActiveMQ.

# Io activemq sink

The ActiveMQ sink connector pulls messages from Pulsar topics and persist messages to ActiveMQ.

# Installation

```
git clone https://github.com/streamnative/pulsar-io-activemq.git
cd pulsar-io-activemq/
mvn clean install -DskipTests
cp target/pulsar-io-activemq-0.0.1.nar $PULSAR_HOME/pulsar-io-activemq-0.0.1.nar
```

# Configuration

The configuration of the ActiveMQ sink connector has the following properties.

## ActiveMQ sink connector configuration

| Name                | Type   | Required | Sensitive | Default            | Description                                                              |
| ------------------- | ------ | -------- | --------- | ------------------ | ------------------------------------------------------------------------ |
| `protocol`          | String | true     | false     | "tcp"              | The ActiveMQ protocol.                                                   |
| `host`              | String | true     | false     | " " (empty string) | The ActiveMQ host.                                                       |
| `port`              | int    | true     | false     | 5672               | The ActiveMQ port.                                                       |
| `username`          | String | false    | true      | " " (empty string) | The username used to authenticate to ActiveMQ.                           |
| `password`          | String | false    | true      | " " (empty string) | The password used to authenticate to ActiveMQ.                           |
| `queueName`         | String | false    | false     | " " (empty string) | The ActiveMQ queue name that messages should be read from or written to. |
| `topicName`         | String | false    | false     | " " (empty string) | The ActiveMQ topic name that messages should be read from or written to. |
| `activeMessageType` | String | false    | false     | 0                  | The ActiveMQ message simple class name.                                  |

## Configure ActiveMQ sink connector

Before using the ActiveMQ sink connector, you need to create a configuration file through one of the following methods.

* JSON

  ```json theme={null}
  {
      "tenant": "public",
      "namespace": "default",
      "name": "activemq-sink",
      "inputs": ["user-op-queue-topic"],
      "archive": "connectors/pulsar-io-activemq-2.5.1.nar",
      "parallelism": 1,
      "configs":
      {
          "protocol": "tcp",
          "host": "localhost",
          "port": "61616",
          "username": "admin",
          "password": "admin",
          "queueName": "user-op-queue-pulsar"
      }
  }
  ```

* YAML

  ```yaml theme={null}
  tenant: "public"
  namespace: "default"
  name: "activemq-sink"
  inputs: 
    - "user-op-queue-topic"
  archive: "connectors/pulsar-io-activemq-2.5.1.nar"
  parallelism: 1

  configs:
      protocol: "tcp"
      host: "localhost"
      port: "61616"
      username: "admin"
      password: "admin"
      queueName: "user-op-queue-pulsar"
  ```

# Usage

1. Prepare ActiveMQ service.

   ```
   docker pull rmohr/activemq
   docker run -p 61616:61616 -p 8161:8161 rmohr/activemq
   ```

2. Put the `pulsar-io-activemq-2.5.1.nar` in the pulsar connectors catalog.

   ```
   cp pulsar-io-activemq-2.5.1.nar $PULSAR_HOME/connectors/pulsar-io-activemq-2.5.1.nar
   ```

3. Start Pulsar in standalone mode.

   ```
   $PULSAR_HOME/bin/pulsar standalone
   ```

4. Run ActiveMQ sink locally.

   ```
   $PULSAR_HOME/bin/pulsar-admin sink localrun --sink-config-file activemq-sink-config.yaml
   ```

5. Send Pulsar messages.

   ```
   $PULSAR_HOME/bin/pulsar-client produce public/default/user-op-queue-topic --messages hello -n 10
   ```

6. Consume ActiveMQ messages.

   Use the test method `receiveMessage` of the class `org.apache.pulsar.ecosystem.io.activemq.ActiveMQDemo`
   to consume ActiveMQ messages.

   ```
   @Test
   private void receiveMessage() throws JMSException, InterruptedException {

       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

       @Cleanup
       Connection connection = connectionFactory.createConnection();
       connection.start();

       @Cleanup
       Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

       Destination destination = session.createQueue("user-op-queue-pulsar");

       @Cleanup
       MessageConsumer consumer = session.createConsumer(destination);

       consumer.setMessageListener(new MessageListener() {
           @Override
           public void onMessage(Message message) {
               if (message instanceof ActiveMQTextMessage) {
                   try {
                       System.out.println("get message ----------------- ");
                       System.out.println("receive: " + ((ActiveMQTextMessage) message).getText());
                   } catch (JMSException e) {
                       e.printStackTrace();
                   }
               }
           }
       });
   }
   ```
