AMQP1_0 sink connector
The AMQP1_0 sink connector pulls messages from Pulsar topics and persists them to an AMQP 1.0 service.
How to get
You can get the AMQP1_0 sink connector using one of the following methods:
Download the NAR package from here.
Build it from source code.
Clone the source code to your machine.
git clone https://github.com/streamnative/pulsar-io-amqp-1-0
Assume that PULSAR_IO_AMQP1_0_HOME is the home directory for the pulsar-io-amqp1_0 repo. Build the connector in the ${PULSAR_IO_AMQP1_0_HOME} directory.
mvn clean install -DskipTests
After the connector is successfully built, a NAR package is generated under the target directory.
ls pulsar-io-amqp1_0/target
pulsar-io-amqp1_0-{version}.nar
How to configure
Before using the AMQP1_0 sink connector, you need to configure it.
You can create a configuration file (JSON or YAML) to set the following properties.
Name | Type | Required | Default | Description |
---|---|---|---|---|
protocol | String | true | "amqp" | The AMQP protocol. |
host | String | true | " " (empty string) | The AMQP service host. |
port | int | true | 5672 | The AMQP service port. |
username | String | false | " " (empty string) | The username used to authenticate to ActiveMQ. |
password | String | false | " " (empty string) | The password used to authenticate to ActiveMQ. |
queue | String | false | " " (empty string) | The queue name that messages should be read from or written to. |
topic | String | false | " " (empty string) | The topic name that messages should be read from or written to. |
activeMessageType | String | false | 0 | The ActiveMQ message simple class name. |
onlyTextMessage | boolean | false | false | If it is set to true , the AMQP message type must be set to TextMessage . Pulsar consumers can consume the messages with schema ByteBuffer. |
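The property table above can be turned into a quick pre-flight check before a configuration file is handed to pulsar-admin. The following is a minimal sketch, not the connector's own validation: the class AmqpSinkConfigSketch and its methods are hypothetical names introduced for illustration, and composing the service address as protocol://host:port is an assumption based on the amqp://localhost:5672 address used elsewhere in this document.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the connector: checks a "configs" map
// against the property table before the sink is created.
final class AmqpSinkConfigSketch {

    // Properties the table marks as required.
    static final List<String> REQUIRED = Arrays.asList("protocol", "host", "port");

    // Returns a list of human-readable problems; an empty list means the map passed.
    static List<String> problems(Map<String, String> configs) {
        List<String> found = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = configs.get(key);
            if (value == null || value.trim().isEmpty()) {
                found.add("missing required property: " + key);
            }
        }
        String port = configs.get("port");
        if (port != null && !port.trim().isEmpty()) {
            try {
                int parsed = Integer.parseInt(port.trim());
                if (parsed < 1 || parsed > 65535) {
                    found.add("port out of range: " + port);
                }
            } catch (NumberFormatException e) {
                found.add("port is not an integer: " + port);
            }
        }
        return found;
    }

    // Assumption: the service address has the form protocol://host:port.
    static String serviceUri(Map<String, String> configs) {
        return configs.get("protocol") + "://" + configs.get("host") + ":" + configs.get("port");
    }

    public static void main(String[] args) {
        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("protocol", "amqp");
        configs.put("host", "localhost");
        configs.put("port", "5672");
        configs.put("queue", "user-op-queue-pulsar");
        System.out.println("problems: " + problems(configs));
        System.out.println("service URI: " + serviceUri(configs));
    }
}
```

The check only covers the three required properties and the port range. Whether the connector rejects other combinations, such as setting both queue and topic, is not covered by this sketch.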
Example
JSON
{
  "tenant": "public",
  "namespace": "default",
  "name": "amqp1_0-sink",
  "inputs": ["user-op-queue-topic"],
  "archive": "connectors/pulsar-io-amqp1_0-{version}.nar",
  "parallelism": 1,
  "configs": {
    "protocol": "amqp",
    "host": "localhost",
    "port": "5672",
    "username": "guest",
    "password": "guest",
    "queue": "user-op-queue-pulsar"
  }
}
YAML
tenant: "public"
namespace: "default"
name: "amqp1_0-sink"
inputs:
  - "user-op-queue-topic"
archive: "connectors/pulsar-io-amqp1_0-{version}.nar"
parallelism: 1
configs:
  protocol: "amqp"
  host: "localhost"
  port: "5672"
  username: "guest"
  password: "guest"
  queue: "user-op-queue-pulsar"
How to use
You can use the AMQP1_0 sink connector as either a non built-in connector or a built-in connector, as described below.
Use as non built-in connector
If you already have a Pulsar cluster, you can use the AMQP1_0 sink connector as a non built-in connector directly.
This example shows how to create an AMQP1_0 sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
$PULSAR_HOME/bin/pulsar-admin sinks create \
  --name amqp1_0-sink \
  --archive pulsar-io-amqp1_0-{version}.nar \
  --classname org.apache.pulsar.ecosystem.io.amqp.AmqpSink \
  --sink-config-file amqp-sink-config.yaml
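If the sink is created from a deployment script or a test harness rather than by hand, the same command can be assembled as an argument list. The sketch below is illustrative only: SinkCreateCommandSketch and its build method are hypothetical names, the pulsarHome value in main is a placeholder path, and only the flags shown in the command above are used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: assembles the "pulsar-admin sinks create" invocation
// shown above as an argument list suitable for ProcessBuilder.
final class SinkCreateCommandSketch {

    static List<String> build(String pulsarHome, String name, String archive,
                              String className, String configFile) {
        List<String> cmd = new ArrayList<>();
        cmd.add(pulsarHome + "/bin/pulsar-admin");
        cmd.add("sinks");
        cmd.add("create");
        cmd.add("--name");
        cmd.add(name);
        cmd.add("--archive");
        cmd.add(archive);
        cmd.add("--classname");
        cmd.add(className);
        cmd.add("--sink-config-file");
        cmd.add(configFile);
        return cmd;
    }

    public static void main(String[] args) {
        // "/opt/pulsar" is a placeholder; substitute your own PULSAR_HOME.
        List<String> cmd = build(
                "/opt/pulsar",
                "amqp1_0-sink",
                "pulsar-io-amqp1_0-{version}.nar",
                "org.apache.pulsar.ecosystem.io.amqp.AmqpSink",
                "amqp-sink-config.yaml");
        // Print the command instead of running it, so the sketch has no side effects.
        System.out.println(String.join(" ", cmd));
        // To run it for real: new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Passing the arguments as a list avoids shell quoting problems that can arise when a command is built as a single string.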
Use as built-in connector
You can also install the AMQP1_0 sink connector as a built-in connector and use it on a standalone cluster, an on-premises cluster, or a K8S cluster.
Standalone cluster
Prepare AMQP service using Solace.
docker run -d \
  -p 8080:8080 -p 8008:8008 -p 1883:1883 -p 8000:8000 \
  -p 5672:5672 -p 9000:9000 -p 2222:2222 \
  --shm-size=2g \
  --env username_admin_globalaccesslevel=admin \
  --env username_admin_password=admin \
  --name=solace solace/solace-pubsub-standard
Copy the NAR package of the AMQP1_0 sink connector to the Pulsar connectors directory.
cp pulsar-io-amqp1_0-{version}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{version}.nar
Start Pulsar in standalone mode.
$PULSAR_HOME/bin/pulsar standalone
You should see logs similar to the following.
Searching for connectors in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors
Found connector ConnectorDefinition(name=amqp1_0, description=AMQP1_0 source and AMQP1_0 connector, sourceClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSource, sinkClass=org.apache.pulsar.ecosystem.io.amqp.AmqpSink, sourceConfigClass=null, sinkConfigClass=null) from /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./connectors/pulsar-io-amqp1_0-2.7.0.nar
Searching for functions in /Volumes/other/apache-pulsar-2.8.0-SNAPSHOT/./functions
Create an AMQP1_0 sink.
Input
$PULSAR_HOME/bin/pulsar-admin sinks create \
  --sink-config-file amqp-sink-config.yaml \
  --custom-schema-inputs '{"user-op-queue-topic": "org.apache.pulsar.client.impl.schema.ByteBufferSchema"}'
Output
"Created successfully"
Verify that the sink was created successfully.
Input
$PULSAR_HOME/bin/pulsar-admin sinks list
Output
[ "amqp1_0-sink" ]
Check the sink status.
Input
$PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink
Output
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 0,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 0,
      "lastReceivedTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}
Send messages to Pulsar topics.
@Test
public void generateMessages() throws Exception {
    @Cleanup
    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
    @Cleanup
    Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
            .topic("user-op-queue-topic")
            .create();
    JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
    @Cleanup
    JMSContext jmsContext = jmsConnectionFactory.createContext();
    for (int i = 0; i < 10; i++) {
        JmsTextMessage textMessage = (JmsTextMessage) jmsContext.createTextMessage("text message - " + i);
        ByteBuf byteBuf = (ByteBuf) textMessage.getFacade().encodeMessage();
        producer.send(byteBuf.nioBuffer());
    }
    System.out.println("finish send messages.");
}
Consume messages from the AMQP service using the receiveMessages method.
Input
@Test
public void receiveMessages() throws Exception {
    ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
    @Cleanup
    Connection connection = connectionFactory.createConnection();
    connection.start();
    @Cleanup
    Session session = connection.createSession();
    @Cleanup
    MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar"));
    for (int i = 0; i < 10; i++) {
        JmsTextMessage textMessage = (JmsTextMessage) consumer.receive();
        System.out.println("receive msg content: " + textMessage.getText());
        textMessage.acknowledge();
    }
}
Check the sink status.
Input
$PULSAR_HOME/bin/pulsar-admin sinks status --name amqp1_0-sink
Output
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReadFromPulsar" : 10,
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "numSinkExceptions" : 0,
      "latestSinkExceptions" : [ ],
      "numWrittenToSink" : 10,
      "lastReceivedTime" : 1615192471713,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}
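In the status output, numReadFromPulsar and numWrittenToSink both read 10 after the ten messages are sent, and numSinkExceptions stays at 0. A script can make the same comparison automatically. The following is a rough sketch under stated assumptions: SinkStatusCheckSketch is a hypothetical name, the regex lookup is used only to keep the example free of external libraries (a JSON parser would be more robust), and only the first instance in the output is inspected, which matches the parallelism of 1 used in this example.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reads counters from the text printed by
// "pulsar-admin sinks status" and checks that everything read was written.
final class SinkStatusCheckSketch {

    // Returns the first integer value found for the given field, or -1 if absent.
    static long firstLong(String json, String field) {
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    // True when the first instance wrote as many messages as it read,
    // with no sink exceptions reported.
    static boolean allDelivered(String json) {
        long read = firstLong(json, "numReadFromPulsar");
        long written = firstLong(json, "numWrittenToSink");
        long sinkErrors = firstLong(json, "numSinkExceptions");
        return read >= 0 && read == written && sinkErrors == 0;
    }

    public static void main(String[] args) {
        // Trimmed copy of the status output shown above.
        String status = "{ \"numInstances\" : 1, \"numRunning\" : 1, \"instances\" : [ { "
                + "\"instanceId\" : 0, \"status\" : { \"running\" : true, "
                + "\"numReadFromPulsar\" : 10, \"numSinkExceptions\" : 0, "
                + "\"numWrittenToSink\" : 10 } } ] }";
        System.out.println("read: " + firstLong(status, "numReadFromPulsar"));
        System.out.println("written: " + firstLong(status, "numWrittenToSink"));
        System.out.println("all delivered: " + allDelivered(status));
    }
}
```

For sinks running with more than one instance, each entry in the instances array would need to be checked separately.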
On-premises cluster
This example explains how to create an AMQP1_0 sink connector on an on-premises cluster.
Copy the NAR package of the AMQP1_0 connector to the Pulsar connectors directory.
cp pulsar-io-amqp1_0-{version}.nar $PULSAR_HOME/connectors/pulsar-io-amqp1_0-{version}.nar
Reload all built-in connectors.
$PULSAR_HOME/bin/pulsar-admin sinks reload
Check whether the AMQP1_0 sink connector appears in the list of available sinks.
$PULSAR_HOME/bin/pulsar-admin sinks available-sinks
Create an AMQP1_0 sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
$PULSAR_HOME/bin/pulsar-admin sinks create \
  --sink-type amqp1_0 \
  --sink-config-file amqp-sink-config.yaml \
  --name amqp1_0-sink
K8S cluster
This example demonstrates how to create an AMQP1_0 sink connector on a K8S cluster.
Build a new image based on the Pulsar image with the AMQP1_0 sink connector and push the new image to your image registry.
This example tags the new image as streamnative/pulsar-amqp1_0:2.7.0.
FROM apachepulsar/pulsar-all:2.7.0
RUN curl -L https://github.com/streamnative/pulsar-io-amqp1-0/releases/download/v{version}/pulsar-io-amqp1_0-{version}.nar -o /pulsar/connectors/pulsar-io-amqp1_0-{version}.nar
Extract the previous --set arguments from K8S to the pulsar.yaml file.
helm get values <release-name> > pulsar.yaml
Replace the images section in the pulsar.yaml file with the images section of streamnative/pulsar-amqp1_0:2.7.0.
Upgrade the K8S cluster with the pulsar.yaml file.
helm upgrade <release-name> streamnative/pulsar \
  --version <new version> \
  -f pulsar.yaml
Tip
For more information about how to upgrade a Pulsar cluster with Helm, see Upgrade Guide.
Create an AMQP1_0 sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
$PULSAR_HOME/bin/pulsar-admin sinks create \
  --sink-type amqp1_0 \
  --sink-config-file amqp-sink-config.yaml \
  --name amqp1_0-sink