AMQP 1.0 sink connector
The AMQP 1.0 sink connector pulls messages from Pulsar topics and persists messages to AMQP 1.0.
This connector is available as a built-in connector on StreamNative Cloud.
Quick start
1. Start AMQP 1.0 service
Start a service that supports the AMQP 1.0 protocol, such as Solace.
docker run -d -p 8080:8080 -p:8008:8008 -p:1883:1883 -p:8000:8000 -p:5672:5672 -p:9000:9000 -p:2222:2222 --shm-size=2g --env username_admin_globalaccesslevel=admin --env username_admin_password=admin --name=solace solace/solace-pubsub-standard
2. Create a connector
The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector,
you need to replace --sink-type amqp1_0 with --archive /path/to/pulsar-io-amqp1_0.nar. You can find the button to download the nar package at the beginning of the document.
pulsarctl sinks create \
--sink-type amqp1_0 \
--name amqp1_0-sink \
--tenant public \
--namespace default \
--inputs "Your topic name" \
--parallelism 1 \
--sink-config \
'{
"connection": {
"failover": {
"useFailover": true
},
"uris": [
{
"protocol": "amqp",
"host": "localhost",
"port": 5672,
"urlOptions": [
"transport.tcpKeepAlive=true"
]
}
]
},
"username": "guest",
"password": "guest",
"queue": "user-op-queue-pulsar"
}'
The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own.
If you want to configure more parameters, see Configuration Properties for reference.
You can also choose to use a variety of other tools to create a connector:
2. Send messages to the topic
- If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
- The following sample code uses the Apache qpid library.
public static void main(String[] args) {
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("{{Your Pulsar URL}}").build();
Producer<ByteBuffer> producer = pulsarClient.newProducer(Schema.BYTEBUFFER)
.topic("{{The topic name that you specified when you created the connector}}")
.create();
JmsConnectionFactory jmsConnectionFactory = new JmsConnectionFactory();
JMSContext jmsContext = jmsConnectionFactory.createContext();
for (int i = 0; i < 10; i++) {
JmsTextMessage textMessage = (JmsTextMessage) jmsContext.createTextMessage("text message - " + i);
ByteBuf byteBuf = (ByteBuf) textMessage.getFacade().encodeMessage();
producer.send(byteBuf.nioBuffer());
}
System.out.println("finish send messages.");
jmsContext.close();
pulsarClient.close();
}
3. Consume data from AMQP 1.0 service
public static void main(String[] args) {
ConnectionFactory connectionFactory = new JmsConnectionFactory("guest", "guest", "amqp://localhost:5672");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession();
MessageConsumer consumer = session.createConsumer(new JmsQueue("user-op-queue-pulsar"));
for (int i = 0; i < 10; i++) {
JmsTextMessage textMessage = (JmsTextMessage) consumer.receive();
System.out.println("receive msg content: " + textMessage.getText());
textMessage.acknowledge();
}
consumer.close();
session.close();
connection.close();
}
Configuration Properties
Before using the AMQP 1.0 sink connector, you need to configure it.
You can create a configuration file (JSON or YAML) to set the following properties.
| Name | Type | Required | Sensitive | Default | Description |
|---|
protocol | String | required if connection is not used | false | ”amqp” | [deprecated: use connection instead] The AMQP protocol. |
host | String | required if connection is not used | false | ” ” (empty string) | [deprecated: use connection instead] The AMQP service host. |
port | int | required if connection is not used | false | 5672 | [deprecated: use connection instead] The AMQP service port. |
connection | Connection | required if protocol, host, port is not used | false | ” ” (empty string) | The connection details. |
username | String | false | true | ” ” (empty string) | The username used to authenticate to ActiveMQ. |
password | String | false | true | ” ” (empty string) | The password used to authenticate to ActiveMQ. |
queue | String | false | false | ” ” (empty string) | The queue name that messages should be read from or written to. |
topic | String | false | false | ” ” (empty string) | The topic name that messages should be read from or written to. |
activeMessageType | String | false | false | 0 | The ActiveMQ message simple class name. |
onlyTextMessage | boolean | false | false | false | If it is set to true, the AMQP message type must be set to TextMessage. Pulsar consumers can consume the messages with schema ByteBuffer. |
A Connection object can be specified as follows:
| Name | Type | Required | Default | Description |
|---|
failover | Failover | false | ” ” (empty string) | The configuration for a failover connection. |
uris | list of ConnectionUri | true | ” ” (empty string) | A list of ConnectionUri objects. When useFailover is set to true 1 or more should be provided. Currently only 1 uri is supported when useFailover is set to false |
A Failover object can be specified as follows:
| Name | Type | Required | Default | Description |
|---|
useFailover | boolean | true | false | If it is set to true, the connection will be created from the uris provided under uris, using qpid’s failover connection factory. |
jmsClientId | String | required if failoverConfigurationOptions is used | ” ” (empty string) | Identifying name for the jms Client |
failoverConfigurationOptions | List of String | required if jmsClientId is used | ” ” (empty string) | A list of options (e.g. <key=value>). The options wil be joined using an ’&’, prefixed with a the jmsClientId and added to the end of the failoverUri. see also: https://qpid.apache.org/releases/qpid-jms-2.2.0/docs/index.html#failover-configuration-options |
A ConnectionUri object can be specified as follows:
| Name | Type | Required | Default | Description |
|---|
protocol | String | true | ” ” (empty string) | The AMQP protocol. |
host | String | true | ” ” (empty string) | The AMQP service host. |
port | int | true | 0 | The AMQP service port. |
urlOptions | List of String | false | ” ” (empty string) | A list of url-options (e.g. <key=value>). The url options wil be joined using an ’&’, prefixed with a ’?’ and added to the end of the uri |