Oracle Cloud Infrastructureドキュメント

オブジェクト・ストレージ用HDFSコネクタ

Hadoop Distributed File System (HDFS) Connectorでは、Apache HadoopアプリケーションがOracle Cloud Infrastructure Object Storageサービスとの間でデータを読み書きできるようになります。

このSDKとサンプルは、Universal Permissive License 1.0とApache License 2.0の下でデュアル・ライセンスです。サード・パーティ・コンテンツは、コードの説明に従って個別にライセンスされています。

要件

HDFSコネクタを使用するには、次のものが必要です:

資格証明とパスワード

暗号化されたPEMファイルを資格証明に使用する場合、パスフレーズはgetPassword Hadoop Configurationメソッドを使用して構成から読み取られます。 getPasswordオプションは、登録されたセキュリティ・プロバイダのパスワードをチェックします。 セキュリティ・プロバイダにリクエストされたキーが含まれていない場合は、構成ファイルから直接プレーンテキストのパスフレーズを読み込むことになります。

DNSの名前検索のためのJVM TTLの構成

Java仮想マシン(JVM)は、time-to-live (TTL)と呼ばれる一定時間、ルックアップからのDNSレスポンスをキャッシュします。 これにより、頻繁な名前解決が必要なコードでのレスポンス時間が短縮されます。

JVMは、networkaddress.cache.ttlプロパティを使用して、DNS名前ルックアップのキャッシング・ポリシーを指定します。 値は、成功したルックアップをキャッシュする秒数を表す整数です。 多くのJVMのデフォルト値-1は、ルックアップを永続的にキャッシュする必要があることを示します。

Oracle Cloud Infrastructureのリソースは変更可能なDNS名を使用するため、TTL値を60秒に変更することをお勧めします。 これにより、リソースの新しいIPアドレスが次のDNS問合せで返されます。 アプリケーションに対してこの値をグローバルに、または具体的に変更することができます:

  • JVMを使用するすべてのアプリケーションでTTLをグローバルに設定するには、$JAVA_HOME/jre/lib/security/java.securityファイルに次の行を追加します:

    networkaddress.cache.ttl=60
  • アプリケーションでのみTTLを設定するには、アプリケーションの初期化コードで次のように設定します:

    java.security.Security.setProperty("networkaddress.cache.ttl" , "60");

インストール

バンドルしたjarをlibおよびthird-party/libからHadoopクラスタの各ノードにコピーして、HadoopのCLASSPATHに含められるようにします。

JavaおよびMavenアーティファクト用SDK

HDFSコネクタの構築は、Java用のSDKで提供されるMavenアーティファクトに依存します。 アーティファクトを取得するには、「Java用のSDKのダウンロード」を実行してローカルに構築する必要があります。 これで、HDFSコネクタを構築できます。

重要

「「Oracleリリース」ページ」からダウンロードするJavafileバージョンのSDKは、HDFSコネクタ・バージョンと一致する必要があります。このバージョンは、groupId属性を持つ依存性タグ・ブロックでhdfs-connector/pom.xmlファイルを検索できます。

HDFS ConnectorおよびMavenのアーティファクト

HDFS Connectorは、「Maven中部」およびJCenterで入手できます。

プロジェクトでHDFS Connectorを使用するには、次のプロジェクト依存性をインポートします。 例えば:

<dependency>
  <groupId>com.oracle.oci.sdk</groupId>
  <artifactId>oci-hdfs-connector</artifactId>
  <!-- Replace the version below with your required version -->
  <version>2.9.2.0</version>
</dependency>

 

プロパティ

core-site.xmlファイルには、次のHDFSコネクタ・プロパティを設定できます。 BmcPropertiesページには、オブジェクト・ストレージへの接続に対して構成できる追加のプロパティがリストされます。

.<bucket_name>.<namespace_name>をプロパティ名に追加することによって、プロパティ値を特定のバケットに適用するように指定することができます。

この例では、プロパティをcore-site.xmlファイルでどのように構成できるかを示しています(OCIDは簡潔にするために短縮されています):

<configuration>
...
  <property>
    <name>fs.oci.client.hostname</name>
    <value>https://objectstorage.us-ashburn-1.oraclecloud.com</value>
  </property>
  <property>
    <name>fs.oci.client.hostname.myBucket.myNamespace</name>
    <value>https://objectstorage.phoenix-1.oraclecloud.com</value><!-- Use Phoenix for myBucket@myNamespace -->
  </property>
  <property>
    <name>fs.oci.client.auth.tenantId</name>
    <value>ocid1.tenancy.oc1..aaaaaaaaba3pv6wkcr4j...stifsfdsq</value> 
  </property>
  <property>
    <name>fs.oci.client.auth.userId</name>
    <value>ocid1.user.oc1..aaaaaaaat5nvwcnazjc...aqw3rynjq</value>
  </property>
  <property>
    <name>fs.oci.client.auth.fingerprint</name>
    <value>20:3b:97:13:55:1c:5b:0d:d3:37:d8:50:4e:c5:3a:34</value>
  </property>
  <property>
    <name>fs.oci.client.auth.pemfilepath</name>
    <value>~/.oci/oci_api_key.pem</value>
  </property>
...
</configuration>

認証にインスタンス主体を使用

Oracleは「インスタンス・プリンシパル」を提供しているため、ユーザー資格証明を構成したり、インスタンス上で実行されているサービスにPEMファイルを提供する必要はありません。 これらの各インスタンスは独自のアイデンティティを持ち、インスタンス・プリンシパルによってインスタンスに追加された証明書を使用して認証されます。

HDFSコネクタでインスタンス・プリンシパル認証を使用するには、プロパティfs.oci.client.custom.authenticatorを入力し、値をcom.oracle.bmc.hdfs.auth.InstancePrincipalsCustomAuthenticatorに設定するだけです。

インスタンス・プリンシパルを使用すると、インスタンスにカスタム・オーセンティケータが提供されるため、次のプロパティを構成する必要はありません:

  • fs.oci.client.auth.tenantId
  • fs.oci.client.auth.userId
  • fs.oci.client.auth.fingerprint
  • fs.oci.client.auth.pemfilepath
  • fs.oci.client.auth.passphrase

次のコード例は、HDFSコネクタでの認証にインスタンス・プリンシパルを使用する方法を示しています:

<?xml version="1.0"?>
<configuration>
  <property>
    <name>fs.oci.client.hostname</name>
    <value>https://objectstorage.us-phoenix-1.oraclecloud.com</value>
  </property>
  <property>
    <name>fs.oci.client.custom.authenticator</name>
    <value>com.oracle.bmc.hdfs.auth.InstancePrincipalsCustomAuthenticator</value>
  </property>
</configuration>

インスタンス・プリンシパルの詳細については、「アイデンティティおよびアクセス管理のインスタンス・プリンシパルのお知らせ」を参照してください。

HTTPプロキシの構成

core-site.xmlファイルで次のオプションのプロパティを設定して、HTTPプロキシを構成できます:

ノート

プロキシを構成すると、オブジェクト・ストレージへの接続時にApacheConnectorProviderを使用できるようになります。
サイズはリクエストをメモリーにバッファリングし、ラージ・オブジェクトのアップロード時にメモリー使用率に影響を与えます。 マルチパートのアップロードを有効にし、マルチパートのプロパティを調整してメモリー使用量を管理することをお薦めします。

ラージ・オブジェクトのアップロード

大きなオブジェクトは、マルチパート・アップロードを使用してオブジェクト・ストレージにアップロードされます。 ファイルは並列にアップロードされる小さな部分に分割され、アップロード時間が短縮されます。 これにより、HDFSコネクタはアップロード全体に失敗するのではなく、失敗したパーツのアップロードを再試行することもできます。 ただし、アップロードが一時的に失敗し、コネクタが部分的にアップロードされたファイルを中止しようとします。 これらのファイルが累積されて(およびストレージの請求が行われる)ため、定期的にアップロードをリストし、特定の日数が経過するとJavaのSDKを使用して手動で中断されます。

マルチパートのアップロードを管理するためのオブジェクト・ストレージAPIの使用については、「APIドキュメント」を参照してください。

ノート

マルチパートのアップロードを使用しない場合は、fs.oci.client.multipart.allowedプロパティをfalseに設定することによって無効にできます。

ベスト・プラクティス

以下のセクションでは、使用方法とパフォーマンスを最適化するためのベスト・プラクティスについて説明します。

ディレクトリ名

オブジェクト・ストレージには実際のディレクトリはありません。 ディレクトリのグループ化は、オブジェクトが名前に/デリミタを使用する命名規則の機能です。 たとえば、a/example.jsonという名前のオブジェクトは、aという名前のディレクトリが存在することを意味します。 ただし、そのオブジェクトが削除されると、aディレクトリも暗黙的に削除されます。 ファイルが存在しないディレクトリが存在する可能性のあるファイルシステムの意味を保持するために、HDFSコネクタは実際のオブジェクトを作成します。実際のオブジェクトの名前は/で終わり、パスはディレクトリです(例:a/というオブジェクトを作成します)。 今、a/オブジェクトがその存在を維持するので、a/example.jsonを削除しても、aディレクトリの存在には影響しません。 ただし、その下のファイル/ディレクトリを削除せずに誰かがそのa/オブジェクトを削除する可能性は全くあります。 HDFSコネクタは、そのパスの下にオブジェクトがない場合にのみフォルダ・オブジェクトを削除します。 フォルダ・オブジェクト自体はゼロ・バイトです。

矛盾したファイルシステム

ディレクトリを削除するとは、そのディレクトリを表す接頭辞で始まるすべてのオブジェクトを削除することです。 HDFSでは、ファイルまたはディレクトリのファイル・ステータスを問合せできます。 ディレクトリのファイル・ステータスは、そのディレクトリのフォルダ・オブジェクトが存在することを確認することによって実装されます。 ただし、フォルダ・オブジェクトが削除されている可能性はありますが、その接頭辞を持つオブジェクトの一部がまだ存在しています。 たとえば、これらのオブジェクトの状況では、次のようになります:

  • a/b/example.json
  • a/b/file.json
  • a/b/

HDFSは、ディレクトリ/a/b/が存在し、ディレクトリであることを知り、それを走査すると、example.jsonfile.jsonとなります。 しかし、オブジェクトa/b/が削除された場合、ファイルシステムは不整合な状態に見えます。 ディレクトリ/a/b/内のすべてのファイルを検索して2つのエントリを見つけることができますが、実際の/a/b/ディレクトリのステータスを問合せすると、ディレクトリが存在しないため例外が発生します。 HDFSコネクタは、ファイルシステムの状態を修復しようとしません。

ファイルの作成

オブジェクト・ストレージは、数ギガバイトのサイズのオブジェクトをサポートしています。 ファイルを作成するには、通常、一時ファイルに書き込んだ後、ストリームが閉じられたときにファイルの内容をアップロードします。 一時的なスペースは、複数のアップロードを処理するのに十分な大きさでなければなりません。 使用される一時ディレクトリは、hadoop.tmp.dir構成プロパティによって制御されます。

リード/シークのサポート

メモリー内バッファが有効になっている場合(fs.oci.io.read.inmemory)、ファイル全体がバイト配列にバッファリングされるため、シークが完全にサポートされます。 メモリー内バッファが有効でない場合(オブジェクト・サイズが大きい可能性が高いため)、ストリームを閉じて指定されたオフセットから新しい範囲リクエストを作成することによってシークが実装されます。

ディレクトリ・リスティング

ディレクトリをリストすることは、本質的に接頭辞と区切り文字が指定されたListバケット操作です。 各キーのHDFS FileStatusインスタンスを作成するために、コネクタは追加のHEADリクエストを実行して、個々のキーごとにObjectMetadataを取得します。 これは、オブジェクト・ストレージがより豊富なリスト操作データをサポートするまで必要です。

ファイルシステムとファイルのURI形式

HDFSファイルシステムとファイルはURIによって参照されます。 スキームはファイルシステムのタイプを指定し、URIの残りの部分は、ファイルシステムの実装が望みどおりに解釈するためにほとんど自由です。

オブジェクト・ストレージはオブジェクト・ストアであるため、ファイルシステム内のファイルであるかのようにオブジェクトに名前を付ける機能は、実際のファイルシステムを模倣するために使用されます。

ルート

オブジェクト・ストレージ・ファイルシステムのルートは、次のように、権限コンポーネントにバケット名とネームスペース名が含まれるパスで示されます:

ノート

例では、"MyBucket"と"MyNamespace"はプレースホルダーであり、適切な値に置き換えてください。

oci://MyBucket@MyNamespace/
		

これは常にファイルシステムのルートです。 バケットとネームスペースの両方に権限を使用する理由は、HDFSは権限部がファイルシステムのどこにあるかだけを判断できるためです。パス部分はリソースへのパスだけを示しています(例えば、"oci//MyNamespace/MyBucket"は動作しません)。 @文字は、バケットまたはネームスペースの有効な文字ではないため、権限を正しく解析できるようにする必要があります。

Sub-directories

サブディレクトリは実際には存在しませんが、/文字でオブジェクトを作成することによって模倣することができます。 たとえば、a/b/c/example.jsona/b/d/path.jsonの2つのファイルは、共通のディレクトリa/bにあるかのように表示されます。 これは、オブジェクト・ストレージ接頭辞と区切り文字ベースの問合せを使用することで実現できます。 与えられた例では、サブディレクトリをURIとして参照すると次のようになります:

oci://MyBucket@MyNamespace/a/b/
		

Objects/Files

a/b/c/example.jsonという名前のオブジェクトは、次のように参照されます:

oci://MyBucket@MyNamespace/a/b/c/example.json
		

ロギング

コネクタのログインはSLF4Jを介して行われます。 SLF4Jは、ユーザー提供のロギング・ライブラリ(log4jなど)の使用を可能にするロギング抽象化です。 詳細は、「SLF4Jマニュアル」を参照してください。

次の例は、標準出力に基本ログを有効にする方法を示しています。

  1. SLF4J Simpleバインディング・ジャーをダウンロード: SLF4J単純バインディング
  2. jarファイルをクラスパスに追加
  3. 次のVM引数を追加して、デバッグ・レベルのロギングを有効にします(デフォルトでは、infoレベルが使用されます): -Dorg.slf4j.simpleLogger.defaultLogLevel=debug

log4jバインディングを使用すると、より高度なロギング・オプションを構成できます。

サンプルのHadoopジョブ

hadoop_sample_hdfs:


package com.oracle.oci.hadoop.example;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.oracle.oci.hdfs.BmcFilesystem;

import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
public class SampleOracleBmcHadoopJob
{
    private static final String SAMPLE_JOB_PATH = "/samplehadoopjob";
    private static final String INPUT_FILE = SAMPLE_JOB_PATH + "/input.dat";
    private static final String OUTPUT_DIR = SAMPLE_JOB_PATH + "/output";

    // non-static since this is the runner class it needs to initialize after we set the properties
    private final Logger log = LoggerFactory.getLogger(SampleOracleBmcHadoopJob.class);

    /**
     * Runner for sample hadoop job. This expects 3 args: path to configuration file, Object Store namespace, Object
     * Store bucket. To run this, you must:
     *{@code 


         * 
    Create a standard hadoop configuration file

         * 
    Create the bucket ahead of time.

         *} 


     * This runner will create a test input file in a file '/samplehadoopjob/input.dat', and job results will be written
     * to '/samplehadoopjob/output'.
     * 
     * @param args
     *            1) path to configuration file, 2) namespace, 3) bucket
     * @throws Exception
     */
    public static void main(final String[] args) throws Exception
    {
        if (args.length != 3)
        {
            throw new IllegalArgumentException(
                    "Must have 3 args: 1) path to config file, 2) object storage namespace, 3) object storage bucket");
        }

        // redirect all logs to sysout
        System.setProperty("org.slf4j.simpleLogger.logFile", "System.out");
        System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "debug");

        final SampleOracleBmcHadoopJob job = new SampleOracleBmcHadoopJob(args[0], args[1], args[2]);
        System.exit(job.execute());
    }

    private final String configurationFilePath;
    private final String namespace;
    private final String bucket;

    public int execute() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException
    {
        log.info("Creating hadoop configuration");
        final Configuration configuration = this.createConfiguration(this.configurationFilePath);

        final String authority = this.bucket + "@" + this.namespace;
        final String uri = "oci://" + authority;
        log.info("Using uri: {}", uri);

        log.info("Creating job inputs");
        this.setup(uri, configuration);

        log.info("Creating job");
        final Job job = this.createJob(configuration);

        final String in = uri + INPUT_FILE;
        final String out = uri + OUTPUT_DIR;
        log.info("Using input: {}", in);
        log.info("Using output: {}", out);

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        log.info("Executing job...");
        final int response = job.waitForCompletion(true) ? 0 : 1;

        log.info("Attempting to read job results");
        this.tryReadResult(uri, configuration);
        return response;
    }

    private Configuration createConfiguration(final String configFilePath)
    {
        final Configuration configuration = new Configuration();
        configuration.addResource(new Path(configFilePath));
        return configuration;
    }

    private void setup(final String uri, final Configuration configuration) throws IOException, URISyntaxException
    {
        try (final BmcFilesystem fs = new BmcFilesystem())
        {
            fs.initialize(new URI(uri), configuration);
            fs.delete(new Path(SAMPLE_JOB_PATH), true);
            final FSDataOutputStream output = fs.create(new Path(INPUT_FILE));
            output.writeChars("example\npath\ngak\ntest\nexample\ngak\n\ngak");
            output.close();
        }
    }

    private Job createJob(final Configuration configuration) throws IOException
    {
        final Job job = Job.getInstance(configuration, "word count");
        job.setJarByClass(SampleOracleBmcHadoopJob.class);
        job.setMapperClass(SimpleMapper.class);
        job.setCombinerClass(SimpleReducer.class);
        job.setReducerClass(SimpleReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job;
    }

    private void tryReadResult(final String uri, final Configuration configuration)
            throws IOException, URISyntaxException
    {
        try (final BmcFilesystem fs = new BmcFilesystem())
        {
            fs.initialize(new URI(uri), configuration);
            // this should be the output file name, but that could change
            final FSDataInputStream input = fs.open(new Path(OUTPUT_DIR + "/part-r-00000"));

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            IOUtils.copy(input, baos);
            log.info("\n=====\n" + baos.toString() + "=====");
            input.close();
        }
    }
}


package com.oracle.oci.hadoop.example;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SimpleMapper extends Mapper
{
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(final Object key, final Text value, final Context context) throws IOException, InterruptedException
    {
        final StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens())
        {
            this.word.set(itr.nextToken());
            context.write(this.word, one);
        }
    }
}


package com.oracle.oci.hadoop.example;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SimpleReducer extends Reducer
{
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(final Text key, final Iterable values, final Context context)
            throws IOException, InterruptedException
    {
        int sum = 0;
        for (final IntWritable val : values)
        {
            sum += val.get();
        }
        this.result.set(sum);
        context.write(key, this.result);
    }
}

トラブルシューティング

ここでは、HDFSコネクタのトラブルシューティング情報について説明します。

サービス・エラーのトラブルシューティング

サービス・エラーを引き起こす操作は、HDFSコネクタによってcom.oracle.bmc.model.BmcException型の例外がスローされる原因となります。 OCIから返される一般的なサービス・エラーの詳細は、「APIエラー」を参照してください。

Java暗号化キー・サイズ・エラー

HDFSコネクタは、キー長が128ビット以下のキーのみを処理できます。 AES256などの長いキーを使用すると、無効なキー例外および不正なキー・サイズ・エラーが発生します。 この問題を解決するには、次のいずれかの回避策を使用します:

貢献

バグの修正や貢献したい新機能がありますか? SDKはソースをオープンし、GitHub上で「プル・リクエストの受入れ」をオープンしています。

通知

新しいバージョンのHDFSコネクタがリリースされたときに通知を受けたい場合は、「Atomフィード」をサブスクライブしてください。

質問またはフィードバック

連絡方法: