This document shows how to connect to your StreamNative cluster using Kafka Connect with OAuth2 authentication.
Before you begin
- This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.
- This QuickStart is developed in a macOS environment. If you use another OS, the commands might vary.
Start Elasticsearch
-
Open a terminal, and run the command below to start an Elasticsearch instance with ports 9200 and 9300 exposed.
docker run --name elastic-1 \
-p 9200:9200 -p 9300:9300 -it \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
docker.elastic.co/elasticsearch/elasticsearch:7.17.2
You should see the following output:
...output omitted...
{"@timestamp":"2022-04-15T15:13:17.067Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-Country.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#7]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
{"@timestamp":"2022-04-15T15:13:17.118Z", "log.level": "INFO", "message":"successfully loaded geoip database file [GeoLite2-City.mmdb]", "ecs.version": "1.2.0","service.name":"ES_ECS","event.dataset":"elasticsearch.server","process.thread.name":"elasticsearch[efb7b3360ba3][generic][T#13]","log.logger":"org.elasticsearch.ingest.geoip.DatabaseNodeService","elasticsearch.cluster.uuid":"ocHgh5mAQROAlUofYHE3Cg","elasticsearch.node.id":"0aWiWmaBTgC0vdp6Zw_ZnQ","elasticsearch.node.name":"efb7b3360ba3","elasticsearch.cluster.name":"docker-cluster"}
-
Verify that the Elasticsearch instance has started successfully.
curl 'http://localhost:9200'
You should see the following output:
{
"name": "eaf2be7fe2d6",
"cluster_name": "docker-cluster",
"cluster_uuid": "jolOS3_VRGq2-LpZbkT1kw",
"version": {
"number": "7.17.2",
"build_flavor": "default",
"build_type": "docker",
"build_hash": "de7261de50d90919ae53b0eff9413fd7e5307301",
"build_date": "2022-03-28T15:12:21.446567561Z",
"build_snapshot": false,
"lucene_version": "8.11.1",
"minimum_wire_compatibility_version": "6.8.0",
"minimum_index_compatibility_version": "6.0.0-beta1"
},
"tagline": "You Know, for Search"
}
Kafka Connect is an integration tool that is released with the Apache Kafka project. It provides reliable data streaming between Apache Kafka and external systems and is both scalable and flexible. Kafka Connect works with Kafka on Pulsar (KoP), which is compatible with the Kafka API.
Kafka Connect uses Source and Sink connectors for integration. Source connectors stream data from an external system to Kafka, while Sink connectors stream data from Kafka to an external system.
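In standalone mode, which this guide uses, a single Connect worker process is started with one worker configuration file followed by one or more connector configuration files. As a sketch of the usage pattern (the file names here are placeholders):
# standalone-mode usage pattern: worker config first, then connector configs
bin/connect-standalone.sh worker.properties connector-1.properties [connector-2.properties ...]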
-
Navigate to the Apache downloads page for Kafka, and click the suggested download link for the Kafka 3.1.0 binary package.
-
Extract the downloaded package into _YOUR_HOME_DIRECTORY_; this creates the kafka_2.13-3.1.0 directory used in the following steps.
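As a sketch, you can also download and extract the package from the command line (the mirror URL below is one option; verify it against the downloads page):
# download and extract the Kafka 3.1.0 binary package (Scala 2.13 build)
cd _YOUR_HOME_DIRECTORY_
curl -O https://archive.apache.org/dist/kafka/3.1.0/kafka_2.13-3.1.0.tgz
tar -xzf kafka_2.13-3.1.0.tgz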
Download the StreamNative OAuth dependency
# download supplementary libraries
curl -O https://repo1.maven.org/maven2/io/streamnative/pulsar/handlers/oauth-client/3.1.0.1/oauth-client-3.1.0.1.jar --output-dir ./libs
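The --output-dir option requires curl 7.73.0 or later. On older curl versions, you can name the destination file explicitly instead:
# equivalent download for older curl versions
curl -o ./libs/oauth-client-3.1.0.1.jar https://repo1.maven.org/maven2/io/streamnative/pulsar/handlers/oauth-client/3.1.0.1/oauth-client-3.1.0.1.jar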
Download the Elasticsearch Sink connector
-
Create a connectors folder to store all Kafka connectors.
# switch to your kafka folder
cd kafka_2.13-3.1.0
# create a connector folder to store all Kafka connectors
mkdir connectors
-
Navigate to the Elasticsearch Sink connector and click Download to download the archived binaries.
-
Extract the file and copy the unzipped folder into the connectors directory.
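For example, assuming the downloaded archive name matches the pattern below (adjust it to the file you actually downloaded):
# unzip the connector archive into the connectors directory
unzip confluentinc-kafka-connect-elasticsearch-*.zip -d connectors/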
-
Create a configuration file (named connect-sn-kop.properties).
# switch to your kafka folder
cd kafka_2.13-3.1.0
# create a connect configuration file
# which contains the connection information of the Kafka server (StreamNative KoP cluster)
vim config/connect-sn-kop.properties
-
Add the following content to the connect-sn-kop.properties file.
# add the information of the StreamNative KoP cluster
bootstrap.servers=SERVER-URL
sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
  required oauth.issuer.url="https://auth.streamnative.cloud/" \
  oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
  oauth.audience="YOUR-AUDIENCE-STRING";
producer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=OAUTHBEARER
producer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
  required oauth.issuer.url="https://auth.streamnative.cloud/" \
  oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
  oauth.audience="YOUR-AUDIENCE-STRING";
consumer.sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=OAUTHBEARER
consumer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
  required oauth.issuer.url="https://auth.streamnative.cloud/" \
  oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
  oauth.audience="YOUR-AUDIENCE-STRING";
#Cluster level converters
#These apply when the connectors don't define any converter
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
#JSON schemas are enabled at the cluster level; individual connectors can override this
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#Where to keep the Connect topic offset configurations
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
#Plugin path to put the connector binaries
plugin.path=YOUR-FULL-PATH/connectors/
- oauth.credentials.url: the path to your downloaded OAuth2 credential file.
- bootstrap.servers: the Kafka service URL of your StreamNative cluster.
- oauth.audience: a combination of urn:sn:pulsar, your organization name, and your Pulsar instance name.
- key.converter and value.converter: the converters that handle the JSON-format messages sent to Kafka.
- plugin.path: the full path of the connectors folder created in the previous step.
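For illustration only, a filled-in set of these values might look like the following; every value here is hypothetical, so substitute your own server URL, key file path, organization, and instance names:
# hypothetical example values; replace each with your own
bootstrap.servers=your-broker.streamnative.cloud:9093
oauth.credentials.url=file:///Users/you/Downloads/your-service-account-key.json
oauth.audience=urn:sn:pulsar:your-org:your-instance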
For details about the security of Kafka Connect, see the Confluent documentation.
Add the following content to the elasticsearch-sink-connector.properties file (create it in the config directory, alongside connect-sn-kop.properties).
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
# Topic name to get data from
topics=test-elasticsearch-sink
key.ignore=true
# The key converter for this connector
key.converter=org.apache.kafka.connect.storage.StringConverter
# The value converter for this connector
value.converter=org.apache.kafka.connect.json.JsonConverter
# Identify whether the value contains a schema.
# Required when the value converter is `org.apache.kafka.connect.json.JsonConverter`.
value.converter.schemas.enable=false
schema.ignore=true
# Elasticsearch server URL
connection.url=http://localhost:9200
type.name=kafka-connect
Run Kafka Connect
Open a new terminal, navigate to the Kafka folder, and run the following command:
cd kafka_2.13-3.1.0
bin/connect-standalone.sh config/connect-sn-kop.properties config/elasticsearch-sink-connector.properties
You should see the following output:
...output omitted...
[2023-02-07 23:11:37,102] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Discovered group coordinator kopyhshen-broker-0-3d0a2d7c-2875-4caf-b74e-7d3260027a9a.gcp-shared-gcp-usce1-martin.streamnative.g.snio.cloud:9093 (id: 1865975191 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
[2023-02-07 23:11:37,106] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:535)
[2023-02-07 23:11:42,233] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Successfully joined group with generation Generation{generationId=3, memberId='connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:595)
[2023-02-07 23:11:42,237] INFO [elasticsearch-sink|task-0] [Consumer clientId=connector-consumer-elasticsearch-sink-0, groupId=connect-elasticsearch-sink] Finished assignment for group at generation 3: {connector-consumer-elasticsearch-sink-0-1718e62d-b638-419b-8376-2ee14a19d23d=Assignment(partitions=[test-elasticsearch-sink-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:652)
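Optionally, you can confirm that the connector is registered by querying the Connect REST API, which a standalone worker serves on port 8083 by default:
# list connectors registered with the standalone worker
curl http://localhost:8083/connectors
# expected output: ["elasticsearch-sink"]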
Index data on the Elasticsearch server
This example shows how to send five JSON-format messages to the test-elasticsearch-sink topic. These messages will be forwarded to the local Elasticsearch server.
-
Start a Kafka producer and send five JSON-format messages to the test-elasticsearch-sink topic.
echo '{"reporterId": 8824, "reportId": 10000, "content": "Was argued independent 2002 film, The Slaughter Rule.", "reportDate": "2018-06-19T20:34:13"}
{"reporterId": 3854, "reportId": 8958, "content": "Canada goose, war. Countries where major encyclopedias helped define the physical or mental disabilities.", "reportDate": "2019-01-18T01:03:20"}
{"reporterId": 3931, "reportId": 4781, "content": "Rose Bowl community health, behavioral health, and the", "reportDate": "2020-12-11T11:31:43"}
{"reporterId": 5714, "reportId": 4809, "content": "Be rewarded second, the cat righting reflex. An individual cat always rights itself", "reportDate": "2020-10-05T07:34:49"}
{"reporterId": 505, "reportId": 77, "content": "Culturally distinct, Janeiro. In spite of the crust is subducted", "reportDate": "2018-01-19T01:53:09"}' | ./bin/kafka-console-producer.sh \
--bootstrap-server "your-pulsar-service-url" \
--producer.config ./kafka.properties \
--topic test-elasticsearch-sink
If everything goes well, the command exits normally and outputs nothing. The records are sent to an Elasticsearch index called test-elasticsearch-sink; by default, the Elasticsearch Sink connector creates an index with the same name as the Kafka topic.
For details about how to configure the Kafka configuration properties file, see get started with Kafka protocol.
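As a sketch, the kafka.properties client configuration mirrors the OAuth settings used in the worker configuration above (the placeholder values carry the same assumptions):
bootstrap.servers=SERVER-URL
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
  required oauth.issuer.url="https://auth.streamnative.cloud/" \
  oauth.credentials.url="file://YOUR-KEY-FILE-PATH" \
  oauth.audience="YOUR-AUDIENCE-STRING";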
-
Verify that data is indexed on the Elasticsearch server.
curl 'http://localhost:9200/test-elasticsearch-sink/_search' | jq
Five records should be returned.
{
"took": 10,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
...output omitted...
{
"_index": "test-elasticsearch-sink",
"_type": "_doc",
"_id": "test-elasticsearch-sink+0+9",
"_score": 1,
"_source": {
"reportId": 4781,
"reportDate": "2020-12-11T11:31:43",
"reporterId": 3931,
"content": "Rose Bowl community health, behavioral health, and the"
}
}
]
}
}
-
Check the record count in the test-elasticsearch-sink
index.
curl 'http://localhost:9200/test-elasticsearch-sink/_count'
You should see the following output:
{"count":5,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0}}
You can see that the number of received records is identical to the number of sent messages.
-
Verify the connectivity between Kafka and Pulsar by searching for a keyword in the test-elasticsearch-sink index.
This example shows how to search for the keyword health in the test-elasticsearch-sink index.
curl 'http://localhost:9200/test-elasticsearch-sink/_search?q=content:health' | jq
You should see the following output:
{
"took": 9,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 3.2154756,
"hits": [
{
"_index": "test-elasticsearch-sink",
"_type": "_doc",
"_id": "test-elasticsearch-sink+0+11",
"_score": 3.2154756,
"_source": {
"reportId": 4781,
"reportDate": "2020-12-11T11:31:43",
"reporterId": 3931,
"content": "Rose Bowl community health, behavioral health, and the"
}
}
]
}
}
You can see that all returned records contain the word health in their content field.