OpenSearch Pipelines

Cree y gestione pipelines OpenSearch mediante Data Prepper para introducir datos en un cluster OpenSearch.

Data Prepper es un recopilador de datos de código abierto que puede filtrar, enriquecer, transformar, normalizar y agregar datos para el análisis y la visualización descendentes. Es una de las herramientas de ingestión de datos más recomendadas para procesar conjuntos de datos grandes y complejos.

Utilizamos el modelo PULL para ingerir datos en el cluster OpenSearch 2.x e integrarlos con el servicio Oracle Cloud Infrastructure Streaming, Kafka autogestionado y Object Storage.
Nota

La función Cluster OpenSearch con Data Prepper está disponible actualmente en el dominio OC1.

Políticas necesarias

Complete las siguientes tareas antes de continuar con los pasos descritos en este tema:

Si no es administrador de su arrendamiento, póngase en contacto con los administradores de su arrendamiento para que le otorguen estos permisos. El administrador debe actualizar el permiso de los siguientes usuarios para permitir que los usuarios que no son administradores gestionen y CRUD operen los pipelines

La siguiente política permite al administrador otorgar permiso a todos los usuarios del arrendamiento correspondiente:

Allow any-user to manage opensearch-cluster-pipeline in tenancy

La siguiente política permite al administrador otorgar permisos para un grupo en un compartimento (recomendado)

Allow group <group> to manage opensearch-cluster-pipeline in compartment <compartment>

donde <group> es todos los usuarios de ese grupo que pueden acceder al recurso.

La siguiente política permite a los pipelines OpenSearch leer los secretos de Oracle Cloud Infrastructure Vault.

Allow any-user to read secret-bundles in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }' }
Allow any-user to read secrets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

Políticas de Object Storage

Estas políticas solo son necesarias para el origen de Object Storage:

La siguiente política permite que los pipelines OpenSearch utilicen un cubo de Object Storage como persistencia de coordinación de origen:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<source-coordination-bucket-name>'}

La siguiente política permite a los pipelines OpenSearch ingerir objetos de Object Storage:

Allow any-user to manage objects in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La siguiente política permite a los pipelines OpenSearch leer cubos de Object Storage:

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

La siguiente política permite a los pipelines OpenSearch leer cubos del cubo de coordinación de origen.

Allow any-user to read buckets in compartment <compartment> WHERE ALL {request.principal.type='opensearchpipeline', target.bucket.name='<bucket_name>'}

Políticas de Kafka autogestionadas y Streaming de OCI

Estas políticas son necesarias para el servicio OCI Streaming o los orígenes de Kafka autogestionados.

Políticas de red comunes

Nota

Estas políticas no son necesarias para el servicio público de OCI Streaming.

Políticas que se agregarán para permitir que el servicio OpenSearch cree, lea y actualice la supresión de los puntos finales privados en la subred del cliente.

Allow group SearchOpenSearchAdmins to manage vnics in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage vcns in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to manage subnets in compartment <network_resources_compartment>
Allow group SearchOpenSearchAdmins to use network-security-groups in compartment <network_resources_compartment>

Políticas del servicio OCI Streaming (públicas y privadas)

Estas políticas solo son necesarias para el servicio OCI Streaming.

La siguiente política permite que los pipelines OpenSearch consuman los registros del servicio OCI Streaming.

Allow ANY-USER TO {STREAM_INSPECT, STREAM_READ, STREAM_CONSUME} in compartment '<compartment-name>' 
where ALL {request.principal.type='opensearchpipeline', target.streampool.id = '<target-stream-pool-ocid>'}

La siguiente política permite a los pipelines OpenSearch leer pools de flujos del servicio de transmisión de OCI

Allow ANY-USER TO read stream-pools in compartment '<compartment-name>' where ALL {request.principal.type='opensearchpipeline', 
target.streampool.id = '<target-stream-pool-ocid>'}

Permiso de Kafka autogestionado

Estos permisos solo son necesarios para los orígenes de Kafka autogestionados.

Seleccione el siguiente enlace para agregar el permiso necesario para mostrar los temas, describir los temas, unirse al grupo y consumir los registros del tema por el pipeline OpenSearch:

https://kafka.apache.org/documentation/#security_authz

Reglas de seguridad de red

Esta configuración solo es necesaria para el servicio OCI Streaming privado y Kafka autogestionado. En el caso del servicio público OCI Streaming, no seleccione ninguno.

Agregue una regla de seguridad de entrada en la lista de seguridad de la subred o el grupo de seguridad de red a todos los pipelines OpenSearch para comunicarse con el servicio OCI Streaming privado que se ejecuta en su subred.

Para agregar la regla de seguridad, consulte Reglas de seguridad y Acceso y seguridad.

Para buscar el CIDR de la subred, consulte Obtención de detalles de una subred.

En la siguiente imagen se muestran las reglas de entrada para el grupo de seguridad de red.

Reglas de entrada para grupos de seguridad de red

Creación de los secretos en Vault

Todos los secretos de texto sin formato que necesitan los pipelines OpenSearch se deben transferir a él a través de Vault, ya que los pipelines OpenSearch no aceptan secretos de texto sin formato como nombres de usuario y contraseñas en el código yaml de los pipelines.

Realice las siguientes tareas:

  • Cree un nuevo usuario con permisos de escritura en el cluster OpenSearch. Para obtener instrucciones, consulte los siguientes temas de documentación de OpenSearch:

    Grupos de acción por defecto

    Usuarios y Roles

    Buscar con OpenSearch políticas de IAM

  • Cree secretos (nombre de usuario y contraseña) en Vault para el nuevo usuario que ha creado. Para obtener instrucciones, consulte los siguientes temas de Oracle Cloud Infrastructure Vault:

    Gestión de secretos de almacén

    Creación de un secreto en un almacén

  • Agregue las siguientes políticas de entidad de recurso en su arrendamiento OpenSearch para permitir que los pipelines OpenSearch lean los secretos del almacén.
    ALLOW ANY-USER to read secrets in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' } 
    ALLOW ANY-USER to read secret-bundles in compartment '<customer-compartment>' WHERE ALL {request.principal.type='opensearchpipeline', target.secret.id = '<target-secret-ocid>' }

    Para obtener más información sobre la creación de políticas para Vault, consulte Detalles del servicio Vault.

Puede realizar las siguientes tareas de pipeline OpenSearch:

Muestre los pipelines OpenSearch en un compartimento.

Cree un nuevo pipeline OpenSearch.

Obtenga los detalles de un pipeline OpenSearch.

Edite la configuración de un pipeline OpenSearch.

Suprimir un pipeline OpenSearch de su arrendamiento.

Procesadores soportados

Almacenamiento de objetos y coordinación de origen YAML

El origen de Object Storage depende de la configuración de coordinación del origen. El pipeline OpenSearch soporta la coordinación de origen mediante Object Storage como persistencia. Debe proporcionar los detalles del cubo de Object Storage de su arrendamiento.

A continuación, se muestra un ejemplo de coordinación de origen:

source_coordination:
  store:
    oci-object-bucket:
      name: <OCI Object storage bucket-name details from their tenancy>
      namespace: <namespace>

Para obtener información sobre cómo obtener el espacio de nombres de Object Storage de su arrendamiento, consulte Descripción de los espacios de nombres de Object Storage.

A continuación, se muestra un ejemplo de coordinación de origen mediante la persistencia de Object Storage:

source_coordination:
  store:
    oci-object-bucket:
      name: "dataprepper-test-pipelines" <-- bucket name
      namespace: "idee4xpu3dvm".         <-- namespace

YAML de Object Storage

A continuación, se muestran las secciones de la configuración de pipeline que debe tener en cuenta:

  • Secretos de OCI: puede crear un secreto en Vault con las credenciales de cluster OpenSearch y utilizarlo en el pipeline YAML para conectarse al cluster OpenSearch.
  • Disipador OpenSearch: el disipador contiene OCID de cluster OpenSearch con nombres de índice para la ingesta.
  • origenoci-object: el preparador de datos soporta la ingestión basada en exploración mediante Object Storage, que admite muchas configuraciones. Puede configurar el origen para que ingiera objetos en el cubo de Object Storage en función de la frecuencia programada o no en función de ningún programa. Tiene las siguientes opciones de exploración
    • Exploración una o más veces: esta opción permite configurar el pipeline que lee objetos en cubos de Object Storage una o más veces en función de la hora de última modificación de los objetos.
    • Exploración basada en programación: esta opción permite programar una exploración en un intervalo normal después de crear el pipeline.

En la siguiente tabla se muestran las opciones que puede utilizar para configurar el origen de Object Storage.

Configuraciones de Object Storage
Opciones Necesario Tipo Descripción
acknowledgments No Booleano Cuando true permite que los orígenes de objetos de Object Storage reciban confirmaciones integrales cuando los receptores OpenSearch reciben eventos.
buffer_timeout No Duración Cantidad de tiempo permitida para escribir eventos en el buffer de preparador de datos antes de que se produzca el timeout. Los eventos que el origen de OCI no puede escribir en el buffer durante la cantidad de tiempo especificada se descartan. El valor por defecto es 10s.
codec Codec El codec del preparador de datos que se debe aplicar.
compression No Cadena Algoritmo de compresión que se debe aplicar: none, gzip, snappy o automatic. El valor por defecto es none.
delete_oci_objects_on_read No Booleano Cuando true, la exploración de origen de Object Storage intenta suprimir objetos de Object Storage una vez que todos los eventos del objeto de Object Storage han sido confirmados correctamente por todos los receptores. acknowledgments se debe activar al suprimir objetos de Object Storage. El valor por defecto es false. La supresión no funciona si acknowledgments de extremo a extremo no está activado.
oci No OCI Configuración de OCI. Para obtener más información, consulte la siguiente sección de OCI.
records_to_accumulate No Entero Número de mensajes que se acumulan antes de ser escritos en el buffer. El valor por defecto es 100.
workers No Entero Configura el número de threads de trabajo que el origen utiliza para leer datos del cubo de OCI. Si se deja este valor por defecto, a menos que los objetos de Object Storage tengan menos de 1 MB. El rendimiento puede disminuir para objetos de Object Storage más grandes. El valor por defecto es 1.

Configuración de pipeline de Object Storage YAML

A continuación se muestra un ejemplo de la configuración del pipeline de Object Storage YAML:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username: 
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      compression: none
      scan:
        start_time: 2024-11-18T08:01:59.363Z
        buckets:
          - bucket:
              namespace: <namespace>
              name: <bucket-name>
  sink:
    - opensearch:
        hosts: [ <cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

configuraciones de ejemplo

Las siguientes son opciones de exploración puntual que puede aplicar en el nivel de cubo de Object Storage individual o en el nivel de exploración

oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "<namespace>"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-01-01T00:00:00Z
      compression: "none"

Hora de Finalización

A continuación, se muestra un ejemplo de hora de finalización:


simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Hora de inicio y Hora de finalización

A continuación se muestra un ejemplo de la hora de inicio y la hora de finalización:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               end_time: 2024-12-01T00:00:00Z
      compression: "none"

Rango

A continuación, se muestra un ejemplo de rango:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               range: "PT12H"
      compression: "none"

Hora de Inicio, Hora de Finalización y Rango

A continuación se muestra un ejemplo de la hora de inicio, la hora de finalización y el rango:

oci-object:
      codec:
        newline:
      scan:
        start_time: 2023-01-01T00:00:00Z
        end_time: 2024-12-01T00:00:00Z
        range: "PT12H"
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"

Filtro include_prefix

A continuación, se muestra un ejemplo del filtro include_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest1", "10-05-2024"]
      compression: "none"

Filtrar archivos de carpeta

A continuación, se muestra un ejemplo de archivos de filtro de la carpeta. Para leer archivos solo desde carpetas específicas, utilice el filtro para especificar carpetas. A continuación, se muestra un ejemplo de inclusión de archivos de folder2 en folder1 mediante include_prefix.

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                include_prefix: ["folder1/folder2"]
      compression: "none"

Filtro exclude_prefix

A continuación, se muestra un ejemplo del filtro exclude_prefix:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline:
      scan:
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
               start_time: 2023-12-01T00:00:00Z
               filter:
                 include_prefix: ["newtest", "10-05-2024"]
                 exclude_suffix: [".png"]
      compression: "none"

Soporte de Codec para JSON

A continuación, se muestra un ejemplo de soporte de códec para JSON:

source:
    oci-object:
      acknowledgments: true
      codec:
        json: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Compatibilidad con códec para CSV

A continuación, se muestra un ejemplo de compatibilidad con códec para CSV:


source:
    oci-object:
      acknowledgments: true
      codec:
        csv: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Soporte de códec para Newline

A continuación, se muestra un ejemplo de compatibilidad con códec para la nueva línea:

source:
    oci-object:
      acknowledgments: true
      codec:
        newline: null
      scan:
        start_time: 2024-06-10T00:00:00Z
        end_time: 2024-06-10T23:00:00Z
        buckets:
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "data-prepper-object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
        - bucket:
            namespace: "idee4xpu3dvm"
            name: "object-storage-testing"
            start_time: 2024-06-13T00:00:00Z
            end_time: 2024-06-13T23:00:00Z
      compression: "none"

Programación de opciones de ingesta sin recuento

A continuación, se muestra un ejemplo de programación de opciones de ingestión sin recuento:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programación de opciones de ingesta con recuento

A continuación, se muestra un ejemplo de programación de opciones de ingesta con recuento:

simple-sample-pipeline:
  source:
    oci-object:
      codec:
        newline: null
      scan:
        scheduling:
          interval: PT40S
          count: 10
        buckets:
          - bucket:
              namespace: idee4xpu3dvm
              name: data-prepper-object-storage-testing
      compression: none

Programación de opciones de ingesta con hora de inicio

A continuación, se muestra un ejemplo de programación de opciones de ingesta con hora de inicio:


oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        start_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Programación de opciones de ingesta con hora de finalización

A continuación, se muestra un ejemplo de programación de opciones de ingesta con hora de finalización:

oci-object:
      codec:
        newline:
      scan:
        scheduling:
          interval: "PT40S"
          count: 10
        end_time: 2023-01-01T00:00:00Z
        buckets:
           - bucket:
               namespace: "idee4xpu3dvm"
               name: "data-prepper-object-storage-testing"
      compression: "none"

Kafka YAML

La fuente de Kafka no requiere ninguna coordinación de fuentes.

Para obtener información sobre todas las configuraciones disponibles para el origen de Kafka, acceda al siguiente enlace:

https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/sources/kafka/

Puede utilizar el servicio OCI Streaming como origen de Kafka para realizar la ingesta en el cluster OpenSearch. Para obtener información sobre cómo hacerlo, consulte Uso de API de Kafka.

OCI Streaming Public Access YAML

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Pipelines OCI Streaming Service - Acceso privado YAML

A continuación se muestra un ejemplo del YAML para los pipelines OpenSearch en el servicio OCI Streaming:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
kafka-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - <bootstrap_servers>
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
      authentication:
        sasl:
          oci:
            stream_pool_id: <target-stream-pool-ocid>
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>

Kafka YAML autogestionado

A continuación se muestra un ejemplo del YAML de Kafka autogestionado para OpenSearch:

version: 2
pipeline_configurations:
  oci:
    secrets:
      opensearch-username:
        secret_id: <secret-ocid>
      opensearch-password:
        secret_id: <secret-ocid>
      kafka-credentials:
        secret_id: <secret-ocid>
simple-sample-pipeline:
  source:
    kafka:
      bootstrap_servers:
        - "https://<bootstrap_server_fqdn>:9092"
      topics:
        - name: <topic_name>
          group_id: <group_id>
      acknowledgments: true
      encryption:
        type: ssl
        insecure: false
        certificate: <certificate-in-pem-format>
      authentication:
        sasl:
          plaintext:
            username: ${{oci_secrets:kafka-credentials:username}}
            password: ${{oci_secrets:kafka-credentials:password}}
  sink:
    - opensearch:
        hosts: [ <opensearch-cluster-ocid> ]
        username: ${{oci_secrets:opensearch-username}}
        password: ${{oci_secrets:opensearch-password}}
        insecure: false
        index: <index-name>