The Elasticsearch Kafka Connect Sink connector writes data from Kafka topics to Elasticsearch.
Prerequisites
- A running Elasticsearch cluster
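Before configuring the connector, it can help to confirm that the cluster answers at the URL you plan to use for connection.url. Elasticsearch's root endpoint (GET /) returns cluster information, including version.number.

The Python sketch below uses only the standard library. The helper names are hypothetical. The sketch also assumes that connection.username and connection.password correspond to HTTP Basic auth.

```python
from __future__ import annotations

import base64
import json
import urllib.request


def cluster_info_request(url: str, username: str | None = None,
                         password: str | None = None) -> urllib.request.Request:
    """Build a GET request for the Elasticsearch root endpoint ("/").

    Assumption: credentials are sent as HTTP Basic auth, in the spirit of
    connection.username / connection.password.
    """
    request = urllib.request.Request(url.rstrip("/") + "/", method="GET")
    if username is not None:
        raw = f"{username}:{password or ''}".encode("utf-8")
        request.add_header("Authorization", "Basic " + base64.b64encode(raw).decode("ascii"))
    return request


def server_version(body: bytes) -> str:
    """Pull version.number out of the root endpoint's JSON response."""
    return json.loads(body)["version"]["number"]


def check_cluster(url: str, username: str | None = None,
                  password: str | None = None, timeout: float = 5.0) -> str:
    """Return the cluster's version string; raises URLError/HTTPError on failure."""
    request = cluster_info_request(url, username, password)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return server_version(response.read())
```

For example, check_cluster("http://elastic:9200") would check the URL used in the sample configuration. Run it from a host with the same network access as the Kafka Connect workers.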
Quick Start
Set up the kcctl client (see the kcctl documentation).
Set up an Elasticsearch cluster.
Create a JSON file like the following:
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.aiven.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "kafka-elastic-input",
    "connection.url": "http://elastic:9200",
    "": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
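A quick local check of the JSON file can catch typos before it reaches Kafka Connect. The following is a minimal Python sketch, not part of the connector or of kcctl, and the helper name check_connector_config is hypothetical. It covers only these rules:

- connection.url is marked as required in the table below.
- The two behavior.on.* options accept only the values listed there.
- A Kafka Connect sink needs either topics or topics.regex.

```python
from __future__ import annotations

import json

# Accepted values, as listed in the configuration table on this page.
ALLOWED_VALUES = {
    "behavior.on.null.values": {"ignore", "delete", "fail"},
    "behavior.on.malformed.documents": {"ignore", "warn", "fail"},
}


def check_connector_config(text: str) -> list[str]:
    """Return the problems found in a connector JSON document.

    Hypothetical helper: an empty list only means these local checks passed,
    not that Kafka Connect will accept the configuration.
    """
    problems: list[str] = []
    doc = json.loads(text)
    if not doc.get("name"):
        problems.append("missing connector name")
    config = doc.get("config")
    if not isinstance(config, dict):
        problems.append("missing 'config' object")
        return problems
    if not config.get("connector.class"):
        problems.append("missing connector.class")
    # Kafka Connect sink connectors need either topics or topics.regex.
    if not (config.get("topics") or config.get("topics.regex")):
        problems.append("missing topics (or topics.regex)")
    # connection.url is marked as required in the configuration table.
    if not config.get("connection.url"):
        problems.append("missing connection.url")
    # Reject values outside the documented option sets.
    for key, allowed in ALLOWED_VALUES.items():
        value = config.get(key)
        if value is not None and value not in allowed:
            problems.append(f"invalid {key}: {value!r}")
    return problems
```

For a file containing the sample configuration above, check_connector_config returns an empty list.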
Run the following command to create the connector:
kcctl create -f <filename>.json
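Kafka Connect workers also accept the same JSON document directly through their REST API (POST /connectors). This can be handy where kcctl is not installed.

Below is a minimal standard-library Python sketch. The helper names are hypothetical. The worker address http://localhost:8083 (Kafka Connect's default REST port) is an assumption; point it at your own Connect cluster.

```python
import json
import urllib.request


def build_create_request(worker_url: str, connector: dict) -> urllib.request.Request:
    """Build a POST /connectors request carrying {"name": ..., "config": {...}}."""
    return urllib.request.Request(
        url=worker_url.rstrip("/") + "/connectors",
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(path: str, worker_url: str = "http://localhost:8083") -> dict:
    """Submit the connector JSON file at `path`; returns the worker's JSON reply.

    The default worker_url is an assumption, not a value from this page.
    """
    with open(path, encoding="utf-8") as f:
        connector = json.load(f)
    request = build_create_request(worker_url, connector)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.loads(response.read().decode("utf-8"))
```

A successful POST returns HTTP 201 with the created connector. If a connector with the same name already exists, the worker answers 409 Conflict and urlopen raises urllib.error.HTTPError. To create or update in one step, PUT only the inner config object to /connectors/<name>/config instead.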
The Elasticsearch Kafka Connect Sink connector is configured using the following properties:
| Parameter | Required | Description | Default |
| --- | --- | --- | --- |
| connection.url | true | List of Elasticsearch HTTP connection URLs, e.g. http://eshost1:9200, http://eshost2:9200. | |
| | true | The Elasticsearch type name to use when indexing. | |
| connection.username | false | The username used to authenticate with Elasticsearch. | |
| connection.password | false | The password used to authenticate with Elasticsearch. | |
| batch.size | false | The number of records to process as a batch when writing to Elasticsearch. | 2000 |
| | false | The maximum number of indexing requests that can be in flight to Elasticsearch before further requests are blocked. | 5 |
| max.buffered.records | false | The maximum number of records each task will buffer before blocking acceptance of more records. | 20000 |
| | false | Linger time in milliseconds for batching. | 1 |
| | false | The timeout in milliseconds to use for periodic flushing. | 10000 |
| max.retries | false | The maximum number of retries allowed for failed indexing requests. | 5 |
| | false | How long to wait in milliseconds before attempting the first retry of a failed indexing request. | 100 |
| key.ignore | false | Whether to ignore the record key when forming the Elasticsearch document ID. | false |
| topic.key.ignore | false | List of topics for which key.ignore should be true. | |
| schema.ignore | false | Whether to ignore schemas during indexing. | false |
| topic.schema.ignore | false | List of topics for which schema.ignore should be true. | |
| drop.invalid.message | false | Whether to drop a Kafka message when it cannot be converted to an output message. | false |
| | false | Defines how map entries with string keys within record values should be written to JSON. | true |
| | false | How long to wait in milliseconds when establishing a connection to the Elasticsearch server. | 1000 |
| | false | How long to wait in milliseconds for the Elasticsearch server to send a response. | 3000 |
| behavior.on.null.values | false | How to handle records with a non-null key and a null value. Valid options are 'ignore', 'delete', and 'fail'. | ignore |
| behavior.on.malformed.documents | false | How to handle records that Elasticsearch rejects because the document itself is malformed. Valid options are 'ignore', 'warn', and 'fail'. | fail |
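The key and schema options combine per topic. The global flag (key.ignore, schema.ignore) applies to every topic, while the topic.* lists switch the behaviour on for individual topics. The Python sketch below models that resolution, plus behavior.on.null.values, for a single record.

It is illustrative only. Record and plan_write are hypothetical names. The topic+partition+offset document ID used when the key is ignored is an assumption based on the upstream Confluent-derived connector code, not something stated on this page.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for a Kafka record (only the fields used here)."""
    topic: str
    partition: int
    offset: int
    key: str | None
    value: dict | None


def _flag(config: dict, name: str) -> bool:
    # Boolean properties arrive as strings such as "true" / "false".
    return config.get(name, "false").strip().lower() == "true"


def _topics(config: dict, name: str) -> set[str]:
    # List properties are comma-separated, e.g. "logs, metrics".
    return {t.strip() for t in config.get(name, "").split(",") if t.strip()}


def ignores_key(config: dict, topic: str) -> bool:
    return _flag(config, "key.ignore") or topic in _topics(config, "topic.key.ignore")


def ignores_schema(config: dict, topic: str) -> bool:
    return _flag(config, "schema.ignore") or topic in _topics(config, "topic.schema.ignore")


def plan_write(config: dict, record: Record) -> tuple[str, str | None]:
    """Return (action, document_id); action is "index", "delete" or "skip"."""
    if record.value is None:
        mode = config.get("behavior.on.null.values", "ignore")
        if mode == "ignore":
            return ("skip", None)
        if mode == "fail":
            raise ValueError(f"null value at {record.topic}-{record.partition}@{record.offset}")
        if mode != "delete":
            raise ValueError(f"unknown behavior.on.null.values: {mode!r}")
        if record.key is None:
            # Without a key there is nothing to identify the document to delete.
            return ("skip", None)
    if ignores_key(config, record.topic):
        # Assumed upstream ID format when the key is ignored.
        doc_id = f"{record.topic}+{record.partition}+{record.offset}"
    else:
        doc_id = record.key
    return ("delete" if record.value is None else "index", doc_id)
```

With topic.key.ignore set to "logs", records from the logs topic get offset-based IDs, while other topics keep their record keys as document IDs. Offset-based IDs make re-delivered records overwrite the same document rather than create duplicates.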
For more information about the configuration properties, see the official Elasticsearch Kafka Connect Sink connector documentation.