Sparkジョブに対するSparkアクション
HDFSに対して実行されるSparkアクション
Sparkは、処理後にデータの読取りおよび書込みを行うためにHDFSファイル・システムにアクセスできます。次に、サンプル・プログラムを示し、HDFSの/data/input/
に格納されているtestFile.txt
のワード数について説明します。このサンプルは、結果を/data/output/
に格納します。このサンプル・ファイルの名前はpysparkWc.py
です。
from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")
サンプル・ジョブを実行するためのワークフローXMLを次に示します。前のpysparkファイルは、workflow.xml
と同じ場所に配置する必要があります。
<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ac92"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ac92">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>pysparkWc.py</jar>
<file>pysparkWc.py#pysparkWc.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Hiveに対して実行されるSparkアクション
Sparkは、処理後にHive書込みデータにアクセスできます。従業員テーブルにデータを作成して保存する例を説明するサンプル プログラムを次に示します。
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")
Sparkを使用してHiveにアクセスする場合、HAクラスタと非HAクラスタには違いがあります。主な違いは、HAクラスタに存在する認可プロトコルとアクセス・プロトコルです。HAクラスタ内のHiveにアクセスするには、hcatおよびRanger権限が必要です。
非HAクラスタの場合、次のワークフローXMLファイルは、前述のスクリプトのHiveにアクセスするのに十分です。
<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ad44"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ad44">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>oozie-hive.py</jar>
<file>oozie-hive.py#oozie-hive.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
HAクラスタの場合、ワークフローのhcat資格証明を指定する必要があります。次の例では、認可を完了するためのhcat.metastore.uri
およびhcat.metastore.principal
が含まれています。SparkはRangerと統合されています。SparkのHive表を使用するには、プログラムがspark-opts
の一部として実行されるユーザーのキータブ(HDFSファイル・システムに存在する必要があります)を指定する必要があります。
<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
<credentials>
<credential name="hcat" type="hcat">
<property>
<name>hcat.metastore.uri</name>
<value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
</property>
</credential>
</credentials>
<start to="spark-345a"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-345a" cred="hcat">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>oozie-hive</name>
<jar>oozie-hive.py</jar>
<spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
<file>oozie-hive.py#oozie-hive.py</file>
<file>training.service.keytab#training.service.keytab</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
オブジェクト・ストレージに対して実行されているSparkアクション
ビッグ・データ・サービスでは、Sparkはクラスタ用に作成されたAPIキーを使用してオブジェクト・ストレージへのアクセスを提供します。次に、オブジェクト・ストレージの例を示します。この例では、output
のオブジェクト・ストレージにファイルの単語数が格納されます。
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")
spark-opts
では、次のAPIキー・パラメータが引数として提供されます。
spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
spark.hadoop.BDS_OSS_CLIENT_REGION
spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE
これらのパラメータの詳細は、ユーザーのAPIキー詳細から取得でき、次に示すワークフローXMLに移入できます。
<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
<start to="spark-74ff"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-74ff">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>spark-object-storage.py</jar>
<spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
<file>spark-object-storage.py#spark-object-storage.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
実行用のユーザー・ライブラリの追加
Sparkオプションspark.driver.extraclasspath
またはspark.executor.extraclasspath
を使用して、ユーザー・ライブラリを追加します。