
Interoperability between Apache Kafka and Apache Pulsar

Produce and consume messages between Kafka and Pulsar

StreamNative Cloud supports three data entry formats: kafka, mixed_kafka, and pulsar. Each format has distinct characteristics:

  • kafka format: This provides the best performance; however, a Pulsar consumer cannot consume these entries unless a payload processor is employed.
  • mixed_kafka format: This functions similarly to the kafka format and additionally supports some non-official Kafka clients for encoding and decoding Kafka messages. Performance is moderate.
  • pulsar format: This is the default data entry format on StreamNative Cloud. It supports interoperability between Kafka and Pulsar clients, including Kafka-client-to-Kafka-client and Pulsar-client-to-Kafka-client interactions, and vice versa. This means data is interoperable between Apache Kafka and Apache Pulsar. It is suitable for most scenarios where performance is not a critical consideration.

StreamNative Cloud also supports specifying the entry format on a per-topic basis. The entry format can be set through topic properties by using bin/pulsar-admin topics update-properties. The configuration key is kafkaEntryFormat, and the possible values are kafka, mixed_kafka, or pulsar. The default value is pulsar if not specified.
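
For example, the following sketch switches an existing topic to the kafka format. It assumes the --property option of topics update-properties, available in recent Pulsar versions; my-topic is a placeholder topic name.

    # Set the per-topic entry format (my-topic is a placeholder)
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics update-properties persistent://public/default/my-topic \
      --property kafkaEntryFormat=kafka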

Interoperability between Pulsar and Kafka clients


With the kafka or mixed_kafka entry format, Kafka producers and consumers can produce and consume messages directly. However, Pulsar producers SHOULD NOT produce messages to these topics, because they cannot encode messages into a format consumable by Kafka clients.

Since version 2.9 of the Pulsar client, a message payload processor is available for Pulsar consumers. With it, messages produced by Kafka producers can be consumed and decoded by Pulsar consumers, as the walkthrough below demonstrates.

The pulsar format allows messages to be freely produced and consumed between Kafka and Pulsar clients. The broker handles the message format conversion automatically, enabling flexible use of either Kafka or Pulsar clients.

Steps to Produce Messages with a Kafka Client and Consume Them with a Pulsar Client

Requirements

  • Pulsar client version ≥ 2.9
  • Pulsar Admin version ≥ 2.10

Steps

  1. Create a topic with the kafka format via the Pulsar admin CLI

    Note

    To use the mixed_kafka format, follow the same step but set the kafkaEntryFormat=mixed_kafka property instead.

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics create persistent://public/default/topic-with-kafka-format --metadata kafkaEntryFormat=kafka
    
    # Get properties to check whether the entry format property was set successfully
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics get-properties persistent://public/default/topic-with-kafka-format
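
    If the property was set correctly, the get-properties output should include kafkaEntryFormat=kafka (the exact output formatting may vary by Pulsar version).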
    
  2. Install client libraries

    <!-- Used to decode Kafka-format messages -->
    <dependency>
        <groupId>io.streamnative.pulsar.handlers</groupId>
        <artifactId>kafka-payload-processor</artifactId>
        <version>3.1.0.4</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>3.1.0</version>
    </dependency>
    
  3. Kafka Producer and Pulsar Consumer with the kafka format

    import io.streamnative.pulsar.handlers.kop.KafkaPayloadProcessor;
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionInitialPosition;
    
    import java.io.IOException;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    public class KafkaFormatKafkaProducePulsarConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<API Key>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-kafka-format";
            final String namespace = "public/default";
    
            final Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // 1. Configure token authentication, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            // 2. Create a producer
            final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // 3. Produce messages with Kafka producer
            for (int i = 0; i < 5; i++) {
                String value = "hello world";
                final Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<>(topicName, value));
                final RecordMetadata recordMetadata = recordMetadataFuture.get();
                System.out.println("Send " + value + " to " + recordMetadata);
            }
            producer.close();
    
            // 4. Consume messages with Pulsar consumer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic(topicName)
                    .subscriptionName("test")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    // Set the Kafka payload processor to decode the kafka format message
                    .messagePayloadProcessor(new KafkaPayloadProcessor())
                    .subscribe();
    
            for (int i = 0; i < 5; i++) {
                Message<byte[]> msg = consumer.receive();
                consumer.acknowledge(msg);
                System.out.println("Receive message " + new String(msg.getData()));
            }
    
            consumer.close();
            client.close();
        }
    }
    
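Without the KafkaPayloadProcessor, the Pulsar consumer would receive raw Kafka-format entries that it cannot decode; this is why topics with the kafka or mixed_kafka format require a payload processor on the Pulsar consumer side.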

Steps to Produce Messages with a Pulsar Client and Consume Them with a Kafka Client

  1. Create a topic with the pulsar format via the Pulsar admin CLI (since pulsar is the default entry format, the --metadata flag below is optional)

    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics create-partitioned-topic persistent://public/default/topic-with-pulsar-format -p 3 --metadata kafkaEntryFormat=pulsar
    
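    # Get properties to check whether the entry format property was set successfully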
    bin/pulsar-admin \
      --admin-url <Pulsar Service Http Url> \
      --auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
      --auth-params token:<API Key> \
      topics get-properties persistent://public/default/topic-with-pulsar-format
    
  2. Install client libraries

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.pulsar</groupId>
        <artifactId>pulsar-client</artifactId>
        <version>3.1.0</version>
    </dependency>
    
  3. Pulsar Producer and Kafka Consumer with the pulsar format

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.pulsar.client.api.AuthenticationFactory;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Producer;
    import org.apache.pulsar.client.api.PulsarClient;
    
    import java.io.IOException;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    public class PulsarFormatPulsarProduceKafkaConsume {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
            // Replace these configs with your cluster
            final String kafkaServerUrl = "<Kafka Service Url>";
            final String pulsarServerUrl = "<Pulsar Service Http Url>";
            final String jwtToken = "<API Key>";
            final String token = "token:" + jwtToken;
    
            final String topicName = "topic-with-pulsar-format";
            final String namespace = "public/default";
    
            // 1. Create a Pulsar producer
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(pulsarServerUrl)
                    .authentication(AuthenticationFactory.token(jwtToken))
                    .build();
    
            Producer<byte[]> producer = client.newProducer()
                    .topic(topicName)
                    .create();
    
            // 2. Produce messages with Pulsar producer
            for (int i = 0; i < 5; i++) {
                String message = "my-message-" + i;
                MessageId msgId = producer.send(message.getBytes());
                System.out.println("Publish " + message + " and message ID " + msgId);
            }
            producer.close();
            client.close();
    
            // 3. Configure the Kafka consumer
            final Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServerUrl);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
    
            // 4. Configure token authentication for the consumer, which is equivalent to the SASL/PLAIN mechanism in Kafka
            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            props.put("sasl.mechanism", "PLAIN");
            props.put("sasl.jaas.config", String.format(
                    "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
                    namespace, token));
    
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singleton(topicName));
    
            // 5. Consume messages with Kafka consumer
            boolean running = true;
            while (running) {
                System.out.println("running");
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (!records.isEmpty()) {
                    records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
                            + record.topic() + "-" + record.partition() + "@" + record.offset()));
                    running = false;
                }
            }
            consumer.close();
        }
    }
    
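In this walkthrough the broker converts the Pulsar entries to Kafka records automatically, so the Kafka consumer needs no extra configuration beyond authentication; this automatic conversion is what the pulsar entry format provides.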

Interoperability between Kafka and Pulsar Transactions

Currently, Kafka and Pulsar transactions cannot interoperate. You must choose one or the other, because they use different mechanisms to store transactional state.

Interoperability between Kafka and Pulsar Schema

StreamNative Cloud supports both the Kafka and Pulsar schema registries as central repositories to store registered schema information, which enables producers and consumers to coordinate the schema of a topic's messages through brokers. However, Kafka schemas and Pulsar schemas cannot be used simultaneously due to their differing API definitions and schema storage locations.

We also plan to provide a unified schema registry that supports both Kafka and Pulsar schemas. This will allow messages with schemas to be produced and consumed interchangeably by Pulsar and Kafka clients:

  • Pulsar consumers will be able to consume messages with schema produced by Kafka producers.
  • Kafka consumers will be able to consume messages with schema produced by Pulsar producers.