
Kafka Transactions

StreamNative supports Apache Kafka® compatible transaction semantics and APIs. For example, you can fetch messages starting from the last consumed offset and process them transactionally one by one, updating the last consumed offset and generating events as you go.
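
The sketch below illustrates that consume-process-produce pattern: the consumed offsets are committed through the transactional producer, so the offset update and the generated events become visible atomically. It is a minimal, hedged example rather than part of the steps below; the output topic is a placeholder, and the consumer and producer are assumed to be already configured (including authentication) as described later on this page.

    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;
    
    public class ConsumeProcessProduceSketch {
    
        // Assumes 'consumer' is subscribed to an input topic with isolation.level=read_committed,
        // and 'producer' is configured with a transactional.id (see the steps below).
        static void run(KafkaConsumer<String, String> consumer,
                        KafkaProducer<String, String> producer,
                        String outputTopic) {
            producer.initTransactions();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    continue;
                }
                producer.beginTransaction();
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            // Generate one output event per input event inside the transaction
                            producer.send(new ProducerRecord<>(outputTopic, record.key(), record.value()));
                        }
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
                    }
                    // The offset update and the output events are committed (or fail) together
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();
                } catch (KafkaException e) {
                    // Simplified handling: abort so neither the events nor the offsets become visible.
                    // Fatal errors such as ProducerFencedException would require closing the producer instead.
                    producer.abortTransaction();
                }
            }
        }
    }

Because the offsets are committed with producer.sendOffsetsToTransaction() instead of consumer.commitSync(), a failure before commitTransaction() leaves both the offset update and the output events unpublished, so reprocessing after a restart does not create duplicates for read_committed consumers.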

If a producer sends multiple messages to the same or different partitions and a network or broker failure occurs, the transaction guarantees that either all of the messages are written or none of them are. This is crucial for applications that require strict guarantees, such as financial services transactions.

Transactions ensure exactly-once semantics (EOS) and atomicity.

EOS helps developers avoid the anomalies of at-most-once processing (possible event loss) and at-least-once processing (possible event duplication). Combined with idempotent producers, StreamNative provides EOS, ensuring that events are neither lost nor duplicated. Atomicity commits a set of messages across partitions as a unit: either all of the messages are committed, or none are. Everything written in a single transaction succeeds or fails as a whole, which keeps transaction outcomes consistent.
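
As a minimal sketch of that atomicity guarantee, the producer below writes to two topics inside one transaction and commits or aborts them as a unit. The topic names (orders, payments), the transactional ID, and the server URL are placeholders, and the SASL_SSL authentication properties shown in the steps below are omitted for brevity.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.AuthorizationException;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class AtomicMultiTopicWriteSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "SERVER-URL");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // Transactions require (and automatically enable) the idempotent producer
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-payment-tx");
            // SASL_SSL authentication properties omitted here; see the steps below
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
            try {
                // Both records become visible to read_committed consumers together, or not at all
                producer.send(new ProducerRecord<>("orders", "order-1", "created"));
                producer.send(new ProducerRecord<>("payments", "order-1", "authorized"));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
                // Fatal errors: the producer can no longer be used and must be closed
                producer.close();
                return;
            } catch (KafkaException e) {
                // Any other error: abort, so neither record is ever exposed to read_committed consumers
                producer.abortTransaction();
            }
            producer.close();
        }
    }

A consumer with the isolation level set to read_committed either sees both records or neither, regardless of where the failure occurs.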

Connect to your cluster and send transaction messages

This section describes how to connect to your cluster and send transaction messages.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • The password for utilities such as kcat is token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Add the Maven dependency for the Kafka Java client.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive messages from the test-transaction-topic topic. In this example, the isolation level is set to read_committed, which means only the 5 committed messages will be consumed. If you want to consume both committed and uncommitted messages, set it to read_uncommitted.

    package org.example;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class SNCloudReadCommittedConsumer {
        public static void main(String[] args) {
    
            // Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-API-KEY";
            String token = "token:" + jwtToken;
    
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // Set the isolation level to read_committed, which means only committed messages will be consumed
            // If you want to consume both committed and uncommitted messages, set it to read_uncommitted
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    
            // Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
            System.out.println("running");
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                         + record.topic() + "-" + record.partition() + "@" + record.offset()));
                }
            }
        }
    }
    
    • serverUrl: the Kafka service URL of your StreamNative cluster.
    • jwtToken: an API key of your service account.
  3. Open another terminal and run a Kafka producer that sends 5 messages to the test-transaction-topic topic and commits the transaction, then sends another 5 messages and aborts the transaction.

    package org.example;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class SNCloudTransactionProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-API-KEY";
            String token = "token:" + jwtToken;
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";
    
            // 2. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 3. Set the transactional.id property
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
    
            // 4. Create a producer and start a transaction
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
    
            // 5. Produce 5 messages and commit the transaction
            for (int i = 0; i < 5; i++) {
                String value = "Commit message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
    
            producer.commitTransaction();
    
            // 6. Produce 5 messages and abort the transaction
            producer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                String value = "Abort message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
    
            // Abort the transaction so these 5 messages are never visible to read_committed consumers
            producer.abortTransaction();
            producer.close();
        }
    }
    
    • serverUrl: the Kafka service URL of your StreamNative cluster.
    • jwtToken: an API key of your service account.