Kafka Transactions
Warning
If you are using an Ursa Engine-powered cluster, note that transactions and topic compaction are not supported in Ursa Engine.
StreamNative supports Apache Kafka® compatible transaction semantics and APIs. For example, you can fetch messages starting from the last consumed offset and process them transactionally one by one, updating the last consumed offset and generating events as you go.
If a producer sends multiple messages to the same or different partitions and a network or broker failure occurs, a transaction guarantees that either all of the messages are written or none of them are. This is crucial for applications that require strict guarantees, such as financial services transactions.
Transactions ensure exactly-once semantics (EOS) and atomicity.
EOS helps developers avoid the anomalies of at-most-once processing (possible event loss) and at-least-once processing (possible event duplication). Combined with idempotent producers, StreamNative supports EOS, ensuring that events are neither lost nor duplicated. Atomicity commits a set of messages across partitions as a unit: either all messages are committed, or none are. Data encapsulated and transmitted in a single operation can only succeed or fail globally, ensuring consistent transaction outcomes.
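The all-or-nothing behavior described above can be illustrated with a small in-memory model. This is only a sketch for building intuition: `ToyTransactionalLog` and its methods are hypothetical names invented for this example, and the model says nothing about how a broker actually stores or filters records. It simply hides records from open and aborted transactions in its "read committed" view.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory log (hypothetical, for illustration only; not a broker implementation). */
class ToyTransactionalLog {
    enum State { OPEN, COMMITTED, ABORTED }

    private static final class Entry {
        final long txId;
        final String value;
        Entry(long txId, String value) { this.txId = txId; this.value = value; }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final Map<Long, State> transactions = new HashMap<>();
    private long nextTxId = 0;

    /** Starts a new transaction and returns its id. */
    long begin() {
        long txId = nextTxId++;
        transactions.put(txId, State.OPEN);
        return txId;
    }

    /** Appends a record that belongs to an open transaction. */
    void append(long txId, String value) {
        requireOpen(txId);
        entries.add(new Entry(txId, value));
    }

    /** Makes every record of the transaction visible at once. */
    void commit(long txId) { requireOpen(txId); transactions.put(txId, State.COMMITTED); }

    /** Discards every record of the transaction at once. */
    void abort(long txId) { requireOpen(txId); transactions.put(txId, State.ABORTED); }

    private void requireOpen(long txId) {
        if (transactions.get(txId) != State.OPEN) {
            throw new IllegalStateException("transaction " + txId + " is not open");
        }
    }

    /** View that returns only records of committed transactions. */
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) {
            if (transactions.get(e.txId) == State.COMMITTED) out.add(e.value);
        }
        return out;
    }

    /** View that returns every appended record, whatever the transaction outcome. */
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (Entry e : entries) out.add(e.value);
        return out;
    }

    public static void main(String[] args) {
        ToyTransactionalLog log = new ToyTransactionalLog();
        long committed = log.begin();
        log.append(committed, "Commit message 0");
        log.commit(committed);
        long aborted = log.begin();
        log.append(aborted, "Abort message 0");
        log.abort(aborted);
        System.out.println(log.readCommitted());   // [Commit message 0]
        System.out.println(log.readUncommitted()); // [Commit message 0, Abort message 0]
    }
}
```

The two views loosely correspond to the read_committed and read_uncommitted isolation levels used by the consumer later in this guide.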
Connect to your cluster and send transaction messages
This section describes how to connect to your cluster and send transaction messages.
Before you begin
Note
- Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
- The password for utilities such as kcat is token:<API KEY>.
You can follow the instructions to create an API key for the service account you choose to use.
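As a minimal sketch of how the token:<API KEY> password fits into client settings, the helper below builds the SASL/PLAIN properties as plain strings using only the JDK. `SaslPlainSettings` is a hypothetical helper name; the property keys are the standard Kafka client configuration names, and the username follows the Java examples in this guide, which pass the namespace (for example public/default).

```java
import java.util.Properties;

/** Hypothetical helper that assembles SASL/PLAIN client settings for an API key. */
class SaslPlainSettings {

    /** Builds the JAAS line; the password is the API key prefixed with "token:". */
    static String jaasConfig(String username, String apiKey) {
        return String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"token:%s\";",
            username, apiKey);
    }

    /** Returns the security-related properties shared by consumers and producers. */
    static Properties securityProperties(String username, String apiKey) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", jaasConfig(username, apiKey));
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own namespace and API key.
        Properties props = securityProperties("public/default", "YOUR-API-KEY");
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

Keeping these three settings in one place means the consumer and the producer cannot drift apart in how they authenticate.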
Steps
Add Maven dependencies.
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
    </dependency>
Open a terminal and run a Kafka consumer to receive messages from the test-transaction-topic topic. In this example, the isolation level is set to read_committed, so only the 5 committed messages are consumed. To consume both committed and uncommitted messages, set it to read_uncommitted.

    package org.example;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SNCloudReadCommittedConsumer {
        public static void main(String[] args) {
            // Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-API-KEY";
            String token = "token:" + jwtToken;
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";

            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

            // Set the isolation level to read_committed, which means only committed messages will be consumed
            // If you want to consume both committed and uncommitted messages, set it to read_uncommitted
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            // Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
            System.out.println("running");

            // Poll forever and print every record that is visible at this isolation level
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                        + record.topic() + "-" + record.partition() + "@" + record.offset()));
                }
            }
        }
    }
- serverUrl: the Kafka service URL of your StreamNative cluster.
- jwtToken: an API key of your service account.
Open another terminal and run a Kafka producer that sends 5 messages to the test-transaction-topic topic and commits them, then sends 5 more messages and aborts them.

    package org.example;

    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SNCloudTransactionProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-API-KEY";
            String token = "token:" + jwtToken;
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";

            // 2. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

            // 3. Set the transactional.id property
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

            // 4. Create a producer and start a transaction
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();

            // 5. Produce 5 messages and commit them
            for (int i = 0; i < 5; i++) {
                String value = "Commit message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.commitTransaction();

            // 6. Produce 5 messages and abort them
            producer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                String value = "Abort message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            // Explicitly abort so that read_committed consumers never see these 5 messages
            producer.abortTransaction();

            producer.close();
        }
    }
- serverUrl: the Kafka service URL of your StreamNative cluster.
- jwtToken: an API key of your service account.
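The producer in this step commits one batch and aborts another unconditionally. In an application, the choice usually depends on whether the work inside the transaction succeeded. The sketch below shows one way to structure that decision using only the JDK. `TransactionalSender` is a hypothetical interface standing in for the beginTransaction, commitTransaction, and abortTransaction calls used above, and `RecordingSender` is a test double, so none of this talks to a cluster. It is an assumption-laden outline and not a complete error-handling strategy: the Kafka client documentation describes some failures (such as a fenced producer) where the producer should be closed instead of aborting.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the three transaction calls made on the producer. */
interface TransactionalSender {
    void beginTransaction();
    void commitTransaction();
    void abortTransaction();
}

class TransactionRunner {
    /**
     * Runs the given work inside one transaction.
     * Commits when the work finishes normally and aborts when it throws.
     * Returns true if the transaction was committed.
     */
    static boolean runInTransaction(TransactionalSender sender, Runnable work) {
        sender.beginTransaction();
        try {
            work.run();
            sender.commitTransaction();
            return true;
        } catch (RuntimeException e) {
            // A real application would also log or rethrow the exception.
            sender.abortTransaction();
            return false;
        }
    }
}

/** Test double that records the order of calls instead of contacting a cluster. */
class RecordingSender implements TransactionalSender {
    final List<String> calls = new ArrayList<>();
    @Override public void beginTransaction() { calls.add("begin"); }
    @Override public void commitTransaction() { calls.add("commit"); }
    @Override public void abortTransaction() { calls.add("abort"); }

    public static void main(String[] args) {
        RecordingSender sender = new RecordingSender();
        TransactionRunner.runInTransaction(sender, () -> { throw new IllegalStateException("send failed"); });
        System.out.println(sender.calls); // [begin, abort]
    }
}
```

To try the same shape against a cluster, one option is a thin adapter that implements the hypothetical interface by delegating each method to the KafkaProducer instance from this step.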