Working with Spark DataFrames

With the BDD Shell, you can convert a BDD data set to a Spark DataFrame.

Use Spark DataFrames to work with the records in a BDD instance. Once a BDD data set has been converted to a Spark DataFrame, you can call Spark API methods on the DataFrame. The examples below use several of these methods.

For details on the Spark API, see: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

Converting a BDD data set to a Spark DataFrame

Use the dataset.to_spark() method to convert a data set to a DataFrame.
>>> ds = dss.dataset('default_edp_d147818d-fac2-479b-b48b-28160ae290d0')
>>> df = ds.to_spark()
16/03/30 16:04:07 INFO HiveContext: Initializing execution hive, version 1.2.1
16/03/30 16:04:07 INFO ClientWrapper: Inspected Hadoop version: 2.6.0
16/03/30 16:04:07 INFO ClientWrapper: Loaded org.apache.hadoop.hive.shims.Hadoop23Shims for Hadoop version 2.6.0
16/03/30 16:04:07 WARN HiveConf: HiveConf of name hive.enable.spark.execution.engine does not exist
16/03/30 16:04:07 INFO metastore: Trying to connect to metastore with URI thrift://busgg2014.us.oracle.com:9083
16/03/30 16:04:08 INFO metastore: Connected to metastore.
...
16/03/30 16:04:20 INFO ParseDriver: Parsing command: SELECT ORIGINAL_RECORD.`vin`[0] `vin`,ORIGINAL_RECORD.`production_country`[0] `production_country`,ORIGINAL_RECORD.`production_region`[0] `production_region`,ORIGINAL_RECORD.`make`[0] `make`,ORIGINAL_RECORD.`manufacturer`[0] `manufacturer`,ORIGINAL_RECORD.`model`[0] `model`,ORIGINAL_RECORD.`model_year`[0] `model_year`,ORIGINAL_RECORD.`claim_date`[0] `claim_date`,ORIGINAL_RECORD.`dealer_geocode`[0] `dealer_geocode`,ORIGINAL_RECORD.`vehicle_dealer`[0] `vehicle_dealer`,ORIGINAL_RECORD.`dealer_state`[0] `dealer_state`,ORIGINAL_RECORD.`dealer_city`[0] `dealer_city`,ORIGINAL_RECORD.`labor_description`[0] `labor_description`,ORIGINAL_RECORD.`commodity`[0] `commodity`,ORIGINAL_RECORD.`complaint`[0] `complaint`,ORIGINAL_RECORD.`part_number`[0] `part_number`,ORIGINAL_RECORD.`sale_date`[0] `sale_date`,ORIGINAL_RECORD.`supplier_country`[0] `supplier_country`,ORIGINAL_RECORD.`supplier`[0] `supplier`,ORIGINAL_RECORD.`supplier_state`[0] `supplier_state`,ORIGINAL_RECORD.`labor_amount`[0] `labor_amount`,ORIGINAL_RECORD.`part_amount`[0] `part_amount`,ORIGINAL_RECORD.`claim_amount`[0] `claim_amount`,ORIGINAL_RECORD.`PRIMARY_KEY` `PRIMARY_KEY` FROM temp1
16/03/30 16:04:21 INFO ParseDriver: Parse Completed
>>>

Counting the DataFrame records

Use Spark's count() method to display the number of records in this DataFrame.
>>> df.count()
16/03/30 16:08:44 INFO MemoryStore: ensureFreeSpace(97136) called with curMem=274561, maxMem=556038881
16/03/30 16:08:44 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 94.9 KB, free 529.9 MB)
...
16/03/30 16:08:51 INFO DAGScheduler: Job 1 finished: count at NativeMethodAccessorImpl.java:-2, took 6.763105 s
16/03/30 16:08:51 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 3) in 266 ms on busgg2014.us.oracle.com (1/1)
16/03/30 16:08:51 INFO YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool 
9983
>>>

As the last line of the output shows, this DataFrame contains 9983 records.

Printing the DataFrame schema

Use Spark's printSchema() method to print the schema in tree format.
>>> df.printSchema()
root
 |-- vin: string (nullable = true)
 |-- production_country: string (nullable = true)
 |-- production_region: string (nullable = true)
 |-- make: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- model_year: long (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- dealer_geocode: string (nullable = true)
 |-- vehicle_dealer: string (nullable = true)
 |-- dealer_state: string (nullable = true)
 |-- dealer_city: string (nullable = true)
 |-- labor_description: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- complaint: string (nullable = true)
 |-- part_number: string (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- supplier_country: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- supplier_state: string (nullable = true)
 |-- labor_amount: double (nullable = true)
 |-- part_amount: double (nullable = true)
 |-- claim_amount: double (nullable = true)
 |-- PRIMARY_KEY: string (nullable = false)

>>>
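
The schema can also be inspected programmatically through the DataFrame's dtypes attribute, which holds (column name, type name) pairs. The following is a minimal sketch that lists the numeric columns; numeric_columns is a hypothetical helper introduced here for illustration and is not part of BDD or Spark. Note that dtypes reports a long column as "bigint".

```python
# Sketch: list the numeric columns of a Spark DataFrame from its dtypes.
# `numeric_columns` is a hypothetical helper, not part of BDD or Spark.

# Type names as reported by DataFrame.dtypes (a long column appears as "bigint").
NUMERIC_TYPES = ("tinyint", "smallint", "int", "bigint", "float", "double")


def numeric_columns(df):
    """Return the names of the columns whose Spark SQL type is numeric."""
    return [
        name
        for name, type_name in df.dtypes
        if type_name in NUMERIC_TYPES or type_name.startswith("decimal")
    ]
```

In the BDD Shell, calling numeric_columns(df) on the DataFrame above should return the model_year, labor_amount, part_amount, and claim_amount columns.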

Printing rows in the table

Use df.show(5) to print the first five rows. Called without an argument, show() prints the first 20 rows.
>>> df.show(5)
16/03/30 16:18:09 INFO MemoryStore: ensureFreeSpace(247720) called with curMem=698025, maxMem=556038881
...
+-----------------+------------------+-----------------+---------+--------------------+------+----------+-----------+----------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+-------------------+-----------+----------------+--------------------+--------------+------------+-----------+------------+-----------+
|              vin|production_country|production_region|     make|        manufacturer| model|model_year| claim_date|  dealer_geocode|      vehicle_dealer|dealer_state|dealer_city|   labor_description|           commodity|           complaint|        part_number|  sale_date|supplier_country|            supplier|supplier_state|labor_amount|part_amount|claim_amount|PRIMARY_KEY|
+-----------------+------------------+-----------------+---------+--------------------+------+----------+-----------+----------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+-------------------+-----------+----------------+--------------------+--------------+------------+-----------+------------+-----------+
|2U9SF69Q77E237441|           Germany|           Europe|CHEVROLET|GENERAL MOTORS CORP.|MALIBU|      2009|2011-10-300| 30.107 -81.7167|   COGGIN CHEVEROLET|          FL|Orange Park|Replace Filter an...|POWER TRAIN:AUTOM...|WE UNFORTUNATELY ...|   p6J1RLSXXKE-1186|2009-10-278|          Canada|Magna Internation...|       Ontario|    219.2096|   319.2618|    538.4714|      0-0-0|
|3B2GL69N870242937|            Canada|    North America|   SATURN|GENERAL MOTORS CORP.|   ION|      2009|2011-10-274|27.9475 -82.4588|     SATURN OF TAMPA|          FL|      Tampa|Replace Filter an...|POWER TRAIN:AUTOM...|2003 SATURN ION I...|p1468Z160GWVVA-1444|2009-09-253|          France|       Hutchinson SA|         Paris|    135.5176|   340.9534|     476.471|      0-0-1|
|3F2DQ69P670264048|            Canada|    North America|   TOYOTA|TOYOTA MOTOR CORP...|TACOMA|      2009|2011-09-248|34.5476 -82.6276|  TOYOTA OF ANDERSON|          SC|   Anderson|Replace ABS Contr...|SERVICE BRAKES, H...|DECEMBER 16, 2006...|   p6J1RLSXXKE-1186|2009-08-226|          France|            Faurecia|      Nanterre|     78.8486|   327.8384|     406.687|      0-0-2|
|3P2KN692570293433|            Canada|    North America|    VOLVO|VOLVO CARS OF N.A...|   S80|      2009|2011-12-335|            null|SAND BERG NORTH W...|          WA|   Lindwood|Replace Control M...|VEHICLE SPEED CON...|DT: 2000 VOLVO S8...| p2617B6326AID-1248|2009-11-305|           Italy|Magneti Marelli H...|        Milano|       83.57|   340.9534|    424.5234|      0-0-3|
|3X2VM69H770222629|            Canada|    North America| CADILLAC|GENERAL MOTORS CORP.|   CTS|      2009|2011-09-271|40.7702 -73.7108|NORTH BAY CADILLA...|          NY| Great Neck|     Replace Harness|            AIR BAGS|THE CONSUMER STOP...|   p6J1RLSXXKE-1186|2009-08-226|              US|Key Safety System...|      Michigan|    295.2644|   327.8384|    623.1028|      0-0-4|
+-----------------+------------------+-----------------+---------+--------------------+------+----------+-----------+----------------+--------------------+------------+-----------+--------------------+--------------------+--------------------+-------------------+-----------+----------------+--------------------+--------------+------------+-----------+------------+-----------+
only showing top 5 rows

>>>
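
With this many columns the table is hard to read in a terminal. One possible approach, sketched below, is to select only the columns of interest before calling show(). The helper name show_columns is an assumption made for this example; it relies only on the standard DataFrame columns, select(), and show() members.

```python
# Sketch: print only selected columns so that wide tables stay readable.
# `show_columns` is a hypothetical helper, not part of BDD or Spark.


def show_columns(df, columns, n=5):
    """Print the first `n` rows of `df`, restricted to `columns`."""
    missing = [c for c in columns if c not in df.columns]
    if missing:
        raise ValueError("unknown columns: %s" % ", ".join(missing))
    # truncate=False prints full cell values instead of shortening long strings.
    df.select(*columns).show(n, truncate=False)
```

For example, show_columns(df, ['vin', 'make', 'model', 'claim_amount']) prints a four-column table for the first five records.
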
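Some other commands that you can use to work with records:
  • df.first() gets the first record.
  • df.head(n=1) gets the first n records. n is optional and defaults to 1.
  • df.collect() gets all of the records. This is an expensive operation, especially for large data sets, because every record is returned to the shell.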
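
Because collect() returns every record, it is often safer to cap the number of rows first. The following is a minimal sketch under that assumption; sample_records is a hypothetical helper, while limit(), collect(), and Row.asDict() are standard Spark API calls.

```python
# Sketch: fetch at most `limit` records as plain dictionaries instead of
# collecting the whole DataFrame. `sample_records` is a hypothetical helper.


def sample_records(df, limit=100):
    """Return up to `limit` rows of `df` as a list of dicts."""
    if limit < 0:
        raise ValueError("limit must be non-negative")
    # limit() is a transformation, so collect() returns at most `limit` rows.
    return [row.asDict() for row in df.limit(limit).collect()]
```

For example, sample_records(df, 10) returns a list of at most ten dictionaries keyed by column name.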