- APIs
- CloudAPI
Cloud API
The StreamNative cloud API is an extension of the Kubernetes API that you can use to manage resources on the StreamNative cloud.
You can use the StreamNative cloud API to manage service accounts, users, Pulsar Instances, Pulsar Clusters, and ApiKeys programmatically.
Prerequisites
- You need to have an organization
- You need a service account with admin permission
Authentication
StreamNative cloud supports OAuth2 authentication for managing cloud resources. You can use an OAuth2-capable HTTP client in any language to connect to the StreamNative cloud.
Run the OAuth2 client credential flow using the curl command to get an access token to access the cloud. Download the service account with admin permissions from the console, then run the following command.
# Request an access token with the OAuth2 client_credentials grant.
# Replace <your-client-id> and <your-client-secret> with the service account's values.
curl --request POST -v \
--url https://auth.streamnative.cloud/oauth/token \
--header 'content-type: application/json' \
--data '{"client_id": "<your-client-id>", "client_secret": "<your-client-secret>", "audience": "https://api.streamnative.cloud", "grant_type": "client_credentials"}'
Use the Go net/http package to run the OAuth2 client credentials flow and get an access token:
package main
import (
"bytes"
"io"
"net/http"
)
// main runs the OAuth2 client credentials flow against the StreamNative
// auth endpoint and reads the JSON response that carries the access token.
// It panics on any request, transport, or read error.
func main() {
	url := "https://auth.streamnative.cloud/oauth/token"
	var jsonStr = []byte(`{
"client_id":"<your-client-id>",
"client_secret":"<your-client-secret>",
"audience":"https://api.streamnative.cloud",
"grant_type":"client_credentials"
}`)
	req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonStr))
	// Check the error before touching req: on failure req is nil and
	// req.Header.Set would panic with a nil pointer dereference.
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// The body holds the JSON token response; this example only drains it,
	// but a failed read is still reported rather than silently dropped.
	if _, err := io.ReadAll(resp.Body); err != nil {
		panic(err)
	}
}
Response:
{
"access_token": "<your-access-token>",
"expires_in": 84086,
"token_type": "Bearer"
}
Management ServiceAccount
List ServiceAccounts
// List every ServiceAccount in the organization's namespace.
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"GET",
	fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts"),
	nil)
if err != nil {
	panic(err)
}
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "ServiceAccountList",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"resourceVersion": "3110513"
},
"items": [
]
}
Create ServiceAccount
Request Body:
{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "ServiceAccount",
"metadata": {
"name": "<your-service-account-name>",
"namespace": "<your-organization-name>",
"annotations": {
"annotations.cloud.streamnative.io/service-account-role": "admin"
}
}
}
If you set annotations.cloud.streamnative.io/service-account-role
to ""
, it will create a service account with non-admin permissions
Example:
// Create a ServiceAccount by POSTing its manifest to the organization's namespace.
var jsonStr = []byte(`{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "ServiceAccount",
"metadata": {
"name": "<your-service-account-name>",
"namespace": "<your-organization-name>",
"annotations": {
"annotations.cloud.streamnative.io/service-account-role": "admin"
}
}
}
`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "ServiceAccount",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-service-account-name>",
"namespace": "<your-organization-name>",
"creationTimestamp": "2024-05-11T09:12:14Z",
"annotations": {
"annotations.cloud.streamnative.io/service-account-role": "admin"
}
},
"spec": {},
"status": {}
}
Get Service Account
// Fetch a single ServiceAccount by name from the organization's namespace.
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"GET",
	fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts/", "<your-serviceaccount-name>"),
	nil)
if err != nil {
	panic(err)
}
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "ServiceAccount",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-service-account-name>",
"namespace": "<your-organization-name>",
"creationTimestamp": "2024-05-11T09:12:14Z",
"annotations": {
"annotations.cloud.streamnative.io/service-account-role": "admin"
}
},
"spec": {},
"status": {
"privateKeyType": "TYPE_SN_CREDENTIALS_FILE",
"privateKeyData": "<base64-encode-data>",
"conditions": [
{
"type": "Ready",
"status": "True",
"reason": "Provisioned",
"lastTransitionTime": "2024-05-11T09:12:15Z"
}
]
}
}
A Ready condition with status True means that the service account was created successfully.
privateKeyData is the base64-encoded credentials of the service account.
Delete Service Account
// Delete a single ServiceAccount by name from the organization's namespace.
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"DELETE",
	// Placeholder for the account to delete, matching the get example.
	fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/serviceaccounts/", "<your-serviceaccount-name>"),
	nil)
if err != nil {
	panic(err)
}
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "ServiceAccount",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-service-account-name>",
"namespace": "<your-organization-name>",
"uid": "ef7e24e8-536b-48b4-aeeb-7333531f12b2",
"resourceVersion": "3110507",
"generation": 2,
"creationTimestamp": "2024-05-11T10:47:15Z",
"deletionTimestamp": "2024-05-11T10:51:12Z",
"deletionGracePeriodSeconds": 0,
"annotations": {
"annotations.cloud.streamnative.io/service-account-role": "admin",
"cloud.streamnative.io/allowed-origin-set": "true"
},
"finalizers": [
"serviceaccount.finalizers.cloud.streamnative.io"
]
},
......
}
deletionTimestamp
means this resource will be deleted
Management PulsarInstance
List PulsarInstances
See list service accounts, just replace serviceaccounts
with pulsarinstances
Create PulsarInstance
// Create a PulsarInstance by POSTing its manifest to the organization's namespace.
var jsonStr = []byte(`{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "PulsarInstance",
"metadata": {
"name": "<your-instance-name>",
"namespace": "<your-organization-name>"
},
"spec": {
"availabilityMode": "zonal",
"poolRef": {
"name": "shared-aws",
"namespace": "streamnative"
}
}
}`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/pulsarinstances"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
availabilityMode
and poolRef
: Please take a look at the pulsarinstance section of snctl
Response:
{
"kind": "PulsarInstance",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-instance-name>",
"namespace": "<your-organization-name>",
"uid": "d9036adf-ba1f-4f7b-a199-de4159bc7e47",
"resourceVersion": "3110630",
"generation": 1,
"creationTimestamp": "2024-05-11T11:55:19Z",
"annotations": {
"annotations.cloud.streamnative.io/istio-enabled": "true"
}
},
"spec": {
"poolRef": {
"namespace": "streamnative",
"name": "shared-aws"
},
"availabilityMode": "zonal",
"type": "standard",
"auth": {
"apikey": {}
}
},
"status": {
"auth": null
}
}
Get PulsarInstance
See get service account, just replace serviceaccounts
with pulsarinstances
Delete PulsarInstance
See delete service account, just replace serviceaccounts
with pulsarinstances
Management User
List User
See list service accounts, just replace serviceaccounts
with users
Get User
See get service account, just replace serviceaccounts
with users
Create User
// Create a User by POSTing its manifest to the organization's namespace.
var jsonStr = []byte(`{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "User",
"metadata": {
"name": "<your-user-name>",
"namespace": "<your-organization-name>"
},
"spec": {
"email": "<user-email>"
}
}`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/users"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "User",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-user-name>",
"namespace": "<your-organization-name>",
"uid": "ff7fbc4f-4037-4d6d-afb4-4304d57b33e8",
"resourceVersion": "3110657",
"generation": 1,
"creationTimestamp": "2024-05-11T12:06:02Z"
},
"spec": {
"email": "<user-email>"
},
"status": {}
}
Delete User
See delete service account, just replace serviceaccounts
with users
Management PulsarCluster
List PulsarCluster
See list service accounts, just replace serviceaccounts
with pulsarclusters
Create PulsarCluster
// Create a PulsarCluster by POSTing its manifest to the organization's namespace.
var jsonStr = []byte(`{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "PulsarCluster",
"metadata": {
"name": "<your-pulsar-cluster-name>",
"namespace": "<your-organization-name>"
},
"spec": {
"instanceName": "test-instance",
"location": "us-east-2",
"bookkeeper": {
"replicas": 3,
"resources": {
"cpu":"400m",
"memory":"1.6Gi"
}
},
"broker": {
"replicas": 2,
"resources": {
"cpu":"400m",
"memory":"1.6Gi"
}
},
"config": {
"custom": {
"backlogQuotaDefaultLimitBytes": "1000000000"
},
"functionEnabled": true
}
}
}`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/pulsarclusters"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Spec
: Please take a look at the pulsarcluster section of snctl
Response:
{
"kind": "PulsarCluster",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-pulsar-cluster-name>",
"namespace": "<your-organization-name>",
"uid": "d05258aa-ed83-4d61-ac61-d3f040b48204",
"resourceVersion": "3110692",
"generation": 1,
"creationTimestamp": "2024-05-11T12:21:41Z"
},
"spec": {
"instanceName": "test-instance",
"location": "us-east-2",
"poolMemberRef": {
"namespace": "streamnative",
"name": "<poolmember-name>"
},
"serviceEndpoints": [
{
"dnsName": "<cluster-dns-name>",
"type": "service"
}
],
"broker": {
"replicas": 2,
"image": "docker-proxy.streamnative.io/streamnative/pulsar-cloud:3.2.2.5",
"resources": {
"cpu": "400m",
"memory": "1717986918400m",
"heapPercentage": 0,
"directPercentage": 0
}
},
"bookkeeper": {
"replicas": 3,
"image": "docker-proxy.streamnative.io/streamnative/pulsar-cloud:3.2.2.5",
"resources": {
"cpu": "400m",
"memory": "1717986918400m",
"heapPercentage": 0,
"directPercentage": 0,
"ledgerDisk": "16Gi",
"journalDisk": "3Gi"
}
},
"config": {
"functionEnabled": true,
"protocols": {
"kafka": {}
},
"custom": {
"backlogQuotaDefaultLimitBytes": "1000000000",
"managedLedgerOffloadAutoTriggerSizeThresholdBytes": "0"
}
},
"releaseChannel": "rapid"
},
"status": {}
}
Get PulsarCluster
See get service account, just replace serviceaccounts
with pulsarclusters
Update PulsarCluster
// Update a PulsarCluster with a JSON merge patch that changes only the
// fields present in the body (here: the broker replica count).
var jsonStr = []byte(`{
"spec": {
"broker": {
"replicas": 3
}
}
}`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"PATCH", fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/pulsarclusters/", "<your-pulsar-cluster-name>"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/merge-patch+json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Delete PulsarCluster
See delete service account, just replace serviceaccounts
with pulsarclusters
Management Apikey
List Apikey
See list service accounts, just replace serviceaccounts
with apikeys
Create Apikey
Generate a public key and a private key; please refer to this module to generate them. Save the PEM and the private key in a secure place: they will be used to encrypt and decrypt your apikey token in the future.
// Generate the key pair used to protect the API key token.
privateKey, genErr := GenerateEncryptionKey()
if genErr != nil {
	panic(genErr)
}

// Export the public half; its PEM form is what the APIKey spec carries.
encryptionKey, exportErr := ExportPublicKey(privateKey)
if exportErr != nil {
	panic(exportErr)
}
publicPEM := encryptionKey.PEM
fmt.Println(publicPEM)

// Print the private key base64-encoded so it can be stored and decoded later.
encodedPrivateKey := base64.StdEncoding.EncodeToString([]byte(ExportPrivateKey(privateKey)))
fmt.Println(encodedPrivateKey)
// Create an APIKey whose token is encrypted with the supplied public key.
//
// pem is a raw string literal, so every \n stays a literal backslash-n; once
// spliced into jsonStr it becomes the JSON escape for a newline. A PEM that
// contains real newline characters must be escaped the same way first, or
// the request body will not be valid JSON.
pem := `-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoONi2FfPcpddBKZCxpXD\nBfEylQeoaOWQhFEnnMnAE2teeDgz92XUVj5xQj/ldqkIakje7RfPiGOpCYwoJxwf\nrahB+FpQTrF28POASEEJctNDUkp5DI/Dxx64zUP8gF+z8djsOB3YhSlrR6zZm4jS\nuseF5l97njzJeiZhd40of0LbvfwuEklaQwuze0KuY4Kq6xACiOH91bI8mxh6qDDP\nyUUPr5nbUyMxgyNPke0oqUDexY3RtzB7wYJksPEPFPnyRQ5QMqMoTy2ydCoULiZW\n6FvolFwjgPWgcIEkDjYo+x0x6CHAvILiNSzN47Ooq6vVnILuJGbZkhsHT6EU68NJ\nNwIDAQAB\n-----END PUBLIC KEY-----`
var jsonStr = `{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "APIKey",
"metadata": {
"name": "<your-apikey-name>",
"namespace": "<your-organization-name>"
},
"spec": {
"encryptionKey": {
"pem": "%s"
},
"instanceName": "test-instance",
"serviceAccountName": "admin",
"expirationTime": "2024-06-10T13:08:42Z"
}
}`
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"POST", fmt.Sprintf("%s%s%s%s", host, pathPrefix, organization, "/apikeys"),
	bytes.NewBufferString(fmt.Sprintf(jsonStr, pem)))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"kind": "APIKey",
"apiVersion": "cloud.streamnative.io/v1alpha1",
"metadata": {
"name": "<your-apikey-name>",
"namespace": "<your-organization-name>",
"uid": "502f340f-7316-4541-ba5f-37cca9aa0162",
"resourceVersion": "3110875",
"generation": 1,
"creationTimestamp": "2024-05-11T14:04:56Z",
"ownerReferences": [
{
"apiVersion": "cloud.streamnative.io/v1alpha1",
"kind": "ServiceAccount",
"name": "admin",
"uid": "9867d06e-d81b-4901-aee7-25bd2dffcf63",
"controller": true,
"blockOwnerDeletion": true
}
]
},
"spec": {
"instanceName": "test-instance",
"serviceAccountName": "admin",
"expirationTime": "2024-06-10T13:08:42Z",
"encryptionKey": {
"pem": "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAoONi2FfPcpddBKZCxpXD\nBfEylQeoaOWQhFEnnMnAE2teeDgz92XUVj5xQj/ldqkIakje7RfPiGOpCYwoJxwf\nrahB+FpQTrF28POASEEJctNDUkp5DI/Dxx64zUP8gF+z8djsOB3YhSlrR6zZm4jS\nuseF5l97njzJeiZhd40of0LbvfwuEklaQwuze0KuY4Kq6xACiOH91bI8mxh6qDDP\nyUUPr5nbUyMxgyNPke0oqUDexY3RtzB7wYJksPEPFPnyRQ5QMqMoTy2ydCoULiZW\n6FvolFwjgPWgcIEkDjYo+x0x6CHAvILiNSzN47Ooq6vVnILuJGbZkhsHT6EU68NJ\nNwIDAQAB\n-----END PUBLIC KEY-----"
}
},
"status": {}
}
Get Apikey
See get service account, just replace serviceaccounts
with apikeys
If you saved the privateKey from above, you can convert status.encryptedToken.jwe
to the original token using ImportPrivateKey
from this module. If you lose the privateKey, you will no longer be able to get the original token.
// Recover the original API key token from its encrypted (JWE) form.
privateKey := "<your-private-key>"

// The stored private key is base64-encoded; decode it before importing.
rawKey, err := base64.StdEncoding.DecodeString(privateKey)
if err != nil {
	panic(err)
}
rsaKey, err := ImportPrivateKey(string(rawKey))
if err != nil {
	panic(err)
}

// Decrypt the value of status.encryptedToken.jwe with the imported key.
encryptedToken := []byte("<your-api-key-status-encryptedToken-jwe>")
plainToken, err := jwe.Decrypt(encryptedToken, jwe.WithKey(jwa.RSA_OAEP, rsaKey))
if err != nil {
	panic(err)
}
fmt.Println(string(plainToken))
Revoke ApiKey
// Revoke an APIKey with a JSON merge patch that sets spec.revoke to true.
var jsonStr = []byte(`{
"spec": {
"revoke": true
}
}`)
host := "https://api.streamnative.cloud"
pathPrefix := "/apis/cloud.streamnative.io/v1alpha1/namespaces/"
organization := "<your-organization-name>"
token := "<your-access-token>"
req, err := http.NewRequest(
	"PATCH", fmt.Sprintf("%s%s%s%s%s", host, pathPrefix, organization, "/apikeys/", "<your-apikey-name>"),
	bytes.NewBuffer(jsonStr))
// req is nil when NewRequest fails, so check before setting headers.
if err != nil {
	panic(err)
}
req.Header.Set("Content-Type", "application/merge-patch+json")
req.Header.Set("Authorization", "bearer "+token)
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
	panic(err)
}
defer resp.Body.Close()
// Report read failures instead of printing a truncated or empty body.
res, err := io.ReadAll(resp.Body)
if err != nil {
	panic(err)
}
fmt.Println(string(res))
Response:
{
"type": "Revoked",
"status": "True",
"lastTransitionTime": "2024-05-11T14:04:58Z",
"reason": "API Key has been revoked",
"message": ""
},
Delete Apikey
See delete service account, just replace serviceaccounts
with apikeys