Generar previsiones de demanda

En estos pasos se describe cómo generar previsiones de demanda en el bloc de notas de OCI Data Science.

Configurar entorno

  1. En el bloc de notas de ciencia de datos, configure la sesión de OCI Data Flow. Utilice la biblioteca de python pip install prophet al crear el paquete Conda personalizado.
    import ads
    
    ads.set_auth("resource_principal")
    
    %reload_ext dataflow.magics
    
    %help
    
    import json
    command = {
        "compartmentId": "your-compartment-ocid",
        "displayName": "display-name",
        "sparkVersion": "3.2.1",
        "driverShape": "VM.Standard.E3.Flex",
        "executorShape": "VM.Standard.E3.Flex",
        "driverShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "executorShapeConfig":{"ocpus":1,"memoryInGBs":16},
        "numExecutors": 1,
        "metastoreId": "your-metastore-ocid",
        "configuration":{
                 "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda",
                         "spark.oracle.datasource.enabled":"true",
                         "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore",
                         "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension",
                         "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"}
    }
    command = f'\'{json.dumps(command)}\''
    print("command",command)
    
  2. Importar bibliotecas de python dependientes. Por ejemplo:
    %%spark
    #Import required libraries.
    
    import json
    import os
    import sys
    import datetime
    import oci
    import pyspark.sql 
    from pyspark.sql.functions import countDistinct
    
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import DecisionTreeClassifier
    from pyspark.ml.feature import StringIndexer, VectorIndexer
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    # $example off$
    from pyspark.sql import SparkSession
    
    from delta.tables import *
    
    import pandas as pd
    
    from prophet import Prophet
    import logging
    
    import matplotlib.pyplot as plt
    

Examinar datos

  1. Revise los datos.
    Para nuestro conjunto de datos de formación, utilizaremos cinco años de datos de ventas de unidad de artículo de tienda para 50 artículos en 10 tiendas diferentes.
    %%spark
    
    from pyspark.sql.types import *
    
    # structure of the training data set
    
    train_schema = StructType([
      StructField('date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('sales', IntegerType())
      ])
    
    # read the training file into a dataframe
    
    train = spark.read.csv(
      'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', 
      header=True, 
      schema=train_schema
      )
    
    # make the dataframe queryable as a temporary view
    
    train.createOrReplaceTempView('train')
    
    # show data
    
    train.show()
    
  2. Vea las tendencias anuales al realizar la previsión de demanda y, a menudo, nos interesan las tendencias generales y la estacionalidad. Comencemos nuestra exploración examinando la tendencia anual de las ventas unitarias.
    %%spark
    
    df = spark.sql(""" SELECT
      year(date) as year, 
      sum(sales) as sales
    FROM train
    GROUP BY year(date)
    ORDER BY year;""")
    
    df.show(5)
    
    De los datos se desprende claramente que existe una tendencia generalmente ascendente en el total de ventas de unidades en las tiendas. Si tuviéramos un mejor conocimiento de los mercados servidos por estas tiendas, podríamos querer identificar si hay una capacidad de crecimiento máxima que esperaríamos abordar a lo largo de la vida de nuestra previsión. Sin ese conocimiento y al ver rápidamente este conjunto de datos, se siente seguro asumir que si nuestro objetivo es hacer una previsión unos días, meses o incluso un año más, es posible que esperemos un crecimiento lineal continuo durante ese período de tiempo.
  3. Consulta de tendencias mensuales.
    Examinemos la estacionalidad. Si agregamos los datos alrededor de los meses individuales en cada año, se observa un patrón estacional anual distinto que parece crecer a escala con el crecimiento general de las ventas.
    %%spark -o df
    
    df = spark.sql("""SELECT 
      TRUNC(date, 'MM') as month,
      SUM(sales) as sales
    FROM train
    GROUP BY TRUNC(date, 'MM')
    ORDER BY month;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  4. Consulta de tendencias de días de la semana.
    Al agregar los datos a un nivel de día de la semana, se observa un patrón estacional semanal pronunciado con un pico el domingo (día de la semana 0), una caída dura el lunes (día de la semana 1) y luego una recogida constante durante la semana volviendo al domingo alto. Este patrón parece ser estable en los cinco años de observaciones.
    %%spark -o df
    
    df = spark.sql(""" 
    SELECT
      YEAR(date) as year,
      (
        CASE
          WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0
          WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1
          WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2
          WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3
          WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4
          WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5
          WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6
        END
      ) % 7 as weekday,
      AVG(sales) as sales
    FROM (
      SELECT 
        date,
        SUM(sales) as sales
      FROM train
      GROUP BY date
     ) x
    GROUP BY year, weekday
    ORDER BY year, weekday;""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
    

Generar una única previsión

Vamos a generar una única previsión de demanda antes de intentar generar previsiones para combinaciones individuales de tiendas y artículos. Podría ser útil construir una sola previsión por ninguna otra razón que orientarnos hacia el uso del profeta.
  1. Nuestro primer paso es ensamblar el conjunto de datos históricos en el que entrenaremos el modelo.
    %%spark
    
    history_pd = spark.sql(""" 
      SELECT
        CAST(date as date) as ds,
        sales as y
      FROM train
      WHERE store=1 AND item=1
      ORDER BY ds
    """).toPandas()
    
    history_pd = history_pd.dropna()
    
    history_pd = history_pd.dropna()
    
    Ahora, importaremos la biblioteca de profeta, pero porque puede ser un poco detallado cuando esté en uso, necesitamos ajustar la configuración de registro en nuestro entorno.
    %%spark
    
    from prophet import Prophet
    import logging
    
    # disable informational messages from prophet
    
    logging.getLogger('py4j').setLevel(logging.ERROR)
    %%spark
    
    # set model parameters
    model = Prophet(
      interval_width=0.95,
      growth='linear',
      daily_seasonality=False,
      weekly_seasonality=True,
      yearly_seasonality=True,
      seasonality_mode='multiplicative'
      )
     
    # fit the model to historical data
    model.fit(history_pd)
  2. A continuación, vamos a crear una previsión.
    %%spark -o df
    
    # define a dataset including both historical dates & 90-days beyond the last available date
    future_pd = model.make_future_dataframe(
      periods=90, 
      freq='d', 
      include_history=True
      )
     
    forecast_pd = model.predict(future_pd)
    
    df=spark.createDataFrame(forecast_pd)
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  3. Examine los componentes de previsión.
    ¿Cómo funcionó nuestro modelo? Aquí podemos ver las tendencias generales y estacionales en nuestro modelo presentado como gráficos.
    %%spark 
    
    import matplotlib.pyplot as plt
    
    trends_fig = model.plot_components(forecast_pd)
    
    %matplot plt
  4. Consulta de datos históricos frente a datos de predicción.
    Aquí, podemos ver cómo nuestros datos reales y previstos se alinean, así como una previsión para el futuro, aunque limitaremos nuestro gráfico al último año de datos históricos para mantenerlos legibles.
    %%spark 
    
    predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales')
    
    # adjust figure to display dates from last year + the 90 day forecast
    xlim = predict_fig.axes[0].get_xlim()
    new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0)
    predict_fig.axes[0].set_xlim(new_xlim)
    
    import matplotlib.pyplot as plt
    
    %matplot plt
    Esta visualización está un poco ocupada. Los puntos negros representan nuestros reales con la línea azul más oscura que representa nuestras predicciones y la banda azul más ligera que representa nuestro intervalo de incertidumbre (95%).
  5. Cálculo de métricas de evaluación.
    La inspección visual es útil, pero una mejor manera de evaluar la previsión es calcular los valores de Error absoluto medio, Error cuadrático medio y Error cuadrático medio raíz para los valores previstos en relación con los valores reales de nuestro conjunto.
    %%spark 
    import pandas as pd
    from sklearn.metrics import mean_squared_error, mean_absolute_error
    from math import sqrt
    from datetime import date
    
    # get historical actuals & predictions for comparison
    
    actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y']
    predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat']
    
    # calculate evaluation metrics
    
    mae = mean_absolute_error(actuals_pd, predicted_pd)
    mse = mean_squared_error(actuals_pd, predicted_pd)
    rmse = sqrt(mse)
    
    # print metrics to the screen
    
    print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
    

Generación de previsiones a escala

  1. Escalar generación de previsión.
    Con la mecánica bajo nuestro cinturón, abordemos nuestro objetivo original de construir numerosos modelos y pronósticos detallados para combinaciones individuales de tiendas y artículos. Empezaremos por ensamblar los datos de ventas en el nivel de granularidad store-item-date. Recupere datos para todas las combinaciones de tienda-artículo. Los datos de este juego de datos ya deben agregarse en este nivel de granularidad, pero estamos agregando explícitamente para garantizar que tenemos la estructura de datos esperada.
    %%spark
    
    sql_statement = '''
      SELECT
        store,
        item,
        CAST(date as date) as ds,
        SUM(sales) as y
      FROM train
      GROUP BY store, item, ds
      ORDER BY store, item, ds
      '''
    
    store_item_history = (
      spark
        .sql( sql_statement )
        .repartition(sc.defaultParallelism, ['store', 'item'])
      ).cache()
  2. Con nuestros datos agregados en el nivel de tienda-artículo-fecha, debemos considerar cómo transmitiremos nuestros datos al profeta. Si nuestro objetivo es crear un modelo para cada combinación de tienda y artículo, tendremos que transferir un subconjunto de tienda-artículo del conjunto de datos que acabamos de ensamblar, entrenar un modelo en ese subconjunto y recibir una previsión de tienda-artículo. Esperamos que esa previsión se devuelva como un conjunto de datos con una estructura como esta en la que conservamos los identificadores de tienda y artículo para los que se ensambló la previsión, y limitamos la salida solo al subconjunto relevante de campos generados por el modelo de Profeta.
    %%spark
    
    from pyspark.sql.types import *
    
    result_schema =StructType([
      StructField('ds',DateType()),
      StructField('store',IntegerType()),
      StructField('item',IntegerType()),
      StructField('y',FloatType()),
      StructField('yhat',FloatType()),
      StructField('yhat_upper',FloatType()),
      StructField('yhat_lower',FloatType())
      ])
    
  3. Para entrenar el modelo y generar una previsión, aprovecharemos una función de Pandas. Definiremos esta función para recibir un subjuego de datos organizado en torno a una combinación de tienda y artículo. Devolverá una previsión con el formato identificado en la celda anterior.
    %%spark
    
    def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame:
    
      # TRAIN MODEL AS BEFORE
      # --------------------------------------
      # remove missing values (more likely at day-store-item level)
    
      history_pd = history_pd.dropna()
    
      # configure the model
      model = Prophet(
        interval_width=0.95,
        growth='linear',
        daily_seasonality=False,
        weekly_seasonality=True,
        yearly_seasonality=True,
        seasonality_mode='multiplicative'
        )
    
      # train the model
      model.fit( history_pd )
      # --------------------------------------
    
      # BUILD FORECAST AS BEFORE
      # --------------------------------------
    
      # make predictions
      future_pd = model.make_future_dataframe(
        periods=90, 
        freq='d', 
        include_history=True
        )
    
      forecast_pd = model.predict( future_pd )  
      # --------------------------------------
    
      # ASSEMBLE EXPECTED RESULT SET
      # --------------------------------------
      # get relevant fields from forecast
      f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds')
    
      # get relevant fields from history
      h_pd = history_pd[['ds','store','item','y']].set_index('ds')
    
      # join history and forecast
      results_pd = f_pd.join( h_pd, how='left' )
      results_pd.reset_index(level=0, inplace=True)
    
      # get store & item from incoming data set
      results_pd['store'] = history_pd['store'].iloc[0]
      results_pd['item'] = history_pd['item'].iloc[0]
      # --------------------------------------
    
      # return expected dataset
      return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ]  
    
  4. Aplicar la función de previsión a cada combinación de tienda-artículo.
    Llamemos a nuestra función pandas para crear nuestros pronósticos. Para ello, agrupamos nuestro conjunto de datos históricos en torno a la tienda y el artículo. A continuación, aplicamos nuestra función a cada grupo y nos dirigimos a la fecha de hoy como nuestro training_date con fines de gestión de datos.
    %%spark -o df
    
    from pyspark.sql.functions import current_date
    
    df = (
      store_item_history
        .groupBy('store', 'item')
          .applyInPandas(forecast_store_item, schema=result_schema)
        .withColumn('training_date', current_date() )
        )
    
    df.createOrReplaceTempView('new_forecasts')
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df)
  5. Mantenga los datos de salida de las previsiones creando una tabla delta.
    %%spark
    
    db_name = "gold_db"
    spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name)
    
    spark.sql("USE " + db_name)
    %%spark
    
    spark.sql("""create table if not exists forecasts (
                  date date,
                  store integer,
                  item integer,
                  sales float,
                  sales_predicted float,
                  sales_predicted_upper float,
                  sales_predicted_lower float,
                  training_date date
                  )
                using delta
                partitioned by (date)""")
    
    spark.sql("""merge into forecasts f
                    using new_forecasts n 
                    on f.date = n.ds and f.store = n.store and f.item = n.item
                    when matched then update set f.date = n.ds,
                      f.store = n.store,
                      f.item = n.item,
                      f.sales = n.y,
                      f.sales_predicted = n.yhat,
                      f.sales_predicted_upper = n.yhat_upper,
                      f.sales_predicted_lower = n.yhat_lower,
                      f.training_date = n.training_date
                    when not matched then insert (date,
                      store,
                      item,
                      sales,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower,
                      training_date)
                    values (n.ds,
                      n.store,
                      n.item,
                      n.y,
                      n.yhat,
                      n.yhat_upper,
                      n.yhat_lower,
                      n.training_date)""")
  6. ¿Ahora qué tan bueno (o malo) es cada previsión? Apliquemos las mismas técnicas para evaluar cada previsión. Mediante la técnica de función pandas, podemos generar métricas de evaluación para cada previsión de artículo de tienda.
    %%spark
    
    # schema of expected result set
    eval_schema =StructType([
      StructField('training_date', DateType()),
      StructField('store', IntegerType()),
      StructField('item', IntegerType()),
      StructField('mae', FloatType()),
      StructField('mse', FloatType()),
      StructField('rmse', FloatType())
      ])
     
    # define function to calculate metrics
    def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame:
      
      # get store & item in incoming data set
      training_date = evaluation_pd['training_date'].iloc[0]
      store = evaluation_pd['store'].iloc[0]
      item = evaluation_pd['item'].iloc[0]
      
      # calculate evaluation metrics
      mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] )
      rmse = sqrt( mse )
      
      # assemble result set
      results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]}
      return pd.DataFrame.from_dict( results )
     
    # calculate metrics
    results = (
      spark
        .table('new_forecasts')
        .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data
        .select('training_date', 'store', 'item', 'y', 'yhat')
        .groupBy('training_date', 'store', 'item')
        .applyInPandas(evaluate_forecast, schema=eval_schema)
        )
  7. Una vez más, mantengamos las métricas de evaluación. Probablemente querremos informar de las métricas de cada previsión, por lo que las conservamos en una tabla delta consultable.
    %%spark
    
    spark.sql("""create table if not exists forecast_evals (
                  store integer,
                  item integer,
                  mae float,
                  mse float,
                  rmse float,
                  training_date date
                  )
                using delta
                partitioned by (training_date)""")
    
    spark.sql("""insert into forecast_evals
                select
                  store,
                  item,
                  mae,
                  mse,
                  rmse,
                  training_date
                from new_forecast_evals""")
  8. Ahora hemos creado una previsión para cada combinación de tienda-artículo y generado métricas de evaluación básicas para cada una. Para ver estos datos de previsión, podemos emitir una consulta simple (limitada aquí para producir uno en todas las tiendas de una a tres).
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      date,
                      sales_predicted,
                      sales_predicted_upper,
                      sales_predicted_lower
                    FROM forecasts a
                    WHERE item = 1 AND
                          store IN (1, 2, 3) AND
                          date >= '2018-01-01' AND
                          training_date=current_date()
                    ORDER BY store""")
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
  9. Por último, recuperemos métricas de evaluación.
    %%spark -o df
    
    df=spark.sql("""SELECT
                      store,
                      mae,
                      mse,
                      rmse
                    FROM forecast_evals a
                    WHERE item = 1 AND
                          training_date=current_date()
                    ORDER BY store""")
    
    from autovizwidget.widget.utils import display_dataframe
    display_dataframe(df.head(6))
Ahora hemos configurado un bloc de notas de OCI Data Science y hemos creado una sesión de OCI Data Flow para ejecutar interactivamente la aplicación Demand Forecasting (PySpark) a escala.

Para evitar incurrir en cargos adicionales, suprima o finalice todos los recursos creados para probar esta solución y limpie los bloques del almacén de objetos de OCI.

Para empezar a utilizar OCI Data Flow hoy mismo, regístrese para obtener la prueba gratuita de Oracle Cloud o inicie sesión en su cuenta existente. Pruebe el tutorial de 15 minutos sin necesidad de instalación de Data Flow para descubrir lo fácil que puede ser el procesamiento de Spark con Oracle Cloud Infrastructure.