Connection — helper to build a YARN-backed SparkSession (Kerberos or impersonation mode)

%python

from pyspark.sql import SparkSession

def get_yarn_connection(conn_type='kerberos', user_principal=default_user_principal, keytab_file=default_keytab_file, extraConfigs=None, hive_support=False):
    """Build and return a SparkSession connected to YARN in client deploy mode.

    Parameters
    ----------
    conn_type : str
        Authentication mode, case-insensitive: ``'kerberos'`` (authenticate
        with the user's principal and keytab) or ``'impersonation'`` (connect
        as the service principal and proxy as ``user_principal``).
    user_principal : str
        Kerberos user principal (kerberos mode) or the user to impersonate
        (impersonation mode).
    keytab_file : str
        Path to the keytab file. NOTE: in impersonation mode this must be the
        keytab of the *service* principal, not of the impersonated user.
    extraConfigs : dict, optional
        Additional Spark config key/value pairs applied on top of the base
        configuration (so they can override the defaults set here).
    hive_support : bool
        If True, enable Hive support on the builder.

    Returns
    -------
    pyspark.sql.SparkSession

    Raises
    ------
    TypeError
        If ``extraConfigs`` is not a dict.
    ValueError
        If ``conn_type`` is not a supported mode.
    """
    extraConfigs = extraConfigs or {}
    if not isinstance(extraConfigs, dict):
        raise TypeError("extraConfigs must be a dict")

    # Normalize the mode once instead of re-lowercasing in every branch.
    mode = conn_type.lower()

    if mode == 'kerberos':
        builder = (
            SparkSession.builder
            .appName("MMGYarnKerberos")
            .master("yarn")
            .config("spark.submit.deployMode", "client")
            .config("spark.kerberos.principal", user_principal)
            .config("spark.kerberos.keytab", keytab_file)
        )
    elif mode == 'impersonation':
        builder = (
            SparkSession.builder
            .appName("MMGYarnImpersonate")
            .master("yarn")
            .config("spark.submit.deployMode", "client")
            .config("spark.sql.hive.thriftServer.singleSession", "true")
            .config("spark.hadoop.hive.server2.proxy.user", user_principal)  # Proxy/Impersonated user. Ensure proxy login is enabled in HIVE.
            .config("spark.hadoop.hive.metastore.kerberos.principal", default_service_principal)  # Service principal
            .config("spark.hadoop.hive.metastore.kerberos.keytab.file", keytab_file)  # Should be the keytab of service principal, not the user
            .config("spark.hadoop.hive.metastore.sasl.enabled", "true")
            .config("spark.sql.catalogImplementation", "hive")
        )
    else:
        raise ValueError(f"Unsupported connection mode {conn_type} to get YARN connection.")

    # Apply caller-supplied overrides last so they win over the base config.
    for key, value in extraConfigs.items():
        builder = builder.config(key, value)

    if hive_support:
        builder = builder.enableHiveSupport()

    # NOTE: getOrCreate() reuses an already-active session if one exists, in
    # which case the configs set above may not take effect on it.
    spark = builder.getOrCreate()
    return spark