Supports sink and source connectors for AMQP version 1.0.0

Available on
StreamNative Cloud console

Apache License 2.0

AMQP1_0 sink connector

The AMQP1_0 sink connector pulls messages from Pulsar topics and persists them to an AMQP 1.0 service.

How to get

You can get the AMQP1_0 sink connector using one of the following methods:

  • Download the NAR package from here.

  • Build it from source code.

    1. Clone the source code to your machine.

      git clone
    2. Assume that PULSAR_IO_AMQP1_0_HOME is the home directory for the pulsar-io-amqp1_0 repo. Build the connector in the ${PULSAR_IO_AMQP1_0_HOME} directory.

      mvn clean install -DskipTests

      After the connector is successfully built, a NAR package is generated under the target directory.

      ls pulsar-io-amqp1_0/target
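
The build step above leaves a NAR package under the target directory, and later sections copy that package into the Pulsar connectors directory. The following is a minimal sketch of that copy step in Java, not part of the connector itself. The class name `NarInstaller`, its method names, and the file pattern `pulsar-io-amqp1_0-*.nar` are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

// Hypothetical helper: locates the built NAR and copies it to a connectors directory.
final class NarInstaller {
    /** Returns the first pulsar-io-amqp1_0-*.nar file found directly under targetDir, if any. */
    static Optional<Path> findNar(Path targetDir) throws IOException {
        try (DirectoryStream<Path> stream =
                     Files.newDirectoryStream(targetDir, "pulsar-io-amqp1_0-*.nar")) {
            for (Path candidate : stream) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /** Copies the NAR into connectorsDir, creating the directory if it does not exist. */
    static Path install(Path targetDir, Path connectorsDir) throws IOException {
        Path nar = findNar(targetDir).orElseThrow(
                () -> new IOException("No NAR package found in " + targetDir));
        Files.createDirectories(connectorsDir);
        return Files.copy(nar, connectorsDir.resolve(nar.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        // args[0]: the Maven target directory, args[1]: the Pulsar connectors directory.
        Path installed = install(Path.of(args[0]), Path.of(args[1]));
        System.out.println("Installed " + installed);
    }
}
```

Replacing an existing file keeps the step repeatable after a rebuild. Whether Pulsar picks up the new package still depends on reloading or restarting, as described in the usage sections.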

How to configure

Before using the AMQP1_0 sink connector, you need to configure it.

You can create a configuration file (JSON or YAML) to set the following properties.

Name              | Type    | Required | Default            | Description
------------------|---------|----------|--------------------|------------
protocol          | String  | true     | "amqp"             | The AMQP protocol.
host              | String  | true     | " " (empty string) | The AMQP service host.
port              | int     | true     | 5672               | The AMQP service port.
username          | String  | false    | " " (empty string) | The username used to authenticate to ActiveMQ.
password          | String  | false    | " " (empty string) | The password used to authenticate to ActiveMQ.
queue             | String  | false    | " " (empty string) | The queue name that messages should be read from or written to.
topic             | String  | false    | " " (empty string) | The topic name that messages should be read from or written to.
activeMessageType | String  | false    | 0                  | The ActiveMQ message simple class name.
onlyTextMessage   | boolean | false    | false              | If it is set to true, the AMQP message type must be set to TextMessage. Pulsar consumers can consume the messages with schema ByteBuffer.


  • JSON

        {
            "tenant": "public",
            "namespace": "default",
            "name": "amqp1_0-sink",
            "inputs": ["user-op-queue-topic"],
            "archive": "connectors/pulsar-io-amqp1_0-{version}.nar",
            "parallelism": 1,
            "configs": {
                "protocol": "amqp",
                "host": "localhost",
                "port": "5672",
                "username": "guest",
                "password": "guest",
                "queue": "user-op-queue-pulsar"
            }
        }

  • YAML

    tenant: "public"
    namespace: "default"
    name: "amqp1_0-sink"
    inputs:
      - "user-op-queue-topic"
    archive: "connectors/pulsar-io-amqp1_0-{version}.nar"
    parallelism: 1
    configs:
        protocol: "amqp"
        host: "localhost"
        port: "5672"
        username: "guest"
        password: "guest"
        queue: "user-op-queue-pulsar"
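
Before submitting a configuration, it can help to check that the properties marked as required (protocol, host, and port) are present. The sketch below does that check and derives a broker address from the three values. The class `AmqpSinkConfigCheck` and its methods are hypothetical, and the `protocol://host:port` address form is an assumption based on the `amqp://localhost:5672` address used in the consumer example in this document, not a statement of how the connector builds its connection.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical pre-flight check for the connector-specific properties.
final class AmqpSinkConfigCheck {
    // Keys marked as required in the properties table.
    static final List<String> REQUIRED = List.of("protocol", "host", "port");

    /** Returns the required keys that are absent or blank in the given map. */
    static List<String> missingRequired(Map<String, Object> configs) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            Object value = configs.get(key);
            if (value == null || value.toString().trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Assumption: the broker address has the form protocol://host:port. */
    static String brokerUri(Map<String, Object> configs) {
        List<String> missing = missingRequired(configs);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required properties: " + missing);
        }
        // The examples give the port as a string, so accept both "5672" and 5672.
        int port = Integer.parseInt(configs.get("port").toString().trim());
        return configs.get("protocol") + "://" + configs.get("host") + ":" + port;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new LinkedHashMap<>();
        configs.put("protocol", "amqp");
        configs.put("host", "localhost");
        configs.put("port", "5672");
        configs.put("queue", "user-op-queue-pulsar");
        System.out.println(brokerUri(configs)); // prints amqp://localhost:5672
    }
}
```

The check treats a blank string the same as a missing key, because the table lists an empty string as the default for host.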

How to use

You can use the AMQP1_0 sink connector as either a non built-in connector or a built-in connector, as described below.

Use as non built-in connector

If you already have a Pulsar cluster, you can use the AMQP1_0 sink connector as a non built-in connector directly.

This example shows how to create an AMQP1_0 sink connector on a Pulsar cluster using the command pulsar-admin sinks create.

$PULSAR_HOME/bin/pulsar-admin sinks create \
--name amqp1_0-sink \
--archive pulsar-io-amqp1_0-{version}.nar \
--classname \
--sink-config-file amqp-sink-config.yaml

Use as built-in connector

You can install the AMQP1_0 sink connector as a built-in connector and use it on a standalone cluster, an on-premises cluster, or a K8S cluster.

Standalone cluster

  1. Prepare AMQP service using Solace.

    docker run -d -p 8080:8080 -p 8008:8008 -p 1883:1883 -p 8000:8000 -p 5672:5672 -p 9000:9000 -p 2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard
  2. Copy the NAR package of the AMQP1_0 sink connector to the Pulsar connectors directory.

    cp pulsar-io-amqp1_0-{version}.nar
  3. Start Pulsar in standalone mode.

    $PULSAR_HOME/bin/pulsar standalone

    You should see logs similar to the following.

    Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors
    Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and AMQP1_0 connector,,, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0-2.7.0.nar
    Searching for functions in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./functions
  4. Create an AMQP1_0 sink.


    $PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-config-file amqp-sink-config.yaml \
    --custom-schema-inputs '{"user-op-queue-topic": "org.apache.pulsar.client.impl.schema.ByteBufferSchema"}'


    "Created successfully"

    Verify that the sink was created.


    $PULSAR_HOME/bin/pulsar-admin sinks list



    Check the sink status.


    $PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink


    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 0,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 0,
          "lastReceivedTime" : 0,
          "workerId" : "c-standalone-fw-localhost-8080"
        }
      } ]
    }
  5. Send messages to Pulsar topics.

    public void generateMessages() throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
        // Publish to the topic listed under "inputs" in the sink configuration.
        Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
                .topic("user-op-queue-topic")
                .create();
        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        JMSContext jmsContext = jmsConnectionFactory.createContext();
        for (int i = 0; i < 10; i++) {
            JmsTextMessage textMessage = (JmsTextMessage) jmsContext.createTextMessage("text message - " + i);
            // Encode the JMS message into AMQP bytes and send them as the Pulsar payload.
            ByteBuf byteBuf = (ByteBuf) textMessage.getFacade().encodeMessage();
            producer.send(byteBuf.nioBuffer());
        }
        System.out.println("finish send messages.");
        jmsContext.close();
        pulsarClient.close();
    }
  6. Consume messages from AMQP service using the receiveMessages method.


    public void receiveMessages() throws Exception {
        ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
        Connection connection = connectionFactory.createConnection();
        // A JMS connection delivers messages only after it has been started.
        connection.start();
        Session session = connection.createSession();
        MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar"));
        for (int i = 0; i < 10; i++) {
            JmsTextMessage textMessage = (JmsTextMessage) consumer.receive();
            System.out.println("receive msg content: " + textMessage.getText());
        }
        connection.close();
    }

    Check the sink status.


    $PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink


    {
      "numInstances" : 1,
      "numRunning" : 1,
      "instances" : [ {
        "instanceId" : 0,
        "status" : {
          "running" : true,
          "error" : "",
          "numRestarts" : 0,
          "numReadFromPulsar" : 10,
          "numSystemExceptions" : 0,
          "latestSystemExceptions" : [ ],
          "numSinkExceptions" : 0,
          "latestSinkExceptions" : [ ],
          "numWrittenToSink" : 10,
          "lastReceivedTime" : 1615192471713,
          "workerId" : "c-standalone-fw-localhost-8080"
        }
      } ]
    }
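
The two status outputs in this walkthrough differ in numReadFromPulsar and numWrittenToSink, which move from 0 to 10 after the ten test messages are sent. The sketch below shows one way to check those counters in the captured status text. The class `SinkStatusCheck` and its methods are hypothetical, and the regular-expression lookup is a simplification that reads only the first occurrence of each field, so it assumes a single sink instance as in this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads counters from the text printed by "sinks status".
final class SinkStatusCheck {
    /** Returns the first numeric value of the named field in the status JSON text. */
    static long field(String statusJson, String name) {
        Pattern pattern = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(\\d+)");
        Matcher matcher = pattern.matcher(statusJson);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Field not found: " + name);
        }
        return Long.parseLong(matcher.group(1));
    }

    /** True when the expected number of messages was read and written with no sink exceptions. */
    static boolean delivered(String statusJson, long expected) {
        return field(statusJson, "numReadFromPulsar") == expected
                && field(statusJson, "numWrittenToSink") == expected
                && field(statusJson, "numSinkExceptions") == 0;
    }

    public static void main(String[] args) {
        String status = "{ \"numReadFromPulsar\" : 10, \"numSinkExceptions\" : 0, \"numWrittenToSink\" : 10 }";
        System.out.println(delivered(status, 10)); // prints true
    }
}
```

For a sink with several instances, a JSON parser that sums the counters per instance would be a better fit than this text match.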

On-premises cluster

This example explains how to create an AMQP1_0 sink connector on an on-premises cluster.

  1. Copy the NAR package of the AMQP1_0 connector to the Pulsar connectors directory.

    cp pulsar-io-amqp1_0-{version}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{version}.nar
  2. Reload all built-in connectors.

    $PULSAR_HOME/bin/pulsar-admin sinks reload
  3. Check that the AMQP1_0 sink connector appears in the list of available sinks.

    $PULSAR_HOME/bin/pulsar-admin sinks available-sinks
  4. Create an AMQP1_0 sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

    $PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-type amqp1_0 \
    --sink-config-file amqp-sink-config.yaml \
    --name amqp1_0-sink

K8S cluster

This example demonstrates how to create an AMQP1_0 sink connector on a K8S cluster.

  1. Build a new image based on the Pulsar image with the AMQP1_0 sink connector and push the new image to your image registry.

    This example tags the new image as streamnative/pulsar-amqp1_0:2.7.0.

    FROM apachepulsar/pulsar-all:2.7.0
    RUN curl{version}/pulsar-io-amqp1_0-{version}.nar -o /pulsar/connectors/pulsar-io-amqp1_0-{version}.nar
  2. Extract the previous --set arguments from K8S to the pulsar.yaml file.

    helm get values <release-name> > pulsar.yaml
  3. Replace the images section in the pulsar.yaml file with the images section of streamnative/pulsar-amqp1_0:2.7.0.

  4. Upgrade the K8S cluster with the pulsar.yaml file.

    helm upgrade <release-name> streamnative/pulsar \
        --version <new version> \
        -f pulsar.yaml


    For more information about how to upgrade a Pulsar cluster with Helm, see Upgrade Guide.

  5. Create an AMQP1_0 sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

    $PULSAR_HOME/bin/pulsar-admin sinks create \
    --sink-type amqp1_0 \
    --sink-config-file amqp-sink-config.yaml \
    --name amqp1_0-sink