AWS SQS Source Connector

Available on
StreamNative Cloud console

Authored by
freeznet,shibd,Anonymitaet,nlu90
Support type
streamnative
License
StreamNative, Inc. All Rights Reserved

The AWS Simple Queue Service (SQS) source connector reads data from AWS SQS and writes it to Pulsar topics.

Quick start

Prerequisites

The prerequisites for connecting an AWS SQS source connector to external systems include:

  1. Create an SQS queue in AWS.
  2. Create an AWS user and an access key for it. Record the AccessKey and SecretAccessKey.
  3. Assign the following permissions to the AWS User:
      • sqs:CreateQueue
      • sqs:DeleteMessage
      • sqs:ChangeMessageVisibility
      • sqs:GetQueueUrl
      • sqs:GetQueueAttributes
      • sqs:ReceiveMessage
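
The permissions above are typically granted through an IAM policy attached to the user. The following is a minimal sketch, not an official policy: it renders a policy document that allows exactly the six listed actions. The class name `SqsSourcePolicySketch`, the method `policyFor`, the example queue ARN, and the choice to scope `Resource` to a single queue are all assumptions made for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

final class SqsSourcePolicySketch {
    // The six actions listed in the prerequisites.
    static final List<String> ACTIONS = List.of(
            "sqs:CreateQueue", "sqs:DeleteMessage", "sqs:ChangeMessageVisibility",
            "sqs:GetQueueUrl", "sqs:GetQueueAttributes", "sqs:ReceiveMessage");

    // Renders an IAM policy document that allows ACTIONS on the given queue ARN.
    // Scoping Resource to one queue is an assumption, not a connector requirement.
    static String policyFor(String queueArn) {
        String actions = ACTIONS.stream()
                .map(a -> "\"" + a + "\"")
                .collect(Collectors.joining(", "));
        return "{\n"
                + "  \"Version\": \"2012-10-17\",\n"
                + "  \"Statement\": [{\n"
                + "    \"Effect\": \"Allow\",\n"
                + "    \"Action\": [" + actions + "],\n"
                + "    \"Resource\": \"" + queueArn + "\"\n"
                + "  }]\n"
                + "}";
    }

    public static void main(String[] args) {
        // Hypothetical ARN; replace it with the ARN of your own queue.
        System.out.println(policyFor("arn:aws:sqs:us-west-2:123456789012:my-queue"));
    }
}
```

The printed JSON can be pasted into the IAM console as an inline policy for the user created in step 2.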

1. Create a connector

The following command shows how to use pulsarctl to create a built-in connector. To create a non-built-in connector, replace --source-type sqs with --archive /path/to/pulsar-io-sqs.nar. You can download the NAR package using the download button at the beginning of this document.

For StreamNative Cloud User

If you are a StreamNative Cloud user, you need to set up your environment first.

pulsarctl sources create \
  --source-type sqs \
  --name sqs-source \
  --tenant public \
  --namespace default \
  --destination-topic-name "Your topic name" \
  --parallelism 1 \
  --source-config \
  '{
    "awsRegion": "Your aws sqs region", 
    "queueName": "Your AWS SQS name",
    "awsCredentialPluginParam": "{\"accessKey\":\"Your AWS access key\",\"secretKey\":\"Your AWS secret access key\"}"
  }'

The --source-config value shown above is the minimum configuration needed to start this connector, passed as a JSON string. Replace the placeholder values with your own. To configure more parameters, see Configuration Properties.
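
Because awsCredentialPluginParam is itself a JSON document embedded as a string inside the --source-config JSON, its quotes must be escaped. The following is a minimal sketch of building that nested string programmatically; the class `SqsSourceConfigSketch` and its methods are hypothetical helpers, not part of pulsarctl or the connector, and the escaping covers only backslashes and double quotes.

```java
final class SqsSourceConfigSketch {
    // Escapes backslashes and double quotes so a value can be placed inside a
    // JSON string literal. Control characters are not handled in this sketch.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds the minimal --source-config JSON with the credentials document
    // embedded as an escaped string in awsCredentialPluginParam.
    static String buildSourceConfig(String region, String queue,
                                    String accessKey, String secretKey) {
        String credentials = "{\"accessKey\":\"" + escape(accessKey)
                + "\",\"secretKey\":\"" + escape(secretKey) + "\"}";
        return "{\"awsRegion\":\"" + escape(region) + "\","
                + "\"queueName\":\"" + escape(queue) + "\","
                + "\"awsCredentialPluginParam\":\"" + escape(credentials) + "\"}";
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own region, queue, and keys.
        System.out.println(buildSourceConfig("us-west-2", "my-queue", "AK", "SK"));
    }
}
```

The printed string can be passed as the argument to --source-config, wrapped in single quotes in the shell.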

Note

You can also use other tools to create a connector.

2. Send messages to AWS SQS

You can use the following Java code, which relies on the AWS SDK for Java 1.x, to send a message to AWS SQS.

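    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.GetQueueUrlRequest;

    public class SqsSendExample {
        public static void main(String[] args) {
            // Build a client, resolve the queue URL by name, and send one message.
            AmazonSQS client = AmazonSQSClientBuilder.standard()
                    .withCredentials(new AWSStaticCredentialsProvider(
                            new BasicAWSCredentials("Your access key", "Your secret key")))
                    .withRegion("Your AWS SQS region").build();
            String queueUrl = client.getQueueUrl(new GetQueueUrlRequest("Your SQS name")).getQueueUrl();
            client.sendMessage(queueUrl, "Hello World!");
            client.shutdown();
        }
    }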

3. Show data using Pulsar client

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

bin/pulsar-client \
--url "Your Pulsar serviceUrl" \
consume "The topic that you specified when you created the connector" -s "test-sub" -n 10 -p Earliest

----- got message -----
key:[null], properties:[], content:Hello World!

Configuration Properties

Before using the AWS SQS source connector, you need to configure it. The following table lists the properties of an AWS SQS source connector and their descriptions.

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| awsRegion | String | true | " " (empty string) | Supported AWS region. For example, us-west-1, us-west-2. |
| queueName | String | true | " " (empty string) | The name of the SQS queue that messages should be read from. |
| awsCredentialPluginName | String | false | " " (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation. For more information, see Configure AwsCredentialProviderPlugin. |
| awsCredentialPluginParam | String | false | " " (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin. For more information, see Configure AwsCredentialProviderPlugin. |
| awsEndpoint | String | false | " " (empty string) | AWS SQS endpoint URL. You can find it at AWS SQS Service endpoints. |
| batchSizeOfOnceReceive | int | false | 1 | The maximum number of messages pulled from SQS at one time. The value ranges from 1 to 10. |
| numberOfConsumers | int | false | 1 | The expected number of consumers. You can scale consumers horizontally to achieve high throughput. The value ranges from 1 to 50. |

Note

The batchSizeOfOnceReceive and numberOfConsumers options are available for SQS source 2.8.4.3+, 2.9.4.1+, and 2.10.1.13+. For details about how to test AWS SQS source performance, see Performance Test on AWS SQS Source Connector.
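
The documented ranges for these two options can be checked before the configuration is submitted. The following is a minimal sketch of such a client-side pre-check; the class `SqsTuningCheck` and its `validate` method are hypothetical, and the connector's own validation behavior is not described here.

```java
final class SqsTuningCheck {
    static final int MAX_BATCH_SIZE = 10; // documented upper bound for batchSizeOfOnceReceive
    static final int MAX_CONSUMERS = 50;  // documented upper bound for numberOfConsumers

    // Throws IllegalArgumentException if either value is outside its documented range.
    static void validate(int batchSizeOfOnceReceive, int numberOfConsumers) {
        if (batchSizeOfOnceReceive < 1 || batchSizeOfOnceReceive > MAX_BATCH_SIZE) {
            throw new IllegalArgumentException(
                    "batchSizeOfOnceReceive must be between 1 and " + MAX_BATCH_SIZE
                            + ", got " + batchSizeOfOnceReceive);
        }
        if (numberOfConsumers < 1 || numberOfConsumers > MAX_CONSUMERS) {
            throw new IllegalArgumentException(
                    "numberOfConsumers must be between 1 and " + MAX_CONSUMERS
                            + ", got " + numberOfConsumers);
        }
    }

    public static void main(String[] args) {
        validate(10, 4); // within range, returns normally
        try {
            validate(11, 4); // batch size above the documented maximum
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```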

Configure AwsCredentialProviderPlugin

The AWS SQS source connector supports three ways to connect to AWS SQS, selected by configuring awsCredentialPluginName.

  • Leave awsCredentialPluginName empty to authenticate the connector with the accessKey and secretKey passed in awsCredentialPluginParam.

    {"accessKey":"Your access key","secretKey":"Your secret key"}
    
  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don't need to configure awsCredentialPluginParam. For more information, see AWS documentation.

  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to assume an IAM role through AWS STS. With this option, you need to configure roleArn and roleSessionName in awsCredentialPluginParam. For more information, see AWS documentation.

    {"roleArn": "arn...", "roleSessionName": "name"}
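
The three options differ only in the values of awsCredentialPluginName and awsCredentialPluginParam. The following is a minimal sketch that produces the pair for each option; the class `CredentialPluginSketch` and its methods are hypothetical helpers for illustration, while the plugin class names and parameter keys are the ones listed above.

```java
import java.util.Map;

final class CredentialPluginSketch {
    static final String NAME = "awsCredentialPluginName";
    static final String PARAM = "awsCredentialPluginParam";

    // Wraps a value in double quotes, escaping backslashes and quotes only.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Option 1: empty plugin name, static keys in the parameter.
    static Map<String, String> staticKeys(String accessKey, String secretKey) {
        return Map.of(NAME, "",
                PARAM, "{\"accessKey\":" + quote(accessKey) + ",\"secretKey\":" + quote(secretKey) + "}");
    }

    // Option 2: default AWS provider chain, no parameter needed.
    static Map<String, String> defaultChain() {
        return Map.of(NAME, "org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin", PARAM, "");
    }

    // Option 3: STS assume-role plugin with roleArn and roleSessionName.
    static Map<String, String> assumeRole(String roleArn, String roleSessionName) {
        return Map.of(NAME, "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin",
                PARAM, "{\"roleArn\":" + quote(roleArn) + ",\"roleSessionName\":" + quote(roleSessionName) + "}");
    }

    public static void main(String[] args) {
        // Hypothetical role ARN and session name.
        System.out.println(assumeRole("arn:aws:iam::123456789012:role/demo", "pulsar-sqs"));
    }
}
```

Each returned map holds the two property values to merge into the connector configuration alongside awsRegion and queueName.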