
Connect to your cluster using the Pulsar Rust client

This document describes how to connect to a StreamNative cluster with the Pulsar Rust client, and how to use a Rust producer and consumer to publish messages to a topic and consume messages from it. The Rust client supports connecting to a StreamNative cluster using either OAuth2 or API key authentication.

Note

This document assumes that you have created a StreamNative cluster and a service account, and have granted the service account produce and consume permissions to the namespace for the target topic.

Prerequisites

Before you start, check the minimum supported versions required for the Rust client's underlying libraries.

Connect to your cluster using API keys

To connect to a StreamNative cluster using API keys, follow these steps.

Step 1: Get the broker service URL of your cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.

Step 2: Create an API key of your service account

Note

Before using an API key, verify that the service account is authorized to access the resources, such as tenants, namespaces, and topics.

Follow the instructions for creating an API key for the service account you want to use.
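
One way to keep the API key out of source code is to read it from the environment at startup. The following is a minimal sketch that uses only the standard library. The variable names `PULSAR_SERVICE_URL` and `PULSAR_API_KEY`, the `TokenConfig` struct, and both functions are hypothetical names chosen for illustration; they are not defined by StreamNative or by the Rust client. The lookup is passed in as a closure so that the logic can be exercised without modifying the process environment.

```rust
use std::env;

/// Connection settings for token (API key) authentication.
/// Hypothetical helper type, not part of the pulsar crate.
#[derive(Debug, PartialEq)]
pub struct TokenConfig {
    pub service_url: String,
    pub api_key: String,
}

/// Builds a `TokenConfig` from a lookup function.
/// The variable names below are assumptions made for this sketch.
pub fn load_token_config<F>(lookup: F) -> Result<TokenConfig, String>
where
    F: Fn(&str) -> Option<String>,
{
    // Treat a missing variable and a blank variable the same way.
    let get = |name: &str| -> Result<String, String> {
        match lookup(name) {
            Some(v) if !v.trim().is_empty() => Ok(v.trim().to_string()),
            _ => Err(format!("missing or empty variable: {name}")),
        }
    };
    Ok(TokenConfig {
        service_url: get("PULSAR_SERVICE_URL")?,
        api_key: get("PULSAR_API_KEY")?,
    })
}

/// Convenience wrapper that reads from the process environment.
pub fn load_token_config_from_env() -> Result<TokenConfig, String> {
    load_token_config(|name| env::var(name).ok())
}
```

The resulting `service_url` and `api_key` values can stand in for the `${brokerServiceURL}` and `${apikey}` placeholders when you build the client.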

Step 3: Connect to your cluster

For a complete example of how to connect to a cluster through the Rust client, see Rust client examples.

Create a Rust consumer to consume messages

You can create and configure a Rust consumer to consume messages using Token authentication as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.

use futures::TryStreamExt;
use pulsar::{Authentication, Consumer, ConsumerOptions, Pulsar, SubType, TokioExecutor};
use pulsar::consumer::InitialPosition;

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    let token = "${apikey}".to_string();
    builder = builder.with_auth(Authentication {
        name: "token".to_string(),
        data: token.into_bytes(),
    });

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("${subscription}")
        .with_options(ConsumerOptions::default()
            .with_initial_position(InitialPosition::Earliest))
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let payload = match msg.deserialize() {
            Ok(payload) => payload,
            Err(e) => {
                println!("could not deserialize message: {:?}", e);
                break;
            }
        };

        counter += 1;
        println!("Received message '{:?}' id='{:?}'", payload, msg.message_id());

        if counter > 10 {
            consumer.close().await.expect("Unable to close consumer");
            break;
        }
    }

    Ok(())
}

Create a Rust producer to produce messages

You can create and configure a Rust producer to produce messages using Token authentication as follows. For more information about the placeholders in the code sample, see parameters for Token authentication.

use pulsar::{Authentication, Pulsar, TokioExecutor};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    let token = "${apikey}".to_string();
    builder = builder.with_auth(Authentication {
        name: "token".to_string(),
        data: token.into_bytes(),
    });

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
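        // The first await queues the message; the second waits for the broker's receipt.
        // Propagate a failed send to the caller instead of panicking.
        producer
            .send(format!("Hello-{}", counter))
            .await?
            .await?;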

        counter += 1;
        println!("{counter} messages");

        if counter > 10 {
            producer.close().await.expect("Unable to close connection");
            break;
        }
    }

    Ok(())
}

Parameters for Token authentication

  • ${brokerServiceURL}: the broker service URL of your StreamNative cluster.
  • ${apikey}: an API key of your service account.
  • ${tenant}/${namespace}/${topic}: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
  • ${subscription}: the name of the subscription that will determine how messages are delivered.
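
The topic placeholder above can be assembled from its three parts in code rather than by hand. The sketch below is one way to do this with the standard library only. The function name `persistent_topic` is hypothetical, and the validation (rejecting empty parts and parts that contain `/`) is a conservative assumption of this sketch, not the broker's complete naming rules.

```rust
/// Builds a fully qualified persistent topic name from its three parts.
/// Hypothetical helper; the checks are deliberately minimal.
pub fn persistent_topic(tenant: &str, namespace: &str, topic: &str) -> Result<String, String> {
    for (label, part) in [("tenant", tenant), ("namespace", namespace), ("topic", topic)] {
        if part.is_empty() {
            return Err(format!("{label} must not be empty"));
        }
        // Assumption: a slash inside a part would change the name's structure.
        if part.contains('/') {
            return Err(format!("{label} must not contain '/'"));
        }
    }
    Ok(format!("persistent://{tenant}/{namespace}/{topic}"))
}
```

The returned string can be passed to `with_topic` in place of the literal `"persistent://${tenant}/${namespace}/${topic}"`.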

Connect to your cluster using OAuth2 authentication

To connect to a StreamNative cluster using OAuth2 authentication, follow these steps.

Step 1: Get the broker service URL of your cluster

To get the service URL of a Pulsar cluster through the StreamNative Console, follow these steps.

  1. On the left navigation pane, in the Admin area, click Pulsar Clusters.

  2. Select the Details tab, and in the Access Points area, click Copy at the end of the row of the service URL.

Step 2: Get the OAuth2 credential file of your service account

To get an OAuth2 credential file of a service account through the StreamNative Console, follow these steps.

  1. On the left navigation pane, click Service Accounts.

  2. In the row of the service account you want to use, in the Key File column, click the Download icon to download the OAuth2 credential file to your local directory.

    The OAuth2 credential file looks similar to the following:

    {
      "type": "SN_SERVICE_ACCOUNT",
      "client_id": "CLIENT_ID",
      "client_secret": "CLIENT_SECRET",
      "client_email": "[email protected]",
      "issuer_url": "https://auth.streamnative.cloud"
    }
    

Step 3: Connect to your cluster

For a complete example of how to connect to a cluster using the Rust client, see Rust client examples.

Create a Rust consumer to consume messages

You can create and configure a Rust consumer to consume messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.

use futures::TryStreamExt;
use pulsar::{Consumer, ConsumerOptions, Pulsar, SubType, TokioExecutor};
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::consumer::InitialPosition;

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    builder = builder.with_auth_provider(OAuth2Authentication::client_credentials(OAuth2Params {
        issuer_url: "https://auth.streamnative.cloud/".to_string(),
        credentials_url: "file:///YOUR-KEY-FILE-PATH".to_string(), // Absolute path of your downloaded key file
        audience: Some("urn:sn:pulsar:${orgName}:${instanceName}".to_string()),
        scope: None,
    }));

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut consumer: Consumer<String, _> = pulsar
        .consumer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .with_subscription_type(SubType::Exclusive)
        .with_subscription("${subscription}")
        .with_options(ConsumerOptions::default()
            .with_initial_position(InitialPosition::Earliest))
        .build()
        .await?;

    let mut counter = 0usize;
    while let Some(msg) = consumer.try_next().await? {
        consumer.ack(&msg).await?;
        let payload = match msg.deserialize() {
            Ok(payload) => payload,
            Err(e) => {
                println!("could not deserialize message: {:?}", e);
                break;
            }
        };

        counter += 1;
        println!("Received message '{:?}' id='{:?}'", payload, msg.message_id());

        if counter > 10 {
            consumer.close().await.expect("Unable to close consumer");
            break;
        }
    }

    Ok(())
}

Create a Rust producer to produce messages

You can create and configure a Rust producer to produce messages using the OAuth2 credential file as follows. For more information about the placeholders in the code sample, see parameters for OAuth2 authentication.

use pulsar::{Pulsar, TokioExecutor};
use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};

#[tokio::main]
async fn main() -> Result<(), pulsar::Error> {
    env_logger::init();

    let addr = "${brokerServiceURL}".to_string();
    let mut builder = Pulsar::builder(addr, TokioExecutor);
    builder = builder.with_auth_provider(OAuth2Authentication::client_credentials(OAuth2Params {
        issuer_url: "https://auth.streamnative.cloud/".to_string(),
        credentials_url: "file:///YOUR-KEY-FILE-PATH".to_string(), // Absolute path of your downloaded key file
        audience: Some("urn:sn:pulsar:${orgName}:${instanceName}".to_string()),
        scope: None,
    }));

    let pulsar: Pulsar<_> = builder.build().await?;
    let mut producer = pulsar
        .producer()
        .with_topic("persistent://${tenant}/${namespace}/${topic}")
        .build()
        .await?;

    let mut counter = 0usize;
    loop {
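        // The first await queues the message; the second waits for the broker's receipt.
        // Propagate a failed send to the caller instead of panicking.
        producer
            .send(format!("Hello-{}", counter))
            .await?
            .await?;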

        counter += 1;
        println!("{counter} messages");

        if counter > 10 {
            producer.close().await.expect("Unable to close connection");
            break;
        }
    }

    Ok(())
}

Parameters for OAuth2 authentication

  • ${brokerServiceURL}: the broker service URL of your StreamNative cluster.
  • credentials_url: the location of your downloaded OAuth2 credential file. This parameter accepts either of the following two formats:
    • file:///path/to/file: the absolute path to your downloaded OAuth2 credential file.
    • data:application/json;base64,<base64-encoded value>: the content of the credential file, encoded in Base64.
  • audience: a Uniform Resource Name (URN) that combines the prefix urn:sn:pulsar, your organization name, and your Pulsar instance name, in the form urn:sn:pulsar:${orgName}:${instanceName}.
  • ${tenant}/${namespace}/${topic}: the full name of the topic to produce messages to and consume messages from. It combines the tenant name, the namespace name, and the topic name.
  • ${subscription}: the name of the subscription that will determine how messages are delivered.
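
The `audience` and `credentials_url` formats described above can be produced programmatically. The following is a minimal, standard-library-only sketch. All function names are hypothetical, the `file://` helper assumes a Unix-style absolute path, and the small Base64 encoder (standard alphabet with `=` padding) is included only to keep the example free of external crates; a maintained Base64 crate is likely a better choice in real code.

```rust
use std::path::Path;

/// Builds the audience URN from the organization and instance names.
pub fn audience(org: &str, instance: &str) -> String {
    format!("urn:sn:pulsar:{org}:{instance}")
}

/// Builds a `file://` credentials URL. Assumes a Unix-style absolute path.
pub fn file_credentials_url(path: &Path) -> Result<String, String> {
    if !path.is_absolute() {
        return Err(format!("not an absolute path: {}", path.display()));
    }
    Ok(format!("file://{}", path.display()))
}

const B64: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";

/// Encodes bytes as Base64 using the standard alphabet and `=` padding.
pub fn base64_encode(input: &[u8]) -> String {
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling the tail.
        let b0 = chunk[0] as u32;
        let b1 = *chunk.get(1).unwrap_or(&0) as u32;
        let b2 = *chunk.get(2).unwrap_or(&0) as u32;
        let n = (b0 << 16) | (b1 << 8) | b2;
        out.push(B64[(n >> 18) as usize & 63] as char);
        out.push(B64[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { B64[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { B64[n as usize & 63] as char } else { '=' });
    }
    out
}

/// Builds a `data:` credentials URL from the key file's JSON content.
pub fn data_credentials_url(json: &str) -> String {
    format!("data:application/json;base64,{}", base64_encode(json.as_bytes()))
}
```

The `data:` form may be convenient when the key file content is supplied through a secret store rather than the local file system, since no file needs to be written to disk.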