Interoperability between Apache Kafka and Apache Pulsar
Produce and consume messages between Kafka and Pulsar
StreamNative supports three data entry formats: kafka, pulsar, and pulsar_non_batched. Each format has distinct characteristics:
- kafka format: Provides the best performance. However, a Pulsar consumer cannot consume it unless a payload processor is employed.
- pulsar format: The default data entry format on StreamNative Cloud. It supports interoperability between Kafka and Pulsar clients, including Kafka client to Kafka client and Pulsar client to Kafka client interactions, and vice versa, so data is interoperable between Apache Kafka and Apache Pulsar. It is suitable for most scenarios where performance is not a critical consideration.
- pulsar_non_batched format: Similar to the pulsar format, except that it encodes Kafka batch messages as non-batched messages so that Pulsar clients can consume them with a key-shared subscription. On Ursa, it behaves the same as the pulsar format.
StreamNative Cloud also supports specifying the entry format on a per-topic basis. The entry format can be set through topic properties by using bin/pulsar-admin topics update-properties. The configuration key is kafkaEntryFormat, and the possible values are kafka, pulsar, or pulsar_non_batched. The default value is pulsar if not specified.
Interoperability between Pulsar and Kafka clients
With the kafka entry format, Kafka clients can produce and consume messages directly and freely. However, Pulsar producers SHOULD NOT produce messages to these topics because they are unable to encode messages into a format consumable by Kafka clients.
However, since version 2.9 of the Pulsar client, we introduced a message payload processor for Pulsar consumers. This means that messages produced by Kafka producers can now be consumed and decoded by Pulsar consumers.
The pulsar format allows messages to be freely produced and consumed between Kafka and Pulsar clients. The message format conversion is automatically handled by the broker, enabling more flexible use of either Kafka clients or Pulsar clients.
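The rules in this section can be summarized in a small lookup type. The following is only a sketch using plain JDK classes: the `EntryFormat` enum and its method names are hypothetical and not part of any StreamNative or Pulsar API. It assumes that the pulsar_non_batched format accepts Pulsar producers in the same way as the pulsar format, since it is described as similar to it.

```java
/**
 * Hypothetical helper (not a StreamNative API) that encodes the
 * interoperability rules described in this section.
 */
enum EntryFormat {
    KAFKA("kafka"),
    PULSAR("pulsar"),
    PULSAR_NON_BATCHED("pulsar_non_batched");

    private final String propertyValue;

    EntryFormat(String propertyValue) {
        this.propertyValue = propertyValue;
    }

    /** The value to store under the kafkaEntryFormat topic property. */
    String propertyValue() {
        return propertyValue;
    }

    /** Parses a kafkaEntryFormat value; a missing value falls back to pulsar. */
    static EntryFormat fromProperty(String value) {
        if (value == null || value.isBlank()) {
            return PULSAR;
        }
        for (EntryFormat format : values()) {
            if (format.propertyValue.equals(value.trim())) {
                return format;
            }
        }
        throw new IllegalArgumentException("Unknown entry format: " + value);
    }

    /** Pulsar producers should not write to topics that use the kafka format. */
    boolean pulsarProducerAllowed() {
        return this != KAFKA;
    }

    /** Pulsar consumers need a payload processor only for the kafka format. */
    boolean pulsarConsumerNeedsPayloadProcessor() {
        return this == KAFKA;
    }

    public static void main(String[] args) {
        for (EntryFormat format : values()) {
            System.out.println(format.propertyValue()
                    + ": pulsarProducerAllowed=" + format.pulsarProducerAllowed()
                    + ", needsPayloadProcessor=" + format.pulsarConsumerNeedsPayloadProcessor());
        }
    }
}
```

A client application could call `EntryFormat.fromProperty(...)` on the value it reads from the topic properties and decide whether to attach a payload processor before subscribing.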
Details for the message format conversion with pulsar format
The conversion is mostly intuitive except for some edge cases. Take the following simple case for example,
// The type of `kafkaProducer` is `KafkaProducer<String, String>`
kafkaProducer.send(new ProducerRecord<>(topic, "value")).get();
// The type of `pulsarConsumer` is `Consumer<byte[]>`
final var msg = pulsarConsumer.receive();
final var value = new String(msg.getValue(), StandardCharsets.UTF_8); // value will be "value"
The value received by a Pulsar consumer is guaranteed to be the same as the original value sent by a Kafka producer.
However, a Kafka message has some extra metadata like:
- key: the key used for routing the message to a specific partition
- headers: a list of headers, each header is a key-value pair
A Pulsar message has similar fields:
- key: the same as Kafka's key
- ordering key: the key used for Key_Shared subscriptions
- properties: a list of properties, each property is a key-value pair
It should be noted that the types of key and header value are both byte[] in Kafka, while in Pulsar, the types of key and property value are both String. For keys, each key will be converted to a base64-encoded string as Pulsar's key. See the following example:
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value")).get();
final var msg = pulsarConsumer.receive();
final String key = msg.getKey(); // "a2V5"
final byte[] keyBytes = msg.getKeyBytes(); // [107, 101, 121] (the byte array of "key")
final boolean hasBase64EncodedKey = msg.hasBase64EncodedKey(); // true
final byte[] orderingKey = msg.getOrderingKey(); // [107, 101, 121]
You should use getKeyBytes() or getOrderingKey() to retrieve the original keys of Kafka messages. Counterintuitively, the getKey() method returns the base64-encoded string. This design was chosen because a byte array can change after the bytes -> UTF-8 string -> bytes conversion, for example:
final var keyBytes = new byte[]{ 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88 };
// converted: [0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0xef, 0xbf, 0xbd]
final var converted = new String(keyBytes, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
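To make the difference concrete, the sketch below compares the two encodings using only JDK classes. It illustrates why base64 is a safe carrier for arbitrary key bytes; it is not the broker's actual implementation, and the class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

/** Hypothetical illustration of why keys are carried as base64 strings. */
class KeyRoundTripSketch {

    /** Encodes raw Kafka key bytes as a base64 string. */
    static String toBase64Key(byte[] kafkaKey) {
        return Base64.getEncoder().encodeToString(kafkaKey);
    }

    /** Recovers the raw key bytes from a base64-encoded key string. */
    static byte[] fromBase64Key(String encodedKey) {
        return Base64.getDecoder().decode(encodedKey);
    }

    /** Returns true if bytes -> UTF-8 string -> bytes yields the same bytes. */
    static boolean survivesUtf8RoundTrip(byte[] bytes) {
        byte[] converted = new String(bytes, StandardCharsets.UTF_8)
                .getBytes(StandardCharsets.UTF_8);
        return Arrays.equals(bytes, converted);
    }

    public static void main(String[] args) {
        byte[] textKey = "key".getBytes(StandardCharsets.UTF_8);
        byte[] binaryKey = {0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, (byte) 0x88};

        System.out.println(toBase64Key(textKey));              // a2V5
        System.out.println(survivesUtf8RoundTrip(textKey));    // true
        System.out.println(survivesUtf8RoundTrip(binaryKey));  // false
        // Base64 always restores the original bytes, even for binary keys.
        System.out.println(Arrays.equals(binaryKey, fromBase64Key(toBase64Key(binaryKey)))); // true
    }
}
```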
Things get much more complicated with the conversion on headers.
- For a header key, there could be multiple values in a Kafka message but there is only a single property value for a given property key in a Pulsar message.
- There is no way to get the bytes of a Pulsar property value.
For the 1st issue, the latest value will be retained.
For the 2nd issue, the conversion will be performed with the following approach:
- Convert the bytes directly as a UTF-8 string
- If the bytes are not a valid UTF-8 sequence, convert them to a base64-encoded string.
For example,
final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "header-value".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key", "header-value-2".getBytes(StandardCharsets.UTF_8)));
headers.add(new RecordHeader("header-key-2", new byte[]{ (byte) 0x88 }));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "header-value-2"
final var value2 = msg.getProperty("header-key-2"); // "iA=="
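The two header rules (the last value wins, and UTF-8 first with a base64 fallback) can be approximated with JDK classes as follows. This is a sketch of the described behavior rather than the broker's code; the class and method names are hypothetical, and headers are modeled as a plain list of key and byte-array pairs instead of Kafka `Header` objects.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the header-to-property conversion rules. */
class HeaderConversionSketch {

    /** Decodes as strict UTF-8; falls back to base64 when the bytes are invalid. */
    static String toPropertyValue(byte[] headerValue) {
        try {
            return StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(headerValue))
                    .toString();
        } catch (CharacterCodingException e) {
            return Base64.getEncoder().encodeToString(headerValue);
        }
    }

    /** Later headers overwrite earlier ones that share the same key. */
    static Map<String, String> toProperties(List<Map.Entry<String, byte[]>> headers) {
        Map<String, String> properties = new LinkedHashMap<>();
        for (Map.Entry<String, byte[]> header : headers) {
            properties.put(header.getKey(), toPropertyValue(header.getValue()));
        }
        return properties;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> headers = List.of(
                Map.entry("header-key", "header-value".getBytes(StandardCharsets.UTF_8)),
                Map.entry("header-key", "header-value-2".getBytes(StandardCharsets.UTF_8)),
                Map.entry("header-key-2", new byte[]{(byte) 0x88}));
        // prints {header-key=header-value-2, header-key-2=iA==}
        System.out.println(toProperties(headers));
    }
}
```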
There is another corner case: an extra property whose key is __ksn_internal_header_format will be received by the Pulsar consumer if a header value is the bytes of a base64-encoded string.
final var headers = new ArrayList<Header>();
headers.add(new RecordHeader("header-key", "a2V5".getBytes(StandardCharsets.UTF_8)));
kafkaProducer.send(new ProducerRecord<>(topic, 0, "key", "value", headers)).get();
final var msg = pulsarConsumer.receive();
final var value1 = msg.getProperty("header-key"); // "a2V5"
final var value2 = msg.getProperty("__ksn_internal_header_format"); // a non-null JSON value
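Application code that iterates over all properties may want to skip this internal entry. The sketch below shows one way to do that on a plain property map such as the one returned by `msg.getProperties()`. The class and method names are hypothetical, and because the content of the JSON value is not described here, the sketch only detects and removes the key without interpreting it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that separates the internal marker from user properties. */
class InternalPropertyFilterSketch {

    static final String HEADER_FORMAT_KEY = "__ksn_internal_header_format";

    /** Returns true if the broker attached the internal header-format property. */
    static boolean hasHeaderFormatMarker(Map<String, String> properties) {
        return properties.containsKey(HEADER_FORMAT_KEY);
    }

    /** Returns a copy of the properties without the internal marker entry. */
    static Map<String, String> userProperties(Map<String, String> properties) {
        Map<String, String> result = new LinkedHashMap<>(properties);
        result.remove(HEADER_FORMAT_KEY);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> received = new LinkedHashMap<>();
        received.put("header-key", "a2V5");
        received.put(HEADER_FORMAT_KEY, "{}"); // placeholder; the real JSON value is not specified here

        System.out.println(hasHeaderFormatMarker(received)); // true
        System.out.println(userProperties(received));        // {header-key=a2V5}
    }
}
```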
Steps to produce messages with a Kafka client and consume messages with a Pulsar client
Requirements
- Pulsar Client Version ≥ 2.9
- Pulsar Admin Version ≥ 2.10
Steps
Create topic with kafka format via Pulsar admin CLI
Note
The kafka format follows the same steps and uses the kafkaEntryFormat=kafka property.
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka
# Get the properties to check whether the entry format property was set successfully
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-kafka-format
Install client libraries
<!-- Used to decode Kafka format messages -->
<dependency>
  <groupId>io.streamnative.pulsar.handlers</groupId>
  <artifactId>kafka-payload-processor</artifactId>
  <version>3.1.0.4</version>
</dependency>
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.1.0</version>
</dependency>
Kafka Producer and Pulsar Consumer with Kafka format
import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class KafkaFormatKafkaProducePulsarConsume {

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // Replace these configs with your cluster
        final String kafkaServerUrl = "<Kafka Service Url>";
        final String pulsarServerUrl = "<Pulsar Service Http Url>";
        final String jwtToken = "<API Key>";
        final String token = "token:" + jwtToken;
        final String topicName = "topic-with-kafka-format";
        final String namespace = "public/default";

        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 1. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        // 2. Create a producer
        final KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // 3. Produce messages with Kafka producer
        for (int i = 0; i < 5; i++) {
            String value = "hello world";
            final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
            final RecordMetadata recordMetadata = recordMetadataFuture.get();
            System.out.println("Send " + value + " to " + recordMetadata);
        }
        producer.close();

        // 4. Consume messages with Pulsar consumer
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServerUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topicName)
                .subscriptionName("test")
                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                // Set the Kafka payload processor to decode the kafka format message
                .messagePayloadProcessor(new KafkaPayloadProcessor())
                .subscribe();

        for (int i = 0; i < 5; i++) {
            Message<byte[]> msg = consumer.receive();
            consumer.acknowledge(msg);
            System.out.println("Receive message " + new String(msg.getData()));
        }

        consumer.close();
        client.close();
    }
}
Steps to produce messages with a Pulsar client and consume messages with a Kafka client
Create topic with pulsar format via Pulsar admin CLI
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-pulsar-format
Install client libraries
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.1.0</version>
</dependency>
Pulsar Producer and Kafka Consumer with Pulsar format
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class PulsarFormatPulsarProduceKafkaConsume {

    public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
        // Replace these configs with your cluster
        final String kafkaServerUrl = "<Kafka Service Url>";
        final String pulsarServerUrl = "<Pulsar Service Http Url>";
        final String jwtToken = "<API Key>";
        final String token = "token:" + jwtToken;
        final String topicName = "topic-with-pulsar-format";
        final String namespace = "public/default";

        // 1. Create a Pulsar producer
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServerUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic(topicName)
                .create();

        // 2. Produce messages with Pulsar producer
        for (int i = 0; i < 5; i++) {
            String message = "my-message-" + i;
            MessageId msgId = producer.send(message.getBytes());
            System.out.println("Publish " + message + " and message ID " + msgId);
        }
        producer.close();
        client.close();

        // 3. Configure the Kafka consumer
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        // 4. Configure token authentication for the consumer, which is equivalent to the SASL/PLAIN mechanism in Kafka
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, token));

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton(topicName));

        // 5. Consume messages with Kafka consumer until the first non-empty poll
        boolean running = true;
        while (running) {
            System.out.println("running");
            final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            if (!records.isEmpty()) {
                records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                        + record.topic() + "-" + record.partition() + "@" + record.offset()));
                running = false;
            }
        }
        consumer.close();
    }
}
Steps to produce messages with a Kafka client and consume messages with a Pulsar key-shared subscription
Create topic with pulsar_non_batched format via Pulsar admin CLI
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics create-partitioned-topic persistent://public/default/topic-with-pulsar-non-batched-format -p 3 --metadata kafkaEntryFormat=pulsar_non_batched
bin/pulsar-admin \
  --admin-url <Pulsar Service Http Url> \
  --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
  --auth-params token:<API Key> \
  topics get-properties persistent://public/default/topic-with-pulsar-non-batched-format
Install client libraries
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency>
<dependency>
  <groupId>org.apache.pulsar</groupId>
  <artifactId>pulsar-client</artifactId>
  <version>3.1.0</version>
</dependency>
Kafka Producer and Pulsar Consumer with pulsar_non_batched format
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class PulsarFormatKafkaProducePulsarKeySharedSubscriptionConsume {

    private static final int NUM_CONSUMERS = 3;
    private static final int NUM_MESSAGES = 100;
    private static final int NUM_KEYS = 10;

    // Replace these configs with your actual cluster settings
    private static final String pulsarServiceUrl = "<Pulsar Service Http Url>";
    private static final String kafkaServiceUrl = "<Kafka Service Url>";
    private static final String jwtToken = "<API Key>";
    private static final String namespace = "public/default";

    public static void main(String[] args) throws Exception {
        final String topic = "topic-with-pulsar-non-batched-format";
        final String subscription = "test-key-shared-subscription-group";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsarServiceUrl)
                .authentication(AuthenticationFactory.token(jwtToken))
                .build();

        // Create Kafka producer
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServiceUrl);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // Add authentication if needed
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                namespace, "token:" + jwtToken));

        final KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // Create multiple Pulsar consumers with Key_Shared subscription
        List<Consumer<byte[]>> consumers = new ArrayList<>();
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topic)
                    .subscriptionName(subscription)
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscribe();
            consumers.add(consumer);
        }

        // Maps to track message distribution. Every consumer that sees a key is recorded,
        // so a key delivered to more than one consumer can be detected afterwards.
        Map<String, Set<Integer>> keyToConsumersMap = new ConcurrentHashMap<>();
        Map<Integer, Set<String>> consumerToKeysMap = new HashMap<>();
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            consumerToKeysMap.put(i, new HashSet<>());
        }

        // Atomic counter to track processed messages
        AtomicInteger processedMessages = new AtomicInteger(0);

        // Create consumer futures
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            final int consumerId = i;
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    Consumer<byte[]> consumer = consumers.get(consumerId);
                    while (processedMessages.get() < NUM_MESSAGES) {
                        Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
                        if (msg != null) {
                            String key = new String(msg.getKeyBytes(), StandardCharsets.UTF_8);
                            // Record which consumer got which key
                            keyToConsumersMap.computeIfAbsent(key, k -> ConcurrentHashMap.newKeySet()).add(consumerId);
                            consumerToKeysMap.get(consumerId).add(key);
                            consumer.acknowledge(msg);
                            processedMessages.incrementAndGet();
                            System.out.println("Consumer " + consumerId + " received message with key: " + key);
                        }
                    }
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }));
        }

        // Create combined future for all consumers
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

        // Send messages with different keys; the first half is flushed once as a batch
        System.out.println("Sending first batch of messages");
        for (int i = 0; i < NUM_MESSAGES / 2; i++) {
            String key = "key-" + (i % NUM_KEYS);
            String value = "value-" + i;
            kafkaProducer.send(new ProducerRecord<>(topic, key, value));
        }
        kafkaProducer.flush();

        // The second half is flushed after every send, so each message forms its own batch
        System.out.println("Sending second single batch of messages");
        for (int i = NUM_MESSAGES / 2; i < NUM_MESSAGES; i++) {
            String key = "key-" + (i % NUM_KEYS);
            String value = "value-" + i;
            kafkaProducer.send(new ProducerRecord<>(topic, key, value));
            kafkaProducer.flush();
        }

        // Wait for all messages to be consumed (with timeout)
        try {
            System.out.println("Waiting for all messages to be consumed...");
            allFutures.get(30, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            // Cancel all futures if timeout occurs
            futures.forEach(f -> f.cancel(true));
            System.out.println("Not all messages were consumed within timeout");
            throw new RuntimeException("Not all messages were consumed within timeout", e);
        }

        // Close resources
        System.out.println("Closing consumers and producer");
        for (Consumer<byte[]> consumer : consumers) {
            consumer.close();
        }
        client.close();
        kafkaProducer.close();

        // Verification: check that each key went to exactly one consumer
        Map<String, Set<Integer>> keyViolations = new HashMap<>();
        keyToConsumersMap.forEach((key, consumerIds) -> {
            if (consumerIds.size() > 1) {
                keyViolations.put(key, consumerIds);
            }
        });

        if (keyViolations.isEmpty()) {
            System.out.println("Key consistency verification passed");
        } else {
            System.out.println("Key consistency violated: " + keyViolations);
            throw new RuntimeException("Key consistency violated: " + keyViolations);
        }

        // Verify keys are distributed among consumers
        boolean allConsumersReceived = consumerToKeysMap.values().stream()
                .noneMatch(Set::isEmpty);

        if (allConsumersReceived) {
            System.out.println("All consumers received messages");
        } else {
            System.out.println("Some consumers didn't receive any messages");
            throw new RuntimeException("Some consumers didn't receive any messages");
        }

        // Print key distribution
        System.out.println("Key distribution among consumers:");
        for (int i = 0; i < NUM_CONSUMERS; i++) {
            System.out.println("Consumer " + i + " received keys: " + consumerToKeysMap.get(i));
        }
    }
}
Interoperability between Kafka and Pulsar Transactions
Currently, Kafka transactions and Pulsar transactions cannot interoperate. You must choose one or the other because they use different mechanisms to store transactional state.
Interoperability between Kafka and Pulsar Schema
StreamNative Cloud supports both the Kafka and Pulsar schema registries as central repositories to store registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, Kafka schemas and Pulsar schemas cannot be used simultaneously due to their differing API definitions and schema storage locations.
We also plan to provide a unified schema registry that supports both Kafka and Pulsar schemas. This will allow messages with schemas to be produced and consumed interchangeably by Pulsar and Kafka clients:
- Pulsar consumers will be able to consume messages with schema produced by Kafka producers.
- Kafka consumers will be able to consume messages with schema produced by Pulsar producers.
Use Pulsar admin to get KSN producer and consumer stats
Starting with Pulsar version 3.3.5.1 or 4.0.1.1, you can use the pulsar-admin CLI to get producer and consumer stats for KSN topics.
Note
- Ursa Engine currently only supports namespace level stats.
- If you're creating a Pulsar subscription on this topic, do not use __ksn_internal_subscription as the subscription name.
The KSN broker will register an internal producer and consumer for each topic. The producer's name is {clusterName}-{generatorInstanceId}-{counter}, and the consumer's name is __KSN__internal_consumer_{remoteAddress}. The subscription name is __ksn_internal_subscription.
The internal producer and consumer will not send or consume messages, nor will they acknowledge messages or affect message retention.
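When processing stats output or choosing subscription names in application code, it can help to recognize these reserved names. The following sketch uses only JDK classes and the names stated above. The `KsnInternalNames` class and its methods are hypothetical and not part of any Pulsar or StreamNative API.

```java
/** Hypothetical helper for recognizing KSN internal names in stats output. */
class KsnInternalNames {

    static final String INTERNAL_SUBSCRIPTION = "__ksn_internal_subscription";
    static final String INTERNAL_CONSUMER_PREFIX = "__KSN__internal_consumer_";

    /** Returns true if the subscription is the one registered by the KSN broker. */
    static boolean isInternalSubscription(String subscriptionName) {
        return INTERNAL_SUBSCRIPTION.equals(subscriptionName);
    }

    /** Returns true if the consumer name follows the internal consumer pattern. */
    static boolean isInternalConsumer(String consumerName) {
        return consumerName != null && consumerName.startsWith(INTERNAL_CONSUMER_PREFIX);
    }

    /** Rejects the reserved name before it is used for a Pulsar subscription. */
    static String requireUsableSubscriptionName(String subscriptionName) {
        if (isInternalSubscription(subscriptionName)) {
            throw new IllegalArgumentException(
                    "Subscription name is reserved for KSN: " + subscriptionName);
        }
        return subscriptionName;
    }

    public static void main(String[] args) {
        System.out.println(isInternalSubscription("__ksn_internal_subscription")); // true
        System.out.println(isInternalConsumer("my-consumer"));                     // false
        System.out.println(requireUsableSubscriptionName("test"));                 // test
    }
}
```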
You can use the following command to get the stats:
# Get the topic stats, some of the fields are removed for better readability
$ pulsar-admin topics partitioned-stats test-topic
{
  "msgRateIn" : 5000.014579625847,
  "msgThroughputIn" : 618419.0865918549,
  "msgRateOut" : 5000.265921846748,
  "msgThroughputOut" : 576849.8534719701,
  "bytesInCounter" : 228942495,
  "msgInCounter" : 1957421,
  "bytesOutCounter" : 220383984,
  "msgOutCounter" : 1957222,
  "averageMsgSize" : 123.68345666666666,
  "storageSize" : 229986335,
  "backlogSize" : 149996147,
  "publishers" : [ {
    "msgRateIn" : 5000.014579625847,
    "msgThroughputIn" : 618419.0865918549,
    "averageMsgSize" : 123.68345666666667,
    "producerId" : 0
  } ],
  "subscriptions" : {
    "__ksn_internal_subscription" : {
      "msgRateOut" : 5000.265921846748,
      "msgThroughputOut" : 576849.8534719701,
      "bytesOutCounter" : 220383984,
      "msgOutCounter" : 1957222,
      "consumers" : [ {
        "msgRateOut" : 5000.265921846748,
        "msgThroughputOut" : 576849.8534719701,
        "bytesOutCounter" : 220383984,
        "msgOutCounter" : 1957222
      } ]
    }
  },
  "metadata" : {
    "partitions" : 1,
    "deleted" : false,
    "properties" : {
      "kafkaTopicUUID" : "6374b5c6-d932-43ad-8f70-7960d3236bba"
    }
  },
  "partitions" : { }
}
You can also see the producer and consumer stats in the StreamNative Cloud console.