副问题[/!--empirenews.page--]
小惊大怪
你是不是在用Python3可能在windows体系上编程?最重要的是你对历程和线程不是很清晰?那么恭喜你,在python漫衍式历程中,会有坑等着你去挖。。。(hahahaha,此处应承我恐吓一下你)恶作剧的啦,不外,假如你知道序列中不支持匿名函数,那这个坑就和你say byebye了。好了话不大都,直接进入正题。
漫衍式历程
正如各人所知道的Process比Thread更不变,并且Process可以漫衍到多台呆板上,而Thread最多只能漫衍到统一台呆板的多个CPU上。Python的multiprocessing模块不单支持多历程,个中managers子模块还支持把多历程漫衍到多台呆板上。一个处事历程可以作为调治者,将使命漫衍到其他多个历程中,依赖收集通讯。因为managers模块封装很好,不必相识收集通讯的细节,就可以很轻易地编写漫衍式多历程措施。
代码记录
举个例子
假如我们已经有一个通过Queue通讯的多历程措施在统一台呆板上运行,此刻,因为处理赏罚使命的历程使命沉重,但愿把发送使命的历程和处理赏罚使命的历程漫衍到两台呆板上,这应该怎么用漫衍式历程来实现呢?你已经知道了原有的Queue可以继承行使,并且通过managers模块把Queue通过收集袒暴露去,就可以让其他呆板的历程来会见Queue了。好,那我们就这么干!
写个task_master.py
我们先看处事历程。处事历程认真启动Queue,把Queue注册到收集上,然后往Queue内里写入使命。
- #!/user/bin/pytthon
- # -*- coding:utf-8 -*-
- # @Time: 2018/3/3 16:46
- # @Author: lichexo
- # @File: task_master.py
- import random, time, queue
- from multiprocessing.managers import BaseManager
- # 发送使命的行列:
- task_queue = queue.Queue()
- # 吸取功效的行列:
- result_queue = queue.Queue()
- # 从BaseManager担任的QueueManager:
- class QueueManager(BaseManager):
- pass
- # 把两个Queue都注册到收集上, callable参数关联了Queue工具:
- QueueManager.register('get_task_queue', callable=lambda: task_queue)
- QueueManager.register('get_result_queue', callable=lambda: result_queue)
- # 绑定端口5000, 配置验证码'abc':
- manager = QueueManager(address=('', 5000), authkey=b'abc')
- # 启动Queue:
- manager.start()
- # 得到通过收集会见的Queue工具:
- task = manager.get_task_queue()
- result = manager.get_result_queue()
- # 放几个使命进去:
- for i in range(10):
- n = random.randint(0, 10000)
- print('Put task %d...' % n)
- task.put(n)
- # 从result行列读取功效:
- print('Try get results...')
- for i in range(10):
- r = result.get(timeout=10)
- print('Result: %s' % r)
- # 封锁:
- manager.shutdown()
- print('master exit.')
请留意,当我们在一台呆板上写多历程措施时,建设的Queue可以直接拿来用,可是,在漫衍式多历程情形下,添加使命到Queue不行以直接对原始的task_queue举办操纵,那样就绕过了QueueManager的封装,必需通过manager.get_task_queue()得到的Queue接口添加。然后,在另一台呆板上启动使命历程(本机上启动也可以)
写个task_worker.py
- #!/user/bin/pytthon
- # -*- coding:utf-8 -*-
- # @Time: 2018/3/3 16:46
- # @Author: lichexo
- # @File: task_worker.py
- import time, sys, queue
- from multiprocessing.managers import BaseManager
- # 建设相同的QueueManager:
- class QueueManager(BaseManager):
- pass
- # 因为这个QueueManager只从收集上获取Queue,以是注册时只提供名字:
- QueueManager.register('get_task_queue')
- QueueManager.register('get_result_queue')
- # 毗连随处事器,也就是运行task_master.py的呆板:
- server_addr = '127.0.0.1'
- print('Connect to server %s...' % server_addr)
- # 端口和验证码留意保持与task_master.py配置的完全同等:
- m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
- # 从收集毗连:
- m.connect()
- # 获取Queue的工具:
- task = m.get_task_queue()
- result = m.get_result_queue()
- # 从task行列取使命,并把功效写入result行列:
- for i in range(10):
- try:
- n = task.get(timeout=1)
- print('run task %d * %d...' % (n, n))
- r = '%d * %d = %d' % (n, n, n*n)
- time.sleep(1)
- result.put(r)
- except Queue.Empty:
- print('task queue is empty.')
- # 处理赏罚竣事:
- print('worker exit.')
使命历程要通过收集毗连随处事历程,以是要指定处事历程的IP。
运行功效
(编辑:河北网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|