# Cluster Metrics

Metrics give you visibility into your StreamNative Cloud deployment. StreamNative Cloud provides a broad range of metrics that you can use to fine-tune performance and troubleshoot issues.

# Metrics endpoint

StreamNative Cloud provides endpoints that expose real-time metrics in the [Prometheus metrics format](https://prometheus.io/docs/concepts/data_model/). The following table lists the currently available metrics endpoints.

<Note title="Important">
  Currently, the Cloud Metrics API exposes only resource-related metrics for Pulsar, such as tenants, namespaces, topics, functions, and connectors. System-level metrics are not exposed through this API; the StreamNative Cloud team actively monitors and manages them. If your advanced observability use cases require access to system-level metrics, use the [Local Metrics Endpoint](/cloud/log-and-monitor/advanced-observability#local-metrics-endpoint) instead. Note that the Local Metrics Endpoint is available only for [**BYOC Pro** clusters](/cloud/clusters/cluster-types#byoc-pro-clusters).
</Note>

| Endpoint                                                                  | Description                                                  |
| ------------------------------------------------------------------------- | ------------------------------------------------------------ |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/export`              | [Export Pulsar resource metrics](#pulsar-resource-metrics)   |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/kafka/export`        | [Export Kafka resource metrics](#kafka-resource-metrics)     |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/source/export`       | [Export Source connector metrics](#source-connector-metrics) |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/sink/export`         | [Export Sink connector metrics](#sink-connector-metrics)     |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/function/export`     | [Export Function metrics](#function-metrics)                 |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/kafkaconnect/export` | [Export Kafka Connect metrics](#kafka-connect-metrics)       |
| `https://metrics.streamnative.cloud/v1/cloud/metrics/health/export`       | [Export Cluster health metrics](#health-metrics)             |
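As a minimal sketch of calling one of these endpoints from Python, the helper below builds (but does not send) a request using only the standard library. It assumes that you have already obtained an OAuth2 access token for an authorized service account (see Metrics authorization) and that the endpoint accepts it as a bearer token. `build_export_request` and `EXPORT_PATHS` are hypothetical names, and the token flow itself is not shown.

```python
import urllib.request

# Base URL and per-resource paths, taken from the endpoint table above.
BASE_URL = "https://metrics.streamnative.cloud/v1/cloud/metrics"
EXPORT_PATHS = {
    "pulsar": "/export",
    "kafka": "/kafka/export",
    "source": "/source/export",
    "sink": "/sink/export",
    "function": "/function/export",
    "kafkaconnect": "/kafkaconnect/export",
    "health": "/health/export",
}


def build_export_request(kind: str, access_token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for one metrics export endpoint.

    Assumption: the endpoint accepts an OAuth2 access token for the service
    account as a bearer token in the Authorization header.
    """
    if kind not in EXPORT_PATHS:
        raise ValueError(f"unknown metrics kind: {kind!r}")
    return urllib.request.Request(
        BASE_URL + EXPORT_PATHS[kind],
        headers={"Authorization": f"Bearer {access_token}"},
        method="GET",
    )
```

To fetch the metrics, pass the request to `urllib.request.urlopen` and read the response body, which should contain Prometheus text.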

## Metrics authorization

To access and scrape metrics from the Cloud endpoints, you must use either a Super Admin service account or a normal service account bound to the `metrics-viewer` role.

### Super Admin service account

To create a Super Admin service account, see [Create a service account](/cloud/security/authentication/service-accounts/service-accounts#create-a-service-account).

### metrics-viewer role

To bind a service account to the `metrics-viewer` role, you can use `snctl` or Terraform.

<Tabs>
  <Tab title="snctl">
    * Create a normal service account:

    ```
    snctl create serviceaccount metrics-account
    ```

    * Create a role binding that grants the service account the `metrics-viewer` cluster role:

    ```
    snctl create rolebinding metrics-viewer --serviceaccount metrics-account --clusterrole metrics-viewer
    ```

    * To revoke the service account's permission to access metrics, delete the role binding:

    ```
    snctl delete rolebinding metrics-viewer
    ```
  </Tab>

  <Tab title="terraform">
    * Add a `streamnative_service_account` resource and a `streamnative_role_binding` resource to your Terraform configuration file:

    ```
    terraform {
      required_providers {
        streamnative = {
          source = "streamnative/streamnative"
        }
      }
    }

    provider "streamnative" {
      # Replace with the path to your own service account key file
      key_file_path = "/path/to/your/service/account/key.json"
    }

    resource "streamnative_service_account" "metrics-account" {
      organization = "xxxx" # Replace with your organization name
      name         = "metrics-account"
      admin        = false
    }

    resource "streamnative_role_binding" "metrics-viewer" {
      organization      = "xxxx" # Replace with your organization name
      name              = "metrics-viewer"
      cluster_role_name = "metrics-viewer"
      # Referencing the resource makes Terraform create the service account first
      service_account_names = [streamnative_service_account.metrics-account.name]
    }
    ```

    * Initialize the working directory and apply the configuration:

    ```
    terraform init
    terraform apply -auto-approve
    ```
  </Tab>

  <Tab title="Console">
    This is not supported yet but will be available soon.
  </Tab>
</Tabs>

## Pulsar resource metrics

| Name                                    | Type      | Description                                                                                                                                       |
| --------------------------------------- | --------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| pulsar\_topics\_count                   | Gauge     | The number of Pulsar topics of the namespace owned by this broker.                                                                                |
| pulsar\_subscriptions\_count            | Gauge     | The number of Pulsar subscriptions of the topic served by this broker.                                                                            |
| pulsar\_producers\_count                | Gauge     | The number of active producers of the topic connected to this broker.                                                                             |
| pulsar\_consumers\_count                | Gauge     | The number of active consumers of the topic connected to this broker.                                                                             |
| pulsar\_rate\_in                        | Gauge     | The total message rate of the namespace coming into this broker (messages per second).                                                            |
| pulsar\_rate\_out                       | Gauge     | The total message rate of the namespace going out from this broker (messages per second).                                                         |
| pulsar\_throughput\_in                  | Gauge     | The total throughput of the topic coming into this broker (bytes per second).                                                                     |
| pulsar\_throughput\_out                 | Gauge     | The total throughput of the topic going out from this broker (bytes per second).                                                                  |
| pulsar\_storage\_size                   | Gauge     | The total storage size of this topic owned by this broker (bytes).                                                                                |
| pulsar\_storage\_backlog\_size          | Gauge     | The total backlog size of this topic owned by this broker (bytes).                                                                                |
| pulsar\_storage\_offloaded\_size        | Gauge     | The total amount of data in this topic offloaded to tiered storage (bytes).                                                                       |
| pulsar\_storage\_write\_rate            | Gauge     | The total message batches (entries) written to storage for this topic (message batches per second).                                               |
| pulsar\_storage\_read\_rate             | Gauge     | The total message batches (entries) read from storage for this topic (message batches per second).                                                |
| pulsar\_subscription\_delayed           | Gauge     | The total number of message batches (entries) delayed for dispatching.                                                                            |
| pulsar\_broker\_publish\_latency        | Summary   | The publish latency experienced by the Pulsar broker.                                                                                             |
| pulsar\_broker\_storage\_read\_rate     | Gauge     | The total message batches (entries) read from storage for this broker (message batches per second).                                               |
| pulsar\_broker\_storage\_write\_rate    | Gauge     | The total message batches (entries) written to storage for this broker (message batches per second).                                              |
| pulsar\_entry\_size\_le\_\*             | Histogram | The rate of entries in a namespace whose size is smaller than a given threshold (128 bytes, 512 bytes, 1 KB, 2 KB, 4 KB, 16 KB, 100 KB, 1 MB, >1 MB). |
| pulsar\_in\_bytes\_total                | Counter   | The total size, in bytes, of messages received for this topic.                                                                                    |
| pulsar\_msg\_backlog                    | Gauge     | The total message backlog in this broker (entries).                                                                                               |
| pulsar\_storage\_write\_latency\_le\_\* | Histogram | The rate of entries in a namespace whose storage write latency is smaller than a given threshold (0.5 ms, 1 ms, 5 ms, 10 ms, 20 ms, 50 ms, 100 ms, 200 ms, 1 s, >1 s). |
| pulsar\_subscription\_back\_log         | Gauge     | The number of entries (messages or batched messages) in the unacknowledged state for a subscription.                                              |
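The export endpoints return plain text in the Prometheus exposition format, so a short script is enough to pull out a single metric. The sketch below is a deliberately simplified parser (it ignores timestamps, escaped quotes in label values, and other less common syntax) that ranks topics by `pulsar_msg_backlog`. It assumes each sample carries a `topic` label, and the function names are hypothetical; for production use, a full parser such as `text_string_to_metric_families` from the `prometheus_client` Python package is a safer choice.

```python
import re
from collections import defaultdict

# Matches simple sample lines such as:
#   pulsar_msg_backlog{topic="persistent://t/ns/a"} 42
# Simplification: timestamps, escaped quotes, and exemplars are not handled.
SAMPLE_RE = re.compile(r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)')
LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="([^"]*)"')


def parse_samples(text):
    """Yield (metric_name, labels_dict, value) for each sample line."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        match = SAMPLE_RE.match(line)
        if match is None:
            continue
        name, raw_labels, value = match.groups()
        labels = dict(LABEL_RE.findall(raw_labels or ""))
        yield name, labels, float(value)


def top_backlogged_topics(text, n=3):
    """Return the n topics with the largest pulsar_msg_backlog values."""
    totals = defaultdict(float)
    for name, labels, value in parse_samples(text):
        if name == "pulsar_msg_backlog":
            # Assumption: the topic name is exposed as the "topic" label.
            totals[labels.get("topic", "<unknown>")] += value
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]
```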

## Kafka resource metrics

| Name                                | Type    | Description                                                                                                                                                                                                                                                                                             |
| ----------------------------------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kop\_server\_MESSAGE\_IN            | Counter | The producer message-in stats. <br /> Available labels: *topic*, *partition*. <br /> <ul><li>*topic*: the name of the topic being produced to.</li><li>*partition*: the partition ID of the topic being produced to.</li></ul>                                                                            |
| kop\_server\_MESSAGE\_OUT           | Counter | The consumer message-out stats. <br /> Available labels: *topic*, *partition*, *group*. <br /> <ul><li>*topic*: the name of the topic being consumed.</li><li>*partition*: the partition ID of the topic being consumed.</li><li>*group*: the ID of the consumer group that consumes messages from the topic partition.</li></ul> |
| kop\_server\_BYTES\_IN              | Counter | The producer bytes-in stats. <br /> Available labels: *topic*, *partition*. <br /> <ul><li>*topic*: the name of the topic being produced to.</li><li>*partition*: the partition ID of the topic being produced to.</li></ul>                                                                              |
| kop\_server\_BYTES\_OUT             | Counter | The consumer bytes-out stats. <br /> Available labels: *topic*, *partition*, *group*. <br /> <ul><li>*topic*: the name of the topic being consumed.</li><li>*partition*: the partition ID of the topic being consumed.</li><li>*group*: the ID of the consumer group that consumes messages from the topic partition.</li></ul>   |
| kop\_server\_ACTIVE\_CHANNEL\_COUNT | Gauge   | The number of active connections.                                                                                                                                                                                                                                                                       |
| kop\_server\_LAG                    | Gauge   | The consumer lag stats. <br /> Available labels: *topic*, *partition*, *group*. <br /> <ul><li>*topic*: the name of the topic being consumed.</li><li>*partition*: the partition ID of the topic being consumed.</li><li>*group*: the ID of the consumer group that consumes messages from the topic partition.</li></ul>         |
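To find consumer groups that are falling behind, you can sum `kop_server_LAG` over its *topic* and *partition* labels for each *group*. This sketch assumes the samples have already been parsed into `(labels, value)` pairs (for example, with a Prometheus text parser); the function names and the threshold are illustrative, not part of any StreamNative API.

```python
from collections import defaultdict


def lag_by_group(samples):
    """Sum kop_server_LAG across topics and partitions for each consumer group.

    `samples` is an iterable of (labels, value) pairs for kop_server_LAG,
    where labels contains the topic, partition, and group labels.
    """
    totals = defaultdict(float)
    for labels, value in samples:
        totals[labels["group"]] += value
    return dict(totals)


def groups_over_threshold(samples, max_lag):
    """Return the consumer groups whose total lag exceeds max_lag, sorted by name."""
    return sorted(group for group, lag in lag_by_group(samples).items() if lag > max_lag)
```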

## Source connector metrics

| Name                                            | Type    | Description                                                                |
| ----------------------------------------------- | ------- | -------------------------------------------------------------------------- |
| pulsar\_source\_written\_total                  | Counter | The total number of records written to a Pulsar topic                      |
| pulsar\_source\_written\_1min\_total            | Counter | The total number of records written to a Pulsar topic in the last 1 minute |
| pulsar\_source\_received\_total                 | Counter | The total number of records received from the source                       |
| pulsar\_source\_received\_1min\_total           | Counter | The total number of records received from the source in the last 1 minute  |
| pulsar\_source\_last\_invocation                | Gauge   | The timestamp of the last invocation of the source                         |
| pulsar\_source\_source\_exception               | Gauge   | The exception from a source                                                |
| pulsar\_source\_source\_exceptions\_total       | Counter | The total number of source exceptions                                      |
| pulsar\_source\_source\_exceptions\_1min\_total | Counter | The total number of source exceptions in the last 1 minute                 |
| pulsar\_source\_system\_exception               | Gauge   | The exception from system code                                             |
| pulsar\_source\_system\_exceptions\_total       | Counter | The total number of system exceptions                                      |
| pulsar\_source\_system\_exceptions\_1min\_total | Counter | The total number of system exceptions in the last 1 minute                 |
| pulsar\_source\_user\_metric\_\*                | Summary | The user-defined metrics                                                   |
| process\_cpu\_seconds\_total                    | Counter | Total user and system CPU time spent in seconds.                           |
| jvm\_memory\_bytes\_committed                   | Gauge   | Committed (bytes) of a given JVM memory area.                              |
| jvm\_memory\_bytes\_max                         | Gauge   | Max (bytes) of a given JVM memory area.                                    |
| jvm\_memory\_direct\_bytes\_used                | Gauge   | Used bytes of a given JVM memory area.                                     |
| jvm\_memory\_bytes\_init                        | Gauge   | Initial bytes of a given JVM memory area.                                  |
| jvm\_gc\_collection\_seconds\_sum               | Summary | Time spent in a given JVM garbage collector in seconds.                    |
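A simple liveness check for a source connector compares `pulsar_source_last_invocation` with the current time. This sketch assumes the gauge holds a Unix timestamp in milliseconds and that a value of 0 means the source has not been invoked yet; verify both assumptions against your own data before alerting on it. `is_stale` and its parameters are hypothetical names.

```python
import time


def is_stale(last_invocation_ms, max_idle_seconds, now_s=None):
    """Return True if the source has not been invoked within max_idle_seconds.

    Assumption: pulsar_source_last_invocation is a Unix timestamp in
    milliseconds, and a value of 0 or less means "never invoked".
    """
    if now_s is None:
        now_s = time.time()
    if last_invocation_ms <= 0:
        return True
    idle_seconds = now_s - last_invocation_ms / 1000.0
    return idle_seconds > max_idle_seconds
```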

## Sink connector metrics

| Name                                          | Type    | Description                                                                |
| --------------------------------------------- | ------- | -------------------------------------------------------------------------- |
| pulsar\_sink\_written\_total                  | Counter | The total number of records processed by the sink                           |
| pulsar\_sink\_written\_1min\_total            | Counter | The total number of records processed by the sink in the last 1 minute      |
| pulsar\_sink\_received\_total                 | Counter | The total number of records the sink received from Pulsar topics            |
| pulsar\_sink\_received\_1min\_total           | Counter | The total number of records the sink received from Pulsar topics in the last 1 minute |
| pulsar\_sink\_last\_invocation                | Gauge   | The timestamp of the last invocation of the sink                           |
| pulsar\_sink\_sink\_exception                 | Gauge   | The exception from a sink                                                  |
| pulsar\_sink\_sink\_exceptions\_total         | Counter | The total number of sink exceptions                                        |
| pulsar\_sink\_sink\_exceptions\_1min\_total   | Counter | The total number of sink exceptions in the last 1 minute                   |
| pulsar\_sink\_system\_exception               | Gauge   | The exception from system code                                             |
| pulsar\_sink\_system\_exceptions\_total       | Counter | The total number of system exceptions                                      |
| pulsar\_sink\_system\_exceptions\_1min\_total | Counter | The total number of system exceptions in the last 1 minute                 |
| pulsar\_sink\_user\_metric\_\*                | Summary | The user-defined metrics                                                   |
| process\_cpu\_seconds\_total                  | Counter | Total user and system CPU time spent in seconds.                           |
| jvm\_memory\_bytes\_committed                 | Gauge   | Committed (bytes) of a given JVM memory area.                              |
| jvm\_memory\_bytes\_max                       | Gauge   | Max (bytes) of a given JVM memory area.                                    |
| jvm\_memory\_direct\_bytes\_used              | Gauge   | Used bytes of a given JVM memory area.                                     |
| jvm\_memory\_bytes\_init                      | Gauge   | Initial bytes of a given JVM memory area.                                  |
| jvm\_gc\_collection\_seconds\_sum             | Summary | Time spent in a given JVM garbage collector in seconds.                    |
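Counters such as `pulsar_sink_written_total` only increase until the sink instance restarts, so throughput has to be derived from the difference between two scrapes. The sketch below computes a per-second rate and treats a decrease as a counter reset, similar in spirit to the PromQL `rate()` function but without its extrapolation. The function name is hypothetical.

```python
def counter_rate(prev_value, prev_ts, curr_value, curr_ts):
    """Per-second rate of a Prometheus counter between two scrapes.

    Timestamps are in seconds. If the counter went down, the process most
    likely restarted and the counter reset to zero, so the current value is
    used as the increase since the reset.
    """
    elapsed = curr_ts - prev_ts
    if elapsed <= 0:
        raise ValueError("curr_ts must be later than prev_ts")
    increase = curr_value - prev_value
    if increase < 0:  # counter reset detected
        increase = curr_value
    return increase / elapsed
```

The same helper applies to `process_cpu_seconds_total`: a rate of 0.5 means the process used, on average, half a CPU core over the interval.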

## Function metrics

| Name                                                   | Type    | Description                                                                   |
| ------------------------------------------------------ | ------- | ----------------------------------------------------------------------------- |
| pulsar\_function\_processed\_successfully\_total       | Counter | The total number of messages processed successfully                           |
| pulsar\_function\_processed\_successfully\_1min\_total | Counter | The total number of messages processed successfully in the last 1 minute      |
| pulsar\_function\_system\_exceptions\_total            | Counter | The total number of system exceptions                                         |
| pulsar\_function\_system\_exceptions\_1min\_total      | Counter | The total number of system exceptions in the last 1 minute                    |
| pulsar\_function\_user\_exceptions\_total              | Counter | The total number of user exceptions                                           |
| pulsar\_function\_user\_exceptions\_1min\_total        | Counter | The total number of user exceptions in the last 1 minute                      |
| pulsar\_function\_process\_latency\_ms                 | Summary | The process latency in milliseconds                                           |
| pulsar\_function\_process\_latency\_ms\_1min           | Summary | The process latency in milliseconds in the last 1 minute                      |
| pulsar\_function\_last\_invocation                     | Gauge   | The timestamp of the last invocation of the function                          |
| pulsar\_function\_received\_total                      | Counter | The total number of messages received from the input topics                   |
| pulsar\_function\_received\_1min\_total                | Counter | The total number of messages received from the input topics in the last 1 minute |
| pulsar\_function\_user\_metric\_\*                     | Summary | The user-defined metrics                                                      |
| process\_cpu\_seconds\_total                           | Counter | Total user and system CPU time spent in seconds.                              |
| jvm\_memory\_bytes\_committed                          | Gauge   | Committed (bytes) of a given JVM memory area. (Java Functions only)           |
| jvm\_memory\_bytes\_max                                | Gauge   | Max (bytes) of a given JVM memory area. (Java Functions only)                 |
| jvm\_memory\_direct\_bytes\_used                       | Gauge   | Used bytes of a given JVM memory area. (Java Functions only)                  |
| jvm\_memory\_bytes\_init                               | Gauge   | Initial bytes of a given JVM memory area. (Java Functions only)               |
| jvm\_gc\_collection\_seconds\_sum                      | Summary | Time spent in a given JVM garbage collector in seconds. (Java Functions only) |
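For a function, the 1-minute counters give a quick failure ratio without keeping state between scrapes. This sketch divides the user and system exceptions of the last minute by the messages received in the same window. It assumes each exception corresponds to one received message, which may not hold if a single message raises several exceptions, so treat the result as an approximation. The function name is hypothetical.

```python
def failure_ratio(received_1min, user_exceptions_1min, system_exceptions_1min):
    """Approximate fraction of messages received in the last minute that failed.

    Inputs are the values of pulsar_function_received_1min_total,
    pulsar_function_user_exceptions_1min_total, and
    pulsar_function_system_exceptions_1min_total for one function instance.
    Returns 0.0 when nothing was received.
    """
    if received_1min <= 0:
        return 0.0
    failures = user_exceptions_1min + system_exceptions_1min
    # Clamp to 1.0: the counters may be sampled at slightly different moments,
    # and one message can raise more than one exception.
    return min(failures / received_1min, 1.0)
```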

## Kafka Connect metrics

| Name                                                                 | Type    | Description                                                                                                                                                                        |
| -------------------------------------------------------------------- | ------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| kafka\_connect\_connector\_task\_batch\_size\_avg                    | Gauge   | The average size of the batches processed by the connector                                                                                                                         |
| kafka\_connect\_connector\_task\_batch\_size\_max                    | Gauge   | The maximum size of the batches processed by the connector                                                                                                                         |
| kafka\_connect\_connector\_task\_offset\_commit\_avg\_time\_ms       | Gauge   | The average time in milliseconds taken by this task to commit offsets                                                                                                              |
| kafka\_connect\_connector\_task\_offset\_commit\_failure\_percentage | Gauge   | The average percentage of this task's offset commit attempts that failed                                                                                                           |
| kafka\_connect\_connector\_task\_offset\_commit\_max\_time\_ms       | Gauge   | The maximum time in milliseconds taken by this task to commit offsets                                                                                                              |
| kafka\_connect\_connector\_task\_offset\_commit\_success\_percentage | Gauge   | The average percentage of this task's offset commit attempts that succeeded                                                                                                        |
| kafka\_connect\_connector\_task\_pause\_ratio                        | Gauge   | The fraction of time this task has spent in the pause state                                                                                                                        |
| kafka\_connect\_connector\_task\_running\_ratio                      | Gauge   | The fraction of time this task has spent in the running state                                                                                                                      |
| kafka\_connect\_source\_task\_source\_record\_poll                   | Gauge   | The total number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker                                            |
| kafka\_connect\_source\_task\_source\_record\_poll\_rate             | Gauge   | The average per-second number of records produced/polled (before transformation) by this task belonging to the named source connector in this worker                               |
| kafka\_connect\_source\_task\_source\_record\_write                  | Gauge   | The number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker, since the task was last restarted |
| kafka\_connect\_source\_task\_source\_record\_write\_rate            | Gauge   | The average per-second number of records output from the transformations and written to Kafka for this task belonging to the named source connector in this worker                 |
| kafka\_connect\_source\_task\_poll\_batch\_avg\_time\_ms             | Gauge   | The average time in milliseconds taken by this task to poll for a batch of source records                                                                                          |
| kafka\_connect\_source\_task\_poll\_batch\_max\_time\_ms             | Gauge   | The maximum time in milliseconds taken by this task to poll for a batch of source records                                                                                          |
| kafka\_connect\_source\_task\_source\_record\_active\_count          | Gauge   | The number of records that have been produced by this task but not yet completely written to Kafka                                                                                 |
| kafka\_connect\_source\_task\_source\_record\_active\_count\_avg     | Gauge   | The average number of records that have been produced by this task but not yet completely written to Kafka                                                                         |
| kafka\_connect\_source\_task\_source\_record\_active\_count\_max     | Gauge   | The maximum number of records that have been produced by this task but not yet completely written to Kafka                                                                         |
| kafka\_connect\_sink\_task\_offset\_commit\_completion               | Gauge   | The total number of offset commit completions that were completed successfully                                                                                                     |
| kafka\_connect\_sink\_task\_offset\_commit\_completion\_rate         | Gauge   | The average per-second number of offset commit completions that were completed successfully                                                                                        |
| kafka\_connect\_sink\_task\_offset\_commit\_seq\_no                  | Gauge   | The current sequence number for offset commits                                                                                                                                     |
| kafka\_connect\_sink\_task\_offset\_commit\_skip                     | Gauge   | The total number of offset commit completions that were received too late and skipped/ignored                                                                                      |
| kafka\_connect\_sink\_task\_offset\_commit\_skip\_rate               | Gauge   | The average per-second number of offset commit completions that were received too late and skipped/ignored                                                                         |
| kafka\_connect\_sink\_task\_partition\_count                         | Gauge   | The number of topic partitions assigned to this task belonging to the named sink connector in this worker                                                                          |
| kafka\_connect\_sink\_task\_put\_batch\_avg\_time\_ms                | Gauge   | The average time in milliseconds taken by this task to put a batch of sink records                                                                                                 |
| kafka\_connect\_sink\_task\_put\_batch\_max\_time\_ms                | Gauge   | The maximum time in milliseconds taken by this task to put a batch of sink records                                                                                                 |
| kafka\_connect\_sink\_task\_sink\_record\_active\_count              | Gauge   | The number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task                                                        |
| kafka\_connect\_sink\_task\_sink\_record\_active\_count\_avg         | Gauge   | The average number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task                                                |
| kafka\_connect\_sink\_task\_sink\_record\_active\_count\_max         | Gauge   | The maximum number of records that have been read from Kafka but not yet completely committed/flushed/acknowledged by the sink task                                                |
| kafka\_connect\_sink\_task\_sink\_record\_read                       | Gauge   | The total number of records read from Kafka by this task belonging to the named sink connector in this worker, since the task was last restarted                                   |
| kafka\_connect\_sink\_task\_sink\_record\_read\_rate                 | Gauge   | The average per-second number of records read from Kafka for this task belonging to the named sink connector in this worker. This is before transformations are applied            |
| kafka\_connect\_sink\_task\_sink\_record\_send                       | Gauge   | The total number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker, since the task was last restarted      |
| kafka\_connect\_sink\_task\_sink\_record\_send\_rate                 | Gauge   | The average per-second number of records output from the transformations and sent/put to this task belonging to the named sink connector in this worker                            |
| kafka\_connect\_task\_error\_deadletterqueue\_produce\_failures      | Gauge   | The number of failed writes to the dead letter queue                                                                                                                               |
| kafka\_connect\_task\_error\_deadletterqueue\_produce\_requests      | Gauge   | The number of attempted writes to the dead letter queue                                                                                                                            |
| kafka\_connect\_task\_error\_last\_error\_timestamp                  | Gauge   | The epoch timestamp when this task last encountered an error                                                                                                                       |
| kafka\_connect\_task\_error\_total\_errors\_logged                   | Gauge   | The total number of errors that were logged                                                                                                                                        |
| kafka\_connect\_task\_error\_total\_record\_errors                   | Gauge   | The total number of record processing errors in this task                                                                                                                          |
| kafka\_connect\_task\_error\_total\_record\_failures                 | Gauge   | The total number of record processing failures in this task                                                                                                                        |
| kafka\_connect\_task\_error\_total\_records\_skipped                 | Gauge   | The total number of records skipped due to errors                                                                                                                                  |
| kafka\_connect\_task\_error\_total\_retries                          | Gauge   | The total number of operations retried                                                                                                                                             |
| kafka\_connect\_worker\_connector\_destroyed\_task\_count            | Gauge   | The number of destroyed tasks of the connector on the worker                                                                                                                       |
| kafka\_connect\_worker\_connector\_failed\_task\_count               | Gauge   | The number of failed tasks of the connector on the worker                                                                                                                          |
| kafka\_connect\_worker\_connector\_paused\_task\_count               | Gauge   | The number of paused tasks of the connector on the worker                                                                                                                          |
| kafka\_connect\_worker\_connector\_restarting\_task\_count           | Gauge   | The number of restarting tasks of the connector on the worker                                                                                                                      |
| kafka\_connect\_worker\_connector\_running\_task\_count              | Gauge   | The number of running tasks of the connector on the worker                                                                                                                         |
| kafka\_connect\_worker\_connector\_total\_task\_count                | Gauge   | The number of tasks of the connector on the worker                                                                                                                                 |
| kafka\_connect\_worker\_connector\_unassigned\_task\_count           | Gauge   | The number of unassigned tasks of the connector on the worker                                                                                                                      |
| process\_cpu\_seconds\_total                                         | Counter | Total user and system CPU time spent in seconds                                                                                                                                    |
| jvm\_memory\_committed\_bytes                                        | Gauge   | Committed (bytes) of a given JVM memory area                                                                                                                                       |
| jvm\_memory\_max\_bytes                                              | Gauge   | Max (bytes) of a given JVM memory area                                                                                                                                             |
| jvm\_memory\_init\_bytes                                             | Gauge   | Initial bytes of a given JVM memory area                                                                                                                                           |
| jvm\_memory\_used\_bytes                                             | Gauge   | Used bytes of a given JVM memory area                                                                                                                                              |
| jvm\_gc\_collection\_seconds\_sum                                    | Summary | Time spent in a given JVM garbage collector in seconds                                                                                                                             |
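To act on these gauges, you can filter a saved response from the metrics endpoint. The following is a minimal sketch, assuming the response is in Prometheus text format and saved to a file. `nonzero_samples` and `scrape.prom` are hypothetical names, and the parser is deliberately simplified rather than a full implementation of the exposition format.

```bash theme={null}
#!/usr/bin/env bash
# Hypothetical helper: read Prometheus text-format samples on stdin and print
# the samples of one metric whose value is not zero.
# Simplified parser: it skips lines that do not start with the metric name
# (including # HELP / # TYPE comments) and takes the value as the first field
# after the name and its optional {label} set.
nonzero_samples() {
  awk -v name="$1" '
    index($0, name) == 1 {
      rest = substr($0, length(name) + 1)
      # Require an exact name match: the next character must be "{" or a space.
      c = substr(rest, 1, 1)
      if (c != "{" && c != " ") next
      # Drop the optional label set and the separating spaces.
      sub(/^([{].*[}])?[ ]+/, "", rest)
      split(rest, f, " ")
      if (f[1] + 0 != 0) print $0
    }'
}

# Usage (scrape.prom is a hypothetical saved response):
# nonzero_samples kafka_connect_worker_connector_failed_task_count < scrape.prom
```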

## Health metrics

| Name                                          | Type    | Description                                                                                               |
| --------------------------------------------- | ------- | --------------------------------------------------------------------------------------------------------- |
| pulsar\_detector\_e2e\_latency\_ms            | Summary | The latency distribution from message sending to message consumption                                      |
| pulsar\_detector\_publish\_latency\_ms        | Summary | The latency distribution of message sending                                                               |
| pulsar\_detector\_pulsar\_sla\_messaging\_up  | Gauge   | Indicates whether the messaging service is up or down                                                     |
| pulsar\_detector\_pulsar\_sla\_webservice\_up | Gauge   | Indicates whether the web service is up or down                                                           |
| pulsar\_detector\_geo\_latency\_ms            | Summary | The latency distribution from message sending to message consumption across clusters                      |
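The two `*_up` gauges lend themselves to a simple availability check. The sketch below assumes the common Prometheus convention that `1` means up and any other value means down, which this page does not confirm. `sla_down` is a hypothetical helper that reads Prometheus text-format samples on stdin.

```bash theme={null}
#!/usr/bin/env bash
# Hypothetical helper: print every SLA gauge sample that does not report "up".
# Assumes 1 = up (a common Prometheus convention, not confirmed for these gauges).
sla_down() {
  awk '
    /^pulsar_detector_pulsar_sla_(messaging|webservice)_up[{ ]/ {
      rest = $0
      # Strip the metric name, the optional {label} set, and the separating spaces.
      sub(/^[^ {]+([{].*[}])?[ ]+/, "", rest)
      split(rest, f, " ")
      if (f[1] + 0 != 1) print $0
    }'
}

# Example: fail a monitoring script when any gauge is down.
# [ -z "$(sla_down < scrape.prom)" ] || exit 1
```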

# Metrics API integration

<Note title="Note">
  The examples below demonstrate how to configure your observability tool to scrape the metrics endpoint. While StreamNative Cloud provides the metrics endpoint, it is your responsibility to set up and manage your own observability stack.
</Note>

## Prometheus integration

To collect Pulsar metrics into Prometheus, add the following to your Prometheus configuration file. Because bearer tokens have a limited lifetime, we recommend the OAuth2 authentication method shown below, which lets Prometheus request new access tokens from `token_url` as needed.

```yaml theme={null}
global:
  scrape_interval: 120s
  scrape_timeout: 60s
scrape_configs:
  - job_name: streamnative
    metrics_path: /v1/cloud/metrics/export
    scheme: https
    oauth2:
      client_id: '${client_id}'
      client_secret: '${client_secret}'
      token_url: https://auth.streamnative.cloud/oauth/token
      endpoint_params:
        grant_type: 'client_credentials'
        audience: '${audience}'
    static_configs:
      - targets: [metrics.streamnative.cloud]
```

Replace `${client_id}` and `${client_secret}` with the values from the key file of a service account with Super Admin permission. For more information, see [work with service accounts](/cloud/security/authentication/service-accounts/service-accounts).

Set `${audience}` to the [Uniform Resource Name (URN)](/cloud/references/glossary#urn) of your Pulsar instance, which joins the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name with colons:

```bash theme={null}
"urn:sn:pulsar:${org_name}:${instance_name}"
```
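If you generate configuration files from a script, a small helper can assemble the audience value from its parts. This is a hypothetical convenience function that only illustrates the URN layout; `my-org` and `my-instance` are placeholder names.

```bash theme={null}
#!/usr/bin/env bash
# Hypothetical helper: build the audience URN from an organization name
# and a Pulsar instance name.
audience_urn() {
  if [ "$#" -ne 2 ] || [ -z "$1" ] || [ -z "$2" ]; then
    echo "usage: audience_urn <org_name> <instance_name>" >&2
    return 1
  fi
  printf 'urn:sn:pulsar:%s:%s\n' "$1" "$2"
}

audience_urn my-org my-instance   # prints urn:sn:pulsar:my-org:my-instance
```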

<Note>
  The metrics response can be large if your cluster has many topics. Set the `scrape_timeout` parameter large enough to cover the full duration of a scrape request to the endpoint.
  Prometheus does not allow `scrape_timeout` to exceed `scrape_interval`, so increase `scrape_interval` accordingly.
</Note>
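To choose a `scrape_timeout`, you can time a single request to the endpoint yourself. The sketch below is an example under stated assumptions, not an official client: it requests an access token from the same `token_url` with the OAuth2 client-credentials grant, assumes the key file is JSON with `client_id` and `client_secret` fields, and assumes `curl` and `jq` are installed. The helper names and `key.json` are hypothetical.

```bash theme={null}
#!/usr/bin/env bash
# Hypothetical helpers for sizing scrape_timeout. Requires curl and jq.
TOKEN_URL="https://auth.streamnative.cloud/oauth/token"
METRICS_URL="https://metrics.streamnative.cloud/v1/cloud/metrics/export"

# fetch_token KEY_FILE AUDIENCE: print an access token.
# Assumes KEY_FILE is JSON with "client_id" and "client_secret" fields.
fetch_token() {
  local key_file="$1" audience="$2" client_id client_secret response
  client_id=$(jq -r '.client_id' "$key_file") || return 1
  client_secret=$(jq -r '.client_secret' "$key_file") || return 1
  # OAuth2 client-credentials grant, sent as a form-encoded POST body.
  response=$(curl -sS --fail "$TOKEN_URL" \
    --data-urlencode "grant_type=client_credentials" \
    --data-urlencode "client_id=${client_id}" \
    --data-urlencode "client_secret=${client_secret}" \
    --data-urlencode "audience=${audience}") || return 1
  printf '%s' "$response" | jq -r '.access_token'
}

# time_scrape TOKEN: print the total request time in seconds, discarding the body.
time_scrape() {
  curl -sS --fail -o /dev/null -w '%{time_total}\n' \
    -H "Authorization: Bearer $1" "$METRICS_URL"
}

# within_timeout MEASURED TIMEOUT: succeed if MEASURED < TIMEOUT (seconds).
# awk is used because Bash arithmetic only handles integers.
within_timeout() {
  awk -v m="$1" -v t="$2" 'BEGIN { exit !(m + 0 < t + 0) }'
}

# Usage (key.json and the audience value are placeholders):
# token=$(fetch_token key.json "urn:sn:pulsar:my-org:my-instance")
# elapsed=$(time_scrape "$token")
# within_timeout "$elapsed" 60 || echo "raise scrape_timeout above ${elapsed}s"
```

Leave headroom above the measured time, because the response grows with the number of topics.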

## OpenTelemetry collector integration

The [OpenTelemetry collector](https://opentelemetry.io/docs/collector/getting-started/) is a vendor-agnostic agent process for gathering and sending telemetry data from various sources. Because StreamNative Cloud outputs its metrics in Prometheus format, you can collect them by configuring your OpenTelemetry collector to use the [Prometheus Receiver](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/receiver/prometheusreceiver), which accepts Prometheus `scrape_config` settings.

To configure your collector, create a `scrape_config` for StreamNative Cloud as described in the [Prometheus Integration section](#prometheus-integration), and place it in your collector's configuration file under the following section:

```yaml theme={null}
receivers:
  prometheus:
    config:
```

For example:

```yaml theme={null}
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: streamnative
          metrics_path: /v1/cloud/metrics/export
          scheme: https
          oauth2:
            client_id: '${client_id}'
            client_secret: '${client_secret}'
            token_url: https://auth.streamnative.cloud/oauth/token
            endpoint_params:
              grant_type: 'client_credentials'
              audience: '${audience}'
          static_configs:
            - targets: [metrics.streamnative.cloud]
```

Because the OpenTelemetry collector supports a wide range of exporters, you can route metrics from StreamNative Cloud to many observability platforms. For the full list of exporters, see the [collector configuration documentation](https://opentelemetry.io/docs/collector/configuration/#exporters).

## NewRelic integration

You can use a Prometheus instance to forward metrics to New Relic. To do this, add a `remote_write` entry to the `prometheus.yml` configuration file described [in the Prometheus Integration section](#prometheus-integration):

```yml theme={null}
remote_write:
  - url: https://metric-api.newrelic.com/prometheus/v1/write?prometheus_server=streamnative
    authorization:
      credentials: '${newrelic_ingest_key}'
```

<Note title="Note">
  If your New Relic account stores data in the EU region, use `metric-api.eu.newrelic.com` as the ingestion endpoint instead.
</Note>

Then run Prometheus with this configuration. It scrapes the Pulsar metrics from the StreamNative endpoint and forwards them to New Relic:

```bash theme={null}
prometheus --config.file=prometheus.yml
```

If you don't need this Prometheus instance to retain the data locally, set a short retention time with the `--storage.tsdb.retention.time` flag:

```bash theme={null}
prometheus --config.file=prometheus.yml --storage.tsdb.retention.time=15m
```
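Before you start the instance, you can validate the combined scrape and `remote_write` configuration with `promtool`, the command-line tool that ships with Prometheus. The `run_forwarder` wrapper below is a hypothetical sketch, not part of either product; the same check works for any `remote_write` target.

```bash theme={null}
#!/usr/bin/env bash
# Hypothetical wrapper: validate a Prometheus configuration file with promtool,
# then start a forwarding instance with a short local retention time.
run_forwarder() {
  local config="${1:-prometheus.yml}" retention="${2:-15m}"
  # promtool exits non-zero if the file is missing or fails validation
  # (and the call also fails if promtool itself is not installed).
  promtool check config "$config" || return 1
  prometheus --config.file="$config" --storage.tsdb.retention.time="$retention"
}

# Usage:
# run_forwarder prometheus.yml 15m
```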

## Grafana Cloud integration

You can use a Prometheus instance to forward metrics to Grafana Cloud. To do this, add a `remote_write` entry to the `prometheus.yml` configuration file as described [in the Prometheus Integration section](#prometheus-integration):

```yml theme={null}
remote_write:
  - url: ${grafana_cloud_endpoint}/api/prom/push
    basic_auth:
      username: '${grafana_cloud_username}'
      password: '${grafana_cloud_api_key}'
```

You can find the `grafana_cloud_endpoint` and `grafana_cloud_username` values by selecting Prometheus at `https://grafana.com/orgs/${grafana_org}`. You can find `grafana_cloud_api_key` at `https://grafana.com/orgs/${grafana_org}/api-keys`.

Then run Prometheus with this configuration. It scrapes the Pulsar metrics from the StreamNative endpoint and forwards them to Grafana Cloud:

```bash theme={null}
prometheus --config.file=prometheus.yml
```

If you don't need this Prometheus instance to retain the data locally, set a short retention time with the `--storage.tsdb.retention.time` flag:

```bash theme={null}
prometheus --config.file=prometheus.yml --storage.tsdb.retention.time=15m
```

## Datadog integration

### Integrate with Datadog Agent

<Note title="Note">
  The integration with StreamNative Cloud requires [PR 16812](https://github.com/DataDog/integrations-core/pull/16812), which was released in Datadog Agent [7.52.0](https://github.com/DataDog/datadog-agent/releases/tag/7.52.0). Use Datadog Agent 7.52.0 or later.
</Note>

With the Datadog Agent, you can connect Datadog to the StreamNative Cloud metrics endpoint and start collecting metrics. The Datadog Agent runs on most platforms; this guide demonstrates Docker and Kubernetes deployments.

<Tabs>
  <Tab title="Docker">
    Create a `conf.yaml` file with the OpenMetrics check configuration for your Datadog Agent:

    ```yaml theme={null}
    init_config:
      service: docker

    instances:
      - openmetrics_endpoint: https://metrics.streamnative.cloud/v1/cloud/metrics/export
        request_size: 900
        min_collection_interval: 180
        metrics:
          - pulsar_topics_count:
              type: gauge
              name: pulsar_topics_count
          - pulsar_subscriptions_count:
              type: gauge
              name: pulsar_subscriptions_count
          - pulsar_producers_count:
              type: gauge
              name: pulsar_producers_count
          - pulsar_consumers_count:
              type: gauge
              name: pulsar_consumers_count
          - pulsar_rate_in:
              type: gauge
              name: pulsar_rate_in
          - pulsar_rate_out:
              type: gauge
              name: pulsar_rate_out
          - pulsar_throughput_in:
              type: gauge
              name: pulsar_throughput_in
          - pulsar_throughput_out:
              type: gauge
              name: pulsar_throughput_out
          - pulsar_storage_size:
              type: gauge
              name: pulsar_storage_size
          - pulsar_storage_backlog_size:
              type: gauge
              name: pulsar_storage_backlog_size
          - pulsar_storage_offloaded_size:
              type: gauge
              name: pulsar_storage_offloaded_size
          - pulsar_storage_read_rate:
              type: gauge
              name: pulsar_storage_read_rate
          - pulsar_subscription_delayed:
              type: gauge
              name: pulsar_subscription_delayed
          - pulsar_storage_write_latency_le_0_5:
              type: histogram
              name: pulsar_storage_write_latency_le_0_5
          - pulsar_storage_write_latency_le_1:
              type: histogram
              name: pulsar_storage_write_latency_le_1
          - pulsar_storage_write_latency_le_5:
              type: histogram
              name: pulsar_storage_write_latency_le_5
          - pulsar_storage_write_latency_le_10:
              type: histogram
              name: pulsar_storage_write_latency_le_10
          - pulsar_storage_write_latency_le_20:
              type: histogram
              name: pulsar_storage_write_latency_le_20
          - pulsar_storage_write_latency_le_50:
              type: histogram
              name: pulsar_storage_write_latency_le_50
          - pulsar_storage_write_latency_le_100:
              type: histogram
              name: pulsar_storage_write_latency_le_100
          - pulsar_storage_write_latency_le_200:
              type: histogram
              name: pulsar_storage_write_latency_le_200
          - pulsar_storage_write_latency_le_1000:
              type: histogram
              name: pulsar_storage_write_latency_le_1000
          - pulsar_storage_write_latency_le_overflow:
              type: histogram
              name: pulsar_storage_write_latency_le_overflow
          - pulsar_entry_size_le_128:
              type: histogram
              name: pulsar_entry_size_le_128
          - pulsar_entry_size_le_512:
              type: histogram
              name: pulsar_entry_size_le_512
          - pulsar_entry_size_le_1_kb:
              type: histogram
              name: pulsar_entry_size_le_1_kb
          - pulsar_entry_size_le_4_kb:
              type: histogram
              name: pulsar_entry_size_le_4_kb
          - pulsar_entry_size_le_16_kb:
              type: histogram
              name: pulsar_entry_size_le_16_kb
        auth_token:
          reader:
            type: oauth
            url: https://auth.streamnative.cloud/oauth/token
            client_id: <your-admin-service-account-client-id>
            client_secret: <your-admin-service-account-client-secret>
            options:
              audience: urn:sn:pulsar:{your-organization}:{your-instance}
          writer:
            type: header
            name: Authorization
            value: Bearer <TOKEN>
            placeholder: <TOKEN>
    ```

    * \[1] `client_id`: Required. The client ID of a [service account](/cloud/security/authentication/service-accounts/service-accounts) with Super Admin permission. You can get it from the service account's [OAuth2 credential file](/cloud/security/authentication/service-accounts/service-accounts#get-a-key-file).
    * \[2] `client_secret`: Required. The client secret of the same Super Admin [service account](/cloud/security/authentication/service-accounts/service-accounts). You can get it from the same [OAuth2 credential file](/cloud/security/authentication/service-accounts/service-accounts#get-a-key-file).
    * \[3] `audience`: Required. The [Uniform Resource Name (URN)](/cloud/references/glossary#urn), formed from the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name. Replace `{your-organization}` with the name of your [organization](/cloud/references/glossary#organization) and `{your-instance}` with the name of your [instance](/cloud/references/glossary#instance).

    Run the docker commands to create a Datadog Agent container:

    ```bash theme={null}
    docker run -d --name dd-agent \
    -e DD_API_KEY=<your-Datadog-API-Key> \
    -e DD_SITE=<your-Datadog-site> \
    -e DD_APM_NON_LOCAL_TRAFFIC=true \
    -v {your-config-yaml-file-path}:/etc/datadog-agent/conf.d/openmetrics.d/conf.yaml:ro \
    -v /var/run/docker.sock:/var/run/docker.sock:ro \
    -v /proc/:/host/proc/:ro \
    -v /sys/fs/cgroup/:/host/sys/fs/cgroup:ro \
    -v /var/lib/docker/containers:/var/lib/docker/containers:ro \
    datadog/agent:7.52.0
    ```

    * \[1] `DD_API_KEY`: Your Datadog API key.
    * \[2] `DD_SITE`: The Datadog site that receives your metrics, traces, and logs, for example `datadoghq.com` or `datadoghq.eu`. Defaults to `datadoghq.com`.
    * \[3] `your-config-yaml-file-path`: The path to the `conf.yaml` file you created in the first step.

    For more details, see [Docker Agent for Docker](https://docs.datadoghq.com/containers/docker/?tab=standard).
  </Tab>

  <Tab title="Kubernetes">
    This example uses the [Datadog Operator](https://docs.datadoghq.com/containers/datadog_operator/).

    Install the Datadog Operator:

    ```bash theme={null}
    helm repo add datadog https://helm.datadoghq.com
    helm install datadog-operator datadog/datadog-operator
    ```

    Create a Kubernetes secret with your API and application keys:

    ```bash theme={null}
    kubectl create secret generic datadog-secret --from-literal api-key=<DATADOG_API_KEY> --from-literal app-key=<DATADOG_APP_KEY>
    ```

    * \[1] `DATADOG_API_KEY`: Your Datadog API key.
    * \[2] `DATADOG_APP_KEY`: Your Datadog Application key.

    Create a `datadog-agent.yaml` file with the spec of your Datadog Agent deployment:

    ```yaml theme={null}
    apiVersion: datadoghq.com/v2alpha1
    kind: DatadogAgent
    metadata:
      name: datadog-agent
    spec:
      global:
        kubelet:
          tlsVerify: false
        site: datadoghq.com
        credentials:
          apiSecret:
            secretName: datadog-secret
            keyName: api-key
          appSecret:
            secretName: datadog-secret
            keyName: app-key
      override:
        nodeAgent:
          image:
            name: gcr.io/datadoghq/agent:7.52.0
          extraConfd:
            configDataMap:
              openmetrics.yaml: |-
                init_config:
                  service: datadog_operator
                instances:
                  - openmetrics_endpoint: https://metrics.streamnative.cloud/v1/cloud/metrics/export
                    request_size: 900
                    min_collection_interval: 180
                    metrics:
                      - pulsar_topics_count:
                          type: gauge
                          name: pulsar_topics_count
                      - pulsar_subscriptions_count:
                          type: gauge
                          name: pulsar_subscriptions_count
                      - pulsar_producers_count:
                          type: gauge
                          name: pulsar_producers_count
                      - pulsar_consumers_count:
                          type: gauge
                          name: pulsar_consumers_count
                      - pulsar_rate_in:
                          type: gauge
                          name: pulsar_rate_in
                      - pulsar_rate_out:
                          type: gauge
                          name: pulsar_rate_out
                      - pulsar_throughput_in:
                          type: gauge
                          name: pulsar_throughput_in
                      - pulsar_throughput_out:
                          type: gauge
                          name: pulsar_throughput_out
                      - pulsar_storage_size:
                          type: gauge
                          name: pulsar_storage_size
                      - pulsar_storage_backlog_size:
                          type: gauge
                          name: pulsar_storage_backlog_size
                      - pulsar_storage_offloaded_size:
                          type: gauge
                          name: pulsar_storage_offloaded_size
                      - pulsar_storage_read_rate:
                          type: gauge
                          name: pulsar_storage_read_rate
                      - pulsar_subscription_delayed:
                          type: gauge
                          name: pulsar_subscription_delayed
                      - pulsar_storage_write_latency_le_0_5:
                          type: histogram
                          name: pulsar_storage_write_latency_le_0_5
                      - pulsar_storage_write_latency_le_1:
                          type: histogram
                          name: pulsar_storage_write_latency_le_1
                      - pulsar_storage_write_latency_le_5:
                          type: histogram
                          name: pulsar_storage_write_latency_le_5
                      - pulsar_storage_write_latency_le_10:
                          type: histogram
                          name: pulsar_storage_write_latency_le_10
                      - pulsar_storage_write_latency_le_20:
                          type: histogram
                          name: pulsar_storage_write_latency_le_20
                      - pulsar_storage_write_latency_le_50:
                          type: histogram
                          name: pulsar_storage_write_latency_le_50
                      - pulsar_storage_write_latency_le_100:
                          type: histogram
                          name: pulsar_storage_write_latency_le_100
                      - pulsar_storage_write_latency_le_200:
                          type: histogram
                          name: pulsar_storage_write_latency_le_200
                      - pulsar_storage_write_latency_le_1000:
                          type: histogram
                          name: pulsar_storage_write_latency_le_1000
                      - pulsar_storage_write_latency_le_overflow:
                          type: histogram
                          name: pulsar_storage_write_latency_le_overflow
                      - pulsar_entry_size_le_128:
                          type: histogram
                          name: pulsar_entry_size_le_128
                      - pulsar_entry_size_le_512:
                          type: histogram
                          name: pulsar_entry_size_le_512
                      - pulsar_entry_size_le_1_kb:
                          type: histogram
                          name: pulsar_entry_size_le_1_kb
                      - pulsar_entry_size_le_4_kb:
                          type: histogram
                          name: pulsar_entry_size_le_4_kb
                      - pulsar_entry_size_le_16_kb:
                          type: histogram
                          name: pulsar_entry_size_le_16_kb
                    auth_token:
                      reader:
                        type: oauth
                        url: https://auth.streamnative.cloud/oauth/token
                        client_id: <your-admin-service-account-client-id>
                        client_secret: <your-admin-service-account-client-secret>
                        options:
                          audience: urn:sn:pulsar:{your-organization}:{your-instance}
                      writer:
                        type: header
                        name: Authorization
                        value: Bearer <TOKEN>
                        placeholder: <TOKEN>
    ```

    * \[1] `client_id`: Required. The client ID of a [service account](/cloud/security/authentication/service-accounts/service-accounts) with Super Admin permission. You can get it from the service account's [OAuth2 credential file](/cloud/security/authentication/service-accounts/service-accounts#get-a-key-file).
    * \[2] `client_secret`: Required. The client secret of the same Super Admin [service account](/cloud/security/authentication/service-accounts/service-accounts). You can get it from the same [OAuth2 credential file](/cloud/security/authentication/service-accounts/service-accounts#get-a-key-file).
    * \[3] `audience`: Required. The [Uniform Resource Name (URN)](/cloud/references/glossary#urn), formed from the prefix `urn:sn:pulsar`, your organization name, and your Pulsar instance name. Replace `{your-organization}` with the name of your [organization](/cloud/references/glossary#organization) and `{your-instance}` with the name of your [instance](/cloud/references/glossary#instance).

    Deploy the Datadog Agent with this configuration file:

    ```bash theme={null}
    kubectl apply -f /path/to/your/datadog-agent.yaml
    ```

    For more details, see [Install the Datadog Agent on Kubernetes](https://docs.datadoghq.com/containers/kubernetes/installation/).
  </Tab>
</Tabs>

### Bridge with OpenTelemetry

You can use [OpenTelemetry Collector](#opentelemetry-collector-integration) to collect the metrics from StreamNative Cloud and export them to Datadog.

To export metrics to Datadog, add the [Datadog Exporter](https://docs.datadoghq.com/opentelemetry/otel_collector_datadog_exporter/) to your [OpenTelemetry Collector configuration](https://opentelemetry.io/docs/collector/configuration/). The following example provides a basic configuration that is ready to use once you set the `${DD_SITE}` and `${DD_API_KEY}` variables:

```yaml theme={null}
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: streamnative
          metrics_path: /v1/cloud/metrics/export
          scheme: https
          oauth2:
            client_id: '${client_id}'
            client_secret: '${client_secret}'
            token_url: https://auth.streamnative.cloud/oauth/token
            endpoint_params:
              grant_type: 'client_credentials'
              audience: '${audience}'
          static_configs:
            - targets: [metrics.streamnative.cloud]

processors:
  batch:
    send_batch_size: 4096
    timeout: 120s

exporters:
  datadog:
    api:
      site: ${DD_SITE}
      key: ${DD_API_KEY}

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [datadog]
```

Here, `${DD_SITE}` is your Datadog site, for example `datadoghq.com`, and `${DD_API_KEY}` is your Datadog API key.

This configuration receives metrics from StreamNative Cloud, batches them with a batch processor (mandatory for any non-development environment), and exports them to Datadog. For all available Datadog Exporter options, see [this fully documented example configuration file](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/exporter/datadogexporter/examples/collector.yaml).
