Pipeline OpenSearch
Creare e gestire le pipeline OpenSearch utilizzando Data Prepper per includere dati in un cluster OpenSearch.
Prepper dati è un raccoglitore di dati open source in grado di filtrare, arricchire, trasformare, normalizzare e aggregare i dati per l'analisi e la visualizzazione a valle. Si tratta di uno degli strumenti di inclusione dati più consigliati per l'elaborazione di set di dati grandi e complessi.
La funzione Cluster OpenSearch con prepper dati è attualmente disponibile nel realm OC1.
Criteri richiesti
Completare i task riportati di seguito prima di procedere con i passi descritti in questo argomento.
Se non sei un amministratore della tenancy, contatta gli amministratori della tenancy per concedere queste autorizzazioni all'utente. L'amministratore deve aggiornare l'autorizzazione utente seguente per consentire agli utenti non amministratori di gestire e CRUD le operazioni delle pipeline
Il criterio riportato di seguito consente all'amministratore di concedere l'autorizzazione a tutti gli utenti nella rispettiva tenancy.
Allow any-user to manage opensearch-cluster-pipeline in tenancy
Il criterio seguente consente all'amministratore di concedere l'autorizzazione per un gruppo in un compartimento (consigliato)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
dove <group>
indica che tutti gli utenti all'interno di tale gruppo possono accedere alla risorsa.
Il criterio riportato di seguito consente alle pipeline OpenSearch di leggere i segreti di Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Criteri di storage degli oggetti
Questi criteri sono necessari solo per l'origine dello storage degli oggetti:
Il criterio seguente consente alle pipeline OpenSearch di utilizzare un bucket dello storage degli oggetti come persistenza del coordinamento di origine:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
Il criterio seguente consente alle pipeline OpenSearch di includere gli oggetti dallo storage degli oggetti:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Il criterio seguente consente alle pipeline OpenSearch di leggere i bucket dallo storage degli oggetti:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Il criterio riportato di seguito consente alle pipeline OpenSearch di leggere i bucket dal bucket di coordinamento di origine.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Criteri di OCI Streaming e Kafka autogestiti
Questi criteri sono necessari per il servizio di streaming OCI o le origini Kafka autogestite.
Criteri di rete comuni
Questi criteri non sono necessari per il servizio OCI Streaming pubblico.
Criteri da aggiungere per consentire al servizio OpenSearch di creare, leggere e aggiornare l'eliminazione degli endpoint privati nella subnet del cliente.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Criteri del servizio di streaming OCI (pubblico e privato)
Questi criteri sono necessari solo per il servizio di streaming OCI.
Il criterio riportato di seguito consente alle pipeline OpenSearch di utilizzare i record del servizio di streaming OCI.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
Il criterio seguente consente alle pipeline OpenSearch di leggere i pool di flussi dal servizio di streaming OCI
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Autorizzazione Kafka autogestita
Queste autorizzazioni sono necessarie solo per le origini Kafka autogestite.
Selezionare il collegamento seguente per aggiungere l'autorizzazione necessaria per elencare gli argomenti, descrivere gli argomenti, partecipare al gruppo e utilizzare i record dell'argomento mediante la pipeline OpenSearch.
Regole di sicurezza di rete
Questa configurazione è necessaria solo per il servizio di streaming OCI privato e il Kafka autogestito. Nel caso di un servizio di streaming OCI pubblico, non selezionarne nessuno.
Aggiungere una regola di sicurezza in entrata nella lista di sicurezza della subnet o nel gruppo di sicurezza di rete a tutte le pipeline OpenSearch per comunicare al servizio OCI Streaming privato in esecuzione nella subnet.
Per aggiungere la regola di sicurezza, vedere Regole di sicurezza e Accesso e sicurezza.
Per trovare il CIDR della subnet, vedere Ottenere i dettagli di una subnet.
L'immagine seguente mostra le regole di entrata per il gruppo di sicurezza di rete.

Creazione dei segreti nel vault
Tutti i segreti di testo non codificato richiesti dalle pipeline OpenSearch devono essere trasmessi tramite Vault, poiché le pipeline OpenSearch non accettano segreti di testo non codificato come nomi utente e password nel codice yaml delle pipeline.
Eseguire i task riportati di seguito.
- Creare un nuovo utente con autorizzazioni di scrittura nel cluster OpenSearch. Per istruzioni, consultare i seguenti argomenti della documentazione OpenSearch:
- Creare i segreti (nome utente e password) nel vault per il nuovo utente creato. Per istruzioni, consulta i seguenti argomenti di Oracle Cloud Infrastructure Vault:
- Aggiungere i criteri principal delle risorse seguenti nella tenancy OpenSearch per consentire alle pipeline OpenSearch di leggere i segreti dal vault.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Per ulteriori informazioni sulla creazione dei criteri per Vault, vedere Dettagli per il servizio Vault.
È possibile eseguire i task della pipeline OpenSearch riportati di seguito.
Elencare le pipeline OpenSearch in un compartimento.
Creare una nuova pipeline OpenSearch.
Ottieni i dettagli di una pipeline OpenSearch.
Processori supportati
Nella tabella seguente sono elencati i processori supportati nella pipeline.
Storage degli oggetti e coordinamento dell'origine YAML
L'origine dello storage degli oggetti dipende dalla configurazione del coordinamento di origine. La pipeline OpenSearch supporta il coordinamento dell'origine utilizzando lo storage degli oggetti come persistenza. Devi fornire i dettagli del bucket di storage degli oggetti dalla tua tenancy.
Di seguito viene fornito un esempio di coordinamento dell'origine.
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Per informazioni su come ottenere lo spazio di nomi dello storage degli oggetti della tenancy, vedere Informazioni sugli spazi di nomi dello storage degli oggetti.
Di seguito è riportato un esempio di coordinamento dell'origine mediante la persistenza dello storage degli oggetti.
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
Storage degli oggetti YAML
Di seguito sono riportate le sezioni della configurazione della pipeline YAML di cui tenere conto:
- Segreti OCI: è possibile creare un segreto nel vault con le credenziali del cluster OpenSearch e utilizzarlo nella pipeline YAML per la connessione al cluster OpenSearch.
- OpenSearch Sink: il sink contiene gli OCID del cluster OpenSearch con i nomi di indice per l'inclusione.
- origine oci-object: il prepper dati supporta l'inclusione basata sulla scansione utilizzando lo storage degli oggetti con molte configurazioni supportate. Puoi configurare l'origine in modo che includa gli oggetti all'interno del bucket di storage degli oggetti in base alla frequenza pianificata o meno in base a qualsiasi pianificazione. Sono disponibili le seguenti opzioni di scansione
- Una o più scansione temporale: questa opzione consente di configurare la pipeline che legge gli oggetti nei bucket di storage degli oggetti una o più volte in base all'ora dell'ultima modifica degli oggetti.
- Scansione basata sulla pianificazione: questa opzione consente di pianificare una scansione a intervalli regolari dopo la creazione della pipeline.
La tabella riportata di seguito elenca le opzioni che è possibile utilizzare per configurare l'origine dello storage degli oggetti.
Opzioni | Obbligatorio | Tipo | Descrizione |
---|---|---|---|
acknowledgments |
N | Boolean | Quando true , consente alle origini oggetto dello storage degli oggetti di ricevere conferme end-to-end quando gli eventi vengono ricevuti dai sink di OpenSearch. |
buffer_timeout |
N | Duration | Il periodo di tempo consentito per la scrittura di eventi nel buffer di Data Prepper prima che si verifichi il timeout. Tutti gli eventi che l'origine OCI non è in grado di scrivere nel buffer durante il periodo di tempo specificato vengono eliminati. L'impostazione predefinita è 10s . |
codec |
Sì | Codice | Il codec del prepper dati da applicare. |
compression |
N | Stringa | L'algoritmo di compressione da applicare: none , gzip , snappy o automatic . L'impostazione predefinita è none . |
delete_oci_objects_on_read |
N | Boolean | Quando si utilizza true , la scansione dell'origine dello storage degli oggetti tenta di eliminare gli oggetti di storage degli oggetti dopo che tutti gli eventi dell'oggetto di storage degli oggetti sono stati riconosciuti correttamente da tutti gli sink. Il comando acknowledgments deve essere abilitato quando si eliminano gli oggetti di storage degli oggetti. L'impostazione predefinita è false . L'eliminazione non funziona se acknowledgments end-to-end non è abilitato. |
oci |
N | OCI | Configurazione OCI. Per ulteriori informazioni, vedere la sezione OCI riportata di seguito. |
records_to_accumulate |
N | Numero intero | Il numero di messaggi accumulati prima di essere scritti nel buffer. L'impostazione predefinita è 100 . |
workers |
N | Numero intero | Configura il numero di thread operativi utilizzati dall'origine per leggere i dati dal bucket OCI. Lasciare questo valore predefinito a meno che gli oggetti di storage degli oggetti non siano inferiori a 1 MB. Le prestazioni potrebbero diminuire per gli oggetti di storage degli oggetti più grandi. L'impostazione predefinita è 1 . |
Configurazione pipeline di storage degli oggetti YAML
Di seguito è riportato un esempio della configurazione YAML della pipeline di storage degli oggetti.
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Configurazioni di esempio
Di seguito sono riportate le opzioni di scansione una tantum che puoi applicare a livello di singolo bucket di storage degli oggetti o a livello di scansione
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Ora di fine
Di seguito è riportato un esempio di ora di fine.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Ora inizio e ora fine
Di seguito è riportato un esempio di ora di inizio e di fine.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
A intervalli
Di seguito è riportato un esempio di intervallo:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Ora di inizio, ora di fine e intervallo
Di seguito è riportato un esempio di ora di inizio, ora di fine e intervallo.
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
Filtro include_prefix
Di seguito è riportato un esempio di filtro include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Filtra file da cartella
Di seguito è riportato un esempio di file di filtro dalla cartella. Per leggere i file solo da cartelle specifiche, utilizzare il filtro per specificare le cartelle. Di seguito è riportato un esempio di inclusione dei file di folder2 in folder1 utilizzando include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
Filtro exclude_prefix
Di seguito è riportato un esempio di filtro exclude_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Supporto codec per JSON
Di seguito è riportato un esempio di supporto codec per JSON:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Supporto codec per CSV
Di seguito è riportato un esempio di supporto codec per CSV.
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Supporto del codec per Newline
Di seguito è riportato un esempio di supporto codec per newline:
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Pianificazione delle opzioni di inclusione senza conteggio
Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione senza conteggio.
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Pianificazione delle opzioni di inclusione con conteggio
Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con conteggio.
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Pianificazione delle opzioni di inclusione con ora di inizio
Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con l'ora di inizio.
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Pianificazione delle opzioni di inclusione con ora di fine
Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con l'ora di fine.
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
L'origine Kafka non richiede alcun coordinamento di origine.
Per informazioni su tutte le configurazioni disponibili per l'origine Kafka, accedere al collegamento seguente:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
È possibile utilizzare il servizio di streaming OCI come origine Kafka da includere nel cluster OpenSearch. Per informazioni su come eseguire l'installazione, vedere Uso delle API Kafka.
Accesso pubblico streaming OCI YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Pipeline - Servizio di streaming OCI - Accesso privato YAML
Di seguito è riportato un esempio di YAML per le pipeline OpenSearch nel servizio di streaming OCI.
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Kafka YAML autogestita
Di seguito è riportato un esempio di Kafka YAML autogestito per OpenSearch:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>