This connector is available as a built-in connector on StreamNative Cloud.
Prerequisites
- A running ElasticSearch cluster
Quick Start
- Setup the kcctl client: doc
- Set up a ElasticSearch cluster
-
Create a JSON file like the following:
-
Run the following command to create the connector:
Configuration
The ElasticSearch Kafka Connect Sink connector is configured using the following properties:| Parameter | Required | Description | Default | |
|---|---|---|---|---|
| connection.url | true | List of Elasticsearch HTTP connection URLs e.g. http://eshost1:9200, http://eshost2:9200 | ||
| type.name | true | The Elasticsearch type name to use when indexing. | ||
| connection.username | false | The username used to authenticate with Elasticsearch. | ||
| connection.password | false | The password used to authenticate with Elasticsearch. | ||
| batch.size | false | The number of records to process as a batch when writing to Elasticsearch. | 2000 | |
| max.in.flight.requests | false | The maximum number of indexing requests that can be in-flight to Elasticsearch before blocking further requests. | 5 | |
| max.buffered.records | false | The maximum number of records each task will buffer before blocking acceptance of more records. | 20000 | |
| linger.ms | false | Linger time in milliseconds for batching. | 1 | |
| flush.timeout.ms | false | The timeout in milliseconds to use for periodic flushing. | 10000 | |
| max.retries | false | The maximum number of retries that are allowed for failed indexing requests. | 5 | |
| retry.backoff.ms | false | How long to wait in milliseconds before attempting the first retry of a failed indexing. | 100 | |
| key.ignore | false | Whether to ignore the record key for the purpose of forming the Elasticsearch document ID. | false | |
| topic.key.ignore | false | List of topics for which key.ignore should be true. | ||
| schema.ignore | false | Whether to ignore schemas during indexing. | false | |
| topic.schema.ignore | false | List of topics for which schema.ignore should be true. | ||
| drop.invalid.message | false | Whether to drop kafka message when it cannot be converted to output message. | false | |
| compact.map.entries | false | Defines how map entries with string keys within record values should be written to JSON. | true | |
| connection.timeout.ms | false | How long to wait in milliseconds when establishing a connection to the Elasticsearch server. | 1000 | |
| read.timeout.ms | false | How long to wait in milliseconds for the Elasticsearch server to send a response. | 3000 | |
| behavior.on.null.values | false | How to handle records with a non-null key and a null value, Valid options are ‘ignore’, ‘delete’, and ‘fail’. | ignore | |
| behavior.on.malformed.documents | false | How to handle records that Elasticsearch rejects due to some malformation of the document itself, Valid options are ‘ignore’, ‘warn’, and ‘fail’. | fail |

































