수요 예측 생성

이 단계에서는 OCI Data Science 노트북에서 수요 예측을 생성하는 방법에 대해 설명합니다.

환경 설정

  1. 데이터 과학 노트북에서 OCI 데이터 플로우 세션을 설정합니다. 사용자정의 conda 패키지를 빌드할 때 pip install prophet python 라이브러리를 사용합니다.
    import ads
    
    ads.set_auth("resource_principal")
    
    %reload_ext dataflow.magics
    
    %help
    
    import json
    command = {
        "compartmentId": "your-compartment-ocid",
        "displayName": "display-name",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "metastoreId": "your-metastore-ocid",
        "configuration":{
                 "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda",
                         "spark.oracle.datasource.enabled":"true",
                         "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore",
                         "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
                         "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
  2. 종속 python 라이브러리를 가져옵니다. 예:
    %%spark
    #Import required libraries.
    
    import json
    import os
    import sys
    import datetime
    import oci
    import pyspark.sql 
    from pyspark.sql.functions import countDistinct
    
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # $example off$
    from pyspark.sql import SparkSession
    
    from delta.tables import *
    
    import pandas as pd
    
    from prophet import Prophet
    import logging
    
    import matplotlib.pyplot as plt
    

데이터 검토

  1. 데이터를 검토합니다.
    교육 데이터세트의 경우 10개의 다른 매장에서 50개의 품목에 대해 5년의 매장 품목 단위 판매 데이터를 사용합니다.
    %%spark
    
    from pyspark.sql.types import *
    
    # structure of the training data set
    
    train_schema = StructType([
      StructField('date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('sales', IntegerType())
      ])
    
    # read the training file into a dataframe
    
    train = spark.read.csv(
      'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', 
      header=True, 
      schema=train_schema
      )
    
    # make the dataframe queryable as a temporary view
    
    train.createOrReplaceTempView('train')
    
    # show data
    
    train.show()
    
  2. 수요 예측을 수행할 때 연간 추세를 확인할 수 있으며, 일반적인 추세와 계절성에도 관심이 있는 경우가 많습니다. 단위 판매의 연간 추세를 조사하여 탐색을 시작하겠습니다.
    %%spark
    
    df = spark.sql(""" SELECT
      year(date) as year, 
      sum(sales) as sales
    FROM train
    GROUP BY year(date)
    ORDER BY year;""")
    
    df.show(5)
    
    매장 전체의 총 단위 판매에서 일반적으로 상향 추세가 있다는 것은 데이터로부터 매우 분명합니다. 이 매장에서 제공하는 시장에 대해 더 잘 알고 있다면 예측 수명이 다할 것으로 예상되는 최대 성장 능력이 있는지 확인하고 싶을 것입니다. 이러한 지식이 없어 이 데이터 세트를 빠르게 볼 수 없다면, 우리의 목표가 며칠, 몇 달 또는 심지어 1년 밖에 예측하는 것이라면, 우리는 그 기간 동안 지속적으로 선형 성장을 기대할 수 있다고 가정하는 것이 안전합니다.
  3. 월별 추세를 확인합니다.
    계절성을 살펴보겠습니다. 매년 개개월에 걸쳐 데이터를 집계하는 경우 전체 매출 성장에 따라 규모가 커지는 고유한 연간 계절 패턴이 관찰됩니다.
    %%spark -o df
    
    df = spark.sql("""SELECT 
      TRUNC(date, 'MM') as month,
      SUM(sales) as sales
    FROM train
    GROUP BY TRUNC(date, 'MM')
    ORDER BY month;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  4. 평일 추세를 봅니다.
    평일 수준에서 데이터를 집계하면 월요일(평일 1)에 하드 드롭한 다음 주중에 계속 픽업하여 일요일(평일 0)에 피크가 있는 것으로 발음되는 주별 계절 패턴이 관찰됩니다. 이 패턴은 5년간의 관측에서 안정된 것으로 보입니다.
    %%spark -o df
    
    df = spark.sql(""" 
    SELECT
      YEAR(date) as year,
      (
        CASE
          WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
          WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
          WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
          WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
          WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
          WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
          WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
        END
      ) % 7 as weekday,
      AVG(sales) as sales
    FROM (
      SELECT 
        date,
        SUM(sales) as sales
      FROM train
      GROUP BY date
     ) x
    GROUP BY year, weekday
    ORDER BY year, weekday;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
    

단일 예측 생성

상점과 품목의 개별 조합에 대한 예측을 생성하기 전에 단일 수요 예측을 생성해 보겠습니다. 그것은 예언자의 사용에 자신을 지향하는 것보다 다른 이유없이 하나의 예측을 구축하는 것이 도움이 될 수 있습니다.
  1. 첫번째 단계는 모델을 학습시킬 과거 데이터 집합을 어셈블하는 것입니다.
    %%spark
    
    history_pd = spark.sql(""" 
      SELECT
        CAST(date as date) as ds,
        sales as y
      FROM train
      WHERE store=1 AND item=1
      ORDER BY ds
    """).toPandas()
    
    history_pd = history_pd.dropna()
    
    history_pd = history_pd.dropna()
    
    이제 선지자 라이브러리를 임포트하겠습니다. 하지만 사용 중일 때 약간 상세할 수 있으므로 환경에서 로깅 설정을 미세 조정해야 합니다.
    %%spark
    
    from prophet import Prophet
    import logging
    
    # disable informational messages from prophet
    
    logging.getLogger('py4j').setLevel(logging.ERROR)
    %%spark
    
    # set model parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
      )
     
    # fit the model to historical data
    model.fit(history_pd)
  2. 다음으로 예측을 작성합니다.
    %%spark -o df
    
    # define a dataset including both historical dates & 90-days beyond the last available date
    future_pd = model.make_future_dataframe(
      periods=90, 
      freq='d', 
      include_history=True
      )
     
    forecast_pd = model.predict(future_pd)
    
    df=spark.createDataFrame(forecast_pd)
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  3. 예측 구성 요소를 검사합니다.
    모델은 어떻게 수행되었습니까? 여기서는 그래프로 표시되는 모델의 일반 및 계절 추세를 확인할 수 있습니다.
    %%spark 
    
    import matplotlib.pyplot as plt
    
    trends_fig = model.plot_components(forecast_pd)
    
    %matplot plt
  4. 과거 데이터 대 예측 데이터를 봅니다.
    여기서는 실제 및 예측된 데이터 라인과 미래에 대한 예측을 확인할 수 있습니다. 하지만 읽기 쉽게 유지하기 위해 그래프를 과거 데이터의 마지막 연도로 제한할 것입니다.
    %%spark 
    
    predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
    
    # adjust figure to display dates from last year + the 90 day forecast
    xlim = predict_fig.axes[0].get_xlim()
    new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
    predict_fig.axes[0].set_xlim(new_xlim)
    
    import matplotlib.pyplot as plt
    
    %matplot plt
    이 시각화는 약간 사용 중입니다. 검은색 점은 예측을 나타내는 진한 파란색 선과 (95%) 불확실성 간격을 나타내는 밝은 파란색 밴드를 가진 실제 값을 나타냅니다.
  5. 평가 척도를 계산합니다.
    시각적 검사가 유용하지만 예측을 평가하는 더 좋은 방법은 세트의 실제 값을 기준으로 예측된 평균 절대 오차, 평균 제곱 오차 및 평균 제곱 오차 값을 계산하는 것입니다.
    %%spark 
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    from datetime import date
    
    # get historical actuals & predictions for comparison
    
    actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
    predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
    
    # calculate evaluation metrics
    
    mae = mean_absolute_error(actuals_pd, predicted_pd)
    mse = mean_squared_error(actuals_pd, predicted_pd)
    rmse = sqrt(mse)
    
    # print metrics to the screen
    
    print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
    

대규모 예측 생성

  1. 예측 생성을 확장합니다.
    우리의 벨트 아래 기계공으로, 개별 상점과 품목 조합에 대한 수많은 미세한 모델과 예측을 구축하는 우리의 원래 목표를 해결해 보겠습니다. 먼저 점포-품목-일자 세분성 레벨에서 판매 데이터를 어셈블합니다. 모든 상점-품목 조합에 대한 데이터를 검색합니다. 이 데이터 세트의 데이터는 이 세분성 레벨에서 이미 집계되어야 하지만 예상되는 데이터 구조를 갖도록 명시적으로 집계됩니다.
    %%spark
    
    sql_statement = '''
      SELECT
        store,
        item,
        CAST(date as date) as ds,
        SUM(sales) as y
      FROM train
      GROUP BY store, item, ds
      ORDER BY store, item, ds
      '''
    
    store_item_history = (
      spark
        .sql( sql_statement )
        .repartition(sc.defaultParallelism, ['store', 'item'])
      ).cache()
  2. 저장소 품목 일자 수준에서 집계된 데이터를 사용하여 데이터를 선지자에게 전달하는 방법을 고려해야 합니다. 각 매장 및 품목 조합에 대한 모델을 구축하는 것이 목표인 경우 방금 조립한 데이터세트에서 매장 품목 하위 세트를 전달하고, 해당 하위 세트에서 모델을 교육하고, 매장 품목 예측을 다시 받아야 합니다. 예측이 조립된 상점 및 품목 식별자를 보유하는 구조와 같은 구조를 가진 데이터 세트로 예측이 반환될 것으로 예상되며, 출력은 예언자 모델에서 생성된 관련 필드 하위 세트로만 제한됩니다.
    %%spark
    
    from pyspark.sql.types import *
    
    result_schema =StructType([
      StructField('ds',DateType()),
      StructField('store',IntegerType()),
      StructField('item',IntegerType()),
      StructField('y',FloatType()),
      StructField('yhat',FloatType()),
      StructField('yhat_upper',FloatType()),
      StructField('yhat_lower',FloatType())
      ])
    
  3. 모델을 교육하고 예측을 생성하기 위해 Pandas 함수를 활용합니다. 이 함수를 정의하여 상점과 품목 조합을 중심으로 구성된 데이터의 하위 세트를 수신합니다. 이전 셀에서 식별된 형식으로 예측이 반환됩니다.
    %%spark
    
    def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    
      # TRAIN MODEL AS BEFORE
      # --------------------------------------
      # remove missing values (more likely at day-store-item level)
    
      history_pd = history_pd.dropna()
    
      # configure the model
      model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
        )
    
      # train the model
      model.fit( history_pd )
      # --------------------------------------
    
      # BUILD FORECAST AS BEFORE
      # --------------------------------------
    
      # make predictions
      future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True
        )
    
      forecast_pd = model.predict( future_pd )  
      # --------------------------------------
    
      # ASSEMBLE EXPECTED RESULT SET
      # --------------------------------------
      # get relevant fields from forecast
      f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    
      # get relevant fields from history
      h_pd = history_pd[['ds','store','item','y']].set_index('ds')
    
      # join history and forecast
      results_pd = f_pd.join( h_pd, how='left' )
      results_pd.reset_index(level=0, inplace=True)
    
      # get store & item from incoming data set
      results_pd['store'] = history_pd['store'].iloc[0]
      results_pd['item'] = history_pd['item'].iloc[0]
      # --------------------------------------
    
      # return expected dataset
      return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
    
  4. 각 상점-품목 조합에 예측 기능을 적용합니다.
    Pandas 함수를 호출하여 예측을 작성해 보겠습니다. 이 작업은 저장소 및 품목과 관련된 과거 데이터세트를 그룹화하여 수행합니다. 그런 다음 각 그룹에 기능을 적용하고 데이터 관리 목적으로 training_date로 오늘 날짜를 기록합니다.
    %%spark -o df
    
    from pyspark.sql.functions import current_date
    
    df = (
      store_item_history
        .groupBy('store', 'item')
          .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date() )
        )
    
    df.createOrReplaceTempView('new_forecasts')
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df)
  5. 델타 테이블을 생성하여 예측 출력 데이터를 지속합니다.
    %%spark
    
    db_name = "gold_db"
    spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name)
    
    spark.sql("USE " + db_name)
    %%spark
    
    spark.sql("""create table if not exists forecasts (
                  date date,
                  store integer,
                  item integer,
                  sales float,
                  sales_predicted float,
                  sales_predicted_upper float,
                  sales_predicted_lower float,
                  training_date date
                  )
                using delta
                partitioned by (date)""")
    
    spark.sql("""merge into forecasts f
                    using new_forecasts n 
                    on f.date = n.ds and f.store = n.store and f.item = n.item
                    when matched then update set f.date = n.ds,
                      f.store = n.store,
                      f.item = n.item,
                      f.sales = n.y,
                      f.sales_predicted = n.yhat,
                      f.sales_predicted_upper = n.yhat_upper,
                      f.sales_predicted_lower = n.yhat_lower,
                      f.training_date = n.training_date
                    when not matched then insert (date,
                      store,
                      item,
                      sales,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower,
                      training_date)
                    values (n.ds,
                      n.store,
                      n.item,
                      n.y,
                      n.yhat,
                      n.yhat_upper,
                      n.yhat_lower,
                      n.training_date)""")
  6. 각 예측의 양호성(또는 불량)은 어떻습니까? 각 예측을 평가하기 위해 동일한 기법을 적용해 보겠습니다. pandas 함수 기법을 사용하여 각 상점-품목 예측에 대한 평가 척도를 생성할 수 있습니다.
    %%spark
    
    # schema of expected result set
    eval_schema =StructType([
      StructField('training_date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('mae', FloatType()),
      StructField('mse', FloatType()),
      StructField('rmse', FloatType())
      ])
     
    # define function to calculate metrics
    def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
      
      # get store & item in incoming data set
      training_date = evaluation_pd['training_date'].iloc[0]
      store = evaluation_pd['store'].iloc[0]
      item = evaluation_pd['item'].iloc[0]
      
      # calculate evaluation metrics
      mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      rmse = sqrt( mse )
      
      # assemble result set
      results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
      return pd.DataFrame.from_dict( results )
     
    # calculate metrics
    results = (
      spark
        .table('new_forecasts')
        .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
        .select('training_date', 'store', 'item', 'y', 'yhat')
        .groupBy('training_date', 'store', 'item')
        .applyInPandas(evaluate_forecast, schema=eval_schema)
        )
  7. 다시 시도해 보면 평가 측정항목을 지속해 보겠습니다. 각 예측에 대한 척도를 보고하려고 하므로 쿼리 가능한 델타 테이블에 계속됩니다.
    %%spark
    
    spark.sql("""create table if not exists forecast_evals (
                  store integer,
                  item integer,
                  mae float,
                  mse float,
                  rmse float,
                  training_date date
                  )
                using delta
                partitioned by (training_date)""")
    
    spark.sql("""insert into forecast_evals
                select
                  store,
                  item,
                  mae,
                  mse,
                  rmse,
                  training_date
                from new_forecast_evals""")
  8. 이제 각 매장 품목 조합에 대한 예측을 구성하고 각각에 대한 기본 평가 척도를 생성했습니다. 이 예측 데이터를 보려면 간단한 쿼리를 실행할 수 있습니다(여기에서 1개에서 3개까지 하나의 제품을 생산하도록 제한).
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      date,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower
                    FROM forecasts a
                    WHERE item = 1 AND
                          store IN (1, 2, 3) AND
                          date >= '2018-01-01' AND
                          training_date=current_date()
                    ORDER BY store""")
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  9. 마지막으로 평가 측정항목을 검색해 보겠습니다.
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      mae,
                      mse,
                      rmse
                    FROM forecast_evals a
                    WHERE item = 1 AND
                          training_date=current_date()
                    ORDER BY store""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
이제 OCI Data Science 노트북을 설정하고 대화식으로 Demand Forecasting 애플리케이션(PySpark)을 대규모로 실행할 수 있는 OCI Data Flow 세션을 생성했습니다.

추가 비용이 발생하지 않도록 하려면 이 솔루션을 테스트하기 위해 생성된 모든 리소스를 삭제 또는 종료하고 OCI 객체 저장소 버킷을 정리합니다.

지금 OCI Data Flow를 시작하려면 Oracle Cloud 무료 체험판에 등록하거나 기존 계정에 사인인하십시오. Data Flow의 15분 무료 설치 필요 자습서를 통해 Oracle Cloud Infrastructure를 통해 Spark 처리가 얼마나 쉬운지 확인할 수 있습니다.