AWS Lambda Sink

Available on
StreamNative Cloud console

Authored by
freeznet,timmyyuan,Anonymitaet,Technoboy-
Support type
streamnative
License
Business License

The AWS Lambda sink connector is a Pulsar IO connector that pulls messages from Pulsar topics and sends them to AWS Lambda to invoke Lambda functions.

How to get

This section describes how to get the AWS Lambda sink connector, either by building it from the source code or by pulling its Docker image.

Work with Function Worker

If you use Pulsar Function Worker to run connectors in a cluster, you can get the AWS Lambda sink connector by building it from the source code. To do so, follow these steps.

  1. Clone the source code to your machine.

    git clone https://github.com/streamnative/pulsar-io-aws-lambda
    
  2. Build the connector in the pulsar-io-aws-lambda directory.

    mvn clean install -DskipTests
    

    After the connector is successfully built, a NAR package is generated under the target directory.

    ls target
    pulsar-io-aws-lambda-2.8.4.2.nar
    

Work with Function Mesh

If you use Function Mesh to run the connector, you can pull the AWS Lambda sink connector Docker image from Docker Hub.

How to configure

Before using the AWS Lambda sink connector, you need to configure it. The following table lists the properties and their descriptions.

Name | Type | Required | Default | Description
-----|------|----------|---------|------------
awsEndpoint | String | false | "" (empty string) | The AWS Lambda endpoint URL. It can be found at AWS Lambda endpoints and quotas.
awsRegion | String | true | "" (empty string) | The supported AWS region. For example, us-west-1, us-west-2.
awsCredentialPluginName | String | false | "" (empty string) | The fully-qualified class name of the AwsCredentialProviderPlugin implementation.
awsCredentialPluginParam | String | true | "" (empty string) | The JSON parameter to initialize AwsCredentialProviderPlugin.
lambdaFunctionName | String | true | "" (empty string) | The Lambda function that should be invoked by the messages.
synchronousInvocation | Boolean | true | true | Whether to invoke the Lambda function synchronously. true: invoke the Lambda function synchronously. false: invoke the Lambda function asynchronously.
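
The required properties in the table can be checked before a sink is submitted. The following is a minimal Python sketch and not part of the connector: the helper name `missing_required` and the sample values are hypothetical, and the required set is copied from the Required column of the table.

```python
# Properties marked Required = true in the table above.
REQUIRED = (
    "awsRegion",
    "awsCredentialPluginParam",
    "lambdaFunctionName",
    "synchronousInvocation",
)


def missing_required(configs):
    """Return the required property names that are absent or empty."""
    missing = []
    for name in REQUIRED:
        value = configs.get(name)
        # A boolean False is a valid value, so only None and "" count as missing.
        if value is None or value == "":
            missing.append(name)
    return missing


# Hypothetical sample that omits the credential parameter.
sample = {
    "awsRegion": "us-west-2",
    "lambdaFunctionName": "test-function",
    "synchronousInvocation": False,
}
print(missing_required(sample))
```

A check like this only catches missing keys. It does not verify that the region, function name, or credentials are valid on the AWS side.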

Work with Function Worker

You can create a configuration file (JSON or YAML) to set the properties if you use Pulsar Function Worker to run connectors in a cluster.

Example

  • JSON

      {
        "tenant": "public",
        "namespace": "default",
        "name": "aws-lambda-sink",
        "inputs": [
          "test-aws-lambda-topic"
        ],
        "archive": "connectors/pulsar-io-aws-lambda-2.8.4.2.nar",
        "parallelism": 1,
        "configs":
        {
            "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
            "awsRegion": "us-west-2",
            "lambdaFunctionName": "test-function",
            "awsCredentialPluginName": "",
            "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
            "synchronousInvocation": true
        }
      }
    
  • YAML

    tenant: "public"
    namespace: "default"
    name: "aws-lambda-sink"
    inputs: 
     - "test-aws-lambda-topic"
    archive: "connectors/pulsar-io-aws-lambda-2.8.4.2.nar"
    parallelism: 1
    
    configs:
     awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
     awsRegion: "us-west-2"
     lambdaFunctionName: "test-function"
     awsCredentialPluginName: ""
     awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
     synchronousInvocation: true
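
Because `awsCredentialPluginParam` is itself a JSON document stored as a string, hand-written quoting is easy to get wrong in the JSON variant. The following is a minimal Python sketch, using only the standard library, that renders the JSON configuration from the values in the example above. The variable names are hypothetical.

```python
import json

# The credentials are serialized first, because the property holds a JSON string.
credential_param = json.dumps(
    {"accessKey": "myKey", "secretKey": "my-Secret"},
    separators=(",", ":"),
)

sink_config = {
    "tenant": "public",
    "namespace": "default",
    "name": "aws-lambda-sink",
    "inputs": ["test-aws-lambda-topic"],
    "archive": "connectors/pulsar-io-aws-lambda-2.8.4.2.nar",
    "parallelism": 1,
    "configs": {
        "awsEndpoint": "https://lambda.us-west-2.amazonaws.com",
        "awsRegion": "us-west-2",
        "lambdaFunctionName": "test-function",
        "awsCredentialPluginName": "",
        "awsCredentialPluginParam": credential_param,
        "synchronousInvocation": True,
    },
}

# json.dumps escapes the inner quotes, so the rendered text is valid JSON.
rendered = json.dumps(sink_config, indent=2)
print(rendered)
```

Redirecting the printed output to a file gives a configuration file whose nested credential string is escaped correctly.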
    

Work with Function Mesh

You can use a CustomResourceDefinition (CRD) to create an AWS Lambda sink connector. Using a CRD lets Function Mesh integrate naturally with the Kubernetes ecosystem. For more information about Pulsar sink CRD configurations, see sink CRD configurations.

You can define a CRD file (YAML) to set the properties as follows.

apiVersion: compute.functionmesh.io/v1alpha1
kind: Sink
metadata:
  name: aws-lambda-sink-sample
spec:
  image: streamnative/pulsar-io-aws-lambda:2.8.4.2
  className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
  replicas: 1
  input:
    topics: 
    - persistent://public/default/destination
    typeClassName: "[B"
  sinkConfig:
    awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
    awsRegion: "us-west-2"
    lambdaFunctionName: "test-function"
    awsCredentialPluginName: ""
    awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
    synchronousInvocation: true
  pulsar:
    pulsarConfig: "test-pulsar-sink-config"
  resources:
    limits:
      cpu: "0.2"
      memory: 1.1G
    requests:
      cpu: "0.1"
      memory: 1G
  java:
    jar: connectors/pulsar-io-aws-lambda-2.8.4.2.nar
  clusterName: test-pulsar
  autoAck: true

How to use

You can use the AWS Lambda sink connector with Function Worker or Function Mesh.

Work with Function Worker

You can use the AWS Lambda sink connector as a non built-in connector or a built-in connector.

If you already have a Pulsar cluster, you can use the AWS Lambda sink connector as a non built-in connector directly.

This example shows how to create an AWS Lambda sink connector on a Pulsar cluster using the pulsar-admin sinks create command.

PULSAR_HOME/bin/pulsar-admin sinks create \
--archive pulsar-io-aws-lambda-2.8.4.2.nar \
--sink-config-file aws-lambda-sink-config.yaml \
--classname org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink \
--name aws-lambda-sink
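
When the sink is created from a script rather than by hand, the same command can be assembled programmatically. The following is a minimal Python sketch that builds the argument list from the flags shown above. The helper name `sinks_create_command` and the installation directory `/opt/pulsar` are hypothetical, and the actual invocation is left commented out.

```python
import shlex
import subprocess


def sinks_create_command(pulsar_home, archive, config_file, name):
    """Build the argument list for `pulsar-admin sinks create`."""
    return [
        f"{pulsar_home}/bin/pulsar-admin", "sinks", "create",
        "--archive", archive,
        "--sink-config-file", config_file,
        "--classname", "org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink",
        "--name", name,
    ]


cmd = sinks_create_command(
    "/opt/pulsar",  # hypothetical installation directory
    "pulsar-io-aws-lambda-2.8.4.2.nar",
    "aws-lambda-sink-config.yaml",
    "aws-lambda-sink",
)
print(shlex.join(cmd))  # inspect the command before running it
# subprocess.run(cmd, check=True)  # uncomment on a host with Pulsar installed
```

Passing the arguments as a list avoids shell quoting problems if a path contains spaces.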

Work with Function Mesh

This example describes how to create an AWS Lambda sink connector for a Kubernetes cluster using Function Mesh.

Prerequisites

Steps

  1. Define the AWS Lambda sink connector with a YAML file and save it as sink-sample.yaml.

    This example shows how to publish the AWS Lambda sink connector to Function Mesh with a Docker image.

    apiVersion: compute.functionmesh.io/v1alpha1
    kind: Sink
    metadata:
      name: aws-lambda-sink-sample
    spec:
      image: streamnative/pulsar-io-aws-lambda:2.8.4.2
      className: org.apache.pulsar.ecosystem.io.aws.lambda.AWSLambdaBytesSink
      replicas: 1
      input:
        topics: 
        - persistent://public/default/destination
        typeClassName: "[B"
      sinkConfig:
        awsEndpoint: "https://lambda.us-west-2.amazonaws.com"
        awsRegion: "us-west-2"
        lambdaFunctionName: "test-function"
        awsCredentialPluginName: ""
        awsCredentialPluginParam: '{"accessKey":"myKey","secretKey":"my-Secret"}'
        synchronousInvocation: true
      pulsar:
        pulsarConfig: "test-pulsar-sink-config"
      resources:
        limits:
          cpu: "0.2"
          memory: 1.1G
        requests:
          cpu: "0.1"
          memory: 1G
      java:
        jar: connectors/pulsar-io-aws-lambda-2.8.4.2.nar
      clusterName: test-pulsar
      autoAck: true
    
  2. Apply the YAML file to create the AWS Lambda sink connector.

    Input

    kubectl apply -f <path-to-sink-sample.yaml>
    

    Output

    sink.compute.functionmesh.io/aws-lambda-sink-sample created
    
  3. Check whether the AWS Lambda sink connector is created successfully.

    Input

    kubectl get all
    

    Output

    NAME                           READY   STATUS    RESTARTS   AGE
    pod/aws-lambda-sink-sample-0   1/1     Running   0          77s
    

    After that, you can use the AWS Lambda sink connector to export Pulsar messages to AWS Lambda to invoke Lambda functions.
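
On the AWS side, the target function receives whatever payload the connector sends. The following is a minimal Python handler sketch that makes no assumption about the payload fields: it only logs the event and reports its top-level keys. The `handler(event, context)` signature is the standard one for Python Lambda functions, while the shape of the returned dictionary is hypothetical.

```python
import json


def handler(event, context):
    """Log the incoming event and return a small summary."""
    # Lambda deserializes a JSON payload before calling the handler, so
    # `event` may be a dict, a list, a string, or a number.
    print(json.dumps(event, default=str))
    keys = sorted(event.keys()) if isinstance(event, dict) else []
    return {"received": True, "keys": keys}
```

Logging the raw event first is a simple way to discover the actual payload structure before writing field-specific logic.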