Sqs sink
The SQS source connector is used to consume messages from Pulsar topics and publish them to AWS SQS.
The AWS Simple Queue Service (SQS) sink connector pulls data from Pulsar topics and persists data to AWS SQS.
Quick start
Prerequisites
The prerequisites for connecting an AWS SQS sink connector to external systems include:
- Create SQS in AWS.
- Create the AWS User and create
AccessKey
(Please recordAccessKey
andSecretAccessKey
). - Assign the following permissions to the AWS User:
- sqs:CreateQueue
- sqs:SendMessage
1. Create a connector
The following command shows how to use pulsarctl to create a builtin
connector. If you want to create a non-builtin
connector,
you need to replace --sink-type sqs
with --archive /path/to/pulsar-io-sqs.nar
. You can find the button to download the nar
package at the beginning of the document.
If you are a StreamNative Cloud user, you need set up your environment first.
The --sink-config
is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own.
If you want to configure more parameters, see Configuration Properties for reference.
You can also choose to use a variety of other tools to create a connector:
- pulsar-admin: The command arguments for
pulsar-admin
are similar to those ofpulsarctl
. You can find an example for StreamNative Cloud Doc. - RestAPI: You can find an example for StreamNative Cloud Doc.
- Terraform: You can find an example for StreamNative Cloud Doc.
- Function Mesh: The docker image can be found at the beginning of the document.
2. Send messages to the topic
If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
3. Show data on AWS SQS
You can use the following simple code to receive messages from AWS SQS.
Configuration Properties
Before using the AWS SQS sink connector, you need to configure it. This table outlines the properties and the Descriptions of an AWS SQS sink connector.
Name | Type | Required | Sensitive | Default | Description |
---|---|---|---|---|---|
awsRegion | String | true | false | ” ” (empty string) | Supported AWS region. For example, us-west-1, us-west-2. |
queueName | String | true | false | ” ” (empty string) | The name of the SQS queue that messages should be read from or written to. |
awsCredentialPluginName | String | false | false | ” ” (empty string) | The fully-qualified class name of implementation of AwsCredentialProviderPlugin. For more information, see [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin). |
awsCredentialPluginParam | String | false | true | ” ” (empty string) | The JSON parameter to initialize awsCredentialsProviderPlugin . For more information, see [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin). |
awsEndpoint | String | false | false | ” ” (empty string) | AWS SQS end-point URL. You can find it at AWS SQS Service endpoints. |
metadataFields | String | false | false | ”pulsar.key” | The metadata fields to be sent to the SQS message attributes. Valid values are ‘pulsar.topic, pulsar.key, pulsar.partitionIndex, pulsar.sequence, pulsar.properties.{{Your properties key}} , pulsar.eventTime’ |
Configure AwsCredentialProviderPlugin
AWS SQS sink connector allows you to use three ways to connect to AWS SQS by configuring awsCredentialPluginName
.
-
Leave
awsCredentialPluginName
empty to get the connector authenticated by passingaccessKey
andsecretKey
inawsCredentialPluginParam
. -
Set
awsCredentialPluginName
toorg.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
to use the default AWS provider chain. With this option, you don’t need to configureawsCredentialPluginParam
. For more information, see AWS documentation. -
Set
awsCredentialPluginName
toorg.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
to use the default AWS provider chain, and you need to configureroleArn
androleSessionNmae
inawsCredentialPluginParam
. For more information, see AWS documentation.
Advanced features
Schema Support
The AWS SQS sink connector supports the following schema types: Primitive Schema
, Avro Schema
, and JSON Schema
.
Primitive Schema
For the primitive type, the payload format is as follows:
The value types include: Number, Boolean, and String. Here’s a table indicating the conversion type for each Primitive Schema Type:
Primitive Schema Type | JSON Conversion Type | Example |
---|---|---|
Boolean | Boolean | true |
INT8, INT16, INT32, INT64, FLOAT, DOUBLE | Number | 1234 |
STRING | String | ”Hello” |
BYTES | Base64-encoded String | ”SGVsbG8=” (base64-encoded version of the string “Hello”) |
DATE, TIME, TIMESTAMP | ISO 8601 String (yyy-MM-dd’T’HH:mm:ss.SSSXXX) | ‘2023-10-30T06:13:48.123+08:00’ |
LocalDate | ISO 8601 String (yyyy-MM-dd) | ‘2023-10-17’ |
LocalTime | ISO 8601 String (HH:mm:ss.SSSSSSSSS) | ‘04:30:33.123456789’ |
LocalDateTime | ISO 8601 String (yyyy-MM-dd’T’HH:mm:ss.SSSSSSSSS) | ‘2023-10-17T04:30:33.123456789’ |
Instant | ISO 8601 String (yyyy-MM-dd’T’HH:mm:ss.SSSSSSSSSXXX) | ‘2023-10-30T06:13:48.123456789+08:00’ |
Struct Schema (Avro Schema and JSON Schema)
For the struct schema types JSON
and AVRO
, the value is converted into a JSON object. The conversion rules outlined
in the Primitive schema section
are applied to all primitive type fields within this value object. Nested objects are
also supported.
Here is an example:
Here are the rules for handling the logical type of the Avro based struct schema (AVRO
and JSON
):
Logical Type | JSON Conversion Type | Example |
---|---|---|
time-millis , time-micros | ISO 8601 String (HH:mm:ss.SSS) | ‘13:48:41.123’ |
timestamp-millis | ISO 8601 String (yyy-MM-dd’T’HH:mm:ss.SSSXXX) | ‘2023-10-30T06:13:48.123+08:00’ |
timestamp-micros | ISO 8601 String (yyy-MM-dd’T’HH:mm:ss.SSSSSSXXX) | ‘2023-10-30T06:13:48.123456+08:00’ |
local-timestamp-millis | ISO 8601 String (yyyy-MM-dd’T’HH:mm:ss.SSS) | ‘2023-10-29T22:13:48.123’ |
local-timestamp-micros | ISO 8601 String (yyyy-MM-dd’T’HH:mm:ss.SSSSSS) | ‘2023-10-29T22:13:48.123456’ |
Metadata Support
SQS sink connector will put metadata of Pulsar into SQS message attributes
. SQS message attributes accommodate various data types such as String, Number, Binary, and so forth.
The supported metadata fields of Pulsar are:
topic
: Thestring
type of source topic namekey
: Thestring
type of the message key.partitionIndex
: Thenumber
type of the topic partition index of the topic.sequence
: Thenumber
type of the sequence ID.properties
: This is a map, and will unfold this map, placing each key-value pair into the SQSmessage attribute
. The type of the key isstring
, and the type of the value isstring
.eventTime
: The event time of the message in the ISO 8601 formatmessageId
: The string representation of a message ID. eg,"1:1:-1:-1"
You can get metadata form message attributes
, for examples:
Users can choose the metadata fields through the metaDataField
configuration. It is a string
in which multiple fields are separated by commas. And this connector will verify that the number of metadata cannot exceed 10.
For examples: