Python Basics: Distributed Processes

Published: 2017-09-13 12:41:56

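When choosing between Thread and Process, Process should generally be preferred: processes are more stable (a crash in one process does not take down the others), and processes can be distributed across several machines, whereas threads can at most be spread across the CPUs of a single machine.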
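Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing those processes across multiple machines. A service process can act as the scheduler and hand tasks out to several other processes over the network. Because the managers module is well encapsulated, you can write distributed multiprocess programs easily without knowing the details of the network communication.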
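To see the scheduler/worker split without needing two machines, here is a minimal sketch that runs everything in one process. It assumes a local test setup: the manager's server is started with `get_server().serve_forever()` in a daemon thread, and two worker threads each connect to it over TCP as separate clients, the way remote worker processes would. Port 0 (let the OS pick a free port), the worker names and the 1-second idle timeout are illustrative choices, not part of the original article.

```python
import queue
import threading
from multiprocessing.managers import BaseManager

# Queues owned by the scheduler; they live in this process
task_queue = queue.Queue()
result_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

# The server runs in a thread of this process, so the registry is never
# pickled and lambdas are acceptable here
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

AUTHKEY = b'123456'

def start_scheduler():
    """Serve the queues from a daemon thread and return the bound address."""
    # Port 0 lets the OS pick a free port; server.address reports the real one
    server = QueueManager(address=('127.0.0.1', 0), authkey=AUTHKEY).get_server()
    threading.Thread(target=server.serve_forever, daemon=True).start()
    return server.address

def worker(address, name):
    """Stand-in for a remote worker: connect over TCP and drain the tasks."""
    manager = QueueManager(address=address, authkey=AUTHKEY)
    manager.connect()
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    while True:
        try:
            n = task.get(timeout=1)
        except queue.Empty:
            break  # idle for 1 s: assume every task has been handed out
        result.put((name, n, n * n))

address = start_scheduler()
for n in range(10):
    task_queue.put(n)  # the scheduler fills its own queue directly
workers = [threading.Thread(target=worker, args=(address, 'w%d' % i))
           for i in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()
results = [result_queue.get() for _ in range(10)]
print(sorted(square for _, _, square in results))
# prints [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

On real machines each worker would be its own script calling `connect()` against the scheduler's LAN address, and the server would usually be started with `manager.start()` in a separate process instead of a thread.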
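For example: suppose we already have a multiprocess program that communicates through a Queue and runs on a single machine. Because the processes that handle the tasks are heavily loaded, we now want to put the process that sends tasks and the processes that handle them on two different machines. How can this be done with distributed processes?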
The original Queue can still be used as before; by exposing it over the network through the managers module, processes on other machines can access it as well.
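A minimal sketch of that idea, again serving from a daemon thread via `get_server().serve_forever()` so it runs in one process (an assumption for demonstration only): local code keeps calling `put()`/`get()` on the plain `queue.Queue`, a client connected with the right authkey sees the same contents through its proxy, and a client with a wrong authkey is refused with `multiprocessing.AuthenticationError` during the connection handshake. The addresses and keys are illustrative.

```python
import queue
import threading
import multiprocessing
from multiprocessing.managers import BaseManager

# An ordinary queue.Queue that existing local code already uses
task_queue = queue.Queue()

class QueueManager(BaseManager):
    pass

QueueManager.register('get_task_queue', callable=lambda: task_queue)

# Serve the queue from a daemon thread of this process (port 0 = any free port)
server = QueueManager(address=('127.0.0.1', 0), authkey=b'123456').get_server()
threading.Thread(target=server.serve_forever, daemon=True).start()

# Local code keeps calling the plain Queue methods directly
task_queue.put('local task')

# A client with the right authkey reaches the same Queue over TCP
client = QueueManager(address=server.address, authkey=b'123456')
client.connect()
remote = client.get_task_queue()
first = remote.get(timeout=5)        # put locally, read through the proxy
remote.put('remote task')
second = task_queue.get(timeout=5)   # put through the proxy, read locally
print(first, '|', second)

# A client with the wrong authkey is refused during the handshake
intruder = QueueManager(address=server.address, authkey=b'wrong')
try:
    intruder.connect()
    rejected = False
except multiprocessing.AuthenticationError:
    rejected = True
print('wrong authkey rejected:', rejected)
```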

Implementation steps

The service process starts the Queue, registers it on the network, and then writes tasks into it.
First, the code that produces the tasks:

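```python
import random
import queue
import threading
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager

# Queue for sending tasks
task_queue = queue.Queue()
# Queue for receiving results
result_queue = queue.Queue()

# register() takes a callable that returns the shared object. A lambda fails
# here under the spawn start method (the Windows default): the registry is
# pickled and sent to the manager's server process, and a lambda cannot be
# pickled, while a module-level function can.
def task_queue_callable():
    return task_queue

def result_queue_callable():
    return result_queue

# Subclass BaseManager
class QueueManager(BaseManager):
    pass

def run_task():
    # Register the two Queues on the network; callable returns the Queue object
    # (callable=lambda: task_queue would fail for the reason given above)
    QueueManager.register('get_task_queue', callable=task_queue_callable)
    QueueManager.register('get_result_queue', callable=result_queue_callable)
    # Bind to port 6000 with authkey b'123456'. 127.0.0.1 only accepts local
    # connections; bind to this machine's LAN address (or '') so that workers
    # on other machines can connect.
    manager = QueueManager(address=('127.0.0.1', 6000), authkey=b'123456')
    # Start the manager's server process
    manager.start()
    # Get the Queue objects, accessed through the network
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Put tasks; workers on other machines compute n*n and put the results back
    for i in range(10):
        n = random.randint(0, 1000)
        print('put task : %d' % n)
        task.put(n)
    print('wait for result')
    for i in range(10):
        r = result.get(timeout=100)
        print('result : %s' % r)
    # Shut down the manager
    manager.shutdown()
    print('master shutdown')

if __name__ == '__main__':
    freeze_support()
    thread = threading.Thread(target=run_task)
    thread.start()
    thread.join()

# Starting a process without the `if __name__ == '__main__':` guard (for
# example under spawn on Windows) raises the error below. As the message
# itself says, freeze_support() only matters when the program is frozen
# into an executable.
'''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
'''
```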
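A quick way to check the pickling difference behind registering module-level functions instead of `lambda: task_queue`: pickle stores a function by its module and name, and a lambda has no importable name. This only matters when the registry is actually pickled (assumed here: the spawn start method, the Windows default); with fork nothing is pickled. The helper `can_pickle` is hypothetical and no manager is started.

```python
import pickle
import queue

task_queue = queue.Queue()

def task_queue_callable():
    # A module-level function is pickled by reference (module + name)
    return task_queue

def can_pickle(obj):
    """Return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print('function:', can_pickle(task_queue_callable))
print('lambda:  ', can_pickle(lambda: task_queue))
```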

Next, the process that handles the tasks:

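```python
import random
import time
import queue
import threading
from multiprocessing.managers import BaseManager

# Same subclass as on the task side
class QueueManager(BaseManager):
    pass

def run_worker():
    # Only the names are registered; the Queues themselves live on the server
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')
    # Address of the machine running the task process
    # (use its real IP when the worker runs on another machine)
    server_addr = '127.0.0.1'
    server_port = 6000
    print('connect to server %s:%d' % (server_addr, server_port))
    # The authkey must match the one used by the task process
    manager = QueueManager(address=(server_addr, server_port), authkey=b'123456')
    # Connect over the network
    manager.connect()
    # Get the Queue objects
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    # Take tasks, compute them, and write the results back
    while True:
        try:
            n = task.get(timeout=10)
        except queue.Empty:
            # No task for 10 seconds: treat the work as finished
            print('all tasks done!')
            break
        print('do task %d * %d' % (n, n))
        r = '%d * %d = %d' % (n, n, n * n)
        time.sleep(random.random())  # simulate some work
        result.put(r)
    # This side only connected to the manager; shutting the server down
    # is left to the task process
    print('worker shutdown')

if __name__ == '__main__':
    thread = threading.Thread(target=run_worker)
    thread.start()
    thread.join()
```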
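The worker's loop only relies on `get()` and `put()`, which the manager's queue proxies expose with the same signatures as `queue.Queue`, so the loop logic can be tried locally without any network. In this sketch the hypothetical helper `drain_tasks` squares each task and returns as soon as no task arrives within `timeout` seconds; the short timeout in the dry run only keeps it fast.

```python
import queue

def drain_tasks(task, result, timeout=10):
    """Hypothetical worker loop: square tasks until none arrives in time.

    `task` and `result` only need get()/put(), so a manager proxy or a
    plain queue.Queue both work. Returns the number of tasks handled.
    """
    done = 0
    while True:
        try:
            n = task.get(timeout=timeout)
        except queue.Empty:
            break  # no new task within `timeout` seconds: stop
        result.put('%d * %d = %d' % (n, n, n * n))
        done += 1
    return done

# Local dry run with ordinary queues (no network involved)
tasks, results = queue.Queue(), queue.Queue()
for n in (3, 12, 125):
    tasks.put(n)
count = drain_tasks(tasks, results, timeout=0.1)
out = [results.get() for _ in range(count)]
print(count, out)
# prints 3 ['3 * 3 = 9', '12 * 12 = 144', '125 * 125 = 15625']
```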

Sample output

```
# Task process
put task : 125
put task : 621
put task : 996
put task : 815
put task : 172
put task : 425
put task : 163
put task : 132
put task : 474
put task : 211
wait for result
# After the worker process is started
connect to server 127.0.0.1:6000
do task 125 * 125
do task 621 * 621
do task 996 * 996
do task 815 * 815
do task 172 * 172
do task 425 * 425
do task 163 * 163
do task 132 * 132
do task 474 * 474
do task 211 * 211
# Meanwhile the task process prints the results
result : 125 * 125 = 15625
result : 621 * 621 = 385641
result : 996 * 996 = 992016
result : 815 * 815 = 664225
result : 172 * 172 = 29584
result : 425 * 425 = 180625
result : 163 * 163 = 26569
result : 132 * 132 = 17424
result : 474 * 474 = 224676
result : 211 * 211 = 44521
master shutdown
```

