
# Connect to your cluster using Kafka Connect

<Note title="Note">
  * This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
  * This QuickStart was developed on macOS. If you use another operating system, the commands might vary.
</Note>

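This document shows how to connect to your StreamNative cluster using [Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) with OAuth2 authentication, and how to stream data from the cluster to a local Elasticsearch instance with the Elasticsearch Sink connector.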

## Before you begin

* Get the OAuth2 credential file.

  1. On the left navigation pane, click **Service Accounts**.
  2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

* Get the service URL of your StreamNative cluster.

  1. On the left navigation pane, in the **Admin** area, click **Pulsar Clusters**.
  2. Select the **Details** tab, and in the **Access Points** area, click **Copy** at the end of the row of the **Kafka Service URL (TCP)**.

* Install a recent version of [Docker](https://docs.docker.com/get-docker/).
* Install a Java runtime, which Kafka Connect requires.
* [Download the `jq` CLI JSON processor](https://stedolan.github.io/jq/download/), which this guide uses to format JSON output.
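Before you continue, you can confirm that the command-line tools this guide relies on are on your `PATH`. The following is a minimal sketch: the `require_tools` function is hypothetical, and the tool list in the usage comment (including `unzip` and `java`) is an assumption about your setup.

```bash theme={null}
# Hypothetical helper: report every command that is not on PATH.
# Returns 0 only if all requested commands were found.
require_tools() {
  local missing=0 tool
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}

# Example usage (the tool list is an assumption; adjust it to your environment):
# require_tools docker jq curl unzip java && echo "all tools found"
```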

## Configure Elasticsearch

1. Open a terminal, and run the following command to start an Elasticsearch instance that publishes ports `9200` (HTTP) and `9300` (transport).

   ```bash theme={null}
   docker run --name elastic-1 \
     -p 9200:9200 -p 9300:9300 -it \
     -e "discovery.type=single-node" \
     -e "xpack.security.enabled=false" \
     docker.elastic.co/elasticsearch/elasticsearch:7.17.2
   ```

   You should see the following output:

   ```bash theme={null}
   ...output omitted...
   {"@timestamp":"2022-04-15T15:13:17.067Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-Country.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#7]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
   {"@timestamp":"2022-04-15T15:13:17.118Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-City.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#13]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
   ```

2. Verify that the Elasticsearch instance has started successfully.

   ```bash theme={null}
   curl 'http://localhost:9200'
   ```

   You should see the following output:

   ```json theme={null}
   {
     "name": "eaf2be7fe2d6",
     "cluster_name": "docker-cluster",
     "cluster_uuid": "jolOS3_VRGq2-LpZbkT1kw",
     "version": {
       "number": "7.17.2",
       "build_flavor": "default",
       "build_type": "docker",
       "build_hash": "de7261de50d90919ae53b0eff9413fd7e5307301",
       "build_date": "2022-03-28T15:12:21.446567561Z",
       "build_snapshot": false,
       "lucene_version": "8.11.1",
       "minimum_wire_compatibility_version": "6.8.0",
       "minimum_index_compatibility_version": "6.0.0-beta1"
     },
     "tagline": "You Know, for Search"
   }
   ```
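Elasticsearch can take a while to start, so the `curl` check above might fail if you run it immediately. The following minimal sketch polls the cluster health API until the node is ready. The `wait_for_elasticsearch` function and its 60-second default timeout are assumptions, and it uses `jq` from the prerequisites. A single-node cluster can report `yellow` once indices with replicas exist, so both `green` and `yellow` count as ready here.

```bash theme={null}
# Hypothetical helper: poll the cluster health API until Elasticsearch reports
# a "green" or "yellow" status, or give up after a timeout (in seconds).
wait_for_elasticsearch() {
  local url="${1:-http://localhost:9200}" timeout="${2:-60}" waited=0 status
  while [ "$waited" -lt "$timeout" ]; do
    # -s hides progress output; -f makes curl fail on HTTP error codes
    status=$(curl -sf "$url/_cluster/health" 2>/dev/null | jq -r '.status' 2>/dev/null)
    case "$status" in
      green|yellow)
        echo "Elasticsearch is ready (status: $status)"
        return 0
        ;;
    esac
    sleep 1
    waited=$((waited + 1))
  done
  echo "Elasticsearch did not become ready within ${timeout}s" >&2
  return 1
}

# Example usage:
# wait_for_elasticsearch http://localhost:9200 60
```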

## Configure Kafka Connect

[Kafka Connect](https://docs.confluent.io/platform/current/connect/index.html) is an integration tool released as part of the Apache Kafka project. It provides scalable, reliable data streaming between Apache Kafka and external systems. Because Kafka on Pulsar (KoP) is compatible with the Kafka API, Kafka Connect can use your StreamNative cluster as its Kafka server.

Kafka Connect uses Source and Sink connectors for integration. Source connectors stream data from an external system to Kafka, while Sink connectors stream data from Kafka to an external system.

<img src="https://mintcdn.com/streamnative/tIZ04bis3aV5g7je/media/kop_and_elasticsearch_with_kafka_connect.png?fit=max&auto=format&n=tIZ04bis3aV5g7je&q=85&s=abd8eb48bfcae44bcde80ae6e894962d" alt="Pulsar KoP and elasticsearch with kafka connect" width="720" height="472" data-path="media/kop_and_elasticsearch_with_kafka_connect.png" />

1. Navigate to the [Apache downloads page for Kafka](https://www.apache.org/dyn/closer.cgi?path=/kafka/3.1.0/kafka_2.13-3.1.0.tgz), and click the suggested download link for the Kafka 3.1.0 binary package.

2. Extract the archive into your home directory. This creates the `_YOUR_HOME_DIRECTORY_/kafka_2.13-3.1.0` directory, which the rest of this guide refers to as the Kafka folder.
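If you prefer the command line, the extraction step can be sketched as follows. The `extract_kafka` function is hypothetical, and the download location in the usage comment (`~/Downloads`) is an assumption.

```bash theme={null}
# Hypothetical helper: unpack a Kafka .tgz archive into a destination directory
# (your home directory by default) and print the resulting Kafka folder.
extract_kafka() {
  local archive="$1" dest="${2:-$HOME}"
  mkdir -p "$dest"
  tar -xzf "$archive" -C "$dest" || return 1
  # The Kafka archive unpacks into a folder named after the archive itself
  echo "$dest/$(basename "$archive" .tgz)"
}

# Example usage (the download location is an assumption):
# extract_kafka ~/Downloads/kafka_2.13-3.1.0.tgz
```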

### Download the StreamNative OAuth dependency

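From your Kafka folder, download the StreamNative OAuth client library into the `libs` directory so that Kafka Connect can load the OAuth login callback handler.

```bash theme={null}
# download supplementary libraries (--output-dir requires curl 7.73.0 or later)
curl -O https://repo1.maven.org/maven2/io/streamnative/pulsar/handlers/oauth-client/3.1.0.1/oauth-client-3.1.0.1.jar --output-dir ./libs
```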
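To confirm that the download succeeded, you can check that the jar contains the callback handler class referenced by `sasl.login.callback.handler.class` in `connect-sn-kop.properties`. This minimal sketch assumes `unzip` is installed and that the jar packages that class; the `class_to_entry` and `jar_contains_class` functions are hypothetical.

```bash theme={null}
# Hypothetical helper: convert a fully qualified class name to its path inside a jar.
class_to_entry() {
  printf '%s.class\n' "$(printf '%s' "$1" | tr '.' '/')"
}

# Hypothetical helper: succeed only if the jar exists and lists the class entry.
jar_contains_class() {
  local jar="$1" class="$2"
  [ -f "$jar" ] || { echo "missing jar: $jar" >&2; return 1; }
  unzip -l "$jar" | grep -qF "$(class_to_entry "$class")"
}

# Example usage from the kafka_2.13-3.1.0 directory:
# jar_contains_class libs/oauth-client-3.1.0.1.jar \
#   io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler \
#   && echo "handler class found"
```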

### Download the Elasticsearch Sink connector

1. Create a `connectors` folder to store all Kafka connectors.

   ```bash theme={null}
   # switch to your kafka folder
   cd kafka_2.13-3.1.0
   # create a connector folder to store all Kafka connectors
   mkdir connectors
   ```

2. Navigate to the [Elasticsearch Sink connector](https://www.confluent.io/hub/confluentinc/kafka-connect-elasticsearch) and click **Download** to download the archived binaries.

3. Extract the file and copy the unzipped folder into the `connectors` directory.
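Kafka Connect loads connectors from the jar files it finds under `plugin.path`. As a quick sanity check after extracting the archive, you can count the jars in the `connectors` folder. The `count_plugin_jars` function is hypothetical, and the archive name in the usage comment is an assumption; the real file name includes the connector version.

```bash theme={null}
# Hypothetical helper: count the .jar files under a plugin directory.
# A result of 0 usually means the connector archive was not extracted there.
count_plugin_jars() {
  find "$1" -type f -name '*.jar' | wc -l | tr -d ' '
}

# Example usage from the kafka_2.13-3.1.0 directory (archive name is an assumption):
# unzip ~/Downloads/confluentinc-kafka-connect-elasticsearch-*.zip -d connectors/
# count_plugin_jars connectors
```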

### Configure the Kafka Connect worker

1. Create a configuration file named `connect-sn-kop.properties` in the `config` directory of your Kafka folder.

   ```bash theme={null}
   # switch to your kafka folder
   cd kafka_2.13-3.1.0
   # create a Connect worker configuration file that contains
   # the connection details of the Kafka server (your StreamNative KoP cluster)
   vim config/connect-sn-kop.properties
   ```

2. Add the following content to the `connect-sn-kop.properties` file.

   ```conf theme={null}
   # add the information of StreamNative KoP cluster
   bootstrap.servers=SERVER-URL

   # Authentication for the Connect worker itself
   sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
   security.protocol=SASL_SSL
   sasl.mechanism=OAUTHBEARER
   sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
     required oauth.issuer.url="https://auth.streamnative.cloud/" \
     oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
     oauth.audience="YOUR-AUDIENCE-STRING";

   # Authentication for the producers that source connectors use
   producer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
   producer.security.protocol=SASL_SSL
   producer.sasl.mechanism=OAUTHBEARER
   producer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
     required oauth.issuer.url="https://auth.streamnative.cloud/" \
     oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
     oauth.audience="YOUR-AUDIENCE-STRING";

   # Authentication for the consumers that sink connectors (such as the Elasticsearch Sink connector) use
   consumer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
   consumer.security.protocol=SASL_SSL
   consumer.sasl.mechanism=OAUTHBEARER
   consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
     required oauth.issuer.url="https://auth.streamnative.cloud/" \
     oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
     oauth.audience="YOUR-AUDIENCE-STRING";

   # Cluster-level converters
   # These apply when a connector does not define its own converters
   key.converter=org.apache.kafka.connect.json.JsonConverter
   value.converter=org.apache.kafka.connect.json.JsonConverter

   # JSON schemas are enabled at the cluster level; connectors can override this setting
   key.converter.schemas.enable=true
   value.converter.schemas.enable=true

   # File in which the standalone worker stores source connector offsets
   offset.storage.file.filename=/tmp/connect.offsets
   offset.flush.interval.ms=10000

   # Plugin path that contains the connector binaries
   plugin.path=YOUR-FULL-PATH/connectors/
   ```

   * `bootstrap.servers`: the Kafka service URL of your StreamNative cluster, without quotation marks.
   * `oauth.credentials.url`: the absolute path to your downloaded OAuth2 credential file, prefixed with `file://`.
   * `oauth.audience`: the OAuth2 audience, which combines `urn:sn:pulsar`, your organization name, and your Pulsar instance name.
   * `key.converter` and `value.converter`: the default converters that serialize and deserialize record keys and values as JSON. Individual connectors can override them.
   * `plugin.path`: the full path of the `connectors` folder that you created earlier.

   For details about the security of Kafka Connect, see the [Confluent documentation](https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#kconnect-long).
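The three authentication blocks in `connect-sn-kop.properties` differ only in their prefix: no prefix for the worker itself, `producer.` for the producers that source connectors use, and `consumer.` for the consumers that sink connectors use. The following sketch generates them from a single set of values. The function names are hypothetical, and the colon-separated audience format `urn:sn:pulsar:ORGANIZATION:INSTANCE` is an assumption based on the `oauth.audience` description, so verify it against your own cluster details.

```bash theme={null}
# Hypothetical helper: build the OAuth2 audience string.
# Assumes the format urn:sn:pulsar:<organization>:<instance>.
sn_audience() {
  printf 'urn:sn:pulsar:%s:%s\n' "$1" "$2"
}

# Hypothetical helper: print the connection and authentication settings for the
# worker (no prefix), its producers ("producer.") and its consumers ("consumer.").
write_auth_config() {
  local server_url="$1" key_file="$2" audience="$3" prefix
  printf 'bootstrap.servers=%s\n' "$server_url"
  for prefix in "" "producer." "consumer."; do
    printf '\n'
    printf '%ssasl.login.callback.handler.class=%s\n' "$prefix" \
      "io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler"
    printf '%ssecurity.protocol=SASL_SSL\n' "$prefix"
    printf '%ssasl.mechanism=OAUTHBEARER\n' "$prefix"
    # The JAAS entry is written on one line, so no line continuations are needed
    printf '%ssasl.jaas.config=%s required oauth.issuer.url="%s" oauth.credentials.url="file://%s" oauth.audience="%s";\n' \
      "$prefix" "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule" \
      "https://auth.streamnative.cloud/" "$key_file" "$audience"
  done
}

# Example usage (all values are placeholders); append the converter, offset,
# and plugin.path settings afterwards:
# write_auth_config "SERVER-URL" "/Users/me/key.json" "$(sn_audience my-org my-instance)" \
#   > config/connect-sn-kop.properties
```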

### Configure the Elasticsearch Sink connector

Create a file named `elasticsearch-sink-connector.properties` in the `config` directory of your Kafka folder, and add the following content.

```conf theme={null}
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# Topic to consume data from
topics=test-elasticsearch-sink
# Ignore record keys and use topic+partition+offset as the document ID
key.ignore=true
# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter
# Whether each JSON value embeds a schema; applies only to JsonConverter.
# The messages in this guide are plain JSON without an embedded schema.
value.converter.schemas.enable=false

# Let Elasticsearch infer index mappings instead of deriving them from record schemas
schema.ignore=true
# Elasticsearch server URL
connection.url=http://localhost:9200
type.name=kafka-connect
```
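Java properties files take values literally, so quotation marks or trailing spaces become part of a value. To double-check what Kafka Connect will read, you can print a property's value with a small helper. This is a sketch with a hypothetical `prop_value` function; it handles simple `key=value` lines only, not line continuations or `key: value` syntax.

```bash theme={null}
# Hypothetical helper: print the value of the last "key=value" line for KEY in a
# .properties file. Comment lines never match because their first field starts with "#".
prop_value() {
  awk -F= -v k="$2" '
    { name = $1; gsub(/^[ \t]+|[ \t]+$/, "", name) }
    name == k { line = $0; sub(/^[^=]*=[ \t]*/, "", line); value = line; found = 1 }
    END { if (found) print value }
  ' "$1"
}

# Example usage from the kafka_2.13-3.1.0 directory:
# prop_value config/elasticsearch-sink-connector.properties topics
# [ -d "$(prop_value config/connect-sn-kop.properties plugin.path)" ] && echo "plugin.path exists"
```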

### Run Kafka Connect

Open a new terminal, navigate to the Kafka folder, and run the following command:

```bash theme={null}
cd kafka_2.13-3.1.0
bin/connect-standalone.sh config/connect-sn-kop.properties config/elasticsearch-sink-connector.properties
```

You should see the following output:

```bash theme={null}
...output omitted...
[2023-02-07 23:11:37,102] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Discovered group coordinator kopyhshen-broker-0-3d0a2d7c-2875-4caf-b74e-7d3260027a9a.gcp-shared-gcp-usce1-martin.streamnative.g.snio.cloud:9093 (id: 1865975191 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
[2023-02-07 23:11:37,106] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:535)
[2023-02-07 23:11:42,233] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595)
[2023-02-07 23:11:42,237] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Finished assignment for group at generation 3: {connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d=Assignment(partitions=[test-elasticsearch-sink-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652)
```
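The log output shows that the sink task joined its consumer group. You can also ask the worker's REST API for the connector status. This sketch assumes the standalone worker listens on the default REST port `8083`, and it uses a hypothetical `connector_healthy` function that succeeds only when the connector and all of its tasks report `RUNNING`.

```bash theme={null}
# Hypothetical helper: read a Kafka Connect status document on stdin and
# succeed only if every "state" field in it is RUNNING.
connector_healthy() {
  local states
  states=$(grep -o '"state":"[A-Z_]*"' | cut -d'"' -f4)
  # No states at all means the request failed or the connector does not exist
  [ -n "$states" ] || return 1
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Example usage (assumes the default REST listener on port 8083):
# curl -s http://localhost:8083/connectors/elasticsearch-sink/status | connector_healthy \
#   && echo "connector and tasks are running"
```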

## Index data on the Elasticsearch server

This example shows how to send five JSON-format messages to the `test-elasticsearch-sink` topic. The Elasticsearch Sink connector then writes these messages to the local Elasticsearch server.

1. Start a Kafka producer and send five JSON-format messages to the `test-elasticsearch-sink` topic.

   ```bash theme={null}
   echo '{"reporterId": 8824, "reportId": 10000, "content": "Was argued independent 2002 film, The Slaughter Rule.", "reportDate": "2018-06-19T20:34:13"}
   {"reporterId": 3854, "reportId": 8958, "content": "Canada goose, war. Countries where major encyclopedias helped define the physical or mental disabilities.", "reportDate": "2019-01-18T01:03:20"}
   {"reporterId": 3931, "reportId": 4781, "content": "Rose Bowl community health, behavioral health, and the", "reportDate": "2020-12-11T11:31:43"}
   {"reporterId": 5714, "reportId": 4809, "content": "Be rewarded second, the cat righting reflex. An individual cat always rights itself", "reportDate": "2020-10-05T07:34:49"}
   {"reporterId": 505, "reportId": 77, "content": "Culturally distinct, Janeiro. In spite of the crust is subducted", "reportDate": "2018-01-19T01:53:09"}' | ./bin/kafka-console-producer.sh \
       --bootstrap-server "SERVER-URL" \
       --producer.config ./kafka.properties \
       --topic test-elasticsearch-sink
   ```

   If the command succeeds, it exits without printing any output. The Elasticsearch Sink connector then writes the records to an Elasticsearch index named `test-elasticsearch-sink`; by default, the connector names the index after the Kafka topic and creates it if it does not exist.

   For details about how to create the `kafka.properties` client configuration file, see [get started with Kafka protocol](/cloud/get-started/quickstart-kafka).

2. Verify that data is indexed on the Elasticsearch server.

   ```bash theme={null}
   curl 'http://localhost:9200/test-elasticsearch-sink/_search' | jq
   ```

   Five records should be returned.

   ```json theme={null}
   {
     "took": 10,
     "timed_out": false,
     "_shards": {
       "total": 1,
       "successful": 1,
       "skipped": 0,
       "failed": 0
     },
     "hits": {
       "total": {
         "value": 5,
         "relation": "eq"
       },
   ...output omitted...
         {
           "_index": "test-elasticsearch-sink",
           "_type": "_doc",
           "_id": "test-elasticsearch-sink+0+9",
           "_score": 1,
           "_source": {
             "reportId": 4781,
             "reportDate": "2020-12-11T11:31:43",
             "reporterId": 3931,
             "content": "Rose Bowl community health, behavioral health, and the"
           }
         }
       ]
     }
   }
   ```

3. Check the record count in the `test-elasticsearch-sink` index.

   ```bash theme={null}
   curl 'http://localhost:9200/test-elasticsearch-sink/_count'
   ```

   You should see the following output:

   ```json theme={null}
   {"count":5,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
   ```

   The record count matches the number of messages you sent.

4. Search the `test-elasticsearch-sink` index for a keyword to confirm that the message content is searchable.

   This example searches for the keyword `health` in the `content` field.

   ```bash theme={null}
   curl 'http://localhost:9200/test-elasticsearch-sink/_search?q=content:health' | jq
   ```

   You should see the following output:

   ```json theme={null}
   {
     "took": 9,
     "timed_out": false,
     "_shards": {
       "total": 1,
       "successful": 1,
       "skipped": 0,
       "failed": 0
     },
     "hits": {
       "total": {
         "value": 1,
         "relation": "eq"
       },
       "max_score": 3.2154756,
       "hits": [
         {
           "_index": "test-elasticsearch-sink",
           "_type": "_doc",
           "_id": "test-elasticsearch-sink+0+11",
           "_score": 3.2154756,
           "_source": {
             "reportId": 4781,
             "reportDate": "2020-12-11T11:31:43",
             "reporterId": 3931,
             "content": "Rose Bowl community health, behavioral health, and the"
           }
         }
       ]
     }
   }
   ```

   Only one record matches, and its `content` field contains the word `health`.
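The count and search checks can also be scripted. The sketch below compares the number of records in a file of messages (one JSON object per line, because the console producer sends each line as one record) with the count that Elasticsearch reports, and builds a `match` query for the `_search` request body as an alternative to the `q=` URI parameter. The function names and the `reports.jsonl` file name are hypothetical.

```bash theme={null}
# Hypothetical helper: count non-empty lines, i.e. the records the console producer sends.
count_records() {
  grep -c . "$1"
}

# Hypothetical helper: extract the "count" field from an Elasticsearch _count response on stdin.
es_count() {
  grep -o '"count":[0-9]*' | head -n 1 | cut -d: -f2
}

# Hypothetical helper: build a match query body. The field and text are not
# JSON-escaped, so keep them to plain words.
match_query() {
  printf '{"query":{"match":{"%s":"%s"}}}\n' "$1" "$2"
}

# Example usage (reports.jsonl is a hypothetical file holding the five messages):
# sent=$(count_records reports.jsonl)
# indexed=$(curl -s 'http://localhost:9200/test-elasticsearch-sink/_count' | es_count)
# [ "$sent" = "$indexed" ] && echo "all $sent records indexed"
# curl -s -H 'Content-Type: application/json' \
#   'http://localhost:9200/test-elasticsearch-sink/_search' \
#   -d "$(match_query content health)" | jq
```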
