Ação do Spark em Jobs do Spark

Ação do Spark em Execução no HDFS

O Spark pode acessar o sistema de arquivos HDFS para ler e gravar dados após o processamento. Veja a seguir um programa de amostra e descreve uma contagem de palavras de testFile.txt armazenadas em /data/input/ no HDFS. Esta amostra armazena o resultado em /data/output/. Esse arquivo de amostra é nomeado como pysparkWc.py.

from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")

Veja a seguir o XML do workflow para executar o job de amostra. O arquivo pyspark anterior deve ser colocado no mesmo local que workflow.xml.

<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ac92"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ac92">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>pysparkWc.py</jar>
            <file>pysparkWc.py#pysparkWc.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Ação do Spark em Execução no Hive

O Spark pode acessar os dados de gravação do Hive após o processamento. Veja a seguir um exemplo de programa que descreve um exemplo para criar e armazenar dados na tabela de funcionários.

from pyspark.sql import SparkSession
 
 
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
 
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")

Há uma distinção entre clusters HA e clusters não HA ao acessar o Hive usando o Spark. A principal diferença são os protocolos de autorização e acesso presentes para clusters HA. Exigimos permissões do hcat e do Ranger para acessar o Hive em um cluster HA.

Para clusters não HA, o arquivo XML de workflow a seguir é suficiente para acessar o Hive para o script anterior.

<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-ad44"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-ad44">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>oozie-hive.py</jar>
            <file>oozie-hive.py#oozie-hive.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Para clusters HA, você deve fornecer a credencial hcat para o workflow. O exemplo a seguir contém hcat.metastore.urie hcat.metastore.principal para concluir a autorização. O Spark é integrado ao Ranger. Para trabalhar com tabelas Hive do Spark, você deve fornecer o keytab (deve estar presente no sistema de arquivos HDFS) do usuário em que o programa é executado como parte de spark-opts.

<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
  <credentials>
    <credential name="hcat" type="hcat">
      <property>
        <name>hcat.metastore.uri</name>
        <value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
      </property>
      <property>
        <name>hcat.metastore.principal</name>
        <value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
      </property>
    </credential>
  </credentials>
    <start to="spark-345a"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-345a" cred="hcat">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>oozie-hive</name>
            <jar>oozie-hive.py</jar>
              <spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
            <file>oozie-hive.py#oozie-hive.py</file>
            <file>training.service.keytab#training.service.keytab</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Ação do Spark em Execução no Serviço Object Storage

No Big Data Service, o Spark fornece acesso ao Object Storage usando uma chave de API criada para o cluster. Veja a seguir um exemplo de Armazenamento de Objetos, que armazena a contagem de palavras de arquivo no Armazenamento de Objetos em output.

from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")

Os seguintes parâmetros de chave de API são fornecidos como argumento em spark-opts:

  • spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
  • conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
  • spark.hadoop.BDS_OSS_CLIENT_REGION
  • spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE

Os detalhes desses parâmetros podem ser obtidos dos detalhes da chave de API do usuário e podem ser preenchidos no XML do workflow mostrado aqui:

<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-74ff"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-74ff">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <master>yarn</master>
            <mode>client</mode>
            <name>MySpark</name>
            <jar>spark-object-storage.py</jar>
              <spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
            <file>spark-object-storage.py#spark-object-storage.py</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

Adicionando Bibliotecas de Usuário para Execução

Adicione bibliotecas de usuário usando as opções do Spark spark.driver.extraclasspath ou spark.executor.extraclasspath.