Message Rest API reference

StreamNative Console supports a RESTful interface to Pulsar clusters. You can produce and consume messages without using the native Pulsar protocol or clients.

The Message Rest API supports both non-partitioned and partitioned topics, as well as basic schemas and Avro-based struct schemas.

Example use cases include:

  • Send data to Pulsar from any frontend application built in any language
  • Integrate Pulsar with existing automation tools
  • Ingest Pulsar data into corporate dashboards and monitoring systems
  • Provide instant access to data in motion for data scientist notebooks
  • Ingest messages into a stream processing framework that may not support Pulsar

Download the Message Rest API Swagger

Produce

Description:

Send a single message to the topic.

Method: POST

Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/message

Request headers (* means required):

  • Accept: application/json * Accept the response body in JSON format
  • Content-Type: application/octet-stream * Send the request body in binary format
  • X-Pulsar-Long-Schema-Version Schema version in long type
  • X-Pulsar-Sequence-Id Message sequence ID
  • X-Pulsar-Event-Time Message event time
  • X-Pulsar-Property-<key> Message property, key-value pair
  • X-Pulsar-Partition-Key Message key

Request body:

Binary data

Response headers:

  • Content-Type: application/json * Send the response body in JSON format

Response body:

String: message ID in string format

Status code:

  • 201: Send single message success
  • 400: Wrong request params
  • 401: Unauthorized
  • 404: Topic/Schema doesn't exist
  • 500: Internal server error

Error:

JsonObject:

reason(string): error information
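
The produce request described above can be wrapped in a small shell helper. This is a minimal sketch, not an official client: the names produce_path, produce_message, PULSAR_REST_BASE, and PULSAR_TOKEN are hypothetical, and bearer-token authentication is assumed because the examples later on this page use it. Those examples also place the persistent domain segment before the tenant, so the helper takes the full topic name in that form.

```shell
# Hypothetical helpers; every name here is illustrative only.

# Build the produce path from a full topic name such as
# "persistent/public/default/my-topic" (the form used by the cURL examples on this page).
produce_path() {
  printf '/admin/rest/topics/v1/%s/message' "$1"
}

# Send one message read from standard input.
#   $1: full topic name, $2: optional message key
# Assumes PULSAR_REST_BASE (for example https://<endpoint>:<port>) and
# PULSAR_TOKEN are set in the environment.
produce_message() {
  topic="$1"
  key="${2:-}"
  set -- --silent --show-error -X POST \
    --header "Authorization: Bearer ${PULSAR_TOKEN}" \
    --header "Accept: application/json" \
    --header "Content-Type: application/octet-stream"
  if [ -n "$key" ]; then
    # Optional header from the list above: the message key.
    set -- "$@" --header "X-Pulsar-Partition-Key: ${key}"
  fi
  curl "$@" --data-binary @- "${PULSAR_REST_BASE}$(produce_path "$topic")"
}
```

With both variables set, a call could look like: printf 'Hi, Pulsar' | produce_message persistent/public/default/pulsar-rest-string my-key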

Consume

Description:

Consume a single message from the topic.

Method: POST

Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/<subscriptionName>/message

Request headers (* means required):

  • Accept: application/octet-stream * Accept the response body in binary format
  • Content-Type: application/json * Send the request body in JSON format

Request body:

  • JsonObject :
    • timeoutMillis (int): 3000

Response headers (* means required):

  • Content-Type: application/octet-stream * Send the response body in binary format
  • X-Pulsar-Base64-Schema-Version Schema version in base64 encoded format
  • X-Pulsar-Long-Schema-Version Schema version in long type
  • X-Pulsar-Message-Id * Message ID in base64 encoded format
  • X-Pulsar-Message-String-Id * Message ID in string format
  • X-Pulsar-Sequence-Id Message sequence ID
  • X-Pulsar-Event-Time Message event time
  • X-Pulsar-Property-<key> Message property, key-value pair
  • X-Pulsar-Partition-Key Message key

Response body:

Binary data

Status code:

  • 200: Get single message success
  • 204: No message received before the timeout
  • 400: Wrong request params
  • 401: Unauthorized
  • 404: Topic doesn't exist
  • 500: Internal server error

Error:

  • JsonObject:
    • reason(string): error information
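
A script that polls for messages needs to tell a successful read from a timeout. The sketch below maps the status codes listed above to short descriptions; the function name describe_consume_status is hypothetical.

```shell
# Map the HTTP status of a consume request to a short description,
# following the status-code list above. The function name is illustrative.
describe_consume_status() {
  case "$1" in
    200) echo "message received" ;;
    204) echo "no message before timeout" ;;
    400) echo "wrong request params" ;;
    401) echo "unauthorized" ;;
    404) echo "topic does not exist" ;;
    500) echo "internal server error" ;;
    *)   echo "unexpected status: $1" ;;
  esac
}

describe_consume_status 204
```

The status itself can be captured with cURL's --write-out '%{http_code}' option while --output writes the message body to a file.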

Acknowledge

Description:

Acknowledge a single message from the topic.

Method: PUT

Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/<subscriptionName>/message

Request headers (* means required):

  • Accept: application/json * Accept the response body in JSON format
  • Content-Type: application/json * Send the request body in JSON format

Request body:

String: message ID in base64 encoded format

Response body:

no content

Status code:

  • 204: Acknowledge success
  • 400: Wrong request params
  • 401: Unauthorized
  • 404: Topic doesn't exist
  • 500: Internal server error

Error:

JsonObject:

reason(string): error information
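
The acknowledge request takes the base64 message ID that the consume response returns in the X-Pulsar-Message-Id header. When the consume call saves its headers with cURL's --dump-header option, the ID can be read back from that file. A minimal sketch, with a hypothetical function name:

```shell
# Print the value of X-Pulsar-Message-Id from a header file written by
# `curl --dump-header`. The name is matched case-insensitively (HTTP/2
# responses use lower-case header names) and the carriage return that
# ends each HTTP header line is removed first.
message_id_from_headers() {
  tr -d '\r' < "$1" |
    awk -F': ' 'tolower($1) == "x-pulsar-message-id" { print $2; exit }'
}
```

The printed value can then be passed as the --data-raw body of the acknowledge request.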

Working with Schemas

pulsar-rest supports all schemas and is compatible with Pulsar clients in other languages. To produce to or consume from a topic with a schema through pulsar-rest, see the following examples.

Schemas are created through the Pulsar Admin API. See manage schemas to learn about the schema API, or see create schema for topics to learn about topic schema management on the StreamNative Console.

Binary Data

Because the HTTP client needs to send binary data, the examples use the cURL tool to send the string Hi Pulsar as UTF-8 encoded bytes. To send bytes in another format, pass the appropriate file to cURL. For more information, see the cURL documentation.
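
To see which bytes actually go on the wire, the string can be dumped as hex before sending it. This is a small sketch using standard tools; the function name utf8_hex is hypothetical.

```shell
# Print the UTF-8 bytes of a string as space-separated hex values.
utf8_hex() {
  printf '%s' "$1" | od -An -v -tx1 | tr -s ' \n' ' ' | sed 's/^ //; s/ $//'
}

utf8_hex 'Hi Pulsar'
# 48 69 20 50 75 6c 73 61 72
```

For bytes stored in a file, cURL accepts --data-binary @<file>, which sends the file content unchanged.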

Avro Base Struct Schema

Pulsar uses the Avro specification to declare the schema definition for AvroBaseStructSchema, which covers AvroSchema, JsonSchema, and ProtobufSchema.

Multi-version schema

For basic schema types like String and JSON, pulsar-rest automatically takes the schema of the topic and puts it into the metadata of the message. Clients in other languages serialize and deserialize the data based on this metadata.

String Schema

Create topic schema

curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-string/schema \
	--header "Authorization: Bearer <Access Token>" \
 	--header "Accept: application/json" \
 	--header "Content-Type: application/json" \
 	--data-raw '{"schema":"","type":"STRING","properties":{}}'

Expected output:

{"version":{"version":0}}

Create a subscription on the topic

curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-string/subscription/rest-sub \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/json' \
 	--header 'Content-Type: application/json'

Expected output:

# No content

Produce messages with String schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/message \
	-H "Authorization: Bearer <Access Token>" \
 	-H "Accept: application/json" \
 	-H "Content-Type: application/octet-stream" \
 	--data-binary 'Hi, Pulsar'

Expected output:

#string format message id
10:0:-1:0

Consume messages with String schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/rest-sub/message \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/octet-stream' \
 	--data-raw '{"timeoutMillis":3000}' \
 	--header 'Content-Type: application/json' -v

Expected output:

# Headers
X-Pulsar-Message-Id: CAoQACAAMAE=
X-Pulsar-Message-String-Id: 10:0:-1:0
X-Pulsar-Sequence-Id: 0

# Body
Hi, Pulsar

Acknowledge messages

curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/rest-sub/message \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/json' \
 	--header 'Content-Type: application/json' \
	--data-raw 'CAoQACAAMAE='

Expected output:

# No content

JSON Schema

Create topic schema

curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-json/schema \
	--header "Authorization: Bearer <Access Token>" \
 	--header "Accept: application/json" \
 	--header "Content-Type: application/json" \
 	--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"JSON","properties":{}}'

Expected output:

{"version":{"version":0}}

Create a subscription on the topic

curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-json/subscription/rest-sub \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/json' \
 	--header 'Content-Type: application/json'

Expected output:

# No content

Produce messages with JSON schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/octet-stream' \
 	--data-binary '{"name":"f8a42816-6183-4e3e-9f5d-51b36c17de5c","age":79}'

Expected output:

#string format message id
10:0:-1:0

Consume messages with JSON schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/rest-sub/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/octet-stream' \
  --data-raw '{"timeoutMillis":3000}' \
  --header 'Content-Type: application/json' -v

Expected output:

# Headers
< X-Pulsar-Base64-Schema-Version: AAAAAAAAAAA=
< X-Pulsar-Long-Schema-Version: 0
< X-Pulsar-Message-Id: CIUCEAAgADAB
< X-Pulsar-Message-String-Id: 261:0:-1:0
< X-Pulsar-Sequence-Id: 0

# Body
{"name":"f8a42816-6183-4e3e-9f5d-51b36c17de5c","age":79}
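
In the output above, the two schema-version headers appear to carry the same value in two encodings: the base64 form decodes to eight zero bytes, which matches the long value 0. The sketch below shows the decoding step. The function name is hypothetical, and reading the eight bytes as one long is an assumption drawn from this single example.

```shell
# Decode a base64 schema version and print its bytes as one hex string.
# Uses `base64 -d` (GNU coreutils; some other platforms spell it differently).
schema_version_hex() {
  printf '%s' "$1" | base64 -d | od -An -v -tx1 | tr -d ' \n'
}

schema_version_hex 'AAAAAAAAAAA='
# 0000000000000000
```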

Acknowledge messages

curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/rest-sub/message \
	--header "Authorization: Bearer <Access Token>" \
	--header "Accept: application/json" \
	--header "Content-Type: application/json" \
	--data-raw 'CIUCEAAgADAB'

Expected output:

# No content

Avro Schema

Create topic schema

curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-avro/schema \
	--header "Authorization: Bearer <Access Token>" \
 	--header "Accept: application/json" \
 	--header "Content-Type: application/json" \
 	--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"AVRO","properties":{}}'

Expected output:

{"version":{"version":0}}

Create a subscription on the topic

curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-avro/subscription/rest-sub \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/json' \
 	--header 'Content-Type: application/json'

Expected output:

# No content

Prepare Avro binary data

Create ~/test-avro-student.avsc

{
  "type": "record",
  "name": "Student",
  "fields": [
    {
      "name": "age",
      "type": "int"
    },
    {
      "name": "name",
      "type": ["null", "string"]
    }
  ]
}

Create ~/test-avro-student.json

{ "age": 8, "name": { "string": "1ffc0cee-a3d9-42f2-b96d-efa09f54e782" } }

Render the JSON-encoded Avro datum as binary:

avro-tools jsontofrag --schema-file ~/test-avro-student.avsc   ~/test-avro-student.json > ~/test-avro-student.bin

Note

If you don't have avro-tools installed, see the Avro product page.
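
For a record this small, the bytes that the conversion should produce can be cross-checked by hand. The sketch below follows the Avro binary encoding rules (zig-zag varints for int and long, a branch index before a union value, a length before string bytes). It is tied to the example values above and is not a general encoder; the function name is hypothetical.

```shell
# Hand-encode {"age": 8, "name": {"string": "<36-character id>"}} as Avro binary.
#   age = 8                    -> zig-zag 16 -> byte 0x10 (octal 020)
#   name: union branch 1       -> zig-zag 2  -> byte 0x02 (octal 002)
#   string length 36           -> zig-zag 72 -> byte 0x48 (octal 110)
#   followed by the 36 UTF-8 bytes of the string itself
# Valid only while each number fits in a single varint byte (zig-zag value < 128).
encode_example_student() {
  printf '\020\002\110%s' '1ffc0cee-a3d9-42f2-b96d-efa09f54e782'
}

# Total size: 3 prefix bytes + 36 string bytes.
encode_example_student | wc -c
```

If avro-tools is available, piping the function output to cmp - ~/test-avro-student.bin should report no difference.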

Produce messages with Avro schema

When you want to send a message to a topic with a multi-version schema, put the version of the schema in the following request header:

  • X-Pulsar-Long-Schema-Version Schema version in long type

cat ~/test-avro-student.bin | \
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/octet-stream' \
	--header 'X-Pulsar-Long-Schema-Version: 0' \
 	--data-binary @-

Expected output:

#string format message id
10:0:-1:0
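
The value for the X-Pulsar-Long-Schema-Version header can be taken from the response of the create-schema call. A minimal sketch that extracts it without a JSON tool; the function name is hypothetical, and the pattern assumes the response has exactly the shape shown on this page.

```shell
# Extract the numeric version from a create-schema response such as
# {"version":{"version":0}}. Prints nothing if the shape does not match.
schema_version_from_response() {
  printf '%s\n' "$1" | sed -n 's/.*"version":{"version":\([0-9][0-9]*\)}.*/\1/p'
}

schema_version_from_response '{"version":{"version":0}}'
# 0
```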

Consume messages with Avro schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/rest-sub/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/octet-stream' \
  --header 'Content-Type: application/json' \
	--dump-header ~/test-avro-student-recv-header.txt \
 	--data-raw '{"timeoutMillis":3000}' \
  --output ~/test-avro-student-recv.bin

Verify the consumed message

avro-tools fragtojson --schema-file ~/test-avro-student.avsc ~/test-avro-student-recv.bin

Expected output:

{
  "age" : 8,
  "name" : {
    "string" : "1ffc0cee-a3d9-42f2-b96d-efa09f54e782"
  }
}

Acknowledge messages

curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/rest-sub/message \
	--header "Authorization: Bearer <Access Token>" \
	--header "Accept: application/json" \
	--header "Content-Type: application/json" \
	--data-raw 'CIUCEAAgADAB'

Expected output:

# No content

Protobuf Schema

Create topic schema

curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-proto/schema \
	--header "Authorization: Bearer <Access Token>" \
 	--header "Accept: application/json" \
 	--header "Content-Type: application/json" \
 	--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"TestMessage\",\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\",\"default\":0}]}","type":"PROTOBUF","properties":{}}'

Expected output:

{"version":{"version":0}}

Create a subscription on the topic

curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-proto/subscription/rest-sub \
	--header 'Authorization: Bearer <Access Token>' \
 	--header 'Accept: application/json' \
 	--header 'Content-Type: application/json'

Expected output:

# No content

Prepare protobuf data

Create ~/test-proto-test.proto

syntax = "proto3";

message TestMessage {
    string stringField = 1;
    double doubleField = 2;
    int32 intField = 3;
}

Create ~/test-proto-message.txt

stringField:"string filed"
doubleField: 3.14
intField: 8

Read a text-format message of the given type from standard input and write it in binary to standard output.

cat ~/test-proto-message.txt |  protoc --proto_path=<your user directory, ~> --encode='TestMessage' ~/test-proto-test.proto > ~/test-proto-message.bin
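
The encoded file can be sanity-checked against a hand-built copy of the same message. The sketch below writes the protobuf wire format for the three example fields in field-number order. The function name is hypothetical, and while protoc is expected to emit fields in this order, the protobuf format does not guarantee a byte-identical serialization.

```shell
# Hand-encode the example TestMessage in protobuf wire format.
#   field 1 (string): tag 0x0a, length 12, then "string filed"
#   field 2 (double): tag 0x11, then 3.14 as 8 little-endian IEEE 754 bytes
#                     (1f 85 eb 51 b8 1e 09 40)
#   field 3 (int32):  tag 0x18, then varint 8
encode_example_test_message() {
  printf '\012\014string filed\021\037\205\353\121\270\036\011\100\030\010'
}

# Total size: 14 + 9 + 2 bytes.
encode_example_test_message | wc -c
```

If the protoc step above succeeded, piping the function output to cmp - ~/test-proto-message.bin is one way to compare the two.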

Produce messages with Protobuf schema

When you want to send a message to a topic with a multi-version schema, put the version of the schema in the following request header:

  • X-Pulsar-Long-Schema-Version Schema version in long type

cat ~/test-proto-message.bin | \
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/octet-stream' \
	--header 'X-Pulsar-Long-Schema-Version: 0' \
 	--data-binary '@-'

Expected output:

#string format message id
10:0:-1:0

Consume messages with Protobuf schema

curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/rest-sub/message \
	--header 'Authorization: Bearer <Access Token>' \
  --header 'Accept: application/octet-stream' \
  --header 'Content-Type: application/json' \
	--dump-header ~/test-proto-message-recv-header.txt \
 	--data-raw '{"timeoutMillis":3000}' \
  --output ~/test-proto-message-recv.bin

Verify the consumed data

cat ~/test-proto-message-recv.bin |  protoc --proto_path=<your user directory> --decode='TestMessage' ~/test-proto-test.proto > ~/test-proto-message-recv.txt && cat ~/test-proto-message-recv.txt

Expected output:

stringField: "string filed"
doubleField: 3.14
intField: 8

Acknowledge messages

curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/rest-sub/message \
	--header "Authorization: Bearer <Access Token>" \
	--header "Accept: application/json" \
	--header "Content-Type: application/json" \
	--data-raw 'CIUCEAAgADAB'

Expected output:

# No content