Hey everyone! How is your day going? Today we are going to continue the series Big Data with Zero Code. In this post, we focus on creating the grammar and semantics in Scala. I will explain and sketch a draft of how the full grammar should look, but the final implementation will be limited to just two aspects: the Input and Output expressions.
Grammar #
Alright, let’s dive in. Since we are dealing with Spark SQL, we will define the main elements: input, transformation, and output, where:
- input refers to the input elements that come from a given source.
- transformation refers to elements that are built from other, already defined elements.
- output refers to the final elements that are prepared to interact with the external world in some way.
Here, we have used the concept of element. However, for our purpose, instead of element, we will use the term asset.
- An asset is the smallest entity, element, or item composed of multiple properties; the different kinds of assets correspond to the different sets of properties they can carry. Since we are dealing with data, our assets are closely related to inputs, transformations, and outputs. Therefore:
Asset := `Asset` <AssetRef> <Source>
AssetRef := String
Source := <Input> | <Output> | <Transformation>
Again, following the idea of Spark SQL input / output, and the Spark SQL DataFrameReader and DataFrameWriter APIs, these guys must carry the following properties:
- format: the format Spark’s datasource will use, such as csv, parquet, json, jdbc, delta, etc.
- options: the properties Spark’s datasource will use to drive the input / output behavior, such as header, sep, user, pass, etc.
- ref: the input / output reference where the source lives, such as a path, an S3 reference, <schema>.<table_name>, etc.
Of course there are more, but we will stick to these just for simplicity.
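Before formalizing this in the grammar, here is roughly how those three properties line up with the DataFrameReader calls we will rely on later. This is just a sketch, assuming a SparkSession named spark is already in scope; the format, options, and path are made-up values.

// Sketch only: how format / options / ref map onto Spark's DataFrameReader.
// The concrete values below are illustrative, not part of the DSL yet.
val df = spark.read
  .format("csv")                                   // format
  .options(Map("header" -> "true", "sep" -> "|"))  // options
  .load("data/csv/example.csv")                    // ref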
Source := <Input> | <Output> | <Transformation>
Input := `Input` <Format> <Options> <SourceRef>
Output := `Output` <AssetRef> <Format> <Options> <SourceRef>
Transformation := ??? // Skip definition
Format, SourceRef := String
Options := `Map` String String
Notice the AssetRef in the Output definition. Keep in mind that an output must be built from a given, already-built asset. So here we could either keep the whole AST of the Asset (which would allow us to perform optimizations through rewriting / transformation rules) or just reference it by the already-built asset’s reference, stored in some context.
Context := `Map` <AssetRef> <Asset>
Semantic #
The semantics are well known: evaluating one of our grammar’s expressions should be equivalent to obtaining the DataFrame it represents.
So here we go (please note this is a pseudo-evaluation, not formal at all, so be kind):
[Context] T ||Asset|| =>
[Context] T ||`Asset` <AssetRef> <Source>|| =>
[Context] T `Asset` ||<AssetRef>|| ||<Source>|| =>
[Context] T `Asset` assetRef ||<Input> | <Output> | <Transformation>|| =>

case <Input>:
  [Context] T `Asset` assetRef ||<Input>|| =>
  [Context] T ||`Input` <Format> <Options> <SourceRef>|| =>
  [Context] T `Input` ||<Format>|| ||<Options>|| ||<SourceRef>|| =>
  [Context] T (`Input` format options sourceRef) =>
  spark.read.format(format).options(options).load(sourceRef) # Input Spark SQL DataFrame

...

case <Output>:
  [Context] T `Asset` assetRef ||<Output>|| =>
  [Context] T ||`Output` <AssetRef> <Format> <Options> <SourceRef>|| =>
  [Context] T `Output` ||<AssetRef>|| ||<Format>|| ||<Options>|| ||<SourceRef>|| =>
  `Output` ([Context] T ||<AssetRef>||) format options sourceRef =>
  ([Context] T ||<AssetRef>||) =>
  DF<assetRef> # Built Spark SQL DataFrame from Asset with asset ref `assetRef`
Okay… Scala code please.
Let’s move to Scala #
Grammar #
type AssetRef = String
type SourceRef = String
type Format = String
type Options = Map[String, String]
type Context[T] = Map[AssetRef, T]
case class Asset(assetRef: AssetRef, source: Source)
sealed trait Source
case class Input(format: Format, options: Options, sourceRef: SourceRef)
extends Source
case class Output(assetRef: AssetRef, format: Format, options: Options, sourceRef: SourceRef)
extends Source
Here we are using type aliases to simulate newtypes, but in real-world production code they should be proper newtypes (IMO).
In Scala 3, you can create a newtype to achieve type safety without runtime overhead. This can be done using opaque type aliases. These allow you to define a new type that is distinct at compile time but behaves like the underlying type at runtime, avoiding unnecessary boxing.
object NewTypes {
  opaque type AssetRef = String

  object AssetRef:
    def apply(value: String): AssetRef = value

  extension (assetRef: AssetRef)
    def value: String = assetRef
}
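A quick usage sketch (the value is made up): outside of NewTypes, an AssetRef is no longer interchangeable with a String, which is exactly the point.

import NewTypes.*

val ref: AssetRef = AssetRef("example")
val raw: String   = ref.value  // explicit unwrapping through the extension method
// val oops: String = ref      // does not compile: AssetRef is opaque outside NewTypes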
BTW, if you want newtypes in Scala 2, you can also get them by using estatico/scala-newtype. However, for this academic purpose I think it is not necessary at all.
Semantic #
I described one semantic above, but there is not just one. In fact there are many interesting ones:
- A code-generation semantic: generates Scala code with Spark SQL expressions (useful if you want to build repositories, or at least generate a base ETL template, through this DSL).
- A DataFrame semantic: evaluates the expression and just returns the resulting DataFrame. This semantic allows you to debug all the DataFrames that participate.
- An execution semantic: executes the ETL expression, resulting in an extract-transform-load run.
In this post I will cover the last two. The code-generation semantic is left as homework (if you want).
The DataFrame Semantic #
import scala.annotation.tailrec
import org.apache.spark.sql.{DataFrame, SparkSession}

@tailrec
def eval(context: Context[Asset], asset: Asset)(implicit
    S: SparkSession
): DataFrame =
  asset.source match {
    // An input is read straight from its source
    case Input(format, options, ref) =>
      S.read.format(format).options(options).load(ref)
    // An output evaluates to the DataFrame of the asset it references
    case Output(assetRef, _, _, _) =>
      eval(context, context(assetRef))
  }

// For every asset within the context
def eval(context: Context[Asset])(implicit
    spark: SparkSession
): Context[DataFrame] =
  context.map { case (ref, asset) =>
    ref -> eval(context, asset)
  }
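One detail worth noticing: context(assetRef) throws a NoSuchElementException whenever an output references an asset that is not in the context. A slightly more defensive variant (just a sketch, not part of the code above) could make that failure explicit:

// Sketch: same DataFrame semantic, but with an explicit error for dangling references
def evalChecked(context: Context[Asset], asset: Asset)(implicit
    S: SparkSession
): DataFrame =
  asset.source match {
    case Input(format, options, ref) =>
      S.read.format(format).options(options).load(ref)
    case Output(assetRef, _, _, _) =>
      context.get(assetRef) match {
        case Some(referenced) => evalChecked(context, referenced)
        case None =>
          throw new IllegalArgumentException(s"Unknown asset reference: $assetRef")
      }
  }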
The Execution Semantic #
Notice that here we build on top of the DataFrame semantic described above:
def exec(context: Context[Asset], asset: Asset)(implicit
    spark: SparkSession
): Unit =
  asset.source match {
    // Only outputs trigger a write: evaluate the asset and save the resulting DataFrame
    case Output(_, format, options, ref) =>
      eval(context, asset).write.format(format).options(options).save(ref)
    case _ => ()
  }
// For every asset within the context
def exec(context: Context[Asset])(implicit spark: SparkSession): Unit =
  context.foreach {
    case (_, asset @ Asset(_, _: Output)) =>
      exec(context, asset)
    case _ => ()
  }
Notice that the execution semantic only works over Output assets, because we want to act on final results rather than intermediate ones.
Let’s go all in #
Alright! Let’s go all in! For this exercise we are going to assume there is a CSV file in our project with the following content:
Date|Symbol|Adj Close|Close|High|Low|Open|Volume
2024-11-05|ZTS|175.27|175.27|176.80|172.25|174.55|2453800.0
...
We would like to read the CSV file located somewhere and write it as a Parquet binary somewhere else:
val input: Asset =
  Asset("example", Input("csv", Map("sep" -> "|", "header" -> "true"), "data/csv/example.csv"))
val output: Asset =
  Asset("output", Output("example", "parquet", Map("partition" -> "1"), "data/parquet/example"))
val context: Context[Asset] =
  Map(
    "example" -> input,
    "output"  -> output
  )

// Debugging all assets' DataFrames
eval(context).foreach { case (_, df) =>
  df.show(20, truncate = false)
}
// Launching the ETL
exec(context)
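One last note: every snippet above expects an implicit SparkSession in scope. A minimal sketch for running this locally (the app name and master are arbitrary choices, not part of the DSL) would be something like:

// A minimal local SparkSession, assumed implicitly by the eval / exec calls above
implicit val spark: SparkSession = SparkSession
  .builder()
  .appName("big-data-zero-code")
  .master("local[*]")
  .getOrCreate()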
Conclusion #
In this post, we defined the grammar and semantics for building ETL workflows in Scala using Spark. By introducing concepts like assets, inputs, transformations, and outputs, we laid the foundation for a simple and flexible DSL. The semantics demonstrated how to generate and execute Spark SQL pipelines using a structured and type-safe approach.
This method ensures a clean, maintainable, and scalable solution for managing Spark-based ETL workflows. In future posts, we can explore optimizations, additional semantics, or even automated code generation.
Stay tuned for the next step in the Big Data with Zero Code series!