
Configure KoP

Kafka on Pulsar (KoP) brings native Apache Kafka protocol support to Apache Pulsar by introducing a Kafka protocol handler on Pulsar brokers. By adding the KoP protocol handler to an existing Pulsar cluster, you can migrate existing Kafka applications and services to Pulsar without modifying their code.

Enable KoP

To enable KoP access within a Kubernetes cluster, set the <components>.kop property of StreamNative Platform in the values.yaml file as follows, and then run the helm upgrade command to apply the change.

  1. Enable KoP.

    broker:
      kop:
        enabled: true
    
  2. Apply the new configuration.

    helm upgrade -f /path/to/your/values.yaml <release_name> streamnative/sn-platform -n <k8s_namespace>
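
If you prefer to keep the KoP setting separate from your main values file, the two steps above can be sketched as a small script. This is only an illustration: the overlay file name kop-values.yaml and the default release and namespace names are hypothetical, and the script prints the helm command for review rather than running it. It relies on Helm accepting multiple -f files, with later files taking precedence over earlier ones.

```shell
#!/usr/bin/env bash
set -eu

# Hypothetical defaults; replace with your own release, namespace, and paths.
RELEASE_NAME="${RELEASE_NAME:-my-release}"
K8S_NAMESPACE="${K8S_NAMESPACE:-my-namespace}"
BASE_VALUES="${BASE_VALUES:-values.yaml}"
OVERLAY="${OVERLAY:-kop-values.yaml}"

# Write an overlay that contains only the KoP switch from step 1.
cat > "$OVERLAY" <<'EOF'
broker:
  kop:
    enabled: true
EOF

# Build the upgrade command from step 2. The overlay is passed last,
# so its values override the same keys in the base values file.
HELM_CMD="helm upgrade -f $BASE_VALUES -f $OVERLAY $RELEASE_NAME streamnative/sn-platform -n $K8S_NAMESPACE"

# Print the command for review instead of running it.
echo "$HELM_CMD"
```

After checking the printed command, you can copy it into your shell to apply the change.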
    

Note

To access KoP outside a Kubernetes cluster, you need to install Istio and Istio Ingress Gateway, and enable TLS for Istio Ingress Gateway. For details, see configure Istio.

Connect to your Pulsar cluster using the Kafka Java client

This example shows how to use the Kafka Java client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.

Currently, StreamNative Platform supports Kafka Client v1.0.0 - v2.6.0.
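
Before installing a client, it can help to check its version against the supported range. The following is a minimal sketch, not part of StreamNative Platform: the function name version_in_range and the CLIENT_VERSION variable are hypothetical, and the comparison assumes a sort command that supports the -V (version sort) option, as GNU coreutils does.

```shell
#!/usr/bin/env bash

# Return 0 if version $1 lies in the inclusive range [$2, $3].
version_in_range() {
  local v="$1" min="$2" max="$3"
  # v >= min when min sorts first among (min, v);
  # v <= max when max sorts last among (v, max).
  [ "$(printf '%s\n' "$min" "$v" | sort -V | head -n1)" = "$min" ] &&
  [ "$(printf '%s\n' "$v" "$max" | sort -V | tail -n1)" = "$max" ]
}

# Hypothetical input; set CLIENT_VERSION to the client you plan to use.
CLIENT_VERSION="${CLIENT_VERSION:-2.2.0}"

if version_in_range "$CLIENT_VERSION" 1.0.0 2.6.0; then
  echo "Kafka client $CLIENT_VERSION is within the supported range"
else
  echo "Kafka client $CLIENT_VERSION is outside the supported range"
fi
```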

Prerequisites

Steps

  1. Grant produce and consume permission to the Admin role on the namespace.

    pulsarctl namespaces grant-permission --role ROLE_NAME --actions produce,consume NAMESPACE_NAME
    
  2. Configure the KoP SSL connection and enable KoP.

    KoP starts up together with the Pulsar broker. By default, KoP is enabled. Set the following options to match your KoP SSL setup.

    broker:
      # set your own domain for accessing KoP from outside the cluster
      advertisedDomain: ''
      kop:
        enabled: true
        tls:
          enabled: true
          # name of a secret that contains keystore.jks and truststore.jks for KoP TLS
          certSecretName: 'kop-secret'
          # reference to a secret that holds the keystore and truststore password
          passwordSecretRef:
            key: password
            name: kop-keystore-password
    
  3. Get the external IP address of the Istio gateway service.

    kubectl get svc/istio-ingressgateway -n <k8s_namespace>
    
  4. Install the Kafka client.

    a. Download Kafka 2.2.0. This example uses Kafka 2.2.0 because, since Kafka 2.1.0, the default value of the producer's retries parameter has changed from 0 to 2147483647.

    curl -O https://archive.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
    

    b. Extract the downloaded package.

    tar -xf kafka_2.11-2.2.0.tgz
    
  5. Create a file named client-ssl.properties with the following content.

    security.protocol=SASL_SSL
    ssl.truststore.location=client.truststore.jks
    ssl.truststore.password=client
    ssl.endpoint.identification.algorithm=
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER_NAME" password="token:[YOUR_TOKEN]";
    
  6. Run a Kafka client.

    a. Enter the Kafka client directory.

    cd kafka_2.11-2.2.0
    

    b. Start a Kafka producer and send a message.

    bin/kafka-console-producer.sh --broker-list kop_service_url:9093 --topic TOPIC_NAME --producer.config client-ssl.properties
    > message-for-kafka-client
    

    c. Start a Kafka consumer.

    bin/kafka-console-consumer.sh --bootstrap-server kop_service_url:9093 --topic TOPIC_NAME --consumer.config client-ssl.properties
    

    While the consumer is running, messages sent from the producer, such as message-for-kafka-client, appear in the consumer's output.
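
The client-ssl.properties file from step 5 can also be generated from environment variables, which keeps the token out of files you edit by hand. The script below is a sketch under stated assumptions: the variable names (KOP_USER_NAME, KOP_TOKEN, TRUSTSTORE, TRUSTSTORE_PASSWORD, LOGIN_MODULE, OUT) are hypothetical, and the login module defaults to PlainLoginModule, the module Kafka ships for the PLAIN mechanism. Set LOGIN_MODULE if your setup names a different module.

```shell
#!/usr/bin/env bash
set -eu

# Hypothetical inputs; export real values before running the script.
KOP_USER_NAME="${KOP_USER_NAME:-USER_NAME}"
KOP_TOKEN="${KOP_TOKEN:-YOUR_TOKEN}"
TRUSTSTORE="${TRUSTSTORE:-client.truststore.jks}"
TRUSTSTORE_PASSWORD="${TRUSTSTORE_PASSWORD:-client}"
LOGIN_MODULE="${LOGIN_MODULE:-org.apache.kafka.common.security.plain.PlainLoginModule}"
OUT="${OUT:-client-ssl.properties}"

# Write the same six settings as step 5, filling in the variables above.
# The heredoc is unquoted so that the shell expands $VARIABLES inside it.
cat > "$OUT" <<EOF
security.protocol=SASL_SSL
ssl.truststore.location=$TRUSTSTORE
ssl.truststore.password=$TRUSTSTORE_PASSWORD
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
sasl.jaas.config=$LOGIN_MODULE required username="$KOP_USER_NAME" password="token:$KOP_TOKEN";
EOF

# The file holds credentials, so restrict it to the current user.
chmod 600 "$OUT"
echo "Wrote $OUT"
```

Pass the generated file to the console producer and consumer through the --producer.config and --consumer.config options, as in step 6.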
