Pulsar Transactions

Pulsar transactions enable event-streaming applications to consume, process, and produce messages in one atomic operation. That means:

  • Atomic writes across multiple topic partitions
  • Atomic acknowledgments across multiple topic partitions
  • All the operations made within one transaction either all succeed or all fail
  • Consumers are ONLY allowed to read committed messages

Quick start

This section describes how to use the Transaction API to send and receive messages.

Prerequisites

  • Java 1.8 or higher
  • Pulsar cluster 2.9.1 or higher

Get the service URL of your StreamNative cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.

Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.

    The OAuth2 credential file looks similar to the following:

    {
      "type": "SN_SERVICE_ACCOUNT",
      "client_id": "CLIENT_ID",
      "client_secret": "CLIENT_SECRET",
      "client_email": "CLIENT_EMAIL",
      "issuer_url": "https://auth.streamnative.cloud"
    }
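The issuer_url field of this file is the value passed later as the issuerUrl parameter. As a minimal sketch of reading it programmatically, the following uses only the Java standard library. The class name KeyFileFields and the method readStringField are hypothetical, and the regular expression assumes a flat JSON object whose string values contain no escaped quotes; a real JSON parser is the safer choice in production code.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyFileFields {

    // Returns the value of a top-level string field, or empty if it is absent.
    // Assumption: the input is a flat JSON object whose string values contain
    // no escaped quotes, as in the sample credential file.
    public static Optional<String> readStringField(String json, String field) {
        Pattern pattern = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"");
        Matcher matcher = pattern.matcher(json);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.<String>empty();
    }

    public static void main(String[] args) {
        // Inline copy of part of the sample file; in practice, read the
        // downloaded file from disk instead.
        String keyFile = "{\n"
                + "  \"type\": \"SN_SERVICE_ACCOUNT\",\n"
                + "  \"client_id\": \"CLIENT_ID\",\n"
                + "  \"issuer_url\": \"https://auth.streamnative.cloud\"\n"
                + "}";
        System.out.println(readStringField(keyFile, "issuer_url").orElse("(not found)"));
    }
}
```

Run against the sample content, this prints https://auth.streamnative.cloud.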
    

Enable Transactions on your StreamNative cluster

  1. Log in to StreamNative Cloud Console.

  2. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  3. Click Edit Cluster.

  4. Select the Advanced tab. In the Features area, enable the Transaction option.


Use Transactions in your application

The following example performs these operations:

  1. Create a Pulsar client with Transactions enabled.
  2. Create three producers: one for the input topic (input-topic) and one for each of the two output topics (output-topic-1 and output-topic-2).
  3. Create three consumers: one for the input topic (input-topic) and one for each of the two output topics (output-topic-1 and output-topic-2).
  4. For each message received from the input topic, create a transaction, produce a message to both output topics and acknowledge the input message within that transaction, and then commit the transaction.
package io.streamnative.examples.transaction;

import com.beust.jcommander.JCommander;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URL;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class TransactionSyncExample {
    private static final Logger log = LoggerFactory.getLogger(TransactionSyncExample.class);
    public static void main(String[] args) throws Exception {
        JCommanderPulsar jct = new JCommanderPulsar();
        JCommander jCommander = new JCommander(jct, args);
        if (jct.help) {
            jCommander.usage();
            return;
        }

        String inputTopic = "persistent://public/default/input-topic";
        String outputTopicOne = "persistent://public/default/output-topic-1";
        String outputTopicTwo = "persistent://public/default/output-topic-2";

        PulsarClient client = PulsarClient.builder()
                // Create a Pulsar client and enable Transactions.
                .enableTransaction(true)
                .serviceUrl(jct.serviceUrl)
                .authentication(
                        AuthenticationFactoryOAuth2.clientCredentials(new URL(jct.issuerUrl), new URL(jct.credentialsUrl), jct.audience))
                .build();

        // Create three producers to produce messages to input and output topics.
        ProducerBuilder<String> producerBuilder = client.newProducer(Schema.STRING);
        Producer<String> inputProducer = producerBuilder.topic(inputTopic)
                .sendTimeout(0, TimeUnit.SECONDS).create();
        Producer<String> outputProducerOne = producerBuilder.topic(outputTopicOne)
                .sendTimeout(0, TimeUnit.SECONDS).create();
        Producer<String> outputProducerTwo = producerBuilder.topic(outputTopicTwo)
                .sendTimeout(0, TimeUnit.SECONDS).create();
        // Create three consumers to consume messages from input and output topics.
        Consumer<String> inputConsumer = client.newConsumer(Schema.STRING)
                .subscriptionName("your-subscription-name").topic(inputTopic).subscribe();
        Consumer<String> outputConsumerOne = client.newConsumer(Schema.STRING)
                .subscriptionName("your-subscription-name").topic(outputTopicOne).subscribe();
        Consumer<String> outputConsumerTwo = client.newConsumer(Schema.STRING)
                .subscriptionName("your-subscription-name").topic(outputTopicTwo).subscribe();

        int count = 2;
        // Produce messages to topics.
        for (int i = 0; i < count; i++) {
            inputProducer.send("Hello Pulsar! count : " + i);
        }

        // consume messages and produce to output topics with transaction
        for (int i = 0; i < count; i++) {

            // The consumer successfully receives messages. Then, create a transaction.
            Message<String> message = inputConsumer.receive();
            Transaction txn = null;
            try {
                txn = client.newTransaction()
                        .withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
                // process the message here...

                // The producers produce messages to output topics with the transaction
                outputProducerOne.newMessage(txn).value("Hello Pulsar! outputTopicOne count : " + i).send();
                outputProducerTwo.newMessage(txn).value("Hello Pulsar! outputTopicTwo count : " + i).send();

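                // The input consumer acknowledges the input message with the transaction
                inputConsumer.acknowledgeAsync(message.getMessageId(), txn).get();
                // Commit the transaction and wait for the result, so that a failed
                // commit surfaces as an ExecutionException and is handled below
                txn.commit().get();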
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof PulsarClientException.TransactionConflictException)) {
                    // For any failure other than a TransactionConflictException,
                    // negatively acknowledge the message so that it is redelivered.
                    // Otherwise, the message is not received again.
                    inputConsumer.negativeAcknowledge(message);
                }

                // If a transaction was created, abort it.
                if (txn != null) {
                    txn.abort();
                }
            }
        }

        // consume messages from output topics and print them
        for (int i = 0; i < count; i++) {
            Message<String> message =  outputConsumerOne.receive();
            System.out.println("Receive transaction message: " + message.getValue());
        }

        for (int i = 0; i < count; i++) {
            Message<String> message =  outputConsumerTwo.receive();
            System.out.println("Receive transaction message: " + message.getValue());
        }
    }
}

The example takes the following parameters:

  • serviceUrl: the broker service URL of your StreamNative cluster.
  • issuerUrl: the URL of your OAuth2 authentication provider. You can get the value from the issuer_url field of your downloaded OAuth2 credential file.
  • credentialsUrl: the URL of your downloaded OAuth2 credential file. The value supports the following formats:
    • file:///path/to/file
    • file:/path/to/file
    • data:application/json;base64,<base64-encoded value>
  • audience: the Uniform Resource Name (URN) of the Pulsar instance. It combines the prefix urn:sn:pulsar, the organization name, and the Pulsar instance name, in the format urn:sn:pulsar:<org_name>:<instance_name>.
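
The string formats listed for credentialsUrl and audience can be assembled as in the following sketch, which uses only the Java standard library. The class OAuth2Params and its method names are hypothetical helpers, and my-org and my-instance are placeholder values. The sketch only produces the strings; whether the client accepts a particular form depends on how it parses the URL.

```java
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.util.Base64;

public class OAuth2Params {

    // Builds a file: URL for a credential file on local disk.
    // The exact rendering (for example, file:///path/to/file) is platform-dependent.
    public static String fileCredentialsUrl(String path) {
        return Paths.get(path).toAbsolutePath().toUri().toString();
    }

    // Builds a data: URL that embeds the credential JSON as Base64.
    public static String dataCredentialsUrl(String credentialJson) {
        String encoded = Base64.getEncoder()
                .encodeToString(credentialJson.getBytes(StandardCharsets.UTF_8));
        return "data:application/json;base64," + encoded;
    }

    // Builds the audience URN: urn:sn:pulsar:<org_name>:<instance_name>.
    public static String audience(String orgName, String instanceName) {
        return "urn:sn:pulsar:" + orgName + ":" + instanceName;
    }

    public static void main(String[] args) {
        // Placeholder inputs for illustration only.
        System.out.println(fileCredentialsUrl("/path/to/file"));
        System.out.println(dataCredentialsUrl("{}"));
        System.out.println(audience("my-org", "my-instance"));
    }
}
```

For the placeholder inputs, audience returns urn:sn:pulsar:my-org:my-instance, and the data URL for an empty JSON object is data:application/json;base64,e30=.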

You should see output similar to the following:

Receive transaction message: Hello Pulsar! outputTopicOne count : 0
Receive transaction message: Hello Pulsar! outputTopicOne count : 1
Receive transaction message: Hello Pulsar! outputTopicTwo count : 0
Receive transaction message: Hello Pulsar! outputTopicTwo count : 1

For a complete example of how to connect to a StreamNative cluster through the Transaction API, see Transaction API examples.
