OpenSearch Pipelines
Crie e gerencie pipelines OpenSearch usando o Prepper de Dados para ingerir dados em um cluster OpenSearch.
O Data Prepper é um coletor de dados de código aberto que pode filtrar, enriquecer, transformar, normalizar e agregar dados para análise e visualização downstream. É uma das ferramentas de ingestão de dados mais recomendadas para processar conjuntos de dados grandes e complexos.
No momento, o recurso Cluster OpenSearch com Prepper de Dados está disponível no realm OC1.
Políticas Obrigatórias
Execute as seguintes tarefas antes de continuar com as etapas descritas neste tópico:
Se você não for um administrador em sua tenancy, entre em contato com os administradores da tenancy para conceder essas permissões a você. O administrador deve atualizar a permissão de usuários a seguir para permitir que usuários não administradores gerenciem e as operações CRUD dos pipelines
A seguinte política permite que o administrador conceda permissão a todos os usuários na respectiva tenancy:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
A política a seguir permite que o administrador conceda permissão para um grupo em um compartimento (recomendado)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
em que <group>
é todos os usuários dentro desse grupo que podem acessar o recurso.
A política a seguir permite que os pipelines do OpenSearch leiam os segredos do Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Políticas do Serviço Object Storage
Estas políticas só são necessárias para a origem do Object Storage:
A seguinte política permite que pipelines OpenSearch usem um bucket do serviço Object Storage como persistência de coordenação de origem:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
A seguinte política permite que pipelines OpenSearch ingeram objetos do serviço Object Storage:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
A seguinte política permite que pipelines do OpenSearch leiam buckets do serviço Object Storage:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
A política a seguir permite que pipelines do OpenSearch leiam buckets do bucket de coordenação de origem.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
OCI Streaming e Políticas do Kafka Autogerenciadas
Essas políticas são necessárias para o serviço OCI Streaming ou origens Kafka autogerenciadas.
Políticas de Rede Comuns
Essas políticas não são necessárias para o serviço público do OCI Streaming.
Políticas a serem adicionadas para permitir que o serviço OpenSearch crie, leia e atualize a exclusão dos pontos finais privados na sub-rede do cliente.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Políticas do OCI Streaming Service (Públicas e Privadas)
Essas políticas só são necessárias para o serviço OCI Streaming.
A política a seguir permite que os pipelines do OpenSearch consumam os registros do serviço OCI Streaming.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
A política a seguir permite que pipelines OpenSearch leiam pools de streams do serviço de streaming do OCI
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Permissão Kafka Autogerenciada
Essas permissões só são necessárias para origens Kafka autogerenciadas.
Selecione o seguinte link para adicionar a permissão necessária para listar os tópicos, descrever os tópicos, ingressar no grupo e consumir os registros do tópico pelo pipeline OpenSearch:
Regras de Segurança de Rede
Essa configuração só é necessária para o serviço Private OCI Streaming e o Kafka Autogerenciado. No caso do serviço Public OCI Streaming, não selecione nenhum.
Adicione uma regra de segurança de entrada na Lista de Segurança da sub-rede ou no Grupo de Segurança de Rede a todos os pipelines OpenSearch para se comunicar com o serviço privado OCI Streaming em execução na sua sub-rede.
Para adicionar a regra de segurança, consulte Regras de Segurança e Acesso e Segurança.
Para localizar o CIDR da sua sub-rede, consulte Obtendo Detalhes de uma Sub-rede.
A imagem a seguir mostra as regras de entrada para o Grupo de Segurança de Rede.

Criando os Segredos no Vault
Todos os segredos de texto sem formatação exigidos pelos pipelines OpenSearch devem ser passados para ele por meio do Vault, pois os pipelines OpenSearch não aceitam segredos de texto sem formatação, como nomes de usuário e senhas no código yaml dos pipelines.
Execute as seguintes tarefas:
- Crie um novo usuário com permissões de gravação no cluster OpenSearch. Para obter instruções, consulte os seguintes tópicos da documentação do OpenSearch:
- Crie segredos (nome de usuário e senha) no Vault para o novo usuário que você criou. Para obter instruções, consulte os seguintes tópicos do Oracle Cloud Infrastructure Vault:
- Adicione as políticas de controlador de recursos a seguir na tenancy do OpenSearch para permitir que os pipelines do OpenSearch leiam os segredos do Vault.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Para obter mais informações sobre como criar políticas para o serviço Vault, consulte Detalhes do Serviço Vault.
Você pode executar as seguintes tarefas de pipeline OpenSearch:
Listar os pipelines OpenSearch em um compartimento.
Crie um novo pipeline do OpenSearch.
Obtenha os detalhes de um pipeline do OpenSearch.
Processadores Suportados
A tabela a seguir lista esses processadores são suportados no pipeline.
Object Storage e Coordenação de Origem YAML
A origem do Object Storage depende da configuração de coordenação da origem. O pipeline OpenSearch suporta coordenação de origem usando o Object Storage como persistência. Você deve fornecer os detalhes do bucket do Object Storage da sua tenancy.
Veja abaixo um exemplo de coordenação de origem:
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Para obter informações sobre como obter o namespace do serviço Object Storage da sua tenancy, consulte Noções Básicas de Namespaces do Serviço Object Storage.
Veja a seguir um exemplo de coordenação de origem usando a persistência do serviço Object Storage:
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
YAML do Serviço Object Storage
Veja a seguir as seções do YAML de configuração do pipeline que você deve conhecer:
- Segredos do OCI: Você pode criar um segredo no serviço Vault com suas credenciais de cluster OpenSearch e usá-lo no pipeline YAML para estabelecer conexão com o cluster OpenSearch.
- OpenSearch Sink: O sumidouro contém OCIDs do cluster OpenSearch com nomes de índice para ingestão.
- origemoci-object: O Prepper de Dados suporta ingestão baseada em varredura usando o Object Storage, que tem muitas configurações suportadas. Você pode configurar a origem para ingerir objetos dentro do bucket do Object Storage com base na frequência programada ou não com base em qualquer programação. Você tem as seguintes opções de varredura
- Uma ou mais verificações de tempo: Esta opção permite configurar o pipeline que lê objetos nos buckets do Object Storage uma ou mais vezes com base no horário da última modificação dos objetos.
- Verificação baseada em programação: Esta opção permite que você programe uma verificação em um intervalo regular após a criação do pipeline.
A tabela a seguir lista as opções que você pode usar para configurar a origem do Object Storage.
Opções | Obrigatório | Tipo | Descrição |
---|---|---|---|
acknowledgments |
Não | Booliano | Quando true , permite que origens de objetos do serviço Object Storage recebam confirmações de ponta a ponta quando eventos são recebidos por sumidouros OpenSearch. |
buffer_timeout |
Não | Duração | O tempo permitido para gravar eventos no buffer Prepper de Dados antes do timeout ocorrer. Todos os eventos que a origem do OCI não puder gravar no buffer durante o período especificado serão descartados. O padrão é 10s . |
codec |
Sim | Codec | O codec da preparação de dados a ser aplicado. |
compression |
Não | String | O algoritmo de compactação a ser aplicado: none , gzip , snappy ou automatic . O padrão é none . |
delete_oci_objects_on_read |
Não | Booliano | Quando true , a verificação de origem do serviço Object Storage tenta excluir objetos do serviço Object Storage depois que todos os eventos do objeto do serviço Object Storage são confirmados com sucesso por todos os sumidouros. acknowledgments deve ser ativado ao excluir objetos do serviço Object Storage. O padrão é false . A exclusão não funcionará se acknowledgments de ponta a ponta não estiver ativado. |
oci |
Não | OCI | A configuração do OCI. Consulte a seção do OCI a seguir para obter mais informações. |
records_to_accumulate |
Não | Inteiro | O número de mensagens que se acumulam antes de serem gravadas no buffer. O padrão é 100 . |
workers |
Não | Inteiro | Configura o número de threads de worker que a origem usa para ler dados do bucket do OCI. Deixando esse valor no padrão, a menos que seus objetos do serviço Object Storage tenham menos de 1 MB. O desempenho pode diminuir para objetos de Armazenamento de Objetos maiores. O padrão é 1 . |
Configuração do Pipeline de Armazenamento de Objetos - YAML
Veja a seguir um exemplo da configuração YAML do pipeline do Object Storage:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Configurações de Amostra
Veja a seguir as opções de verificação única que você pode aplicar no nível de bucket individual do Object Storage ou no nível de verificação
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Hora Final
Veja a seguir um exemplo de hora de término:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Hora Inicial e Hora Final
Veja a seguir um exemplo de hora de início e hora de término:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Faixa
Veja a seguir um exemplo de intervalo:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Horário de Início, Horário de Término e Intervalo
Veja a seguir um exemplo de hora de início, hora de término e intervalo:
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
Filtro include_prefix
Veja a seguir um exemplo do filtro include_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Filtrar arquivos da pasta
Veja a seguir um exemplo de arquivos de filtro da pasta. Para ler arquivos somente de pastas específicas, use o filtro para especificar pastas. Veja um exemplo de como incluir arquivos de folder2 em folder1 usando include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
Filtro exclude_prefix
Veja a seguir um exemplo do filtro exclude_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Suporte ao Codec para JSON
Veja a seguir um exemplo de suporte a codec para JSON:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Suporte do Codec para CSV
Veja a seguir um exemplo de suporte de codec para CSV:
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Suporte ao Codec para Newline
Veja a seguir um exemplo de suporte de codec para nova linha:
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Programando Opções de Ingestão sem Contagem
Veja a seguir um exemplo de agendamento de opções de ingestão sem contagem:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programando Opções de Ingestão com Contagem
Veja a seguir um exemplo de agendamento de opções de ingestão com contagem:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programando Opções de Ingestão com Hora Inicial
Veja a seguir um exemplo de agendamento de opções de ingestão com horário de início:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Programando Opções de Ingestão com Hora Final
Veja a seguir um exemplo de agendamento de opções de ingestão com hora de término:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
A origem Kafka não requer nenhuma coordenação de origem.
Para obter informações sobre todas as configurações disponíveis para a Origem Kafka, acesse o seguinte link:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Você pode usar o OCI Streaming Service como origem do Kafka para fazer a ingestão no cluster OpenSearch. Para obter informações sobre como fazer isso, consulte Usando APIs de Kafka.
Acesso Público ao OCI Streaming YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Acesso Privado YAML do Serviço OCI Streaming de Pipelines
Veja a seguir um exemplo do YAML para os pipelines OpenSearch no serviço OCI Streaming:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Kafka YAML Autogerenciado
Veja a seguir um exemplo do Kafka YAML autogerenciado para o OpenSearch:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>