Python学习笔记(进程和线程)
1、多进程
fork
os.fork()创建一个子进程,只能在Linux系统上使用,返回一个值。返回两次,系统把父进程复制一份当成子进程,分别返回。子进程返回0,父进程返回子进程ID。
os.getpid()获取本进程的pid号
os.getppid()获取父进程的pid号
运行结果如下:
子进程有父进程资源的拷贝,在子进程中修改变量对父进程不会造成影响。
multiprocessing
multiprocessing是跨平台的多进程模块,里面有个Process类,可以在任何平台上进行创建子进程。
#multiprocessing.Process([group [, target [, name [, args [, kwargs]]]]])
#先讲解参数: group:默认值为None,应该一直为None,他只是仅仅为了与线程兼容而存在,一般用不到
target:需要让子程序进行的操作,传递一个函数。默认不写为Process对象中的
run()方法。
name:子进程的别名
args: 应该为一个元组,这是用来传递进入函数的参数,也就是给子进程的参数。
kwargs: 这是一个字典,同args一样的作用。
#Process返回一个对象,假设为p。
#对象拥有如下几种方法:
1.p.is_alive():判断子进程是否在运行,返回True代表在运行,False为停止了
2.p.join([timeout]):等待进程实例完成,或者等待指定时间。可以传递一个参数
单位是s. 默认是一直等待进程实例完成。
3.p.start(): 开始运行进程实例(创建子进程运行)
4.p.run():在创建对象的时候,如果没有传递target值,那么就运行这个函数
5.p.terminate():不管子进程是否运行完,立即结束它。
#对象还有如下两个属性:
1.name:子进程的别名,默认为Process-n
2.pid: 子进程的Pid号。唯一
例子:启动一个子进程并等待其结束
执行结果如下:
Pool
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
#pool和Process一样也是一个类,咱们实际上是实例一个对象。
#pool(max_process_num): 返回一个对象,max_process_num代表着最大进程数量
# 特别解释,进程池,代表着有个最大容纳量,也就是max_process_num
# 当已经有max_process_num个进程在运行了,如果咱们还往里面添加进程
# 那么就会在这暂停,等待空闲位置,然后再加进去。
#pool对象的方法:
1.p.apply_async(func,[args,[kwargs]]): 非堵塞的往进程池中添加进程,func
指的是子进程运行的函数,args是一个元组,传递函数的参数。kwargs是个字典,同理
2.p.apply(func,[args,[kwargs]):阻塞的往进程池中添加进程。
3.p.close():关闭进程池,使其不能再往里面添加进程
4.p.join():主进程阻塞,等待执行完毕,或者等待指定的时间,必须在close或者
terminate后面使用。
5.p.terminate():立即结束子进程
#什么是阻塞?
#答疑:阻塞就是需要等你完成后,才执行下一个进程。就相当于按顺序执行。
例子:非阻塞情况
结果,子进程是并行执行
例子:阻塞情况
结果,子进程是顺序执行
子进程
subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False)执行一个子进程,返回子进程的returncode。
returncode:
args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
例子:
结果
subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
创建并返回一个子进程,并在这个进程中执行指定的程序。
参数介绍:
args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
stdin:指定子进程的标准输入;
stdout:指定子进程的标准输出;
stderr:指定子进程的标准错误输出;
对于 stdin, stdout 和 stderr 而言,如果他们是 None(默认情况),那么子进程使用和父进程相同的标准流文件。
父进程如果想要和子进程通过 communicate() 方法通信,对应的参数必须是 subprocess.PIPE(见下文例子);
当然 stdin, stdout 和 stderr 也可以是已经打开的 file 对象,前提是以合理的方式打开,比如 stdin 对应的文件必须要可读等。
p.communicate(input=None)
和子进程 p 交流,将参数 input (字节流bytes)中的数据发送到子进程的 stdin,同时从子进程的 stdout 和 stderr 读取数据,直到EOF。
返回值:
二元组 (stdoutdata, stderrdata) 分别表示从标准出和标准错误中读出的数据。
例子:
进程间通信
进程之间是相互独立的,互不干扰,如果需要通信,需要一个桥梁。
Queue
#Queue是multiprocssing模块中的一个类,他是用来实现多进程的数据传递。 它本身是一个消息队列程序。
#队列:就是类似咱们排队买饭一样,先排队先买饭,俗称先进先出的一种数据结构。
#使用方式: Queue(max_num): 实例化Queue类一个对象,传递的参数是能够容纳最大数量# 的消息,如果max_num值未设置或者是负值的话,代表着# 这个队列的容量将是无限的。直到撑破你的内存。
#Queue对象的方法:
1.q.qsize():返回一个值,查看消息队列中还有多少条数据
2.q.empty():判断队列是否为空,如果为空的话,那么就返回True,否则为False
3.q.full():判读队列是否饱满,如果满的话,返回True,否则为False
4.q.put(item,[block[, timeout]]): 将item数据添加到队列中,block默认为True;
timeout为设置等待时间,如果未设置就是一直等待到可以执行为止。
特别的:<一> 如果block=True,未设置timeout,如果队列满了,那么会在这儿 阻塞。直到消息队列腾出空间,再继续执行。如果设置了 timeout,则只会等待timeout时间,如果还没有位置,那么就会 抛出异常。
<二> 如果block=False,当没有空间了,就立马抛出异常
5.q.put_nowait(item):效果如同 q.put(item,block=False)
6.q.get([block[,timeout]): 从消息队列中取出一条消息,此时队列中的消息被移 除。
特别的: <一>如果block=True,未设置timeout,如果队列空了,那么会在这儿阻 塞;直到消息队列中有信息后,再执行。 如果设置了timeout,那么 会等待timeout时间,如果还没有消息,那么就会抛出异常
<二>如果block=False,那么当队列中没有消息后,立马抛出异常。
7.q.get_nowait(): 效果如同 q.get(block=False)
例子:
结果
fork
os.fork()创建一个子进程,只能在Linux系统上使用,返回一个值。返回两次,系统把父进程复制一份当成子进程,分别返回。子进程返回0,父进程返回子进程ID。
os.getpid()获取本进程的pid号
os.getppid()获取父进程的pid号
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
multiprocessing
multiprocessing是跨平台的多进程模块,里面有个Process类,可以在任何平台上进行创建子进程。
#multiprocessing.Process([group [, target [, name [, args [, kwargs]]]]])
#先讲解参数: group:默认值为None,应该一直为None,他只是仅仅为了与线程兼容而存在,一般用不到
target:需要让子程序进行的操作,传递一个函数。默认不写为Process对象中的
run()方法。
name:子进程的别名
args: 应该为一个元组,这是用来传递进入函数的参数,也就是给子进程的参数。
kwargs: 这是一个字典,同args一样的作用。
#Process返回一个对象,假设为p。
#对象拥有如下几种方法:
1.p.is_alive():判断子进程是否在运行,返回True代表在运行,False为停止了
2.p.join([timeout]):等待进程实例完成,或者等待指定时间。可以传递一个参数
单位是s. 默认是一直等待进程实例完成。
3.p.start(): 开始运行进程实例(创建子进程运行)
4.p.run():在创建对象的时候,如果没有传递target值,那么就运行这个函数
5.p.terminate():不管子进程是否运行完,立即结束它。
#对象还有如下两个属性:
1.name:子进程的别名,默认为Process-n
2.pid: 子进程的Pid号。唯一
例子:启动一个子进程并等待其结束
from multiprocessing import Process
import os
# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Parent process 928.
Child process will start.
Run child process test (929)...
Process end.
如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
#pool和Process一样也是一个类,咱们实际上是实例一个对象。
#pool(max_process_num): 返回一个对象,max_process_num代表着最大进程数量
# 特别解释,进程池,代表着有个最大容纳量,也就是max_process_num
# 当已经有max_process_num个进程在运行了,如果咱们还往里面添加进程
# 那么就会在这暂停,等待空闲位置,然后再加进去。
#pool对象的方法:
1.p.apply_async(func,[args,[kwargs]]): 非堵塞的往进程池中添加进程,func
指的是子进程运行的函数,args是一个元组,传递函数的参数。kwargs是个字典,同理
2.p.apply(func,[args,[kwargs]):阻塞的往进程池中添加进程。
3.p.close():关闭进程池,使其不能再往里面添加进程
4.p.join():主进程阻塞,等待执行完毕,或者等待指定的时间,必须在close或者
terminate后面使用。
5.p.terminate():立即结束子进程
#什么是阻塞?
#答疑:阻塞就是需要等你完成后,才执行下一个进程。就相当于按顺序执行。
例子:非阻塞情况
import multiprocessing,os,time,random
def sub_pro(i):
print('子程序%d 正在执行,id=%s'%(i,os.getpid()))
time.sleep(random.random()*5)
print('子程序%d 结束,id=%s'%(i,os.getpid()))
if __name__ == "__main__":
p=multiprocessing.Pool(4)
for i in range(1,5):
q=p.apply_sync(func=sub_pro,args=(i,))
p.close()
p.join()
print('任务结束')
子程序1 正在执行,id=9444
子程序2 正在执行,id=6800
子程序3 正在执行,id=4656
子程序4 正在执行,id=9892
子程序3 结束,id=4656
子程序2 结束,id=6800
子程序1 结束,id=9444
子程序4 结束,id=9892
任务结束
import multiprocessing,os,time,random
def sub_pro(i):
print('子程序%d 正在执行,id=%s'%(i,os.getpid()))
time.sleep(random.random()*5)
print('子程序%d 结束,id=%s'%(i,os.getpid()))
if __name__ == "__main__":
p=multiprocessing.Pool(4)
for i in range(1,5):
q=p.apply(func=sub_pro,args=(i,))
p.close()
p.join()
print('任务结束')
子程序1 正在执行,id=464
子程序1 结束,id=464
子程序2 正在执行,id=7140
子程序2 结束,id=7140
子程序3 正在执行,id=7404
子程序3 结束,id=7404
子程序4 正在执行,id=15412
子程序4 结束,id=15412
任务结束
subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False)执行一个子进程,返回子进程的returncode。
returncode:
- None —— 子进程尚未结束;
- ==0 —— 子进程正常退出;
- > 0—— 子进程异常退出,returncode对应于出错码;
- < 0—— 子进程被信号杀掉了。
args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
例子:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
$ nslookup www.python.org
服务器: RT-AC86U-EE70
Address: 192.168.50.1
非权威应答:
名称: dualstack.python.map.fastly.net
Addresses: 2a04:4e42:1a::223
151.101.228.223
Aliases: www.python.org
Exit code: 0
创建并返回一个子进程,并在这个进程中执行指定的程序。
参数介绍:
args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
stdin:指定子进程的标准输入;
stdout:指定子进程的标准输出;
stderr:指定子进程的标准错误输出;
对于 stdin, stdout 和 stderr 而言,如果他们是 None(默认情况),那么子进程使用和父进程相同的标准流文件。
父进程如果想要和子进程通过 communicate() 方法通信,对应的参数必须是 subprocess.PIPE(见下文例子);
当然 stdin, stdout 和 stderr 也可以是已经打开的 file 对象,前提是以合理的方式打开,比如 stdin 对应的文件必须要可读等。
p.communicate(input=None)
和子进程 p 交流,将参数 input (字节流bytes)中的数据发送到子进程的 stdin,同时从子进程的 stdout 和 stderr 读取数据,直到EOF。
返回值:
二元组 (stdoutdata, stderrdata) 分别表示从标准出和标准错误中读出的数据。
例子:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
进程间通信
进程之间是相互独立的,互不干扰,如果需要通信,需要一个桥梁。
Queue
#Queue是multiprocssing模块中的一个类,他是用来实现多进程的数据传递。 它本身是一个消息队列程序。
#队列:就是类似咱们排队买饭一样,先排队先买饭,俗称先进先出的一种数据结构。
#使用方式: Queue(max_num): 实例化Queue类一个对象,传递的参数是能够容纳最大数量# 的消息,如果max_num值未设置或者是负值的话,代表着# 这个队列的容量将是无限的。直到撑破你的内存。
#Queue对象的方法:
1.q.qsize():返回一个值,查看消息队列中还有多少条数据
2.q.empty():判断队列是否为空,如果为空的话,那么就返回True,否则为False
3.q.full():判读队列是否饱满,如果满的话,返回True,否则为False
4.q.put(item,[block[, timeout]]): 将item数据添加到队列中,block默认为True;
timeout为设置等待时间,如果未设置就是一直等待到可以执行为止。
特别的:<一> 如果block=True,未设置timeout,如果队列满了,那么会在这儿 阻塞。直到消息队列腾出空间,再继续执行。如果设置了 timeout,则只会等待timeout时间,如果还没有位置,那么就会 抛出异常。
<二> 如果block=False,当没有空间了,就立马抛出异常
5.q.put_nowait(item):效果如同 q.put(item,block=False)
6.q.get([block[,timeout]): 从消息队列中取出一条消息,此时队列中的消息被移 除。
特别的: <一>如果block=True,未设置timeout,如果队列空了,那么会在这儿阻 塞;直到消息队列中有信息后,再执行。 如果设置了timeout,那么 会等待timeout时间,如果还没有消息,那么就会抛出异常
<二>如果block=False,那么当队列中没有消息后,立马抛出异常。
7.q.get_nowait(): 效果如同 q.get(block=False)
例子:
import multiprocessing,time,random
def putq(q):
print('在队列中接入数据')
for each in ['1','2','3']:
print('将%s加入队列'%each)
q.put(each)
print('全部加入完成')
def readq(q):
print('在队列中读取数据')
while True:
if not q.empty():
value=q.get()
print('取得数值%s'%value)
if __name__ == "__main__":
q=multiprocessing.Queue()
a=multiprocessing.Process(target=putq,args=(q,))
b=multiprocessing.Process(target=readq,args=(q,))
a.start()
b.start()
a.join()
time.sleep(random.random()*5)
print('数据全部获取')
#由于readq进程是死循环,无法等待结束,只能强制结束
b.terminate()
在队列中接入数据
在队列中读取数据
将1加入队列
将2加入队列
取得数值1
将3加入队列
取得数值2
全部加入完成
取得数值3
数据全部获取
2、多线程
进程是由若干线程组成的,一个进程至少有一个线程。可以使用使用threading这个高级模块。
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)创建一个线程。
参数解释:
1.group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
2.target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任 何方法。
3.name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其 中N 是小的十进制数。
4.args 是用于调用目标函数的参数元组。默认是 ()。
5.kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
threading.current_thread(),返回当前线程的 Thread 对象。
threading.enumerate()方法,方法返回一个列表,里面存放着目前存在的线程。
Thread对象的方法:
1.start(),开始线程活动
2.join(),等待,直到线程终结。
3.name,线程的名称。
例子:
结果
Lock
线程和进程不一样,不同进程的资源是分开的额,线程是可以共享全局资源的,所有变量都由所有线程共享。任何一个变量都可以被任何一个线程修改,是有隐患的。
互斥锁,Lock类 : 它的作用就是解决多线程修改全局变量产生的结果不一样的问题。锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。
锁的好处:
确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处:
阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
参数解释:
1.group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
2.target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任 何方法。
3.name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其 中N 是小的十进制数。
4.args 是用于调用目标函数的参数元组。默认是 ()。
5.kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
threading.current_thread(),返回当前线程的 Thread 对象。
threading.enumerate()方法,方法返回一个列表,里面存放着目前存在的线程。
Thread对象的方法:
1.start(),开始线程活动
2.join(),等待,直到线程终结。
3.name,线程的名称。
例子:
import threading
def threadcount():
print('%s is running'%threading.current_thread().name)
n=0
while n < 5:
print('%s:%d'%(threading.current_thread().name,n))
n+=1
print('%s ended'%threading.current_thread().name)
if __name__ == "__main__":
print('%s is running'%threading.current_thread().name)
q=threading.Thread(target=threadcount,name='thread1',)
q.start()
q.join()
MainThread is running
thread1 is running
thread1:0
thread1:1
thread1:2
thread1:3
thread1:4
thread1 ended
Lock
线程和进程不一样,不同进程的资源是分开的额,线程是可以共享全局资源的,所有变量都由所有线程共享。任何一个变量都可以被任何一个线程修改,是有隐患的。
互斥锁,Lock类 : 它的作用就是解决多线程修改全局变量产生的结果不一样的问题。锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。
import threading,time,random
lock=threading.Lock()
def changenum(n):
global num
print('线程%s执行,num的值是%d'%(threading.current_thread().name,num))
lock.acquire()
try:
num+=n
time.sleep(random.random())
num-=n
finally:
lock.release()
print('线程%s执行完,num的值是%d'%(threading.current_thread().name,num))
if __name__ == "__main__":
lock=threading.Lock()
num=0
a=threading.Thread(target=changenum,name='Thread1',args=(3,))
b=threading.Thread(target=changenum,name='Thread2',args=(8,))
a.start()
b.start()
a.join()
b.join()
print('done')
确保了某段关键代码只能由一个线程从头到尾完整地执行
锁的坏处:
阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁
3、ThreadLocal
在多线程环境下,线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
但是使用局部变量时,如果涉及多重函数调用,需要反复的把调用的变量写入函数的参数,一直传递下去,非常麻烦。
def process_student(name):
std = Student(name)
# std是局部变量,但是每个函数都要用它,因此必须传进去:
do_task_1(std)
do_task_2(std)
def do_task_1(std):
do_subtask_1(std)
do_subtask_2(std)
def do_task_2(std):
do_subtask_1(std)
do_subtask_2(std)
ThreadLocal可以解决这个问题,一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。可以理解TheradLocal就是一个全局字典。
import threading
l=threading.local()
#全局变量l是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。
def prname():
print('%s is running %s'%(threading.current_thread().name,l.student))
def pross(name):
l.student=name
return prname()
if __name__ == "__main__":
a=threading.Thread(target=pross,args=('AA',))
b=threading.Thread(target=pross,args=('BB',))
a.start()
b.start()
a.join()
b.join()
结果
Thread-1 is running AA
Thread-2 is running BB
你可以把l看成全局变量,但每个属性如l.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。
4、进程 vs. 线程
多进程的优点是稳定性高,一个子进程崩溃了不会影响其他进程。
多进程的缺点是创建进程的代价大。
多线程缺点是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
线程切换
在线程切换时,系统需要先保留现场环境,然后准备好新任务的执行环境,所以切换过程需要时间。如果过多的任务同时进行,系统频繁切换,效率就会急剧下降。
计算密集型 vs. IO密集型
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,代码运行效率至关重要。Python这样的脚本语言运行效率低,不适合计算密集型任务。最好使用C语言。
IO密集型,涉及到网络、磁盘IO等任务,这类任务CPU消耗很少,99%的时间都花在IO上,这类任务使用Python最为适合。
多进程的缺点是创建进程的代价大。
多线程缺点是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
线程切换
在线程切换时,系统需要先保留现场环境,然后准备好新任务的执行环境,所以切换过程需要时间。如果过多的任务同时进行,系统频繁切换,效率就会急剧下降。
计算密集型 vs. IO密集型
计算密集型任务的特点是要进行大量的计算,消耗CPU资源,代码运行效率至关重要。Python这样的脚本语言运行效率低,不适合计算密集型任务。最好使用C语言。
IO密集型,涉及到网络、磁盘IO等任务,这类任务CPU消耗很少,99%的时间都花在IO上,这类任务使用Python最为适合。
5、分布式进程
Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。multiprocessing的managers子模块,支持把多进程分布到多台机器上。
class multiprocessing.managers.BaseManager([address[, authkey]])创建一个 BaseManager 对象。
参数解释:
address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。
authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。
BaseManager有如下方法:
1)start()为管理器开启一个子进程。
2)connect()将本地管理器对象连接到一个远程管理器进程:
3)shutdown()停止管理器的进程。
4)register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。
callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False,那么这里可留下 None。
proxytype 是 BaseProxy 的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None , 则会自动创建一个代理类。
exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposed 是 None, 则会在 proxytype._exposed_ 存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 __call__() 方法并且不是以 '_' 开头的属性)
method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是 None, 则 proxytype._method_to_typeid_ 会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None ,则方法返回的对象会是一个值拷贝。
create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True 。
例子:
服务进程
工作进程
不同Queue的区别:
1.from queue import Queue
这个是普通的队列模式,类似于普通列表,先进先出模式,get方法会阻塞请求,直到有数据get出来为止
2.from multiprocessing.Queue import Queue(各子进程共有)
这个是多进程并发的Queue队列,用于解决多进程间的通信问题。普通Queue实现不了。例如来跑多进程对一批IP列表进行运算,运算后的结果都存到Queue队列里面,这个就必须使用multiprocessing提供的Queue来实现
class multiprocessing.managers.BaseManager([address[, authkey]])创建一个 BaseManager 对象。
参数解释:
address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。
authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。
BaseManager有如下方法:
1)start()为管理器开启一个子进程。
2)connect()将本地管理器对象连接到一个远程管理器进程:
3)shutdown()停止管理器的进程。
4)register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。
callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False,那么这里可留下 None。
proxytype 是 BaseProxy 的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None , 则会自动创建一个代理类。
exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposed 是 None, 则会在 proxytype._exposed_ 存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 __call__() 方法并且不是以 '_' 开头的属性)
method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是 None, 则 proxytype._method_to_typeid_ 会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None ,则方法返回的对象会是一个值拷贝。
create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True 。
例子:
服务进程
# task_master.py
import random, time, queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()
# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
pass
# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
# task_worker.py
import time, sys, queue
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
不同Queue的区别:
1.from queue import Queue
这个是普通的队列模式,类似于普通列表,先进先出模式,get方法会阻塞请求,直到有数据get出来为止
2.from multiprocessing.Queue import Queue(各子进程共有)
这个是多进程并发的Queue队列,用于解决多进程间的通信问题。普通Queue实现不了。例如来跑多进程对一批IP列表进行运算,运算后的结果都存到Queue队列里面,这个就必须使用multiprocessing提供的Queue来实现
评论
发表评论