Sparkを使用したオブジェクト・ストレージの構成

Sparkでは、オブジェクト・ストレージ接続は、spark-sqlspark-shellspark-submitなどを使用して、オブジェクト・ストレージでホストされているデータベースで問合せを実行するために使用されます。

前提条件

  • クラスタ内のHadoopユーザーは、Sparkグループの一部である必要があります。

    Linuxの場合:

    id bdsuser 
    uid=54330(bdsuser) gid=54339(superbdsgroup) groups=54339(superbdsgroup,985(hadoop),984(hdfs),980(hive),977(spark)

    レンジャーで:

    Rangerを使用したSparkグループへのユーザーの追加

    Rangerで、「ユーザー」タブにアクセスし、「グループ」列で、ユーザーがSparkグループの一部であることを確認します。

  • SparkおよびHadoopユーザーは、Rangerの次のポリシーに追加する必要があります。
    • tag.download.auth.users
    • policies.download.auth.users

    確認するには、Ranger UIで「Access Manager」「リソース・ベースのポリシー」を選択し、Sparkインスタンスの「編集」ボタンを選択します。「構成プロパティ」ウィンドウが表示されます。

    例:

    Spark構成プロパティ・ウィンドウ
  • Hadoopユーザーがall - database, table, columnポリシーに追加され、データベース、表、列に対するSELECT権限が提供されていることを確認してください。確認するには:
    1. Ranger UIで、「Access Manager」「リソース・ベースのポリシー」を選択し、SPARK3リポジトリを選択します。
    2. ユーザーに対して作成されたポリシーを選択します。
    3. 「すべて- データベース、表、列」を選択し、「編集」を選択します。
    4. 「条件の許可」セクションで、Hadoopユーザーがリストされていることを確認します(表示されていない場合は、ユーザーを追加します)。
ノート

ビッグ・データ・サービス・クラスタ・ノードは、サービス構成および例の実行に使用できます。エッジ・ノードを使用するには、エッジ・ノードを作成してサインインする必要があります。
  1. (オプション)オブジェクト・ストレージの設定にエッジ・ノードを使用するには、最初にエッジ・ノードを作成してから、ノードにサインインします。次に、APIキーをun0ノードからEdgeノードにコピーします。
    sudo dcli rsync -a <un0-hostname>:/opt/oracle/bds/.oci_oos/ /opt/oracle/bds/.oci_oos/
  2. 十分な権限を持つユーザーと、必要なパスフレーズ値を持つJCEKSファイルを作成します。ローカルJCEKSファイルを作成する場合は、ファイルをすべてのノードにコピーし、ユーザー権限を変更します。
    sudo dcli -f <location_of_jceks_file> -d <location_of_jceks_file>
    sudo dcli chown <user>:<group> <location_of_jceks_file>
  3. 次のHADOOP_OPTSの組合せのいずれかをユーザーbashプロファイルに追加します。
    オプション1:
    export HADOOP_OPTS="$HADOOP_OPTS -DOCI_SECRET_API_KEY_ALIAS=<api_key_alias> 
    -DBDS_OSS_CLIENT_REGION=<api_key_region> -DOCI_SECRET_API_KEY_PASSPHRASE=<jceks_file_provider>"
    

    オプション2:

    export HADOOP_OPTS="$HADOOP_OPTS -DBDS_OSS_CLIENT_AUTH_FINGERPRINT=<api_key_fingerprint> 
    -DBDS_OSS_CLIENT_AUTH_PASSPHRASE=<jceks_file_provider> -DBDS_OSS_CLIENT_AUTH_PEMFILEPATH=<api_key_pem_file_path> 
    -DBDS_OSS_CLIENT_AUTH_TENANTID=<api_key_tenant_id> -DBDS_OSS_CLIENT_AUTH_USERID=<api_key_user_id> -DBDS_OSS_CLIENT_REGION=<api_key_region>"
    
  4. Ambariで、オブジェクト・ストレージ・アクセス用のhive-envテンプレートにHadoopオプションを追加します。
    1. Apache Ambariにアクセスします。
    2. サイド・ツールバーの「サービス」で、「Hive」を選択します。
    3. 「構成」を選択します。
    4. 「拡張」を選択します。
    5. 「パフォーマンス」セクションで、「拡張ハイブ環境」に移動します。
    6. hive-env templateに移動し、if [ "$SERVICE" = "metastore" ]; then行の下に次のいずれかのオプションを追加します。

      オプション1:

      export HADOOP_OPTS="$HADOOP_OPTS -DOCI_SECRET_API_KEY_ALIAS=<api_key_alias> 
      -DBDS_OSS_CLIENT_REGION=<api_key_region> 
      -DOCI_SECRET_API_KEY_PASSPHRASE=<jceks_file_provider>"

      オプション2:

      export HADOOP_OPTS="$HADOOP_OPTS -DBDS_OSS_CLIENT_AUTH_FINGERPRINT=<api_key_fingerprint> 
      -DBDS_OSS_CLIENT_AUTH_PASSPHRASE=<jceks_file_provider> -DBDS_OSS_CLIENT_AUTH_PEMFILEPATH=<api_key_pem_file_path> 
      -DBDS_OSS_CLIENT_AUTH_TENANTID=<api_key_tenant_id> -DBDS_OSS_CLIENT_AUTH_USERID=<api_key_user_id> 
      -DBDS_OSS_CLIENT_REGION=<api_key_region>"
  5. Ambariを使用して、必要なすべてのサービスを再起動します。
  6. 次のいずれかのサンプル・コマンドを実行して、spark SQLシェルを起動します。

    例1:

    spark-sql --conf spark.driver.extraJavaOptions="${HADOOP_OPTS}" --conf spark.executor.extraJavaOptions="${HADOOP_OPTS}" 
    

    例2: APIキーの別名およびパスフレーズの使用。

    spark-sql --conf spark.hadoop.OCI_SECRET_API_KEY_PASSPHRASE=<api_key_passphrase> 
    --conf spark.hadoop.OCI_SECRET_API_KEY_ALIAS=<api_key_alias> 
    --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<api_key_region>
    

    例3: IAM Service APIキー・パラメータの使用。

    spark-sql --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_USERID=<api_key_user_id> 
    --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_TENANTID=<api_key_tenant_id> 
    --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_FINGERPRINT=<api_key_fingerprint> 
    --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PEMFILEPATH=<api_key_pem_file_path>> 
    --conf spark.hadoop.BDS_OSS_CLIENT_REGION=<api_key_region> --conf spark.hadoop.BDS_OSS_CLIENT_AUTH_PASSPHRASE=<api_key_passphrase>
    
    ノート

    Javaヒープ領域に問題がある場合は、Spark SQLの一部としてドライバおよびエグゼキュータ・メモリーを渡します。たとえば、--driver-memory 2g –executor-memory 4gです。spark-sql文の例:
    spark-sql --conf spark.driver.extraJavaOptions="${HADOOP_OPTS}" 
    --conf spark.executor.extraJavaOptions="${HADOOP_OPTS}" 
    --driver-memory 2g --executor-memory 4g
  7. オブジェクト・ストレージの接続性の確認:

    管理対象表の例:

    CREATE DATABASE IF NOT EXISTS <database_name> LOCATION 'oci://<bucket-name>@<namespace>/';
    USE <database_name>;
    CREATE TABLE IF NOT EXISTS <table_name> (id int, name string) partitioned by (part int, part2 int) STORED AS parquet;
    INSERT INTO <table_name> partition(part=1, part2=1) values (333, 'Object Storage Testing with Spark SQL Managed Table');
    SELECT * from <table_name>;
     

    外部表の例:

    CREATE DATABASE IF NOT EXISTS <database_name> LOCATION 'oci://<bucket-name>@<namespace>/';
    USE <database_name>;
    CREATE EXTERNAL TABLE IF NOT EXISTS <table_name> (id int, name string) partitioned by (part int, part2 int) STORED AS parquet LOCATION 'oci://<bucket-name>@<namespace>/';
    INSERT INTO <table_name> partition(part=1, part2=1) values (999, 'Object Storage Testing with Spark SQL External Table');
    SELECT * from <table_name>;
  8. (オプション)オブジェクト・ストレージでpysparkspark-submitとともに使用します。
    ノート

    これらのステップを実行する前に、データベースおよび表を作成します。
    1. 次のコードを実行します。
      from pyspark.sql import SparkSession
      import datetime
      import random
      import string
       
      spark=SparkSession.builder.appName("object-storage-testing-spark-submit").config("spark.hadoop.OCI_SECRET_API_KEY_PASSPHRASE","<jceks-provider>").config("spark.hadoop.OCI_SECRET_API_KEY_ALIAS",
            "<api_key_alias>").enableHiveSupport().getOrCreate()
       
      execution_time = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
      param1 = 12345
      param2 = ''.join(random.choices(string.ascii_uppercase + string.digits, k = 8))
      ins_query = "INSERT INTO <database_name>.<table_name> partition(part=1, part2=1) values ({},'{}')".format(param1,param2)
      print("##################### Starting execution ##################" + execution_time)
      print("query = " + ins_query)
      spark.sql(ins_query).show()
      spark.sql("select * from <database_name>.<table_name>").show()
      print("##################### Execution finished #################")
    2. /usr/lib/spark/binから次のコマンドを実行します。
      ./spark-submit --conf spark.driver.extraJavaOptions="${HADOOP_OPTS}" --conf spark.executor.extraJavaOptions="${HADOOP_OPTS}" <location_of_python_file>