Working with Hive data sources

With BDD Shell, you can access the data in the Hive tables that serve as the sources of your BDD data sets.

All BDD data sets are sourced from Hive tables.

Getting the BddDataSource from a BddDataset

Get the data source for the data set and print it:
>>> # get all data sets
>>> dss = bc.datasets()
>>> # get the WarrantyClaims data set
>>> ds = dss.dataset('default_edp_e35f9cbe-96c7-4183-8485-71459b8bd620')
>>> # get the data source for WarrantyClaims
>>> bds = ds.source()
>>> # print out the source Hive table for the data set
>>> bds
Hive    default.warrantyclaims

The output shows that the data set's source is a Hive table named "warrantyclaims" that is stored in the Hive "default" database.
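
The printed form pairs the source type with a qualified table name of the form database.table. If later steps need the two parts separately, a small helper can split them. This is only a minimal sketch: split_qualified_name is a hypothetical helper, not part of the BDD Shell API, and it works on the name as a plain string copied from the output above rather than reading it from the BddDataSource object.

```python
def split_qualified_name(qualified_name, default_database="default"):
    """Split 'database.table' into (database, table).

    Hypothetical helper, not part of BDD Shell. A name without a
    database prefix is assumed to live in default_database.
    """
    database, sep, table = qualified_name.strip().rpartition(".")
    if not sep:
        # No dot present: the whole string is the table name.
        return default_database, table
    return database, table


# The qualified name below is copied from the printed data source.
database, table = split_qualified_name("default.warrantyclaims")
print("{0} / {1}".format(database, table))
```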

Converting to a Spark DataFrame

You can convert the BddDataSource to a Spark DataFrame:
>>> df = bds.to_spark()
16/03/24 16:21:22 INFO ParseDriver: Parsing command: SELECT * FROM default.warrantyclaims
16/03/24 16:21:22 INFO ParseDriver: Parse Completed
16/03/24 16:21:23 INFO AvroSerDe: Avro schema is {"type":"record","name":"schema",
"namespace":"com.oracle.eid.appwizard","doc":"schema for upload file",
"fields":[{"name":"VIN","type":["null","string"],"doc":"VIN","default":null},
...
{"name":"Claim_Amount","type":["null","string"],"doc":"Claim_Amount","default":null}]}
>>> # print out the DataFrame's schema
>>> df.printSchema()
root
 |-- vin: string (nullable = true)
 |-- production_country: string (nullable = true)
 |-- production_region: string (nullable = true)
 |-- make: string (nullable = true)
 |-- manufacturer: string (nullable = true)
 |-- model: string (nullable = true)
 |-- model_year: string (nullable = true)
 |-- claim_date: string (nullable = true)
 |-- dealer_geocode: string (nullable = true)
 |-- vehicle_dealer: string (nullable = true)
 |-- dealer_state: string (nullable = true)
 |-- dealer_city: string (nullable = true)
 |-- labor_description: string (nullable = true)
 |-- commodity: string (nullable = true)
 |-- complaint: string (nullable = true)
 |-- part_number: string (nullable = true)
 |-- sale_date: string (nullable = true)
 |-- supplier_country: string (nullable = true)
 |-- supplier: string (nullable = true)
 |-- supplier_state: string (nullable = true)
 |-- labor_amount: string (nullable = true)
 |-- part_amount: string (nullable = true)
 |-- claim_amount: string (nullable = true)

Saving the DataFrame as a new Hive table

We will now create a new Hive table that contains only vehicle dealer information. The four dealer-related columns (as shown in the schema above) are:
  • vehicle_dealer
  • dealer_state
  • dealer_city
  • dealer_geocode
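
The same four names can also be picked out of the DataFrame's column list instead of being typed by hand. The sketch below works on a plain Python list so that it runs outside BDD Shell; the list is a sample of the column names from the schema above, and select_columns_containing is a hypothetical helper name.

```python
def select_columns_containing(columns, keyword):
    """Return the column names that contain keyword, in their original order."""
    return [name for name in columns if keyword in name]


# Sample of the column names from the schema above.
# In BDD Shell, df.columns would supply the full list.
columns = ["vin", "make", "claim_date", "dealer_geocode", "vehicle_dealer",
           "dealer_state", "dealer_city", "labor_description"]

dealer_columns = select_columns_containing(columns, "dealer")
print(dealer_columns)
```

In BDD Shell, the resulting list could then be passed to the DataFrame as df.select(*dealer_columns).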

The new Hive table will be named "dealers_info" and will be stored in the Hive "default" database. Before creating the table, make sure that a table of that name does not already exist in the Hive database.

Select the four columns from the DataFrame and then save the result as a new Hive table:
>>> # select the columns
>>> df2 = df.select('vehicle_dealer','dealer_state','dealer_city','dealer_geocode')
>>> # write the table
>>> df2.write.saveAsTable("default.dealers_info")
Note that if a table of that name already exists, you will see an exception similar to the following:
>>> df2.write.saveAsTable("default.dealers_info")
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/localdisk/hadoop/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/readwriter.py", line 370, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/scratch/localdisk/Oracle/Middleware/BDD-1.2.0.31.801/bdd-shell/py4j/java_gateway.py", line 537, in __call__
    self.target_id, self.name)
  File "/localdisk/hadoop/spark-1.5.0-bin-hadoop2.6/python/pyspark/sql/utils.py", line 40, in deco
    raise AnalysisException(s.split(': ', 1)[1])
AnalysisException: path hdfs://bus014.example.com:8020/user/hive/warehouse/dealers_info already exists.;
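
One way to reduce the chance of hitting this exception is to check for the table before writing. The sketch below assumes that the sqlContext object in BDD Shell exposes PySpark's SQLContext.tableNames(dbName) method, which returns the names of the tables in a database. The helper table_exists is a hypothetical name, and a small stand-in context is included only so that the sketch runs without Spark.

```python
def table_exists(sql_context, database, table):
    """Return True if table is listed in database.

    Assumes sql_context provides tableNames(dbName), as PySpark's
    SQLContext does. Hive table names are case-insensitive, so the
    comparison ignores case.
    """
    existing = [name.lower() for name in sql_context.tableNames(database)]
    return table.lower() in existing


class FakeSQLContext(object):
    """Stand-in for sqlContext so the sketch runs without Spark."""

    def __init__(self, tables_by_database):
        self._tables = tables_by_database

    def tableNames(self, dbName=None):
        return list(self._tables.get(dbName, []))


ctx = FakeSQLContext({"default": ["warrantyclaims"]})
print(table_exists(ctx, "default", "dealers_info"))
print(table_exists(ctx, "default", "WarrantyClaims"))
```

In BDD Shell, the call would be table_exists(sqlContext, "default", "dealers_info"). This checks the table listing only; the exception above refers to the warehouse path, which could exist even when no table is listed. PySpark's DataFrameWriter also offers mode("overwrite"), which replaces existing data instead of raising an error, at the cost of discarding the old contents.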
If the write.saveAsTable operation completed without an error, you can verify the result by first querying the new table:
>>> # query the new table
>>> df3 = sqlContext.sql("select * from default.dealers_info")
16/04/08 13:01:21 INFO ParseDriver: Parsing command: select * from default.dealers_info
16/04/08 13:01:21 INFO ParseDriver: Parse Completed
16/04/08 13:01:21 INFO ParquetRelation: Listing hdfs://bus014.example.com:8020/user/hive/warehouse/dealers_info on driver
...
>>>
Then print the new table's schema:
>>> df3.printSchema()
root
 |-- vehicle_dealer: string (nullable = true)
 |-- dealer_state: string (nullable = true)
 |-- dealer_city: string (nullable = true)
 |-- dealer_geocode: string (nullable = true)

As the results show, the table does have the four expected columns.

Finally, print the first three rows from the table:
>>> df3.show(3)
+--------------------+------------+-----------+--------------------+
|      vehicle_dealer|dealer_state|dealer_city|      dealer_geocode|
+--------------------+------------+-----------+--------------------+
|NORTH GATE LINCOL...|          FL|      Tampa|27.947500 -82.458800|
|               KERRY|          OH| Cincinnati|39.161600 -84.456900|
|        MANKATO FORD|          MN|    Mankato|44.115600 -93.998400|
+--------------------+------------+-----------+--------------------+
only showing top 3 rows

As the results show, each row has a value for each of the four columns.

Creating a new BDD data set from BDD Shell

From within BDD Shell, you can call the Data Processing CLI via Python code. Running the DP CLI will create a new BDD data set from the Hive table you just created.

The DP CLI is called via a Python os.system() call. The syntax is:
os.system("/localdisk/Oracle/Middleware/BDD/dataprocessing/edp_cli/data_processing_CLI -d dbName -t tableName")
where:
  • -d (or --database) specifies the Hive database where the table is stored.
  • -t (or --table) specifies the name of the Hive table to process.
For example:
>>> # import the os module if it is not already loaded in the session
>>> import os
>>> os.system("/localdisk/Oracle/Middleware/BDD/dataprocessing/edp_cli/data_processing_CLI -d default -t dealers_info")
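
The os.system() call passes the whole command string through a shell. As an alternative sketch, the standard-library subprocess module takes the command as an argument list and returns the exit status directly, which makes failures easier to detect. The CLI path and the -d and -t flags are the ones shown above; build_dp_cli_command and run_dp_cli are hypothetical helper names.

```python
import subprocess

# Path to the DP CLI as used in the example above; adjust for your install.
DP_CLI = "/localdisk/Oracle/Middleware/BDD/dataprocessing/edp_cli/data_processing_CLI"


def build_dp_cli_command(database, table, cli_path=DP_CLI):
    """Build the DP CLI argument list for one Hive table."""
    return [cli_path, "-d", database, "-t", table]


def run_dp_cli(database, table, cli_path=DP_CLI):
    """Run the DP CLI and return its exit status (0 conventionally means success)."""
    return subprocess.call(build_dp_cli_command(database, table, cli_path))


# Only build and print the command here; run_dp_cli() is meant to be
# called on a machine where the DP CLI is installed.
command = build_dp_cli_command("default", "dealers_info")
print(" ".join(command))
```

Passing the arguments as a list avoids shell quoting problems if a database or table name ever contains unusual characters.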

For information on the Data Processing CLI, see the Data Processing Guide.