
Import Partitioned Google Analytics Data in Hive Using Parquet

14 Feb, 2017

This article is featured in the free magazine "Data Science in Production".

I was recently working on importing Google Analytics data into an Amazon EMR cluster.

Google Analytics was offering the files as Avro, but we wanted Parquet files partitioned by date (we literally have a date field in there). There are multiple reasons to choose one format over the other, but for us it came down to faster analytics thanks to the columnar format.

Using Spark for the ETL process makes this a piece of cake:

(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet')
          .partitionBy('date').saveAsTable('tablename'))

Or does it? The first issue is that if one of your columns has a nested schema exceeding 4000 characters, the Hive metastore will not accept it.[1]
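To see how easily Google Analytics data blows past that limit, here is a quick check. This is just a sketch, assuming df is the DataFrame obtained from the spark.read...load('dataset.avro') call above and that the offending column is the nested hits one:

# Serialized Hive type of the nested `hits` column; if this is longer than
# 4000 characters, the stock metastore schema will refuse to store it
hits_field = [f for f in df.schema.fields if f.name == 'hits'][0]
print(len(hits_field.dataType.simpleString()))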

If you look around, you’ll see this is a long-standing issue, open since October 2015. The one to blame here is apparently Oracle (it’s always Oracle!).

The good news is that this limit can be changed in the metastore! I’m assuming you’re using a Postgres instance as the metastore, but the syntax is similar across the board.

Once you’re logged in, type:[2]

ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME"  TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);

At this point you might re-execute the Spark command above. But you’d be surprised by what Spark tells you:

WARN CreateDataSourceTableUtils: Persisting partitioned data source relation `tmp.tmp1` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

This is another long-standing issue, where the workaround is to first create the table and then do one of the following:

  • Insert into it for a particular partition (this can be accomplished with INSERT INTO test_partition PARTITION(date=2013) SELECT * FROM test), or
  • Write directly to disk and then create the partition in Hive manually (for example: ALTER TABLE test_partition ADD PARTITION(date=2013)).

"First create the table" is of course deceptively simple: you need to create a partitioned table which is basically equal to the initial one, save for the date field, that has to be set as the partition column.

Here are the steps I used to get this done:

  • Use Spark to read the Avro file and write it unpartitioned somewhere
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet').saveAsTable('test'))
  • Use beeline to save the created schema:
beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
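If you prefer to stay inside PySpark for this step, a rough equivalent (just a sketch, using the test table saved above) is to print the column list straight from Spark:

for field in spark.table('test').schema.fields:
    # one line per column: its name plus the (possibly deeply nested) type
    print(field.name, field.dataType.simpleString())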
  • Edit the schema.sql file by hand: remove date from the columns and add it as the partition column:
CREATE TABLE test_partition (
fullVisitorId  STRING,
visitorId      INT,
...
visitStartTime INT,
--        date STRING,    note that this is commented out now
totals         STRUCT<...>,
...
)
PARTITIONED BY (`date` STRING) STORED AS PARQUET
  • Now you would normally execute this query in beeline. However, I could not: I was unable to restart the metastore, and the metastore was still checking at the ORM level that fields were no longer than 4000 characters. After a good hour of searching, I realized that I could just execute the query using spark.sql("""YOUR QUERY HERE"""); I had totally forgotten that Spark bypasses the ORM. For example:
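A minimal sketch, with the column list truncated for illustration (the real statement should contain every column from schema.sql except date):

spark.sql("""
    CREATE TABLE test_partition (
        fullVisitorId  STRING,
        visitorId      INT,
        visitStartTime INT
    )
    PARTITIONED BY (`date` STRING)
    STORED AS PARQUET
""")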
  • Now, normally, you could just write directly using .partitionBy('date').mode('append').saveAsTable('test_partition'). However, you cannot use partitionBy with saveAsTable if the table already exists. And if you remove the partitionBy, Spark assumes that field4 (the one coming after date in the example above) was supposed to be the partition column (which is of course not correct);
  • At this point what’s left is to use:
(df.write.format('parquet')
   .mode('append').partitionBy('date')
   .save('/user/hive/warehouse/database.db/test_partition'))
  • Since we’ve manually written the files, we still need to tell Hive that new partitions are there. Doing that programmatically in Spark is pretty simple (ugly interpolation, sorry!):
for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
    spark.sql("ALTER TABLE test_partition ADD PARTITION(<code>date=%s)" % dt)
  • Done! You can now query the data. Note that only the last two steps are needed when new files come in. They can be automated easily enough with the workflow manager of your choice (see the sketch below)!
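Purely as an illustration, here is a minimal sketch of those two recurring steps bundled into a single function; the function name, the table name, and the warehouse path are assumptions, so adapt them to your setup:

def ingest_new_export(spark, path, table='test_partition',
                      location='/user/hive/warehouse/database.db/test_partition'):
    # Append the new Avro export to the existing Parquet files, partitioned by date
    df = spark.read.format('com.databricks.spark.avro').load(path)
    (df.write.format('parquet')
       .mode('append').partitionBy('date')
       .save(location))
    # Tell Hive about any partitions it does not know about yet
    for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
        spark.sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION(`date`='%s')" % (table, dt))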

Let me know what you think, especially if you disagree (I’m @gglanzani on Twitter if you want to reach out!).



[1] In our case it was the hits column. Just look at the number of nested fields it has!
[2] 8000 is arbitrary: make it enough for your use case!