Integrar Registro de Esquema com o Apache Kafka

O Registro de Esquema armazena um histórico com controle de versão de todos os esquemas com base em um nome de assunto especificado, fornece muitas definições de compatibilidade e permite a evolução do esquema ao longo do tempo. O Registro de Esquema vive fora e separadamente dos corretores Kafka.

O Registro de Esquema tem três componentes principais:

  • Servidor Web de registro: Aplicativo Web expondo os pontos finais REST usados para gerenciar entidades de esquema. Use um proxy Web e um Balanceador de Carga com muitos Servidores Web para fornecer HA e escalabilidade. Para ativar o HA do Registro de Esquema, consulte HA do Registro de Esquema.
  • Armazenamento plugável/Armazenamento de Metadados do Esquema: Armazenamento relacional que contém os metadados das entidades do esquema. Há suporte para armazenamento na memória e bancos de dados MySQL.
  • Serdes Storage: Armazenamento de arquivos para os frascos de serializador e desserializador. Há suporte para sistemas de arquivos locais e armazenamento HDFS. O armazenamento do sistema de arquivos local é o padrão.

Os principais componentes do Registro de Esquema incluem:

  • Servidor Web de registro: Aplicativo Web expondo os pontos finais REST usados para gerenciar entidades de esquema. Use um proxy Web e um Balanceador de Carga com muitos Servidores Web para fornecer HA e escalabilidade. Consulte HA do Registro de Esquema.
  • Armazenamento Plugável de Metadados do Esquema: Armazenamento relacional que contém os metadados das entidades do esquema. Há suporte para armazenamento na memória e bancos de dados MySQL.
  • Serdes Storage: Armazenamento de arquivos para os frascos de serializador e desserializador. Há suporte para sistemas de arquivos locais e armazenamento HDFS. O armazenamento do sistema de arquivos local é o padrão.

Interagindo com o Registro de Esquema

Use os clientes do Registro de Esquema para que um aplicativo interaja com o servidor do Registro de Esquema diretamente para acessar ou alterar o esquema.
Observação

A definição da configuração JAAS é obrigatória somente para um cluster ativado para Kerberos (cluster HA).
  1. Crie um arquivo de configuração JAAS com o Cliente de Registro:
    RegistryClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/path/to/registry/keytab"
      storeKey=true
      useTicketCache=false
      principal="principal@realm";
    };
  2. No código Java, adicione o seguinte à configuração do Kafka Producer:
    import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
    ..
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty("schema.registry.url", "http://<host name>:9093/api/v1");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
  3. Crie o cliente de Registro de Esquema definindo a variável env com configurações de cliente da seguinte forma:
    System.setProperty("java.security.auth.login.config","/path/to/registryClient_jaas.conf");
    
    Map<String, Object> config = new HashMap<>();
    config.put("schema.registry.url", "http://<registry host name>:9093/api/v1");
    
    String topicName = "my-topic";
    SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
    
    try {
        SchemaMetadata schemaMetadata = new SchemaMetadata.Builder(topicName)
            .type(AvroSchemaProvider.TYPE)
            .schemaGroup("sample-group")
            .description("Sample schema")
            .compatibility(SchemaCompatibility.BACKWARD)
            .build();
        SchemaIdVersion v1 = schemaRegistryClient.addSchemaVersion(schemaMetadata, new SchemaVersion(schemaMetadata, "Initial version of the schema"));
        LOG.info("Registered schema [{}] and returned version [{}]", schemaMetadata, v1);
    } catch (SchemaNotFoundException e) {
        LOG.info("Schema addition failed for topic {}", topicName);
        throw e;
    }}