Kafka Schema Registry provides an interface for storing and managing schemas. Producers and consumers can register the schemas within the registry and retrieve them when necessary. Schemas are versioned, and the registry supports configurable compatibility modes between different schema versions. When a producer or consumer attempts to register a new schema version, the registry performs a compatibility check and returns an error if an incompatible change is detected. This mechanism ensures consistency and compatibility among all producers and consumers when schema changes occur.
Access Schema Registry in Kafka clients
To access the Kafka Schema Registry, you must configure authentication. There are two ways to authenticate to the Schema Registry:
- OAuth2 authentication: only available for Kafka Java client
- Basic authentication: available for all Kafka clients
OAuth2 authentication
First, import the following dependencies:
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.6.1</version>
</dependency>
<dependency>
  <groupId>io.streamnative.pulsar.handlers</groupId>
  <artifactId>oauth-client</artifactId>
  <version>3.2.2.6</version>
</dependency>
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>7.5.0</version>
</dependency>
Minimum required versions:
- kafka-clients: 3.4.0
- oauth-client: 3.1.0.4
- kafka-avro-serializer: 7.5.0

Note that before 3.2.2.6, oauth-client requires Java 17 or higher.
Then, in addition to the existing properties, configure the following properties:
// props is the Properties object that already configures OAuth2 authentication.
// See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, "io.streamnative.pulsar.handlers.kop.security.oauth.schema.OauthCredentialProvider");
props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
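Put together, the Schema Registry portion of the configuration can be sketched as follows. This is a minimal illustration: the OAuth2 properties for the Kafka connection itself (SASL settings, credentials file, and so on) are assumed to be configured elsewhere as described in the linked guide, and the string keys below are the literal equivalents of the `KafkaAvroSerializerConfig` constants used above.

```java
import java.util.Properties;

public final class SchemaRegistryOAuthConfig {
    // Builds only the Schema Registry-related client properties.
    // The OAuth2 properties for the Kafka connection itself are assumed
    // to be set elsewhere, per the Kafka Java client guide.
    public static Properties schemaRegistryProps(String schemaRegistryUrl) {
        Properties props = new Properties();
        // Literal keys equivalent to the KafkaAvroSerializerConfig constants:
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put("bearer.auth.credentials.source", "CUSTOM");
        props.put("bearer.auth.custom.provider.class",
                "io.streamnative.pulsar.handlers.kop.security.oauth.schema.OauthCredentialProvider");
        return props;
    }
}
```

These properties are then merged into the producer or consumer `Properties` object before creating the client.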
Basic authentication
Unlike OAuth2 authentication, Basic authentication does not require the oauth-client dependency or kafka-clients >= 3.4.0.

The username can be any non-empty string. The password should be the token (the jwtToken variable in the code below) of your account.
// props is the Properties object that already configures the Token authentication.
// See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, String.format("%s:%s", "any-user", jwtToken));
Configurable compatibility modes
When using serialization formats such as Avro, JSON Schema, and Protobuf, keep in mind that there are different configurable compatibility modes. In the Schema Registry, schema compatibility is managed by versioning each individual schema. The compatibility type determines how the Schema Registry compares a new schema with previous versions of a schema, for a given subject. Upon its initial creation within a subject, a schema is assigned a unique identifier and a version number, starting at version 1. If the schema is updated and successfully passes the compatibility checks, it is given a new unique identifier and an incremented version number, i.e., version 2.
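For instance, under BACKWARD compatibility a consumer using the new schema must be able to read data written with the previous schema; adding a field with a default value is a typical backward-compatible change. Below is a hypothetical Avro example for a subject such as my-topic-value. Version 1:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"}
  ]
}
```

Version 2 adds a field with a default, so records written with version 1 can still be read and the BACKWARD check passes:

```json
{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age", "type": "int", "default": 0}
  ]
}
```

Removing the default from the new field would make the change incompatible, and the registry would reject the registration.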
| Compatibility mode | AVRO | JSON | Protobuf |
|---|---|---|---|
| NONE | YES | YES | YES |
| BACKWARD | YES | YES | YES |
| BACKWARD_TRANSITIVE | YES | YES | YES |
| FORWARD | YES | YES | - |
| FORWARD_TRANSITIVE | YES | YES | - |
| FULL | YES | YES | - |
| FULL_TRANSITIVE | YES | YES | - |
REST API
The Kafka Schema Registry provides a REST API for managing schemas. The following table lists the supported methods and parameters. For more details about the API, refer to the Schema Registry API.
| API | Method | Supported Parameters |
|---|---|---|
| /schemas/ids/{int: id} | GET | |
| /schemas/ids/{int: id}/schema | GET | |
| /schemas/types | GET | |
| /schemas/ids/{int: id}/versions | GET | |
| /schemas/ids/{int: id}/subjects | GET | |
| /subjects | GET | deleted (boolean), deletedOnly (boolean) |
| /subjects/(string: subject) | POST | normalize (boolean), deleted (boolean) |
| /subjects/(string: subject) | DELETE | permanent (boolean) |
| /subjects/(string: subject)/versions | POST | normalize (boolean) |
| /subjects/(string: subject)/versions | GET | deleted (boolean), deletedOnly (boolean) |
| /subjects/(string: subject)/versions/(versionId: version) | GET | deleted (boolean) |
| /subjects/(string: subject)/versions/(versionId: version) | DELETE | permanent (boolean) |
| /subjects/(string: subject)/versions/(versionId: version)/schema | GET | |
| /subjects/(string: subject)/versions/(versionId: version)/referencedby | GET | |
| /compatibility/subjects/(string: subject)/versions/latest | GET | |
| /config/(string: subject) | PUT | only supports setting compatibility |
| /config/(string: subject) | GET | only supports getting compatibility |
| /mode | GET | only supports the READWRITE mode |
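As an illustration, a new schema version can be registered with a POST to /subjects/(string: subject)/versions. The URL and token below are placeholders for your cluster's values; with Basic authentication the username can be any non-empty string and the password is your token:

```shell
# Register a new Avro schema under the subject "my-topic-value".
# <schema-registry-url> and <token> are placeholders.
curl -u "any-user:<token>" \
  -X POST "<schema-registry-url>/subjects/my-topic-value/versions" \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{"schema":"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}"}'
```

On success, the registry responds with the ID assigned to the schema; registering an incompatible schema returns an error instead.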
Use Schema Registry on Console
- On the left navigation pane of StreamNative Console, in the Admin section, click Kafka Clients, choose the Java client, and then enable the Kafka Schema Registry with the corresponding switch.
- Make sure you have granted the produce permission on the topic public/__kafka_schemaregistry/__schema-registry on the following page.
Note that Kafka schemas currently cannot work with Pulsar schemas. Unifying them is the goal of the unified schema registry.
Enable Broker-side Schema ID Validation
Broker-side Schema ID Validation allows the broker to validate the schema IDs of the messages it receives against the schemas registered in the Schema Registry. This feature helps ensure that producers send messages with the correct schema, reducing the risk of data inconsistencies and errors. For more information, see Validate Broker-side Schemas IDs.
Limitations
Schema validation feature does not reject tombstone records (messages with null value) even if there is no schema ID associated with the record. This is to ensure that delete operations can still be performed on compacted topics without being blocked by schema validation.
Enable Schema ID Validation on a Topic
To create a topic with Schema ID Validation enabled, set the topic properties kop.kafka.key.schema.validation=true and/or kop.kafka.value.schema.validation=true when creating the topic. For example, to create a topic named my-topic-sv with value schema validation, run the following command:
bin/pulsar-admin \
--admin-url <Pulsar Service Http Url> \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
--auth-params token:<API Key> \
topics create-partitioned-topic persistent://public/default/my-topic-sv -p 4 -m kop.kafka.value.schema.validation=true
With this property set, if the message value has no schema ID, or has a schema ID that does not match a schema registered in the Schema Registry, the broker rejects and discards the message and returns an error to the producer.
Change the subject name strategy
By default, the subject name strategy is TopicNameStrategy, which derives the subject name from the topic name. To change the subject name strategy, set the topic property kop.kafka.schema.subject.name.strategy to one of the following values:
- TopicNameStrategy: the subject name is derived from the topic name. For example, for a topic named my-topic, the subject name is my-topic-value for the value schema and my-topic-key for the key schema.
- RecordNameStrategy: the subject name is derived from the fully qualified name of the record.
- TopicRecordNameStrategy: the subject name is derived from the topic name and the fully qualified name of the record.
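The three strategies can be sketched as simple string derivations. This is an illustrative standalone sketch, not the real implementations (those are the classes under io.confluent.kafka.serializers.subject); it assumes the -key/-value suffix convention described above.

```java
public final class SubjectNameSketch {
    // TopicNameStrategy: the subject depends only on the topic name
    // and whether the schema is for the key or the value.
    static String topicName(String topic, boolean isKey) {
        return topic + (isKey ? "-key" : "-value");
    }

    // RecordNameStrategy: the subject is the record's fully qualified name,
    // regardless of the topic.
    static String recordName(String recordFullName) {
        return recordFullName;
    }

    // TopicRecordNameStrategy: the subject combines the topic name
    // and the record's fully qualified name.
    static String topicRecordName(String topic, String recordFullName) {
        return topic + "-" + recordFullName;
    }
}
```

For example, a record com.example.User written to my-topic yields the subject my-topic-value with TopicNameStrategy, com.example.User with RecordNameStrategy, and my-topic-com.example.User with TopicRecordNameStrategy.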
For example, to create a topic named my-other-topic-sv with value schema validation and RecordNameStrategy, run the following command:
bin/pulsar-admin \
--admin-url <Pulsar Service Http Url> \
--auth-plugin org.apache.pulsar.client.impl.auth.AuthenticationToken \
--auth-params token:<API Key> \
topics create-partitioned-topic persistent://public/default/my-other-topic-sv -p 4 -m kop.kafka.value.schema.validation=true -m kop.kafka.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy