OpenSearch Pipelines
Créez et gérez des pipelines OpenSearch à l'aide de Data Prepper pour ingérer des données dans une grappe OpenSearch.
Data Prepper est un collecteur de données à code source libre qui peut filtrer, enrichir, transformer, normaliser et agréger les données pour l'analyse et la visualisation en aval. Il s'agit de l'un des outils d'ingestion de données les plus recommandés pour le traitement de jeux de données volumineux et complexes.
Les pipelines OpenSearch sont compatibles avec les grappes OpenSearch exécutant la version 2.x et les versions ultérieures.
La fonction Grappe OpenSearch avec précompteur de données n'est disponible que dans le domaine OC1.
Politiques requises
Remplissez les exigences de politique décrites dans cette section avant de passer aux étapes décrites dans cette rubrique.
Si vous n'êtes pas administrateur de votre location, communiquez avec les administrateurs de votre location pour vous accorder ces autorisations. L'administrateur doit mettre à jour l'autorisation suivante pour permettre aux utilisateurs non administrateurs de gérer et de mettre à jour les opérations CRUD des pipelines
La politique suivante permet à l'administrateur d'accorder des autorisations à tous les utilisateurs de la location concernée :
Allow any-user to manage opensearch-cluster-pipeline in tenancy
La politique suivante permet à l'administrateur d'accorder l'autorisation pour un groupe dans un compartiment (recommandé)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
où <group> est tous les utilisateurs de ce groupe peuvent accéder à la ressource.
La politique suivante permet aux pipelines OpenSearch de lire les clés secrètes à partir d'Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Les noms d'index OpenSearch doivent respecter les règles suivantes :
- Toutes les lettres doivent être en minuscules.
- Les noms ne peuvent pas commencer par un trait de soulignement ( _ ) ou un tiret (-).
- Les noms ne peuvent pas contenir d'espaces, de virgules ni l'un des caractères suivants : : :, ", *, +, /, \, |,?, #, >, <
- Le nom de l'index peut contenir une expression de préfixe de données.
Création des clés secrètes dans le service de chambre forte
Toutes les clés secrètes de texte brut requises par les pipelines OpenSearch doivent lui être transmises par le service de chambre forte, car les pipelines OpenSearch n'acceptent pas les clés secrètes de texte brut telles que les noms d'utilisateur et les mots de passe dans le code YAML des pipelines.
Effectuez les tâches suivantes :
- Créez un utilisateur avec des autorisations d'écriture dans votre grappe OpenSearch. Pour obtenir des instructions, consultez les rubriques de documentation suivantes sur OpenSearch :
- Créez des clés secrètes (nom d'utilisateur et mot de passe) dans le service de chambre forte pour le nouvel utilisateur que vous avez créé. Pour obtenir des instructions, voir les rubriques suivantes sur Oracle Cloud Infrastructure Vault :
- Ajoutez les politiques de principal de ressource suivantes dans votre location OpenSearch pour permettre aux pipelines OpenSearch de lire les clés secrètes de la chambre forte.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }Pour plus d'informations sur la création de politiques pour le service de chambre forte, voir Informations détaillées sur le service de chambre forte.
Vous pouvez effectuer les tâches de pipeline OpenSearch suivantes :
Lister les pipelines OpenSearch d'un compartiment.
Créez un nouveau pipeline OpenSearch.
Obtenir les détails d'un pipeline OpenSearch.
Processeurs pris en charge
Le tableau suivant liste ces processeurs sont pris en charge dans le pipeline.
Pipelines PULL
Politiques de stockage d'objets
Ces politiques ne sont requises que pour la source de stockage d'objets :
La politique suivante permet aux pipelines OpenSearch d'utiliser un seau du stockage d'objets comme persistance de coordination source :
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
La politique suivante permet aux pipelines OpenSearch d'ingérer des objets à partir du stockage d'objets :
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La politique suivante permet aux pipelines OpenSearch de lire des seaux à partir du stockage d'objets :
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La politique suivante permet aux pipelines OpenSearch de lire les seaux à partir du seau de coordination source.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Service de diffusion en continu pour OCI et politiques Kafka autogérées
Ces politiques sont requises pour le service de diffusion en continu pour OCI ou pour les sources Kafka autogérées.
Politiques de réseau communes
Ces politiques ne sont pas nécessaires pour le service de diffusion en continu OCI public.
Politiques à ajouter pour permettre au service OpenSearch de créer, lire et mettre à jour les points d'extrémité privés à supprimer dans le sous-réseau du client.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Politiques du service de diffusion en continu pour OCI (publique et privée)
Ces politiques ne sont requises que pour le service de diffusion en continu pour OCI.
La politique suivante permet aux pipelines OpenSearch de consommer les enregistrements du service de diffusion en continu pour OCI.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
La politique suivante permet aux pipelines OpenSearch de lire les groupes de flux à partir du service de diffusion en continu OCI
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Autorisation Kafka autogérée
Ces autorisations ne sont requises que pour les sources Kafka autogérées.
Sélectionnez le lien suivant pour ajouter l'autorisation requise pour lister les sujets, décrire les sujets, rejoindre le groupe et consommer les enregistrements du sujet par le pipeline OpenSearch :
Règles de sécurité de réseau
Cette configuration n'est requise que pour le service de diffusion en continu OCI privé et Kafka autogéré. Dans le cas du service de diffusion en continu Public OCI, n'en sélectionnez aucune.
Ajoutez une règle de sécurité de trafic entrant dans la liste de sécurité du sous-réseau ou du groupe de sécurité de réseau à tous les pipelines OpenSearch pour communiquer avec le service de diffusion en continu OCI privé s'exécutant dans votre sous-réseau.
Pour ajouter la règle de sécurité, voir Règles de sécurité et Accès et sécurité.
Pour trouver le CIDR de votre sous-réseau, voir Obtention des détails d'un sous-réseau.
L'image suivante présente les règles de trafic entrant pour le groupe de sécurité de réseau.
Stockage d'objets et coordination des sources YAML
La source du stockage d'objets dépend de la configuration de la coordination des sources. Le pipeline OpenSearch prend en charge la coordination des sources à l'aide du stockage d'objets en tant que persistance. Vous devez fournir les détails du seau de stockage d'objets à partir de votre location.
Voici un exemple de coordination des sources :
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Pour plus d'informations sur l'obtention de l'espace de noms du stockage d'objets de votre location, voir Présentation des espaces de noms du stockage d'objets.
Voici un exemple de coordination source à l'aide de la persistance du service de stockage d'objets :
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
Stockage d'objets YAML
Voici les sections de la configuration de pipeline YAML à connaître :
- Clés secrètes OCI : Vous pouvez créer une clé secrète dans le service de chambre forte avec vos données d'identification de grappe OpenSearch et l'utiliser dans le YAML de pipeline pour vous connecter à la grappe OpenSearch.
- OpenSearch Récepteur : Le récepteur contient des OCID de grappe OpenSearch avec des noms d'index pour l'ingestion.
-
oci-object source : Le préparateur de données prend en charge l'ingestion basée sur le balayage à l'aide du stockage d'objets, qui comporte de nombreuses configurations prises en charge. Vous pouvez configurer la source pour l'ingestion d'objets dans votre compartiment de stockage d'objets en fonction de la fréquence programmée ou non selon un programme. Vous disposez des options de balayage suivantes :
- Un ou plusieurs balayages temporels : Cette option vous permet de configurer un pipeline qui lit des objets dans les seaux de stockage d'objets une ou plusieurs fois en fonction de l'heure de la dernière modification des objets.
- Balayage basé sur la programmation : Cette option vous permet de programmer un balayage sur un intervalle régulier après la création du pipeline.
Le tableau suivant répertorie les options que vous pouvez utiliser pour configurer la source du stockage d'objets.
| Options | Obligatoire | Type | Description |
|---|---|---|---|
acknowledgments
|
Nombre | Boolean | Lorsque true, permet aux sources d'objets du service de stockage d'objets de recevoir des accusés de réception de bout en bout lorsque des événements sont reçus par les puits OpenSearch. |
buffer_timeout
|
Nombre | Durée | Durée d'écriture des événements dans la mémoire tampon du préparateur de données avant la temporisation. Tous les événements que la source OCI ne peut pas écrire dans la mémoire tampon pendant la durée spécifiée sont abandonnés. La valeur par défaut est 10s. |
codec
|
Oui | Codec | Le codec de Data Prepper à appliquer. |
compression
|
Nombre | Chaîne | Algorithme de compression à appliquer : none, gzip, snappy ou automatic. La valeur par défaut est none. |
delete_oci_objects_on_read
|
Nombre | Boolean | Lorsque true, le balayage source du service de stockage d'objets tente de supprimer des objets de stockage d'objets après l'accusé de réception de tous les événements de l'objet de stockage d'objets. acknowledgments doit être activé lors de la suppression des objets de stockage d'objets. La valeur par défaut est false. La suppression ne fonctionne pas si acknowledgments de bout en bout n'est pas activé. |
oci
|
Nombre | OCI | Configuration OCI. Pour plus d'informations, voir la section OCI suivante. |
records_to_accumulate
|
Nombre | Entier | Nombre de messages qui s'accumulent avant d'être écrits dans la mémoire tampon. La valeur par défaut est 100. |
workers
|
Nombre | Entier | Configure le nombre d'unités d'exécution de traitement que la source utilise pour lire des données à partir du seau OCI. Laissez cette valeur par défaut, sauf si les objets du service de stockage d'objets sont inférieurs à 1 Mo. La performance peut diminuer pour les objets de stockage d'objets de plus grande taille. La valeur par défaut est 1. |
Configuration du pipeline de stockage d'objets YAML
Voici un exemple de configuration YAML du pipeline de stockage d'objets :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Exemples de configurations
Voici les options de balayage ponctuel que vous pouvez appliquer au niveau du compartiment de stockage d'objets ou au niveau du balayage
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Date et heure de fin
Voici un exemple d'heure de fin :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Heure de début et heure de fin
Voici un exemple d'heure de début et d'heure de fin :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Plage
Voici un exemple d'intervalle :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Heure de début, heure de fin et intervalle
Voici un exemple d'heure de début, d'heure de fin et d'intervalle :
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
Filtre include_prefix
Voici un exemple de filtre include_prefix :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Filtrer les fichiers du dossier
Voici un exemple de fichiers de filtre à partir d'un dossier. Pour lire des fichiers uniquement à partir de dossiers spécifiques, utilisez un filtre pour spécifier des dossiers. Voici un exemple d'inclusion de fichiers à partir de folder2 dans folder1 à l'aide de include_prefix.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
Filtre exclude_prefix
Voici un exemple de filtre exclude_prefix :
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Prise en charge du codec pour JSON
Voici un exemple de prise en charge des codecs pour JSON :
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Prise en charge du codec pour CSV
Voici un exemple de prise en charge des codecs pour CSV :
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Prise en charge du codec pour Newline
Voici un exemple de prise en charge des codecs pour newline :
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Programmation des options d'ingestion sans nombre
Voici un exemple de programmation d'options d'ingestion sans nombre :
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programmation des options d'ingestion avec le nombre
Voici un exemple de programmation d'options d'ingestion avec le nombre :
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programmation des options d'ingestion avec l'heure de début
Voici un exemple d'options d'ingestion de programmation avec l'heure de début :
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Programmation des options d'ingestion avec heure de fin
Voici un exemple d'options d'ingestion de programmation avec heure de fin :
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
La source Kafka ne nécessite aucune coordination des sources.
Pour plus d'informations sur toutes les configurations disponibles pour la source Kafka, accédez au lien suivant :
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que source Kafka pour l'ingestion dans la grappe OpenSearch. Pour plus d'informations sur cette opération, voir Utilisation des API Kafka.
Le nombre de noeuds ne peut pas dépasser le nombre maximal de partitions défini pour le sujet.
OCI - Accès public au service de diffusion en continu YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Accès privé au service de diffusion en continu OCI pour pipelines YAML
Voici un exemple de YAML pour les pipelines OpenSearch du service de diffusion en continu pour OCI :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Kafka YAML autogéré
Voici un exemple de Kafka YAML autogéré pour OpenSearch :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Pipeline PUSH
Politiques de réseau communes
Politiques à ajouter pour permettre au service OpenSearch de créer, lire et mettre à jour les points d'extrémité privés à supprimer dans le sous-réseau du client.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Tampon persistant
Les pipelines OpenSearch avec connecteurs poussés Data Prepper nécessitent une mémoire tampon persistante pour stocker la mémoire tampon sur disque afin d'ajouter de la durabilité à vos données. Ceci est obligatoire dans les connecteurs push pour empêcher la perte de données en cas de dysfonctionnement du nœud. Le service de diffusion en continu pour OCI est le seul tampon persistant pris en charge.
Chaque pipeline d'ingestion de données doit avoir une mémoire tampon dédiée. Le partage du même tampon entre plusieurs pipelines peut entraîner une corruption des données en raison de chevauchements ou de données mixtes.
Le tableau suivant répertorie les composants sources du pipeline d'ingestion de données et indique s'il nécessite une mise en mémoire tampon.
| Source | Mémoire tampon persistante requise |
|---|---|
| Stockage d'objets | Nombre |
| Kafka autogéré | Nombre |
| Service de diffusion en continu pour OCI (public) | Nombre |
| Service de diffusion en continu pour OCI (privé) | Nombre |
| OpenTelemetry (Journaux, mesures ou trace) | Oui |
| HTTP | Oui |
Le nombre de noeuds ne peut pas dépasser le nombre maximal de partitions défini pour le sujet.
politiques
Utilisez les politiques suivantes pour appliquer des autorisations liées à la mise en mémoire tampon persistante pour le service de diffusion en continu pour OCI avec des points d'extrémité publics :
- Pour permettre aux pipelines OpenSearch de consommer et de produire les enregistrements dans le service de diffusion en continu pour OCI :
Allow any-user to {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME, STREAM_PRODUCE} in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'} - Pour autoriser les pipelines OpenSearch à lire des groupes de flux à partir du service de diffusion en continu pour OCI :
Allow any-user to read stream-pools in compartment '<compartment_name>' where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target_stream_pool_ocid>'}
Pour plus d'informations, voir Informations détaillées sur le service de diffusion en continu.
Règles de sécurité de réseau
Cette configuration n'est requise que pour le service de diffusion en continu OCI privé et Kafka autogéré. Dans le cas du service de diffusion en continu OCI public, n'en sélectionnez aucune.
Ajoutez une règle de sécurité de trafic entrant dans la liste de sécurité du sous-réseau ou du groupe de sécurité de réseau à tous les pipelines OpenSearch pour communiquer avec le service de diffusion en continu OCI privé s'exécutant dans votre sous-réseau.
Pour ajouter la règle de sécurité, voir Règles de sécurité et Accès et sécurité.
Pour trouver le CIDR de votre sous-réseau, voir Obtention des détails d'un sous-réseau.
L'image suivante présente les règles de trafic entrant pour le groupe de sécurité de réseau.
Journaux OpenTelemetry
OpenTelemetry Les journaux nécessitent une mise en mémoire tampon persistante.
Le port des journaux OpenTelemetry est 21892, ce qui ne peut pas être modifié.
Pour plus d'informations sur la configuration, voir Source des journaux OTel.
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que mémoire tampon persistante. Pour plus d'informations, voir Utilisation d'API Kafka.
OpenTelemetry Configuration du pipeline de journaux
L'exemple suivant présente une configuration de pipeline pour les journaux OpenTelemetry :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
sample-log-pipeline:
source:
otel_logs_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: 'otel-logs-%{yyyy.MM.dd}'
Mesures OpenTelemetry
OpenTelemetry Les mesures nécessitent une mise en mémoire tampon persistante.
Le port des mesures OpenTelemetry est 21891, ce qui ne peut pas être modifié.
Pour plus d'informations sur la configuration, voir Source des mesures OTel.
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que mémoire tampon persistante. Pour plus d'informations, voir Utilisation d'API Kafka.
Configuration du pipeline de mesures OpenTelemetry
L'exemple suivant présente une configuration de pipeline pour les mesures OpenTelemetry :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
sample-metrics-pipeline:
source:
otel_metrics_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster_ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: 'otel-metrics-%{yyyy.MM.dd}'
OpenTelemetry Trace
OpenTelemetry La trace nécessite une mise en mémoire tampon persistante.
Le port de OpenTelemetry Trace est 21890, ce qui ne peut pas être modifié.
Pour des informations de configuration, voir Source de trace OTel.
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que mémoire tampon persistante. Pour plus d'informations, voir Utilisation d'API Kafka.
OpenTelemetry Configuration du pipeline de trace
L'exemple suivant présente une configuration de pipeline pour OpenTelemetry Trace :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
pipeline-username:
secret_id: <secret_ocid>
pipeline-password:
secret_id: <secret_ocid>
traces-entry-shared-pipeline:
source:
otel_trace_source:
authentication:
http_basic:
username: '${{oci_secrets:pipeline-username}}'
password: '${{oci_secrets:pipeline-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- pipeline:
name: traces-pipeline
- pipeline:
name: service-map-pipeline
traces-pipeline:
source:
pipeline:
name: traces-entry-shared-pipeline
processor:
- otel_traces:
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index_type: trace-analytics-raw
service-map-pipeline:
source:
pipeline:
name: traces-entry-shared-pipeline
processor:
- service_map:
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index_type: trace-analytics-service-map
Connecteur HTTP Push
Le connecteur de poussée HTTP nécessite une mise en mémoire tampon persistante.
Le port du connecteur de poussée HTTP est 2021, ce qui ne peut pas être modifié.
Pour plus d'informations sur la configuration, voir Source HTTP.
Vous pouvez utiliser le service de diffusion en continu pour OCI en tant que mémoire tampon persistante. Pour plus d'informations, voir Utilisation d'API Kafka.
Configuration de pipeline de connecteur poussé HTTP
L'exemple suivant présente une configuration de pipeline pour le connecteur Push HTTP :
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
http-username:
secret_id: <secret_ocid>
http-password:
secret_id: <secret_ocid>
simple-sample-pipeline:
source:
http:
authentication:
http_basic:
username: '${{oci_secrets:http-username}}'
password: '${{oci_secrets:http-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index_name>
Connecteur de poussée HTTP avec plusieurs processeurs
L'exemple suivant présente une configuration de pipeline pour le connecteur Push HTTP avec plusieurs processeurs :
# Keep caution while adding percentage symbols
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret_ocid>
opensearch-password:
secret_id: <secret_ocid>
http-username:
secret_id: <secret_ocid>
http-password:
secret_id: <secret_ocid>
simple-sample-pipeline:
source:
http:
path: "/logs"
authentication:
http_basic:
username: '${{oci_secrets:http-username}}'
password: '${{oci_secrets:http-password}}'
buffer:
kafka:
# Idempotence should be 'false' for OCI Streaming Service
producer_properties:
enable_idempotence: false
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target_stream_pool_ocid>
processor:
- grok:
match:
log: [ "%{COMMONAPACHELOG}" ]
- date:
from_time_received: true
destination: "@timestamp"
- substitute_string:
entries:
- source: "log"
from: '\.'
to: "-"
- uppercase_string:
with_keys:
- "log"
- trim_string:
with_keys:
- "log"
- split_string:
entries:
- source: "request"
delimiter: "?"
- key_value:
source: "/request/1"
field_split_characters: "&"
value_split_characters: "="
destination: "query_params"
- lowercase_string:
with_keys:
- "verb"
- add_entries:
entries:
- key: "entry1"
value: "entry1value"
- key: "entry2"
value: "entry2value"
- key: "entry3"
value: "entry3value"
- rename_keys:
entries:
- from_key: "entry1"
to_key: "renameEntry1"
- from_key: "entry2"
to_key: "renameEntry2"
- from_key: "entry3"
to_key: "renameEntry3"
- copy_values:
entries:
- from_key: "log"
to_key: "copy_key"
- delete_entries:
with_keys: [ "renameEntry1", "renameEntry2", "renameEntry3" ]
sink:
- opensearch:
hosts: [ <cluster_ocid> ]
username: '${{oci_secrets:opensearch-username}}'
password: '${{oci_secrets:opensearch-password}}'
insecure: false
index: <index_name>