Message Rest API reference
StreamNative Console supports a RESTful interface to Pulsar clusters. You can produce and consume messages without using the native Pulsar protocol or clients.
The Message Rest API supports both non-partitioned and partitioned topics, as well as basic schemas and Avro base struct schemas.
Example use cases include:
- Send data to Pulsar from any frontend application built in any language
- Integrate Pulsar with existing automation tools
- Ingest Pulsar data into corporate dashboards and monitoring systems
- Provide instant access to data in motion for data scientist notebooks
- Ingest messages into a stream processing framework that may not support Pulsar
Download the Message Rest API Swagger
Produce
Description:
Send a single message to the topic.
Method: POST
Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/message
Request headers (* means required):
- Accept: application/json *
  Accept the response body in JSON format
- Content-Type: application/octet-stream *
  Send the request body in binary format
- X-Pulsar-Long-Schema-Version
  Schema version in long type
- X-Pulsar-Sequence-Id
  Message sequence ID
- X-Pulsar-Event-Time
  Message event time
- X-Pulsar-Property-<key>
  Message property, key-value pair
- X-Pulsar-Partition-Key
  Message key
Request body:
Binary data
Response headers:
Content-Type: application/json *
Send the response body in JSON format
Response body:
String: message ID in string format
Status code:
- 201: Send single message success
- 400: Wrong request params
- 401: Unauthorized
- 404: Topic/Schema doesn't exist
- 500: Internal server error
Error:
JsonObject:
- reason (string): error information
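Because each message property travels in its own X-Pulsar-Property-<key> header, a request with several properties needs several header lines. The following is a minimal sketch of one way to generate them in a POSIX shell; the function name property_headers is hypothetical and not part of the Message Rest API.

```shell
# Hypothetical helper (an assumption, not part of the Message Rest API):
# print one X-Pulsar-Property-<key> header line per key=value argument.
property_headers() {
  for pair in "$@"; do
    key=${pair%%=*}     # text before the first "="
    value=${pair#*=}    # text after the first "="
    printf 'X-Pulsar-Property-%s: %s\n' "$key" "$value"
  done
}

property_headers region=eu source=web
# prints:
# X-Pulsar-Property-region: eu
# X-Pulsar-Property-source: web
```

Each printed line can then be passed to curl through its own --header option on the produce request.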
Consume
Description:
Consume a single message from the topic.
Method: POST
Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/<subscriptionName>/message
Request headers (* means required):
- Accept: application/octet-stream *
  Accept the response body in binary format
- Content-Type: application/json *
  Send the request body in JSON format
Request body:
JsonObject:
- timeoutMillis (int): receive timeout in milliseconds, for example 3000
Response headers (* means required):
- Content-Type: application/octet-stream *
  Send the response body in binary format
- X-Pulsar-Base64-Schema-Version
  Schema version in base64 encoded format
- X-Pulsar-Long-Schema-Version
  Schema version in long type
- X-Pulsar-Message-Id *
  Message ID in base64 encoded format
- X-Pulsar-Message-String-Id *
  Message ID in string format
- X-Pulsar-Sequence-Id
  Message sequence ID
- X-Pulsar-Event-Time
  Message event time
- X-Pulsar-Property-<key>
  Message property, key-value pair
- X-Pulsar-Partition-Key
  Message key
Response body:
Binary data
Status code:
- 200: Get single message success
- 204: No message received before the timeout
- 400: Wrong request params
- 401: Unauthorized
- 404: Topic doesn't exist
- 500: Internal server error
Error:
JsonObject:
- reason (string): error information
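A consume call can succeed without returning a message (204), so a script should check the status code before reading the body. The sketch below maps the documented consume status codes to short descriptions; the function name describe_consume_status and the wording of the messages are illustrative assumptions.

```shell
# Hypothetical helper: translate a consume status code (as listed in this
# reference) into a short description for logs or scripts.
describe_consume_status() {
  case "$1" in
    200) echo "message received" ;;
    204) echo "no message before timeout" ;;
    400) echo "wrong request params" ;;
    401) echo "unauthorized" ;;
    404) echo "topic does not exist" ;;
    500) echo "internal server error" ;;
    *)   echo "unexpected status $1" ;;
  esac
}

# Possible use with curl, which can print the status via --write-out:
#   status=$(curl -s -o message.bin -w '%{http_code}' ...consume request...)
#   describe_consume_status "$status"
```

Treating 204 as a normal "try again" outcome, rather than as an error, keeps a polling loop simple.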
Acknowledge
Description:
Acknowledge a single message from the topic.
Method: PUT
Path: /admin/rest/topics/v1/<tenant>/<namespace>/<topic>/<subscriptionName>/message
Request headers (* means required):
- Accept: application/json *
  Accept the response body in JSON format
- Content-Type: application/json *
  Send the request body in JSON format
Request body:
String: message ID in base64 encoded format
Response body:
no content
Status code:
- 204: Acknowledge success
- 400: Wrong request params
- 401: Unauthorized
- 404: Topic doesn't exist
- 500: Internal server error
Error:
JsonObject:
- reason (string): error information
Working with Schemas
pulsar-rest supports all schemas and is compatible with Pulsar clients in other languages. If you want to produce to or consume from a topic with a schema through pulsar-rest, see the following examples.
Pulsar schemas are created through the Pulsar Admin API. See manage schemas to learn the schema API, or see create schema for topics to learn topic schema management on the StreamNative Console.
Binary Data
Because the HTTP client needs to send binary data, use the cURL tool to encode the string Hi Pulsar into UTF-8 format bytes. If you need to send bytes in another format, specify the appropriate file. For more information, see the cURL documentation.
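To send bytes from a file, one option is to write the payload to a file first and pass it to curl with --data-binary @<file>, which posts the file content without modification. The sketch below only prepares the file; the function name write_payload and the temporary file are illustrative assumptions.

```shell
# Hypothetical helper: write the exact bytes of a string to a file.
# printf '%s' is used instead of echo so no trailing newline is appended.
write_payload() {
  # $1 = text to send, $2 = output file
  printf '%s' "$1" > "$2"
}

payload_file=$(mktemp)
write_payload 'Hi Pulsar' "$payload_file"

# The file could then be sent as the request body, for example:
#   curl ... --header 'Content-Type: application/octet-stream' \
#        --data-binary "@$payload_file"
```

The text is written in whatever encoding the shell session uses; for ASCII text such as Hi Pulsar this is identical to UTF-8.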
Avro Base Struct Schema
Pulsar uses the Avro Specification to declare the schema definition for AvroBaseStructSchema, which supports AvroSchema, JsonSchema, and ProtobufSchema.
Multi-version schema
For basic schema types like String and JSON, pulsar-rest automatically takes the schema of the topic and puts it into the metadata of the message. Clients in other languages serialize/deserialize the data based on this metadata.
String Schema
Create topic schema
curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-string/schema \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw '{"schema":"","type":"STRING","properties":{}}'
Expected output:
{"version":{"version":0}}
Create a subscription on the topic
curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-string/subscription/rest-sub \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json'
Expected output:
# No content
Produce messages with String schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/message \
-H "Authorization: Bearer <Access Token>" \
-H "Accept: application/json" \
-H "Content-Type: application/octet-stream" \
--data-binary 'Hi, Pulsar'
Expected output:
#string format message id
10:0:-1:0
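The string-format message ID can be split into its colon-separated parts in a script. The sketch below assumes the layout is ledgerId:entryId:partitionIndex:batchIndex, which is consistent with the sample 10:0:-1:0 but is not stated in this reference; the function name parse_message_id is hypothetical.

```shell
# Hypothetical helper. Assumption: the string ID is laid out as
# ledgerId:entryId:partitionIndex:batchIndex (not confirmed by this reference).
parse_message_id() {
  # $1 = string-format message ID, for example 10:0:-1:0
  old_ifs=$IFS
  IFS=:
  set -- $1          # split on ":" into the function's positional parameters
  IFS=$old_ifs
  printf 'ledger=%s entry=%s partition=%s batch=%s\n' "$1" "$2" "$3" "$4"
}

parse_message_id '10:0:-1:0'
# prints: ledger=10 entry=0 partition=-1 batch=0
```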
Consume messages with String schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/octet-stream' \
--data-raw '{"timeoutMillis":3000}' \
--header 'Content-Type: application/json' -v
Expected output:
# Headers
X-Pulsar-Message-Id: CAoQACAAMAE=
X-Pulsar-Message-String-Id: 10:0:-1:0
X-Pulsar-Sequence-Id: 0
# Body
Hi, Pulsar
Acknowledge messages
curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-string/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw 'CAoQACAAMAE='
Expected output:
# No content
Negative acknowledge messages
curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v2/persistent/public/default/pulsar-rest-string/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{"encodedMessageId":"CAoQACAAMAE=","negativeAck":"true"}'
Expected output:
# No content
JSON Schema
Create topic schema
curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-json/schema \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"JSON","properties":{}}'
Expected output:
{"version":{"version":0}}
Create a subscription on the topic
curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-json/subscription/rest-sub \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json'
Expected output:
# No content
Produce messages with JSON schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/octet-stream' \
--data-binary '{"name":"f8a42816-6183-4e3e-9f5d-51b36c17de5c","age":79}'
Expected output:
#string format message id
10:0:-1:0
Consume messages with JSON schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/octet-stream' \
--data-raw '{"timeoutMillis":3000}' \
--header 'Content-Type: application/json' -v
Expected output:
# Headers
< X-Pulsar-Base64-Schema-Version: AAAAAAAAAAA=
< X-Pulsar-Long-Schema-Version: 0
< X-Pulsar-Message-Id: CIUCEAAgADAB
< X-Pulsar-Message-String-Id: 261:0:-1:0
< X-Pulsar-Sequence-Id: 0
# Body
{"name":"f8a42816-6183-4e3e-9f5d-51b36c17de5c","age":79}
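The two schema version headers in this output carry the same value in different encodings. As a sketch, the base64 form can be converted to the long form in a shell, assuming the base64 value decodes to an 8-byte big-endian integer (this matches the sample pair AAAAAAAAAAA= and 0, but the reference does not state the encoding). The function name is hypothetical, and base64 -d is the GNU coreutils spelling of the decode flag.

```shell
# Hypothetical helper. Assumption: the base64 schema version decodes to an
# 8-byte big-endian integer. Only suitable for small, non-negative versions.
schema_version_to_long() {
  # $1 = value of X-Pulsar-Base64-Schema-Version
  hex=$(printf '%s' "$1" | base64 -d | od -An -v -tx1 | tr -d ' \n')
  printf '%d\n' "0x$hex"   # interpret the hex digits as one integer
}

schema_version_to_long 'AAAAAAAAAAA='
# prints: 0
```

The result is the value to send in X-Pulsar-Long-Schema-Version when producing with that schema version.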
Acknowledge messages
curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-json/rest-sub/message \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw 'CIUCEAAgADAB'
Expected output:
# No content
Avro Schema
Create topic schema
curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-avro/schema \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"Student\",\"fields\":[{\"name\":\"age\",\"type\":\"int\"},{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}","type":"AVRO","properties":{}}'
Expected output:
{"version":{"version":0}}
Create a subscription on the topic
curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-avro/subscription/rest-sub \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json'
Expected output:
# No content
Prepare Avro binary data
Create ~/test-avro-student.avsc
{
"type": "record",
"name": "Student",
"fields": [
{
"name": "age",
"type": "int"
},
{
"name": "name",
"type": ["null", "string"]
}
]
}
Create ~/test-avro-student.json
{ "age": 8, "name": { "string": "1ffc0cee-a3d9-42f2-b96d-efa09f54e782" } }
Render the JSON-encoded Avro datum as binary:
avro-tools jsontofrag --schema-file ~/test-avro-student.avsc ~/test-avro-student.json > ~/test-avro-student.bin
Note
If you don't have avro-tools installed, see the Avro product page.
Produce messages with Avro schema
When you want to send a message to a topic with a multi-version schema, put the version of the schema in the following header:
- X-Pulsar-Long-Schema-Version
  Schema version in long type
cat ~/test-avro-student.bin | \
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/octet-stream' \
--header 'X-Pulsar-Long-Schema-Version: 0' \
--data-binary @-
Expected output:
#string format message id
10:0:-1:0
Consume messages with Avro schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/octet-stream' \
--header 'Content-Type: application/json' \
--dump-header ~/test-avro-student-recv-header.txt \
--data-raw '{"timeoutMillis":3000}' \
--output ~/test-avro-student-recv.bin
Verify the consumed message
avro-tools fragtojson --schema-file ~/test-avro-student.avsc ~/test-avro-student-recv.bin
Expected output:
{
"age" : 8,
"name" : {
"string" : "1ffc0cee-a3d9-42f2-b96d-efa09f54e782"
}
}
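The acknowledge request needs the base64 message ID that the consume response returned in the X-Pulsar-Message-Id header. Since the consume command above saves the headers with --dump-header, the ID can be read back from that file. The following is a minimal sketch; the function name header_value is hypothetical.

```shell
# Hypothetical helper: print the value of one response header from a file
# written by curl --dump-header. The header name match is case-insensitive.
header_value() {
  # $1 = header name, $2 = header dump file
  awk -v name="$1" '
    BEGIN { name = tolower(name) ":" }
    tolower($1) == name {
      sub(/^[^:]*:[ \t]*/, "")   # drop the "Name: " prefix
      sub(/\r$/, "")             # drop the carriage return that ends HTTP header lines
      print
      exit
    }
  ' "$2"
}

# Possible use with the header file from the consume step:
#   msg_id=$(header_value X-Pulsar-Message-Id ~/test-avro-student-recv-header.txt)
#   curl -X PUT ... --data-raw "$msg_id"
```

Reading the ID from the saved headers avoids copying it by hand into the acknowledge request.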
Acknowledge messages
curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-avro/rest-sub/message \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw 'CIUCEAAgADAB'
Expected output:
# No content
Protobuf Schema
Create topic schema
curl -X POST https://<endpoint>:<port>/admin/v2/schemas/public/default/pulsar-rest-proto/schema \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw '{"schema":"{\"type\":\"record\",\"name\":\"TestMessage\",\"fields\":[{\"name\":\"stringField\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"default\":\"\"},{\"name\":\"doubleField\",\"type\":\"double\",\"default\":0},{\"name\":\"intField\",\"type\":\"int\",\"default\":0}]}","type":"PROTOBUF","properties":{}}'
Expected output:
{"version":{"version":0}}
Create a subscription on the topic
curl -X PUT https://<endpoint>:<port>/admin/v2/persistent/public/default/pulsar-rest-proto/subscription/rest-sub \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json'
Expected output:
# No content
Prepare protobuf data
Create ~/test-proto-test.proto
syntax = "proto3";
message TestMessage {
string stringField = 1;
double doubleField = 2;
int32 intField = 3;
}
Create ~/test-proto-message.txt
stringField:"string filed"
doubleField: 3.14
intField: 8
Read a text-format message of the given type from standard input and write it in binary to standard output.
cat ~/test-proto-message.txt | protoc --proto_path=<your user directory, ~> --encode='TestMessage' ~/test-proto-test.proto > ~/test-proto-message.bin
Produce messages with Protobuf schema
When you want to send a message to a topic with a multi-version schema, put the version of the schema in the following header:
- X-Pulsar-Long-Schema-Version
  Schema version in long type
cat ~/test-proto-message.bin | \
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/json' \
--header 'Content-Type: application/octet-stream' \
--header 'X-Pulsar-Long-Schema-Version: 0' \
--data-binary '@-'
Expected output:
#string format message id
10:0:-1:0
Consume messages with Protobuf schema
curl -X POST https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/rest-sub/message \
--header 'Authorization: Bearer <Access Token>' \
--header 'Accept: application/octet-stream' \
--header 'Content-Type: application/json' \
--dump-header ~/test-proto-message-recv-header.txt \
--data-raw '{"timeoutMillis":3000}' \
--output ~/test-proto-message-recv.bin
Verify the consumed data
cat ~/test-proto-message-recv.bin | protoc --proto_path=<your user directory> --decode='TestMessage' ~/test-proto-test.proto > ~/test-proto-message-recv.txt && cat ~/test-proto-message-recv.txt
Expected output:
stringField: "string filed"
doubleField: 3.14
intField: 8
Acknowledge messages
curl -X PUT https://<endpoint>:<port>/admin/rest/topics/v1/persistent/public/default/pulsar-rest-proto/rest-sub/message \
--header "Authorization: Bearer <Access Token>" \
--header "Accept: application/json" \
--header "Content-Type: application/json" \
--data-raw 'CIUCEAAgADAB'
Expected output:
# No content