- Genere previsiones de demanda a medida con Oracle Cloud Infrastructure
- Generar previsiones de demanda
Generar previsiones de demanda
En estos pasos se describe cómo generar previsiones de demanda en el bloc de notas de OCI Data Science.
Configurar entorno
- En el bloc de notas de ciencia de datos, configure la sesión de OCI Data Flow. Utilice la biblioteca de python
pip install prophetal crear el paquete Conda personalizado.import ads ads.set_auth("resource_principal") %reload_ext dataflow.magics %help import json command = { "compartmentId": "your-compartment-ocid", "displayName": "display-name", "sparkVersion": "3.2.1", "driverShape": "VM.Standard.E3.Flex", "executorShape": "VM.Standard.E3.Flex", "driverShapeConfig":{"ocpus":1,"memoryInGBs":16}, "executorShapeConfig":{"ocpus":1,"memoryInGBs":16}, "numExecutors": 1, "metastoreId": "your-metastore-ocid", "configuration":{ "spark.archives":"oci://your-oci-bucket-name@your-oci-tenancy-name/your-custom-conda-env-path#conda", "spark.oracle.datasource.enabled":"true", "spark.delta.logStore.oci.impl":"io.delta.storage.OracleCloudLogStore", "spark.sql.extensions":"io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog":"org.apache.spark.sql.delta.catalog.DeltaCatalog"} } command = f'\'{json.dumps(command)}\'' print("command",command) - Importar bibliotecas de python dependientes. Por ejemplo:
%%spark #Import required libraries. import json import os import sys import datetime import oci import pyspark.sql from pyspark.sql.functions import countDistinct from pyspark.ml import Pipeline from pyspark.ml.classification import DecisionTreeClassifier from pyspark.ml.feature import StringIndexer, VectorIndexer from pyspark.ml.evaluation import MulticlassClassificationEvaluator # $example off$ from pyspark.sql import SparkSession from delta.tables import * import pandas as pd from prophet import Prophet import logging import matplotlib.pyplot as plt
Examinar datos
- Revise los datos.Para nuestro conjunto de datos de formación, utilizaremos cinco años de datos de ventas de unidad de artículo de tienda para 50 artículos en 10 tiendas diferentes.
%%spark from pyspark.sql.types import * # structure of the training data set train_schema = StructType([ StructField('date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('sales', IntegerType()) ]) # read the training file into a dataframe train = spark.read.csv( 'oci://demo-bucket@orasenatdpltintegration03/forecast/bronze/train.csv', header=True, schema=train_schema ) # make the dataframe queryable as a temporary view train.createOrReplaceTempView('train') # show data train.show() - Vea las tendencias anuales al realizar la previsión de demanda y, a menudo, nos interesan las tendencias generales y la estacionalidad. Comencemos nuestra exploración examinando la tendencia anual de las ventas unitarias.
%%spark df = spark.sql(""" SELECT year(date) as year, sum(sales) as sales FROM train GROUP BY year(date) ORDER BY year;""") df.show(5)De los datos se desprende claramente que existe una tendencia generalmente ascendente en el total de ventas de unidades en las tiendas. Si tuviéramos un mejor conocimiento de los mercados servidos por estas tiendas, podríamos querer identificar si hay una capacidad de crecimiento máxima que esperaríamos abordar a lo largo de la vida de nuestra previsión. Sin ese conocimiento y al ver rápidamente este conjunto de datos, se siente seguro asumir que si nuestro objetivo es hacer una previsión unos días, meses o incluso un año más, es posible que esperemos un crecimiento lineal continuo durante ese período de tiempo. - Consulta de tendencias mensuales.Examinemos la estacionalidad. Si agregamos los datos alrededor de los meses individuales en cada año, se observa un patrón estacional anual distinto que parece crecer a escala con el crecimiento general de las ventas.
%%spark -o df df = spark.sql("""SELECT TRUNC(date, 'MM') as month, SUM(sales) as sales FROM train GROUP BY TRUNC(date, 'MM') ORDER BY month;""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - Consulta de tendencias de días de la semana.Al agregar los datos a un nivel de día de la semana, se observa un patrón estacional semanal pronunciado con un pico el domingo (día de la semana 0), una caída dura el lunes (día de la semana 1) y luego una recogida constante durante la semana volviendo al domingo alto. Este patrón parece ser estable en los cinco años de observaciones.
%%spark -o df df = spark.sql(""" SELECT YEAR(date) as year, ( CASE WHEN DATE_FORMAT(date, 'E') = 'Sun' THEN 0 WHEN DATE_FORMAT(date, 'E') = 'Mon' THEN 1 WHEN DATE_FORMAT(date, 'E') = 'Tue' THEN 2 WHEN DATE_FORMAT(date, 'E') = 'Wed' THEN 3 WHEN DATE_FORMAT(date, 'E') = 'Thu' THEN 4 WHEN DATE_FORMAT(date, 'E') = 'Fri' THEN 5 WHEN DATE_FORMAT(date, 'E') = 'Sat' THEN 6 END ) % 7 as weekday, AVG(sales) as sales FROM ( SELECT date, SUM(sales) as sales FROM train GROUP BY date ) x GROUP BY year, weekday ORDER BY year, weekday;""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Generar una única previsión
Vamos a generar una única previsión de demanda antes de intentar generar previsiones para combinaciones individuales de tiendas y artículos. Podría ser útil construir una sola previsión por ninguna otra razón que orientarnos hacia el uso del profeta.
- Nuestro primer paso es ensamblar el conjunto de datos históricos en el que entrenaremos el modelo.
%%spark history_pd = spark.sql(""" SELECT CAST(date as date) as ds, sales as y FROM train WHERE store=1 AND item=1 ORDER BY ds """).toPandas() history_pd = history_pd.dropna() history_pd = history_pd.dropna()Ahora, importaremos la biblioteca de profeta, pero porque puede ser un poco detallado cuando esté en uso, necesitamos ajustar la configuración de registro en nuestro entorno.%%spark from prophet import Prophet import logging # disable informational messages from prophet logging.getLogger('py4j').setLevel(logging.ERROR)%%spark # set model parameters model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # fit the model to historical data model.fit(history_pd) - A continuación, vamos a crear una previsión.
%%spark -o df # define a dataset including both historical dates & 90-days beyond the last available date future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict(future_pd) df=spark.createDataFrame(forecast_pd)from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - Examine los componentes de previsión.¿Cómo funcionó nuestro modelo? Aquí podemos ver las tendencias generales y estacionales en nuestro modelo presentado como gráficos.
%%spark import matplotlib.pyplot as plt trends_fig = model.plot_components(forecast_pd) %matplot plt - Consulta de datos históricos frente a datos de predicción.Aquí, podemos ver cómo nuestros datos reales y previstos se alinean, así como una previsión para el futuro, aunque limitaremos nuestro gráfico al último año de datos históricos para mantenerlos legibles.
%%spark predict_fig = model.plot( forecast_pd, xlabel='date', ylabel='sales') # adjust figure to display dates from last year + the 90 day forecast xlim = predict_fig.axes[0].get_xlim() new_xlim = ( xlim[1]-(180.0+365.0), xlim[1]-90.0) predict_fig.axes[0].set_xlim(new_xlim) import matplotlib.pyplot as plt %matplot pltEsta visualización está un poco ocupada. Los puntos negros representan nuestros reales con la línea azul más oscura que representa nuestras predicciones y la banda azul más ligera que representa nuestro intervalo de incertidumbre (95%). - Cálculo de métricas de evaluación.La inspección visual es útil, pero una mejor manera de evaluar la previsión es calcular los valores de Error absoluto medio, Error cuadrático medio y Error cuadrático medio raíz para los valores previstos en relación con los valores reales de nuestro conjunto.
%%spark import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from math import sqrt from datetime import date # get historical actuals & predictions for comparison actuals_pd = history_pd[ history_pd['ds'] < date(2018, 1, 1) ]['y'] predicted_pd = forecast_pd[ forecast_pd['ds'] < pd.to_datetime('2018-01-01') ]['yhat'] # calculate evaluation metrics mae = mean_absolute_error(actuals_pd, predicted_pd) mse = mean_squared_error(actuals_pd, predicted_pd) rmse = sqrt(mse) # print metrics to the screen print( '\n'.join(['MAE: {0}', 'MSE: {1}', 'RMSE: {2}']).format(mae, mse, rmse) )
Generación de previsiones a escala
- Escalar generación de previsión.Con la mecánica bajo nuestro cinturón, abordemos nuestro objetivo original de construir numerosos modelos y pronósticos detallados para combinaciones individuales de tiendas y artículos. Empezaremos por ensamblar los datos de ventas en el nivel de granularidad store-item-date. Recupere datos para todas las combinaciones de tienda-artículo. Los datos de este juego de datos ya deben agregarse en este nivel de granularidad, pero estamos agregando explícitamente para garantizar que tenemos la estructura de datos esperada.
%%spark sql_statement = ''' SELECT store, item, CAST(date as date) as ds, SUM(sales) as y FROM train GROUP BY store, item, ds ORDER BY store, item, ds ''' store_item_history = ( spark .sql( sql_statement ) .repartition(sc.defaultParallelism, ['store', 'item']) ).cache() - Con nuestros datos agregados en el nivel de tienda-artículo-fecha, debemos considerar cómo transmitiremos nuestros datos al profeta. Si nuestro objetivo es crear un modelo para cada combinación de tienda y artículo, tendremos que transferir un subconjunto de tienda-artículo del conjunto de datos que acabamos de ensamblar, entrenar un modelo en ese subconjunto y recibir una previsión de tienda-artículo. Esperamos que esa previsión se devuelva como un conjunto de datos con una estructura como esta en la que conservamos los identificadores de tienda y artículo para los que se ensambló la previsión, y limitamos la salida solo al subconjunto relevante de campos generados por el modelo de Profeta.
%%spark from pyspark.sql.types import * result_schema =StructType([ StructField('ds',DateType()), StructField('store',IntegerType()), StructField('item',IntegerType()), StructField('y',FloatType()), StructField('yhat',FloatType()), StructField('yhat_upper',FloatType()), StructField('yhat_lower',FloatType()) ]) - Para entrenar el modelo y generar una previsión, aprovecharemos una función de Pandas. Definiremos esta función para recibir un subjuego de datos organizado en torno a una combinación de tienda y artículo. Devolverá una previsión con el formato identificado en la celda anterior.
%%spark def forecast_store_item( history_pd: pd.DataFrame ) -> pd.DataFrame: # TRAIN MODEL AS BEFORE # -------------------------------------- # remove missing values (more likely at day-store-item level) history_pd = history_pd.dropna() # configure the model model = Prophet( interval_width=0.95, growth='linear', daily_seasonality=False, weekly_seasonality=True, yearly_seasonality=True, seasonality_mode='multiplicative' ) # train the model model.fit( history_pd ) # -------------------------------------- # BUILD FORECAST AS BEFORE # -------------------------------------- # make predictions future_pd = model.make_future_dataframe( periods=90, freq='d', include_history=True ) forecast_pd = model.predict( future_pd ) # -------------------------------------- # ASSEMBLE EXPECTED RESULT SET # -------------------------------------- # get relevant fields from forecast f_pd = forecast_pd[ ['ds','yhat', 'yhat_upper', 'yhat_lower'] ].set_index('ds') # get relevant fields from history h_pd = history_pd[['ds','store','item','y']].set_index('ds') # join history and forecast results_pd = f_pd.join( h_pd, how='left' ) results_pd.reset_index(level=0, inplace=True) # get store & item from incoming data set results_pd['store'] = history_pd['store'].iloc[0] results_pd['item'] = history_pd['item'].iloc[0] # -------------------------------------- # return expected dataset return results_pd[ ['ds', 'store', 'item', 'y', 'yhat', 'yhat_upper', 'yhat_lower'] ] - Aplicar la función de previsión a cada combinación de tienda-artículo.Llamemos a nuestra función pandas para crear nuestros pronósticos. Para ello, agrupamos nuestro conjunto de datos históricos en torno a la tienda y el artículo. A continuación, aplicamos nuestra función a cada grupo y nos dirigimos a la fecha de hoy como nuestro
training_datecon fines de gestión de datos.%%spark -o df from pyspark.sql.functions import current_date df = ( store_item_history .groupBy('store', 'item') .applyInPandas(forecast_store_item, schema=result_schema) .withColumn('training_date', current_date() ) ) df.createOrReplaceTempView('new_forecasts')from autovizwidget.widget.utils import display_dataframe display_dataframe(df) - Mantenga los datos de salida de las previsiones creando una tabla delta.
%%spark db_name = "gold_db" spark.sql("CREATE DATABASE IF NOT EXISTS " + db_name) spark.sql("USE " + db_name)%%spark spark.sql("""create table if not exists forecasts ( date date, store integer, item integer, sales float, sales_predicted float, sales_predicted_upper float, sales_predicted_lower float, training_date date ) using delta partitioned by (date)""") spark.sql("""merge into forecasts f using new_forecasts n on f.date = n.ds and f.store = n.store and f.item = n.item when matched then update set f.date = n.ds, f.store = n.store, f.item = n.item, f.sales = n.y, f.sales_predicted = n.yhat, f.sales_predicted_upper = n.yhat_upper, f.sales_predicted_lower = n.yhat_lower, f.training_date = n.training_date when not matched then insert (date, store, item, sales, sales_predicted, sales_predicted_upper, sales_predicted_lower, training_date) values (n.ds, n.store, n.item, n.y, n.yhat, n.yhat_upper, n.yhat_lower, n.training_date)""") - ¿Ahora qué tan bueno (o malo) es cada previsión? Apliquemos las mismas técnicas para evaluar cada previsión. Mediante la técnica de función pandas, podemos generar métricas de evaluación para cada previsión de artículo de tienda.
%%spark # schema of expected result set eval_schema =StructType([ StructField('training_date', DateType()), StructField('store', IntegerType()), StructField('item', IntegerType()), StructField('mae', FloatType()), StructField('mse', FloatType()), StructField('rmse', FloatType()) ]) # define function to calculate metrics def evaluate_forecast( evaluation_pd: pd.DataFrame ) -> pd.DataFrame: # get store & item in incoming data set training_date = evaluation_pd['training_date'].iloc[0] store = evaluation_pd['store'].iloc[0] item = evaluation_pd['item'].iloc[0] # calculate evaluation metrics mae = mean_absolute_error( evaluation_pd['y'], evaluation_pd['yhat'] ) mse = mean_squared_error( evaluation_pd['y'], evaluation_pd['yhat'] ) rmse = sqrt( mse ) # assemble result set results = {'training_date':[training_date], 'store':[store], 'item':[item], 'mae':[mae], 'mse':[mse], 'rmse':[rmse]} return pd.DataFrame.from_dict( results ) # calculate metrics results = ( spark .table('new_forecasts') .filter('ds < \'2018-01-01\'') # limit evaluation to periods where we have historical data .select('training_date', 'store', 'item', 'y', 'yhat') .groupBy('training_date', 'store', 'item') .applyInPandas(evaluate_forecast, schema=eval_schema) ) - Una vez más, mantengamos las métricas de evaluación. Probablemente querremos informar de las métricas de cada previsión, por lo que las conservamos en una tabla delta consultable.
%%spark spark.sql("""create table if not exists forecast_evals ( store integer, item integer, mae float, mse float, rmse float, training_date date ) using delta partitioned by (training_date)""") spark.sql("""insert into forecast_evals select store, item, mae, mse, rmse, training_date from new_forecast_evals""") - Ahora hemos creado una previsión para cada combinación de tienda-artículo y generado métricas de evaluación básicas para cada una. Para ver estos datos de previsión, podemos emitir una consulta simple (limitada aquí para producir uno en todas las tiendas de una a tres).
%%spark -o df df=spark.sql("""SELECT store, date, sales_predicted, sales_predicted_upper, sales_predicted_lower FROM forecasts a WHERE item = 1 AND store IN (1, 2, 3) AND date >= '2018-01-01' AND training_date=current_date() ORDER BY store""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6)) - Por último, recuperemos métricas de evaluación.
%%spark -o df df=spark.sql("""SELECT store, mae, mse, rmse FROM forecast_evals a WHERE item = 1 AND training_date=current_date() ORDER BY store""")from autovizwidget.widget.utils import display_dataframe display_dataframe(df.head(6))
Ahora hemos configurado un bloc de notas de OCI Data Science y hemos creado una sesión de OCI Data Flow para ejecutar interactivamente la aplicación Demand Forecasting (PySpark) a escala.
Para evitar incurrir en cargos adicionales, suprima o finalice todos los recursos creados para probar esta solución y limpie los bloques del almacén de objetos de OCI.
Para empezar a utilizar OCI Data Flow hoy mismo, regístrese para obtener la prueba gratuita de Oracle Cloud o inicie sesión en su cuenta existente. Pruebe el tutorial de 15 minutos sin necesidad de instalación de Data Flow para descubrir lo fácil que puede ser el procesamiento de Spark con Oracle Cloud Infrastructure.











