Pipeline OpenSearch

Creare e gestire le pipeline OpenSearch utilizzando Data Prepper per includere dati in un cluster OpenSearch.

Prepper dati è un raccoglitore di dati open source in grado di filtrare, arricchire, trasformare, normalizzare e aggregare i dati per l'analisi e la visualizzazione a valle. Si tratta di uno degli strumenti di inclusione dati più consigliati per l'elaborazione di set di dati grandi e complessi.

Utilizziamo il modello PULL per includere i dati nel cluster OpenSearch 2.x e integrarli con il servizio Oracle Cloud Infrastructure Streaming, il Kafka autogestito e lo storage degli oggetti.
Nota

La funzione Cluster OpenSearch con prepper dati è attualmente disponibile nel realm OC1.

Criteri richiesti

Completare i task riportati di seguito prima di procedere con i passi descritti in questo argomento.

Se non sei un amministratore della tenancy, contatta gli amministratori della tenancy per concedere queste autorizzazioni all'utente. L'amministratore deve aggiornare l'autorizzazione utente seguente per consentire agli utenti non amministratori di gestire e CRUD le operazioni delle pipeline

Il criterio riportato di seguito consente all'amministratore di concedere l'autorizzazione a tutti gli utenti nella rispettiva tenancy.

Allow any-user to manage opensearch-cluster-pipeline in tenancy

Il criterio seguente consente all'amministratore di concedere l'autorizzazione per un gruppo in un compartimento (consigliato)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

dove <group> indica che tutti gli utenti all'interno di tale gruppo possono accedere alla risorsa.

Il criterio riportato di seguito consente alle pipeline OpenSearch di leggere i segreti di Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

Criteri di storage degli oggetti

Questi criteri sono necessari solo per l'origine dello storage degli oggetti:

Il criterio seguente consente alle pipeline OpenSearch di utilizzare un bucket dello storage degli oggetti come persistenza del coordinamento di origine:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

Il criterio seguente consente alle pipeline OpenSearch di includere gli oggetti dallo storage degli oggetti:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Il criterio seguente consente alle pipeline OpenSearch di leggere i bucket dallo storage degli oggetti:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Il criterio riportato di seguito consente alle pipeline OpenSearch di leggere i bucket dal bucket di coordinamento di origine.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Criteri di OCI Streaming e Kafka autogestiti

Questi criteri sono necessari per il servizio di streaming OCI o le origini Kafka autogestite.

Criteri di rete comuni

Nota

Questi criteri non sono necessari per il servizio OCI Streaming pubblico.

Criteri da aggiungere per consentire al servizio OpenSearch di creare, leggere e aggiornare l'eliminazione degli endpoint privati nella subnet del cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

Criteri del servizio di streaming OCI (pubblico e privato)

Questi criteri sono necessari solo per il servizio di streaming OCI.

Il criterio riportato di seguito consente alle pipeline OpenSearch di utilizzare i record del servizio di streaming OCI.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

Il criterio seguente consente alle pipeline OpenSearch di leggere i pool di flussi dal servizio di streaming OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Autorizzazione Kafka autogestita

Queste autorizzazioni sono necessarie solo per le origini Kafka autogestite.

Selezionare il collegamento seguente per aggiungere l'autorizzazione necessaria per elencare gli argomenti, descrivere gli argomenti, partecipare al gruppo e utilizzare i record dell'argomento mediante la pipeline OpenSearch.

https://kafka.apache.org/documentation/#security_authz

Regole di sicurezza di rete

Questa configurazione è necessaria solo per il servizio di streaming OCI privato e il Kafka autogestito. Nel caso di un servizio di streaming OCI pubblico, non selezionarne nessuno.

Aggiungere una regola di sicurezza in entrata nella lista di sicurezza della subnet o nel gruppo di sicurezza di rete a tutte le pipeline OpenSearch per comunicare al servizio OCI Streaming privato in esecuzione nella subnet.

Per aggiungere la regola di sicurezza, vedere Regole di sicurezza e Accesso e sicurezza.

Per trovare il CIDR della subnet, vedere Ottenere i dettagli di una subnet.

L'immagine seguente mostra le regole di entrata per il gruppo di sicurezza di rete.

Regole di entrata per i gruppi di sicurezza di rete

Creazione dei segreti nel vault

Tutti i segreti di testo non codificato richiesti dalle pipeline OpenSearch devono essere trasmessi tramite Vault, poiché le pipeline OpenSearch non accettano segreti di testo non codificato come nomi utente e password nel codice yaml delle pipeline.

Eseguire i task riportati di seguito.

  • Creare un nuovo utente con autorizzazioni di scrittura nel cluster OpenSearch. Per istruzioni, consultare i seguenti argomenti della documentazione OpenSearch:

    Gruppi di azioni predefiniti

    Utenti e ruoli

    Cerca con i criteri IAM OpenSearch

  • Creare i segreti (nome utente e password) nel vault per il nuovo utente creato. Per istruzioni, consulta i seguenti argomenti di Oracle Cloud Infrastructure Vault:

    Gestione dei segreti del vault

    Creazione di un segreto in un vault

  • Aggiungere i criteri principal delle risorse seguenti nella tenancy OpenSearch per consentire alle pipeline OpenSearch di leggere i segreti dal vault.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Per ulteriori informazioni sulla creazione dei criteri per Vault, vedere Dettagli per il servizio Vault.

È possibile eseguire i task della pipeline OpenSearch riportati di seguito.

Elencare le pipeline OpenSearch in un compartimento.

Creare una nuova pipeline OpenSearch.

Ottieni i dettagli di una pipeline OpenSearch.

Modificare le impostazioni di una pipeline OpenSearch.

Eliminare una pipeline OpenSearch dalla tenancy.

Processori supportati

Storage degli oggetti e coordinamento dell'origine YAML

L'origine dello storage degli oggetti dipende dalla configurazione del coordinamento di origine. La pipeline OpenSearch supporta il coordinamento dell'origine utilizzando lo storage degli oggetti come persistenza. Devi fornire i dettagli del bucket di storage degli oggetti dalla tua tenancy.

Di seguito viene fornito un esempio di coordinamento dell'origine.

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

Per informazioni su come ottenere lo spazio di nomi dello storage degli oggetti della tenancy, vedere Informazioni sugli spazi di nomi dello storage degli oggetti.

Di seguito è riportato un esempio di coordinamento dell'origine mediante la persistenza dello storage degli oggetti.

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

Storage degli oggetti YAML

Di seguito sono riportate le sezioni della configurazione della pipeline YAML di cui tenere conto:

  • Segreti OCI: è possibile creare un segreto nel vault con le credenziali del cluster OpenSearch e utilizzarlo nella pipeline YAML per la connessione al cluster OpenSearch.
  • OpenSearch Sink: il sink contiene gli OCID del cluster OpenSearch con i nomi di indice per l'inclusione.
  • origine oci-object: il prepper dati supporta l'inclusione basata sulla scansione utilizzando lo storage degli oggetti con molte configurazioni supportate. Puoi configurare l'origine in modo che includa gli oggetti all'interno del bucket di storage degli oggetti in base alla frequenza pianificata o meno in base a qualsiasi pianificazione. Sono disponibili le seguenti opzioni di scansione
    • Una o più scansione temporale: questa opzione consente di configurare la pipeline che legge gli oggetti nei bucket di storage degli oggetti una o più volte in base all'ora dell'ultima modifica degli oggetti.
    • Scansione basata sulla pianificazione: questa opzione consente di pianificare una scansione a intervalli regolari dopo la creazione della pipeline.

La tabella riportata di seguito elenca le opzioni che è possibile utilizzare per configurare l'origine dello storage degli oggetti.

Configurazioni storage degli oggetti
Opzioni Obbligatorio Tipo Descrizione
acknowledgments N Boolean Quando true, consente alle origini oggetto dello storage degli oggetti di ricevere conferme end-to-end quando gli eventi vengono ricevuti dai sink di OpenSearch.
buffer_timeout N Duration Il periodo di tempo consentito per la scrittura di eventi nel buffer di Data Prepper prima che si verifichi il timeout. Tutti gli eventi che l'origine OCI non è in grado di scrivere nel buffer durante il periodo di tempo specificato vengono eliminati. L'impostazione predefinita è 10s.
codec Codice Il codec del prepper dati da applicare.
compression N Stringa L'algoritmo di compressione da applicare: none, gzip, snappy o automatic. L'impostazione predefinita è none.
delete_oci_objects_on_read N Boolean Quando si utilizza true, la scansione dell'origine dello storage degli oggetti tenta di eliminare gli oggetti di storage degli oggetti dopo che tutti gli eventi dell'oggetto di storage degli oggetti sono stati riconosciuti correttamente da tutti gli sink. Il comando acknowledgments deve essere abilitato quando si eliminano gli oggetti di storage degli oggetti. L'impostazione predefinita è false. L'eliminazione non funziona se acknowledgments end-to-end non è abilitato.
oci N OCI Configurazione OCI. Per ulteriori informazioni, vedere la sezione OCI riportata di seguito.
records_to_accumulate N Numero intero Il numero di messaggi accumulati prima di essere scritti nel buffer. L'impostazione predefinita è 100.
workers N Numero intero Configura il numero di thread operativi utilizzati dall'origine per leggere i dati dal bucket OCI. Lasciare questo valore predefinito a meno che gli oggetti di storage degli oggetti non siano inferiori a 1 MB. Le prestazioni potrebbero diminuire per gli oggetti di storage degli oggetti più grandi. L'impostazione predefinita è 1.

Configurazione pipeline di storage degli oggetti YAML

Di seguito è riportato un esempio della configurazione YAML della pipeline di storage degli oggetti.

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Configurazioni di esempio

Di seguito sono riportate le opzioni di scansione una tantum che puoi applicare a livello di singolo bucket di storage degli oggetti o a livello di scansione

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Ora di fine

Di seguito è riportato un esempio di ora di fine.


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Ora inizio e ora fine

Di seguito è riportato un esempio di ora di inizio e di fine.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

A intervalli

Di seguito è riportato un esempio di intervallo:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Ora di inizio, ora di fine e intervallo

Di seguito è riportato un esempio di ora di inizio, ora di fine e intervallo.

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtro include_prefix

Di seguito è riportato un esempio di filtro include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtra file da cartella

Di seguito è riportato un esempio di file di filtro dalla cartella. Per leggere i file solo da cartelle specifiche, utilizzare il filtro per specificare le cartelle. Di seguito è riportato un esempio di inclusione dei file di folder2 in folder1 utilizzando include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtro exclude_prefix

Di seguito è riportato un esempio di filtro exclude_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Supporto codec per JSON

Di seguito è riportato un esempio di supporto codec per JSON:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Supporto codec per CSV

Di seguito è riportato un esempio di supporto codec per CSV.


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Supporto del codec per Newline

Di seguito è riportato un esempio di supporto codec per newline:

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Pianificazione delle opzioni di inclusione senza conteggio

Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione senza conteggio.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Pianificazione delle opzioni di inclusione con conteggio

Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con conteggio.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Pianificazione delle opzioni di inclusione con ora di inizio

Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con l'ora di inizio.


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Pianificazione delle opzioni di inclusione con ora di fine

Di seguito è riportato un esempio di pianificazione delle opzioni di inclusione con l'ora di fine.

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

L'origine Kafka non richiede alcun coordinamento di origine.

Per informazioni su tutte le configurazioni disponibili per l'origine Kafka, accedere al collegamento seguente:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

È possibile utilizzare il servizio di streaming OCI come origine Kafka da includere nel cluster OpenSearch. Per informazioni su come eseguire l'installazione, vedere Uso delle API Kafka.

Accesso pubblico streaming OCI YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Pipeline - Servizio di streaming OCI - Accesso privato YAML

Di seguito è riportato un esempio di YAML per le pipeline OpenSearch nel servizio di streaming OCI.

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Kafka YAML autogestita

Di seguito è riportato un esempio di Kafka YAML autogestito per OpenSearch:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>