Cluster Metrics

Metrics are a valuable tool for gaining visibility into your StreamNative Cloud deployment. StreamNative Cloud provides a broad range of metrics that you can use to fine-tune performance and troubleshoot issues.

Metrics endpoint

StreamNative Cloud provides endpoints that expose real-time metrics in the Prometheus metrics format. The following table lists the endpoints that are currently available.

Note

Exporting Cloud metrics requires a Super Admin service account.

| Endpoint | Description |
| --- | --- |
| https://metrics.streamnative.cloud/v1/cloud/metrics/export | Export Pulsar resource metrics |
| https://metrics.streamnative.cloud/v1/cloud/metrics/source/export | Export Source connector metrics |
| https://metrics.streamnative.cloud/v1/cloud/metrics/sink/export | Export Sink connector metrics |
| https://metrics.streamnative.cloud/v1/cloud/metrics/function/export | Export Function metrics |
| https://metrics.streamnative.cloud/v1/cloud/metrics/health/export | Export Cluster health metrics |
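
As an illustration of how the endpoints listed above might be called, the following minimal Python sketch builds (but does not send) an authenticated request for one of them. The helper name build_metrics_request and the short keys such as "sink" are hypothetical. The sketch assumes the endpoint accepts a standard Authorization: Bearer header, which matches the header used in the Datadog Agent configuration later on this page.

```python
import urllib.request

METRICS_BASE = "https://metrics.streamnative.cloud/v1/cloud/metrics"

# Path suffix for each endpoint listed above (the short keys are hypothetical).
ENDPOINT_PATHS = {
    "pulsar": "export",
    "source": "source/export",
    "sink": "sink/export",
    "function": "function/export",
    "health": "health/export",
}


def build_metrics_request(kind: str, token: str) -> urllib.request.Request:
    """Build (but do not send) a GET request for one metrics endpoint."""
    if kind not in ENDPOINT_PATHS:
        raise ValueError(f"unknown metrics kind: {kind!r}")
    url = f"{METRICS_BASE}/{ENDPOINT_PATHS[kind]}"
    # Assumption: the endpoint accepts a standard Bearer token header.
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {token}"})
```

To send the request, pass the result to urllib.request.urlopen with a generous timeout, because the response can be large for clusters with many topics.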

Pulsar resource metrics

| Name | Type | Description |
| --- | --- | --- |
| pulsar_topics_count | Gauge | The number of Pulsar topics of the namespace owned by this broker. |
| pulsar_subscriptions_count | Gauge | The number of Pulsar subscriptions of the topic served by this broker. |
| pulsar_producers_count | Gauge | The number of active producers of the topic connected to this broker. |
| pulsar_consumers_count | Gauge | The number of active consumers of the topic connected to this broker. |
| pulsar_rate_in | Gauge | The total message rate of the namespace coming into this broker (messages per second). |
| pulsar_rate_out | Gauge | The total message rate of the namespace going out from this broker (messages per second). |
| pulsar_throughput_in | Gauge | The total throughput of the topic coming into this broker (bytes per second). |
| pulsar_throughput_out | Gauge | The total throughput of the topic going out from this broker (bytes per second). |
| pulsar_storage_size | Gauge | The total storage size of this topic owned by this broker (bytes). |
| pulsar_storage_backlog_size | Gauge | The total backlog size of this topic owned by this broker (bytes). |
| pulsar_storage_offloaded_size | Gauge | The total amount of data in this topic offloaded to tiered storage (bytes). |
| pulsar_storage_write_rate | Gauge | The total message batches (entries) written to the storage for this topic (message batches per second). |
| pulsar_storage_read_rate | Gauge | The total message batches (entries) read from the storage for this topic (message batches per second). |
| pulsar_subscription_delayed | Gauge | The total number of message batches (entries) that are delayed for dispatching. |
| pulsar_storage_write_latency_le_* | Histogram | The rate of entries of a topic whose storage write latency is within a given threshold. |
| pulsar_entry_size_le_* | Histogram | The rate of entries of a topic whose entry size is within a given threshold. |

Available thresholds for pulsar_storage_write_latency_le_*:
  • pulsar_storage_write_latency_le_0_5: <= 0.5ms
  • pulsar_storage_write_latency_le_1: <= 1ms
  • pulsar_storage_write_latency_le_5: <= 5ms
  • pulsar_storage_write_latency_le_10: <= 10ms
  • pulsar_storage_write_latency_le_20: <= 20ms
  • pulsar_storage_write_latency_le_50: <= 50ms
  • pulsar_storage_write_latency_le_100: <= 100ms
  • pulsar_storage_write_latency_le_200: <= 200ms
  • pulsar_storage_write_latency_le_1000: <= 1s
  • pulsar_storage_write_latency_le_overflow: > 1s

Available thresholds for pulsar_entry_size_le_*:
  • pulsar_entry_size_le_128: <= 128 bytes
  • pulsar_entry_size_le_512: <= 512 bytes
  • pulsar_entry_size_le_1_kb: <= 1 KB
  • pulsar_entry_size_le_2_kb: <= 2 KB
  • pulsar_entry_size_le_4_kb: <= 4 KB
  • pulsar_entry_size_le_16_kb: <= 16 KB
  • pulsar_entry_size_le_100_kb: <= 100 KB
  • pulsar_entry_size_le_1_mb: <= 1 MB
  • pulsar_entry_size_le_overflow: > 1 MB
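
Because the threshold is encoded in each metric name, a dashboard or alerting script may need to turn the name back into a number. The following Python sketch shows one way to do that for the threshold names listed above. The function names are hypothetical, and the sketch assumes that KB and MB are binary units (1 KB = 1024 bytes), which this page does not state.

```python
import math

LATENCY_PREFIX = "pulsar_storage_write_latency_le_"
SIZE_PREFIX = "pulsar_entry_size_le_"

# Assumption: KB and MB are binary units; the page does not say which is used.
_SIZE_UNITS = {"kb": 1024, "mb": 1024 * 1024}


def _suffix(metric_name: str, prefix: str) -> str:
    """Return the part of the metric name after the expected prefix."""
    if not metric_name.startswith(prefix):
        raise ValueError(f"{metric_name!r} does not start with {prefix!r}")
    return metric_name[len(prefix):]


def latency_threshold_ms(metric_name: str) -> float:
    """Upper latency bound in milliseconds; the overflow bucket has no bound."""
    suffix = _suffix(metric_name, LATENCY_PREFIX)
    if suffix == "overflow":
        return math.inf
    # "0_5" encodes 0.5; plain numbers such as "1000" are already milliseconds.
    return float(suffix.replace("_", "."))


def entry_size_threshold_bytes(metric_name: str) -> float:
    """Upper entry-size bound in bytes; the overflow bucket has no bound."""
    suffix = _suffix(metric_name, SIZE_PREFIX)
    if suffix == "overflow":
        return math.inf
    number, _, unit = suffix.partition("_")
    # A suffix without a unit (for example "128") is already in bytes.
    return int(number) * _SIZE_UNITS.get(unit, 1)
```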

Source connector metrics

| Name | Type | Description |
| --- | --- | --- |
| pulsar_source_written_total | Counter | The total number of records written to a Pulsar topic |
| pulsar_source_written_1min_total | Counter | The total number of records written to a Pulsar topic in the last 1 minute |
| pulsar_source_received_total | Counter | The total number of records received from source |
| pulsar_source_received_1min_total | Counter | The total number of records received from source in the last 1 minute |
| pulsar_source_last_invocation | Gauge | The timestamp of the last invocation of the source |
| pulsar_source_source_exception | Gauge | The exception from a source |
| pulsar_source_source_exceptions_total | Counter | The total number of source exceptions |
| pulsar_source_source_exceptions_1min_total | Counter | The total number of source exceptions in the last 1 minute |
| pulsar_source_system_exception | Gauge | The exception from system code |
| pulsar_source_system_exceptions_total | Counter | The total number of system exceptions |
| pulsar_source_system_exceptions_1min_total | Counter | The total number of system exceptions in the last 1 minute |
| pulsar_source_user_metric_* | Summary | The user-defined metrics |

Sink connector metrics

| Name | Type | Description |
| --- | --- | --- |
| pulsar_sink_written_total | Counter | The total number of records written to a Pulsar topic |
| pulsar_sink_written_1min_total | Counter | The total number of records written to a Pulsar topic in the last 1 minute |
| pulsar_sink_received_total | Counter | The total number of records received from sink |
| pulsar_sink_received_1min_total | Counter | The total number of records received from sink in the last 1 minute |
| pulsar_sink_last_invocation | Gauge | The timestamp of the last invocation of the sink |
| pulsar_sink_sink_exception | Gauge | The exception from a sink |
| pulsar_sink_sink_exceptions_total | Counter | The total number of sink exceptions |
| pulsar_sink_sink_exceptions_1min_total | Counter | The total number of sink exceptions in the last 1 minute |
| pulsar_sink_system_exception | Gauge | The exception from system code |
| pulsar_sink_system_exceptions_total | Counter | The total number of system exceptions |
| pulsar_sink_system_exceptions_1min_total | Counter | The total number of system exceptions in the last 1 minute |
| pulsar_sink_user_metric_* | Summary | The user-defined metrics |

Function metrics

| Name | Type | Description |
| --- | --- | --- |
| pulsar_function_processed_successfully_total | Counter | The total number of messages processed successfully |
| pulsar_function_processed_successfully_1min_total | Counter | The total number of messages processed successfully in the last 1 minute |
| pulsar_function_system_exceptions_total | Counter | The total number of system exceptions |
| pulsar_function_system_exceptions_1min_total | Counter | The total number of system exceptions in the last 1 minute |
| pulsar_function_user_exceptions_total | Counter | The total number of user exceptions |
| pulsar_function_user_exceptions_1min_total | Counter | The total number of user exceptions in the last 1 minute |
| pulsar_function_process_latency_ms | Summary | The process latency in milliseconds |
| pulsar_function_process_latency_ms_1min | Summary | The process latency in milliseconds in the last 1 minute |
| pulsar_function_last_invocation | Gauge | The timestamp of the last invocation of the function |
| pulsar_function_received_total | Counter | The total number of messages received from source |
| pulsar_function_received_1min_total | Counter | The total number of messages received from source in the last 1 minute |
| pulsar_function_user_metric_* | Summary | The user-defined metrics |
| process_cpu_seconds_total | Counter | Total user and system CPU time spent in seconds. |
| jvm_memory_bytes_committed | Gauge | Committed (bytes) of a given JVM memory area. (Java Functions only) |
| jvm_memory_bytes_max | Gauge | Max (bytes) of a given JVM memory area. (Java Functions only) |
| jvm_memory_direct_bytes_used | Gauge | Used bytes of a given JVM memory area. (Java Functions only) |
| jvm_memory_bytes_init | Gauge | Initial bytes of a given JVM memory area. (Java Functions only) |
| jvm_gc_collection_seconds_sum | Summary | Time spent in a given JVM garbage collector in seconds. (Java Functions only) |

Health metrics

| Name | Type | Description |
| --- | --- | --- |
| pulsar_detector_e2e_latency_ms | Summary | The latency distribution from message sending to message consumption |
| pulsar_detector_publish_latency_ms | Summary | The latency distribution of message sending |
| pulsar_detector_pulsar_sla_messaging_up | Gauge | Indicates whether the messaging service is up or down |
| pulsar_detector_pulsar_sla_webservice_up | Gauge | Indicates whether the webservice is up or down |
| pulsar_detector_geo_latency_ms | Summary | The latency distribution from message sending to message consumption across clusters |
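
All of the metrics above are returned in the Prometheus text format, one sample per line. For ad hoc inspection without a Prometheus server, a small parser along the following lines may be enough. This is a simplified sketch, not a complete implementation of the exposition format: it skips comment lines, does not unescape label values, and ignores lines it cannot parse. The function name parse_samples is hypothetical.

```python
import re

# One sample line: metric name, optional {labels}, value, optional timestamp.
_SAMPLE_RE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)'
    r'(?:\{(?P<labels>.*)\})?'
    r'\s+(?P<value>\S+)'
    r'(?:\s+(?P<timestamp>-?\d+))?\s*$'
)
# One label pair inside the braces, for example namespace="public/default".
_LABEL_RE = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_samples(text):
    """Yield (name, labels, value) for each sample line, skipping comments."""
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # blank lines and HELP/TYPE comments carry no samples
        match = _SAMPLE_RE.match(line)
        if match is None:
            continue  # ignore lines this simple parser does not understand
        labels = dict(_LABEL_RE.findall(match.group("labels") or ""))
        yield match.group("name"), labels, float(match.group("value"))
```

The labels attached to each sample depend on your deployment, so it is worth inspecting a real response before relying on any particular label name.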

Metrics API integration

Prometheus integration

To collect Pulsar metrics into Prometheus, add the following to your Prometheus configuration file. Bearer tokens have a limited lifetime, so we recommend using the OAuth2 authentication method.

global:
  scrape_interval: 120s
  scrape_timeout: 60s
scrape_configs:
  - job_name: streamnative
    metrics_path: /v1/cloud/metrics/export
    scheme: https
    oauth2:
      client_id: '${client_id}'
      client_secret: '${client_secret}'
      token_url: https://auth.streamnative.cloud/oauth/token
      endpoint_params:
        grant_type: 'client_credentials'
        audience: '${audience}'
    static_configs:
      - targets: [metrics.streamnative.cloud]

You can find the values of client_id and client_secret in the Key file of a Super Admin Service Account. For more information, see work with service accounts.
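
To check your credentials outside of Prometheus, you can reproduce the token request that the oauth2 block above describes. The following Python sketch builds (but does not send) a client-credentials request against the token_url from the configuration. The helper names are hypothetical. Sending the credentials as form fields in the request body is an assumption about what the token endpoint accepts. The audience value follows the URN format explained just below.

```python
import urllib.parse
import urllib.request

TOKEN_URL = "https://auth.streamnative.cloud/oauth/token"


def build_audience(org_name: str, instance_name: str) -> str:
    """Combine the organization and instance names into the audience URN."""
    return f"urn:sn:pulsar:{org_name}:{instance_name}"


def build_token_request(client_id: str, client_secret: str, audience: str) -> urllib.request.Request:
    """Build (but do not send) an OAuth2 client-credentials token request."""
    form = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "audience": audience,
    }
    # Assumption: the token endpoint accepts form-encoded credentials in the body.
    body = urllib.parse.urlencode(form).encode("ascii")
    return urllib.request.Request(
        TOKEN_URL,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded"},
    )
```

A successful OAuth2 token response is a JSON document, and its access_token field is the value to send as the bearer token.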

The audience parameter is a Uniform Resource Name (URN) that combines the urn:sn:pulsar prefix, your organization name, and your Pulsar instance name at StreamNative:

"urn:sn:pulsar:${org_name}:${instance_name}"

The Prometheus response can be large if your cluster has many topics. Set the scrape_timeout parameter high enough to cover the time it takes to download the full response from the metrics endpoint. The scrape_interval parameter should also be larger than the scrape_timeout parameter.
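
The relationship between the two settings can be checked before you deploy a configuration. The following Python sketch parses Prometheus-style duration strings such as 120s or 1h30m and verifies that the interval is larger than the timeout. The function names are hypothetical, and the parser covers only the common unit suffixes.

```python
import re

# Seconds per unit for the common Prometheus duration suffixes.
_UNIT_SECONDS = {
    "ms": 0.001, "s": 1, "m": 60, "h": 3600,
    "d": 86400, "w": 604800, "y": 31536000,
}
_PART_RE = re.compile(r"(\d+)(ms|s|m|h|d|w|y)")


def duration_seconds(text: str) -> float:
    """Convert a duration such as '120s' or '1h30m' to seconds."""
    if not text:
        raise ValueError("empty duration")
    total, pos = 0.0, 0
    while pos < len(text):
        part = _PART_RE.match(text, pos)
        if part is None:
            raise ValueError(f"invalid duration: {text!r}")
        total += int(part.group(1)) * _UNIT_SECONDS[part.group(2)]
        pos = part.end()
    return total


def interval_exceeds_timeout(scrape_interval: str, scrape_timeout: str) -> bool:
    """Return True when scrape_interval is larger than scrape_timeout."""
    return duration_seconds(scrape_interval) > duration_seconds(scrape_timeout)
```

With the values from the configuration above, interval_exceeds_timeout("120s", "60s") holds, because 120 seconds is larger than 60 seconds.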

OpenTelemetry collector integration

The OpenTelemetry collector, as described on its official page, is a vendor-agnostic agent process for gathering and sending telemetry data from various sources. Because StreamNative Cloud outputs its metrics in the Prometheus format, it is compatible with the OpenTelemetry collector. To collect metrics from StreamNative Cloud, configure your OpenTelemetry collector to use the Prometheus Receiver, which is fully compatible with Prometheus's scrape_config settings.

To configure your collector, refer to the guidance in the Prometheus Integration section. There, you will find instructions for creating a scrape_config that collects metrics from StreamNative Cloud. Place this config in your collector's configuration file under the following section:

receivers:
  prometheus:
    config:

An example of such configuration is as follows:

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: streamnative
          metrics_path: /v1/cloud/metrics/export
          scheme: https
          oauth2:
            client_id: '${client_id}'
            client_secret: '${client_secret}'
            token_url: https://auth.streamnative.cloud/oauth/token
            endpoint_params:
              grant_type: 'client_credentials'
              audience: '${audience}'
          static_configs:
            - targets: [metrics.streamnative.cloud]

The OpenTelemetry collector supports a range of exporters, so you can route metrics from StreamNative Cloud to various observability platforms. For the full list of supported exporters, see the OpenTelemetry collector documentation.

NewRelic integration

Currently, remote writing of metrics directly into NewRelic is not supported. You can use a Prometheus instance to forward metrics to NewRelic. To do this, add a remote_write entry to the prometheus.yml configuration file as described in the Prometheus Integration section:

remote_write:
  - url: https://metric-api.newrelic.com/prometheus/v1/write?prometheus_server=streamnative
    authorization:
      credentials: '${newrelic_ingest_key}'

Note

The NewRelic ingestion point could also be metric-api.eu.newrelic.com depending on your account configuration.
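
If you generate prometheus.yml from a script, the choice between the two ingestion hosts can be made explicit. The following Python sketch builds the remote_write URL for either host. The function name and the "us" and "eu" region keys are hypothetical labels, and the prometheus_server value is the one used in the example above.

```python
import urllib.parse

# Ingestion hosts mentioned on this page; the region keys are hypothetical.
NEWRELIC_HOSTS = {
    "us": "metric-api.newrelic.com",
    "eu": "metric-api.eu.newrelic.com",
}


def newrelic_remote_write_url(region: str = "us", prometheus_server: str = "streamnative") -> str:
    """Return the remote_write URL for the given ingestion region."""
    if region not in NEWRELIC_HOSTS:
        raise ValueError(f"unknown region: {region!r}")
    query = urllib.parse.urlencode({"prometheus_server": prometheus_server})
    return f"https://{NEWRELIC_HOSTS[region]}/prometheus/v1/write?{query}"
```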

Then run a Prometheus instance to scrape the Pulsar metrics from the StreamNative endpoint and forward them to NewRelic:

prometheus --config.file=prometheus.yml

If you do not want to retain data in this Prometheus instance, you can set up a short retention time with the storage.tsdb.retention.time parameter:

prometheus --config.file=prometheus.yml --storage.tsdb.retention.time=15m

Grafana Cloud integration

Currently, remote writing of metrics directly into Grafana Cloud is not supported. You can use a Prometheus instance to forward metrics to Grafana Cloud. To do this, add a remote_write entry to the prometheus.yml configuration file as described in the Prometheus Integration section:

remote_write:
  - url: ${grafana_cloud_endpoint}/api/prom/push
    basic_auth:
      username: '${grafana_cloud_username}'
      password: '${grafana_cloud_api_key}'

You can find the grafana_cloud_endpoint and grafana_cloud_username values by selecting Prometheus at https://grafana.com/orgs/${grafana_org}. You can find grafana_cloud_api_key at https://grafana.com/orgs/${grafana_org}/api-keys.

Then run a Prometheus instance to scrape the Pulsar metrics from the StreamNative endpoint and forward them to Grafana Cloud:

prometheus --config.file=prometheus.yml

If you do not want to retain data in this Prometheus instance, you can set up a short retention time with the storage.tsdb.retention.time parameter:

prometheus --config.file=prometheus.yml --storage.tsdb.retention.time=15m

Datadog integration

Integrate with Datadog Agent

Note

The integration with StreamNative Cloud requires the change from PR 16812, which was released in Datadog Agent 7.52.0.

Using the Datadog Agent, you can connect Datadog to the StreamNative Cloud Metrics endpoint and start collecting metrics. The Datadog Agent can be hosted on most platforms; this documentation mainly demonstrates the setup with Docker and Kubernetes.

Create a conf.yaml file that contains the configuration for your Datadog Agent deployment.

init_config:
  service: docker

instances:
  - openmetrics_endpoint: https://metrics.streamnative.cloud/v1/cloud/metrics/export
    request_size: 900
    min_collection_interval: 180
    metrics:
      - pulsar_topics_count:
          type: gauge
          name: pulsar_topics_count
      - pulsar_subscriptions_count:
          type: gauge
          name: pulsar_subscriptions_count
      - pulsar_producers_count:
          type: gauge
          name: pulsar_producers_count
      - pulsar_consumers_count:
          type: gauge
          name: pulsar_consumers_count
      - pulsar_rate_in:
          type: gauge
          name: pulsar_rate_in
      - pulsar_rate_out:
          type: gauge
          name: pulsar_rate_out
      - pulsar_throughput_in:
          type: gauge
          name: pulsar_throughput_in
      - pulsar_throughput_out:
          type: gauge
          name: pulsar_throughput_out
      - pulsar_storage_size:
          type: gauge
          name: pulsar_storage_size
      - pulsar_storage_backlog_size:
          type: gauge
          name: pulsar_storage_backlog_size
      - pulsar_storage_offloaded_size:
          type: gauge
          name: pulsar_storage_offloaded_size
      - pulsar_storage_read_rate:
          type: gauge
          name: pulsar_storage_read_rate
      - pulsar_subscription_delayed:
          type: gauge
          name: pulsar_subscription_delayed
      - pulsar_storage_write_latency_le_0_5:
          type: histogram
          name: pulsar_storage_write_latency_le_0_5
      - pulsar_storage_write_latency_le_1:
          type: histogram
          name: pulsar_storage_write_latency_le_1
      - pulsar_storage_write_latency_le_5:
          type: histogram
          name: pulsar_storage_write_latency_le_5
      - pulsar_storage_write_latency_le_10:
          type: histogram
          name: pulsar_storage_write_latency_le_10
      - pulsar_storage_write_latency_le_20:
          type: histogram
          name: pulsar_storage_write_latency_le_20
      - pulsar_storage_write_latency_le_50:
          type: histogram
          name: pulsar_storage_write_latency_le_50
      - pulsar_storage_write_latency_le_100:
          type: histogram
          name: pulsar_storage_write_latency_le_100
      - pulsar_storage_write_latency_le_200:
          type: histogram
          name: pulsar_storage_write_latency_le_200
      - pulsar_storage_write_latency_le_1000:
          type: histogram
          name: pulsar_storage_write_latency_le_1000
      - pulsar_storage_write_latency_le_overflow:
          type: histogram
          name: pulsar_storage_write_latency_le_overflow
      - pulsar_entry_size_le_128:
          type: histogram
          name: pulsar_entry_size_le_128
      - pulsar_entry_size_le_512:
          type: histogram
          name: pulsar_entry_size_le_512
      - pulsar_entry_size_le_1_kb:
          type: histogram
          name: pulsar_entry_size_le_1_kb
      - pulsar_entry_size_le_4_kb:
          type: histogram
          name: pulsar_entry_size_le_4_kb
      - pulsar_entry_size_le_16_kb:
          type: histogram
          name: pulsar_entry_size_le_16_kb
    auth_token:
      reader:
        type: oauth
        url: https://auth.streamnative.cloud/oauth/token
        client_id: { your-admin-service-account-client-id }
        client_secret: { your-admin-service-account-client-secret }
        options:
          audience: urn:sn:pulsar:{your-organization}:{your-instance}
      writer:
        type: header
        name: Authorization
        value: Bearer <TOKEN>
        placeholder: <TOKEN>
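
The metrics list in conf.yaml is repetitive, so it can be easier to generate than to maintain by hand. The following Python sketch renders entries in the same layout as the file above from a list of metric names. The helper names metric_entry and render_metrics_yaml are hypothetical, and the output is plain text that you paste under the metrics key.

```python
def metric_entry(name: str, metric_type: str) -> dict:
    """Describe one metric the way the conf.yaml above does."""
    return {name: {"type": metric_type, "name": name}}


def render_metrics_yaml(entries, indent: int = 6) -> str:
    """Render entries as YAML list items indented to sit under 'metrics:'."""
    pad = " " * indent
    lines = []
    for entry in entries:
        for name, spec in entry.items():
            lines.append(f"{pad}- {name}:")
            lines.append(f"{pad}    type: {spec['type']}")
            lines.append(f"{pad}    name: {spec['name']}")
    return "\n".join(lines)


# Example input: two gauges and one histogram bucket from the tables on this page.
EXAMPLE_ENTRIES = [
    metric_entry("pulsar_rate_in", "gauge"),
    metric_entry("pulsar_rate_out", "gauge"),
    metric_entry("pulsar_entry_size_le_128", "histogram"),
]
```

Calling print(render_metrics_yaml(EXAMPLE_ENTRIES)) emits three list items with the same indentation as the hand-written entries.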

Run the following docker command to create a Datadog Agent container:

docker run -d --name dd-agent \
-e DD_API_KEY={your-Datadog-API-Key} \
-e DD_SITE={your-Datadog-Site-region} \
-e DD_APM_NON_LOCAL_TRAFFIC=true \
-v {your-config-yaml-file-path}:/etc/datadog-agent/conf.d/openmetrics.d/conf.yaml:ro \
-v /var/run/docker.sock:/var/run/docker.sock:ro \
-v /proc/:/host/proc/:ro \
-v /sys/fs/cgroup/:/host/sys/fs/cgroup:ro \
-v /var/lib/docker/containers:/var/lib/docker/containers:ro \
datadog/agent:7.52.0
  • DD_API_KEY: Your Datadog API key.
  • DD_SITE: The destination site for your metrics, traces, and logs. Defaults to datadoghq.com.
  • your-config-yaml-file-path: The path to the conf.yaml configuration file created in the first step.

For more detailed usage, refer to the Datadog documentation on the Docker Agent.

Bridge with OpenTelemetry

You can use the OpenTelemetry Collector to collect metrics from StreamNative Cloud and export them to Datadog.

To export metrics to Datadog, add the Datadog Exporter to your OpenTelemetry Collector configuration. The following example provides a basic configuration that is ready to use after you set your Datadog API key as the ${DD_API_KEY} variable:

receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: streamnative
          metrics_path: /v1/cloud/metrics/export
          scheme: https
          oauth2:
            client_id: '${client_id}'
            client_secret: '${client_secret}'
            token_url: https://auth.streamnative.cloud/oauth/token
            endpoint_params:
              grant_type: 'client_credentials'
              audience: '${audience}'
          static_configs:
            - targets: [metrics.streamnative.cloud]

processors:
  batch:
    send_batch_max_size: '10MiB'
    send_batch_size: 4096
    timeout: 120s

exporters:
  datadog:
    api:
      site: ${DD_SITE}
      key: ${DD_API_KEY}

service:
  pipelines:
    metrics:
      receivers: [prometheus]
      processors: [batch]
      exporters: [datadog]

Where ${DD_SITE} is your Datadog site, for example datadoghq.com.

The above configuration receives metrics from StreamNative Cloud, sets up a batch processor, which is mandatory for any non-development environment, and exports the metrics to Datadog. For all possible configuration options of the Datadog Exporter, refer to its fully documented example configuration file.
