- StreamNative Platform
- Connect
Connect to Pulsar cluster using Kafka Java client
StreamNative Platform brings native Kafka protocol support to Pulsar brokers using Kafka on Pulsar (KoP). Therefore, you can migrate your existing Kafka applications and services to Apache Pulsar without modifying the codes.
This example shows how to use the Kafka Java client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.
Currently, StreamNative Platform supports Kafka Client v1.0.0 - v2.6.0.
Prerequisites
- Install Java 1.8.0 or higher version.
- Install Istio. For details, see install Istio for KoP access.
Procedures
Grant produce and consume permission to the Admin role on the namespace.
pulsarctl namespaces grant-permission --role ROLE_NAME --actions produce,consume NAMESPACE_NAME
Configure KoP SSL connection and enable KoP.
KoP starts up together with the Pulsar broker. By default, KoP is enabled. You can set the following options based on your KoP SSL connection.
broker: # set your own domain for accessing kop outside from the cluster advertisedDomain: '' kop: enabled: true tls: enabled: true # create a secret with keystore.jks and truststore.jks for kop tls security certSecretName: 'kop-secret' # create a secret for keystore and truststore cert passwordSecretRef: key: password name: kop-keystore-password
Get the external IP address of the Istio gateway service.
kubectl get svc/istio-ingressgateway -n <k8s_namespace>
Install Kafka client.
a. Download the Kafka client 2.2.0. In this example, Kafka 2.2.0 is used because the default configuration of the producer's
retries
parameter is changed from0
to2147483647
since Kafka 2.1.0.curl -O https://archive.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
b. Extract the downloaded package.
tar -xf kafka_2.11-2.2.0.tgz
Prepare a file named
client-ssl.properties
. The file contains the following information.security.protocol=SASL_SSL ssl.truststore.location=client.truststore.jks ssl.truststore.password=client ssl.endpoint.identification.algorithm= sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="USER_NAME" password="token:[YOUR_TOKEN]";
Run Kafka client.
a. Enter the Kafka client directory.
cd kafka_2.11-2.2.0
b. Start a Kafka producer and send a message from the Kafka producer..
kafka-console-producer.sh --broker-list kop_service_url:9093 --topic TOPIC_NAME --producer.config client-ssl.properties > message-for-kafka-client
c. Start a Kafka consumer.
kafka-console-consumer.sh --bootstrap-server kop_service_url:9093 --topic TOPIC_NAME --consumer.config client-ssl.properties
At the same time, you can receive the message
message-for-kafka-client
.