> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Configure Kafka Protocol

StreamNative Private Cloud supports the Kafka protocol via the Ursa engine, a Kafka-compatible data streaming engine that has been refined through our extensive experience developing Pulsar, KoP (Kafka-on-Pulsar), and managing both Pulsar and Kafka at scale. Designed for today's cost-conscious economy, Ursa offers a genuinely cloud-native experience and enterprise-grade features that boost developer productivity and operational efficiency.

* **All of the Kafka features you're used to:** Ursa supports all the Kafka features you're accustomed to, including KStreams, KSQL, KTables with Topic Compaction, Schema Registry for the Java Client, and Kerberos Authentication for Kafka Clients.
* **Great developer experience:** Ursa features streamlined local testing, with its own testcontainers module.
* **Robust and reliable:** Architecturally designed for scale and subjected to rigorous resilience testing. We've fortified Ursa through comprehensive testing, ensuring it meets and exceeds the demands of large-scale Kafka operations.

## Prerequisites

<Note title="Note">
  To use the Kafka Protocol on StreamNative Private Cloud, ensure you are using image `streamnative/private-cloud:3.1.0.4` or later.
</Note>

To expose Kafka service outside the Kubernetes cluster, deploy your Pulsar cluster with Istio. See [Deploy Pulsar with Istio](/private-cloud/v2/operate-private-cloud/deploy/deploy-pulsar-with-istio) for detailed instructions.

## Enable Kafka Protocol

To enable the Kafka protocol, update the `PulsarBroker` CR with the following configuration:

```yaml theme={null}
spec:
  config:
    protocolHandlers:
      kop:
        enabled: true
    transactionEnabled: true
```

**Configuration Fields:**

* `spec.config.protocolHandlers.kop.enabled`: Required. Set to `true` to enable the Kafka protocol.
* `spec.config.transactionEnabled`: Required. Set to `true` to enable Kafka transaction support.

The operator will automatically restart the broker pods to apply the changes.

## Verify Kafka Protocol

After enabling the Kafka protocol, verify that it's working correctly.

### Get Service Endpoints

Retrieve the Kafka endpoints from the PulsarBroker status:

```bash theme={null}
kubectl get pulsarbroker private-cloud -n pulsar -o=jsonpath='{.status.serviceEndpoints}'
```

**Internal Endpoint (within Kubernetes cluster):**

* Kafka: `private-cloud-broker.pulsar.svc.cluster.local:9092`

**External Endpoint (outside Kubernetes cluster, if using Istio):**

* Kafka TLS: `pulsar.example.com:9093`

### Test Kafka Connectivity

#### Test Within Kubernetes Cluster

Use a Kafka client pod within the cluster to test connectivity:

```bash theme={null}
# Deploy a Kafka client pod with Istio sidecar injection
kubectl run kafka-client -n pulsar --image=confluentinc/cp-kafka:latest -it --rm -- bash

# Inside the pod, produce messages
kafka-console-producer.sh \
  --bootstrap-server private-cloud-broker.pulsar.svc.cluster.local:9092 \
  --topic test

# Consume messages
kafka-console-consumer.sh \
  --bootstrap-server private-cloud-broker.pulsar.svc.cluster.local:9092 \
  --topic test \
  --from-beginning
```

#### Test Outside Kubernetes Cluster

If you deployed with Istio and configured external access, test from outside the cluster using TLS:

1. Create a `client-ssl.properties` file:

```properties theme={null}
security.protocol=SSL
ssl.endpoint.identification.algorithm=
```

2. Use Kafka console tools with TLS:

```bash theme={null}
# Produce messages
kafka-console-producer.sh \
  --bootstrap-server pulsar.example.com:9093 \
  --topic test \
  --producer.config ./client-ssl.properties

# Consume messages
kafka-console-consumer.sh \
  --bootstrap-server pulsar.example.com:9093 \
  --topic test \
  --from-beginning \
  --consumer.config ./client-ssl.properties
```

## Advanced Configuration

### Kafka-Specific Broker Settings

You can customize Kafka-specific settings through the PulsarBroker configuration:

```yaml theme={null}
spec:
  config:
    protocolHandlers:
      kop:
        enabled: true
        # Additional KoP configurations can be added here
    transactionEnabled: true
    # Kafka transaction coordinator settings
    transactionCoordinatorEnabled: true
```

### Topic Management

Kafka topics are automatically created as Pulsar topics. The default topic format is:

```
persistent://public/default/<topic-name>
```

You can manage topics using either Kafka or Pulsar tools:

```bash theme={null}
# Using Kafka tools
kafka-topics.sh --bootstrap-server <broker-address> --list

# Using Pulsar tools
pulsar-admin topics list public/default
```

### Consumer Groups

Kafka consumer groups are mapped to Pulsar subscriptions. Each consumer group creates a corresponding subscription in Pulsar.

```bash theme={null}
# View consumer groups (Kafka)
kafka-consumer-groups.sh --bootstrap-server <broker-address> --list

# View subscriptions (Pulsar)
pulsar-admin topics stats persistent://public/default/<topic-name>
```

## Use Kafka with Authentication

If your Pulsar cluster has authentication enabled, you need to configure Kafka clients accordingly.

### With JWT Authentication

Configure Kafka clients to use SASL/PLAIN with JWT tokens:

```properties theme={null}
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="token" \
  password="<your-jwt-token>";
```

### With OAuth2 Authentication

Configure Kafka clients to use SASL/OAUTHBEARER:

```properties theme={null}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
  clientId="<client-id>" \
  clientSecret="<client-secret>";
```

## Troubleshooting

### Kafka Protocol Not Available

1. Verify the Kafka protocol is enabled in the broker configuration:

```bash theme={null}
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep -A 2 "kop:"
```

2. Check broker logs for Kafka-related errors:

```bash theme={null}
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i kafka
```

3. Ensure the broker pods have restarted after configuration changes:

```bash theme={null}
kubectl get pods -n pulsar -l component=broker
```

### Connection Refused or Timeout

1. Verify network connectivity to the broker:

```bash theme={null}
# From within the cluster
kubectl run -it --rm debug --image=busybox --restart=Never -- \
  nc -zv private-cloud-broker.pulsar.svc.cluster.local 9092
```

2. If using Istio, check the Gateway and VirtualService:

```bash theme={null}
kubectl get gateway -n pulsar
kubectl get virtualservice -n pulsar
```

3. Verify the Istio Ingress Gateway is routing traffic correctly:

```bash theme={null}
kubectl logs -n istio-system deploy/istio-ingressgateway | grep 9093
```

### Authentication Failures

1. Verify authentication is configured correctly in the broker:

```bash theme={null}
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep -A 10 "authentication"
```

2. Check that the JWT token or OAuth credentials are valid

3. Review broker logs for authentication errors:

```bash theme={null}
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i "auth"
```

### Transaction Failures

1. Verify transactions are enabled:

```bash theme={null}
kubectl get pulsarbroker private-cloud -n pulsar -o yaml | grep "transactionEnabled"
```

2. Check transaction coordinator logs:

```bash theme={null}
kubectl logs -n pulsar private-cloud-broker-0 -c pulsar-broker | grep -i "transaction"
```

## Performance Tuning

### Broker Configuration

For high-throughput Kafka workloads, consider tuning these broker settings:

```yaml theme={null}
spec:
  config:
    # Increase max message size
    maxMessageSize: 5242880
    # Adjust managed ledger settings
    managedLedgerDefaultEnsembleSize: 3
    managedLedgerDefaultWriteQuorum: 3
    managedLedgerDefaultAckQuorum: 2
```

### Client Configuration

Optimize Kafka client configuration for better performance:

```properties theme={null}
# Producer settings
batch.size=16384
linger.ms=10
compression.type=snappy
acks=1

# Consumer settings
fetch.min.bytes=1024
fetch.max.wait.ms=500
```
