OpenSearch Pipelines
Erstellen und verwalten Sie OpenSearch-Pipelines mit Data Prepper, um Daten in ein OpenSearch-Cluster aufzunehmen.
Data Prepper ist ein Open-Source-Datensammler, der Daten filtern, anreichern, transformieren, normalisieren und aggregieren kann, um nachgelagerte Analysen und Visualisierungen zu ermöglichen. Es ist eines der am meisten empfohlenen Datenaufnahme-Tools für die Verarbeitung großer und komplexer Datasets.
Das Feature "Cluster mit Daten-Prepper" OpenSearch ist derzeit in der Realm OC1 verfügbar.
Erforderliche Policys
Führen Sie die folgenden Aufgaben aus, bevor Sie mit den in diesem Thema beschriebenen Schritten fortfahren:
Wenn Sie kein Administrator in Ihrem Mandanten sind, bitten Sie Ihre Mandantenadministratoren, Ihnen diese Berechtigungen zu erteilen. Der Administrator muss die folgenden Benutzerberechtigungen aktualisieren, damit Nicht-Administratorbenutzer die Pipelines verwalten und CRUD-Vorgänge ausführen können
Mit der folgenden Policy kann der Administrator allen Benutzern im jeweiligen Mandanten Berechtigungen erteilen:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
Mit der folgenden Policy kann der Administrator eine Berechtigung für eine Gruppe in einem Compartment erteilen (empfohlen)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
wobei <group>
alle Benutzer innerhalb dieser Gruppe auf die Ressource zugreifen können.
Mit der folgenden Policy können OpenSearch-Pipelines die Secrets aus Oracle Cloud Infrastructure Vault lesen.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Object Storage-Policys
Diese Policys sind nur für die Object Storage-Quelle erforderlich:
Mit der folgenden Policy können OpenSearch-Pipelines einen Bucket aus Object Storage als Quellkoordinationspersistenz verwenden:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Objekte aus Object Storage aufnehmen:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Buckets aus Object Storage lesen:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Mit der folgenden Policy können OpenSearch-Pipelines Buckets aus dem Quellkoordinations-Bucket lesen.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
OCI Streaming und selbstverwaltete Kafka-Policys
Diese Policys sind für OCI Streaming-Service oder selbstverwaltete Kafka-Quellen erforderlich.
Allgemeine Netzwerk-Policys
Diese Policys sind für den öffentlichen OCI Streaming-Service nicht erforderlich.
Policys, die hinzugefügt werden, damit der OpenSearch-Service die privaten Endpunkte im Kundensubnetz erstellen, lesen und aktualisieren kann.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
OCI Streaming Service-Richtlinien (öffentlich und privat)
Diese Policys sind nur für den OCI Streaming-Service erforderlich.
Mit der folgenden Policy können OpenSearch-Pipelines die Datensätze aus dem OCI Streaming-Service konsumieren.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
Mit der folgenden Policy können OpenSearch-Pipelines Streampools aus dem OCI-Streamingservice lesen
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Selbstverwaltete Kafka-Berechtigung
Diese Berechtigungen sind nur für selbstverwaltete Kafka-Quellen erforderlich.
Wählen Sie den folgenden Link aus, um die erforderliche Berechtigung hinzuzufügen, um die Themen aufzulisten, die Themen zu beschreiben, der Gruppe beizutreten und die Datensätze aus dem Thema von der Pipeline OpenSearch zu konsumieren:
Netzwerksicherheitsregeln
Diese Konfiguration ist nur für den privaten OCI Streaming-Service und das selbstverwaltete Kafka erforderlich. Wählen Sie im Falle eines öffentlichen OCI-Streaming-Service keine Option aus.
Fügen Sie in der Sicherheitsliste des Subnetzes oder der Netzwerksicherheitsgruppe allen OpenSearch-Pipelines eine Ingress-Sicherheitsregel hinzu, um mit dem privaten OCI-Streaming-Service zu kommunizieren, der in Ihrem Subnetz ausgeführt wird.
Informationen zum Hinzufügen der Sicherheitsregel finden Sie unter Sicherheitsregeln und Zugriff und Sicherheit.
Informationen zum Suchen des CIDR Ihres Subnetzes finden Sie unter Details eines Subnetzes abrufen.
Die folgende Abbildung zeigt die Ingress-Regeln für die Netzwerksicherheitsgruppe.

Secrets im Vault erstellen
Alle Klartext-Secrets, die für die OpenSearch-Pipelines erforderlich sind, müssen über Vault an sie übergeben werden, da OpenSearch-Pipelines keine Klartext-Secrets wie Benutzernamen und Kennwörter im YAML-Code der Pipelines akzeptieren.
Führen Sie die folgenden Aufgaben aus:
- Erstellen Sie einen neuen Benutzer mit Schreibberechtigungen im OpenSearch-Cluster. Anweisungen finden Sie in den folgenden Dokumentationsthemen zu OpenSearch:
- Erstellen Sie Secrets (Benutzername und Kennwort) in Vault für den neu erstellten Benutzer. Anweisungen finden Sie in den folgenden Oracle Cloud Infrastructure Vault-Themen:
- Fügen Sie die folgenden Resource Principal-Policys im OpenSearch-Mandanten hinzu, damit OpenSearch-Pipelines die Secrets aus dem Vault lesen können.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Weitere Informationen zum Erstellen von Policys für Vault finden Sie unter Details zum Vault-Service.
Sie können die folgenden OpenSearch-Pipelineaufgaben ausführen:
Listen Sie die OpenSearch-Pipelines in einem Compartment auf.
Neue OpenSearch-Pipeline erstellen.
Details einer OpenSearch-Pipeline abrufen.
Unterstützte Prozessore
In der folgenden Tabelle werden diese Prozessoren aufgeführt, die in der Pipeline unterstützt werden.
Object Storage und Quellkoordination YAML
Die Object Storage-Quelle hängt von der Konfiguration der Quellkoordination ab. Die OpenSearch-Pipeline unterstützt die Quellkoordination mit Object Storage als Persistenz. Sie müssen die Details des Objektspeicher-Buckets aus Ihrem Mandanten angeben.
Im Folgenden finden Sie ein Beispiel für die Quellkoordination:
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Informationen zum Abrufen des Object Storage-Namespace Ihres Mandanten finden Sie unter Object Storage-Namespaces.
Im Folgenden finden Sie ein Beispiel für die Quellkoordination mit Object Storage-Persistence:
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
Objektspeicher-YAML
Die folgenden Abschnitte der Pipelinekonfiguration YAML sind zu beachten:
- OCI-Secrets: Sie können ein Secret in Vault mit den Zugangsdaten des OpenSearch-Clusters erstellen und es in der Pipeline-YAML für die Verbindung zum Cluster OpenSearch verwenden.
- OpenSearch Sink: Die Sink-Datei enthält OpenSearch-Cluster-OCIDs mit Indexnamen für die Aufnahme.
- oci-object source: Data Prepper unterstützt die scanbasierte Aufnahme mit Object Storage, für die zahlreiche Konfigurationen unterstützt werden. Sie können die Quelle so konfigurieren, dass Objekte in Ihrem Objektspeicher-Bucket basierend auf der geplanten Häufigkeit oder nicht basierend auf einem Zeitplan aufgenommen werden. Sie haben folgende Scan-Optionen
- Ein oder mehrere Zeitscans: Mit dieser Option können Sie eine Pipeline konfigurieren, die Objekte in Object Storage-Buckets ein- oder mehrmals basierend auf der letzten Änderungszeit von Objekten liest.
- Planungsbasierter Scan: Mit dieser Option können Sie einen Scan in einem regulären Intervall planen, nachdem die Pipeline erstellt wurde.
In der folgenden Tabelle sind die Optionen aufgeführt, mit denen Sie die Object Storage-Quelle konfigurieren können.
Optionen | Erforderlich | Typ | Beschreibung |
---|---|---|---|
acknowledgments |
Nein | Boolescher Wert | Wenn true verwendet wird, können Object Storage-Objektquellen End-to-End-Bestätigungen empfangen, wenn Ereignisse von OpenSearch-Senken empfangen werden. |
buffer_timeout |
Nein | Zeitraum | Die zulässige Zeit für das Schreiben von Ereignissen in den Data Prepper-Puffer, bevor ein Timeout auftritt. Alle Ereignisse, die die OCI-Quelle während der angegebenen Zeit nicht in den Puffer schreiben kann, werden verworfen. Der Standardwert ist 10s . |
codec |
Ja | Codec | Der codec vom anzuwendenden Data-Prepper. |
compression |
Nein | Zeichenfolge | Der anzuwendende Komprimierungsalgorithmus: none , gzip , snappy oder automatic . Der Standardwert ist none . |
delete_oci_objects_on_read |
Nein | Boolescher Wert | Wenn true verwendet wird, versucht der Object Storage-Quellscan, Object Storage-Objekte zu löschen, nachdem alle Ereignisse aus dem Object Storage-Objekt erfolgreich von allen Senken bestätigt wurden. acknowledgments muss beim Löschen von Object Storage-Objekten aktiviert sein. Der Standardwert ist false . Löschen funktioniert nicht, wenn End-to-End-acknowledgments nicht aktiviert ist. |
oci |
Nein | OCI | Die OCI-Konfiguration. Weitere Informationen finden Sie im folgenden Abschnitt zu OCI. |
records_to_accumulate |
Nein | Ganzzahl | Die Anzahl der Nachrichten, die sich ansammeln, bevor sie in den Puffer geschrieben werden. Der Standardwert ist 100 . |
workers |
Nein | Ganzzahl | Konfiguriert die Anzahl der Worker-Threads, mit denen die Quelle Daten aus dem OCI-Bucket liest. Dieser Wert wird standardmäßig beibehalten, es sei denn, Ihre Object Storage-Objekte sind weniger als 1 MB. Bei größeren Object Storage-Objekten kann sich die Performance verringern. Der Standardwert ist 1 . |
Object Storage-Pipelinekonfiguration - YAML
Im Folgenden finden Sie ein Beispiel für die YAML-Konfiguration der Object Storage-Pipeline:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Beispielkonfigurationen
Im Folgenden finden Sie Optionen zum einmaligen Scannen, die Sie auf Ebene des einzelnen Object Storage-Buckets oder auf Scanebene anwenden können.
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Endzeit
Im Folgenden finden Sie ein Beispiel für die Endzeit:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Start- und Enduhrzeit
Im Folgenden finden Sie ein Beispiel für Start- und Endzeit:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Range
Im Folgenden finden Sie ein Beispiel für einen Bereich:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Startzeit, Endzeit und Bereich
Im Folgenden finden Sie ein Beispiel für Startzeit, Endzeit und Bereich:
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
include_prefix
Filter
Im Folgenden finden Sie ein Beispiel für den include_prefix
-Filter:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Dateien aus Ordner filtern
Im Folgenden finden Sie ein Beispiel für Filterdateien aus dem Ordner. Um Dateien nur aus bestimmten Ordnern zu lesen, verwenden Sie den Filter, um Ordner anzugeben. Im Folgenden finden Sie ein Beispiel für das Einschließen von Dateien aus folder2 in folder1 mit include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
exclude_prefix
Filter
Im Folgenden finden Sie ein Beispiel für den Filter exclude_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Codec-Unterstützung für JSON
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für JSON:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Codec-Unterstützung für CSV
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für CSV:
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Codec-Unterstützung für Newline
Im Folgenden finden Sie ein Beispiel für die Codec-Unterstützung für Newline:
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Aufnahmeoptionen ohne Inventur planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen ohne Zählung:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Aufnahmeoptionen mit Anzahl planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit der Anzahl:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Aufnahmeoptionen mit Startzeit planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit der Startzeit:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Aufnahmeoptionen mit Endzeit planen
Im Folgenden finden Sie ein Beispiel für die Planung von Aufnahmeoptionen mit Endzeit:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
Für die Kafka-Quelle ist keine Quellkoordination erforderlich.
Informationen zu allen verfügbaren Konfigurationen für die Kafka-Quelle finden Sie unter folgendem Link:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Sie können den OCI Streaming Service als Kafka-Quelle für die Aufnahme in das OpenSearch-Cluster verwenden. Weitere Informationen hierzu finden Sie unter Kafka-APIs verwenden.
OCI Streaming - Öffentlicher Zugriff - YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Pipelines OCI Streaming Service - Privater Zugriff YAML
Im Folgenden finden Sie ein Beispiel für die YAML für die OpenSearch-Pipelines im OCI Streaming-Service:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Selbstverwalteter Kafka YAML
Im Folgenden finden Sie ein Beispiel für die selbstverwaltete Kafka-YAML für die OpenSearch:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>