需要予測の生成
これらのステップでは、OCIデータ・サイエンス・ノートブックで需要予測を生成する方法について説明します。
環境の設定
- データ・サイエンス・ノートブックで、OCIデータ・フロー・セッションを設定します。カスタムcondaパッケージを構築する場合は、
pip install prophet
pythonライブラリを使用します。import ads ads.set_auth("resource_principal") %reload_ext dataflow.magics %help import json command = { "compartmentId": "your-compartment-ocid", "displayName": "display-name", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "metastoreId": "your-metastore-ocid", "configuration":{ "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda", "spark.oracle.datasource.enabled":"true", "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore", "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"} } command = f'\'{json.dumps(command)}\'' print("command",command)
- 依存pythonライブラリをインポートします。たとえば:
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession from delta.tables import * import pandas as pd from prophet import Prophet import logging import matplotlib.pyplot as plt
データの調査
- データを確認します。トレーニング・データセットでは、10の異なる店舗で50のアイテムの5年間のストア・アイテム・ユニット売上データを利用します。
%%spark from pyspark.sql.types import * # structure of the training data set train_schema = StructType([ StructField('date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('sales', IntegerType()) ]) # read the training file into a dataframe train = spark.read.csv( 'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', header=True, schema=train_schema ) # make the dataframe queryable as a temporary view train.createOrReplaceTempView('train') # show data train.show()
- 需要予測を行う際の年次トレンドを見ることは、一般のトレンドや季節性に関心があることがよくあります。単位販売の年間トレンドを調べて、調査を始めましょう。
%%spark df = spark.sql(""" SELECT year(date) as year, sum(sales) as sales FROM train GROUP BY year(date) ORDER BY year;""") df.show(5)
データからは、店舗全体のユニット売上合計に一般的に上昇傾向があることは明らかです。これらの店舗の市場についてよりよく知っていたら、予測の期間中にアプローチすると予想される最大成長能力があるかどうかを判断したいかもしれません。その知識がなければ、そしてこのデータセットをすぐに見ることで、私たちの目標が数日、数か月、あるいは1年で予測することであれば、その期間にわたって継続的な線形成長を期待できると仮定することは安全です。 - 月次トレンドを表示します。季節性を見てみましょう。毎年各月のデータを集計すると、売上全体の成長に伴って規模が拡大しているように見える、個別の年次季節パターンが観察されます。
%%spark -o df df = spark.sql("""SELECT TRUNC(date, 'MM') as month, SUM(sales) as sales FROM train GROUP BY TRUNC(date, 'MM') ORDER BY month;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- 平日の傾向を表示します。平日レベルでデータを集計することで、発音された週次季節パターンが日曜日のピーク(週日0)で確認され、月曜日のハード・ドロップ(週日1)で確認され、週間の安定したピックアップが日曜日の高値に戻されます。このパターンは5年間の観察で安定していると考えられる。
%%spark -o df df = spark.sql(""" SELECT YEAR(date) as year, ( CASE WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0 WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1 WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2 WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3 WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4 WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5 WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6 END ) % 7 as weekday, AVG(sales) as sales FROM ( SELECT date, SUM(sales) as sales FROM train GROUP BY date ) x GROUP BY year, weekday ORDER BY year, weekday;""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
単一予測の生成
店舗と品目の個々の組合せに対して予測を生成する前に、単一の需要予測を生成しましょう。1つの予測は、預言者の使用に自らを回すこと以外に何の理由もなく成し遂げられるかもしれません。
- 最初のステップは、モデルをトレーニングする履歴データセットを組み立てることです。
%%spark history_pd = spark.sql(""" SELECT CAST(date as date) as ds, sales as y FROM train WHERE store=1 AND item=1 ORDER BY ds """).toPandas() history_pd = history_pd.dropna() history_pd = history_pd.dropna()
ここで、預言者ライブラリをインポートしますが、使用時に少し冗長になる可能性があるため、環境内のロギング設定を微調整する必要があります。%%spark from prophet import Prophet import logging # disable informational messages from prophet logging.getLogger('py4j').setLevel(logging.ERROR)
%%spark # set model parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd)
- 次に、予測を作成します。
%%spark -o df # define a dataset including both historical dates & 90-days beyond the last available date future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict(future_pd) df=spark.createDataFrame(forecast_pd)
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- 予測コンポーネントを確認します。モデルはどのように実行されましたか?ここでは、モデルの一般的なトレンドと季節的なトレンドをグラフとして表示できます。
%%spark import matplotlib.pyplot as plt trends_fig = model.plot_components(forecast_pd) %matplot plt
- 履歴データと予測データを表示します。ここでは、実際のデータと予測データがどのように並んでいるか、および将来の予測を確認できます。ただし、グラフは過去1年間のデータを読みやすくするために制限します。
%%spark predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # adjust figure to display dates from last year + the 90 day forecast xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) import matplotlib.pyplot as plt %matplot plt
このビジュアライゼーションは多少ビジーです。黒い点は、予測を表す濃い青色の線と、(95%)の不確実性の間隔を表す明るい青色のバンドの実績を表します。 - 評価メトリックを計算します。ビジュアル検査は便利ですが、予測を評価するより適切な方法は、予測される平均絶対誤差、平均平方誤差および二乗平均平方誤差値を、セット内の実際の値に対して相対的に計算することです。
%%spark import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # get historical actuals & predictions for comparison actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y'] predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat'] # calculate evaluation metrics mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) # print metrics to the screen print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
大規模な予測の生成
- 予測生成をスケーリングします。弊社のベルトの下にあるメカニクスで、店舗とアイテムの個々の組合せに対して、多数の細かいモデルと予測を構築するという当初の目標に取り組みます。まず、店舗アイテム日付の粒度レベルで販売データを組み立てます。すべてのストアとアイテムの組合せのデータを取得します。このデータ・セット内のデータは、このレベルの粒度ですでに集計する必要がありますが、予期されるデータ構造を確保するために明示的に集計しています。
%%spark sql_statement = ''' SELECT store, item, CAST(date as date) as ds, SUM(sales) as y FROM train GROUP BY store, item, ds ORDER BY store, item, ds ''' store_item_history = ( spark .sql( sql_statement ) .repartition(sc.defaultParallelism, ['store', 'item']) ).cache()
- データがストア・アイテム日付レベルで集計されたため、データを預言者にどのように渡すかを検討する必要があります。店舗とアイテムの組合せごとにモデルを作成することが目標である場合、これまでに組み立てたデータセットから店舗アイテム・サブセットを渡し、そのサブセットでモデルをトレーニングし、店舗アイテムの予測を取り戻す必要があります。予測は、予測が組み立てられた店舗識別子と品目識別子を保持し、出力をProphetモデルで生成されるフィールドの関連サブセットのみに制限するような構造を持つデータセットとして返されるものとします。
%%spark from pyspark.sql.types import * result_schema =StructType([ StructField('ds',DateType()), StructField('store',IntegerType()), StructField('item',IntegerType()), StructField('y',FloatType()), StructField('yhat',FloatType()), StructField('yhat_upper',FloatType()), StructField('yhat_lower',FloatType()) ])
- モデルをトレーニングして予測を生成するために、Pandas機能を活用します。この機能を定義して、店舗と品目の組合せに基づいて編成されたデータのサブセットを受け取ります。前のセルで識別された形式で予測が返されます。
%%spark def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame: # TRAIN MODEL AS BEFORE # -------------------------------------- # remove missing values (more likely at day-store-item level) history_pd = history_pd.dropna() # configure the model model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # train the model model.fit( history_pd ) # -------------------------------------- # BUILD FORECAST AS BEFORE # -------------------------------------- # make predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict( future_pd ) # -------------------------------------- # ASSEMBLE EXPECTED RESULT SET # -------------------------------------- # get relevant fields from forecast f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds') # get relevant fields from history h_pd = history_pd[['ds','store','item','y']].set_index('ds') # join history and forecast results_pd = f_pd.join( h_pd, how='left' ) results_pd.reset_index(level=0, inplace=True) # get store & item from incoming data set results_pd['store'] = history_pd['store'].iloc[0] results_pd['item'] = history_pd['item'].iloc[0] # -------------------------------------- # return expected dataset return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]
- 各店舗とアイテムの組合せに予測関数を適用します。パンダ関数を呼び出して予測を作成します。これは、履歴データセットを店舗やアイテムごとにグループ化することで行います。次に、各グループに機能を適用し、データ管理目的で
training_date
として今日の日付に取り組みます。%%spark -o df from pyspark.sql.functions import current_date df = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date() ) ) df.createOrReplaceTempView('new_forecasts')
from autovizwidget.widget.utils import display_dataframe display_dataframe(df)
- デルタ表を作成して、予測出力データを保持します。
%%spark db_name = "gold_db" spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name) spark.sql("USE " + db_name)
%%spark spark.sql("""create table if not exists forecasts ( date date, store integer, item integer, sales float, sales_predicted float, sales_predicted_upper float, sales_predicted_lower float, training_date date ) using delta partitioned by (date)""") spark.sql("""merge into forecasts f using new_forecasts n on f.date = n.ds and f.store = n.store and f.item = n.item when matched then update set f.date = n.ds, f.store = n.store, f.item = n.item, f.sales = n.y, f.sales_predicted = n.yhat, f.sales_predicted_upper = n.yhat_upper, f.sales_predicted_lower = n.yhat_lower, f.training_date = n.training_date when not matched then insert (date, store, item, sales, sales_predicted, sales_predicted_upper, sales_predicted_lower, training_date) values (n.ds, n.store, n.item, n.y, n.yhat, n.yhat_upper, n.yhat_lower, n.training_date)""")
- それぞれの予測はどのくらい良い(または悪い)か?同じ手法を適用して、各予測を評価します。パンダ関数手法を使用して、各ストア- アイテム予測の評価指標を生成できます。
%%spark # schema of expected result set eval_schema =StructType([ StructField('training_date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('mae', FloatType()), StructField('mse', FloatType()), StructField('rmse', FloatType()) ]) # define function to calculate metrics def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame: # get store & item in incoming data set training_date = evaluation_pd['training_date'].iloc[0] store = evaluation_pd['store'].iloc[0] item = evaluation_pd['item'].iloc[0] # calculate evaluation metrics mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] ) mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] ) rmse = sqrt( mse ) # assemble result set results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]} return pd.DataFrame.from_dict( results ) # calculate metrics results = ( spark .table('new_forecasts') .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data .select('training_date', 'store', 'item', 'y', 'yhat') .groupBy('training_date', 'store', 'item') .applyInPandas(evaluate_forecast, schema=eval_schema) )
- もう一度、評価指標を維持しましょう。各予測のメトリックをレポートする必要があるため、問合せ可能なデルタ表に保持します。
%%spark spark.sql("""create table if not exists forecast_evals ( store integer, item integer, mae float, mse float, rmse float, training_date date ) using delta partitioned by (training_date)""") spark.sql("""insert into forecast_evals select store, item, mae, mse, rmse, training_date from new_forecast_evals""")
- ここでは、店舗とアイテムの組合せごとに予測を作成し、それぞれの基本的な評価指標を作成しました。この予測データを表示するには、単純な問合せを発行できます(ここでは店舗1から店舗3にまたがる製品1に制限されます)。
%%spark -o df df=spark.sql("""SELECT store, date, sales_predicted, sales_predicted_upper, sales_predicted_lower FROM forecasts a WHERE item = 1 AND store IN (1, 2, 3) AND date >= '2018-01-01' AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
- 最後に、評価メトリックを取得します。
%%spark -o df df=spark.sql("""SELECT store, mae, mse, rmse FROM forecast_evals a WHERE item = 1 AND training_date=current_date() ORDER BY store""")
from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
OCI Data Scienceノートブックを設定し、OCI Data Flowセッションを作成して、Demand Forecastingアプリケーション(PySpark)を大規模に対話的に実行します。
追加料金が発生しないようにするには、このソリューションをテストするために作成されたすべてのリソースを削除または終了し、OCIオブジェクト・ストア・バケットをクリーン・アップします。
OCI Data Flowを今すぐ開始するには、Oracle Cloud無料トライアルにサインアップするか、既存のアカウントにサインインします。Data Flowの15分間のインストール不要のチュートリアルを試して、Oracle Cloud InfrastructureでSpark処理を簡単に実行できることを確認します。