Import Partitioned Google Analytics Data in Hive Using Parquet

14 February 2017


I was recently working on importing Google Analytics data into an Amazon EMR cluster.

Google Analytics was offering the files as Avro, but we wanted Parquet files partitioned by date (the data literally contains a date field). There are multiple reasons to choose one format over the other, but for us it came down to faster analytics thanks to Parquet's columnar layout.
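To give an idea of why the columnar layout matters, here's a rough sketch (the path and the selected columns are just illustrative): a query that only touches a couple of columns never has to read the rest of the very wide GA schema.

# hypothetical path; in practice this is wherever the Parquet files end up
sessions = spark.read.parquet('/data/ga/sessions_parquet')

# Parquet is columnar, so only the date and visitStartTime columns are read
# from disk here; the wide nested columns (hits, totals, ...) are skipped
sessions.select('date', 'visitStartTime').groupBy('date').count().show()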

Using Spark for the ETL process makes this a piece of cake:

# read the Avro export and write it back out as a Parquet table partitioned by date
(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet')
          .partitionBy('date').saveAsTable('tablename'))

Or does it? The first issue is that if one of your columns has a nested schema exceeding 4000 characters, the Hive metastore will not accept it.1
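If you want to find out which column is the culprit, a quick check like the following works (a sketch: it assumes the Avro files are the ones loaded above, and the 4000 threshold matches the metastore column size):

df = spark.read.format('com.databricks.spark.avro').load('dataset.avro')

# simpleString() is (roughly) the type description Hive stores in TYPE_NAME
for field in df.schema.fields:
    type_name = field.dataType.simpleString()
    if len(type_name) > 4000:
        print(field.name, len(type_name))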

If you look around, you'll see this is a long-standing issue, open since October 2015. Apparently Oracle is to blame (it's always Oracle!): the default metastore schema caps these columns at 4000 characters because that's the maximum Oracle's VARCHAR2 allows.

The good news is that this limit can be changed in the metastore! I'm assuming you're using a Postgres instance as the metastore, but the syntax is similar across the board.

Once you're logged in, type:2

ALTER TABLE "COLUMNS_V2" ALTER COLUMN "TYPE_NAME"  TYPE VARCHAR(8000);
ALTER TABLE "TABLE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SERDE_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);
ALTER TABLE "SD_PARAMS" ALTER COLUMN "PARAMS_VALUES"  TYPE VARCHAR(8000);

At this point you can re-execute the Spark command above. But you'd be surprised by what Spark tells you:

WARN CreateDataSourceTableUtils: Persisting partitioned data source relation tmp.tmp1 into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.

This is another long-standing issue, where the workaround is to first create the table, and then write the data into it yourself.

"First create the table" is of course deceptively simple: you need to create a partitioned table which is basically equal to the initial one, save for the date field, that has to be set as the partition column.

Here are the steps I used to get this done:

First, dump the data into a temporary, non-partitioned Parquet table and extract its schema:

(spark.read.format('com.databricks.spark.avro').load('dataset.avro')
          .write.format('parquet').saveAsTable('test'))

beeline -u {jdbc connection string} -e 'DESCRIBE test' > schema.sql
Then, using schema.sql as a reference, create the partitioned table: every column stays the same, except that date moves from the column list to the PARTITIONED BY clause:

CREATE TABLE test_partition (
  fullVisitorId  STRING,
  visitorId      INT,
  ...
  visitStartTime INT,
  -- date        STRING,    note that this is left out: it becomes the partition column
  totals         STRUCT<...>,
  ...
)
PARTITIONED BY (date STRING)
STORED AS PARQUET;
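If you'd rather not type that DDL by hand, here's a rough sketch that generates it from the DataFrame schema (it assumes df is the DataFrame loaded from the Avro files, and that date is the only partition column):

df = spark.read.format('com.databricks.spark.avro').load('dataset.avro')

# every column except date keeps its inferred type; simpleString() produces
# Hive-compatible type names such as struct<...> for the nested columns
columns = ',\n'.join('  %s %s' % (f.name, f.dataType.simpleString())
                     for f in df.schema.fields if f.name != 'date')

ddl = ('CREATE TABLE test_partition (\n' + columns + '\n) '
       'PARTITIONED BY (date STRING) STORED AS PARQUET')

spark.sql(ddl)  # or print(ddl) and run it through beeline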
Finally, write the Parquet files into the table location and register each date partition:

# df is the DataFrame read from the Avro export
df = spark.read.format('com.databricks.spark.avro').load('dataset.avro')

(df.write.format('parquet')
   .mode('append').partitionBy('date')
   .save('/user/hive/warehouse/database.db/test_partition'))

for dt in df.select('date').distinct().rdd.map(lambda row: row['date']).collect():
    spark.sql("ALTER TABLE test_partition ADD PARTITION (date='%s')" % dt)

Let me know what you think, especially if you disagree (I’m @gglanzani on Twitter if you want to reach out!).



  1. In our case it was the hits column. Just look at the number of nested fields it has!
  2. 8000 is arbitrary: make it large enough for your use case!
