sink
AWS SQS Sink Connector

Available on
StreamNative Cloud console

Authored by
freeznet,Anonymitaet,nlu90,shibd
Support type
streamnative
License
Apache License 2.0

The AWS Simple Queue Service (SQS) sink connector pulls data from Pulsar topics and persists data to AWS SQS.

Quick start

Prerequisites

The prerequisites for connecting an AWS SQS sink connector to external systems include:

  1. Create SQS in AWS.
  2. Create the AWS User and create AccessKey(Please record AccessKey and SecretAccessKey).
  3. Assign the following permissions to the AWS User:
  • sqs:CreateQueue
  • sqs:SendMessage

1. Create a connector

Depending on the environment, there are several ways to create an AWS SQS sink connector:

No matter how you create an AWS SQS sink connector, the minimum configuration contains the following parameters.

configs:
  awsRegion: "Your AWS SQS region"
  queueName: "Your AWS SQS name"
  awsCredentialPluginParam: "{\"accessKey\":\"Your access key\",\"secretKey\":\"Your secret key\"}"
  • The configuration structure varies depending on how you create the AWS SQS sink connector. For example, some are JSON, some are YAML, and some are Kubernetes YAML. You need to adapt the configs to the corresponding format.

  • If you want to configure more parameters, see Configuration Properties for reference.

2. Send messages to the topic

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

        PulsarClient client = PulsarClient.builder()
            .serviceUrl("{{Your Pulsar URL}}")
            .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("{{Your topic name}}")
            .create();

        String message = "test-message";
        MessageId msgID = producer.send(message);
        System.out.println("Publish " + message + " and message ID " + msgID);
        
        producer.flush();
        producer.close();
        client.close();

3. Show data on AWS SQS

You can use the following simple code to receive messages from AWS SQS.

    public static void main(String[] args) {

        AmazonSQS client = AmazonSQSClientBuilder.standard()
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("Your access key", "Your secret key")))
                .withRegion("Your AWS SQS region").build();

        String queueUrl = client.getQueueUrl(new GetQueueUrlRequest("Your SQS name")).getQueueUrl();
        ReceiveMessageResult receiveMessageResult = client.receiveMessage(queueUrl);
        for (Message message : receiveMessageResult.getMessages()) {
            System.out.println("Receive msg: " + message.getBody());
        }
        client.shutdown();
    }
    
    // Output
    // Receive msg: test-message

Configuration Properties

Before using the AWS SQS sink connector, you need to configure it. This table outlines the properties and the Descriptions of an AWS SQS sink connector.

NameTypeRequiredDefaultDescription
awsRegionStringtrue" " (empty string)Supported AWS region. For example, us-west-1, us-west-2.
queueNameStringtrue" " (empty string)The name of the SQS queue that messages should be read from or written to.
awsCredentialPluginNameStringfalse" " (empty string)The fully-qualified class name of implementation of AwsCredentialProviderPlugin. For more information, see [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin).
awsCredentialPluginParamStringfalse" " (empty string)The JSON parameter to initialize awsCredentialsProviderPlugin. For more information, see [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin).
awsEndpointStringfalse" " (empty string)AWS SQS end-point URL. You can find it at AWS SQS Service endpoints.

Configure AwsCredentialProviderPlugin

AWS SQS sink connector allows you to use three ways to connect to AWS SQS by configuring awsCredentialPluginName.

  • Leave awsCredentialPluginName empty to get the connector authenticated by passing accessKey and secretKey in awsCredentialPluginParam.

    {"accessKey":"Your access key","secretKey":"Your secret key"}
    
  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don't need to configure awsCredentialPluginParam. For more information, see AWS documentation.

  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to use the default AWS provider chain, and you need to configure roleArn and roleSessionNmae in awsCredentialPluginParam. For more information, see AWS documentation.

    {"roleArn": "arn...", "roleSessionName": "name"}