
Interoperability between Apache Kafka and Apache Pulsar

Produce and consume messages between Kafka and Pulsar

StreamNative supports three data entry formats: kafka, pulsar, and pulsar_non_batched. Each format has distinct characteristics:

  • kafka format: This provides the best performance; however, a Pulsar consumer cannot consume messages in this format unless a payload processor is employed.
  • pulsar format: This is the default data entry format on StreamNative Cloud. It supports interoperability between Kafka and Pulsar clients, covering Kafka-to-Kafka, Kafka-to-Pulsar, and Pulsar-to-Kafka interactions, so data is interoperable between Apache Kafka and Apache Pulsar. It is suitable for most scenarios where performance is not a critical consideration.
  • pulsar_non_batched format: This is similar to the pulsar entry format, except that Kafka batch messages are encoded as non-batched messages so that Pulsar clients can consume them with a Key_Shared subscription. On Ursa, the behavior is the same as the pulsar format.

StreamNative Cloud also supports specifying the entry format on a per-topic basis. The entry format can be set through topic properties by using bin/pulsar-admin topics update-properties. The configuration key is kafkaEntryFormat, and the possible values are kafka, pulsar, and pulsar_non_batched. The default value is pulsar if not specified.
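
The same property can also be set programmatically. Below is a minimal sketch using the Pulsar Java admin client; the topic name is a hypothetical placeholder, imports are omitted as in the other snippets, and the updateProperties API requires a recent Pulsar admin client version.

// Set the per-topic entry format; valid values are kafka, pulsar, and pulsar_non_batched
try (PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("<Pulsar Service Http Url>")
        .authentication("org.apache.pulsar.client.impl.auth.AuthenticationToken", "token:<API Key>")
        .build()) {
    admin.topics().updateProperties(
            "persistent://public/default/my-topic", // hypothetical topic name
            Map.of("kafkaEntryFormat", "kafka"));
}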

Interoperability between Pulsar and Kafka clients

With the kafka entry format, Kafka clients can freely produce and consume messages. However, Pulsar producers SHOULD NOT produce messages to these topics because they are unable to encode messages into a format consumable by Kafka clients.

However, since version 2.9 of the Pulsar client, a message payload processor is available for Pulsar consumers. With it, messages produced by Kafka producers can be consumed and decoded by Pulsar consumers.
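
The processor is set on the consumer builder when subscribing; the complete, runnable example appears in the steps below. A minimal sketch, assuming an existing PulsarClient named client and a topic that uses the kafka entry format (imports omitted as in the other snippets):

// Attach the Kafka payload processor so the consumer can decode kafka-format entries
Consumer<byte[]> consumer = client.newConsumer()
        .topic("topic-with-kafka-format")
        .subscriptionName("my-subscription") // hypothetical subscription name
        .messagePayloadProcessor(new KafkaPayloadProcessor())
        .subscribe();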

The pulsar format allows messages to be freely produced and consumed between Kafka and Pulsar clients. The broker handles the message format conversion automatically, enabling flexible use of either Kafka or Pulsar clients.

Details for the message format conversion with pulsar format

The conversion is mostly intuitive except for some edge cases. Take the following simple case as an example:

// The type of `kafkaProducer` is `KafkaProducer<String, String>`
kafkaProducer.send(new ProducerRecord<>(topic, "value")).get();
// The type of `pulsarConsumer` is `Consumer<byte[]>`
final var msg = pulsarConsumer.receive();
final var value = new String(msg.getValue(), StandardCharsets.UTF_8); // value will be "value"

The value received by a Pulsar consumer is guaranteed to be the same as the original value sent by a Kafka producer.

However, a Kafka message has some extra metadata like:

  • key: the key used for routing the message to a specific partition
  • headers: a list of headers, each of which is a key-value pair

A Pulsar message has similar fields:

  • key: the same as Kafka's key
  • ordering key: the key used for Key_Shared subscriptions
  • properties: a list of properties, each of which is a key-value pair

Note that in Kafka the types of the key and header values are both byte[], while in Pulsar the types of the key and property values are both String. Each Kafka key is therefore converted to a base64-encoded string to serve as the Pulsar key. See the following example:

kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value")).get();
final var msg = pulsarConsumer.receive();
final String key = msg.getKey(); // "a2V5"
final byte[] keyBytes = msg.getKeyBytes(); // [107, 101, 121] (the byte array of "key")
final boolean hasBase64EncodedKey = msg.hasBase64EncodedKey(); // true
final byte[] orderingKey = msg.getOrderingKey(); // [107, 101, 121]

You should use getKeyBytes() or getOrderingKey() to retrieve the original keys of Kafka messages. The counter-intuitive behavior is that the getKey() method returns the base64-encoded string. This behavior exists because the byte array could change after a bytes -> UTF-8 string -> bytes conversion, for example:

final var keyBytes = new byte[]{ 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88 };
// converted: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0xef, 0xbf, 0xbd]
final var converted = new String(keyBytes, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
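
If only the base64-encoded key string is available, the original Kafka key bytes can be recovered by decoding it. A minimal sketch, equivalent to calling getKeyBytes() in the earlier example:

// Decoding the base64 string returned by getKey() restores the original Kafka key bytes
final byte[] originalKeyBytes = java.util.Base64.getDecoder().decode(msg.getKey());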

Things get much more complicated with the conversion on headers.

  1. For a given header key, a Kafka message can carry multiple values, but a Pulsar message has only a single property value per property key.
  2. There is no way to get the bytes of a Pulsar property value.

For the first issue, only the latest value is retained.

For the second issue, the conversion is performed with the following approach:

  1. Convert the bytes directly to a UTF-8 string.
  2. If the bytes are not valid UTF-8, convert them to a base64-encoded string.

For example,

final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key", "header-value-2".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key-2", new byte[]{ (byte) 0x88 }));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "header-value-2"
final var value2 = msg.getProperty("header-key-2"); // "iA=="

There is another corner case: if a header value happens to be the bytes of a base64-encoded string, the Pulsar consumer will also receive an extra property whose key is __ksn_internal_header_format.

final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "a2v5".getBytes(StandardCharsets.UTF_8)));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "a2V5"
final var value2 = msg.getProperty("__ksn_internal_header_format"); // a non-null JSON value

Steps to produce messages with a Kafka client and consume them with a Pulsar client

Requirements

  • Pulsar client version ≥ 2.9
  • Pulsar admin version ≥ 2.10

Steps

  1. Create a topic with the kafka format via the Pulsar admin CLI

    Note

    A topic with the kafka format can be created by following the same steps as for the other formats, specifying the kafkaEntryFormat=kafka property.

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka
    
    # Get the topic properties to verify that the entry format was set successfully
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics get-properties persistent://public/default/topic-with-kafka-format
    
  2. Install client libraries

    <!-- Use to decode the Kafka format message -->
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>kafka-payload-processor</artifactId>
        <version>3.1.0.4</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
    
  3. Kafka Producer and Pulsar Consumer with Kafka format

    import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class KafkaFormatKafkaProducePulsarConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<API Key>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-kafka-format";
            final String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 1. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a producer
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 3. Produce messages with Kafka producer
            for (int i = 0; i < 5; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.close();
    
            // 4. Consume messages with Pulsar consumer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    // Set the Kafka payload processor to decode the kafka format message
                    .messagePayloadProcessor(new KafkaPayloadProcessor())
                    .subscribe();
    
            for (int i = 0; i < 5; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg);
                System.out.println("Receive message " + new String(msg.getData()));
            }
    
            consumer.close();
            client.close();
        }
    }
    

Steps to produce messages with a Pulsar client and consume them with a Kafka client

  1. Create a topic with the pulsar format via the Pulsar admin CLI

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar
    
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics get-properties persistent://public/default/topic-with-pulsar-format
    
  2. Install client libraries

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
    
  3. Pulsar Producer and Kafka Consumer with Pulsar format

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class PulsarFormatPulsarProduceKafkaConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // Replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<API Key>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-pulsar-format";
            final String namespace = "public/default";
    
            // 1. Create a Pulsar producer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Producer<byte[]> producer = client.newProducer()
                    .topic(topicName)
                    .create();
    
            // 2. Produce messages with Pulsar producer
            for (int i = 0; i < 5; i++) {
                String message = "my-message-" + i;
                MessageId msgId = producer.send(message.getBytes());
                System.out.println("Publish " + "my-message-" + i
                        + " and message ID " + msgId);
            }
            producer.close();
            client.close();
    
            // 3. Configure the Kafka consumer
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 4. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
    
            // 5. Consume messages with Kafka consumer
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    

Steps to produce messages with a Kafka client and consume them with a Pulsar Key_Shared subscription

  1. Create a topic with the pulsar_non_batched format via the Pulsar admin CLI

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics create-partitioned-topic persistent://public/default/topic-with-pulsar-non-batched-format -p 3 --metadata kafkaEntryFormat=pulsar_non_batched
    
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics get-properties persistent://public/default/topic-with-pulsar-non-batched-format
    
  2. Install client libraries

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>3.1.0</version>
    </dependency>
    
  3. Kafka Producer and Pulsar Consumer with pulsar_non_batched format

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.*;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class PulsarFormatKafkaProducePulsarKeySharedSubscriptionConsume {
    
        private static final int NUM_CONSUMERS = 3;
        private static final int NUM_MESSAGES = 100;
        private static final int NUM_KEYS = 10;
    
        // Replace these configs with your actual cluster settings
        private static final String pulsarServiceUrl = "<Pulsar Service Http Url>";
        private static final String kafkaServiceUrl = "<Kafka Service Url>";
        private static final String jwtToken = "<API Key>";
        private static final String namespace = "public/default";
    
        public static void main(String[] args) throws Exception {
            final String topic = "topic-with-pulsar-non-batched-format";
            final String subscription = "test-key-shared-subscription-group";
    
            PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServiceUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();
    
            // Create kafka producer
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            // Add authentication if needed
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, "token:" + jwtToken));
    
            final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
    
            // Create multiple pulsar consumers with Key_Shared subscription
            List<Consumer<byte[]>> consumers = new ArrayList<>();
            for (int i = 0; i < NUM_CONSUMERS; i++) {
                Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscribe();
                consumers.add(consumer);
            }
    
            // Maps to track message distribution
            Map<String, Integer> keyToConsumerMap = new ConcurrentHashMap<>();
            Map<Integer, Set<String>> consumerToKeysMap = new HashMap<>();
            for (int i = 0; i < NUM_CONSUMERS; i++) {
                consumerToKeysMap.put(i, new HashSet<>());
            }
    
            // Atomic counter to track processed messages
            AtomicInteger processedMessages = new AtomicInteger(0);
    
            // Create consumer futures
            List<CompletableFuture<Void>> futures = new ArrayList<>();
            for (int i = 0; i < NUM_CONSUMERS; i++) {
                final int consumerId = i;
                futures.add(CompletableFuture.runAsync(() -> {
                    try {
                        Consumer<byte[]> consumer = consumers.get(consumerId);
                        while (processedMessages.get() < NUM_MESSAGES) {
                            Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
                            if (msg != null) {
                                String key = new String(msg.getKeyBytes(), StandardCharsets.UTF_8);
    
                                // Record every consumer that received this key
                                keyToConsumersMap.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(consumerId);
                                consumerToKeysMap.get(consumerId).add(key);
    
                                consumer.acknowledge(msg);
                                processedMessages.incrementAndGet();
                                System.out.println("Consumer " + consumerId + " received message with key: " + key);
                            }
                        }
                    } catch (Exception e) {
                        throw new CompletionException(e);
                    }
                }));
            }
    
            // Create combined future for all consumers
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    
            // Send messages with different keys
            System.out.println("Sending first batch of messages");
            for (int i = 0; i < NUM_MESSAGES / 2; i++) {
                String key = "key-" + (i % NUM_KEYS);
                String value = "value-" + i;
                kafkaProducer.send(new ProducerRecord<>(topic, key, value));
            }
            kafkaProducer.flush();
    
            System.out.println("Sending second single batch of messages");
            for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++) {
                String key = "key-" + (i % NUM_KEYS);
                String value = "value-" + i;
                kafkaProducer.send(new ProducerRecord<>(topic, key, value));
                kafkaProducer.flush();
            }
    
            // Wait for all messages to be consumed (with timeout)
            try {
                System.out.println("Waiting for all messages to be consumed...");
                allFutures.get(30, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                // Cancel all futures if timeout occurs
                futures.forEach(f -> f.cancel(true));
                System.out.println("Not all messages were consumed within timeout");
                throw new RuntimeException("Not all messages were consumed within timeout", e);
            }
    
            // Close resources
            System.out.println("Closing consumers and producer");
            for (Consumer<byte[]> consumer : consumers) {
                consumer.close();
            }
            client.close();
            kafkaProducer.close();
    
            // Verification: Check that each key went to exactly one consumer
            Map<String, Set<Integer>> keyViolations = new HashMap<>();
            for (Map.Entry<String, Set<Integer>> entry : keyToConsumersMap.entrySet()) {
                if (entry.getValue().size() > 1) {
                    keyViolations.put(entry.getKey(), entry.getValue());
                }
            }
    
            if (keyViolations.isEmpty()) {
                System.out.println("Key consistency verification passed");
            } else {
                System.out.println("Key consistency violated: " + keyViolations);
                throw new RuntimeException("Key consistency violated: " + keyViolations);
            }
    
            // Verify keys are distributed among consumers
            boolean allConsumersReceived = consumerToKeysMap.values().stream()
                .noneMatch(Set::isEmpty);
    
            if (allConsumersReceived) {
                System.out.println("All consumers received messages");
            } else {
                System.out.println("Some consumers didn't receive any messages");
                throw new RuntimeException("Some consumers didn't receive any messages");
            }
    
            // Print key distribution
            System.out.println("Key distribution among consumers:");
            for (int i = 0; i < NUM_CONSUMERS; i++) {
                System.out.println("Consumer " + i + " received keys: " + consumerToKeysMap.get(i));
            }
        }
    }
    

Interoperability between Kafka and Pulsar Transactions

Currently, Kafka and Pulsar transactions cannot interoperate. You must choose one or the other because they use different mechanisms to store transactional state.

Interoperability between Kafka and Pulsar Schema

StreamNative Cloud supports both the Kafka and Pulsar schema registries as central repositories to store registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, Kafka schemas and Pulsar schemas cannot be used simultaneously due to their differing API definitions and schema storage locations.

We also plan to provide a unified schema registry that supports both Kafka and Pulsar schemas. This will allow messages with schemas to be produced and consumed interchangeably by Pulsar and Kafka clients:

  • Pulsar consumers will be able to consume messages with schema produced by Kafka producers.
  • Kafka consumers will be able to consume messages with schema produced by Pulsar producers.

Use Pulsar admin to get KSN producer and consumer stats

Starting with Pulsar version 3.3.5.1 or 4.0.1.1, you can use the pulsar-admin CLI to get producer and consumer stats for KSN topics.

Note

  1. Ursa Engine currently supports only namespace-level stats.
  2. If you're creating a Pulsar subscription on this topic, do not use __ksn_internal_subscription as the subscription name.

The KSN broker will register an internal producer and consumer for each topic. The producer's name is {clusterName}-{generatorInstanceId}-{counter}, and the consumer's name is __KSN__internal_consumer_{remoteAddress}. The subscription name is __ksn_internal_subscription.

The internal producer and consumer will not send or consume messages, nor will they acknowledge messages or affect message retention.

You can use the following command to get the stats:

# Get the topic stats (some fields are omitted for readability)
$ pulsar-admin topics partitioned-stats test-topic
{
  "msgRateIn" : 5000.014579625847,
  "msgThroughputIn" : 618419.0865918549,
  "msgRateOut" : 5000.265921846748,
  "msgThroughputOut" : 576849.8534719701,
  "bytesInCounter" : 228942495,
  "msgInCounter" : 1957421,
  "bytesOutCounter" : 220383984,
  "msgOutCounter" : 1957222,
  "averageMsgSize" : 123.68345666666666,
  "storageSize" : 229986335,
  "backlogSize" : 149996147,
  "publishers" : [ {
    "msgRateIn" : 5000.014579625847,
    "msgThroughputIn" : 618419.0865918549,
    "averageMsgSize" : 123.68345666666667,
    "producerId" : 0,
  } ],
  "subscriptions" : {
    "__ksn_internal_subscription" : {
      "msgRateOut" : 5000.265921846748,
      "msgThroughputOut" : 576849.8534719701,
      "bytesOutCounter" : 220383984,
      "msgOutCounter" : 1957222,
      "consumers" : [ {
        "msgRateOut" : 5000.265921846748,
        "msgThroughputOut" : 576849.8534719701,
        "bytesOutCounter" : 220383984,
        "msgOutCounter" : 1957222,
      } ],
    }
  },
  "metadata" : {
    "partitions" : 1,
    "deleted" : false,
    "properties" : {
      "kafkaTopicUUID" : "6374b5c6-d932-43ad-8f70-7960d3236bba"
    }
  },
  "partitions" : { }
}

You can also see the producer and consumer stats in the StreamNative Cloud console.
