カスタム・アプリケーションからOCIデータ・カタログへの系統情報の公開

このチュートリアルでは、データ系統をデータ・カタログにプッシュおよび公開するデータ処理アプリケーションを設定します。主なタスクは次のとおりです:

  • データ・カタログ
  • データ系統の受入の設定。
  • Openlineage-sparkプラグインをSparkアプリケーションに追加してデータ系統を生成します
  • データ系統をデータ・カタログに公開します。

データ系統は、データ・ソースから消費までデータが流れる過程を示します。メタデータを通じて、データ・コンシューマは、データ・パイプラインでデータが通過した変換を理解および可視化できます。

データ系統は、あらゆるデータ処理アプリケーション/サービスで生成できます。ほとんどの標準データ処理アプリケーションは、データ系統の生成と公開をサポートしています。

データ・カタログには、次のような様々なサービスからデータ系統を取得するためのオプションがあります。

データ系統を取得するための標準演習

データ系統の収集および分析の標準の理解の詳細は、OpenLineageを参照してください

OpenLineage仕様

OpenLineage仕様に従って、データ処理アプリケーションによって生成されるデータ系統は次のように表されます。

OpenLineageフォーマット ソースを展開

{

    "eventTime": "2019-08-24T14:15:22Z",
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
    "eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
    "run": {
        "runId": "78c33d18-170c-44d3-a227-b3194f134f73",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            }
        }
    },
    "job": {
        "namespace": "my-scheduler-namespace",
        "name": "myjob.mytask",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            }
        }
    },
    "inputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "inputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ],
    "outputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "outputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ]
}

開始する前に

このチュートリアルを正常に実行するには、次のものが必要です:

アカウントに対する管理権限がある場合は、この項の残りの部分をスキップします。それ以外の場合は、管理者に連絡して、次のポリシーをアカウントに追加してください:
allow group <the-group-your-username-belongs> to manage all-resources in compartment catalog-compartment

その他の例については、共通ポリシーを参照してください。

ノート

次の項では、catalog-compartmentというデータ・カタログ・インスタンスのコンパートメントを作成します。
データ系統を受け入れるためのデータ・カタログの設定

この項では、データ系統をデータ・カタログにプッシュするデータ処理アプリケーションを設定します。

次のタスクは、Sparkアプリケーションの例です。
  1. ナビゲーション・メニューを開き、「アナリティクスとAI」を選択します。「データ・レイク」で、「データ・カタログ」を選択します。
  2. 次の情報を入力します:
    • 名前: Lineage - Sales application
  3. 「タイプ」で、「カスタム系統プロバイダ」を選択します。
  4. 「作成」を選択します。

SparkアプリケーションへのOpenlineage-sparkプラグインの追加によるデータ系統の生成

Openlineageは、spark-contextにバインドし、そこからデータ系統を生成するApacheスパーク・プラグインを提供します。このプラグインは、データ系統をデータ・カタログに公開するために拡張できます。次のスニペットは、プラグインを起動するためのプラグイン拡張コードおよびspark-submitオプションを提供します。

プラグインPOM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>OpenLineageExtension</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <openlineage.version>1.8.0</openlineage.version>
        <oci.sdk.version>3.41.2</oci.sdk.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
        <dependency>
            <groupId>io.openlineage</groupId>
            <artifactId>openlineage-spark</artifactId>
            <version>${openlineage.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.12</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-datacatalog</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Openlineageプラグイン拡張機能- OciConfig

package io.openlineage.client.transports;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {

    @Getter
    @Setter
    private String catalogId;
    @Getter
    @Setter
    private String dataAssetKey;
    @Getter
    @Setter
    private String authType;
    @Getter
    @Setter
    private String authProfile;
    @Getter
    @Setter
    private String endpoint;
}

Openlineageプラグイン拡張機能- OciTransport

package io.openlineage.client.transports;

import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OciTransport extends Transport {

    private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
    private final OciConfig config;
    private final String SESSION_TOKEN_AUTH = "security_token";
    private final boolean isValidConfig;

    public OciTransport(final OciConfig ociConfig) {
        config = ociConfig;
        isValidConfig = isValidConfig();
    }

    /***
     * Selectively emit only complete events to OCI Data Catalog
     * @param runEvent
     */
    @Override
    public void emit(OpenLineage.RunEvent runEvent) {
        if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
            log.info("Found event type - Complete. Emitting event using OciTransport");
            final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
            emit(eventAsJson);
        } else {
            log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
        }
    }

    @SneakyThrows
    @Override
    public void emit(String eventAsJson) {
        if (!isValidConfig) {
            log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
            return;
        }
        AuthenticationDetailsProvider provider;
        if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
            provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
        } else {
            provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
        }

        try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
            if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
                client.setEndpoint(config.getEndpoint());
            }
            ImportLineageRequest importLineageRequest = ImportLineageRequest
                .builder()
                .importLineageDetails(ImportLineageDetails
                    .builder()
                    .lineagePayload(eventAsJson.getBytes())
                    .build())
                .dataAssetKey(config.getDataAssetKey())
                .catalogId(config.getCatalogId())
                .build();
            ImportLineageResponse response = client.importLineage(importLineageRequest);
            log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
        } catch (Exception ex) {
            log.warn("Failed to push lineage {}", ex.getMessage(), ex);
        }
    }

    private boolean isValidConfig() {
        log.info("OCI Config - {}", config.toString());
        if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
            log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create Authentication profile.");
            return false;
        }
        if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
            log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to get CatalogId.");
            return false;
        }
        if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
            log.info("OCI config is missing DataAssetResourceId. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create DataAsset and get its DataAssetResourceId");
            return false;
        }
        return true;
    }
}

Openlineageプラグイン拡張機能- OciTransportBuilder

package io.openlineage.client.transports;

public class OciTransportBuilder implements TransportBuilder {

    @Override
    public String getType() {
        return "oci";
    }

    @Override
    public TransportConfig getConfig() {
        return new OciConfig();
    }

    @Override
    public Transport build(TransportConfig config) {
        return new OciTransport((OciConfig) config);
    }
}

Spark Submitオプション

--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.pic.dcat:datacatalog-java-client:{oci-sdk-version},io.openlineage:openlineage-oci-extension:{generated-from-above-mentioned-code-snippet}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"

データ系統をデータ・カタログに公開

データ系統をデータ・カタログに公開するには、データ処理アプリケーションが実行されているシステムで、IAMユーザーを作成し、このユーザーの認証を設定する必要があります。このドキュメントでは、OCIサービスに接続するための使用可能な認証方法の詳細を示します。ユーザーに対して次のIAMポリシーを指定します

allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
他のアプリケーションを使用したデータ・カタログへのデータ系統の公開

エコシステム内のその他のデータ処理アプリケーションについては、前のステップに従います。

データ系統生成ステップのみが、データ処理アプリケーションによって異なります。Openlineage統合ページを参照して、アプリケーションにOpenlineageプラグインがすでに存在するかどうかを確認できます。それ以外の場合は、独自の実装を記述して、データ系統ペイロードをOpenlineage形式で生成できます。OCIデータ・カタログ・サービスのimportLineageエンドポイントをコールするサンプル・コードを次に示します。

ImportLineage - Java API

try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
    ImportLineageRequest importLineageRequest = ImportLineageRequest
        .builder()
        .importLineageDetails(ImportLineageDetails
            .builder()
            .lineagePayload(payload.getBytes())
            .build())
        .dataAssetKey(dataAssetKey)
        .catalogId(catalogOcid)
        .build();
    ImportLineageResponse response = client.importLineage(importLineageRequest);
} catch (Exception ex) {
    log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}

ImportLineage - Python API

import oci

# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()


# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)


# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
    catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
    data_asset_key="EXAMPLE-dataAssetKey-Value",
    import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
        lineage_payload="openlineage-payload"),
    opc_retry_token="EXAMPLE-opcRetryToken-Value",
    opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")

# Get the data from response
print(import_lineage_response.data)