OpenSearch Pipelines

Crie e gerencie pipelines OpenSearch usando o Prepper de Dados para ingerir dados em um cluster OpenSearch.

O Prepper de Dados é um coletor de dados de código aberto que pode filtrar, enriquecer, transformar, normalizar e agregar dados para análise e visualização downstream. É uma das ferramentas de ingestão de dados mais recomendadas para processar conjuntos de dados grandes e complexos.

Os pipelines OpenSearch são compatíveis com clusters OpenSearch que executam a versão 2.x e posterior.

Observação

O recurso Cluster OpenSearch com Prepper de Dados só está disponível no realm OC1.

Políticas Obrigatórias

Conclua os requisitos de política descritos nesta seção antes de prosseguir com as etapas descritas neste tópico.

Se você não for um administrador em sua tenancy, entre em contato com os administradores da tenancy para conceder essas permissões a você. O administrador deve atualizar a permissão de usuários a seguir para permitir que usuários não administradores gerenciem e as operações CRUD dos pipelines

A seguinte política permite que o administrador conceda permissão a todos os usuários na respectiva tenancy:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

A política a seguir permite que o administrador conceda permissão para um grupo em um compartimento (recomendado)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
                

em que <group> é todos os usuários dentro desse grupo que podem acessar o recurso.

A política a seguir permite que os pipelines do OpenSearch leiam os segredos do Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Observação

Os nomes de índice OpenSearch devem seguir estas regras:

  • Todas as letras devem estar em letras minúsculas.
  • Os nomes não podem começar com um sublinhado ( _ ) ou hífen (-).
  • Os nomes não podem conter espaços, vírgulas ou qualquer um dos seguintes caracteres: :, ", *, +, /, \, |, ?, #, >, <
  • O nome do índice pode conter uma expressão prepper de dados.

Criando os Segredos no Vault

Todos os segredos de texto sem formatação exigidos pelos pipelines OpenSearch devem ser passados para ele por meio do Vault, pois os pipelines OpenSearch não aceitam segredos de texto sem formatação, como nomes de usuário e senhas no código YAML dos pipelines.

Execute as seguintes tarefas:

  • Crie um novo usuário com permissões de gravação no cluster OpenSearch. Para obter instruções, consulte os seguintes tópicos da documentação do OpenSearch:

    Grupos de ações padrão

    Usuários e atribuições

    Pesquisar com Políticas do Serviço IAM OpenSearch

  • Crie segredos (nome de usuário e senha) no Vault para o novo usuário que você criou. Para obter instruções, consulte os seguintes tópicos do Oracle Cloud Infrastructure Vault:

    Gerenciando Segredos do Vault

    Criando um Segredo em um Vault

  • Adicione as políticas de controlador de recursos a seguir na tenancy do OpenSearch para permitir que os pipelines do OpenSearch leiam os segredos do Vault.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Para obter mais informações sobre como criar políticas para o serviço Vault, consulte Detalhes do Serviço Vault.

Você pode executar as seguintes tarefas de pipeline OpenSearch:

Listar os pipelines OpenSearch em um compartimento.

Crie um novo pipeline do OpenSearch.

Obtenha os detalhes de um pipeline do OpenSearch.

Edite as definições de um pipeline OpenSearch.

Exclua um pipeline OpenSearch da sua tenancy.

Processadores Suportados

Pipelines de PULL

Políticas do Serviço Object Storage

Estas políticas só são necessárias para a origem do Object Storage:

A seguinte política permite que pipelines OpenSearch usem um bucket do serviço Object Storage como persistência de coordenação de origem:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

A seguinte política permite que pipelines OpenSearch ingeram objetos do serviço Object Storage:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

A seguinte política permite que pipelines do OpenSearch leiam buckets do serviço Object Storage:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

A política a seguir permite que pipelines do OpenSearch leiam buckets do bucket de coordenação de origem.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

OCI Streaming e Políticas do Kafka Autogerenciadas

Essas políticas são necessárias para o serviço OCI Streaming ou origens Kafka autogerenciadas.

Políticas de Rede Comuns

Observação

Essas políticas não são necessárias para o serviço público do OCI Streaming.

Políticas a serem adicionadas para permitir que o serviço OpenSearch crie, leia e atualize a exclusão dos pontos finais privados na sub-rede do cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
                    

Políticas do OCI Streaming Service (Públicas e Privadas)

Essas políticas só são necessárias para o serviço OCI Streaming.

A política a seguir permite que os pipelines do OpenSearch consumam os registros do serviço OCI Streaming.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

A política a seguir permite que pipelines OpenSearch leiam pools de streams do serviço de streaming do OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Permissão Kafka Autogerenciada

Essas permissões só são necessárias para origens Kafka autogerenciadas.

Selecione o seguinte link para adicionar a permissão necessária para listar os tópicos, descrever os tópicos, ingressar no grupo e consumir os registros do tópico pelo pipeline OpenSearch:

https://kafka.apache.org/documentation/#security_authz

Regras de Segurança de Rede

Essa configuração só é necessária para o serviço Private OCI Streaming e o Kafka Autogerenciado. No caso do serviço Public OCI Streaming, não selecione nenhum.

Adicione uma regra de segurança de entrada na Lista de Segurança da sub-rede ou no Grupo de Segurança de Rede a todos os pipelines OpenSearch para se comunicar com o serviço privado OCI Streaming em execução na sua sub-rede.

Para adicionar a regra de segurança, consulte Regras de Segurança e Acesso e Segurança.

Para localizar o CIDR da sua sub-rede, consulte Obtendo Detalhes de uma Sub-rede.

A imagem a seguir mostra as regras de entrada para o Grupo de Segurança de Rede.

Regras de entrada para Grupos de Segurança de Rede

Object Storage e Coordenação de Origem YAML

A origem do Object Storage depende da configuração de coordenação da origem. O pipeline OpenSearch suporta coordenação de origem usando o Object Storage como persistência. Você deve fornecer os detalhes do bucket do Object Storage da sua tenancy.

Veja abaixo um exemplo de coordenação de origem:

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>
                

Para obter informações sobre como obter o namespace do serviço Object Storage da sua tenancy, consulte Noções Básicas de Namespaces do Serviço Object Storage.

Veja a seguir um exemplo de coordenação de origem usando a persistência do serviço Object Storage:

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

YAML do Serviço Object Storage

Veja a seguir as seções do YAML de configuração do pipeline que você deve conhecer:

  • Segredos do OCI: Você pode criar um segredo no serviço Vault com suas credenciais de cluster OpenSearch e usá-lo no pipeline YAML para estabelecer conexão com o cluster OpenSearch.
  • OpenSearch Sink: O sumidouro contém OCIDs do cluster OpenSearch com nomes de índice para ingestão.
  • origemoci-object: O Prepper de Dados suporta ingestão baseada em varredura usando o Object Storage, que tem muitas configurações suportadas. Você pode configurar a origem para ingerir objetos dentro do bucket do Object Storage com base na frequência programada ou não com base em qualquer programação. Você tem as seguintes opções de varredura
    • Uma ou mais verificações de tempo: Esta opção permite configurar o pipeline que lê objetos nos buckets do Object Storage uma ou mais vezes com base no horário da última modificação dos objetos.
    • Verificação baseada em programação: Esta opção permite que você programe uma verificação em um intervalo regular após a criação do pipeline.

A tabela a seguir lista as opções que você pode usar para configurar a origem do Object Storage.

Configurações do Serviço Object Storage
Opções Obrigatório Tipo Descrição
acknowledgments Não Booliano Quando true, permite que origens de objetos do serviço Object Storage recebam confirmações de ponta a ponta quando eventos são recebidos por sumidouros OpenSearch.
buffer_timeout Não Duração O tempo permitido para gravar eventos no buffer Prepper de Dados antes do timeout ocorrer. Todos os eventos que a origem do OCI não puder gravar no buffer durante o período especificado serão descartados. O padrão é 10s.
codec Sim Codec O codec da preparação de dados a ser aplicado.
compression Não String O algoritmo de compactação a ser aplicado: none, gzip, snappy ou automatic. O padrão é none.
delete_oci_objects_on_read Não Booliano Quando true, a verificação de origem do serviço Object Storage tenta excluir objetos do serviço Object Storage depois que todos os eventos do objeto do serviço Object Storage são confirmados com sucesso por todos os sumidouros. acknowledgments deve ser ativado ao excluir objetos do serviço Object Storage. O padrão é false. A exclusão não funcionará se acknowledgments de ponta a ponta não estiver ativado.
oci Não OCI A configuração do OCI. Consulte a seção do OCI a seguir para obter mais informações.
records_to_accumulate Não Inteiro O número de mensagens que se acumulam antes de serem gravadas no buffer. O padrão é 100.
workers Não Inteiro Configura o número de threads de worker que a origem usa para ler dados do bucket do OCI. Deixando esse valor no padrão, a menos que seus objetos do serviço Object Storage tenham menos de 1 MB. O desempenho pode diminuir para objetos de Armazenamento de Objetos maiores. O padrão é 1.

Configuração do Pipeline de Armazenamento de Objetos - YAML

Veja a seguir um exemplo da configuração YAML do pipeline do Object Storage:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                        

Configurações de Amostra

Veja a seguir as opções de verificação única que você pode aplicar no nível de bucket individual do Object Storage ou no nível de verificação

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Hora Final

Veja a seguir um exemplo de hora de término:


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Hora Inicial e Hora Final

Veja a seguir um exemplo de hora de início e hora de término:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Faixa

Veja a seguir um exemplo de intervalo:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Horário de Início, Horário de Término e Intervalo

Veja a seguir um exemplo de hora de início, hora de término e intervalo:

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtro include_prefix

Veja a seguir um exemplo do filtro include_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtrar arquivos da pasta

Veja a seguir um exemplo de arquivos de filtro da pasta. Para ler arquivos somente de pastas específicas, use o filtro para especificar pastas. Veja um exemplo de como incluir arquivos de folder2 em folder1 usando include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtro exclude_prefix

Veja a seguir um exemplo do filtro exclude_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Suporte ao Codec para JSON

Veja a seguir um exemplo de suporte a codec para JSON:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Suporte do Codec para CSV

Veja a seguir um exemplo de suporte de codec para CSV:


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Suporte ao Codec para Newline

Veja a seguir um exemplo de suporte de codec para nova linha:

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Programando Opções de Ingestão sem Contagem

Veja a seguir um exemplo de agendamento de opções de ingestão sem contagem:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programando Opções de Ingestão com Contagem

Veja a seguir um exemplo de agendamento de opções de ingestão com contagem:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programando Opções de Ingestão com Hora Inicial

Veja a seguir um exemplo de agendamento de opções de ingestão com horário de início:


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Programando Opções de Ingestão com Hora Final

Veja a seguir um exemplo de agendamento de opções de ingestão com hora de término:

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

A origem Kafka não requer nenhuma coordenação de origem.

Para obter informações sobre todas as configurações disponíveis para a Origem Kafka, acesse o seguinte link:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Você pode usar o OCI Streaming Service como origem do Kafka para fazer a ingestão no cluster OpenSearch. Para obter informações sobre como fazer isso, consulte Usando APIs de Kafka.

Observação

O número de nós não pode exceder o número máximo de partições definidas para o tópico.

Acesso Público ao OCI Streaming YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Acesso Privado YAML do Serviço OCI Streaming de Pipelines

Veja a seguir um exemplo do YAML para os pipelines OpenSearch no serviço OCI Streaming:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                    

Kafka YAML Autogerenciado

Veja a seguir um exemplo do Kafka YAML autogerenciado para o OpenSearch:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>
                    

Pipeline PUSH

Políticas de Rede Comuns

Políticas a serem adicionadas para permitir que o serviço OpenSearch crie, leia e atualize a exclusão dos pontos finais privados na sub-rede do cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
                    
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
                    

Buffer Persistente

Os pipelines OpenSearch com conectores push do Data Prepper requerem buffer persistente para armazenar o buffer baseado em disco a fim de adicionar durabilidade aos seus dados. Isso é obrigatório em conectores push para evitar a perda de dados em caso de qualquer mau funcionamento do nó. O OCI Streaming Service é o único buffer persistente suportado.

Observação

Cada pipeline de ingestão de dados deve ter um buffer dedicado. Compartilhar o mesmo buffer em vários pipelines pode levar à corrupção de dados por causa de dados sobrepostos ou mistos.

A tabela a seguir lista os componentes de origem do pipeline de ingestão de dados e se isso requer buffer.

OpenSearch Requisitos de Buffer Persistente do Pipeline
Origem Buffer Persistente Obrigatório
Armazenamento de Objetos Não
Kafka Autogerido Não
Serviço OCI Streaming (Público) Não
Serviço OCI Streaming (Privado) Não
OpenTelemetry (Logs, Métricas ou Rastreamento) Sim
HTTP Sim
Observação

O número de nós não pode exceder o número máximo de partições definidas para o tópico.

Políticas

Use as seguintes políticas para aplicar permissões relacionadas ao buffer persistente para o OCI Streaming Service com pontos finais públicos:

  • Para permitir que os pipelines OpenSearch consumam e produzam os registros no OCI Streaming Service:
    Allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME, STREAM_PRODUCE} in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
    
  • Para permitir que pipelines OpenSearch leiam pools de streams no OCI Streaming Service:
    Allow any-user to read stream-pools in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
    

Para obter mais informações, consulte Detalhes do Serviço de Streaming.

Regras de Segurança de Rede

Essa configuração só é necessária para o serviço Private OCI Streaming e o Kafka Autogerenciado. No caso do serviço Public OCI Streaming, selecione nenhum.

Adicione uma regra de segurança de entrada na Lista de Segurança da sub-rede ou no Grupo de Segurança de Rede a todos os pipelines OpenSearch para se comunicar com o serviço privado OCI Streaming em execução na sua sub-rede.

Para adicionar a regra de segurança, consulte Regras de Segurança e Acesso e Segurança.

Para localizar o CIDR da sua sub-rede, consulte Obtendo Detalhes de uma Sub-rede.

A imagem a seguir mostra as regras de entrada para o Grupo de Segurança de Rede.

Regras de entrada para Grupos de Segurança de Rede

OpenTelemetry Logs

OpenTelemetry Os logs exigem buffer persistente.

A porta para Logs do OpenTelemetry é 21892, que não pode ser alterada.

Para obter informações de configuração, consulte origem de logs OTel.

Você pode usar o Serviço OCI Streaming como buffer persistente. Para obter mais informações, consulte Usando APIs do Kafka.

OpenTelemetry Configuração do Pipeline de Logs

O exemplo a seguir mostra uma configuração de pipeline para Logs OpenTelemetry:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
sample-log-pipeline:
  source:
    otel_logs_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
               stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: 'otel-logs-%{yyyy.MM.dd}'

OpenTelemetry Métricas

OpenTelemetry As métricas exigem buffer persistente.

A porta para as Métricas OpenTelemetry é 21891, que não pode ser alterada.

Para obter informações de configuração, consulte a origem de métricas OTel.

Você pode usar o Serviço OCI Streaming como buffer persistente. Para obter mais informações, consulte Usando APIs do Kafka.

OpenTelemetry Configuração do Pipeline de Métricas

O exemplo a seguir mostra uma configuração de pipeline para Métricas do OpenTelemetry:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
sample-metrics-pipeline:
  source:
    otel_metrics_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster_ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: 'otel-metrics-%{yyyy.MM.dd}'

OpenTelemetry Rastreamento

OpenTelemetry O rastreamento requer buffer persistente.

A porta para o Rastreamento OpenTelemetry é 21890, que não pode ser alterada.

Para obter informações de configuração, consulte a origem de rastreamento OTel.

Você pode usar o Serviço OCI Streaming como buffer persistente. Para obter mais informações, consulte Usando APIs do Kafka.

OpenTelemetry Configuração do Pipeline de Rastreamento

O exemplo a seguir mostra uma configuração de pipeline para o Rastreamento OpenTelemetry:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      pipeline-username:
        secret_id: <secret_ocid>
      pipeline-password:
        secret_id: <secret_ocid>
traces-entry-shared-pipeline:
  source:
    otel_trace_source:
      authentication:
        http_basic:
          username: '${{oci_secrets:pipeline-username}}'
          password: '${{oci_secrets:pipeline-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
        - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - pipeline:
        name: traces-pipeline
    - pipeline:
        name: service-map-pipeline
traces-pipeline:
  source:
    pipeline:
      name: traces-entry-shared-pipeline
  processor:
    - otel_traces:
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index_type: trace-analytics-raw
service-map-pipeline:
  source:
    pipeline:
      name: traces-entry-shared-pipeline
  processor:
    - service_map:
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index_type: trace-analytics-service-map

Conector Push HTTP

O Conector Push HTTP requer buffer persistente.

A porta do Conector Push HTTP é 2021, que não pode ser alterada.

Para obter informações de configuração, consulte origem HTTP.

Você pode usar o Serviço OCI Streaming como buffer persistente. Para obter mais informações, consulte Usando APIs do Kafka.

Configuração do Pipeline do Conector Push HTTP

O exemplo a seguir mostra uma configuração de pipeline para o Conector Push HTTP:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      http-username:
        secret_id: <secret_ocid>
      http-password:
        secret_id: <secret_ocid>
simple-sample-pipeline:
  source:
    http:
      authentication:
        http_basic:
          username: '${{oci_secrets:http-username}}'
          password: '${{oci_secrets:http-password}}'
  buffer:
    kafka:
# Idempotence should be 'false' for OCI Streaming Service
        producer_properties:
          enable_idempotence: false
        bootstrap_servers:
          - <bootstrap_servers>
        topics:
          - name: <topic_name>
            group_id: <group_id>
        encryption:
          type: ssl
          insecure: false
        authentication:
          sasl:
            oci:
              stream_pool_id: <target_stream_pool_ocid>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index_name>

Conector Push HTTP com Vários Processadores

O exemplo a seguir mostra uma configuração de pipeline para o Conector Push HTTP com vários processadores:

# Keep caution while adding percentage symbols
version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret_ocid>
      opensearch-password:
        secret_id: <secret_ocid>
      http-username:
        secret_id: <secret_ocid>
      http-password:
        secret_id: <secret_ocid>
simple-sample-pipeline:
  source:
    http:
      path: "/logs"
      authentication:
        http_basic:
          username: '${{oci_secrets:http-username}}'
          password: '${{oci_secrets:http-password}}'
  buffer:
    kafka:
      # Idempotence should be 'false' for OCI Streaming Service
      producer_properties:
        enable_idempotence: false
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target_stream_pool_ocid>
  processor:
    - grok:
        match:
          log: [ "%{COMMONAPACHELOG}" ]
    - date:
        from_time_received: true
        destination: "@timestamp"
    - substitute_string:
        entries:
          - source: "log"
            from: '\.'
            to: "-"
    - uppercase_string:
        with_keys:
          - "log"
    - trim_string:
        with_keys:
          - "log"
    - split_string:
        entries:
          - source: "request"
            delimiter: "?"
    - key_value:
        source: "/request/1"
        field_split_characters: "&"
        value_split_characters: "="
        destination: "query_params"
    - lowercase_string:
        with_keys:
          - "verb"
    - add_entries:
        entries:
          - key: "entry1"
            value: "entry1value"
          - key: "entry2"
            value: "entry2value"
          - key: "entry3"
            value: "entry3value"
    - rename_keys:
        entries:
          - from_key: "entry1"
            to_key: "renameEntry1"
          - from_key: "entry2"
            to_key: "renameEntry2"
          - from_key: "entry3"
            to_key: "renameEntry3"
    - copy_values:
        entries:
          - from_key: "log"
            to_key: "copy_key"
    - delete_entries:
        with_keys: [ "renameEntry1", "renameEntry2", "renameEntry3" ]
  sink:
    - opensearch:
        hosts: [ <cluster_ocid> ]
        username: '${{oci_secrets:opensearch-username}}'
        password: '${{oci_secrets:opensearch-password}}'
        insecure: false
        index: <index_name>