1장에서는 sorted_queue가 어디서부터 생겨나는가?를 보았습니다.
class CeleryExecutor.trigger_tasks()
1 |
|
sorted_queue 그 밑으로는 간단합니다. open_slot
은 최대 병렬처리 가능한 프로세스 갯수에서 현재 running으로 표시되어 있는 task의 갯수를 뺀 갯수. 즉 현재 실행 가능한 프로세스의 갯수입니다. queue 앞에서부터 실행 가능한 프로세스 갯수만큼 pop(0)으로 빼서 task_tuples_to_send
에 넣은 뒤, _process_tasks
에 인자로 넘겨줍니다.
_process_tasks() / _send_tasks_to_celery()
1 |
|
first_task에서는 가장 첫번째 execute_command를 할당합니다.
중간에 _send_tasks_to_celery()가 등장하는데, 실행할 수 있는 task가 1개밖에 없다면 task를 바로 map 함수에 send_task_to_executor를 적용시켜 executor로 보냅니다. send_task_to_executor
는 Celery의 task 클래스의 메서드인데, 이 안에 있는 메서드를 따라가다보면 execute_command를 OS 단의 API Call로 직접 호출하는 메서드까지 나오지 않을까?하는 추측을 해봅니다. 이것까지 들어가지는 않겠습니다.
어쨌든 여기서는 ProcessPoolExecutor를 사용한 병렬처리가 들어갑니다. 각자 executor에 task를 병렬로 할당하는 execute_async()
메서드는 CeleryExecutor에서 사용하지 않았지만, 실행 가능한 task의 list를 한군데
에서 계속 갱신한 뒤 실행하는것만 병렬로 각각 executor에 뿌려주는 방식이라면 가능할것도 같습니다. 실행하는 task가 겹칠 일도 없고 성공하거나 실패한 task가 queue에 남아있을 일도 없습니다.
for loop 부터는 task의 상태를 갱신합니다. _send_tasks_to_celery()
로 실행시킨 task는 queue에서 빼고 running 상태인 task는 self.running
에 집어넣는 등의 동작이 실행됩니다. self.event_buffer에도 queue와 task id를 저장하는데 정확한 쓰임새는 잘 모르겠습니다.
_process_tasks()
에서는 task를 실제로 실행하는 단계로 넘기고 상태를 갱신하는 작업이 이루어지는 것을 알 수 있습니다.
정리
이 외에도 CeleryExecutor에서는 실행되는 메서드가 더 많지만, change_state / update_task_state
등 task의 상태를 관리하는 메서드입니다. 이 메서드들은 더 깊게 보지는 않겠습니다.