Connect to your cluster using Kafka Connect
Note
- This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
- This QuickStart was developed on macOS. If you use another operating system, the commands might vary.
This document shows how to connect to your StreamNative cluster using Kafka Connect with OAuth2 authentication.
Before you begin
Get the OAuth2 credential file.
- On the left navigation pane, click Service Accounts.
- In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.
Get the service URL of your StreamNative cluster.
- On the left navigation pane, in the Admin area, click Pulsar Clusters.
- Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
- Install a recent version of Docker.
- Download the jq CLI JSON processor, which is used to format JSON output.
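The prerequisites above can be checked before you continue. The following is a minimal sketch in Python and is not part of the original QuickStart. The helper names `missing_tools` and `parse_key_file` are hypothetical, and the sketch only checks that the credential file is a JSON object without assuming which fields it contains.

```python
import json
import shutil


def missing_tools(tools=("docker", "jq", "curl")):
    """Return the entries of `tools` that cannot be found on PATH."""
    return [tool for tool in tools if shutil.which(tool) is None]


def parse_key_file(text):
    """Parse the text of the downloaded OAuth2 credential file.

    Raises ValueError if the text is not a JSON object.
    """
    data = json.loads(text)
    if not isinstance(data, dict):
        raise ValueError("credential file must contain a JSON object")
    return data
```

For example, `missing_tools()` returns an empty list when Docker, jq, and curl are all installed.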
Configure Elasticsearch
Open a terminal, and run the command below to start an Elasticsearch instance that listens on ports 9200 and 9300.
docker run --name elastic-1 \
  -p 9200:9200 -p 9300:9300 -it \
  -e "discovery.type=single-node" \
  -e "xpack.security.enabled=false" \
  docker.elastic.co/elasticsearch/elasticsearch:7.17.2
You should see the following output:
...output omitted...
{"@timestamp":"2022-04-15T15:13:17.067Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-Country.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#7]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
{"@timestamp":"2022-04-15T15:13:17.118Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-City.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#13]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
Verify that the Elasticsearch instance is started successfully.
curl 'http://localhost:9200'
You should see the following output:
{
  "name": "eaf2be7fe2d6",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "jolOS3_VRGq2-LpZbkT1kw",
  "version": {
    "number": "7.17.2",
    "build_flavor": "default",
    "build_type": "docker",
    "build_hash": "de7261de50d90919ae53b0eff9413fd7e5307301",
    "build_date": "2022-03-28T15:12:21.446567561Z",
    "build_snapshot": false,
    "lucene_version": "8.11.1",
    "minimum_wire_compatibility_version": "6.8.0",
    "minimum_index_compatibility_version": "6.0.0-beta1"
  },
  "tagline": "You Know, for Search"
}
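Elasticsearch can take a while to accept requests after the container starts, so a single curl call may fail if it is issued too early. The following minimal Python sketch polls the same URL until it answers and then reads the version number from the response. The function names, timeout, and polling interval are illustrative assumptions and are not part of the original QuickStart.

```python
import json
import time
import urllib.request


def parse_es_version(body):
    """Extract version.number from the JSON body returned by GET /."""
    return json.loads(body)["version"]["number"]


def wait_for_elasticsearch(url="http://localhost:9200", timeout_s=60.0, interval_s=2.0):
    """Poll `url` until Elasticsearch answers, then return its version string.

    Raises TimeoutError if no usable response arrives within `timeout_s` seconds.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(url, timeout=5) as response:
                return parse_es_version(response.read().decode("utf-8"))
        except (OSError, ValueError, KeyError):
            # OSError covers refused or reset connections and URLError;
            # ValueError and KeyError cover an incomplete or unexpected body.
            if time.monotonic() >= deadline:
                raise TimeoutError(f"Elasticsearch at {url} not ready after {timeout_s}s")
            time.sleep(interval_s)
```

With the container from this QuickStart running, `wait_for_elasticsearch()` should return the version shown in the output above.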
Configure Kafka Connect
Kafka Connect is an integration tool that is released with the Apache Kafka project. It provides reliable data streaming between Apache Kafka and external systems and is both scalable and flexible. Kafka Connect works with Kafka on Pulsar (KoP), which is compatible with the Kafka API.
Kafka Connect uses Source and Sink connectors for integration. Source connectors stream data from an external system to Kafka, while Sink connectors stream data from Kafka to an external system.
Navigate to the Apache downloads page for Kafka, and click the suggested download link for the Kafka 3.1.0 binary package.
Extract the Kafka binaries into the _YOUR_HOME_DIRECTORY_/kafka_2.13-3.1.0 directory.
Download the StreamNative OAuth dependency
# download supplementary libraries
curl -O https://repo1.maven.org/maven2/io/streamnative/pulsar/handlers/oauth-client/3.1.0.1/oauth-client-3.1.0.1.jar --output-dir ./libs
Download the Elasticsearch Sink connector
Create a connectors folder to store all Kafka connectors.
# switch to your kafka folder
cd kafka_2.13-3.1.0
# create a connector folder to store all Kafka connectors
mkdir connectors
Navigate to the Elasticsearch Sink connector and click Download to download the archived binaries.
Extract the file and copy the unzipped folder into the connectors directory.
Configure Kafka Connect
Create a configuration file named connect-sn-kop.properties.
# switch to your kafka folder
cd kafka_2.13-3.1.0
# create a Connect configuration file
# that contains the information of the Kafka server (StreamNative KoP cluster)
vim config/connect-sn-kop.properties
Add the following content to the connect-sn-kop.properties file.
# add the information of StreamNative KoP cluster
bootstrap.servers=SERVER-URL
sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
   required oauth.issuer.url="https://auth.streamnative.cloud/" \
   oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
   oauth.audience="YOUR-AUDIENCE-STRING";
producer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=OAUTHBEARER
producer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
   required oauth.issuer.url="https://auth.streamnative.cloud/" \
   oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
   oauth.audience="YOUR-AUDIENCE-STRING";
consumer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=OAUTHBEARER
consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
   required oauth.issuer.url="https://auth.streamnative.cloud/" \
   oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
   oauth.audience="YOUR-AUDIENCE-STRING";
#Cluster level converters
#These apply when the connectors don't define any converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#Whether JSON schemas are enabled at the cluster level
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#Where to keep the Connect topic offset configurations
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#Plugin path to put the connector binaries
plugin.path=YOUR-FULL-PATH/connectors/
- oauth.credentials.url: the path to your downloaded OAuth2 credential file.
- bootstrap.servers: the Kafka service URL of your StreamNative cluster.
- oauth.audience: the audience parameter is a combination of urn:sn:pulsar, your organization name, and your Pulsar instance name.
- key.converter and value.converter: the converter that sends JSON-format messages to Kafka.
- plugin.path: the full path of the connectors folder that was created in the previous step.
For details about the security of Kafka Connect, see the Confluent documentation.
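The worker configuration repeats the same four security settings three times: once without a prefix, once with the producer. prefix, and once with the consumer. prefix. The following minimal Python sketch generates those lines from one set of inputs, which helps keep the three copies consistent. The function names are hypothetical, and joining the audience parts with colons is an assumption; confirm the exact audience string for your cluster before using it.

```python
HANDLER = "io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler"
LOGIN_MODULE = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule"


def build_audience(organization, instance):
    # Assumption: the three parts are joined with colons.
    return f"urn:sn:pulsar:{organization}:{instance}"


def build_jaas_config(key_file_path, audience,
                      issuer_url="https://auth.streamnative.cloud/"):
    """Return the JAAS value on a single line, so no line continuations are needed."""
    # Note: a path that contains backslashes would need escaping in a
    # properties file; this sketch does not handle that case.
    return (
        f"{LOGIN_MODULE} required "
        f'oauth.issuer.url="{issuer_url}" '
        f'oauth.credentials.url="file://{key_file_path}" '
        f'oauth.audience="{audience}";'
    )


def render_security_settings(jaas_config):
    """Render the security settings for the worker, its producers, and its consumers."""
    lines = []
    for prefix in ("", "producer.", "consumer."):
        lines += [
            f"{prefix}sasl.login.callback.handler.class={HANDLER}",
            f"{prefix}security.protocol=SASL_SSL",
            f"{prefix}sasl.mechanism=OAUTHBEARER",
            f"{prefix}sasl.jaas.config={jaas_config}",
        ]
    return "\n".join(lines) + "\n"
```

The returned text covers only the security settings. The bootstrap.servers, converter, offset, and plugin.path settings still need to be added as shown above.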
Configure the Elasticsearch Sink connector
Create a file named elasticsearch-sink-connector.properties in the config directory of your Kafka folder, and add the following content to it.
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# Topic name to get data from
topics=test-elasticsearch-sink
key.ignore=true
# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter
# Identify if the value contains a schema.
# Required value converter is `org.apache.kafka.connect.json.JsonConverter`.
value.converter.schemas.enable=false
schema.ignore=true
# Elasticsearch server url
connection.url=http://localhost:9200
type.name=kafka-connect
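A typo in this file usually surfaces only when Kafka Connect starts, so it can help to check the file first. The following is a minimal Python sketch that parses the simple key=value subset of the properties format used here and reports missing keys. The helper names and the set of keys treated as required are choices made for this sketch, not rules taken from the connector documentation.

```python
def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments.

    Only the subset of the Java properties format used in this QuickStart
    is handled (no line continuations or escape sequences).
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


# Keys this sketch treats as required (an assumption for illustration).
REQUIRED_KEYS = ("name", "connector.class", "topics", "connection.url")


def missing_keys(props, required=REQUIRED_KEYS):
    """Return the required keys that are absent or empty in `props`."""
    return [key for key in required if not props.get(key)]
```

Reading the file and passing its text to `parse_properties`, then calling `missing_keys` on the result, should return an empty list for the configuration shown above.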
Run Kafka Connect
Open a new terminal, navigate to the Kafka folder, and run the following command:
cd kafka_2.13-3.1.0
bin/connect-standalone.sh config/connect-sn-kop.properties config/elasticsearch-sink-connector.properties
You should see the following output:
...output omitted...
[2023-02-07 23:11:37,102] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Discovered group coordinator kopyhshen-broker-0-3d0a2d7c-2875-4caf-b74e-7d3260027a9a.gcp-shared-gcp-usce1-martin.streamnative.g.snio.cloud:9093 (id: 1865975191 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
[2023-02-07 23:11:37,106] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:535)
[2023-02-07 23:11:42,233] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595)
[2023-02-07 23:11:42,237] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Finished assignment for group at generation 3: {connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d=Assignment(partitions=[test-elasticsearch-sink-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652)
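Besides reading the log, you can ask the Kafka Connect REST API for the connector status. The following minimal Python sketch assumes that the worker exposes its REST interface on the default port 8083, which holds only if the worker configuration does not override the listener settings. The function names are hypothetical.

```python
import json
import urllib.request


def all_running(status):
    """Return True if the connector and every listed task report state RUNNING.

    A status with no tasks counts as running when the connector itself is RUNNING.
    """
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_connector_status(name="elasticsearch-sink", base_url="http://localhost:8083"):
    """Fetch GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as response:
        return json.loads(response.read().decode("utf-8"))
```

With the worker running, `all_running(fetch_connector_status())` should return True once the task has joined its consumer group.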
Index data on the Elasticsearch server
This example shows how to send five JSON-format messages to the test-elasticsearch-sink topic. These messages will be forwarded to the local Elasticsearch server.
Start a Kafka producer and send five JSON-format messages to the test-elasticsearch-sink topic.
echo '{"reporterId": 8824, "reportId": 10000, "content": "Was argued independent 2002 film, The Slaughter Rule.", "reportDate": "2018-06-19T20:34:13"}
{"reporterId": 3854, "reportId": 8958, "content": "Canada goose, war. Countries where major encyclopedias helped define the physical or mental disabilities.", "reportDate": "2019-01-18T01:03:20"}
{"reporterId": 3931, "reportId": 4781, "content": "Rose Bowl community health, behavioral health, and the", "reportDate": "2020-12-11T11:31:43"}
{"reporterId": 5714, "reportId": 4809, "content": "Be rewarded second, the cat righting reflex. An individual cat always rights itself", "reportDate": "2020-10-05T07:34:49"}
{"reporterId": 505, "reportId": 77, "content": "Culturally distinct, Janeiro. In spite of the crust is subducted", "reportDate": "2018-01-19T01:53:09"}' | ./bin/kafka-console-producer.sh \
  --bootstrap-server "your-pulsar-service-url" \
  --producer.config ./kafka.properties \
  --topic test-elasticsearch-sink
If everything goes well, the command exits normally and outputs nothing. The records are sent to an Elasticsearch index called test-elasticsearch-sink. By default, the Elasticsearch Sink connector creates an index with the same name as the Kafka topic.
For details about how to configure the Kafka configuration properties file, see get started with Kafka protocol.
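The console producer reads its input line by line and sends each line as one message, so every record must be a complete JSON document on a single line. The following minimal Python sketch builds such input from a list of records. The function name is hypothetical, and the two records in the usage note are shortened versions of the ones above.

```python
import json


def to_producer_lines(records):
    """Serialize each record as single-line JSON, one record per line."""
    # json.dumps escapes any newline inside a string value, so each record
    # is guaranteed to stay on one line.
    return "".join(json.dumps(record) + "\n" for record in records)
```

For example, `to_producer_lines([{"reporterId": 8824, "reportId": 10000}, {"reporterId": 505, "reportId": 77}])` returns two lines of JSON that can be piped into kafka-console-producer.sh in place of the echo output.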
Verify that data is indexed on the Elasticsearch server.
curl 'http://localhost:9200/test-elasticsearch-sink/_search' | jq
Five records should be returned.
{
  "took": 10,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 5,
      "relation": "eq"
    },
    ...output omitted...
      {
        "_index": "test-elasticsearch-sink",
        "_type": "_doc",
        "_id": "test-elasticsearch-sink+0+9",
        "_score": 1,
        "_source": {
          "reportId": 4781,
          "reportDate": "2020-12-11T11:31:43",
          "reporterId": 3931,
          "content": "Rose Bowl community health, behavioral health, and the"
        }
      }
    ]
  }
}
Check the record count in the test-elasticsearch-sink index.
curl 'http://localhost:9200/test-elasticsearch-sink/_count'
You should see the following output:
{"count":5,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
You can see that the number of received records is identical to the number of sent messages.
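The same comparison can be scripted. The following minimal Python sketch reads the count from the _count endpoint used above. The function names are hypothetical, and the default URL assumes the local Elasticsearch instance from this QuickStart.

```python
import json
import urllib.request


def parse_count(body):
    """Extract the count field from a _count response body."""
    return json.loads(body)["count"]


def index_count(index="test-elasticsearch-sink", base_url="http://localhost:9200"):
    """Return the number of documents in `index` using GET /<index>/_count."""
    with urllib.request.urlopen(f"{base_url}/{index}/_count", timeout=5) as response:
        return parse_count(response.read().decode("utf-8"))
```

After the five messages are indexed, `index_count()` should return 5. A smaller number can simply mean that the connector has not flushed all records yet, so retrying after a few seconds is reasonable.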
Verify the connectivity between Kafka and Pulsar by searching for a keyword in the test-elasticsearch-sink index.
This example shows how to search for the keyword health in the test-elasticsearch-sink index.
curl 'http://localhost:9200/test-elasticsearch-sink/_search?q=content:health' | jq
You should see the following output:
{
  "took": 9,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 3.2154756,
    "hits": [
      {
        "_index": "test-elasticsearch-sink",
        "_type": "_doc",
        "_id": "test-elasticsearch-sink+0+11",
        "_score": 3.2154756,
        "_source": {
          "reportId": 4781,
          "reportDate": "2020-12-11T11:31:43",
          "reporterId": 3931,
          "content": "Rose Bowl community health, behavioral health, and the"
        }
      }
    ]
  }
}
You can see that the returned record contains the word health in its content field.
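The keyword search can also be built and inspected programmatically. The following minimal Python sketch constructs the search URL, extracts the content field from each hit, and splits a document ID into its parts. The function names are hypothetical. Reading the ID as topic+partition+offset matches the IDs in the output above and the connector's behavior when key.ignore is true, but treat it as an assumption for other configurations.

```python
import urllib.parse


def search_url(index, field, keyword, base_url="http://localhost:9200"):
    """Build a URI search URL such as /<index>/_search?q=<field>:<keyword>."""
    query = urllib.parse.urlencode({"q": f"{field}:{keyword}"})
    return f"{base_url}/{index}/_search?{query}"


def hit_contents(response, field="content"):
    """Return the value of `field` from every hit in a _search response."""
    return [hit["_source"][field] for hit in response["hits"]["hits"]]


def split_document_id(doc_id):
    """Split an ID of the form topic+partition+offset into its three parts."""
    # rsplit keeps any "+" inside the topic name attached to the topic.
    topic, partition, offset = doc_id.rsplit("+", 2)
    return topic, int(partition), int(offset)
```

Note that `search_url` percent-encodes the colon in the query, which Elasticsearch decodes back to content:health.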