12.08.2015

Python: How to Execute Parallel Processing using multiprocessing.Pool


 

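When you need parallel processing in Python, you can write the threads yourself, but the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threaded code often does not behave the way you intended.

Using Pool from the multiprocessing module is the simplest and safest approach.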

 

Basic usage

Define a function that takes a single argument and pass it to Pool.map.

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(x):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d' % (proc_name, datetime.now(), x))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d' % (proc_name, datetime.now(), x))
    return x * x

pool = Pool(4)
print(pool.map(f, range(8)))

Note that on Python 3.4 or later the same thing can also be written with a with statement.
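A minimal sketch of that with form, assuming a Python 3 version where Pool can be used as a context manager. The names square and run are placeholders, not from the original listing, and the if __name__ == '__main__': guard is an addition that matters on platforms where workers are spawned rather than forked.

```python
from multiprocessing import Pool


def square(x):
    # Worker function: defined at module level so it can be pickled.
    return x * x


def run(n_workers=4):
    # Leaving the with block terminates the pool, so the blocking map()
    # call has to finish inside it.
    with Pool(n_workers) as pool:
        return pool.map(square, range(8))


if __name__ == '__main__':
    # Guard so that worker processes importing this module
    # do not start pools of their own.
    print(run())
```

The logging and time.sleep calls are left out here to keep the sketch short; the worker body is otherwise the same idea as f above.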

The output of the Pool(4) listing should look like this:

(PoolWorker-1) [2015-12-08 01:49:03.666114] Started: x=0
(PoolWorker-2) [2015-12-08 01:49:03.666246] Started: x=1
(PoolWorker-3) [2015-12-08 01:49:03.666400] Started: x=2
(PoolWorker-4) [2015-12-08 01:49:03.666620] Started: x=3
(PoolWorker-1) [2015-12-08 01:49:04.667421] Ended  : x=0
(PoolWorker-3) [2015-12-08 01:49:04.667421] Ended  : x=2
(PoolWorker-2) [2015-12-08 01:49:04.667423] Ended  : x=1
(PoolWorker-4) [2015-12-08 01:49:04.667511] Ended  : x=3
(PoolWorker-3) [2015-12-08 01:49:04.668070] Started: x=4
(PoolWorker-2) [2015-12-08 01:49:04.668151] Started: x=5
(PoolWorker-1) [2015-12-08 01:49:04.668282] Started: x=6
(PoolWorker-4) [2015-12-08 01:49:04.668382] Started: x=7
(PoolWorker-3) [2015-12-08 01:49:05.669206] Ended  : x=4
(PoolWorker-2) [2015-12-08 01:49:05.669206] Ended  : x=5
(PoolWorker-4) [2015-12-08 01:49:05.669218] Ended  : x=7
(PoolWorker-1) [2015-12-08 01:49:05.669230] Ended  : x=6
[0, 1, 4, 9, 16, 25, 36, 49]

While the pool is running, the worker processes are forked copies of the parent, so pay particular attention to memory usage.
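One way to keep memory in check, sketched under the assumption that per-worker memory growth is the concern: use fewer workers and let the pool replace each worker after a fixed number of tasks through the maxtasksperchild parameter. The values 2 and 100 are arbitrary placeholders, and square and run are hypothetical names.

```python
from multiprocessing import Pool


def square(x):
    return x * x


def run(n_workers=2, tasks_per_worker=100):
    # Fewer workers means fewer copies of the parent process.
    # maxtasksperchild makes the pool replace a worker once it has
    # completed that many tasks, releasing whatever it accumulated.
    pool = Pool(processes=n_workers, maxtasksperchild=tasks_per_worker)
    try:
        return pool.map(square, range(8))
    finally:
        pool.close()
        pool.join()


if __name__ == '__main__':
    print(run())
```

Whether this helps depends on what the workers actually hold on to, so measure before and after.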

 

Taking multiple parameters

The standard approach is to pass a tuple and unpack it inside the worker.

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return x * y

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

Example output

(PoolWorker-1) [2015-12-08 01:55:01.700005] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:55:01.700078] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:55:01.700196] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:55:01.700463] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701283] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:55:02.701275] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:55:02.701261] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:55:02.701293] Ended  : x=3, y=13
(PoolWorker-3) [2015-12-08 01:55:02.701914] Started: x=4, y=14
(PoolWorker-1) [2015-12-08 01:55:02.702046] Started: x=5, y=15
(PoolWorker-2) [2015-12-08 01:55:02.702165] Started: x=6, y=16
(PoolWorker-4) [2015-12-08 01:55:02.702361] Started: x=7, y=17
(PoolWorker-1) [2015-12-08 01:55:03.703123] Ended  : x=5, y=15
(PoolWorker-3) [2015-12-08 01:55:03.703106] Ended  : x=4, y=14
(PoolWorker-4) [2015-12-08 01:55:03.703131] Ended  : x=7, y=17
(PoolWorker-2) [2015-12-08 01:55:03.703140] Ended  : x=6, y=16
[0, 11, 24, 39, 56, 75, 96, 119]
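On Python 3.3 or later, Pool.starmap does the tuple unpacking for you, so the worker can declare its parameters directly. A minimal sketch with the same inputs as above; multiply and run are placeholder names, and the logging is left out for brevity.

```python
from multiprocessing import Pool


def multiply(x, y):
    # Takes two ordinary parameters; no manual tuple unpacking.
    return x * y


def run(n_workers=4):
    with Pool(n_workers) as pool:
        # starmap calls multiply(x, y) for every (x, y) tuple.
        return pool.starmap(multiply, zip(range(8), range(10, 18)))


if __name__ == '__main__':
    print(run())  # [0, 11, 24, 39, 56, 75, 96, 119]
```

On Python 2.7, where starmap is not available, the tuple approach remains the way to go.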

 

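Printing the worker's stack trace

Now consider the case where a worker raises an exception. In the code below, y / x raises ZeroDivisionError for the first item, where x=0.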

import time
from datetime import datetime
from multiprocessing import Pool, current_process

def f(args):
    x, y = args
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

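Run as is, the stack trace is hard to read: it shows only the pool.map frames in the parent process, not the line in the worker that raised the exception.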

(PoolWorker-1) [2015-12-08 01:59:54.088732] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 01:59:54.088835] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 01:59:54.088960] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 01:59:54.089304] Started: x=3, y=13
(PoolWorker-3) [2015-12-08 01:59:55.090067] Ended  : x=2, y=12
(PoolWorker-2) [2015-12-08 01:59:55.090095] Ended  : x=1, y=11
(PoolWorker-1) [2015-12-08 01:59:55.090049] Ended  : x=0, y=10
(PoolWorker-4) [2015-12-08 01:59:55.090160] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 01:59:55.090750] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 01:59:55.090926] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 01:59:55.091020] Started: x=6, y=16
(PoolWorker-1) [2015-12-08 01:59:55.091336] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 14, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

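Let's use a decorator to print the stack trace from inside the worker. As a by-product, because the wrapper calls func(*args), the function f can now take multiple parameters.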

import time
from datetime import datetime
from multiprocessing import Pool, current_process
import traceback

def with_stacktrace(func):
    import functools

    @functools.wraps(func)
    def wrapper(args):
        try:
            return func(*args)
        except:
            proc_name = current_process().name
            for line in traceback.format_exc().splitlines():
                print('[TRACE:%s] %s' % (proc_name, line))
            raise
    return wrapper

@with_stacktrace
def f(x, y):
    proc_name = current_process().name
    print('(%s) [%s] Started: x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    time.sleep(1)
    print('(%s) [%s] Ended  : x=%d, y=%d' % (proc_name, datetime.now(), x, y))
    return y / x

pool = Pool(4)
print(pool.map(f, zip(range(8), range(10, 18))))

Example output

(PoolWorker-1) [2015-12-08 02:31:36.959575] Started: x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:36.959657] Started: x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:36.959721] Started: x=2, y=12
(PoolWorker-4) [2015-12-08 02:31:36.960174] Started: x=3, y=13
(PoolWorker-4) [2015-12-08 02:31:37.960911] Ended  : x=3, y=13
(PoolWorker-2) [2015-12-08 02:31:37.960904] Ended  : x=1, y=11
(PoolWorker-3) [2015-12-08 02:31:37.960920] Ended  : x=2, y=12
(PoolWorker-1) [2015-12-08 02:31:37.960888] Ended  : x=0, y=10
(PoolWorker-2) [2015-12-08 02:31:37.961423] Started: x=4, y=14
(PoolWorker-3) [2015-12-08 02:31:37.961634] Started: x=5, y=15
(PoolWorker-4) [2015-12-08 02:31:37.961724] Started: x=6, y=16
[TRACE:PoolWorker-1] Traceback (most recent call last):
[TRACE:PoolWorker-1]   File "xxx.py", line 9, in f
[TRACE:PoolWorker-1]     return func(*args)
[TRACE:PoolWorker-1]   File "xxx.py", line 23, in f
[TRACE:PoolWorker-1]     return y / x
[TRACE:PoolWorker-1] ZeroDivisionError: integer division or modulo by zero
(PoolWorker-1) [2015-12-08 02:31:37.962384] Started: x=7, y=17
Traceback (most recent call last):
  File "xxx.py", line 26, in <module>
    print(pool.map(f, zip(range(8), range(10, 18))))
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 251, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/Users/xxxxxx/.pyenv/versions/2.7.9/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
ZeroDivisionError: integer division or modulo by zero

This makes debugging much easier.
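The logs above come from Python 2.7. On recent Python 3 versions (3.4 or later, as far as I know), the pool attaches the worker's formatted traceback to the re-raised exception as its __cause__, so the parent can print it without a decorator. A minimal sketch under that assumption; divide and run are placeholder names.

```python
from multiprocessing import Pool


def divide(args):
    x, y = args
    return y / x


def run(n_workers=2):
    with Pool(n_workers) as pool:
        try:
            return pool.map(divide, zip(range(4), range(10, 14)))
        except ZeroDivisionError as exc:
            # __cause__ carries the traceback text captured in the
            # worker, when the running Python version provides it.
            print('Worker traceback: %s' % (exc.__cause__,))
            return None


if __name__ == '__main__':
    run()
```

The decorator is still useful if you want the trace printed the moment it happens, tagged with the worker's name.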

 

 
