sink
AMQP 1.0 Sink Connector
support sink/source for AMQP version 1.0.0

Available on
StreamNative Cloud console

Authored by
StreamNative
Support type
streamnative
License
Apache License 2.0

AMQP 1.0 sink connector

The AMQP 1.0 sink connector pulls messages from Pulsar topics and persists messages to AMQP 1.0.

Quick start

1. Start AMQP 1.0 service

Start a service that supports the AMQP 1.0 protocol, such as Solace.

docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard

2. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, you need to replace --sink-type amqp1_0 with --archive /path/to/pulsar-io-amqp1_0.nar. You can find the button to download the nar package at the beginning of the document.

For StreamNative Cloud User

If you are a StreamNative Cloud user, you need set up your environment first.

pulsarctl sinks create \
  --sink-type amqp1_0 \
  --name amqp1_0-sink \
  --tenant public \
  --namespace default \
  --inputs "Your topic name" \
  --parallelism 1 \
  --sink-config \
  '{
    "connection": {
      "failover": {
        "useFailover": true
      },
      "uris": [
        {
          "protocol": "amqp",
          "host": "localhost",
          "port": 5672,
          "urlOptions": [
            "transport.tcpKeepAlive=true"
          ]
        }
      ]
    },
    "username": "guest",
    "password": "guest",
    "queue": "user-op-queue-pulsar"
  }'

The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.

Note

You can also choose to use a variety of other tools to create a connector:

2. Send messages to the topic

Note

  • If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
  • The following sample code uses the Apache qpid library.
    public static void main(String[] args) {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("{{Your Pulsar URL}}").build();
        Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
                .topic("{{The topic name that you specified when you created the connector}}")
                .create();

        JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
        JMSContext jmsContext = jmsConnectionFactory.createContext();

        for (int i = 0; i < 10; i++) {
            JmsTextMessage textMessage = (JmsTextMessage) jmsContext.createTextMessage("text message - " + i);
            ByteBuf byteBuf = (ByteBuf) textMessage.getFacade().encodeMessage();
            producer.send(byteBuf.nioBuffer());
        }
        System.out.println("finish send messages.");
        jmsContext.close();
        pulsarClient.close();
    }

3. Consume data from AMQP 1.0 service

    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession();
        MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar"));
        for (int i = 0; i < 10; i++) {
            JmsTextMessage textMessage = (JmsTextMessage) consumer.receive();
            System.out.println("receive msg content: " + textMessage.getText());
            textMessage.acknowledge();
        }
        consumer.close();
        session.close();
        connection.close();
    }

Configuration Properties

Before using the AMQP 1.0 sink connector, you need to configure it.

You can create a configuration file (JSON or YAML) to set the following properties.

NameTypeRequiredDefaultDescription
protocolStringrequired if connection is not used"amqp"[deprecated: use connection instead] The AMQP protocol.
hostStringrequired if connection is not used" " (empty string)[deprecated: use connection instead] The AMQP service host.
portintrequired if connection is not used5672[deprecated: use connection instead] The AMQP service port.
connectionConnectionrequired if protocol, host, port is not used" " (empty string)The connection details.
usernameStringfalse" " (empty string)The username used to authenticate to ActiveMQ.
passwordStringfalse" " (empty string)The password used to authenticate to ActiveMQ.
queueStringfalse" " (empty string)The queue name that messages should be read from or written to.
topicStringfalse" " (empty string)The topic name that messages should be read from or written to.
activeMessageTypeStringfalse0The ActiveMQ message simple class name.
onlyTextMessagebooleanfalsefalseIf it is set to true, the AMQP message type must be set to TextMessage. Pulsar consumers can consume the messages with schema ByteBuffer.

A Connection object can be specified as follows:

NameTypeRequiredDefaultDescription
failoverFailoverfalse" " (empty string)The configuration for a failover connection.
urislist of ConnectionUritrue" " (empty string)A list of ConnectionUri objects. When useFailover is set to true 1 or more should be provided. Currently only 1 uri is supported when useFailover is set to false

A Failover object can be specified as follows:

NameTypeRequiredDefaultDescription
useFailoverbooleantruefalseIf it is set to true, the connection will be created from the uris provided under uris, using qpid's failover connection factory.
jmsClientIdStringrequired if failoverConfigurationOptions is used" " (empty string)Identifying name for the jms Client
failoverConfigurationOptionsList of Stringrequired if jmsClientId is used" " (empty string)A list of options (e.g. <key=value>). The options wil be joined using an '&', prefixed with a the jmsClientId and added to the end of the failoverUri. see also: https://qpid.apache.org/releases/qpid-jms-2.2.0/docs/index.html#failover-configuration-options

A ConnectionUri object can be specified as follows:

NameTypeRequiredDefaultDescription
protocolStringtrue" " (empty string)The AMQP protocol.
hostStringtrue" " (empty string)The AMQP service host.
portinttrue0The AMQP service port.
urlOptionsList of Stringfalse" " (empty string)A list of url-options (e.g. <key=value>). The url options wil be joined using an '&', prefixed with a '?' and added to the end of the uri