- Govern Data Streams
- Manage Kafka Schemas
Kafka Schema Registry
Kafka Schema Registry provides an interface for storing and managing schemas. Producers and consumers can register the schemas within the registry and retrieve them when necessary. Schemas are versioned, and the registry supports configurable compatibility modes between different schema versions. When a producer or consumer attempts to register a new schema version, the registry performs a compatibility check and returns an error if an incompatible change is detected. This mechanism ensures consistency and compatibility among all producers and consumers when schema changes occur.
Access Schema Registry in Kafka clients
To access the Kafka Schema Registry, you must configure how to authenticate. There are two ways to configure authentication to the Schema Registry:
- OAuth2 authentication: only available for Kafka Java client
- Basic authentication: available for all Kafka clients
OAuth2 authentication
First, import the following dependencies:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>oauth-client</artifactId>
<version>3.2.2.6</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.5.0</version>
</dependency>
Minimum required versions:
kafka-clients
: 3.4.0oauth-client
: 3.1.0.4kafka-avro-serializer
: 7.5.0
Note
Before 3.2.2.6, oauth-client
requires Java 17 or higher.
Then, in addition to the existing properties, you need to configure more properties like:
// props is the Properties object that has already configures the OAuth2 authentication
// See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, "io.streamnative.pulsar.handlers.kop.security.oauth.schema.OauthCredentialProvider");
props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
Basic authentication
Unlike the OAuth2 authentication, Basic authentication does not require the oauth-client
dependency or kafka-clients
>= 3.4.0.
The username can be any non-empty string. The password should be the the token (the jwtToken
variable in the code below) of your account.
// props is the Properties object that has already configures the Token authentication
// See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs
props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, String.format("%s:%s", "any-user", jwtToken));
Configurable compatibility modes
When using serialization and deserialization formats such as Avro, JSON Schema, and Protobuf, we need to remember that there are different configurable compatibility modes. In the Schema Registry, schema compatibility is managed by versioning each individual schema. The compatibility type determines how the Schema Registry compares the new schema with previous versions of a schema, for a given subject. Upon its initial creation within a subject, a schema is assigned a unique identifier and a version number, starting at version 1. If the schema is updated and successfully passes the compatibility checks, it is given a new unique identifier and an incremented version number, i.e., version 2.
Compatibility modes | AVRO | JSON | Protobuf |
NONE | YES | YES | YES |
BACKWARD | YES | YES | YES |
BACKWARD_TRANSITIVE | YES | YES | YES |
FORWARD | YES | YES | - |
FORWARD_TRANSITIVE | YES | YES | - |
FULL | YES | YES | - |
FULL_TRANSITIVE | YES | YES | - |
REST API
Kafka schema registry provides REST API for managing schemas. The following table lists the supported methods and parameters, more details about the API, please refer to Schema Registry API.
API | Method | Support Parameters |
---|---|---|
/schemas/ids/{int: id} | GET | |
/schemas/ids/{int: id}/schema | GET | |
/schemas/types | GET | |
/schemas/ids/{int: id}/versions | GET | |
/schemas/ids/{int: id}/subjects | GET | |
/subjects | GET | deleted (boolean), deletedOnly (boolean) |
/subjects/(string: subject) | POST | normalize (boolean), deleted (boolean) |
/subjects/(string: subject) | DELETE | permanent (boolean) |
/subjects/(string: subject)/versions | POST | normalize (boolean) |
/subjects/(string: subject)/versions | GET | deleted (boolean), deletedOnly (boolean) |
/subjects/(string: subject)/versions/(versionId: version) | GET | deleted (boolean) |
/subjects/(string: subject)/versions/(versionId: version) | DELETE | permanent (boolean) |
/subjects/(string: subject)/versions/(versionId: version)/schema | GET | |
/subjects/(string: subject)/versions/(versionId: version)/referencedby | GET | |
/compatibility/subjects/(string: subject)/versions/latest | GET | |
/config/(string: subject) | PUT | only support set compatibility |
/config/(string: subject) | GET | only support get compatibility |
/mode | GET | only support the mode READWRITE |
Use Schema Registry on Console
On the left navigation pane of StreamNative Console, in the Admin section, click Kafka Clients, and choose the Java client, then enable the Kafka Schema Registry by following switch.
Please make sure you granted permission(produce) for topic
public/__kafka_schemaregistry/__schema-registry
in the following page.
We need to mention that Now the Kafka Schemas can’t work with Pulsar schemas. This is the mission of the unified schema registry.