- ProducerRecord: Represents a message to be sent to a topic. It requires the name of the target topic; optionally, you can also specify a key and a partition number.
- KafkaProducer: Responsible for sending messages to their respective topics.
- Serializer: Converts user objects to bytes to be sent to the StreamNative Cloud cluster. Kafka provides serializers for common data types, and you can also write your own serializers.
- ConsumerRecord: Represents a message read from StreamNative Cloud.
- KafkaConsumer: Responsible for reading messages from the StreamNative Cloud cluster.
- Deserializer: Converts bytes received from the StreamNative Cloud cluster to user objects. Kafka provides deserializers for common data types, and you can also write your own deserializers.
- ConsumerGroup: A group of consumers that work together to consume messages from a topic.
Client installation
To use the Kafka Java client, add the following Maven dependency to your pom.xml file:
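A sketch of the dependency, assuming the standard org.apache.kafka:kafka-clients artifact; the version shown is only illustrative:

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- Illustrative version; use the release you have validated against your cluster. -->
    <version>3.4.0</version>
</dependency>
```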
Authentication
StreamNative Cloud supports SASL/PLAIN authentication for connecting Kafka clients to StreamNative Cloud. You can specify the SASL mechanism in the properties file used to initialize the Kafka producer or consumer. An example of the properties file is provided below.
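A sketch of such a properties file, assuming the SASL_SSL security protocol and a token:&lt;API KEY&gt; password convention; confirm the exact username and password format against the API Keys documentation:

```properties
bootstrap.servers=<BOOTSTRAP_SERVERS>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
# Assumed convention: the API key is passed as the password with a "token:" prefix.
# The required username value depends on your cluster; see the API Keys documentation.
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<USERNAME>" \
  password="token:<API KEY>";
```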
Replace <API KEY> with your StreamNative Cloud API key. See API Keys for more information.
Kafka Producer
Initialization
The Java producer is constructed with a standard Properties file. The following example shows how to initialize a producer:
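A minimal sketch, assuming String keys and values and the SASL/PLAIN settings from the authentication section:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BOOTSTRAP_SERVERS>");
        // Same SASL/PLAIN settings as in the authentication section.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<USERNAME>\" password=\"token:<API KEY>\";");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // A configuration error raises a KafkaException from this constructor.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // ... send records here ...

        producer.close();
    }
}
```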
Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster. You can find the bootstrap servers on the Cluster Details page in the Cluster Dashboard.
Configuration errors will result in a KafkaException raised from the constructor of KafkaProducer.
Asynchronous send
The Java producer supports asynchronous sends of messages to StreamNative Cloud via the send() API. The send() API returns a future that can be polled to get the result of the send operation. Alternatively, you can pass a callback to send() by implementing the Callback interface; it is invoked when the send completes, either successfully or with an error.
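A sketch of both styles, continuing from the producer created in the initialization sketch; the topic name my-topic is a placeholder:

```java
ProducerRecord<String, String> record =
    new ProducerRecord<>("my-topic", "key", "value");

// Future style: send() returns immediately; the result can be retrieved later.
Future<RecordMetadata> future = producer.send(record);

// Callback style: onCompletion() is invoked when the send completes or fails.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("Sent to %s-%d at offset %d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```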
Synchronous send
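A synchronous send simply blocks on the Future returned by send() by calling get(), which waits until the broker has acknowledged the record or the send has failed. A sketch, continuing from the producer above (my-topic is a placeholder):

```java
try {
    RecordMetadata metadata =
        producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
    System.out.printf("Acknowledged at partition %d, offset %d%n",
        metadata.partition(), metadata.offset());
} catch (InterruptedException | ExecutionException e) {
    // The send failed, or the wait for the acknowledgement was interrupted.
    e.printStackTrace();
}
```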
Kafka Consumer
The Java consumer is constructed with a standard Properties file. The following example shows how to initialize a consumer:
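A minimal sketch, assuming String deserializers, a placeholder consumer group id, and the SASL/PLAIN settings from the authentication section:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<BOOTSTRAP_SERVERS>");
        // Same SASL/PLAIN settings as in the authentication section.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"<USERNAME>\" password=\"token:<API KEY>\";");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // A configuration error raises a KafkaException from this constructor.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // ... subscribe and poll here ...

        consumer.close();
    }
}
```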
Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster. You can find the bootstrap servers on the Cluster Details page in the Cluster Dashboard.
Configuration errors will result in a KafkaException raised from the constructor of KafkaConsumer.
Basic usage
The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:
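A sketch of such a loop, continuing from the consumer initialization sketch above; the topic name my-topic is a placeholder, and enable.auto.commit is left at its default of true so offsets are committed automatically:

```java
consumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        // poll() drives all of the consumer's IO and returns the next batch of records.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d key=%s value=%s%n",
                record.offset(), record.key(), record.value());
        }
    }
} finally {
    consumer.close();
}
```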
The consumer relies on poll() to drive all of its IO, including:
- Joining the consumer group and handling partition rebalances.
- Sending periodic heartbeats if part of an active generation.
- Sending periodic offset commits (if autocommit is enabled).
- Sending and receiving fetch requests for assigned partitions.
Heartbeats are only sent to the coordinator when you call poll(). This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.
This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it during normal record processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call to poll(). You should combine max.poll.records with a fairly high session timeout (e.g. 30 to 60 seconds) and keep the number of records processed on each iteration bounded so that worst-case behavior is predictable.
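For example, a sketch of that tuning, added to the consumer Properties from the initialization sketch; the values are only illustrative:

```java
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000"); // fairly high session timeout
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");     // bound the records returned per poll()
```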
If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException
raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.
Synchronous commit
The simplest and most reliable way to manually commit offsets is to use a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.
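A sketch of a poll loop that commits synchronously after processing each batch, continuing from the consumer initialization sketch; enable.auto.commit is assumed to be set to false, and process() stands in for application logic:

```java
consumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application-level processing
        }
        try {
            // Blocks until the commit succeeds or fails.
            consumer.commitSync();
        } catch (CommitFailedException e) {
            // The group was rebalanced before the commit could complete;
            // ignore it or run application-specific rollback logic here.
        }
    }
} finally {
    consumer.close();
}
```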
A CommitFailedException is thrown from commitSync() when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations.
For now, you should set session.timeout.ms
large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.
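To shut down cleanly, another thread typically calls wakeup(), which causes poll() to throw a WakeupException; the loop can then do a final commit and close the consumer. A sketch of that pattern, assuming the consumer is held as a field; doCommitSync and process are hypothetical helpers, not part of the client API:

```java
private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // wakeup() interrupted the pending commit; retry the commit, then rethrow
        // so the poll loop can exit. The retry is safe because wakeup() only fires once.
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // The group was rebalanced; ignore or roll back as needed.
    }
}

public void run() {
    try {
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                process(record); // hypothetical application-level processing
            }
            doCommitSync();
        }
    } catch (WakeupException e) {
        // Another thread called consumer.wakeup() to request shutdown.
    } finally {
        consumer.close();
    }
}
```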
You should be careful in this example since the wakeup()
might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
Delivery guarantees
In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with commit failures, so you should change doCommitSync to return whether or not the commit succeeded. There is also no longer any need to catch the WakeupException in the synchronous commit.
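A sketch of that variant, again with the hypothetical doCommitSync and process helpers: the commit now happens before processing, doCommitSync reports whether it succeeded, and it only handles CommitFailedException:

```java
private boolean doCommitSync() {
    try {
        consumer.commitSync();
        return true;
    } catch (CommitFailedException e) {
        // The group was rebalanced; another consumer may be given these records,
        // so skip processing them here.
        return false;
    }
}

public void run() {
    try {
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            // Committing before processing gives "at most once" delivery.
            if (doCommitSync()) {
                for (ConsumerRecord<String, String> record : records) {
                    process(record); // hypothetical application-level processing
                }
            }
        }
    } catch (WakeupException e) {
        // Shutting down.
    } finally {
        consumer.close();
    }
}
```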
Asynchronous commit
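While commitSync() blocks until the broker responds, the consumer also provides commitAsync(), which sends the commit request and returns immediately; an optional OffsetCommitCallback is invoked when the commit completes. A sketch, continuing from the consumer initialization sketch, with my-topic and process() as placeholders:

```java
consumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // hypothetical application-level processing
        }
        // Non-blocking commit; the callback runs when the broker responds.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Commit failed for offsets " + offsets + ": " + exception);
            }
        });
    }
} finally {
    consumer.close();
}
```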
Asynchronous commits may still be in flight when a rebalance begins, so it is often useful to commit before partitions are revoked. The subscribe() method has a variant which accepts a ConsumerRebalanceListener, which provides two methods to hook into rebalance behavior: onPartitionsRevoked() and onPartitionsAssigned().
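A sketch of such a listener that commits synchronously before partitions are revoked; the currentOffsets map is a hypothetical structure in which the application tracks the offsets it has processed:

```java
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); // hypothetical offset tracking

consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the rebalance completes; a natural place to commit the
        // offsets the application has processed so far.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after partitions have been reassigned to this consumer.
    }
});
```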