lang/py

Book 파이썬답게 코딩하기 - Multiprocessing

C/H 2020. 1. 14. 08:30

Multiprocessing

파이썬답게 코딩하기
multiprocessing

multiprocessing 모듈을 사용하면 각각의 작업이 분리된 메모리 공간을 갖게 된다.
CPU 코어 개수만큰 동시에 일을 처리할 수 있고, 자원을 최대한 활용할 수 있다.
shared memory를 사용하지 않는 한 GIL 영향을 받지 않는다.
CPython에서 CPU Bound 관련 작업은 multiprocessing 모듈로 구현하는 것이 효율적이다.

mutiprocessing은 스레드보다 많은 메모리를 사용한다.
각각의 작업(프로세스)끼리 공유할 별도의 IPC(Inter Process Communication)를 구현해야 한다.
이 과정은 스레드보다 복작할 수 있다.

Multiprocessing 구현

# basic_multiprocessing_function.py
name : process 0, argument : 0
parent pid : 68919, pid : 68920

name : process 1, argument : 1
parent pid : 68919, pid : 68921

name : process 2, argument : 2
parent pid : 68919, pid : 68922

name : process 3, argument : 3
parent pid : 68919, pid : 68923

name : process 4, argument : 4
parent pid : 68919, pid : 68924
# basic_multiprocessing_class.py
name : process 0, argument : 0
parent pid : 68967, pid : 68968

name : process 1, argument : 1
parent pid : 68967, pid : 68969

name : process 2, argument : 2
parent pid : 68967, pid : 68970

name : process 3, argument : 3
parent pid : 68967, pid : 68971

name : process 4, argument : 4
parent pid : 68967, pid : 68972

Multiprcessing의 로깅

# logging_multiprocessing.py
[INFO/Process-1] child process calling self.run()
count : 0
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/Process-2] child process calling self.run()
count : 1
[INFO/Process-2] process shutting down
[DEBUG/Process-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-2] running the remaining "atexit" finalizers
[INFO/Process-2] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[INFO/Process-3] child process calling self.run()
count : 2
[INFO/Process-3] process shutting down
[DEBUG/Process-3] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-3] running the remaining "atexit" finalizers
[INFO/MainProcess] calling join() for process Process-5
[INFO/Process-3] process exiting with exitcode 0
[INFO/Process-4] child process calling self.run()
count : 3
[INFO/Process-4] process shutting down
[DEBUG/Process-4] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-4] running the remaining "atexit" finalizers
[INFO/Process-4] process exiting with exitcode 0
[INFO/Process-5] child process calling self.run()
count : 4
[INFO/Process-5] process shutting down
[DEBUG/Process-5] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-5] running the remaining "atexit" finalizers
[INFO/Process-5] process exiting with exitcode 0
[INFO/MainProcess] calling join() for process Process-3
[INFO/MainProcess] calling join() for process Process-2
[INFO/MainProcess] calling join() for process Process-4
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Daemon Process

스레드와 같이 자식이 종료되지 않으면 메인 프로그램도 종료되지 않는다.

# daemon_process.py
Start
# daemon_process_join.py
Start
Exit

Process Exit

스레드는 프로세스내 자식 스레드를 종료할 수 있는 방법이 없다.
하지만 프로세스는 자식을 종료할 수 있고, 상태 확인과 수행 결과를 반환받을 수 있다.

# process_exit.py
Process check : <Process(good_job, initial)>, False
Start name:good_job, pid:69653
Process check : <Process(fail_job, initial)>, False
Start name:fail_job, pid:69654
Process check : <Process(kill_job, initial)>, False
Start name:kill_job, pid:69655
Process check : <Process(good_job, started)>, True
Process check : <Process(fail_job, started)>, True
Process check : <Process(kill_job, started)>, True
Exit name:good_job, pid:69653
Exit name:fail_job, pid:69654
Process check : <Process(good_job, stopped)>, False
Process check : <Process(fail_job, stopped[1])>, False
Process check : <Process(kill_job, started)>, True
Terminate process : <Process(kill_job, started)>
Process name : good_job, exit code : 0
Process name : fail_job, exit code : 1
Process name : kill_job, exit code : None

Process Event

이벤트 통신

# process_event.py
Wait ...
[first] Event status : (False)
[second] Event status : (False)
[first] Event status : (False)
[second] Event status : (False)
[first] Event status : (False)
[second] Event status : (False)
[first] Event status : (False)
[second] Event status : (False)
Set e1
[first] Event status : (True)
[first] e1 is set.
[second] Event status : (False)
[second] Event status : (False)
[second] Event status : (False)
[first] Set e2
[second] Event status : (True)
[second] e2 is set.
Exit

Process Communication

프로세스간 통신으로 queue, pipe를 지원한다.

# process_queue.py
[set_data] set queue data : Hello World
[get_data] get queue data : Hello World

queue는 한번에 여러 proces와 정보 교환을 할 수 있다.
pip는 1:1 통신 방시으로 소켓통신과 비슷하다.

# process_pipe.py
[child] Send a message to pipe : Hello World
Recieved message : Hello World

Process 동기화

스레드에서 사용하던 Lock, RLock, Condition, Semaphore 모두 multiprocessing 모듈에서 사용 가능하다.
이런 장치는 메모리 공간을 공유하지만 사용하는 주체가 다를 때, 메모리 즉 자원의 무결성을 보장하기 위해 만들어진 장치다.
다시 말해 동기화 장치다.
그래서 메모리를 공유하는 스레드에서는 유용하지만, 메모리 공간이 분리되는 프로세스는 사용할 필요가 없다.

프로세스간에더 데이터를 공유해야 하는 상황에서 queue, pipe를 사용할 수 있지만,
메모리 공간 자체를 공유해야 할 경우 manager 클래스를 만들어 관리한다.

Process 메모리 공유

multiprocess에서 공유된 메모리 공간에 저장할 수 있는 Value와 Array라는 API를 제공한다.
이 API들은 프로세스와 스레드간 호출에도 무결성이 보장된다.

# process_shared_memoey.py
[worker] num : 5
[worker] num list[0] : 0
[worker] num list[1] : 1
[worker] num list[2] : 2
[worker] num list[3] : 3
[worker] num list[4] : 4
[worker] num list[5] : 5
[worker] num list[6] : 6
[worker] num list[7] : 7
[worker] num list[8] : 8
[worker] num list[9] : 9
num : 50
num list[0] : 0
num list[1] : 10
num list[2] : 20
num list[3] : 30
num list[4] : 40
num list[5] : 50
num list[6] : 60
num list[7] : 70
num list[8] : 80
num list[9] : 90

server_process 는 내부적으로 자원을 관리하는 프로세스가 하나 더 있다고 보면 된다.

# server_process.py
worker] value : Value('i', 5), dict : Python2
[worker] num list[0] : 0
[worker] num list[1] : 1
[worker] num list[2] : 2
[worker] num list[3] : 3
[worker] num list[4] : 4
[worker] num list[5] : 5
[worker] num list[6] : 6
[worker] num list[7] : 7
[worker] num list[8] : 8
[worker] num list[9] : 9
[worker] num list[0] : 0
[worker] num list[1] : 1
[worker] num list[2] : 2
[worker] num list[3] : 3
[worker] num list[4] : 4
[worker] num list[5] : 5
[worker] num list[6] : 6
[worker] num list[7] : 7
[worker] num list[8] : 8
[worker] num list[9] : 9
[main] value : Value('i', 50), dict : Python3
[main] num list[0] : 0
[main] num list[1] : 10
[main] num list[2] : 20
[main] num list[3] : 30
[main] num list[4] : 40
[main] num list[5] : 50
[main] num list[6] : 60
[main] num list[7] : 70
[main] num list[8] : 80
[main] num list[9] : 90
[main] num list[0] : 0
[main] num list[1] : 10
[main] num list[2] : 20
[main] num list[3] : 30
[main] num list[4] : 40
[main] num list[5] : 50
[main] num list[6] : 60
[main] num list[7] : 70
[main] num list[8] : 80
[main] num list[9] : 90

Process Pool

프로세스는 pool을 만들어서 작업을 분리 처리할 수 있다.
작업과 데이터를 pool에 등록 후 프로세스 수를 정하면, 프로세스 pool은 작업과 데이터를 나눠서 정해진 프로세스만큼 나눠서 처리한다.

# process_pool.py
Start process : ForkPoolWorker-1
Start process : ForkPoolWorker-2
Start process : ForkPoolWorker-3
Start process : ForkPoolWorker-4
Result : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
# cowork_coroutine.py
[w1] Total : 43, work : 7
[w2] Total : 40, work : 3
[w1] Total : 33, work : 7
[w2] Total : 31, work : 2
[w1] Total : 30, work : 1
[w2] Total : 23, work : 7
[w1] Total : 15, work : 8
[w2] Total : 11, work : 4
[w1] Total : 8, work : 3
[w2] Total : 4, work : 4
[w1] Total : 0, work : 4
[w2] Total : 0, work : 0

Yield From

yield 객체 반환, Python 3.3에서 yield from 구문을 추가됐다.

# return_coroutine.py
== Get coroutine ==
<generator object get_coroutine at 0x10aa72150>
== Get values ==
0
[1, 2, 3, 4, 5, 6, 7, 8, 9]
반응형