- Genera previsioni della domanda su larga scala utilizzando Oracle Cloud Infrastructure
- Genera previsioni domanda
Genera previsioni domanda
Questi passi descrivono come generare previsioni della domanda nel notebook OCI Data Science.
Imposta ambiente
- Nel notebook di Data Science, impostare la sessione di Flusso dati OCI. Utilizzare la libreria python
pip install prophet
durante la creazione del pacchetto conda personalizzato.import ads ads.set_auth("resource_principal") %reload_ext dataflow.magics %help import json command = { "compartmentId": "your-compartment-ocid", "displayName": "display-name", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "metastoreId": "your-metastore-ocid", "configuration":{ "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda", "spark.oracle.datasource.enabled":"true", "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore", "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"} } command = f'\'{json.dumps(command)}\'' print("command",command)
- Importare le librerie python dipendenti. Ad esempio:
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession from delta.tables import * import pandas as pd from prophet import Prophet import logging import matplotlib.pyplot as plt
Esame dei dati
- Esaminare i dati.Per il nostro set di dati di formazione, utilizzeremo cinque anni di dati di vendita unità articolo negozio per 50 articoli in 10 negozi diversi.
%%spark from pyspark.sql.types import * # structure of the training data set train_schema = StructType([ StructField('date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('sales', IntegerType()) ]) # read the training file into a dataframe train = spark.read.csv( 'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', header=True, schema=train_schema ) # make the dataframe queryable as a temporary view train.createOrReplaceTempView('train') # show data train.show()
- Visualizza le tendenze annuali durante l'esecuzione della previsione della domanda, siamo spesso interessati alle tendenze generali e alla stagionalità. Iniziamo la nostra esplorazione esaminando l'andamento annuale delle vendite unitarie.
%%spark df = spark.sql(""" SELECT year(date) as year, sum(sales) as sales FROM train GROUP BY year(date) ORDER BY year;""") df.show(5)
È molto chiaro dai dati che esiste una tendenza generalmente al rialzo delle vendite unitarie totali tra i negozi. Se avessimo una migliore conoscenza dei mercati serviti da questi negozi, potremmo voler identificare se c'è una capacità di crescita massima che ci aspettiamo di affrontare durante la vita delle nostre previsioni. Senza tale conoscenza e vedendo rapidamente questo dataset, ci si sente al sicuro presumere che se il nostro obiettivo è fare una previsione qualche giorno, mesi o anche un anno di fuori, ci si potrebbe aspettare una crescita lineare continua durante quell'arco di tempo. - Visualizza le tendenze mensili.Esaminiamo la stagionalità. Se aggreghiamo i dati relativi ai singoli mesi in ogni anno, viene osservato un modello stagionale annuale distinto che sembra crescere in scala con la crescita complessiva delle vendite.
%%spark -o df df = spark.sql("""SELECT TRUNC(date, 'MM') as month, SUM(sales) as sales FROM train GROUP BY TRUNC(date, 'MM') ORDER BY month;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Visualizza le tendenze dei giorni feriali.Aggregando i dati a livello di giorno feriale, si osserva un pattern stagionale settimanale pronunciato con un picco di domenica (giorno feriale 0), una caduta dura del lunedì (giorno feriale 1) e poi un ritiro fisso della settimana che torna alla domenica alta. Questo modello sembra essere stabile nei cinque anni di osservazioni.
%%spark -o df df = spark.sql(""" SELECT YEAR(date) as year, ( CASE WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0 WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1 WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2 WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3 WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4 WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5 WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6 END ) % 7 as weekday, AVG(sales) as sales FROM ( SELECT date, SUM(sales) as sales FROM train GROUP BY date ) x GROUP BY year, weekday ORDER BY year, weekday;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Genera una singola previsione
Generare una singola previsione della domanda prima di tentare di generare previsioni per singole combinazioni di negozi e articoli. Potrebbe essere utile costruire un'unica previsione per nessun altro motivo che orientarci all'uso del profeta.
- Il nostro primo passo è quello di assemblare il set di dati storici su cui formaremo il modello.
%%spark history_pd = spark.sql(""" SELECT CAST(date as date) as ds, sales as y FROM train WHERE store=1 AND item=1 ORDER BY ds """).toPandas() history_pd = history_pd.dropna() history_pd = history_pd.dropna()
Ora, importaremo la biblioteca dei profeti, ma perché può essere un po 'verosa quando in uso, dobbiamo perfezionare le impostazioni di registrazione nel nostro ambiente.%%spark from prophet import Prophet import logging # disable informational messages from prophet logging.getLogger('py4j').setLevel(logging.ERROR)
%%spark # set model parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd)
- È quindi possibile creare una previsione.
%%spark -o df # define a dataset including both historical dates & 90-days beyond the last available date future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict(future_pd) df=spark.createDataFrame(forecast_pd)
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Esaminare i componenti della previsione.Come ha funzionato il nostro modello? Qui possiamo vedere le tendenze generali e stagionali nel nostro modello presentato come grafici.
%%spark import matplotlib.pyplot as plt trends_fig = model.plot_components(forecast_pd) %matplot plt
- Visualizza dati cronologici rispetto a dati previsione.Qui, possiamo vedere come la nostra linea di dati effettiva e prevista sia una previsione per il futuro, anche se limiteremo il nostro grafico all'ultimo anno di dati storici per mantenerlo leggibile.
%%spark predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # adjust figure to display dates from last year + the 90 day forecast xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) import matplotlib.pyplot as plt %matplot plt
Questa visualizzazione è un po' occupata. I punti neri rappresentano i nostri effettivi con la linea blu più scura che rappresenta le nostre previsioni e la banda blu più leggera che rappresenta il nostro intervallo di incertezza (95%). - Calcola le metriche di valutazione.L'ispezione visiva è utile, ma un modo migliore per valutare la previsione è calcolare i valori Errore assoluto medio, Errore quadratico medio e Errore quadratico medio radice per i valori previsti rispetto ai valori effettivi nel nostro set.
%%spark import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # get historical actuals & predictions for comparison actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y'] predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat'] # calculate evaluation metrics mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) # print metrics to the screen print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
Genera previsioni su larga scala
- Ridimensiona la generazione delle previsioni.Con la meccanica sotto la nostra cintura, affrontiamo il nostro obiettivo originale di costruire numerosi e raffinati modelli e previsioni per singole combinazioni di negozi e articoli. Inizieremo assemblando i dati di vendita al livello di granularità della data articolo del negozio. Recupera i dati per tutte le combinazioni negozio-articolo. I dati di questo set di dati dovrebbero già essere aggregati a questo livello di granularità, ma stiamo aggregando esplicitamente per garantire che abbiamo la struttura di dati prevista.
%%spark sql_statement = ''' SELECT store, item, CAST(date as date) as ds, SUM(sales) as y FROM train GROUP BY store, item, ds ORDER BY store, item, ds ''' store_item_history = ( spark .sql( sql_statement ) .repartition(sc.defaultParallelism, ['store', 'item']) ).cache()
- Con i nostri dati aggregati a livello di data dell'elemento di memorizzazione, dobbiamo considerare come trasmetteremo i nostri dati al profeta. Se il nostro obiettivo è creare un modello per ogni combinazione di negozio e articolo, sarà necessario passare un sottoinsieme di articoli negozio dal set di dati appena assemblato, addestrare un modello su tale sottoinsieme e ricevere una previsione negozio-articolo indietro. Prevediamo che la previsione venga restituita come set di dati con una struttura del genere in cui manteniamo gli identificativi di negozio e articolo per i quali la previsione è stata assemblata e limiteremo l'output solo al relativo subset di campi generati dal modello Profet.
%%spark from pyspark.sql.types import * result_schema =StructType([ StructField('ds',DateType()), StructField('store',IntegerType()), StructField('item',IntegerType()), StructField('y',FloatType()), StructField('yhat',FloatType()), StructField('yhat_upper',FloatType()), StructField('yhat_lower',FloatType()) ])
- Per addestrare il modello e generare una previsione, sfrutteremo una funzione Pandas. Definire questa funzione per ricevere un subset di dati organizzati in base a una combinazione di negozio e articolo. Verrà restituita una previsione nel formato identificato nella cella precedente.
%%spark def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame: # TRAIN MODEL AS BEFORE # -------------------------------------- # remove missing values (more likely at day-store-item level) history_pd = history_pd.dropna() # configure the model model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # train the model model.fit( history_pd ) # -------------------------------------- # BUILD FORECAST AS BEFORE # -------------------------------------- # make predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict( future_pd ) # -------------------------------------- # ASSEMBLE EXPECTED RESULT SET # -------------------------------------- # get relevant fields from forecast f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds') # get relevant fields from history h_pd = history_pd[['ds','store','item','y']].set_index('ds') # join history and forecast results_pd = f_pd.join( h_pd, how='left' ) results_pd.reset_index(level=0, inplace=True) # get store & item from incoming data set results_pd['store'] = history_pd['store'].iloc[0] results_pd['item'] = history_pd['item'].iloc[0] # -------------------------------------- # return expected dataset return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
- Applica la funzione di previsione a ogni combinazione negozio-articolo.Chiamiamo la nostra funzione pandas per costruire le nostre previsioni. Lo facciamo raggruppando il nostro data set storico intorno al negozio e all'articolo. Applichiamo quindi la nostra funzione a ogni gruppo e affrontiamo la data odierna come il nostro
training_date
per scopi di gestione dei dati.%%spark -o df from pyspark.sql.functions import current_date df = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date() ) ) df.createOrReplaceTempView('new_forecasts')
from autovizwidget.widget.utils import display_dataframe display_dataframe(df)
- Rendi persistenti i dati di output delle previsioni creando una tabella delta.
%%spark db_name = "gold_db" spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name) spark.sql("USE " + db_name)
%%spark spark.sql("""create table if not exists forecasts ( date date, store integer, item integer, sales float, sales_predicted float, sales_predicted_upper float, sales_predicted_lower float, training_date date ) using delta partitioned by (date)""") spark.sql("""merge into forecasts f using new_forecasts n on f.date = n.ds and f.store = n.store and f.item = n.item when matched then update set f.date = n.ds, f.store = n.store, f.item = n.item, f.sales = n.y, f.sales_predicted = n.yhat, f.sales_predicted_upper = n.yhat_upper, f.sales_predicted_lower = n.yhat_lower, f.training_date = n.training_date when not matched then insert (date, store, item, sales, sales_predicted, sales_predicted_upper, sales_predicted_lower, training_date) values (n.ds, n.store, n.item, n.y, n.yhat, n.yhat_upper, n.yhat_lower, n.training_date)""")
- Quanto è buona (o cattiva) ogni previsione? Applichiamo le stesse tecniche per valutare ogni previsione. Utilizzando la tecnica della funzione pandas, è possibile generare metriche di valutazione per ogni previsione negozio-articolo.
%%spark # schema of expected result set eval_schema =StructType([ StructField('training_date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('mae', FloatType()), StructField('mse', FloatType()), StructField('rmse', FloatType()) ]) # define function to calculate metrics def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame: # get store & item in incoming data set training_date = evaluation_pd['training_date'].iloc[0] store = evaluation_pd['store'].iloc[0] item = evaluation_pd['item'].iloc[0] # calculate evaluation metrics mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] ) mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] ) rmse = sqrt( mse ) # assemble result set results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]} return pd.DataFrame.from_dict( results ) # calculate metrics results = ( spark .table('new_forecasts') .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data .select('training_date', 'store', 'item', 'y', 'yhat') .groupBy('training_date', 'store', 'item') .applyInPandas(evaluate_forecast, schema=eval_schema) )
- Una volta ancora, continuiamo a rendere persistenti le metriche di valutazione. È probabile che desideriamo segnalare le metriche per ogni previsione, in modo da renderle persistenti in una tabella delta su cui è possibile eseguire query.
%%spark spark.sql("""create table if not exists forecast_evals ( store integer, item integer, mae float, mse float, rmse float, training_date date ) using delta partitioned by (training_date)""") spark.sql("""insert into forecast_evals select store, item, mae, mse, rmse, training_date from new_forecast_evals""")
- È ora stata creata una previsione per ogni combinazione negozio-articolo e sono state generate metriche di valutazione di base per ciascuna combinazione. Per visualizzare questi dati di previsione, è possibile eseguire una query semplice (limitata qui al prodotto uno nei negozi uno a tre).
%%spark -o df df=spark.sql("""SELECT store, date, sales_predicted, sales_predicted_upper, sales_predicted_lower FROM forecasts a WHERE item = 1 AND store IN (1, 2, 3) AND date >= '2018-01-01' AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Infine, recuperiamo le metriche di valutazione.
%%spark -o df df=spark.sql("""SELECT store, mae, mse, rmse FROM forecast_evals a WHERE item = 1 AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Abbiamo ora impostato un notebook OCI Data Science e creato una sessione di Flusso dati OCI per eseguire in modo interattivo l'applicazione Demand Forecasting (PySpark) su larga scala.
Per evitare addebiti aggiuntivi, elimina o interrompi tutte le risorse create per eseguire il test di questa soluzione e cleanup dei bucket dell'area di memorizzazione degli oggetti OCI.
Per iniziare a utilizzare OCI Data Flow oggi stesso, iscriviti alla prova gratuita di Oracle Cloud o accedi al tuo account esistente. Prova l'esercitazione non installata di Data Flow di 15 minuti per scoprire quanto sia facile elaborare Spark con Oracle Cloud Infrastructure.