1. StreamNative Platform
  2. Connect

Connect to Pulsar cluster using Kafka Java client

StreamNative Platform brings native Kafka protocol support to Pulsar brokers using Kafka on Pulsar (KoP). Therefore, you can migrate your existing Kafka applications and services to Apache Pulsar without modifying the codes.

This example shows how to use the Kafka Java client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.

Currently, StreamNative Platform supports Kafka Client v1.0.0 - v2.6.0.

Prerequisites

Procedures

  1. Grant produce and consume permission to the Admin role on the namespace.

    pulsarctl namespaces grant-permission --role ROLE_NAME --actions produce,consume NAMESPACE_NAME
    
  2. Configure KoP SSL connection and enable KoP.

    KoP starts up together with the Pulsar broker. By default, KoP is enabled. You can set the following options based on your KoP SSL connection.

    broker:
      # set your own domain for accessing kop outside from the cluster
      advertisedDomain: ''
      kop:
        enabled: true
        tls:
          enabled: true
          # create a secret with keystore.jks and truststore.jks for kop tls security
          certSecretName: 'kop-secret'
          # create a secret for keystore and truststore cert
          passwordSecretRef:
            key: password
            name: kop-keystore-password
    
  3. Get the external IP address of the Istio gateway service.

    kubectl get svc/istio-ingressgateway -n <k8s_namespace>
    
  4. Install Kafka client.

    a. Download the Kafka client 2.2.0. In this example, Kafka 2.2.0 is used because the default configuration of the producer's retries parameter is changed from 0 to 2147483647 since Kafka 2.1.0.

    curl -O https://archive.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
    

    b. Extract the downloaded package.

    tar -xf kafka_2.11-2.2.0.tgz
    
  5. Prepare a file named client-ssl.properties. The file contains the following information.

    security.protocol=SASL_SSL
    ssl.truststore.location=client.truststore.jks
    ssl.truststore.password=client
    ssl.endpoint.identification.algorithm=
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="USER_NAME" password="token:[YOUR_TOKEN]";
    
  6. Run Kafka client.

    a. Enter the Kafka client directory.

    cd kafka_2.11-2.2.0
    

    b. Start a Kafka producer and send a message from the Kafka producer..

    kafka-console-producer.sh --broker-list kop_service_url:9093 --topic TOPIC_NAME --producer.config client-ssl.properties
    > message-for-kafka-client
    

    c. Start a Kafka consumer.

    kafka-console-consumer.sh --bootstrap-server kop_service_url:9093 --topic TOPIC_NAME --consumer.config client-ssl.properties
    

    At the same time, you can receive the message message-for-kafka-client.

Previous
Pulsar - Websocket