From Core to Spark

Rafael Fernandez · 6 mins

Big Data with Zero Code - This article is part of a series.

Hey everyone! How is your day going? Today we continue the Big Data with Zero Code series. In this post, we focus on defining the grammar and semantics in Scala. I will sketch a draft of how the full grammar should look, but the final implementation will be limited to just two pieces: the Input and Output expressions.

Grammar
#

Alright, let’s dive in. Since we are dealing with Spark SQL, we will define the main elements: input, transformation, and output, where:

  • input refers to elements that come from a given external source.
  • transformation refers to elements built from other, already-built elements.
  • output refers to final elements, prepared to interact with the external world in some way.

Here, we have used the concept of element. However, for our purpose, instead of element, we will use the term asset.

  • An asset is the smallest entity, element, or item, composed of multiple properties; the different kinds of assets correspond to the different sets of properties they can carry. Since we are dealing with data, our assets are closely tied to inputs, transformations, and outputs. Therefore:
Asset := `Asset` <AssetRef> <Source>
AssetRef := String

Source := <Input> | <Output> | <Transformation>

Again, following the idea of Spark SQL input / output, and the Spark SQL DataFrameReader and DataFrameWriter APIs, these must carry the following properties:

  • format: the format Spark’s data source will use, such as csv, parquet, json, jdbc, delta, etc.
  • options: the properties Spark’s data source will use to drive the input / output behavior, such as header, sep, user, pass, etc.
  • ref: the input / output reference where the source lives, such as a path, an S3 reference, <schema>.<table_name>, etc.

Of course there are more, but we will stick with these for simplicity.

Source := <Input> | <Output> | <Transformation>
Input := `Input` <Format> <Options> <SourceRef>
Output := `Output` <AssetRef> <Format> <Options> <SourceRef>
Transformation := ??? // Skip definition
Format, SourceRef := String
Options := `Map` String String

Notice the AssetRef in the Output definition. Yes, keep in mind that an output must be built from a given, already-built asset. So here we could either keep the whole AST of the Asset (which would allow performance optimizations through rewrite / transformation rules) or just reference it by name, looking the already-built asset up in some context.

Context := `Map` <AssetRef> <Asset>
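
To make that trade-off concrete, here is a small, hypothetical Scala sketch of both options (the names OutputWithAst and OutputByRef are mine and do not appear in the final grammar); the rest of this post follows the second one:

type AssetRef  = String
type SourceRef = String
type Format    = String
type Options   = Map[String, String]

sealed trait Source
case class Asset(assetRef: AssetRef, source: Source)

// Option A: embed the referenced asset's whole AST in the output node,
// keeping the full tree available for rewrite / optimization rules.
case class OutputWithAst(asset: Asset, format: Format, options: Options, sourceRef: SourceRef) extends Source

// Option B (the one used in this post): keep only the asset's reference and
// resolve it later through a context of already-built assets.
case class OutputByRef(assetRef: AssetRef, format: Format, options: Options, sourceRef: SourceRef) extends Source

// The context of already-built assets (generalized to Context[T] in the Scala section below).
type Context = Map[AssetRef, Asset]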

Semantics
#

The semantics is known: evaluating one of our grammar’s expressions should be equivalent to obtaining the DataFrame it represents.

So here we go (please note this is a pseudo-evaluation, not formal at all, so be kind):

[Context] T ||Asset|| =>
[Context] T ||`Asset` <AssetRef> <Source>|| =>
[Context] T `Asset` ||<AssetRef>||  ||<Source>|| =>
[Context] T `Asset` assetRef  ||<Input> | <Output> | <Transformation>|| => 

case <Input>:
	[Context] T `Asset` assetRef  ||<Input>|| => 
	[Context] T ||`Input` <Format> <Options> <SourceRef>|| => 
	[Context] T `Input` ||<Format>|| ||<Options>|| ||<SourceRef>|| =>
	[Context] T (`Input` format options sourceRef) =>
	spark.read.format(format).options(options).load(sourceRef) # Input Spark SQL DataFrame

...

case <Output>:
	[Context] T `Asset` assetRef  ||<Output>|| => 
	[Context] T ||`Output` <AssetRef> <Format> <Options> <SourceRef>|| => 
	[Context] T `Output` ||<AssetRef>|| ||<Format>|| ||<Options>|| ||<SourceRef>|| =>
	`Output` ([Context] T ||<AssetRef>||) format options sourceRef =>
	([Context] T ||<AssetRef>||) =>
	DF<assetRef> # Built Spark SQL DataFrame from Asset with asset ref `assetRef`

Okay… Scala code please.

Let’s move to Scala
#

Grammar
#

type AssetRef  = String  
type SourceRef = String  
type Format    = String  
type Options   = Map[String, String]  
type Context[T]   = Map[AssetRef, T]
  
case class Asset(assetRef: AssetRef, source: Source)  
  
sealed trait Source  
  
case class Input(format: Format, options: Options, sourceRef: SourceRef)  
  extends Source  
  
case class Output(assetRef: AssetRef, format: Format, options: Options, sourceRef: SourceRef)  
    extends Source

Here we are using type aliases to simulate newtypes, but in real-world production code they should be proper newtypes (IMO).

In Scala 3, you can create a newtype to achieve type safety without runtime overhead. This can be done using opaque type aliases. These allow you to define a new type that is distinct at compile time but behaves like the underlying type at runtime, avoiding unnecessary boxing.

object NewTypes {
  opaque type AssetRef = String

  object AssetRef:
    def apply(value: String): AssetRef = value

  extension (assetRef: AssetRef)
    def value: String = assetRef
}
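
For instance, a minimal usage sketch (Scala 3): the wrapper costs nothing at runtime but is distinct at compile time:

import NewTypes.*

val ref: AssetRef = AssetRef("example")  // wrap a plain String
val raw: String   = ref.value            // unwrap via the extension method
// val bad: AssetRef = "example"         // does not compile: a String is not an AssetRef here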

BTW, if you want newtypes in Scala 2, you can also get them by using estatico/scala-newtype. However, for this academic exercise I don’t think it is necessary at all.

Semantics
#

I described one semantics above, but it is not the only one. In fact, there are several interesting ones:

  • A code-generation semantics, which generates Scala code with Spark SQL expressions (useful if you want to build repositories, or at least generate a base ETL template, from this DSL).
  • A DataFrame semantics, which evaluates the expression and simply returns the resulting DataFrame. This one lets you debug every DataFrame that participates.
  • An execution semantics, which executes the ETL expression, resulting in an actual extract-transform-load run.

In this post I will cover the last two; the code-generation semantics is left as homework (if you want).
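
If you want a rough, hypothetical starting point for that homework, a code-generation semantics could simply pretty-print the equivalent Spark SQL calls instead of executing them. A sketch, reusing the case classes defined above (the gen name is mine):

// Emit Scala source strings with the equivalent Spark SQL reader/writer calls,
// instead of building the DataFrames themselves.
def gen(context: Context[Asset], asset: Asset): String =
  asset.source match {
    case Input(format, options, sourceRef) =>
      val opts = options.map { case (k, v) => s""".option("$k", "$v")""" }.mkString
      s"""spark.read.format("$format")$opts.load("$sourceRef")"""
    case Output(assetRef, format, options, sourceRef) =>
      val opts = options.map { case (k, v) => s""".option("$k", "$v")""" }.mkString
      s"""${gen(context, context(assetRef))}.write.format("$format")$opts.save("$sourceRef")"""
  }

Running gen over every output asset in a context would give you ready-to-paste Spark snippets, which is essentially what a base ETL template generator needs.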

The DataFrame Semantics
#

import scala.annotation.tailrec
import org.apache.spark.sql.{DataFrame, SparkSession}

@tailrec
def eval(context: Context[Asset], asset: Asset)(implicit
    S: SparkSession
): DataFrame =
  asset.source match {
    // An input evaluates to the DataFrame read through Spark's datasource API.
    case Input(format, options, ref) =>
      S.read.format(format).options(options).load(ref)
    // An output evaluates to the DataFrame of the asset it references.
    case Output(assetRef, _, _, _) =>
      eval(context, context(assetRef))
  }

// For every asset within the context
def eval(context: Context[Asset])(implicit
	spark: SparkSession
): Context[DataFrame] =  
	context.map { case (ref, asset) =>  
	  ref -> eval(context, asset)  
	}

The Execution Semantics
#

Notice that here we build on top of the DataFrame semantics described above:

def exec(context: Context[Asset], asset: Asset)(implicit
    spark: SparkSession
): Unit =
  asset.source match {
    // Only outputs are materialized: evaluate the asset, then write it out.
    case Output(_, format, options, ref) =>
      eval(context, asset).write.format(format).options(options).save(ref)
    case _ => ()
  }

// For every asset within the context
def exec(context: Context[Asset])(implicit spark: SparkSession): Unit =
  context.foreach {
    case (_, asset @ Asset(_, _: Output)) =>
      exec(context, asset)
    case _ => ()
  }

Notice that the execution semantics only acts on Output assets, because we want to work with final results rather than intermediate ones.

Let’s go all in
#

Alright! Let’s go all in! For this exercise, we are going to assume there is a CSV file in our project with the following content:

Date|Symbol|Adj Close|Close|High|Low|Open|Volume  
2024-11-05|ZTS|175.27|175.27|176.80|172.25|174.55|2453800.0
...

We would like to read the CSV file from one location and write it out as Parquet somewhere else:

val input: Asset =  
  Asset("example", Input("csv", Map("sep" -> "|", "header" -> "true"), "data/csv/example.csv"))  
  
val output: Asset =  
  Asset("output", Output("example", "parquet", Map("partition" -> "1"), "data/parquet/example"))  
  
val context: Context[Asset] =  
  Map(  
    "example" -> input,  
    "output"  -> output  
  )

// Debugging all assets' dataframe
eval(context).foreach { case (_, df) =>  
  df.show(20, truncate = false)  
}

// Launching the ETL
exec(context)

Conclusion
#

In this post, we defined the grammar and semantics for building ETL workflows in Scala using Spark. By introducing concepts like assets, inputs, transformations, and outputs, we laid the foundation for a simple and flexible DSL. The semantics demonstrated how to generate and execute Spark SQL pipelines using a structured and type-safe approach.

This method ensures a clean, maintainable, and scalable solution for managing Spark-based ETL workflows. In future posts, we can explore optimizations, additional semantics, or even automated code generation.

Stay tuned for the next step in the Big Data with Zero Code series!
