Configure KoP
Kafka on Pulsar (KoP) brings native Apache Kafka protocol support to Apache Pulsar by introducing a Kafka protocol handler on Pulsar brokers. By adding the KoP protocol handler to your existing Pulsar cluster, you can migrate your existing Kafka applications and services to Pulsar without modifying their code.
Enable KoP
To enable KoP access within a Kubernetes cluster, configure the <components>.kop property of the StreamNative Platform in the values.yaml file as follows, and then use the helm upgrade command to update the resource.
Enable KoP.
broker:
  kop:
    enabled: true
Apply the new configuration.
helm upgrade -f /path/to/your/values.yaml <release_name> streamnative/sn-platform -n <k8s_namespace>
Note
To access KoP outside a Kubernetes cluster, you need to install Istio and Istio Ingress Gateway, and enable TLS for Istio Ingress Gateway. For details, see configure Istio.
Connect to your Pulsar cluster using the Kafka Java client
This example shows how to use the Kafka Java client to connect to a Pulsar cluster and then produce and consume messages to and from the Pulsar cluster.
Currently, StreamNative Platform supports Kafka Client v1.0.0 - v2.6.0.
Prerequisites
- Install Java 1.8.0 or later.
- Install Istio. For details, see install Istio for KoP access.
Steps
Grant produce and consume permission to the Admin role on the namespace.
pulsarctl namespaces grant-permission --role ROLE_NAME --actions produce,consume NAMESPACE_NAME
Configure KoP SSL connection and enable KoP.
KoP starts up together with the Pulsar broker. By default, KoP is enabled. You can set the following options based on your KoP SSL connection.
broker:
  # set your own domain for accessing kop outside from the cluster
  advertisedDomain: ''
  kop:
    enabled: true
    tls:
      enabled: true
      # create a secret with keystore.jks and truststore.jks for kop tls security
      certSecretName: 'kop-secret'
      # create a secret for keystore and truststore cert
      passwordSecretRef:
        key: password
        name: kop-keystore-password
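The comments in the configuration above refer to two secrets. A minimal sketch of creating them with kubectl follows. It assumes the chart reads the stores from keys named keystore.jks and truststore.jks and the password from the key named password, as the comments and passwordSecretRef suggest, and that both stores share one password. The function name and its arguments are hypothetical.

```shell
# Hypothetical helper: create the two secrets referenced by broker.kop.tls.
# Assumes keystore.jks and truststore.jks already exist in the given directory.
create_kop_secrets() {
  namespace="$1"       # Kubernetes namespace of the release
  store_dir="$2"       # directory holding keystore.jks and truststore.jks
  store_password="$3"  # password protecting both stores (assumed to be shared)

  # Secret named by certSecretName
  kubectl create secret generic kop-secret \
    --from-file=keystore.jks="$store_dir/keystore.jks" \
    --from-file=truststore.jks="$store_dir/truststore.jks" \
    -n "$namespace" || return 1

  # Secret named by passwordSecretRef.name, using the key from passwordSecretRef.key
  kubectl create secret generic kop-keystore-password \
    --from-literal=password="$store_password" \
    -n "$namespace"
}
```

For example, create_kop_secrets <k8s_namespace> ./certs 'store-password' would create both secrets before you run helm upgrade.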
Get the external IP address of the Istio gateway service.
kubectl get svc/istio-ingressgateway -n <k8s_namespace>
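To reuse the address in later commands, the external IP can be extracted with a JSONPath query. This is a sketch that assumes the gateway service is of type LoadBalancer and reports an IP address (some cloud providers report a hostname instead). The function name is hypothetical.

```shell
# Hypothetical helper: print the external IP of the Istio ingress gateway.
# Assumes a LoadBalancer service that exposes an IP rather than a hostname.
istio_gateway_ip() {
  namespace="$1"  # namespace where the Istio ingress gateway is installed
  kubectl get svc istio-ingressgateway -n "$namespace" \
    -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
}
```

For example, KOP_HOST=$(istio_gateway_ip <k8s_namespace>) stores the address in a variable for the client commands that follow.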
Install Kafka client.
a. Download the Kafka client 2.2.0. In this example, Kafka 2.2.0 is used because the default value of the producer's retries parameter changed from 0 to 2147483647 in Kafka 2.1.0.
curl -O https://archive.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
b. Extract the downloaded package.
tar -xf kafka_2.11-2.2.0.tgz
Prepare a file named client-ssl.properties. The file contains the following information.
security.protocol=SASL_SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=client
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="USER_NAME" password="token:[YOUR_TOKEN]";
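As a sketch, the file can be generated from shell variables so that the user name and token are not pasted by hand. The function name is hypothetical. The property values mirror the ones listed above, and the square brackets around YOUR_TOKEN are assumed to be placeholder notation rather than literal characters.

```shell
# Hypothetical helper: write client-ssl.properties for the Kafka CLI tools.
# Values mirror the properties listed above; adjust the truststore path and
# password to match your own truststore.
write_client_ssl_properties() {
  user_name="$1"                          # value for USER_NAME
  token="$2"                              # value for YOUR_TOKEN
  out_file="${3:-client-ssl.properties}"  # output path, defaults to the documented name

  cat > "$out_file" <<EOF
security.protocol=SASL_SSL
ssl.truststore.location=client.truststore.jks
ssl.truststore.password=client
ssl.endpoint.identification.algorithm=
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="${user_name}" password="token:${token}";
EOF
}
```

For example, write_client_ssl_properties USER_NAME "$TOKEN" writes client-ssl.properties to the current directory.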
Run a Kafka client.
a. Enter the Kafka client directory.
cd kafka_2.11-2.2.0
b. Start a Kafka producer and send a message from the Kafka producer.
kafka-console-producer.sh --broker-list kop_service_url:9093 --topic TOPIC_NAME --producer.config client-ssl.properties
> message-for-kafka-client
c. Start a Kafka consumer.
kafka-console-consumer.sh --bootstrap-server kop_service_url:9093 --topic TOPIC_NAME --consumer.config client-ssl.properties
At the same time, the consumer receives the message message-for-kafka-client.
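The interactive steps above can also be run non-interactively, for example as a quick smoke test. This is a sketch that assumes it is run from the extracted Kafka directory, that the console scripts live under bin/, and that client-ssl.properties is in the current directory. The function name and the KAFKA_BIN variable are hypothetical.

```shell
# Hypothetical smoke test: produce one message, then read it back.
# KAFKA_BIN is an assumed variable pointing at the Kafka scripts directory.
kop_smoke_test() {
  bootstrap="$1"                        # for example kop_service_url:9093
  topic="$2"                            # topic to produce to and consume from
  config="${3:-client-ssl.properties}"  # client properties file
  kafka_bin="${KAFKA_BIN:-bin}"

  # The console producer reads messages from standard input, one per line.
  echo "message-for-kafka-client" | "$kafka_bin/kafka-console-producer.sh" \
    --broker-list "$bootstrap" --topic "$topic" \
    --producer.config "$config" || return 1

  # Read from the start of the topic and exit after the first message.
  "$kafka_bin/kafka-console-consumer.sh" \
    --bootstrap-server "$bootstrap" --topic "$topic" \
    --consumer.config "$config" \
    --from-beginning --max-messages 1
}
```

For example, kop_smoke_test kop_service_url:9093 TOPIC_NAME should print message-for-kafka-client if the topic holds no earlier messages.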