Transactions

KSN supports Apache Kafka®-compatible transaction semantics and APIs. For example, you can fetch messages starting from the last consumed offset and process them transactionally one by one, updating the last consumed offset and generating events as part of the same transaction.
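
The typical shape of this pattern is a consume-process-produce loop in which the consumed offsets are committed inside the same transaction as the produced events. The following is a minimal sketch of that loop, not a complete program: it assumes a consumer and a transactional producer configured as in the step-by-step examples below (with auto offset commits disabled on the consumer), and the input-topic and output-topic names are placeholders.

    // Minimal consume-process-produce sketch. The consumer and producer are
    // assumed to be configured as in the examples below; input-topic and
    // output-topic are placeholder names.
    producer.initTransactions();
    consumer.subscribe(Collections.singleton("input-topic"));
    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) {
            continue;
        }
        producer.beginTransaction();
        try {
            final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                // Process the record and produce a derived event
                producer.send(new ProducerRecord<>("output-topic", record.value()));
                // Track the next offset to consume for each partition
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
            }
            // Commit the consumed offsets atomically with the produced events
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
        }
    }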

If a producer sends multiple messages to the same or different partitions and a network or broker failure causes the transaction to fail, the transaction guarantees that either all of the messages are written to the partitions or none are. This is essential for applications that require strict guarantees, such as financial transactions.

Transactions ensure exactly-once semantics (EOS) and atomicity.

EOS helps developers avoid the anomalies of at-most-once processing (possible event loss) and at-least-once processing (possible event duplication). KSN provides EOS when transactions are combined with idempotent producers. Atomicity commits a set of messages across partitions as a single unit: either all of the messages are committed, or none are. Data sent or received within a single transaction can only succeed or fail as a whole.
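
For reference, producer idempotence in the Kafka client is controlled by a single configuration property, and it is implied when transactional.id is set (as in the producer example below):

    // Enable the idempotent producer explicitly; setting transactional.id,
    // as in the producer example below, already implies this.
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");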

Connect to your cluster and send transaction messages

This section describes how to connect to your cluster and send transaction messages.

Before you begin

Note

  • Before getting the token of a service account, verify that the service account is authorized as a superuser or an admin of the tenants and namespaces.
  • A token has a system-defined Time-To-Live (TTL) of 7 days. Before a token expires, ensure that you generate a new token for your service account.
  • For command-line utilities such as kcat, the password is token:TOKEN (see the sketch after this list).
  • Get the JWT token.

    1. On the left navigation pane, click Service Accounts.

    2. In the row of the service account you want to use, in the Token column, click Generate new token, then click the Copy icon to copy the token to your clipboard.

  • Get the service URL of your Pulsar cluster.

    1. On the left navigation pane, in the Admin area, click Pulsar Clusters.
    2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the Kafka Service URL (TCP).
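
As an illustration of the token:TOKEN password format, a kcat consumer could be invoked as sketched below. This is an untested sketch: SERVER-URL and YOUR-TOKEN are placeholders, and the -X options are standard librdkafka settings that kcat accepts.

    # Illustrative kcat consumer; replace SERVER-URL and YOUR-TOKEN
    kcat -C -b SERVER-URL -t test-transaction-topic \
        -X security.protocol=SASL_SSL \
        -X sasl.mechanisms=PLAIN \
        -X sasl.username="public/default" \
        -X sasl.password="token:YOUR-TOKEN"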

Steps

  1. Add the kafka-clients Maven dependency.

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
  2. Open a terminal and run a Kafka consumer to receive messages from the test-transaction-topic topic. In this example, the isolation level is set to read_committed, which means only the 5 committed messages from the next step will be consumed. If you want to consume both committed and uncommitted messages, set it to read_uncommitted.

    package org.example;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    public class SNCloudReadCommittedConsumer {
        public static void main(String[] args) {
    
            // Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-TOKEN";
            String token = "token:" + jwtToken;
    
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "hello-world");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // Set the isolation level to read_committed, which means only committed messages will be consumed
            // If you want to consume both committed and uncommitted messages, set it to read_uncommitted
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    
            // Create a consumer
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
            System.out.println("running");
            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                         + record.topic() + "-" + record.partition() + "@" + record.offset()));
                }
            }
        }
    }
    
    • serverUrl: the Kafka service URL of your Pulsar cluster.
    • jwtToken: the token of your service account.
  3. Open another terminal and run a Kafka producer to send 5 messages to the test-transaction-topic topic and commit the transaction, then send another 5 messages and abort the transaction.

    package org.example;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    public class SNCloudTransactionProducer {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. Replace these configs for your cluster
            String serverUrl = "SERVER-URL";
            String jwtToken = "YOUR-TOKEN";
            String token = "token:" + jwtToken;
            final String topicName = "test-transaction-topic";
            String namespace = "public/default";
    
            // 2. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, serverUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 3. Set the transactional.id property
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
    
            // 4. Create a producer and start a transaction
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
    
            // 5. Produce 5 messages and commit it
            for (int i = 0; i < 5; i++) {
                String value = "Commit message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
    
            producer.commitTransaction();
    
            // 6. Produce 5 messages and abort it
            producer.beginTransaction();
            for (int i = 0; i < 5; i++) {
                String value = "Abort message " + i;
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }

            // 7. Abort the transaction: the 5 messages above are never exposed
            // to read_committed consumers
            producer.abortTransaction();

            producer.close();
        }
    }
    
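If both programs complete successfully, the read_committed consumer from step 2 prints only the 5 committed messages; the aborted messages are never delivered. The output below is illustrative, as the partition and offset values depend on your cluster:

    Receive record: Commit message 0 from test-transaction-topic-0@0
    Receive record: Commit message 1 from test-transaction-topic-0@1
    Receive record: Commit message 2 from test-transaction-topic-0@2
    Receive record: Commit message 3 from test-transaction-topic-0@3
    Receive record: Commit message 4 from test-transaction-topic-0@4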