Last week, I was working on importing a large input file into the system. Part of the process involved
reading a large file (~10GB) from a remote server and then sorting the inputs locally.
At that moment, I decided to give Scala’s new Future API a try. Within one day, I wrote a parallel external
merge sort that can:
Read the file from the remote server and split it into smaller chunks.
Use in-memory quicksort to sort each chunk and then save the chunks into files.
Perform a merge sort on the files generated in the previous step.
The most amazing thing is that multiple threads can work on these tasks simultaneously. While one thread is busy
reading bytes from the remote server, other threads perform quicksort on the part that has already been read. Once
all of the data has been written to the file system, another thread starts to merge sort these files.
The first step turns the InputStream into a Stream[Int] (Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)
in the full listing), which allows us to operate on it without having to read it fully into memory first. Still, this
is a huge Stream. Because we are going to sort in memory, I need to split it into smaller pieces first.
The following code lifts a Stream into a Stream of Streams. Again, this operation does not require reading the whole
original stream into memory; the split only happens logically.
    /**
     * Lift a Stream into a Stream of Streams. The size of each sub-stream is
     * specified by chunkSize.
     *
     * @param stream    the original stream.
     * @param chunkSize the size of each sub-stream.
     * @tparam A        the element type.
     * @return the original stream, split into chunks.
     */
    private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {
      def tailFn(remaining: Stream[A]): Stream[Stream[A]] = {
        if (remaining.isEmpty) {
          Stream.empty
        } else {
          val (head, tail) = remaining.splitAt(chunkSize)
          Stream.cons(head, tailFn(tail))
        }
      }
      val (head, tail) = stream.splitAt(chunkSize)
      Stream.cons(head, tailFn(tail))
    }
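For comparison, the standard library offers similar lazy chunking through `Iterator.grouped`. The sketch below is not part of the original code; the names `Chunking` and `chunks` are made up for illustration. It shows that only the chunks that are actually requested get materialized.

```scala
object Chunking {
  // Split an iterator into chunks of at most chunkSize elements.
  // A chunk is only materialized when the outer iterator is advanced.
  def chunks[A](source: Iterator[A], chunkSize: Int): Iterator[List[A]] =
    source.grouped(chunkSize).map(_.toList)

  def main(args: Array[String]): Unit = {
    // The source is endless, but only the first two chunks are ever read.
    val firstTwo = chunks(Iterator.from(1), 3).take(2).toList
    println(firstTwo)
  }
}
```

An iterator can only be traversed once, which is usually fine for a one-pass import like this one.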
Perform Quick Sort
After we have split the InputStream into sub-streams, we can read each sub-stream into memory and perform
quicksort on it.
    val linesStream: Stream[Stream[Int]] = lift(source, chunkSize)
    val chunkCounter = new AtomicInteger(0)
    val sortedFileDir = Files.createTempDir()
    sortedFileDir.deleteOnExit()

    // Read the source stream n entries at a time, sort each chunk in memory,
    // and save it to a file in parallel.
    val fileFutures: List[Future[File]] = linesStream.map(s => {
      val chunk = chunkCounter.getAndIncrement
      Future {
        val sorted = s.sorted
        val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
        val out = new PrintWriter(ret)
        try {
          sorted.foreach(out.println(_))
        } finally {
          out.close()
        }
        ret
      }
    }).toList
Perform Merge Sort
Because I want to perform the merge sort on all of the results, I have to turn the List[Future[File]] into a
Future[List[File]] first, so that I can instruct the Future to do the merge sort once it has all the pieces.
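The conversion can be seen in isolation in the small sketch below. It is only an illustration: `saveChunk` is a hypothetical stand-in for "sort one chunk and save it", and it returns a name instead of a File so that the example stays self-contained.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequenceSketch {
  // Hypothetical stand-in for "sort one chunk and save it".
  def saveChunk(id: Int): Future[String] = Future { s"chunk-$id" }

  // One future per chunk, combined into one future that completes
  // when every chunk is done. The order of the results is preserved.
  def saveAll(ids: List[Int]): Future[List[String]] =
    Future.sequence(ids.map(saveChunk))

  def main(args: Array[String]): Unit =
    println(Await.result(saveAll(List(0, 1, 2)), 5.seconds))
}
```

If any of the inner futures fails, the combined future fails as well, so the merge step never runs on an incomplete set of files.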
    val saveTmpFiles: Future[List[File]] = Future.sequence(fileFutures)

    val ret: Future[File] = saveTmpFiles.map { files =>
      var merged = files
      // Merge the files pairwise until only one file is left.
      while (merged.length > 1) {
        val splited = merged.splitAt(merged.length / 2)
        val tuple = splited._1.zip(splited._2)
        val m2 = tuple.map { case (f1, f2) =>
          val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)
          val source1 = Source.fromFile(f1)
          val source2 = Source.fromFile(f2)
          val out = new PrintWriter(ret)
          try {
            val stream1 = source1.getLines().toStream.map(_.toInt)
            val stream2 = source2.getLines().toStream.map(_.toInt)
            merge(stream1, stream2).foreach(out.println(_))
            ret
          } finally {
            out.close()
            source1.close()
            source2.close()
            FileUtils.deleteQuietly(f1)
            FileUtils.deleteQuietly(f2)
          }
        }
        // With an odd number of files, zip drops the last one; carry it over.
        merged = if (merged.length % 2 > 0) m2 :+ merged.last else m2
      }
      merged.head
    }

    /**
     * Merge two sorted streams into one sorted stream.
     *
     * @param streamA the first stream, sorted in ascending order.
     * @param streamB the second stream, sorted in ascending order.
     * @return a stream with the elements of both inputs in ascending order.
     */
    private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]): Stream[A] = {
      (streamA, streamB) match {
        case (Stream.Empty, Stream.Empty) => Stream.Empty
        case (a, Stream.Empty) => a
        case (Stream.Empty, b) => b
        case _ =>
          val a = streamA.head
          val b = streamB.head
          // Both inputs are ascending, so the smaller head must be emitted first.
          if (ord.compare(a, b) <= 0) {
            Stream.cons(a, merge(streamA.tail, streamB))
          } else {
            Stream.cons(b, merge(streamA, streamB.tail))
          }
      }
    }
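A two-way merge only produces sorted output if it always emits the smaller of the two heads. The sketch below shows this on small in-memory lists. It is a simplified illustration, not the stream-based code used for the files, and the name `MergeSketch` is made up.

```scala
import scala.annotation.tailrec

object MergeSketch {
  // Merge two ascending lists into one ascending list.
  def merge[A](left: List[A], right: List[A])(implicit ord: Ordering[A]): List[A] = {
    @tailrec
    def loop(l: List[A], r: List[A], acc: List[A]): List[A] = (l, r) match {
      case (Nil, _) => acc.reverse ::: r
      case (_, Nil) => acc.reverse ::: l
      case (a :: lt, b :: rt) =>
        // Take the smaller head; on ties, take from the left to stay stable.
        if (ord.lteq(a, b)) loop(lt, r, a :: acc) else loop(l, rt, b :: acc)
    }
    loop(left, right, Nil)
  }

  def main(args: Array[String]): Unit =
    println(merge(List(1, 4, 9), List(2, 3, 10)))
}
```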
Give this Method a Pretty Face.
So what does the method signature of this parallel external merge sort look like?
In fact, it is quite simple: it takes an InputStream and returns a Future[File]. Everything
happens asynchronously and nothing blocks the main thread. You can hand an InputStream to this method, go do other
things, and then come back later to wait for the result.
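From the caller's side, the pattern looks roughly like the sketch below. `slowSort` is a hypothetical stand-in for the real method (it sorts a small list instead of a file), so treat this as an illustration of the calling convention only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object CallerSketch {
  // Hypothetical stand-in for the sort method: anything that returns a Future.
  def slowSort(input: List[Int]): Future[List[Int]] = Future { input.sorted }

  def run(): (String, List[Int]) = {
    val pending = slowSort(List(3, 1, 2))          // returns immediately
    val other = "did other work"                   // the caller is free meanwhile
    val sorted = Await.result(pending, 10.seconds) // block only when the result is needed
    (other, sorted)
  }

  def main(args: Array[String]): Unit = println(run())
}
```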
Limit the Number of Threads Running at the Same Time.
Because this parallel external merge sort is an IO- and memory-intensive operation, we cannot run too many of them
simultaneously. We must constrain the number of threads it can use at a time. Otherwise, we may get an
OutOfMemoryError or end up with many threads writing to disk at once.
This constraint must also be global. No matter how many requests are sent to this method at the
same time, it should use at most N threads.
Luckily, this is quite easy to do with Scala’s Future API. All we need to do is provide a fixed-size thread pool
for this method, so that it does not spawn new threads by itself and instead uses the threads of this global thread
pool.
    /**
     * Limits how many read-and-sort tasks can run simultaneously. Because this is
     * an IO-bound operation, 5 is more than enough unless the InputStream comes
     * from a slow HTTP connection.
     */
    private val GLOBAL_THREAD_LIMIT = {
      val ret = Runtime.getRuntime.availableProcessors() / 2
      // Keep the limit between 1 and 5: a fixed thread pool needs at least one thread.
      math.max(1, math.min(5, ret))
    }

    private lazy implicit val executionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))
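To see that a fixed-size pool really caps concurrency, the sketch below runs a batch of short tasks on a small pool and records the highest number of tasks that were ever running at once. The object and method names are made up for this illustration, and the 20 ms sleep merely simulates work.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolLimitSketch {
  // Runs `tasks` short jobs on a pool of `poolSize` threads and returns the
  // highest number of jobs that were running at the same time.
  def maxConcurrency(poolSize: Int, tasks: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    val lock = new Object
    var running = 0
    var peak = 0
    try {
      val jobs = (1 to tasks).map { _ =>
        Future {
          lock.synchronized { running += 1; peak = math.max(peak, running) }
          Thread.sleep(20) // simulated work
          lock.synchronized { running -= 1 }
        }
      }
      Await.result(Future.sequence(jobs), 30.seconds)
      lock.synchronized(peak)
    } finally {
      pool.shutdown()
    }
  }

  def main(args: Array[String]): Unit =
    println(maxConcurrency(poolSize = 2, tasks = 10))
}
```

However many futures are created, the peak never exceeds the pool size, because the extra tasks wait in the executor's queue.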
Putting it all together, the complete source looks like this:

    import com.google.common.io.Files
    import java.io.{PrintWriter, File, InputStream}
    import java.util.concurrent.Executors
    import java.util.concurrent.atomic.AtomicInteger
    import org.apache.commons.io.FileUtils
    import scala.concurrent.{ExecutionContext, Future}
    import scala.io.Source

    object InputStreams {

      /**
       * Limits how many read-and-sort tasks can run simultaneously. Because this is
       * an IO-bound operation, 5 is more than enough unless the InputStream comes
       * from a slow HTTP connection.
       */
      private val GLOBAL_THREAD_LIMIT = {
        val ret = Runtime.getRuntime.availableProcessors() / 2
        // Keep the limit between 1 and 5: a fixed thread pool needs at least one thread.
        math.max(1, math.min(5, ret))
      }

      private lazy implicit val executionContext =
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(GLOBAL_THREAD_LIMIT))

      def sort(inputStream: InputStream, chunkSize: Int = 2000000): Future[File] = {
        // open source stream
        val source = Source.fromInputStream(inputStream).getLines().toStream.map(_.toInt)
        val linesStream = lift(source, chunkSize)
        val chunkCounter = new AtomicInteger(0)
        val sortedFileDir = Files.createTempDir()
        sortedFileDir.deleteOnExit()

        // Read the source stream n entries at a time, sort each chunk in memory,
        // and save it to a file in parallel.
        val saveTmpFiles: Future[List[File]] = Future.sequence(linesStream.map(s => {
          val chunk = chunkCounter.getAndIncrement
          Future {
            val sorted = s.sorted
            val ret = new File(sortedFileDir, "%d".format(chunk * chunkSize))
            val out = new PrintWriter(ret)
            try {
              sorted.foreach(out.println(_))
            } finally {
              out.close()
            }
            ret
          }
        }).toList)

        // perform merge sort.
        saveTmpFiles.map { files =>
          var merged = files
          while (merged.length > 1) {
            val splited = merged.splitAt(merged.length / 2)
            val tuple = splited._1.zip(splited._2)
            val m2 = tuple.map { case (f1, f2) =>
              val ret = new File(sortedFileDir, f1.getName + "-" + f2.getName)
              val source1 = Source.fromFile(f1)
              val source2 = Source.fromFile(f2)
              val out = new PrintWriter(ret)
              try {
                val stream1 = source1.getLines().toStream.map(_.toInt)
                val stream2 = source2.getLines().toStream.map(_.toInt)
                merge(stream1, stream2).foreach(out.println(_))
                ret
              } finally {
                out.close()
                source1.close()
                source2.close()
                FileUtils.deleteQuietly(f1)
                FileUtils.deleteQuietly(f2)
              }
            }
            // With an odd number of files, zip drops the last one; carry it over.
            merged = if (merged.length % 2 > 0) m2 :+ merged.last else m2
          }
          merged.head
        }
      }

      /**
       * Lift a Stream into a Stream of Streams. The size of each sub-stream is
       * specified by chunkSize.
       *
       * @param stream    the original stream.
       * @param chunkSize the size of each sub-stream.
       * @tparam A        the element type.
       * @return the original stream, split into chunks.
       */
      private def lift[A](stream: Stream[A], chunkSize: Int): Stream[Stream[A]] = {
        def tailFn(remaining: Stream[A]): Stream[Stream[A]] = {
          if (remaining.isEmpty) {
            Stream.empty
          } else {
            val (head, tail) = remaining.splitAt(chunkSize)
            Stream.cons(head, tailFn(tail))
          }
        }
        val (head, tail) = stream.splitAt(chunkSize)
        Stream.cons(head, tailFn(tail))
      }

      /**
       * Merge two sorted streams into one sorted stream.
       *
       * @param streamA the first stream, sorted in ascending order.
       * @param streamB the second stream, sorted in ascending order.
       * @return a stream with the elements of both inputs in ascending order.
       */
      private def merge[A](streamA: Stream[A], streamB: Stream[A])(implicit ord: Ordering[A]): Stream[A] = {
        (streamA, streamB) match {
          case (Stream.Empty, Stream.Empty) => Stream.Empty
          case (a, Stream.Empty) => a
          case (Stream.Empty, b) => b
          case _ =>
            val a = streamA.head
            val b = streamB.head
            // Both inputs are ascending, so the smaller head must be emitted first.
            if (ord.compare(a, b) <= 0) {
              Stream.cons(a, merge(streamA.tail, streamB))
            } else {
              Stream.cons(b, merge(streamA, streamB.tail))
            }
        }
      }
    }
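Now this "internet bad boy" has made another astonishing promise: free fiber service for everyone in New Zealand.
To quote Dotcom on Twitter: "The new Mega company will be based in
NZ & become it's most valuable IT biz. I will relaunch Pacific Fibre. Free
broadband for all Kiwi's"
Why can Kim Dotcom/Mega offer free internet access? The answer starts with the settlement mechanism for international
leased lines. Settlement means that, for a link between location A and location B, if traffic from A to B makes up 40%
of the total and traffic from B to A makes up 60%, the traffic cost of the link is split 60/40: A pays 60% and B pays 40%.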
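The settlement arithmetic can be written down as a tiny sketch. The function name and the cost figure are made up for illustration; the only rule taken from the text is that each side pays in proportion to the traffic flowing toward it.

```scala
object SettlementSketch {
  // Returns (amount paid by A, amount paid by B) for a link between A and B.
  // Each side pays in proportion to the traffic flowing toward it.
  def split(aToB: Double, bToA: Double, cost: Double): (Double, Double) = {
    val total = aToB + bToA
    (cost * bToA / total, cost * aToB / total)
  }

  def main(args: Array[String]): Unit =
    // 40% of the traffic goes from A to B, 60% from B to A, on a link costing 1000.
    println(split(aToB = 40, bToA = 60, cost = 1000))
}
```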
While I am at it, a quick plug: Taiwan has an active functional programming user group. If you are interested, you can
look it up online at fpug.tw.
Another possible solution is the Actor Model. The Actor Model encapsulates state inside each Actor, and Actors pass
messages to one another through an asynchronous mechanism.
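To make the idea concrete, here is a toy actor written with nothing but the standard library. It is not Akka and not a real actor runtime; the names `Counter`, `Add`, and `Get` are invented for this sketch. The point is only that the state is private and every change goes through an asynchronous message.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object MiniActorSketch {
  sealed trait Msg
  case class Add(n: Int) extends Msg
  case class Get(reply: Promise[Int]) extends Msg

  // A toy actor: private state, one mailbox thread, one message at a time.
  class Counter {
    private val mailbox = Executors.newSingleThreadExecutor()
    private var total = 0 // only ever touched by the mailbox thread

    // Fire-and-forget send: enqueue the message and return immediately.
    def tell(msg: Msg): Unit = mailbox.execute(new Runnable {
      def run(): Unit = msg match {
        case Add(n)     => total += n
        case Get(reply) => reply.success(total)
      }
    })

    def stop(): Unit = mailbox.shutdown()
  }

  def demo(): Int = {
    val counter = new Counter
    (1 to 100).foreach(i => counter.tell(Add(i)))
    val reply = Promise[Int]()
    counter.tell(Get(reply)) // queued behind all the Add messages
    try Await.result(reply.future, 5.seconds) finally counter.stop()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Because the mailbox is processed by a single thread in arrival order, the counter needs no locks even though `tell` may be called from many threads.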
A Brief History of the Actor Model
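Before introducing the Actor Model, here is a brief history. The Actor Model was named by Carl Hewitt in 1973, and
Gul Agha described it more precisely around 1985.
The first large commercial application was a telecom system completed by Ericsson in the mid-1980s. The system is
highly fault tolerant, and most introductions to the Actor Model mention it because it achieved 99.9999999%
("nine nines") uptime. In other words, its downtime over an entire year was only about 31 ms. That is almost
unbelievable, because for the systems most of us write, reaching two nines is already a remarkable achievement.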
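The 31 ms figure follows directly from the arithmetic. The sketch below (names made up, a non-leap year assumed) converts a number of nines into the downtime allowed per year.

```scala
object UptimeSketch {
  private val MillisPerYear = 365L * 24 * 60 * 60 * 1000 // 31,536,000,000 ms

  // Allowed downtime per non-leap year, in milliseconds, for an availability
  // written as a number of nines (2 => 99%, 9 => 99.9999999%).
  def downtimeMillisPerYear(nines: Int): Double =
    MillisPerYear * math.pow(10, -nines)

  def main(args: Array[String]): Unit = {
    println(downtimeMillisPerYear(9))                         // roughly 31.5 ms
    println(downtimeMillisPerYear(2) / (24 * 60 * 60 * 1000)) // roughly 3.65 days
  }
}
```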
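In the Actor Model, when two Actors exchange a message, only a reference to the message is dropped into the other
Actor's mailbox, so the cost of message passing is very small. When an Actor is woken up to process a message, its
call stack starts from its mailbox rather than from the whole chain of calls, so the cost of a context switch becomes
very small as well.
In the Actor Model, handling a job looks a lot like the way we work in an office. When a message arrives from outside
the system, we first record the job together with its sender and put it in a mailbox to be handled later. When the
previous hand finishes its part, the running Actor forwards the processed data to the next hand. The job is forwarded
stage by stage in this way, and the finished result is finally sent back to the original sender.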
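Introducing Akka
The next part introduces Akka, an implementation of the Actor Model on the JVM.
Akka was started by Jonas Bonér in 2009. Before developing Akka, Jonas worked on JVM clustering at Terracotta and on
the JRockit VM at BEA, and he has many years of experience implementing distributed systems. The company behind Akka
today is Typesafe, which is also the company that develops Scala, the most popular "Java.next" language on the JVM.
Akka runs on the JVM and provides both a Java API and a Scala API. In the rest of this presentation I will mainly use
the Java API, with the Scala API as a supplement, to show how to use Akka.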
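Akka also introduces the concept of remote actors. All message passing is asynchronous, calls are made by passing
messages, the messages themselves carry no state and can be serialized, and the state itself lives inside the Actor.
Given all that, it is natural to be able to hand work off to another machine.
Besides the Actor Model, Akka provides many modules for integrating with external systems. One example is akka-camel,
which connects Akka to Camel, an enterprise service bus, and turns an Actor into one of its endpoints, so that the
Actor can process messages sent by Camel and then forward messages back into Camel.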
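If the workers managed by a supervisor are independent of each other and do not affect one another, the supervisor can
choose to restart only the worker that has a problem. When an actor is restarted, its actor reference stays the same,
so outside actors can keep using the existing actor reference to talk to the restarted actor. The restarted actor's
mailbox still holds the messages that have not been processed yet. As for the message that caused the error, you can
decide at restart time whether to keep it and process it again.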
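The restart behaviour can be imitated with a small, synchronous toy. This is not Akka's supervision API; `Worker` and `WorkerRef` are invented names, and the sketch simply drops the failing message instead of offering a choice. It shows the two properties described above: the reference held by callers does not change, and the messages still waiting in the mailbox survive the restart.

```scala
import scala.collection.mutable

object RestartSketch {
  // A worker whose internal state is lost when it is replaced.
  class Worker {
    var processed = 0
    def handle(msg: String): Unit = {
      if (msg == "boom") throw new IllegalStateException("cannot handle: " + msg)
      processed += 1
    }
  }

  // Stable reference: callers keep using the same WorkerRef across restarts.
  class WorkerRef {
    private val mailbox = mutable.Queue.empty[String]
    private var worker = new Worker
    private var restartCount = 0

    def tell(msg: String): Unit = mailbox.enqueue(msg)

    // Supervisor loop: on failure, replace the worker but keep the mailbox.
    // Returns (number of restarts, messages processed by the current worker).
    def drain(): (Int, Int) = {
      while (mailbox.nonEmpty) {
        val msg = mailbox.dequeue()
        try worker.handle(msg)
        catch {
          case _: IllegalStateException =>
            worker = new Worker // fresh state; the failing message is dropped
            restartCount += 1
        }
      }
      (restartCount, worker.processed)
    }
  }

  def demo(): (Int, Int) = {
    val ref = new WorkerRef
    List("a", "boom", "b", "c").foreach(ref.tell)
    ref.drain()
  }

  def main(args: Array[String]): Unit = println(demo())
}
```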
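In Akka's implementation, an Actor can be made remote by default. Actors are designed from the start to be location
transparent and deployable either locally or remotely. An Actor originally designed for local use can, without any
code change, be started on a remote machine automatically just by editing the configuration file.
Sending a message to a remote Actor is no different from sending one to a local Actor: both go through the tell and
ask APIs. Under the hood, the Actor reference carries the address of the machine where the remote Actor actually
lives. If the Actor is remote, the message is written to the remote side using Java serialization, the Protocol
Buffers serializer, or a custom serializer.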
Data analysis: klout.com uses Akka to analyze how widely messages are reshared on social networks such as Facebook
and Twitter, and to identify who the opinion leaders are.
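Stock trading systems: a large share of Akka's customers are financial firms in London, and several of Akka's
characteristics fit stock trading well. For example, an Actor can track the spot price changes of a stock and generate
various charts. Combined with a rule-based engine, Akka can also serve as a risk control module. For example, when a
trader places an order to buy a stock, Akka can check at the same time how much available capital the trader currently
has, how risky the trader's current portfolio is, and how risky the company's overall portfolio is.
Tens to hundreds of different rule checks can be distributed across many machines through remote actors and processed
in parallel, so the time needed to assess the risk of one trade does not grow linearly with the number of checks.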
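The fan-out of independent checks can be sketched with plain Futures instead of remote actors. The `Order` type and the three rules below are hypothetical; a real system would have many more rules and would run them on other machines.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RiskCheckSketch {
  case class Order(trader: String, amount: Double)

  // Hypothetical rules; each returns true when the order passes.
  val rules: List[Order => Boolean] = List(
    o => o.amount > 0,
    o => o.amount <= 1000000,
    o => o.trader.nonEmpty
  )

  // Start every rule check concurrently and approve only if all of them pass.
  def approve(order: Order): Future[Boolean] =
    Future.sequence(rules.map(rule => Future(rule(order)))).map(_.forall(identity))

  def main(args: Array[String]): Unit =
    println(Await.result(approve(Order("amy", 500)), 5.seconds))
}
```

With enough workers, the total time is governed by the slowest check rather than by the sum of all checks.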
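Another environment where Akka is heavily used is online games. In the past, many game companies chose MySQL as their
back end and increased throughput through caching and database sharding. They later found that, rather than writing
stateless servers and keeping the state in a back-end database that becomes the performance bottleneck, it is better
to keep the state in Actors. Each Actor represents one character in the game and maintains its own state
independently, which scales better.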
As some of my blog readers may know, my previous startup project was an online location-based search service. Things
did not go well, and I am going to open source some of the related work.
In this post, I will show you how to use Lucene to do spatial search as well as how it works internally.
Spatial Search
Spatial Search and Geo Tagging
Geo-tagging is a popular way to do spatial search. Instead of tagging a POI with its actual location, geo-tagging
uses a geotag to group the items within a predefined range as a whole. Performing a search then simply means pulling
out all items with the same tag.
There are a few popular ways to do geo-tagging, including
GeoHash
QuadTree
GeoHash
GeoHash is a way to encode latitude and longitude as a base32 string. It has a fundamental flaw that makes spatial
search hard: the generated hashes are not continuous. A block may not share a prefix with its eight nearest
neighbors. When using geohash to do spatial search, we have to calculate the hash values of the nearby neighbors
ourselves.
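The discontinuity is easy to reproduce. The sketch below is a minimal encoder that follows the standard geohash scheme (interleaved longitude and latitude bits, base32 alphabet); the object name is made up and no decoding or validation is included. Two points roughly 30 meters apart, on opposite sides of the equator and the prime meridian, end up with hashes that do not even share their first character.

```scala
object GeoHashSketch {
  private val Base32 = "0123456789bcdefghjkmnpqrstuvwxyz"

  // Standard geohash: interleave longitude and latitude bits (longitude
  // first) and emit one base32 character for every 5 bits.
  def encode(lat: Double, lon: Double, length: Int): String = {
    var latLo = -90.0
    var latHi = 90.0
    var lonLo = -180.0
    var lonHi = 180.0
    val sb = new StringBuilder
    var bits = 0
    var bitCount = 0
    var evenBit = true // even bits refine longitude, odd bits refine latitude
    while (sb.length < length) {
      if (evenBit) {
        val mid = (lonLo + lonHi) / 2
        if (lon >= mid) { bits = (bits << 1) | 1; lonLo = mid }
        else { bits = bits << 1; lonHi = mid }
      } else {
        val mid = (latLo + latHi) / 2
        if (lat >= mid) { bits = (bits << 1) | 1; latLo = mid }
        else { bits = bits << 1; latHi = mid }
      }
      evenBit = !evenBit
      bitCount += 1
      if (bitCount == 5) {
        sb.append(Base32.charAt(bits))
        bits = 0
        bitCount = 0
      }
    }
    sb.toString
  }

  def main(args: Array[String]): Unit = {
    println(encode(0.0001, 0.0001, 6))   // starts with 's'
    println(encode(-0.0001, -0.0001, 6)) // starts with '7'
  }
}
```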
Quad-Tree addresses this issue in geohash and adopts a better way to do geo-tagging. This example shows
how Quad-Tree does geo-tagging.
The Quad-Tree has the following advantages when doing spatial search:
it can be really fast if you store the data as a trie. Lucene, for example, internally uses an indexed trie to store
NumericField.
the neighbors of a block share the longest common prefix.
For more extensive information regarding geo-tagging, you should check
here,
here and
here.
Our Custom Approach
Lucene’s contrib-spatial uses geo-tagging to implement spatial search and only supports searching for
features near a point.
A latitude/longitude pair gives a quick start for a location-based application. However, not every
feature in an LBS application can be described as a point. Take school districts as an example:
School A may be closer to your house than School B, yet your house belongs to the school district of School B.
Another example is bike trails: a bike trail is a line, not a single point.
A full-featured spatial search must support complicated geometric operations. This is why we created our own Lucene
spatial search implementation.
In GIS, geometries are used to describe the shapes of features. The common geometries are:
When hashing a Geometry primitive, we calculate the minimum bounding rectangle (MBR) of the geometry. For
example, the minimum bounding box for POLYGON ((3 1, 1 2, 2 4, 4 4, 3 1)) is the grid made up of (1, 1), (1, 4),
(4, 4), (4, 1).
We then calculate the quadtree hashcode for the MBR, which is the longest common prefix of the
quadtree values of its four corners.
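The hashing step can be sketched as follows. This is an illustration, not the project's actual code: it assumes a square grid with the origin in the lower-left corner and y growing upward, and it uses the digit convention 1 (upper left), 2 (upper right), 3 (lower left), 4 (lower right), with 0 as padding for "whole block". The names `key` and `mbrKey` are made up.

```scala
object QuadKeySketch {
  // Quadtree key of a point inside the square [0, size) x [0, size),
  // one digit per level of subdivision.
  def key(x: Double, y: Double, size: Double, levels: Int): String = {
    var minX = 0.0
    var minY = 0.0
    var half = size / 2
    val sb = new StringBuilder
    for (_ <- 0 until levels) {
      val right = x >= minX + half
      val upper = y >= minY + half
      sb.append(if (upper) { if (right) '2' else '1' } else { if (right) '4' else '3' })
      if (right) minX += half
      if (upper) minY += half
      half /= 2
    }
    sb.toString
  }

  // Key of a bounding rectangle: the longest common prefix of the keys of
  // its four corners, padded with 0 up to the full number of digits.
  def mbrKey(minX: Double, minY: Double, maxX: Double, maxY: Double,
             size: Double, levels: Int): String = {
    val corners = List((minX, minY), (minX, maxY), (maxX, maxY), (maxX, minY))
      .map { case (x, y) => key(x, y, size, levels) }
    val prefix = corners.reduce { (a, b) =>
      a.zip(b).takeWhile { case (p, q) => p == q }.map(_._1).mkString
    }
    prefix + "0" * (levels - prefix.length)
  }

  def main(args: Array[String]): Unit = {
    println(mbrKey(1, 1, 3, 3, size = 8, levels = 3)) // fits in the lower-left quadrant
    println(mbrKey(1, 1, 4, 4, size = 8, levels = 3)) // touches the centre of the grid
  }
}
```

On this 8 x 8 grid the first rectangle gets the key 300, while the second one gets 000 because its corners fall into different top-level quadrants. A rectangle that straddles a major cell boundary is therefore indexed very coarsely, which is one reason an exact geometric check is still needed after the index lookup.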
When indexing this geometry with Lucene, we store these fields:
geometry: POLYGON ((3 1, 1 2, 2 4, 4 4, 3 1))
geometry__quadtree: QuadTree value of the polygon.
Implementation Detail
The quad-tree implementation uses 17 digits. This allows us to lay items on blocks of roughly 600 x 600
meters. The number comes from 40075000 / 2^16 (the Earth's circumference in meters divided by the number of blocks at
the finest level of detail). The 17-digit number is stored as a long value. The digits we use are 1 (upper left),
2 (upper right), 3 (lower left), 4 (lower right), and 0 (whole block at the given level of detail).
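The block size quoted above can be checked with a couple of lines. The names are made up for this sketch; the constant is the circumference figure from the text.

```scala
object BlockSizeSketch {
  val EarthCircumferenceMeters = 40075000.0

  // Width of one block after the circumference has been halved `levels` times.
  def blockSizeMeters(levels: Int): Double =
    EarthCircumferenceMeters / math.pow(2, levels)

  def main(args: Array[String]): Unit =
    println(blockSizeMeters(16)) // a little over 611 meters
}
```

A 17-digit decimal number also fits comfortably in a long, whose maximum value has 19 digits.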
When encoding a coordinate, we store the quadtree value as a NumericField in Lucene. Internally, Lucene uses an
indexed trie to store a NumericField. The "indexed trie" uses buckets to group terms. We use the default
precisionStep (4) for now.
Index Phase
During the index phase, we store the geometry both in its text form and as a quadtree value. Each of them has its own
use during the search phase:
filter: fetch all features that may contain the geometry X by running a Lucene range query against the
quadtree field.
select: for each candidate result Y, run the geometric operation to check whether Y actually contains the geometry X.
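The two phases can be imitated in memory. The sketch below is a simplification under several assumptions: features are plain rectangles, the quadtree keys are precomputed three-digit values on an 8 x 8 grid, and a string prefix test stands in for Lucene's range query. All names and data are hypothetical.

```scala
object FilterSelectSketch {
  case class Feature(name: String, quadKey: String,
                     minX: Double, minY: Double, maxX: Double, maxY: Double) {
    def contains(x: Double, y: Double): Boolean =
      x >= minX && x <= maxX && y >= minY && y <= maxY
  }

  // Filter: cheap index-style test. A feature may contain the point only if
  // its key, without the trailing 0 padding, is a prefix of the point's key.
  def filter(features: Seq[Feature], pointKey: String): Seq[Feature] =
    features.filter { f =>
      val prefix = f.quadKey.reverse.dropWhile(_ == '0').reverse
      pointKey.startsWith(prefix)
    }

  // Select: exact geometric test, run only on the surviving candidates.
  def select(candidates: Seq[Feature], x: Double, y: Double): Seq[Feature] =
    candidates.filter(_.contains(x, y))

  val features = Seq(
    Feature("park", "300", 1, 1, 3, 3),
    Feature("lake", "200", 5, 5, 7, 7),
    Feature("trail", "300", 0, 0, 3, 1.5)
  )

  def main(args: Array[String]): Unit = {
    // The point (2, 2) has the precomputed key 323 on this grid.
    val candidates = filter(features, "323")
    println(candidates.map(_.name))               // park and trail survive the filter
    println(select(candidates, 2, 2).map(_.name)) // only park really contains the point
  }
}
```

The filter is allowed to return false positives such as the trail, as long as it never drops a true match; the select phase removes them.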
    val field = "location"

    // create a point.
    val point = context.makeShape(Point(24, 18))

    // turn point into search range.
    val circle = point.buffer(Distance("500m"))

    // create a query container for complex queries.
    val query = new BooleanQuery()

    // filtering: search for items in this range
    query.add(QuadTreeRangeQueryFactory.buildLocalQuery(field, circle.quadtree))

    // selecting and scoring: the value source will return the distance between
    // the point and each feature.
    query.add(new ValueSourceQuery(context.makeValueSource("location", point)))

    val founds = indexSearcher.search(query)
Open Source Projects
Scala Shapely
Shapely is a Scala binding for the JTS Topology Suite. Its goal is to provide easy-to-use factory methods for
creating instances of the JTS geometry classes.
    // create a point
    Point(30, 10)

    // create a line string
    LineString((30, 10), (10, 30), (40, 40))

    // create a polygon
    Polygon((30, 10), (10, 20), (20, 40), (40, 40), (30, 10))

    // create a polygon with a hole in it.
    Polygon(Seq((35, 10), (10, 20), (15, 40), (45, 45), (35, 10)),
            Seq((20, 30), (35, 35), (30, 20), (20, 30)))

    // create a multi-point
    MultiPoint((10, 40), (40, 30), (20, 20), (30, 10))

    // create a multi line-string.
    MultiLineString(Seq((10, 10), (20, 20), (10, 40)),
                    Seq((40, 40), (30, 30), (40, 20), (30, 10)))

    // create a multi-polygon.
    MultiPolygon(Seq(Seq((30, 20), (10, 40), (45, 40), (30, 20))),
                 Seq(Seq((15, 5), (40, 10), (10, 20), (5, 10), (15, 5))))
Lucene Spatial
Lucene Spatial is the geo-spatial module for Lucene, built on top of Shapely. The
lucene-spatial module uses a SpatialContext instance to encode location fields and to
create location queries.
    // create a context.
    val context = new SpatialContext

    // create a shape from the representation of a geometry.
    val shape = context.makeShape("Point(0 10)")

    // create lucene fieldables for a location field.
    val fields = context.makeFieldables("field", shape).toList

    // create a location query to look for features within 5KM radius.
    val query = context.makeQuery("field", shape, new Distance(5, Distance.Unit.KM))
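First of all, congratulations are in order today: the Taiwan edition of the iTunes Store is finally online.
Congratulations to Apple for sorting out the digital content licensing problems that so many Taiwanese companies could
not. Now that the iTunes Store has cleared the way, many small Taiwanese start-ups can earn referral fees by
integrating with the iTunes Store, which opens up many new business ideas that can be implemented in Taiwan.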
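Before getting into how to do it, I would like to share two personal experiences. At my first job, my employer made
mobile games. The company had two hundred employees, but only three of them worked on the server side. In 2005 most
games were single-player. That year the company signed World Series Of Poker to build a multiplayer online game, which
required a simple server for matchmaking and for exchanging game messages. Naturally, nobody had any experience with
this back then, so making sure the server would not be overwhelmed by users at launch became my job.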
By now most readers probably know that what follows is about load testing. Load testing is a big topic, and when we
talk about load testing we usually mean three different things:
Performance Test
Stress Test
Longevity Test
These three tests use the same tools, but the metrics they look for and their goals are very different.
Performance Test
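The purpose of a performance test is to find the application's baseline performance, which serves as the basis for
future performance evaluation and design changes.
The target of a performance test is usually a module that is already feature complete. We run black-box and white-box
tests on each function of the module, one at a time. Then we use scripts to simulate real production traffic and
measure how the functions affect each other's performance. Finally, we find the Turning Point and the Scale Factor of
the target under test.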
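Finding the Best Case Performance
Finding an application's best case performance is probably the most frequently overlooked task. Best case performance
is the best efficiency a single function can achieve: no matter how you test, that function will never perform better
than this number. That is also the number's first use. If your best case performance is worse than your target, there
is no need to run the remaining tests, because every other result will be worse than this number. All you can do is
stop, go back to the code, and improve its performance.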
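The way to measure best case performance is:
Start the application.
Warm up the application with a few requests. On the Java VM, for example, this lets the HotSpot VM optimize the
bytecode, and internal components that need to be started will also be started.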