Integração do Serviço Data Flow com o Serviço Data Science

Com o serviço Data Flow, você pode configurar notebooks do Data Science para executar aplicativos interativos no serviço Data Flow.

O serviço Data Flow usa Jupyter Notebooks totalmente gerenciados para permitir que cientistas e engenheiros de dados criem, visualizem, colaborem e depurem aplicativos de engenharia e ciência de Dados. Você pode gravar esses aplicativos em Python, Scala e PySpark. Você também pode conectar uma sessão de notebook do serviço Data Science ao serviço Data Flow para executar aplicativos. Os kernels e aplicativos do Data Flow são executados no Oracle Cloud Infrastructure Data Flow .

O Apache Spark é um sistema de computação distribuído projetado para processar dados em escala. Ele suporta tarefas SQL em grande escala, tarefas de processamento em batch e de streams e tarefas de aprendizado de máquina. O Spark SQL fornece suporte semelhante a banco de dados. Para consultar dados estruturados, use o Spark SQL. Ele é uma implementação SQL padrão ANSI.

As Sessões do Serviço Data Flow suportam recursos do cluster do Data Flow com dimensionamento automático. Para obter mais informações, consulte Dimensionamento Automático na documentação do serviço Data Flow.

As Sessões do Serviço Data Flow suportam o uso de ambientes conda como ambientes de runtime Spark personalizáveis.

Um notebook do serviço Data Science usa o Data Flow Magic para enviar solicitações ao serviço Data Flow usando as APIs NotebookSession para executar o código Spark em um servidor do serviço Data Flow.
Limitações
  • As Sessões do serviço Data Flow duram até 7 dias ou 10.080 minutos (maxDurationInMinutes).

  • As sessões do serviço Data Flow têm um valor de timeout por inatividade padrão de 480 minutos (8 horas) (idleTimeoutInMinutes). Você pode configurar outro valor.
  • A Sessão do serviço Data Flow só está disponível por meio de uma Sessão de Notebook do serviço Data Science.
  • Somente o Spark versão 3.5.0 e 3.2.1 são suportados.
Dica

Assista ao vídeo tutorial sobre como usar o serviço Data Science com o Data Flow Studio. Consulte também a documentação do Oracle Accelerated Data Science SDK para obter mais informações sobre como integrar o serviço Data Science e o serviço Data Flow.

Instalando o Ambiente Conda

Siga estas etapas para usar o Data Flow com o Data Flow Magic.

  1. Crie ou abra uma sessão de notebook no serviço Data Science.
    • A sessão de notebook deve estar na região em que o serviço foi ativado para a tenancy.
    • A sessão de notebook deve estar no compartimento do grupo dinâmico de sessões de notebook.
  2. Instale e ative o ambiente conda pyspark32_p38_cpu_v3 na sessão de notebook:
    copy odsc conda install -s pyspark32_p38_cpu_v3
    Observação

    Recomendamos instalar o ambiente conda atualizado pyspark32_p38_cpu_v3, pois ele inclui suporte para reiniciar sessões automaticamente que foram interrompidas porque excederam a duração máxima do timeout.

    O ambiente conda estável anterior era pyspark32_p38_cpu_v2.

  3. Ative o ambiente conda pyspark32_p38_cpu_v3:
    source activate /home/datascience/conda/pyspark32_p38_cpu_v3

Usando o Serviço Data Flow com o Serviço Data Science

Siga estas etapas para executar um aplicativo usando o serviço Data Flow com o serviço Data Science.

  • Certifique-se de ter as políticas configuradas para usar um notebook com o serviço Data Flow.

  • Certifique-se de ter as políticas do serviço Data Science configuradas corretamente.

  • Para obter uma lista de todos os comandos suportados, use o comando %help.
  • Os comandos nas etapas a seguir se aplicam ao Spark 3.5.0 e ao Spark 3.2.1. O Spark 3.5.0 é usado nos exemplos. Defina o valor de sparkVersion de acordo com a versão do Spark usada.
  1. Configure a autenticação no ADS.
    • O ADS SDK é usado para controlar o tipo de autenticação usado no Data Flow Magic.
    • A autenticação API KEY é usada por padrão. Para alterar o tipo de autenticação, use o comando ads.set_auth("resource_principal"):
      import ads
      ads.set_auth("resource_principal") # Supported values: resource_principal, api_key
  2. Carregar a extensão do Data Flow Magic
    %load_ext dataflow.magics
  3. (Opcional) Crie uma sessão do serviço Data Flow usando o comando mágico, %create_session:
    Sessão Comum
    Este exemplo mostra como criar uma nova sessão em formas flexíveis:
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>/",
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessão com URI do arquivo compactado
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "archiveUri": <oci://<bucket>@<namespace>/archive.zip"
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessão com um ambiente conda personalizado
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": "oci://<bucket>@<namespace>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
    Sessão com Metastore
    import json
    command = {
        "compartmentId": "<compartment_ocid>",
        "displayName": "<session_name>",
        "language": "PYTHON",
        "sparkVersion": "3.5.0",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "type": "SESSION",
        "logsBucketUri": oci://<bucket>@<namespace>",
        "metastoreId": "<ocid1.datacatalogmetastore.oc1.iad...>",
        "configuration": {
            "spark.archives": "oci://<bucket>@<namespace>/conda_pack/<pack_name>"
        },
    }
    command = f'\'{json.dumps(command)}\''
     
    %create_session -l python -c $command
  4. (Opcional) Use uma sessão existente do serviço Data Flow:
    Use o comando %use_session. Copie o OCID da Console.
    %use_session -s <session_OCID> -r <region_name>
                                
  5. Configure a sessão usando o comando %config_session:
    • Para ver a configuração atual:
      %config
    • Para alterar os drivers e executores:
      %configure_session -i '{"driverShape": "VM.Standard2.1", "executorShape": "VM.Standard2.1", "numExecutors": 1}'
    • Para aplicar o Dimensionamento Automático:
      %configure_session -i '{"driverShape": "VM.Standard2.1",\
      "executorShape": "VM.Standard2.1", "numExecutors": 16,\
      "sparkVersion":"3.5.0",\
      "configuration":{"spark.dynamicAllocation.enabled":"true",\
      "spark.dynamicAllocation.shuffleTracking.enabled":"true",\
      "spark.dynamicAllocation.minExecutors":"16",\
      "spark.dynamicAllocation.maxExecutors":"54",\
      "spark.dynamicAllocation.executorIdleTimeout":"60",\
      "spark.dynamicAllocation.schedulerBacklogTimeout":"60",\
      "spark.dataflow.dynamicAllocation.quotaPolicy":"max"} \
      }'
      Você também pode aplicar uma política de dimensionamento automático ao criar uma sessão.
  6. (Opcional) Para ativar uma sessão existente, use o comando %activate_session:
    %activate_session -l python -c 
    '{"compartmentId": "<Compartment_OCID>",
        "displayName": "<Name>",
        "applicationId": "<Application_OCID>"
    }'
  7. Para interromper uma sessão, use o comando %stop_session:
    %stop_session

Personalizando um Ambiente Spark do Serviço Data Flow com um Ambiente Conda

Você pode usar um ambiente conda publicado como um ambiente de runtime.

  1. Instale o Spark 3.5.0 e o Data Flow na sessão de notebook:
    odsc conda install -s pyspark32_p38_cpu_v3
  2. Instale as bibliotecas usando o conda.
  3. Publique o ambiente conda:
    odsc conda publish -s pyspark32_p38_cpu_v3
  4. Inicie o cluster do serviço Data Flow, por exemplo:
    %create_session -l python -c '{"compartmentId":"<your-compartment-ocid>", \
    "displayName":"<your-display-name>",\
    "sparkVersion":"3.5.0", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda"}}'
    Observação

    A configuração da sessão deve incluir os seguintes parâmetros:
    • "sparkVersion":"3.5.0"
    • "language":"PYTHON"
    • "configuration" com um caminho spark.archives para o ambiente conda no armazenamento de objetos.

Executando a Biblioteca spark-nlp no Serviço Data Flow

Siga estas etapas para instalar a biblioteca Spark-nlp e executar no serviço Data Flow.

Você deve ter concluído as etapas 1 e 2 em Personalizando um Ambiente Spark do Serviço Data Flow com um Ambiente Conda. A biblioteca spark-nlp é pré-instalada no ambiente conda pyspark32_p38_cpu_v2.

  1. Instale os modelos e pipelines pré-treinados da biblioteca spark-nlp.

    Se você precisar de qualquer modelo pré-treinado da biblioteca spark-nlp, faça download deles e descompacte-os na pasta do ambiente conda. O serviço Data Flow ainda não suporta saída para a internet pública. Não é possível fazer download de forma dinâmica de modelos pré-treinados do AWS S3 no serviço Data Flow.

    Você pode fazer download de modelos pré-treinados do model hub como arquivos compactados zip. Descompacte o modelo na pasta do ambiente conda. O modelo de exemplo é https://nlp.johnsnowlabs.com/2021/03/23/explain_document_dl_en.html:
    mkdir /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models
    unzip explain_document_dl_en_3.0.0_3.0_1616473268265.zip -d /home/datascience/conda/pyspark32_p38_cpu_v2/sparknlp-models/
  2. Publique o ambiente conda. Consulte a etapa 3 em Personalizando um Ambiente Spark do Serviço Data Flow com um Ambiente Conda.
  3. Inicie o cluster do serviço Data Flow.

    Em uma célula de notebook em execução no kernel da sessão de notebook pyspark30_p37_cpu_v5, verifique novamente o

    parâmetro spark.jars.packages. Ele reflete a versão da instalação spark-nlp.
    %create_session -l python -c '{"compartmentId":" <your-compartment-ocid>", \
    "displayName":"sparknlp",\
    "sparkVersion":"3.2.1", \
    "language":"PYTHON", \
    "type": "SESSION",\
    "driverShape":"VM.Standard2.1", \
    "executorShape":"VM.Standard2.1",\
    "numExecutors":1,\
    "configuration": {"spark.archives":"oci://<your-bucket>@<your-tenancy-namespace>/<your-path-to-the-conda-environment>#conda",\
    "spark.jars.ivy":"/opt/spark/work-dir/conda/.ivy2",\
    "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:4.1.0"}\
    }'
  4. Teste-o com um trecho de código do repositório spark-nlp GitHub:
    %%spark
     
    from sparknlp.base import *
    from sparknlp.annotator import *
    from sparknlp.pretrained import PretrainedPipeline
    import sparknlp
     
    # Start SparkSession with Spark NLP
    # start() functions has 3 parameters: gpu, m1, and memory
    # sparknlp.start(gpu=True) will start the session with GPU support
    # sparknlp.start(m1=True) will start the session with macOS M1 support
    # sparknlp.start(memory="16G") to change the default driver memory in SparkSession
    spark = sparknlp.start()
     
    # Download a pre-trained pipeline
    pipeline = PretrainedPipeline('explain_document_dl', lang='en', disk_location="/opt/spark/work-dir/conda/sparknlp-models/")
     
     
    # Your testing dataset
    text = """
    Lawrence Joseph Ellison (born August 17, 1944) is an American business magnate and investor who is the co-founder,
    executive chairman, chief technology officer (CTO) and former chief executive officer (CEO) of the
    American computer technology company Oracle Corporation.[2] As of September 2022, he was listed by
    Bloomberg Billionaires Index as the ninth-wealthiest person in the world, with an estimated
    fortune of $93 billion.[3] Ellison is also known for his 98% ownership stake in Lanai,
    the sixth-largest island in the Hawaiian Archipelago.[4]
    """
     
    # Annotate your testing dataset
    result = pipeline.annotate(text)
     
    # What's in the pipeline
    print(list(result.keys()))
     
    # Check the results
    print(result['entities'])
    A seguinte saída está na célula de notebook:
    ['entities', 'stem', 'checked', 'lemma', 'document', 'pos', 'token', 'ner', 'embeddings', 'sentence']
    ['Lawrence Joseph Ellison', 'American', 'American', 'Oracle Corporation', 'Bloomberg Billionaires Index', 'Ellison', 'Lanai', 'Hawaiian Archipelago']

Exemplos

Veja alguns exemplos de uso de Dados FlowMagic.

PySpark

A variável sc representa o Spark e fica disponível quando o comando mágico %%spark é usado. A célula seguinte é um exemplo de brinquedo de como usar sc em uma célula de Dados FlowMagic. A célula chama o método .parallelize() , que cria um RDD, numbers, a partir de uma lista de números. As informações sobre o RDD são impressas. O método .toDebugString() retorna uma descrição da RDD.
%%spark
print(sc.version)
 
numbers = sc.parallelize([4, 3, 2, 1])
print(f"First element of numbers is {numbers.first()}")
print(f"The RDD, numbers, has the following description\n{numbers.toDebugString()}")

Spark SQL

O uso da opção -c sql permite executar comandos SQL do Spark em uma célula. Nesta seção, o conjunto de dados citi bike é usado. A célula a seguir lê o conjunto de dados em um dataframe do Spark e o salva como uma tabela. Este exemplo é usado para mostrar o Spark SQL.

O conjunto de dados Citibike é transferido por upload para o armazenamento de objetos. Talvez você precise fazer o mesmo no seu realm.
%%spark
df_bike_trips = spark.read.csv("oci://dmcherka-dev@ociodscdev/201306-citibike-tripdata.csv", header=False, inferSchema=True)
df_bike_trips.show()
df_bike_trips.createOrReplaceTempView("bike_trips")

O exemplo a seguir usa a opção -c sql para dizer aos Dados FlowMagic que o conteúdo da célula é SparkSQL. A opção -o <variable> obtém os resultados da operação Spark SQL e os armazena na variável definida. Neste caso, o

df_bike_trips é um dataframe Pandas que está disponível para uso no notebook.
%%spark -c sql -o df_bike_trips
SELECT _c0 AS Duration, _c4 AS Start_Station, _c8 AS End_Station, _c11 AS Bike_ID FROM bike_trips;
Imprima as primeiras linhas de dados:
df_bike_trips.head()
Da mesma forma, você pode usar sqlContext para consultar a tabela:
%%spark
df_bike_trips_2 = sqlContext.sql("SELECT * FROM bike_trips")
df_bike_trips_2.show()
Por fim, você pode descrever a tabela:
%%spark -c sql
SHOW TABLES

Widget de Visualização Automática

Os dados FlowMagic vêm com autovizwidget que permite a visualização de dataframes Pandas. A função display_dataframe() utiliza um dataframe Pandas como parâmetro e gera uma GUI interativa no notebook. Ela tem guias que mostram a visualização dos dados em várias formas, como tabular, gráficos de pizza, gráficos de dispersão e gráficos de área e de barras.

A célula a seguir chama display_dataframe() com o dataframe df_people que foi criado na seção Spark SQL do notebook:
from autovizwidget.widget.utils import display_dataframe
display_dataframe(df_bike_trips)

Matplotlib

Uma tarefa comum que os cientistas de dados realizam é visualizar seus dados. Com grandes conjuntos de dados, geralmente não é possível e nem sempre é preferível extrair os dados do cluster do Spark do serviço Data Flow para a sessão de notebook. Este exemplo prova como usar recursos do lado do servidor para gerar um gráfico e incluí-lo no notebook.

O dataframe df_bike_trips é definido na sessão e é reutilizado. Para criar uma Matplotlib, inclua as bibliotecas necessárias e gere o gráfico. Use o comando mágico %matplot plt para exibir o gráfico no notebook, mesmo que ele seja renderizado no lado do servidor:
%%spark
import matplotlib.pyplot as plt
df_bike_trips.groupby("_c4").count().toPandas().plot.bar(x="_c4", y="count")
%matplot plt

Mais Exemplos

Mais exemplos estão disponíveis em GitHub com amostras do Serviço Data Flow e amostras do Serviço Data Science.