
Spark is an open-source distributed computing framework that is more efficient than Hadoop and supports three main languages (Scala, Java and Python). It has rapidly carved out a significant niche in Big Data projects thanks to its ability to process high volumes of data in both batch and streaming mode. Version 2.0 introduced a new streaming approach known as Structured Streaming. What is it all about? We invite you to read this series of 3 articles to find out! This 2nd article focuses on data transformation, quality control and unit testing.


Data transformation

In our previous article, “Spark Structured Streaming: from data management to processing maintenance”, we described our processing cycle. Data transformation plays a major role in this cycle. We have already talked about aggregation; let us now move on to the normalize function.

// Read the raw messages from Kafka
val input = readFromKafka("topicTest", "kafka:9092")
// Archive the detailed data on HDFS
val outputHdfs = writeToHdfs(input, "testHdfs", "ckpHdfs")
// Parse the raw strings into typed columns
val data = normalize(input)
// Compute the aggregates
val agregat = aggData(data)
// Write the aggregates to PostgreSQL
val outputDb = writeToPostgres(agregat, "testTable", "ckpTable")
// Wait for any of the streaming queries to terminate
spark.streams.awaitAnyTermination(TIMEOUT)

In our example, Kafka delivers each message as a single string of characters.
The normalize function therefore takes a Dataset[String] as input and produces a DataFrame whose columns will be used to compute the aggregates. The aim of the normalize function is to parse the data, splitting each string of characters into columns that are better suited to analysis.

Data parsing

How to parse a message that looks like this?

1 696027349027032572630986000005802382 ocdn 3 10.10.26.96 10.10.3.6 live-tv.google.ci live-tv.google.ci 
GET /ss/nt1hd/mss_low.isml/QualityLevels(800000)/Fragments(video=456815276572111) 
HTTP/1.1 bytes=0-1023999 206 600 217 217 301868 302259 2020-07-03T13:47:08.000Z 0.000 0.000 0.002 0.000 
video%2fmp4 - 1132 o - 0x00 http "- -" "- - -" yak01cache1.yak01.mea.cdn.google.com

The idea is to split the text on the “space” separator, except inside the strings that are enclosed in double quotes.

The first approach involves using the split function available on DataFrame columns. But this approach is overly simplistic and does not give good results. This is partly because the “space” separator also appears inside the double-quoted substrings that we do not want to split.

The second logical approach involves using a Dataset[String] and Scala's native split function to split our string of characters. But the native split function also has its limitations, and the result obtained is not really satisfactory.
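
As a quick illustration (this snippet is not from the project code), here is what a naive split does to a fragment of the sample message above:

// A naive split on the space character breaks the double-quoted fields apart
val fragment = """0x00 http "- -" "- - -" yak01cache1.yak01.mea.cdn.google.com"""
fragment.split(' ')
// Array(0x00, http, "-, -", "-, -, -", yak01cache1.yak01.mea.cdn.google.com)
// -> the field "- -" becomes two tokens, and "- - -" becomes three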

A dedicated parsing solution

To overcome these limitations, we wrote a Scala function that meets our requirements. The snippet below shows it wrapped in an implicit class on String (an assumption about its enclosing context), so that the default value s refers to the string being split.

import scala.annotation.tailrec

// Assumed enclosing context: an implicit class on String providing the default value s of the in parameter
implicit class CsvString(s: String) {
  @tailrec
  final def splitCsv(sep: Char, in: String = s, work: String = "", out: Array[String] = Array(), intoDoubleQuote: Boolean = false): Array[String] = {
    if (in.length == 0) out ++ Array(work)
    else in.charAt(0) match {
      // a double quote is dropped and toggles the "inside double quotes" flag
      case '"' => splitCsv(sep, in.drop(1), work, out, !intoDoubleQuote)
      // the separator closes the current field, unless it appears inside double quotes
      case `sep` => if (intoDoubleQuote) splitCsv(sep, in.drop(1), work + in.take(1), out, intoDoubleQuote) else splitCsv(sep, in.drop(1), "", out ++ Array(work), intoDoubleQuote)
      // any other character is appended to the field being built
      case _ => splitCsv(sep, in.drop(1), work + in.take(1), out, intoDoubleQuote)
    }
  }
}

The function is annotated with @tailrec, which guarantees that it is tail-recursive: the compiler turns the recursion into a loop, so it performs well on huge volumes of data and avoids any stack-overflow problems.
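
For comparison, and assuming the implicit class shown above is in scope, splitCsv keeps the quoted fields intact on the same fragment:

val fragment = """0x00 http "- -" "- - -" yak01cache1.yak01.mea.cdn.google.com"""
fragment.splitCsv(' ')
// Array(0x00, http, - -, - - -, yak01cache1.yak01.mea.cdn.google.com)
// -> the quoted fields are kept whole, and the surrounding quotes are dropped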


I will not elaborate on code details as this is not what this post is about. Just remember that with Spark, you can create your own functions to meet specific project constraints in the best possible way.

Data control

Data quality is a major topic in all data projects, but it takes on a whole new dimension when streaming is involved.

Of course, we should immediately think about integrating rules that exclude non-compliant messages in order to prevent processing crashes, for example:

  • if the message does not contain the correct number of columns
  • if a numerical field contains non-numerical data
  • if a field containing a number cannot be converted into a timestamp

In these cases, the data is discarded and not taken into account in the indicator calculations.
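
In practice, checks like these can rely on safe conversions that return a sentinel value when parsing fails. The helper below is only a sketch (the name is hypothetical), with -1 chosen to match the control test shown further down:

import scala.util.Try

// Hypothetical helper: convert a raw field to Long, returning -1 when the value is not numeric
def safeLong(field: String): Long = Try(field.trim.toLong).getOrElse(-1L)

safeLong("301868")   // 301868
safeLong("o")        // -1 -> the record will be flagged as non-compliant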

Now, let's imagine that the application sending the data is upgraded and that the format of the transmitted data is modified as a result.
This will shift the columns and lead to the data being wrongly rejected.

You will tell me that this never happens in real life. And that the person responsible for the application would give you a 6-month heads-up before the format change…
The answer is simple: the person responsible for the source application does not necessarily belong to your organisation and does not really care about your concerns.

You will add that you would rapidly detect the problem anyway, as there would be no more data coming in.
I would answer that the new version will probably be deployed gradually to the machines transmitting the data, which will undoubtedly make the problem harder to detect.

How to ensure that no data is lost?

And more precisely: how to ensure that no data is lost in production?

Integrating control at the heart of the code

We tried a few options, including computing metrics and sending the results to the Postgres database.
But this adversely affected performance.

The best solution we found consists in integrating the control at the very heart of the code.
To do so, you simply need to add a control field directly to the data structure.
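
For example, if the detailed records are modelled as a case class, the control flag is just one more field. This is only a sketch: the field names are partly borrowed from the test data further down, and the exact structure is an assumption.

import java.sql.Timestamp

// Sketch: the detailed record keeps all its fields plus the control flag
case class AccessRecord(
  requestId: String,
  node: String,
  datum: Timestamp,
  bytes: Long,
  control: Int   // 1 = record passed all quality checks, 0 = rejected
)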

During the control phase, the “control” column is set to 1 when all goes well and to 0 otherwise.

// 0 if a numeric conversion failed (sentinel -1) or if the number of columns does not match the message version; 1 otherwise
val control = if ((bytes == -1) || (datum == -1) || (t.length != nbCol(version))) 0 else 1

The detailed data continues to be stored on HDFS.
To apply quality control to the data in production, you simply screen the data on HDFS, filtering on the records whose control field is set to 0.
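
A minimal sketch of that production check could look like this (the path and the Parquet format are assumptions):

import org.apache.spark.sql.functions.col

// Read the detailed data archived on HDFS and keep only the rejected records
val detailed = spark.read.parquet("hdfs:///data/testHdfs")
val rejected = detailed.filter(col("control") === 0)

println(s"Rejected records: ${rejected.count()}")
rejected.show(10, truncate = false)   // inspect a sample of the rejected lines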

The key strength of this approach is that setting up control did not make processing less efficient.

Without testing, the code is nothing

How to unit test streaming programmes?

Traditional tests (reading a test file, processing it and comparing the output with expected results) are not enough to test streaming code.

With this approach, you cannot test the watermark for example, or the processing of upserts in the database.

We must realistically simulate the streaming operation, generating data at different points in time.

To do so, I was greatly inspired by Bartosz Konieczny's blog.

Generating data at different points in time

new Thread(new Runnable() {
  override def run(): Unit = {

    // first batch of events
    inputStream.addData(
      ("1", "node1", "hostname1", "s1", new Timestamp(30000L), 100, 60, 10, 15, 1),
      ("2", "node1", "hostname1", "s1", new Timestamp(40000L), 80, 40, 20, 10, 1),
      ("3", "node2", "hostname1", "s2", new Timestamp(70000L), 101, 60, 10, 15, 1),
      ("4", "node1", "hostname1", "s2", new Timestamp(100000L), 100, 65, 10, 15, 1))

    while (!query.isActive) {}
    // to be efficient, sleep must be > trigger processing time (60s when window("datum", "60 seconds", "60 seconds"))
    Thread.sleep(20000)

    // second batch, sent later to exercise the watermark and duplicate handling
    inputStream.addData(
      ("5", "node1", "hostname1", "s1", new Timestamp(58500L), 100, 60, 10, 15, 1),    // ko: time < limit time
      ("6", "node2", "hostname1", "s1", new Timestamp(59500L), 100, 60, 10, 15, 1),    // ko: time < limit time
      ("7", "node1", "hostname1", "s1", new Timestamp(95500L), 100, 120, 40, 15, 1),   // ok: time > limit time
      ("4", "node1", "hostname1", "s2", new Timestamp(100000L), 100, 65, 10, 15, 1),   // ko: duplicate
      ("8", "node2", "hostname1", "s1", new Timestamp(190000L), 100, 90, 10, 15, 1))   // ok: time > limit time
    Thread.sleep(20000)
  }
}).start()

Storing results in an in-memory object

The generated data is added to an in-memory object of MemoryStream type.

val testKey = "aggAccess should have good results with watermark"
val inputStream = new MemoryStream[(String, String, String, String,
  Timestamp, Long, Long, Float, Float, Long)](1, spark.sqlContext)
val inputDf = inputStream.toDS().toDF("request_id", "cp", "node", "status",
  "datum", "bytes", "cache_hit_bytes", "request_time", "response_start_time", "total")
val aggregatedStream = aggAccess(inputDf, "5 seconds")
val query = aggregatedStream.writeStream.trigger(Trigger.ProcessingTime(10000)).outputMode("update")
  .foreach(
    new InMemoryStoreWriter[Row](testKey, row =>
      s"${row.getAs[Long]("datum")} / " +
      s"${row.getAs[String]("cp")} / " +
      s"${row.getAs[String]("node")} / " +
      s"${row.getAs[String]("status")} -> " +
      s"${row.getAs[Long]("bytes")} -> " +
      s"${row.getAs[Long]("cachehitbytes")} -> " +
      s"${row.getAs[Float]("requesttime")} -> " +
      s"${row.getAs[Float]("responsestarttime")} -> " +
      s"${row.getAs[Long]("total")}"))
  .start()

In-memory storage prevents disk-writing problems and eliminates the risk of latency. As a parameter, we pass a function that formats the data stored in memory as desired.

Results comparison

Finally, we simply compare what we have stored in memory with a list of expected results.

val readValues = InMemoryKeyedStore.getValues(testKey)
        readValues.foreach(println)

        assert(readValues.size == 6)
        val list = List(
          "1970-01-01 00:01:00.0 / node2 / hostname1 / s2 -> 101 -> 60 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:00:00.0 / node1 / hostname1 / s1 -> 180 -> 100 -> 30.0 -> 25.0 -> 2",
          "1970-01-01 00:01:00.0 / node1 / hostname1 / s2 -> 100 -> 65 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:01:00.0 / node1 / hostname1 / s1 -> 100 -> 120 -> 40.0 -> 15.0 -> 1",
          "1970-01-01 00:03:00.0 / node2 / hostname1 / s1 -> 100 -> 90 -> 10.0 -> 15.0 -> 1",
          "1970-01-01 00:03:00.0 / node2 / hostname1 / s1 -> 277 -> 233 -> 33.0 -> 52.0 -> 3"
        )
        assert(list.filter(readValues.contains(_)) == list)

This concludes our second article. In the next and last blog article, we will focus on performance testing. To be continued… 😉
