The AWS Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis. For more information about connectors, see Connector Overview.
This document introduces how to get started with creating an AWS Kinesis sink connector and get it up and running.
Quick start
Prerequisites
Before connecting an AWS Kinesis sink connector to external systems, complete the following prerequisites:
- Create a Kinesis data stream in AWS.
- Create an AWS User and an AccessKey (record the values of the AccessKey and its SecretKey).
- Assign the following permissions to the AWS User:
  - AmazonKinesisFullAccess
  - CloudWatch:PutMetricData: required because the AWS Kinesis producer periodically sends metrics to CloudWatch.
1. Create a connector
Depending on the environment, there are several ways to create an AWS Kinesis sink connector:
- Create a Connector on StreamNative Cloud.
- Create a Connector with Function worker. This approach requires you to download a NAR package to create the connector. You can download the version you need from the download button at the beginning of the article.
- Create a Connector with Function mesh. This approach requires you to set the Docker image. You can choose the version you want to launch from here.
No matter how you create an AWS Kinesis sink connector, the minimum configuration contains the following parameters.
configs:
  awsRegion: "Your aws kinesis region"
  awsKinesisStreamName: "Your kinesis stream name"
  awsCredentialPluginParam: "{\"accessKey\":\"Your access key\",\"secretKey\":\"Your secret key\"}"
The configuration structure varies depending on how you create the AWS Kinesis sink connector. For example, some are JSON, some are YAML, and some are Kubernetes YAML. You need to adapt the configs to the corresponding format.
If you want to configure more parameters, see Configuration Properties for reference.
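Because awsCredentialPluginParam is a JSON document carried inside a string, the quoting is easy to get wrong when adapting the configuration to another format. The following is a minimal sketch, not part of the connector, that assembles the three minimum entries in Java. The class name KinesisSinkConfigSketch and its helper methods are hypothetical and exist only for illustration; the keys and the JSON shape are taken from the configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; it is not part of the connector.
final class KinesisSinkConfigSketch {

    // Escape backslashes and double quotes so a value is safe inside a JSON string.
    static String jsonEscape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Build the JSON string that goes into awsCredentialPluginParam.
    static String credentialParam(String accessKey, String secretKey) {
        return "{\"accessKey\":\"" + jsonEscape(accessKey)
                + "\",\"secretKey\":\"" + jsonEscape(secretKey) + "\"}";
    }

    // Assemble the three minimum configuration entries in a stable order.
    static Map<String, Object> minimalConfigs(String region, String streamName,
                                              String accessKey, String secretKey) {
        Map<String, Object> configs = new LinkedHashMap<>();
        configs.put("awsRegion", region);
        configs.put("awsKinesisStreamName", streamName);
        configs.put("awsCredentialPluginParam", credentialParam(accessKey, secretKey));
        return configs;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own region, stream, and keys.
        Map<String, Object> configs =
                minimalConfigs("us-west-2", "my-stream", "example-access-key", "example-secret-key");
        configs.forEach((key, value) -> System.out.println(key + ": " + value));
    }
}
```

The resulting map can then be serialized to whichever format your deployment method expects. When the value is embedded in YAML or JSON, the inner double quotes need one more level of escaping, as in the configuration shown above.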
2. Send messages to the topic
Note
If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
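// Connect to the Pulsar cluster.
PulsarClient client = PulsarClient.builder()
    .serviceUrl("{{Your Pulsar URL}}")
    .build();
// Create a producer on the topic that the sink connector reads from.
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("{{Your topic name}}")
    .create();
String message = "test-message";
// send() blocks until the broker acknowledges the message.
MessageId msgID = producer.send(message);
System.out.println("Publish " + message + " and message ID " + msgID);
// Flush any pending messages, then release resources.
producer.flush();
producer.close();
client.close();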
3. Show data on AWS Kinesis console
You can use the AWS Kinesis Data Viewer to view the data.
Configuration Properties
This table outlines the properties of an AWS Kinesis sink connector.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsKinesisStreamName | String | true | "" (empty string) | The Kinesis stream name. |
awsRegion | String | true | "" (empty string) | The AWS Kinesis region. <br/><br/>Example:<br/> us-west-1, us-west-2. |
awsCredentialPluginName | String | false | "" (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation. See the Configure AwsCredentialProviderPlugin section. |
awsCredentialPluginParam | String | false | "" (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin. See the Configure AwsCredentialProviderPlugin section. |
awsEndpoint | String | false | "" (empty string) | A custom Kinesis endpoint. For more information, see AWS documentation. |
retainOrdering | Boolean | false | false | Whether Pulsar connectors retain the ordering when moving messages from Pulsar to Kinesis. |
messageFormat | MessageFormat | false | ONLY_RAW_PAYLOAD | Message format in which Kinesis sink converts Pulsar messages and publishes them to Kinesis streams.<br/><br/>Available options include:<br/><br/><li>ONLY_RAW_PAYLOAD : Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream. <br/><br/><li>FULL_MESSAGE_IN_JSON : Kinesis sink creates a JSON payload with Pulsar message payload, properties, and encryptionCtx, and publishes JSON payload into the configured Kinesis stream.<br/><br/><li>FULL_MESSAGE_IN_FB : Kinesis sink creates a flatbuffers serialized payload with Pulsar message payload, properties, and encryptionCtx, and publishes flatbuffers payload into the configured Kinesis stream. <br/><br/><li>FULL_MESSAGE_IN_JSON_EXPAND_VALUE : Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties, and event time. The record schema is used to convert the value to JSON. |
jsonIncludeNonNulls | Boolean | false | true | Only the properties with non-null values are included when the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE. |
jsonFlatten | Boolean | false | false | When it is set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE, the output JSON is flattened. |
retryInitialDelayInMillis | Long | false | 100 | The initial delay (in milliseconds) between retries. |
retryMaxDelayInMillis | Long | false | 60000 | The maximum delay (in milliseconds) between retries. |
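To give a feel for how retryInitialDelayInMillis and retryMaxDelayInMillis interact, here is a small sketch that computes a delay for each retry attempt. It assumes the delay doubles after every failed attempt and is capped at the maximum. That growth rule is an assumption made for illustration: the table only specifies the initial and maximum delays, and the connector's actual schedule may differ (for example, by adding jitter). The class RetryDelaySketch is hypothetical.

```java
// Hypothetical illustration; the doubling rule is an assumption, not documented behavior.
final class RetryDelaySketch {

    // Delay before retry number `attempt` (0-based), doubling each time and capped at the maximum.
    static long delayMillis(int attempt, long initialDelayMillis, long maxDelayMillis) {
        long delay = Math.min(initialDelayMillis, maxDelayMillis);
        for (int i = 0; i < attempt && delay < maxDelayMillis; i++) {
            // Jump straight to the cap when doubling would exceed it (also avoids overflow).
            delay = (delay > maxDelayMillis / 2) ? maxDelayMillis : delay * 2;
        }
        return delay;
    }

    public static void main(String[] args) {
        // Defaults from the table: 100 ms initial delay, 60000 ms maximum delay.
        for (int attempt = 0; attempt <= 10; attempt++) {
            System.out.println("retry " + attempt + " -> "
                    + delayMillis(attempt, 100L, 60000L) + " ms");
        }
    }
}
```

Under this assumption and the default values, the delay reaches the 60000 ms cap on the eleventh retry (attempt index 10) and stays there.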
Configure AwsCredentialProviderPlugin
The AWS Kinesis sink connector supports three ways to connect to AWS Kinesis, selected by configuring awsCredentialPluginName.
- Leave awsCredentialPluginName empty to authenticate the connector by passing accessKey and secretKey in awsCredentialPluginParam.
  {"accessKey":"Your access key","secretKey":"Your secret key"}
- Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don’t need to configure awsCredentialPluginParam. For more information, see AWS documentation.
- Set awsCredentialPluginName to org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to use the default AWS provider chain, and configure roleArn and roleSessionName in awsCredentialPluginParam. For more information, see AWS documentation.
  {"roleArn": "arn...", "roleSessionName": "name"}
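The three options differ only in the values of awsCredentialPluginName and awsCredentialPluginParam. The sketch below maps each option to the plugin class name described in this section and builds the JSON parameter for the role-based option. The class CredentialPluginSketch, its Mode enum, and the sample role ARN are hypothetical and used only for illustration; the plugin class names and JSON keys come from this section.

```java
// Hypothetical helper for illustration only; it is not part of the connector.
final class CredentialPluginSketch {

    enum Mode { ACCESS_KEY, DEFAULT_CHAIN, STS_ASSUME_ROLE }

    // Value to put in awsCredentialPluginName for each option.
    static String pluginName(Mode mode) {
        switch (mode) {
            case DEFAULT_CHAIN:
                return "org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin";
            case STS_ASSUME_ROLE:
                return "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin";
            default:
                // ACCESS_KEY: leave the plugin name empty.
                return "";
        }
    }

    // JSON for awsCredentialPluginParam when using STSAssumeRoleProviderPlugin.
    // Assumes the inputs contain no double quotes or backslashes.
    static String stsParam(String roleArn, String roleSessionName) {
        return "{\"roleArn\":\"" + roleArn
                + "\",\"roleSessionName\":\"" + roleSessionName + "\"}";
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> awsCredentialPluginName=\"" + pluginName(mode) + "\"");
        }
        // The role ARN below is a made-up placeholder.
        System.out.println(stsParam("arn:aws:iam::123456789012:role/example-role", "example-session"));
    }
}
```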