StreamNative Private Cloud supports the Kafka protocol via the Ursa engine, a Kafka-compatible data streaming engine that has been refined through our extensive experience developing Pulsar, KoP (Kafka-on-Pulsar), and managing both Pulsar and Kafka at scale. Designed for today’s cost-conscious economy, Ursa offers a genuinely cloud-native experience and enterprise-grade features that boost developer productivity and operational efficiency.
- All of the Kafka features you’re used to: Ursa supports the Kafka features you rely on, including KStreams, KSQL, KTables with topic compaction, Schema Registry for the Java client, and Kerberos authentication for Kafka clients.
- Great developer experience: Ursa streamlines local testing with its own Testcontainers module.
- Robust and reliable: Architecturally designed for scale and subjected to rigorous resilience testing. We’ve fortified Ursa through comprehensive testing, ensuring it meets and exceeds the demands of large-scale Kafka operations.
Prerequisites
To use the Kafka Protocol on StreamNative Private Cloud, ensure you are using image streamnative/private-cloud:3.1.0.4 or later.
To expose the Kafka service outside the Kubernetes cluster, deploy your Pulsar cluster with Istio. See Deploy Pulsar with Istio for detailed instructions.
Enable Kafka Protocol
To enable the Kafka protocol, update the PulsarBroker CR with the following configuration:
spec:
  config:
    protocolHandlers:
      kop:
        enabled: true
        transactionEnabled: true
Configuration Fields:
spec.config.protocolHandlers.kop.enabled: Required. Set to true to enable the Kafka protocol.
spec.config.protocolHandlers.kop.transactionEnabled: Optional. Set to true to enable Kafka transaction support.
The operator will automatically restart the broker pods to apply the changes.
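For context, the snippet above sits inside a full PulsarBroker resource. A minimal sketch follows; the apiVersion, resource name, and namespace are assumptions taken from the examples elsewhere on this page, so match them to your deployment:

```yaml
# Hypothetical full resource; verify apiVersion and metadata against your deployment
apiVersion: pulsar.streamnative.io/v1alpha1
kind: PulsarBroker
metadata:
  name: private-cloud
  namespace: pulsar
spec:
  config:
    protocolHandlers:
      kop:
        enabled: true
        transactionEnabled: true
```

Apply it with kubectl apply -f broker.yaml; the operator then rolls the broker pods as described above.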
Verify Kafka Protocol
After enabling the Kafka protocol, verify that it’s working correctly.
Get Service Endpoints
Retrieve the Kafka endpoints from the PulsarBroker status:
kubectl get pulsarbroker private-cloud -n pulsar -o=jsonpath='{.status.serviceEndpoints}'
Internal Endpoint (within Kubernetes cluster):
- Kafka: private-cloud-broker.pulsar.svc.cluster.local:9092
External Endpoint (outside Kubernetes cluster, if using Istio):
- Kafka TLS: pulsar.example.com:9093
Test Kafka Connectivity
Test Within Kubernetes Cluster
Use a Kafka client pod within the cluster to test connectivity:
# Deploy a temporary Kafka client pod (it receives an Istio sidecar if the namespace has injection enabled)
kubectl run kafka-client -n pulsar --image=confluentinc/cp-kafka:latest -it --rm -- bash
# Inside the pod, produce messages (Confluent images ship the tools without the .sh suffix)
kafka-console-producer \
  --bootstrap-server private-cloud-broker.pulsar.svc.cluster.local:9092 \
  --topic test
# Consume messages
kafka-console-consumer \
  --bootstrap-server private-cloud-broker.pulsar.svc.cluster.local:9092 \
  --topic test \
  --from-beginning
Test Outside Kubernetes Cluster
If you deployed with Istio and configured external access, test from outside the cluster using TLS:
- Create a client-ssl.properties file:
security.protocol=SSL
# Leaving this empty disables hostname verification; set it only if the certificate matches the hostname
ssl.endpoint.identification.algorithm=
- Use Kafka console tools with TLS:
# Produce messages
kafka-console-producer.sh \
--bootstrap-server pulsar.example.com:9093 \
--topic test \
--producer.config ./client-ssl.properties
# Consume messages
kafka-console-consumer.sh \
--bootstrap-server pulsar.example.com:9093 \
--topic test \
--from-beginning \
--consumer.config ./client-ssl.properties
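If the gateway presents a certificate issued by a private CA, the client also needs a truststore. A hedged sketch of an extended client-ssl.properties; the truststore path and password are placeholders:

```properties
security.protocol=SSL
# Disable hostname verification if the certificate CN/SAN does not match the endpoint
ssl.endpoint.identification.algorithm=
# Trust the CA that signed the gateway certificate (placeholder path and password)
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=changeit
```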
Advanced Configuration
Kafka-Specific Broker Settings
You can customize Kafka-specific settings through the PulsarBroker configuration:
spec:
  config:
    protocolHandlers:
      kop:
        enabled: true
        # Additional KoP configurations can be added here
        transactionEnabled: true
        # Kafka transaction coordinator settings
        transactionCoordinatorEnabled: true
Topic Management
Kafka topics are automatically created as Pulsar topics. The default topic format is:
persistent://public/default/<topic-name>
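This mapping can be sketched in a few lines of Python. The helper is purely illustrative (not part of any StreamNative SDK); the tenant and namespace default to public/default as described above:

```python
def to_pulsar_topic(kafka_topic: str, tenant: str = "public", namespace: str = "default") -> str:
    """Map a Kafka topic name to its default Pulsar topic name."""
    return f"persistent://{tenant}/{namespace}/{kafka_topic}"

print(to_pulsar_topic("test"))  # persistent://public/default/test
```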
You can manage topics using either Kafka or Pulsar tools:
# Using Kafka tools
kafka-topics.sh --bootstrap-server <broker-address> --list
# Using Pulsar tools
pulsar-admin topics list public/default
Consumer Groups
Kafka consumer groups are mapped to Pulsar subscriptions. Each consumer group creates a corresponding subscription in Pulsar.
# View consumer groups (Kafka)
kafka-consumer-groups.sh --bootstrap-server <broker-address> --list
# View subscriptions (Pulsar)
pulsar-admin topics stats persistent://public/default/<topic-name>
Use Kafka with Authentication
If your Pulsar cluster has authentication enabled, you need to configure Kafka clients accordingly.
With JWT Authentication
Configure Kafka clients to use SASL/PLAIN with JWT tokens:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="token" \
password="<your-jwt-token>";
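As a purely illustrative helper (not part of any StreamNative SDK), the properties above can also be assembled programmatically. The key convention is the literal username "token" with the JWT carried in the password field:

```python
def jwt_sasl_properties(jwt_token: str) -> dict:
    """Build Kafka client properties for SASL/PLAIN with a JWT token.

    Illustrative helper only: the literal username "token" signals to the
    broker that the password field carries a JWT.
    """
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="token" password="{jwt_token}";'
    )
    return {
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }
```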
With OAuth2 Authentication
Configure Kafka clients to use SASL/OAUTHBEARER:
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
# Token endpoint of your OAuth2 provider (required by this login callback handler)
sasl.oauthbearer.token.endpoint.url=<token-endpoint-url>
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="<client-id>" \
clientSecret="<client-secret>";
Troubleshooting
Kafka Protocol Not Available
- Verify the Kafka protocol is enabled in the broker configuration:
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep -A 2 "kop:"
- Check broker logs for Kafka-related errors:
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i kafka
- Ensure the broker pods have restarted after configuration changes:
kubectl get pods -n pulsar -l component=broker
Connection Refused or Timeout
- Verify network connectivity to the broker:
# From within the cluster
kubectl run -it --rm debug --image=busybox --restart=Never -- \
nc -zv private-cloud-broker.pulsar.svc.cluster.local 9092
- If using Istio, check the Gateway and VirtualService:
kubectl get gateway -n pulsar
kubectl get virtualservice -n pulsar
- Verify the Istio Ingress Gateway is routing traffic correctly:
kubectl logs -n istio-system deploy/istio-ingressgateway | grep 9093
Authentication Failures
- Verify authentication is configured correctly in the broker:
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep -A 10 "authentication"
- Check that the JWT token or OAuth2 credentials are valid.
- Review broker logs for authentication errors:
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i "auth"
Transaction Failures
- Verify transactions are enabled:
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep "transactionEnabled"
- Check transaction coordinator logs:
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i "transaction"
Broker Configuration
For high-throughput Kafka workloads, consider tuning these broker settings:
spec:
  config:
    # Increase max message size (5242880 bytes = 5 MiB)
    maxMessageSize: 5242880
    # Adjust managed ledger settings
    managedLedgerDefaultEnsembleSize: 3
    managedLedgerDefaultWriteQuorum: 3
    managedLedgerDefaultAckQuorum: 2
Client Configuration
Optimize Kafka client configuration for better performance:
# Producer settings
batch.size=16384
linger.ms=10
compression.type=snappy
# acks=1 trades durability for latency; use acks=all when every record must be replicated before acknowledgment
acks=1
# Consumer settings
fetch.min.bytes=1024
fetch.max.wait.ms=500