To use the Kafka Java client with StreamNative Cloud, add the kafka-clients dependency to your pom.xml file:
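A typical entry looks like the following (the version shown is illustrative; use the release that matches your environment):

```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- illustrative version; pick the latest stable release -->
    <version>3.4.0</version>
</dependency>
```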
Replace <API KEY> with your StreamNative Cloud API key wherever it appears in the configuration examples below. See API Keys for more information.
The producer is configured with a Properties file. The following example shows how to initialize a producer. Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster; you can find them on the Cluster Details page in the Cluster Dashboard.
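A minimal sketch, assuming SASL/PLAIN token authentication over TLS; the exact security properties and JAAS username for your cluster may differ, so check the connection details in the Cluster Dashboard:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "<BOOTSTRAP_SERVERS>");
// The security settings below are assumptions; use the values shown for your cluster
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"user\" password=\"token:<API KEY>\";");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
```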
Configuration errors will result in a raised KafkaException from the constructor of KafkaProducer.
Messages are sent using the send() API, which returns a future that can be polled to get the result of the send operation.
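For example, blocking on the returned future turns send() into a synchronous call (the topic name is illustrative):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");
// get() blocks until the broker acknowledges the write, or throws on failure
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Wrote to %s-%d at offset %d%n",
        metadata.topic(), metadata.partition(), metadata.offset());
```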
For fully asynchronous sends, you can instead pass a completion handler that implements the Callback interface.
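Since Callback has a single method, a lambda works; a sketch:

```java
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        // The send failed; log the error or route it to your error handling
        exception.printStackTrace();
    } else {
        System.out.printf("Acknowledged at offset %d%n", metadata.offset());
    }
});
```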
The consumer is likewise configured with a Properties file. The following example shows how to initialize a consumer. Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster; you can find them on the Cluster Details page in the Cluster Dashboard.
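Again a minimal sketch, under the same security assumptions as the producer example (the group id is illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "<BOOTSTRAP_SERVERS>");
props.put("group.id", "my-group"); // illustrative consumer group
// The security settings below are assumptions; use the values shown for your cluster
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"user\" password=\"token:<API KEY>\";");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```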
Configuration errors will result in a raised KafkaException from the constructor of KafkaConsumer.
Records are fetched with the poll() API, a design motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form (sketched here with process() standing in for your record handler and running for an application-controlled flag):
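```java
import java.time.Duration;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecords;

try {
    consumer.subscribe(Arrays.asList("my-topic"));
    while (running) { // 'running' is an assumed flag your application controls
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> process(record)); // process() is a placeholder
    }
} finally {
    consumer.close();
}
```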
The consumer relies on poll() to drive all of its IO, including:

- joining the consumer group and handling partition rebalances
- sending periodic heartbeats to keep its group membership alive
- fetching records from its assigned partitions
- committing offsets (when automatic commit is enabled)

This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.
This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it during normal record processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call to poll(). You should combine a fairly high session timeout (e.g. 30 to 60 seconds) with a bounded max.poll.records, keeping the number of records processed on each iteration small enough that worst-case behavior is predictable.
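For example (the values are illustrative starting points, not recommendations for your workload):

```java
props.put("session.timeout.ms", "45000"); // fairly high session timeout
props.put("max.poll.records", "500");     // upper bound on records per poll()
```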
If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens, since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.
Offsets can be committed synchronously using commitSync(). As its name suggests, this method blocks until the commit has completed successfully.
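For example, committing after each batch of records (process() and running are again placeholders):

```java
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    records.forEach(record -> process(record));
    consumer.commitSync(); // blocks until the offsets for this batch are committed
}
```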
The main exception to watch for from commitSync() is the CommitFailedException, which is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.
First, you can adjust the session.timeout.ms setting to ensure that the handler has enough time to finish processing messages. You can then tune max.partition.fetch.bytes to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.
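For example (the value is illustrative):

```java
props.put("max.partition.fetch.bytes", "262144"); // 256 KB per partition per fetch
```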
The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations.
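A rough sketch of that approach, assuming a background worker drains queue and that running, consumer, and process() exist as in the earlier examples (all names here are hypothetical):

```java
import java.time.Duration;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.consumer.ConsumerRecords;

BlockingQueue<ConsumerRecords<String, String>> queue = new ArrayBlockingQueue<>(2);

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty() && !queue.offer(records)) {
        // Queue is full: pause fetching, but keep calling poll() so the
        // consumer stays live in the group while the worker catches up.
        consumer.pause(consumer.assignment());
        while (!queue.offer(records)) {
            consumer.poll(Duration.ofMillis(100)); // returns no records while paused
        }
        consumer.resume(consumer.assignment());
    }
}
```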
For now, you should set session.timeout.ms large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.
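For a clean shutdown, the usual pattern is to have another thread call consumer.wakeup(), which makes a blocked poll() throw a WakeupException. A sketch, with doCommitSync() as a small helper (names and structure are assumptions):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

public void runLoop() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> process(record));
            doCommitSync();
        }
    } catch (WakeupException e) {
        // Ignore: another thread requested shutdown via consumer.wakeup()
    } finally {
        consumer.close();
    }
}

private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // We are shutting down, but finish the commit first, then rethrow
        // so the main loop can exit. Safe because wakeup() fires only once.
        doCommitSync();
        throw e;
    }
}
```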
You should be careful in this example since wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
A common variation is to commit asynchronously in the poll loop and reserve the synchronous commit for shutdown. In that case, you can change doCommitSync to return whether or not the commit succeeded, and there is no longer any need to catch the WakeupException in the synchronous commit.
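One possible shape for that helper (an assumed sketch):

```java
// import org.apache.kafka.clients.consumer.CommitFailedException;
private boolean doCommitSync() {
    try {
        consumer.commitSync();
        return true;
    } catch (CommitFailedException e) {
        // The group rebalanced before the commit completed; let the caller decide
        return false;
    }
}
```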
The subscribe() method has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior: onPartitionsRevoked, called before the rebalance takes partitions away from the consumer, and onPartitionsAssigned, called after the new assignment is complete.
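A minimal sketch of wiring up those callbacks:

```java
import java.util.Arrays;
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Arrays.asList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the rebalance: commit offsets or flush in-flight work here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the rebalance: restore state or seek to saved offsets here
    }
});
```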