Acción de Spark en trabajos de Spark

Acción de Spark que se ejecuta en HDFS

Spark puede acceder al sistema de archivos de HDFS para leer y escribir datos después del procesamiento. A continuación se muestra un programa de ejemplo y se describe un recuento de palabras de testFile.txt almacenado en /data/input/ en HDFS. Este ejemplo almacena el resultado en /data/output/. Este archivo de ejemplo se denomina pysparkWc.py.

from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")

El siguiente es el XML de flujo de trabajo para ejecutar el trabajo de ejemplo. El archivo pyspark anterior se debe colocar en la misma ubicación que workflow.xml.

<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ac92"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ac92">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>pysparkWc.py</jar>
            <file>pysparkWc.py#pysparkWc.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Acción de Spark que se ejecuta en Hive

Spark puede acceder a los datos de escritura de Hive después del procesamiento. A continuación se muestra un programa de ejemplo que describe un ejemplo para crear y almacenar datos en la tabla de empleados.

from pyspark.sql import SparkSession
 
 
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
 
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")

Al acceder a Hive mediante Spark, hay una diferencia entre los clusters de alta disponibilidad y los clusters sin alta disponibilidad. La principal diferencia son los protocolos de autorización y acceso presentes para los clusters de alta disponibilidad. Necesitamos permisos de hcat y Ranger para acceder a Hive en un cluster de HA.

Para clusters sin HA, el siguiente archivo XML de flujo de trabajo es suficiente para acceder a Hive para la secuencia de comandos anterior.

<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ad44"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ad44">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>oozie-hive.py</jar>
            <file>oozie-hive.py#oozie-hive.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Para los clusters de alta disponibilidad, debe proporcionar la credencial de hcat para el flujo de trabajo. El siguiente ejemplo contiene hcat.metastore.uriy hcat.metastore.principal para completar la autorización. Spark está integrado con Ranger. Para trabajar con tablas de Hive desde Spark, debe proporcionar el separador de claves (debe estar presente en el sistema de archivos HDFS) del usuario al que se ejecuta el programa como parte de spark-opts.

<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
  <credentials>
    <credential name="hcat" type="hcat">
      <property>
        <name>hcat.metastore.uri</name>
        <value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
      </property>
      <property>
        <name>hcat.metastore.principal</name>
        <value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
      </property>
    </credential>
  </credentials>
    <start to="spark-345a"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-345a" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>oozie-hive</name>
            <jar>oozie-hive.py</jar>
              <spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
            <file>oozie-hive.py#oozie-hive.py</file>
            <file>training.service.keytab#training.service.keytab</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Acción de Spark que se ejecuta en Object Storage

En Big Data Service, Spark proporciona acceso al almacenamiento de objetos con una clave de API creada para el cluster. A continuación se muestra un ejemplo de Object Storage, que almacena el recuento de palabras del archivo en Object Storage en output.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")

Los siguientes parámetros de clave de API se proporcionan como argumento en spark-opts:

  • spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
  • conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
  • spark.hadoop.BDS_OSS_CLIENT_REGION
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE

Los detalles de estos parámetros se pueden obtener de los detalles de clave de API del usuario y se pueden rellenar en el XML de flujo de trabajo que se muestra aquí:

<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-74ff"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-74ff">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>spark-object-storage.py</jar>
              <spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
            <file>spark-object-storage.py#spark-object-storage.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Adición de Bibliotecas de Usuario para la Ejecución

Agregue bibliotecas de usuarios mediante las opciones spark.driver.extraclasspath o spark.executor.extraclasspath de Spark.