수요 예측 생성
이 단계에서는 OCI Data Science 노트북에서 수요 예측을 생성하는 방법에 대해 설명합니다.
환경 설정
- 데이터 과학 노트북에서 OCI 데이터 플로우 세션을 설정합니다. 사용자정의 conda 패키지를 빌드할 때
pip install prophetpython 라이브러리를 사용합니다.import ads ads.set_auth("resource_principal") %reload_ext dataflow.magics %help import json command = { "compartmentId": "your-compartment-ocid", "displayName": "display-name", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "metastoreId": "your-metastore-ocid", "configuration":{ "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda", "spark.oracle.datasource.enabled":"true", "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore", "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"} } command = f'\'{json.dumps(command)}\'' print("command",command) - 종속 python 라이브러리를 가져옵니다. 예:
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession from delta.tables import * import pandas as pd from prophet import Prophet import logging import matplotlib.pyplot as plt
데이터 검토
- 데이터를 검토합니다.교육 데이터세트의 경우 10개의 다른 매장에서 50개의 품목에 대해 5년의 매장 품목 단위 판매 데이터를 사용합니다.
%%spark from pyspark.sql.types import * # structure of the training data set train_schema = StructType([ StructField('date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('sales', IntegerType()) ]) # read the training file into a dataframe train = spark.read.csv( 'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', header=True, schema=train_schema ) # make the dataframe queryable as a temporary view train.createOrReplaceTempView('train') # show data train.show() - 수요 예측을 수행할 때 연간 추세를 확인할 수 있으며, 일반적인 추세와 계절성에도 관심이 있는 경우가 많습니다. 단위 판매의 연간 추세를 조사하여 탐색을 시작하겠습니다.
%%spark df = spark.sql(""" SELECT year(date) as year, sum(sales) as sales FROM train GROUP BY year(date) ORDER BY year;""") df.show(5)매장 전체의 총 단위 판매에서 일반적으로 상향 추세가 있다는 것은 데이터로부터 매우 분명합니다. 이 매장에서 제공하는 시장에 대해 더 잘 알고 있다면 예측 수명이 다할 것으로 예상되는 최대 성장 능력이 있는지 확인하고 싶을 것입니다. 이러한 지식이 없어 이 데이터 세트를 빠르게 볼 수 없다면, 우리의 목표가 며칠, 몇 달 또는 심지어 1년 밖에 예측하는 것이라면, 우리는 그 기간 동안 지속적으로 선형 성장을 기대할 수 있다고 가정하는 것이 안전합니다. - 월별 추세를 확인합니다.계절성을 살펴보겠습니다. 매년 개개월에 걸쳐 데이터를 집계하는 경우 전체 매출 성장에 따라 규모가 커지는 고유한 연간 계절 패턴이 관찰됩니다.
%%spark -o df df = spark.sql("""SELECT TRUNC(date, 'MM') as month, SUM(sales) as sales FROM train GROUP BY TRUNC(date, 'MM') ORDER BY month;""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - 평일 추세를 봅니다.평일 수준에서 데이터를 집계하면 월요일(평일 1)에 하드 드롭한 다음 주중에 계속 픽업하여 일요일(평일 0)에 피크가 있는 것으로 발음되는 주별 계절 패턴이 관찰됩니다. 이 패턴은 5년간의 관측에서 안정된 것으로 보입니다.
%%spark -o df df = spark.sql(""" SELECT YEAR(date) as year, ( CASE WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0 WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1 WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2 WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3 WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4 WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5 WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6 END ) % 7 as weekday, AVG(sales) as sales FROM ( SELECT date, SUM(sales) as sales FROM train GROUP BY date ) x GROUP BY year, weekday ORDER BY year, weekday;""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
단일 예측 생성
상점과 품목의 개별 조합에 대한 예측을 생성하기 전에 단일 수요 예측을 생성해 보겠습니다. 그것은 예언자의 사용에 자신을 지향하는 것보다 다른 이유없이 하나의 예측을 구축하는 것이 도움이 될 수 있습니다.
- 첫번째 단계는 모델을 학습시킬 과거 데이터 집합을 어셈블하는 것입니다.
%%spark history_pd = spark.sql(""" SELECT CAST(date as date) as ds, sales as y FROM train WHERE store=1 AND item=1 ORDER BY ds """).toPandas() history_pd = history_pd.dropna() history_pd = history_pd.dropna()이제 선지자 라이브러리를 임포트하겠습니다. 하지만 사용 중일 때 약간 상세할 수 있으므로 환경에서 로깅 설정을 미세 조정해야 합니다.%%spark from prophet import Prophet import logging # disable informational messages from prophet logging.getLogger('py4j').setLevel(logging.ERROR)%%spark # set model parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd) - 다음으로 예측을 작성합니다.
%%spark -o df # define a dataset including both historical dates & 90-days beyond the last available date future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict(future_pd) df=spark.createDataFrame(forecast_pd)from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - 예측 구성 요소를 검사합니다.모델은 어떻게 수행되었습니까? 여기서는 그래프로 표시되는 모델의 일반 및 계절 추세를 확인할 수 있습니다.
%%spark import matplotlib.pyplot as plt trends_fig = model.plot_components(forecast_pd) %matplot plt - 과거 데이터 대 예측 데이터를 봅니다.여기서는 실제 및 예측된 데이터 라인과 미래에 대한 예측을 확인할 수 있습니다. 하지만 읽기 쉽게 유지하기 위해 그래프를 과거 데이터의 마지막 연도로 제한할 것입니다.
%%spark predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # adjust figure to display dates from last year + the 90 day forecast xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) import matplotlib.pyplot as plt %matplot plt이 시각화는 약간 사용 중입니다. 검은색 점은 예측을 나타내는 진한 파란색 선과 (95%) 불확실성 간격을 나타내는 밝은 파란색 밴드를 가진 실제 값을 나타냅니다. - 평가 척도를 계산합니다.시각적 검사가 유용하지만 예측을 평가하는 더 좋은 방법은 세트의 실제 값을 기준으로 예측된 평균 절대 오차, 평균 제곱 오차 및 평균 제곱 오차 값을 계산하는 것입니다.
%%spark import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # get historical actuals & predictions for comparison actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y'] predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat'] # calculate evaluation metrics mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) # print metrics to the screen print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
대규모 예측 생성
- 예측 생성을 확장합니다.우리의 벨트 아래 기계공으로, 개별 상점과 품목 조합에 대한 수많은 미세한 모델과 예측을 구축하는 우리의 원래 목표를 해결해 보겠습니다. 먼저 점포-품목-일자 세분성 레벨에서 판매 데이터를 어셈블합니다. 모든 상점-품목 조합에 대한 데이터를 검색합니다. 이 데이터 세트의 데이터는 이 세분성 레벨에서 이미 집계되어야 하지만 예상되는 데이터 구조를 갖도록 명시적으로 집계됩니다.
%%spark sql_statement = ''' SELECT store, item, CAST(date as date) as ds, SUM(sales) as y FROM train GROUP BY store, item, ds ORDER BY store, item, ds ''' store_item_history = ( spark .sql( sql_statement ) .repartition(sc.defaultParallelism, ['store', 'item']) ).cache() - 저장소 품목 일자 수준에서 집계된 데이터를 사용하여 데이터를 선지자에게 전달하는 방법을 고려해야 합니다. 각 매장 및 품목 조합에 대한 모델을 구축하는 것이 목표인 경우 방금 조립한 데이터세트에서 매장 품목 하위 세트를 전달하고, 해당 하위 세트에서 모델을 교육하고, 매장 품목 예측을 다시 받아야 합니다. 예측이 조립된 상점 및 품목 식별자를 보유하는 구조와 같은 구조를 가진 데이터 세트로 예측이 반환될 것으로 예상되며, 출력은 예언자 모델에서 생성된 관련 필드 하위 세트로만 제한됩니다.
%%spark from pyspark.sql.types import * result_schema =StructType([ StructField('ds',DateType()), StructField('store',IntegerType()), StructField('item',IntegerType()), StructField('y',FloatType()), StructField('yhat',FloatType()), StructField('yhat_upper',FloatType()), StructField('yhat_lower',FloatType()) ]) - 모델을 교육하고 예측을 생성하기 위해 Pandas 함수를 활용합니다. 이 함수를 정의하여 상점과 품목 조합을 중심으로 구성된 데이터의 하위 세트를 수신합니다. 이전 셀에서 식별된 형식으로 예측이 반환됩니다.
%%spark def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame: # TRAIN MODEL AS BEFORE # -------------------------------------- # remove missing values (more likely at day-store-item level) history_pd = history_pd.dropna() # configure the model model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # train the model model.fit( history_pd ) # -------------------------------------- # BUILD FORECAST AS BEFORE # -------------------------------------- # make predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict( future_pd ) # -------------------------------------- # ASSEMBLE EXPECTED RESULT SET # -------------------------------------- # get relevant fields from forecast f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds') # get relevant fields from history h_pd = history_pd[['ds','store','item','y']].set_index('ds') # join history and forecast results_pd = f_pd.join( h_pd, how='left' ) results_pd.reset_index(level=0, inplace=True) # get store & item from incoming data set results_pd['store'] = history_pd['store'].iloc[0] results_pd['item'] = history_pd['item'].iloc[0] # -------------------------------------- # return expected dataset return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ] - 각 상점-품목 조합에 예측 기능을 적용합니다.Pandas 함수를 호출하여 예측을 작성해 보겠습니다. 이 작업은 저장소 및 품목과 관련된 과거 데이터세트를 그룹화하여 수행합니다. 그런 다음 각 그룹에 기능을 적용하고 데이터 관리 목적으로
training_date로 오늘 날짜를 기록합니다.%%spark -o df from pyspark.sql.functions import current_date df = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date() ) ) df.createOrReplaceTempView('new_forecasts')from autovizwidget.widget.utils import display_dataframe display_dataframe(df) - 델타 테이블을 생성하여 예측 출력 데이터를 지속합니다.
%%spark db_name = "gold_db" spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name) spark.sql("USE " + db_name)%%spark spark.sql("""create table if not exists forecasts ( date date, store integer, item integer, sales float, sales_predicted float, sales_predicted_upper float, sales_predicted_lower float, training_date date ) using delta partitioned by (date)""") spark.sql("""merge into forecasts f using new_forecasts n on f.date = n.ds and f.store = n.store and f.item = n.item when matched then update set f.date = n.ds, f.store = n.store, f.item = n.item, f.sales = n.y, f.sales_predicted = n.yhat, f.sales_predicted_upper = n.yhat_upper, f.sales_predicted_lower = n.yhat_lower, f.training_date = n.training_date when not matched then insert (date, store, item, sales, sales_predicted, sales_predicted_upper, sales_predicted_lower, training_date) values (n.ds, n.store, n.item, n.y, n.yhat, n.yhat_upper, n.yhat_lower, n.training_date)""") - 각 예측의 양호성(또는 불량)은 어떻습니까? 각 예측을 평가하기 위해 동일한 기법을 적용해 보겠습니다. pandas 함수 기법을 사용하여 각 상점-품목 예측에 대한 평가 척도를 생성할 수 있습니다.
%%spark # schema of expected result set eval_schema =StructType([ StructField('training_date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('mae', FloatType()), StructField('mse', FloatType()), StructField('rmse', FloatType()) ]) # define function to calculate metrics def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame: # get store & item in incoming data set training_date = evaluation_pd['training_date'].iloc[0] store = evaluation_pd['store'].iloc[0] item = evaluation_pd['item'].iloc[0] # calculate evaluation metrics mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] ) mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] ) rmse = sqrt( mse ) # assemble result set results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]} return pd.DataFrame.from_dict( results ) # calculate metrics results = ( spark .table('new_forecasts') .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data .select('training_date', 'store', 'item', 'y', 'yhat') .groupBy('training_date', 'store', 'item') .applyInPandas(evaluate_forecast, schema=eval_schema) ) - 다시 시도해 보면 평가 측정항목을 지속해 보겠습니다. 각 예측에 대한 척도를 보고하려고 하므로 쿼리 가능한 델타 테이블에 계속됩니다.
%%spark spark.sql("""create table if not exists forecast_evals ( store integer, item integer, mae float, mse float, rmse float, training_date date ) using delta partitioned by (training_date)""") spark.sql("""insert into forecast_evals select store, item, mae, mse, rmse, training_date from new_forecast_evals""") - 이제 각 매장 품목 조합에 대한 예측을 구성하고 각각에 대한 기본 평가 척도를 생성했습니다. 이 예측 데이터를 보려면 간단한 쿼리를 실행할 수 있습니다(여기에서 1개에서 3개까지 하나의 제품을 생산하도록 제한).
%%spark -o df df=spark.sql("""SELECT store, date, sales_predicted, sales_predicted_upper, sales_predicted_lower FROM forecasts a WHERE item = 1 AND store IN (1, 2, 3) AND date >= '2018-01-01' AND training_date=current_date() ORDER BY store""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - 마지막으로 평가 측정항목을 검색해 보겠습니다.
%%spark -o df df=spark.sql("""SELECT store, mae, mse, rmse FROM forecast_evals a WHERE item = 1 AND training_date=current_date() ORDER BY store""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
이제 OCI Data Science 노트북을 설정하고 대화식으로 Demand Forecasting 애플리케이션(PySpark)을 대규모로 실행할 수 있는 OCI Data Flow 세션을 생성했습니다.
추가 비용이 발생하지 않도록 하려면 이 솔루션을 테스트하기 위해 생성된 모든 리소스를 삭제 또는 종료하고 OCI 객체 저장소 버킷을 정리합니다.
지금 OCI Data Flow를 시작하려면 Oracle Cloud 무료 체험판에 등록하거나 기존 계정에 사인인하십시오. Data Flow의 15분 무료 설치 필요 자습서를 통해 Oracle Cloud Infrastructure를 통해 Spark 처리가 얼마나 쉬운지 확인할 수 있습니다.











