OpenSearch Pipelines

Créez et gérez des pipelines OpenSearch à l'aide de Data Prepper pour ingérer des données dans une grappe OpenSearch.

Data Prepper est un collecteur de données à code source libre qui peut filtrer, enrichir, transformer, normaliser et agréger les données pour l'analyse et la visualisation en aval. Il s'agit de l'un des outils d'ingestion de données les plus recommandés pour le traitement de jeux de données volumineux et complexes.

Nous utilisons le modèle PULL pour ingérer des données dans la grappe OpenSearch 2.x et les intégrer au service Oracle Cloud Infrastructure Streaming, Kafka autogéré et au service de stockage d'objets.
Note

La fonction Grappe OpenSearch avec précompteur de données est actuellement disponible dans le domaine OC1.

Politiques requises

Effectuez les tâches suivantes avant de passer aux étapes décrites dans cette rubrique :

Si vous n'êtes pas administrateur de votre location, communiquez avec les administrateurs de votre location pour vous accorder ces autorisations. L'administrateur doit mettre à jour l'autorisation suivante pour permettre aux utilisateurs non administrateurs de gérer et de mettre à jour les opérations CRUD des pipelines

La politique suivante permet à l'administrateur d'accorder des autorisations à tous les utilisateurs de la location concernée :

Allow any-user to manage opensearch-cluster-pipeline in tenancy

La politique suivante permet à l'administrateur d'accorder l'autorisation pour un groupe dans un compartiment (recommandé)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

<group> est tous les utilisateurs de ce groupe peuvent accéder à la ressource.

La politique suivante permet aux pipelines OpenSearch de lire les clés secrètes à partir d'Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

Politiques de stockage d'objets

Ces politiques ne sont requises que pour la source de stockage d'objets :

La politique suivante permet aux pipelines OpenSearch d'utiliser un seau du stockage d'objets comme persistance de coordination source :

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

La politique suivante permet aux pipelines OpenSearch d'ingérer des objets à partir du stockage d'objets :

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La politique suivante permet aux pipelines OpenSearch de lire des seaux à partir du stockage d'objets :

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La politique suivante permet aux pipelines OpenSearch de lire les seaux à partir du seau de coordination source.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Service de diffusion en continu pour OCI et politiques Kafka autogérées

Ces politiques sont requises pour le service de diffusion en continu pour OCI ou pour les sources Kafka autogérées.

Politiques de réseau communes

Note

Ces politiques ne sont pas nécessaires pour le service de diffusion en continu OCI public.

Politiques à ajouter pour permettre au service OpenSearch de créer, lire et mettre à jour les points d'extrémité privés à supprimer dans le sous-réseau du client.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

Politiques du service de diffusion en continu pour OCI (publique et privée)

Ces politiques ne sont requises que pour le service de diffusion en continu pour OCI.

La politique suivante permet aux pipelines OpenSearch de consommer les enregistrements du service de diffusion en continu pour OCI.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

La politique suivante permet aux pipelines OpenSearch de lire les groupes de flux à partir du service de diffusion en continu OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Autorisation Kafka autogérée

Ces autorisations ne sont requises que pour les sources Kafka autogérées.

Sélectionnez le lien suivant pour ajouter l'autorisation requise pour lister les sujets, décrire les sujets, rejoindre le groupe et consommer les enregistrements du sujet par le pipeline OpenSearch :

https://kafka.apache.org/documentation/#security_authz

Règles de sécurité de réseau

Cette configuration n'est requise que pour le service de diffusion en continu OCI privé et Kafka autogéré. Dans le cas du service de diffusion en continu Public OCI, n'en sélectionnez aucune.

Ajoutez une règle de sécurité de trafic entrant dans la liste de sécurité du sous-réseau ou du groupe de sécurité de réseau à tous les pipelines OpenSearch pour communiquer avec le service de diffusion en continu OCI privé s'exécutant dans votre sous-réseau.

Pour ajouter la règle de sécurité, voir Règles de sécurité et Accès et sécurité.

Pour trouver le CIDR de votre sous-réseau, voir Obtention des détails d'un sous-réseau.

L'image suivante présente les règles de trafic entrant pour le groupe de sécurité de réseau.

Règles de trafic entrant pour les groupes de sécurité de réseau

Création des clés secrètes dans le service de chambre forte

Toutes les clés secrètes de texte brut requises par les pipelines OpenSearch doivent lui être transmises par le service de chambre forte, car les pipelines OpenSearch n'acceptent pas les clés secrètes de texte brut telles que les noms d'utilisateur et les mots de passe dans le code yaml des pipelines.

Effectuez les tâches suivantes :

  • Créez un utilisateur avec des autorisations d'écriture dans votre grappe OpenSearch. Pour obtenir des instructions, consultez les rubriques de documentation suivantes sur OpenSearch :

    Groupes d'actions par défaut

    Utilisateurs et rôles

    Rechercher avec les politiques IAM OpenSearch

  • Créez des clés secrètes (nom d'utilisateur et mot de passe) dans le service de chambre forte pour le nouvel utilisateur que vous avez créé. Pour obtenir des instructions, voir les rubriques suivantes sur Oracle Cloud Infrastructure Vault :

    Gestion des clés secrètes de chambre forte

    Création d'une clé secrète dans une chambre forte

  • Ajoutez les politiques de principal de ressource suivantes dans votre location OpenSearch pour permettre aux pipelines OpenSearch de lire les clés secrètes de la chambre forte.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Pour plus d'informations sur la création de politiques pour le service de chambre forte, voir Informations détaillées sur le service de chambre forte.

Vous pouvez effectuer les tâches de pipeline OpenSearch suivantes :

Lister les pipelines OpenSearch d'un compartiment.

Créez un nouveau pipeline OpenSearch.

Obtenir les détails d'un pipeline OpenSearch.

Modifiez les paramètres d'un pipeline OpenSearch.

Supprimez un pipeline OpenSearch de votre location.

Processeurs pris en charge

Stockage d'objets et coordination des sources YAML

La source du stockage d'objets dépend de la configuration de la coordination des sources. Le pipeline OpenSearch prend en charge la coordination des sources à l'aide du stockage d'objets en tant que persistance. Vous devez fournir les détails du seau de stockage d'objets à partir de votre location.

Voici un exemple de coordination des sources :

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

Pour plus d'informations sur l'obtention de l'espace de noms du stockage d'objets de votre location, voir Présentation des espaces de noms du stockage d'objets.

Voici un exemple de coordination source à l'aide de la persistance du service de stockage d'objets :

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

Stockage d'objets YAML

Voici les sections de la configuration de pipeline YAML à connaître :

  • Clés secrètes OCI : Vous pouvez créer une clé secrète dans le service de chambre forte avec vos données d'identification de grappe OpenSearch et l'utiliser dans le YAML de pipeline pour vous connecter à la grappe OpenSearch.
  • OpenSearch Récepteur : Le récepteur contient des OCID de grappe OpenSearch avec des noms d'index pour l'ingestion.
  • oci-object source : Le préparateur de données prend en charge l'ingestion basée sur le balayage à l'aide du stockage d'objets, qui comporte de nombreuses configurations prises en charge. Vous pouvez configurer la source pour l'ingestion d'objets dans votre compartiment de stockage d'objets en fonction de la fréquence programmée ou non selon un programme. Vous disposez des options de balayage suivantes :
    • Un ou plusieurs balayages temporels : Cette option vous permet de configurer un pipeline qui lit des objets dans les seaux de stockage d'objets une ou plusieurs fois en fonction de l'heure de la dernière modification des objets.
    • Balayage basé sur la programmation : Cette option vous permet de programmer un balayage sur un intervalle régulier après la création du pipeline.

Le tableau suivant répertorie les options que vous pouvez utiliser pour configurer la source du stockage d'objets.

Configurations du stockage d'objets
Options Obligatoire Type Description
acknowledgments Nombre Boolean Lorsque true, permet aux sources d'objets du service de stockage d'objets de recevoir des accusés de réception de bout en bout lorsque des événements sont reçus par les puits OpenSearch.
buffer_timeout Nombre Durée Durée d'écriture des événements dans la mémoire tampon du préparateur de données avant la temporisation. Tous les événements que la source OCI ne peut pas écrire dans la mémoire tampon pendant la durée spécifiée sont abandonnés. La valeur par défaut est 10s.
codec Oui Codec Le codec de Data Prepper à appliquer.
compression Nombre Chaîne Algorithme de compression à appliquer : none, gzip, snappy ou automatic. La valeur par défaut est none.
delete_oci_objects_on_read Nombre Boolean Lorsque true, le balayage source du service de stockage d'objets tente de supprimer des objets de stockage d'objets après l'accusé de réception de tous les événements de l'objet de stockage d'objets. acknowledgments doit être activé lors de la suppression des objets de stockage d'objets. La valeur par défaut est false. La suppression ne fonctionne pas si acknowledgments de bout en bout n'est pas activé.
oci Nombre OCI Configuration OCI. Pour plus d'informations, voir la section OCI suivante.
records_to_accumulate Nombre Entier Nombre de messages qui s'accumulent avant d'être écrits dans la mémoire tampon. La valeur par défaut est 100.
workers Nombre Entier Configure le nombre d'unités d'exécution de traitement que la source utilise pour lire des données à partir du seau OCI. Laissez cette valeur par défaut, sauf si les objets du service de stockage d'objets sont inférieurs à 1 Mo. La performance peut diminuer pour les objets de stockage d'objets de plus grande taille. La valeur par défaut est 1.

Configuration du pipeline de stockage d'objets YAML

Voici un exemple de configuration YAML du pipeline de stockage d'objets :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Exemples de configurations

Voici les options de balayage ponctuel que vous pouvez appliquer au niveau du compartiment de stockage d'objets ou au niveau du balayage

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Date et heure de fin

Voici un exemple d'heure de fin :


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Heure de début et heure de fin

Voici un exemple d'heure de début et d'heure de fin :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Plage

Voici un exemple d'intervalle :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Heure de début, heure de fin et intervalle

Voici un exemple d'heure de début, d'heure de fin et d'intervalle :

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtre include_prefix

Voici un exemple de filtre include_prefix :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtrer les fichiers du dossier

Voici un exemple de fichiers de filtre à partir d'un dossier. Pour lire des fichiers uniquement à partir de dossiers spécifiques, utilisez un filtre pour spécifier des dossiers. Voici un exemple d'inclusion de fichiers à partir de folder2 dans folder1 à l'aide de include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtre exclude_prefix

Voici un exemple de filtre exclude_prefix :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Prise en charge du codec pour JSON

Voici un exemple de prise en charge des codecs pour JSON :

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Prise en charge du codec pour CSV

Voici un exemple de prise en charge des codecs pour CSV :


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Prise en charge du codec pour Newline

Voici un exemple de prise en charge des codecs pour newline :

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Programmation des options d'ingestion sans nombre

Voici un exemple de programmation d'options d'ingestion sans nombre :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programmation des options d'ingestion avec le nombre

Voici un exemple de programmation d'options d'ingestion avec le nombre :

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programmation des options d'ingestion avec l'heure de début

Voici un exemple d'options d'ingestion de programmation avec l'heure de début :


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Programmation des options d'ingestion avec heure de fin

Voici un exemple d'options d'ingestion de programmation avec heure de fin :

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

La source Kafka ne nécessite aucune coordination des sources.

Pour plus d'informations sur toutes les configurations disponibles pour la source Kafka, accédez au lien suivant :

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que source Kafka pour l'ingestion dans la grappe OpenSearch. Pour plus d'informations sur cette opération, voir Utilisation des API Kafka.

OCI - Accès public au service de diffusion en continu YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Accès privé au service de diffusion en continu OCI pour pipelines YAML

Voici un exemple de YAML pour les pipelines OpenSearch du service de diffusion en continu pour OCI :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Kafka YAML autogéré

Voici un exemple de Kafka YAML autogéré pour OpenSearch :

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>