Kafka Java Client Guide

You can use the Kafka Java client to produce messages to and consume messages from a StreamNative Cloud cluster. An overview of the Kafka producers and consumers for the Java client is provided below.

A producer sends messages to topics in a StreamNative Cloud cluster. Key components of a Java producer are listed below:

  • ProducerRecord: Represents a message to be sent to a topic. It requires a topic name to send the message, and optionally, you can also specify a key and a partition number.
  • KafkaProducer: Responsible for sending messages to their respective topics.
  • Serializer: Converts user objects to bytes to be sent to the StreamNative Cloud cluster. Kafka provides serializers for common data types, and you can also write your own serializers.

A consumer reads messages from topics in a StreamNative Cloud cluster. Key components of a Java consumer are listed below:

  • ConsumerRecord: Represents a message read from StreamNative Cloud.
  • KafkaConsumer: Responsible for reading messages from the StreamNative Cloud cluster.
  • Deserializer: Converts bytes received from the StreamNative Cloud cluster to user objects. Kafka provides deserializers for common data types, and you can also write your own deserializers.
  • ConsumerGroup: A group of consumers that work together to consume messages from a topic.
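To make the Serializer and Deserializer roles above more concrete, here is a minimal sketch of the byte conversion such a pair might perform. The Order type, the byte layout, and the class name OrderCodecSketch are hypothetical and chosen only for illustration; in a real application the two methods would form the bodies of Serializer<Order>.serialize(topic, data) and Deserializer<Order>.deserialize(topic, data).

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class OrderCodecSketch {

    // Hypothetical application type used only for this illustration.
    static final class Order {
        final long id;
        final String item;

        Order(long id, String item) {
            this.id = id;
            this.item = item;
        }
    }

    // Layout (an assumption of this sketch): 8 bytes of id, then the item name as UTF-8.
    static byte[] serialize(Order order) {
        byte[] item = order.item.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + item.length)
            .putLong(order.id)
            .put(item)
            .array();
    }

    // Reverses serialize(): reads the id, then treats the remaining bytes as the item name.
    static Order deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        long id = buf.getLong();
        byte[] item = new byte[buf.remaining()];
        buf.get(item);
        return new Order(id, new String(item, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Order original = new Order(7L, "widget");
        Order copy = deserialize(serialize(original));
        System.out.println(copy.id + " " + copy.item);
    }
}
```

For common types such as strings, the built-in serializers shown later in this guide are usually sufficient, and a custom format like this is only needed for application-specific objects.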

For a step-by-step guide on building a Java client application using Kafka Protocol, see Getting Started with Kafka Protocol and Java.

Client installation

To use the Kafka Java client, add the following Maven dependency to your pom.xml file:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.9.0</version>
</dependency>

Authentication

StreamNative Cloud supports SASL/PLAIN authentication for Kafka clients. You specify the SASL mechanism in the properties used to initialize the Kafka producer or consumer. An example of the properties file is provided below:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="unused" password="token:<API KEY>";

Replace <API KEY> with your StreamNative Cloud API key. See API Keys for more information.
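The same settings can also be assembled in code rather than in a file. The following is a minimal sketch, assuming the API key is available as a string; the class name SaslConfigSketch, the method saslProperties, and the environment variable SN_API_KEY are hypothetical names used for illustration and are not part of the Kafka or StreamNative APIs.

```java
import java.util.Properties;

final class SaslConfigSketch {

    // Builds the SASL/PLAIN settings from the properties file above for a given API key.
    static Properties saslProperties(String apiKey) {
        Properties props = new Properties();
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"unused\" password=\"token:" + apiKey + "\";");
        return props;
    }

    public static void main(String[] args) {
        // SN_API_KEY is a hypothetical variable name; the placeholder is used if it is unset.
        String apiKey = System.getenv().getOrDefault("SN_API_KEY", "<API KEY>");
        Properties props = saslProperties(apiKey);
        // Print only the non-secret settings.
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("sasl.mechanism"));
    }
}
```

The returned Properties object can then be extended with the producer or consumer settings described in the following sections before it is passed to the client constructor.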

Kafka Producer

Initialization

The Java producer is constructed with a standard Properties object. The following example shows how to initialize a producer:

Properties props = new Properties();
props.put("client.id", InetAddress.getLocalHost().getHostName());
props.put("bootstrap.servers", "<BOOTSTRAP_SERVERS>");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster. You can find the bootstrap servers on the Cluster Details page in the Cluster Dashboard.

Configuration errors cause the KafkaProducer constructor to throw a KafkaException.

Asynchronous send

The Java producer sends messages to StreamNative Cloud asynchronously via the send() API. The send() API returns a Future that you can use to check the result of the send operation.

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
Future<RecordMetadata> future = producer.send(record);

To invoke some code after the write operation has completed, you can also provide a callback. In Java, this is done by implementing the Callback interface.

final ProducerRecord<K, V> record = new ProducerRecord<>(topic, key, value);
producer.send(record, new Callback() {
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null)
      log.debug("Send failed for record {}", record, e);
  }
});

In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer’s IO thread.

Synchronous send

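// To send synchronously, block on the Future returned by send().
// get() returns once the write is acknowledged, or throws if the send failed.
Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();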
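Because send() returns a standard java.util.concurrent.Future, the usual Future error handling applies when you block on it. The sketch below shows one way to bound the wait and distinguish the failure cases. So that it runs without a cluster, it uses CompletableFuture instances as stand-ins for the Future<RecordMetadata> returned by producer.send(record); the class name BlockingSendSketch and the method await are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingSendSketch {

    // Waits for a send result and reports the outcome as a string.
    // In a real application the argument would be the future from producer.send(record).
    static <T> String await(Future<T> future, long timeoutMs) {
        try {
            T result = future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "ok: " + result;
        } catch (ExecutionException e) {
            // The operation itself failed; the underlying error is the cause.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the producer's future (assumption of this sketch).
        CompletableFuture<String> success = CompletableFuture.completedFuture("my-topic-0@42");
        CompletableFuture<String> failure = new CompletableFuture<>();
        failure.completeExceptionally(new IllegalStateException("broker unavailable"));

        System.out.println(await(success, 1000));
        System.out.println(await(failure, 1000));
    }
}
```

Blocking on every record limits throughput to one in-flight request at a time, so this pattern is usually reserved for cases where each write must be confirmed before continuing.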

Kafka Consumer

The Java consumer is constructed with a standard Properties object. The following example shows how to initialize a consumer:

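Properties config = new Properties();
config.put("client.id", InetAddress.getLocalHost().getHostName());
config.put("group.id", "foo");
config.put("bootstrap.servers", "<BOOTSTRAP_SERVERS>");
// Deserializers are required; the constructor fails with a configuration error without them
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);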

Replace <BOOTSTRAP_SERVERS> with the bootstrap servers for your StreamNative Cloud cluster. You can find the bootstrap servers on the Cluster Details page in the Cluster Dashboard.

Configuration errors cause the KafkaConsumer constructor to throw a KafkaException.

Basic usage

The Java client is designed around an event loop which is driven by the poll() API. This design is motivated by the UNIX select and poll system calls. A basic consumption loop with the Java API usually takes the following form:

while (running) {
  // poll(long) is deprecated; poll(Duration) requires java.time.Duration
  ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
  for (ConsumerRecord<K, V> record : records) {
    // application-specific processing
    System.out.println("Received message: " + record.value());
  }
  consumer.commitSync();
}

The Java consumer performs most of its IO inside calls to poll(), including:

  • Joining the consumer group and handling partition rebalances.
  • Sending periodic offset commits (if autocommit is enabled).
  • Sending and receiving fetch requests for assigned partitions.

In clients older than Kafka 0.10.1, heartbeats were also sent only from poll(). Current clients, including the 3.9.0 version used in this guide, send heartbeats from a background thread, and two settings bound the consumer's liveness: session.timeout.ms limits how long the group coordinator waits without receiving a heartbeat, and max.poll.interval.ms limits the time between consecutive calls to poll(). The consumer therefore falls out of the consumer group if the process dies, if the event loop terminates, or if a delay in record processing keeps it from calling poll() again within max.poll.interval.ms. This is by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie.

This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the timeouts so that the consumer does not exceed them in its normal record processing. The max.poll.records configuration option places an upper bound on the number of records returned from each call to poll(). You should combine a bounded max.poll.records with fairly high timeouts (for example, a session timeout of 30 to 60 seconds and a max.poll.interval.ms comfortably above your worst-case batch processing time), so that worst-case behavior is predictable.
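The tuning advice above comes down to simple arithmetic: the number of records per poll multiplied by the worst-case time to process one record should stay well below the time the consumer is allowed between polls. The sketch below works through that check, assuming a client version (0.10.1 or later) in which max.poll.interval.ms bounds the time between poll() calls. The class and method names, the per-record timing, and the 45-second session timeout are illustrative assumptions, not recommended values for your cluster.

```java
import java.util.Properties;

final class PollTuningSketch {

    // Upper bound on the time one loop iteration spends processing records.
    static long worstCaseBatchMillis(int maxPollRecords, long worstCasePerRecordMillis) {
        return (long) maxPollRecords * worstCasePerRecordMillis;
    }

    // Builds consumer settings, rejecting combinations where a full batch
    // could take as long as (or longer than) the allowed gap between polls.
    static Properties tuned(int maxPollRecords, long worstCasePerRecordMillis,
                            long maxPollIntervalMillis) {
        long worstCase = worstCaseBatchMillis(maxPollRecords, worstCasePerRecordMillis);
        if (worstCase >= maxPollIntervalMillis) {
            throw new IllegalArgumentException(
                "worst-case batch time " + worstCase + " ms is not below max.poll.interval.ms "
                    + maxPollIntervalMillis + " ms");
        }
        Properties props = new Properties();
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        props.put("max.poll.interval.ms", Long.toString(maxPollIntervalMillis));
        props.put("session.timeout.ms", "45000"); // illustrative value in the 30 to 60 second range
        return props;
    }

    public static void main(String[] args) {
        // 100 records per poll at up to 200 ms each is at most 20000 ms per batch.
        System.out.println(worstCaseBatchMillis(100, 200)); // prints 20000
        System.out.println(tuned(100, 200, 300_000).getProperty("max.poll.records"));
    }
}
```

Leaving generous headroom between the worst-case batch time and the poll interval is usually wiser than sizing them to match, since garbage collection pauses and slow downstream calls add to the processing time.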

If you fail to tune these settings appropriately, the consequence is typically a CommitFailedException raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it’s occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

try {
  consumer.commitSync();
} catch (CommitFailedException e) {
  // application-specific rollback logic of processed records
}

Synchronous commit

The simplest and most reliable way to manually commit offsets is using a synchronous commit with commitSync(). As its name suggests, this method blocks until the commit has completed successfully.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      // poll(long) is deprecated; poll(Duration) requires java.time.Duration
      ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
      records.forEach(record -> process(record));
      doCommitSync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

In this example, a try/catch block is added around the call to commitSync(). The CommitFailedException is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since message processing is done in the foreground on the polling thread, a slow batch can keep the consumer from calling poll() again in time (within max.poll.interval.ms in current clients, or within the session timeout in clients older than 0.10.1), which triggers a rebalance. To handle this, you have two choices.

First, you can adjust the relevant timeout (max.poll.interval.ms in current clients, session.timeout.ms in clients older than 0.10.1) to ensure that the handler has enough time to finish processing messages. You can then tune max.poll.records or max.partition.fetch.bytes to limit the amount of data returned in a single batch, though for max.partition.fetch.bytes you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread). It may even exacerbate the problem if the poll loop is stuck blocking on a call to offer() while the background thread is handling an even larger batch of messages. The Java API offers a pause() method to help in these situations.
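One way to avoid blocking the poll loop on a full queue is to check the queue before each poll and pause fetching when there is no room for another batch. The sketch below illustrates only that decision logic. The FetchControl interface is a hypothetical stand-in for the consumer calls the poll loop would make, consumer.pause(consumer.assignment()) and consumer.resume(consumer.paused()); the class name and the capacity numbers are also assumptions. Because KafkaConsumer is not safe for multi-threaded access, both calls are made from the polling thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class FlowControlSketch {

    // Stand-in for consumer.pause(...) and consumer.resume(...).
    interface FetchControl {
        void pause();
        void resume();
    }

    private final BlockingQueue<String> queue;
    private final int maxBatchSize;
    private final FetchControl control;
    private boolean paused = false;

    FlowControlSketch(int capacity, int maxBatchSize, FetchControl control) {
        this.queue = new ArrayBlockingQueue<>(capacity);
        this.maxBatchSize = maxBatchSize;
        this.control = control;
    }

    // The worker thread would take records from this queue.
    BlockingQueue<String> queue() {
        return queue;
    }

    // Called on the polling thread before every poll().
    void beforePoll() {
        boolean roomForBatch = queue.remainingCapacity() >= maxBatchSize;
        if (!paused && !roomForBatch) {
            control.pause();
            paused = true;
        } else if (paused && roomForBatch) {
            control.resume();
            paused = false;
        }
    }

    // Called on the polling thread with the records returned by one poll().
    void enqueue(List<String> records) {
        for (String record : records) {
            if (!queue.offer(record)) {
                throw new IllegalStateException("queue full; beforePoll() was skipped");
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        FlowControlSketch flow = new FlowControlSketch(4, 2, new FetchControl() {
            public void pause() { events.add("pause"); }
            public void resume() { events.add("resume"); }
        });

        flow.beforePoll();
        flow.enqueue(List.of("a", "b"));
        flow.beforePoll();
        flow.enqueue(List.of("c", "d"));
        flow.beforePoll();      // queue is full, so fetching is paused
        flow.queue().poll();    // the worker catches up
        flow.queue().poll();
        flow.beforePoll();      // room for a batch again, so fetching resumes
        System.out.println(events); // prints [pause, resume]
    }
}
```

While partitions are paused, the loop keeps calling poll(), which returns no records for those partitions but keeps the consumer active in the group.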

In most cases, it is simplest to set the timeouts large enough that commit failures from rebalances are rare. The only drawback of a large session.timeout.ms is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with close()). This should be rare in practice.

You should be careful in this example since the wakeup() might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.

Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with the commit failure, so you should change doCommitSync to return whether or not the commit succeeded. There’s also no longer any need to catch the WakeupException in the synchronous commit.

private boolean doCommitSync() {
  try {
    consumer.commitSync();
    return true;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
    return false;
  }
}

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      // poll(long) is deprecated; poll(Duration) requires java.time.Duration
      ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
      if (doCommitSync())
        records.forEach(record -> process(record));
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

Correct offset management is crucial because it affects the delivery guarantees of your application.
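The effect of commit ordering can be seen in a small simulation that needs no cluster. The sketch below replays a list of records through a loop that crashes exactly once, during the first batch, and then restarts from the last committed offset. Everything in it (the class name, the in-memory log, the single simulated crash) is an assumption made for illustration; it models the ordering only, not the Kafka client itself.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliveryOrderSketch {

    // commitFirst = true:  commit, then process (at most once)
    // commitFirst = false: process, then commit (at least once)
    static List<String> run(List<String> log, int batchSize, boolean commitFirst) {
        List<String> processed = new ArrayList<>();
        int committed = 0;            // offset a restarted consumer resumes from
        boolean crashPending = true;  // simulate exactly one crash, in the first batch

        while (committed < log.size()) {
            int end = Math.min(committed + batchSize, log.size());
            List<String> batch = log.subList(committed, end);

            if (commitFirst) {
                committed = end;                 // commit ...
                if (crashPending) {              // ... then crash before processing
                    crashPending = false;
                    continue;                    // restart from the committed offset
                }
                processed.addAll(batch);
            } else {
                processed.addAll(batch);         // process ...
                if (crashPending) {              // ... then crash before committing
                    crashPending = false;
                    continue;                    // restart from the committed offset
                }
                committed = end;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = List.of("r0", "r1", "r2", "r3");
        System.out.println(run(log, 2, false)); // prints [r0, r1, r0, r1, r2, r3]
        System.out.println(run(log, 2, true));  // prints [r2, r3]
    }
}
```

With processing first, the crashed batch is processed again after the restart, so duplicates are possible but nothing is lost. With the commit first, the crashed batch is skipped, so nothing is duplicated but records can be lost.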

Asynchronous commit

public void run() {
  try {
    consumer.subscribe(topics);

    while (true) {
      // poll(long) is deprecated; poll(Duration) requires java.time.Duration
      ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    consumer.close();
    shutdownLatch.countDown();
  }
}

The API gives you a callback which is invoked when the commit either succeeds or fails:

consumer.commitAsync(new OffsetCommitCallback() {
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception != null)
      log.debug("Commit failed for offsets {}", offsets, exception);
  }
});

In the example below, synchronous commits are incorporated on rebalances and on close. For this, the subscribe() method has a variant which accepts a ConsumerRebalanceListener, which has two methods to hook into rebalance behavior.

private void doCommitSync() {
  try {
    consumer.commitSync();
  } catch (WakeupException e) {
    // we're shutting down, but finish the commit first and then
    // rethrow the exception so that the main loop can exit
    doCommitSync();
    throw e;
  } catch (CommitFailedException e) {
    // the commit failed with an unrecoverable error. if there is any
    // internal state which depended on the commit, you can clean it
    // up here. otherwise it's reasonable to ignore the error and go on
    log.debug("Commit failed", e);
  }
}

public void run() {
  try {
    consumer.subscribe(topics, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        doCommitSync();
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
    });

    while (true) {
      // poll(long) is deprecated; poll(Duration) requires java.time.Duration
      ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
      records.forEach(record -> process(record));
      consumer.commitAsync();
    }
  } catch (WakeupException e) {
    // ignore, we're closing
  } catch (Exception e) {
    log.error("Unexpected error", e);
  } finally {
    try {
      doCommitSync();
    } finally {
      consumer.close();
      shutdownLatch.countDown();
    }
  }
}

API documentation

See the Java Client API documentation for the full API reference.
