3.1 多进程:进程间通信
发布时间:2022-12-16 11:10:49 所属栏目:Unix 来源:
导读: 1. 进程间通信
为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,
为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,
|
1. 进程间通信 为了进程安全起见,两个进程之间的数据是不能够互相访问的(默认情况下),进程与进程之间的数据是不可以互相访问的,而且每一个进程的内存是独立的。多进程的资源是独立的,不可以互相访问,如果想多个进程之间实现数据交互就必须通过中间件实现。进程间通信方法有Queue、Pipes、Mangers和Value,Array四种。 (1)进程队列(Queue)通信 Queue([maxsize]):建立一个共享的队列(其实并不是共享的,实际是克隆的,内部维护着数据的共享),多个进程可以向队列里存/取数据。其中,参数是队列最大项数,省略则无限制。Queue的常用方法如下: from multiprocessing import Queue, Process def fun(q, i): q.put([1 * i, 2 * i, 3 * i]) if __name__ == '__main__': Q = Queue(5) # 设置进程队列长度 for i in range(2): # 启动两个进程,想队列里put数据 process = Process(target=fun, args=(Q, i + 1)) # 创建一个进程,将Q传入,实际上是克隆了Q process.start() process.join() print(Q.qsize()) # 2 print(Q.get()) # [1, 2, 3] print(Q.get()) # [2, 4, 6] print(Q.qsize()) # 0 (2)进程管道(Pipes)通信 多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1unix进程通信,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。 构造方法:Pipe([dumplex]) dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。 实例方法: from multiprocessing import Process, Pipe import time def f(child_conn): time.sleep(1) child_conn.send("吃了吗") # 给父进程发送消息 print("来自父亲的问候:", child_conn.recv()) #接收父进程发送的消息 child_conn.close() if __name__ == "__main__": parent_conn, child_conn = Pipe() # 创建管道两端,必须在创建Process之前创建Pipe p = Process(target=f, args=(child_conn,)) # 创建子进程 p.start() print("来自儿子的问候:", parent_conn.recv())# 接收子进程的消息 parent_conn.send("嗯") # 给子进程发送消息 (3)进程的Mangers通信 Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。 Manager实现了多个进程间的数据共享,支持的数据类型有 list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value , Array。 Manager中 以下类型均不是进程安全的, Manager只是保证的数据能传递但是没有保证数据的安全所以很多时候需要我们自己通过进程锁来控制的。下面不安全的类型部分在进程里是有安全的可以通过from multiprocessing import Array,Lock,Queue 方式使用,没有的那么只能加锁了。 Array(self,*args,**kwds) BoundedSemaphore(self,*args,**kwds) Condition(self,*args,**kwds) Event(self,*args,**kwds) JoinableQueue(self,*args,**kwds) Lock(self,*args,**kwds) Namespace(self,*args,**kwds) Pool(self,*args,**kwds) Queue(self,*args,**kwds) RLock(self,*args,**kwds) Semaphore(self,*args,**kwds) Value(self,*args,**kwds) dict(self,*args,**kwds) list(self,*args,**kwds) 例子: import multiprocessing def f(x, arr, l, d, n): x.value = 3.14 arr[0] = 5 l.append('Hello') d["name"] = "xxx" n.a = 10 if __name__ == '__main__': manager = multiprocessing.Manager() x = manager.Value('d', 0.0) arr = manager.Array('i', range(10)) l = manager.list() d = manager.dict() n = manager.Namespace() # 以上代码需要在进程创建前使用 proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n)) proc.start() proc.join() print(x.value) print(arr) print(l) print(d) print(n.a) 假如有一个IpConnectionPool对象需要多个进程共享,步骤如下: from multiprocessing.managers import BaseManager # 多进程对象共享 manager = BaseManager() # 一定要在start前注册,不然就注册无效 # 注册数据库线程池对象 manager.register('IpConnectionPool', IpConnectionPool) # 启动manager服务(就是开了一个新进程来管理IpConnectionPool对象的) manager.start() # 获取进程对象 ipConnectionPool = manager.IpConnectionPool() # manager必须在主线程进行创建进程否则报错 pool = multiprocessing.Pool(int(multiprocessing.cpu_count())) # 创建进程池 for i in range(2,3423): # 数据库线程池对象传入到进程中使用 pool.apply_async(batchInsert,args=(i,ipConnectionPool),callback=call_back,error_callback=err_call_back) time.sleep(5) pool.close() pool.join() (4)Value,Array资源共享 multiprocessing 中Value和Array的实现原理都是在共享内存中创建ctypes()对象来达到共享数据的目的,两者实现方法大同小异,只是选用不同的ctypes数据类型而已。 ① Value 构造方法:Value((typecode_or_type, args[, lock]) ② Array 构造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock]) import multiprocessing def f(n, a): n.value = 3.14 a[0] = 5 if __name__ == '__main__': num = multiprocessing.Value('d', 0.0) arr = multiprocessing.Array('i', range(10)) p = multiprocessing.Process(target=f, args=(num, arr)) p.start() p.join() print(num.value) print(arr[:]) 2. 进程锁 多进程之间不共享数据,但共享同一套文件系统,像访问同一个文件、同一终端打印,如果不进行同步操作,就会出现错乱的现象。 所有在 threading 存在的同步方式,multiprocessing 中都有类似的等价物,如:锁(lock,RLock)、信号量Semaphore,Event(事件)Condition,Barrier等。 (1)锁Lock from multiprocessing import Process, Lock import time def fun(l, i): l.acquire() # 获取锁 print("正在运行进程: ", i) time.sleep(2) l.release() # 使用完后释放锁 if __name__ == '__main__': lock = Lock() # 生成锁的实例 for i in range(5): p = Process(target=fun, args=(lock, i)) # 创建进程 p.start() # 启动,这里没有join,进程可以并发 (2)可重入锁RLock from multiprocessing import Process,RLock if __name__ == '__main__': # 创建一个rlock对象 lock = RLock() # 初始化共享资源 abce = 0 # 本线程访问共享资源 lock.acquire() # 加锁 abce = abce + 1 # 这个线程尝试访问共享资源 lock.acquire() # 再次加锁 abce = abce + 2 lock.release() # 释放里面的锁 lock.release() # 释放外面的锁 print(abce) 上面就够用了。 (3)Condition状态 (4)Semaphore信号量 (5)Event事件 (6)Barrier屏障 (编辑:PHP编程网 - 钦州站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
推荐文章
站长推荐

浙公网安备 33038102330484号