OpenSearch Pipelines
Créez et gérez des pipelines OpenSearch à l'aide de Data Prepper pour ingérer des données dans une grappe OpenSearch.
Data Prepper est un collecteur de données à code source libre qui peut filtrer, enrichir, transformer, normaliser et agréger les données pour l'analyse et la visualisation en aval. Il s'agit de l'un des outils d'ingestion de données les plus recommandés pour le traitement de jeux de données volumineux et complexes.
La fonction Grappe OpenSearch avec précompteur de données est actuellement disponible dans le domaine OC1.
Politiques requises
Effectuez les tâches suivantes avant de passer aux étapes décrites dans cette rubrique :
Si vous n'êtes pas administrateur de votre location, communiquez avec les administrateurs de votre location pour vous accorder ces autorisations. L'administrateur doit mettre à jour l'autorisation suivante pour permettre aux utilisateurs non administrateurs de gérer et de mettre à jour les opérations CRUD des pipelines
La politique suivante permet à l'administrateur d'accorder des autorisations à tous les utilisateurs de la location concernée :
Allow any-user to manage opensearch-cluster-pipeline in tenancy
La politique suivante permet à l'administrateur d'accorder l'autorisation pour un groupe dans un compartiment (recommandé)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
où <group>
est tous les utilisateurs de ce groupe peuvent accéder à la ressource.
La politique suivante permet aux pipelines OpenSearch de lire les clés secrètes à partir d'Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Politiques de stockage d'objets
Ces politiques ne sont requises que pour la source de stockage d'objets :
La politique suivante permet aux pipelines OpenSearch d'utiliser un seau du stockage d'objets comme persistance de coordination source :
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
La politique suivante permet aux pipelines OpenSearch d'ingérer des objets à partir du stockage d'objets :
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La politique suivante permet aux pipelines OpenSearch de lire des seaux à partir du stockage d'objets :
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La politique suivante permet aux pipelines OpenSearch de lire les seaux à partir du seau de coordination source.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Service de diffusion en continu pour OCI et politiques Kafka autogérées
Ces politiques sont requises pour le service de diffusion en continu pour OCI ou pour les sources Kafka autogérées.
Politiques de réseau communes
Ces politiques ne sont pas nécessaires pour le service de diffusion en continu OCI public.
Politiques à ajouter pour permettre au service OpenSearch de créer, lire et mettre à jour les points d'extrémité privés à supprimer dans le sous-réseau du client.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Politiques du service de diffusion en continu pour OCI (publique et privée)
Ces politiques ne sont requises que pour le service de diffusion en continu pour OCI.
La politique suivante permet aux pipelines OpenSearch de consommer les enregistrements du service de diffusion en continu pour OCI.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
La politique suivante permet aux pipelines OpenSearch de lire les groupes de flux à partir du service de diffusion en continu OCI
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Autorisation Kafka autogérée
Ces autorisations ne sont requises que pour les sources Kafka autogérées.
Sélectionnez le lien suivant pour ajouter l'autorisation requise pour lister les sujets, décrire les sujets, rejoindre le groupe et consommer les enregistrements du sujet par le pipeline OpenSearch :
Règles de sécurité de réseau
Cette configuration n'est requise que pour le service de diffusion en continu OCI privé et Kafka autogéré. Dans le cas du service de diffusion en continu Public OCI, n'en sélectionnez aucune.
Ajoutez une règle de sécurité de trafic entrant dans la liste de sécurité du sous-réseau ou du groupe de sécurité de réseau à tous les pipelines OpenSearch pour communiquer avec le service de diffusion en continu OCI privé s'exécutant dans votre sous-réseau.
Pour ajouter la règle de sécurité, voir Règles de sécurité et Accès et sécurité.
Pour trouver le CIDR de votre sous-réseau, voir Obtention des détails d'un sous-réseau.
L'image suivante présente les règles de trafic entrant pour le groupe de sécurité de réseau.

Création des clés secrètes dans le service de chambre forte
Toutes les clés secrètes de texte brut requises par les pipelines OpenSearch doivent lui être transmises par le service de chambre forte, car les pipelines OpenSearch n'acceptent pas les clés secrètes de texte brut telles que les noms d'utilisateur et les mots de passe dans le code yaml des pipelines.
Effectuez les tâches suivantes :
- Créez un utilisateur avec des autorisations d'écriture dans votre grappe OpenSearch. Pour obtenir des instructions, consultez les rubriques de documentation suivantes sur OpenSearch :
- Créez des clés secrètes (nom d'utilisateur et mot de passe) dans le service de chambre forte pour le nouvel utilisateur que vous avez créé. Pour obtenir des instructions, voir les rubriques suivantes sur Oracle Cloud Infrastructure Vault :
- Ajoutez les politiques de principal de ressource suivantes dans votre location OpenSearch pour permettre aux pipelines OpenSearch de lire les clés secrètes de la chambre forte.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Pour plus d'informations sur la création de politiques pour le service de chambre forte, voir Informations détaillées sur le service de chambre forte.
Vous pouvez effectuer les tâches de pipeline OpenSearch suivantes :
Lister les pipelines OpenSearch d'un compartiment.
Créez un nouveau pipeline OpenSearch.
Obtenir les détails d'un pipeline OpenSearch.
Processeurs pris en charge
Le tableau suivant liste ces processeurs sont pris en charge dans le pipeline.
Stockage d'objets et coordination des sources YAML
La source du stockage d'objets dépend de la configuration de la coordination des sources. Le pipeline OpenSearch prend en charge la coordination des sources à l'aide du stockage d'objets en tant que persistance. Vous devez fournir les détails du seau de stockage d'objets à partir de votre location.
Voici un exemple de coordination des sources :
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Pour plus d'informations sur l'obtention de l'espace de noms du stockage d'objets de votre location, voir Présentation des espaces de noms du stockage d'objets.
Voici un exemple de coordination source à l'aide de la persistance du service de stockage d'objets :
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
Stockage d'objets YAML
Voici les sections de la configuration de pipeline YAML à connaître :
- Clés secrètes OCI : Vous pouvez créer une clé secrète dans le service de chambre forte avec vos données d'identification de grappe OpenSearch et l'utiliser dans le YAML de pipeline pour vous connecter à la grappe OpenSearch.
- OpenSearch Récepteur : Le récepteur contient des OCID de grappe OpenSearch avec des noms d'index pour l'ingestion.
- oci-object source : Le préparateur de données prend en charge l'ingestion basée sur le balayage à l'aide du stockage d'objets, qui comporte de nombreuses configurations prises en charge. Vous pouvez configurer la source pour l'ingestion d'objets dans votre compartiment de stockage d'objets en fonction de la fréquence programmée ou non selon un programme. Vous disposez des options de balayage suivantes :
- Un ou plusieurs balayages temporels : Cette option vous permet de configurer un pipeline qui lit des objets dans les seaux de stockage d'objets une ou plusieurs fois en fonction de l'heure de la dernière modification des objets.
- Balayage basé sur la programmation : Cette option vous permet de programmer un balayage sur un intervalle régulier après la création du pipeline.
Le tableau suivant répertorie les options que vous pouvez utiliser pour configurer la source du stockage d'objets.
Options | Obligatoire | Type | Description |
---|---|---|---|
acknowledgments |
Nombre | Boolean | Lorsque true , permet aux sources d'objets du service de stockage d'objets de recevoir des accusés de réception de bout en bout lorsque des événements sont reçus par les puits OpenSearch. |
buffer_timeout |
Nombre | Durée | Durée d'écriture des événements dans la mémoire tampon du préparateur de données avant la temporisation. Tous les événements que la source OCI ne peut pas écrire dans la mémoire tampon pendant la durée spécifiée sont abandonnés. La valeur par défaut est 10s . |
codec |
Oui | Codec | Le codec de Data Prepper à appliquer. |
compression |
Nombre | Chaîne | Algorithme de compression à appliquer : none , gzip , snappy ou automatic . La valeur par défaut est none . |
delete_oci_objects_on_read |
Nombre | Boolean | Lorsque true , le balayage source du service de stockage d'objets tente de supprimer des objets de stockage d'objets après l'accusé de réception de tous les événements de l'objet de stockage d'objets. acknowledgments doit être activé lors de la suppression des objets de stockage d'objets. La valeur par défaut est false . La suppression ne fonctionne pas si acknowledgments de bout en bout n'est pas activé. |
oci |
Nombre | OCI | Configuration OCI. Pour plus d'informations, voir la section OCI suivante. |
records_to_accumulate |
Nombre | Entier | Nombre de messages qui s'accumulent avant d'être écrits dans la mémoire tampon. La valeur par défaut est 100 . |
workers |
Nombre | Entier | Configure le nombre d'unités d'exécution de traitement que la source utilise pour lire des données à partir du seau OCI. Laissez cette valeur par défaut, sauf si les objets du service de stockage d'objets sont inférieurs à 1 Mo. La performance peut diminuer pour les objets de stockage d'objets de plus grande taille. La valeur par défaut est 1 . |
Configuration du pipeline de stockage d'objets YAML
Voici un exemple de configuration YAML du pipeline de stockage d'objets :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Exemples de configurations
Voici les options de balayage ponctuel que vous pouvez appliquer au niveau du compartiment de stockage d'objets ou au niveau du balayage
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Date et heure de fin
Voici un exemple d'heure de fin :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Heure de début et heure de fin
Voici un exemple d'heure de début et d'heure de fin :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Plage
Voici un exemple d'intervalle :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Heure de début, heure de fin et intervalle
Voici un exemple d'heure de début, d'heure de fin et d'intervalle :
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
Filtre include_prefix
Voici un exemple de filtre include_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Filtrer les fichiers du dossier
Voici un exemple de fichiers de filtre à partir d'un dossier. Pour lire des fichiers uniquement à partir de dossiers spécifiques, utilisez un filtre pour spécifier des dossiers. Voici un exemple d'inclusion de fichiers à partir de folder2 dans folder1 à l'aide de include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
Filtre exclude_prefix
Voici un exemple de filtre exclude_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Prise en charge du codec pour JSON
Voici un exemple de prise en charge des codecs pour JSON :
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Prise en charge du codec pour CSV
Voici un exemple de prise en charge des codecs pour CSV :
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Prise en charge du codec pour Newline
Voici un exemple de prise en charge des codecs pour newline :
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Programmation des options d'ingestion sans nombre
Voici un exemple de programmation d'options d'ingestion sans nombre :
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programmation des options d'ingestion avec le nombre
Voici un exemple de programmation d'options d'ingestion avec le nombre :
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programmation des options d'ingestion avec l'heure de début
Voici un exemple d'options d'ingestion de programmation avec l'heure de début :
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Programmation des options d'ingestion avec heure de fin
Voici un exemple d'options d'ingestion de programmation avec heure de fin :
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
La source Kafka ne nécessite aucune coordination des sources.
Pour plus d'informations sur toutes les configurations disponibles pour la source Kafka, accédez au lien suivant :
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que source Kafka pour l'ingestion dans la grappe OpenSearch. Pour plus d'informations sur cette opération, voir Utilisation des API Kafka.
OCI - Accès public au service de diffusion en continu YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Accès privé au service de diffusion en continu OCI pour pipelines YAML
Voici un exemple de YAML pour les pipelines OpenSearch du service de diffusion en continu pour OCI :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Kafka YAML autogéré
Voici un exemple de Kafka YAML autogéré pour OpenSearch :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>