Azione Spark sui job Spark
Azione Spark in esecuzione su HDFS
Spark può accedere al file system HDFS per leggere e scrivere i dati dopo l'elaborazione. Di seguito è riportato un programma di esempio che descrive un conteggio di parole di testFile.txt
memorizzato in /data/input/
in HDFS. In questo esempio viene memorizzato il risultato in /data/output/
. Questo file di esempio è denominato pysparkWc.py
.
from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")
Di seguito è riportato l'XML del workflow per l'esecuzione del job di esempio. Il file pyspark precedente deve essere posizionato nella stessa posizione di workflow.xml
.
<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ac92"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ac92">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>pysparkWc.py</jar>
<file>pysparkWc.py#pysparkWc.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Azione Spark in esecuzione su Hive
Spark può accedere ai dati di scrittura Hive dopo l'elaborazione. Di seguito è riportato un programma di esempio che descrive un esempio per creare e memorizzare i dati nella tabella dei dipendenti.
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")
Quando si accede a Hive utilizzando Spark, esiste una distinzione tra cluster HA e cluster non HA. La differenza principale è rappresentata dai protocolli di autorizzazione e accesso presenti per i cluster HA. Per accedere a Hive in un cluster HA sono necessarie le autorizzazioni hcat e Ranger.
Per i cluster non HA, il seguente file XML del workflow è sufficiente per accedere a Hive per lo script precedente.
<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ad44"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ad44">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>oozie-hive.py</jar>
<file>oozie-hive.py#oozie-hive.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Per i cluster HA, è necessario fornire la credenziale hcat per il workflow. L'esempio seguente contiene hcat.metastore.uri
e hcat.metastore.principal
per completare l'autorizzazione. Spark è integrato con Ranger. Per utilizzare le tabelle Hive di Spark, è necessario fornire la tabella chiavi (deve essere presente nel file system HDFS) dell'utente che il programma viene eseguito nell'ambito di spark-opts
.
<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
<credentials>
<credential name="hcat" type="hcat">
<property>
<name>hcat.metastore.uri</name>
<value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
</property>
</credential>
</credentials>
<start to="spark-345a"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-345a" cred="hcat">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>oozie-hive</name>
<jar>oozie-hive.py</jar>
<spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
<file>oozie-hive.py#oozie-hive.py</file>
<file>training.service.keytab#training.service.keytab</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Azione Spark in esecuzione nello storage degli oggetti
In Big Data Service, Spark fornisce l'accesso allo storage degli oggetti utilizzando una chiave API creata per il cluster. Di seguito è riportato un esempio di storage degli oggetti in cui viene memorizzato il conteggio delle parole dei file nello storage degli oggetti in output
.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")
I seguenti parametri chiave API vengono forniti come argomento in spark-opts
:
spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
spark.hadoop.BDS_OSS_CLIENT_REGION
spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE
I dettagli di questi parametri possono essere ottenuti dai dettagli della chiave API dell'utente e possono essere inseriti nell'XML del flusso di lavoro mostrato qui:
<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
<start to="spark-74ff"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-74ff">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>spark-object-storage.py</jar>
<spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
<file>spark-object-storage.py#spark-object-storage.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Aggiunta di librerie utente per l'esecuzione
Aggiungere librerie utente utilizzando le opzioni Spark spark.driver.extraclasspath
o spark.executor.extraclasspath
.