2017年1月25日 星期三

[ Elixir ] Enum, Stream, Flow 有什麼不一樣?



最近看到一篇 Blog ,內容是如何調教 Flow.from_enumerable, 與 Flow.partition 的 :stages 與 :max_demand 參數。

第一篇是版主探討他怎麼用 flow 打造他期望的工作模型。
TUNING ELIXIR GENSTAGE/FLOW PIPELINE PROCESSING

第二篇是版主應回應的要求,教大家怎麼建立第一篇使用的工具。
MEASURING AND VISUALIZING GENSTAGE/FLOW WITH GNUPLOT

該版主用
1. File 作為紀錄時間搓的工具,並以 stage_work_name.log 為檔名。
2. gnuplot 讀取 log 並繪圖。

這種作法很好上手,大概一個小時內你就可以套在自己想要檢測的 flow 上了,其他需要尋找資料流卡點的,也可以套用這手法。

如何 log 與 製作圖表的 blog 包含 elixir code ,可以直接拿來用。所以本文不重複這塊。



推薦大家花一個小時左右的時間,把 log 與圖表那篇實作起來。
圖表工具與檢視併發執行情況的小套路講完了。

回到正題。其實是要嘗試呈現下圖描述的東西。



Elixir 不同的 Collection 的資料流,到底是怎麼一回事呢?上述的行為,跑起來真實樣貌是長怎樣的?



目前 Elixir 內建有 Enum, Stream 兩個處理 Collection 的庫。
另外還有一個最近從 GenStage 拆出來的庫 Flow 。 ( 2017-01-18 ) 分家的版本是 0.11 版。可檢視 0.11 的 change log

首先,先解釋圖表,圖表呈現每筆資料何時進入到不同的 Job 上,並在完工前打卡。 座標分別是 x: time ( ms ) 與 y: counter 。

解讀的方式是,
1. 你可以畫一個橫線。橫線會經過四個,或是更多的符號,該符號代表 該次的 counter 經過特定 job 時打卡的時間。 job1 與 job 2 的間隔,就是打卡的時間差。
也可以只看第一個與最後一個打卡的時間差。可以看出一個 element 通過 pipeline 的效率。

2. 畫一個直線,直線碰到的符號個數,代表該時段有多少個 process 打卡。在本篇可以拿來解讀同時有多少個 process 在工作。

另外我的示範代碼。每個 job 都簡化成一個 花費  1 ms ( 0.001 second ) 的任務。所以對於同一個 counter 而言,圖表上應該沒有機會重疊。

Part 1 & 2   Enum and Stream  


所有代碼的結構如下, job1 到 job4 在 stream, flow 版本不會有變化。然後 pipeline 部分, Enum 會換成 Stream 與 Flow 的版本。後續只會貼上 pipeline 的部分。



@max 25

def plot_enum do
    Progress.start_link([:enum_job1, :enum_job2, :enum_job3, :enum_job4])

    job1 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job1) ; x end
    job2 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job2) ; x end
    job3 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job3) ; x end
    job4 = fn x -> Process.sleep(@sleep_time); Progress.incr(:enum_job4) ; x end

    1..@max
    |> Enum.map(&(job1.(&1)))
    |> Enum.map(&(job2.(&1)))
    |> Enum.map(&(job3.(&1)))
    |> Enum.map(&(job4.(&1)))

    Progress.stop()
  end


然後是 Steam 版本的代碼。

    1..@max
    |> Stream.map(&(job1.(&1)))
    |> Stream.map(&(job2.(&1)))
    |> Stream.map(&(job3.(&1)))
    |> Stream.map(&(job4.(&1)))
    |> Enum.to_list()



我們來看 Enum 與 Stream 的圖表吧。





兩個圖表,花費的時間都差不多,不過這不是本文的重點。原則上不該這樣,因為我的示範資料是很小的,而且不是從外部來的,每個 job 又都要花 1 ms 所以, stream 的其他特性沒辦法發揮。

但我們可以比較 Enum 與 Strem 資料流的特徵。

首先,可以看到, Enum 是把 25 count 都跑完 job1 之後, job2 再從頭 count 1 依序處理到 25 ,然後 job3 與 job4 。 這是 Enum 的特性, list 進 list 出。
對比的 Stream 可以看到, count 1 先獨自跑完 job1,2,3,4 ,然後才換 count 2 跑 這四個 job ,直到 count 25 跑完。

這兩種方式有什麼差別呢?假設job4 處理完後,是大家集合。那 Enum, Stream 就沒差別。

但假設 job 是從叫客戶叫披薩,到快遞的送達的時間,那其實 job1 到 job4 中間越少等待時間越好。這種情境下, Steam 就有優勢。

這種方式就是畫橫線看。一個東西從原料開始,完成組裝,出貨,送達。我們可以看到 Enum 與 Stream 在這邊表現的不同。 Enum 不在乎單個 element 的情況,Enum 總是看整體結果的。而 Stream 是看個體。
圖表也顯示了, Enum 版本的 counter 在每個 job 打卡都隔了很長的時間。我們不太喜歡這樣。Stream 解決了這樣的問題。

這是第一次的進化。 透過 Stream 。

不過圖表還可以檢視出一個狀況, Enum, Stream 都是單個 process 。我們可以畫直線,很明顯的,一個時間點,就只會出現一個打卡符號。所以我們可以更細部的看 Stream 的處理過程。

我們可以觀察 count10 ,你可以發現, 處理 job1 時,其他的 job 都是沒在運作的。 count10 都完工時, count11 才可以啟動。 Stream 似乎有進步的空間。

Part 3 Flow


所以我們引進了 Flow , flow 可以分割 pipe 成為不同的 process 個體。下圖是在 pipe 最上頭分配 4 個 process 的結果。語法是在 Flow 的起點用 from_enumerable( stages: 4 )


     # @max_demand 是 另一個參數,本文目前不探討,但設定太多或太少,會嚴重影響執行的表現。

    1..@max
    |> Flow.from_enumerable(stages: 4, max_demand: @max_demand)
    |> Flow.map(&(job1.(&1)))
    |> Flow.map(&(job2.(&1)))
    |> Flow.map(&(job3.(&1)))
    |> Flow.map(&(job4.(&1)))
    |> Enum.to_list()





我們可以看到,圖表很像 Stream 的版本,不過這次有 4 個 process 同時進行。比起 Stream 版本, 會有 3 個 counter 可以提早完成任務。 效果是根據你有多少 cpu 核心,或啟用多少個 process (語法上用 stages: n ,設定)。

畫橫線看, Flow 與 Stream 取得的結果是一樣的。
但畫直線,可以看到同一個時間點有 4 個 process 一起 做 job1 。 然後一起做 job2 。最後一起處理完 job4 ,之後再從下一輪的 job1 開始。

我們可以觀察到,總時間大概只要 1/3 或 1/4 上下,這就是加 processs 的好處。(補充一點, 這裡有 4 個 process , 每個 process 裡面,都負責處理整套的 job1~job4 。)

這次是第二次的進化, 用 Flow ,獲得多個 process 的效益。

接下來我們再刁鑽一點,先來觀察 flow 的圖表, 每批 process 在處理 job4 的時間差,基本上就是隔了一套 job1 ~ job4  的時間,這個時間差跟 sream 版本依樣。我們可不可以縮短這個時間差呢?

可以,我們用 Flow.partition( stages: n ) 來加 process 。


    1..@max
    |> Flow.from_enumerable(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job1.(&1)))
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job2.(&1)))
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job3.(&1)))
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job4.(&1)))
    |> Enum.to_list()



首先先說明代碼的變化。上一版本, flow 一開始分配了 stages: 4 。
這一版本,一開始是 stages: 1 ,接著 job1 ,然後在其他 job 之前,用 Flow.partition(stage: 1 ) 切割。
兩個版本都是使用了 4 個 process 。只是分工的方式不同。

舉個簡單的類比:
上一個版本是開了四個工廠,每個工廠只有一個人。 4 process
這一個版本是一個工廠,每道 Job 有一個人負責做事。 4 process

再來看看這個版本的圖。 job4 與 job4 之間的時間差縮短了。


這次我們的 pipeline 終於有個真正的「流水線作業」流程了。

我們再來比較一下兩種拆 process 的不同處。 上一個確實能更提早發貨。核心數越多,就越能享有更早的完工時間。不過另一個觀點是,就拿紡織廠來舉例,以4工廠版本來說,job1在跑的時候, job2,3,4 的機器設備就晾在那邊沒做事了,設備買多餘了。反而是有四個人的那個工廠,才不會浪費機器的生產力,人與設備都滿載。

但我們軟體的設備是函式,似乎沒關係。4 工廠的方式目前來看是更好的拆法。把 兩個 flow 版本重疊, job4 圍出來的每個三角形,就是被提早的時間量。目前來說, partition 沒有帶來新的進化。

但如果這些任務是動畫,那 partition 的版本,會比上一個版本更流暢,這時候用 partition 分配 process 會更好。因為上一個版本的結果,4 個同時完成,畫面只會有一個,反而會造成跳格的視覺感受, 60 fps 變成 15 fps 的體驗。所以要根據我們的需求來分配切割的方式。


Part 4  More Partition


另外一個目前代碼沒有表現出來的是, partition 可以多派幾個 process 來處理複雜的 job 。我們來修改吧。

我們把 job1 後面接上一個 job_spent ,讓他耗時 2 ms。 那麼我們可以在 job1 前的 from_enumerable  上,設定 3 倍的 process 量,來平衡  ( job1 + job_spent = 3 ms )  所消耗的時間 這時候,這邊的 一個 process 要處理 job1, job_spent 兩個工作。

    

     job_spent = fn x -> Process.sleep(2); x end

    1..@max
    |> Flow.from_enumerable(stages: 3, max_demand: @max_demand)
    |> Flow.map(&(job1.(&1)))                #  1 ms
    |> Flow.map(&(job_spent.(&1)))      #  2 ms
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job2.(&1)))
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job3.(&1)))
    |> Flow.partition(stages: 1, max_demand: @max_demand)
    |> Flow.map(&(job4.(&1)))
    |> Enum.to_list()







上面兩個圖表,可以發現 調整過 job1 的 stages: 3 ,成果是接近上面那個所有階段只需要 1ms 的情況。而沒有 調整 stages 的,因為 第一個 process 每次都必須用掉 3 ms 。 即使 job 2,3,4 都只要 1 ms 就完工了。 但情況是瓶頸卡在第一個 process 的地方。全部完成,耗時是最花時間的那個 job 乘上 list 大小。
spent_time =  length( list ) * jobs_count * max( [ job1, job2, job3, job4 ] )

現在的問題是, job2 每三秒才會收到一個結果,所以有兩秒是閒閒的,連帶的, job3, 4 都是做一秒,休息兩秒的情況,圖表也能觀察出這樣的狀況。job2 做完就沒事了,畫直線看,每 3 秒碰到一次 job2 ,被前面最耗時的卡著了。所以即使知道其他 job 都很快,但反映出來的時效就是最大的,上面的 jobs_count * max( list ) 就是這麼來的。


假設前面那三秒的 job 是不能再切割的,那要怎麼避免 job2,3,4 的閒置呢?

所以要怎麼調節?做法就是把比較耗時的部分,增加更多的 stages 來貼近最不耗時的。最快的就不必加核心了因為快的job 要等慢的,也就是要調整耗時的。其他慢的就是乘上某個倍數。這樣的結果是,我們把 3 ms 產出一個結果的 job1 , 變回每 1 ms 就產出一個結果的 job 。這樣 job 2 就沒有等待 job1 的空閒產生。(這邊有個小狀況,狀況是: job1 一次丟 3 個成品給 job2 , job2 處理一個,有兩個要先等著! job2 屯貨了!不過 job1 有三秒的間隔,所以 job2 在時間內消化完了。)

我們透過 上述 partition 的分配,把比較耗時的 pipeline 變得跟 上一版本的 flow 有一樣的總時效。 這才是多個 partition 切割方式帶來的 power 。 在耗時的 partition 上,分配更多的 process,降低類似 job1,與 job2 這種 pipeline 之間等待的閒置時間。

這是第三次的演進,使用 partition 調節 stages 比例。盡可能減少 pipeline 之間 process 的閒置。用多個 partition 調節整個 pipeline 的瓶頸處。使得 pipeline 運作更均衡順暢。


調節好的話,耗時就會從上一個式子往下面的式子貼近。
spent_time = length( list ) *  jobs_count *  min (  [ job1, job2, job3, job4 ]  )


總結

我們從單 process 的 Enum 處理 list ,對照組,一切的開始。
演化成單 process 的 Stream 操作資料,讓先處理的 element 盡快完成任務,可以先行離開。

第三步,我們用 Flow 創造 n 個 process ,一次用 n 個 process 跑 Stream 的方式, 總體耗時是原來 的 1/n 或 1/(n -1 ) 左右。獲得相當幅度的進步。大部分的 element 也比純 Stream 又更早完成了。

第四步,對付難纏的又不可分割的任務(特別吃時間),我們透過 partition 分配更多的 process ,用來填補後續 job 閒置的計算時間。而總體耗時表現,又能變回第三步那樣的水準。而且每個 生成的 process 都保持在運算的狀態。這一步最大的功勞是擠掉了 process 空閒的時間。 process 是拿來算的,不是拿來閒置的。

以上是本文對 Elixir 資料集處理的特徵整理。希望本文能協助大家學習 Elixir 併發功能的理解與使用。

有錯誤或疑問的地方,歡迎大家回饋。

沒有留言:

張貼留言

歡迎回饋