Configure el flujo de datos para procesar los logs locales en Oracle Cloud Infrastructure
Introducción
En este tutorial aprovisionará la infraestructura en la nube y desplegará el código PySpark para la carga, el procesamiento y el almacenamiento automáticos de los datos de log locales en la nube. Los datos se almacenan en una ubicación temporal de Object Storage, se procesan en Data Flow y se almacenan en Autonomous Data Warehouse para el análisis.
El siguiente diagrama muestra la arquitectura:
Descripción de la ilustración arquitectura-analyze-logs.png
Puede crear la aplicación PySpark y, a continuación, aprovisionar Oracle Cloud Infrastructure (OCI) mediante un script de Terraform diseñado para este fin. Puede descargar el script del repositorio de GitHub de Oracle y realizar ediciones menores para actualizar algunos valores de variables. A continuación, aplique el script de Terraform.
Objetivos
Al finalizar este tutorial, dispondrá de un sistema de trabajo que ingiere datos de Object Storage, los procesa en Data Flow mediante una aplicación PySpark y almacena los resultados en Autonomous Data Warehouse.
La aplicación PySpark que se proporciona aquí demuestra cómo autenticar y proteger las cargas de trabajo y cómo hacerlo listo para la inserción en la base de datos. Tendrá que personalizarla para satisfacer sus necesidades específicas de procesamiento de datos.
Requisitos
Antes de iniciar este tutorial, debe completar el tutorial Conectar aplicaciones de Data Flow PySpark a Autonomous Database en Oracle Cloud Infrastructure. En ese tutorial se muestra cómo crear un archivo de dependencia que contenga las bibliotecas Python y Java necesarias para este tutorial. En concreto, necesita el archivo archive.zip
de ese tutorial.
Para completar este tutorial, debe tener acceso a un arrendamiento en Oracle Cloud Infrastructure con permisos para crear los artefactos que se muestran en el diagrama de arquitectura.
En este tutorial, descargará el código de Terraform que crea los artefactos como se describe en el diagrama de arquitectura. El código está en un repositorio en GitHub. Puede ejecutar el código Terraform en la máquina de desarrollo local o puede ejecutarlo desde Cloud Shell en Oracle Cloud Infrastructure. Cloud Shell tiene Terraform y Git instalados.
Si utiliza la máquina local, se aplican los siguientes requisitos:
- Git instalado en el equipo de desarrollo. Se utiliza para clonar el repositorio de código de Terraform.
- Terraform instalado en la computadora de desarrollo. Se utiliza para crear la infraestructura en OCI.
- Una conexión de red a su arrendamiento de OCI.
Detalles de conexión de ADW
Para conectarse a una base de datos de ADW, debe transferir una cartera como parte de los parámetros de conexión. La cartera contiene credenciales y otros detalles que permiten al portador conectarse a la base de datos. El nombre de archivo de la cartera suele tener el formato wallet_<database-name>.zip
, pero puede tener un nombre diferente. La cartera proporciona una conexión a la base de datos, pero los permisos de la base de datos están determinados por el nombre de usuario y la contraseña de la base de datos.
El Terraform que acompaña a este tutorial crea una base de datos de Autonomous Data Warehouse Siempre gratis denominada logs. Almacena la cartera de la base de datos en dos lugares. Uno está dentro de un cubo en el almacenamiento de objetos denominado cartera y el otro está en el directorio de la computadora en el que se ejecutó el código de Terraform. En ambos casos, el nombre del archivo de cartera es wallet_logs.zip
.
Terraform genera la contraseña ADMIN cuando crea la base de datos y la muestra cuando se completa.
Prepare su entorno de Oracle Cloud Infrastructure
Antes de crear la aplicación PySpark y ejecutar Terraform, debe realizar alguna configuración preliminar de su arrendamiento de OCI.
Debe crear un compartimento y un grupo de usuarios dedicados a la aplicación Data Flow. Esto le permite aislar el flujo de datos de otros usuarios del arrendamiento.
Terraform puede crear el compartimento y los grupos, pero en este tutorial los creará usted mismo. Esto le permitirá practicar la navegación y el uso de la consola de OCI.
Crear un compartimento
Cree un compartimento que se utilice únicamente para contener los artefactos, la configuración y otros recursos relacionados con el pipeline.
- Conéctese a la consola de Oracle Cloud Infrastructure como usuario con privilegios de administrador.
- En el menú de navegación Consola, seleccione Identidad y, a continuación, haga clic en Compartimento.
- Haga clic en Crear compartimento.
- Introduzca
Dataflow
como nombre del nuevo compartimento, introduzca una descripción adecuada y asegúrese de que Compartimento principal es el compartimento raíz. - Haga clic en Crear compartimento.
- Una vez creado el compartimento, realice un registro de su OCID. Necesitará el OCID más tarde cuando modifique el archivo
terraform.tfvars
.
Crear Grupos
Debe crear dos grupos. Un grupo contiene usuarios que pueden gestionar Data Flow y el otro contiene usuarios que pueden utilizar Data Flow pero no pueden gestionarlo.
- Cree un grupo.
- En el menú de navegación de la consola, seleccione Identidad y haga clic en Grupos.
- Haga clic en Crear Grupo.
- Introduzca
Dataflow_Admin_Group
como nombre del grupo, introduzca una descripción adecuada y, a continuación, haga clic en Crear.
- Repita el paso anterior para crear otro grupo denominado
Dataflow_User_Group
. - Agregue el usuario a Dataflow_Admin_Group. Haga clic en Dataflow_Admin_Group y, a continuación, en Agregar usuario a grupo y seleccione el usuario de la lista.
Registrar el espacio de nombres de Object Storage
- Haga clic en el icono de perfil de usuario y, en el menú que se abre, haga clic en el nombre de su arrendamiento.
- En la página que se abre, busque el valor de Espacio de nombres de almacenamiento de objetos y realice una copia para su uso más adelante.
Generar claves de API
Terraform utiliza la API de OCI para crear y gestionar la infraestructura en OCI. Para ello, debe tener la clave pública y la huella de un par de claves de API.
Para generar su propio par de claves públicas/privadas:
- Haga clic en el icono de perfil de usuario y, en el menú que se abre, haga clic en Configuración de usuario.
- En la página Detalles de usuario, busque en la sección Recursos y haga clic en Claves de API.
- Haga clic en Agregar clave de API.
- En el panel Agregar clave de API que se abre, haga clic en Descargar clave privada.
- En la vista previa del archivo de configuración, copie el contenido del cuadro de texto y guárdelo en una ubicación conveniente. Necesitará esta información más adelante al configurar Terraform.
- Haga clic en Cerrar.
- Cambie el nombre del archivo de clave privada que se descargó. Utilice un nombre simple como
oci_api_key.pem
.
Agregar la contraseña del administrador de la base de datos al almacén
Debe crear un secreto de almacén para la contraseña ADMIN de la base de datos y registrar el OCID. Necesita esto para la constante PASSWORD_SECRET_OCID en la aplicación PySpark que cree más adelante.
Por ahora, introduzca una contraseña falsa en el secreto del almacén. Esto se debe a que todavía no sabe cuál es la contraseña. No sabrá la contraseña hasta que finalice Terraform. Sin embargo, necesita el OCID del secreto para la aplicación PySpark que Terraform carga en Object Storage. Por lo tanto, primero debe crear un secreto, registrar su OCID y, después de que finalice Terraform, actualizará el secreto con la contraseña real.
- Vaya a Seguridad y haga clic en Almacén.
- En la sección Ámbito de lista, asegúrese de que está en el compartimento Dataflow.
- Haga clic en Crear almacén.
- En el panel que se abre, introduzca Dataflow en el campo Nombre.
- Haga clic en Crear almacén.
- Cuando el estado del almacén sea Activo, haga clic en Flujo de datos para abrir la página Detalles de almacén.
- En la sección Cifrado maestro, haga clic en Crear clave.
- En el panel Crear clave, introduzca Dataflow en el campo Nombre.
- Haga clic en Crear clave.
- En la sección Recursos, haga clic en Secretos.
- Haga clic en Crear secreto.
- En el cuadro de diálogo Crear secreto, seleccione un compartimento de la lista Crear en compartimento.
- En el campo Nombre, introduzca un nombre para identificar el secreto. Evite introducir información confidencial.
- En el campo Descripción, introduzca una breve descripción del secreto como ayuda para identificarlo. Evite introducir información confidencial.
- En el campo Clave de cifrado, seleccione Flujo de datos.
- En el campo Plantilla de tipo de secreto, seleccione Texto sin formato.
- En el campo Contenido secreto, introduzca una contraseña falsa.
- Haga clic en Crear secreto.
- Cuando se cierre el panel, haga clic en el secreto de flujo de datos para abrir la página de detalles y copiar el OCID.
Crear la aplicación PySpark
La aplicación PySpark crea una sesión de Spark que lee los datos del log de Object Storage, los transforma en un marco de datos y, a continuación, almacena el marco de datos en una tabla de ADW.
Antes de crear la sesión de Spark, debe asegurarse de que se importan algunos módulos importantes y definir algunas constantes para su uso más adelante en la aplicación.
-
Utilice las siguientes sentencias para importar los módulos relevantes.
import os import oci import base64 import zipfile from urllib.parse import urlparse from pyspark import SparkConf from pyspark.sql import SparkSession from pyspark.sql.functions import regexp_extract
-
Estas son las constantes que debe definir en la función
main()
.Asegúrese de introducir sus propios valores para OBJECT_STORAGE_NAMESPACE y PASSWORD_SECRET_OCID.
OBJECT_STORAGE_NAMESPACE = "YOUR-OBJECT-STORAGE-NAMESPACE" INPUT_PATH = "oci://data@{}/sample_logs.log".format(OBJECT_STORAGE_NAMESPACE) DATABASE_NAME = "logs" PASSWORD_SECRET_OCID = "ocid1.vaultsecret..... " WALLET_PATH = "oci://Wallet@{}/wallet_{}.zip".format(OBJECT_STORAGE_NAMESPACE,DATABASE_NAME) TNS_NAME = "{}_high".format(DATABASE_NAME) USER="ADMIN" TARGET_DATABASE_TABLE="processed_logs"
-
Configure la sesión de Spark y cargue los datos del almacenamiento de objetos. Los datos tienen la forma de un archivo de texto plano, que se carga en un marco de datos de Spark.
spark_session = SparkSession.builder.appName("Dataflow").getOrCreate() input_df = spark_session.read.text(INPUT_PATH) input_df.show(5, truncate=False) # Some output for the Data Flow log file
El marco de datos debe ser similar al siguiente ejemplo:
+----------------------------------------------------------------------------------+ |value | +----------------------------------------------------------------------------------+ |10.0.0.1 - user1 [10/Mar/2021:13:55:36 -0700] "GET /index.html HTTP/1.0" 200 2326 | |10.0.0.2 - user2 [10/Mar/2021:14:55:36 -0700] "GET /file1.html HTTP/1.0" 200 9889 | |10.0.0.3 - user3 [10/Mar/2021:14:55:37 -0700] "GET /file2.html HTTP/1.0" 200 4242 | |10.0.0.4 - user4 [10/Mar/2021:14:56:36 -0700] "GET /file3.html HTTP/1.0" 200 10267| |10.0.0.1 - user1 [10/Mar/2021:15:05:36 -0700] "GET /file4.html HTTP/1.0" 200 15678| +----------------------------------------------------------------------------------+
Como puede ver, los datos constan de una sola columna denominada
value
que contiene cada entrada de log como una sola cadena. Para poder transferir los datos a la base de datos, el marco de datos debe tener varias columnas, una para cada campo del archivo log. -
Divida el marco de datos en columnas.
En el siguiente ejemplo, el campo user-identifier (normalmente "-") no se incluye en el nuevo marco de datos.
hostname = r'(^\S+\.[\S+\.]+\S+)\s' user = r'\s+.+\s+(.+)\s+\[' timestamp = r'\[(\d{2}/\w{3}/\d{4}:\d{2}:\d{2}:\d{2} -\d{4})]' method_uri_protocol = r'\"(\S+)\s(\S+)\s*(\S*)\"' status = r'\s(\d{3})\s' content_size = r'\s(\d+)$' logdata_df = input_df.select( regexp_extract('value', hostname, 1).alias('hostname'), regexp_extract('value', user, 1).alias('user'), regexp_extract('value', timestamp, 1).alias('timestamp'), regexp_extract('value', method_uri_protocol, 1).alias('method'), regexp_extract('value', method_uri_protocol, 2).alias('URI'), regexp_extract('value', method_uri_protocol, 3).alias('protocol'), regexp_extract('value', status, 1).cast('integer').alias('status'), regexp_extract('value', content_size, 1).cast('integer').alias('content_size'))
Este es también el lugar donde puede aplicar uno o más modelos de aprendizaje automático. Por ejemplo, puede que desee ejecutar una carga de trabajo de detección de intrusiones antes de almacenar los logs en la base de datos.
-
Recupere las credenciales para la base de datos de Autonomous Data Warehouse.
El código de Terraform que acompaña a este tutorial crea el almacén de datos de Autonomus y almacena el archivo de cartera en Object Storage. La aplicación PySpark necesita recuperar la cartera y utilizar su información para conectarse a ADW.
# Get an Object Store client token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath" token_path = spark_session.sparkContext.getConf().get(token_key) with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token) object_store_client = oci.object_storage.ObjectStorageClient(config={}, signer=signer) # Extract the wallet file location info split_url = urlparse(WALLET_PATH) bucket_name, namespace = split_url.netloc.split("@") file_name = split_url.path[1:] # Get the wallet from Object Storage. # The response contains the wallet and some metadata response = object_store_client.get_object(namespace, bucket_name, file_name) # Extract the wallet from response and store it in the Spark work-dir wallet_path = "/opt/spark/work-dir/" zip_file_path = os.path.join(wallet_path, "temp.zip") with open(zip_file_path, "wb") as fd: for chunk in response.data.raw.stream(1024 * 1024, decode_content=False): fd.write(chunk) with zipfile.ZipFile(zip_file_path, "r") as zip_ref: zip_ref.extractall(wallet_path) # Extract the wallet contents and add the files to the Spark context contents = "cwallet.sso ewallet.p12 keystore.jks ojdbc.properties sqlnet.ora tnsnames.ora truststore.jks".split() for file in contents: spark.sparkContext.addFile(os.path.join(wallet_path, file))
-
Almacene los datos en la base de datos Autonomous Data Warehouse.
El siguiente fragmento muestra cómo conectarse a ADW y escribir el marco de datos en la tabla de destino. Si la tabla no existe, se crea. Si la tabla existe, se borra y se vuelve a crear, debido a la opción "overwrite" en el parámetro de modo.
La contraseña se recupera de Oracle Cloud Infrastructure Vault. Coloque la contraseña en el almacén después de ejecutar el código de Terraform. La colocación de la contraseña en Vault se trata más adelante en los pasos posteriores a la ejecución de Terraform.
adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path) # Retrieve the database password for user 'ADMIN' from OCI Vault # The password is stored as base64 text, so it must be decoded with open(token_path) as fd: delegation_token = fd.read() signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner( delegation_token=delegation_token) secrets_client = oci.secrets.SecretsClient(config={}, signer=signer) response = secrets_client.get_secret_bundle(PASSWORD_SECRET_OCID) base64_password = response.data.secret_bundle_content.content base64_secret_bytes = base64_password.encode("ascii") base64_message_bytes = base64.b64decode(base64_secret_bytes) password = base64_message_bytes.decode("ascii") # Set up some properties for connecting to the database properties = { "driver": "oracle.jdbc.driver.OracleDriver", "oracle.net.tns_admin": TNS_NAME, "password": password, "user": USER, } # Write the dataframe to the database adw_url = "jdbc:oracle:thin:@{}?TNS_ADMIN={}".format(TNS_NAME, wallet_path) logdata_df.write.jdbc(url=adw_url, table=TARGET_DATABASE_TABLE, mode="Overwrite", properties=properties)
Guarde la aplicación PySpark con el nombre dataflow-app.py
.
Descargar y aplicar el código de Terraform
El código de Terraform está disponible en el repositorio de inicio rápido de Oracle en GitHub. El código aprovisiona los componentes que se muestran en el diagrama de arquitectura.
Para descargar y aplicar el código:
-
Cambie a un directorio conveniente y clone el repositorio.
git clone https://github.com/oracle-quickstart/oci-arch-dataflow-store-analyze-data
-
Copie el archivo de dependencia
archive.zip
en el directoriooci-arch-dataflow-store-analyze-data
.Nota: si no tiene el archivo
archive.zip
, consulte la sección Requisitos para ver cómo obtenerlo. -
Cambie al directorio
oci-arch-dataflow-store-analyze-data
.cd oci-arch-dataflow-store-analyze-data
-
Busque el archivo de aplicación PySpark
dataflow-app.py
, que es un archivo stub y sustitúyalo por el que escribió. -
Abra el archivo
terraform.tfvars
en un editor de texto. -
Actualice el valor de object_storage_namespace en
terraform.tfvars
. Utilice el valor registrado anteriormente. -
Modifique
env-vars.bat
oenv-vars.sh
según el sistema operativo del equipo que esté utilizando. Estos scripts se utilizan para definir las variables de entorno que utiliza Terraform.En Windows, el archivo
env-vars.bat
debe ser similar al siguiente ejemplo. Asegúrese de pegar los valores de su propio arrendamiento.@echo off set TF_VAR_tenancy_ocid=ocid1.tenancy.oc1.. set TF_VAR_user_ocid=ocid1.user.oc1.. set TF_VAR_compartment_ocid=ocid1.compartment.oc1.. set TF_VAR_private_key_path=%HOMEPATH%\.ssh\oci_api_key.pem set TF_VAR_fingerprint=2a:b4:ef:9c:f1... set TF_VAR_region=ca-toronto-1
En Linux y Mac, el archivo
env-vars.sh
debe ser similar al siguiente ejemplo. Asegúrese de pegar los valores de su propio arrendamiento.export TF_VAR_tenancy_ocid=ocid1.tenancy.oc1.. export TF_VAR_user_ocid=ocid1.user.oc1.. export TF_VAR_compartment_ocid=ocid1.compartment.oc1.. export TF_VAR_private_key_path=~/.ssh/oci_api_key.pem export TF_VAR_fingerprint=2a:b4:ef:9c:f1... export TF_VAR_region=ca-toronto-1
Después de actualizar los archivos con su propio arrendamiento e información de usuario, asegúrese de que se han definido las variables de entorno.
En Windows, ejecute el siguiente comando:
env-vars.bat
En Linux o Mac, ejecute el siguiente comando:
source env-vars.sh
-
En un terminal, ejecute el comando init.
terraform init
-
Aprovisione los recursos a Oracle Infrastructure Cloud.
terraform apply
Cuando haya terminado, la aplicación Terraform muestra la contraseña ADMIN para la base de datos que ha creado.
-
Copie la contraseña que muestra la aplicación Terraform.
-
Verifique que se hayan aprovisionado los recursos.
- Vaya a la consola de OCI y abra el menú de navegación.
- Desplácese hacia abajo hasta la sección Gobernanza y administración, amplíe Gobernanza y, a continuación, haga clic en Explorador de arrendamiento.
- Seleccione el compartimento Dataflow que ha creado anteriormente.
Tras unos segundos, los recursos se muestran en una tabla que puede filtrar y ordenar.
Actualizar la contraseña ADMIN en el almacén
Anteriormente, ha agregado una contraseña falsa a OCI Vault para obtener su OCID para su uso en la aplicación PySpark que se ejecuta en Data Flow. Ahora debe actualizar esa contraseña al valor real. El valor real se mostró en el terminal cuando se completó el comando de aplicación de Terraform.
- En el menú de navegación de la consola, seleccione Seguridad y haga clic en Almacén.
- Haga clic en el nombre del almacén creado anteriormente.
- En la página Detalles de almacén que se abre, busque en la sección Recursos y haga clic en Secretos.
- Haga clic en el nombre del secreto que creó anteriormente.
- Haga clic en Crear versión del secreto.
- Asegúrese de que la plantilla de tipo de secreto esté definida en Texto sin formato y pegue la contraseña ADMIN de la base de datos en el cuadro Contenido del secreto.
- Haga clic en Crear versión del secreto.
Ejecutar la aplicación de Data Flow
Ha llegado el momento de ejecutar la aplicación PySpark.
- En el menú de navegación de la consola, seleccione Data Flow y haga clic en Aplicaciones.
- Haga clic en Analizar logs locales para abrir la página Detalles de la aplicación.
- Haga clic en Ejecutar para abrir el panel Ejecutar aplicación Python.
- Haga clic en Ejecutar.
Se abre la página Ejecuciones de Data Flow donde puede supervisar el progreso de la ejecución. Se tardan unos minutos en asignar los recursos y poner todo en marcha. Cuando finalice la ejecución, haga clic en el nombre de la ejecución para abrir la página Detalles de ejecución.
En la página Run Details, puede descargar y examinar los logs que se han producido. Si la ejecución ha fallado por cualquier motivo, puede examinar los logs para determinar el origen del problema.
Agradecimientos
- Autor: Jeff Schering (desarrollador de asistencia al usuario)
- Colaboradores: Prashant Jha (Director Product Management, OCI Big Data Development)
Más información
Explore otras prácticas en docs.oracle.com/learn o acceda a contenido de aprendizaje más gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en un explorador de formación de Oracle.
Para obtener documentación sobre los productos, visite Oracle Help Center.
Más recursos de aprendizaje
Explore otras prácticas en docs.oracle.com/learn o acceda a contenido de aprendizaje más gratuito en el canal YouTube de Oracle Learning. Además, visite education.oracle.com/learning-explorer para convertirse en un explorador de formación de Oracle.
Para obtener documentación sobre los productos, visite Oracle Help Center.
Store and analyze your on-premises logs in Oracle Cloud Infrastructure
F52100-01
January 2022
Copyright © 2022, Oracle and/or its affiliates.