Connection
%python
from pyspark.sql import SparkSession
def get_yarn_connection(conn_type='kerberos',
                        user_principal=default_user_principal,
                        keytab_file=default_keytab_file,
                        extraConfigs=None, hive_support=False):
    """Build and return a SparkSession connected to YARN in client deploy mode.

    Parameters
    ----------
    conn_type : str
        Authentication mode, case-insensitive. Either ``'kerberos'`` (direct
        keytab login as *user_principal*) or ``'impersonation'`` (login as the
        service principal, proxying *user_principal* through HiveServer2).
    user_principal : str
        Kerberos principal to authenticate as (kerberos mode) or to
        impersonate (impersonation mode). Default comes from the notebook
        global ``default_user_principal`` — assumed defined in an earlier
        paragraph; TODO confirm.
    keytab_file : str
        Path to the keytab. NOTE(review): in impersonation mode this must be
        the *service* principal's keytab, not the user's.
    extraConfigs : dict | None
        Additional Spark config key/value pairs applied on top of the mode
        defaults (so they can override them). ``None`` means no extras.
    hive_support : bool
        If True, enable Hive support on the builder.

    Returns
    -------
    pyspark.sql.SparkSession

    Raises
    ------
    TypeError
        If ``extraConfigs`` is neither ``None`` nor a dict.
    ValueError
        If ``conn_type`` is not a supported mode.
    """
    # Validate BEFORE substituting the default: a falsy non-dict (e.g. "" or
    # []) must raise, not be silently replaced by {}.
    if extraConfigs is None:
        extraConfigs = {}
    if not isinstance(extraConfigs, dict):
        raise TypeError("extraConfigs must be a dict")

    mode = conn_type.lower()  # normalize once; reused in both branches
    if mode == 'kerberos':
        builder = (
            SparkSession.builder
            .appName("MMGYarnKerberos")
            .master("yarn")
            .config("spark.submit.deployMode", "client")
            .config("spark.kerberos.principal", user_principal)
            .config("spark.kerberos.keytab", keytab_file)
        )
    elif mode == 'impersonation':
        builder = (
            SparkSession.builder
            .appName("MMGYarnImpersonate")
            .master("yarn")
            .config("spark.submit.deployMode", "client")
            .config("spark.sql.hive.thriftServer.singleSession", "true")
            # Proxy/impersonated user. Ensure proxy login is enabled in Hive.
            .config("spark.hadoop.hive.server2.proxy.user", user_principal)
            # Service principal (notebook global — assumed defined earlier).
            .config("spark.hadoop.hive.metastore.kerberos.principal",
                    default_service_principal)
            # Must be the keytab of the service principal, not the user.
            .config("spark.hadoop.hive.metastore.kerberos.keytab.file", keytab_file)
            .config("spark.hadoop.hive.metastore.sasl.enabled", "true")
            .config("spark.sql.catalogImplementation", "hive")
        )
    else:
        raise ValueError(
            f"Unsupported connection mode {conn_type} to get YARN connection."
        )

    # Caller-supplied configs are applied last so they win over the defaults.
    for k, v in extraConfigs.items():
        builder = builder.config(k, v)
    if hive_support:
        builder = builder.enableHiveSupport()
    spark = builder.getOrCreate()
    return spark