Action Spark sur les travaux Spark

Action Spark exécutée sur HDFS

Spark peut accéder au système de fichiers HDFS pour lire et écrire des données après le traitement. Voici un exemple de programme qui décrit le nombre de mots de testFile.txt stockés dans /data/input/ dans HDFS. Cet exemple stocke le résultat dans /data/output/. Cet exemple de fichier est nommé pysparkWc.py.

from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")

Le code XML de workflow suivant permet d'exécuter l'exemple de travail. Le fichier pyspark précédent doit être placé au même emplacement que workflow.xml.

<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ac92"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ac92">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>pysparkWc.py</jar>
            <file>pysparkWc.py#pysparkWc.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Action Spark exécutée sur Hive

Spark peut accéder aux données d'écriture Hive après traitement. Voici un exemple de programme qui décrit un exemple de création et de stockage de données dans la table des employés.

from pyspark.sql import SparkSession
 
 
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
 
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")

Il existe une distinction entre les clusters HA et les clusters non HA lors de l'accès à Hive à l'aide de Spark. La principale différence réside dans les protocoles d'autorisation et d'accès présents pour les clusters HA. Nous avons besoin des autorisations hcat et Ranger pour accéder à Hive dans un cluster HA.

Pour les clusters non hautement disponibles, le fichier XML de workflow suivant suffit pour accéder à Hive pour le script précédent.

<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ad44"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ad44">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>oozie-hive.py</jar>
            <file>oozie-hive.py#oozie-hive.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Pour les clusters HA, vous devez fournir les informations d'identification hcat pour le workflow. L'exemple suivant contient hcat.metastore.uri et hcat.metastore.principal pour terminer l'autorisation. Spark est intégré à Ranger. Pour utiliser des tables Hive à partir de Spark, vous devez indiquer le fichier keytab (qui doit être présent dans le système de fichiers HDFS) de l'utilisateur pour lequel le programme est exécuté dans le cadre de spark-opts.

<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
  <credentials>
    <credential name="hcat" type="hcat">
      <property>
        <name>hcat.metastore.uri</name>
        <value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
      </property>
      <property>
        <name>hcat.metastore.principal</name>
        <value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
      </property>
    </credential>
  </credentials>
    <start to="spark-345a"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-345a" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>oozie-hive</name>
            <jar>oozie-hive.py</jar>
              <spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
            <file>oozie-hive.py#oozie-hive.py</file>
            <file>training.service.keytab#training.service.keytab</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Action Spark exécutée sur Object Storage

Dans Big Data Service, Spark fournit l'accès à Object Storage à l'aide d'une clé d'API créée pour le cluster. Voici un exemple pour Object Storage, qui stocke le nombre de mots de fichiers dans Object Storage dans output.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")

Les paramètres de clé d'API suivants sont fournis en tant qu'argument dans spark-opts :

  • spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
  • conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
  • spark.hadoop.BDS_OSS_CLIENT_REGION
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE

Les détails de ces paramètres peuvent être obtenus à partir des détails de clé d'API de l'utilisateur et peuvent être renseignés dans le XML de workflow affiché ici :

<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-74ff"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-74ff">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>spark-object-storage.py</jar>
              <spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
            <file>spark-object-storage.py#spark-object-storage.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Ajouter des bibliothèques d'utilisateurs pour l'exécution

Ajoutez des bibliothèques utilisateur à l'aide des options Spark spark.driver.extraclasspath ou spark.executor.extraclasspath.