
From Outside to Core

Rafael Fernandez
Big Data with Zero Code - This article is part of a series.

Hey everyone! How’s it going? Today, we’re diving back into the Big Data with Zero Code series. In this post, we’ll focus on mapping external YAML instructions to our core Scala model. Specifically, we’ll define the YAML model and see how to use Circe’s decoder and encoder instances for YAML instructions.

What is Circe?

Alright! First of all, for those of you who don’t know what Circe is and why we’ll use it:

Circe is another JSON library for Scala.

So, why Circe? Circe is built on the Cats library and a functional programming stack. It can be a bit tricky because it relies heavily on implicits, which can sometimes feel tedious. But for our purposes it’s straightforward. Let me show you.

Wait… Another JSON library?

So where is the word YAML in that “another JSON library” description? Good question! While Circe is our core serialization dependency, we’ll use circe/circe-yaml to handle YAML.

Together, they can parse YAML like this:

foo: Hello, World
bar:
    one: One Third
    two: 33.333333
baz:
    - Hello
    - World

to the following model:

case class Nested(one: String, two: BigDecimal)
case class Foo(foo: String, bar: Nested, baz: List[String])

// result
Foo("Hello, World", Nested("One Third", 33.333333), List("Hello", "World"))
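
Here is a minimal sketch of how that parsing could look, assuming the `io.circe.yaml.parser` entry point from circe-yaml and automatic derivation from circe-generic. The name `fooYaml` is mine, just for illustration, and the snippet is meant for a worksheet or script rather than a full project.

```scala
import io.circe.Error
import io.circe.generic.auto._
import io.circe.yaml.parser

case class Nested(one: String, two: BigDecimal)
case class Foo(foo: String, bar: Nested, baz: List[String])

// The same YAML document as above, embedded as a string (illustrative name).
val fooYaml: String =
  """foo: Hello, World
    |bar:
    |    one: One Third
    |    two: 33.333333
    |baz:
    |    - Hello
    |    - World""".stripMargin

// parse returns Either[ParsingFailure, Json]; as[Foo] returns Either[DecodingFailure, Foo].
// Both failure types extend io.circe.Error, so the result widens to Either[Error, Foo].
val foo: Either[Error, Foo] = parser.parse(fooYaml).flatMap(_.as[Foo])
```

Note that nothing YAML-specific is needed on the model side: circe-yaml turns the document into Circe's `Json`, and from there the usual decoders take over.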

And THAT is the foundation of our task!

Yaml Model

First of all, we have to think about how our YAML instructions should look. Even though YAML is a human-friendly syntax, we should still aim for an easy, intuitive structure, ideally one built from repeating patterns. Let me explain:

For example, one option is a structure that follows the core model:

etl:
  - assetRef: asset-1
    source:
      format: csv
      options:
        - header: true
        - sep: '|'
      sourceRef: /path/from/data.csv

  - assetRef: asset-2
    source:
      assetRef: asset-1
      format: parquet
      options:
        - header: true
        - sep: '|'
      sourceRef: /path/to/parquet

But this raises questions. Which are inputs or outputs? Are the keywords clear and descriptive enough? We could add comments, but let’s aim for a better structure.

A Refined Structure

It is clear we have three parts: inputs, transformations and outputs. So, using them, we can provide a better structure:

input:
  - # some input structure
transformation:
  - # some transformation structure
output:
  - # some output structure

Also, humans don’t usually use words like references, assets, or sources. We refer to things by their names, or relate and recognize them by their types or categories.

input:
  - name: table1
    format: csv
    path: path/from/data.csv
    options:
      header: true
      sep: '|'

transformation:
  - name: selectTable1
    select:
      from: table1
      columns:
        - col1
        - col2

output:
  - name: selectTable1
    format: parquet
    mode: overwrite
    path: path/to/select_table1

Here, words like input, transformation and output tell us what they are and what they do. Additionally, the select keyword in the transformation block specifies that it represents a select-type transformation asset. Finally, the name keyword serves to uniquely identify each asset, allowing us to reference them through other keywords like from.

Now it’s intuitive and SQL-like!

Serialization Time!

Ingredients ready, serialization time! Let’s model our algebraic data types:

import cats.data.NonEmptyList

case class ETL(
    input: NonEmptyList[Input],
    transformation: Option[NonEmptyList[Transformation]],
    output: NonEmptyList[Output]
)

We use the NonEmptyList data type from Cats to ensure there is always at least one instruction, as an empty list would not make sense. On the other hand, transformation is defined as Option[NonEmptyList] because an ETL process might involve only input and output without any transformations.
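
To see what those two field types buy us at decoding time, here is a small sketch. `Pipeline` is a hypothetical mini-model that exists only for this illustration, and the JSON values are built by hand so the snippet needs nothing beyond circe-core, circe-generic and Cats.

```scala
import cats.data.NonEmptyList
import io.circe.Json
import io.circe.generic.auto._

// Hypothetical mini-model: one mandatory non-empty list, one optional one.
case class Pipeline(steps: NonEmptyList[String], extras: Option[NonEmptyList[String]])

// "steps" has one element and "extras" is missing entirely.
val withOneStep: Json = Json.obj("steps" -> Json.arr(Json.fromString("load")))

// "steps" is present but empty.
val withNoSteps: Json = Json.obj("steps" -> Json.arr())

// The absent "extras" key decodes to None.
val decodedOk = withOneStep.as[Pipeline]

// The empty array is rejected by the NonEmptyList decoder, so this is a Left.
val decodedEmpty = withNoSteps.as[Pipeline]
```

In other words, the "there must be at least one input" rule is enforced by the types during decoding, so we don't have to write a separate validation step for it.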

// Input
case class Input(  
    name: String,  
    format: String,  
    path: String,  
    options: Option[Map[String, String]]  
)  
  
// Output  
case class Output(  
    name: String,  
    format: String,  
    mode: String,  
    path: String,  
    options: Option[Map[String, String]]  
)

// Transformation
sealed trait Transformation
// ...

Here, we’ll focus on Input and Output. Keep in mind, these aren’t the same ones we discussed in the previous post. While they could be renamed to something like InputYaml for clarity, we’ll stick with Input for simplicity.

Now, onto the second part: the serialization function. It’s well-known that working with Circe can sometimes be tricky. For now, we’ll rely on automatic serialization to keep things straightforward. If you need to implement custom serialization, be sure to check the Circe documentation for guidance.
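
Just to give a rough idea of the more explicit route, here is a sketch using Circe's semi-automatic derivation for a single case class. This is not what the series uses. The instance names `inputDecoder` and `inputEncoder` are my own, and `Input` is copied here only to keep the snippet self-contained.

```scala
import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.syntax._

case class Input(
    name: String,
    format: String,
    path: String,
    options: Option[Map[String, String]]
)

object Input {
  // Instances live in the companion object, so they are found without any extra import.
  implicit val inputDecoder: Decoder[Input] = deriveDecoder[Input]
  implicit val inputEncoder: Encoder[Input] = deriveEncoder[Input]
}

// Encode to Json and decode back again.
val original = Input("table1", "csv", "data/csv/example.csv", None)
val roundTrip = original.asJson.as[Input]
```

The trade-off is a bit more boilerplate in exchange for knowing exactly which instances exist and where they come from. The rest of this post sticks with automatic derivation.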

import io.circe.Error
import io.circe.generic.auto._

// Raw YAML instructions; option values are quoted so they decode as strings.
val etlRaw: String =
  """input:
    |  - name: table1
    |    format: csv
    |    path: 'data/csv/example.csv'
    |    options:
    |      header: "true"
    |      sep: '|'
    |output:
    |  - name: table1
    |    format: parquet
    |    mode: overwrite
    |    path: 'data/parquet/example'""".stripMargin

// Parse the YAML into Circe's Json, then decode the Json into the ETL model.
def decode(value: String): Either[Error, ETL] =
  io.circe.yaml.parser.parse(value).flatMap(json => json.as[ETL])

Here we are! By leveraging the parser from the circe-yaml dependency and importing Circe’s automatic derivation, we can decode our YAML into the corresponding Scala model.

decode(etlRaw)
/** result:
Right(
  ETL(
    NonEmptyList(
      Input(
        table1,
        csv,
        data/csv/example.csv,
        Some(
          Map(
            header -> true,
            sep -> |
          )
        )
      )
    ),
    None,
    NonEmptyList(
      Output(
        table1,
        parquet,
        overwrite,
        data/parquet/example,
        None
      )
    )
  )
)
*/
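
The result above is the happy path. Since decode returns an Either, it is worth seeing what the Left side can look like too. The sketch below uses a trimmed copy of the model (the transformation field is left out to keep it short) plus a hypothetical helper, `describe`, that tells the two failure types apart. The sample strings are made up for the example.

```scala
import cats.data.NonEmptyList
import io.circe.{DecodingFailure, Error, ParsingFailure}
import io.circe.generic.auto._
import io.circe.yaml.parser

// Trimmed copy of the YAML model, without the transformation field.
case class Input(name: String, format: String, path: String, options: Option[Map[String, String]])
case class Output(name: String, format: String, mode: String, path: String, options: Option[Map[String, String]])
case class ETL(input: NonEmptyList[Input], output: NonEmptyList[Output])

def decode(value: String): Either[Error, ETL] =
  parser.parse(value).flatMap(_.as[ETL])

// Hypothetical helper: turn either kind of failure into a readable message.
def describe(result: Either[Error, ETL]): String = result match {
  case Right(etl)               => s"ok: ${etl.input.size} input(s), ${etl.output.size} output(s)"
  case Left(e: ParsingFailure)  => s"not valid YAML: ${e.message}"
  case Left(e: DecodingFailure) => s"valid YAML, wrong shape: ${e.message}"
}

// Well-formed YAML, but the mandatory output block is missing.
val missingOutput: String =
  """input:
    |  - name: table1
    |    format: csv
    |    path: data/csv/example.csv""".stripMargin

// Not even well-formed YAML: the flow sequence is never closed.
val brokenYaml: String = "input: [unclosed"
```

One practical consequence: options is a Map[String, String], so an unquoted `header: true` is read as a YAML boolean and fails to decode as a String. That is why the example quotes it as `"true"`.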

So, are we finished? Not yet… We still need to map the YAML model to the Core model.

Transformations or something like that

At this point, I haven’t settled on a definitive name for this mapping, so I’ve tentatively called them transformations. These transformations serve to convert YAML instructions into the Core model.

For instance, we need mappings like this:

def transf(item: yaml.Input): core.Asset =  
  core.Asset(item.name, core.Input(item.format, item.options.getOrElse(Map()), item.path))

def transf(item: yaml.Output): core.Asset =  
  core.Asset(  
    s"output_${item.name}",  
    core.Output(item.name, item.format, item.mode, item.options.getOrElse(Map()), item.path)  
  )
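
To show how these two functions could be applied to a whole decoded ETL, here is a sketch. Big caveat: the `core` types below are stand-ins I made up so the snippet compiles on its own (`Source`, the `ref` and `from` field names, and `toAssets` are all assumptions), and the real Core model from the previous post may well look different. The `yaml` model is also trimmed, without transformations.

```scala
import cats.data.NonEmptyList

object yaml {
  case class Input(name: String, format: String, path: String, options: Option[Map[String, String]])
  case class Output(name: String, format: String, mode: String, path: String, options: Option[Map[String, String]])
  case class ETL(input: NonEmptyList[Input], output: NonEmptyList[Output])
}

// Hypothetical stand-in for the Core model, shaped only to fit the calls in transf.
object core {
  sealed trait Source
  case class Input(format: String, options: Map[String, String], path: String) extends Source
  case class Output(from: String, format: String, mode: String, options: Map[String, String], path: String) extends Source
  case class Asset(ref: String, source: Source)
}

object Mapping {
  def transf(item: yaml.Input): core.Asset =
    core.Asset(item.name, core.Input(item.format, item.options.getOrElse(Map()), item.path))

  def transf(item: yaml.Output): core.Asset =
    core.Asset(
      s"output_${item.name}",
      core.Output(item.name, item.format, item.mode, item.options.getOrElse(Map()), item.path)
    )

  // Map every input and output, keeping inputs first.
  def toAssets(etl: yaml.ETL): List[core.Asset] =
    etl.input.toList.map(i => transf(i)) ++ etl.output.toList.map(o => transf(o))
}

val sampleEtl = yaml.ETL(
  NonEmptyList.one(yaml.Input("table1", "csv", "data/csv/example.csv", None)),
  NonEmptyList.one(yaml.Output("table1", "parquet", "overwrite", "data/parquet/example", None))
)

val assets = Mapping.toAssets(sampleEtl)
```

The `output_` prefix keeps the output asset's reference distinct from the asset it reads from, even when both share the same name in the YAML.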

And there we have it!

Conclusion

In this post, we explored how to use Circe and its YAML extension to decode YAML instructions into a structured Scala model. By refining the YAML structure and leveraging functional programming tools, we made our process intuitive and clear. The next step was to map these decoded instructions to our Core model, bridging the gap between external configuration and internal logic.

In the next post, we’ll see the big picture of everything we’ve built. Stay tuned for more, and as always, feel free to share your thoughts or questions. Happy coding!
