lang/py

Book 파이썬답게 코딩하기 - concurrent.futures

C/H 2020. 1. 21. 08:30

concurrent.futures

파이썬답게 코딩하기

concurrent.futures 모듈은 파이썬에서 사용되는 스레딩과 멀티프로세싱을 고도화한 모듈이다.
Pythone 3.2 버전에 추가되었고, __future__ 모듈과 혼동될 수 있기 때문에 concurrent.futures로 명명됐다.
이 모듈은 동기, 비동기, 블록, 논블록 4가지 방식중 비동기 논블록 방식을 스레드와 프로세스로 구현한 모듈이다.

파이썬에서 구현된 스레딩, 멀티프로세싱 API는 C기반의 코드를 파이썬에서 사용할 수 있도록 래핑(wrapping)한 수준이었다.
멀티프로세싱이 스레딩 모듈을 참조 했고, 스레딩 모듈은 C의 API를 래핑한 것이다.
이러한 기술을 이용하기 위해서는 동기화를 위한 방법과, 정보를 교환하는 방법을 고려해야 하고, 이런 방법을 알고 있어야 API로 직접 사용하는 불편한 점을 피할 수 있다.

concurrent.futures는 스레드와 멀티프로세싱을 개선해서 ThreadPoolExecutore, ProcessPoolExecutor를 만들었다.
그리고 위 두 클래스의 API를 통합하고 내부적으로 작업을 비동기 논블록으로 실행하기 만들었다.
작업으로 인해 메인 로직이 블록되지 않고 비동기 처리가 되도록 만들었다.
스레드와 멀티프로세싱에서 데몬(daemon) 실행과 유사하다.

Executor

Executor는 호출 가능한 객체를 비동기 호출하는 일종의 실행기다.
함수나 코루틴 같은 것을 실행시킨다.
Executor는 상속을 받아서 처리하는 방식에 따라서 ThreadPoolExecutor, ProcessPoolExecutor 방식으로 나뉜다.
API는 submit, map, shutdown 3가지 메서드로 이용한다.

submit

비동기 실행 함수와 인자값을 받아서 Future 클래스를 반환한다.

map

일반적인 map과 동일한 기능으로 비동기로 실행되기 때문에 종료되지 않고 멈춰 있는 경우를 대비 timeout값을 받을 수 있다.
timeout 시간을 넘으면 예외를 발생시킨다.

shutdown

종료 시그널을 보내 실행 중, 대기 중인 Feture 객체를 종료시키고, 사용중인 리소스를 정리하게 한다.
shutdown 호출한 Executor는 submit, map을 호출할 수 없다.
wait 매개변수로 True값을 설정하면 실행중인 값이 완료될 때까지 대기한다.

Future

호출 가능 객체를 비동기 캡슐화한다.
Executor에서 submit으로 전달된 함수를 비동기 실행, 실행된 결과나 현재 상태를 조회 가능하도록 만든다.
작업을 예약하고 상태를 확인할 수 있도록 만든다.
상태확인 API는 canceled: 취소, running: 동작 중,done: 완료 있고, True/False 값을 반환한다.
명령 API는 cancel: 취소, result: 결과, exception: 예외, add_done_callback: callback 추가이 있다.

Module Functions

비동기 처리로인해 병렬화된 작업을 동기화 하기 위한 API로 wait, as_completed 2개 메서드를 제공한다.

wait

전달된 시간 만큼 기다리고, 완료/미완료로 나누어 튜플을 반환한다.
timeout을 설정하지 않고 작업 처리 결과에 따라서 값을 반환받을 수 있다.
첫 번째 완료, 첫 번째 예외, 모든 작업 완료일 때를 구분해서 결과를 받을 수 있다.

as_completed

비동기 시행 중인 함수들의 집합을 받아 하나씩 순회하면 완료를 기다리는 메서드이다.
timeout 값을 설정할 수 있고, 기본값은 None이다.

future

# basic_future.py
# Python 3.2 or later
Worker Index : 0
Worker Index : 1
Worker Index : 2
Worker Index : 3
result : Completed 0 worker job
[1 worker] Wait for 1 second because it has not finished yet.
Worker Index : 4
result : Completed 1 worker job
[2 worker] Wait for 1 second because it has not finished yet.
result : Completed 2 worker job
[3 worker] Wait for 1 second because it has not finished yet.
result : Completed 3 worker job
[4 worker] Wait for 1 second because it has not finished yet.
[4 worker] Timeout error
Exception ignored in: <function ProcessPoolExecutor._start_queue_management_thread.<locals>.weakref_cb at 0x10e27c950>
Traceback (most recent call last):
...
OSError: handle is closed
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
...
Exception in thread QueueManagerThread:
Traceback (most recent call last):
...
OSError: handle is closed
...
OSError: handle is closed
# with_future.py
# Python 3.2 or later
Worker Index : 0
Worker Index : 1
Worker Index : 2
Worker Index : 3
Worker Index : 4
Finished worker : Completed 1 worker job
Finished worker : Completed 0 worker job
Not finished worker : Completed 2 worker job
Not finished worker : Completed 3 worker job
Not finished worker : Completed 4 worker job
반응형