- Generieren Sie skalierbare Bedarfsprognosen mit Oracle Cloud Infrastructure
- Bedarfsprognosen generieren
Bedarfsprognosen generieren
In diesen Schritten wird beschrieben, wie Bedarfsprognosen in Ihrem OCI Data Science-Notizbuch generiert werden.
Umgebung einrichten
- Richten Sie im Data Science-Notizbuch die OCI Data Flow-Session ein. Verwenden Sie die python-Library
pip install prophet
, wenn Sie das benutzerdefinierte Conda-Package erstellen.import ads ads.set_auth("resource_principal") %reload_ext dataflow.magics %help import json command = { "compartmentId": "your-compartment-ocid", "displayName": "display-name", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "metastoreId": "your-metastore-ocid", "configuration":{ "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda", "spark.oracle.datasource.enabled":"true", "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore", "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"} } command = f'\'{json.dumps(command)}\'' print("command",command)
- Importieren Sie abhängige python-Bibliotheken. Beispiel:
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession from delta.tables import * import pandas as pd from prophet import Prophet import logging import matplotlib.pyplot as plt
Daten untersuchen
- Prüfen Sie die Daten.Für unser Trainings-Dataset nutzen wir fünf Jahre Verkaufsdaten für Artikeleinheiten für 50 Artikel in 10 verschiedenen Filialen.
%%spark from pyspark.sql.types import * # structure of the training data set train_schema = StructType([ StructField('date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('sales', IntegerType()) ]) # read the training file into a dataframe train = spark.read.csv( 'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', header=True, schema=train_schema ) # make the dataframe queryable as a temporary view train.createOrReplaceTempView('train') # show data train.show()
- Sehen Sie sich jährliche Trends bei der Ausführung von Bedarfsprognosen an, wir sind häufig an allgemeinen Trends und Saisonalität interessiert. Beginnen wir mit unserer Untersuchung, indem wir den jährlichen Trend im Umsatz pro Einheit untersuchen.
%%spark df = spark.sql(""" SELECT year(date) as year, sum(sales) as sales FROM train GROUP BY year(date) ORDER BY year;""") df.show(5)
Aus den Daten ist deutlich zu erkennen, dass der Gesamtumsatz pro Einheit in den Filialen im Allgemeinen nach oben geht. Wenn wir bessere Kenntnisse über die von diesen Geschäften angebotenen Märkte hätten, könnten wir feststellen, ob es eine maximale Wachstumskapazität gibt, die wir im Laufe unserer Prognose erwarten würden. Ohne dieses Wissen und durch das schnelle Anzeigen dieses Datensatzes ist es sicher, davon auszugehen, dass wir ein paar Tage, Monate oder sogar ein Jahr später mit einem kontinuierlichen linearen Wachstum über diesen Zeitraum rechnen können, wenn unser Ziel darin besteht, eine Prognose zu erstellen. - Monatstrends anzeigen.Sehen wir uns die Saisonalität an. Wenn wir die Daten rund um die einzelnen Monate jedes Jahres aggregieren, wird ein deutlich jährliches saisonales Muster beobachtet, das mit einem allgemeinen Umsatzwachstum skalierbar zu werden scheint.
%%spark -o df df = spark.sql("""SELECT TRUNC(date, 'MM') as month, SUM(sales) as sales FROM train GROUP BY TRUNC(date, 'MM') ORDER BY month;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Wochentagstrends anzeigen.Durch die Aggregation der Daten auf Wochentag-Ebene wird ein ausgeprägtes wöchentliches saisonales Muster mit einem Spitzenwert am Sonntag (Wochentag 0), einem harten Rückgang am Montag (Wochentag 1) und dann einer stetigen Abholung über die Woche bis zum Sonntagshöhe beobachtet. Dieses Muster scheint in den fünf Jahren der Beobachtungen stabil zu sein.
%%spark -o df df = spark.sql(""" SELECT YEAR(date) as year, ( CASE WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0 WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1 WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2 WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3 WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4 WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5 WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6 END ) % 7 as weekday, AVG(sales) as sales FROM ( SELECT date, SUM(sales) as sales FROM train GROUP BY date ) x GROUP BY year, weekday ORDER BY year, weekday;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Einzelne Prognose generieren
Generieren wir eine einzelne Bedarfsprognose, bevor wir versuchen, Prognosen für einzelne Kombinationen von Filialen und Artikeln zu generieren. Es könnte hilfreich sein, aus keinem anderen Grund eine einzelne Prognose zu erstellen, als sich an die Verwendung des Propheten zu orientieren.
- Unser erster Schritt besteht darin, das historische Dataset zusammenzustellen, auf dem wir das Modell trainieren werden.
%%spark history_pd = spark.sql(""" SELECT CAST(date as date) as ds, sales as y FROM train WHERE store=1 AND item=1 ORDER BY ds """).toPandas() history_pd = history_pd.dropna() history_pd = history_pd.dropna()
Jetzt importieren wir die Prophetbibliothek, aber weil es ein bisschen verbose sein kann, wenn sie verwendet wird, müssen wir die Logging-Einstellungen in unserer Umgebung optimieren.%%spark from prophet import Prophet import logging # disable informational messages from prophet logging.getLogger('py4j').setLevel(logging.ERROR)
%%spark # set model parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd)
- Als Nächstes erstellen wir eine Prognose.
%%spark -o df # define a dataset including both historical dates & 90-days beyond the last available date future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict(future_pd) df=spark.createDataFrame(forecast_pd)
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Prognosekomponenten prüfenWie hat unser Modell funktioniert? Hier sehen wir die allgemeinen und saisonalen Trends in unserem Modell als Diagramme.
%%spark import matplotlib.pyplot as plt trends_fig = model.plot_components(forecast_pd) %matplot plt
- Historische Daten vs. Vorhersagedaten anzeigenHier können wir sehen, wie sich unsere tatsächlichen und vorhergesagten Daten sowie eine Prognose für die Zukunft erstrecken, auch wenn wir unser Diagramm auf das letzte Jahr historischer Daten beschränken, um es lesbar zu halten.
%%spark predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # adjust figure to display dates from last year + the 90 day forecast xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) import matplotlib.pyplot as plt %matplot plt
Diese Visualisierung ist etwas ausgelastet. Die schwarzen Punkte stellen unsere Istwerte dar, wobei die dunklere blaue Linie unsere Vorhersagen und die hellere blaue Band repräsentiert, die unser (95%) Unsicherheitsintervall darstellt. - Bewertungskennzahlen berechnenEine visuelle Inspektion ist nützlich, aber eine bessere Methode zur Bewertung der Prognose ist die Berechnung des mittleren absoluten Fehlers, des mittleren quadratischen Fehlers und des mittleren quadratischen Fehlers im Verhältnis zu den tatsächlichen Werten in unserem Set.
%%spark import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # get historical actuals & predictions for comparison actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y'] predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat'] # calculate evaluation metrics mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) # print metrics to the screen print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
Skalierbare Prognosen generieren
- Prognosegenerierung skalieren.Lassen Sie uns mit der Mechanik unter unserem Gürtel unser ursprüngliches Ziel der Erstellung zahlreicher, feinkörniger Modelle und Prognosen für einzelne Shop- und Artikelkombinationen angehen. Beginnen wir mit der Zusammenstellung von Verkaufsdaten auf der Granularitätsebene "store-item-date". Daten für alle Filialartikelkombinationen abrufen. Die Daten in diesem Dataset sollten bereits auf dieser Granularitätsebene aggregiert werden. Wir aggregieren jedoch explizit, um sicherzustellen, dass die erwartete Datenstruktur vorliegt.
%%spark sql_statement = ''' SELECT store, item, CAST(date as date) as ds, SUM(sales) as y FROM train GROUP BY store, item, ds ORDER BY store, item, ds ''' store_item_history = ( spark .sql( sql_statement ) .repartition(sc.defaultParallelism, ['store', 'item']) ).cache()
- Mit unseren Daten, die auf der Ebene des Filialartikels aggregiert werden, müssen wir überlegen, wie wir unsere Daten an den Propheten weitergeben. Wenn unser Ziel darin besteht, ein Modell für jede Filial- und Artikelkombination zu erstellen, müssen wir eine Filialartikeluntergruppe aus dem gerade zusammengestellten Datenset übergeben, ein Modell für diese Untergruppe trainieren und eine Filialartikelprognose erhalten. Wir würden erwarten, dass die Prognose als Dataset mit einer solchen Struktur zurückgegeben wird, in der wir die Filial- und Artikel-IDs, für die die Prognose erstellt wurde, beibehalten, und wir begrenzen die Ausgabe auf die relevante Teilmenge der vom Modell des Propheten generierten Felder.
%%spark from pyspark.sql.types import * result_schema =StructType([ StructField('ds',DateType()), StructField('store',IntegerType()), StructField('item',IntegerType()), StructField('y',FloatType()), StructField('yhat',FloatType()), StructField('yhat_upper',FloatType()), StructField('yhat_lower',FloatType()) ])
- Zum Trainieren des Modells und Generieren einer Prognose nutzen wir eine Pandas-Funktion. Wir definieren diese Funktion, um eine Teilmenge von Daten zu empfangen, die um eine Kombination aus Shop und Artikel organisiert sind. Dadurch wird eine Prognose in dem Format zurückgegeben, das in der vorherigen Zelle angegeben wurde.
%%spark def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame: # TRAIN MODEL AS BEFORE # -------------------------------------- # remove missing values (more likely at day-store-item level) history_pd = history_pd.dropna() # configure the model model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # train the model model.fit( history_pd ) # -------------------------------------- # BUILD FORECAST AS BEFORE # -------------------------------------- # make predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict( future_pd ) # -------------------------------------- # ASSEMBLE EXPECTED RESULT SET # -------------------------------------- # get relevant fields from forecast f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds') # get relevant fields from history h_pd = history_pd[['ds','store','item','y']].set_index('ds') # join history and forecast results_pd = f_pd.join( h_pd, how='left' ) results_pd.reset_index(level=0, inplace=True) # get store & item from incoming data set results_pd['store'] = history_pd['store'].iloc[0] results_pd['item'] = history_pd['item'].iloc[0] # -------------------------------------- # return expected dataset return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
- Wenden Sie die Prognosefunktion auf jede Kombination aus Filiale und Artikel an.Rufen wir unsere Pandas-Funktion an, um unsere Prognosen zu erstellen. Dazu gruppieren wir unser historisches Datenset um Shop und Artikel. Dann wenden wir unsere Funktion auf jede Gruppe an und behandeln das heutige Datum als
training_date
für Datenmanagementzwecke.%%spark -o df from pyspark.sql.functions import current_date df = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date() ) ) df.createOrReplaceTempView('new_forecasts')
from autovizwidget.widget.utils import display_dataframe display_dataframe(df)
- Speichern Sie die Ausgabedaten von Prognosen, indem Sie eine Deltatabelle erstellen.
%%spark db_name = "gold_db" spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name) spark.sql("USE " + db_name)
%%spark spark.sql("""create table if not exists forecasts ( date date, store integer, item integer, sales float, sales_predicted float, sales_predicted_upper float, sales_predicted_lower float, training_date date ) using delta partitioned by (date)""") spark.sql("""merge into forecasts f using new_forecasts n on f.date = n.ds and f.store = n.store and f.item = n.item when matched then update set f.date = n.ds, f.store = n.store, f.item = n.item, f.sales = n.y, f.sales_predicted = n.yhat, f.sales_predicted_upper = n.yhat_upper, f.sales_predicted_lower = n.yhat_lower, f.training_date = n.training_date when not matched then insert (date, store, item, sales, sales_predicted, sales_predicted_upper, sales_predicted_lower, training_date) values (n.ds, n.store, n.item, n.y, n.yhat, n.yhat_upper, n.yhat_lower, n.training_date)""")
- Wie gut (oder schlecht) ist nun jede Prognose? Wenden wir dieselben Techniken an, um jede Prognose zu bewerten. Mit der pandas-Funktionstechnik können Sie Bewertungsmetriken für jede Filialartikelprognose generieren.
%%spark # schema of expected result set eval_schema =StructType([ StructField('training_date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('mae', FloatType()), StructField('mse', FloatType()), StructField('rmse', FloatType()) ]) # define function to calculate metrics def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame: # get store & item in incoming data set training_date = evaluation_pd['training_date'].iloc[0] store = evaluation_pd['store'].iloc[0] item = evaluation_pd['item'].iloc[0] # calculate evaluation metrics mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] ) mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] ) rmse = sqrt( mse ) # assemble result set results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]} return pd.DataFrame.from_dict( results ) # calculate metrics results = ( spark .table('new_forecasts') .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data .select('training_date', 'store', 'item', 'y', 'yhat') .groupBy('training_date', 'store', 'item') .applyInPandas(evaluate_forecast, schema=eval_schema) )
- Daraufhin werden die Bewertungsmetriken beibehalten. Die Metriken für jede Prognose werden wahrscheinlich gemeldet. Daher werden sie in einer abfragbaren Delta-Tabelle gespeichert.
%%spark spark.sql("""create table if not exists forecast_evals ( store integer, item integer, mae float, mse float, rmse float, training_date date ) using delta partitioned by (training_date)""") spark.sql("""insert into forecast_evals select store, item, mae, mse, rmse, training_date from new_forecast_evals""")
- Jetzt haben wir eine Prognose für jede Kombination aus Filiale und Artikel erstellt und für jede Kombination Basisbewertungsmetriken generiert. Um diese Prognosedaten anzuzeigen, können wir eine einfache Abfrage ausgeben (hier maximal auf Produkt eins über Filialen hinweg mit einer bis drei).
%%spark -o df df=spark.sql("""SELECT store, date, sales_predicted, sales_predicted_upper, sales_predicted_lower FROM forecasts a WHERE item = 1 AND store IN (1, 2, 3) AND date >= '2018-01-01' AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- Schließlich rufen wir Bewertungsmetriken ab.
%%spark -o df df=spark.sql("""SELECT store, mae, mse, rmse FROM forecast_evals a WHERE item = 1 AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Wir haben jetzt ein OCI Data Science-Notizbuch eingerichtet und eine OCI Data Flow-Session erstellt, um die Demand Forecasting-Anwendung (PySpark) in großem Maßstab interaktiv auszuführen.
Um zusätzliche Gebühren zu vermeiden, löschen oder beenden Sie alle Ressourcen, die zum Testen dieser Lösung erstellt wurden, und bereinigen Sie die OCI Object Storage-Buckets.
Um noch heute mit OCI Data Flow zu beginnen, registrieren Sie sich für die kostenlose Oracle Cloud-Testversion, oder melden Sie sich bei Ihrem vorhandenen Account an. Testen Sie das 15-minütige Tutorial, das keine Installation erfordert, um zu erfahren, wie einfach die Spark-Verarbeitung mit Oracle Cloud Infrastructure sein kann.