Action Spark sur les tâches Spark

Action Spark exécutée sur HDFS

Spark peut accéder au système de fichiers HDFS pour lire et écrire des données après le traitement. Voici un exemple de programme qui décrit le nombre de mots testFile.txt stockés dans /data/input/ dans HDFS. Cet exemple stocke le résultat dans /data/output/. Cet exemple de fichier est nommé pysparkWc.py.

from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")

Voici le XML de flux de travail pour l'exécution de l'exemple de tâche. Le fichier pyspark précédent doit être placé au même emplacement que workflow.xml.

<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ac92"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ac92">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>pysparkWc.py</jar>
            <file>pysparkWc.py#pysparkWc.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Action Spark exécutée sur Hive

Spark peut accéder aux données d'écriture Hive après traitement. Voici un exemple de programme qui décrit un exemple de création et de stockage de données dans la table des employés.

from pyspark.sql import SparkSession
 
 
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
 
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")

Il existe une distinction entre les grappes hautement disponibles et les grappes non hautement disponibles lors de l'accès à Hive à l'aide de Spark. La principale différence réside dans les protocoles d'autorisation et d'accès présents pour les grappes haute disponibilité. Nous avons besoin des autorisations hcat et Ranger pour accéder à Hive dans une grappe hautement disponible.

Pour les grappes non hautement disponibles, le fichier XML de flux de travail suivant suffit pour accéder à Hive pour le script précédent.

<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ad44"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ad44">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>oozie-hive.py</jar>
            <file>oozie-hive.py#oozie-hive.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Pour les grappes hautement disponibles, vous devez fournir les données d'identification hcat pour le flux de travail. L'exemple suivant contient hcat.metastore.uriet hcat.metastore.principal pour terminer l'autorisation. Spark est intégré à Ranger. Pour utiliser les tables Hive de Spark, vous devez fournir le keytab (doit être présent dans le système de fichiers HDFS) de l'utilisateur pour lequel le programme est exécuté dans le cadre de spark-opts.

<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
  <credentials>
    <credential name="hcat" type="hcat">
      <property>
        <name>hcat.metastore.uri</name>
        <value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
      </property>
      <property>
        <name>hcat.metastore.principal</name>
        <value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
      </property>
    </credential>
  </credentials>
    <start to="spark-345a"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-345a" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>oozie-hive</name>
            <jar>oozie-hive.py</jar>
              <spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
            <file>oozie-hive.py#oozie-hive.py</file>
            <file>training.service.keytab#training.service.keytab</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Action Spark exécutée sur le stockage d'objets

Dans le service de mégadonnées, Spark donne accès au stockage d'objets à l'aide d'une clé d'API créée pour la grappe. Voici un exemple pour le stockage d'objets, qui stocke le nombre de mots de fichier dans le stockage d'objets dans output.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")

Les paramètres de clé d'API suivants sont fournis en tant qu'argument dans spark-opts :

  • spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
  • conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
  • spark.hadoop.BDS_OSS_CLIENT_REGION
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE

Les détails de ces paramètres peuvent être obtenus à partir des détails de clé d'API de l'utilisateur et peuvent être alimentés dans le XML de flux de travail affiché ici :

<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-74ff"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-74ff">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>spark-object-storage.py</jar>
              <spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
            <file>spark-object-storage.py#spark-object-storage.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Ajout de bibliothèques d'utilisateurs pour l'exécution

Ajoutez des bibliothèques d'utilisateurs à l'aide des options Spark spark.driver.extraclasspath ou spark.executor.extraclasspath.