碎碎唸

Parallel External Merge Sort

Last week, I was working on importing a large input file into the system. Part of the process involved with reading a large file (~10GB) from remote server and them sort the inputs locally.

At that moment, I decided to give Scala’s new Future API a try. Within one day, I wrote a parallel external merge sort that can

  1. Read the file from remote server and split it into smaller chunks.
  2. Use in-memory quicksort to sort each chunk and then save chunks into files.
  3. Perform merge sort on the files generated in the previous step.

The most amazing thing is that multiple threads could work on these tasks simultanously. While one thread is busy reading bytes from remote server, other threads would perform quicksort on the part that has already been read. Once, all of the data has been written to file system. Another thread will start to merge sort these files.

Here is how I do it.

Read Lines From InputStream

1
val soure: Stream[Int] = Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)

The above code will turn a InputStream into a Stream[Int] , and then allow us to perform operation on it without having to fully read it into memory first. But still, this is a huge Stream. Because we are going to use in-memory sort, I need to split it into smaller pieces first.

The following code will lift a Stream into a Stream of Stream. Again, this operation does not require read the whole original stream into memory. All the operation only happens logically.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Lift a Stream into a Stream of Stream. The size of each sub-stream is specified by the chunkSize
 * @param stream        the origin stream.
 * @param chunkSize     the size of each substream
 * @tparam A
 * @return              chunked stream of the original stream.
 */
private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {

  def tailFn(remaining: Stream[A]): Stream[Stream[A]] = {
    if (remaining.isEmpty) {
      Stream.empty
    } else {
      val (head, tail) = remaining.splitAt(chunkSize)
      Stream.cons(head, tailFn(tail))
    }
  }
  val (head, tail) = stream.splitAt(chunkSize)
  return Stream.cons(head, tailFn(tail))
}

Perform Quick Sort

After we have split InputStream into Streams, we can start to read each sub-stream into memory and perform quicksort on them.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
val linesStream: Stream[Stream[Int]] = lift(soure, chunkSize)
val chunkCounter = new AtomicInteger(0)

val sortedFileDir = Files.createTempDir()
sortedFileDir.deleteOnExit()

// read source stream, read n entries into memory and save it to file in parallel.
val fileFutures: List[Future[File]] = linesStream.map(
  s => {
    val chunk = chunkCounter.getAndIncrement
    Future {
      val sorted = s.sorted
      val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
      val out = new PrintWriter(ret)

      try {
        sorted.foreach(out.println(_))
      } finally {
        out.close()
      }
      ret
    }
  }).toList

Perform Merge Sort

Because I want to perform mergesort on all of results, I have to turn List[Future[File]] into Future[List[File]] first. So that I can instruct the Future to do merge sort once it has all the pieces.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
val saveTmpFiles: Future[List[File]] = Future.sequence(fileFutures)

val ret: Future[File] = saveTmpFiles.map {
    files => {
  var merged = files

  while (merged.length > 1) {
    val splited = merged.splitAt(merged.length / 2)
    val tuple = splited._1.zip(splited._2)

    val m2 = tuple.map {
      case (f1, f2) => {
        val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)

        val source1 = Source.fromFile(f1)
        val source2 = Source.fromFile(f2)
        val out = new PrintWriter(ret)

        try {
          val stream1 = source1.getLines().toStream.map(_.toInt)
          val stream2 = source2.getLines().toStream.map(_.toInt)
          merge(stream1, stream2).foreach(out.println(_))
          ret
        } finally {
          out.close()
          source1.close()
          source2.close()

          FileUtils.deleteQuietly(f1)
          FileUtils.deleteQuietly(f2)
        }

      }
    }
    merged = if (merged.length % 2 > 0) {
      m2 :+ merged.last
    } else {
      m2
    }
  }
  merged.head
}


/**
 * Merge two streams into one stream.
 * @param streamA
 * @param streamB
 * @return
 */
private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]) : Stream[A] = {

  (streamA, streamB) match {
    case (Stream.Empty, Stream.Empty) => Stream.Empty
    case (a, Stream.Empty) => a
    case (Stream.Empty, b) => b
    case _ => {
      val a = streamA.head
      val b = streamB.head

      if (ord.compare(a, b) > 0) {
        Stream.cons(a, merge(streamA.tail, streamB))
      } else {
        Stream.cons(b, merge(streamA, streamB.tail))
      }
    }
  }
}

Give this Method a Pretty Face.

So how does the method signature of this parallel external merge sort look like?

In fact, it is quite simple. It takes an InputStream and returns a Future[File]. So that, everything happens asynchronously, nothing blocks the main thread. You can send an inputStream to this method, go to do other things first and then come back to wait for the result.

1
def sort(inputStream: InputStream, chunkSize: Int = 2000000): Future[File] = ???

Limits Number of Threads Running at the Same Time.

Because this parallel external mergesort is an IO and memory intense operations, we can not run too many of it simultaneously. We must put a constraint on the number of threads it can use at a time. Otherwise, we may receive OutOfMemoryError or having many threads writing to disk simultaneously.

Also, this constraint must be a global constraint. No matter how many requests has been sent to this method at the same time, it should only use up-to N threads.

Luckly, this is quite easy to do with Scala’s Future API. All we need to do is to provide a fixed size thread pool for this method. So that it won’t spawn new thread by itself, instead, it uses threads provided by this global thread pool.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
 * limits number of reading and sorting can be executed simultaneously. Because this is an IO
 * bound operation, unless the inputstream is coming from a slow http connection, otherwise, 5
 * is more than enough.
 */
private val GLOBAL_THREAD_LIMIT = {
  val ret = Runtime.getRuntime.availableProcessors() / 2
  if (ret > 5) {
    5
  } else {
    ret
  }
}

private lazy implicit val executionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))

Put Everything Alltogether

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import com.google.common.io.Files
import java.io.{PrintWriter, File, InputStream}
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger

import org.apache.commons.io.FileUtils

import scala.concurrent.{ExecutionContext, Future}
import scala.io.Source

object InputStreams {

/**
 * limits number of reading and sorting can be executed simultaneously. Because
 * this is an IO bound operation, unless the inputstream is coming from a slow
 * http connection, otherwise, 5 is more than enough.
 */
private val GLOBAL_THREAD_LIMIT = {
  val ret = Runtime.getRuntime.availableProcessors() / 2
  if (ret > 5) {
    5
  } else {
    ret
  }
}

private lazy implicit val executionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))

def sort(inputStream: InputStream, chunkSize: Int = 2000000): Future[File] = {

  // open source stream
  val soure = Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)
  val linesStream = lift(soure, chunkSize)
  val chunkCounter = new AtomicInteger(0)

  val sortedFileDir = Files.createTempDir()
  sortedFileDir.deleteOnExit()

  // read source stream, read n entries into memory and save it to file in parallel.
  val saveTmpFiles: Future[List[File]] = Future.sequence(
    linesStream.map(s => {
      val chunk = chunkCounter.getAndIncrement
      Future {
        val sorted = s.sorted
        val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
        val out = new PrintWriter(ret)

        try {
          sorted.foreach(out.println(_))
        } finally {
          out.close()
        }
        ret
      }
    }).toList
  )

  // perform merge sort.
  saveTmpFiles.map {
    files => {
      var merged = files
      while (merged.length > 1) {
        val splited = merged.splitAt(merged.length / 2)
        val tuple = splited._1.zip(splited._2)

        val m2 = tuple.map {
          case (f1, f2) => {
            val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)

            val source1 = Source.fromFile(f1)
            val source2 = Source.fromFile(f2)
            val out = new PrintWriter(ret)

            try {
              val stream1 = source1.getLines().toStream.map(_.toInt)
              val stream2 = source2.getLines().toStream.map(_.toInt)
              merge(stream1, stream2).foreach(out.println(_))
              ret
            } finally {
              out.close()
              source1.close()
              source2.close()

              FileUtils.deleteQuietly(f1)
              FileUtils.deleteQuietly(f2)
            }

          }
        }
        merged = if (merged.length % 2 > 0) {
          m2 :+ merged.last
        } else {
          m2
        }
      }
      merged.head
    }
  }
}

/**
 * Lift a Stream into a Stream of Stream. The size of each sub-stream is specified
 * by the chunkSize.
 *
 * @param stream        the origin stream.
 * @param chunkSize     the size of each substream
 * @tparam A
 * @return              chunked stream of the original stream.
 */
private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {

  def tailFn(remaining: Stream[A]): Stream[Stream[A]] = {
    if (remaining.isEmpty) {
      Stream.empty
    } else {
      val (head, tail) = remaining.splitAt(chunkSize)
      Stream.cons(head, tailFn(tail))
    }
  }
  val (head, tail) = stream.splitAt(chunkSize)
  return Stream.cons(head, tailFn(tail))
}


/**
 * Merge two streams into one stream.
 * @param streamA
 * @param streamB
 * @return
 */
private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]) : Stream[A] = {

  (streamA, streamB) match {
    case (Stream.Empty, Stream.Empty) => Stream.Empty
    case (a, Stream.Empty) => a
    case (Stream.Empty, b) => b
    case _ => {
      val a = streamA.head
      val b = streamB.head

      if (ord.compare(a, b) > 0) {
        Stream.cons(a, merge(streamA.tail, streamB))
      } else {
        Stream.cons(b, merge(streamA, streamB.tail))
      }
    }
  }
}

為什麼 Kim DotCom/Mega 能夠提供免費的網路

NOWNews 為紐西蘭提供免費光纖!盜版之王Kim Dotcom發豪語

現在,這位『網路頑童』再次放出讓人驚歎的承諾:為紐西蘭的每個人提供免費光纖服務。

引用 Dotcom 在 Twitter 上 的話就是:『The new Mega company will be based in
NZ & become it's most valuable IT biz. I will relaunch Pacific Fibre. Free
broadband for all Kiwi's

為什麼 Kim DotCom/Mega 能夠提供免費的網路呢?這要先從國際專線的拆帳機制講起;所謂拆帳就是,從甲地到乙地的線路 ,如果是甲地往乙地的流量佔四成,乙地往甲地的佔六成,那麼,這條專線的流量費用,就是六四分,甲付六成乙付四成。

所以說,如果 Kim Dotcom 可以在紐西蘭建立起一個超大的 ICP ,這條專線,百分之九十九的流量都是流出紐西蘭的,那麼 他就有機會只要付出 1% 的費用就可,當然要怎樣說服對連的電信商,接受這個條件,那就是 KD 的功力了。

反觀中XX電信,由於台灣的對外頻寬,由於多數是外往內,國內沒有好的 ICP ,造成這些專線的費用,都是台灣買單的。

小時候學的那些,台灣是東南亞重要戰略樞紐都是狗屁,過境的專線一堆,但是都沒留下實質的好處

之前同事有問我,為什麼國外的 Hosting 公司可以免錢的,我說,就是因為養 ICP 對 ISP 有好處,所以國外的公司養一堆當談判條件用。

台灣這邊,除了 「無名」被Giga當成談判條件 過之外,就沒有聽過有人拿 ICP 當談判條件了,話說回來,大部份的 ICP 都被中華給拉過去後,也 沒人有條件跟中華談了

但是,在國際專線這塊,台灣應該也來養幾個這種免費空間,這樣子才不會在國際專線費用這塊,被人家吃透透

中小企業ERP系統上雲端?

※ 引述《NightWind (gin and tonic)》之銘言:
: 我們是一家中小企業
: 大約三年前才把古早的DOS系統
: 換裝成鼎新的ERP
: 現在當初買的伺服器主機保固也要到了
: 老闆突發奇想把ERP系統裝在中華HiCloud那樣的雲端虛擬主機上
: 節省定期維護主機與擔心主機突然掛掉的費用
: 請問真的有人會這樣做的嗎?
: 又如果有的話,台北地區有沒有規劃的廠商?

HiCloud 沒用過,所以用 AWS 的例子來解釋要這麼做會碰上那些例子。

第一個問題是 VM 的可靠度是比你自己買伺服器等級的機器還來的低的, AWS EC2 的 合約是說 99.95% Available ,但是這不包括你 VM 被 AWS會有隨機關掉的機會。當 被關掉後,你是要自己手動或自動重啟才有 99.95%

第二個問題是,台灣多數的 ERP是2-tier的架構而非 3 tier 的,若是用 2-tier 的 軟體,使用者端要從本機連到 AWS去,一個來回的時間從 <10ms跳到 100ms 以上,如 果程式寫的不好,本來一個流程的轉換要跑兩個 SQL command,原來是 (10ms*2 + SQL execution time) * 2, 還有機會是小於 100ms看起來很即時就可以反應,一搬到雲端,時間就跳昇到 100ms * 4,變的超級鈍。

再來,放到 AWS 去,還要考慮台美專線速度的問題,過去幾年來,台灣對外的海纜,幾乎 每兩年都會斷線超過一天。所以放在雲端你的架構能提供的,就只剩不到 99.5%

在 EC2 上,如果要用 EBS讓 VM 當機後資料還在的話, EBS的效能是有口皆呸的 ,但是如果不用 EBS,那 VM 一死,上面的資料又全消失了,變成你自己要做 Data Replication

最後,如果要用台灣的雲端,不用要 HiCloud,比較建議的環境會是開發資源及支援較多 的 MS Azure 或是使用 joynet 技術的 MiCloud 會比較妥當。

縱觀來講,搬到雲端有其它新的挑戰要處理,第一個看軟體要先 3tier 化,再來是要 做DB online replication 保護資料,最後,是公司對外的專線要能跟的上資料量。

如果前述問題都能解決的話,貴公司得到的會是

  1. 更短的錯誤迴復時間:伺服器一死,跟廠商搬機器來迴復也要一個工作天以上,用雲端可以降到一兩小時
  2. 更敏捷的硬體效能:如果硬體效能不夠,使用雲端系統,是直接選更高等級的VM重開,當下就可以昇級。 不用等採買硬體所需的數星期到數個月的時間,也不必擔心折舊週期還沒到就又要昇級。

在是否該選用雲端方案的話,就看你們目前的 ERP是否是 3-tier 的,另外就是找顧問來來編 寫相關的工具及作業流程。

為什麼雲端產業在台灣行不通

很想寫這個主題很久了,雖然可能永遠也無法把道理完整的解釋清楚,不過,有個開頭總是好的。

首先,先寫個嘴炮給非軟體業的人看,台灣名嘴大喊:「台灣現在要找到能殺出血路的產業,雲端計算是很 好的機會」、「打造雲端大國,再造兆元產業」,政客名嘴喊喊口號就好,問題是,我們在這產業有什麼利 機,能夠在 落後歐美社會四百年 的狀況下,在不懂數字管理下,一次追趕過這些差距,超英趕美?

如果沒有確實的利基及施行方案, 「打造雲端大國,再造兆元產業」 同樣也可以改寫成 「打造月球 基地,開採外星綠能」、「打造火星計劃,一統太陽系」,後兩者講起來不是更威嗎?

台灣有許多硬體公司在喊,雲端是未來的機會,問題是,雲端的目標是共享資源、是消減 IT 的支出,對硬 體廠來說,是個產業危機,也是個大洗牌的機會,但是,對台灣整體來說,絕對是個不利的未來。至於某些 公司開始喊,雲端業務已經佔公司營業而多少,我的看法是,把伺服器部門改個招牌,雲端營業額就大增了 啊,問題是, 公司的總營收有成長嗎?

講完了這些後,再講個小故事, 2010 年我回台灣前,公司同事問我,回台灣要做雲端嗎?我回,我不會 在一個有 颱風、地震、洪水、土石流、及戰爭危機 的地方建資料中心。另外,台灣雖然是美國 對亞洲網路的樞紐,但是台灣的網路流量費過高,就是另一個要面對的問題。已上幾點,就是台灣要做雲端 資料中心地理環境上的外部問題。

再來講到台灣軟體業自己內部的問題,做網路服務,同時能服務 十萬個用戶 是一個門坎, 一百萬個用戶一千萬個用戶一億個用戶十億個用戶 都是不同的門坎,每跨過一階,整個 系統架構跟考量的點都完全不同,十萬個用戶用 LAMP 一台就夠了,一百萬用戶,那就要開始做 server farm, cache 等,到了 一千萬用戶,連 DB 都受不了用量,要找其它的解決方案( ex: shard ),到上億用戶,你的客戶已 經不會都是居住在同一地了,光解決 data locality 都是個大問題了,更不用講怎麼 deploy service

台灣目前有超過百萬用戶的服務,大概就是 Yahoo、無名、痞客幫、PCHome,這四間了,除了這四間外, 很少公司會有經營百萬人服務的經驗,問題是,台灣喊著要做雲端運算的,都不是這四間,而是剩下來沒有 經營網路服務經驗的公司。

為什麼這經驗很重要呢?因為,一個新架構的好壞,只有經過實際的流量摧殘才能知道,在這之前,一切只 是工程師們用腦補的假設而已,腦補的假設越多,系統上線後要修改的地方越多,至於沒做過百萬人服務的 工程師要怎麼用腦補來假設上億人服務的狀況,那就不是我能想像的了。

美國最大的 Hosting Company DreamHost 的創辦人 Sage Weil,在幾年前回去念博士,發展了 Ceph 這套 distributed file system ,人家想要研究資料時,只要回 DreamHost 回去挖一挖 log 就 有用不完的資料了,至於要驗證就更簡單了,只要把 5% 的用戶放到新架構上,就有用不完的資料可以用來 發掘新架構的缺陷。

這一點,是美國現有網路公司在跨足雲端服務的極大優勢;台灣的雲端廠商,如XX電信、XX研院、XX勢,跟本無 法彌補這中間的差距,記得在某場 CloudTW 的會議中,某家台廠在展視他們的雲端 OS 時, 葉博士 問了一 個問題就把這個雲端OS擊沉了,該雲端平台只能該台廠所提供的客制化OS,所有的系統 library 都是經過修改 ,不能自行更換的,於是 葉博士 就問,「要是今天 PHP X.Y.Z 有個 security hole 那麼,他能不能 自行補洞,還是要等台廠把整個環境更新才可把洞補上?」

現實( physical) 的環境沒有優勢,在經營網路服務上,更是落後國外數十年,沒有人材、經驗上的優勢, 這也就是為什麼 雲端產業在台灣行不通 的原因。

關於遊戲公司裁員的感想

看到 半路 分享 最近 幾間遊戲公司裁員 的新聞,不經讓我想到過去在矽谷流浪歲月所認知到 的黑暗面,底下就是一個我親身看過的一個例子。

許多年前,AE用三億美金買了一家flash game公司,當時我上 linkedin 看一下 CEO 的背景,一看, 不得了了,我竟然在 linkedin 上跟他是有直接聯結,原來我認識大人物自己都不知道。

後來一查才知道,他們是我工作公司,在三年前買的一間英國公司,我們把他們買下來後,兩年閉鎖期一到 ,他們一群六七個人就離職開新公司去了,這間公司後來就又被AE買過去。

當時,我不懂的是,為什麼AE要買這間公司呢?因為一個遊戲的生命週期就是六個月而已,一間遊戲公司的 價值是在於有一個團隊可以持續生產出賣座的遊戲,然而這間flash game的團隊,已經證明了他們兩年一到 就會走人,那為什麼AE公司要買這個公司呢?

後來問一問做金融的朋友,答案跟我猜想的不遠,AE就是買一天的新聞就好 「AE買下flash game公司,跨足FB市場,未來營收將有爆發性成長」

上市公司們,他們想的,跟你我想的不一樣;靠股票換鈔票的公司,新聞效果過了,股價漲了,執行長選擇權 生效後,就是可以把買進的公司全裁了,反正當初就是買新聞效應而已。

[未完] Introduction to Actor Model

The Challenge

在今天的講題開始前,我要先花一點時間講一下,我們這一代程式設計師面對的挑戰是什麼,為什麼我們需要一個新的程式架構?

大家應該都很熟悉摩爾定律,摩爾定律是指:一個尺寸相同的晶片上,所容納的電晶體數量,約每隔18個月便會增加一倍,性能也將提升一倍。 在Dr. Moore於1965年提出摩爾而後的40年間,摩爾定律準確的預測電晶體的發展,每18月就有更快一倍的處理器出來,每18個月記憶體的就倍 增。對於軟體工程師來說,摩爾定律讓我們的程式,不用更新軟體,每隔一段時間,效能就自動增加。

然而這一切,在 2005 年左右開始崩壞,那年,我在 ACM Queue 九月號讀到 "The Future of Microprocessors", "The Free Lunch Is Over: A Fundamental Turn Toward Concurrency in Software."等文章,提到,軟體工程師的免費午餐 已經結束了,未來,處理器的時脈將不會隨著電晶體數而增長,只有處理器的核心數依然跟隨著摩爾定律,每18個月增加一倍,而後,類似的 文章開始大量的出現在各樣的軟體雜誌之中。 Intel於 2006 年一月發表了個人電腦用的雙核處理器 - Core Duo, 2007 年四核的Core 2 Quad 發表,我想現在在場的聽眾中,有不少的是用 Intel i7 八核的處理器。

對軟體工程師來說,新的現實是 Amdahl’s Law ,一個程式能被多核心處理器加速的程度,不會高於程式中不可平行處理的部份的倒數。 也就是說,如果程式中只有50%的部份是可平行處理的,那麼,多核心的處理器最多只能幫你加速兩倍,不管你有雙核,四核,還是六十四 核等。如果程式中有 95% 的部份是可平行處理的,縱使你有兩千核的處理器,還是只能幫你加速 20 倍。

不管你是寫單機版程式,或者是多人用的網路系統,如何善用多核心的處理器,是現在不得不開始面對的問題。

Concurrency and Parallelism

接下來我們要先定義一下名詞,我們中文中講的平行處理,代表了兩個不同的涵意,一個是 Concurrency , 另一個是 Parallelism

Parallelism 講的是,程式中有兩個以上的部份,可以同時間被處理,而不會互相干擾。

Concurrency 指的是,程式中的兩個以上的部份,可以同時間被處理,或者透過分時分享資源,同時間被處理器處理。

然而,對軟體工程師來說,這兩者都是很困難的,因為我們的程式不免會用到共享的資源,在 Parallelism 中,如何找出共享的資源 將這些模組分離出來最小化,是一種挑戰,在 Concurrency 中,如何確保共享的資源不變成程式中的瓶頸,又是另一個挑戰。

在 Java 6 Concurrency API 就提供了不少工具來協助處理 Concurrency 的問題,如 Executor, CountDownLatch, Semaphore , DelayQueue 等工具。

Issue: Shared Memory Concurrency

那 Shared Memory Concurrency 的問題在那呢?我記得我在學寫網頁時,第一次寫 Web Counter 時就學到了,原來小 到一個 counter++ ,都會因為 thread 存取的順序,而發生 race condition 造成結果與預測的不同。Shared Memory Concurrency的難點就在於,如何正確的使用 Lock 及 Lock 的不可預測性。

由於作業系統設計時,為了效能,不會將 Lock 設計成完全的公平,當有三個 Thread 依序去要一個 Lock 時,你執行三次, 可能三次都是不同的 Thread 第一個拿到這個 Lock ,這個特性,造成了測試時及執行時的不可決定性,無法在測試時,模擬可 能發生的所有狀況。

此外 Lock 並不是容易去使用的,如果你在程式中使用了大量的 Lock ,將會造成程式中可被併行處理的部份減少,在 Java6 之前,若是你在沒有需要用 lock 的地方使用了 lock , jvm 仍然會去要這個 lock ,在 Java6 Mustang Release中, HotSpot JVM 的實作引入了 lock elision 的觀念, HotSpot VM 將會透過執行階段的分析,去忽略掉這些多餘的 lock。

然而, Lock 仍帶入了許多問題,如 lock starvation ,當許多 thread 大量、重覆的取用同一個 lock 時,由於取得 lock 的機制並不是公平的,所以有可能會有 thread 一直取不到 lock 的狀況發生。

另外,若是多個 Thread 都要存取相同的數個 lock ,則會造成互相卡位的狀況,形成 live lock。

當然,你也可以說,用 Lock 太麻煩,而減少 Lock 的使用,那麼你的程式就會變得不 Thread-Safe ,在處理量大時,會發生 Race Condition造成不可預期的結果產生。

而用 Lock 最讓人絕望的是,若是 Lock 的順序弄錯了,會造成 Dead Lock ,變成,程式在設計的初期就要把各物件呼叫的順序 先定出來,才可以避免 Dead Lock 的產生,但是你能保證你的程式 1.0版、1.1版、2.0版的呼叫順序都不變嗎?

False sharing: Cache Line Issue

如果說 Lock 的問題就夠讓人頭大了,那麼底下這個我最近讀到的問題,則會讓你更吃驚。在電腦的架構中,我們有主記憶體、處理器 上有L1 L2 L3快取,處理器存取資料時,會把資料放在快取中,在適當的時機 在把修改寫回主記憶體;這點,大家應該不難去想像。

然而,處理器在從主記憶體存取資料至快取時,是一個區塊一個區塊的去存取,所以在取用資料時,不只會拿回所需的資料,同時也會 取用到相鄰的資料,這在多核心的機器上造成,若是core A 及 core B會去存取相鄰的資料,那麼他鍆將互相 invalidate 對方的 L1快取,造成要從下層的快取或主計憶體上重新讀取資料。

在一篇測試報告中,在一台八核的機器上的測式結果,有False Sharing問題的程式將比沒有False Sharing問題的程式慢12倍,這 數字差不多就是L1及L2讀取速度的差異。依此推測,若是要從主記億體重讀的話,則會慢上200倍!

The Solution

A new high level programming model

講完了問題,當然之後,當然要開始講解決方案了;許多大神、大師們覺得我們須要一個不同的 Programming Model 來解決這問題,他們覺的,這個新的解決方案要有底下幾個特性

  • 要容易被理解,跟容易被測試。
  • 換言之,也就是要deterministic,如果程式流程是輸入 A 產出 B ,那麼不管執行多少次,這個結果仍是要 一樣的。
  • 既然 shared mutable state 是問題的根源,我們就把這問題給移除。
  • 能夠善用多核心所帶來的優點。

Possible Solutions

有個不同的解決方案被提出來,首先是 Functional Programming ,完全的把 Mutable State 移除,程式變 成如同數學函式一般可以被堆疊,程式函式的本身不帶有狀態,所有的狀態都是由外部傳入的,如此一來,程式本身 的運算處理就變的 deterministic

而既然函式本身不帶有狀態,那麼他們便可以平行的被處理,例如在 scala 中,就引如了 parallel collection ,讓軟體工程師可以很簡單的就會一群資料做平行的運算

1
2
scala> List(1, 2, 3).par.map(_ + 2)
res: List[Int] = List(3, 4, 5)

Functional Programming所需的 Lambda expression 將會在 Java8 時被實作出來,而 Parallel Collection 也有機會在 Java8 時出現。

在此順便打個廣告,台灣目前有個 functional programming user group 在運作中,若是各位有性去,可以上網 找 fpug.tw

而另一個可能的解決方案是 Actor Model , Actor Model 將狀態封裝在Actor內部,Actor及Actor間,則是透過 非同步的機制去傳遞訊息。

A Brief History of the Actor Model

在開始介紹 Actor Model 前,先簡單介紹一下 Actor Model 的歷史。 Actor Model 是於 1973 年 Carl Hewitt 命名的,而由 Gul Agha 這位先生於 1985 年左右更精確的描述 Actor Model 。

第一個大型的商業應用,則是由 Ericsson 於 1980 年代中期所完成的一個電信系統,這個系統非常的容錯,當大家在介 紹 Actor Model 時,多會提到這個例子,因為這個系統達到了 99.9999999% 九個九的 uptime ,也就是說 一整年下來,他停機的時間只有 31 ms ,非常的匪疑所思,因為,一般我們在寫的系統,能有兩個九,就已經是很了不 起的成就了。

這個專案另一個有名的產品是 erlang 這程式語言, erlang 在 1990 年代被 opensource ,後來被大量運用在 message queue 跟 Concurrency 的程式之中。例如大家常用的 RabbitMQ 就是利用 erlang 寫成的,據說他的核 心只有 5000 行 erlang 的程式碼。

Actor

那什麼是 Actor 呢, Actor 是一個非常輕量化的元件,在 Akka 的實作中,每一個 Actor 只占了約400 bytes 的記憶體空間,每一個 Actor 擁有自己的狀態以及行為,將些狀態及行為都被封裝在 Actor 裡面。

其他的 Actor 只能夠透過送訊息的方式,來與這個 Actor 來溝通,存取 Actor 的狀態及觸發行為;當 Actor 收 到外部的訊息時,這些訊息會被放在信箱裡,依照順序來被這個 Actor 來處理。

這個送訊息的及處理訊息的機制,是非同步的,你不能假設這個訊息一定會立刻被同一個 thread 處理,甚至,你不能 假設說,你一定會收到一個回應。

當 Actor 處理訊息時,他會被放在系統中預先配制好的 Thread 上執行、處理訊息。

很有趣的是,當你這樣設計一個系統時,你的系統便會變得非常的 scalable 及非常的輕量化。因為使用 Actor Model 你的 call stack 會變的很小,在我們用 imperative programming ,我們的 call stack 是從系統最外部,透過 一層層的 method call 累加上來起來,當 CPU 在 Context Switching ,在從一個 Thread 跳到另一個 Thread 去 執行時,需要讀入該 Thread 的 Call Stack ,當 Call Stack 越大時,Context Switch的成本就愈高。

在Actor Model 中,兩個 Actor 在傳遞訊息時,其實只有把這個訊息的 reference 丟到另一個 Actor 的信箱中,訊 息傳遞的成本非常的小,而 Actor 在被喚起處理訊息時,他的 call stack 是從他的信箱開始,而非整串的呼叫流程,因 此, context switch 的成本變的很小。

在 Actor Model 中,一個工作的處理,會變得很像我們在辦公室內做業的方法,當我們從系統外部收到一個訊息時,我們 會把這個工作連同這個工作的寄送者先記錄下來,放到信箱中延後處理,當前一手完成工作後,運行中的 Actor 會把處理 好的資料轉發給下一手,就這樣一層層轉發,最後在將完成的工作結果,送還給原始的寄送者。

Introduce Akka

接下來的部份,我們要介紹 Akka 這個 JVM 上 Actor Model 的實作。

Akka 是由 Jonas Boner 於 2009 年所發起的,在開發 Akka 之前, Jonas 是在 Terracotta 做 JVM Clustering 以及在 BEA 做 JRockit VM 的,具有多年食作 distributed system 的經驗。目前 Akka 背後的商業公司是 typesafe , typesafe 同時也是開發 Scala 這個 JVM 上最盛行 JAVA.NEXT 語言的公司。

這個實作是跑在 JVM 上,提供了 Java 及 Scala 的 API ,在接下來的簡報中,我會用 Java API 為主 Scala API 為輔 來介紹怎麼使用 Akka。

Akka另外也引入了 remote actor 這個觀念,既然,我們所有的訊息傳遞都是非同步的,透過訊息的傳遞來呼叫,而這訊息本 身,又是不帶狀態,可以被 serialize 的,狀態的本身是存在 Actor 中的,那麼,為什麼我們自然有可能的將工作放到別台 機器上來處理。

除了 Actor Model 外,Akka還提供了許多 module 來跟外部的系統整合,如 akka-camel ,能讓 akka 串接上 camel 這 個 enterprise service bus ,便程它的一個 endpoint ,讓 Actor 能處理 camel 送來的訊息然後再將訊息轉發回 camel 中。

Define Actor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import akka.actor.UntypedActor;

public class Counter extends UntypedActor {

   private int count = 0;

   public void onReceive(Object message) throws Exception {
     if (message.equals("increase") {
       count += 1;
    } else if (message.equals("get") {
      getSender().tell(new Result(count));
    } else {
      unhandled(message);
    }
  }
}

Fault Tolerance in Akka

接下來我們要談的就是為什麼 ericsson 的系統能夠達到那麼高可靠度的原因以及 Akka 容錯的機制。

當一個系統在運作時,不免因為一些設計外的因素、或不可預期的輸入,造成系統產生錯誤的狀態,有時,這個錯 誤的狀態只是丟出一個 exception 就結束,但是在某些情況下,系統會處於一個混亂的狀態之中,無法再繼續處 理任何的訊息。

這個狀況,我相信大多數的 Windows 用戶都有碰上過,也非常清楚這個問題的解決方案,也就是 – 重開機

在 Akka 中,Actor們如同現實生活中公司的組織,可以有低層的員工以及高層的 Supervisor ,當一個員工 在處理訊息時,若是有發生無法自己無法排解的錯誤狀況的話,他將會把這錯誤狀況,回報給他的上級,由上級來 決定如何處理。

如果說,一個 supervisor 管理的 workers 都是獨立的個體,沒有交互影響的問題,那麼, supervisor 可以 選擇將有問題的 worker 單獨重開機,當一個 actor 被重啟時,他的 actor reference 將保持不變,外面的 actor 仍可以透過原有舊的 actor reference 跟這個重啟後的 actor 溝通。而被重啟的 actor 的信箱,仍會保存的 尚未被處理的訊息在信箱內,而造成錯誤的訊息,則是在重啟時可以決定是否要保留重新處理。

若是一個 supervisor 下的 workers 有相互依賴的情況的話,那麼, supervisor 可以在某一個 worker 發 生錯誤時,重新啟動所有的 workers。

這個錯誤排解的機制不止是可以串上一層而以,而是可以一層層的堆疊起來,位在最上端的,則是前面所看過的 ActorSystem ;當一個錯誤是直屬主管無法排除的錯誤,那麼,這個錯誤訊息可以再轉發給更高層的主管,由他來解決。同樣的,重啟的觀 念也可以套用在這邊。當一個系統的子系統發生錯誤時,我們可以重新啟動整個環境

Remote Actor

在 Akka 的實作中 Actor 預設就是可以被 Remote 化的, Actor 在設計時,就已經是設計成 location transparent 及 可被配置在本機或是遠端的,一個原本設計成本機用的 Actor ,可以不經任何的程式變化,直接修改設定檔,變成在啟動一個 Actor時,自動就是啟動在遠端的。

而傳遞訊息給 Remote Actor 跟傳給本機的 Actor 是沒有差別的,同樣是透過 tell 及 ask 這兩個 API ,在底層實作上 Actor Reference 會帶有 Remote Actor 實際所在機器的位址,若是一個 Actor 是 Remote Actor 的話,被傳送的訊息 就會透過 Java Serialization, Protocol Buffer Serializer 或者是自定的 Serializer 來將訊息寫入到遠方。

而要使用那一種 Serializer ,是可以透過設定檔做切換的。

Remote Actor 的配置,也是可以透過程式來控製,下圖便是一個例子。

Routing & Clustering

對於 Cluster 的設計,目前 Akka 的團隊還在重新設計中,目標將會是在下一個 release 時,將 cluster 的功能 加入到 Akka 中。

目前,Akka 則是只支援 Routing 而已。 Routing 的機制是說,你有一群一樣的 actor,他們可以處理同樣類型的訊息 ,但是由誰來處理訊息,則是由 Router 來決定。

你可能會問,什麼時候需要這樣做呢?會有 Routing 的需求時,多是因為有處理量限制的問題,例如,我想要最多不超過五個 這類型的工作可以同時間被處理,或者是從使用者A來的需求,最多只讓它同時跑五個,但是從使用者B來的需求,最多可以跑十 個,而且兩個人的程序不能混用。

Akka 目前支援的 Routing 機制有

  • RoundRobinRouter: 在一群 actor 中,依序將工作配發給他們。
  • RandomRouter: 在一群 actor 中,隨意的將工作配發給他們。
  • SmallestMailboxRouter: 在一群 actor 中,將工作配發給目前沒有在工作的,或者是待辦工作最少的 actor
  • BroadcastRouter: 將工作配發給所有的 Actor
  • ScatterGatherFirstCompletedRouter: 將工作配發給所有的 Actor,然後等待第一個回應,其餘的回應則是會被忽略捨棄掉。

Routing 的機制也可以跟 Remote Actor 混用,將 Remote Actor 配發在多台機器上,然後透過 Router 將訊息配 發給他們。

Performance

下圖的是 Akka 官方,於一台48核的 AMD 機器上,做簡單訊息傳送的 Performance Test 的測試結果。

在一開始的實作中,Akka是使用 java.util.concurrent.ThreadPoolExecutor 控制最多有多少個 Actor 能同 時被執行,然而在這 48核 的機器上,Akka Team發現,Akka無法 scale 超過 12 個併行的 actor ,不管你增加 多少個 actor ,同時間內被處理的最大量卡在每秒一百四十萬個訊息這個數字。然而,同時間 CPU 的 Load 並沒 有超過 10%

Akka Team猜想是因為 ThreadPoolExecutor 底層的 task queue, LinkedBlockingQueue 有一個共用的 lock, 最後造成擁塞,才會造成效率的不佳。

後來在經過了 Doug Lea 的協助,使用 Fork-Join 重新實作 task executor ,於是效能就拉高到每秒兩千萬個訊息 這數字。

Use Case

那 Akka 可以被用在那些地方呢?目前 Akka 被應用在下面幾個地方。

  • messaging system: 例如國內的 Cubie 就有用到 Akka 來處理訊息。
  • Data Analysis: klout.com 例用 akka 來分析社群網站上如 facebook twitter 上訊息轉發的熱門程度 以及誰是意見領袖等資料。
  • 股票交易系統: Akka 的一大客戶群是倫敦的金融公司,Akka的幾個特性很適合用來做股市交易用。像是用 Actor 來追蹤一個股票的現貨股價變化以及產生各種線型圖;又或者是說,結合 Rule Based Engine 可以做風險控管的 模組。例如說交易員要下單買一支股票時,Akka可以幫你同時檢查,這個交易員他目前手上可用資金的是多少, 交易員目前股票組合的風險是多少,公司目前整體股票組合的風險是多少。 數十至數百個不同規則的檢查是可以透過 remote actor 分散在許多機器上併行處理的,一個交易的風險評估,並不 會隨著檢查的數量而線性增長。
  • 另外一個 Akka 被大量使用的環境是線上遊戲,過去有許多遊戲公司選擇用 MySQL 當後台,然後透過 cache 及 db sharding的方式,來增加 throughput ,後來他們發現,與其把 server 寫成 stateless 然後把狀態放在後台 的 db 讓 db 變成效能的頻頸,不如把 state 放在 Actor 中,讓每個 Actor 代表一個遊戲中的角色,獨立維護 它的狀態,這樣才能更 scalable.

Case Study

Spatial Search With Lucene

Before We Start

As some of my blog readers may know, my previous startup project was an online location based search service. Thing does not go well and I am going to open source some of the related work.

In this post, I will show you how to use Lucene to do spatial search as well as how it works internally.

Our Custom Approach

Lucene’s contrib-spatial uses geo-tagging to implements spatial search and only support searching for features near a point.

A pair of latitude and longitude gives us a quickstart for your location based application. However, not every single feature in a LBS application can be described as a point. Let us take school district as an example. The School A may be closer to your house than School B is, but your house is belong to school district for school B. Another example is bike trails, bike trail is a line not a single point.

A full feature spatial search must support complicated geometric operations. This is why we create our own Lucene spatial search implementation.

In the GIS area, Geometry are used to describe shape of features. The common geometries are:

Geometry primitives(2D)

Type Text Format Example
Point POINT (30 10) PointExample
LineString LINESTRING (30 10, 10 30, 40 40) LineStringExample
Polygon POLYGON ((30 10, 10 20, 20 40, 40 40, 30 10)) PolygonExample
POLYGON ((35 10, 10 20, 15 40, 45 45, 35 10), (20 30, 35 35, 30 20, 20 30))| PolygonExample2

Multipart geometries (2D)

Type Text Format Example
MultiPoint MULTIPOINT ((10 40), (40 30), (20 20), (30 10)) MultiPointExample
MultiLineString MULTILINESTRING ((10 10, 20 20, 10 40) (40 40, 30 30, 40 20, 30 10)) MultiLineStringExample
MultiPolygon MULTIPOLYGON (((30 20, 10 40, 45 40, 30 20)), ((15 5, 40 10, 10 20, 5 10, 15 5))) MultiPolygonExample

How to Calculate QuadTree value for Geometries

When hashing a Geometry primitive, we will calculate the minimum bounding rectangle(MBR) of this geometry. For example, the minumum bouding box for PolygonExample is the grid made up of (1, 1), (1, 4), (4, 4), (4, 1).

We will calculate the quadtree hashcode for the MBR, the quadtree hashcode for the MBR is the longest common prefix of quadtree values for the four corners.

When indexing this geometry with Lucene, we will store these fields in lucene
  • geometry: POLYGON ((3 1, 1 2, 2 4, 4 4, 3 1))
  • geometry__quadtree: QuadTree value of the polygon.

Implementation Detail

The number of digits we used in the quad-tree implementation is 17 digits. This allows us to lay items on an 600 x 600 meter block. This number comes from 40075000 / 2 ^16 (Earth Circumference / level of details). The 17 digits number will be stored as a long value. The digits we used are 1(upper left), 2(upper right), 3(lower left), 4(lower right), and 0 (whole block in the given detail).

When encoding a coordinate, we will stores the quadtree value as a NumericField in Lucene. Internally, Lucene will use indexed trie to store NumericField. The "Indexed Trie" uses buckets to group terms. We will use the default precisionStep(4) for now.

Index Phase

During the index phase, we will store the geometry in its text form and in quadtree value. Each of them has their own use during search phase.

1
2
3
4
5
6
7
8
9
10
val point = Point(24.123, 18.921)
val code = point.quadtree


val raw = new Field("location", point.wkt, Field.Store.YES, Field.Index.NOT_ANALYZED)
val field = new NumericField("location" + "__quadtree")
field.setLongValue(code)

doc.add(raw)
doc.add(field)

Search Phase

The search phase has two parts
  • filter: fetches all possible features that may contains the geometry X by using Lucene’s range query against the quadtree field.
  • select: for each possible results, run geometric operation to see if result Y contains geometry X.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
val field = "location"
// create a point.
val point = context.makeShape(Point(24, 18))

// turn point into search range.
val circle = point.buffer(Distance("500m"))

// create a query container for complex queries.
val query = new BooleanQuery()

// filtering: search for items in this range
query.add(QuadTreeRangeQueryFactory.buildLocalQuery(field, circle.quadtree))

// selecting and scoring: the value source will return the distance between
// the point and each feature.
query.add(new ValueSourceQuery(context.makeValueSource("location", point));

val founds = indexSearcher.search(query)

Open Source Projects

Scala Shapely

Shapely is Scala binding for JTS Topology Suite. The goal of Shapely is to provide an easy to use factory to create JTS geometry class instances. It allows us to create various geometry shape with easy to use factory methods.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// create a point
Point(30, 10)

// create a line string
LineString((30, 10), (10, 30), (40, 40))

// create a polygon
Polygon((30, 10), (10, 20), (20, 40), (40, 40), (30, 10))

// create a polygon with a hole in it.
Polygon(
    Seq((35, 10), (10, 20), (15, 40), (45, 45), (35, 10)),
    Seq((20, 30), (35, 35), (30, 20), (20, 30)))

// create a multi-point
MultiPoint((10, 40), (40, 30), (20, 20), (30, 10))

// create a multi line-string.
MultiLineString(
    Seq((10, 10), (20, 20), (10, 40)),
    Seq((40, 40), (30, 30), (40, 20), (30, 10))
)

// create a multi-polygon.
MultiPolygon(
    Seq(
        Seq((30, 20), (10, 40), (45, 40), (30, 20))
        ),
    Seq(
        Seq((15, 5), (40, 10), (10, 20), (5, 10), (15, 5))
        )
    )
)

Lucene Spatial

Lucene Spatial is the geo-spatial module for lucene built on the top of shapely. The lucene-spatial modules uses a SpatialContext instance to encode location fields and to create location queries.

1
2
3
4
5
6
7
8
9
10
11
// create a context.
val context = new SpatialContext

// create a shape from the representation of a geometry.
val shape = context.makeShape("Point(0 10)")

// create lucene fieldables for a location field.
val fields = context.makeFieldables("field", shape).toList

// create a location query to look for features within 5KM radius.
val query = context.makeQuery("field", shape, new Distance(5, Distance.Unit.KM))

那些系統整合教我的事 – 資料錯誤

首先,今天要恭喜 iTunes Stores 台灣版總算上線了,恭喜蘋果搞定眾多台灣商家搞不定的 數位資料授權的問題;既然 iTunes Store 幫忙打通了關節,接下來許多的台灣的小 start-up 可以 透過串接 iTunes Store 的方式來賺取介紹費,又多了許多新的 business idea 可以在台 灣實作的可能。

iTunes Store 上線, 許多人發現 許多專輯的費用只有 20 元,這讓我想到,當年我在 某音樂公司的經驗,當時是美國幾大音樂公司開放開始賣 non-DRM 音樂的年代,我們公司在線 上串流服務上做到全美最大,有三百萬的訂戶,音樂資料庫中,有超過一百萬首歌曲的資料,隨 著 non-DRM 的熱潮,我們也非得開賣 non-DRM 的音樂。

當時我是主管 QA 工程師的,問了再上頭的主管,問說除了網站的功能外,需要再做資料測試嗎 ,因為跟據我當時的經驗,許多的音樂專籍的資料是有缺或有錯的;但是主管的說法是有其它的 單位會保證資料無誤,所以我這邊只要擔心功能性的問題就好。

當時產品的訂價方式是,一首歌的成本是 0.6 美金,我們賣 0.99 美金,每一張專輯我們則是 賣 9.99 ,等於是專輯如果超過 10 首歌,我們就開始吸收中間的價差。

然而一上線的隔天我們就知道錯了,上線的當晚就有人發現,有許多張精選輯,一張精選輯有數 張CD我們仍是只賣 9.99 每元,當晚,這些專輯就賣掉四千套,一晚就幫公司賠掉了五十萬美金 以上。由於這些音樂都是 DRM Free 的,所以賣出去就是賣出去了,沒有辦法收回,我們仍是要 依照賣掉的數量付錢原始授權的擁有者。

其實,這些歌曲的資料庫我們是跟外部另一家公司買的,並不是自建的;我(們)在這邊學到的一課 是,不管資料是內部還是外部建的,要用時,還是要自己全部檢查一遍才可用。

至於給台灣 iTunes Store 消費者的建議是 快買 等美國那邊睡起來後,產品就會下架再 改回原價了

軟體品質指標系列(三):軟體品質如何測量

在這一系列的第一回我們談到,我們是軟體工程師,不談虛幻的概念,只講能夠量化追蹤的數值指標,那麼, 我們要怎麼把品質轉換程數值指標呢?

軟體品質如何測量

幾個故事

在開始講怎麼做前,我想先講兩個我親身的經驗,在我第一份工作時,前雇主是做手機遊戲的,整間公 司有兩百個員工,但是只有三個是做Server端的,在 2005 年時,多數遊戲都是單機板的,那時公司 簽下 World Series Of Poker 要做多人的線上遊戲,需要開發一個簡易的配對及遊戲訊息交換的伺 服器,當然,大家當年都沒有經驗,所以說,要怎麼樣確保在上線時,不會被用戶衝垮就變成我的工作。

而在我第二份工作時,也有相似的經驗,我們本來已有一個運作中的音樂平台,讓公司內外超過40個Partner使用; 當時,公司又簽下一個合約,說是在三個月後的 6/1 ,會新增 200萬用戶!如何不讓大水衝垮我們的 應用程式,又變成我那幾個月的工作重點。

/images/2012-05-23/troops.jpg

Load Test

寫到這邊,大部份的讀者應該都知道接下來要講的是 Load Test 了,Load Test是個很大的議題,通 常我們在講 Load Test 時,常講的其實是三件不同的事

  • Performance Test
  • Stress Test
  • Longevity Test

這三個測試,用的工具雖然一樣,但是要找個指標跟目的卻是大不相同。

Performance Test

Performance Test 的目地是在找出應用程式的 baseline performance ,做為未來績效評估及設計變 革時的依據。

被 Performance Test 的標的物,通常是已經 feature complete 的模組,然後我們將對模組的每 一個功能,一個一個進行黑箱及白箱測試;然後再用腳本的型態,模擬實際上線的流量,再來測試看看 各功能間對效能的互相影響程度;最後,是找到測試標的物的 Turning Point 及 Scale Factor。

尋找 Best Case Performance

尋找應用程式的 best case performance 大概是最常被忽略的事了, best case performance 指的是你 一個功能的最佳效率,不管怎樣測試,對單一功能來說,怎麼樣你的效率都不可能會再比這個數字 好了;這個數字的第一個用處就在這,如果你的最佳效率比目標效率還糟,剩下來的就不用測了, 因為怎樣都會比這個數字還糟,只能先停下來,回去修改程式碼增進效能。

Best Case Performance 的測法是:

  1. 將應用程式啟動
  2. 先用幾個 request 替應用程式暖身(warm up),以 Java VM 來說,這樣可以讓 Hotspot VM 幫 bytecode 最 佳化,有些需要被啟動的內部原件也會被啟動。
  3. 接著是一個一個的慢慢送一百個 requests ,然後取平均值及標準差。

若是標準差的值過高,或者是反應時間有增長的現像,那麼這個狀況應該記錄下來,然後使用白箱 測試的工具(如 yourkit profiler 去找問題的根源。

除了平均數及標準差外,常用的數字還有

  • mean
  • standard deviation
  • minimum
  • maximum
  • 96 percentile
  • 99 percentile
  • 99.9 percentile
  • 5 minutes moving average
  • throughput
  • error rate

另外,圖表也是常用的工具,因為相較於數字,圖表更容易看出來趨勢的走向;以下圖為例,藍線的 平均反應時間是1.354秒,但跑load test二十分鐘後,每一個 request 都已經超過了平均數,因 此將數字用圖像來呈現是有助於判別數字的。

/images/2012-05-23/chart-1.png

建立 Performance Baseline

接下來的工作,是定出比較的基礎,建立一個你覺得合理的流量,然後就這個比較基礎去找出來不同 狀況下的平均反應時間及處理量的變化。

基礎效能的建立,我們要先控制兩個變數 requests per minutesnumber of concurrent requests ; 透過調整這兩個變數,先找出反應時間較 best case 的流量,然後就此數字開始調整到一個你覺 得合理的值,開始進行 Load Test。

劉接下來我們開始把 requests per minutes(RPM) 倍增number of concurrent requests(CR) 不變 來看,當流量 增加時應用程式的反應時間會如何增長,接著是 RPM 及 CR 同時都倍增,看反應時間如何增長, 依此規則,就二的次方(2^N)開始往上增加,直到 response 開始嚴重衰退,或者 throughput 開 始衰退。

接著,如果你的應用程式支援 Scale-Out ,那麼,我們依此規則,就 RPM, CR, Machines 三 個變數再次就二的次方(2^N)開始調整流量來做測試。

前面這個過程,可以幫我們了解以下幾件事

  • 應用程式對壓力的反應是如何,當流量被增時,反應時間成長的幅度是線性還是成等比級數成長。
  • 處理量 throughput 是否會隨著 request 數增加而增加,是否在超過某個轉折點時,會開始 反轉下降
  • 上述的轉折點,便可當做未來評估是否需要增加機器或者是昇級機器的基準值。當實既須求接近這 個點,或有事件會造成流量超過這個點,那麼,我們就可以在事前進行反應。或者,在監控應用程 式中加入警告,當線上程式逼近這個值的時候,主動告知我們該加機器了
  • 當我們增加機器時,如果應用程式不是寫成完全 stateless 的,而是有共享資料資源時,那麼 ,scale factor便會小於 2,那麼,我們需要關查 scale factor 到底是隨著機器的增加而 不變、還是緩慢成長、或者是快速成長。即使因為某些因素讓我們不能夠準備有實際上線時的機器 數,但是透過分析 scale factor ,我們可以預估上線該使用多少機器。

Stress Test

與 Performance Test 不同的是, Stress Test 是看,應用程式在極度的狀況下,是怎麼樣反 應的,是全面性的停止服務,還是仍能正常處理部份的 request ,其餘未能處理的部份是被堆到排 程中等待處理、還是直接收到錯誤訊息告知服務暫時不可用。而當這極端的狀況停下來後,應用程式 是否會自行回復正常,還是需要 重新啟動才能回復正常。

Stress Test 的方式是直接把 performance test 中 處理量 開始反轉的那個點的流量再 次倍增,看處理量及反應時間會如何的變化。

Longevity Test

Longevity Test 跟前述兩者不同的是, Longevity Test 是在看,應用程式對長時間、持續性、 平緩的流量是如何反應的,是否在某個時間點會有不良的反應,或者是說會有處理量越來越少或反應時 間越來越長的跡象。

透過 Longevity Test 我們可以發覺 - 應用服務是否有 memory leak - 是否有背景的排程工作,造成某個時間點系統資源會被大量耗用

Monitoring

除了從應用程式的外部去觀查應用程式的執行效能,我們還可以更進一步的

  • 從作業系統了解CPU, Memory, Disc I/O, and Network Usage
  • 從JVM 了解 Memory Usage, Object counts, GC Cycles.
  • 從應用程式自訂的 metrics 來觀察 request count, method call time, size of queue, cache hit/miss ratio, etc…

透過截取、記錄、追縱這些數據,搭配上 Performance Test 產生的圖表,我們可以再進一步的了解 應用程式耗用了多少的資源,應用程式的 bottleneck 在那,是 cpu bound, memory bound, disc io bound or network bound.

Note

不要小看了這些監控的細節,我的某個程式,有少量用到 ActiveMQ, ZooKeeper 做溝通,照理說 應該是 cpu or memory bound的應用,但是經過幾次 load test ,發覺不管加多少機器,總處 理量還是不變。後來翻了翻OS來的數據才發現,整個環境的網路頻寬被限制在 250M bps ,再進一 步了解到原來 Amazon EC2 的 Load Balancer 把流量限制在這個數字。

監控的工具有很多種,不過脫不了兩大類 - 監控系統用量 如: Munin, Graphite, RRDTools - 監控系統異常 如: Nagios

這些監控工具,我們不只是可以套用在 Load Test 時使用,更該被大量的佈建在監控線上的應用程 式及外部的服務上,透過長期的追縱,我們可以了解到應用程式及外部的服務的可用度及可靠度。

(全文完)

軟體品質指標系列(二):軟體品質指標有那些

接下來,我們就來探討,軟體品質指標有那些種類

可用度 Availability

一個服務的可用度,是由很多環節組合起來的,某些環節是掌控在開發者手中,某些環節則是 在使用者手中的;對使用者來說,一個服務的可用度,是由底下幾個環節的總合:

  • 軟體本身的可用度
  • IT/Cloud 架構本身的可用度
  • 服務提供者端的網路環境
  • 外部服務的可用度
  • 使用者端的網路環境

在計算 Availability 時,是把上面幾個因素相乘起來,便是你服務的可用度,假設你的IT架構 本身只能提供 99.99% 的可用度,那你的服務本身的可用度將會小於 99.99% 。當然,你可以透 過 Multi-Host 的方式,將服務布建在多個不同 Data Center,此時你的服務可用度,將可超 過單一平台可以提供的可用度。

假設 Cloud A 可以提供的可用度是 99% , Cloud B 可以提供的可用度是 98% ,而你將服 務同部布建在兩者之上,那麼兩者一起停機的機會是 (1-99%)*(1-98%) = 0.02% ,因此可靠 度提昇為 99.98% 。

然而,為了提高可靠到而佈建到 Multi-Host 時,不只要把 Presentation Layer / Frontend 放 到多個平台上面去,同時,使用者的資料也要同步到多個平台去,這樣,才能夠必免單一IT平台停機的 問題。但是這麼一來,當停機的平台再次上線時,如何處理資料的不一致問題,便成了另一難題。因此 ,多數的服務提供者只會把服務佈建在單一平台上。

服務提供者端的網路環境

目前,多數的雲端服務商仍是把主機設立在海外,在台灣沒有 Data Center ,因此,當台灣對外的海 纜出了問題時,雖然服務本身在海外仍是可用的,但是在台灣就連不上線,這一點,也是台灣的開發者 在選用雲端平台時需要考慮進去的。

過去幾年來,台灣對外的海纜,幾乎每兩年都會斷線超過一天。

  • 2001/02/09 中美海纜遭魚船勾破,數據傳輸容量遽減至原來的四成。HiNet, SeedNet, TaNet 均受影響
  • 2003/10/02 中美海纜遭魚船勾破
  • 2006/12/26 恆春地震造成四條國際海纜中美海纜、法新歐亞三號、亞太光纜一號二號網絡斷線, 影響到台灣對中國、香港、新加坡等東南亞國家和地區,以及上述地區對外的國際通信。耗時三週修複。
  • 2009/08/13 莫拉克颱風造成對外海纜斷線,影響台灣往中國大陸汕頭及東南亞地區,包括新加坡、菲律 賓及香港等地之通信。影響時間超過一週
  • 2010/03/04 高雄甲仙地震造成中美海纜斷線,影響時間超過一個月
  • 2011/11/14 ~ 2012/21/26 海纜進行維修,將造成HiNet連線中國之部份連網服務傳輸延遲時間變長。

若是你服務的對像是以亞洲為主,還是建議放在台灣或日本,若是服務對像以全球為目標,建議還是就放在美國吧。

外部服務的可用度 / SOA in Reality

/images/2012-05-22/cloud-farm.jpg

在幾年前,流行的系統架構是 SOA ,讓每一個團隊專注在單一面向功能的服務,將服務透 過SOAP/Restful API供內部及外部的使用者來使用。這樣的好處是,可以增快服務更新的速度減少 測試的範圍,在組織上,也可讓組織將重點放在核心服務上,而將其它的需求委由外部服務處理。

如此一來,對使用者來說,多重 SOA 子服務組合而成的服務,看起來就像是在雲端運行的一台大電 腦,跟舊式的服務並沒有差異。

/images/2012-05-22/cloud-server.png

然而經過幾年的經驗後,大家發現實際上的經驗並不是那麼的美好;如同開頭所說的,服務的可用度, 是所有子服務的總合,單一服務的問題,將會把整體的可用度都拉低。

在我的實務經驗上, SOA 架構,反而變成無止境的 finger pointing ,最後變成,需要對所有外 部服務的可用度做長期的追縱,評估他實際上的可用度。另外,在委外給外部服務時,只會把可以離線 處理的服務委外給外部服務,需要線上即時處理的,還是自行開發較好。

/images/2012-05-22/finger-pointing.png

Service Level Agreement / Term of Service

TBD

可維護性 Manageability

可維護性指的是,一個服務在運行時,留下了多少的資料,供 admin 做為判斷服務狀況的依據。一 個線上的服務,除了基本的 log 檔外,還需提供底下的特性

  • Configurability: 提供設定檔,讓 admin / tester 能夠在不修改程式碼本身,而對軟體的 功能做調整。
  • Monitorability : 提供開放性的介面,讓 admin 可以很容易的去追縱服務本身的建康狀態及用 量,供 admin 來判斷,是否需要增加硬體的來分散線上伺服器的負擔。

Monitoring is Must

在提供一個服務時,我們一定要監控服務本身的狀況,並且要把這份資料與外部分享,減少本身溝通的 成本及增加客戶對服務的信任。

不管是設計多麼精良的服務,各個軟體平台商仍免不了會發生大型的災難,如:

  • Amazon AWS 於 2008, 2010, 2011 各發生過數小時的問題,造成 EBS 使用上的問題,導 制 reddit, zynga 等下遊廠商的服務問題
  • Google Gamil 於 2009 年發生過大形的停機,耖造成拳體用戶無法使用 Gmail 兩個半小時。
  • Azure 於 2012/02/29 ,由於時間同步的問題,造成用戶無法開啟新的 VM instance.

Performance

效能,算是個最常用的品質指標了,在講效能時我們通常會分成兩塊來講,一塊是反應時間(Response Time), 另一塊是處理量(throughput),前者只的是服務單一需求所要花的時間,後者則是單位時間內,可同 時服務的總需求數量。

影響效能的因素有很多,在多數的系統中,往往處理量增加的結果就是反應時間也一起增加,因此在系 統設計的初期,這些就要納入規格中,變成設計的一環;那麼,什麼是好的效能呢?在反應時間這邊有 個很好的指標,就是跟人互動的UI程式,如果反應時間能壓在 100ms 以下,那麼人們的認知就會覺的 這個程式是即時使用的,如果反應時間拉到 200ms 以上,就會開始覺得頓頓的。

對於網頁程式來說,反應時間不只是從 Web Server 這一端出去的時間,還要算上網路傳輸的時間以 及流覽器繪製畫面的時間,因此能留給網站主機的反應時間只剩下 50ms 上下;如果你的程式還有呼叫 其於的外部程式,那麼每多一層,反應時間至少增加 20ms ,或者是說少 20ms 給你的程式使用。

Reliability

可靠度(Reliability)跟可用度(Availability)是兩個常被弄混的名詞,Availability講的是, 在一段長的時間內,系統可被使用的時間(Up Time),Reliability則講的是在單位時間內,系統出 錯的次數。一個線上服務,可能是可用的(Available)但是他的輸出結果是有問題的(Unreliable)

Reliability 的評量指標有

  • 單位時間內,回傳錯誤訊息或者是無回應的次數。
  • 單位時間內,輸出結果是錯誤的次數。

Reliability的問題,往往是系統內的 Bug ,只是在使用量大時,才會凸顯他的存在,因此軟體開發 者不可忽略這微小的訊號。

Scalability

Scalability 講的是你的服務能不能夠隨著用戶數的增長,而跟著成長,服務的可延展性可以分為 , 直向擴充 Scale Up橫向擴充 Scale Out ,兩者都可以幫你增加可同時服務的 線上用戶數,但是後者的可擴充的程度,仍是遠高過前者的。

Scalability 讓我們動態的依事件配置不同數量的伺服器數量,以防止 Slashdot Effect 的發生 ,如總統大選或報稅截止日等事件,往往會在一日內擁入超過平日百倍的用戶數,如何讓服務正常運做 ,是軟體品質極重要的一環。

而 Scalability 對 Application Service Provider 更是重要,因為 Cloud Based ERP or CRM 系 統,較一般的網頁更有黏著性,一個有數萬個用戶的中型網站,同時間上線的用戶可能只有數百個,而 於他們的 Session 中存的資料也只有數 kilo bytes。然而對企業用戶來說 Cloud Based ERP, CRM , 在上班時間,可是一直在使用的,一間 200 人公司用的 ERP 系統,可能就會有 200 active sessions ,每 個 Session 又存了許多正在處理中的表格資料。

如何 scale up/out database intensive application 變成 ASP 提供者不得不面對的難題。

Security

網路服務的安全性問題,常常是個被誤會且忽略的問題,台灣許多主流的 Hosting Company 及 Billing Provider 的網站都出 過問題;他們的問題是,某本常用的軟體手冊的程式碼,在教導如何實作使用者認證時,只有在首頁 有檢查帳號,等帳號檢查過了,就會被導到內部的網頁去,然而,在內部網頁這邊,卻沒有檢查使用 者有沒有登入。

變成,只要得知內部網業的網址(form target)就可以直接存取內部的資料,因此,許多網站就 被 Search Engine Crawler 長驅直入,把所有的使用者資料都放在搜尋結果中公開了。至於實際 的例子我就不多舉了。

另外我有寫過一篇 專文 討論,該如合保護一個 WebService ,台灣的政府及民間網站,常常 用一些沒有被認可的電子簽證來做 https 的安全通訊,然而 https 的金鑰交換,在第一次是不安 全的,因此,若是你的金鑰是沒有經過第三方簽核的,那麼再交換金鑰時可能會受到 Man in the middle attack 直接把金鑰換掉。

使用 https 時,必需遵守幾個使用規範,這樣子通訊才會是安全的。

回到正題,服務的安全性可以分為幾塊

  • 通訊的安全性。
  • 身份的認可查核
  • 資料的安全性及使用權限
  • 資料的使用及變更的計錄追蹤

待續

在下一篇,我將講述,如何把這些概念,轉換成實際的數值指標。