Python学习笔记(进程和线程)

1、多进程
     fork
     os.fork()创建一个子进程,只能在Linux系统上使用,返回一个值。返回两次,系统把父进程复制一份当成子进程,分别返回。子进程返回0,父进程返回子进程ID。
     os.getpid()获取本进程的pid号
     os.getppid()获取父进程的pid号
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
运行结果如下:
Process (876) start...
I (876) just created a child process (877).
I am child process (877and my parent is 876.
      子进程有父进程资源的拷贝,在子进程中修改变量对父进程不会造成影响。
     multiprocessing
     multiprocessing是跨平台的多进程模块,里面有个Process类,可以在任何平台上进行创建子进程。
#multiprocessing.Process([group [, target [, name [, args [, kwargs]]]]])
#先讲解参数: group:默认值为None,应该一直为None,他只是仅仅为了与线程兼容而存在,一般用不到
             target:需要让子程序进行的操作,传递一个函数。默认不写为Process对象中的
                    run()方法。
             name:子进程的别名
             args: 应该为一个元组,这是用来传递进入函数的参数,也就是给子进程的参数。
             kwargs: 这是一个字典,同args一样的作用。
#Process返回一个对象,假设为p。
#对象拥有如下几种方法:
               1.p.is_alive():判断子进程是否在运行,返回True代表在运行,False为停止了
               2.p.join([timeout]):等待进程实例完成,或者等待指定时间。可以传递一个参数
                 单位是s. 默认是一直等待进程实例完成。
               3.p.start(): 开始运行进程实例(创建子进程运行)
               4.p.run():在创建对象的时候,如果没有传递target值,那么就运行这个函数
               5.p.terminate():不管子进程是否运行完,立即结束它。
#对象还有如下两个属性:
               1.name:子进程的别名,默认为Process-n
               2.pid: 子进程的Pid号。唯一
       例子:启动一个子进程并等待其结束
from multiprocessing import Process
import os

# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
        执行结果如下:
Parent process 928.
Child process will start.
Run child process test (929)...
Process end.
       Pool
       如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
#pool和Process一样也是一个类,咱们实际上是实例一个对象。
#pool(max_process_num): 返回一个对象,max_process_num代表着最大进程数量
#                       特别解释,进程池,代表着有个最大容纳量,也就是max_process_num
#                       当已经有max_process_num个进程在运行了,如果咱们还往里面添加进程
#                       那么就会在这暂停,等待空闲位置,然后再加进去。

#pool对象的方法:
                 1.p.apply_async(func,[args,[kwargs]]): 非堵塞的往进程池中添加进程,func
                   指的是子进程运行的函数,args是一个元组,传递函数的参数。kwargs是个字典,同理
                 2.p.apply(func,[args,[kwargs]):阻塞的往进程池中添加进程。
                 3.p.close():关闭进程池,使其不能再往里面添加进程
                 4.p.join():主进程阻塞,等待执行完毕,或者等待指定的时间,必须在close或者
                   terminate后面使用。
                 5.p.terminate():立即结束子进程
#什么是阻塞?
#答疑:阻塞就是需要等你完成后,才执行下一个进程。就相当于按顺序执行。
         例子:非阻塞情况
import multiprocessing,os,time,random

def sub_pro(i):
    print('子程序%d 正在执行,id=%s'%(i,os.getpid()))
    time.sleep(random.random()*5)
    print('子程序%d 结束,id=%s'%(i,os.getpid()))

if __name__ == "__main__":
    p=multiprocessing.Pool(4)
    for i in range(1,5):
        q=p.apply_sync(func=sub_pro,args=(i,))
    p.close()
    p.join()
    print('任务结束')
         结果,子进程是并行执行
子程序1 正在执行,id=9444
子程序2 正在执行,id=6800
子程序3 正在执行,id=4656
子程序4 正在执行,id=9892
子程序3 结束,id=4656
子程序2 结束,id=6800
子程序1 结束,id=9444
子程序4 结束,id=9892
任务结束
          例子:阻塞情况
import multiprocessing,os,time,random

def sub_pro(i):
    print('子程序%d 正在执行,id=%s'%(i,os.getpid()))
    time.sleep(random.random()*5)
    print('子程序%d 结束,id=%s'%(i,os.getpid()))

if __name__ == "__main__":
    p=multiprocessing.Pool(4)
    for i in range(1,5):
        q=p.apply(func=sub_pro,args=(i,))
    p.close()
    p.join()
    print('任务结束')
          结果,子进程是顺序执行
子程序1 正在执行,id=464
子程序1 结束,id=464
子程序2 正在执行,id=7140
子程序2 结束,id=7140
子程序3 正在执行,id=7404
子程序3 结束,id=7404
子程序4 正在执行,id=15412
子程序4 结束,id=15412
任务结束
         子进程
         subprocess.call(args, *, stdin=None, stdout=None, stderr=None, shell=False)执行一个子进程,返回子进程的returncode。
returncode:
  • None —— 子进程尚未结束;
  • ==0 —— 子进程正常退出;
  • > 0—— 子进程异常退出,returncode对应于出错码;
  • < 0—— 子进程被信号杀掉了。
参数介绍:
         args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
         例子:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup''www.python.org'])
print('Exit code:', r)
         结果
$ nslookup www.python.org
服务器:  RT-AC86U-EE70
Address:  192.168.50.1

非权威应答:
名称:    dualstack.python.map.fastly.net
Addresses:  2a04:4e42:1a::223
          151.101.228.223
Aliases:  www.python.org

Exit code: 0
         subprocess.Popen(args, bufsize=0, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=False, shell=False, cwd=None, env=None, universal_newlines=False, startupinfo=None, creationflags=0)
  创建并返回一个子进程,并在这个进程中执行指定的程序。
参数介绍:
       args:要执行的命令或可执行文件的路径。一个由字符串组成的序列(通常是列表),列表的第一个元素是可执行程序的路径,剩下的是传给这个程序的参数,如果没有要传给这个程序的参数,args 参数可以仅仅是一个字符串。
       stdin:指定子进程的标准输入;
       stdout:指定子进程的标准输出;
       stderr:指定子进程的标准错误输出;
  对于 stdin, stdout 和 stderr 而言,如果他们是 None(默认情况),那么子进程使用和父进程相同的标准流文件。
  父进程如果想要和子进程通过 communicate() 方法通信,对应的参数必须是 subprocess.PIPE(见下文例子);
  当然 stdin, stdout 和 stderr 也可以是已经打开的 file 对象,前提是以合理的方式打开,比如 stdin 对应的文件必须要可读等。
 
       p.communicate(input=None)
       和子进程 p 交流,将参数 input (字节流bytes)中的数据发送到子进程的 stdin,同时从子进程的 stdout 和 stderr 读取数据,直到EOF。
       返回值:
       二元组 (stdoutdata, stderrdata) 分别表示从标准出和标准错误中读出的数据。
       例子:
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

        进程间通信
        进程之间是相互独立的,互不干扰,如果需要通信,需要一个桥梁。
        Queue
#Queue是multiprocssing模块中的一个类,他是用来实现多进程的数据传递。 它本身是一个消息队列程序。
#队列:就是类似咱们排队买饭一样,先排队先买饭,俗称先进先出的一种数据结构。

#使用方式: Queue(max_num): 实例化Queue类一个对象,传递的参数是能够容纳最大数量#                                                的消息,如果max_num值未设置或者是负值的话,代表着#                                                这个队列的容量将是无限的。直到撑破你的内存。

#Queue对象的方法:
                1.q.qsize():返回一个值,查看消息队列中还有多少条数据
                2.q.empty():判断队列是否为空,如果为空的话,那么就返回True,否则为False
                3.q.full():判读队列是否饱满,如果满的话,返回True,否则为False
                4.q.put(item,[block[, timeout]]): 将item数据添加到队列中,block默认为True;
                  timeout为设置等待时间,如果未设置就是一直等待到可以执行为止。
               
                  特别的:<一> 如果block=True,未设置timeout,如果队列满了,那么会在这儿                                             阻塞。直到消息队列腾出空间,再继续执行。如果设置了                                                     timeout,则只会等待timeout时间,如果还没有位置,那么就会                                             抛出异常。
                                <二> 如果block=False,当没有空间了,就立马抛出异常
                5.q.put_nowait(item):效果如同 q.put(item,block=False)
                6.q.get([block[,timeout]): 从消息队列中取出一条消息,此时队列中的消息被移                                                             除。
                    特别的: <一>如果block=True,未设置timeout,如果队列空了,那么会在这儿阻                                    塞;直到消息队列中有信息后,再执行。 如果设置了timeout,那么                                    会等待timeout时间,如果还没有消息,那么就会抛出异常
                                <二>如果block=False,那么当队列中没有消息后,立马抛出异常。
                7.q.get_nowait(): 效果如同 q.get(block=False)
         例子:
import multiprocessing,time,random
def putq(q):
    print('在队列中接入数据')
    for each in ['1','2','3']:
        print('将%s加入队列'%each)
        q.put(each)
    print('全部加入完成')
def readq(q):
    print('在队列中读取数据')
    while True:
        if not q.empty():
            value=q.get()
            print('取得数值%s'%value)
    
if __name__ == "__main__":
    q=multiprocessing.Queue()
    a=multiprocessing.Process(target=putq,args=(q,))
    b=multiprocessing.Process(target=readq,args=(q,))
    a.start()
    b.start()
    a.join()
    time.sleep(random.random()*5)
    print('数据全部获取')
    #由于readq进程是死循环,无法等待结束,只能强制结束
    b.terminate()
          结果
在队列中接入数据
在队列中读取数据
将1加入队列
将2加入队列
取得数值1
将3加入队列
取得数值2
全部加入完成
取得数值3
数据全部获取
       

2、多线程

     进程是由若干线程组成的,一个进程至少有一个线程。可以使用使用threading这个高级模块。
     threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)创建一个线程。
参数解释:
               1.group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
               2.target 是用于 run() 方法调用的可调用对象。默认是 None,表示不需要调用任                 何方法。
               3.name 是线程名称。默认情况下,由 "Thread-N" 格式构成一个唯一的名称,其                 中N 是小的十进制数。
               4.args 是用于调用目标函数的参数元组。默认是 ()。
               5.kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。
      threading.current_thread(),返回当前线程的 Thread 对象。
      threading.enumerate()方法,方法返回一个列表,里面存放着目前存在的线程。
      Thread对象的方法:
                1.start(),开始线程活动
                2.join(),等待,直到线程终结。
                3.name,线程的名称。
      例子:
import threading
def threadcount():
    print('%s is running'%threading.current_thread().name)
    n=0
    while n < 5:
        print('%s:%d'%(threading.current_thread().name,n))
        n+=1
    print('%s ended'%threading.current_thread().name)
    
if __name__ == "__main__":
    print('%s is running'%threading.current_thread().name)
    q=threading.Thread(target=threadcount,name='thread1',)
    q.start()
    q.join()
       结果
MainThread is running
thread1 is running
thread1:0
thread1:1
thread1:2
thread1:3
thread1:4
thread1 ended
     
       Lock
       线程和进程不一样,不同进程的资源是分开的额,线程是可以共享全局资源的,所有变量都由所有线程共享。任何一个变量都可以被任何一个线程修改,是有隐患的。
       互斥锁,Lock类 : 它的作用就是解决多线程修改全局变量产生的结果不一样的问题。锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。
import threading,time,random

lock=threading.Lock()
def changenum(n):
    global num
    print('线程%s执行,num的值是%d'%(threading.current_thread().name,num))
    lock.acquire()
    try:
        num+=n
        time.sleep(random.random())
        num-=n
    finally:
        lock.release()
        print('线程%s执行完,num的值是%d'%(threading.current_thread().name,num))


    
if __name__ == "__main__":
    lock=threading.Lock()
    num=0
    a=threading.Thread(target=changenum,name='Thread1',args=(3,))
    b=threading.Thread(target=changenum,name='Thread2',args=(8,))
    a.start()
    b.start()
    a.join()
    b.join()
    print('done')
         锁的好处:
         确保了某段关键代码只能由一个线程从头到尾完整地执行
         锁的坏处:
         阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁

3、ThreadLocal

      在多线程环境下,线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁。
      但是使用局部变量时,如果涉及多重函数调用,需要反复的把调用的变量写入函数的参数,一直传递下去,非常麻烦。

def process_student(name):
    std = Student(name)
    # std是局部变量,但是每个函数都要用它,因此必须传进去:
    do_task_1(std)
    do_task_2(std)

def do_task_1(std):
    do_subtask_1(std)
    do_subtask_2(std)

def do_task_2(std):
    do_subtask_1(std)
    do_subtask_2(std)
      ThreadLocal可以解决这个问题,一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。可以理解TheradLocal就是一个全局字典。
import threading

l=threading.local()
#全局变量l是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。
def prname():
    print('%s is running %s'%(threading.current_thread().name,l.student))

def pross(name):
    l.student=name
    return prname()    

if __name__ == "__main__":
    a=threading.Thread(target=pross,args=('AA',))
    b=threading.Thread(target=pross,args=('BB',))
    a.start()
    b.start()
    a.join()
    b.join()
       结果
Thread-1 is running AA
Thread-2 is running BB
       你可以把l看成全局变量,但每个属性如l.student都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。

4、进程 vs. 线程

      多进程的优点是稳定性高,一个子进程崩溃了不会影响其他进程。
      多进程的缺点是创建进程的代价大。
      多线程缺点是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
      线程切换
      在线程切换时,系统需要先保留现场环境,然后准备好新任务的执行环境,所以切换过程需要时间。如果过多的任务同时进行,系统频繁切换,效率就会急剧下降。
      计算密集型 vs. IO密集型
      计算密集型任务的特点是要进行大量的计算,消耗CPU资源,代码运行效率至关重要。Python这样的脚本语言运行效率低,不适合计算密集型任务。最好使用C语言。
      IO密集型,涉及到网络、磁盘IO等任务,这类任务CPU消耗很少,99%的时间都花在IO上,这类任务使用Python最为适合。


5、分布式进程

      Process可以分布到多台机器上,而Thread最多只能分布到同一台机器的多个CPU上。multiprocessing的managers子模块,支持把多进程分布到多台机器上。
      class multiprocessing.managers.BaseManager([address[, authkey]])创建一个 BaseManager 对象。
参数解释:
       address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。
       authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。
      BaseManager有如下方法:
      1)start()为管理器开启一个子进程。
      2)connect()将本地管理器对象连接到一个远程管理器进程:

from multiprocessing.managers import BaseManager
m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
m.connect()
      3)shutdown()停止管理器的进程。
      4)register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])
         一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。
         typeid 是一种 "类型标识符",用于唯一表示某种共享对象类型,必须是一个字符串。

         callable 是一个用来为此类型标识符创建对象的可调用对象。如果一个管理器实例将使用 connect() 方法连接到服务器,或者 create_method 参数为 False,那么这里可留下 None。

         proxytype 是 BaseProxy  的子类,可以根据 typeid 为共享对象创建一个代理,如果是 None , 则会自动创建一个代理类。

         exposed 是一个函数名组成的序列,用来指明只有这些方法可以使用 BaseProxy._callmethod() 代理。(如果 exposed 是 None, 则会在 proxytype._exposed_ 存在的情况下转而使用它) 当暴露的方法列表没有指定的时候,共享对象的所有 “公共方法” 都会被代理。(这里的“公共方法”是指所有拥有 __call__() 方法并且不是以 '_' 开头的属性)

         method_to_typeid 是一个映射,用来指定那些应该返回代理对象的暴露方法所返回的类型。(如果 method_to_typeid 是 None, 则 proxytype._method_to_typeid_ 会在存在的情况下被使用)如果方法名称不在这个映射中或者映射是 None ,则方法返回的对象会是一个值拷贝。

         create_method 指明,是否要创建一个以 typeid 命名并返回一个代理对象的方法,这个函数会被服务进程用于创建共享对象,默认为 True 。
         例子:
服务进程
# task_master.py

import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列:
task_queue = queue.Queue()
# 接收结果的队列:
result_queue = queue.Queue()

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass

# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue'callable=lambda: task_queue)
QueueManager.register('get_result_queue'callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=(''5000), authkey=b'abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
    n = random.randint(010000)
    print('Put task %d...' % n)
    task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print('Result: %s' % r)
# 关闭:
manager.shutdown()
print('master exit.')
工作进程
# task_worker.py

import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到服务器,也就是运行task_master.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与task_master.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):
    try:
        n = task.get(timeout=1)
        print('run task %d * %d...' % (n, n))
        r = '%d * %d = %d' % (n, n, n*n)
        time.sleep(1)
        result.put(r)
    except Queue.Empty:
        print('task queue is empty.')
# 处理结束:
print('worker exit.')


不同Queue的区别:
1.from queue import Queue
这个是普通的队列模式,类似于普通列表,先进先出模式,get方法会阻塞请求,直到有数据get出来为止

2.from multiprocessing.Queue import Queue(各子进程共有)
这个是多进程并发的Queue队列,用于解决多进程间的通信问题。普通Queue实现不了。例如来跑多进程对一批IP列表进行运算,运算后的结果都存到Queue队列里面,这个就必须使用multiprocessing提供的Queue来实现

评论