Pandas integration

Pandas is a Python package that provides powerful data structures for data analysis, time series, and statistics.

This topic gives a quick overview of how to convert a Spark DataFrame into a Pandas DataFrame, manipulate it with Pandas, and save the result to a Hive table. For the Pandas documentation, see: http://pandas.pydata.org/pandas-docs/version/0.17.1

Installing Pandas

Pandas is automatically included in Anaconda 2.5, so it does not need a separate installation.

If Pandas is not installed in your version of Python, install it with pip or conda from the operating system command line (not from the Python interpreter), and then restart BDD Shell. For example, with conda:
$ conda install pandas
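
To confirm that Pandas is visible to the Python interpreter that BDD Shell uses, a check along the following lines can be run before and after installation. This is a minimal sketch that relies only on the standard library; the helper name is_installed is illustrative and is not part of BDD Shell or Pandas.

```python
import importlib


def is_installed(module_name):
    """Return True if the named module can be imported."""
    try:
        importlib.import_module(module_name)
    except ImportError:
        return False
    return True


if is_installed("pandas"):
    import pandas as pd
    print("Pandas version: " + pd.__version__)
else:
    print("Pandas is not installed")
```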

Convert a Spark DataFrame into a Pandas DataFrame

First import the Pandas package and then do the conversion:
>>> ds = dss.dataset('default_edp_75f94d7b-dea9-4d77-8e66-ed1bf981f615')
>>> df = ds.to_spark()
>>> import pandas as pd
>>> pdf = df.toPandas()
16/03/28 11:52:33 INFO MemoryStore: ensureFreeSpace(97136) called with curMem=269241, maxMem=556038881
...
16/03/28 11:52:41 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
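
Because toPandas() collects every row of the Spark DataFrame into the memory of the driver process, it may help to cap the row count for large data sets. The following is a minimal sketch: limit() and toPandas() are standard Spark DataFrame methods, while the helper name to_pandas_capped and the 10000-row default are assumptions made for illustration.

```python
def to_pandas_capped(spark_df, max_rows=10000):
    """Convert at most max_rows rows of a Spark DataFrame to Pandas.

    spark_df is expected to be a Spark DataFrame, such as the df
    created by ds.to_spark() in the example above.
    """
    # limit() is applied by Spark, so only the capped rows reach the driver.
    return spark_df.limit(max_rows).toPandas()
```

In BDD Shell this could be called as pdf = to_pandas_capped(df, max_rows=5000).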

Manipulate data in Pandas DataFrame

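Add two new columns, a rolling mean of the tip amount and a rolling correlation between the fare and tip amounts, each computed over a 10-row window: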
>>> pdf['tip_rolling_mean'] = pd.rolling_mean(pdf.tip_amount, window=10)
>>> pdf['fare_tip_rolling_corr'] = pd.rolling_corr(pdf.fare_amount, pdf.tip_amount, window=10)
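
The pd.rolling_mean and pd.rolling_corr functions match the Pandas 0.17.1 release referenced in this topic. They were deprecated in Pandas 0.18 in favor of the .rolling() method and are absent from later releases. A rough equivalent for newer versions is sketched below; the five-row DataFrame and the window of 3 are stand-in values chosen so that the example runs without a Spark session.

```python
import pandas as pd

# Stand-in data; in BDD Shell, pdf would come from df.toPandas().
pdf = pd.DataFrame({
    "fare_amount": [10.0, 12.0, 15.0, 13.0, 20.0],
    "tip_amount": [1.0, 2.0, 3.0, 4.0, 5.0],
})

window = 3  # the example in this topic uses window=10

# Rolling mean of tip_amount over the last `window` rows.
pdf["tip_rolling_mean"] = pdf["tip_amount"].rolling(window=window).mean()

# Rolling Pearson correlation between fare_amount and tip_amount.
pdf["fare_tip_rolling_corr"] = (
    pdf["fare_amount"].rolling(window=window).corr(pdf["tip_amount"])
)

print(pdf)
```

The first window - 1 rows of each new column are NaN because a full window is not yet available for them.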

Convert the Pandas DataFrame back into a Spark DataFrame

>>> df2 = sqlContext.createDataFrame(pdf)
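
Rolling-window columns have no value for the first window - 1 rows, and Pandas marks those rows as NaN. It can be worth counting them, and deciding whether to keep or drop them, before passing the frame to createDataFrame. The sketch below uses a small stand-in DataFrame with hand-written values; dropping the incomplete rows is only one possible choice, not a requirement of BDD Shell.

```python
import pandas as pd

# Stand-in for the pdf built above; NaN marks rows without a full window.
pdf = pd.DataFrame({
    "tip_amount": [1.0, 2.0, 3.0, 4.0],
    "tip_rolling_mean": [float("nan"), float("nan"), 2.0, 3.0],
})

# Count the missing values in each column.
missing_per_column = pdf.isnull().sum()
print(missing_per_column)

# Keep only the rows where the rolling column is defined.
complete = pdf.dropna(subset=["tip_rolling_mean"])
print(len(complete))
```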

Persist the Spark DataFrame into a new Hive table

First, save the Spark DataFrame as a new Hive table. The table is named "new_taxi_data" and resides in the Hive "default" database:
>>> df2.write.saveAsTable('default.new_taxi_data')
...
16/03/28 15:12:28 INFO SparkContext: Starting job: saveAsTable at NativeMethodAccessorImpl.java:-2
16/03/28 15:12:28 INFO DAGScheduler: Got job 2 (saveAsTable at NativeMethodAccessorImpl.java:-2) with 2 output partitions
16/03/28 15:12:28 INFO DAGScheduler: Final stage: ResultStage 2(saveAsTable at NativeMethodAccessorImpl.java:-2)
...
16/03/28 15:12:32 INFO HiveContext$$anon$1: Persisting data source relation with a single input path into Hive metastore in Hive compatible format. Input path: hdfs://bus14.example.com:8020/user/hive/warehouse/new_taxi_data
>>>
Finally, print the schema of the DataFrame that was saved to the new Hive table to verify that the new columns have been added:
>>> df2.printSchema()
root
 |-- trip_distance: double (nullable = true)
 ...
 |-- PRIMARY_KEY: string (nullable = true)
 |-- tip_rolling_mean: double (nullable = true)
 |-- fare_tip_rolling_corr: double (nullable = true)

>>>
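
Running the saveAsTable call a second time typically fails because the table already exists; the Spark DataFrameWriter accepts a save mode to control this. The following sketch wraps the call in a helper. mode() and saveAsTable() are Spark DataFrameWriter methods, while the helper name save_as_hive_table and its overwrite flag are assumptions made for illustration.

```python
def save_as_hive_table(spark_df, table_name, overwrite=False):
    """Save a Spark DataFrame as a Hive table.

    With overwrite=False, Spark's default save mode applies and the call
    raises an error if the table already exists.
    """
    writer = spark_df.write
    if overwrite:
        # Replace the existing table contents instead of failing.
        writer = writer.mode("overwrite")
    writer.saveAsTable(table_name)
```

In BDD Shell this could be called as save_as_hive_table(df2, 'default.new_taxi_data', overwrite=True).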