
Kafka Schema Registry

Kafka Schema Registry provides an interface for storing and managing schemas. Producers and consumers can register schemas in the registry and retrieve them when needed. Schemas are versioned, and the registry supports configurable compatibility modes between schema versions. When a producer or consumer attempts to register a new schema version, the registry runs a compatibility check and returns an error if it detects an incompatible change. This mechanism keeps all producers and consumers consistent and compatible as schemas change.

Access Schema Registry in Kafka clients

To access the Kafka Schema Registry, you must configure authentication. Two authentication methods are available:

  • OAuth2 authentication: available only for the Kafka Java client
  • Basic authentication: available for all Kafka clients

OAuth2 authentication

First, import the following dependencies:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.6.1</version>
    </dependency>
    <dependency>
      <groupId>io.streamnative.pulsar.handlers</groupId>
      <artifactId>oauth-client</artifactId>
      <version>3.2.2.6</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>7.5.0</version>
    </dependency>

Minimum required versions:

  • kafka-clients: 3.4.0
  • oauth-client: 3.1.0.4
  • kafka-avro-serializer: 7.5.0

Note

Versions of oauth-client earlier than 3.2.2.6 require Java 17 or higher.

Then, in addition to the existing client properties, configure the following Schema Registry properties:

    // Requires: import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    // props is the Properties object that already configures OAuth2 authentication.
    // See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs.
    // Point the serializer at the Schema Registry endpoint.
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    // Use the credential provider from oauth-client to obtain bearer tokens.
    props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, "io.streamnative.pulsar.handlers.kop.security.oauth.schema.OauthCredentialProvider");
    props.put(KafkaAvroSerializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");

Basic authentication

Unlike OAuth2 authentication, Basic authentication does not require the oauth-client dependency or kafka-clients >= 3.4.0.

The username can be any non-empty string. The password must be the token of your account (the jwtToken variable in the code below).

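    // Requires: import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
    // props is the Properties object that already configures Token authentication.
    // See https://docs.streamnative.io/docs/cloud-connect-kafka-java for the necessary configs.
    props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    // Read the Basic credentials from the user info property set on the next line.
    props.put(KafkaAvroSerializerConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
    // Format is "<username>:<password>"; the username is arbitrary, the password is the token.
    props.put(KafkaAvroSerializerConfig.USER_INFO_CONFIG, String.format("%s:%s", "any-user", jwtToken));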
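
The same Basic authentication settings can also be supplied as plain string keys, for example when the configuration is loaded from a file or shared with a consumer. The following is a minimal sketch, assuming the key names that the Confluent constants resolve to (`schema.registry.url`, `basic.auth.credentials.source`, `basic.auth.user.info`). The class name, the `withBasicAuth` helper, the URL, and the token placeholder are hypothetical and only serve as illustration.

```java
import java.util.Properties;

class BasicAuthSchemaRegistryProps {

    // Hypothetical helper: copies the base client properties and adds the
    // Schema Registry settings for Basic authentication.
    static Properties withBasicAuth(Properties base, String schemaRegistryUrl,
                                    String username, String jwtToken) {
        if (username == null || username.isEmpty()) {
            throw new IllegalArgumentException("username must be a non-empty string");
        }
        Properties props = new Properties();
        props.putAll(base);
        // Assumed string forms of the KafkaAvroSerializerConfig constants.
        props.put("schema.registry.url", schemaRegistryUrl);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", username + ":" + jwtToken);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with your own endpoint and token.
        Properties props = withBasicAuth(new Properties(),
                "https://schema-registry.example.com", "any-user", "<jwt-token>");
        System.out.println(props.getProperty("basic.auth.credentials.source"));
    }
}
```

Copying the base properties instead of mutating them keeps the producer and consumer configurations independent if both are derived from one shared object.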

Configurable compatibility modes

Serialization formats such as Avro, JSON Schema, and Protobuf each support a set of configurable compatibility modes. The Schema Registry manages compatibility by versioning every schema. For a given subject, the compatibility mode determines how the registry compares a new schema with the previous versions of that schema. When a schema is first created within a subject, it is assigned a unique identifier and version number 1. If the schema is later updated and passes the compatibility checks, it receives a new unique identifier and an incremented version number, for example version 2.
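
The identifier and version bookkeeping described above can be illustrated with a small in-memory model. This is a toy sketch and not the registry's implementation: the class `ToySchemaVersions`, its `register` method, and the pluggable compatibility predicate are all hypothetical, and the check only compares against the latest version, which corresponds to the non-transitive modes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

class ToySchemaVersions {

    // One registered schema version within a subject.
    static final class Registered {
        final int id;
        final int version;
        final String schema;

        Registered(int id, int version, String schema) {
            this.id = id;
            this.version = version;
            this.schema = schema;
        }
    }

    private final Map<String, List<Registered>> subjects = new HashMap<>();
    // Stand-in for a real compatibility rule: test(previousSchema, candidateSchema).
    private final BiPredicate<String, String> isCompatible;
    private int nextId = 1;

    ToySchemaVersions(BiPredicate<String, String> isCompatible) {
        this.isCompatible = isCompatible;
    }

    Registered register(String subject, String schema) {
        List<Registered> versions = subjects.computeIfAbsent(subject, s -> new ArrayList<>());
        for (Registered existing : versions) {
            if (existing.schema.equals(schema)) {
                return existing; // assumption: an identical schema is not registered twice
            }
        }
        if (!versions.isEmpty()) {
            // Non-transitive check: compare with the latest version only.
            Registered latest = versions.get(versions.size() - 1);
            if (!isCompatible.test(latest.schema, schema)) {
                throw new IllegalStateException("Incompatible schema for subject " + subject);
            }
        }
        // New unique identifier, version incremented within the subject.
        Registered created = new Registered(nextId++, versions.size() + 1, schema);
        versions.add(created);
        return created;
    }

    public static void main(String[] args) {
        // Accept every change, comparable to the NONE mode.
        ToySchemaVersions registry = new ToySchemaVersions((previous, candidate) -> true);
        Registered v1 = registry.register("orders-value", "{\"type\":\"string\"}");
        Registered v2 = registry.register("orders-value", "{\"type\":\"long\"}");
        System.out.println("versions: " + v1.version + ", " + v2.version);
    }
}
```

A transitive mode would run the predicate against every earlier version in the list instead of only the latest one.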

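| Compatibility mode  | AVRO | JSON | Protobuf |
|---------------------|------|------|----------|
| NONE                | YES  | YES  | YES      |
| BACKWARD            | YES  | YES  | YES      |
| BACKWARD_TRANSITIVE | YES  | YES  | YES      |
| FORWARD             | YES  | YES  | -        |
| FORWARD_TRANSITIVE  | YES  | YES  | -        |
| FULL                | YES  | YES  | -        |
| FULL_TRANSITIVE     | YES  | YES  | -        |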

REST API

Kafka Schema Registry provides a REST API for managing schemas. The following table lists the supported endpoints, methods, and parameters. For more details about the API, see the Schema Registry API reference.

| API | Method | Supported Parameters |
|-----|--------|----------------------|
| /schemas/ids/{int: id} | GET | |
| /schemas/ids/{int: id}/schema | GET | |
| /schemas/types | GET | |
| /schemas/ids/{int: id}/versions | GET | |
| /schemas/ids/{int: id}/subjects | GET | |
| /subjects | GET | deleted (boolean), deletedOnly (boolean) |
| /subjects/(string: subject) | POST | normalize (boolean), deleted (boolean) |
| /subjects/(string: subject) | DELETE | permanent (boolean) |
| /subjects/(string: subject)/versions | POST | normalize (boolean) |
| /subjects/(string: subject)/versions | GET | deleted (boolean), deletedOnly (boolean) |
| /subjects/(string: subject)/versions/(versionId: version) | GET | deleted (boolean) |
| /subjects/(string: subject)/versions/(versionId: version) | DELETE | permanent (boolean) |
| /subjects/(string: subject)/versions/(versionId: version)/schema | GET | |
| /subjects/(string: subject)/versions/(versionId: version)/referencedby | GET | |
| /compatibility/subjects/(string: subject)/versions/latest | GET | |
| /config/(string: subject) | PUT | only support set compatibility |
| /config/(string: subject) | GET | only support get compatibility |
| /mode | GET | only support the mode READWRITE |
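
As an illustration of calling these endpoints, the sketch below builds two requests with the JDK `java.net.http` API: one for `GET /subjects` with the `deleted` parameter and one for `PUT /config/{subject}`. It assumes that the REST endpoint accepts the same Basic credentials as the client configuration (any username plus the token), that the request body and content type follow the Confluent Schema Registry API convention, and that the subject name needs no URL encoding. The class name, method names, and URL are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class SchemaRegistryRestRequests {

    // Builds the HTTP Basic header value from the username and token.
    static String basicAuthHeader(String username, String jwtToken) {
        String userInfo = username + ":" + jwtToken;
        return "Basic " + Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
    }

    // GET /subjects, optionally including soft-deleted subjects.
    static HttpRequest listSubjects(String baseUrl, boolean deleted, String authHeader) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/subjects?deleted=" + deleted))
                .header("Authorization", authHeader)
                .GET()
                .build();
    }

    // PUT /config/{subject}: sets the compatibility mode of one subject.
    // Assumes the subject name is already URL-safe.
    static HttpRequest setCompatibility(String baseUrl, String subject,
                                        String mode, String authHeader) {
        String body = "{\"compatibility\": \"" + mode + "\"}";
        return HttpRequest.newBuilder(URI.create(baseUrl + "/config/" + subject))
                .header("Authorization", authHeader)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder endpoint and token; replace them with your own values.
        String auth = basicAuthHeader("any-user", "<jwt-token>");
        HttpRequest request = listSubjects("https://schema-registry.example.com", true, auth);
        System.out.println(request.method() + " " + request.uri());
        // To send it: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```

Building the requests separately from sending them makes it easy to inspect the URL and headers before any network call is made.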

Use Schema Registry on Console

  1. In the left navigation pane of StreamNative Console, in the Admin section, click Kafka Clients and choose the Java client. Then turn on the switch to enable the Kafka Schema Registry. (Screenshot: enable-kafka-schema-registry.png)

  2. Make sure that you have granted the produce permission on the topic public/__kafka_schemaregistry/__schema-registry on the following page. (Screenshot: granted-permission-for-schema-registry-topic.png)

Note that Kafka schemas currently cannot work with Pulsar schemas. Making them interoperable is the goal of the unified schema registry.
