The AWS Simple Queue Service (SQS) source connector reads data from Amazon SQS and writes it to Pulsar topics.
Quick start
Prerequisites
Before connecting an AWS SQS source connector to external systems, complete the following prerequisites:
- Create an SQS queue in AWS.
- Create an AWS user and create an AccessKey for it (record the AccessKey and SecretAccessKey).
- Assign the following permissions to the AWS user:
- sqs:CreateQueue
- sqs:DeleteMessage
- sqs:ChangeMessageVisibility
- sqs:GetQueueUrl
- sqs:GetQueueAttributes
- sqs:ReceiveMessage
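The permissions above are typically granted through an IAM policy attached to the user. As a minimal sketch, the following Java snippet assembles such a policy document as a JSON string. The class name `SqsSourcePolicy` and the queue ARN passed to it are placeholders chosen for illustration, not values taken from this connector.

```java
import java.util.List;
import java.util.stream.Collectors;

final class SqsSourcePolicy {
    // Actions listed in the prerequisites of this page.
    static final List<String> ACTIONS = List.of(
            "sqs:CreateQueue",
            "sqs:DeleteMessage",
            "sqs:ChangeMessageVisibility",
            "sqs:GetQueueUrl",
            "sqs:GetQueueAttributes",
            "sqs:ReceiveMessage");

    // Builds an IAM policy document that allows ACTIONS on a single queue ARN.
    static String build(String queueArn) {
        String actions = ACTIONS.stream()
                .map(a -> "\"" + a + "\"")
                .collect(Collectors.joining(","));
        return "{\"Version\":\"2012-10-17\",\"Statement\":[{"
                + "\"Effect\":\"Allow\","
                + "\"Action\":[" + actions + "],"
                + "\"Resource\":\"" + queueArn + "\"}]}";
    }

    public static void main(String[] args) {
        // Placeholder ARN; replace it with the ARN of your own queue.
        System.out.println(build("arn:aws:sqs:us-west-2:123456789012:my-queue"));
    }
}
```

The printed JSON can be pasted into the IAM console or passed to whatever tooling you use to manage policies.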
1. Create a connector
Depending on the environment, there are several ways to create an AWS SQS source connector:
- Create Connector on StreamNative Cloud.
- Create Connector with Function worker. This method requires you to download a NAR package to create a built-in or non-built-in connector. You can download the version you need by clicking the Download icon in the upper-right corner of this page.
- Create Connector with Function mesh. This method requires you to set the Docker image. You can choose the version you want to launch from Docker Hub.
No matter how you create an AWS SQS source connector, the minimum configuration contains the following parameters.
configs:
  awsRegion: "Your AWS SQS region"
  queueName: "Your AWS SQS name"
  awsCredentialPluginParam: "{\"accessKey\":\"Your access key\",\"secretKey\":\"Your secret key\"}"
The configuration structure varies depending on how you create the AWS SQS source connector. For example, some are JSON, some are YAML, and some are Kubernetes YAML. You need to adapt the configs to the corresponding format.
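The escaped quotes in `awsCredentialPluginParam` appear because its value is itself a JSON document stored as a string. The following Java sketch shows how that value could be produced when a config is generated programmatically. `CredentialParam` and its methods are hypothetical helpers, not part of the connector, and the escaping handles only backslashes and double quotes.

```java
final class CredentialParam {
    // Escapes backslashes and double quotes for use inside a quoted string.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Inner JSON document expected in awsCredentialPluginParam.
    static String credentialJson(String accessKey, String secretKey) {
        return "{\"accessKey\":\"" + escape(accessKey)
                + "\",\"secretKey\":\"" + escape(secretKey) + "\"}";
    }

    // Quoted, escaped value as it appears after "awsCredentialPluginParam:".
    static String asConfigValue(String accessKey, String secretKey) {
        return "\"" + escape(credentialJson(accessKey, secretKey)) + "\"";
    }

    public static void main(String[] args) {
        System.out.println("awsCredentialPluginParam: "
                + asConfigValue("Your access key", "Your secret key"));
    }
}
```

Running `main` prints a line in the same shape as the `awsCredentialPluginParam` entry in the minimum configuration above.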
If you want to configure more parameters, see Configuration Properties for reference.
2. Send messages to AWS SQS
You can use the following simple code to send messages to AWS SQS.
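import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;

public class SqsSendExample {
    public static void main(String[] args) {
        // Build an SQS client (AWS SDK for Java 1.x) with static credentials.
        AmazonSQS client = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("Your access key", "Your secret key")))
                .withRegion("Your AWS SQS region").build();
        String queueUrl = client.getQueueUrl(new GetQueueUrlRequest("Your SQS name")).getQueueUrl();
        client.sendMessage(queueUrl, "Hello World!");
        client.shutdown();
    }
}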
3. Show data using Pulsar client
Note
If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
bin/pulsar-client \
  --url "Your Pulsar serviceUrl" \
  consume "The topic that you specified when you created the connector" -s "test-sub" -n 10 -p Earliest
----- got message -----
key:[null], properties:[], content:Hello World!
Configuration Properties
Before using the AWS SQS source connector, you need to configure it. The following table lists the properties of an AWS SQS source connector and their descriptions.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsRegion | String | true | "" (empty string) | Supported AWS region. For example, us-west-1, us-west-2. |
queueName | String | true | "" (empty string) | The name of the SQS queue that messages are read from. |
awsCredentialPluginName | String | false | "" (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation. For more information, see Configure AwsCredentialProviderPlugin. |
awsCredentialPluginParam | String | false | "" (empty string) | The JSON parameter used to initialize AwsCredentialProviderPlugin. For more information, see Configure AwsCredentialProviderPlugin. |
awsEndpoint | String | false | "" (empty string) | AWS SQS endpoint URL. You can find it at AWS SQS Service endpoints. |
batchSizeOfOnceReceive | int | false | 1 | The maximum number of messages pulled from SQS in a single receive call. The value ranges from 1 to 10. |
numberOfConsumers | int | false | 1 | The expected number of consumers. You can scale consumers horizontally to achieve high throughput. The value ranges from 1 to 50. |
Note
The batchSizeOfOnceReceive and numberOfConsumers options are available for SQS source 2.8.4.3+, 2.9.4.1+, and 2.10.1.13+. For details about how to test AWS SQS source performance, see Performance Test on AWS SQS Source Connector.
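The documented ranges for `batchSizeOfOnceReceive` (1 to 10) and `numberOfConsumers` (1 to 50) can be checked before a config is submitted. The sketch below is a hypothetical pre-flight check, not the connector's own validation logic; the class and method names are assumptions made for this example.

```java
final class SqsSourceTuningCheck {
    // Returns null when both values fall in the documented ranges,
    // otherwise a message naming the first offending property.
    static String validate(int batchSizeOfOnceReceive, int numberOfConsumers) {
        if (batchSizeOfOnceReceive < 1 || batchSizeOfOnceReceive > 10) {
            return "batchSizeOfOnceReceive must be between 1 and 10";
        }
        if (numberOfConsumers < 1 || numberOfConsumers > 50) {
            return "numberOfConsumers must be between 1 and 50";
        }
        return null;
    }

    public static void main(String[] args) {
        String error = validate(10, 4);
        System.out.println(error == null ? "config values are in range" : error);
    }
}
```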
Configure AwsCredentialProviderPlugin
The AWS SQS source connector supports three ways to connect to AWS SQS, selected by configuring awsCredentialPluginName.
- Leave awsCredentialPluginName empty to authenticate the connector by passing accessKey and secretKey in awsCredentialPluginParam.
  {"accessKey":"Your access key","secretKey":"Your secret key"}
- Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don't need to configure awsCredentialPluginParam. For more information, see AWS documentation.
- Set awsCredentialPluginName to org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to authenticate by assuming an IAM role through AWS STS. With this option, you need to configure roleArn and roleSessionName in awsCredentialPluginParam. For more information, see AWS documentation.
  {"roleArn": "arn...", "roleSessionName": "name"}
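The three options can be summarized as a lookup from `awsCredentialPluginName` to the keys that `awsCredentialPluginParam` has to carry. The following Java sketch is a hypothetical helper for checking a config by hand; it is not code from the connector, and it rejects any plugin name not described on this page even though other AwsCredentialProviderPlugin implementations may exist.

```java
import java.util.List;

final class CredentialMode {
    static final String DEFAULT_CHAIN =
            "org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin";
    static final String STS_ASSUME_ROLE =
            "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin";

    // Returns the keys awsCredentialPluginParam must contain for a plugin name.
    static List<String> requiredParamKeys(String awsCredentialPluginName) {
        if (awsCredentialPluginName == null || awsCredentialPluginName.isEmpty()) {
            return List.of("accessKey", "secretKey");
        }
        if (DEFAULT_CHAIN.equals(awsCredentialPluginName)) {
            return List.of(); // the default provider chain needs no parameters
        }
        if (STS_ASSUME_ROLE.equals(awsCredentialPluginName)) {
            return List.of("roleArn", "roleSessionName");
        }
        throw new IllegalArgumentException(
                "Plugin not covered by this sketch: " + awsCredentialPluginName);
    }

    public static void main(String[] args) {
        System.out.println(requiredParamKeys(""));
        System.out.println(requiredParamKeys(DEFAULT_CHAIN));
        System.out.println(requiredParamKeys(STS_ASSUME_ROLE));
    }
}
```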