Using Your Data Flow Delegation Token to Access Oracle Cloud Infrastructure Resources

When you create a Data Flow Run, the Data Flow service automatically configures the Spark job with a delegation token. This token is used to access Oracle Cloud Infrastructure resources on your behalf.

This is why Spark jobs running on Data Flow can read from and write to Oracle Cloud Infrastructure Object Storage without needing credentials.
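As a sketch of what this looks like in practice, the HDFS connector addresses Object Storage through `oci://bucket@namespace/path` URIs; the bucket and namespace names below are placeholders:

```python
# Sketch of the oci:// URI convention used by the Data Flow / HDFS connector.
# The bucket ("output") and namespace ("mytenancy") are placeholder values.
def oci_uri(bucket: str, namespace: str, path: str) -> str:
    """Build an Object Storage URI of the form oci://bucket@namespace/path."""
    return "oci://{}@{}/{}".format(bucket, namespace, path.lstrip("/"))

uri = oci_uri("output", "mytenancy", "/Example/input.dat")
print(uri)  # oci://output@mytenancy/Example/input.dat

# On Data Flow, a Spark job can then read such a path directly, for example:
# df = spark.read.csv(uri)
```

No credentials appear anywhere in the job; the delegation token supplied by Data Flow authorizes the access.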

The token can also be used to access other IAM-enabled systems in Oracle Cloud Infrastructure without credentials. For example:
  • Your applications can access the Vault service to retrieve passwords or other secrets.
  • You can access the NoSQL service to manage state outside the Spark application. Tracking high-water marks between Spark runs and creating distributed locks are common needs, and the NoSQL service can be used for both.
  • You can call Oracle Functions for other compute tasks.
  • You can send email with the Email service.

Calls to Oracle Cloud Infrastructure services can be freely interleaved with other Spark code. The Policy Reference page is a good place to learn about all IAM-enabled services.

This tutorial provides templates you can use in your Spark code to create Oracle Cloud Infrastructure clients. These clients use the delegation token when running in Data Flow, and an API key when running outside Data Flow. You learn how to:
  1. Use the delegation token from Java applications
  2. Use the delegation token from Python applications

Before You Begin

You must be authorized to call the Data Flow service. Data Flow does not elevate your privileges; it only makes calls on your behalf. If you can do something in the Oracle Cloud Infrastructure Console, your Spark job can do it in Data Flow, and likewise, anything your Spark job can do in Data Flow, you can do in the Console.

1. Using the Delegation Token from Java Applications

The following class, OboTokenClientConfigurator.java, creates authenticated Oracle Cloud Infrastructure clients. In the base directory of the sample project, create the sample source directory structure with an empty file into which you paste the OboTokenClientConfigurator code.
$ mkdir -p src/main/java/example/
$ touch src/main/java/example/OboTokenClientConfigurator.java

If your application is running locally, the code relies on a configuration file and an API key. For more information about this setup, see Required Keys and OCIDs. Example OboTokenClientConfigurator.java:

package example;
 
import com.oracle.bmc.ConfigFileReader;
import com.oracle.bmc.Region;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.InstancePrincipalsAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcProperties;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.http.DefaultConfigurator;
import com.oracle.bmc.http.signing.internal.Constants;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
 
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.NoSuchElementException;
import javax.annotation.Priority;
import javax.ws.rs.Priorities;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.ClientRequestContext;
import javax.ws.rs.client.ClientRequestFilter;
 
/**
 * Customize the SDK underlying REST client to use the on-behalf-of token when running on
 * Data Flow.
 */
public class OboTokenClientConfigurator implements ClientConfigurator {
 
  // TODO: Set these values for your specific OCI environment
  public static final String LOCAL_PROFILE = "DEV"; // TODO <your ~/.oci/config profile>
  private static final String CONFIG_FILE_PATH = ConfigFileReader.DEFAULT_FILE_PATH;
  private static final String CANONICAL_REGION_NAME = Region.US_PHOENIX_1.getRegionId();
 
  private final String delegationTokenPath;
 
  /**
   * Helper function for the Spark Driver to get the token path.
   */
  public static String getDelegationTokenPath() {
    SparkConf conf = new SparkConf();
    try {
      return conf.get("spark.hadoop.fs.oci.client.auth.delegationTokenPath");
    } catch (NoSuchElementException e) {
      return null;
    }
  }
 
  /**
   * Helper function to get the Hadoop configuration for the HDFS BmcFileSystem.
   */
  public static Configuration getConfiguration(Configuration config, String delegationTokenPath) {
    // https://objectstorage.us-phoenix-1.oraclecloud.com
    String domain = "oraclecloud.com";
    String overlayEndpoint = String
        .format("https://objectstorage.%s.%s", CANONICAL_REGION_NAME, domain);
    config.set(BmcProperties.HOST_NAME.getPropertyName(), overlayEndpoint);
    // Data Flow
    if (delegationTokenPath != null) {
      config.set("fs.oci.client.auth.delegationTokenPath", delegationTokenPath);
      config.set(BmcProperties.OBJECT_STORE_CLIENT_CLASS.getPropertyName(),
          "oracle.dfcs.hdfs.DelegationObjectStorageClient");
    } else { // local
      try {
        ConfigFileAuthenticationDetailsProvider provider =
            new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
        config.set(BmcProperties.TENANT_ID.getPropertyName(), provider.getTenantId());
        config.set(BmcProperties.USER_ID.getPropertyName(), provider.getUserId());
        config.set(BmcProperties.FINGERPRINT.getPropertyName(), provider.getFingerprint());
        config.set(BmcProperties.PEM_FILE_PATH.getPropertyName(), provider.getPemFilePath());
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }
    return config;
  }
 
  /**
   * Helper function to get an environment specific authentication provider.
   */
  public static BasicAuthenticationDetailsProvider getAuthProvider(String delegationTokenPath) {
    if (delegationTokenPath == null) { // local
      try {
        return new ConfigFileAuthenticationDetailsProvider(CONFIG_FILE_PATH, LOCAL_PROFILE);
      } catch (IOException ex) {
        throw new RuntimeException(ex);
      }
    }
    // Data Flow
    return InstancePrincipalsAuthenticationDetailsProvider.builder().build();
  }
 
  /**
   * Helper function to get an environment specific <tt>ClientConfigurator</tt>.
   */
  public static ClientConfigurator getConfigurator(String delegationTokenPath) {
    return (delegationTokenPath == null) ? new DefaultConfigurator() : // local
        new OboTokenClientConfigurator(delegationTokenPath); // Data Flow
  }
 
  /**
   * Helper function to get an environment specific working directory.
   */
  public static String getTempDirectory() {
    if (System.getenv("HOME").equals("/home/dataflow")) {
      return "/opt/spark/work-dir/";
    }
    return System.getProperty("java.io.tmpdir");
  }
 
  public OboTokenClientConfigurator(String delegationTokenPath) {
    this.delegationTokenPath = delegationTokenPath;
  }
 
  @Override
  public void customizeBuilder(ClientBuilder builder) {
  }
 
  @Override
  public void customizeClient(Client client) {
    client.register(new _OboTokenRequestFilter());
  }
 
  @Priority(_OboTokenRequestFilter.PRIORITY)
  class _OboTokenRequestFilter implements ClientRequestFilter {
 
    public static final int PRIORITY = Priorities.AUTHENTICATION - 1;
 
    @Override
    public void filter(final ClientRequestContext requestContext) throws IOException {
      String token = new String(Files.readAllBytes(Paths.get(delegationTokenPath)));
      requestContext.getHeaders().putSingle(Constants.OPC_OBO_TOKEN, token);
    }
  }
}
The following example shows how to use the delegation token with Oracle Cloud Infrastructure Object Storage. Run this command:
$ touch src/main/java/example/Example.java
Example.java
package example;
 
import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider;
import com.oracle.bmc.hdfs.BmcFilesystem;
import com.oracle.bmc.http.ClientConfigurator;
import com.oracle.bmc.objectstorage.ObjectStorageClient;
import com.oracle.bmc.objectstorage.requests.GetNamespaceRequest;
import com.oracle.bmc.objectstorage.requests.PutObjectRequest;
import com.oracle.bmc.objectstorage.responses.GetNamespaceResponse;
import com.oracle.bmc.objectstorage.transfer.UploadConfiguration;
import com.oracle.bmc.objectstorage.transfer.UploadManager;
import com.oracle.bmc.objectstorage.transfer.UploadManager.UploadRequest;
 
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkFiles;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
 
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
 
// This demo illustrates:
// 1. The delegation token path must be looked up in the Driver and passed to the Executor.
// 2. If the token path is null, assume you are running locally and load an API key.
// 3. Using the HDFS-Connector to create an HDFS FileSystem to connect to Object Storage.
public class Example {
 
  // TODO: Set these values for your specific OCI environment
  private static final String NAMESPACE = "<your tenant namespace>";
  private static final String BUCKET_NAME = "output"; // ensure that you create this bucket
 
  private static final String OCI_URI = "oci://" + BUCKET_NAME + "@" + NAMESPACE;
  private static final String SAMPLE_JOB_PATH = "/Example";
  private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
 
  public static void main(String[] args) throws Exception {
    // Get our Spark session.
    SparkConf conf = new SparkConf();
    String master = conf.get("spark.master", "local[*]");
    SparkSession spark = SparkSession.builder().appName("Example").master(master).getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
 
    String delegationTokenPath = OboTokenClientConfigurator.getDelegationTokenPath();
 
    // write a file to Object Storage using an HDFS FileSystem
    try (final BmcFilesystem fs = new BmcFilesystem())
    {
      fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
          jsc.hadoopConfiguration(), delegationTokenPath));
      fs.delete(new Path(SAMPLE_JOB_PATH), true);
      final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
      output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
      output.close();
    }
 
    // Test adding a Spark file, equivalent to spark.files or --files
    jsc.addFile(OCI_URI + INPUT_FILE); // Do not use this for data files
    // Executor -> read a file
    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
    rdd.foreach(item -> executorGetFile());
 
    List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
    rdd = jsc.parallelize(collection, 4);
    rdd.foreach(item -> copyFileSample(item, delegationTokenPath));
 
    // Executor -> write a file to Object Storage using an HDFS FileSystem
    rdd = jsc.parallelize(Arrays.asList(1, 2), 2);
    rdd.foreach(item -> fileSystemSample(item, delegationTokenPath));
 
    // ---------------------------------------------------------------------
    // <COPY CODE SNIPPET EXAMPLE #2 HERE>
 
    // ---------------------------------------------------------------------
 
    // ---------------------------------------------------------------------
    // <COPY CODE SNIPPET EXAMPLE #3 HERE>
 
    // ---------------------------------------------------------------------
 
    jsc.stop();
  }
 
  public static void executorGetFile() {
    String filepath = SparkFiles.get("input.dat");
    File file = new File(filepath);
    try {
      String data = IOUtils.toString(new FileInputStream(file), StandardCharsets.UTF_8);
      System.out.println("Read a file:\n" + data);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
 
  public static void fileSystemSample(int i, String delegationTokenPath) throws Exception {
    // write a file to Object Storage using a FileSystem
    try (final BmcFilesystem fs = new BmcFilesystem())
    {
      fs.initialize(new URI(OCI_URI), OboTokenClientConfigurator.getConfiguration(
          new Configuration(), delegationTokenPath));
      fs.delete(new Path(SAMPLE_JOB_PATH + "/" + i), true);
      final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
      output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
      output.close();
    }
  }
 
  // The driver needs to pass the delegation token path to executors.
  public static void copyFileSample(int i, String delegationTokenPath)
      throws Exception {
    // Create a file to upload.
    String outputFile = MessageFormat.format("{0}.txt", i);
    String outputPath = "/tmp/" + outputFile;
    Writer wr = new FileWriter(outputPath);
    wr.write(String.valueOf(i));
    wr.close();
 
    AbstractAuthenticationDetailsProvider provider =
        OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
    ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
 
    // Create an object storage client.
    ObjectStorageClient client = ObjectStorageClient.builder()
        .clientConfigurator(configurator)
        .build(provider);
 
    // Look up our namespace.
    GetNamespaceResponse namespaceResponse = client.getNamespace(GetNamespaceRequest.builder().build());
    String namespaceName = namespaceResponse.getValue();
 
    // Upload the file.
    UploadConfiguration uploadConfiguration = UploadConfiguration.builder()
        .allowMultipartUploads(true)
        .allowParallelUploads(true)
        .build();
    UploadManager uploadManager = new UploadManager(client, uploadConfiguration);
    PutObjectRequest request = PutObjectRequest.builder()
        .bucketName(BUCKET_NAME)
        .namespaceName(namespaceName)
        .objectName(outputFile)
        .contentType("text/plain")
        .contentLanguage("EN")
        .contentEncoding("UTF-8")
        .build();
    UploadRequest uploadDetails = UploadRequest.builder(new File(outputPath))
        .allowOverwrite(true)
        .build(request);
    uploadManager.upload(uploadDetails);
  }
}
You can connect to the Vault service using the following code. Copy it into Example.java at the <COPY CODE SNIPPET EXAMPLE #2 HERE> marker.
<!-- Add the Vault SDK dependency to our example project pom.xml below the Secrets SDK dependency -->
    <dependency>
      <groupId>com.oracle.oci.sdk</groupId>
      <artifactId>oci-java-sdk-vault</artifactId>
      <version>${oci-java-sdk-version}</version>
    </dependency>
 
// ---------------------------------------------------------------------
// Add these imports to the top of the file
import com.oracle.bmc.secrets.SecretsClient;
import com.oracle.bmc.secrets.requests.GetSecretBundleRequest;
import com.oracle.bmc.secrets.responses.GetSecretBundleResponse;
import com.oracle.bmc.secrets.model.Base64SecretBundleContentDetails;
import org.apache.commons.codec.binary.Base64;
import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider;
// ---------------------------------------------------------------------
 
// <COPY CODE SNIPPET EXAMPLE #2 HERE>
 
    String passwordOcid = null; // TODO <the vault secret OCID>
 
    BasicAuthenticationDetailsProvider provider =
        OboTokenClientConfigurator.getAuthProvider(delegationTokenPath);
    ClientConfigurator configurator = OboTokenClientConfigurator.getConfigurator(delegationTokenPath);
    SecretsClient secretsClient = SecretsClient.builder().clientConfigurator(configurator).build(provider);
 
    // create get secret bundle request
    GetSecretBundleRequest getSecretBundleRequest = GetSecretBundleRequest
        .builder()
        .secretId(passwordOcid)
        .stage(GetSecretBundleRequest.Stage.Current)
        .build();
 
    // get the secret
    GetSecretBundleResponse getSecretBundleResponse = secretsClient.
        getSecretBundle(getSecretBundleRequest);
 
    // get the bundle content details
    Base64SecretBundleContentDetails base64SecretBundleContentDetails =
        (Base64SecretBundleContentDetails) getSecretBundleResponse.
            getSecretBundle().getSecretBundleContent();
 
    // decode the encoded secret
    byte[] secretValueDecoded = Base64.decodeBase64(base64SecretBundleContentDetails.getContent());
    System.out.println("Secret: " + new String(secretValueDecoded, StandardCharsets.UTF_8));
To connect to Autonomous Database for Analytics and Data Warehousing (ADW) from your application, download the ADW wallet. Follow these steps:
  1. Log in to Oracle Cloud Infrastructure.
  2. Search for Autonomous Database.
  3. Select your ADW instance.
  4. Select DB Connection.
  5. Select Download Wallet.
  6. Enter the password.
  7. Save the wallet to the Object Storage service.
  8. Add this code to the sample project:
    <!-- Add the ADW dependencies to our example project pom.xml below the Secrets SDK dependency -->
        <!-- Drivers for talking to ADW. Jars need to be deployed using mvn deploy:deploy-file -->
        <dependency>
          <groupId>com.oracle.database.jdbc</groupId>
          <artifactId>ojdbc8</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.jdbc</groupId>
          <artifactId>ucp</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>oraclepki</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_cert</artifactId>
          <version>18.3.0.0</version>
        </dependency>
        <dependency>
          <groupId>com.oracle.database.security</groupId>
          <artifactId>osdt_core</artifactId>
          <version>18.3.0.0</version>
        </dependency>
     
    // ---------------------------------------------------------------------
    // Add these imports to the top of the file
    import java.util.ArrayList;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.Dataset;
    import java.util.Map;
    import java.util.HashMap;
    import oracle.jdbc.driver.OracleConnection;
    // ---------------------------------------------------------------------
     
        // <COPY CODE SNIPPET EXAMPLE #3 HERE>
     
        // TODO <set these values as appropriate to access your ADW using your ADW wallet
        String walletPath = OCI_URI + "/Wallet_EXAMPLEADW.zip";
        String user = "ADMIN";
        String tnsName = "exampleadw_high"; // this can be found inside of the wallet.zip (unpack it)
        byte[] secretValueDecoded = "example_secret".getBytes();
     
        // Build a 2 row data set to save to ADW.
        // Usually you would load data from CSV/Parquet, this is to keep the example
        // simple.
        List<String[]> stringAsList = new ArrayList<>();
        stringAsList.add(new String[] { "value11", "value21" });
        stringAsList.add(new String[] { "value12", "value22" });
        JavaSparkContext sparkContext = new JavaSparkContext(spark.sparkContext());
        JavaRDD<Row> rowRDD = sparkContext.parallelize(stringAsList).map(RowFactory::create);
        StructType schema = DataTypes
            .createStructType(new StructField[] { DataTypes.createStructField("col1", DataTypes.StringType, false),
                DataTypes.createStructField("col2", DataTypes.StringType, false) });
        Dataset<Row> df = spark.sqlContext().createDataFrame(rowRDD, schema).toDF();
     
        // Download the wallet from object storage and distribute it.
        String tmpPath = DataFlowDeployWallet.deployWallet(new URI(OCI_URI), spark.sparkContext(),
            OboTokenClientConfigurator.getConfiguration(jsc.hadoopConfiguration(), delegationTokenPath), walletPath);
     
        // Configure the ADW JDBC URL.
        String jdbcUrl = MessageFormat.format("jdbc:oracle:thin:@{0}?TNS_ADMIN={1}", tnsName, tmpPath);
        System.out.println("JDBC URL " + jdbcUrl);
     
        String password = new String(secretValueDecoded);
     
        // Save data to ADW.
        System.out.println("Saving to ADW");
        Map<String, String> options = new HashMap<String, String>();
        options.put("driver", "oracle.jdbc.driver.OracleDriver");
        options.put("url", jdbcUrl);
        options.put(OracleConnection.CONNECTION_PROPERTY_USER_NAME, user);
        options.put(OracleConnection.CONNECTION_PROPERTY_PASSWORD, password);
        options.put(OracleConnection.CONNECTION_PROPERTY_TNS_ADMIN, tmpPath);
        options.put("dbtable", "sample");
        df.write().format("jdbc").options(options).mode("Overwrite").save();
        System.out.println("Done writing to ADW");
  9. Access the wallet file using the DataFlowDeployWallet.java helper class.
    1. Run this command to create a file:
      $ touch src/main/java/example/DataFlowDeployWallet.java
    2. Copy this code into the file:
      /*
       * Copyright © 2021, Oracle and/or its affiliates.
       * The Universal Permissive License (UPL), Version 1.0
       */
      package example;
       
      import com.oracle.bmc.hdfs.BmcFilesystem;
       
      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.Path;
      import org.apache.spark.SparkContext;
       
      import java.io.BufferedOutputStream;
      import java.io.File;
      import java.io.FileInputStream;
      import java.io.FileOutputStream;
      import java.io.IOException;
      import java.net.URI;
      import java.net.URISyntaxException;
      import java.util.Arrays;
      import java.util.List;
      import java.util.zip.ZipEntry;
      import java.util.zip.ZipInputStream;
       
      /*
       * Helper to deploy a wallet to the Spark cluster.
       *
       * This only needs to be done once and should be done in the Spark driver.
       */
      public class DataFlowDeployWallet {
       
          private static final int BUFFER_SIZE = 4096;
       
          public static String deployWallet(URI oci_uri, SparkContext sc, Configuration configuration, String walletPath)
                  throws IOException, URISyntaxException {
              try (final BmcFilesystem fs = new BmcFilesystem()) {
                  fs.initialize(oci_uri, configuration);
       
                  String tmpPath = downloadAndExtract(fs, new Path(walletPath));
       
                  List<String> walletContents = Arrays.asList("cwallet.sso", "ewallet.p12", "keystore.jks", "ojdbc.properties",
                          "sqlnet.ora", "tnsnames.ora", "truststore.jks");
                  for (String file : walletContents) {
                      sc.addFile(tmpPath + file);
                  }
                  return tmpPath;
              }
          }
       
          private static String downloadAndExtract(BmcFilesystem bmc, Path walletRemotePath)
                  throws IllegalArgumentException, IOException {
              String tmpPath = OboTokenClientConfigurator.getTempDirectory();
              String walletLocal = tmpPath + "wallet.zip";
              bmc.copyToLocalFile(walletRemotePath, new Path(walletLocal));
              unzip(walletLocal, tmpPath);
              return tmpPath;
          }
       
          private static void unzip(String zipFilePath, String destDirectory) throws IOException {
              File destDir = new File(destDirectory);
              if (!destDir.exists()) {
                  destDir.mkdir();
              }
              ZipInputStream zipIn = new ZipInputStream(new FileInputStream(zipFilePath));
              ZipEntry entry = zipIn.getNextEntry();
              // iterates over entries in the zip file
              while (entry != null) {
                  String filePath = destDirectory + File.separator + entry.getName();
                  if (!entry.isDirectory()) {
                      // if the entry is a file, extracts it
                      extractFile(zipIn, filePath);
                  } else {
                      // if the entry is a directory, make the directory
                      File dir = new File(filePath);
                      dir.mkdir();
                  }
                  zipIn.closeEntry();
                  entry = zipIn.getNextEntry();
              }
              zipIn.close();
          }
       
          private static void extractFile(ZipInputStream zipIn, String filePath) throws IOException {
              BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(filePath));
              byte[] bytesIn = new byte[BUFFER_SIZE];
              int read = 0;
              while ((read = zipIn.read(bytesIn)) != -1) {
                  bos.write(bytesIn, 0, read);
              }
              bos.close();
          }
      }

2. Using the Delegation Token from Python Applications

The following code lets you create authenticated clients in Python. If the application is running locally, the code relies on a configuration file and an API key. For more information about this setup, see Required Keys and OCIDs.
# Helper Functions
def get_token_path(spark):
    token_key = "spark.hadoop.fs.oci.client.auth.delegationTokenPath"
    token_path = spark.sparkContext.getConf().get(token_key)
    return token_path

def get_authenticated_client(token_path, client):
    import oci
    import os

    if token_path is None:
        # You are running locally, so use our API Key.
        config = oci.config.from_file()
        authenticated_client = client(config)
    else:
        # You are running in Data Flow, so use our Delegation Token.
        with open(token_path) as fd:
            delegation_token = fd.read()
        signer = oci.auth.signers.InstancePrincipalsDelegationTokenSigner(
            delegation_token=delegation_token
        )
        authenticated_client = client(config={}, signer=signer)
    return authenticated_client
Here are two examples of how you can use the code:
import oci
token_path = get_token_path(spark)

# Get an object storage client.
object_storage_client = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
...

# Retrieve a password using the secrets client.
import base64
password_ocid = "my_password_ocid"
secrets_client = get_authenticated_client(token_path, oci.secrets.SecretsClient)
response = secrets_client.get_secret_bundle(password_ocid)
base64_secret_content = response.data.secret_bundle_content.content
base64_secret_bytes = base64_secret_content.encode("ascii")
base64_message_bytes = base64.b64decode(base64_secret_bytes)
password = base64_message_bytes.decode("ascii")
...

Note the pattern used to create authenticated clients. If you pass the token path and the class of the client you want to get_authenticated_client, an authenticated client is returned. This approach works with any client provided by the Oracle Cloud Infrastructure Python SDK.

The token path must be provided to any map, foreach, or foreachPartition operation. It is not possible to look up the token path from within the Spark executors.
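A minimal sketch of that rule, with the OCI calls stubbed out: resolve the token path once in the driver, then pass it into the closure that each partition executes (the function names here are illustrative, not part of the SDK):

```python
# Illustrative sketch: the driver resolves the token path once and passes it
# to executor-side code; executors must not read the Spark conf themselves.
def resolve_auth_mode(token_path):
    """Mirror of get_authenticated_client's branching, without OCI calls."""
    return "api_key" if token_path is None else "delegation_token"

def process_partition(rows, token_path):
    # Inside a real foreachPartition you would build the client here, e.g.:
    # client = get_authenticated_client(token_path, oci.object_storage.ObjectStorageClient)
    mode = resolve_auth_mode(token_path)
    return [(row, mode) for row in rows]

# Driver side (hypothetical usage):
# token_path = get_token_path(spark)
# rdd.foreachPartition(lambda rows: process_partition(rows, token_path))
```

Because token_path is captured in the closure as a plain string, it is serialized to the executors along with the task, which is what makes the pattern work.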

What's Next

The Java SDK has many examples that you can use to help develop the Oracle Cloud Infrastructure integration in your Spark application.

The Oracle Cloud Infrastructure Python SDK has many examples that you can use to help develop the Oracle Cloud Infrastructure integration in your Spark application.