Kinesis Source
The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar

Available on: StreamNative Cloud console
Authored by: ASF
Support type: StreamNative
License: Apache License 2.0

The Kinesis source connector pulls data from Amazon Kinesis and persists data into Pulsar. For more information about connectors, see Connector Overview.

This connector uses the Kinesis Client Library (KCL) to consume messages. The KCL uses DynamoDB to track consumer checkpoints and CloudWatch to track consumer metrics.

This document explains how to create an AWS Kinesis source connector and get it running.

Note

Currently, the Kinesis source connector only supports raw messages. If you use AWS Key Management Service (KMS) encrypted messages, the encrypted messages are sent to Pulsar directly. You need to manually decrypt the data on the consumer side of Pulsar.

Quick start

Prerequisites

Before you connect an AWS Kinesis source connector to external systems, complete the following prerequisites:

  1. Create a Kinesis data stream in AWS.
  2. Create an AWS user and an access key. Record the access key and its secret key.
  3. Assign the following permissions to the AWS User:

1. Create a connector

Depending on the environment, there are several ways to create an AWS Kinesis source connector:

No matter how you create an AWS Kinesis source connector, the minimum configuration contains the following parameters.

configs:
  awsRegion: "Your aws kinesis region"
  awsKinesisStreamName: "Your kinesis stream name"
  awsCredentialPluginParam: "{\"accessKey\":\"Your access key\",\"secretKey\":\"Your secret key\"}"
  applicationName: "Your application name, which will be used as the table name for DynamoDB. E.g.: pulsar-io-kinesis"
  • The configuration structure varies depending on how you create the AWS Kinesis source connector. For example, some are JSON, some are YAML, and some are Kubernetes YAML. You need to adapt the configs to the corresponding format.

  • If you want to configure more parameters, see Configuration Properties for reference.
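
The escaped quotes in awsCredentialPluginParam arise because the value is itself a JSON document stored as a string. The following is a minimal sketch, not part of the connector, of assembling the minimum configuration in Java and rendering it as JSON. The class name MinimalKinesisSourceConfig and its helper methods are hypothetical, the sample values are placeholders, and the hand-rolled serializer handles only flat string-to-string maps.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class MinimalKinesisSourceConfig {

    // Escapes backslashes and double quotes so a value can sit inside a JSON string.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Renders a flat String-to-String map as a JSON object (illustration only).
    static String toJson(Map<String, String> map) {
        StringJoiner joiner = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, String> e : map.entrySet()) {
            joiner.add("\"" + escape(e.getKey()) + "\":\"" + escape(e.getValue()) + "\"");
        }
        return joiner.toString();
    }

    // Builds the four minimum parameters listed above.
    static Map<String, String> minimalConfigs(String region, String stream,
                                              String accessKey, String secretKey,
                                              String applicationName) {
        Map<String, String> credentials = new LinkedHashMap<>();
        credentials.put("accessKey", accessKey);
        credentials.put("secretKey", secretKey);

        Map<String, String> configs = new LinkedHashMap<>();
        configs.put("awsRegion", region);
        configs.put("awsKinesisStreamName", stream);
        // The credential parameter is a JSON document stored as a plain string.
        configs.put("awsCredentialPluginParam", toJson(credentials));
        configs.put("applicationName", applicationName);
        return configs;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own.
        Map<String, String> configs = minimalConfigs(
                "us-west-2", "my-stream", "AK", "SK", "pulsar-io-kinesis");
        // Serializing the outer map escapes the inner JSON a second time.
        System.out.println(toJson(configs));
    }
}
```

Serializing the outer map is what produces the `\"` sequences seen in the YAML example: the inner JSON is escaped once more when it becomes a string value.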

2. Send messages to Kinesis

Note

The following example uses the Kinesis Producer Library (KPL) to send data to Kinesis. For more details, see Writing to your Kinesis Data Stream Using the KPL.

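import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KinesisSendExample {
  public static void main(String[] args) throws Exception {
    AWSCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("Your access key", "Your secret key"));

    KinesisProducerConfiguration kinesisConfig = new KinesisProducerConfiguration();
    kinesisConfig.setRegion("Your aws kinesis region");
    kinesisConfig.setCredentialsProvider(credentialsProvider);
    KinesisProducer kinesis = new KinesisProducer(kinesisConfig);

    // Queue ten records; addUserRecord() is asynchronous and does not block.
    for (int i = 0; i < 10; ++i) {
      ByteBuffer data = ByteBuffer.wrap("test-kinesis-data".getBytes(StandardCharsets.UTF_8));
      kinesis.addUserRecord("Your kinesis stream name", "myPartitionKey", data);
    }
    // flush() only triggers sending, so wait to give the KPL time to deliver the records.
    kinesis.flush();
    Thread.sleep(60000);
  }
}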

3. Show data using Pulsar client

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

bin/pulsar-client \
--url "Your Pulsar serviceUrl" \
consume "The topic that you specified when you created the connector" -s "test-sub" -n 10 -p Earliest

----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450427674751028642409795813410], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450430092602667871668145225762], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450431301528487486297319931938], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450432510454307100926494638114], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450433719380126715555669344290], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450434928305946330184844050466], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450436137231765944814018756642], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450437346157585559443193462818], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450438555083405174072368168994], content:test-kinesis-data
----- got message -----
key:[myPartitionKey], properties:[=49643665543143983613442895450439764009224788701542875170], content:test-kinesis-data

Configuration Properties

This table outlines the properties of an AWS Kinesis source connector.

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| awsKinesisStreamName | String | true | "" (empty string) | The Kinesis stream name. |
| awsRegion | String | false | "" (empty string) | The AWS region.<br/><br/>Example: us-west-1, us-west-2. |
| awsCredentialPluginName | String | false | "" (empty string) | The fully qualified class name of the AwsCredentialProviderPlugin implementation. For more information, see Configure AwsCredentialProviderPlugin. |
| awsCredentialPluginParam | String | false | "" (empty string) | The JSON parameter used to initialize the AwsCredentialProviderPlugin. For more information, see Configure AwsCredentialProviderPlugin. |
| awsEndpoint | String | false | "" (empty string) | The Kinesis endpoint URL, listed in the AWS service endpoints documentation. |
| dynamoEndpoint | String | false | "" (empty string) | The DynamoDB endpoint URL, listed in the AWS service endpoints documentation. |
| cloudwatchEndpoint | String | false | "" (empty string) | The CloudWatch endpoint URL. For more information, see the Amazon documentation. |
| applicationName | String | false | Pulsar IO connector | The name of the Amazon Kinesis application, which is used as the table name for DynamoDB. |
| initialPositionInStream | InitialPositionInStream | false | LATEST | The position where the connector starts from. Available options:<br/>AT_TIMESTAMP: start from the record at or after the specified timestamp.<br/>LATEST: start after the most recent data record.<br/>TRIM_HORIZON: start from the oldest available data record. |
| startAtTime | Date | false | "" (empty string) | If initialPositionInStream is set to AT_TIMESTAMP, the point in time at which to start consumption. |
| checkpointInterval | Long | false | 60000 | The frequency of the Kinesis stream checkpoint, in milliseconds. |
| backoffTime | Long | false | 3000 | The delay between requests, in milliseconds, when the connector encounters a throttling exception from AWS Kinesis. |
| numRetries | int | false | 3 | The number of re-attempts when the connector encounters an exception while trying to set a checkpoint. |
| receiveQueueSize | int | false | 1000 | The maximum number of AWS records that can be buffered inside the connector.<br/><br/>Once receiveQueueSize is reached, the connector does not consume any messages from Kinesis until some messages in the queue are successfully consumed. |
| useEnhancedFanOut | boolean | false | true | If set to true, the connector uses Kinesis enhanced fan-out.<br/><br/>If set to false, it uses polling. |
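
The receiveQueueSize behavior described in the table is ordinary bounded-buffer backpressure. The sketch below illustrates the idea with java.util.concurrent.LinkedBlockingQueue. It is not the connector's actual implementation, and the class ReceiveQueueSketch and its methods are hypothetical names chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReceiveQueueSketch {
    private final BlockingQueue<String> queue;

    public ReceiveQueueSketch(int receiveQueueSize) {
        // The capacity bound plays the role of receiveQueueSize.
        this.queue = new LinkedBlockingQueue<>(receiveQueueSize);
    }

    // Called by a (hypothetical) Kinesis reader; returns false when the buffer is full,
    // which signals the reader to stop pulling until space frees up.
    public boolean tryBuffer(String record) {
        return queue.offer(record);
    }

    // Called by a (hypothetical) Pulsar writer; returns null when nothing is buffered.
    public String takeNext() {
        return queue.poll();
    }

    public static void main(String[] args) {
        ReceiveQueueSketch sketch = new ReceiveQueueSketch(2);
        System.out.println(sketch.tryBuffer("r1")); // true
        System.out.println(sketch.tryBuffer("r2")); // true
        System.out.println(sketch.tryBuffer("r3")); // false: queue is full
        System.out.println(sketch.takeNext());      // r1
        System.out.println(sketch.tryBuffer("r3")); // true: space was freed
    }
}
```

A larger receiveQueueSize lets the reader run further ahead of the writer at the cost of more memory held inside the connector.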

Configure AwsCredentialProviderPlugin

The AWS Kinesis source connector supports three ways to authenticate with AWS Kinesis, selected through awsCredentialPluginName.

  • Leave awsCredentialPluginName empty to authenticate with the accessKey and secretKey passed in awsCredentialPluginParam.

    {"accessKey":"Your access key","secretKey":"Your secret key"}
    
  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don’t need to configure awsCredentialPluginParam. For more information, see AWS documentation.

  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to assume an IAM role through AWS STS. With this option, you need to configure roleArn and roleSessionName in awsCredentialPluginParam. For more information, see AWS documentation.

    {"roleArn": "arn...", "roleSessionName": "name"}
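
The three options above differ only in the awsCredentialPluginName and awsCredentialPluginParam values. As a rough sketch, they can be expressed as three small builders in Java. The class CredentialPluginConfigs and its method names are hypothetical, and the String.format calls assume the inputs contain no quotes or backslashes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CredentialPluginConfigs {
    static final String DEFAULT_CHAIN =
            "org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin";
    static final String STS_ASSUME_ROLE =
            "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin";

    // Option 1: empty plugin name, static keys passed in the parameter.
    static Map<String, String> staticKeys(String accessKey, String secretKey) {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("awsCredentialPluginName", "");
        // Assumes the keys contain no characters that need JSON escaping.
        c.put("awsCredentialPluginParam", String.format(
                "{\"accessKey\":\"%s\",\"secretKey\":\"%s\"}", accessKey, secretKey));
        return c;
    }

    // Option 2: default AWS provider chain, no parameter required.
    static Map<String, String> defaultChain() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("awsCredentialPluginName", DEFAULT_CHAIN);
        return c;
    }

    // Option 3: assume a role, with roleArn and roleSessionName in the parameter.
    static Map<String, String> assumeRole(String roleArn, String roleSessionName) {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("awsCredentialPluginName", STS_ASSUME_ROLE);
        c.put("awsCredentialPluginParam", String.format(
                "{\"roleArn\":\"%s\",\"roleSessionName\":\"%s\"}", roleArn, roleSessionName));
        return c;
    }

    public static void main(String[] args) {
        // Placeholder values for illustration only.
        System.out.println(staticKeys("AK", "SK"));
        System.out.println(defaultChain());
        System.out.println(assumeRole("arn:aws:iam::123456789012:role/demo", "demo-session"));
    }
}
```

Each returned map would be merged into the connector's configs alongside parameters such as awsRegion and awsKinesisStreamName.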