sink
Kafka Connect ElasticSearch Sink
The official ElasticSearch Kafka Connect Sink connector.

Available on
StreamNative Cloud console

Authored by
Aiven-Open
Support type
Community
License
Apache License 2.0

The ElasticSearch Kafka Connect Sink connector is a Kafka Connect connector that writes data from Kafka topics to ElasticSearch.

Prerequisites

  • A running ElasticSearch cluster

Configuration

The ElasticSearch Kafka Connect Sink connector is configured using the following properties:

ParameterRequiredDescriptionDefault
connection.urltrueList of Elasticsearch HTTP connection URLs e.g. http://eshost1:9200, http://eshost2:9200
type.nametrueThe Elasticsearch type name to use when indexing.
connection.usernamefalseThe username used to authenticate with Elasticsearch.
connection.passwordfalseThe password used to authenticate with Elasticsearch.
batch.sizefalseThe number of records to process as a batch when writing to Elasticsearch.2000
max.in.flight.requestsfalseThe maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests.5
max.buffered.recordsfalseThe maximum number of records each task will buffer before blocking acceptance of more records.20000
linger.msfalseLinger time in milliseconds for batching.1
flush.timeout.msfalseThe timeout in milliseconds to use for periodic flushing.10000
max.retriesfalseThe maximum number of retries that are allowed for failed indexing requests.5
retry.backoff.msfalseHow long to wait in milliseconds before attempting the first retry of a failed indexing.100
key.ignorefalseWhether to ignore the record key for the purpose of forming the Elasticsearch document ID.false
topic.key.ignorefalseList of topics for which key.ignore should be true.
schema.ignorefalseWhether to ignore schemas during indexing.false
topic.schema.ignorefalseList of topics for which schema.ignore should be true.
drop.invalid.messagefalseWhether to drop kafka message when it cannot be converted to output message.false
compact.map.entriesfalseDefines how map entries with string keys within record values should be written to JSON.true
connection.timeout.msfalseHow long to wait in milliseconds when establishing a connection to the Elasticsearch server.1000
read.timeout.msfalseHow long to wait in milliseconds for the Elasticsearch server to send a response.3000
behavior.on.null.valuesfalseHow to handle records with a non-null key and a null value, Valid options are 'ignore', 'delete', and 'fail'.ignore
behavior.on.malformed.documentsfalseHow to handle records that Elasticsearch rejects due to some malformation of the document itself, Valid options are 'ignore', 'warn', and 'fail'.fail

For more information about the configuration properties, see the Official ElasticSearch Kafka Connect Sink Connector documentation.