Pubblica informazioni derivazione da applicazioni personalizzate in OCI Data Catalog
In questa esercitazione è possibile impostare l'applicazione di elaborazione dati per eseguire il push e la pubblicazione della derivazione dati in Data Catalog. I task chiave includono come:
- Data Catalog
- Impostazione per l'accettazione della derivazione dati.
- Aggiungere il plugin
Openlineage-sparkall'applicazione Spark per generare la derivazione dei dati - Pubblica derivazione dati in Data Catalog.
La derivazione dei dati indica il percorso intrapreso dai dati durante il flusso dalle origini dati al consumo. Attraverso i metadati, i consumatori di dati possono comprendere e visualizzare le trasformazioni che i dati hanno attraversato nelle pipeline di dati.
La derivazione dei dati può essere generata da qualsiasi applicazione/servizio di elaborazione dati. La maggior parte delle applicazioni standard per l'elaborazione dei dati supporta la generazione e la pubblicazione della derivazione dei dati.
Data Catalog offre opzioni per acquisire la derivazione dei dati da diversi servizi, tra cui
- Data Integration
- Flusso di dati
- Data Integration
- Le tue applicazioni di elaborazione dati personalizzate
Pratica standard per l'acquisizione della derivazione dei dati
Per informazioni sulla comprensione dello standard per la raccolta e l'analisi della derivazione dei dati, vedere OpenLineage
Specifica OpenLineage
In base alla OpenLineage specificazione, la derivazione dati generata da qualsiasi applicazione di elaborazione dati è rappresentata come segue:
OpenLineage Formato Espandi origine
{
"eventTime": "2019-08-24T14:15:22Z",
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
"eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
"run": {
"runId": "78c33d18-170c-44d3-a227-b3194f134f73",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
},
"job": {
"namespace": "my-scheduler-namespace",
"name": "myjob.mytask",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
}
},
"inputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"inputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
],
"outputs": [
{
"namespace": "my-datasource-namespace",
"name": "instance.schema.table",
"facets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
"_deleted": true
}
},
"outputFacets": {
"property1": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
},
"property2": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
}
}
}
]
}
Informazioni preliminari
Per eseguire correttamente questa esercitazione, è necessario che:
- Un account Oracle Cloud Infrastructure. Per ulteriori informazioni, consulta la sezione relativa alla richiesta e gestione di promozioni gratuite su Oracle Cloud.
- Accesso per utilizzare le risorse di Data Catalog. Per ulteriori informazioni, vedere Guida introduttiva a Data Catalog e Creazione dei criteri di Data Catalog.
- Istanza di Data Catalog creata. Per ulteriori informazioni, vedere Creazione di un'istanza di Data Catalog. Non è necessario essere un amministratore del catalogo, tuttavia è richiesto il seguente criterio IAM:
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
allow group <the-group-your-username-belongs> to manage all-resources in compartment catalog-compartment
Per ulteriori esempi, vedere Policy comuni.
Nella sezione successiva, è possibile creare un compartimento per le istanze del Data Catalog, denominato
catalog-compartment.In questa sezione è possibile impostare le applicazioni di elaborazione dati per eseguire il push della derivazione dati in Data Catalog.
Aggiungi plugin Openlineage-spark all'applicazione Spark per generare derivazione dati
Openlineage fornisce un plugin spark Apache che si lega a spark-context e genera Data Lineage da esso. Questo plugin può essere esteso per pubblicare la derivazione dati in Data Catalog. I seguenti snippet forniscono il codice di estensione del plugin e le opzioni spark-submit per richiamare il plugin.
POM plugin
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>OpenLineageExtension</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<openlineage.version>1.8.0</openlineage.version>
<oci.sdk.version>3.41.2</oci.sdk.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
<dependency>
<groupId>io.openlineage</groupId>
<artifactId>openlineage-spark</artifactId>
<version>${openlineage.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.13</artifactId>
<version>3.5.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.13.12</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-datacatalog</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
<dependency>
<groupId>com.oracle.oci.sdk</groupId>
<artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
<version>${oci.sdk.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Estensione del plugin Openlineage - OciConfig
package io.openlineage.client.transports;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;
@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {
@Getter
@Setter
private String catalogId;
@Getter
@Setter
private String dataAssetKey;
@Getter
@Setter
private String authType;
@Getter
@Setter
private String authProfile;
@Getter
@Setter
private String endpoint;
}
Estensione del plugin Openlineage - OciTransport
package io.openlineage.client.transports;
import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class OciTransport extends Transport {
private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
private final OciConfig config;
private final String SESSION_TOKEN_AUTH = "security_token";
private final boolean isValidConfig;
public OciTransport(final OciConfig ociConfig) {
config = ociConfig;
isValidConfig = isValidConfig();
}
/***
* Selectively emit only complete events to OCI Data Catalog
* @param runEvent
*/
@Override
public void emit(OpenLineage.RunEvent runEvent) {
if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
log.info("Found event type - Complete. Emitting event using OciTransport");
final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
emit(eventAsJson);
} else {
log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
}
}
@SneakyThrows
@Override
public void emit(String eventAsJson) {
if (!isValidConfig) {
log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
return;
}
AuthenticationDetailsProvider provider;
if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
} else {
provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
}
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
client.setEndpoint(config.getEndpoint());
}
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(eventAsJson.getBytes())
.build())
.dataAssetKey(config.getDataAssetKey())
.catalogId(config.getCatalogId())
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
}
private boolean isValidConfig() {
log.info("OCI Config - {}", config.toString());
if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create Authentication profile.");
return false;
}
if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to get CatalogId.");
return false;
}
if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
log.info("OCI config is missing DataAssetResourceId. Lineage will not be emitted to OCI Data Catalog. " +
"Check documentation on how to create DataAsset and get its DataAssetResourceId");
return false;
}
return true;
}
}
Estensione del plugin Openlineage - OciTransportBuilder
package io.openlineage.client.transports;
public class OciTransportBuilder implements TransportBuilder {
@Override
public String getType() {
return "oci";
}
@Override
public TransportConfig getConfig() {
return new OciConfig();
}
@Override
public Transport build(TransportConfig config) {
return new OciTransport((OciConfig) config);
}
}
Opzioni Spark Submit
--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.pic.dcat:datacatalog-java-client:{oci-sdk-version},io.openlineage:openlineage-oci-extension:{generated-from-above-mentioned-code-snippet}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"
Pubblica derivazione dati in Data Catalog
Per pubblicare la derivazione dati in Data Catalog, è necessario creare un utente IAM e impostare l'autenticazione per questo utente nel sistema in cui sono in esecuzione le applicazioni di elaborazione dati. Questo documento fornisce dettagli sui metodi di autenticazione disponibili per connettersi ai servizi OCI. Specificare il seguente criterio IAM per l'utente
allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}Per qualsiasi altra applicazione di elaborazione dati nel tuo ecosistema, segui i passaggi precedenti.
Solo la fase di generazione della derivazione dati varia a seconda dell'applicazione di elaborazione dati. È possibile cercare la pagina Integrazioni Openlineage per verificare se esiste già un plugin Openlineage per l'applicazione. In caso contrario, è possibile scrivere la propria implementazione per produrre il payload di derivazione dati in formato Openlineage. Di seguito è riportato il codice di esempio per chiamare l'endpoint importLineage del servizio OCI Data Catalog.
ImportLineage: API Java
try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
ImportLineageRequest importLineageRequest = ImportLineageRequest
.builder()
.importLineageDetails(ImportLineageDetails
.builder()
.lineagePayload(payload.getBytes())
.build())
.dataAssetKey(dataAssetKey)
.catalogId(catalogOcid)
.build();
ImportLineageResponse response = client.importLineage(importLineageRequest);
} catch (Exception ex) {
log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}
ImportLineage: API Python
import oci
# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()
# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)
# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
data_asset_key="EXAMPLE-dataAssetKey-Value",
import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
lineage_payload="openlineage-payload"),
opc_retry_token="EXAMPLE-opcRetryToken-Value",
opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")
# Get the data from response
print(import_lineage_response.data)