1. Build Applications
  2. Kafka Clients

Connect to your cluster using the Kafka C client (rdkafka)

Note

This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account produce and consume permissions to a namespace for the target topic.

This document describes how to connect to your StreamNative cluster using the Kafka C client using API Keys authentication.

Before you begin

Note

  • Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  • The password for different utilities as kcat will be equal to token:<API KEY>.

You can follow the instructions to create an API key for the service account you choose to use.

Steps

  1. Install the Kafka C client. See https://github.com/confluentinc/librdkafka?tab=readme-ov-file#installation

  2. Build a C application to produce and consume messages.

    // Note: for code simplicity, this example does not check the return value of
    // the rd_kafka_* APIs.
    #include <librdkafka/rdkafka.h>
    #include <stdatomic.h>
    #include <string.h>
    
    static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) {
      if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
        printf("Sent message %s to %lu\n", (char *)msg->payload, (unsigned long)msg->offset);
        atomic_fetch_add((atomic_int *)opaque, 1);
      } else {
        fprintf(stderr, "Received error: %s\n", rd_kafka_err2str(msg->err));
      }
    }
    
    int main(int argc, char *argv[]) {
      const char *bootstrap_servers = "<bootstrap-servers>";
      const char *topic = "my-topic";
      const char *username = "public";  // the tenant name
      const char *token = "token:<api-key>";
      const char *group = "my-group";
      char errstr[1024];
    
      rd_kafka_conf_t *consumer_conf = rd_kafka_conf_new();
      rd_kafka_conf_set(consumer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "group.id", group, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.username", username, errstr, sizeof(errstr));
      rd_kafka_conf_set(consumer_conf, "sasl.password", token, errstr, sizeof(errstr));
    
      rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, consumer_conf, errstr, sizeof(errstr));
      if (!consumer) {
        fprintf(stderr, "Failed to create consumer: %s\n", errstr);
        return 1;
      }
    
      rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
      rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
      rd_kafka_subscribe(consumer, topics);
      rd_kafka_topic_partition_list_destroy(topics);
    
      rd_kafka_conf_t *producer_conf = rd_kafka_conf_new();
      rd_kafka_conf_set(producer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.username", username, errstr, sizeof(errstr));
      rd_kafka_conf_set(producer_conf, "sasl.password", token, errstr, sizeof(errstr));
    
      rd_kafka_conf_set_dr_msg_cb(producer_conf, dr_msg_cb);
      atomic_int count = ATOMIC_VAR_INIT(0);
      rd_kafka_conf_set_opaque(producer_conf, &count);
    
      rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, producer_conf, errstr, sizeof(errstr));
      if (!producer) {
        fprintf(stderr, "Failed to create producer: %s\n", errstr);
        return 1;
      }
    
      const int num_messages = 10;
      for (int i = 0; i < 10; i++) {
        char value[128];
        snprintf(value, sizeof(value), "msg-%d", i);
        rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                          RD_KAFKA_V_VALUE(&value[0], strlen(value)), RD_KAFKA_V_OPAQUE(NULL),
                          RD_KAFKA_V_END);
      }
      // Poll until the message is sent
      int num_produce_done;
      while ((num_produce_done = atomic_load(&count)) < num_messages) {
        rd_kafka_poll(producer, 1);
      }
      printf("%d messages are loaded\n", num_messages);
    
      for (int i = 0; i < num_messages;) {
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 10);
        if (!msg) continue;
    
        printf("Received msg %s from %s-%d@%lu\n", (char *)msg->payload, topic, msg->partition,
               (unsigned long)msg->offset);
        i++;
        rd_kafka_message_destroy(msg);
      }
    
      rd_kafka_destroy(producer);
      rd_kafka_consumer_close(consumer);
      rd_kafka_destroy(consumer);
      return 0;
    }
    
    • <bootstrap-servers>: the Kafka service URL of your StreamNative cluster.
    • <api-key>: an API key of your service account.

    The code example uses C11 standard, so you need to compile it like:

    gcc main.c -std=c11 -lrdkafka
    
  3. Run the Go application and you should see the following output:

    Sent message msg-0 to 0
    Sent message msg-1 to 1
    Sent message msg-2 to 2
    Sent message msg-3 to 3
    Sent message msg-4 to 4
    Sent message msg-5 to 5
    Sent message msg-6 to 6
    Sent message msg-7 to 7
    Sent message msg-8 to 8
    Sent message msg-9 to 9
    10 messages are loaded
    Received msg msg-0 from my-topic-0@0
    Received msg msg-1 from my-topic-0@1
    Received msg msg-2 from my-topic-0@2
    Received msg msg-3 from my-topic-0@3
    Received msg msg-4 from my-topic-0@4
    Received msg msg-5 from my-topic-0@5
    Received msg msg-6 from my-topic-0@6
    Received msg msg-7 from my-topic-0@7
    Received msg msg-8 from my-topic-0@8
    Received msg msg-9 from my-topic-0@9
    
Previous
Kafka Java