Pubblica informazioni derivazione da applicazioni personalizzate in OCI Data Catalog

In questa esercitazione è possibile impostare l'applicazione di elaborazione dati per eseguire il push e la pubblicazione della derivazione dati in Data Catalog. I task chiave includono come:

  • Data Catalog
  • Impostazione per l'accettazione della derivazione dati.
  • Aggiungere il plugin Openlineage-spark all'applicazione Spark per generare la derivazione dei dati
  • Pubblica derivazione dati in Data Catalog.

La derivazione dei dati indica il percorso intrapreso dai dati durante il flusso dalle origini dati al consumo. Attraverso i metadati, i consumatori di dati possono comprendere e visualizzare le trasformazioni che i dati hanno attraversato nelle pipeline di dati.

La derivazione dei dati può essere generata da qualsiasi applicazione/servizio di elaborazione dati. La maggior parte delle applicazioni standard per l'elaborazione dei dati supporta la generazione e la pubblicazione della derivazione dei dati.

Data Catalog offre opzioni per acquisire la derivazione dei dati da diversi servizi, tra cui

Pratica standard per l'acquisizione della derivazione dei dati

Per informazioni sulla comprensione dello standard per la raccolta e l'analisi della derivazione dei dati, vedere OpenLineage

Specifica OpenLineage

In base alla OpenLineage specificazione, la derivazione dati generata da qualsiasi applicazione di elaborazione dati è rappresentata come segue:

OpenLineage Formato Espandi origine

{

    "eventTime": "2019-08-24T14:15:22Z",
    "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
    "schemaURL": "https://openlineage.io/spec/0-0-1/OpenLineage.json",
    "eventType": "START|RUNNING|COMPLETE|ABORT|FAIL|OTHER",
    "run": {
        "runId": "78c33d18-170c-44d3-a227-b3194f134f73",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
            }
        }
    },
    "job": {
        "namespace": "my-scheduler-namespace",
        "name": "myjob.mytask",
        "facets": {
            "property1": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            },
            "property2": {
                "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                "_deleted": true
            }
        }
    },
    "inputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "inputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ],
    "outputs": [
        {
            "namespace": "my-datasource-namespace",
            "name": "instance.schema.table",
            "facets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet",
                    "_deleted": true
                }
            },
            "outputFacets": {
                "property1": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                },
                "property2": {
                    "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
                    "_schemaURL": "https://openlineage.io/spec/1-0-2/OpenLineage.json#/$defs/BaseFacet"
                }
            }
        }
    ]
}

Informazioni preliminari

Per eseguire correttamente questa esercitazione, è necessario che:

Se si dispone di diritti amministrativi per l'account, saltare il resto di questa sezione. In caso contrario, chiedere all'amministratore di aggiungere il criterio seguente all'account:
allow group <the-group-your-username-belongs> to manage all-resources in compartment catalog-compartment

Per ulteriori esempi, vedere Policy comuni.

Nota

Nella sezione successiva, è possibile creare un compartimento per le istanze del Data Catalog, denominato catalog-compartment.
Impostazione di Data Catalog per l'accettazione della derivazione dati

In questa sezione è possibile impostare le applicazioni di elaborazione dati per eseguire il push della derivazione dati in Data Catalog.

Il task riportato di seguito è un esempio di applicazione Spark.
  1. Aprire il menu di navigazione e selezionare Analytics e AI. In Data Lake selezionare Data Catalog.
  2. Inserire le seguenti informazioni:
    • Nome: Lineage - Sales application
  3. Per Tipo, selezionare Fornitore derivazione personalizzato.
  4. Selezionare Crea.

Aggiungi plugin Openlineage-spark all'applicazione Spark per generare derivazione dati

Openlineage fornisce un plugin spark Apache che si lega a spark-context e genera Data Lineage da esso. Questo plugin può essere esteso per pubblicare la derivazione dati in Data Catalog. I seguenti snippet forniscono il codice di estensione del plugin e le opzioni spark-submit per richiamare il plugin.

POM plugin

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>OpenLineageExtension</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <openlineage.version>1.8.0</openlineage.version>
        <oci.sdk.version>3.41.2</oci.sdk.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/io.openlineage/openlineage-spark -->
        <dependency>
            <groupId>io.openlineage</groupId>
            <artifactId>openlineage-spark</artifactId>
            <version>${openlineage.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.30</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.13</artifactId>
            <version>3.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.13.12</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-datacatalog</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>

        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-common-httpclient-jersey</artifactId>
            <version>${oci.sdk.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.0.2</version>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Estensione del plugin Openlineage - OciConfig

package io.openlineage.client.transports;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.With;

@NoArgsConstructor
@AllArgsConstructor
@ToString
@With
public final class OciConfig implements TransportConfig {

    @Getter
    @Setter
    private String catalogId;
    @Getter
    @Setter
    private String dataAssetKey;
    @Getter
    @Setter
    private String authType;
    @Getter
    @Setter
    private String authProfile;
    @Getter
    @Setter
    private String endpoint;
}

Estensione del plugin Openlineage - OciTransport

package io.openlineage.client.transports;

import com.oracle.bmc.auth.AuthenticationDetailsProvider;
import com.oracle.bmc.auth.ConfigFileAuthenticationDetailsProvider;
import com.oracle.bmc.auth.SessionTokenAuthenticationDetailsProvider;
import com.oracle.bmc.datacatalog.DataCatalogClient;
import com.oracle.bmc.datacatalog.model.ImportLineageDetails;
import com.oracle.bmc.datacatalog.requests.ImportLineageRequest;
import com.oracle.bmc.datacatalog.responses.ImportLineageResponse;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineageClientUtils;
import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OciTransport extends Transport {

    private static final Logger log = LoggerFactory.getLogger(OciTransport.class);
    private final OciConfig config;
    private final String SESSION_TOKEN_AUTH = "security_token";
    private final boolean isValidConfig;

    public OciTransport(final OciConfig ociConfig) {
        config = ociConfig;
        isValidConfig = isValidConfig();
    }

    /***
     * Selectively emit only complete events to OCI Data Catalog
     * @param runEvent
     */
    @Override
    public void emit(OpenLineage.RunEvent runEvent) {
        if (runEvent.getEventType().equals(OpenLineage.RunEvent.EventType.COMPLETE)) {
            log.info("Found event type - Complete. Emitting event using OciTransport");
            final String eventAsJson = OpenLineageClientUtils.toJson(runEvent);
            emit(eventAsJson);
        } else {
            log.info("Found event type - {}. Skipping emission",runEvent.getEventType());
        }
    }

    @SneakyThrows
    @Override
    public void emit(String eventAsJson) {
        if (!isValidConfig) {
            log.info("OCI config is not valid. Please update OCI config. Skipping lineage emission.");
            return;
        }
        AuthenticationDetailsProvider provider;
        if (config.getAuthType()!=null && config.getAuthType().equals(SESSION_TOKEN_AUTH)) {
            provider = new SessionTokenAuthenticationDetailsProvider(config.getAuthProfile());
        } else {
            provider = new ConfigFileAuthenticationDetailsProvider(config.getAuthProfile());
        }

        try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
            if (config.getEndpoint() != null && !config.getEndpoint().isBlank()) {
                client.setEndpoint(config.getEndpoint());
            }
            ImportLineageRequest importLineageRequest = ImportLineageRequest
                .builder()
                .importLineageDetails(ImportLineageDetails
                    .builder()
                    .lineagePayload(eventAsJson.getBytes())
                    .build())
                .dataAssetKey(config.getDataAssetKey())
                .catalogId(config.getCatalogId())
                .build();
            ImportLineageResponse response = client.importLineage(importLineageRequest);
            log.info("Pushed lineage event to catalog - {}, opc-request-id = {}", config.getCatalogId(), response.getOpcRequestId());
        } catch (Exception ex) {
            log.warn("Failed to push lineage {}", ex.getMessage(), ex);
        }
    }

    private boolean isValidConfig() {
        log.info("OCI Config - {}", config.toString());
        if (config.getAuthProfile() == null || config.getAuthProfile().isBlank()) {
            log.info("OCI config is missing Authentication profile. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create Authentication profile.");
            return false;
        }
        if (config.getCatalogId() == null || config.getCatalogId().isBlank()) {
            log.info("OCI config is missing CatalogId. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to get CatalogId.");
            return false;
        }
        if (config.getDataAssetKey() == null || config.getDataAssetKey().isBlank()) {
            log.info("OCI config is missing DataAssetResourceId. Lineage will not be emitted to OCI Data Catalog. " +
                "Check documentation on how to create DataAsset and get its DataAssetResourceId");
            return false;
        }
        return true;
    }
}

Estensione del plugin Openlineage - OciTransportBuilder

package io.openlineage.client.transports;

public class OciTransportBuilder implements TransportBuilder {

    @Override
    public String getType() {
        return "oci";
    }

    @Override
    public TransportConfig getConfig() {
        return new OciConfig();
    }

    @Override
    public Transport build(TransportConfig config) {
        return new OciTransport((OciConfig) config);
    }
}

Opzioni Spark Submit

--packages "io.openlineage:openlineage-spark:1.8.0,com.oracle.oci.sdk:oci-java-sdk-common:{oci-sdk-version},com.oracle.oci.sdk:oci-java-sdk-common-httpclient-jersey:{oci-sdk-version},com.oracle.pic.dcat:datacatalog-java-client:{oci-sdk-version},io.openlineage:openlineage-oci-extension:{generated-from-above-mentioned-code-snippet}"
--conf "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener"
--conf "spark.openlineage.debugFacet=enabled"
--conf "spark.openlineage.transport.type=oci"
--conf "spark.openlineage.transport.catalogId={catalog instance OCID}"
--conf "spark.openlineage.transport.dataAssetKey={UUID of the dataAsset registered in Data catalog}"
--conf "spark.openlineage.transport.authProfile={user authentication profile}"
--conf "spark.openlineage.transport.authType={optionally specify security_token as auth type}"
--conf "spark.openlineage.application.name={application name to display on catalog UI}"

Pubblica derivazione dati in Data Catalog

Per pubblicare la derivazione dati in Data Catalog, è necessario creare un utente IAM e impostare l'autenticazione per questo utente nel sistema in cui sono in esecuzione le applicazioni di elaborazione dati. Questo documento fornisce dettagli sui metodi di autenticazione disponibili per connettersi ai servizi OCI. Specificare il seguente criterio IAM per l'utente

allow group lineage-group to CATALOG_LINEAGE_IMPORT in tenancy where all {target.catalog.id = <catalog-ocid>, target.data-asset.key=<data-asset-key>}
Pubblica derivazione dati in Data Catalog utilizzando altre applicazioni

Per qualsiasi altra applicazione di elaborazione dati nel tuo ecosistema, segui i passaggi precedenti.

Solo la fase di generazione della derivazione dati varia a seconda dell'applicazione di elaborazione dati. È possibile cercare la pagina Integrazioni Openlineage per verificare se esiste già un plugin Openlineage per l'applicazione. In caso contrario, è possibile scrivere la propria implementazione per produrre il payload di derivazione dati in formato Openlineage. Di seguito è riportato il codice di esempio per chiamare l'endpoint importLineage del servizio OCI Data Catalog.

ImportLineage: API Java

try (DataCatalogClient client = DataCatalogClient.builder().build(provider)) {
    ImportLineageRequest importLineageRequest = ImportLineageRequest
        .builder()
        .importLineageDetails(ImportLineageDetails
            .builder()
            .lineagePayload(payload.getBytes())
            .build())
        .dataAssetKey(dataAssetKey)
        .catalogId(catalogOcid)
        .build();
    ImportLineageResponse response = client.importLineage(importLineageRequest);
} catch (Exception ex) {
    log.warn("Failed to push lineage {}", ex.getMessage(), ex);
}

ImportLineage: API Python

import oci

# Create a default config using DEFAULT profile in default location
# Refer to
# https://docs.cloud.oracle.com/en-us/iaas/Content/API/Concepts/sdkconfig.htm#SDK_and_CLI_Configuration_File
# for more info
config = oci.config.from_file()


# Initialize service client with default config file
data_catalog_client = oci.data_catalog.DataCatalogClient(config)


# Send the request to service, some parameters are not required, see API
# doc for more info
import_lineage_response = data_catalog_client.import_lineage(
    catalog_id="ocid1.test.oc1..<unique_ID>EXAMPLE-catalogId-Value",
    data_asset_key="EXAMPLE-dataAssetKey-Value",
    import_lineage_details=oci.data_catalog.models.ImportLineageDetails(
        lineage_payload="openlineage-payload"),
    opc_retry_token="EXAMPLE-opcRetryToken-Value",
    opc_request_id="AOR2TTPR06HMXDHS3NZU<unique_ID>")

# Get the data from response
print(import_lineage_response.data)