Integrate Schema Registry with Apache Kafka

Schema Registry stores a versioned history of all schemas for a given subject name, provides configurable compatibility settings, and supports schema evolution over time. Schema Registry runs outside of, and separately from, the Kafka brokers.
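As an illustration of schema evolution under a compatibility setting, consider the BACKWARD compatibility mode used later in this section: a new schema version may add a field as long as it declares a default, so consumers using the new schema can still read data written with the old one. The record and field names below are hypothetical.

```json
// Version 1 of a hypothetical Avro schema
{"type": "record", "name": "Truck", "namespace": "sample",
 "fields": [{"name": "id", "type": "long"}]}

// Version 2: adds an optional field with a default, a backward-compatible change
{"type": "record", "name": "Truck", "namespace": "sample",
 "fields": [{"name": "id", "type": "long"},
            {"name": "model", "type": ["null", "string"], "default": null}]}
```

Removing a field without a default, by contrast, would be rejected when BACKWARD compatibility is enforced for the subject.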

Schema Registry has three main components:

  • Registry web server: a web application that exposes the REST endpoints used to manage schema entities. For HA and scalability, run multiple web servers behind a web proxy or load balancer. To enable Schema Registry HA, see Schema Registry HA.
  • Schema metadata storage: a pluggable relational store that holds the metadata for the schema entities. In-memory storage and MySQL databases are supported.
  • Serdes storage: file storage for the serializer and deserializer JARs. Local file systems and HDFS are supported; the local file system is the default.
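The REST endpoints exposed by the registry web server can be called from any HTTP client. As a minimal sketch using only the JDK's built-in HTTP client, the following builds (but does not send) a request to list registered schemas; the host, port, and the /schemaregistry/schemas path are assumptions for illustration and should be adjusted to match your deployment.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class ListSchemasRequest {
    // Builds a GET request against the registry's REST API.
    // Base URL and endpoint path are illustrative assumptions.
    static HttpRequest build(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/schemaregistry/schemas"))
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = build("http://localhost:9093/api/v1");
        System.out.println(req.method() + " " + req.uri());
        // To execute against a live server:
        // HttpClient.newHttpClient().send(req, HttpResponse.BodyHandlers.ofString());
    }
}
```

Sending the request requires a running Schema Registry instance; the sketch only constructs it so the routing can be verified offline.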

Interacting with Schema Registry

Applications use the Schema Registry client to interact directly with the Schema Registry server to read or register schemas.
Note

Setting the JAAS configuration is required only for a Kerberos-enabled (HA) cluster.
  1. Create a JAAS configuration file containing a RegistryClient section:
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. In Java code, add the following to the Kafka Producer configuration:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    ...
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Create the Schema Registry client, setting the JAAS configuration as a system property and passing the client configuration as follows:
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        String schemaText = "..."; // the Avro schema definition, as a JSON string
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaText, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }
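On the consuming side, the producer settings from step 2 have a mirror image using the corresponding deserializers. The following sketch builds the consumer configuration with class names given as plain strings so it compiles without the Kafka or Schema Registry JARs on the classpath; the KafkaAvroDeserializer class name is an assumption inferred from the serializer package shown above, and the hosts and ports are placeholders.

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds consumer properties mirroring the producer settings in step 2.
    // Deserializer class names are illustrative assumptions, written as
    // strings so this sketch needs no external jars to compile.
    static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("schema.registry.url", registryUrl);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "http://localhost:9093/api/v1");
        props.stringPropertyNames()
             .forEach(k -> System.out.println(k + "=" + props.getProperty(k)));
    }
}
```

With these properties, a KafkaConsumer created in the same JVM (with the JAAS system property set as in step 3, if Kerberos is enabled) will fetch writer schemas from the registry automatically during deserialization.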