2019年4月21日

OpenMP による C++ コードの並列化

Abstract
 並列化の方針には,1) 1タスクあたりの実行時間を削減する方針 (ストロング・スケーリング [1]) と,2) 1タスクの実行時間はそのままだが,多数のタスクでも1タスクの時と同じ実行時間で処理する方針 (ウィーク・スケーリング [1]) の 2 種類がある.OpenMP は「OpenMP が使用できない環境では無視されるディレクティブ [2]」を用いて,for 文単位で並列化を行うため,ストロング・スケーリングに向いているが,実際には,並列化をタスク単位に調整すれば,ウェーク・スケーリングに利用できる.
 並列化を行う場合に,まず思い浮かべるのは,スレッドを生成しての並列化である.C++11 からは std::thread が標準化されているため,Windows か Linux かを意識せずに並列化できる.しかしながら,単純にタスクをスレッド数で割って処理させると,実行時間の不均一性から,処理の後半でプロセッサに空きができるなど問題がある.プロセッサを遊ばせないためには,タスクを一度スレッドプールに集約し,スレッドプールから,各 CPU に処理を振る必要がある.このような煩雑な処理の実装は,非常に骨の折れる作業である.このような状況に際しても,OpenMP は,ディレクティブ 1 行で並列化され,schedule(guided) オプション1つでスレッドプーロを生成する.
 実際に並列化が必要とされる状況について,例えば,多量のデータを扱う場合,CSV 等からタスクの一覧を読み込み,タスクごとに処理することは多い.これは,アムダールの法則 [3] を考えた場合,処理の単位が大きく並列化不能な箇所が少ないため,並列度を上げて実行時間を削減し易い.また,実装に際しても,大きな単位でのみ並列化の面倒を見ればよいため,実装の負担が少ない.
 本投稿では,OpenMP の簡単な実装例/実行例を示した後,多数のグラフプロットを必要とする GIF 動画の生成について,OpenMP を用いた並列化を行う.
How to use OpenMP
  ここでは,OpenMP の簡単な使い方を示すとともに,イベントプールが機能していることを,実験的に確認する.

 この投稿における計算機環境は,
・Ubuntu 16.04 LTS
・AMD Ryzen7 1700 (8C/16T)
・DDR4-2666 32GB
・gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.11)
・Python 3.6.4 :: Anaconda, Inc.
である.

まず,サンプルコードと実行結果を示す.

./main.cpp
#include <sstd/sstd.hpp>

std::string worker(const uint i, const std::string& str){
    uint64 n=0;
    uint64 loopNum = std::pow(1.3, i);
    for(; n<loopNum; n++){
        n++;
    }
    return sstd::ssprintf("%u%s %llu", i, str.c_str(), n);
}

int main(){
    printf("■ measureTime_start---------------\n\n"); time_m timem; sstd::measureTime_start(timem);
    
    uint size = 100;
    std::string str=":";
    std::vector<uint> vecIn(size); for(uint i=0; i<vecIn.size(); i++){ vecIn[i]=i; }
    std::vector<std::string> vecOut(size);
    
    #pragma omp parallel for schedule(guided)
    for(uint i=0; i<vecIn.size(); i++){
        vecOut[i] = worker(vecIn[i], str);
    }
    
    sstd::printn(vecOut);
    
    printf("\n");
    printf("■ measureTime_stop----------------\n");
    sstd::measureTime_stop_print(timem);
    return 0;
}
実行結果 (OpenMP 並列化済み)
$ ./exe 
■ measureTime_start---------------

vecOut = ["0: 2" "1: 2" "2: 2" "3: 2" "4: 2" "5: 4" "6: 4" "7: 6" "8: 8" "9: 10" "10: 14" "11: 18" "12: 24" "13: 30" "14: 40" "15: 52" "16: 66" "17: 86" "18: 112" "19: 146" "20: 190" "21: 248" "22: 322" "23: 418" "24: 542" "25: 706" "26: 918" "27: 1192" "28: 1550" "29: 2016" "30: 2620" "31: 3406" "32: 4428" "33: 5756" "34: 7482" "35: 9728" "36: 12646" "37: 16440" "38: 21372" "39: 27784" "40: 36118" "41: 46954" "42: 61040" "43: 79354" "44: 103160" "45: 134106" "46: 174338" "47: 226640" "48: 294632" "49: 383022" "50: 497930" "51: 647308" "52: 841500" "53: 1093950" "54: 1422136" "55: 1848776" "56: 2403410" "57: 3124432" "58: 4061762" "59: 5280290" "60: 6864378" "61: 8923690" "62: 11600798" "63: 15081036" "64: 19605348" "65: 25486952" "66: 33133038" "67: 43072948" "68: 55994834" "69: 72793284" "70: 94631268" "71: 123020648" "72: 159926844" "73: 207904896" "74: 270276366" "75: 351359276" "76: 456767058" "77: 593797176" "78: 771936328" "79: 1003517226" "80: 1304572396" "81: 1695944114" "82: 2204727348" "83: 2866145552" "84: 3725989218" "85: 4843785982" "86: 6296921778" "87: 8185998310" "88: 10641797804" "89: 13834337146" "90: 17984638288" "91: 23380029776" "92: 30394038708" "93: 39512250320" "94: 51365925418" "95: 66775703042" "96: 86808413954" "97: 112850938142" "98: 146706219584" "99: 190718085458"]

■ measureTime_stop----------------
--------------------------------
 Execution time:    31. 670 sec
--------------------------------
実行結果 (並列化なし)
$ ./exe 
 --- Omission ---
--------------------------------
 Execution time:   123. 537 sec
--------------------------------
システムモニター (CPU 使用率の履歴) (OpenMP 並列化済み)
 まず,./main.cpp について説明する.まず,タスク一覧に見立てた vecIn に数値をセットし,行いたい処理に見立てた worker が処理を実行し,最後に結果一覧に見立てた vecOut へ値を返している.worker は,入力した数値に応じた回数だけ変数 n を加算し,実行時間を稼いでいる.また,std::string 等のオブジェクトが受け渡せることや,多数の引数を処理できることを確認した.OpenMP による並列化は,#pragma omp parallel for schedule(guided) で指定されている.このディレクティブは,直下の for 文を並列実行する.これにより worker は,デフォルトでは理論プロセッサ数分だけ並列に実行 [7] される.これは,コンパイル時に GCC へ -fopenmp オプションを渡すことで有効化される.-fopenmp オプションを渡さない場合,このディレクティブは無視されるが,warning は発生するため,これを回避するには,
#ifdef _OPENMP
#pragma omp parallel for schedule(guided)
#endif
とすればよい.
 実行時間については,並列化の有無により 123.537 / 31.670 ≒ 3.9 倍 高速化されたことが分かる.これは,物理 CPU が 8 コアあることを考えれば,8 倍まで高速化してほしいところであるが,CPU 使用率の履歴から分かるように,各タスクの粒度が大きく異なるため,一番大きなタスクの実行時間以下には高速化されないことが分かる.同様に,CPU 使用率の履歴には,一度下がった CPU 使用率が再び上昇するパターンを含んでおり,イベントプールが機能していることを確認できる.
A little complex sample with OpenMP
  ここでは,5 mm/sec の位相速度を持つ sin 波の物理位置 [0, 60] mm の波について,波長を 10 mm とした際の 1 周期分の時間発展 [0, 5) sec を C++ で解き,matplotlib を用いて描画する.これは,パラメータ調整やイベント解析における,処理やプロットの負担が重いことを模擬したサンプルコードとなっている.これを OpenMP により並列化する.ただし,この波は,サンプルコード作成のために設定された仮想的な波であり,物理的に実在しうるスケールを考慮していない.また,実際に本サンプルコードと同様に数値計算負荷の軽い処理を行うだけであれば,全て Python コードで実装し,並列化には multiprocessing モジュールを用いるほうが素直である.
 以下,実装を示す.

./pyFuncs.py
import numpy as np

#--------------------------------------------------------------------------------------------------------

import matplotlib as mpl        # Not to occur "QXcbConnection: Could not connect to display"
mpl.use('Agg')                  # Not to occur "QXcbConnection: Could not connect to display"
import matplotlib.pyplot as plt # Not to occur "QXcbConnection: Could not connect to display"
import matplotlib.ticker as tick

def vec2graph(writeName, vecX, vecY):
    plt.clf()
    fig = plt.figure(figsize=(8, 2.5)) # 3*5, 1*5 # アスペクト比の設定
    ax1 = fig.add_subplot(111)
    ax1.plot(vecX, vecY, color='k', linewidth=0.5)
    
    title  = ""
    ax1.set_title(title)
    
    ax1.grid(which='minor', linewidth=0.5, linestyle=':',  color='gainsboro')
    ax1.grid(which='major', linewidth=0.5, linestyle='-',  color='silver'    )
    
    ax1.tick_params(pad=5, which='major', direction='in', bottom=True, top=True, left=True, right=True, length=4) # 軸の余白 # which: major tick と minor tick に対して変更を適用 # tick を内側方向に # tick を bottom, top, left, right に付加 # tick width # tick length
    ax1.tick_params(pad=5, which='minor', direction='in', bottom=True, top=True, left=True, right=True, length=2) # 軸の余白 # which: major tick と minor tick に対して変更を適用 # tick を内側方向に # tick を bottom, top, left, right に付加 # tick width # tick length
    
    ax1.set_xlabel("Physical place [mm]")
    ax1.set_xlim(0-1, 60+1)
    ax1.xaxis.set_major_locator(tick.MultipleLocator(5))
    ax1.xaxis.set_minor_locator(tick.MultipleLocator(1))
    
    ax1.set_ylabel("Amplitude [mm]")
    ax1.set_ylim(-11, 11)
    ax1.yaxis.set_major_locator(tick.MultipleLocator(5))
    ax1.yaxis.set_minor_locator(tick.MultipleLocator(1))
    
    plt.savefig(writeName, bbox_inches="tight") # , dpi=100

#--------------------------------------------------------------------------------------------------------

import glob
from PIL import Image, ImageDraw
def imgDir2GIF(savePath, imgDir, frameRate):
    vecImgPath = sorted(glob.glob(imgDir+"/*.png"))
    vecImg = [Image.open(path) for path in vecImgPath]
    vecImg[0].save(savePath, save_all=True, append_images=vecImg[1:], optimize=False, duration=int(1000/frameRate), loop=0) # duration [milli sec/frame], frameRate [frame/sec]

#--------------------------------------------------------------------------------------------------------
./main.cpp
#include <sstd/sstd.hpp>

std::vector<double> gen_vecSin(double sample_per_mm, double A, double k, double x_begin, double x_end, double omega, double time){
    std::vector<double> vecSin( (uint)sample_per_mm*(x_end-x_begin) + 1 );
    for(uint i=0; i<vecSin.size(); i++){
        double x = x_begin + (double) i / sample_per_mm;
        vecSin[i] = A * std::sin( k * x - omega * time );
    }
    return vecSin;
}

void plot_vec(const char* tmpDir, const char* fileName,
              const char* savepath, const std::vector<double>& vecX, const std::vector<double>& vecY){
    
    sstd::c2py<void> vec2graph(tmpDir, fileName, "vec2graph", "void, const char*, vec<double>, vec<double>");
    vec2graph(savepath, vecX, vecY);
}

int main(){
    printf("■ measureTime_start---------------\n\n"); time_m timem; sstd::measureTime_start(timem);
    
    const char* tmpDir     = "tmpDir";   // tmp dir for c2py
    const char* fileName   = "pyFuncs";  // name of python file
    const char* tmpDir_sin = "./tmpSin"; // tmp dir for sequential img files
    sstd::mkdir(tmpDir_sin);
    
    //---
    
    double frameRate = 25;      // frame rate [frame/sec]
    double Vp =  2;             // phase velocity [mm/sec]
    double lambda = 10;         // wavelength [mm]
    double sample_per_mm = 5;   // physical place resolution
    
    double A = 10;              //   amplitude [mm]
    double k = (2*M_PI)/lambda; // wave number [mm]
    double T = lambda/Vp;       // Vp == lambda / T == k / omega
    double omega = (2*M_PI)/T;  // angular frequency
    
    double x_begin=0, x_end=60; // 0 to 60 mm of x axis.
    
    double timeLength = lambda/Vp;
    uint flameNum = frameRate * timeLength;
    
    // generating a specific physical place of sin wave.
    std::vector<std::vector<double>> vvecSin(flameNum);
    #pragma omp parallel for schedule(static)
    for(uint i=0; i<flameNum; i++){ // i is Correspond to time variation
        vvecSin[i] = gen_vecSin(sample_per_mm, A, k, x_begin, x_end, omega, i/frameRate);
    }
    
    uint xNum = (uint)sample_per_mm*(x_end-x_begin) + 1;
    std::vector<double> vecX(xNum); for(uint i=0; i<vecX.size(); i++){ vecX[i]=(double)i/sample_per_mm; }
    
    // conversion of vecSin to graph img
    #pragma omp parallel for schedule(guided)
    for(uint i=0; i<flameNum; i++){
        plot_vec(tmpDir, fileName, sstd::ssprintf("./%s/sin_%04u.png", tmpDir_sin, i).c_str(), vecX, vvecSin[i]);
    }
    
    //---
    
    // convert imgs to GIF
    const char* savePath = "./sin.gif";
    sstd::c2py<void> imgDir2GIF(tmpDir, fileName, "imgDir2GIF", "void, const char*, const char*, const double");
    imgDir2GIF(savePath, tmpDir_sin, frameRate);
    
    //---
    
    sstd::rm(tmpDir    );
    sstd::rm(tmpDir_sin);
    
    printf("\n");
    printf("■ measureTime_stop----------------\n");
    sstd::measureTime_stop_print(timem);
    return 0;
}
実行結果 (OpenMP 並列化済み)
$ ./exe 
■ measureTime_start---------------


■ measureTime_stop----------------
--------------------------------
 Execution time:    11. 565 sec
--------------------------------
実行結果 (並列化なし)
$ ./exe 
■ measureTime_start---------------


■ measureTime_stop----------------
--------------------------------
 Execution time:    86. 402 sec
--------------------------------

 まず,pyFuncs.py について説明する.vec2graph 関数では,入力されたvecX, vecY を元に,グラフを描画し,writeName に保存する.この関数は,C++ から呼びだされ,連番グラフ画像を生成する.imgDir2GIF 関数は,imgDir に保存された連番画像を glob により全てリストに読み込みソートした後,frameRate [frame/sec] に基づき GIF 動画化する.このとき,frameRate は 25 [frame/sec] を下回るとチラつき易くなる.imgDir2GIF 関数もまた C++ から呼び出される.
 次に,main.cpp について説明する.ここでは,まず,波形の生成を行っている.波形は,
\begin{align*} f(x,t) &= A\sin\left(\frac{2\pi}{\lambda}x - \frac{2\pi}{T}t \right)\\ &= A\sin(kx-\omega t) \end{align*} で表される.ただし,波数 $k=\frac{2\pi}{\lambda}$,各周波数 $\omega=\frac{2\pi}{T}$ である.また,$x$ は波形の物理位置,$t$ は時間発展,$\lambda$ は波長,$T$ は周期を,それぞれ示す [4]
 また,並列化については,単純な処理の分配を,#pragma omp parallel for schedule(static) として,スレッドプールが必要な処理を #pragma omp parallel for schedule(guided) として記述した. オプションは staticdynamicguidedruntime があり,static はタスク数で分割,dynamic は処理がなくなる度に,全てスレッドプールから一つ一つ処理を分配する.guided はこの中間で,ある程度の個数の処理をまとめて worker に投げ,処理が終わった順に新しい処理を分配していく.runtime は環境変数により,上記 3 種類の中から処理を指定する.(これらオプションに関するの出典は[5].)
 実行速度について,86.402 / 11.565 ≒ 7.47 倍 となっており,8 コアであることをや,連番画像から GIF への変換がシングルスレッドであることを考慮すれば,十分な結果といえる.

 最終的な生成物は,
である.この GIF は PNG 形式の連番プロット画像 125 枚を繋ぎあわせて作成され,8.0 MB の容量を持つ.
Summary
  本投稿では,OpenMP を用いた簡単な計算例と,多数の連番画像からグラフプロットを行う例を示した.
Source code
本投稿で提示したコードの URL を下記に示す.
github.com/admiswalker/blog_2019_0421_OpenMP
References

0 件のコメント:

コメントを投稿