Bedarfsprognosen generieren

In diesen Schritten wird beschrieben, wie Bedarfsprognosen in Ihrem OCI Data Science-Notizbuch generiert werden.

Umgebung einrichten

  1. Richten Sie im Data Science-Notizbuch die OCI Data Flow-Session ein. Verwenden Sie die python-Library pip install prophet, wenn Sie das benutzerdefinierte Conda-Package erstellen.
    import ads
    
    ads.set_auth("resource_principal")
    
    %reload_ext dataflow.magics
    
    %help
    
    import json
    command = {
        "compartmentId": "your-compartment-ocid",
        "displayName": "display-name",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "metastoreId": "your-metastore-ocid",
        "configuration":{
                 "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda",
                         "spark.oracle.datasource.enabled":"true",
                         "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore",
                         "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
                         "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
  2. Importieren Sie abhängige python-Bibliotheken. Beispiel:
    %%spark
    #Import required libraries.
    
    import json
    import os
    import sys
    import datetime
    import oci
    import pyspark.sql 
    from pyspark.sql.functions import countDistinct
    
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # $example off$
    from pyspark.sql import SparkSession
    
    from delta.tables import *
    
    import pandas as pd
    
    from prophet import Prophet
    import logging
    
    import matplotlib.pyplot as plt
    

Daten untersuchen

  1. Prüfen Sie die Daten.
    Für unser Trainings-Dataset nutzen wir fünf Jahre Verkaufsdaten für Artikeleinheiten für 50 Artikel in 10 verschiedenen Filialen.
    %%spark
    
    from pyspark.sql.types import *
    
    # structure of the training data set
    
    train_schema = StructType([
      StructField('date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('sales', IntegerType())
      ])
    
    # read the training file into a dataframe
    
    train = spark.read.csv(
      'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', 
      header=True, 
      schema=train_schema
      )
    
    # make the dataframe queryable as a temporary view
    
    train.createOrReplaceTempView('train')
    
    # show data
    
    train.show()
    
  2. Sehen Sie sich jährliche Trends bei der Ausführung von Bedarfsprognosen an, wir sind häufig an allgemeinen Trends und Saisonalität interessiert. Beginnen wir mit unserer Untersuchung, indem wir den jährlichen Trend im Umsatz pro Einheit untersuchen.
    %%spark
    
    df = spark.sql(""" SELECT
      year(date) as year, 
      sum(sales) as sales
    FROM train
    GROUP BY year(date)
    ORDER BY year;""")
    
    df.show(5)
    
    Aus den Daten ist deutlich zu erkennen, dass der Gesamtumsatz pro Einheit in den Filialen im Allgemeinen nach oben geht. Wenn wir bessere Kenntnisse über die von diesen Geschäften angebotenen Märkte hätten, könnten wir feststellen, ob es eine maximale Wachstumskapazität gibt, die wir im Laufe unserer Prognose erwarten würden. Ohne dieses Wissen und durch das schnelle Anzeigen dieses Datensatzes ist es sicher, davon auszugehen, dass wir ein paar Tage, Monate oder sogar ein Jahr später mit einem kontinuierlichen linearen Wachstum über diesen Zeitraum rechnen können, wenn unser Ziel darin besteht, eine Prognose zu erstellen.
  3. Monatstrends anzeigen.
    Sehen wir uns die Saisonalität an. Wenn wir die Daten rund um die einzelnen Monate jedes Jahres aggregieren, wird ein deutlich jährliches saisonales Muster beobachtet, das mit einem allgemeinen Umsatzwachstum skalierbar zu werden scheint.
    %%spark -o df
    
    df = spark.sql("""SELECT 
      TRUNC(date, 'MM') as month,
      SUM(sales) as sales
    FROM train
    GROUP BY TRUNC(date, 'MM')
    ORDER BY month;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  4. Wochentagstrends anzeigen.
    Durch die Aggregation der Daten auf Wochentag-Ebene wird ein ausgeprägtes wöchentliches saisonales Muster mit einem Spitzenwert am Sonntag (Wochentag 0), einem harten Rückgang am Montag (Wochentag 1) und dann einer stetigen Abholung über die Woche bis zum Sonntagshöhe beobachtet. Dieses Muster scheint in den fünf Jahren der Beobachtungen stabil zu sein.
    %%spark -o df
    
    df = spark.sql(""" 
    SELECT
      YEAR(date) as year,
      (
        CASE
          WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
          WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
          WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
          WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
          WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
          WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
          WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
        END
      ) % 7 as weekday,
      AVG(sales) as sales
    FROM (
      SELECT 
        date,
        SUM(sales) as sales
      FROM train
      GROUP BY date
     ) x
    GROUP BY year, weekday
    ORDER BY year, weekday;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
    

Einzelne Prognose generieren

Generieren wir eine einzelne Bedarfsprognose, bevor wir versuchen, Prognosen für einzelne Kombinationen von Filialen und Artikeln zu generieren. Es könnte hilfreich sein, aus keinem anderen Grund eine einzelne Prognose zu erstellen, als sich an die Verwendung des Propheten zu orientieren.
  1. Unser erster Schritt besteht darin, das historische Dataset zusammenzustellen, auf dem wir das Modell trainieren werden.
    %%spark
    
    history_pd = spark.sql(""" 
      SELECT
        CAST(date as date) as ds,
        sales as y
      FROM train
      WHERE store=1 AND item=1
      ORDER BY ds
    """).toPandas()
    
    history_pd = history_pd.dropna()
    
    history_pd = history_pd.dropna()
    
    Jetzt importieren wir die Prophetbibliothek, aber weil es ein bisschen verbose sein kann, wenn sie verwendet wird, müssen wir die Logging-Einstellungen in unserer Umgebung optimieren.
    %%spark
    
    from prophet import Prophet
    import logging
    
    # disable informational messages from prophet
    
    logging.getLogger('py4j').setLevel(logging.ERROR)
    %%spark
    
    # set model parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
      )
     
    # fit the model to historical data
    model.fit(history_pd)
  2. Als Nächstes erstellen wir eine Prognose.
    %%spark -o df
    
    # define a dataset including both historical dates & 90-days beyond the last available date
    future_pd = model.make_future_dataframe(
      periods=90, 
      freq='d', 
      include_history=True
      )
     
    forecast_pd = model.predict(future_pd)
    
    df=spark.createDataFrame(forecast_pd)
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  3. Prognosekomponenten prüfen
    Wie hat unser Modell funktioniert? Hier sehen wir die allgemeinen und saisonalen Trends in unserem Modell als Diagramme.
    %%spark 
    
    import matplotlib.pyplot as plt
    
    trends_fig = model.plot_components(forecast_pd)
    
    %matplot plt
  4. Historische Daten vs. Vorhersagedaten anzeigen
    Hier können wir sehen, wie sich unsere tatsächlichen und vorhergesagten Daten sowie eine Prognose für die Zukunft erstrecken, auch wenn wir unser Diagramm auf das letzte Jahr historischer Daten beschränken, um es lesbar zu halten.
    %%spark 
    
    predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
    
    # adjust figure to display dates from last year + the 90 day forecast
    xlim = predict_fig.axes[0].get_xlim()
    new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
    predict_fig.axes[0].set_xlim(new_xlim)
    
    import matplotlib.pyplot as plt
    
    %matplot plt
    Diese Visualisierung ist etwas ausgelastet. Die schwarzen Punkte stellen unsere Istwerte dar, wobei die dunklere blaue Linie unsere Vorhersagen und die hellere blaue Band repräsentiert, die unser (95%) Unsicherheitsintervall darstellt.
  5. Bewertungskennzahlen berechnen
    Eine visuelle Inspektion ist nützlich, aber eine bessere Methode zur Bewertung der Prognose ist die Berechnung des mittleren absoluten Fehlers, des mittleren quadratischen Fehlers und des mittleren quadratischen Fehlers im Verhältnis zu den tatsächlichen Werten in unserem Set.
    %%spark 
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    from datetime import date
    
    # get historical actuals & predictions for comparison
    
    actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
    predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
    
    # calculate evaluation metrics
    
    mae = mean_absolute_error(actuals_pd, predicted_pd)
    mse = mean_squared_error(actuals_pd, predicted_pd)
    rmse = sqrt(mse)
    
    # print metrics to the screen
    
    print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
    

Skalierbare Prognosen generieren

  1. Prognosegenerierung skalieren.
    Lassen Sie uns mit der Mechanik unter unserem Gürtel unser ursprüngliches Ziel der Erstellung zahlreicher, feinkörniger Modelle und Prognosen für einzelne Shop- und Artikelkombinationen angehen. Beginnen wir mit der Zusammenstellung von Verkaufsdaten auf der Granularitätsebene "store-item-date". Daten für alle Filialartikelkombinationen abrufen. Die Daten in diesem Dataset sollten bereits auf dieser Granularitätsebene aggregiert werden. Wir aggregieren jedoch explizit, um sicherzustellen, dass die erwartete Datenstruktur vorliegt.
    %%spark
    
    sql_statement = '''
      SELECT
        store,
        item,
        CAST(date as date) as ds,
        SUM(sales) as y
      FROM train
      GROUP BY store, item, ds
      ORDER BY store, item, ds
      '''
    
    store_item_history = (
      spark
        .sql( sql_statement )
        .repartition(sc.defaultParallelism, ['store', 'item'])
      ).cache()
  2. Mit unseren Daten, die auf der Ebene des Filialartikels aggregiert werden, müssen wir überlegen, wie wir unsere Daten an den Propheten weitergeben. Wenn unser Ziel darin besteht, ein Modell für jede Filial- und Artikelkombination zu erstellen, müssen wir eine Filialartikeluntergruppe aus dem gerade zusammengestellten Datenset übergeben, ein Modell für diese Untergruppe trainieren und eine Filialartikelprognose erhalten. Wir würden erwarten, dass die Prognose als Dataset mit einer solchen Struktur zurückgegeben wird, in der wir die Filial- und Artikel-IDs, für die die Prognose erstellt wurde, beibehalten, und wir begrenzen die Ausgabe auf die relevante Teilmenge der vom Modell des Propheten generierten Felder.
    %%spark
    
    from pyspark.sql.types import *
    
    result_schema =StructType([
      StructField('ds',DateType()),
      StructField('store',IntegerType()),
      StructField('item',IntegerType()),
      StructField('y',FloatType()),
      StructField('yhat',FloatType()),
      StructField('yhat_upper',FloatType()),
      StructField('yhat_lower',FloatType())
      ])
    
  3. Zum Trainieren des Modells und Generieren einer Prognose nutzen wir eine Pandas-Funktion. Wir definieren diese Funktion, um eine Teilmenge von Daten zu empfangen, die um eine Kombination aus Shop und Artikel organisiert sind. Dadurch wird eine Prognose in dem Format zurückgegeben, das in der vorherigen Zelle angegeben wurde.
    %%spark
    
    def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    
      # TRAIN MODEL AS BEFORE
      # --------------------------------------
      # remove missing values (more likely at day-store-item level)
    
      history_pd = history_pd.dropna()
    
      # configure the model
      model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
        )
    
      # train the model
      model.fit( history_pd )
      # --------------------------------------
    
      # BUILD FORECAST AS BEFORE
      # --------------------------------------
    
      # make predictions
      future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True
        )
    
      forecast_pd = model.predict( future_pd )  
      # --------------------------------------
    
      # ASSEMBLE EXPECTED RESULT SET
      # --------------------------------------
      # get relevant fields from forecast
      f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    
      # get relevant fields from history
      h_pd = history_pd[['ds','store','item','y']].set_index('ds')
    
      # join history and forecast
      results_pd = f_pd.join( h_pd, how='left' )
      results_pd.reset_index(level=0, inplace=True)
    
      # get store & item from incoming data set
      results_pd['store'] = history_pd['store'].iloc[0]
      results_pd['item'] = history_pd['item'].iloc[0]
      # --------------------------------------
    
      # return expected dataset
      return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
    
  4. Wenden Sie die Prognosefunktion auf jede Kombination aus Filiale und Artikel an.
    Rufen wir unsere Pandas-Funktion an, um unsere Prognosen zu erstellen. Dazu gruppieren wir unser historisches Datenset um Shop und Artikel. Dann wenden wir unsere Funktion auf jede Gruppe an und behandeln das heutige Datum als training_date für Datenmanagementzwecke.
    %%spark -o df
    
    from pyspark.sql.functions import current_date
    
    df = (
      store_item_history
        .groupBy('store', 'item')
          .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date() )
        )
    
    df.createOrReplaceTempView('new_forecasts')
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df)
  5. Speichern Sie die Ausgabedaten von Prognosen, indem Sie eine Deltatabelle erstellen.
    %%spark
    
    db_name = "gold_db"
    spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name)
    
    spark.sql("USE " + db_name)
    %%spark
    
    spark.sql("""create table if not exists forecasts (
                  date date,
                  store integer,
                  item integer,
                  sales float,
                  sales_predicted float,
                  sales_predicted_upper float,
                  sales_predicted_lower float,
                  training_date date
                  )
                using delta
                partitioned by (date)""")
    
    spark.sql("""merge into forecasts f
                    using new_forecasts n 
                    on f.date = n.ds and f.store = n.store and f.item = n.item
                    when matched then update set f.date = n.ds,
                      f.store = n.store,
                      f.item = n.item,
                      f.sales = n.y,
                      f.sales_predicted = n.yhat,
                      f.sales_predicted_upper = n.yhat_upper,
                      f.sales_predicted_lower = n.yhat_lower,
                      f.training_date = n.training_date
                    when not matched then insert (date,
                      store,
                      item,
                      sales,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower,
                      training_date)
                    values (n.ds,
                      n.store,
                      n.item,
                      n.y,
                      n.yhat,
                      n.yhat_upper,
                      n.yhat_lower,
                      n.training_date)""")
  6. Wie gut (oder schlecht) ist nun jede Prognose? Wenden wir dieselben Techniken an, um jede Prognose zu bewerten. Mit der pandas-Funktionstechnik können Sie Bewertungsmetriken für jede Filialartikelprognose generieren.
    %%spark
    
    # schema of expected result set
    eval_schema =StructType([
      StructField('training_date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('mae', FloatType()),
      StructField('mse', FloatType()),
      StructField('rmse', FloatType())
      ])
     
    # define function to calculate metrics
    def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
      
      # get store & item in incoming data set
      training_date = evaluation_pd['training_date'].iloc[0]
      store = evaluation_pd['store'].iloc[0]
      item = evaluation_pd['item'].iloc[0]
      
      # calculate evaluation metrics
      mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      rmse = sqrt( mse )
      
      # assemble result set
      results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
      return pd.DataFrame.from_dict( results )
     
    # calculate metrics
    results = (
      spark
        .table('new_forecasts')
        .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
        .select('training_date', 'store', 'item', 'y', 'yhat')
        .groupBy('training_date', 'store', 'item')
        .applyInPandas(evaluate_forecast, schema=eval_schema)
        )
  7. Daraufhin werden die Bewertungsmetriken beibehalten. Die Metriken für jede Prognose werden wahrscheinlich gemeldet. Daher werden sie in einer abfragbaren Delta-Tabelle gespeichert.
    %%spark
    
    spark.sql("""create table if not exists forecast_evals (
                  store integer,
                  item integer,
                  mae float,
                  mse float,
                  rmse float,
                  training_date date
                  )
                using delta
                partitioned by (training_date)""")
    
    spark.sql("""insert into forecast_evals
                select
                  store,
                  item,
                  mae,
                  mse,
                  rmse,
                  training_date
                from new_forecast_evals""")
  8. Jetzt haben wir eine Prognose für jede Kombination aus Filiale und Artikel erstellt und für jede Kombination Basisbewertungsmetriken generiert. Um diese Prognosedaten anzuzeigen, können wir eine einfache Abfrage ausgeben (hier maximal auf Produkt eins über Filialen hinweg mit einer bis drei).
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      date,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower
                    FROM forecasts a
                    WHERE item = 1 AND
                          store IN (1, 2, 3) AND
                          date >= '2018-01-01' AND
                          training_date=current_date()
                    ORDER BY store""")
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  9. Schließlich rufen wir Bewertungsmetriken ab.
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      mae,
                      mse,
                      rmse
                    FROM forecast_evals a
                    WHERE item = 1 AND
                          training_date=current_date()
                    ORDER BY store""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
Wir haben jetzt ein OCI Data Science-Notizbuch eingerichtet und eine OCI Data Flow-Session erstellt, um die Demand Forecasting-Anwendung (PySpark) in großem Maßstab interaktiv auszuführen.

Um zusätzliche Gebühren zu vermeiden, löschen oder beenden Sie alle Ressourcen, die zum Testen dieser Lösung erstellt wurden, und bereinigen Sie die OCI Object Storage-Buckets.

Um noch heute mit OCI Data Flow zu beginnen, registrieren Sie sich für die kostenlose Oracle Cloud-Testversion, oder melden Sie sich bei Ihrem vorhandenen Account an. Testen Sie das 15-minütige Tutorial, das keine Installation erfordert, um zu erfahren, wie einfach die Spark-Verarbeitung mit Oracle Cloud Infrastructure sein kann.