OpenSearch Pipelines
Cree y gestione pipelines OpenSearch mediante Data Prepper para introducir datos en un cluster OpenSearch.
Data Prepper es un recopilador de datos de código abierto que puede filtrar, enriquecer, transformar, normalizar y agregar datos para el análisis y la visualización descendentes. Es una de las herramientas de ingestión de datos más recomendadas para procesar conjuntos de datos grandes y complejos.
La función Cluster OpenSearch con Data Prepper está disponible actualmente en el dominio OC1.
Políticas necesarias
Complete las siguientes tareas antes de continuar con los pasos descritos en este tema:
Si no es administrador de su arrendamiento, póngase en contacto con los administradores de su arrendamiento para que le otorguen estos permisos. El administrador debe actualizar el permiso de los siguientes usuarios para permitir que los usuarios que no son administradores gestionen y CRUD operen los pipelines
La siguiente política permite al administrador otorgar permiso a todos los usuarios del arrendamiento correspondiente:
Allow any-user to manage opensearch-cluster-pipeline in tenancy
La siguiente política permite al administrador otorgar permisos para un grupo en un compartimento (recomendado)
Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>
donde <group>
es todos los usuarios de ese grupo que pueden acceder al recurso.
La siguiente política permite a los pipelines OpenSearch leer los secretos de Oracle Cloud Infrastructure Vault.
Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Políticas de Object Storage
Estas políticas solo son necesarias para el origen de Object Storage:
La siguiente política permite que los pipelines OpenSearch utilicen un cubo de Object Storage como persistencia de coordinación de origen:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}
La siguiente política permite a los pipelines OpenSearch ingerir objetos de Object Storage:
Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La siguiente política permite a los pipelines OpenSearch leer cubos de Object Storage:
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
La siguiente política permite a los pipelines OpenSearch leer cubos del cubo de coordinación de origen.
Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}
Políticas de Kafka autogestionadas y Streaming de OCI
Estas políticas son necesarias para el servicio OCI Streaming o los orígenes de Kafka autogestionados.
Políticas de red comunes
Estas políticas no son necesarias para el servicio público de OCI Streaming.
Políticas que se agregarán para permitir que el servicio OpenSearch cree, lea y actualice la supresión de los puntos finales privados en la subred del cliente.
Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>
Políticas del servicio OCI Streaming (públicas y privadas)
Estas políticas solo son necesarias para el servicio OCI Streaming.
La siguiente política permite que los pipelines OpenSearch consuman los registros del servicio OCI Streaming.
Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>'
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}
La siguiente política permite a los pipelines OpenSearch leer pools de flujos del servicio de transmisión de OCI
Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline',
target.streampool.id = '<target-stream-pool-ocid>'}
Permiso de Kafka autogestionado
Estos permisos solo son necesarios para los orígenes de Kafka autogestionados.
Seleccione el siguiente enlace para agregar el permiso necesario para mostrar los temas, describir los temas, unirse al grupo y consumir los registros del tema por el pipeline OpenSearch:
Reglas de seguridad de red
Esta configuración solo es necesaria para el servicio OCI Streaming privado y Kafka autogestionado. En el caso del servicio público OCI Streaming, no seleccione ninguno.
Agregue una regla de seguridad de entrada en la lista de seguridad de la subred o el grupo de seguridad de red a todos los pipelines OpenSearch para comunicarse con el servicio OCI Streaming privado que se ejecuta en su subred.
Para agregar la regla de seguridad, consulte Reglas de seguridad y Acceso y seguridad.
Para buscar el CIDR de la subred, consulte Obtención de detalles de una subred.
En la siguiente imagen se muestran las reglas de entrada para el grupo de seguridad de red.

Creación de los secretos en Vault
Todos los secretos de texto sin formato que necesitan los pipelines OpenSearch se deben transferir a él a través de Vault, ya que los pipelines OpenSearch no aceptan secretos de texto sin formato como nombres de usuario y contraseñas en el código yaml de los pipelines.
Realice las siguientes tareas:
- Cree un nuevo usuario con permisos de escritura en el cluster OpenSearch. Para obtener instrucciones, consulte los siguientes temas de documentación de OpenSearch:
- Cree secretos (nombre de usuario y contraseña) en Vault para el nuevo usuario que ha creado. Para obtener instrucciones, consulte los siguientes temas de Oracle Cloud Infrastructure Vault:
- Agregue las siguientes políticas de entidad de recurso en su arrendamiento OpenSearch para permitir que los pipelines OpenSearch lean los secretos del almacén.
ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }
Para obtener más información sobre la creación de políticas para Vault, consulte Detalles del servicio Vault.
Puede realizar las siguientes tareas de pipeline OpenSearch:
Muestre los pipelines OpenSearch en un compartimento.
Cree un nuevo pipeline OpenSearch.
Obtenga los detalles de un pipeline OpenSearch.
Procesadores soportados
En la siguiente tabla, se muestran los procesadores soportados en el pipeline.
Almacenamiento de objetos y coordinación de origen YAML
El origen de Object Storage depende de la configuración de coordinación del origen. El pipeline OpenSearch soporta la coordinación de origen mediante Object Storage como persistencia. Debe proporcionar los detalles del cubo de Object Storage de su arrendamiento.
A continuación, se muestra un ejemplo de coordinación de origen:
source_coordination:
store:
oci-object-bucket:
name: <OCI Object storage bucket-name details from their tenancy>
namespace: <namespace>
Para obtener información sobre cómo obtener el espacio de nombres de Object Storage de su arrendamiento, consulte Descripción de los espacios de nombres de Object Storage.
A continuación, se muestra un ejemplo de coordinación de origen mediante la persistencia de Object Storage:
source_coordination:
store:
oci-object-bucket:
name: "dataprepper-test-pipelines" <-- bucket name
namespace: "idee4xpu3dvm". <-- namespace
YAML de Object Storage
A continuación, se muestran las secciones de la configuración de pipeline que debe tener en cuenta:
- Secretos de OCI: puede crear un secreto en Vault con las credenciales de cluster OpenSearch y utilizarlo en el pipeline YAML para conectarse al cluster OpenSearch.
- Disipador OpenSearch: el disipador contiene OCID de cluster OpenSearch con nombres de índice para la ingesta.
- origenoci-object: el preparador de datos soporta la ingestión basada en exploración mediante Object Storage, que admite muchas configuraciones. Puede configurar el origen para que ingiera objetos en el cubo de Object Storage en función de la frecuencia programada o no en función de ningún programa. Tiene las siguientes opciones de exploración
- Exploración una o más veces: esta opción permite configurar el pipeline que lee objetos en cubos de Object Storage una o más veces en función de la hora de última modificación de los objetos.
- Exploración basada en programación: esta opción permite programar una exploración en un intervalo normal después de crear el pipeline.
En la siguiente tabla se muestran las opciones que puede utilizar para configurar el origen de Object Storage.
Opciones | Necesario | Tipo | Descripción |
---|---|---|---|
acknowledgments |
No | Booleano | Cuando true permite que los orígenes de objetos de Object Storage reciban confirmaciones integrales cuando los receptores OpenSearch reciben eventos. |
buffer_timeout |
No | Duración | Cantidad de tiempo permitida para escribir eventos en el buffer de preparador de datos antes de que se produzca el timeout. Los eventos que el origen de OCI no puede escribir en el buffer durante la cantidad de tiempo especificada se descartan. El valor por defecto es 10s . |
codec |
Sí | Codec | El codec del preparador de datos que se debe aplicar. |
compression |
No | Cadena | Algoritmo de compresión que se debe aplicar: none , gzip , snappy o automatic . El valor por defecto es none . |
delete_oci_objects_on_read |
No | Booleano | Cuando true , la exploración de origen de Object Storage intenta suprimir objetos de Object Storage una vez que todos los eventos del objeto de Object Storage han sido confirmados correctamente por todos los receptores. acknowledgments se debe activar al suprimir objetos de Object Storage. El valor por defecto es false . La supresión no funciona si acknowledgments de extremo a extremo no está activado. |
oci |
No | OCI | Configuración de OCI. Para obtener más información, consulte la siguiente sección de OCI. |
records_to_accumulate |
No | Entero | Número de mensajes que se acumulan antes de ser escritos en el buffer. El valor por defecto es 100 . |
workers |
No | Entero | Configura el número de threads de trabajo que el origen utiliza para leer datos del cubo de OCI. Si se deja este valor por defecto, a menos que los objetos de Object Storage tengan menos de 1 MB. El rendimiento puede disminuir para objetos de Object Storage más grandes. El valor por defecto es 1 . |
Configuración de pipeline de Object Storage YAML
A continuación se muestra un ejemplo de la configuración del pipeline de Object Storage YAML:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
compression: none
scan:
start_time: 2024-11-18T08:01:59.363Z
buckets:
- bucket:
namespace: <namespace>
name: <bucket-name>
sink:
- opensearch:
hosts: [ <cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
configuraciones de ejemplo
Las siguientes son opciones de exploración puntual que puede aplicar en el nivel de cubo de Object Storage individual o en el nivel de exploración
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "<namespace>"
name: "data-prepper-object-storage-testing"
start_time: 2023-01-01T00:00:00Z
compression: "none"
Hora de Finalización
A continuación, se muestra un ejemplo de hora de finalización:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
end_time: 2024-12-01T00:00:00Z
compression: "none"
Hora de inicio y Hora de finalización
A continuación se muestra un ejemplo de la hora de inicio y la hora de finalización:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
compression: "none"
Rango
A continuación, se muestra un ejemplo de rango:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
range: "PT12H"
compression: "none"
Hora de Inicio, Hora de Finalización y Rango
A continuación se muestra un ejemplo de la hora de inicio, la hora de finalización y el rango:
oci-object:
codec:
newline:
scan:
start_time: 2023-01-01T00:00:00Z
end_time: 2024-12-01T00:00:00Z
range: "PT12H"
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
Filtro include_prefix
A continuación, se muestra un ejemplo del filtro include_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest1", "10-05-2024"]
compression: "none"
Filtrar archivos de carpeta
A continuación, se muestra un ejemplo de archivos de filtro de la carpeta. Para leer archivos solo desde carpetas específicas, utilice el filtro para especificar carpetas. A continuación, se muestra un ejemplo de inclusión de archivos de folder2 en folder1 mediante include_prefix
.
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["folder1/folder2"]
compression: "none"
Filtro exclude_prefix
A continuación, se muestra un ejemplo del filtro exclude_prefix
:
simple-sample-pipeline:
source:
oci-object:
codec:
newline:
scan:
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2023-12-01T00:00:00Z
filter:
include_prefix: ["newtest", "10-05-2024"]
exclude_suffix: [".png"]
compression: "none"
Soporte de Codec para JSON
A continuación, se muestra un ejemplo de soporte de códec para JSON:
source:
oci-object:
acknowledgments: true
codec:
json: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Compatibilidad con códec para CSV
A continuación, se muestra un ejemplo de compatibilidad con códec para CSV:
source:
oci-object:
acknowledgments: true
codec:
csv: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Soporte de códec para Newline
A continuación, se muestra un ejemplo de compatibilidad con códec para la nueva línea:
source:
oci-object:
acknowledgments: true
codec:
newline: null
scan:
start_time: 2024-06-10T00:00:00Z
end_time: 2024-06-10T23:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
- bucket:
namespace: "idee4xpu3dvm"
name: "object-storage-testing"
start_time: 2024-06-13T00:00:00Z
end_time: 2024-06-13T23:00:00Z
compression: "none"
Programación de opciones de ingesta sin recuento
A continuación, se muestra un ejemplo de programación de opciones de ingestión sin recuento:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programación de opciones de ingesta con recuento
A continuación, se muestra un ejemplo de programación de opciones de ingesta con recuento:
simple-sample-pipeline:
source:
oci-object:
codec:
newline: null
scan:
scheduling:
interval: PT40S
count: 10
buckets:
- bucket:
namespace: idee4xpu3dvm
name: data-prepper-object-storage-testing
compression: none
Programación de opciones de ingesta con hora de inicio
A continuación, se muestra un ejemplo de programación de opciones de ingesta con hora de inicio:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
start_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Programación de opciones de ingesta con hora de finalización
A continuación, se muestra un ejemplo de programación de opciones de ingesta con hora de finalización:
oci-object:
codec:
newline:
scan:
scheduling:
interval: "PT40S"
count: 10
end_time: 2023-01-01T00:00:00Z
buckets:
- bucket:
namespace: "idee4xpu3dvm"
name: "data-prepper-object-storage-testing"
compression: "none"
Kafka YAML
La fuente de Kafka no requiere ninguna coordinación de fuentes.
Para obtener información sobre todas las configuraciones disponibles para el origen de Kafka, acceda al siguiente enlace:
https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/
Puede utilizar el servicio OCI Streaming como origen de Kafka para realizar la ingesta en el cluster OpenSearch. Para obtener información sobre cómo hacerlo, consulte Uso de API de Kafka.
OCI Streaming Public Access YAML
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Pipelines OCI Streaming Service - Acceso privado YAML
A continuación se muestra un ejemplo del YAML para los pipelines OpenSearch en el servicio OCI Streaming:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-pipeline:
source:
kafka:
bootstrap_servers:
- <bootstrap_servers>
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
authentication:
sasl:
oci:
stream_pool_id: <target-stream-pool-ocid>
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>
Kafka YAML autogestionado
A continuación se muestra un ejemplo del YAML de Kafka autogestionado para OpenSearch:
version: 2
pipeline_configurations:
oci:
secrets:
opensearch-username:
secret_id: <secret-ocid>
opensearch-password:
secret_id: <secret-ocid>
kafka-credentials:
secret_id: <secret-ocid>
simple-sample-pipeline:
source:
kafka:
bootstrap_servers:
- "https://<bootstrap_server_fqdn>:9092"
topics:
- name: <topic_name>
group_id: <group_id>
acknowledgments: true
encryption:
type: ssl
insecure: false
certificate: <certificate-in-pem-format>
authentication:
sasl:
plaintext:
username: ${{oci_secrets:kafka-credentials:username}}
password: ${{oci_secrets:kafka-credentials:password}}
sink:
- opensearch:
hosts: [ <opensearch-cluster-ocid> ]
username: ${{oci_secrets:opensearch-username}}
password: ${{oci_secrets:opensearch-password}}
insecure: false
index: <index-name>