Kafka Connect Milvus Sink
The official Milvus Kafka Connect Sink connector.

Available on: StreamNative Cloud console
Authored by: Zilliz
Support type: Community
License: Apache License 2.0

The Milvus Kafka Connect Sink connector is a Kafka Connect connector that writes data from Kafka topics to Milvus.

Prerequisites

  • If you don't already have a collection in your Zilliz Cloud instance or Milvus cluster, create one that includes a vector field.
  • Collect the endpoint, token, and collection name of your Zilliz Cloud instance or Milvus cluster. These become the public.endpoint, token, and collection.name properties of the connector.

Quick Start

  1. Set up the kcctl client: doc

  2. Set up a Milvus cluster. For local testing, you can start a standalone Milvus cluster with docker-compose:

    # MILVUS_VERSION must be set to a Milvus release tag before running these commands.
    mkdir -p tmp
    wget -q https://github.com/milvus-io/milvus/releases/download/${MILVUS_VERSION}/milvus-standalone-docker-compose.yml -O "tmp/docker-compose.yml" > /dev/null 2>&1
    docker compose -f "tmp/docker-compose.yml" up -d > /dev/null 2>&1
    
  3. Initialize the local Milvus cluster. The following Python script creates a collection named demo with a primary key, two text fields, and an 8-dimensional vector field:

    from pymilvus import MilvusClient, DataType
    import time
    
    client = MilvusClient(
        uri="http://localhost:19530",
        db_name="default"
    )
    
    # 3.1. Create schema (dynamic fields stay disabled)
    schema = MilvusClient.create_schema(
        auto_id=False,
        enable_dynamic_field=False,
    )
    schema.add_field(field_name="id", datatype=DataType.INT64, is_primary=True)
    schema.add_field(field_name="title", datatype=DataType.VARCHAR, max_length=65535)
    schema.add_field(field_name="title_vector", datatype=DataType.FLOAT_VECTOR, dim=8)
    schema.add_field(field_name="link", datatype=DataType.VARCHAR, max_length=65535)
    
    # 3.2. Define indexes for the primary key and the vector field
    index_params = client.prepare_index_params()
    
    index_params.add_index(
        field_name="id",
        index_type="STL_SORT"
    )
    
    index_params.add_index(
        field_name="title_vector",
        index_type="IVF_FLAT",
        metric_type="IP",
        params={"nlist": 128}
    )
    
    # 3.3. Create the collection with the schema and indexes
    client.create_collection(
        collection_name="demo",
        schema=schema,
        index_params=index_params
    )
    
    # Pause briefly before the collection is used
    time.sleep(5)
    
  4. Create a JSON file like the following:

    {
        "name": "mysink13",
        "config": {
            "connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
            "topics": "mytopic",
            "public.endpoint": "http://{MILVUS_IP}:19530",
            "token": "",
            "collection.name": "demo",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "tasks.max": "1"
        }
    }
    
  5. Run the following command to create the connector:

    kcctl create -f <filename>.json
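
Once the connector is running, records written to mytopic should carry the same fields as the demo collection created in step 3. The sketch below builds and serializes one such record using only the Python standard library. It assumes the connector maps top-level JSON keys to collection fields of the same name; the helper names build_demo_record and to_kafka_value and the sample values are hypothetical, not part of the connector. Sending the bytes to Kafka is left to whichever producer client you use.

```python
import json

VECTOR_DIM = 8  # matches dim=8 of title_vector in the demo schema


def build_demo_record(record_id, title, vector, link):
    """Return a dict shaped like one row of the demo collection (hypothetical helper)."""
    if len(vector) != VECTOR_DIM:
        raise ValueError(f"title_vector needs {VECTOR_DIM} values, got {len(vector)}")
    return {
        "id": int(record_id),
        "title": title,
        "title_vector": [float(x) for x in vector],
        "link": link,
    }


def to_kafka_value(record):
    """Serialize as schemaless JSON, in line with value.converter.schemas.enable=false."""
    return json.dumps(record).encode("utf-8")


# Sample values are placeholders for illustration only.
record = build_demo_record(1, "Example title", [0.1] * VECTOR_DIM, "https://example.com")
payload = to_kafka_value(record)
```

Validating the vector length before producing is a design choice: a record with the wrong dimension is rejected locally instead of surfacing later as a failed sink task.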
    

Configuration

The Milvus Kafka Connect Sink connector is configured using the following required properties:

Parameter         Description
public.endpoint   The endpoint of your Zilliz Cloud instance or Milvus cluster.
token             The token of your Zilliz Cloud instance or Milvus cluster.
collection.name   The name of the collection to write to.
topics            The Kafka topics to read from.

For more information about the configuration properties, see the official Milvus Kafka Connect Sink Connector documentation.
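
Before submitting a connector definition with kcctl, it can help to confirm that none of the required properties in this section is missing. The following is a minimal sketch of such a check, using only the standard library. The function name missing_properties is hypothetical, and the check only looks for key presence, so the empty token from the Quick Start example still passes.

```python
import json

# Required properties as listed in the Configuration section.
REQUIRED_PROPERTIES = ("public.endpoint", "token", "collection.name", "topics")


def missing_properties(connector_json):
    """Return the required property names absent from the "config" object."""
    config = json.loads(connector_json).get("config", {})
    return [name for name in REQUIRED_PROPERTIES if name not in config]


# Example definition modeled on the Quick Start JSON file.
example = """
{
    "name": "mysink13",
    "config": {
        "connector.class": "com.milvus.io.kafka.MilvusSinkConnector",
        "topics": "mytopic",
        "public.endpoint": "http://localhost:19530",
        "token": "",
        "collection.name": "demo"
    }
}
"""
print(missing_properties(example))  # prints []
```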

Known Issues

  1. The Milvus Kafka Connect Sink connector does not yet support the dynamic fields feature. When dynamic fields are enabled on the Milvus server or the Zilliz Cloud instance, the connector fails with the following error:

    2024-09-23T21:21:24,441+0000 [task-thread-mysink13-0] ERROR org.apache.kafka.connect.runtime.WorkerSinkTask - WorkerSinkTask{id=mysink13-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: 'com.google.protobuf.Internal$ProtobufList io.milvus.grpc.JSONArray.emptyList(java.lang.Class)'
    java.lang.NoSuchMethodError: 'com.google.protobuf.Internal$ProtobufList io.milvus.grpc.JSONArray.emptyList(java.lang.Class)'

If you encounter this issue, disable the dynamic fields feature on the Milvus server or Zilliz Cloud instance.
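
To find out whether a target collection has dynamic fields enabled before pointing the connector at it, you can inspect the collection description. The sketch below assumes that the dictionary returned by pymilvus `MilvusClient.describe_collection` includes an `enable_dynamic_field` key; the helper names dynamic_fields_enabled and check_collection are hypothetical. pymilvus is imported lazily, so the pure helper can be used without it.

```python
def dynamic_fields_enabled(description):
    """Return True if a describe_collection() result reports dynamic fields.

    Assumption: the result is a dict with an "enable_dynamic_field" key.
    """
    return bool(description.get("enable_dynamic_field", False))


def check_collection(uri, collection_name, token=""):
    """Query a live cluster; needs pymilvus installed and a reachable endpoint."""
    from pymilvus import MilvusClient  # lazy import keeps the helper above dependency-free

    client = MilvusClient(uri=uri, token=token)
    description = client.describe_collection(collection_name=collection_name)
    return dynamic_fields_enabled(description)
```

For the Quick Start setup, calling `check_collection("http://localhost:19530", "demo")` would be expected to return False, since that collection is created with `enable_dynamic_field=False`.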