Optimize Kafka Client for Throughput
To optimize for throughput, producers and consumers need to move as much data as possible within a given time period, which means maximizing the data transfer rate between clients and the cluster.
The configuration parameters discussed in this guide have varying ranges of values. The optimal settings depend on your chosen data streaming engine, specific requirements, and environmental factors such as average message size, number of partitions, and other system characteristics. Therefore, benchmarking is essential to validate and fine-tune the configuration for your particular application and environment.
Data Streaming Engine
Both the Classic Engine and Ursa Engine support high-throughput data streaming with different cost and latency trade-offs. The Classic Engine uses BookKeeper for storage, providing lower latency but at a higher cost. The Ursa Engine uses object storage, offering reduced costs but with slightly higher latency. Choose the engine that best aligns with your specific requirements - Classic Engine for latency-sensitive workloads, or Ursa Engine for more cost-effective solutions.
Data Format
StreamNative supports storing data in different formats to achieve varying levels of interoperability between protocols: `kafka`, `mixed_kafka`, and `pulsar`. Each format has different performance characteristics:

- `kafka` format: Provides the best performance with Kafka clients. However, Pulsar consumers cannot consume this format unless a payload processor is employed.
- `mixed_kafka` format: Functions similarly to the `kafka` format and supports some non-official Kafka clients for encoding or decoding Kafka messages. Offers moderate performance.
- `pulsar` format: Provides the highest interoperability between protocols. However, it incurs a performance penalty because data from Kafka producers must be transformed into the Pulsar format before storage.

If you want to achieve the highest throughput with Kafka clients and don't need Pulsar clients to read the data, consider configuring your cluster to store data in the `kafka` format.
Number of Partitions
A topic partition is the unit of parallelism in Kafka. Producers can send messages to different partitions in parallel, brokers can process different partitions in parallel, and consumers can read from different partitions in parallel. In general, a higher number of topic partitions results in higher throughput. To maximize throughput, you need enough partitions to effectively distribute the workload across all brokers in your StreamNative Cloud cluster.
However, there are trade-offs to increasing the number of partitions. When choosing the partition count, consider both producer and consumer throughput requirements, and validate performance through benchmarking in your environment. Additionally, carefully design your data patterns and key assignments to ensure messages are distributed evenly across topic partitions. This prevents hotspots where certain partitions become overloaded while others remain underutilized.
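For illustration, here is a minimal sketch of creating a topic with a specific partition count using the Kafka AdminClient. The topic name, partition count, and bootstrap address are placeholder assumptions; choose a partition count based on your own benchmarks and your cluster's broker count.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateThroughputTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap address; a real StreamNative Cloud cluster also needs
        // its authentication settings configured here.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "your-cluster-bootstrap:9093");

        try (AdminClient admin = AdminClient.create(props)) {
            // Hypothetical topic with enough partitions to spread the workload across brokers.
            // Optional.empty() leaves the replication factor at the cluster default.
            NewTopic topic = new NewTopic("orders", Optional.of(12), Optional.empty());
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```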
Batching Messages
Kafka producers can batch messages going to the same partition by collecting multiple messages to send together in a single request. One of the most important steps to optimize throughput is tuning producer batching to increase both the batch size and the time spent waiting for batches to fill with messages. Larger batch sizes result in fewer requests to the broker, which reduces load on producers and decreases broker CPU overhead for processing requests.
With the Java client, you can configure the `batch.size` parameter to increase the maximum size in bytes of each message batch. To give more time for batches to fill, you can configure the `linger.ms` parameter to have the producer wait longer before sending. This delay allows the producer to wait for the batch to reach the configured `batch.size`. The trade-off is higher latency, since messages aren't sent immediately when they're ready.
For Ursa Engine clusters, it is recommended to use large batch sizes and higher `linger.ms` values to achieve better throughput.
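As an example, here is a minimal Java producer sketch focused on these two settings. The bootstrap address, topic name, and exact values are placeholder assumptions; benchmark to find values that match your message sizes and latency budget.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap address; add your cluster's authentication settings as needed.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-cluster-bootstrap:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Larger batches mean fewer requests per byte sent; linger.ms gives batches time to fill.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 200000); // batch.size, in bytes
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);      // wait up to 50 ms before sending a batch

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                producer.send(new ProducerRecord<>("orders", "key-" + i, "value-" + i));
            }
        } // close() flushes any remaining batched messages
    }
}
```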
Compression
To optimize for throughput, you can enable compression on the producer to reduce the number of bytes transmitted over the network. Enable compression by configuring the `compression.type` parameter to one of the following standard compression codecs:

- `lz4` (recommended for performance)
- `snappy`
- `zstd`
- `gzip`
- `none` (default, meaning no compression)

Use `lz4` for optimal performance instead of `gzip`, which is more compute-intensive and may impact application performance. Compression is applied on full batches of data, so better batching results in better compression ratios.
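Building on the producer sketch above, enabling compression is one additional property. The `lz4` choice follows the recommendation here; verify the CPU and throughput impact on your own workload.

```java
// Compression is applied per batch, so good batching also improves the compression ratio.
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
```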
Producer Acknowledgments
When a producer sends a message to StreamNative Cloud, the message is routed to a designated broker based on the underlying data streaming engine for the target partition. By default, the producer waits for an acknowledgment from the broker before sending subsequent messages. However, if `acks=0` is configured, the producer sends messages without waiting for acknowledgment.

The `acks` configuration parameter controls how many acknowledgments the designated broker must receive before responding to the producer:

- `acks=0`: The producer sends messages without waiting for any acknowledgment.
- `acks=1`: The designated broker acknowledges after receiving at least one acknowledgment from the underlying storage. In StreamNative Cloud, because its storage architecture differs from Apache Kafka's, this setting behaves the same as `acks=all`.
- `acks=all`: The designated broker waits for all acknowledgments from the underlying storage.

Notes for Classic Engine clusters
In StreamNative Cloud, message durability is not determined by the `acks` setting (`acks=1` or `acks=all`). Instead, durability is controlled by the underlying storage settings and namespace policies that define the write quorum and ack quorum sizes. From a Kafka protocol perspective, both `acks=1` and `acks=all` behave identically in StreamNative Cloud: they wait for acknowledgments based on the cluster and namespace configurations.
To adjust throughput, modify the write quorum size and acknowledgment quorum size in either the namespace policies or cluster configuration. Cluster-level settings will apply globally to all namespaces.
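On the client side, setting `acks` in the Java producer is a single property on the producer sketch above, shown here with its default value. Remember that in StreamNative Cloud durability is governed by the storage and namespace quorum settings rather than by this parameter.

```java
// acks=all is the default; in StreamNative Cloud, acks=1 behaves identically to acks=all.
props.put(ProducerConfig.ACKS_CONFIG, "all");
```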
Memory Allocation
Kafka producers automatically allocate memory for the Java client to store unsent messages. If that memory limit is reached, the producer blocks additional sends until memory frees up or until `max.block.ms` time passes. You can adjust how much memory is allocated with the `buffer.memory` configuration parameter.

If you don't have many partitions, you may not need to adjust this parameter at all. However, if you have many partitions, you can tune `buffer.memory`, taking into account the message size, linger time, and partition count, to maintain pipelines across more partitions. This enables better use of bandwidth across more brokers.
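As a sketch, the buffer can be sized from the expected amount of in-flight data. The 64 MB figure below is only an illustrative assumption; derive yours from message size, linger time, and partition count.

```java
// Roughly: enough memory to keep a full batch in flight for each partition being written to.
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L); // 64 MB, up from the 32 MB default
```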
Consumer Fetching
Another way to optimize for throughput is to adjust how much data consumers receive in each fetch from the designated broker in StreamNative Cloud. You can increase how much data consumers get per fetch request by increasing the configuration parameter `fetch.min.bytes`. This parameter sets the minimum number of bytes the broker should return in a fetch response to the consumer.

Increasing `fetch.min.bytes` reduces the number of fetch requests made to StreamNative Cloud, reducing the broker CPU overhead to process each fetch and thereby improving throughput. As with increased batching on the producer side, the trade-off may be higher latency, because the broker won't send the consumer new messages until either:
- The fetch request has enough messages to fulfill the size requirement (`fetch.min.bytes`)
- The wait time expires (`fetch.max.wait.ms`)

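Here is a minimal consumer sketch with these two settings. The topic name, group id, bootstrap address, and values are placeholder assumptions; benchmark them against your latency requirements.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ThroughputConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder bootstrap address; add your cluster's authentication settings as needed.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "your-cluster-bootstrap:9093");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "orders-processor");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Ask the broker to hold each fetch response until ~100 KB is available or 500 ms has passed.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singleton("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    // Process each record here.
                }
            }
        }
    }
}
```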
Consumer Groups
Assuming your application allows it, use consumer groups with multiple consumers to parallelize consumption. Parallelizing consumption can improve throughput because multiple consumers can balance the load by processing multiple partitions simultaneously. The upper limit on this parallelization is the number of partitions in the topic.
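As a sketch of this pattern, the consumer above can simply be run several times with the same group id, for example in separate threads of one process. The thread count below is an illustrative assumption and should not exceed the topic's partition count.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Run several copies of the ThroughputConsumer sketch; consumers sharing the
// "orders-processor" group id are each assigned a disjoint subset of the partitions.
ExecutorService pool = Executors.newFixedThreadPool(4);
for (int i = 0; i < 4; i++) {
    pool.submit(() -> ThroughputConsumer.main(new String[0]));
}
```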
Summary
Here's a summary of key configurations for optimizing throughput:
Producer Configurations
| Configuration | Recommended Value | Default Value | Description |
|---|---|---|---|
| `batch.size` | 100000-200000 | 16384 | Maximum size in bytes of each message batch |
| `linger.ms` | 10-100 | 0 | Time in milliseconds to wait for batches to fill |
| `compression.type` | `lz4` | `none` | Compression codec to use |
| `acks` | `all` | `all` | Number of acknowledgments required before responding to the producer |
| `buffer.memory` | Increase if there are many partitions | 33554432 | Memory in bytes for buffering unsent messages |
Consumer Configurations
| Configuration | Recommended Value | Default Value | Description |
|---|---|---|---|
| `fetch.min.bytes` | ~100000 | 1 | Minimum number of bytes returned in a fetch response |
| `fetch.max.wait.ms` | 500 | 500 | Maximum time in milliseconds to wait for a fetch response |