STL and Holt from R to SparkR

Shubham Raizada
Walmart Global Tech Blog
6 min read · Nov 10, 2020


Scaling machine learning algorithms in R with SparkR

To scale the machine learning algorithms currently running in R, we recently undertook rewriting our entire data preprocessing and machine learning pipeline in SparkR.

We use the time series models STL and Holt for forecasting. The data is preprocessed by casting date columns to date datatypes, imputing missing values, standardizing, and finally sorting by the relevant columns to prepare the input for the ML algorithms. Since I was unfamiliar with R, the major challenge was to understand the data manipulations performed in the current setup and to find the corresponding transformations in SparkR. This article shares my learnings from that process.

Getting started

1. Load the SparkR library and initialize a Spark context
library(SparkR, lib.loc = "/usr/local/spark/R/lib")
sparkEnvir <- list(spark.num.executors = '5', spark.executor.cores = '5')
# initializing Spark context
sc <- sparkR.init(sparkHome = "/usr/local/spark",
                  sparkEnvir = sparkEnvir)
# initializing SQL context
sqlContext <- sparkRSQL.init(sc)
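As a side note, on Spark 2.x and later the deprecated sparkR.init()/sparkRSQL.init() pair can be replaced by the sparkR.session() entry point. A minimal sketch with the same settings as above:

# Spark 2.x+: sparkR.session() supersedes sparkR.init() and sparkRSQL.init()
sparkR.session(sparkHome = "/usr/local/spark",
               sparkConfig = sparkEnvir)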

2. Read a CSV file

We can read the CSV file with inferSchema = "true" (i.e., we let Spark infer the data type of each column in the DataFrame), which saves the programmer the effort of explicitly casting the columns to their data types. If the schema is not inferred, every column is read as a String. We can also provide a custom schema.

# sales_training_file : location of the file
sales_df <- read.df(sales_training_file, "csv", header = "true", inferSchema = "true", na.strings = "NA", integer64 = "numeric")
# number of rows in the dataframe
nrow(sales_df)
# list the columns of a df
columns(sales_df)
# show the first 5 rows
head(sales_df, num = 5)
# schema of the dataframe
schema(sales_df)
# visualisation of starting data in the spark dataframe
str(sales_df)
# 'SparkDataFrame': 4 variables:
# $ _c0 : int 1 1 4 7 5 6
# $ audit_date: POSIXct 2017-02-07 2017-03-07 2017-04-07 2017-05-07 2017-06-07 2017-07-07
# $ sales : int 17 20 21 32 20 72
# $ price : num 15.02 9.04 15.10 12.61 13.17 21.11
# rename a column
sales_df <- withColumnRenamed(sales_df, "_c0", "product_id")
# add a new column "count" with 0 as default value
sales_df_with_count <- withColumn(sales_df, "count", 0)
# create a custom schema, and pass it to the load call
customSchema <- structType(
  structField("product_id", type = "integer"),
  structField("audit_date", type = "string"),
  structField("sales", type = "integer"),
  structField("price", type = "double"))
sales_df_custom_schema <- read.df(price_training_file, "csv", header = "true", schema = customSchema, na.strings = "NA", integer64 = "numeric")

3. Data manipulations

data.table and data.frame in R give us rich functionality for selection, casting columns to data types, filtering, filling null values, ordering, aggregation, and even joins across data frames. With SparkR we can leverage Spark SQL to perform all of these manipulations.

# cast column audit_date to data type string
sales_df$audit_date <- cast(sales_df$audit_date, dataType = "string")
# create a view on sales_df
createOrReplaceTempView(sales_df, "sales_table")
sales_select <- sql("select product_id, audit_date, price from sales_table where price >= 15 order by audit_date")
sales_grouped <- sql("select sum(sales), product_id from sales_table group by product_id")

In addition, we can use the Spark DataFrame methods for the above transformations.

# fill all null entries of audit_date with "2016-02-07"
sales_df_cleaned <- fillna(sales_df, list("audit_date" = "2016-02-07"))
# update price to 0, if price is less than 0
sales_df_cleaned$price <- ifelse(sales_df_cleaned$price < 0, 0, sales_df_cleaned$price)
# filter all rows with price greater than 10
sales_df_filtered <- filter(sales_df, sales_df$price > 10)
# group by audit_date and sum
sales_df_grouped <- groupBy(sales_df, sales_df$audit_date)

sales_df_agg <- agg(sales_df_grouped, tot_sales=sum(sales_df$sales))

4. SparkR dataframe to R dataframe and vice versa

# SparkR data frame to R
sales_R_df <- collect(sales_df)
# R to sparkR dataframe
sales_spark_df <- createDataFrame(sales_R_df)

Note: We should be careful with collect(), as the entire dataset is brought to the driver. With huge data, if the Spark driver does not have enough memory, we can run into an OOM error.
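When only a preview or a bounded subset is needed, it is safer to cap what reaches the driver. A minimal sketch using the SparkR API:

# pull only the first 100 rows to the driver as an R data.frame
sales_preview <- take(sales_df, 100)
# or collect a capped / sampled subset instead of the full dataset
sales_head   <- collect(limit(sales_df, 1000))
sales_sample <- collect(sample(sales_df, withReplacement = FALSE, fraction = 0.01))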

Using R packages with SparkR

In addition to the basic manipulations on data frames, R has a rich library of packages for statistical analysis. These functions expect an R data.frame or data.table and do not support Spark DataFrames.

SparkR User-Defined Function (UDF) API opens up opportunities for big data workloads running on Apache Spark to embrace R’s rich package ecosystem.

The SparkR UDF API transfers data back and forth between the Spark JVM and the R process. Inside the UDF, we work with R data.frames and have access to the entire R ecosystem.

SparkR offers four APIs that apply a user-defined function written in R to a SparkDataFrame:

  • dapply()
  • dapplyCollect()
  • gapply()
  • gapplyCollect()

The following diagram illustrates the serialization and deserialization performed during the execution of the UDF. The data gets serialized twice and deserialized twice in total.

Image source: https://databricks.com/blog/2018/08/15/100x-faster-bridge-between-spark-and-r-with-user-defined-functions-on-databricks.html

gapply()

The function is applied to each group of the SparkDataFrame and should have only two parameters: the grouping key and an R data.frame corresponding to that key. The groups are chosen from the SparkDataFrame's column(s). The output of the function should be a data.frame. The schema specifies the row format of the resulting SparkDataFrame; it must represent the R function's output schema in terms of Spark data types.

gapplyCollect is a shortcut when we want to call collect() on the result; the schema does not need to be passed. Note that gapplyCollect can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.

R code

library(energy)
library(data.table)
# sales_df here is an R data.table
sales_df[, p_value := poisson.mtest(sales, R = 100)$p.value, by = .(product_id)]

Equivalent SparkR code

sales_result_schema <- structType(
  structField("product_id", "integer"),
  structField("p_value", "double"))

poisson_sales <- gapply(sales_df, c("product_id"), function(key, x) {
  library(energy)
  y <- data.frame(key, poisson.mtest(x$sales, R = 100)$p.value)
}, sales_result_schema)
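If the grouped output is small enough to fit on the driver, the same computation can be expressed with gapplyCollect, which returns an R data.frame directly and needs no schema. A minimal sketch:

# same UDF, result collected straight back as an R data.frame; no schema argument
poisson_sales_local <- gapplyCollect(sales_df, c("product_id"), function(key, x) {
  library(energy)
  y <- data.frame(key, poisson.mtest(x$sales, R = 100)$p.value)
  colnames(y) <- c("product_id", "p_value")
  y
})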

dapply()

Apply a function to each partition of a SparkDataFrame. The function should have only one parameter, to which an R data.frame corresponding to a partition is passed. The output of the function should be a data.frame. The schema specifies the row format of the resulting SparkDataFrame; it must match the data types of the returned value.

Like dapply, we can use dapplyCollect to apply a function to each partition of a SparkDataFrame and collect the result back. The output of the function should be a data.frame, but the schema is not required. dapplyCollect can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.
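For completeness, a minimal dapply/dapplyCollect sketch on the sales data; the derived column sales_value is purely illustrative, and the schema assumes audit_date is still a timestamp at this point:

# dapply: the schema must describe every column returned by the function
sales_value_schema <- structType(
  structField("product_id", "integer"),
  structField("audit_date", "timestamp"),
  structField("sales", "integer"),
  structField("price", "double"),
  structField("sales_value", "double"))

sales_with_value <- dapply(sales_df, function(x) {
  # x is an R data.frame holding one partition
  x$sales_value <- x$sales * x$price
  x
}, sales_value_schema)

# dapplyCollect: same function, no schema; the result comes back as an R data.frame
sales_with_value_local <- dapplyCollect(sales_df, function(x) {
  x$sales_value <- x$sales * x$price
  x
})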

Running STL and Holt with SparkR

Transforming STL to SparkR

The R code below executes a custom function stl_forecast(), which internally uses stlf() from library(forecast). The argument to stl_forecast() is an R data.table.

sales_R_df %>%
  group_by(product_id) %>%
  do(stl_forecast(data.table(.))) %>%
  data.table(.) -> dt_stl
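The body of stl_forecast() is not shown in this post. Purely to illustrate the shape of such a function, here is a hypothetical sketch built around forecast::stlf(); the horizon, the monthly frequency, and the way the standard deviation is derived are all assumptions:

library(data.table)
library(forecast)

# hypothetical sketch only -- the real stl_forecast() is not shown in this post
stl_forecast <- function(dt, horizon = 8) {  # horizon is an assumed parameter
  sales_ts <- ts(dt$sales, frequency = 12)   # assume a monthly series
  fc <- stlf(sales_ts, h = horizon)          # STL decomposition + forecast
  data.table(
    product_id       = dt$product_id[1],
    audit_date       = seq(max(as.Date(dt$audit_date)), by = "month", length.out = horizon + 1)[-1],
    stl_forecast     = as.numeric(fc$mean),
    # approximate forecast std dev from the 95% interval half-width
    stl_forecast_std = as.numeric(fc$upper[, 2] - fc$mean) / qnorm(0.975)
  )
}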

For SparkR, we first need to create a schema that will be mapped to the resulting data frame.

df_stl_schema <- structType(
  structField("product_id", "integer"),
  structField("audit_date", "date"),
  structField("stl_forecast", "double"),
  structField("stl_forecast_std", "double")
)

Use gapply to execute stl_forecast() on a SparkR data frame.

df_stl <- gapply(sales_df, c("product_id"), function(key, x) {
  library(data.table)
  library(lubridate)
  library(dplyr)
  library(forecast)
  library(TTR)
  # gapply passes an R data frame; change it to a data table and pass as arg to stl_forecast()
  sales1 <- data.table(x)
  y <- data.frame(key, stl_forecast(sales1))
}, df_stl_schema)

Transforming Holt to SparkR

Similarly, we have a custom method holt_forecast(), which expects an R data.table.

sales_R_df %>%
  group_by(product_id) %>%
  do(holt_forecast(data.table(.))) %>%
  data.table(.) -> dt_holt
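As with STL, holt_forecast() itself is not shown here; a comparable hypothetical sketch built around forecast::holt(), with the same caveats about the assumed horizon and output columns:

# hypothetical sketch only -- the real holt_forecast() is not shown in this post
holt_forecast <- function(dt, horizon = 8) {  # horizon is an assumed parameter
  fc <- holt(ts(dt$sales), h = horizon)       # Holt's linear trend method
  data.table(
    product_id             = dt$product_id[1],
    audit_date             = seq(max(as.Date(dt$audit_date)), by = "month", length.out = horizon + 1)[-1],
    holt_unit_forecast     = as.numeric(fc$mean),
    holt_unit_forecast_std = as.numeric(fc$upper[, 2] - fc$mean) / qnorm(0.975)
  )
}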

Implementation in SparkR

dt_holt_schema <- structType(
  structField("product_id", "integer"),
  structField("audit_date", "date"),
  structField("holt_unit_forecast", "double"),
  structField("holt_unit_forecast_std", "double")
)

dt_holt <- gapply(sales_df, c("product_id"), function(key, x) {
  library(data.table)
  library(lubridate)
  library(dplyr)
  library(forecast)
  sales <- data.table(x)
  y <- data.frame(key, holt_forecast(sales))
}, dt_holt_schema)

Observations

We were expecting some differences in precision, which can occur due to the multiple data transfers between the Spark JVM and the R process. However, during our validation on the generated dataset, we observed that the forecasts produced by the SparkR UDFs exactly match the results generated earlier in R.
