カスタム・アプリケーションからOCIデータ・カタログへの系統情報の公開
このチュートリアルでは、データ系統をデータ・カタログにプッシュおよび公開するデータ処理アプリケーションを設定します。主なタスクは次のとおりです:
- データ・カタログ
- データ系統の受入の設定。
Openlineage-sparkプラグインをSparkアプリケーションに追加してデータ系統を生成します- データ系統をデータ・カタログに公開します。
データ系統は、データ・ソースから消費までデータが流れる過程を示します。メタデータを通じて、データ・コンシューマは、データ・パイプラインでデータが通過した変換を理解および可視化できます。
データ系統は、あらゆるデータ処理アプリケーション/サービスで生成できます。ほとんどの標準データ処理アプリケーションは、データ系統の生成と公開をサポートしています。
データ・カタログには、次のような様々なサービスからデータ系統を取得するためのオプションがあります。
- Data Integration
- データ・フロー
- Data Integration
- 独自のカスタム・データ処理アプリケーション
データ系統を取得するための標準演習
データ系統の収集および分析の標準の理解の詳細は、OpenLineageを参照してください
OpenLineage仕様
OpenLineage仕様に従って、データ処理アプリケーションによって生成されるデータ系統は次のように表されます。
OpenLineageフォーマット ソースを展開
{
"eventTime": "2019-08-24T14:15:22Z",
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
"eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
"run": {
"runId": "78c33d18-170c-44d3-a227-b3194f134f73",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
},
"job": {
"namespace": "my-scheduler-namespace",
"name": "myjob.mytask",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
}
},
"inputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"inputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
],
"outputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"outputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
]
}
開始する前に
このチュートリアルを正常に実行するには、次のものが必要です:
- Oracle Cloud Infrastructureアカウント。詳細は、「無料Oracle Cloudプロモーションのリクエストおよび管理」を参照してください。
- データ・カタログ・リソースを使用するためのアクセス権。詳細は、データ・カタログ・スタート・ガイドおよびデータ・カタログ・ポリシーの作成を参照してください。
- 作成されたデータ・カタログ・インスタンス。詳細は、データ・カタログ・インスタンスの作成を参照してください。カタログ管理者である必要はありませんが、次のIAMポリシーが必要です:
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
allow group <the-group-your-username-belongs> to manage all-resources in compartment catalog-compartment
その他の例については、共通ポリシーを参照してください。
次の項では、
catalog-compartmentというデータ・カタログ・インスタンスのコンパートメントを作成します。この項では、データ系統をデータ・カタログにプッシュするデータ処理アプリケーションを設定します。
SparkアプリケーションへのOpenlineage-sparkプラグインの追加によるデータ系統の生成
Openlineageは、spark-contextにバインドし、そこからデータ系統を生成するApacheスパーク・プラグインを提供します。このプラグインは、データ系統をデータ・カタログに公開するために拡張できます。次のスニペットは、プラグインを起動するためのプラグイン拡張コードおよびspark-submitオプションを提供します。
プラグインPOM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>OpenLineageExtension</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<openlineage.version>1.8.0</openlineage.version>
<oci.sdk.version>3.41.2</oci.sdk.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark</artifactId>
<version>${openlineage.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.12</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-datacatalog</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Openlineageプラグイン拡張機能- OciConfig
package io.openlineage.client.transports;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;
@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {
@Getter
@Setter
private String catalogId;
@Getter
@Setter
private String dataAssetKey;
@Getter
@Setter
private String authType;
@Getter
@Setter
private String authProfile;
@Getter
@Setter
private String endpoint;
}
Openlineageプラグイン拡張機能- OciTransport
package io.openlineage.client.transports;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OciTransport extends Transport {
private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
private final OciConfig config;
private final String SESSION_TOKEN_AUTH = "security_token";
private final boolean isValidConfig;
public OciTransport(final OciConfig ociConfig) {
config = ociConfig;
isValidConfig = isValidConfig();
}
/***
* Selectively emit only complete events to OCI Data Catalog
* @param runEvent
*/
@Override
public void emit(OpenLineage.RunEvent runEvent) {
if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
log.info("Found event type - Complete. Emitting event using OciTransport");
final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
emit(eventAsJson);
} else {
log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
}
}
@SneakyThrows
@Override
public void emit(String eventAsJson) {
if (!isValidConfig) {
log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
return;
}
AuthenticationDetailsProvider provider;
if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
} else {
provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
}
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
client.setEndpoint(config.getEndpoint());
}
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(eventAsJson.getBytes())
.build())
.dataAssetKey(config.getDataAssetKey())
.catalogId(config.getCatalogId())
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
}
private boolean isValidConfig() {
log.info("OCI Config - {}", config.toString());
if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create Authentication profile.");
return false;
}
if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to get CatalogId.");
return false;
}
if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
log.info("OCI config is missing DataAssetResourceId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create DataAsset and get its DataAssetResourceId");
return false;
}
return true;
}
}
Openlineageプラグイン拡張機能- OciTransportBuilder
package io.openlineage.client.transports;
public class OciTransportBuilder implements TransportBuilder {
@Override
public String getType() {
return "oci";
}
@Override
public TransportConfig getConfig() {
return new OciConfig();
}
@Override
public Transport build(TransportConfig config) {
return new OciTransport((OciConfig) config);
}
}
Spark Submitオプション
--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.pic.dcat:datacatalog-java-client:{oci-sdk-version},io.openlineage:openlineage-oci-extension:{generated-from-above-mentioned-code-snippet}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"
データ系統をデータ・カタログに公開
データ系統をデータ・カタログに公開するには、データ処理アプリケーションが実行されているシステムで、IAMユーザーを作成し、このユーザーの認証を設定する必要があります。このドキュメントでは、OCIサービスに接続するための使用可能な認証方法の詳細を示します。ユーザーに対して次のIAMポリシーを指定します
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}エコシステム内のその他のデータ処理アプリケーションについては、前のステップに従います。
データ系統生成ステップのみが、データ処理アプリケーションによって異なります。Openlineage統合ページを参照して、アプリケーションにOpenlineageプラグインがすでに存在するかどうかを確認できます。それ以外の場合は、独自の実装を記述して、データ系統ペイロードをOpenlineage形式で生成できます。OCIデータ・カタログ・サービスのimportLineageエンドポイントをコールするサンプル・コードを次に示します。
ImportLineage - Java API
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(payload.getBytes())
.build())
.dataAssetKey(dataAssetKey)
.catalogId(catalogOcid)
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
ImportLineage - Python API
import oci
# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()
# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)
# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
data_asset_key="EXAMPLE-dataAssetKey-Value",
import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
lineage_payload="openlineage-payload"),
opc_retry_token="EXAMPLE-opcRetryToken-Value",
opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")
# Get the data from response
print(import_lineage_response.data)