sink
AWS EventBridge Sink Connector

Available on
StreamNative Cloud console

Authored by
shibd,sijie,Anonymitaet,RobertIndie
Support type
streamnative
License
Business License

The Amazon EventBridge sink connector pulls data from Pulsar topics and persists data to Amazon EventBridge.

Quick start

Prerequisites

The prerequisites for connecting an AWS EventBridge sink connector to external systems include:

  1. Create EventBridge and EventBus in AWS.
  2. Create the AWS User and create AccessKey(Please record AccessKey and SecretAccessKey).
  3. Assign permissions to AWS User, and ensure they have the PutEvents permissions to the AWS EventBus. For details, see permissions for event buses
  {
    "Version": "2012-10-17",
    "Statement": [
      {
        "Sid": "AllowAccountToPutEvents",
        "Effect": "Allow",
        "Principal": {
          "AWS": "<ACCOUNT_ID>"
        },
        "Action": "events:PutEvents",
        "Resource": "{EventBusArn}"
      }
    ]
}
  • You can set permissions directly for this user. With this method, when you create a connector, you only need to configure accessKey and secretAccessKey.
  • Or you can use Security Token Service, this video explains how to use STS on AWS. With this method, when you create a connector, in addition to configuring accessKey and secretAccessKey, you also need to configure role and roleSessionName.
  1. Create a Rule in EventBridge.
  • The data structure sent to Event Bridge is described [here](## Metadata mapping), and you can create event pattern based on this structure.
  • Set the target according to your needs. If you're testing this connector, you can set the target to Cloud Watch.

1. Create a connector

The following command shows how to use pulsarctl to create a builtin connector. If you want to create a non-builtin connector, you need to replace --sink-type aws-eventbridge with --archive /path/to/pulsar-io-aws-eventbridge.nar. You can find the button to download the nar package at the beginning of the document.

For StreamNative Cloud User

If you are a StreamNative Cloud user, you need set up your environment first.

pulsarctl sinks create \
  --sink-type aws-eventbridge \
  --name aws-eventbridge-sink \
  --tenant public \
  --namespace default \
  --inputs "Your topic name" \
  --parallelism 1 \
  --sink-config \
  '{
    "accessKeyId": "Your AWS access key", 
    "secretAccessKey": "Your AWS secret access key",
    "region": "Your event bridge region",
    "eventBusName": "Your eventbus name"
  }'

The --sink-config is the minimum necessary configuration for starting this connector, and it is a JSON string. You need to substitute the relevant parameters with your own. If you want to configure more parameters, see Configuration Properties for reference.

Note

You can also choose to use a variety of other tools to create a connector:

2. Send messages to the topic

Note

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.

        PulsarClient client = PulsarClient.builder()
            .serviceUrl("{{Your Pulsar URL}}")
            .build();

        Producer<String> producer = client.newProducer(Schema.STRING)
            .topic("{{Your topic name}}")
            .create();

        String message = "{\"msg\": \"msg-data\"}";
        MessageId msgID = producer.send(message);
        System.out.println("Publish " + message + " and message ID " + msgID);
        
        producer.flush();
        producer.close();
        client.close();

3. Show data on AWS EventBridge

The connector will send the following format of JSON event to EventBridge.

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail-type": "{{Your topic name}}",
  "source": "{{Your connector name}}",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/i-1234567890abcdef0"
  ],
  "detail": {
    "data": {
      "msg": "msg-data"
    },
    "message_id": "124:191:0"
  }
}

Configuration Properties

Before using the AWS EventBridge sink connector, you need to configure it. This table outlines the properties and the descriptions.

NameTypeRequiredSensitiveDefaultDescription
accessKeyIdStringyestrue"" (empty string)The AWS EventBridge access key ID.
secretAccessKeyStringyestrue"" (empty string)The AWS EventBridge secret access key.
regionStringyesfalse"" (empty string)The region where AWS EventBridge service is located. All AWS region
eventBusNameStringyesfalse"" (empty string)The Event Bus name.
roleStringfalsefalse"" (empty string)The AWS STS roleArn. Example: arn:aws:iam::598203581484:role/test-role
roleSessionNameStringfalsefalse"" (empty string)The AWS role session name, Name it yourself.
stsEndpointStringfalsefalse"" (empty string)The AWS STS endpoint. By default, the default STS endpoint: https://sts.amazonaws.com is used. See Amazon documentation for more details.
stsRegionStringfalsefalse"" (empty string)The AWS STS region, By default, the 'region' config or env region is used.
eventBusResourceNameStringnofalse"" (empty string)The Event Bus ARN (AWS Resource Name). Example: arn:aws:events:ap-northeast-1:598263551484:event-bus/my_eventbus
metaDataFieldStringnofalse"" (empty string)The metadata fields added to the event. Multiple fields are separated with commas. Optional values: schema_version, partition, event_time, publish_time, message_id, sequence_id, producer_name, key, and properties.
batchPendingQueueSizeintnofalse1000Pending queue size. This value must be greater than batchMaxSize.
batchMaxSizeintnofalse10Maximum number of batch messages. The number must be less than or equal to 10 (AWS EventBridge required).
batchMaxBytesSizelongnofalse640Maximum number of batch bytes payload size. This value cannot be greater than 512KB.
batchMaxTimeMslongnofalse5000Batch max wait time: milliseconds.
maxRetryCountlongnofalse100Maximum number of retries to send events, when put events failed.
intervalRetryTimeMslongnofalse1000The interval time(milliseconds) for each retry, when the put events failed.

For details about this connector's advanced features and configurations, see Advanced features.

Advanced features

Delivery guarantees

The AWS EventBridge sink connector provides two delivery guarantees: at-most-once and at-least-once.

Note

Currently, the effectively-once delivery guarantee is not supported, because Amazon EventBridge cannot offer the support of the Sink downstream system.

Data convert

In AWS EventBridge, all events is JSON format.

Pulsar supports multiple schema types. When receiving the data from Pulsar, the AWS EventBridge sink connectors recognize it and convert it to a JSON string according to the following table:

Pulsar SchemaConvert to JSONNote
Primitive✔*Just support primitive type is string and data is JSON format.
Avro✔Take advantage of toolkit conversions
Json✔Just send it directly
ProtobufXThe Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it may not provide the best effort conversion.
ProtobufNative✔Take advantage of toolkit conversions

In EventBridge, the user data is in the detail$data field.

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "detail": {
    "data": {
      "instance-id": " i-1234567890abcdef0",
      "state": "terminated"
    }
  }
}

Metadata mapping

In EventBridge, a complete event contains many system fields. These system fields can help you to configure the rule.

An Event containing event data:

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "source-type": "test-aws-event-bridge-sink-connector",
  "detail-type": "topic_name_test_1",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/i-1234567890abcdef0"
  ],
  "detail": {
    "data": {
      "instance-id": " i-1234567890abcdef0",
      "state": "terminated"
    }
  }
}

This connector maps the following fields:

  • sourceType: The default value is ${{Connector Name}}.
  • detailType: The default value is ${{Topic Name}}.

And, this connector supports setting the metadata of Pulsar to every Event (set in the detail field).

You can select the desired metadata through the following configuration:

#
optional: schema_version | partition | event_time | publish_time
#
message_id | sequence_id | producer_name | key | properties
metaDataField = event_time, message_id

An Event containing metadata :

{
  "version": "0",
  "id": "6a7e8feb-b491-4cf7-a9f1-bf3703467718",
  "source-type": "test-aws-event-bridge-sink-connector",
  "detail-type": "topic_name_test_1",
  "source": "aws.ec2",
  "account": "111122223333",
  "time": "2017-12-22T18:43:48Z",
  "region": "us-west-1",
  "resources": [
    "arn:aws:ec2:us-west-1:123456789012:instance/i-1234567890abcdef0"
  ],
  "detail": {
    "data": {
      "instance-id": " i-1234567890abcdef0",
      "state": "terminated"
    },
    "event_time": 789894645625,
    "message_id": "1,1,1"
  }
}

Parallelism

You can configure the parallelism of Sink execution by using the scheduling mechanism of the Function, and multiple sink instances will be scheduled to run on different worker nodes. Multiple sinks will consume messages together according to the configured subscription mode.

Since EventBus doesn't need to guarantee sequentiality, the connectors support the shared subscription model.

To increase the write throughput, you can configure the following:

parallelism = 4

When retainOrdering is set to false, the Shared subscription mode is used.

Batch Put

AWS EventBridge connectors support batch put events, which are mainly controlled by the following three parameters:

  • batchSize: When the buffered message is larger than batchSize, it will trigger flush (put) events. 0 means no trigger.
  • maxBatchBytes: When the buffered message data size is larger than maxBatchBytes, it will trigger flush pending events. This value should be less than 256000 and greater than 0, The default value is 640.
  • batchTimeMs: When the interval from the last flush exceeds batchTimeMs, it will trigger flush pending events. 0 means no trigger.

In addition to these three parameters that control flush behavior, in AWS EventBridge, batches larger than 256KB per write are not allowed. So, when the buffered message is larger than 256KB, it will trigger a flush.

Retry Put

In AWS Event Bridge, about Handling failures with PutEvents, It suggests retrying each error message until it succeeds.

This connector will provide two flow configs for the controller's retry strategy:

maxRetryCount: 100 // Maximum retry send event count, when event send failed.
intervalRetryTimeMs: 1000 //The interval time(milliseconds) for each retry, when event send failed.