Pandasの統合

Pandasは、データ分析、時系列および統計のための強力なデータ構造を提供するPythonパッケージです。

このトピックでは、Pandasを使用してDataFrameを操作する方法の簡単な概要を説明します。Pandasのドキュメントについては、http://pandas.pydata.org/pandas-docs/version/0.17.1にアクセスしてください。

Pandasのインストール

Pandasは、Anaconda 2.5バージョンに含まれており、別個にインストールする必要はありません。

使用しているバージョンのPythonにPandasがインストールされていない場合は、最初にpipまたはcondaを使用してPandasをインストールし、BDDシェルを再起動して、次のコマンドをPythonインタープリタで実行します。
$ conda install pandas

Pandas DataFrameへのSpark DataFrameの変換

最初にPandasパッケージをインポートし、変換を行います。
>>> ds = dss.dataset('default_edp_75f94d7b-dea9-4d77-8e66-ed1bf981f615')
>>> df = ds.to_spark()
>>> import pandas as pd
>>> pdf = df.toPandas()
16/03/28 11:52:33 INFO MemoryStore: ensureFreeSpace(97136) called with curMem=269241, maxMem=556038881
...
16/03/28 11:52:41 INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool

Pandas DataFrameのデータの操作

新しい列を追加し、列を変更します。
pdf['tip_rolling_mean'] = pd.rolling_mean(pdf.tip_amount, window=10)
pdf['fare_tip_rolling_corr'] = pd.rolling_corr(pdf.fare_amount, pdf.tip_amount, window=10)

Spark DataFrameへのPandas DataFrameの変換

>>> df2 = sqlContext.createDataFrame(pdf)

新しいHive表へのSpark DataFrameの保存

最初に、新しいHive表を作成します。この表は、new_taxi_dataという名前であり、Hiveのdefaultデータベースにあります。
>>> df2.write.saveAsTable('default.new_taxi_data')
...
16/03/28 15:12:28 INFO SparkContext: Starting job: saveAsTable at NativeMethodAccessorImpl.java:-2
16/03/28 15:12:28 INFO DAGScheduler: Got job 2 (saveAsTable at NativeMethodAccessorImpl.java:-2) with 2 output partitions
16/03/28 15:12:28 INFO DAGScheduler: Final stage: ResultStage 2(saveAsTable at NativeMethodAccessorImpl.java:-2)
...
16/03/28 15:12:32 INFO HiveContext$$anon$1: Persisting data source relation with a single input path into 
Hive metastore in Hive compatible format. Input path: hdfs://bus14.example.com:8020/user/hive/warehouse/new_taxi_data
>>>
最後に、新しいHive表のスキーマを出力して、新しい列が追加されたことを確認します。
>>> df2.printSchema()
root
 |-- trip_distance: double (nullable = true)
 ...
 |-- PRIMARY_KEY: string (nullable = true)
 |-- tip_rolling_mean: double (nullable = true)
 |-- fare_tip_rolling_corr: double (nullable = true)

>>>