Connect to your cluster using Kafka Connect

Note

  • This QuickStart assumes that you have created a Pulsar cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
  • This QuickStart was developed in a macOS environment. If you use another operating system, the commands might vary.

This document shows how to connect to your Pulsar cluster with Kafka Connect, using OAuth2 authentication.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • For utilities such as kcat, the password is token:TOKEN, where TOKEN is the JWT token of your service account (see the sample kafka.properties sketch at the end of this section).
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
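The kafka.properties file referenced later in this guide by the Kafka console producer can be built from these values. Below is a minimal sketch, assuming token-based SASL/PLAIN authentication over TLS; the username (shown here as the tenant/namespace public/default) and the token are placeholders you must replace.

    # kafka.properties - minimal sketch assuming token-based SASL/PLAIN authentication
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    # username is the fully qualified namespace; password is "token:" followed by the JWT token
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
      username="public/default" \
      password="token:YOUR-TOKEN";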

Configure Elasticsearch

  1. Open a terminal and run the following command to start an Elasticsearch instance that exposes ports 9200 and 9300.

    docker run --name elastic-1 \
      -p 9200:9200 -p 9300:9300 -it \
      -e "discovery.type=single-node" \
      -e "xpack.security.enabled=false" \
      docker.elastic.co/elasticsearch/elasticsearch:7.17.2
    

    You should see the following output:

    ...output omitted...
    {"@timestamp":"2022-04-15T15:13:17.067Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-Country.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#7]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
    {"@timestamp":"2022-04-15T15:13:17.118Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-City.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#13]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
    
  2. Verify that the Elasticsearch instance has started successfully.

    curl 'http://localhost:9200'
    

    You should see the following output:

    {
      "name": "eaf2be7fe2d6",
      "cluster_name": "docker-cluster",
      "cluster_uuid": "jolOS3_VRGq2-LpZbkT1kw",
      "version": {
        "number": "7.17.2",
        "build_flavor": "default",
        "build_type": "docker",
        "build_hash": "de7261de50d90919ae53b0eff9413fd7e5307301",
        "build_date": "2022-03-28T15:12:21.446567561Z",
        "build_snapshot": false,
        "lucene_version": "8.11.1",
        "minimum_wire_compatibility_version": "6.8.0",
        "minimum_index_compatibility_version": "6.0.0-beta1"
      },
      "tagline": "You Know, for Search"
    }
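    Optionally, you can also query the cluster health endpoint; a status of green or yellow indicates that the single-node instance is ready to accept requests.

    curl 'http://localhost:9200/_cluster/health?pretty'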
    

Configure Kafka Connect

Kafka Connect is an integration tool that is released with the Apache Kafka project. It provides reliable data streaming between Apache Kafka and external systems and is both scalable and flexible. Kafka Connect works with Kafka on Pulsar (KoP), which is compatible with the Kafka API.

Kafka Connect uses Source and Sink connectors for integration. Source connectors stream data from an external system to Kafka, while Sink connectors stream data from Kafka to an external system.

Pulsar KoP and Elasticsearch with Kafka Connect

Download Kafka

  1. Navigate to the Apache downloads page for Kafka, and click the suggested download link for the Kafka 3.1.0 binary package.

  2. Extract the Kafka binary package so that you have a _YOUR_HOME_DIRECTORY_/kafka_2.13-3.1.0 directory; the rest of this guide runs commands from that directory.

Download the StreamNative OAuth dependency

# download the StreamNative OAuth client library into the Kafka libs directory
curl -O https://repo1.maven.org/maven2/io/streamnative/pulsar/handlers/oauth-client/3.1.0.1/oauth-client-3.1.0.1.jar --output-dir ./libs
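The command above assumes that you run it from your kafka_2.13-3.1.0 directory so that the JAR is placed in the existing libs folder, and that your curl supports --output-dir (curl 7.73.0 or later). You can confirm the download with:

# verify that the OAuth client JAR is now in the libs directory
ls ./libs | grep oauth-client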

Download the Elasticsearch Sink connector

  1. Create a connectors folder to store all Kafka connectors.

    # switch to your kafka folder
    cd kafka_2.13-3.1.0
    # create a connector folder to store all Kafka connectors
    mkdir connectors
    
  2. Navigate to the Elasticsearch Sink connector page and click Download to get the archived binaries.

  3. Extract the file and copy the unzipped folder into the connectors directory, as shown in the example below.
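    For example, assuming the archive was saved to your Downloads folder (the exact file name depends on the connector version you downloaded):

    # extract the downloaded connector archive into the connectors directory
    # the archive name here is an assumption; replace it with the file you actually downloaded
    unzip ~/Downloads/confluentinc-kafka-connect-elasticsearch-*.zip -d connectors/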

Configure the Kafka Connect worker

  1. Create a configuration file (named connect-sn-kop.properties).

    # switch to your kafka folder
    cd kafka_2.13-3.1.0
    # create a connect configuration file
    # which contains information of Kafka server (StreamNative KoP cluster)
    vim config/connect-sn-kop.properties
    
  2. Add the following content to the connect-sn-kop.properties file.

    # add the information of StreamNative KoP cluster
    bootstrap.servers="SERVER-URL"
    
    sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
    security.protocol=SASL_SSL
    sasl.mechanism=OAUTHBEARER
    sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required oauth.issuer.url="https://auth.streamnative.cloud/" \
      oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
      oauth.audience="YOUR-AUDIENCE-STRING";
    
    producer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=OAUTHBEARER
    producer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required oauth.issuer.url="https://auth.streamnative.cloud/" \
      oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
      oauth.audience="YOUR-AUDIENCE-STRING";
    
    consumer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
    consumer.security.protocol=SASL_SSL
    consumer.sasl.mechanism=OAUTHBEARER
    consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
      required oauth.issuer.url="https://auth.streamnative.cloud/" \
      oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
      oauth.audience="YOUR-AUDIENCE-STRING";
    
    #Cluster level converters
    #These apply when the connectors don't define any converter
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    
    #JSON schemas are enabled at the cluster level; the Sink connector configuration overrides this for message values
    key.converter.schemas.enable=true
    value.converter.schemas.enable=true
    
    #Where to keep the Connect topic offset configurations
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    
    #Plugin path to put the connector binaries
    plugin.path=YOUR-FULL-PATH/connectors/
    
    • oauth.credentials.url: the path to your downloaded OAuth2 credential file (a sample of its format is shown at the end of this section).
    • bootstrap.servers: the Kafka service URL of your Pulsar cluster.
    • oauth.audience: the audience parameter, which combines urn:sn:pulsar, your organization name, and your Pulsar instance name (for example, urn:sn:pulsar:my-org:my-instance).
    • key.converter and value.converter: the converters used to serialize message keys and values; both are set to JSON here.
    • plugin.path: the full path of the connectors folder created in the previous step.

    For details about the security of Kafka Connect, see the Confluent documentation.
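    For reference, the OAuth2 credential (key) file that oauth.credentials.url points to is the JSON key file you download for your service account. Its content is roughly of the following shape; the field values are placeholders, and the exact set of fields may vary:

    {
      "type": "sn_service_account",
      "client_id": "YOUR-CLIENT-ID",
      "client_secret": "YOUR-CLIENT-SECRET",
      "client_email": "YOUR-SERVICE-ACCOUNT-EMAIL",
      "issuer_url": "https://auth.streamnative.cloud"
    }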

Configure the Elasticsearch Sink connector

Create a configuration file named elasticsearch-sink-connector.properties in the config directory and add the following content.

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# Topic name to get data from
topics=test-elasticsearch-sink
key.ignore=true
# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter
# Whether message values embed a schema; requires the value converter to be
# org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

schema.ignore=true
# Elasticsearch server url
connection.url=http://localhost:9200
type.name=kafka-connect

Run Kafka Connect

Open a new terminal, navigate to the Kafka folder, and run the following command:

cd kafka_2.13-3.1.0
bin/connect-standalone.sh config/connect-sn-kop.properties config/elasticsearch-sink-connector.properties

You should see the following output:

...output omitted...
[2023-02-07 23:11:37,102] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Discovered group coordinator kopyhshen-broker-0-3d0a2d7c-2875-4caf-b74e-7d3260027a9a.gcp-shared-gcp-usce1-martin.streamnative.g.snio.cloud:9093 (id: 1865975191 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
[2023-02-07 23:11:37,106] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:535)
[2023-02-07 23:11:42,233] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595)
[2023-02-07 23:11:42,237] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Finished assignment for group at generation 3: {connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d=Assignment(partitions=[test-elasticsearch-sink-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652)
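In standalone mode the Connect worker also exposes a REST API, by default on port 8083 unless overridden in the worker properties. From another terminal, you can use it to confirm that the connector and its task are in the RUNNING state:

curl 'http://localhost:8083/connectors/elasticsearch-sink/status' | jq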

Index data on the Elasticsearch server

This example shows how to send five JSON-format messages to the test-elasticsearch-sink topic. These messages will be forwarded to the local Elasticsearch server.

  1. Start a Kafka producer and send five JSON-format messages to the test-elasticsearch-sink topic.

    echo '{"reporterId": 8824, "reportId": 10000, "content": "Was argued independent 2002 film, The Slaughter Rule.", "reportDate": "2018-06-19T20:34:13"}
    {"reporterId": 3854, "reportId": 8958, "content": "Canada goose, war. Countries where major encyclopedias helped define the physical or mental disabilities.", "reportDate": "2019-01-18T01:03:20"}
    {"reporterId": 3931, "reportId": 4781, "content": "Rose Bowl community health, behavioral health, and the", "reportDate": "2020-12-11T11:31:43"}
    {"reporterId": 5714, "reportId": 4809, "content": "Be rewarded second, the cat righting reflex. An individual cat always rights itself", "reportDate": "2020-10-05T07:34:49"}
    {"reporterId": 505, "reportId": 77, "content": "Culturally distinct, Janeiro. In spite of the crust is subducted", "reportDate": "2018-01-19T01:53:09"}' | ./bin/kafka-console-producer.sh \
        --bootstrap-server "your-pulsar-service-url" \
        --producer.config ./kafka.properties \
        --topic test-elasticsearch-sink
    

    If everything goes well, the command exits normally and produces no output. The records are written to an Elasticsearch index called test-elasticsearch-sink; by default, the Elasticsearch Sink connector creates an index with the same name as the Kafka topic.

    For details about how to configure the Kafka configuration properties file, see get started with Kafka protocol.

  2. Verify that data is indexed on the Elasticsearch server.

    curl 'http://localhost:9200/test-elasticsearch-sink/_search' | jq
    

    Five records should be returned.

    {
      "took": 10,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 5,
          "relation": "eq"
        },
    ...output omitted...
          {
            "_index": "test-elasticsearch-sink",
            "_type": "_doc",
            "_id": "test-elasticsearch-sink+0+9",
            "_score": 1,
            "_source": {
              "reportId": 4781,
              "reportDate": "2020-12-11T11:31:43",
              "reporterId": 3931,
              "content": "Rose Bowl community health, behavioral health, and the"
            }
          }
        ]
      }
    }
    
  3. Check the record count in the test-elasticsearch-sink index.

    curl 'http://localhost:9200/test-elasticsearch-sink/_count'
    

    You should see the following output:

    {"count":5,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
    

    You can see that the number of received records is identical to the number of sent messages.

  4. Verify the end-to-end data flow by searching for a keyword in the test-elasticsearch-sink index.

    This example shows how to search the keyword health in the test-elasticsearch-sink index.

    curl 'http://localhost:9200/test-elasticsearch-sink/_search?q=content:health' | jq
    

    You should see the following output:

    {
      "took": 9,
      "timed_out": false,
      "_shards": {
        "total": 1,
        "successful": 1,
        "skipped": 0,
        "failed": 0
      },
      "hits": {
        "total": {
          "value": 1,
          "relation": "eq"
        },
        "max_score": 3.2154756,
        "hits": [
          {
            "_index": "test-elasticsearch-sink",
            "_type": "_doc",
            "_id": "test-elasticsearch-sink+0+11",
            "_score": 3.2154756,
            "_source": {
              "reportId": 4781,
              "reportDate": "2020-12-11T11:31:43",
              "reporterId": 3931,
              "content": "Rose Bowl community health, behavioral health, and the"
            }
          }
        ]
      }
    }
    

    You can see that every returned record contains the keyword health in its content field.
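When you are finished, you can optionally clean up the local test resources. The following commands delete the test index and remove the Elasticsearch container created earlier; stop the Kafka Connect process with Ctrl+C first if it is still running.

# delete the test index and remove the Elasticsearch container
curl -X DELETE 'http://localhost:9200/test-elasticsearch-sink'
docker rm -f elastic-1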
