Last week, I was working on importing a large input file into the system. Part of the process involved reading a large file (~10 GB) from a remote server and then sorting the inputs locally.

I decided to give Scala's new Future API a try, and within one day I had written a parallel external merge sort that can

  1. Read the file from the remote server and split it into smaller chunks.
  2. Sort each chunk in memory and save it to its own file.
  3. Merge the sorted files generated in the previous step.

The most amazing thing is that multiple threads work on these tasks simultaneously. While one thread is busy reading bytes from the remote server, other threads sort the chunks that have already been read. Once all of the data has been written to the file system, another thread starts merging the sorted files.

Here is how I did it.

Read Lines From InputStream

val source: Stream[Int] = Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)

The above code turns an InputStream into a Stream[Int], which lets us operate on it without having to read it fully into memory first. But it is still one huge Stream. Because we are going to sort in memory, I need to split it into smaller pieces first.
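As a quick aside, here is a toy illustration (not part of the import job) of why this works: a Stream only pulls elements from its underlying iterator on demand.

val numbers = Iterator.from(1).toStream.map(_ * 2)  // an endless source; only the head is evaluated here
numbers.take(3).toList                              // List(2, 4, 6) -- just three elements are ever computed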

The following code lifts a Stream into a Stream of Streams. Again, this does not require reading the whole original stream into memory; the split happens lazily, one chunk at a time, as the outer stream is consumed.

/**
 * Lift a Stream into a Stream of Streams. The size of each sub-stream is
 * bounded by chunkSize.
 * @param stream        the original stream.
 * @param chunkSize     the maximum size of each sub-stream.
 * @tparam A            the element type.
 * @return              a chunked view of the original stream.
 */
private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {
  if (stream.isEmpty) {
    Stream.empty
  } else {
    // The tail of the cons is evaluated lazily, so chunks are produced
    // one at a time as the outer stream is consumed.
    val (head, tail) = stream.splitAt(chunkSize)
    Stream.cons(head, lift(tail, chunkSize))
  }
}
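For example, as a quick sanity check (not from the original program):

lift(Stream(1, 2, 3, 4, 5), 2).map(_.toList).toList
// List(List(1, 2), List(3, 4), List(5))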

Sort Each Chunk in Memory

After splitting the InputStream into sub-streams, we can read each sub-stream into memory and sort it.

val linesStream: Stream[Stream[Int]] = lift(source, chunkSize)
val chunkCounter = new AtomicInteger(0)

val sortedFileDir = Files.createTempDir()
sortedFileDir.deleteOnExit()

// Consume the source stream: read each chunk into memory, then sort it
// and save it to a file in parallel.
val fileFutures: List[Future[File]] = linesStream.map(
  s => {
    val chunk = chunkCounter.getAndIncrement
    Future {
      val sorted = s.sorted
      val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
      val out = new PrintWriter(ret)

      try {
        sorted.foreach(out.println(_))
      } finally {
        out.close()
      }
      ret
    }
  }).toList
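One thing worth calling out: Future { ... } only compiles when an implicit ExecutionContext is in scope. While experimenting you can pull in the default global one; a later section swaps in a fixed-size pool instead.

import scala.concurrent.ExecutionContext.Implicits.global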

Perform Merge Sort

Because I want to merge all of the results, I first have to turn the List[Future[File]] into a Future[List[File]], so that I can instruct the Future to run the merge once all of the pieces are ready.

val saveTmpFiles: Future[List[File]] = Future.sequence(fileFutures)

val ret: Future[File] = saveTmpFiles.map {
  files => {
    var merged = files

    while (merged.length > 1) {
      val (firstHalf, secondHalf) = merged.splitAt(merged.length / 2)
      val pairs = firstHalf.zip(secondHalf)

      val m2 = pairs.map {
        case (f1, f2) => {
          val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)

          val source1 = Source.fromFile(f1)
          val source2 = Source.fromFile(f2)
          val out = new PrintWriter(ret)

          try {
            val stream1 = source1.getLines().toStream.map(_.toInt)
            val stream2 = source2.getLines().toStream.map(_.toInt)
            merge(stream1, stream2).foreach(out.println(_))
            ret
          } finally {
            out.close()
            source1.close()
            source2.close()

            FileUtils.deleteQuietly(f1)
            FileUtils.deleteQuietly(f2)
          }
        }
      }
      // An odd number of files leaves the last one unpaired; carry it into the next pass.
      merged = if (merged.length % 2 > 0) {
        m2 :+ merged.last
      } else {
        m2
      }
    }
    merged.head
  }
}
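To make the pairwise merging concrete, here is how a run with five chunk files (hypothetical names a through e) would proceed. Every pass roughly halves the number of files, so k chunks need about log2(k) passes:

// start:  [a, b, c, d, e]
// pass 1: zip pairs (a, c) and (b, d); e is carried over  -> [a-c, b-d, e]
// pass 2: pair (a-c, b-d); e is carried over again        -> [a-c-b-d, e]
// pass 3: pair (a-c-b-d, e)                               -> [a-c-b-d-e]

Note that in this version the whole merge phase runs inside a single map callback, i.e. on one pool thread; only the chunk sorting runs in parallel.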


/**
 * Merge two sorted streams into one sorted stream.
 * @param streamA   the first sorted stream.
 * @param streamB   the second sorted stream.
 * @return          a single sorted stream containing the elements of both.
 */
private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]): Stream[A] = {

  (streamA, streamB) match {
    case (Stream.Empty, Stream.Empty) => Stream.Empty
    case (a, Stream.Empty) => a
    case (Stream.Empty, b) => b
    case _ => {
      val a = streamA.head
      val b = streamB.head

      // Emit the smaller head first to keep the output in ascending order.
      if (ord.compare(a, b) <= 0) {
        Stream.cons(a, merge(streamA.tail, streamB))
      } else {
        Stream.cons(b, merge(streamA, streamB.tail))
      }
    }
  }
}
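For instance, with ascending inputs as produced by the sorting step:

merge(Stream(1, 3, 5), Stream(2, 4)).toList   // List(1, 2, 3, 4, 5)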

Give This Method a Pretty Face

So what does the method signature of this parallel external merge sort look like?

In fact, it is quite simple. It takes an InputStream and returns a Future[File]. Everything happens asynchronously; nothing blocks the main thread. You can hand an InputStream to this method, go do other things first, and then come back to wait for the result.

def sort(inputStream: InputStream, chunkSize: Int = 2000000): Future[File] = ???
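A hypothetical caller might look like this; Await is only used at the very end, when the result is actually needed:

import scala.concurrent.Await
import scala.concurrent.duration._

// inputStream: some already-opened InputStream
val sortedFile: Future[File] = sort(inputStream)
// ... do other work while the sort runs in the background ...
val file: File = Await.result(sortedFile, 1.hour)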

Limit the Number of Threads Running at the Same Time

Because this parallel external merge sort is an IO- and memory-intensive operation, we cannot run too many instances of it simultaneously. We must put a constraint on the number of threads it can use at a time; otherwise we may run into an OutOfMemoryError or have too many threads writing to disk at once.

Also, this constraint must be global: no matter how many requests are sent to this method at the same time, it should use at most N threads.

Luckily, this is quite easy to do with Scala's Future API. All we need to do is provide a fixed-size thread pool for this method, so that it does not spawn new threads by itself but instead uses the threads provided by this global pool.

/**
 * Limits the number of read-and-sort tasks that can execute simultaneously.
 * Because this is an IO-bound operation, 5 threads is more than enough unless
 * the InputStream comes from a slow HTTP connection.
 */
private val GLOBAL_THREAD_LIMIT =
  // math.max guards against availableProcessors() / 2 evaluating to 0 on a single-core machine.
  math.min(5, math.max(1, Runtime.getRuntime.availableProcessors() / 2))

private implicit lazy val executionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))
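Because executionContext is declared implicit right next to sort, every Future { ... } and map above silently picks up this fixed pool instead of scala.concurrent.ExecutionContext.Implicits.global, which is what caps the whole pipeline at GLOBAL_THREAD_LIMIT threads.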

Put Everything Together

import com.google.common.io.Files
import java.io.{PrintWriter, File, InputStream}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import org.apache.commons.io.FileUtils

import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

object InputStreams {

  /**
   * Limits the number of read-and-sort tasks that can execute simultaneously.
   * Because this is an IO-bound operation, 5 threads is more than enough unless
   * the InputStream comes from a slow HTTP connection.
   */
  private val GLOBAL_THREAD_LIMIT =
    // math.max guards against availableProcessors() / 2 evaluating to 0 on a single-core machine.
    math.min(5, math.max(1, Runtime.getRuntime.availableProcessors() / 2))

  private implicit lazy val executionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))

  def sort(inputStream: InputStream, chunkSize: Int = 2000000): Future[File] = {

    // Open the source stream and view it as a stream of chunks.
    val source = Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)
    val linesStream = lift(source, chunkSize)
    val chunkCounter = new AtomicInteger(0)

    val sortedFileDir = Files.createTempDir()
    sortedFileDir.deleteOnExit()

    // Read each chunk into memory, then sort it and save it to a file in parallel.
    val saveTmpFiles: Future[List[File]] = Future.sequence(
      linesStream.map(s => {
        val chunk = chunkCounter.getAndIncrement
        Future {
          val sorted = s.sorted
          val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
          val out = new PrintWriter(ret)

          try {
            sorted.foreach(out.println(_))
          } finally {
            out.close()
          }
          ret
        }
      }).toList
    )

    // Perform the merge phase once every chunk file has been written.
    saveTmpFiles.map {
      files => {
        var merged = files
        while (merged.length > 1) {
          val (firstHalf, secondHalf) = merged.splitAt(merged.length / 2)
          val pairs = firstHalf.zip(secondHalf)

          val m2 = pairs.map {
            case (f1, f2) => {
              val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)

              val source1 = Source.fromFile(f1)
              val source2 = Source.fromFile(f2)
              val out = new PrintWriter(ret)

              try {
                val stream1 = source1.getLines().toStream.map(_.toInt)
                val stream2 = source2.getLines().toStream.map(_.toInt)
                merge(stream1, stream2).foreach(out.println(_))
                ret
              } finally {
                out.close()
                source1.close()
                source2.close()

                FileUtils.deleteQuietly(f1)
                FileUtils.deleteQuietly(f2)
              }
            }
          }
          // An odd number of files leaves the last one unpaired; carry it into the next pass.
          merged = if (merged.length % 2 > 0) {
            m2 :+ merged.last
          } else {
            m2
          }
        }
        merged.head
      }
    }
  }

  /**
   * Lift a Stream into a Stream of Streams. The size of each sub-stream is
   * bounded by chunkSize.
   *
   * @param stream        the original stream.
   * @param chunkSize     the maximum size of each sub-stream.
   * @tparam A            the element type.
   * @return              a chunked view of the original stream.
   */
  private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {
    if (stream.isEmpty) {
      Stream.empty
    } else {
      // The tail of the cons is evaluated lazily, so chunks are produced
      // one at a time as the outer stream is consumed.
      val (head, tail) = stream.splitAt(chunkSize)
      Stream.cons(head, lift(tail, chunkSize))
    }
  }


  /**
   * Merge two sorted streams into one sorted stream.
   * @param streamA   the first sorted stream.
   * @param streamB   the second sorted stream.
   * @return          a single sorted stream containing the elements of both.
   */
  private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]): Stream[A] = {

    (streamA, streamB) match {
      case (Stream.Empty, Stream.Empty) => Stream.Empty
      case (a, Stream.Empty) => a
      case (Stream.Empty, b) => b
      case _ => {
        val a = streamA.head
        val b = streamB.head

        // Emit the smaller head first to keep the output in ascending order.
        if (ord.compare(a, b) <= 0) {
          Stream.cons(a, merge(streamA.tail, streamB))
        } else {
          Stream.cons(b, merge(streamA, streamB.tail))
        }
      }
    }
  }
}