Scala UDFs in Apache Spark

One of the most powerful extensibility features of Apache Spark is the ability to add UDFs (User Defined Functions). UDFs can be written in each of the four languages that Spark officially supports: Java, Scala, Python, and R.

As any data engineer or developer can tell you, dates are one of the most difficult (and annoying) data constructs to deal with, due primarily to the wide array of formatting options, to say nothing of the different calendars around the world!

Below is an example of a UDF in the Scala programming language that I wrote to handle an atypical date format in a file that needed to be loaded into a database.

Here is an example of the date string that the UDF operates on: Apr-16.
And here is a single, complete data row from the file:
CN0100100000000 | 01 | 001 | Autauga County, AL | Apr-16 | 25,111 | 23,754 | 1,357 | 5.4

The goal is to turn Apr-16 into YYYY-MM-DD format, or 2016-04-01, a standard database format for storing the value with a date datatype.

In Spark v1.3 and above, the functions used to register UDFs, whether for the DataFrame DSL or for SQL, live in the udf object on SQLContext for both Java and Scala.
You can register a UDF so that it can be queried from Spark SQL against a table that has also been registered. Behind the scenes, the UDF behaves like a map.
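
For example, here is a minimal sketch of the pattern (the DataFrame df, the column name, and the toUpper logic are hypothetical placeholders):

// Register a UDF; the returned handle works in the DataFrame DSL,
// and the registered name works in SQL.
    val toUpper = sqlContext.udf.register("toUpper", (s: String) => s.toUpperCase)

// Use it through the DSL...
    df.select(toUpper(df("name")))

// ...or through SQL, once the DataFrame is registered as a table.
    df.registerTempTable("people")
    sqlContext.sql("SELECT toUpper(name) FROM people")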

From the Spark documentation:
In addition to the basic SQLContext, you can also create a HiveContext, which provides a superset of the functionality provided by the basic SQLContext.
Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs, and the ability to read data from Hive tables.
To use a HiveContext, you do not need to have an existing Hive setup, and all of the data sources available to a SQLContext are still available.

Generally speaking, you should always use the HiveContext rather than the SQLContext, as it gives you more functionality.
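
Creating one takes a single line once you have a SparkContext. Here is a minimal sketch, assuming the usual sc handle; it also defines the hiveContext instance and the implicits that the code further down relies on:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)

// Enables .toDF() on RDDs of case classes.
    import hiveContext.implicits._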

The code below shows a simple UDF I wrote to parse and convert this date, and how it was incorporated into a larger block of code that reads, transforms, and writes a new file to be loaded into a database.
Note that many of these steps could be combined, but I broke them apart to make each step more atomic for clarity.


// UDF definition. Registers a UDF called "strDtFormat"
// so it can be used elsewhere in the program.
// A lookup table maps each three-letter month abbreviation
// to its two-digit month number.
    val monthNum = Map(
      "Jan" -> "01", "Feb" -> "02", "Mar" -> "03", "Apr" -> "04",
      "May" -> "05", "Jun" -> "06", "Jul" -> "07", "Aug" -> "08",
      "Sep" -> "09", "Oct" -> "10", "Nov" -> "11", "Dec" -> "12")

    val strDtFormat = hiveContext.udf.register("strDtFormat", (s: String) =>
      monthNum.get(s.substring(0, 3)) match {
        // e.g. "Apr-16" -> "20" + "16" + "-" + "04" + "-01" = "2016-04-01"
        case Some(mm) => "20" + s.substring(4, 6) + "-" + mm + "-01"
        case None => "Field Month-Year format does not match expected format"
      })
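
As a design note, the same conversion could also be written with java.text.SimpleDateFormat instead of substring arithmetic. A sketch, assuming the input always matches the MMM-yy shape:

    import java.text.SimpleDateFormat
    import java.util.Locale

// Parse "MMM-yy", then re-render as "yyyy-MM-01" (the day is a quoted literal).
    val inFmt  = new SimpleDateFormat("MMM-yy", Locale.US)
    val outFmt = new SimpleDateFormat("yyyy-MM-'01'", Locale.US)
    outFmt.format(inFmt.parse("Apr-16"))  // "2016-04-01"

One caveat: SimpleDateFormat is not thread-safe, so inside a UDF the formatters should be created per call or per partition.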

// Define the case class for the data file.
// DataFrames can use a case class to map the fields and to register a table.
case class CntyUnemp(
laus_code: String,
state_fips: String,
county_fips: String,
county_name: String,
monyear: String,
laborforce: Integer,
employed: Integer,
unemployed: Integer,
unemployment_rate: Float,
state_county_fips: String
)

// Locate and load the data file. The S3 path below is a
// hypothetical placeholder for the real bucket and key.
    val s3TextFile = sc.textFile("s3n://some-bucket/path/to/datafile.txt")

// Remove the six-line header in the file, leaving just the data.
// The header sits at the top of the first partition (index 0).
    val data = s3TextFile.mapPartitionsWithIndex { 
      (idx, iter) => if (idx == 0) iter.drop(6) else iter 
    }

// Filter out any incomplete lines
    val data2 = data.filter(line => line.length() > 25)

// Split the fields on the "|" delimiter. Note that String.split
// takes a regular expression, so the pipe has to be escaped.
// Map to the case class CntyUnemp (defined above) to assign
// datatypes, trimming padding spaces and stripping the thousands
// separators so the numeric conversions succeed.
    val cntyunemp = data2.map(_.split("\\|")).map(p => CntyUnemp(
       p(0).trim,
       p(1).trim,
       p(2).trim,
       p(3).trim,
       p(4).trim,
       p(5).trim.replace(",", "").toInt,
       p(6).trim.replace(",", "").toInt,
       p(7).trim.replace(",", "").toInt,
       p(8).trim.toFloat,
       p(1).trim + p(2).trim
    )).toDF()

// Call the registered UDF on the dataframe column "monyear".
// Save the results into a new column called "timeframe"
    val cntyunemp2 = cntyunemp.withColumn("timeframe", strDtFormat(cntyunemp("monyear")))
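
Because the UDF was also registered by name, the same step could be expressed in SQL instead. A sketch, assuming the DataFrame is first registered as a temp table (the table name is arbitrary):

    cntyunemp.registerTempTable("cntyunemp")
    val cntyunemp2sql = hiveContext.sql(
      "SELECT t.*, strDtFormat(t.monyear) AS timeframe FROM cntyunemp t")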

// Convert the DataFrame to an RDD.
    val cntyunempRDD = cntyunemp2.rdd

// Save out the RDD to a text file to be loaded into a database table.
    cntyunempRDD.saveAsTextFile("/home/adminuser/datasets/CntyUnemp/output")
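
One caveat: saveAsTextFile on an RDD of Rows writes each Row via its toString, which produces bracketed output such as [CN0100100000000,01,...]. If the database loader expects a delimited file, one option (a sketch that would replace the save above, using the same path) is to render each Row explicitly:

// Render each Row as a pipe-delimited line, then save.
    val delimited = cntyunemp2.rdd.map(_.toSeq.mkString("|"))
    delimited.saveAsTextFile("/home/adminuser/datasets/CntyUnemp/output")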


Pitt Fagan

Greetings! I'm passionate about data, specifically the big data and data science ecosystems! It's such an exciting time to be working in these spaces. I run the BigDataMadison meetup where I live.