The Milvus Kafka Connect Sink connector is a Kafka Connect connector that writes data from Kafka topics to Milvus.
Prerequisites
- If you don't already have a collection in your Zilliz Cloud instance or Milvus cluster, create one with a vector field.
- Collect the `endpoint`, `token`, and `collection.name` parameters from your Zilliz Cloud instance or Milvus cluster.
Quick Start
Set up the kcctl client (see the kcctl documentation).
Set up a Milvus cluster. For a local cluster, you can use docker-compose:
    # MILVUS_VERSION must be set to a Milvus release tag before running this
    mkdir -p tmp
    wget -q https://github.com/milvus-io/milvus/releases/download/${MILVUS_VERSION}/milvus-standalone-docker-compose.yml -O "tmp/docker-compose.yml" > /dev/null 2>&1
    docker compose -f "tmp/docker-compose.yml" up -d > /dev/null 2>&1
Initialize the local Milvus cluster. The following Python script creates a `demo` collection:
    import time

    from pymilvus import MilvusClient, DataType

    # Connect to the local Milvus cluster
    client = MilvusClient(
        uri="http://localhost:19530",
        db_name="default"
    )

    # Create the schema with dynamic fields disabled
    schema = MilvusClient.create_schema(
        auto_id=False,
        enable_dynamic_field=False,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=8)
    schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=65535)

    # Index the primary key and the vector field
    index_params = client.prepare_index_params()
    index_params.add_index(
        field_name="id",
        index_type="STL_SORT"
    )
    index_params.add_index(
        field_name="title_vector",
        index_type="IVF_FLAT",
        metric_type="IP",
        params={"nlist": 128}
    )

    # Create the collection used by the connector configuration
    client.create_collection(
        collection_name="demo",
        schema=schema,
        index_params=index_params
    )

    # Wait briefly before continuing
    time.sleep(5)
Create a JSON file like the following:
    {
      "name": "mysink13",
      "config": {
        "connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
        "topics": "mytopic",
        "public.endpoint": "http://{MILVUS_IP}:19530",
        "token": "",
        "collection.name": "demo",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "tasks.max": "1"
      }
    }
Run the following command to create the connector:
kcctl create -f <filename>.json
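With the connector running, messages published to `mytopic` are written to the `demo` collection. The sketch below builds one JSON message shaped like that collection's schema and checks it before it is sent. It assumes the connector maps JSON keys to collection fields by name; the helpers `make_record` and `validate_record` are hypothetical names used only for illustration, and publishing the payload to Kafka is left to whatever producer you already use.

```python
import json
import random

# Field names and the vector dimension mirror the "demo" collection created
# earlier. Matching JSON keys to collection fields by name is an assumption.
VECTOR_DIM = 8
EXPECTED_KEYS = {"id", "title", "title_vector", "link"}


def make_record(record_id: int, title: str, link: str) -> dict:
    """Build one record shaped like the demo collection schema."""
    return {
        "id": record_id,
        "title": title,
        "title_vector": [random.random() for _ in range(VECTOR_DIM)],
        "link": link,
    }


def validate_record(record: dict) -> None:
    """Raise ValueError if the record does not fit the demo schema."""
    if set(record) != EXPECTED_KEYS:
        # Dynamic fields are disabled, so extra keys have no field to land in.
        raise ValueError(f"unexpected or missing keys: {set(record) ^ EXPECTED_KEYS}")
    if not isinstance(record["id"], int):
        raise ValueError("id must be an integer (INT64)")
    if len(record["title_vector"]) != VECTOR_DIM:
        raise ValueError(f"title_vector must have {VECTOR_DIM} elements")


record = make_record(1, "Hello Milvus", "https://example.com/hello")
validate_record(record)

# With schemas disabled on the JsonConverter, the message value is plain JSON.
payload = json.dumps(record)
print(payload)
```

Validating on the producer side catches schema mismatches before they reach the sink task, where a failure stops the task until it is restarted manually.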
Configuration
The Milvus Kafka Connect Sink connector requires the following properties:
| Parameter | Description |
|---|---|
| public.endpoint | The endpoint of your Zilliz Cloud instance or Milvus cluster. |
| token | The token of your Zilliz Cloud instance or Milvus cluster. |
| collection.name | The name of the collection to write to. |
| topics | The Kafka topics to read from. |
For more information about the configuration properties, see the official Milvus Kafka Connect Sink Connector documentation.
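Before creating the connector, it can help to confirm that the JSON file contains every required property. The following is a minimal sketch, not part of the connector itself; `missing_properties` is a hypothetical helper, and the endpoint value is a stand-in for your own.

```python
import json

# Required properties as listed in the configuration table.
REQUIRED_PROPERTIES = ("public.endpoint", "token", "collection.name", "topics")


def missing_properties(connector: dict) -> list:
    """Return the required properties absent from the connector's config."""
    config = connector.get("config", {})
    # Only presence is checked: the quick start uses an empty token for a
    # local Milvus cluster, so an empty string is treated as valid here.
    return [key for key in REQUIRED_PROPERTIES if key not in config]


connector = json.loads("""
{
  "name": "mysink13",
  "config": {
    "connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
    "topics": "mytopic",
    "public.endpoint": "http://localhost:19530",
    "token": "",
    "collection.name": "demo"
  }
}
""")

print(missing_properties(connector))  # → []
```

An empty list means all four required properties are present; any names returned should be added to the `config` object before the connector is created.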
Known Issues
- The Milvus Kafka Connect Sink connector does not yet support dynamic fields. If the dynamic fields feature is enabled on the Milvus server or the Zilliz Cloud instance, the connector fails with the following error:
2024-09-23T21:21:24,441+0000 [task-thread-mysink13-0] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=mysink13-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: 'com.google.protobuf.Internal$ProtobufList io.milvus.grpc.JSONArray.emptyList(java.lang.Class)'
java.lang.NoSuchMethodError: 'com.google.protobuf.Internal$ProtobufList io.milvus.grpc.JSONArray.emptyList(java.lang.Class)'
When you encounter this issue, please disable the dynamic fields feature on the Milvus server or Zilliz Cloud instance.
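To spot this condition before starting the connector, you can inspect the target collection's description. The sketch below works on a plain dictionary so that it runs without a Milvus server. It assumes the description carries an `enable_dynamic_field` key, as in the dictionaries returned by `MilvusClient.describe_collection` in recent pymilvus releases; verify the key name against your pymilvus version. `dynamic_field_enabled` is a hypothetical helper.

```python
def dynamic_field_enabled(description: dict) -> bool:
    """Return True if a collection description reports dynamic fields as enabled."""
    # Assumption: the flag is exposed under the "enable_dynamic_field" key.
    # A missing key is treated as "not enabled".
    return bool(description.get("enable_dynamic_field", False))


# Hand-written stand-ins for real collection descriptions.
safe = {"collection_name": "demo", "enable_dynamic_field": False}
risky = {"collection_name": "other", "enable_dynamic_field": True}

for desc in (safe, risky):
    status = "incompatible with the connector" if dynamic_field_enabled(desc) else "ok"
    print(desc["collection_name"], "->", status)
```

Against a live cluster, the dictionary would come from something like `client.describe_collection(collection_name="demo")`, using the same `MilvusClient` shown in the quick start.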