Observação:

Transmita Mensagens AVRO usando o Oracle Cloud Infrastructure Streaming e o OCI Data Flow com processamento em Micro-Batch

Introdução

No cenário atual orientado por dados, a capacidade de processar e analisar fluxos de dados em tempo real é crucial para as empresas que buscam obter insights e responder rapidamente às mudanças nas condições. As tecnologias de processamento de dados de streaming surgiram como soluções poderosas para lidar com fluxos de dados contínuos de alto volume. Neste tutorial, vamos explorar uma abordagem inovadora para transmitir mensagens AVRO de maneira eficiente usando o Oracle Cloud Infrastructure (OCI) Streaming, combinado com a técnica de processamento de microbatch, e aprimoradas com os recursos sem servidor do Oracle Functions com base no Projeto FN de código-fonte aberto.

Introdução ao AVRO e ao Streaming de Dados

O AVRO, um formato de serialização de dados amplamente adotado, é conhecido por sua eficiência em representar estruturas de dados intrincadas e sua compatibilidade com várias linguagens de programação. Quando integrado com tecnologias de streaming, o AVRO capacita as organizações a transmitir e processar dados quase em tempo real, permitindo que elas extraiam insights valiosos sem a latência normalmente associada ao processamento em lote.

OCI Streaming: Capacitando Dados em Tempo Real

A Oracle Cloud Infrastructure (OCI) tem uma variedade de ferramentas para tratar dados na nuvem, com o OCI Streaming sendo um serviço desse tipo personalizado para fluxos de dados em tempo real de alto rendimento. Utilizando o OCI Streaming, os desenvolvedores podem construir pipelines de dados escaláveis e confiáveis que ingerem, processam e distribuem fluxos de dados de forma eficiente.

Spark Gerenciado do OCI Data Flow: Uma Solução Sem Bloqueio

O Oracle Cloud Infrastructure (OCI) Data Flow é um serviço Apache Spark totalmente gerenciado que executa tarefas de processamento em conjuntos de dados extremamente grandes, sem infraestrutura para implantar ou gerenciar.

O processamento microbatch do processamento Microbatch envolve a divisão de fluxos de dados de entrada em lotes compactos usando tempo ou tamanho como critérios. Esses lotes são então processados como tarefas menores. Ao contrário do tratamento constante e individual de registros no processamento em fluxo, o processamento de microbatch introduz um pouco de atraso e armazenamento antes do processamento, o que permite mais controle sobre o que fazer com os dados. Ao contrário do processamento em lote tradicional que aborda conjuntos de big data em intervalos, o processamento de microbatch fornece processamento quase em tempo real e entrega de resultados.

Desbloqueando Sinergia: OCI Streaming, OCI Data Flow e Oracle Functions

Este tutorial se aprofunda na fusão do OCI Streaming, do OCI Data Flow gerenciado pelo Spark Streaming e do Oracle Functions. Vamos orientá-lo durante o processo de configuração de um pipeline de dados de streaming de ponta a ponta que ingere mensagens codificadas por AVRO, processa-as com eficiência usando recursos de processamento de microbatch do Spark gerenciado pelo OCI Data Flow e introduz processamento orientado a eventos sem servidor com o Oracle Functions.

Objetivo

Use o OCI Streaming e o processamento de microbatch do Spark gerenciado pelo OCI Data Flow para criar um pipeline de processamento de dados eficiente em tempo real usando o formato AVRO.

IMPORTANTE: Este tutorial é projetado exclusivamente para fins educacionais e de estudo. Ele fornece um ambiente para que os alunos experimentem e ganhem experiência prática em um ambiente controlado. É crucial observar que as configurações e práticas de segurança empregadas neste laboratório podem não ser adequadas para cenários do mundo real.

Considerações de segurança para aplicativos do mundo real geralmente são muito mais complexas e dinâmicas. Portanto, antes de implementar qualquer uma das técnicas ou configurações mostradas aqui em um ambiente de produção, é essencial realizar uma avaliação e revisão abrangentes da segurança. Esta revisão deve abranger todos os aspectos da segurança, incluindo controle de acesso, criptografia, monitoramento e conformidade, para garantir que o sistema esteja alinhado com as políticas e padrões de segurança da organização.

A segurança deve sempre ser uma prioridade máxima ao passar de um ambiente de laboratório para uma implantação no mundo real.

Fluxo de Processos
T0_1

Arquitetura de alto nível
T0_1

Pré-requisitos - Oracle Cloud Infrastructure

Pré-requisitos - Ambiente de máquina local

Tarefa 1: Configurar Grupos Dinâmicos

  1. Vá para seu domínio, clique em Grupos Dinâmicos e crie os grupos a seguir.

    Nome do grupo: MyFunctions

    ALL {resource.type = 'fnfunc', resource.compartment.id = 'pasteYourCompartmentID'}
    

    Nome do grupo: ContainerIntances

    ALL {resource.type='compute-container-instances',  resource.compartment.id = 'pasteYourCompartmentID'}
    

    Nome do grupo: DataFlowDynamicGroup

    ALL {resource.type='dataflowrun', resource.compartment.id = 'pasteYourCompartmentID'}
    

Tarefa 2: Criar Políticas

Tarefa 3: Criar buckets de armazenamento e fazer upload do esquema AVRO

  1. Vá para Buckets, clique em Criar Bucket e crie um novo bucket chamado avro-schema-bucket para armazenar o arquivo de esquema AVRO.

    T3_1

  2. Agora, selecione seu bucket e TAKE NOTE do Namespace. Precisaremos dele posteriormente.

    T3_1

  3. Faça upload do arquivo user.asvc para esse bucket criado.

    T3_1

Tarefa 4: Criar o Tópico Privado de Streaming do OCI

  1. Vá para Análise e IA e clique em Streaming e crie um novo fluxo chamado FrontDoorTopic.

    T4_0

  2. Selecione Pools de Streams, clique em PrivatePool e, em seguida, clique na opção Definições de Conexão do Kafka e OBSERVAÇÃO DE TE dos campos. Mais tarde, precisaremos dele.

    T4_0

Tarefa 5: Criar o AUTH TOKEN

Crie o AUTH TOKEN para seu usuário. Isso é necessário para trabalhar com o Tópico Kafka

  1. Clique no ícone superior direito do usuário e selecione a opção Definições do Usuário.

  2. Clique em Tokens de Autenticação e, em seguida, gere um novo token e TAKE NOTE do seu token.

    T4_1

Tarefa 6: Criar registro de contêiner

  1. Vá para o menu Serviços do Desenvolvedor, clique em Registro de contêiner e crie os seguintes repositórios privados.

    Nome do Repositório Tipo
    api-avro-sample_a Privado
    api-avro-sample_b Privado
    fn-recep-avro Privado
  2. Verifique os repositórios e TAKE NOTE do Namespace.

    T6_1

  3. Abra o shell do terminal onde você tem o OCI CLI e o Docker instalados e prossiga com o log-in no registro. Verifique qual é o URL correto para sua REGION. Neste tutorial, estamos usando o Leste do Brasil (São Paulo) em que o url do registro é gru.ocir.io.

    docker login gru.ocir.io
    Username: <your container namespace>/youruser
    Password: YOUR_AUTH_TOKEN_CREATED_EARLIER
    

    T6_1

Tarefa 7: Criar OCI Vault

Crie o OCI Vault e forneça as variáveis necessárias que serão usadas posteriormente neste tutorial.

  1. Vá para Identificar e Segurança, clique em Vault e, em seguida, clique em Criar Vault.

    T7_1new

  2. Selecione o novo vault e crie Chaves Principais de Criptografia para ele.

    T7_1new

  3. Crie um novo segredo chamado AUTH_KEY e cole a chave de autenticação que você criou anteriormente.

    T7_1new

  4. Repita o processo de criação de segredo e crie os seguintes novos segredos:

    Nome da Variável Valor
    KAFKA_BOOTSTRAPSERVER "O servidor boostrap da configuração do OCI Streaming"
    KAFKA_TOPIC “FrontDoorTopic”
    KAFKA_USERNAME "Seu nome de usuário + ID de streampool da configuração do OCI Streaming"
    AUTH_KEY "Seu Token AUTH criado em etapas anteriores"
  5. Anote o OCID do Segredo que foi criado para cada segredo e crie um novo arquivo de configuração.

    • O arquivo config.properties contém as variáveis de mapeamento do aplicativo para os ocids de segredos do vault. O aplicativo usará esse arquivo para identificar quais segredos do vault precisam ser reunidos durante o runtime.

    • Crie um novo arquivo em sua máquina local onde você tenha acesso ao OCI-CLI:
      Substitua pelo OCID para cada Secreat
      Nome do Arquivo: config.properties

      kafka_bootstrapserver_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      kafka_topic_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURSxxxxxx
      kafka_username_vaultOCID=ocid1.vaultsecret.REPLACE-WITH-YOURS
      auth_token_vaultOCID=ocid1.vaultsecret.oc1.REPLACE-WITH-YOURS
      
  6. Vá para Buckets, clique em Criar Bucket e crie um novo bucket chamado config para armazenar o arquivo config.properties.

  7. Faça upload do arquivo config.properties para o bucket de armazenamento config

    ls -lrt config.properties
    oci os object put -bn config --file config.properties --force
    

    T7_1new

Tarefa 8: Criar uma mensagem AVRO simples e salvá-la em um arquivo usando o código python de amostra fornecido

  1. Abra o terminal do shell onde você tem o OCI CLI, o Docker e o Python3 instalados e crie um novo arquivo AVRO que contenha uma única mensagem com base no mesmo esquema AVRO que fizemos upload no armazenamento de objetos anteriormente.

    Observação: você deve verificar a versão do Python. Estamos usando o Python 3.9.16 neste tutorial. Talvez as versões anteriores não funcionem.

  2. Obtenha o código aqui Create_avro_sample.zip.

  3. Descompacte-o no local escolhido e execute o programa para gerar uma mensagem AVRO de amostra:

    cd ~
    mkdir create_avro_sample
    cd create_avro_sample
    unzip CreateAVRO_SampleFile.zip
    # Check the files are there
    ls -lrt
    # install the python dependencies on requirements.txt
    pip3 install -r requirements.txt
    # Run the program and create an AVRO message file
    python3 create_avro_sample.py meu_file.bin '{"id":10029,"name":"John","email":"john@bla.com"}'
    

    T8_1 T8_1 T8_1

Tarefa 9: Criar a Função OCI para receber a mensagem AVRO e publicar no tópico OCI Streaming

  1. Vá para Serviços do Desenvolvedor, em Funções, clique em Aplicativos e, em seguida, clique em Criar aplicativo.

    T9_1

  2. Vá para seu shell de terminal onde você tenha o Docker, a CLI do OCI, a CLI FN instalada e execute os comandos a seguir para inicializar a função.

    Observação: se você seguiu as etapas, seu comando de log-in do Docker já foi executado agora, caso contrário, prossiga com a etapa de log-in do Docker na tarefa Criar registro de contêiner.

    fn create context oci-cloud --provider oracle
    fn use context oci-cloud
    fn update context oracle.compartment-id PASTE_YOUR_COMPARTMENT_OCID
    fn update context api-url https://functions.sa-saopaulo-1.oraclecloud.com
    fn update context registry gru.ocir.io/PASTE_YOUR_REGISTRY_NAMESPACE
    

    Observação: Neste tutorial, estamos usando a região Leste do Brasil (São Paulo), se você estiver usando outra região, precisará alterar os locais de api-url e registro.

    T9_1

  3. Crie uma função Hello-world simples para garantir que todas as suas configurações estejam corretas.

    fn init --runtime python fn-recep-avro
    cd fn-recep-avro
    fn deploy --app MyReceptionApp
    fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

  4. Obtenha o código de amostra da função AVRO no arquivo fn-recep-avro.zip e substitua o código hello-world que criamos anteriormente. Você deve obter os arquivos func.py e requirements.txt para trabalhar.

    # Check you have the right code for func.py & requirements.txt (you got from zip file)
    ls -lrt
    

    T9_1

  5. Criar o novo código e implantar a função

    fn deploy --app MyReceptionApp
    

    T9_1

  6. Para chamar a função, precisamos passar uma mensagem AVRO como parâmetro. Para isso, usaremos o arquivo de mensagem AVRO de amostra criado das etapas anteriores. A primeira vez que uma função é chamada, ela leva um pouco mais tempo desde que precisa ser iniciada.

    # Check where you created the sample avro message file
    ls -lrt ../create_avro_sample/
    
    # Invoke the function to check if it's working as expected
    echo -n -e "$(cat ../create_avro_sample/meu_file.bin)" | fn invoke MyReceptionApp fn-recep-avro
    

    T9_1

Tarefa 10: Criar Gateway de API para expor a função

  1. Em sua console, clique em Serviços ao Desenvolvedor e, em Gerenciamento de API, clique em Gateways e em Criar Gateway.

    T10_1

  2. Depois de criado, clique na opção Implantações e, em seguida, clique em Criar implantação.

    Nome: RecepFunction
    Prefixo do caminho: /

    • Na Autenticação, escolha Sem Autenticação, pois esse é um laboratório simples e não há autenticação de API implementada. O principal objetivo aqui é demonstrar uma chamada HTTPS que passa uma mensagem Binary AVRO via API e, para fins deste laboratório, não implementaremos nenhum método de autenticação para este laboratório simples.
    • Antes de avançar para um ambiente real, certifique-se de seguir as melhores práticas de segurança do API Gateway.
    • Para obter mais detalhes, consulte Protegendo Gateways e Recursos da API.

    Rota 1: Caminho: /

    Methos: POST
    Tipo de Backend: Funções Oracle
    Aplicativo: Selecione sua função

    T9_1

    T9_1

    T9_1

    T9_1

  3. Verifique o ponto final do gateway de API e anote.

    T9_1

  4. Abra seu terminal do shell do Linux e chame o API Gateway. Substitua o URL da api pelo ponto final correto que você tem na etapa anterior.

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
         -d "$(echo -n -e "$(cat meu_file.bin)")" \
         https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T9_1

Checkpoint

T9_1

Tarefa 11: Criar imagem de contêiner para o tipo de API A

Observação: As APIs de tipo A e código B são basicamente as mesmas com apenas uma mensagem de cabeçalho diferente para simular duas APIs diferentes.

  1. Obtenha o código da API type A e descompacte-o em seu terminal de shell Linux api-avro-sample_a.zip.

  2. Obtenha seu namespace de registro do contêiner que você obteve nas etapas anteriores e crie a localização do registro do aplicativo, seguindo o padrão abaixo. O ocir url é baseado na sua região, ou seja, gru.ocir.io para Brasil East(SaoPaulo)

    [ocir url]/[seu namespace]/api-avro-sample_a:latest

  3. No terminal do shell do Linux, crie e envie a imagem do docker para esta API.

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_a:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_a:latest
    

    T10_1 T10_1

Tarefa 12: Criar imagem de contêiner para o tipo de API B

  1. Obtenha o código da API type B e descompacte-o em seu terminal de shell Linux api-avro-sample_b.zip.

  2. Obtenha seu namespace de registro do contêiner que você obteve nas etapas anteriores e crie a localização do registro do aplicativo, seguindo o padrão abaixo. O ocir url é baseado na sua região, ou seja, gru.ocir.io para Brasil East(SaoPaulo)

    [ocir url]/[seu namespace]/api-avro-sample_b:latest

  3. No terminal do shell do Linux, crie e envie a imagem do docker para esta API.

    ls -lrt
    docker build . -t gru.ocir.io/yournamespace/api-avro-sample_b:latest
    docker push gru.ocir.io/yournamespace/api-avro-sample_b:latest
    

    T10_1

    T10_1

  4. Verifique sua página de registro do contêiner se a imagem foi enviada com sucesso.

    T10_1

Tarefa 13: Implantar as APIs no serviço de contêiner

  1. Vá para Serviços do Desenvolvedor, Instâncias do Contêiner e clique em Criar instância do contêiner.

    T13_1

    T13_1

  2. Repita a Etapa 1 para api-type-b e selecione a imagem correta para a API TYPE B.

    1. Vá para Serviços do Desenvolvedor, Instâncias do Contêiner, clique em Criar instância do contêiner e repita as etapas para implantar o tipo de API B

    2. Obtenha o Endereço FQDN interno das suas instâncias de contêiner.

      T14_1

      • Clique na instância do contêiner e anote cada Endereço FQDN interno.

      T14_1

    3. Vá para Identificar & Secutiry, clique em Vault, selecione seu VAULT e crie dois novos segredos.

      Nome do Segredo Valor
      API_TIPO_A_URL cole o Endereço Privado do FQDN Interno do Tipo de API A
      API_TIPO_B_URL cole o Endereço Privado do FQDN Interno do Tipo de API B

      Anote cada OCID secreto

      Seu vault deve ter esta aparência agora:

      T14_1

    4. Edite o arquivo config.properties que você transferiu por upload para o bucket de armazenamento config e adicione novas entradas para o OCID secreto

      ls -lrt config.properties
      vi config.properties
      api_type_a_url_vaultOCID=paste_API_TYPE_A_URL_secretOCID
      api_type_b_url_vaultOCID=paste_API_TYPE_B_URL_secretOCID
      
      # After save, please upload the new version to Object Storage
      cat config.properties
      oci os object put -bn config --file config.properties --force
      

      O arquivo deve ter a seguinte aparência:
      T14_1

      T14_1

Tarefa 14: Teste as APIs usando create_avro_sample.py

  1. Vá até o terminal do shell do Linux no qual você salvou o create_avro_sample.py da Tarefa 7 e crie algumas novas mensagens para testar chamadas de API. Estamos criando dois novos arquivos AVRO com IDs diferentes (1010 a 1020) que usaremos como filtro no programa Spark Stream (DataFlow).

    ls -lrt
    python3 create_avro_sample.py type_a_message.bin '{"id":1010,"name":"Paul that goes to API type A","email":"paul@bla.com"}'
    
    python3 create_avro_sample.py type_b_message.bin '{"id":1020,"name":"Mary that goes to API type B","email":"mary@bla.com"}'
    
    

    T14_1

  2. As APIs que passam a mensagem AVRO para teste estão funcionando bem. Vá para a página Instâncias do Contêiner e obtenha o endereço FQDN interno de cada uma das APIs api-type-a e api-type-b. Lembre-se de substituir o URL abaixo do endereço FQDN Interno correspondente de suas APIs.

    ls -lrt type*
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_a_message.bin" \
       xxx.xx.x.xxx
    
    curl -i -X POST -H "Content-Type: application/octet-stream" \
       --data-binary "@type_b_message.bin" \
       xxx.xxx.xx.xxx
    
    

    T14_1

Tarefa 15: Configurar o aplicativo de streaming Java Spark

  1. Vá para Buckets, clique em Criar Bucket e crie dois novos buckets denominados dataflow-app-avro e dataflow-logs-avro. Isso será usado para fazer upload do seu aplicativo java.

  2. Verifique duas vezes as versões do seu ambiente java.

    Java

    java 11.0.8 2020-07-14 LTS
    Java(TM) SE Runtime Environment 18.9 (build 11.0.8+10-LTS)
    Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.8+10-LTS, mixed mode)
    

    Maven

    Apache Maven 3.5.4 (Red Hat 3.5.4-5)
    Maven home: /usr/share/maven
    Java version: 11.0.20, vendor: Red Hat, Inc., runtime: /usr/lib/jvm/java-11-openjdk-11.0.20.0.8-3.0.1.el8.x86_64
    Default locale: en_US, platform encoding: ANSI_X3.4-1968
    OS name: "linux", version: "5.15.0-103.114.4.el8uek.x86_64", arch: "amd64", family: "unix"
    
  3. Faça download do código de amostra e descompacte-o em seu ambiente local que tenha oci-cli, docker, java e maven: spark-consume-avro-message.zip.

    unzip spark-consume-avro-message.zip
    cd spark-consume-avro-message
    ls -lrt
    

    T15_1 T15_1

    Mergulhe no código proxy para chamar tipos de instâncias de contêiner A e B.

    Verifique o arquivo de programa principal .src/main/java/example/Example.java.... T15_1

  4. Como este programa Java usa uma biblioteca para lidar com o spark-avro, precisamos empacotar a depedência para passá-la ao fluxo de dados. Para isso, usaremos o Data Flow Dependency Packager, se você precisar de mais detalhes, poderá acessar o Data Flow Dependency Packager.

    O pacote org.apache.spark:spark-avro_2.12:3.2.1 já está declarado no arquivo packages.txt, você só precisa empacotá-lo executando:

    docker run --privileged --platform linux/amd64 --rm -v $(pwd):/opt/dataflow  --pull always -it phx.ocir.io/oracle/dataflow/dependency-packager:latest -p 3.8
    

    T15_1 T15_1 T15_1

  5. Faça upload do arquivo archive.zip no bucket de armazenamento chamado dataflow-app-avro, usando o oci-cli.

    oci os object put -bn dataflow-app-avro --file archive.zip --force
    
  6. Compile, empacote o aplicativo java e faça upload dele no bucket de armazenamento dataflow-app-avro

    ls -lrt
    mvn clean install
    

    T15_1
    ...reduziu o número de linhas do log de compilação... T15_1

    # upload the JAR file to the storage bucket
    oci os object put -bn dataflow-app-avro --file target/consumekafka-1.0-SNAPSHOT.jar --force
    

    T15_1

  7. Verifique seu bucket de armazenamento dataflow-app-avro atual e certifique-se de que ele seja semelhante a este.

    T15_1

  8. Vá para Análise e IA, em Data Lake, clique em Data Flow, selecione o menu esquerdo Pontos finais privados e clique em Criar ponto final privado.

    • O ponto final privado é necessário porque estamos usando a sub-rede PRIVATE para as instâncias de contêiner e o Pool de Streaming do OCI.

    • Certifique-se de preencher as Zonas de DNS com o FQDN Interno das Instâncias do Contêiner do OCI e o Pool de Streaming do OCI com delimitação por vírgula.

      T15_1

  9. Vá para Análise e IA, em Data Lake, clique em Data Flow e, em seguida, clique em Criar aplicativo.

    T15_1
    T15_1
    T15_1
    T15_1
    T15_1
    T15_1

    • Uma vez criado, selecione o fluxo de dados spark-lab-avro e, em seguida, clique em Executar para iniciar o programa, geralmente leva até 8 minutos para ser iniciado.

      T15_1
      T15_1

  10. Verifique o aplicativo de fluxo de dados em Execução e abra o SparkUI que mostrará os jobs atuais e o aplicativo está funcionando.

    T15_1

    T15_1

    T15_1

Tarefa 16: Validar o fluxo

Chame a função e passe uma mensagem para verificar se todo o fluxo está funcionando conforme esperado.

  1. Abra seu terminal do shell do Linux onde você criou as mensagens de amostra type_a_message.bin e type_b_message.bin e envie a mensagem. Substitua o URL da api pelo ponto final correto obtido da criação do Gateway de API.

    cd create_avro_sample/
    ls -lrt
    curl -X POST -H "Content-Type: application/octet-stream" \
       -d "$(echo -n -e "$(cat type_a_message.bin)")" \
       https://xxxxxxxxxxxxx.apigateway.sa-saopaulo-1.oci.customer-oci.com/
    

    T16_1

  2. Vamos verificar se o tipo de API A foi chamado verificando os logs na instância do contêiner.

    T16_1 T16_1

Você pode repetir o processo e enviar um arquivo type_b_message.bin, e ele chamará o tipo de instância do contêiner B.

T9_1

Aquisições

Mais Recursos de Aprendizagem

Explore outros laboratórios no site docs.oracle.com/learn ou acesse mais conteúdo de aprendizado gratuito no canal YouTube do Oracle Learning. Além disso, visite education.oracle.com/learning-explorer para se tornar um Oracle Learning Explorer.

Para obter a documentação do produto, visite o Oracle Help Center.