Genera previsioni domanda

Questi passi descrivono come generare previsioni della domanda nel notebook OCI Data Science.

Imposta ambiente

  1. Nel notebook di Data Science, impostare la sessione di Flusso dati OCI. Utilizzare la libreria python pip install prophet durante la creazione del pacchetto conda personalizzato.
    import ads
    
    ads.set_auth("resource_principal")
    
    %reload_ext dataflow.magics
    
    %help
    
    import json
    command = {
        "compartmentId": "your-compartment-ocid",
        "displayName": "display-name",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "metastoreId": "your-metastore-ocid",
        "configuration":{
                 "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda",
                         "spark.oracle.datasource.enabled":"true",
                         "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore",
                         "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
                         "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
  2. Importare le librerie python dipendenti. Ad esempio:
    %%spark
    #Import required libraries.
    
    import json
    import os
    import sys
    import datetime
    import oci
    import pyspark.sql 
    from pyspark.sql.functions import countDistinct
    
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # $example off$
    from pyspark.sql import SparkSession
    
    from delta.tables import *
    
    import pandas as pd
    
    from prophet import Prophet
    import logging
    
    import matplotlib.pyplot as plt
    

Esame dei dati

  1. Esaminare i dati.
    Per il nostro set di dati di formazione, utilizzeremo cinque anni di dati di vendita unità articolo negozio per 50 articoli in 10 negozi diversi.
    %%spark
    
    from pyspark.sql.types import *
    
    # structure of the training data set
    
    train_schema = StructType([
      StructField('date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('sales', IntegerType())
      ])
    
    # read the training file into a dataframe
    
    train = spark.read.csv(
      'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', 
      header=True, 
      schema=train_schema
      )
    
    # make the dataframe queryable as a temporary view
    
    train.createOrReplaceTempView('train')
    
    # show data
    
    train.show()
    
  2. Visualizza le tendenze annuali durante l'esecuzione della previsione della domanda, siamo spesso interessati alle tendenze generali e alla stagionalità. Iniziamo la nostra esplorazione esaminando l'andamento annuale delle vendite unitarie.
    %%spark
    
    df = spark.sql(""" SELECT
      year(date) as year, 
      sum(sales) as sales
    FROM train
    GROUP BY year(date)
    ORDER BY year;""")
    
    df.show(5)
    
    È molto chiaro dai dati che esiste una tendenza generalmente al rialzo delle vendite unitarie totali tra i negozi. Se avessimo una migliore conoscenza dei mercati serviti da questi negozi, potremmo voler identificare se c'è una capacità di crescita massima che ci aspettiamo di affrontare durante la vita delle nostre previsioni. Senza tale conoscenza e vedendo rapidamente questo dataset, ci si sente al sicuro presumere che se il nostro obiettivo è fare una previsione qualche giorno, mesi o anche un anno di fuori, ci si potrebbe aspettare una crescita lineare continua durante quell'arco di tempo.
  3. Visualizza le tendenze mensili.
    Esaminiamo la stagionalità. Se aggreghiamo i dati relativi ai singoli mesi in ogni anno, viene osservato un modello stagionale annuale distinto che sembra crescere in scala con la crescita complessiva delle vendite.
    %%spark -o df
    
    df = spark.sql("""SELECT 
      TRUNC(date, 'MM') as month,
      SUM(sales) as sales
    FROM train
    GROUP BY TRUNC(date, 'MM')
    ORDER BY month;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  4. Visualizza le tendenze dei giorni feriali.
    Aggregando i dati a livello di giorno feriale, si osserva un pattern stagionale settimanale pronunciato con un picco di domenica (giorno feriale 0), una caduta dura del lunedì (giorno feriale 1) e poi un ritiro fisso della settimana che torna alla domenica alta. Questo modello sembra essere stabile nei cinque anni di osservazioni.
    %%spark -o df
    
    df = spark.sql(""" 
    SELECT
      YEAR(date) as year,
      (
        CASE
          WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
          WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
          WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
          WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
          WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
          WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
          WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
        END
      ) % 7 as weekday,
      AVG(sales) as sales
    FROM (
      SELECT 
        date,
        SUM(sales) as sales
      FROM train
      GROUP BY date
     ) x
    GROUP BY year, weekday
    ORDER BY year, weekday;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
    

Genera una singola previsione

Generare una singola previsione della domanda prima di tentare di generare previsioni per singole combinazioni di negozi e articoli. Potrebbe essere utile costruire un'unica previsione per nessun altro motivo che orientarci all'uso del profeta.
  1. Il nostro primo passo è quello di assemblare il set di dati storici su cui formaremo il modello.
    %%spark
    
    history_pd = spark.sql(""" 
      SELECT
        CAST(date as date) as ds,
        sales as y
      FROM train
      WHERE store=1 AND item=1
      ORDER BY ds
    """).toPandas()
    
    history_pd = history_pd.dropna()
    
    history_pd = history_pd.dropna()
    
    Ora, importaremo la biblioteca dei profeti, ma perché può essere un po 'verosa quando in uso, dobbiamo perfezionare le impostazioni di registrazione nel nostro ambiente.
    %%spark
    
    from prophet import Prophet
    import logging
    
    # disable informational messages from prophet
    
    logging.getLogger('py4j').setLevel(logging.ERROR)
    %%spark
    
    # set model parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
      )
     
    # fit the model to historical data
    model.fit(history_pd)
  2. È quindi possibile creare una previsione.
    %%spark -o df
    
    # define a dataset including both historical dates & 90-days beyond the last available date
    future_pd = model.make_future_dataframe(
      periods=90, 
      freq='d', 
      include_history=True
      )
     
    forecast_pd = model.predict(future_pd)
    
    df=spark.createDataFrame(forecast_pd)
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  3. Esaminare i componenti della previsione.
    Come ha funzionato il nostro modello? Qui possiamo vedere le tendenze generali e stagionali nel nostro modello presentato come grafici.
    %%spark 
    
    import matplotlib.pyplot as plt
    
    trends_fig = model.plot_components(forecast_pd)
    
    %matplot plt
  4. Visualizza dati cronologici rispetto a dati previsione.
    Qui, possiamo vedere come la nostra linea di dati effettiva e prevista sia una previsione per il futuro, anche se limiteremo il nostro grafico all'ultimo anno di dati storici per mantenerlo leggibile.
    %%spark 
    
    predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
    
    # adjust figure to display dates from last year + the 90 day forecast
    xlim = predict_fig.axes[0].get_xlim()
    new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
    predict_fig.axes[0].set_xlim(new_xlim)
    
    import matplotlib.pyplot as plt
    
    %matplot plt
    Questa visualizzazione è un po' occupata. I punti neri rappresentano i nostri effettivi con la linea blu più scura che rappresenta le nostre previsioni e la banda blu più leggera che rappresenta il nostro intervallo di incertezza (95%).
  5. Calcola le metriche di valutazione.
    L'ispezione visiva è utile, ma un modo migliore per valutare la previsione è calcolare i valori Errore assoluto medio, Errore quadratico medio e Errore quadratico medio radice per i valori previsti rispetto ai valori effettivi nel nostro set.
    %%spark 
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    from datetime import date
    
    # get historical actuals & predictions for comparison
    
    actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
    predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
    
    # calculate evaluation metrics
    
    mae = mean_absolute_error(actuals_pd, predicted_pd)
    mse = mean_squared_error(actuals_pd, predicted_pd)
    rmse = sqrt(mse)
    
    # print metrics to the screen
    
    print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
    

Genera previsioni su larga scala

  1. Ridimensiona la generazione delle previsioni.
    Con la meccanica sotto la nostra cintura, affrontiamo il nostro obiettivo originale di costruire numerosi e raffinati modelli e previsioni per singole combinazioni di negozi e articoli. Inizieremo assemblando i dati di vendita al livello di granularità della data articolo del negozio. Recupera i dati per tutte le combinazioni negozio-articolo. I dati di questo set di dati dovrebbero già essere aggregati a questo livello di granularità, ma stiamo aggregando esplicitamente per garantire che abbiamo la struttura di dati prevista.
    %%spark
    
    sql_statement = '''
      SELECT
        store,
        item,
        CAST(date as date) as ds,
        SUM(sales) as y
      FROM train
      GROUP BY store, item, ds
      ORDER BY store, item, ds
      '''
    
    store_item_history = (
      spark
        .sql( sql_statement )
        .repartition(sc.defaultParallelism, ['store', 'item'])
      ).cache()
  2. Con i nostri dati aggregati a livello di data dell'elemento di memorizzazione, dobbiamo considerare come trasmetteremo i nostri dati al profeta. Se il nostro obiettivo è creare un modello per ogni combinazione di negozio e articolo, sarà necessario passare un sottoinsieme di articoli negozio dal set di dati appena assemblato, addestrare un modello su tale sottoinsieme e ricevere una previsione negozio-articolo indietro. Prevediamo che la previsione venga restituita come set di dati con una struttura del genere in cui manteniamo gli identificativi di negozio e articolo per i quali la previsione è stata assemblata e limiteremo l'output solo al relativo subset di campi generati dal modello Profet.
    %%spark
    
    from pyspark.sql.types import *
    
    result_schema =StructType([
      StructField('ds',DateType()),
      StructField('store',IntegerType()),
      StructField('item',IntegerType()),
      StructField('y',FloatType()),
      StructField('yhat',FloatType()),
      StructField('yhat_upper',FloatType()),
      StructField('yhat_lower',FloatType())
      ])
    
  3. Per addestrare il modello e generare una previsione, sfrutteremo una funzione Pandas. Definire questa funzione per ricevere un subset di dati organizzati in base a una combinazione di negozio e articolo. Verrà restituita una previsione nel formato identificato nella cella precedente.
    %%spark
    
    def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    
      # TRAIN MODEL AS BEFORE
      # --------------------------------------
      # remove missing values (more likely at day-store-item level)
    
      history_pd = history_pd.dropna()
    
      # configure the model
      model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
        )
    
      # train the model
      model.fit( history_pd )
      # --------------------------------------
    
      # BUILD FORECAST AS BEFORE
      # --------------------------------------
    
      # make predictions
      future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True
        )
    
      forecast_pd = model.predict( future_pd )  
      # --------------------------------------
    
      # ASSEMBLE EXPECTED RESULT SET
      # --------------------------------------
      # get relevant fields from forecast
      f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    
      # get relevant fields from history
      h_pd = history_pd[['ds','store','item','y']].set_index('ds')
    
      # join history and forecast
      results_pd = f_pd.join( h_pd, how='left' )
      results_pd.reset_index(level=0, inplace=True)
    
      # get store & item from incoming data set
      results_pd['store'] = history_pd['store'].iloc[0]
      results_pd['item'] = history_pd['item'].iloc[0]
      # --------------------------------------
    
      # return expected dataset
      return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
    
  4. Applica la funzione di previsione a ogni combinazione negozio-articolo.
    Chiamiamo la nostra funzione pandas per costruire le nostre previsioni. Lo facciamo raggruppando il nostro data set storico intorno al negozio e all'articolo. Applichiamo quindi la nostra funzione a ogni gruppo e affrontiamo la data odierna come il nostro training_date per scopi di gestione dei dati.
    %%spark -o df
    
    from pyspark.sql.functions import current_date
    
    df = (
      store_item_history
        .groupBy('store', 'item')
          .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date() )
        )
    
    df.createOrReplaceTempView('new_forecasts')
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df)
  5. Rendi persistenti i dati di output delle previsioni creando una tabella delta.
    %%spark
    
    db_name = "gold_db"
    spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name)
    
    spark.sql("USE " + db_name)
    %%spark
    
    spark.sql("""create table if not exists forecasts (
                  date date,
                  store integer,
                  item integer,
                  sales float,
                  sales_predicted float,
                  sales_predicted_upper float,
                  sales_predicted_lower float,
                  training_date date
                  )
                using delta
                partitioned by (date)""")
    
    spark.sql("""merge into forecasts f
                    using new_forecasts n 
                    on f.date = n.ds and f.store = n.store and f.item = n.item
                    when matched then update set f.date = n.ds,
                      f.store = n.store,
                      f.item = n.item,
                      f.sales = n.y,
                      f.sales_predicted = n.yhat,
                      f.sales_predicted_upper = n.yhat_upper,
                      f.sales_predicted_lower = n.yhat_lower,
                      f.training_date = n.training_date
                    when not matched then insert (date,
                      store,
                      item,
                      sales,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower,
                      training_date)
                    values (n.ds,
                      n.store,
                      n.item,
                      n.y,
                      n.yhat,
                      n.yhat_upper,
                      n.yhat_lower,
                      n.training_date)""")
  6. Quanto è buona (o cattiva) ogni previsione? Applichiamo le stesse tecniche per valutare ogni previsione. Utilizzando la tecnica della funzione pandas, è possibile generare metriche di valutazione per ogni previsione negozio-articolo.
    %%spark
    
    # schema of expected result set
    eval_schema =StructType([
      StructField('training_date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('mae', FloatType()),
      StructField('mse', FloatType()),
      StructField('rmse', FloatType())
      ])
     
    # define function to calculate metrics
    def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
      
      # get store & item in incoming data set
      training_date = evaluation_pd['training_date'].iloc[0]
      store = evaluation_pd['store'].iloc[0]
      item = evaluation_pd['item'].iloc[0]
      
      # calculate evaluation metrics
      mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      rmse = sqrt( mse )
      
      # assemble result set
      results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
      return pd.DataFrame.from_dict( results )
     
    # calculate metrics
    results = (
      spark
        .table('new_forecasts')
        .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
        .select('training_date', 'store', 'item', 'y', 'yhat')
        .groupBy('training_date', 'store', 'item')
        .applyInPandas(evaluate_forecast, schema=eval_schema)
        )
  7. Una volta ancora, continuiamo a rendere persistenti le metriche di valutazione. È probabile che desideriamo segnalare le metriche per ogni previsione, in modo da renderle persistenti in una tabella delta su cui è possibile eseguire query.
    %%spark
    
    spark.sql("""create table if not exists forecast_evals (
                  store integer,
                  item integer,
                  mae float,
                  mse float,
                  rmse float,
                  training_date date
                  )
                using delta
                partitioned by (training_date)""")
    
    spark.sql("""insert into forecast_evals
                select
                  store,
                  item,
                  mae,
                  mse,
                  rmse,
                  training_date
                from new_forecast_evals""")
  8. È ora stata creata una previsione per ogni combinazione negozio-articolo e sono state generate metriche di valutazione di base per ciascuna combinazione. Per visualizzare questi dati di previsione, è possibile eseguire una query semplice (limitata qui al prodotto uno nei negozi uno a tre).
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      date,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower
                    FROM forecasts a
                    WHERE item = 1 AND
                          store IN (1, 2, 3) AND
                          date >= '2018-01-01' AND
                          training_date=current_date()
                    ORDER BY store""")
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  9. Infine, recuperiamo le metriche di valutazione.
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      mae,
                      mse,
                      rmse
                    FROM forecast_evals a
                    WHERE item = 1 AND
                          training_date=current_date()
                    ORDER BY store""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
Abbiamo ora impostato un notebook OCI Data Science e creato una sessione di Flusso dati OCI per eseguire in modo interattivo l'applicazione Demand Forecasting (PySpark) su larga scala.

Per evitare addebiti aggiuntivi, elimina o interrompi tutte le risorse create per eseguire il test di questa soluzione e cleanup dei bucket dell'area di memorizzazione degli oggetti OCI.

Per iniziare a utilizzare OCI Data Flow oggi stesso, iscriviti alla prova gratuita di Oracle Cloud o accedi al tuo account esistente. Prova l'esercitazione non installata di Data Flow di 15 minuti per scoprire quanto sia facile elaborare Spark con Oracle Cloud Infrastructure.