需要予測の生成

これらのステップでは、OCIデータ・サイエンス・ノートブックで需要予測を生成する方法について説明します。

環境の設定

  1. データ・サイエンス・ノートブックで、OCIデータ・フロー・セッションを設定します。カスタムcondaパッケージを構築する場合は、pip install prophet pythonライブラリを使用します。
    import ads
    
    ads.set_auth("resource_principal")
    
    %reload_ext dataflow.magics
    
    %help
    
    import json
    command = {
        "compartmentId": "your-compartment-ocid",
        "displayName": "display-name",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "metastoreId": "your-metastore-ocid",
        "configuration":{
                 "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda",
                         "spark.oracle.datasource.enabled":"true",
                         "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore",
                         "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
                         "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
  2. 依存pythonライブラリをインポートします。たとえば:
    %%spark
    #Import required libraries.
    
    import json
    import os
    import sys
    import datetime
    import oci
    import pyspark.sql 
    from pyspark.sql.functions import countDistinct
    
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # $example off$
    from pyspark.sql import SparkSession
    
    from delta.tables import *
    
    import pandas as pd
    
    from prophet import Prophet
    import logging
    
    import matplotlib.pyplot as plt
    

データの調査

  1. データを確認します。
    トレーニング・データセットでは、10の異なる店舗で50のアイテムの5年間のストア・アイテム・ユニット売上データを利用します。
    %%spark
    
    from pyspark.sql.types import *
    
    # structure of the training data set
    
    train_schema = StructType([
      StructField('date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('sales', IntegerType())
      ])
    
    # read the training file into a dataframe
    
    train = spark.read.csv(
      'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', 
      header=True, 
      schema=train_schema
      )
    
    # make the dataframe queryable as a temporary view
    
    train.createOrReplaceTempView('train')
    
    # show data
    
    train.show()
    
  2. 需要予測を行う際の年次トレンドを見ることは、一般のトレンドや季節性に関心があることがよくあります。単位販売の年間トレンドを調べて、調査を始めましょう。
    %%spark
    
    df = spark.sql(""" SELECT
      year(date) as year, 
      sum(sales) as sales
    FROM train
    GROUP BY year(date)
    ORDER BY year;""")
    
    df.show(5)
    
    データからは、店舗全体のユニット売上合計に一般的に上昇傾向があることは明らかです。これらの店舗の市場についてよりよく知っていたら、予測の期間中にアプローチすると予想される最大成長能力があるかどうかを判断したいかもしれません。その知識がなければ、そしてこのデータセットをすぐに見ることで、私たちの目標が数日、数か月、あるいは1年で予測することであれば、その期間にわたって継続的な線形成長を期待できると仮定することは安全です。
  3. 月次トレンドを表示します。
    季節性を見てみましょう。毎年各月のデータを集計すると、売上全体の成長に伴って規模が拡大しているように見える、個別の年次季節パターンが観察されます。
    %%spark -o df
    
    df = spark.sql("""SELECT 
      TRUNC(date, 'MM') as month,
      SUM(sales) as sales
    FROM train
    GROUP BY TRUNC(date, 'MM')
    ORDER BY month;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  4. 平日の傾向を表示します。
    平日レベルでデータを集計することで、発音された週次季節パターンが日曜日のピーク(週日0)で確認され、月曜日のハード・ドロップ(週日1)で確認され、週間の安定したピックアップが日曜日の高値に戻されます。このパターンは5年間の観察で安定していると考えられる。
    %%spark -o df
    
    df = spark.sql(""" 
    SELECT
      YEAR(date) as year,
      (
        CASE
          WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
          WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
          WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
          WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
          WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
          WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
          WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
        END
      ) % 7 as weekday,
      AVG(sales) as sales
    FROM (
      SELECT 
        date,
        SUM(sales) as sales
      FROM train
      GROUP BY date
     ) x
    GROUP BY year, weekday
    ORDER BY year, weekday;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
    

単一予測の生成

店舗と品目の個々の組合せに対して予測を生成する前に、単一の需要予測を生成しましょう。1つの予測は、預言者の使用に自らを回すこと以外に何の理由もなく成し遂げられるかもしれません。
  1. 最初のステップは、モデルをトレーニングする履歴データセットを組み立てることです。
    %%spark
    
    history_pd = spark.sql(""" 
      SELECT
        CAST(date as date) as ds,
        sales as y
      FROM train
      WHERE store=1 AND item=1
      ORDER BY ds
    """).toPandas()
    
    history_pd = history_pd.dropna()
    
    history_pd = history_pd.dropna()
    
    ここで、預言者ライブラリをインポートしますが、使用時に少し冗長になる可能性があるため、環境内のロギング設定を微調整する必要があります。
    %%spark
    
    from prophet import Prophet
    import logging
    
    # disable informational messages from prophet
    
    logging.getLogger('py4j').setLevel(logging.ERROR)
    %%spark
    
    # set model parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
      )
     
    # fit the model to historical data
    model.fit(history_pd)
  2. 次に、予測を作成します。
    %%spark -o df
    
    # define a dataset including both historical dates & 90-days beyond the last available date
    future_pd = model.make_future_dataframe(
      periods=90, 
      freq='d', 
      include_history=True
      )
     
    forecast_pd = model.predict(future_pd)
    
    df=spark.createDataFrame(forecast_pd)
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  3. 予測コンポーネントを確認します。
    モデルはどのように実行されましたか?ここでは、モデルの一般的なトレンドと季節的なトレンドをグラフとして表示できます。
    %%spark 
    
    import matplotlib.pyplot as plt
    
    trends_fig = model.plot_components(forecast_pd)
    
    %matplot plt
  4. 履歴データと予測データを表示します。
    ここでは、実際のデータと予測データがどのように並んでいるか、および将来の予測を確認できます。ただし、グラフは過去1年間のデータを読みやすくするために制限します。
    %%spark 
    
    predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
    
    # adjust figure to display dates from last year + the 90 day forecast
    xlim = predict_fig.axes[0].get_xlim()
    new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
    predict_fig.axes[0].set_xlim(new_xlim)
    
    import matplotlib.pyplot as plt
    
    %matplot plt
    このビジュアライゼーションは多少ビジーです。黒い点は、予測を表す濃い青色の線と、(95%)の不確実性の間隔を表す明るい青色のバンドの実績を表します。
  5. 評価メトリックを計算します。
    ビジュアル検査は便利ですが、予測を評価するより適切な方法は、予測される平均絶対誤差、平均平方誤差および二乗平均平方誤差値を、セット内の実際の値に対して相対的に計算することです。
    %%spark 
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    from datetime import date
    
    # get historical actuals & predictions for comparison
    
    actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
    predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
    
    # calculate evaluation metrics
    
    mae = mean_absolute_error(actuals_pd, predicted_pd)
    mse = mean_squared_error(actuals_pd, predicted_pd)
    rmse = sqrt(mse)
    
    # print metrics to the screen
    
    print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
    

大規模な予測の生成

  1. 予測生成をスケーリングします。
    弊社のベルトの下にあるメカニクスで、店舗とアイテムの個々の組合せに対して、多数の細かいモデルと予測を構築するという当初の目標に取り組みます。まず、店舗アイテム日付の粒度レベルで販売データを組み立てます。すべてのストアとアイテムの組合せのデータを取得します。このデータ・セット内のデータは、このレベルの粒度ですでに集計する必要がありますが、予期されるデータ構造を確保するために明示的に集計しています。
    %%spark
    
    sql_statement = '''
      SELECT
        store,
        item,
        CAST(date as date) as ds,
        SUM(sales) as y
      FROM train
      GROUP BY store, item, ds
      ORDER BY store, item, ds
      '''
    
    store_item_history = (
      spark
        .sql( sql_statement )
        .repartition(sc.defaultParallelism, ['store', 'item'])
      ).cache()
  2. データがストア・アイテム日付レベルで集計されたため、データを預言者にどのように渡すかを検討する必要があります。店舗とアイテムの組合せごとにモデルを作成することが目標である場合、これまでに組み立てたデータセットから店舗アイテム・サブセットを渡し、そのサブセットでモデルをトレーニングし、店舗アイテムの予測を取り戻す必要があります。予測は、予測が組み立てられた店舗識別子と品目識別子を保持し、出力をProphetモデルで生成されるフィールドの関連サブセットのみに制限するような構造を持つデータセットとして返されるものとします。
    %%spark
    
    from pyspark.sql.types import *
    
    result_schema =StructType([
      StructField('ds',DateType()),
      StructField('store',IntegerType()),
      StructField('item',IntegerType()),
      StructField('y',FloatType()),
      StructField('yhat',FloatType()),
      StructField('yhat_upper',FloatType()),
      StructField('yhat_lower',FloatType())
      ])
    
  3. モデルをトレーニングして予測を生成するために、Pandas機能を活用します。この機能を定義して、店舗と品目の組合せに基づいて編成されたデータのサブセットを受け取ります。前のセルで識別された形式で予測が返されます。
    %%spark
    
    def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    
      # TRAIN MODEL AS BEFORE
      # --------------------------------------
      # remove missing values (more likely at day-store-item level)
    
      history_pd = history_pd.dropna()
    
      # configure the model
      model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
        )
    
      # train the model
      model.fit( history_pd )
      # --------------------------------------
    
      # BUILD FORECAST AS BEFORE
      # --------------------------------------
    
      # make predictions
      future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True
        )
    
      forecast_pd = model.predict( future_pd )  
      # --------------------------------------
    
      # ASSEMBLE EXPECTED RESULT SET
      # --------------------------------------
      # get relevant fields from forecast
      f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    
      # get relevant fields from history
      h_pd = history_pd[['ds','store','item','y']].set_index('ds')
    
      # join history and forecast
      results_pd = f_pd.join( h_pd, how='left' )
      results_pd.reset_index(level=0, inplace=True)
    
      # get store & item from incoming data set
      results_pd['store'] = history_pd['store'].iloc[0]
      results_pd['item'] = history_pd['item'].iloc[0]
      # --------------------------------------
    
      # return expected dataset
      return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
    
  4. 各店舗とアイテムの組合せに予測関数を適用します。
    パンダ関数を呼び出して予測を作成します。これは、履歴データセットを店舗やアイテムごとにグループ化することで行います。次に、各グループに機能を適用し、データ管理目的でtraining_dateとして今日の日付に取り組みます。
    %%spark -o df
    
    from pyspark.sql.functions import current_date
    
    df = (
      store_item_history
        .groupBy('store', 'item')
          .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date() )
        )
    
    df.createOrReplaceTempView('new_forecasts')
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df)
  5. デルタ表を作成して、予測出力データを保持します。
    %%spark
    
    db_name = "gold_db"
    spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name)
    
    spark.sql("USE " + db_name)
    %%spark
    
    spark.sql("""create table if not exists forecasts (
                  date date,
                  store integer,
                  item integer,
                  sales float,
                  sales_predicted float,
                  sales_predicted_upper float,
                  sales_predicted_lower float,
                  training_date date
                  )
                using delta
                partitioned by (date)""")
    
    spark.sql("""merge into forecasts f
                    using new_forecasts n 
                    on f.date = n.ds and f.store = n.store and f.item = n.item
                    when matched then update set f.date = n.ds,
                      f.store = n.store,
                      f.item = n.item,
                      f.sales = n.y,
                      f.sales_predicted = n.yhat,
                      f.sales_predicted_upper = n.yhat_upper,
                      f.sales_predicted_lower = n.yhat_lower,
                      f.training_date = n.training_date
                    when not matched then insert (date,
                      store,
                      item,
                      sales,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower,
                      training_date)
                    values (n.ds,
                      n.store,
                      n.item,
                      n.y,
                      n.yhat,
                      n.yhat_upper,
                      n.yhat_lower,
                      n.training_date)""")
  6. それぞれの予測はどのくらい良い(または悪い)か?同じ手法を適用して、各予測を評価します。パンダ関数手法を使用して、各ストア- アイテム予測の評価指標を生成できます。
    %%spark
    
    # schema of expected result set
    eval_schema =StructType([
      StructField('training_date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('mae', FloatType()),
      StructField('mse', FloatType()),
      StructField('rmse', FloatType())
      ])
     
    # define function to calculate metrics
    def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
      
      # get store & item in incoming data set
      training_date = evaluation_pd['training_date'].iloc[0]
      store = evaluation_pd['store'].iloc[0]
      item = evaluation_pd['item'].iloc[0]
      
      # calculate evaluation metrics
      mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      rmse = sqrt( mse )
      
      # assemble result set
      results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
      return pd.DataFrame.from_dict( results )
     
    # calculate metrics
    results = (
      spark
        .table('new_forecasts')
        .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
        .select('training_date', 'store', 'item', 'y', 'yhat')
        .groupBy('training_date', 'store', 'item')
        .applyInPandas(evaluate_forecast, schema=eval_schema)
        )
  7. もう一度、評価指標を維持しましょう。各予測のメトリックをレポートする必要があるため、問合せ可能なデルタ表に保持します。
    %%spark
    
    spark.sql("""create table if not exists forecast_evals (
                  store integer,
                  item integer,
                  mae float,
                  mse float,
                  rmse float,
                  training_date date
                  )
                using delta
                partitioned by (training_date)""")
    
    spark.sql("""insert into forecast_evals
                select
                  store,
                  item,
                  mae,
                  mse,
                  rmse,
                  training_date
                from new_forecast_evals""")
  8. ここでは、店舗とアイテムの組合せごとに予測を作成し、それぞれの基本的な評価指標を作成しました。この予測データを表示するには、単純な問合せを発行できます(ここでは店舗1から店舗3にまたがる製品1に制限されます)。
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      date,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower
                    FROM forecasts a
                    WHERE item = 1 AND
                          store IN (1, 2, 3) AND
                          date >= '2018-01-01' AND
                          training_date=current_date()
                    ORDER BY store""")
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  9. 最後に、評価メトリックを取得します。
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      mae,
                      mse,
                      rmse
                    FROM forecast_evals a
                    WHERE item = 1 AND
                          training_date=current_date()
                    ORDER BY store""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
OCI Data Scienceノートブックを設定し、OCI Data Flowセッションを作成して、Demand Forecastingアプリケーション(PySpark)を大規模に対話的に実行します。

追加料金が発生しないようにするには、このソリューションをテストするために作成されたすべてのリソースを削除または終了し、OCIオブジェクト・ストア・バケットをクリーン・アップします。

OCI Data Flowを今すぐ開始するには、Oracle Cloud無料トライアルにサインアップするか、既存のアカウントにサインインします。Data Flowの15分間のインストール不要のチュートリアルを試して、Oracle Cloud InfrastructureでSpark処理を簡単に実行できることを確認します。