因进程空间相对独立, 资源无法相互获取, 此时在不同进程间通信需要专门方法.
进程间通信方法: 管道 消息队列 共享内存 信号 信号量 套接字
1. 管道通信 Pipe
通信原理: 在内存中开辟管道空间生成管道操作对象, 多个进程使用"同一个"管道对象进行操作即可实现通信
multiprocessing / Pipefd1, fd2 = Pipe(duplex = True)
功能: 创建管道 参数: 默认表示双向管道 设置为False则为单向管道 返回值: 表示管道的两端 如果是双向管道, 都可以读写 如果是单向管道, 则fd1只读, fd2只写
fd.recv()
功能: 从管道读取信息 返回值: 读取到的内容 * 如果管道为空则阻塞
fd.send(data)
功能: 向管道写入内容 参数: 要写入的内容 * 可以发送python数据类型
from multprocessing import Process, Pipeimport os, timefd1, fd2 = Pipe()def fun(name): time.sleep(3) fd1.send('hello'+str(name))jobs = []for i in range(5): p = Process(target = fun, args=(i,)) jobs.append(p) p.start()for i in range(5): data = fd2.recv() print(data)for i in jobs: i.join()
2. 消息队列
队列: 先进先出
通信原理: 在内存中建立队列数据结构模型. 多个进程都可以通过队列存入内容, 取出内容的顺序和存入顺序保持一致创建队列
q = Queue(maxsize = 0) 功能: 创建消息队列 参数: 表示最多存放多少消息. 默认表示根据内存分配存储 返回值: 队列对象
q.put(data, [block, timeout])
功能: 向队列存储消息 参数: data 要存的内容 block 默认队列满时会阻塞, 设置为False则非阻塞 timeout 超时时间
data = q.get([block, timeout])
功能: 获取队列消息 参数: block 默认队列空时会阻塞, 设置为False则非阻塞 timeout 超时时间
q.full() 判断队列是否为满q.empty() 判断队列是否为空q.qsize() 判断队列中存在的消息数量q.close() 关闭队列
from multiprocessing import Queuefrom time import sleep#创建队列q = Queue(3)q.put(1)print(q.empty()) #True 内存还来不及写入, 所以判断为队列空q.put(2)print(q.get())print(q.size())q.close()
from multiprocessing import Process, Queueimport time#创建消息队列q = Queue()def fun1(): time.sleep(1) q.put({'a':1, 'b':2})def fun2(): time.sleep(2) print('收到消息:', q.get())p1 = Process(target = fun1)p2 = Process(target = fun2)p1.start()p2.start()p1.join()p2.join()
3. 共享内存sharememory
通信原理: 在内存中开辟一块空间, 对多个进程可见, 进程可以写入, 但是每次写入的内容会覆盖之前的内容.
obj = Value(ctype, obj)
功能: 开辟共享内存空间 参数: ctype 要存储的数据类型 obj 共享内存的初化数据 返回: 共享内存对象
from multiprocessing import Process, Valueimport timeimport random#创建共享内存money = Value('i', 2000)def deposite(): for i in range(100): time.sleep(0.05) #对value属性操作即操作共存数据 money.value += random.randint(1, 200)#取钱def withdraw(): for i in range(100): time.sleep(0.04) money.value -= random.randint(1, 180)d = Process(target = deposite)w = Process(target = withdraw)d.start()w.start()d.join()w.join()print('余额:', money.value)
obj.value 即为共享内存值, 对其修改即修改共享内存
obj = Array(ctype, obj)
功能: 开辟共享内存空间 参数: ctype 要存储的数据格式 obj 初始化存入的内容, 比如列表, 字符串 如果是整数则表示开辟空间的个数 返回值: 返回共享内存对象
- 可以通过遍历获取每个元素的值 e.g. [1, 2, 3] ----> obj[1] ==2
- 如果存入的是字符串 obj.value 表示字符串的首地址
from multiprocessing import Process, Arrayimport time##创建共享内存, 初始放入列表#shm = Array('i', [1, 2, 3, 4, 5])##创建共享内存,开辟5个整型空间#shm = Array('i', 5) #打印出是5 个0#存入字符串, 要求bytes格式shm = Array('c', b'Hello')def fun(): for i in shm: print(i) #shm[3] = 10000 shm[0] = b'h'p = Process(target = fun)p.start()p.join()for i in shm: print(i)print(shm.value) #r打印字符串
管道 | 消息队列 | 共享内存 | |
---|---|---|---|
读写方式 | 两端读写双向/单向 | 先进先出 | 覆盖之前内容 |
效率 | 一般 | 一般 | 较高 |
应用 | 多用于父子进程 | 广泛灵活 | 需要注意进行互斥操作 |
4. 信号通信
一个进程向另一个进程发送一个信号来传递某种讯息, 接受者根据接收到的信号进行相应的行为
- kill -l 查看系统信号
- kill -sig PID 向一个进程发送信号
关于信号
python 发送信号
signal
os.kill(pid, sig)
功能:发送信号 参数: pid 目标进程 sig 要发送的信号
import signalimport os向20959发送信号os.kill(20959, signal.SIGKILL)
from signal import * import time #信号处理函数def handler(sig,frame): if sig == SIGALRM: print("接收到时钟信号") elif sig == SIGINT: print("就不结束")alarm(5)signal(SIGALRM,handler)signal(SIGINT,handler)while True: print("Waiting for a signal") time.sleep(2)
day7
signal.alarm(sec)
向自身
发送时钟信号 SIGALRM sec 时钟时间
同步执行: 按照顺序逐句执行, 一步完成再做下一步
异步执行: 在执行过程中利用内核记录延迟发生或者准备处理的事件. 这样不影响应用层的持续执行.当事件发生时再由内核告知应用层处理- 信号是唯一的异步通信方法
import signalimport timesignal.alarm(3)while True: time.sleep(1) print('88888')
signal.pause()
阻塞等待接收一个信号
signal.signal(signum, handler)
处理信号 signum 要处理的信号 handler 信号的处理方法
- SIG_DFL 表示使用默认的方法处理
- SIG_IGN 表示忽略这个信号
- func 传入一个函数表示用指定函数处理
- def func(sig, frame)
- sig:捕获到的信号
- frame: 信号对象
5. 信号量(信号灯)
给定一个数量, 对多个进程可见, 且多个进程都可以操作. 进程通过对数量多少的判断执行各自的行为.
multiprocessing/Semaphore()Semaphore(num)
创建信号量 num信号量初始值 返回: 信号量对象
sem.get_value() 获取信号量值
sem.acquire() 将信号量减1, 当信号量为0时会阻塞sem.release() 将信号量加1