# Connect to your cluster using the Pulsar Rust client

This document describes how to connect to a StreamNative cluster with the Pulsar Rust client, and how to use a Rust producer and consumer to publish messages to and consume messages from a topic. The Rust client can authenticate to a StreamNative cluster with either [OAuth2](#use-oauth2) or [API keys](#use-apikeys).

<Note title="Note">
  This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account `produce` and `consume` permissions to the namespace for the target topic.
</Note>

## Prerequisites

See [the minimum supported versions required for the underlying libraries](https://github.com/streamnative/pulsar-rs#getting-started) for more details.

## Connect to your cluster using API keys

<span id="use-apikeys" />

To connect to a StreamNative cluster using [API keys](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview), follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. You will see the available service URLs in the **Access Points** area.

    4. You can click **Copy** at the end of the row of the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Create an API key of your service account

<Note title="Note">
  Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.
</Note>

Follow the instructions to [create an API key](/cloud/security/authentication/service-accounts/use-api-keys/api-keys-overview#using-api-keys-to-connect-to-your-cluster) for the service account that you want to use.

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster through the Rust client, see [Rust client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/rust).

#### Create a Rust consumer to consume messages

You can create and configure a Rust consumer to consume messages using Token authentication as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-rust#parameters-for-token-authentication).

```rust theme={null}
use futures::TryStreamExt;
use pulsar::{Authentication, Consumer, ConsumerOptions, Pulsar, SubType, TokioExecutor};
use pulsar::consumer::InitialPosition;

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    let token = "${apikey}".to_string();
    builder = builder.with_auth(Authentication {
        name: "token".to_string(),
        data: token.into_bytes(),
    });

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("${subscription}")
        .with_options(ConsumerOptions::default()
            .with_initial_position(InitialPosition::Earliest))
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let payload = match msg.deserialize() {
            Ok(payload) => payload,
            Err(e) => {
                println!("could not deserialize message: {:?}", e);
                break;
            }
        };

        counter += 1;
        println!("Received message '{:?}' id='{:?}'", payload, msg.message_id());

        if counter > 10 {
            consumer.close().await.expect("Unable to close consumer");
            break;
        }
    }

    Ok(())
}
```

#### Create a Rust producer to produce messages

You can create and configure a Rust producer to produce messages using Token authentication as follows. For more information about the placeholders in the code sample, see [parameters for Token authentication](/cloud/build/pulsar-clients/cloud-connect-rust#parameters-for-token-authentication).

```rust theme={null}
use pulsar::{Authentication, Pulsar, TokioExecutor};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    let token = "${apikey}".to_string();
    builder = builder.with_auth(Authentication {
        name: "token".to_string(),
        data: token.into_bytes(),
    });

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
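        // The first `await` hands the message to the producer; the second
        // waits for the broker's send receipt. Both errors are propagated
        // to the caller instead of panicking.
        producer
            .send(format!("Hello-{}", counter))
            .await?
            .await?;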

        counter += 1;
        println!("{counter} messages");

        if counter > 10 {
            producer.close().await.expect("Unable to close connection");
            break;
        }
    }

    Ok(())
}
```

#### Parameters for Token authentication

* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `${apikey}`: an API key of your service account.
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
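
The placeholders above can also be filled in at runtime instead of being hardcoded, which keeps the API key out of source control. The following is a minimal sketch that uses only the standard library. The `TokenConfig` struct, the `topic_name` and `load_token_config` helpers, and the `PULSAR_*` variable names are hypothetical names chosen for illustration; they are not part of the `pulsar` crate.

```rust
use std::env;

/// Hypothetical holder for the placeholders used in the samples above.
#[derive(Debug, PartialEq)]
pub struct TokenConfig {
    pub broker_service_url: String,
    pub api_key: String,
    pub topic: String,
    pub subscription: String,
}

/// Builds the full topic name from the tenant, namespace, and topic names.
pub fn topic_name(tenant: &str, namespace: &str, topic: &str) -> String {
    format!("persistent://{tenant}/{namespace}/{topic}")
}

/// Reads every value through `lookup`, so the same code works with the
/// process environment and with a fixed set of values in tests.
/// The variable names are assumptions made for this sketch.
pub fn load_token_config<F>(lookup: F) -> Result<TokenConfig, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Treat an unset or empty value as a configuration error.
    let get = |key: &str| {
        lookup(key)
            .filter(|value| !value.is_empty())
            .ok_or_else(|| format!("missing required setting: {key}"))
    };

    Ok(TokenConfig {
        broker_service_url: get("PULSAR_SERVICE_URL")?,
        api_key: get("PULSAR_API_KEY")?,
        topic: topic_name(
            &get("PULSAR_TENANT")?,
            &get("PULSAR_NAMESPACE")?,
            &get("PULSAR_TOPIC")?,
        ),
        subscription: get("PULSAR_SUBSCRIPTION")?,
    })
}

/// Convenience wrapper that reads the values from environment variables.
pub fn load_token_config_from_env() -> Result<TokenConfig, String> {
    load_token_config(|key| env::var(key).ok())
}
```

With a helper like this, the samples above could pass `config.broker_service_url` to `Pulsar::builder`, `config.api_key.into_bytes()` as the token data, and `config.topic` to `with_topic`.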

## Connect to your cluster using OAuth2 authentication

<span id="use-oauth2" />

To connect to a StreamNative cluster using OAuth2 authentication, follow these steps.

### Step 1: Get the broker service URL of your cluster

To get the service URL(s) of a StreamNative cluster, follow these steps.

<Tabs>
  <Tab title="StreamNative Console">
    1. Navigate to the **Cluster Dashboard** page by [switching to the cluster workspace](/cloud/get-started/cloud-console#switch-a-cluster).

    2. On the **Cluster Dashboard** page, click the **Details** tab.

    3. You will see the available service URLs in the **Access Points** area.

    4. You can click **Copy** at the end of the row of the service URL that you want to use.
  </Tab>
</Tabs>

### Step 2: Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

1. On the left navigation pane, click **Service Accounts**.

2. In the row of the service account you want to use, in the **Key File** column, click the **Download** icon to download the OAuth2 credential file to your local directory.

   The OAuth2 credential file looks similar to the following:

   ```json theme={null}
   {
     "type": "SN_SERVICE_ACCOUNT",
     "client_id": "CLIENT_ID",
     "client_secret": "CLIENT_SECRET",
     "client_email": "test@auth.streamnative.cloud",
     "issuer_url": "https://auth.streamnative.cloud"
   }
   ```

### Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Rust client, see [Rust client examples](https://github.com/streamnative/cloud-manager/tree/master/ui/src/data/code/clients/rust).

#### Create a Rust consumer to consume messages

You can create and configure a Rust consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-rust#parameters-for-oauth2-authentication).

```rust theme={null}
use futures::TryStreamExt;
use pulsar::{Consumer, ConsumerOptions, Pulsar, SubType, TokioExecutor};
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::consumer::InitialPosition;

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    builder = builder.with_auth_provider(OAuth2Authentication::client_credentials(OAuth2Params {
        issuer_url: "https://auth.streamnative.cloud/".to_string(),
        credentials_url: "file:///YOUR-KEY-FILE-PATH".to_string(), // Absolute path of your downloaded key file
        audience: Some("urn:sn:pulsar:${orgName}:${instanceName}".to_string()),
        scope: None,
    }));

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("${subscription}")
        .with_options(ConsumerOptions::default()
            .with_initial_position(InitialPosition::Earliest))
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let payload = match msg.deserialize() {
            Ok(payload) => payload,
            Err(e) => {
                println!("could not deserialize message: {:?}", e);
                break;
            }
        };

        counter += 1;
        println!("Received message '{:?}' id='{:?}'", payload, msg.message_id());

        if counter > 10 {
            consumer.close().await.expect("Unable to close consumer");
            break;
        }
    }

    Ok(())
}
```

#### Create a Rust producer to produce messages

You can create and configure a Rust producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see [parameters for OAuth2 authentication](/cloud/build/pulsar-clients/cloud-connect-rust#parameters-for-oauth2-authentication).

```rust theme={null}
use pulsar::{Pulsar, TokioExecutor};
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    builder = builder.with_auth_provider(OAuth2Authentication::client_credentials(OAuth2Params {
        issuer_url: "https://auth.streamnative.cloud/".to_string(),
        credentials_url: "file:///YOUR-KEY-FILE-PATH".to_string(), // Absolute path of your downloaded key file
        audience: Some("urn:sn:pulsar:${orgName}:${instanceName}".to_string()),
        scope: None,
    }));

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
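        // The first `await` hands the message to the producer; the second
        // waits for the broker's send receipt. Both errors are propagated
        // to the caller instead of panicking.
        producer
            .send(format!("Hello-{}", counter))
            .await?
            .await?;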

        counter += 1;
        println!("{counter} messages");

        if counter > 10 {
            producer.close().await.expect("Unable to close connection");
            break;
        }
    }

    Ok(())
}
```

#### Parameters for OAuth2 authentication

* `${brokerServiceURL}`: the broker service URL of your StreamNative cluster.
* `credentials_url`: the location of your OAuth2 credential. This parameter supports the following two formats:
  * `file:///path/to/file`: the path to your downloaded OAuth2 credential file.
  * `data:application/json;base64,<base64-encoded value>`: the credential file content encoded into Base64 format.
* `audience`: the [Uniform Resource Name (URN)](/cloud/references/glossary#urn), which combines the `urn:sn:pulsar` prefix, your organization name, and your Pulsar instance name.
  * `${orgName}`: the name of your [organization](/cloud/references/glossary#organization).
  * `${instanceName}`: the name of your [instance](/cloud/references/glossary#instance).
* `${tenant}/${namespace}/${topic}`: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
* `${subscription}`: the name of the subscription that will determine how messages are delivered.
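
The `audience` and `credentials_url` values described above are plain strings, so they can be assembled with small helpers before being passed to `OAuth2Params`. The following is a minimal sketch that uses only the standard library. The helper names `audience_urn`, `file_credentials_url`, `data_credentials_url`, and `base64_encode` are hypothetical and not part of the `pulsar` crate, and the hand-written Base64 encoder is there only to keep the sketch dependency-free; a real project would more likely use an existing Base64 crate.

```rust
use std::path::Path;

/// Builds the `audience` value from the organization and instance names.
pub fn audience_urn(org_name: &str, instance_name: &str) -> String {
    format!("urn:sn:pulsar:{org_name}:{instance_name}")
}

/// Builds a `file://` credentials URL from an absolute path.
/// Assumes a Unix-style path such as `/home/me/key.json`.
pub fn file_credentials_url(path: &Path) -> Result<String, String> {
    if !path.is_absolute() {
        return Err(format!("key file path must be absolute: {}", path.display()));
    }
    Ok(format!("file://{}", path.display()))
}

/// Standard Base64 alphabet (RFC 4648), with `=` padding added below.
const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes bytes as padded Base64, three input bytes at a time.
pub fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group; missing bytes are zero.
        let b0 = chunk[0] as u32;
        let b1 = chunk.get(1).copied().unwrap_or(0) as u32;
        let b2 = chunk.get(2).copied().unwrap_or(0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;

        out.push(B64[((n >> 18) & 63) as usize] as char);
        out.push(B64[((n >> 12) & 63) as usize] as char);
        // Pad with '=' where the input ran out of bytes.
        out.push(if chunk.len() > 1 { B64[((n >> 6) & 63) as usize] as char } else { '=' });
        out.push(if chunk.len() > 2 { B64[(n & 63) as usize] as char } else { '=' });
    }
    out
}

/// Builds a `data:` credentials URL from the JSON content of the key file.
pub fn data_credentials_url(key_file_json: &str) -> String {
    format!(
        "data:application/json;base64,{}",
        base64_encode(key_file_json.as_bytes())
    )
}
```

The `data:` form can be convenient when the key file content is supplied through a secret store or an environment variable rather than as a file on disk.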
