OpenSearch Pipelines

Crie e gerencie pipelines OpenSearch usando o Prepper de Dados para ingerir dados em um cluster OpenSearch.

O Data Prepper é um coletor de dados de código aberto que pode filtrar, enriquecer, transformar, normalizar e agregar dados para análise e visualização downstream. É uma das ferramentas de ingestão de dados mais recomendadas para processar conjuntos de dados grandes e complexos.

Usamos o modelo PULL para ingerir dados no cluster OpenSearch 2.x e integrá-los ao serviço Oracle Cloud Infrastructure Streaming, Kafka Autogerenciado e Object Storage.
Observação

No momento, o recurso Cluster OpenSearch com Prepper de Dados está disponível no realm OC1.

Políticas Obrigatórias

Execute as seguintes tarefas antes de continuar com as etapas descritas neste tópico:

Se você não for um administrador em sua tenancy, entre em contato com os administradores da tenancy para conceder essas permissões a você. O administrador deve atualizar a permissão de usuários a seguir para permitir que usuários não administradores gerenciem e as operações CRUD dos pipelines

A seguinte política permite que o administrador conceda permissão a todos os usuários na respectiva tenancy:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

A política a seguir permite que o administrador conceda permissão para um grupo em um compartimento (recomendado)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

em que <group> é todos os usuários dentro desse grupo que podem acessar o recurso.

A política a seguir permite que os pipelines do OpenSearch leiam os segredos do Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

Políticas do Serviço Object Storage

Estas políticas só são necessárias para a origem do Object Storage:

A seguinte política permite que pipelines OpenSearch usem um bucket do serviço Object Storage como persistência de coordenação de origem:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

A seguinte política permite que pipelines OpenSearch ingeram objetos do serviço Object Storage:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

A seguinte política permite que pipelines do OpenSearch leiam buckets do serviço Object Storage:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

A política a seguir permite que pipelines do OpenSearch leiam buckets do bucket de coordenação de origem.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

OCI Streaming e Políticas do Kafka Autogerenciadas

Essas políticas são necessárias para o serviço OCI Streaming ou origens Kafka autogerenciadas.

Políticas de Rede Comuns

Observação

Essas políticas não são necessárias para o serviço público do OCI Streaming.

Políticas a serem adicionadas para permitir que o serviço OpenSearch crie, leia e atualize a exclusão dos pontos finais privados na sub-rede do cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

Políticas do OCI Streaming Service (Públicas e Privadas)

Essas políticas só são necessárias para o serviço OCI Streaming.

A política a seguir permite que os pipelines do OpenSearch consumam os registros do serviço OCI Streaming.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

A política a seguir permite que pipelines OpenSearch leiam pools de streams do serviço de streaming do OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Permissão Kafka Autogerenciada

Essas permissões só são necessárias para origens Kafka autogerenciadas.

Selecione o seguinte link para adicionar a permissão necessária para listar os tópicos, descrever os tópicos, ingressar no grupo e consumir os registros do tópico pelo pipeline OpenSearch:

https://kafka.apache.org/documentation/#security_authz

Regras de Segurança de Rede

Essa configuração só é necessária para o serviço Private OCI Streaming e o Kafka Autogerenciado. No caso do serviço Public OCI Streaming, não selecione nenhum.

Adicione uma regra de segurança de entrada na Lista de Segurança da sub-rede ou no Grupo de Segurança de Rede a todos os pipelines OpenSearch para se comunicar com o serviço privado OCI Streaming em execução na sua sub-rede.

Para adicionar a regra de segurança, consulte Regras de Segurança e Acesso e Segurança.

Para localizar o CIDR da sua sub-rede, consulte Obtendo Detalhes de uma Sub-rede.

A imagem a seguir mostra as regras de entrada para o Grupo de Segurança de Rede.

Regras de entrada para Grupos de Segurança de Rede

Criando os Segredos no Vault

Todos os segredos de texto sem formatação exigidos pelos pipelines OpenSearch devem ser passados para ele por meio do Vault, pois os pipelines OpenSearch não aceitam segredos de texto sem formatação, como nomes de usuário e senhas no código yaml dos pipelines.

Execute as seguintes tarefas:

  • Crie um novo usuário com permissões de gravação no cluster OpenSearch. Para obter instruções, consulte os seguintes tópicos da documentação do OpenSearch:

    Grupos de ações padrão

    Usuários e atribuições

    Pesquisar com Políticas do Serviço IAM OpenSearch

  • Crie segredos (nome de usuário e senha) no Vault para o novo usuário que você criou. Para obter instruções, consulte os seguintes tópicos do Oracle Cloud Infrastructure Vault:

    Gerenciando Segredos do Vault

    Criando um Segredo em um Vault

  • Adicione as políticas de controlador de recursos a seguir na tenancy do OpenSearch para permitir que os pipelines do OpenSearch leiam os segredos do Vault.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Para obter mais informações sobre como criar políticas para o serviço Vault, consulte Detalhes do Serviço Vault.

Você pode executar as seguintes tarefas de pipeline OpenSearch:

Listar os pipelines OpenSearch em um compartimento.

Crie um novo pipeline do OpenSearch.

Obtenha os detalhes de um pipeline do OpenSearch.

Edite as definições de um pipeline OpenSearch.

Exclua um pipeline OpenSearch da sua tenancy.

Processadores Suportados

Object Storage e Coordenação de Origem YAML

A origem do Object Storage depende da configuração de coordenação da origem. O pipeline OpenSearch suporta coordenação de origem usando o Object Storage como persistência. Você deve fornecer os detalhes do bucket do Object Storage da sua tenancy.

Veja abaixo um exemplo de coordenação de origem:

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

Para obter informações sobre como obter o namespace do serviço Object Storage da sua tenancy, consulte Noções Básicas de Namespaces do Serviço Object Storage.

Veja a seguir um exemplo de coordenação de origem usando a persistência do serviço Object Storage:

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

YAML do Serviço Object Storage

Veja a seguir as seções do YAML de configuração do pipeline que você deve conhecer:

  • Segredos do OCI: Você pode criar um segredo no serviço Vault com suas credenciais de cluster OpenSearch e usá-lo no pipeline YAML para estabelecer conexão com o cluster OpenSearch.
  • OpenSearch Sink: O sumidouro contém OCIDs do cluster OpenSearch com nomes de índice para ingestão.
  • origemoci-object: O Prepper de Dados suporta ingestão baseada em varredura usando o Object Storage, que tem muitas configurações suportadas. Você pode configurar a origem para ingerir objetos dentro do bucket do Object Storage com base na frequência programada ou não com base em qualquer programação. Você tem as seguintes opções de varredura
    • Uma ou mais verificações de tempo: Esta opção permite configurar o pipeline que lê objetos nos buckets do Object Storage uma ou mais vezes com base no horário da última modificação dos objetos.
    • Verificação baseada em programação: Esta opção permite que você programe uma verificação em um intervalo regular após a criação do pipeline.

A tabela a seguir lista as opções que você pode usar para configurar a origem do Object Storage.

Configurações do Serviço Object Storage
Opções Obrigatório Tipo Descrição
acknowledgments Não Booliano Quando true, permite que origens de objetos do serviço Object Storage recebam confirmações de ponta a ponta quando eventos são recebidos por sumidouros OpenSearch.
buffer_timeout Não Duração O tempo permitido para gravar eventos no buffer Prepper de Dados antes do timeout ocorrer. Todos os eventos que a origem do OCI não puder gravar no buffer durante o período especificado serão descartados. O padrão é 10s.
codec Sim Codec O codec da preparação de dados a ser aplicado.
compression Não String O algoritmo de compactação a ser aplicado: none, gzip, snappy ou automatic. O padrão é none.
delete_oci_objects_on_read Não Booliano Quando true, a verificação de origem do serviço Object Storage tenta excluir objetos do serviço Object Storage depois que todos os eventos do objeto do serviço Object Storage são confirmados com sucesso por todos os sumidouros. acknowledgments deve ser ativado ao excluir objetos do serviço Object Storage. O padrão é false. A exclusão não funcionará se acknowledgments de ponta a ponta não estiver ativado.
oci Não OCI A configuração do OCI. Consulte a seção do OCI a seguir para obter mais informações.
records_to_accumulate Não Inteiro O número de mensagens que se acumulam antes de serem gravadas no buffer. O padrão é 100.
workers Não Inteiro Configura o número de threads de worker que a origem usa para ler dados do bucket do OCI. Deixando esse valor no padrão, a menos que seus objetos do serviço Object Storage tenham menos de 1 MB. O desempenho pode diminuir para objetos de Armazenamento de Objetos maiores. O padrão é 1.

Configuração do Pipeline de Armazenamento de Objetos - YAML

Veja a seguir um exemplo da configuração YAML do pipeline do Object Storage:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Configurações de Amostra

Veja a seguir as opções de verificação única que você pode aplicar no nível de bucket individual do Object Storage ou no nível de verificação

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Hora Final

Veja a seguir um exemplo de hora de término:


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Hora Inicial e Hora Final

Veja a seguir um exemplo de hora de início e hora de término:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Faixa

Veja a seguir um exemplo de intervalo:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Horário de Início, Horário de Término e Intervalo

Veja a seguir um exemplo de hora de início, hora de término e intervalo:

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtro include_prefix

Veja a seguir um exemplo do filtro include_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtrar arquivos da pasta

Veja a seguir um exemplo de arquivos de filtro da pasta. Para ler arquivos somente de pastas específicas, use o filtro para especificar pastas. Veja um exemplo de como incluir arquivos de folder2 em folder1 usando include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtro exclude_prefix

Veja a seguir um exemplo do filtro exclude_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Suporte ao Codec para JSON

Veja a seguir um exemplo de suporte a codec para JSON:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Suporte do Codec para CSV

Veja a seguir um exemplo de suporte de codec para CSV:


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Suporte ao Codec para Newline

Veja a seguir um exemplo de suporte de codec para nova linha:

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Programando Opções de Ingestão sem Contagem

Veja a seguir um exemplo de agendamento de opções de ingestão sem contagem:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programando Opções de Ingestão com Contagem

Veja a seguir um exemplo de agendamento de opções de ingestão com contagem:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programando Opções de Ingestão com Hora Inicial

Veja a seguir um exemplo de agendamento de opções de ingestão com horário de início:


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Programando Opções de Ingestão com Hora Final

Veja a seguir um exemplo de agendamento de opções de ingestão com hora de término:

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

A origem Kafka não requer nenhuma coordenação de origem.

Para obter informações sobre todas as configurações disponíveis para a Origem Kafka, acesse o seguinte link:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Você pode usar o OCI Streaming Service como origem do Kafka para fazer a ingestão no cluster OpenSearch. Para obter informações sobre como fazer isso, consulte Usando APIs de Kafka.

Acesso Público ao OCI Streaming YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Acesso Privado YAML do Serviço OCI Streaming de Pipelines

Veja a seguir um exemplo do YAML para os pipelines OpenSearch no serviço OCI Streaming:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Kafka YAML Autogerenciado

Veja a seguir um exemplo do Kafka YAML autogerenciado para o OpenSearch:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>