# Configuring Kafka Clients

You can use Kafka clients to produce and consume messages to and from a StreamNative Cloud cluster.

Before you start using Kafka clients, you need to configure them properly. This section provides the necessary configurations and general guidelines for Kafka clients.

## General Recommendations

Kafka client configurations provide flexibility and control over various aspects of the client's behavior, performance, security, reliability, and more. Properly configuring these settings helps optimize the client's interactions with the StreamNative Cloud cluster and ensures efficient message processing. Correct settings matter most in two areas:

* **Performance**: Client configurations can be adjusted to optimize the client's performance. Adjusting properties that control batching, compression, linger, and prefetch can significantly impact client throughput, latency, and resource utilization.

* **Robustness**: Kafka clients need to handle errors with retries or fail gracefully until a solution can be implemented to resolve the issue. Ensuring the configuration is correct can enhance application resilience and ensure reliability for mission-critical workloads.

## Configuration Overview

Client configuration settings can be grouped into the following categories:

* **Connection and network**: A Kafka client must establish a connection with StreamNative Cloud clusters to produce and consume messages. This category includes settings for bootstrap servers, connection timeout, and network buffer sizes. Optimizing these settings can ensure reliable and efficient communication between your clients and StreamNative Cloud.

* **Authentication and security**: Kafka supports various security mechanisms, such as TLS encryption, SASL authentication, and authorization using ACLs. This category includes security-related settings, such as SSL certificates, authentication protocols, and user credentials. Properly configuring security settings ensures the confidentiality, integrity, and authenticity of the communication between clients and StreamNative Cloud.

* **Message delivery and processing**: Kafka clients can process messages in various ways, such as consuming messages from specific topics, committing message offsets, or specifying how to handle message errors. This category includes settings for message delivery guarantees, acknowledgment mechanisms, and error handling strategies. Properly configuring these settings can ensure consistent and reliable message delivery, optimize processing performance, and handle errors effectively.

## Connection and Network settings

### Bootstrap Servers

Set the bootstrap servers of your Kafka clients to the **Kafka Service URL** of your StreamNative Cloud cluster.

You can obtain the bootstrap servers URL in two ways:

* **Cloud Console**: Navigate to the **Cluster Details** page in the **Cluster Dashboard**, locate the **Kafka Service URL** and copy it.
* **snctl CLI**: Run the following command, replacing `<cluster-name>` with your StreamNative Cloud cluster name:
  ```bash theme={null}
  echo "$(snctl get pulsarclusters <cluster-name> -o jsonpath='{.spec.serviceEndpoints[0].dnsName}'):9093"
  ```

For more information about service URLs, see [Cluster Service URLs](/cloud/clusters/manage-clusters/cluster#cluster-service-urls).
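As a minimal sketch of how the Kafka Service URL is used on the client side, the Java snippet below builds the `bootstrap.servers` value from a DNS name and port `9093`, mirroring the `snctl` command above. The class name, method name, and the host `kafka.example.invalid` are placeholders chosen for illustration, not values from StreamNative Cloud.

```java
import java.util.Properties;

// Hypothetical helper; the names are illustrative and not part of any StreamNative or Kafka API.
final class BootstrapConfig {
    // Port used in the snctl example above.
    static final int KAFKA_PORT = 9093;

    // Builds client properties that contain only the bootstrap servers setting.
    static Properties forDnsName(String dnsName) {
        if (dnsName == null || dnsName.trim().isEmpty()) {
            throw new IllegalArgumentException("dnsName must not be empty");
        }
        Properties props = new Properties();
        // "bootstrap.servers" is the standard Kafka client property for the initial broker list.
        props.setProperty("bootstrap.servers", dnsName.trim() + ":" + KAFKA_PORT);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host; substitute the DNS name of your own cluster.
        Properties props = forDnsName("kafka.example.invalid");
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

The resulting `Properties` object can be merged with the security settings described later on this page before it is passed to a producer or consumer.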

### Tune DNS resolution

Consider the following JVM and Kafka client settings when using Java clients to connect to StreamNative Cloud:

* JVM properties
  * `networkaddress.cache.ttl`: Set it to `30` seconds.
  * `networkaddress.cache.negative.ttl`: Set it to `0` seconds.
* Kafka Producer and Consumer settings
  * `consumer.client.dns.lookup`: Set it to `use_all_dns_ips`.
  * `producer.client.dns.lookup`: Set it to `use_all_dns_ips`.
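The settings above can also be applied programmatically, as in the following minimal Java sketch. It assumes that the two `networkaddress.*` entries are set as Java security properties through `java.security.Security.setProperty` before the first DNS lookup, and that the properties are passed directly to a producer or consumer, where the standard Kafka key is `client.dns.lookup` without a `consumer.` or `producer.` prefix. The class and method names are illustrative.

```java
import java.security.Security;
import java.util.Properties;

// Illustrative helper; not part of the Kafka client library.
final class DnsTuning {
    // Applies the JVM-wide DNS cache settings. Call this early, before any host name is resolved.
    static void applyJvmDnsCacheSettings() {
        // Cache successful lookups for 30 seconds.
        Security.setProperty("networkaddress.cache.ttl", "30");
        // Do not cache failed lookups.
        Security.setProperty("networkaddress.cache.negative.ttl", "0");
    }

    // Returns client properties that make the client try every IP address returned by DNS.
    static Properties clientDnsSettings() {
        Properties props = new Properties();
        props.setProperty("client.dns.lookup", "use_all_dns_ips");
        return props;
    }

    public static void main(String[] args) {
        applyJvmDnsCacheSettings();
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
        System.out.println(clientDnsSettings());
    }
}
```

Setting the cache properties in code keeps them next to the client setup. The same values can instead be placed in the JVM's `java.security` file if you prefer to manage them outside the application.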

### Eliminate Cross-AZ Networking Traffic

<Note title="Note">
  This feature is only available in **Ursa Engine** clusters running version `4.0.0.7` or later. **Classic Engine** clusters use cross-AZ replication for data durability and availability and cannot take advantage of this optimization.
</Note>

Ursa Engine leverages object storage to store data, eliminating the need for cross-AZ replication. This architecture provides two key benefits:

1. You can produce and consume messages across different availability zones without incurring additional inter-AZ networking costs
2. Kafka clients can connect exclusively to brokers within their same availability zone, reducing network latency and costs

#### Configure Availability Zone Affinity

<Note title="Note">
  For cluster versions 4.0.0.7 or older, you must set the `client.id` to match the availability zone ID to enable zone-aware routing.
</Note>

To enable zone-aware routing and optimize your network costs:

1. Ensure at least one broker is deployed in the same availability zone as your Kafka clients
2. Specify your availability **zone ID** in your client ID by adding `zone_id=<zone-id>` to it. The client ID must follow this format: `zone_id=<zone-id>,key1=value1,key2=value2`

For example, if your application runs in availability zone `us-west-1a` and the zone ID is `usw-az1`, set your client ID to `zone_id=usw-az1,other=value`. This ensures your client connects to brokers in the same zone.

<Note title="Note">
  The `zone_id` in the client ID must exactly match the availability zone ID for zone-aware routing to work correctly.
</Note>

To find the availability zone ID where your application runs, refer to:

* [Availability Zone IDs for your AWS resources](https://docs.aws.amazon.com/ram/latest/userguide/working-with-az-ids.html)
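To illustrate the client ID format described in this section, the following Java sketch composes a `client.id` value that starts with `zone_id=<zone-id>` and is followed by optional `key=value` pairs. The helper name and the `app=orders` pair are hypothetical; only the `zone_id=<zone-id>,key1=value1` layout comes from this page.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Illustrative helper; not part of the Kafka client library or any StreamNative SDK.
final class ZoneAwareClientId {
    // Builds "zone_id=<zone-id>" followed by ",key=value" for each extra pair, in map order.
    static String build(String zoneId, Map<String, String> extraPairs) {
        if (zoneId == null || zoneId.trim().isEmpty()) {
            throw new IllegalArgumentException("zoneId must not be empty");
        }
        StringBuilder clientId = new StringBuilder("zone_id=").append(zoneId.trim());
        for (Map.Entry<String, String> pair : extraPairs.entrySet()) {
            clientId.append(',').append(pair.getKey()).append('=').append(pair.getValue());
        }
        return clientId.toString();
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the pairs appear in a predictable order.
        Map<String, String> extra = new LinkedHashMap<>();
        extra.put("app", "orders"); // hypothetical label
        Properties props = new Properties();
        props.setProperty("client.id", build("usw-az1", extra));
        System.out.println(props.getProperty("client.id")); // prints zone_id=usw-az1,app=orders
    }
}
```

Building the value in one place makes it harder to mistype the zone ID, which must match the availability zone ID exactly.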

## Authentication and Security settings

### Authentication

StreamNative Cloud uses [SASL/PLAIN](https://kafka.apache.org/documentation/#security_sasl_plain) authentication for Kafka client connections. To authenticate your Kafka clients, you'll need to:

1. Create a [**Service Account**](/cloud/security/authentication/service-accounts/service-accounts) and generate an API key. For details, see [API Keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview).

2. Configure the following authentication settings when initializing your Kafka producer or consumer:

* `sasl.mechanism=PLAIN` - Specifies SASL/PLAIN as the authentication mechanism
* `security.protocol=SASL_SSL` - Enables SASL authentication over SSL/TLS
* `sasl.username` - Can be set to any value as it is not used
* `sasl.password=token:<API KEY>` - Must be set to `token:` followed by your generated API key

#### Java Client Settings

The following is an example properties file for Java-based applications and clients:

```properties theme={null}
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="unused" password="token:<API KEY>";
```

Replace `<API KEY>` with the API key you generated.
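The same settings can be assembled in code. The sketch below builds the three properties from the example above using an API key read from an environment variable. The variable name `SN_API_KEY` and the helper names are assumptions made for this example, and the quoting check assumes that API keys contain no double quotes or backslashes.

```java
import java.util.Properties;

// Illustrative helper; not part of the Kafka client library.
final class SaslPlainConfig {
    // Builds SASL/PLAIN over TLS settings for the Java client from an API key.
    static Properties fromApiKey(String apiKey) {
        if (apiKey == null || apiKey.isEmpty()) {
            throw new IllegalArgumentException("apiKey must not be empty");
        }
        // Assumption: keys never contain characters that would break the quoted JAAS value.
        if (apiKey.contains("\"") || apiKey.contains("\\")) {
            throw new IllegalArgumentException("apiKey contains characters that need escaping");
        }
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        // The username is not used, so any value works; the password is "token:" plus the API key.
        props.setProperty("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"unused\" password=\"token:" + apiKey + "\";");
        return props;
    }

    public static void main(String[] args) {
        String apiKey = System.getenv("SN_API_KEY"); // hypothetical variable name
        if (apiKey == null) {
            System.err.println("Set SN_API_KEY before running this example.");
            return;
        }
        Properties props = fromApiKey(apiKey);
        // Print only the non-secret settings.
        System.out.println(props.getProperty("security.protocol") + " / " + props.getProperty("sasl.mechanism"));
    }
}
```

Reading the key from the environment keeps it out of source control; a secrets manager would serve the same purpose.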

#### librdkafka Settings

The following is an example properties file for [librdkafka](https://github.com/edenhill/librdkafka)-based applications and clients:

```properties theme={null}
sasl.mechanism=PLAIN
security.protocol=SASL_SSL
sasl.username=unused
sasl.password=token:<API KEY>
```

Replace `<API KEY>` with the API key you generated.

## Common client settings

The following table provides several common client settings for **Producers** and **Consumers** that you can review for potential modification.

| Configuration property                   | Java default       | librdkafka default                 | Notes                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
| ---------------------------------------- | ------------------ | ---------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
| `client.id`                              | empty string       | `rdkafka`                          | You should set the `client.id` to something meaningful in your application, especially if you are running multiple clients or want to easily trace logs or activities to specific client instances. This setting is also important for zone-aware routing, as it helps StreamNative Cloud route the traffic to the correct availability zone to eliminate cross-AZ networking traffic. See [Eliminate Cross-AZ Networking Traffic](#eliminate-cross-az-networking-traffic) for more information. |
| `connections.max.idle.ms`                | 540000 ms (9 mins) | See librdkafka `socket.timeout.ms` | You can change this when an intermediate load balancer disconnects idle connections after inactivity. For example, the idle timeout is 350 seconds on AWS, 4 minutes on Azure, and 10 minutes on Google Cloud. |
| `socket.connection.setup.timeout.max.ms` | 30000 ms (30 secs) | not available                      | librdkafka doesn't have exponential backoff for this timeout, so you can increase `socket.connection.setup.timeout.ms` to avoid connection failures.                                                                                                                                                                                                                                                                                                                                             |
| `socket.connection.setup.timeout.ms`     | 10000 ms (10 secs) | 30000 ms (30 secs)                 | librdkafka doesn't have exponential backoff for this timeout, so you can increase this value to avoid connection failures.                                                                                                                                                                                                                                                                                                                                                                       |
| `metadata.max.age.ms`                    | 300000 ms (5 mins) | 900000 ms (15 mins)                | librdkafka has the `topic.metadata.refresh.interval.ms` setting that defaults to 300000 ms (5 mins).                                                                                                                                                                                                                                                                                                                                                                                             |
| `reconnect.backoff.max.ms`               | 1000 ms (1 second) | 10000 ms (10 seconds)              |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `reconnect.backoff.ms`                   | 50 ms              | 100 ms                             |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
| `max.in.flight.requests.per.connection`  | 5                  | 1000000                            | librdkafka produces to a single partition per batch, so setting this to 5 limits producing to 5 partitions per broker. |
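As one way to act on the `connections.max.idle.ms` note in the table, the Java sketch below derives a value that is shorter than a load balancer's idle timeout. It assumes the goal is for the client to close idle connections before the load balancer does. The safety margin and the helper name are hypothetical choices for this example, not recommendations from StreamNative, and the sketch applies to the Java client only.

```java
import java.util.Properties;

// Illustrative helper; not part of the Kafka client library.
final class IdleTimeoutTuning {
    // Returns properties with connections.max.idle.ms set below the load balancer's idle timeout.
    static Properties belowLoadBalancerTimeout(long lbIdleTimeoutMs, long marginMs) {
        if (marginMs < 0 || lbIdleTimeoutMs <= marginMs) {
            throw new IllegalArgumentException("margin must be non-negative and smaller than the timeout");
        }
        Properties props = new Properties();
        props.setProperty("connections.max.idle.ms", Long.toString(lbIdleTimeoutMs - marginMs));
        return props;
    }

    public static void main(String[] args) {
        // 350 seconds is the AWS figure from the table; the 10-second margin is an arbitrary example.
        Properties props = belowLoadBalancerTimeout(350_000L, 10_000L);
        System.out.println(props.getProperty("connections.max.idle.ms")); // prints 340000
    }
}
```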
