Sparkジョブに対するSparkアクション

HDFSに対して実行されるSparkアクション

Sparkは、処理後にデータの読取りおよび書込みを行うためにHDFSファイル・システムにアクセスできます。次に、サンプル・プログラムを示し、HDFSの/data/input/に格納されているtestFile.txtのワード数について説明します。このサンプルは、結果を/data/output/に格納します。このサンプル・ファイルの名前はpysparkWc.pyです。

from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")

サンプル・ジョブを実行するためのワークフローXMLを次に示します。前のpysparkファイルは、workflow.xmlと同じ場所に配置する必要があります。

<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ac92"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ac92">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>pysparkWc.py</jar>
            <file>pysparkWc.py#pysparkWc.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Hiveに対して実行されるSparkアクション

Sparkは、処理後にHive書込みデータにアクセスできます。従業員テーブルにデータを作成して保存する例を説明するサンプル プログラムを次に示します。

from pyspark.sql import SparkSession
 
 
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
 
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")

Sparkを使用してHiveにアクセスする場合、HAクラスタと非HAクラスタには違いがあります。主な違いは、HAクラスタに存在する認可プロトコルとアクセス・プロトコルです。HAクラスタ内のHiveにアクセスするには、hcatおよびRanger権限が必要です。

非HAクラスタの場合、次のワークフローXMLファイルは、前述のスクリプトのHiveにアクセスするのに十分です。

<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ad44"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ad44">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>oozie-hive.py</jar>
            <file>oozie-hive.py#oozie-hive.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

HAクラスタの場合、ワークフローのhcat資格証明を指定する必要があります。次の例では、認可を完了するためのhcat.metastore.uriおよびhcat.metastore.principalが含まれています。SparkはRangerと統合されています。SparkのHive表を使用するには、プログラムがspark-optsの一部として実行されるユーザーのキータブ(HDFSファイル・システムに存在する必要があります)を指定する必要があります。

<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
  <credentials>
    <credential name="hcat" type="hcat">
      <property>
        <name>hcat.metastore.uri</name>
        <value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
      </property>
      <property>
        <name>hcat.metastore.principal</name>
        <value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
      </property>
    </credential>
  </credentials>
    <start to="spark-345a"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-345a" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>oozie-hive</name>
            <jar>oozie-hive.py</jar>
              <spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
            <file>oozie-hive.py#oozie-hive.py</file>
            <file>training.service.keytab#training.service.keytab</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

オブジェクト・ストレージに対して実行されているSparkアクション

ビッグ・データ・サービスでは、Sparkはクラスタ用に作成されたAPIキーを使用してオブジェクト・ストレージへのアクセスを提供します。次に、オブジェクト・ストレージの例を示します。この例では、outputのオブジェクト・ストレージにファイルの単語数が格納されます。

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")

spark-optsでは、次のAPIキー・パラメータが引数として提供されます。

  • spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
  • conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
  • spark.hadoop.BDS_OSS_CLIENT_REGION
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE

これらのパラメータの詳細は、ユーザーのAPIキー詳細から取得でき、次に示すワークフローXMLに移入できます。

<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-74ff"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-74ff">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>spark-object-storage.py</jar>
              <spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
            <file>spark-object-storage.py#spark-object-storage.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

実行用のユーザー・ライブラリの追加

Sparkオプションspark.driver.extraclasspathまたはspark.executor.extraclasspathを使用して、ユーザー・ライブラリを追加します。