protocol-handler
KoP - Kafka on Pulsar
Kafka-on-Pulsar - A protocol handler that brings native Kafka protocol to Apache Pulsar
Authored by
BewareMyPower,Demogorgon314,jiazhai,eolivelli
Support type
streamnative
License
Apache License 2.0

KoP (Kafka on Pulsar) brings native Apache Kafka protocol support to Apache Pulsar by introducing a Kafka protocol handler on Pulsar brokers. By adding the KoP protocol handler to your existing Pulsar cluster, you can migrate your existing Kafka applications and services to Pulsar without modifying the code. This enables Kafka applications to leverage Pulsar’s powerful features, such as:

  • Streamlined operations with enterprise-grade multi-tenancy
  • Simplified operations with a rebalance-free architecture
  • Infinite event stream retention with Apache BookKeeper and tiered storage
  • Serverless event processing with Pulsar Functions

KoP is implemented as a Pulsar protocol handler plugin with the protocol name "kafka" and is loaded when the Pulsar broker starts. By providing native Kafka protocol support on Apache Pulsar, it lowers the barrier to adopting Pulsar. By integrating the two popular event streaming ecosystems, KoP unlocks new use cases. You can leverage advantages from each ecosystem and build a truly unified event streaming platform with Apache Pulsar to accelerate the development of real-time applications and services.

KoP implements the Kafka wire protocol on Pulsar by leveraging the existing components (such as topic discovery, the distributed log library - ManagedLedger, cursors and so on) that Pulsar already has.

The following figure illustrates how the Kafka-on-Pulsar protocol handler is implemented within Pulsar.

Get Started with KoP

If you have an Apache Pulsar cluster, you can enable Kafka-on-Pulsar on your existing Pulsar cluster by downloading and installing the KoP protocol handler to Pulsar brokers directly. It takes three steps:

  1. Download the KoP protocol handler, or build the ./kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar file, and then copy it to your Pulsar protocols directory.
  2. Set the configuration of the KoP protocol handler in the Pulsar broker.conf or standalone.conf file.
  3. Restart the Pulsar brokers to load the KoP protocol handler.

Then you can start your broker and use KoP.

This getting-started guide offers several ways to get started with KoP:

  • Setting up an existing Pulsar cluster to run KoP based on steps above
  • Using Docker Compose with a standalone Pulsar (all in one, including ZooKeeper, BookKeeper and Pulsar), including the configuration needed for KoP
  • Using Docker Compose with a separate service for each of ZooKeeper, BookKeeper and Pulsar

Once KoP is installed and running in Pulsar, follow the instructions in the "Validating KoP is running correctly" section to validate that KoP is working.

Setting up an existing Pulsar Cluster to run KoP

Step 1: Get KoP protocol handler

This section describes how to get the KoP protocol handler.

Download KoP protocol handler

StreamNative provides ready-to-use KoP Docker images. You can also download the KoP protocol handler directly to deploy with the official Apache Pulsar Docker images or the Pulsar binaries.

Build KoP protocol handler from source code

To build the KoP protocol handler from the source, follow these steps:

  1. Clone the KoP GitHub project to your local machine.

    git clone https://github.com/streamnative/kop.git
    cd kop
    
  2. Build the project.

    mvn clean install -DskipTests
    
  3. Get the .nar file in the following directory and copy it to your Pulsar protocols directory. You need to create the protocols folder in Pulsar if it's the first time you use protocol handlers.

    ./kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar
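
The copy in step 3 can be sketched as a small shell function. This is a minimal sketch, not part of KoP: the function name install_kop_nar and both of its arguments are hypothetical, and it assumes the protocols directory sits directly under the Pulsar installation directory.

```shell
#!/bin/sh
# Hypothetical helper (not shipped with KoP): copy a built KoP .nar file into
# <pulsar_home>/protocols, creating the directory if it does not exist yet.
install_kop_nar() {
    nar_file=$1      # path to the built .nar file
    pulsar_home=$2   # assumed Pulsar installation directory

    if [ ! -f "$nar_file" ]; then
        echo "NAR file not found: $nar_file" >&2
        return 1
    fi

    # mkdir -p is a no-op when the protocols directory already exists.
    mkdir -p "$pulsar_home/protocols" || return 1
    cp "$nar_file" "$pulsar_home/protocols/"
}
```

Call it with the path of the .nar file you built (with the version placeholder resolved) and the path of your Pulsar installation.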
    

Step 2: Set configuration for KoP

After you copy the .nar file to your Pulsar /protocols directory, you need to configure the Pulsar broker to run the KoP protocol handler as a plugin by adding configurations in the Pulsar configuration file broker.conf or standalone.conf.

  1. Set the configuration of the KoP protocol handler in broker.conf or standalone.conf file.

    messagingProtocols=kafka
    protocolHandlerDirectory=./protocols
    allowAutoTopicCreationType=partitioned
    narExtractionDirectory=/path/to/nar
    
    Property                    Default value    Proposed value
    messagingProtocols          (empty)          kafka
    protocolHandlerDirectory    ./protocols      Location of KoP NAR file
    allowAutoTopicCreationType  non-partitioned  partitioned
    narExtractionDirectory      /tmp/pulsar-nar  Location of unpacked KoP NAR file

    By default, allowAutoTopicCreationType is set to non-partitioned. Since topics are partitioned by default in Kafka, it's better to avoid creating non-partitioned topics for Kafka clients unless Kafka clients need to interact with existing non-partitioned topics.

    By default, the NAR file is unpacked into /tmp/pulsar-nar, which is under the /tmp directory. The system may automatically delete files in /tmp, including some of the unpacked classes, which causes a ClassNotFoundException or NoClassDefFoundError. Therefore, it is recommended to set the narExtractionDirectory option to another path.

  2. Set Kafka listeners.

    # Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0 
    kafkaListeners=PLAINTEXT://0.0.0.0:9092
    # This config is not required unless you want to expose another address to the Kafka client.
    # If it’s not configured, it will be the same as the `kafkaListeners` config by default
    kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
    
    • kafkaListeners is a comma-separated list of listeners, each with the host/IP and port to which the Kafka protocol handler binds for listening.
    • kafkaAdvertisedListeners is a comma-separated list of listeners, each with the host/IP and port that is exposed to Kafka clients.
  3. Configure offset management as shown below. Offset management for KoP depends on Pulsar's "Broker Entry Metadata", so this setting is required for KoP 2.8.0 or later.

    brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
    
  4. Disable the deletion of inactive topics. This is not required, but it is very important for KoP. Currently, Pulsar deletes inactive partitions of a partitioned topic without deleting the metadata of the partitioned topic, and KoP cannot re-create the missing partitions in this case.

    brokerDeleteInactiveTopicsEnabled=false
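
The settings from Step 2 can also be applied with a script instead of by hand. The following is a minimal sketch under stated assumptions: set_conf and configure_kop are hypothetical helper names, the configuration file contains plain key=value lines, and the listener address is the example value used above. It replaces an existing uncommented key or appends the key when it is missing, so running it twice leaves a single entry per property.

```shell
#!/bin/sh
# Hypothetical helper: set key=value in a properties-style file.
# Any existing uncommented "key=" line is dropped and the new line is appended.
set_conf() {
    key=$1
    value=$2
    file=$3
    # grep -v exits non-zero when it prints nothing, hence "|| true".
    grep -v "^${key}=" "$file" > "$file.tmp" || true
    printf '%s=%s\n' "$key" "$value" >> "$file.tmp"
    mv "$file.tmp" "$file"
}

# Hypothetical helper: apply the KoP settings described in Step 2.
configure_kop() {
    conf=$1      # e.g. conf/broker.conf or conf/standalone.conf
    nar_dir=$2   # a directory outside /tmp for the unpacked NAR file

    set_conf messagingProtocols kafka "$conf"
    set_conf protocolHandlerDirectory ./protocols "$conf"
    set_conf allowAutoTopicCreationType partitioned "$conf"
    set_conf narExtractionDirectory "$nar_dir" "$conf"
    set_conf kafkaListeners PLAINTEXT://0.0.0.0:9092 "$conf"
    set_conf brokerEntryMetadataInterceptors \
        org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor "$conf"
    set_conf brokerDeleteInactiveTopicsEnabled false "$conf"
}
```

For example, configure_kop conf/broker.conf /path/to/nar updates the broker configuration in place; restart the broker afterwards so that the changes take effect.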
    

Step 3: Load KoP by restarting Pulsar brokers

After you have installed the KoP protocol handler on the Pulsar brokers and configured the conf/broker.conf file, restart the brokers to load KoP. For a quick start, you can instead configure the conf/standalone.conf file and run Pulsar in standalone mode.

Run KoP on Standalone Pulsar in Docker Compose

KoP is a built-in component of StreamNative's sn-pulsar image, whose tag matches the KoP version. Taking KoP 2.9.1.1 as an example, you can run the docker compose up command in the KoP project directory to start a Pulsar standalone with KoP enabled. KoP has a single advertised listener, 127.0.0.1:19092, so point Kafka's CLI tools at that address, as shown below:

$ ./bin/kafka-console-producer.sh --bootstrap-server localhost:19092 --topic my-topic
>hello
>world
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:19092 --topic my-topic --from-beginning
hello
world

See docker-compose.yml for more details.

To configure KoP in a cluster that is started in Docker, add environment variables for your customized configuration and make sure that bin/apply-config-from-env.py conf/broker.conf runs before bin/pulsar broker. The name of an environment variable should be the property's key if the key already exists in the configuration file. Otherwise, it should have the prefix PULSAR_PREFIX_.
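
The naming rule above can be illustrated with a short shell function. This is only a sketch of the rule as stated here, not of what apply-config-from-env.py does internally: the function name kop_env_name is hypothetical, and it assumes that only uncommented key=value lines count as existing properties.

```shell
#!/bin/sh
# Hypothetical helper: print the environment variable name to use for a
# property, given the configuration file that will be rewritten at startup.
kop_env_name() {
    key=$1
    conf=$2
    if grep -q "^${key}=" "$conf"; then
        # The key is already present in the file: use it unchanged.
        printf '%s\n' "$key"
    else
        # The key is not present: it needs the PULSAR_PREFIX_ prefix.
        printf 'PULSAR_PREFIX_%s\n' "$key"
    fi
}
```

For example, kop_env_name kafkaListeners conf/broker.conf prints PULSAR_PREFIX_kafkaListeners when broker.conf has no kafkaListeners line.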

Run KoP in Pulsar with component for each system using Docker Compose

The Docker Compose file docker-compose-cluster.yml uses a Pulsar image that is bundled with the KoP plugin and contains the required configuration for both Pulsar and KoP. It creates a directory named data that holds the data directories for ZooKeeper, BookKeeper and the Pulsar broker, so that data is preserved across restarts.

You can start the cluster using the following command:

docker compose -f docker-compose-cluster.yaml up -d

You can follow Pulsar's broker logs by using this command:

docker logs -f broker

Once you see the following log line, you know Pulsar is up and ready to be validated.

2023-02-28T15:45:06,358+0000 [main] INFO  org.apache.pulsar.PulsarBrokerStarter - PulsarService started.
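
If you prefer to script this check instead of watching the log, a small filter can look for the startup line. This is a minimal sketch: the function name broker_started is hypothetical, and it assumes the log text contains the phrase "PulsarService started" exactly as in the line above.

```shell
#!/bin/sh
# Hypothetical helper: read log lines from stdin and succeed if the broker's
# startup line is present; fail if the input ends without it.
broker_started() {
    grep -q 'PulsarService started'
}
```

For example, docker logs broker 2>&1 | broker_started && echo "broker is ready" checks the log written so far. Running that command in a loop with a short sleep gives a simple readiness wait.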

Validating KoP is running correctly

You can verify that KoP works by running a Kafka client. You can use Kafka 2.x if you use Pulsar 2.10.x. You can use Kafka 3.x only if you use Pulsar 2.11.x or later.

  1. Download Kafka 2.0.0 and untar the release package.

    tar -xzf kafka_2.11-2.0.0.tgz
    cd kafka_2.11-2.0.0
    
  2. Verify KoP by using a Kafka producer and a Kafka consumer. The Kafka binary package includes a command-line producer and consumer.

    1. Run the command-line producer and send messages to the server.

      > bin/kafka-console-producer.sh --broker-list [pulsar-broker-address]:9092 --topic test
      This is a message
      This is another message
      
    2. Run the command-line consumer to receive messages from the server.

      > bin/kafka-console-consumer.sh --bootstrap-server [pulsar-broker-address]:9092 --topic test --from-beginning
      This is a message
      This is another message
      

Important note: You can't use the --zookeeper option, either with the Kafka command-line tools or programmatically, because it does not go through KoP. Use only the --bootstrap-server option.
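
The manual check above can be wrapped in a script that produces two messages and compares them with what the consumer reads back. This is a minimal sketch, assuming the Kafka 2.0.0 command-line tools used in this section and a topic that holds no earlier messages. The function name kop_roundtrip and its arguments are hypothetical. Both sides are sorted before the comparison because ordering across partitions is not guaranteed.

```shell
#!/bin/sh
# Hypothetical helper: send two messages through KoP and check that the
# console consumer returns the same two messages.
kop_roundtrip() {
    kafka_bin=$1   # directory that contains the Kafka CLI scripts
    broker=$2      # host:port of the KoP listener, e.g. localhost:9092
    topic=$3       # a topic with no earlier messages

    expected=$(printf 'This is a message\nThis is another message\n' | sort)

    # The producer reads one message per line from stdin.
    printf '%s\n' "$expected" |
        "$kafka_bin/kafka-console-producer.sh" \
            --broker-list "$broker" --topic "$topic" > /dev/null || return 1

    # Stop after two messages, or after 10 s without receiving a message.
    actual=$("$kafka_bin/kafka-console-consumer.sh" \
        --bootstrap-server "$broker" --topic "$topic" \
        --from-beginning --max-messages 2 --timeout-ms 10000 | sort)

    [ "$actual" = "$expected" ]
}
```

For example, run kop_roundtrip bin [pulsar-broker-address]:9092 test from the Kafka directory. A zero exit status means both messages made the round trip.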

How to use KoP

You can configure and manage KoP based on your requirements. Check the following guides for more details.

NOTE

The following links are invalid when you check this document in the master branch from GitHub. You can go to the same chapter of the README for the correct links.

The following is important information to keep in mind when you configure and use KoP.

  • Set both retention and time to live (TTL) for KoP topics. If you configure only retention without TTL, messages of KoP topics can never be deleted because KoP does not update a durable cursor.
  • If a Pulsar consumer and a Kafka consumer both subscribe to the same topic with the same subscription (or group) name, the two consumers consume messages independently. They do not share a subscription, even though the Pulsar client's subscription name is the same as the Kafka client's group name.
  • KoP supports interaction between Pulsar client and Kafka client by default. If your topic is used only by the Pulsar client or only by the Kafka client, you can set entryFormat=kafka for better performance.
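
As an illustration of the first point, retention and TTL can be set together at the namespace level with pulsar-admin. This is a minimal sketch, not a recommendation: the function name set_kop_retention, the PULSAR_ADMIN override, and the values in the usage example are assumptions, as is the idea that your Kafka topics live in a single namespace such as public/default.

```shell
#!/bin/sh
# Hypothetical helper: set retention and message TTL on one namespace so that
# the two policies are always configured together.
set_kop_retention() {
    namespace=$1        # e.g. public/default (assumed namespace)
    retention_time=$2   # e.g. 7d
    retention_size=$3   # e.g. 10G
    ttl_seconds=$4      # e.g. 604800
    admin=${PULSAR_ADMIN:-bin/pulsar-admin}   # path to the pulsar-admin CLI

    "$admin" namespaces set-retention "$namespace" \
        --time "$retention_time" --size "$retention_size" || return 1
    "$admin" namespaces set-message-ttl "$namespace" \
        --messageTTL "$ttl_seconds"
}
```

For example, set_kop_retention public/default 7d 10G 604800 keeps data for up to seven days or 10 GB and expires unacknowledged messages after the same seven days. Choose values that match your own storage budget.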