Skip to content

Instantly share code, notes, and snippets.

@an9er
Last active April 18, 2018 08:29
Show Gist options
  • Save an9er/5a965dd9cf4fff8a031900a7a9c6bd00 to your computer and use it in GitHub Desktop.
Save an9er/5a965dd9cf4fff8a031900a7a9c6bd00 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import multiprocessing as mp
import os
import random
from signal import signal, SIGINT, SIG_IGN, siginterrupt
import time
def data_source():
"""数据源。
随机选择一个浮点数,作为worker进程的sleep时间,
具体实践时可以将这部分实现改为读取数据库。
"""
dataset = [0.1, 0.2, 0.3, 0.4, 0.5]
while True:
time.sleep(0.2)
yield random.choice(dataset)
def proc_proxy(cntl_q, data_q, exit_flag):
"""从数据源读取数据。
先通过cntl_q通知主进程,
再将数据通过data_q发给worker。
"""
for item in data_source():
cntl_q.put({'event': 'data'})
data_q.put(item)
if exit_flag.is_set():
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
break
def proc_worker(cntl_q, data_q):
"""处理数据。
从data_q获取数据,处理完毕后通过cntl_q通知主进程,
然后退出。
"""
item = data_q.get()
time.sleep(item)
cntl_q.put({'event': 'exit', 'pid': os.getpid()})
def main():
proc_pool = {} # 记录创建的所有子进程
cntl_q = mp.Queue() # 控制信息传递队列
data_q = mp.Queue() # 具体数据传递队列
exit_flag = mp.Event() # 退出标记,初始值为False
# 收到SIGINT,通知proxy停止读取数据
signal(SIGINT, lambda x, y: exit_flag.set())
siginterrupt(SIGINT, False)
# 启动proxy进程,后续按需启动woker进程
print 'main {} started'.format(os.getpid())
proc = mp.Process(target=proc_proxy, args=(cntl_q, data_q, exit_flag))
proc.start()
proc_pool[proc.pid] = proc
print 'proxy {} started'.format(proc.pid)
while True:
item = cntl_q.get()
if item['event'] == 'data':
proc = mp.Process(target=proc_worker, args=(cntl_q, data_q))
proc.start()
proc_pool[proc.pid] = proc
print 'worker {} started'.format(proc.pid)
elif item['event'] == 'exit':
proc = proc_pool.pop(item['pid'])
proc.join()
print 'child {} stopped'.format(item['pid'])
else:
print 'It\'s impossible !'
if not proc_pool: # 所有子进程均已退出
break
print 'main {} stopped'.format(os.getpid())
if __name__ == '__main__':
main()
@an9er
Copy link
Author

an9er commented Apr 18, 2018

多进程杀掉子进程的优雅之处在于

  • 一个子进程池中的子进程处理一条piece的数据
  • 杀死子进程通过join,而不是kill
  • 不能使用join的改变多进程组织框架,使得可以使用join
    • 通常是之前的模型是进程不可以死,现在换成可以死的
  • 这种的适用情景是不要求立刻杀死,而是warm的杀死
    • 这不是缺点,而是适用场景,两者不同

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment