1. APIs
  2. CloudAPI

Cloud API

The StreamNative Cloud API is an extension of the Kubernetes API that you can use to manage resources on StreamNative Cloud.

You can use the StreamNative Cloud API to programmatically manage service accounts, users, Pulsar instances, Pulsar clusters, and API keys.

Prerequisites

Authentication

StreamNative Cloud uses OAuth2 authentication for managing cloud resources. You can use an HTTP OAuth2 client in any language to connect to StreamNative Cloud.

Run the OAuth2 client credential flow using the curl command to get an access token to access the cloud. Download the service account with admin permissions from the console, then run the following command.

curl --request POST -v \
   --url https://auth.streamnative.cloud/oauth/token \
   --header 'content-type: application/json' \
   --data '{"client_id": "<your-client-id>", "client_secret": "<your-client-secret>", "audience": "https://api.streamnative.cloud", "grant_type": "client_credentials"}'

Use the Go net/http package to run the OAuth2 client credentials flow and get an access token:

package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

// main runs the OAuth2 client credentials flow against the StreamNative
// auth endpoint and prints the raw JSON response, which carries the access
// token on success.
func main() {
	url := "https://auth.streamnative.cloud/oauth/token"
	var jsonStr = []byte(`{
	"client_id":"<your-client-id>",
	"client_secret":"<your-client-secret>",
	"audience":"https://api.streamnative.cloud",
	"grant_type":"client_credentials"
}`)
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before touching it.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Print the body so the access token (or the error payload) is visible.
	fmt.Println(string(body))
}

Response:

{
  "access_token": "<your-access-token>",
  "expires_in": 84086,
  "token_type": "Bearer"
}

Management ServiceAccount

List ServiceAccounts

 	// List the service accounts in the organization's namespace.
	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"GET",
		fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts"),
		nil)
	if err != nil {
		panic(err)
	}
	// Authenticate with the access token as a bearer token.
	req.Header.Set("Authorization", "bearer "+token)
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	res, err := io.ReadAll(resp.Body)
	if err != nil {
		// A failed read would otherwise print a truncated or empty body.
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "ServiceAccountList",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "resourceVersion": "3110513"
  },
  "items": [
  ]
}

Create ServiceAccount

Request Body:

{
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "kind": "ServiceAccount",
  "metadata": {
    "name": "<your-service-account-name>",
    "namespace": "<your-organization-name>",
    "annotations": {
      "annotations.cloud.streamnative.io/service-account-role": "admin"
    }
  }
}

If you set annotations.cloud.streamnative.io/service-account-role to "", a service account with non-admin permissions is created.

Example:

	// Request body: a ServiceAccount manifest with the admin role annotation.
	var jsonStr = []byte(`{
	  "apiVersion": "cloud.streamnative.io/v1alpha1",
	  "kind": "ServiceAccount",
	  "metadata": {
		"name": "<your-service-account-name>",
		"namespace": "<your-organization-name>",
		"annotations": {
		  "annotations.cloud.streamnative.io/service-account-role": "admin"
		}
	  }
	}
`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "ServiceAccount",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "name": "<your-service-account-name>",
    "namespace": "<your-organization-name>",
    "creationTimestamp": "2024-05-11T09:12:14Z",
    "annotations": {
      "annotations.cloud.streamnative.io/service-account-role": "admin"
    }
  },
  "spec": {},
  "status": {}
}

Get Service Account

	// Fetch a single service account by name from the organization's namespace.
	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"GET",
		fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts/", "<your-serviceaccount-name>"),
		nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "bearer "+token)
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	res, err := io.ReadAll(resp.Body)
	if err != nil {
		// A failed read would otherwise print a truncated or empty body.
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "ServiceAccount",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "name": "<your-service-account-name>",
    "namespace": "<your-organization-name>",
    "creationTimestamp": "2024-05-11T09:12:14Z",
    "annotations": {
      "annotations.cloud.streamnative.io/service-account-role": "admin"
    }
  },
  "spec": {},
  "status": {
    "privateKeyType": "TYPE_SN_CREDENTIALS_FILE",
    "privateKeyData": "<base64-encode-data>",
    "conditions": [
      {
        "type": "Ready",
        "status": "True",
        "reason": "Provisioned",
        "lastTransitionTime": "2024-05-11T09:12:15Z"
      }
    ]
  }
}

A condition of type Ready with status True means that the service account was created successfully.

privateKeyData is the base64-encoded credentials to the service account

Delete Service Account

	// Delete a single service account by name from the organization's namespace.
	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"DELETE",
		fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts/", "<your-serviceaccount-name>"),
		nil)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Authorization", "bearer "+token)
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	res, err := io.ReadAll(resp.Body)
	if err != nil {
		// A failed read would otherwise print a truncated or empty body.
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "ServiceAccount",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
     "name": "<your-service-account-name>",
    "namespace": "<your-organization-name>",
    "uid": "ef7e24e8-536b-48b4-aeeb-7333531f12b2",
    "resourceVersion": "3110507",
    "generation": 2,
    "creationTimestamp": "2024-05-11T10:47:15Z",
    "deletionTimestamp": "2024-05-11T10:51:12Z",
    "deletionGracePeriodSeconds": 0,
    "annotations": {
      "annotations.cloud.streamnative.io/service-account-role": "admin",
      "cloud.streamnative.io/allowed-origin-set": "true"
    },
    "finalizers": [
      "serviceaccount.finalizers.cloud.streamnative.io"
    ]
  },
	......
}

A deletionTimestamp in the response means that the resource is being deleted.

Management PulsarInstance

List PulsarInstances

See list service accounts, just replace serviceaccounts with pulsarinstances

Create PulsarInstance

// Request body: a PulsarInstance manifest that selects a pool via poolRef.
var jsonStr = []byte(`{
		  "apiVersion": "cloud.streamnative.io/v1alpha1",
		  "kind": "PulsarInstance",
		  "metadata": {
			"name": "<your-instance-name>",
			"namespace": "<your-organization-name>"
		  },
		  "spec": {
				"availabilityMode": "zonal",
				"poolRef": {
				  "name": "shared-aws",
	 			  "namespace": "streamnative"
				}
		 }
}`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/pulsarinstances"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

For availabilityMode and poolRef, see the pulsarinstance section of the snctl documentation.

Response:

{
  "kind": "PulsarInstance",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
		"name": "<your-instance-name>",
		"namespace": "<your-organization-name>",
    "uid": "d9036adf-ba1f-4f7b-a199-de4159bc7e47",
    "resourceVersion": "3110630",
    "generation": 1,
    "creationTimestamp": "2024-05-11T11:55:19Z",
    "annotations": {
      "annotations.cloud.streamnative.io/istio-enabled": "true"
    }
  },
  "spec": {
    "poolRef": {
      "namespace": "streamnative",
      "name": "shared-aws"
    },
    "availabilityMode": "zonal",
    "type": "standard",
    "auth": {
      "apikey": {}
    }
  },
  "status": {
    "auth": null
  }
}

Get PulsarInstance

See get service account, just replace serviceaccounts with pulsarinstances

Delete PulsarInstance

See delete service account, just replace serviceaccounts with pulsarinstances

Management User

List User

See list service accounts, just replace serviceaccounts with users

Get User

See get service account, just replace serviceaccounts with users

Create User

// Request body: a User manifest identified by name and email.
var jsonStr = []byte(`{
		  "apiVersion": "cloud.streamnative.io/v1alpha1",
		  "kind": "User",
		  "metadata": {
			"name": "<your-user-name>",
			"namespace": "<your-organization-name>"
		  },
		  "spec": {
				"email": "<user-email>"
		 }
}`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/users"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "User",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "name": "<your-user-name>",
    "namespace": "<your-organization-name>",
    "uid": "ff7fbc4f-4037-4d6d-afb4-4304d57b33e8",
    "resourceVersion": "3110657",
    "generation": 1,
    "creationTimestamp": "2024-05-11T12:06:02Z"
  },
  "spec": {
    "email": "<user-email>"
  },
  "status": {}
}

Delete User

See delete service account, just replace serviceaccounts with users

Management PulsarCluster

List PulsarCluster

See list service accounts, just replace serviceaccounts with pulsarclusters

Create PulsarCluster

// Request body: a PulsarCluster manifest for an existing instance, with
// broker and bookkeeper sizing and custom broker configuration.
var jsonStr = []byte(`{
		  "apiVersion": "cloud.streamnative.io/v1alpha1",
		  "kind": "PulsarCluster",
		  "metadata": {
			"name": "<your-pulsar-cluster-name>",
			"namespace": "<your-organization-name>"
		  },
		  "spec": {
				"instanceName": "test-instance",
				"location": "us-east-2",
				"bookkeeper": {
				  "replicas": 3,
					"resources": {
						"cpu":"400m",
						"memory":"1.6Gi"
					}
				},
				"broker": {
					"replicas": 2,
					"resources": {
						"cpu":"400m",
						"memory":"1.6Gi"
					}
				},
	 			"config": {
					"custom": {
						"backlogQuotaDefaultLimitBytes": "1000000000"
					},
					"functionEnabled": true
				}
		 }
}`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/pulsarclusters"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

For the spec fields, see the pulsarcluster section of the snctl documentation.

Response:

{
  "kind": "PulsarCluster",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "name": "<your-pulsar-cluster-name>",
    "namespace": "<your-organization-name>",
    "uid": "d05258aa-ed83-4d61-ac61-d3f040b48204",
    "resourceVersion": "3110692",
    "generation": 1,
    "creationTimestamp": "2024-05-11T12:21:41Z"
  },
  "spec": {
    "instanceName": "test-instance",
    "location": "us-east-2",
    "poolMemberRef": {
      "namespace": "streamnative",
      "name": "<poolmember-name>"
    },
    "serviceEndpoints": [
      {
        "dnsName": "<cluster-dns-name>",
        "type": "service"
      }
    ],
    "broker": {
      "replicas": 2,
      "image": "docker-proxy.streamnative.io/streamnative/pulsar-cloud:3.2.2.5",
      "resources": {
        "cpu": "400m",
        "memory": "1717986918400m",
        "heapPercentage": 0,
        "directPercentage": 0
      }
    },
    "bookkeeper": {
      "replicas": 3,
      "image": "docker-proxy.streamnative.io/streamnative/pulsar-cloud:3.2.2.5",
      "resources": {
        "cpu": "400m",
        "memory": "1717986918400m",
        "heapPercentage": 0,
        "directPercentage": 0,
        "ledgerDisk": "16Gi",
        "journalDisk": "3Gi"
      }
    },
    "config": {
      "functionEnabled": true,
      "protocols": {
        "kafka": {}
      },
      "custom": {
        "backlogQuotaDefaultLimitBytes": "1000000000",
        "managedLedgerOffloadAutoTriggerSizeThresholdBytes": "0"
      }
    },
    "releaseChannel": "rapid"
  },
  "status": {}
}

Get PulsarCluster

See get service account, just replace serviceaccounts with pulsarclusters

Update PulsarCluster

// Merge-patch body: only the fields to change (here the broker replica count).
var jsonStr = []byte(`{
			  "spec": {
					"broker": {
							"replicas": 3
					}
			 }
	}`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"PATCH", fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/pulsarclusters/", "<your-pulsar-cluster-name>"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	// The body is sent as a JSON merge patch rather than a full manifest.
	req.Header.Set("Content-Type", "application/merge-patch+json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

Delete PulsarCluster

See delete service account, just replace serviceaccounts with pulsarclusters

Management Apikey

List Apikey

See list service accounts, just replace serviceaccounts with apikeys

Create Apikey

Generate a public and private key pair; refer to this module to generate it. Save the PEM and the private key in a secure place, because they are used to encrypt and decrypt your API key token in the future.

 	// Generate a key pair and print the public key PEM and the base64-encoded
	// private key. GenerateEncryptionKey, ExportPublicKey and ExportPrivateKey
	// come from the helper module referenced in the text; they are not defined here.
	privateKey, err := GenerateEncryptionKey()
	if err != nil {
		panic(err)
	}
	encryptionKey, err := ExportPublicKey(privateKey)
	if err != nil {
		panic(err)
	}
	fmt.Println(encryptionKey.PEM)
	fmt.Println(base64.StdEncoding.EncodeToString([]byte(ExportPrivateKey(privateKey))))
	// Sample public key. This is a raw string, so each \n is a literal
	// backslash-n that ends up as a JSON newline escape in the request body.
	// NOTE(review): presumably this should be your own PEM printed above — confirm.
	pem := `-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoONi2FfPcpddBKZCxpXD\nBfEylQeoaOWQhFEnnMnAE2teeDgz92XUVj5xQj/ldqkIakje7RfPiGOpCYwoJxwf\nrahB+FpQTrF28POASEEJctNDUkp5DI/Dxx64zUP8gF+z8djsOB3YhSlrR6zZm4jS\nuseF5l97njzJeiZhd40of0LbvfwuEklaQwuze0KuY4Kq6xACiOH91bI8mxh6qDDP\nyUUPr5nbUyMxgyNPke0oqUDexY3RtzB7wYJksPEPFPnyRQ5QMqMoTy2ydCoULiZW\n6FvolFwjgPWgcIEkDjYo+x0x6CHAvILiNSzN47Ooq6vVnILuJGbZkhsHT6EU68NJ\nNwIDAQAB\n-----END PUBLIC KEY-----`
	// Request body template; %s is filled with the public key PEM.
	var jsonStr = `{
			  "apiVersion": "cloud.streamnative.io/v1alpha1",
			  "kind": "APIKey",
			  "metadata": {
		    "name": "<your-apikey-name>",
		    "namespace": "<your-organization-name>"
			  },
			  "spec": {
				"encryptionKey": {
					"pem": "%s"
				},
				"instanceName": "test-instance",
				"serviceAccountName": "admin",
				"expirationTime": "2024-06-10T13:08:42Z"
			 }
	}`
	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/apikeys"),
		bytes.NewBuffer([]byte(fmt.Sprintf(jsonStr, pem))))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

Response:

{
  "kind": "APIKey",
  "apiVersion": "cloud.streamnative.io/v1alpha1",
  "metadata": {
    "name": "<your-apikey-name>",
    "namespace": "<your-organization-name>",
    "uid": "502f340f-7316-4541-ba5f-37cca9aa0162",
    "resourceVersion": "3110875",
    "generation": 1,
    "creationTimestamp": "2024-05-11T14:04:56Z",
    "ownerReferences": [
      {
        "apiVersion": "cloud.streamnative.io/v1alpha1",
        "kind": "ServiceAccount",
        "name": "admin",
        "uid": "9867d06e-d81b-4901-aee7-25bd2dffcf63",
        "controller": true,
        "blockOwnerDeletion": true
      }
    ]
  },
  "spec": {
    "instanceName": "test-instance",
    "serviceAccountName": "admin",
    "expirationTime": "2024-06-10T13:08:42Z",
    "encryptionKey": {
      "pem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoONi2FfPcpddBKZCxpXD\nBfEylQeoaOWQhFEnnMnAE2teeDgz92XUVj5xQj/ldqkIakje7RfPiGOpCYwoJxwf\nrahB+FpQTrF28POASEEJctNDUkp5DI/Dxx64zUP8gF+z8djsOB3YhSlrR6zZm4jS\nuseF5l97njzJeiZhd40of0LbvfwuEklaQwuze0KuY4Kq6xACiOH91bI8mxh6qDDP\nyUUPr5nbUyMxgyNPke0oqUDexY3RtzB7wYJksPEPFPnyRQ5QMqMoTy2ydCoULiZW\n6FvolFwjgPWgcIEkDjYo+x0x6CHAvILiNSzN47Ooq6vVnILuJGbZkhsHT6EU68NJ\nNwIDAQAB\n-----END PUBLIC KEY-----"
    }
  },
  "status": {}
}

Get Apikey

See get service account, just replace serviceaccounts with apikeys

If you saved the private key from above, you can decrypt status.encryptedToken.jwe to recover the original token, using ImportPrivateKey from this module. If you lose the private key, you can no longer recover the original token.

	privateKey := "<your-private-key>"
	key, err := base64.StdEncoding.DecodeString(privateKey)
	if err != nil {
		panic(err)
	}
	pk, err := ImportPrivateKey(string(key))
	if err != nil {
		panic(err)
	}
	token, err := jwe.Decrypt([]byte("<your-api-key-status-encryptedToken-jwe>"), jwe.WithKey(jwa.RSA_OAEP, pk))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(token))

Revoke ApiKey

// Merge-patch body: setting spec.revoke to true revokes the API key.
var jsonStr = []byte(`{
			  "spec": {
					"revoke": true
			 }
	}`)

	host := "https://api.streamnative.cloud"
	pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
	organization := "<your-organization-name>"
	token := "<your-access-token>"
	req, err := http.NewRequest(
		"PATCH", fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/apikeys/", "<your-apikey-name>"),
		bytes.NewBuffer(jsonStr))
	if err != nil {
		// req is nil when NewRequest fails, so stop before setting headers.
		panic(err)
	}
	// The body is sent as a JSON merge patch rather than a full manifest.
	req.Header.Set("Content-Type", "application/merge-patch+json")
	req.Header.Set("Authorization", "bearer "+token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	res, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(res))

Response:


	{
		"type": "Revoked",
		"status": "True",
		"lastTransitionTime": "2024-05-11T14:04:58Z",
		"reason": "API Key has been revoked",
		"message": ""
	},

Delete Apikey

See delete service account, just replace serviceaccounts with apikeys

Previous
Message Rest API reference