sink
Kinesis Sink
The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Available on
StreamNative Cloud console

Authored by
ASF
Support type
StreamNative
License
Apache License 2.0

The AWS Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis. For more information about connectors, see Connector Overview.

This document introduces how to get started with creating an AWS Kinesis sink connector and get it up and running.

Quick start

Prerequisites

The prerequisites for connecting an AWS Kinesis sink connector to external systems include:

  1. Create a Kinesis data stream in AWS.
  2. Create an AWS User and an AccessKey(Please record the value of AccessKey and its SecretKey).
  3. Assign the following permissions to the AWS User:

1. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, you need to replace --sink-type kinesis with --archive /path/to/pulsar-io-kinesis.nar. You can find the button to download the nar package at the beginning of the document.

For StreamNative Cloud User

If you are a StreamNative Cloud user, you need set up your environment first.

pulsarctl sinks create \
  --sink-type kinesis \
  --name kinesis-sink \
  --tenant public \
  --namespace default \
  --inputs "Your topic name" \
  --parallelism 1 \
  --sink-config \
  '{
    "awsRegion": "Your aws kinesis region", 
    "awsKinesisStreamName": "Your kinesis stream name",
    "awsCredentialPluginParam": "{\"accessKey\":\"Your AWS access key\",\"secretKey\":\"Your AWS secret access key\"}"
  }'

The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.

Note

You can also choose to use a variety of other tools to create a connector:

2. Send messages to the topic

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

   PulsarClient client = PulsarClient.builder()
            .serviceUrl("{{Your Pulsar URL}}")
            .build();

   Producer<String> producer = client.newProducer(Schema.STRING)
     .topic("{{Your topic name}}")
     .create();

   String message = "test-message";
   MessageId msgID = producer.send(message);
   System.out.println("Publish " + message + " and message ID " + msgID);

   producer.flush();
   producer.close();
   client.close();

3. Show data on AWS Kinesis console

You can use the AWS Kinesis Data Viewer to view the data.

Configuration Properties

This table outlines the properties of an AWS Kinesis sink connector.

NameTypeRequiredDefaultDescription
awsKinesisStreamNameStringtrue" " (empty string)The Kinesis stream name.
awsRegionStringtrue" " (empty string)The AWS Kinesis region. <br/><br/>Example:<br/> us-west-1, us-west-2.
awsCredentialPluginNameStringfalse" " (empty string)The fully-qualified class name of implementation of AwsCredentialProviderPlugin. Please refer to [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin)
awsCredentialPluginParamStringfalse" " (empty string)The JSON parameter to initialize awsCredentialsProviderPlugin. Please refer to [Configure AwsCredentialProviderPlugin](###Configure AwsCredentialProviderPlugin)
awsEndpointStringfalse" " (empty string)A custom Kinesis endpoint. For more information, see AWS documentation.
retainOrderingBooleanfalsefalseWhether Pulsar connectors retain the ordering when moving messages from Pulsar to Kinesis.
messageFormatMessageFormatfalseONLY_RAW_PAYLOADMessage format in which Kinesis sink converts Pulsar messages and publishes them to Kinesis streams.<br/><br/>Available options include:<br/><br/><li>ONLY_RAW_PAYLOAD: Kinesis sink directly publishes Pulsar message payload as a message into the configured Kinesis stream. <br/><br/><li>FULL_MESSAGE_IN_JSON: Kinesis sink creates a JSON payload with Pulsar message payload, properties, and encryptionCtx, and publishes JSON payload into the configured Kinesis stream.<br/><br/><li>FULL_MESSAGE_IN_FB: Kinesis sink creates a flatbuffers serialized payload with Pulsar message payload, properties, and encryptionCtx, and publishes flatbuffers payload into the configured Kinesis stream. <br/><br/><li>FULL_MESSAGE_IN_JSON_EXPAND_VALUE: Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties, and event time. The record schema is used to convert the value to JSON.
jsonIncludeNonNullsBooleanfalsetrueOnly the properties with non-null values are included when the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE.
jsonFlattenBooleanfalsefalseWhen it is set to true and the message format is FULL_MESSAGE_IN_JSON_EXPAND_VALUE, the output JSON is flattened.
retryInitialDelayInMillisLongfalse100The initial delay (in milliseconds) between retries.
retryMaxDelayInMillisLongfalse60000The maximum delay(in milliseconds) between retries.

Configure AwsCredentialProviderPlugin

AWS Kinesis sink connector allows you to use three ways to connect to AWS Kinesis by configuring awsCredentialPluginName.

  • Leave awsCredentialPluginName empty to get the connector authenticated by passing accessKey and secretKey in awsCredentialPluginParam.
{"accessKey":"Your access key","secretKey":"Your secret key"}
  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don’t need to configure awsCredentialPluginParam. For more information, see AWS documentation.

  • Set awsCredentialPluginNameto org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to use the default AWS provider chain, and you need to configure roleArn and roleSessionNmae in awsCredentialPluginParam. For more information, see AWS documentation

    {"roleArn": "arn...", "roleSessionName": "name"}