Monday 16 October 2017

Scala type alias in Apache Spark

There is one feature in Scala that I see used quite rarely - type aliases. While it can easily be misused and bring only confusion into a Scala project, there is a very good use case for it in Apache Spark jobs on RDDs.

Spark job pipelines can be long and complex, which, combined with Scala's type inference, often leads to code that is very hard to understand. Usually you can fight this by creating "check-points": assigning an interim result to a value just to make it easier for the reader to follow a long stream of chained transformations.

For example, a long piece of code combining two Maps into another was extracted into a method. You can see that the author took advantage of Scala's type inference but recognised the need to inform the reader about the type of the result by giving it a long, expressive name. The method name pretty much repeats the same type-explaining idea.

val eligibleSegment2EligibleCountries = extractEligibleSegment2EligibleCountries(filteredSegment2PositiveCount, segmentCountry2PositiveCounts)
It might be a good idea to put the explicit type of the value into the code. The reader then knows exactly what the result is and does not have to jump away to examine the method's return type.
val eligibleSegment2EligibleCountries: List[(Int, Set[Int])] = extractEligibleSegment2EligibleCountries(filteredSegment2PositiveCount, segmentCountry2PositiveCounts)
A bit better, but it can be dramatically improved with type aliases. In this domain, segment IDs and country IDs are integers, so let's create aliases for them.
type SegmentId = Int
type CountryId = Int
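A practical note: in Scala 2 a type alias cannot be declared at the top level of a file, so a natural home for domain aliases is a package object, which makes them visible across the whole job package. A minimal sketch, assuming a hypothetical com.example.segments package (the package name is illustrative, not from the original job):

package com.example

// Aliases declared here are visible to everything under com.example.segments
package object segments {
  type SegmentId = Int
  type CountryId = Int
}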
Now the collection type becomes really self-explanatory to the reader, and we can also save some characters on variable and method names.
val es2c: List[(SegmentId, Set[CountryId])] = extract(fs2pcount, sc2pcount)
But there are also other numeric types which have a special meaning in the domain of the job - counts and sample rates.
type Count = Long
type SampleRate = Double
With these, very easy-to-understand method signatures and variable types can be written.
def counts2rates(counts: Map[SegmentId, Count], maxLimit: Long): Map[SegmentId, SampleRate]
def train(exploded: RDD[(SegmentId, DmpCookieProfile)]): RDD[Either[SegmentId, (SegmentId, Count)]]
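To show the aliases in context, here is a minimal sketch of what a counts2rates implementation could look like; the capping logic below is my assumption for illustration, not the original job's code:

// Sketch only: derive a per-segment sample rate so no segment exceeds maxLimit rows.
// Segments already under the limit keep a rate of 1.0.
def counts2rates(counts: Map[SegmentId, Count], maxLimit: Long): Map[SegmentId, SampleRate] =
  counts.map { case (segmentId, count) =>
    val rate = if (count <= maxLimit) 1.0 else maxLimit.toDouble / count
    segmentId -> rate
  }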

One might consider aliasing the whole Map or List, but I believe that would, on the contrary, obscure the collection data types rather than help the reader trying to understand the code.
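For contrast, this is the kind of alias I would avoid - the reader now has to look up what the alias expands to before they can reason about the data:

// Arguably too opaque: the element structure is hidden behind the name
type EligibleSegment2Countries = List[(SegmentId, Set[CountryId])]
val es2c: EligibleSegment2Countries = extract(fs2pcount, sc2pcount)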

Happy type aliasing!

Thursday 16 March 2017

Apache Spark sample and coalesce

It is wasteful to work with a large number of small partitions of data, especially when S3 is used as the data storage. IO then becomes an unacceptably large part of the time spent in a task, most annoyingly when Spark is just moving data files from the _temporary location to the final destination, after the real work has been completed.

Here is a small tip on how to down-sample a dataset while keeping the resulting partition sizes similar to those of the original dataset. The key is to get the original number of partitions, decrease it proportionally, and coalesce the RDD to that number.

It is useful when you test your Spark job on a single node (or a few) but with the amount of data you expect the production cluster nodes to be able to handle.


  import org.apache.hadoop.io.compress.GzipCodec
  import org.apache.spark.SparkContext

  def sample(sparkCtx: SparkContext, inputPath: String, outputPath: String, fraction: Double, seed: Int = 1): Unit = {
    val inputRdd = sparkCtx.textFile(inputPath)
    // scale the partition count down by the sampling fraction, but never below one
    val outputPartitions = math.max(1, (inputRdd.partitions.length * fraction).toInt)
    inputRdd
      .sample(false, fraction, seed)
      .coalesce(outputPartitions, shuffle = false)
      .saveAsTextFile(outputPath, codec = classOf[GzipCodec])
  }
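
A possible way to call it, assuming a locally created SparkContext - the master, app name and S3 paths below are illustrative, not part of the original job:

// Illustrative usage only
val sparkCtx = new SparkContext("local[4]", "sample-job")
sample(sparkCtx, "s3a://bucket/input/", "s3a://bucket/input-sample/", fraction = 0.1)
sparkCtx.stop()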

Happy sampling!