The AWS Lambda sink connector is a Pulsar IO connector that pulls messages from Pulsar topics and sends them to AWS Lambda to invoke Lambda functions.
How to get
This section describes how to get the AWS Lambda sink connector.
Work with Function Worker
You can get the AWS Lambda sink connector using one of the following methods if you use Pulsar Function Worker to run connectors in a cluster.
Download the NAR package from the download page.
Build it from the source code.
To build the AWS Lambda sink connector from the source code, follow these steps.
Clone the source code to your machine.
git clone https://github.com/streamnative/pulsar-io-aws-lambda
Build the connector in the pulsar-io-aws-lambda directory.
mvn clean install -DskipTests
After the connector is successfully built, a NAR package is generated under the target directory.
ls target
pulsar-io-aws-lambda-2.10.5.17.nar
Work with Function Mesh
You can pull the AWS Lambda sink connector Docker image from the Docker Hub if you use Function Mesh to run the connector.
How to configure
Before using the AWS Lambda sink, you need to configure it. This table lists the properties and the descriptions.
Name | Type | Required | Default | Description |
---|---|---|---|---|
awsEndpoint | String | false | " " (empty string) | The AWS Lambda endpoint URL. It can be found at AWS Lambda endpoints and quotas. |
awsRegion | String | true | " " (empty string) | The supported AWS region. For example, us-west-1, us-west-2. |
awsCredentialPluginName | String | false | " " (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation. |
awsCredentialPluginParam | String | true | " " (empty string) | The JSON parameter to initialize AwsCredentialProviderPlugin. |
lambdaFunctionName | String | true | " " (empty string) | The Lambda function that should be invoked by the messages. |
synchronousInvocation | Boolean | true | true | Whether to invoke the Lambda function synchronously (true) or asynchronously (false). |
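Before submitting a configuration, it can help to check it against the table above. The following is a minimal sketch, not part of the connector: the helper name `check_sink_config` and its messages are hypothetical, and the required flags and the single non-empty default are copied from the table.

```python
# Required flags copied from the property table; everything else in this
# block (names, messages) is illustrative and not part of the connector.
REQUIRED = {
    "awsEndpoint": False,
    "awsRegion": True,
    "awsCredentialPluginName": False,
    "awsCredentialPluginParam": True,
    "lambdaFunctionName": True,
    "synchronousInvocation": True,
}

# Only synchronousInvocation has a non-empty default in the table.
DEFAULTS = {"synchronousInvocation": True}


def check_sink_config(configs: dict) -> list:
    """Return a list of human-readable problems; an empty list means OK."""
    merged = {**DEFAULTS, **configs}
    problems = []
    for name, required in REQUIRED.items():
        value = merged.get(name)
        if required and (value is None or value == ""):
            problems.append(f"missing required property: {name}")
    for name in configs:
        if name not in REQUIRED:
            problems.append(f"unknown property: {name}")
    if not isinstance(merged["synchronousInvocation"], bool):
        problems.append("synchronousInvocation must be a boolean")
    return problems


if __name__ == "__main__":
    # Deliberately incomplete config to show the reported problems.
    for problem in check_sink_config({"awsRegion": "us-west-2"}):
        print(problem)
```

The check treats an empty string the same as a missing value, because the table lists the empty string as the default for every string property.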
Work with Function Worker
You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.
Example
JSON
{
  "tenant": "public",
  "namespace": "default",
  "name": "aws-lambda-sink",
  "inputs": [
    "test-aws-lambda-topic"
  ],
  "archive": "connectors/pulsar-io-aws-lambda-2.10.5.17.nar",
  "parallelism": 1,
  "configs": {
    "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
    "awsRegion": "us-west-2",
    "lambdaFunctionName": "test-function",
    "awsCredentialPluginName": "",
    "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
    "synchronousInvocation": true
  }
}
YAML
tenant: "public"
namespace: "default"
name: "aws-lambda-sink"
inputs:
  - "test-aws-lambda-topic"
archive: "connectors/pulsar-io-aws-lambda-2.10.5.17.nar"
parallelism: 1
configs:
  awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
  awsRegion: "us-west-2"
  lambdaFunctionName: "test-function"
  awsCredentialPluginName: ""
  awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
  synchronousInvocation: true
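The awsCredentialPluginParam value is a JSON document stored inside a string, which is easy to mis-quote when the outer file is also JSON. One way to avoid hand-escaping is to generate the file. The sketch below is an illustration only: `build_sink_config` and `regional_endpoint` are hypothetical helper names, the tenant, topic, and archive values are the sample values from this section, and the endpoint pattern assumes the standard AWS partition.

```python
import json


def regional_endpoint(region: str) -> str:
    # Pattern for the standard AWS partition; other partitions (for
    # example the China regions) use a different domain suffix.
    return f"https://lambda.{region}.amazonaws.com"


def build_sink_config(region, function_name, access_key, secret_key):
    # Encode the credentials first: the connector property expects a
    # string that itself contains JSON, not a nested object.
    credential_param = json.dumps(
        {"accessKey": access_key, "secretKey": secret_key},
        separators=(",", ":"),
    )
    return {
        "tenant": "public",
        "namespace": "default",
        "name": "aws-lambda-sink",
        "inputs": ["test-aws-lambda-topic"],
        "archive": "connectors/pulsar-io-aws-lambda-2.10.5.17.nar",
        "parallelism": 1,
        "configs": {
            "awsEndpoint": regional_endpoint(region),
            "awsRegion": region,
            "lambdaFunctionName": function_name,
            "awsCredentialPluginName": "",
            "awsCredentialPluginParam": credential_param,
            "synchronousInvocation": True,
        },
    }


if __name__ == "__main__":
    config = build_sink_config("us-west-2", "test-function", "myKey", "my-Secret")
    # json.dumps escapes the inner quotes of awsCredentialPluginParam.
    print(json.dumps(config, indent=2))
```

Serializing the outer document with `json.dumps` takes care of escaping the inner quotes, so the credentials string never has to be escaped by hand.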
Work with Function Mesh
You can use a CustomResourceDefinition (CRD) to create an AWS Lambda sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.
You can define a CRD file (YAML) to set the properties as below.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: aws-lambda-sink-sample
spec:
  image: streamnative/pulsar-io-aws-lambda:2.10.5.17
  className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
  replicas: 1
  input:
    topics:
    - persistent://public/default/destination
    typeClassName: "[B"
  sinkConfig:
    awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
    awsRegion: "us-west-2"
    lambdaFunctionName: "test-function"
    awsCredentialPluginName: ""
    awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
    synchronousInvocation: true
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-aws-lambda-2.10.5.17.nar
  clusterName: test-pulsar
  autoAck: true
How to use
You can use the AWS Lambda sink connector with Function Worker or Function Mesh.
Work with Function Worker
You can use the AWS Lambda sink connector as a non-built-in connector or a built-in connector.
If you already have a Pulsar cluster, you can use the AWS Lambda sink connector as a non-built-in connector directly.
This example shows how to create an AWS Lambda sink connector on a Pulsar cluster using the pulsar-admin sinks create command.
PULSAR_HOME/bin/pulsar-admin sinks create \
--archive pulsar-io-aws-lambda-2.10.5.17.nar \
--sink-config-file aws-lambda-sink-config.yaml \
--classname org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink \
--name aws-lambda-sink
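When the sink is created from a deployment script rather than by hand, the command above can be assembled programmatically. This is a minimal sketch that uses only the flags shown in the command; the function names `sinks_create_command` and `create_sink` and the example paths are hypothetical.

```python
import os
import subprocess

SINK_CLASS = "org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink"


def sinks_create_command(pulsar_home, archive, config_file, name):
    # Build the argument list for `pulsar-admin sinks create` using the
    # same four flags as the command shown in this section.
    return [
        os.path.join(pulsar_home, "bin", "pulsar-admin"),
        "sinks", "create",
        "--archive", archive,
        "--sink-config-file", config_file,
        "--classname", SINK_CLASS,
        "--name", name,
    ]


def create_sink(pulsar_home, archive, config_file, name):
    # Run the command and report success plus the combined output.
    cmd = sinks_create_command(pulsar_home, archive, config_file, name)
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.returncode == 0, result.stdout + result.stderr


if __name__ == "__main__":
    # Print the command instead of running it; /opt/pulsar is a placeholder.
    print(" ".join(sinks_create_command(
        "/opt/pulsar",
        "pulsar-io-aws-lambda-2.10.5.17.nar",
        "aws-lambda-sink-config.yaml",
        "aws-lambda-sink",
    )))
```

Passing the arguments as a list avoids shell quoting problems if a path contains spaces.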
Work with Function Mesh
This example describes how to create an AWS Lambda sink connector for a Kubernetes cluster using Function Mesh.
Prerequisites
Create and connect to a Kubernetes cluster.
Create a Pulsar cluster in the Kubernetes cluster.
Install the Function Mesh Operator and CRD into the Kubernetes cluster.
Step
Define the AWS Lambda sink connector with a YAML file and save it as sink-sample.yaml.
This example shows how to publish the AWS Lambda sink connector to Function Mesh with a Docker image.
apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: aws-lambda-sink-sample
spec:
  image: streamnative/pulsar-io-aws-lambda:2.10.5.17
  className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
  replicas: 1
  input:
    topics:
    - persistent://public/default/destination
    typeClassName: "[B"
  sinkConfig:
    awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
    awsRegion: "us-west-2"
    lambdaFunctionName: "test-function"
    awsCredentialPluginName: ""
    awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
    synchronousInvocation: true
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-aws-lambda-2.10.5.17.nar
  clusterName: test-pulsar
  autoAck: true
Apply the YAML file to create the AWS Lambda sink connector.
Input
kubectl apply -f <path-to-sink-sample.yaml>
Output
sink.compute.functionmesh.io/aws-lambda-sink-sample created
Check whether the AWS Lambda sink connector is created successfully.
Input
kubectl get all
Output
NAME                           READY   STATUS    RESTARTS   AGE
pod/aws-lambda-sink-sample-0   1/1     Running   0          77s
After that, you can use the AWS Lambda sink connector to export Pulsar messages to AWS Lambda to invoke Lambda functions.
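On the receiving side, the function named in lambdaFunctionName needs a handler. The following is a minimal sketch of a Python Lambda handler for trying out the connector. This section does not describe the payload fields the connector sends, so the handler makes no assumption about them and only inspects the top level of the event; the handler name and the shape of the return value are illustrative.

```python
import json


def lambda_handler(event, context):
    # `event` is the invocation payload after the Lambda Python runtime
    # has parsed it from JSON. The fields are not documented in this
    # section, so only the top-level structure is summarized.
    if isinstance(event, dict):
        summary = {"type": "object", "keys": sorted(event.keys())}
    else:
        summary = {"type": type(event).__name__}
    # Anything printed here ends up in the function's log output.
    print(json.dumps(summary))
    return {"status": "ok", "summary": summary}
```

With synchronousInvocation set to true, the caller waits for this return value. With false, AWS Lambda queues the event and the return value is not delivered to the caller, so logging is the practical way to confirm that messages arrive.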