> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Interoperability between Apache Kafka and Apache Pulsar

## Produce and consume messages between Kafka and Pulsar

StreamNative supports three data entry formats, `kafka` and `pulsar`. Each format has distinct characteristics:

* `kafka` format: This provides the best performance; however, a Pulsar consumer cannot consume it unless a payload processor is employed.
* `pulsar` format: This is the default data entry format on StreamNative cloud which supporting interoperability between Kafka and Pulsar clients, including Kafka client to Kafka client, and Pulsar client to Kafka client interactions, and vice versa. This means data between Apache Kafka and Apache Pulsar are interoperable. It is suitable for most scenarios where performance is not a critical consideration.
* `pulsar_non_batched` format: It is similar to pulsar entry format, the difference is this entry format will encode the Kafka batch messages to non-batched messages for Pulsar client to consume messages with a key-shared subscription, for Ursa, the behavior will be same as pulsar format.

StreamNative Cloud also supports specifying the entry format on a per-topic basis. The entry format can be set through topic properties by using `bin/pulsar-admin topics update-properties`. The configuration key is `kafkaEntryFormat`, and the possible values are `kafka` or `pulsar`. The default value is `pulsar` if not specified.

### Interoperability between Pulsar and Kafka clients

With the `kafka` entry format, Kafka producers can produce and consume messages directly and freely. However, Pulsar producers **SHOULD NOT** produce messages in these topics because they are unable to encode messages into a format consumable by Kafka clients.

However, since version `2.9` of the Pulsar client, we introduced a message payload processor for Pulsar consumers. This means that messages produced from Kafka producers can now be consumed and decoded by Pulsar consumers.

For the `pulsar` format, it allows messages to be freely produced and consumed between Kafka and Pulsar clients. The message format conversion is automatically handled by the broker, enabling more flexible use of either Kafka clients or Pulsar clients.

### Details for the message format conversion with `pulsar` format

The conversion is mostly intuitive except for some edge cases. Take the following simple case for example,

```java theme={null}
// The type of `kafkaProducer` is `KafkaProducer<String, String>`
kafkaProducer.send(new ProducerRecord<>(topic, "value")).get();
// The type of `pulsarConsumer` is `Consumer<byte[]>`
final var msg = pulsarConsumer.receive();
final var value = new String(msg.getValue(), StandardCharsets.UTF_8); // value will be "value"
```

The value received by a Pulsar consumer is guaranteed to be the same with the original value sent by a Kafka producer.

However, a Kafka message has some extra metadata like:

* key: the key used for routing the message to a specific partition
* headers: a list of headers, each header is a key-value pair

A Pulsar message has similar fields:

* key: the same as Kafka's key
* ordering key: the key used for `Key_Shared` subscriptions
* properties: a list of properties, each property is a key-value pair

It should be noted that the types of key and header value are both `byte[]` in Kafka, while in Pulsar, the types of key and property value are both `String`. For keys, each key will be converted to a base64-encoded string as Pulsar's key. See the following example:

```java theme={null}
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value")).get();
final var msg = pulsarConsumer.receive();
final String key = msg.getKey(); // "a2V5"
final byte[] keyBytes = msg.getKeyBytes(); // [107, 101, 121] (the byte array of "key")
final boolean hasBase64EncodedKey = msg.hasBase64EncodedKey(); // true
final byte[] orderingKey = msg.getOrderingKey(); // [107, 101, 121]
```

**You should use `getKeyBytes()` or `getOrderingKey()` to retrieve the original keys of Kafka messages**. The anti-intuitive behavior is that the `getKey()` method will return the base64-encoded string. This behavior is made because the byte array could vary after the `bytes -> UTF-8 string -> bytes` conversion, for example:

```java theme={null}
final var keyBytes = new byte[]{ 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88 };
// converted: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0xef, 0xbf, 0xbd]
final var converted = new String(keyBytes, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
```

Things get much more complicated with the conversion on headers.

1. For a header key, there could be multiple values in a Kafka message but there is only a single property value for a given property key in a Pulsar message.
2. There is no way to get the bytes of a Pulsar property value.

For the 1st issue, the latest value will be retained.

For the 2nd issue, the conversion will be performed with the following approach:

1. Convert the bytes directly as a UTF-8 string
2. If the bytes is not a valid UTF-8 string's bytes, convert it to a base64 encoded string.

For example,

```java theme={null}
final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key", "header-value-2".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key-2", new byte[]{ (byte) 0x88 }));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "header-value-2"
final var value2 = msg.getProperty("header-key-2"); // "iA=="
```

There is another corner case that an extra property whose key is `__ksn_internal_header_format` will be received by the Pulsar consumer if there is a header value that is a base64-encoded string's bytes.

```java theme={null}
final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "a2v5".getBytes(StandardCharsets.UTF_8)));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "a2V5"
final var value2 = msg.getProperty("__ksn_internal_header_format"); // a non-null JSON value
```

### Step to Produce msg with Kafka and Consume msg with Pulsar client

**Requirements**

Pulsar Client Version ≥ 2.9
Pulsar Admin Version ≥ 2.10

**Step**

1. Create topic with kafka format via Pulsar admin CLI

<Note title="Note">
  The kafka format can be following the same step, and using `kafkaEntryFormat=kafka` property.
</Note>

```shell theme={null}
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka

# Get properties to check if the entry format properties setting successfully
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-kafka-format
```

2. Install client libraries

   ```xml theme={null}
   <!-- Use to decode the Kafka format message -->
   <dependency>
       <groupId>io.streamnative.pulsar.handlers</groupId>
       <artifactId>kafka-payload-processor</artifactId>
       <version>3.1.0.4</version>
   </dependency>

   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.4.0</version>
   </dependency>

   <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client</artifactId>
     <version>3.1.0</version>
   </dependency>
   ```

3. Kafka Producer and Pulsar Consumer with Kafka format

   ```java theme={null}
   import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.clients.producer.RecordMetadata;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;

   import java.io.IOException;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.Future;

   public class KafkaFormatKafkaProducePulsarConsume {

       public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
           // replace these configs with your cluster
           final String kafkaServerUrl = "<Kafka Service Url>";
           final String pulsarServerUrl = "<Pulsar Service Http Url>";
           final String jwtToken = "<API Key>";
           final String token = "token:" + jwtToken;

           final String topicName = "topic-with-kafka-format";
           final String namespace = "public/default";

           final Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           // 1. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
           props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
           props.put("sasl.mechanism", "PLAIN");
           props.put("sasl.jaas.config", String.format(
                   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                   namespace, token));

           // 2. Create a producer
           final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

           // 3. Produce messages with Kafka producer
           for (int i = 0; i < 5; i++) {
               String value = "hello world";
               final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
               final RecordMetadata recordMetadata = recordMetadataFuture.get();
               System.out.println("Send " + value + " to " + recordMetadata);
           }
           producer.close();

           // 4. Consume messages with Pulsar consumer
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl(pulsarServerUrl)
                   .authentication(AuthenticationFactory.token(jwtToken))
                   .build();

           Consumer<byte[]> consumer = client.newConsumer()
                   .topic(topicName)
                   .subscriptionName("test")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                   // Set the Kafka payload processor to decode the kafka format message
                   .messagePayloadProcessor(new KafkaPayloadProcessor())
                   .subscribe();

           for (int i = 0; i < 5; i++) {
               Message<byte[]> msg = consumer.receive();
               consumer.acknowledge(msg);
               System.out.println("Receive message " + new String(msg.getData()));
           }

           consumer.close();
           client.close();
       }
   }
   ```

### Step to Produce msg with Pulsar and Consume msg with Kafka client

1. Create topic with pulsar format via Pulsar admin CLI

   ```shell theme={null}
   bin/pulsar-admin \
     --admin-url <Pulsar Service Http Url> \
     --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
     --auth-params token:<API Key> \
     topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar

   bin/pulsar-admin \
     --admin-url <Pulsar Service Http Url> \
     --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
     --auth-params token:<API Key> \
     topics get-properties persistent://public/default/topic-with-pulsar-format
   ```

2. Install client libraries

   ```xml theme={null}
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.4.0</version>
   </dependency>

   <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client</artifactId>
     <version>3.1.0</version>
   </dependency>
   ```

3. Pulsar Producer and Kafka Consumer with Pulsar format

   ```java theme={null}
   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.clients.consumer.ConsumerConfig;
   import org.apache.kafka.clients.consumer.ConsumerRecords;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.pulsar.client.api.AuthenticationFactory;
   import org.apache.pulsar.client.api.MessageId;
   import org.apache.pulsar.client.api.Producer;
   import org.apache.pulsar.client.api.PulsarClient;

   import java.io.IOException;
   import java.time.Duration;
   import java.util.Collections;
   import java.util.Properties;
   import java.util.concurrent.ExecutionException;

   public class PulsarFormatPulsarProduceKafkaConsume {

       public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
           // Replace these configs with your cluster
           final String kafkaServerUrl = "<Kafka Service Url>";
           final String pulsarServerUrl = "<Pulsar Service Http Url>";
           final String jwtToken = "<API Key>";
           final String token = "token:" + jwtToken;

           final String topicName = "topic-with-pulsar-format";
           final String namespace = "public/default";

           // 1. Create a Pulsar producer
           PulsarClient client = PulsarClient.builder()
                   .serviceUrl(pulsarServerUrl)
                   .authentication(AuthenticationFactory.token(jwtToken))
                   .build();

           Producer<byte[]> producer = client.newProducer()
                   .topic(topicName)
                   .create();

           // 2. Produce messages with Pulsar producer
           for (int i = 0; i < 5; i++) {
               String message = "my-message-" + i;
               MessageId msgId = producer.send(message.getBytes());
               System.out.println("Publish " + "my-message-" + i
                       + " and message ID " + msgId);
           }
           producer.close();
           client.close();

           // 3. Consume messages with Pulsar consumer
           final Properties props = new Properties();
           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
           props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
           props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

           // 4. Create a producer with token authentication, which is equivalent to SASL/PLAIN mechanism in Kafka
           props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
           props.put("sasl.mechanism", "PLAIN");
           props.put("sasl.jaas.config", String.format(
                   "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                   namespace, token));

           final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
           consumer.subscribe(Collections.singleton(topicName));

           // 5. Consume messages with Kafka consumer
           boolean running = true;
           while (running) {
               System.out.println("running");
               final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
               if (!records.isEmpty()) {
                   records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                           + record.topic() + "-" + record.partition() + "@" + record.offset()));
                   running = false;
               }
           }
           consumer.close();
       }
   }
   ```

### Step to Produce msg with Kafka client and Consume msg with Pulsar key-shared subscription

1. Create topic with pulsar format via Pulsar admin CLI

   ```shell theme={null}
   bin/pulsar-admin \
     --admin-url <Pulsar Service Http Url> \
     --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
     --auth-params token:<API Key> \
     topics create-partitioned-topic persistent://public/default/topic-with-pulsar-non-batched-format -p 3 --metadata kafkaEntryFormat=pulsar_non_batched

   bin/pulsar-admin \
     --admin-url <Pulsar Service Http Url> \
     --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
     --auth-params token:<API Key> \
     topics get-properties persistent://public/default/topic-with-pulsar-non-batched-format
   ```

2. Install client libraries

   ```xml theme={null}
   <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>3.4.0</version>
   </dependency>

   <dependency>
     <groupId>org.apache.pulsar</groupId>
     <artifactId>pulsar-client</artifactId>
     <version>3.1.0</version>
   </dependency>
   ```

3. Kafka Producer and Pulsar Consumer with `pulsar_non_batched` format

   ```java theme={null}
   import org.apache.kafka.clients.CommonClientConfigs;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.serialization.StringSerializer;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.Message;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.apache.pulsar.client.api.AuthenticationFactory;

   import java.nio.charset.StandardCharsets;
   import java.util.*;
   import java.util.concurrent.*;
   import java.util.concurrent.atomic.AtomicInteger;

   public class PulsarFormatKafkaProducePulsarKeySharedSubscriptionConsume {

       private static final int NUM_CONSUMERS = 3;
       private static final int NUM_MESSAGES = 100;
       private static final int NUM_KEYS = 10;

       // Replace these configs with your actual cluster settings
       private static final String pulsarServiceUrl = "<Pulsar Service Http Url>";
       private static final String kafkaServiceUrl = "<Kafka Service Url>";
       private static final String jwtToken = "<API Key>";
       private static final String namespace = "public/default";

       public static void main(String[] args) throws Exception {
           final String topic = "topic-with-pulsar-non-batched-format";
           final String subscription = "test-key-shared-subscription-group";

           PulsarClient client = PulsarClient.builder()
               .serviceUrl(pulsarServiceUrl)
               .authentication(AuthenticationFactory.token(jwtToken))
               .build();

           // Create kafka producer
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

           // Add authentication if needed
           props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
           props.put("sasl.mechanism", "PLAIN");
           props.put("sasl.jaas.config", String.format(
               "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
               namespace, "token:" + jwtToken));

           final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

           // Create multiple pulsar consumers with Key_Shared subscription
           List<Consumer<byte[]>> consumers = new ArrayList<>();
           for (int i = 0; i < NUM_CONSUMERS; i++) {
               Consumer<byte[]> consumer = client.newConsumer()
                   .topic(topic)
                   .subscriptionName(subscription)
                   .subscriptionType(SubscriptionType.Key_Shared)
                   .subscribe();
               consumers.add(consumer);
           }

           // Maps to track message distribution
           Map<String, Integer> keyToConsumerMap = new ConcurrentHashMap<>();
           Map<Integer, Set<String>> consumerToKeysMap = new HashMap<>();
           for (int i = 0; i < NUM_CONSUMERS; i++) {
               consumerToKeysMap.put(i, new HashSet<>());
           }

           // Atomic counter to track processed messages
           AtomicInteger processedMessages = new AtomicInteger(0);

           // Create consumer futures
           List<CompletableFuture<Void>> futures = new ArrayList<>();
           for (int i = 0; i < NUM_CONSUMERS; i++) {
               final int consumerId = i;
               futures.add(CompletableFuture.runAsync(() -> {
                   try {
                       Consumer<byte[]> consumer = consumers.get(consumerId);
                       while (processedMessages.get() < NUM_MESSAGES) {
                           Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
                           if (msg != null) {
                               String key = new String(msg.getKeyBytes(), StandardCharsets.UTF_8);

                               // Record which consumer got which key
                               keyToConsumerMap.put(key, consumerId);
                               consumerToKeysMap.get(consumerId).add(key);

                               consumer.acknowledge(msg);
                               processedMessages.incrementAndGet();
                               System.out.println("Consumer " + consumerId + " received message with key: " + key);
                           }
                       }
                   } catch (Exception e) {
                       throw new CompletionException(e);
                   }
               }));
           }

           // Create combined future for all consumers
           CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

           // Send messages with different keys
           System.out.println("Sending first batch of messages");
           for (int i = 0; i < NUM_MESSAGES / 2; i++) {
               String key = "key-" + (i % NUM_KEYS);
               String value = "value-" + i;
               kafkaProducer.send(new ProducerRecord<>(topic, key, value));
           }
           kafkaProducer.flush();

           System.out.println("Sending second single batch of messages");
           for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++) {
               String key = "key-" + (i % NUM_KEYS);
               String value = "value-" + i;
               kafkaProducer.send(new ProducerRecord<>(topic, key, value));
               kafkaProducer.flush();
           }

           // Wait for all messages to be consumed (with timeout)
           try {
               System.out.println("Waiting for all messages to be consumed...");
               allFutures.get(30, TimeUnit.SECONDS);
           } catch (TimeoutException e) {
               // Cancel all futures if timeout occurs
               futures.forEach(f -> f.cancel(true));
               System.out.println("Not all messages were consumed within timeout");
               throw new RuntimeException("Not all messages were consumed within timeout", e);
           }

           // Close resources
           System.out.println("Closing consumers and producer");
           for (Consumer<byte[]> consumer : consumers) {
               consumer.close();
           }
           client.close();
           kafkaProducer.close();

           // Verification: Check that each key went to exactly one consumer
           Map<String, List<Integer>> keyViolations = new HashMap<>();
           for (String key : keyToConsumerMap.keySet()) {
               for (int i = 0; i < NUM_MESSAGES; i++) {
                   String messageKey = "key-" + (i % NUM_KEYS);
                   if (messageKey.equals(key)
                       && !Objects.equals(keyToConsumerMap.get(key), keyToConsumerMap.get(messageKey))) {
                       keyViolations.computeIfAbsent(key, k -> new ArrayList<>()).add(i);
                   }
               }
           }

           if (keyViolations.isEmpty()) {
               System.out.println("Key consistency verification passed");
           } else {
               System.out.println("Key consistency violated: " + keyViolations);
               throw new RuntimeException("Key consistency violated: " + keyViolations);
           }

           // Verify keys are distributed among consumers
           boolean allConsumersReceived = consumerToKeysMap.values().stream()
               .noneMatch(Set::isEmpty);

           if (allConsumersReceived) {
               System.out.println("All consumers received messages");
           } else {
               System.out.println("Some consumers didn't receive any messages");
               throw new RuntimeException("Some consumers didn't receive any messages");
           }

           // Print key distribution
           System.out.println("Key distribution among consumers:");
           for (int i = 0; i < NUM_CONSUMERS; i++) {
               System.out.println("Consumer " + i + " received keys: " + consumerToKeysMap.get(i));
           }
       }
   }
   ```

## Interoperability between Kafka and Pulsar Transactions

Currently, it is not possible to interoperate Kafka and Pulsar transactions together. You must choose one or the other because they use different mechanisms to store transactional states.

## Interoperability between Kafka and Pulsar Schema

StreamNative Cloud supports both the Kafka and Pulsar schema registries as central repositories to store registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, Kafka schemas and Pulsar schemas cannot be used simultaneously due to their differing API definitions and schema storage locations.

We also plan to achieve a unified schema registry, which will support both Kafka and Pulsar schemas. This will allow for the exchangeable production and consumption of messages with schema using both Pulsar and Kafka clients:

* Pulsar consumers will be able to consume messages with schema produced by Kafka producers.
* Kafka consumers will be able to consume messages with schema produced by Pulsar producers.

## Use Pulsar admin to get KSN producer and consumer stats

After the pulsar version `3.3.5.1` or `4.0.1.1`, we can use the `pulsar-admin` CLI to get the KSN's topic producer and consumer stats.

<Note title="Note">
  1. Ursa Engine currently only supports namespace level stats.
  2. If you're creating a Pulsar subscription on this topic, do not use `__ksn_internal_subscription` as the subscription name.
</Note>

The KSN broker will register an internal producer and consumer for each topic. The producer's name is `{clusterName}-{generatorInstanceId}-{counter}`, and the consumer's name is `__KSN__internal_consumer_{remoteAddress}`. The subscription name is `__ksn_internal_subscription`.

The internal producer and consumer will not send or consume messages, nor will they acknowledge messages or affect message retention.

You can use the following command to get the stats:

```shell theme={null}
# Get the topic stats, some of the fields are removed for better readability
$ pulsar-admin topics partitioned-stats test-topic
{
  "msgRateIn" : 5000.014579625847,
  "msgThroughputIn" : 618419.0865918549,
  "msgRateOut" : 5000.265921846748,
  "msgThroughputOut" : 576849.8534719701,
  "bytesInCounter" : 228942495,
  "msgInCounter" : 1957421,
  "bytesOutCounter" : 220383984,
  "msgOutCounter" : 1957222,
  "averageMsgSize" : 123.68345666666666,
  "storageSize" : 229986335,
  "backlogSize" : 149996147,
  "publishers" : [ {
    "msgRateIn" : 5000.014579625847,
    "msgThroughputIn" : 618419.0865918549,
    "averageMsgSize" : 123.68345666666667,
    "producerId" : 0,
  } ],
  "subscriptions" : {
    "__ksn_internal_subscription" : {
      "msgRateOut" : 5000.265921846748,
      "msgThroughputOut" : 576849.8534719701,
      "bytesOutCounter" : 220383984,
      "msgOutCounter" : 1957222,
      "consumers" : [ {
        "msgRateOut" : 5000.265921846748,
        "msgThroughputOut" : 576849.8534719701,
        "bytesOutCounter" : 220383984,
        "msgOutCounter" : 1957222,
      } ],
    }
  },
  "metadata" : {
    "partitions" : 1,
    "deleted" : false,
    "properties" : {
      "kafkaTopicUUID" : "6374b5c6-d932-43ad-8f70-7960d3236bba"
    }
  },
  "partitions" : { }
}
```

You can also see the producer and consumer stats in the StreamNative Cloud console.

<img src="https://mintcdn.com/streamnative/tIZ04bis3aV5g7je/media/ksn-producer-consumer-stats.png?fit=max&auto=format&n=tIZ04bis3aV5g7je&q=85&s=1045ea9f139832ca49e89b2559f15fb7" alt="Producer and Consumer Stats" width="3252" height="1508" data-path="media/ksn-producer-consumer-stats.png" />
