This connector allows you to make sink connections from Pulsar to AWS EventBridge.
The Amazon EventBridge sink connector pulls data from Pulsar topics and persists data to Amazon EventBridge.
The prerequisites for connecting an AWS EventBridge sink connector to external systems include:

- An AWS `AccessKey` (record both the `AccessKey` and the `SecretAccessKey`).
- `PutEvents` permissions to the AWS EventBus. For details, see permissions for event buses.
- Credentials for the connector: `accessKeyId` and `secretAccessKey`. If you authenticate through an AWS STS role, then in addition to `accessKeyId` and `secretAccessKey`, you also need to configure `role` and `roleSessionName`.
- An EventBridge rule for the events sent by the connector:
  - The data structure sent to EventBridge is described in [Metadata mapping](#metadata-mapping), and you can create an event pattern based on this structure.
  - Set the target according to your needs. If you're testing this connector, you can set the target to CloudWatch.
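As an illustration of the `PutEvents` permission above, the sketch below builds a minimal identity-based IAM policy that allows `events:PutEvents` on a single event bus. It assumes you attach the policy to the AWS user whose access key the connector uses; the function name `put_events_policy` is hypothetical, and the ARN in the usage line is the example value from the configuration table.

```python
import json


def put_events_policy(event_bus_arn):
    """Return a minimal IAM policy (as JSON text) allowing PutEvents on one bus.

    Hypothetical helper: assumes an identity-based policy attached to the
    AWS user whose access key the connector uses.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "events:PutEvents",
                "Resource": event_bus_arn,  # restrict to the target event bus
            }
        ],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    print(put_events_policy(
        "arn:aws:events:ap-northeast-1:598263551484:event-bus/my_eventbus"))
```

Scoping `Resource` to one bus ARN rather than `*` keeps the user limited to the bus the connector writes to.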
The following command shows how to use pulsarctl to create a `builtin` connector. If you want to create a non-`builtin` connector, replace `--sink-type aws-eventbridge` with `--archive /path/to/pulsar-io-aws-eventbridge.nar`. You can download the `nar` package from the link at the beginning of this document.
If you are a StreamNative Cloud user, you need to set up your environment first.
The `--sink-config` value is the minimum configuration necessary to start this connector, and it is a JSON string. Substitute the relevant parameters with your own values.
If you want to configure more parameters, see Configuration Properties for reference.
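As a sketch of that minimum JSON string, the snippet below builds it from the four properties that the configuration table marks as required (`accessKeyId`, `secretAccessKey`, `region`, `eventBusName`) and rejects empty values, since each defaults to an empty string. The function name `minimal_sink_config` is hypothetical.

```python
import json

# Properties marked as required in the configuration table; all default to "".
REQUIRED_KEYS = ("accessKeyId", "secretAccessKey", "region", "eventBusName")


def minimal_sink_config(access_key_id, secret_access_key, region, event_bus_name):
    """Return the minimal --sink-config JSON string (hypothetical helper)."""
    config = {
        "accessKeyId": access_key_id,
        "secretAccessKey": secret_access_key,
        "region": region,
        "eventBusName": event_bus_name,
    }
    # An empty string means the property was left at its default.
    missing = [key for key in REQUIRED_KEYS if not config[key]]
    if missing:
        raise ValueError(f"missing required properties: {missing}")
    return json.dumps(config)


if __name__ == "__main__":
    print(minimal_sink_config("<access-key-id>", "<secret>", "us-east-1", "my-bus"))
```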
You can also use a variety of other tools to create a connector:

- `pulsar-admin`: the command arguments for `pulsar-admin` are similar to those of `pulsarctl`. You can find an example in the StreamNative Cloud documentation.

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
The connector sends events to EventBridge in the following JSON format.
Before using the AWS EventBridge sink connector, you need to configure it. The following table outlines the properties and their descriptions.
Name | Type | Required | Sensitive | Default | Description |
---|---|---|---|---|---|
accessKeyId | String | yes | true | "" (empty string) | The AWS EventBridge access key ID. |
secretAccessKey | String | yes | true | "" (empty string) | The AWS EventBridge secret access key. |
region | String | yes | false | "" (empty string) | The region where the AWS EventBridge service is located. See the list of all AWS regions. |
eventBusName | String | yes | false | "" (empty string) | The event bus name. |
role | String | no | false | "" (empty string) | The AWS STS role ARN. Example: arn:aws:iam::598203581484:role/test-role |
roleSessionName | String | no | false | "" (empty string) | The AWS role session name, which you choose yourself. |
stsEndpoint | String | no | false | "" (empty string) | The AWS STS endpoint. By default, the default STS endpoint https://sts.amazonaws.com is used. See the Amazon documentation for more details. |
stsRegion | String | no | false | "" (empty string) | The AWS STS region. By default, the `region` config or the region from the environment is used. |
eventBusResourceName | String | no | false | "" (empty string) | The event bus ARN (Amazon Resource Name). Example: arn:aws:events:ap-northeast-1:598263551484:event-bus/my_eventbus |
metaDataField | String | no | false | "" (empty string) | The metadata fields added to the event. Separate multiple fields with commas. Optional values: `schema_version`, `partition`, `event_time`, `publish_time`, `message_id`, `sequence_id`, `producer_name`, `key`, and `properties`. |
batchPendingQueueSize | int | no | false | 1000 | Pending queue size. This value must be greater than `batchMaxSize`. |
batchMaxSize | int | no | false | 10 | Maximum number of messages in a batch. The value must be less than or equal to 10 (required by AWS EventBridge). |
batchMaxBytesSize | long | no | false | 640 | Maximum payload size of a batch, in bytes. This value cannot be greater than 512KB. |
batchMaxTimeMs | long | no | false | 5000 | Maximum time to wait before flushing a batch, in milliseconds. |
maxRetryCount | long | no | false | 100 | Maximum number of retries when putting events fails. |
intervalRetryTimeMs | long | no | false | 1000 | Interval between retries when putting events fails, in milliseconds. |
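The batching rows in the table carry cross-field constraints. A minimal sketch of checking them before submitting a configuration follows; the function name `validate_batch_config` is hypothetical, and the limits (at most 10 messages, a pending queue larger than `batchMaxSize`, at most 512KB per batch) are taken directly from the table.

```python
# Limits taken from the configuration table.
MAX_BATCH_SIZE = 10            # batchMaxSize upper bound (AWS EventBridge)
MAX_BATCH_BYTES = 512 * 1024   # batchMaxBytesSize upper bound


def validate_batch_config(batch_pending_queue_size=1000, batch_max_size=10,
                          batch_max_bytes_size=640):
    """Return a list of constraint violations (empty if valid). Hypothetical helper."""
    errors = []
    if batch_max_size > MAX_BATCH_SIZE:
        errors.append("batchMaxSize must be less than or equal to 10")
    if batch_pending_queue_size <= batch_max_size:
        errors.append("batchPendingQueueSize must be greater than batchMaxSize")
    if batch_max_bytes_size > MAX_BATCH_BYTES:
        errors.append("batchMaxBytesSize cannot be greater than 512KB")
    return errors


if __name__ == "__main__":
    print(validate_batch_config())                   # defaults are valid
    print(validate_batch_config(batch_max_size=20))  # violates the 10-message limit
```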
For details about this connector’s advanced features and configurations, see Advanced features.
The AWS EventBridge sink connector provides two delivery guarantees: at-most-once and at-least-once.
The effectively-once delivery guarantee is currently not supported, because Amazon EventBridge, as the downstream system, does not provide the support it requires.
In AWS EventBridge, all events are in JSON format.
Pulsar supports multiple schema types. When receiving data from Pulsar, the AWS EventBridge sink connector recognizes the schema type and converts the data to a JSON string according to the following table:
Pulsar Schema | Convert to JSON | Note |
---|---|---|
Primitive | ✔* | Only the string primitive type is supported, and the data must be in JSON format. |
Avro | ✔ | Converted using toolkit conversions. |
JSON | ✔ | Sent directly. |
Protobuf | X | The Protobuf schema is based on the Avro schema. It uses Avro as an intermediate format, so it may not provide the best-effort conversion. |
ProtobufNative | ✔ | Converted using toolkit conversions. |
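The conversion table can be read as a small decision function. The sketch below models it in Python under a simplifying assumption: Avro and ProtobufNative records have already been decoded into Python dictionaries (standing in for the toolkit conversions). The function name and the schema-type strings are hypothetical labels, not the connector's code.

```python
import json


def to_event_json(schema_type, value):
    """Simplified model of the schema-to-JSON table (hypothetical helper)."""
    if schema_type == "STRING":
        # Primitive: only strings that already hold JSON are accepted.
        json.loads(value)  # raises ValueError if the string is not JSON
        return value
    if schema_type == "JSON":
        # JSON schema: sent directly.
        return value if isinstance(value, str) else json.dumps(value)
    if schema_type in ("AVRO", "PROTOBUF_NATIVE"):
        # Assumes the record was already decoded into a dict by a toolkit.
        return json.dumps(value)
    if schema_type == "PROTOBUF":
        raise TypeError("Protobuf schema is not supported")
    raise TypeError(f"unsupported schema type: {schema_type}")


if __name__ == "__main__":
    print(to_event_json("STRING", '{"state": "terminated"}'))
    print(to_event_json("AVRO", {"state": "terminated"}))
```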
In EventBridge, the user data is in the `detail$data` field, that is, the `data` field inside `detail`.
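A minimal sketch of how user data ends up under `detail` → `data`: it builds one entry for the EventBridge `PutEvents` API, whose request entries carry `Source`, `DetailType`, `Detail` (a JSON string), and `EventBusName`. Which connector value is placed in `Source` and which in `DetailType` is not stated here, so both are plain parameters; the helper name is hypothetical.

```python
import json


def build_put_events_entry(user_data, source, detail_type, event_bus_name):
    """Wrap Pulsar user data as detail.data in a PutEvents entry (hypothetical)."""
    return {
        "Source": source,
        "DetailType": detail_type,
        # PutEvents expects Detail as a JSON-encoded string, not a dict.
        "Detail": json.dumps({"data": user_data}),
        "EventBusName": event_bus_name,
    }


if __name__ == "__main__":
    entry = build_put_events_entry({"state": "terminated"},
                                   "my-source", "my-detail-type", "my-bus")
    print(entry["Detail"])
```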
In EventBridge, a complete event contains many system fields, which can help you configure rules.
An event containing event data:
This connector maps the following fields:

- `${{Connector Name}}`
- `${{Topic Name}}`

This connector also supports adding Pulsar metadata to every event (set in the `detail` field).
You can select the desired metadata through the following configuration:
An event containing metadata:
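As a sketch of how a `metaDataField` value selects metadata, the snippet below parses the comma-separated list, rejects names outside the optional values listed in the configuration table, and copies the selected fields into `detail`. It assumes the metadata keys sit next to `data` inside `detail`, and the message metadata is passed in as a plain dictionary; both helpers are hypothetical.

```python
# Optional values for metaDataField, from the configuration table.
ALLOWED_METADATA = {
    "schema_version", "partition", "event_time", "publish_time", "message_id",
    "sequence_id", "producer_name", "key", "properties",
}


def parse_metadata_fields(meta_data_field):
    """Split a comma-separated metaDataField value and validate each name."""
    fields = [f.strip() for f in meta_data_field.split(",") if f.strip()]
    unknown = [f for f in fields if f not in ALLOWED_METADATA]
    if unknown:
        raise ValueError(f"unknown metadata fields: {unknown}")
    return fields


def build_detail(data, message_meta, meta_data_field):
    """Build the detail object: user data plus the selected metadata (assumed layout)."""
    detail = {"data": data}
    for field in parse_metadata_fields(meta_data_field):
        if field in message_meta:
            detail[field] = message_meta[field]
    return detail


if __name__ == "__main__":
    meta = {"message_id": "124:191:0", "key": "k1", "partition": 0}
    print(build_detail({"state": "terminated"}, meta, "message_id,key"))
```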
You can configure the parallelism of sink execution through the Pulsar Functions scheduling mechanism: multiple sink instances are scheduled to run on different worker nodes and consume messages together according to the configured subscription mode.
Since EventBus doesn't need to guarantee ordering, the connector supports the `Shared` subscription mode.
To increase write throughput, you can configure the following:
When `retainOrdering` is set to `false`, the `Shared` subscription mode is used.
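Why ordering is given up can be seen in a deliberately simplified model of a `Shared` subscription: consecutive messages are handed out round-robin to the parallel sink instances, so no single instance sees the whole sequence. Real Pulsar dispatch also depends on consumer permits and availability, so treat this only as an illustration; `dispatch_shared` is hypothetical.

```python
from itertools import cycle


def dispatch_shared(messages, instances):
    """Round-robin model of a Shared subscription across sink instances."""
    assignment = {i: [] for i in range(instances)}
    for message, instance in zip(messages, cycle(range(instances))):
        assignment[instance].append(message)
    return assignment


if __name__ == "__main__":
    # Six messages spread over three sink instances.
    print(dispatch_shared(list(range(6)), 3))
```

Each instance can then write its share to EventBridge in parallel, which is where the throughput gain comes from.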
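AWS EventBridge connectors support batch put events, which are mainly controlled by the following three parameters:

- `batchMaxSize`: when the number of buffered events reaches `batchMaxSize`, the pending events are flushed.
- `batchMaxBytesSize`: when the size of the buffered data is larger than `batchMaxBytesSize`, the pending events are flushed. `0` means no trigger.
- `batchMaxTimeMs`: when the time since the last flush is larger than `batchMaxTimeMs`, the pending events are flushed. `0` means no trigger.

In addition to these three parameters that control flush behavior, AWS EventBridge does not allow batches larger than 256KB per write. So, when the buffered messages are larger than 256KB, a flush is triggered.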
For handling failures with PutEvents, AWS EventBridge suggests retrying each failed event until it succeeds.
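The `PutEvents` response reports a per-entry result (failed entries carry an `ErrorCode`), so a retry only needs to resend the entries that failed. The sketch below models that loop using `maxRetryCount` and `intervalRetryTimeMs` from the configuration table. The `put_events` callable is a hypothetical stand-in that returns one error code (or `None` on success) per entry; it is not the AWS SDK signature.

```python
import time


def put_with_retry(put_events, entries, max_retry_count=100,
                   interval_retry_time_ms=1000, sleep=time.sleep):
    """Resend only failed entries, up to max_retry_count retries.

    put_events(entries) is a hypothetical callable returning one error code
    (or None on success) per entry. Returns the entries that still failed.
    """
    pending = list(entries)
    for attempt in range(max_retry_count + 1):  # first try plus the retries
        results = put_events(pending)
        pending = [e for e, err in zip(pending, results) if err is not None]
        if not pending:
            return []
        if attempt < max_retry_count:
            sleep(interval_retry_time_ms / 1000)  # wait between retries
    return pending
```

Injecting `sleep` keeps the loop testable without real delays; in the connector, the equivalent wait is governed by `intervalRetryTimeMs`.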
This connector provides two configurations to control its retry strategy: