The AWS Lambda sink connector is a Pulsar IO connector that pulls messages from Apache Pulsar topics and sends them to AWS Lambda to invoke Lambda functions.
The prerequisites for connecting an AWS Lambda sink connector to external systems include an AWS AccessKey. Record both the AccessKey and the SecretAccessKey, because you need them to configure the connector.

You can use pulsarctl to create a built-in connector with `--sink-type aws-lambda`. To create a non-built-in connector, replace `--sink-type aws-lambda` with `--archive /path/to/pulsar-io-aws-lambda.nar`. You can download the NAR package from the link at the beginning of the document.
If you are a StreamNative Cloud user, you need to set up your environment first.
The `--sink-config` value is the minimum necessary and recommended configuration for starting this connector, and it is a JSON string. Substitute the relevant parameters with your own.
If you want to configure more parameters, see Configuration Properties for reference.
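Because the `--sink-config` value is a JSON string, generating it programmatically can help avoid shell-quoting mistakes. The following Python sketch builds a small configuration from the properties described in Configuration Properties. The region, function name, and credential values are hypothetical placeholders, and setting `payloadFormat` to `V2` is an optional addition based on this document's recommendation, not part of a documented minimum.

```python
import json

# Hypothetical placeholder values: replace them with your own.
sink_config = {
    "awsRegion": "us-west-2",                  # required
    "lambdaFunctionName": "my-lambda-function",  # required
    "awsAccessKey": "<your-access-key>",        # the AccessKey you recorded
    "awsSecretKey": "<your-secret-access-key>", # the SecretAccessKey you recorded
    # Optional: the V2 payload format is recommended and enables batching.
    "payloadFormat": "V2",
}

# Serialize to a compact JSON string suitable for the --sink-config flag.
sink_config_json = json.dumps(sink_config, separators=(",", ":"))
print(sink_config_json)
```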
You can also use other tools to create a connector, such as pulsar-admin, whose command parameters are similar to those of pulsarctl. You can find an example in the StreamNative Cloud documentation.

If your connector is created on StreamNative Cloud, you need to authenticate your clients. See Build applications using Pulsar clients for more information.
You can also send messages to the input topic from the command line.
Once you have sent messages to your Pulsar topic, the AWS Lambda sink connector should automatically forward them to the specified AWS Lambda function. To verify that your messages have been correctly received by AWS Lambda, you can inspect the logs in the AWS Management Console.
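To inspect the messages, open your function in the AWS Lambda console, choose the Monitor tab, and then choose View CloudWatch logs. If your function logs its input, the log streams in the function's log group show the events it received.

Note that logs may take a few minutes to appear in CloudWatch.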
If you do not see your messages in the logs, make sure that your AWS Lambda function is correctly logging incoming events. You may need to modify your function to explicitly log the event data it receives.
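As an illustration of explicit logging, here is a minimal handler sketch that assumes the function uses the Python runtime with `lambda_handler` configured as its handler (both assumptions). It also assumes, following the payload descriptions in this document, that a V2 payload arrives as a JSON array and a V1 payload as a single JSON object.

```python
import json
import logging

# In the Lambda Python runtime, root-logger output goes to CloudWatch Logs.
logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Log every event delivered by the Pulsar AWS Lambda sink (sketch)."""
    # Log the raw event so it appears in the function's CloudWatch log stream.
    logger.info("Received event: %s", json.dumps(event))

    # Normalize to a list so single (V1) and batched (V2) payloads
    # are handled the same way.
    messages = event if isinstance(event, list) else [event]
    for message in messages:
        logger.info("Message: %s", json.dumps(message))

    return {"processed": len(messages)}
```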
By regularly checking the CloudWatch logs for your AWS Lambda function, you can ensure that your Pulsar AWS Lambda sink connector is correctly forwarding messages and troubleshoot any issues that may arise.
Before using the AWS Lambda sink connector, you need to configure it. The following table describes the configuration properties.
Name | Type | Required | Sensitive | Default | Description |
---|---|---|---|---|---|
awsEndpoint | String | false | false | "" (empty string) | The AWS Lambda endpoint URL. You can find it in AWS Lambda endpoints and quotas. |
awsRegion | String | true | false | "" (empty string) | The supported AWS region, for example, `us-west-1` or `us-west-2`. |
awsAccessKey | String | false | true | "" (empty string) | The AWS access key. For how to get it, see Managing Access Keys for IAM Users. |
awsSecretKey | String | false | true | "" (empty string) | The AWS secret key. For how to get it, see Managing Access Keys for IAM Users. |
lambdaFunctionName | String | true | false | "" (empty string) | The Lambda function to invoke with the messages. |
awsCredentialPluginName | String | false | false | "" (empty string) | The fully-qualified class name of the `AwsCredentialProviderPlugin` implementation. |
awsCredentialPluginParam | String | false | true | "" (empty string) | The JSON parameters to initialize the `AwsCredentialProviderPlugin` implementation. |
synchronousInvocation | Boolean | false | false | true | `true` invokes the Lambda function synchronously; `false` invokes it asynchronously. |
payloadFormat | String | false | false | "V1" | The format of the payload sent to the Lambda function. Valid values are `V1` and `V2`. |
metadataFields | String | false | false | "topic,key,partitionIndex,sequence,properties,eventTime" | The comma-separated metadata fields sent to the Lambda function. Valid values are `topic`, `key`, `partitionIndex`, `sequence`, `properties`, `eventTime`, and `publishTime`. This configuration only takes effect when `payloadFormat=V2`. |
batchMaxSize | Integer | false | false | 10 | The maximum number of records sent to the Lambda function in a single batch. This configuration only takes effect when `payloadFormat=V2`. |
batchMaxBytesSize | Integer | false | false | 262144 | The maximum size, in bytes, of the payload sent to the Lambda function in a single batch. This configuration only takes effect when `payloadFormat=V2`. |
batchMaxTimeMs | Integer | false | false | 5000 | The maximum wait time for batching, in milliseconds. This configuration only takes effect when `payloadFormat=V2`. |
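To make the defaults and the V2-only properties concrete, here is a Python sketch that merges user settings over the defaults listed in the table and reports batching or metadata settings that would be ignored under V1. It is not the connector's own (Java) validation logic; the function name `resolve_config` is hypothetical, and empty-string defaults are omitted for brevity.

```python
# Non-empty defaults copied from the configuration table.
DEFAULTS = {
    "synchronousInvocation": True,
    "payloadFormat": "V1",
    "metadataFields": "topic,key,partitionIndex,sequence,properties,eventTime",
    "batchMaxSize": 10,
    "batchMaxBytesSize": 262144,
    "batchMaxTimeMs": 5000,
}
REQUIRED = ("awsRegion", "lambdaFunctionName")
V2_ONLY = ("metadataFields", "batchMaxSize", "batchMaxBytesSize", "batchMaxTimeMs")


def resolve_config(user_config):
    """Return (effective config, user-set properties ignored under V1)."""
    missing = [name for name in REQUIRED if not user_config.get(name)]
    if missing:
        raise ValueError(f"missing required properties: {missing}")

    config = {**DEFAULTS, **user_config}
    if config["payloadFormat"] not in ("V1", "V2"):
        raise ValueError("payloadFormat must be 'V1' or 'V2'")

    # Batching and metadata settings only take effect with payloadFormat=V2.
    ignored = []
    if config["payloadFormat"] != "V2":
        ignored = [name for name in V2_ONLY if name in user_config]
    return config, ignored
```

For example, setting `batchMaxSize` without `payloadFormat` leaves the format at `V1`, so the sketch reports `batchMaxSize` as ignored.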
The payload is the data that the AWS Lambda sink connector sends to the AWS Lambda function. The connector supports two payload formats: V1 and V2. We strongly recommend using the V2 payload format.

The V2 payload format provides a more standardized way to manage message data and adds support for schema conversion and batching.
The V1 payload format is the default payload format. It incorporates three types of data formats, all of which are represented as JSON objects. If converting the Record object into a JSON object throws an exception, the connector attempts to convert the message value itself into a JSON object. The format of this data depends entirely on how the user has set the message value, so it can take any form.
The payload in the V2 format consists of an array of JSON objects, each representing a message. Each message includes metadata fields and a value, where the value is either a JSON object or a primitive JSON value.
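The V2 structure can be illustrated with a small Python sketch that assembles such an array and keeps only the metadata fields named in `metadataFields`. The input record shape, the `value` key name, and the function name `build_v2_payload` are assumptions made for illustration; inspect a real V2 payload received by your function for the exact field names.

```python
import json

# Metadata fields enabled by default (see the metadataFields property).
DEFAULT_METADATA_FIELDS = "topic,key,partitionIndex,sequence,properties,eventTime"


def build_v2_payload(messages, metadata_fields=DEFAULT_METADATA_FIELDS):
    """Build a V2-style payload: a JSON array with one object per message.

    Each input message is a dict holding metadata plus a "value" entry
    (hypothetical shape, for illustration only).
    """
    selected = [f.strip() for f in metadata_fields.split(",") if f.strip()]
    payload = []
    for msg in messages:
        # Copy only the configured metadata fields that the message carries.
        entry = {field: msg[field] for field in selected if field in msg}
        # The value is either a JSON object or a primitive JSON value.
        entry["value"] = msg["value"]
        payload.append(entry)
    return json.dumps(payload)
```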
The AWS Lambda sink connector supports the following schema types: Primitive Schema, Avro Schema, and JSON Schema.
For primitive schema types, the value is a primitive JSON value: a Number, a Boolean, or a String. The following table shows the JSON conversion type for each primitive schema type:
Primitive Schema Type | JSON Conversion Type | Example |
---|---|---|
BOOLEAN | Boolean | `true` |
INT8, INT16, INT32, INT64, FLOAT, DOUBLE | Number | `1234` |
STRING | String | `"Hello"` |
BYTES | Base64-encoded String | `"SGVsbG8="` (the Base64 encoding of the string "Hello") |
DATE, TIME, TIMESTAMP | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSXXX`) | `"2023-10-30T06:13:48.123+08:00"` |
LocalDate | ISO 8601 String (`yyyy-MM-dd`) | `"2023-10-17"` |
LocalTime | ISO 8601 String (`HH:mm:ss.SSSSSSSSS`) | `"04:30:33.123456789"` |
LocalDateTime | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSS`) | `"2023-10-17T04:30:33.123456789"` |
Instant | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSSSSSSSXXX`) | `"2023-10-30T06:13:48.123456789+08:00"` |
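A few rows of this mapping can be mirrored in a short Python sketch, which may help when checking what your Lambda function should expect. It is an illustration, not the connector's conversion code. Python's `datetime` only has microsecond precision, so the nanosecond-precision types (LocalTime, LocalDateTime, Instant) are out of scope here, and the function name `primitive_to_json` is hypothetical.

```python
import base64
import datetime


def primitive_to_json(value):
    """Convert a primitive value following part of the mapping table."""
    # bool must be checked before int, because bool is a subclass of int.
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float, str)):
        return value  # Number or String pass through unchanged
    if isinstance(value, bytes):
        # BYTES -> Base64-encoded String
        return base64.b64encode(value).decode("ascii")
    # datetime must be checked before date, because datetime subclasses date.
    if isinstance(value, datetime.datetime):
        # TIMESTAMP-style: millisecond precision; aware values keep their offset.
        return value.isoformat(timespec="milliseconds")
    if isinstance(value, datetime.date):
        return value.isoformat()  # LocalDate-style: yyyy-MM-dd
    raise TypeError(f"unsupported primitive type: {type(value).__name__}")
```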
For the struct schema types JSON and AVRO, the value is converted into a JSON object. The conversion rules outlined in the Primitive schema section apply to all primitive-type fields within this value object. Nested objects are also supported.
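The recursive nature of the struct conversion can be sketched in Python by letting plain dicts stand in for JSON or AVRO records (an assumption for illustration). Primitive fields are converted with a subset of the primitive rules and nested records become nested JSON objects. The function name `to_json_value` and the sample record are hypothetical.

```python
import base64
import datetime
import json


def to_json_value(value):
    """Recursively convert a dict-based record into JSON-compatible data."""
    if isinstance(value, dict):
        # A nested record becomes a nested JSON object.
        return {name: to_json_value(field) for name, field in value.items()}
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    if isinstance(value, datetime.datetime):
        return value.isoformat(timespec="milliseconds")
    if isinstance(value, datetime.date):
        return value.isoformat()
    return value  # bool, int, float, and str pass through unchanged


# Hypothetical record with a nested object and a BYTES field.
record = {"name": "Alice", "avatar": b"Hello", "address": {"city": "Hangzhou", "zip": 310000}}
print(json.dumps(to_json_value(record)))
```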
The following table lists the rules for converting Avro logical types in Avro-based struct schemas (AVRO and JSON):
Logical Type | JSON Conversion Type | Example |
---|---|---|
time-millis, time-micros | ISO 8601 String (`HH:mm:ss.SSS`) | `"13:48:41.123"` |
timestamp-millis | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSXXX`) | `"2023-10-30T06:13:48.123+08:00"` |
timestamp-micros | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX`) | `"2023-10-30T06:13:48.123456+08:00"` |
local-timestamp-millis | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSS`) | `"2023-10-29T22:13:48.123"` |
local-timestamp-micros | ISO 8601 String (`yyyy-MM-dd'T'HH:mm:ss.SSSSSS`) | `"2023-10-29T22:13:48.123456"` |
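Per the Avro specification, these logical types annotate integers: `timestamp-millis` counts milliseconds since the Unix epoch, `local-timestamp-millis` counts milliseconds since 1970-01-01 00:00:00 with no time zone, and `time-millis` counts milliseconds after midnight. The Python sketch below shows how such integers map to the strings in the table. The target time zone for `timestamp-*` values is passed in explicitly because which zone the connector uses is not stated here (an assumption); the function names are hypothetical.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc)


def timestamp_millis_to_iso(millis, tz):
    """timestamp-millis: epoch milliseconds -> ISO 8601 string with offset."""
    instant = EPOCH + datetime.timedelta(milliseconds=millis)
    return instant.astimezone(tz).isoformat(timespec="milliseconds")


def timestamp_micros_to_iso(micros, tz):
    """timestamp-micros: epoch microseconds -> ISO 8601 string with offset."""
    instant = EPOCH + datetime.timedelta(microseconds=micros)
    return instant.astimezone(tz).isoformat(timespec="microseconds")


def local_timestamp_millis_to_iso(millis):
    """local-timestamp-millis: no time zone, so the result has no offset."""
    local = datetime.datetime(1970, 1, 1) + datetime.timedelta(milliseconds=millis)
    return local.isoformat(timespec="milliseconds")


def time_millis_to_iso(millis):
    """time-millis: milliseconds after midnight -> HH:mm:ss.SSS."""
    t = (datetime.datetime.min + datetime.timedelta(milliseconds=millis)).time()
    return t.isoformat(timespec="milliseconds")
```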
You can select the metadata fields through the `metadataFields` configuration. The supported metadata fields include:

- `topic`: The source topic name.
- `key`: The string type key.
- `partitionIndex`: The partition index of the topic.
- `sequence`: The sequence ID of the message.
- `properties`: The String-to-String map of message properties.
- `eventTime`: The event time of the message in the ISO 8601 format.
- `messageId`: The string representation of the message ID, for example, `1:1:-1:-1`.

The AWS Lambda sink connector supports combining multiple messages into a single Lambda invocation. Each batch is a V2 format payload that contains multiple messages, so batching only works with the V2 format.
You can use the following configurations to control the batching policy:

- `batchMaxSize`: The maximum number of records to send to the Lambda function in a single batch.
- `batchMaxBytesSize`: The maximum size of the payload to send to the Lambda function in a single batch.
- `batchMaxTimeMs`: The maximum wait time for batching in milliseconds.

To disable batching, set `batchMaxSize` to `1`.
Note that AWS Lambda limits the size of invocation payloads (see AWS Lambda Payload Quotas). Set `batchMaxBytesSize` so that the byte size of a batch does not exceed this quota.
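The interaction of the three batching limits can be sketched in Python, assuming a batch is sent as soon as any one limit is reached. This is a hypothetical illustration, not the connector's implementation: the class name `BatchSketch` is invented, time is passed in explicitly for determinism, and a record's size is approximated by its serialized JSON length (array brackets and commas are ignored).

```python
import json


class BatchSketch:
    """Illustrative batching policy driven by three limits.

    max_size, max_bytes, and max_time_ms mirror batchMaxSize,
    batchMaxBytesSize, and batchMaxTimeMs.
    """

    def __init__(self, max_size=10, max_bytes=262144, max_time_ms=5000):
        self.max_size = max_size
        self.max_bytes = max_bytes
        self.max_time_ms = max_time_ms
        self.records = []
        self.bytes = 0
        self.started_ms = None

    def add(self, record, now_ms):
        """Add one record and return the batches that became ready to send."""
        ready = []
        # Approximate the record's size by its serialized JSON length.
        size = len(json.dumps(record).encode("utf-8"))
        # Send the pending batch first if this record would exceed max_bytes.
        if self.records and self.bytes + size > self.max_bytes:
            ready.append(self._flush())
        if not self.records:
            self.started_ms = now_ms  # the wait timer starts with the first record
        self.records.append(record)
        self.bytes += size
        # Send as soon as the record-count limit is reached.
        if len(self.records) >= self.max_size:
            ready.append(self._flush())
        return ready

    def poll(self, now_ms):
        """Return the pending batch once it has waited at least max_time_ms."""
        if self.records and now_ms - self.started_ms >= self.max_time_ms:
            return [self._flush()]
        return []

    def _flush(self):
        batch = self.records
        self.records, self.bytes, self.started_ms = [], 0, None
        return batch
```

With `max_size=1`, every call to `add` returns a single-record batch, which corresponds to disabling batching.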