Skip to main content
The DynamoDB source connector pulls data from DynamoDB table streams and persists data into Pulsar. This connector uses the DynamoDB Streams Kinesis Adapter, which uses the Kinesis Consumer Library (KCL) to do the actual consuming of messages. The KCL uses DynamoDB to track the state of consumers and requires cloudwatch access to log metrics.

Quick start

Prerequisites

The prerequisites for connecting an AWS DynamoDB source connector to external systems include:
  1. Enable stream to the DynamoDB in AWS, it will show a Latest stream ARN after the stream is turned on.
  2. Create an AWS User and an AccessKey(Please record the value of AccessKey and its SecretKey).
  3. Assign the following permissions to the AWS User:

1. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, you need to replace --source-type dynamodb with --archive /path/to/pulsar-io-dynamodb.nar. You can find the button to download the nar package at the beginning of the document.
If you are a StreamNative Cloud user, you need set up your environment first.
pulsarctl sources create \
  --source-type dynamodb \
  --name dynamodb-source \
  --tenant public \
  --namespace default \
  --destination-topic-name "Your topic name" \
  --parallelism 1 \
  --source-config \
  '{
    "awsRegion": "Your aws dynamodb region", 
    "awsDynamodbStreamArn": "the Latest stream ARN of the DynamoDB table",
    "awsCredentialPluginParam": "{\"accessKey\":\"Your AWS access key\",\"secretKey\":\"Your AWS secret access key\"}",
    "applicationName": "Your application name, which will be used as the table name for DynamoDB. E.g.: pulsar-io-dynamodb"
  }'
The --source-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.
You can also choose to use a variety of other tools to create a connector:

2. Insert row to DynamoDB table

    public static void main(String[] args) {
        // 1. Create a DynamoDB client
        DynamoDbClient ddb = DynamoDbClient.builder()
                .region(Region.EU_NORTH_1)  // change to your region
                .credentialsProvider(ProfileCredentialsProvider.create())
                .build();

        for (int i = 0; i < 10; ++i) {
            // 2. Define the item to insert
            Map<String, AttributeValue> item = new HashMap<>();
            item.put("streamnative", AttributeValue.fromS("user" + i));   // partition key
            item.put("name", AttributeValue.fromS("Alice"));
            item.put("age", AttributeValue.fromN("30"));
            item.put("email", AttributeValue.fromS("[email protected]"));

            // 3. Create the PutItem request
            PutItemRequest request = PutItemRequest.builder()
                    .tableName("pulsar") // change to your table name
                    .item(item)
                    .build();

            // 4. Send the request
            try {
                ddb.putItem(request);
                System.out.println("Item inserted successfully.");
            } catch (DynamoDbException e) {
                System.err.println("Insert failed: " + e.getMessage());
            }
        }

        // 5. Close client
        ddb.close();
    }

3. Show data using Pulsar client

bin/pulsar-client \
--url "Your Pulsar serviceUrl" \
consume "The topic that you specified when you created the connector" -s "test-sub" -n 10 -p Earliest

----- got message -----
publishTime:[1752464541877], eventTime:[0], key:[390cc13abf07d7b9a233daa65231d6c8], properties:[SEQUENCE_NUMBER=18031900001003631107895421, EVENT_NAME=INSERT], content:{"eventID":"390cc13abf07d7b9a233daa65231d6c8","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464541000,"Keys":{"streamnative":{"S":"user2"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user2"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18031900001003631107895421","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464542809], eventTime:[0], key:[cf6f7706eb6e15f7ee7accdde36fd878], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18035200003649242324186849], content:{"eventID":"cf6f7706eb6e15f7ee7accdde36fd878","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464542000,"Keys":{"streamnative":{"S":"user4"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user4"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18035200003649242324186849","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464542809], eventTime:[0], key:[dd660a4071b9607681be4f56ed7b5b73], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18032900003894885254372246], content:{"eventID":"dd660a4071b9607681be4f56ed7b5b73","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464541000,"Keys":{"streamnative":{"S":"user1"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user1"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18032900003894885254372246","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464542809], eventTime:[0], key:[526bf489c407d0d7e65af22a9a3461a5], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18032100001449977882361981], content:{"eventID":"526bf489c407d0d7e65af22a9a3461a5","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464542000,"Keys":{"streamnative":{"S":"user5"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user5"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18032100001449977882361981","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464543205], eventTime:[0], key:[25a0239947e90a1aa7d1fae5cd70cd96], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18037000000661915555888784], content:{"eventID":"25a0239947e90a1aa7d1fae5cd70cd96","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464541000,"Keys":{"streamnative":{"S":"user0"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user0"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18037000000661915555888784","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464543205], eventTime:[0], key:[a7bf1cbf97f036c198e4f49f86749a3a], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18037300002951028206567171], content:{"eventID":"a7bf1cbf97f036c198e4f49f86749a3a","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464542000,"Keys":{"streamnative":{"S":"user3"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user3"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18037300002951028206567171","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464543205], eventTime:[0], key:[14e87c4eb4a5b352422ab5632f69d226], properties:[EVENT_NAME=INSERT, SEQUENCE_NUMBER=18037500000367820008016475], content:{"eventID":"14e87c4eb4a5b352422ab5632f69d226","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464543000,"Keys":{"streamnative":{"S":"user6"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user6"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18037500000367820008016475","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464543795], eventTime:[0], key:[6267f254261b4952b4ad6682f88dc821], properties:[SEQUENCE_NUMBER=18035500001731177388073396, EVENT_NAME=INSERT], content:{"eventID":"6267f254261b4952b4ad6682f88dc821","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464543000,"Keys":{"streamnative":{"S":"user7"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user7"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18035500001731177388073396","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464544799], eventTime:[0], key:[eea816f9d21323655124a59e0f1b2aab], properties:[SEQUENCE_NUMBER=18035600004198491318198578, EVENT_NAME=INSERT], content:{"eventID":"eea816f9d21323655124a59e0f1b2aab","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464544000,"Keys":{"streamnative":{"S":"user9"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user9"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18035600004198491318198578","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}
----- got message -----
publishTime:[1752464545795], eventTime:[0], key:[bdc4688da075ed491848e21009ab2db1], properties:[SEQUENCE_NUMBER=18033200003338420245628766, EVENT_NAME=INSERT], content:{"eventID":"bdc4688da075ed491848e21009ab2db1","eventName":"INSERT","eventVersion":"1.1","eventSource":"aws:dynamodb","awsRegion":"eu-north-1","dynamodb":{"ApproximateCreationDateTime":1752464543000,"Keys":{"streamnative":{"S":"user8"}},"NewImage":{"name":{"S":"Alice"},"streamnative":{"S":"user8"},"age":{"N":"30"},"email":{"S":"[email protected]"}},"SequenceNumber":"18033200003338420245628766","SizeBytes":67,"StreamViewType":"NEW_AND_OLD_IMAGES"}}

Configuration Properties

This table outlines the properties of an AWS DynamoDB source connector.
NameRequiredSensitiveDefaultDescription
awsEndpointfalsefalseNoneDynamodb streams end-point url.
awsRegiontruefalseAppropriate aws region. E.g. us-west-1, us-west-2.
awsDynamodbStreamArntruefalsethe Dynamodb stream arn.
awsCredentialPluginNamefalsefalseNoneFully-Qualified class name of implementation of AwsCredentialProviderPlugin..
awsCredentialPluginParamfalsetrue” “The JSON parameter to initialize AwsCredentialsProviderPlugin.
initialPositionInStreamfalsefalseLATESTUsed to specify the position in the stream where the connector should start from, The available options are:
- AT_TIMESTAMP: Start from the record at or after the specified timestamp
- LATEST: Start after the most recent data record (fetch new data)
- TRIM_HORIZON: Start from the oldest available data record
startAtTimefalsefalseNoneIf the initalPositionInStream is set to ‘AT_TIMESTAMP’, then this property specifies the point in time to start consumption.
applicationNamefalsefalse”pulsar-dynamodb”Name of the dynamodb consumer application.
checkpointIntervalfalsefalse60000The frequency of the stream checkpointing (in milliseconds).
backoffTimefalsefalse3000The amount of time to delay between requests when the connector encounters a Throttling exception from dynamodb (in milliseconds).
numRetriesfalsefalse3The number of re-attempts to make when the connector encounters an exception while trying to set a checkpoint.
receiveQueueSizefalsefalse1000The maximum number of AWS Records that can be buffered inside the connector.
dynamoEndpointfalsefalseNoneDynamo end-point url.
cloudwatchEndpointfalsefalseNoneCloudwatch end-point url.

Configure AwsCredentialProviderPlugin

AWS DynamoDB source connector allows you to use three ways to connect to AWS DynamoDB by configuring awsCredentialPluginName.
  • Leave awsCredentialPluginName empty to get the connector authenticated by passing accessKey and secretKey in awsCredentialPluginParam.
    {"accessKey":"Your access key","secretKey":"Your secret key"}
    
  • Set awsCredentialPluginName to org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin to use the default AWS provider chain. With this option, you don’t need to configure awsCredentialPluginParam. For more information, see AWS documentation.
  • Set awsCredentialPluginNameto org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin to use the default AWS provider chain, and you need to configure roleArn and roleSessionNmae in awsCredentialPluginParam. For more information, see AWS documentation
    {"roleArn": "arn...", "roleSessionName": "name"}