> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Connect to your cluster using the Kafka C client (rdkafka)

<Note title="Note">
  This QuickStart assumes that you have created a StreamNative cluster with the Kafka protocol enabled, created a service account, and granted the service account `produce` and `consume` permissions to a namespace for the target topic.
</Note>

This document describes how to connect to your StreamNative cluster using the Kafka C client using [API Keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#kafka-clients) authentication.

## Before you begin

<Note title="Note">
  * Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
  * The password for different utilities as `kcat` will be equal to `token:<API KEY>`.
</Note>

You can follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account you choose to use.

## Steps

1. Install the Kafka C client. See [https://github.com/confluentinc/librdkafka?tab=readme-ov-file#installation](https://github.com/confluentinc/librdkafka?tab=readme-ov-file#installation)

2. Build a C application to produce and consume messages.

   ```c theme={null}
   // Note: for code simplicity, this example does not check the return value of
   // the rd_kafka_* APIs.
   #include <librdkafka/rdkafka.h>
   #include <stdatomic.h>
   #include <string.h>

   static void dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) {
     if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
       printf("Sent message %s to %lu\n", (char *)msg->payload, (unsigned long)msg->offset);
       atomic_fetch_add((atomic_int *)opaque, 1);
     } else {
       fprintf(stderr, "Received error: %s\n", rd_kafka_err2str(msg->err));
     }
   }

   int main(int argc, char *argv[]) {
     const char *bootstrap_servers = "<bootstrap-servers>";
     const char *topic = "my-topic";
     const char *username = "public";  // the tenant name
     const char *token = "token:<api-key>";
     const char *group = "my-group";
     char errstr[1024];

     rd_kafka_conf_t *consumer_conf = rd_kafka_conf_new();
     rd_kafka_conf_set(consumer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "group.id", group, errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "auto.offset.reset", "earliest", errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "sasl.username", username, errstr, sizeof(errstr));
     rd_kafka_conf_set(consumer_conf, "sasl.password", token, errstr, sizeof(errstr));
     // Note: Ursa does not support read_committed for now
     rd_kafka_conf_set(consumer_conf, "isolation.level", "read_uncommitted", errstr, sizeof(errstr));

     rd_kafka_t *consumer = rd_kafka_new(RD_KAFKA_CONSUMER, consumer_conf, errstr, sizeof(errstr));
     if (!consumer) {
       fprintf(stderr, "Failed to create consumer: %s\n", errstr);
       return 1;
     }

     rd_kafka_topic_partition_list_t *topics = rd_kafka_topic_partition_list_new(1);
     rd_kafka_topic_partition_list_add(topics, topic, RD_KAFKA_PARTITION_UA);
     rd_kafka_subscribe(consumer, topics);
     rd_kafka_topic_partition_list_destroy(topics);

     rd_kafka_conf_t *producer_conf = rd_kafka_conf_new();
     rd_kafka_conf_set(producer_conf, "bootstrap.servers", bootstrap_servers, errstr, sizeof(errstr));
     rd_kafka_conf_set(producer_conf, "sasl.mechanisms", "PLAIN", errstr, sizeof(errstr));
     rd_kafka_conf_set(producer_conf, "security.protocol", "SASL_SSL", errstr, sizeof(errstr));
     rd_kafka_conf_set(producer_conf, "sasl.username", username, errstr, sizeof(errstr));
     rd_kafka_conf_set(producer_conf, "sasl.password", token, errstr, sizeof(errstr));

     rd_kafka_conf_set_dr_msg_cb(producer_conf, dr_msg_cb);
     atomic_int count = ATOMIC_VAR_INIT(0);
     rd_kafka_conf_set_opaque(producer_conf, &count);

     rd_kafka_t *producer = rd_kafka_new(RD_KAFKA_PRODUCER, producer_conf, errstr, sizeof(errstr));
     if (!producer) {
       fprintf(stderr, "Failed to create producer: %s\n", errstr);
       return 1;
     }

     const int num_messages = 10;
     for (int i = 0; i < 10; i++) {
       char value[128];
       snprintf(value, sizeof(value), "msg-%d", i);
       rd_kafka_producev(producer, RD_KAFKA_V_TOPIC(topic), RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
                         RD_KAFKA_V_VALUE(&value[0], strlen(value)), RD_KAFKA_V_OPAQUE(NULL),
                         RD_KAFKA_V_END);
     }
     // Poll until the message is sent
     int num_produce_done;
     while ((num_produce_done = atomic_load(&count)) < num_messages) {
       rd_kafka_poll(producer, 1);
     }
     printf("%d messages are loaded\n", num_messages);

     for (int i = 0; i < num_messages;) {
       rd_kafka_message_t *msg = rd_kafka_consumer_poll(consumer, 10);
       if (!msg) continue;

       printf("Received msg %s from %s-%d@%lu\n", (char *)msg->payload, topic, msg->partition,
              (unsigned long)msg->offset);
       i++;
       rd_kafka_message_destroy(msg);
     }

     rd_kafka_destroy(producer);
     rd_kafka_consumer_close(consumer);
     rd_kafka_destroy(consumer);
     return 0;
   }
   ```

   * `<bootstrap-servers>`: the Kafka service URL of your StreamNative cluster.
   * `<api-key>`: an API key of your service account.

   The code example uses C11 standard, so you need to compile it like:

   ```bash theme={null}
   gcc main.c -std=c11 -lrdkafka
   ```

3. Run the Go application and you should see the following output:

   ```bash theme={null}
   Sent message msg-0 to 0
   Sent message msg-1 to 1
   Sent message msg-2 to 2
   Sent message msg-3 to 3
   Sent message msg-4 to 4
   Sent message msg-5 to 5
   Sent message msg-6 to 6
   Sent message msg-7 to 7
   Sent message msg-8 to 8
   Sent message msg-9 to 9
   10 messages are loaded
   Received msg msg-0 from my-topic-0@0
   Received msg msg-1 from my-topic-0@1
   Received msg msg-2 from my-topic-0@2
   Received msg msg-3 from my-topic-0@3
   Received msg msg-4 from my-topic-0@4
   Received msg msg-5 from my-topic-0@5
   Received msg msg-6 from my-topic-0@6
   Received msg msg-7 from my-topic-0@7
   Received msg msg-8 from my-topic-0@8
   Received msg msg-9 from my-topic-0@9
   ```
