Spotify's Scio Library and Google Cloud Dataflow

I recently attended a conference called DataEngConf in New York City, and one of my favorite talks was from Spotify. They spoke about their data engineering workflow, built on a library they recently open-sourced called Scio. Scio is essentially a Scala API on top of Google Cloud Dataflow (whose SDK is evolving into Apache Beam), the data pipeline service in the Google Cloud suite. Spotify made news in the last year as a big customer defecting from AWS to Google Cloud. I have had good luck with another Spotify open source library, Luigi, so I wanted to give Scio a whirl.
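
To give a flavor of the API, here is a minimal word-count sketch; the input and output argument names are mine, but ContextAndArgs, textFile, countByValue, and saveAsTextFile are the library's core building blocks.

import com.spotify.scio._

// Minimal Scio pipeline: count the words in a text file.
// Run with e.g. --input=gs://some-bucket/in.txt --output=gs://some-bucket/wordcount
object WordCount {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.textFile(args("input"))
      .flatMap(_.split("""\s+""").filter(_.nonEmpty))
      .countByValue
      .map { case (word, count) => s"$word: $count" }
      .saveAsTextFile(args("output"))
    sc.close()
  }
}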

The timing was optimal: a friend had a 60-day Google Cloud free trial expiring within the week, so I wanted to take advantage of the opportunity. I have used certain products in AWS but had never evaluated Google Cloud, so this was a chance to kill two birds with one stone.

After cloning the Scio GitHub repo locally and getting my Google Cloud developer account set up, I configured the credentials to access the project, called hip-apricot-143923. I then logged into the Storage product, created a bucket, and loaded a 355MB text file containing weather data.

Being a data engineer, I had heard of BigQuery, Google's data warehouse, so I created a dataset called sitesweather, created a table called sites, connected it to the file in the bucket, and manually defined the schema to map the fields. One piece of advice in the Scio wiki is to leverage BigQuery whenever you can, due to the optimizations in the product. You can also type queries directly into BigQuery to produce datasets. An example of the BigQuery interface is shown below, with the results of the Scio application run.
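
Beyond plain SQL strings, Scio also ships a type-safe BigQuery API: the @BigQueryType.fromQuery macro generates a case class from the query's result schema at compile time. A rough sketch of how that might look for this table (the object name and output path are hypothetical, and the generated field types depend on the schema you defined):

import com.spotify.scio._
import com.spotify.scio.bigquery._

object SitesWeatherMinTemps {
  // The macro inspects the query's result schema and generates a Row case class
  // with fields matching LocationID and minTemp.
  @BigQueryType.fromQuery(
    "select LocationID, min(Temperature) as minTemp from sitesweather.sites group by LocationID")
  class Row

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.typedBigQuery[Row]()                       // runs the annotated query
      .map(r => s"${r.LocationID} ${r.minTemp}")  // typed fields instead of string-keyed TableRow lookups
      .saveAsTextFile(args("output"))
    sc.close()
  }
}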

On my laptop I grabbed the Scio REPL jar and started it up with this command:

pitt@huginn:~/workspace/scio$ java -jar -Dbigquery.project=hip-apricot-143923 scio-repl-0.2.2.jar  

I wanted to make use of BigQuery, so in the REPL I ran a query to find the fifteen locations with the lowest minimum temperatures in the dataset.

scio> val lowestTemps = sc.bigQuerySelect("select LocationID, min(Temperature) as minTemp from sitesweather.sites group by LocationID order by minTemp limit 15")  
[main] INFO com.google.cloud.dataflow.sdk.options.GcpOptions$DefaultProjectFactory - Inferred default GCP project 'hip-apricot-143923' from gcloud. If this is the incorrect project, please cancel this Pipeline and specify the command-line argument --project.
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Cache miss for query: `select LocationID, min(Temperature) as minTemp from sitesweather.sites group by LocationID order by minTemp limit 15`
[main] INFO com.spotify.scio.bigquery.BigQueryClient - New destination table: hip-apricot-143923:scio_bigquery_staging_us.scio_query_20161114040859_435411381
lowestTemps: com.spotify.scio.values.SCollection[com.spotify.scio.bigquery.TableRow] = com.spotify.scio.values.SCollectionImpl@6aaf1ce8

scio> val result = lowestTemps.take(5).materialize  
result: scala.concurrent.Future[com.spotify.scio.io.Tap[com.spotify.scio.bigquery.TableRow]] = List()

scio> sc.close()  
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Creating staging dataset hip-apricot-143923:scio_bigquery_staging_us
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Executing SQL query: `select LocationID, min(Temperature) as minTemp from sitesweather.sites group by LocationID order by minTemp limit 15`
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Query: 0 out of 1 completed
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Query completed: jobId: hip-apricot-143923-f779db22-f892-4f58-829f-5bf6276692fa
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Query: `select LocationID, min(Temperature) as minTemp from sitesweather.sites group by LocationID order by minTemp limit 15`
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Elapsed: 2.010s, pending: 0.485s, execution: 1.525s
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Total bytes processed: 51 MB, cache hit: false
[main] INFO com.spotify.scio.bigquery.BigQueryClient - Query: 1 out of 1 completed
[main] INFO com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner - Executing pipeline using the DirectPipelineRunner.
[main] INFO com.google.cloud.dataflow.sdk.io.Write - Initializing write operation com.google.cloud.dataflow.sdk.io.AvroIO$AvroSink$AvroWriteOperation@2719139f
[main] INFO com.google.cloud.dataflow.sdk.io.Write - Opening writer for write operation com.google.cloud.dataflow.sdk.io.AvroIO$AvroSink$AvroWriteOperation@2719139f
[main] INFO com.google.cloud.dataflow.sdk.io.Write - Finalizing write operation com.google.cloud.dataflow.sdk.io.AvroIO$AvroSink$AvroWriteOperation@2719139f
[main] INFO com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner - Pipeline execution complete.
res1: com.spotify.scio.ScioResult = com.spotify.scio.ScioResult@59f56892

scio> result.waitForResult().value.foreach(println)  
{LocationID=13153, minTemp=-5.6}
{LocationID=14796, minTemp=-5.6}
{LocationID=19669, minTemp=-5.5}
{LocationID=19667, minTemp=-5.5}
{LocationID=12593, minTemp=-5.6}

When targeting the Dataflow service, this uploads the Scio jar files and associated code to a bucket specified in the setup and spins up Compute Engine instances; in this run the query executed in BigQuery and the rest of the pipeline ran locally under the DirectPipelineRunner. Either way, the result set comes back to the REPL to be transformed further (or, in this case, printed out). In the screenshot of the BigQuery interface below you can see the results of the various Scio runs, which create temporary staging tables as part of the query/compute process. The schema of the selected table is LocationID and minTemp.
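
For a non-interactive run, the same query could be packaged as a standalone Scio job and submitted to the Dataflow service rather than the local runner. This is only a sketch: the object name, bucket paths, and output location are placeholders, while --project, --runner, and --stagingLocation are standard Dataflow pipeline options.

import com.spotify.scio._
import com.spotify.scio.bigquery._

// Hypothetical standalone version of the REPL session above. Submit with something like:
// sbt "runMain LowestTemps --project=hip-apricot-143923 --runner=DataflowPipelineRunner
//      --stagingLocation=gs://my-staging-bucket/staging --output=gs://my-output-bucket/lowest-temps"
object LowestTemps {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.bigQuerySelect(
        "select LocationID, min(Temperature) as minTemp " +
        "from sitesweather.sites group by LocationID order by minTemp limit 15")
      .map(row => s"${row.get("LocationID")}: ${row.get("minTemp")}") // TableRow behaves like a map
      .saveAsTextFile(args("output"))
    sc.close()
  }
}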

I wish I had more time to explore Scio, but alas my friend's Google Cloud trial expired before I had a chance to get too in depth.

Thanks to David Webber for use of his G-Cloud account!

Pitt Fagan

Greetings! I'm passionate about data, specifically the big data and data science ecosystems. It's such an exciting time to be working in these spaces. I run the BigDataMadison meetup where I live.