Ação do Spark em Jobs do Spark
Ação do Spark em Execução no HDFS
O Spark pode acessar o sistema de arquivos HDFS para ler e gravar dados após o processamento. Veja a seguir um programa de amostra e descreve uma contagem de palavras de testFile.txt
armazenadas em /data/input/
no HDFS. Esta amostra armazena o resultado em /data/output/
. Esse arquivo de amostra é nomeado como pysparkWc.py
.
from pyspark import SparkContext, SparkConf
appNameTEST ="my first working application"
conf = SparkConf().setAppName(appNameTEST)
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs:///data/output/")
Veja a seguir o XML do workflow para executar o job de amostra. O arquivo pyspark anterior deve ser colocado no mesmo local que workflow.xml
.
<workflow-app name="Spark Test" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ac92"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ac92">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>pysparkWc.py</jar>
<file>pysparkWc.py#pysparkWc.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Ação do Spark em Execução no Hive
O Spark pode acessar os dados de gravação do Hive após o processamento. Veja a seguir um exemplo de programa que descreve um exemplo para criar e armazenar dados na tabela de funcionários.
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName("oozie-hive").enableHiveSupport().getOrCreate()
print("##################### Starting execution ##################")
create_query = "create table if not exists employee (int id,string name)"
spark.sql(create_query).show()
ins_query = "insert into employee values (1, 'John')"
print("query = " + ins_query)
spark.sql(ins_query).show()
ins_query = "insert into employee values (2, 'Mary')"
print("query = " + ins_query)
spark.sql(ins_query).show()
print("##################### Execution finished #################")
Há uma distinção entre clusters HA e clusters não HA ao acessar o Hive usando o Spark. A principal diferença são os protocolos de autorização e acesso presentes para clusters HA. Exigimos permissões do hcat e do Ranger para acessar o Hive em um cluster HA.
Para clusters não HA, o arquivo XML de workflow a seguir é suficiente para acessar o Hive para o script anterior.
<workflow-app name="Spark-Hive-NHA" xmlns="uri:oozie:workflow:0.5">
<start to="spark-ad44"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-ad44">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>oozie-hive.py</jar>
<file>oozie-hive.py#oozie-hive.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Para clusters HA, você deve fornecer a credencial hcat para o workflow. O exemplo a seguir contém hcat.metastore.uri
e hcat.metastore.principal
para concluir a autorização. O Spark é integrado ao Ranger. Para trabalhar com tabelas Hive do Spark, você deve fornecer o keytab (deve estar presente no sistema de arquivos HDFS) do usuário em que o programa é executado como parte de spark-opts
.
<workflow-app name="Spark-Hive-HA" xmlns="uri:oozie:workflow:0.5">
<credentials>
<credential name="hcat" type="hcat">
<property>
<name>hcat.metastore.uri</name>
<value>thrift://training-cluster.bmbdcsad1.bmbdcs.oraclevcn.com:9083</value>
</property>
<property>
<name>hcat.metastore.principal</name>
<value>hive/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM</value>
</property>
</credential>
</credentials>
<start to="spark-345a"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-345a" cred="hcat">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>oozie-hive</name>
<jar>oozie-hive.py</jar>
<spark-opts>--principal training/training-cluster-un0.bmbdcsad1.bmbdcs.oraclevcn.com@BDSCLOUD.ORACLE.COM --keytab training.service.keytab</spark-opts>
<file>oozie-hive.py#oozie-hive.py</file>
<file>training.service.keytab#training.service.keytab</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Ação do Spark em Execução no Serviço Object Storage
No Big Data Service, o Spark fornece acesso ao Object Storage usando uma chave de API criada para o cluster. Veja a seguir um exemplo de Armazenamento de Objetos, que armazena a contagem de palavras de arquivo no Armazenamento de Objetos em output
.
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName("spark-object-store")
sc = SparkContext(conf=conf)
text_file = sc.textFile("hdfs:///data/input/testFile.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("oci://training-bucket@namesparce/output/")
Os seguintes parâmetros de chave de API são fornecidos como argumento em spark-opts
:
spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID
spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID
spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT
conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH
spark.hadoop.BDS_OSS_CLIENT_REGION
spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE
Os detalhes desses parâmetros podem ser obtidos dos detalhes da chave de API do usuário e podem ser preenchidos no XML do workflow mostrado aqui:
<workflow-app name="Spark-Obj" xmlns="uri:oozie:workflow:0.5">
<start to="spark-74ff"/>
<kill name="Kill">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<action name="spark-74ff">
<spark xmlns="uri:oozie:spark-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<master>yarn</master>
<mode>client</mode>
<name>MySpark</name>
<jar>spark-object-storage.py</jar>
<spark-opts>--conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<userOcid> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<Tenancy ID> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<Fingerprint> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<Location of Pem File> --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<Region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<PassPhrase></spark-opts>
<file>spark-object-storage.py#spark-object-storage.py</file>
</spark>
<ok to="End"/>
<error to="Kill"/>
</action>
<end name="End"/>
</workflow-app>
Adicionando Bibliotecas de Usuário para Execução
Adicione bibliotecas de usuário usando as opções do Spark spark.driver.extraclasspath
ou spark.executor.extraclasspath
.