The AWS Lambda sink connector is a Pulsar IO connector for pulling messages from Pulsar topics to AWS Lambda to invoke Lambda functions.
How to get
You can get the AWS Lambda sink connector using one of the following methods:
Download the NAR package from here.
Build it from the source code.
Clone the source code to your machine.
git clone https://github.com/streamnative/pulsar-io-aws-lambda
Assume that PULSAR_IO_AWS_LAMBDA_HOME is the home directory for the pulsar-io-aws-lambda repo. Build the connector in the ${PULSAR_IO_AWS_LAMBDA_HOME} directory.
mvn clean install -DskipTests
After the connector is successfully built, a NAR package is generated under the target directory.
ls target
pulsar-io-aws-lambda-2.7.0.nar
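The NAR file name changes with the connector version, so it can help to locate it by pattern rather than hard-coding it. The following is a minimal sketch; `find_nar` is a hypothetical helper name and not part of the connector or of Pulsar.

```shell
# Hypothetical helper: print the path of the connector NAR found in a directory.
# The directory defaults to "target", where the Maven build places the package.
find_nar() {
  for f in "${1:-target}"/pulsar-io-aws-lambda-*.nar; do
    # If the glob matched nothing, $f is the literal pattern and -f fails.
    if [ -f "$f" ]; then
      echo "$f"
      return 0
    fi
  done
  echo "no NAR found under ${1:-target}; did the build succeed?" >&2
  return 1
}

# Example usage (run from the repo directory after the build):
#   cp "$(find_nar target)" PULSAR_HOME/connectors/
```

If several versions are present, this sketch returns the first match in glob order, which may not be the newest build.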
How to configure
Before using the AWS Lambda sink connector, you need to configure it.
You can create a configuration file (JSON or YAML) to set the following properties.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsEndpoint | String | false | " " (empty string) | AWS Lambda endpoint URL. It can be found here. |
awsRegion | String | true | " " (empty string) | Supported AWS region. For example, us-west-1, us-west-2. |
awsCredentialPluginName | String | false | " " (empty string) | Fully-qualified class name of an implementation of AwsCredentialProviderPlugin. |
awsCredentialPluginParam | String | true | " " (empty string) | JSON parameter to initialize AwsCredentialProviderPlugin. |
lambdaFunctionName | String | true | " " (empty string) | The Lambda function that should be invoked by the messages. |
synchronousInvocation | Boolean | true | true | true means invoking a Lambda function synchronously. <br>false means invoking a Lambda function asynchronously. |
Example
JSON
{
    "tenant": "public",
    "namespace": "default",
    "name": "aws-lambda-sink",
    "inputs": [
        "test-aws-lambda-topic"
    ],
    "archive": "connectors/pulsar-io-aws-lambda-2.7.0.nar",
    "parallelism": 1,
    "configs": {
        "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
        "awsRegion": "us-west-2",
        "lambdaFunctionName": "test-function",
        "awsCredentialPluginName": "",
        "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
        "synchronousInvocation": true
    }
}
YAML
tenant: "public"
namespace: "default"
name: "aws-lambda-sink"
inputs:
  - "test-aws-lambda-topic"
archive: "connectors/pulsar-io-aws-lambda-2.7.0.nar"
parallelism: 1
configs:
  awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  lambdaFunctionName: "test-function"
  awsCredentialPluginName: ""
  awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
  synchronousInvocation: true
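To keep credentials out of version control, the YAML file can be generated at deploy time. The sketch below writes the same configuration as the example above and reads the keys from the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables. The `CONFIG_FILE` variable and the placeholder defaults are assumptions made for illustration.

```shell
# Sketch: generate the sink configuration from environment variables.
# CONFIG_FILE and the fallback values are illustrative assumptions.
ACCESS_KEY="${AWS_ACCESS_KEY_ID:-myKey}"
SECRET_KEY="${AWS_SECRET_ACCESS_KEY:-my-Secret}"
CONFIG_FILE="${CONFIG_FILE:-aws-lambda-sink-config.yaml}"

# The heredoc delimiter is unquoted, so ${ACCESS_KEY} and ${SECRET_KEY} expand.
cat > "$CONFIG_FILE" <<EOF
tenant: "public"
namespace: "default"
name: "aws-lambda-sink"
inputs:
  - "test-aws-lambda-topic"
archive: "connectors/pulsar-io-aws-lambda-2.7.0.nar"
parallelism: 1
configs:
  awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  lambdaFunctionName: "test-function"
  awsCredentialPluginName: ""
  awsCredentialPluginParam: '{"accessKey":"${ACCESS_KEY}","secretKey":"${SECRET_KEY}"}'
  synchronousInvocation: true
EOF

# Fail early if a property marked as required in the table is missing.
for key in awsRegion awsCredentialPluginParam lambdaFunctionName synchronousInvocation; do
  if ! grep -q "^  ${key}:" "$CONFIG_FILE"; then
    echo "missing required property: ${key}" >&2
    exit 1
  fi
done
echo "wrote ${CONFIG_FILE}"
```

A secret that contains a single quote would break the YAML quoting used here, so this approach only suits keys without that character.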
How to use
You can use the AWS Lambda sink connector as a non built-in connector or a built-in connector as below.
Use as non built-in connector
If you already have a Pulsar cluster, you can use the AWS Lambda sink connector as a non built-in connector directly.
This example shows how to create an AWS Lambda sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--archive pulsar-io-aws-lambda-2.7.0.nar \
--sink-config-file aws-lambda-sink-config.yaml \
--classname org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink \
--name aws-lambda-sink
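Once the sink has been created, its state can be inspected with `pulsar-admin sinks status`. The wrapper below is a minimal sketch: `sink_status` and the `PULSAR_ADMIN` override variable are hypothetical names, and the tenant and namespace are assumed to be `public` and `default`, matching the example configuration.

```shell
# PULSAR_ADMIN is a hypothetical override; by default it points at the
# pulsar-admin binary under PULSAR_HOME.
PULSAR_ADMIN="${PULSAR_ADMIN:-${PULSAR_HOME:-.}/bin/pulsar-admin}"

# Print the status of the named sink in the public/default namespace.
sink_status() {
  "$PULSAR_ADMIN" sinks status --tenant public --namespace default --name "$1"
}

# Example usage:
#   sink_status aws-lambda-sink
```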
Use as built-in connector
You can make the AWS Lambda sink connector a built-in connector and use it on a standalone cluster, an on-premises cluster, or a K8S cluster.
Standalone cluster
This example describes how to use the AWS Lambda sink connector to pull data from Pulsar topics and deliver it to AWS Lambda in standalone mode.
Prepare AWS Lambda service. Make sure the Lambda function is ready to use.
For more information, see Getting Started with Amazon AWS Lambda.
Copy the NAR package of the AWS Lambda connector to the Pulsar connectors directory.
cp pulsar-io-aws-lambda-2.7.0.nar PULSAR_HOME/connectors/pulsar-io-aws-lambda-2.7.0.nar
Start Pulsar in standalone mode.
PULSAR_HOME/bin/pulsar standalone
Run the AWS Lambda sink connector locally.
PULSAR_HOME/bin/pulsar-admin sinks localrun \
--sink-type aws-lambda \
--sink-config-file aws-lambda-sink-config.yaml
Send messages to Pulsar topics.
PULSAR_HOME/bin/pulsar-client produce public/default/test-aws-lambda-topic --messages hello -n 10
Monitor the Lambda function status through the AWS Lambda console.
For more information, see monitoring and troubleshooting Lambda applications.
On-premises cluster
This example explains how to create an AWS Lambda sink connector in an on-premises cluster.
Copy the NAR package of the AWS Lambda connector to the Pulsar connectors directory.
cp pulsar-io-aws-lambda-2.7.0.nar $PULSAR_HOME/connectors/pulsar-io-aws-lambda-2.7.0.nar
Reload all built-in connectors.
PULSAR_HOME/bin/pulsar-admin sinks reload
Check whether the AWS Lambda sink connector is available on the list or not.
PULSAR_HOME/bin/pulsar-admin sinks available-sinks
Create an AWS Lambda sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-type aws-lambda \
--sink-config-file aws-lambda-sink-config.yaml \
--name aws-lambda-sink
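The availability check in the steps above can be scripted so that the create command runs only after the connector has been loaded. This is a sketch under assumptions: `sink_available` and `PULSAR_ADMIN` are hypothetical names, and the function matches the connector type as a substring of the `available-sinks` output, whose exact layout may differ between Pulsar versions.

```shell
# PULSAR_ADMIN is a hypothetical override; by default it points at the
# pulsar-admin binary under PULSAR_HOME.
PULSAR_ADMIN="${PULSAR_ADMIN:-${PULSAR_HOME:-.}/bin/pulsar-admin}"

# Succeed if the given connector type appears in the built-in sink list.
sink_available() {
  "$PULSAR_ADMIN" sinks available-sinks | grep -qF -- "$1"
}

# Example usage:
#   sink_available aws-lambda && echo "aws-lambda sink is available"
```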
K8S cluster
Build a new image based on the Pulsar image with the AWS Lambda sink connector and push the new image to your image registry. This example tags the new image as streamnative/pulsar-aws-lambda:2.7.0.
FROM apachepulsar/pulsar-all:2.7.0
RUN curl -L https://github.com/streamnative/pulsar-io-aws-lambda/releases/download/v2.7.0/pulsar-io-aws-lambda-2.7.0.nar -o /pulsar/connectors/pulsar-io-aws-lambda-2.7.0.nar
Extract the previous --set arguments from K8S to the pulsar.yaml file.
helm get values <release-name> > pulsar.yaml
Replace the images section in the pulsar.yaml file with the images section of streamnative/pulsar-aws-lambda:2.7.0.
Upgrade the K8S cluster with the pulsar.yaml file.
helm upgrade <release-name> streamnative/pulsar \
--version <new version> \
-f pulsar.yaml
Tip
For more information about how to upgrade a Pulsar cluster with Helm, see Upgrade Guide.
Create an AWS Lambda sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--sink-type aws-lambda \
--sink-config-file aws-lambda-sink-config.yaml \
--name aws-lambda-sink