zoukankan      html  css  js  c++  java
  • Python中的并发

    Python并发

    并发三种层次

    个人理解,并发是计算机在逻辑上能处理多任务的能力。一般分类三种类型:

    1. 异步,异步本质上是单线程的,因为 IO 操作在很多时候会存在阻塞,异步就是在这种阻塞的时候,通过控制权的交换来实现多任务的。即异步本质上是运行过程中的控制权的交换。最典型的例子就是生产者消费者模型。
      • 异步这个概念在不同的地方有不同的说法,比如 python 里面叫做协程,内部通过生成器来实现控制权的交换。但是无论怎么称呼,异步这种并发方式都脱离不了控制权的交换这么一个事实。
    2. 多进程,进程是一个程序具体的实例,拥有自己独立的内存单元。
    3. 多线程,线程依附于进程,共享存储空间。
      • 由于 Python 官方的解释器 Cython 对多线程有一个全局的锁(GIL),所以 Cython 中的线程局限性会比较大。这里不多解释。

    这里还有一个概念需要注意,在使用并发的时候弄清楚需要并发的任务是计算密集还是IO密集
    因为异步对于计算密集的任务是无效的。因为异步的本质是 IO 操作过程中阻塞时的控制权交换。在计算密集的任务中是没有这样的阻塞的。

    协程

    前面说了异步的本质是控制权的交换,这里通过一个生产者消费者模型的例子来体会一下这么个过程。

    生成者消费者

    def consumer():         # 定义消费者,由于有yeild关键词,此消费者为一个生成器
        print("[Consumer] Init Consumer ......")
        r = "init ok"       # 初始化返回结果,并在启动消费者时,返回给生产者
        while True:
            n = yield r     # 消费者通过yield接收生产者的消息,同时返给其结果
            print("[Consumer] conusme n = %s, r = %s" % (n, r))
            r = "consume %s OK" % n     # 消费者消费结果,下个循环返回给生产者
    
    def produce(c):         # 定义生产者,此时的 c 为一个生成器
        print("[Producer] Init Producer ......")
        r = c.send(None)    # 启动消费者生成器,同时第一次接收返回结果
        print("[Producer] Start Consumer, return %s" % r)
        n = 0
        while n < 5:
            n += 1
            print("[Producer] While, Producing %s ......" % n)
            r = c.send(n)   # 向消费者发送消息并准备接收结果。此时会切换到消费者执行
            print("[Producer] Consumer return: %s" % r)
        c.close()           # 关闭消费者生成器
        print("[Producer] Close Producer ......")
    
    produce(consumer())
    

    新关键字

    # 异步IO例子:适配Python3.5,使用async和await关键字
    async def hello(index):       # 通过关键字async定义协程
        print('Hello world! index=%s, thread=%s' % (index, threading.currentThread()))
        await asyncio.sleep(1)     # 模拟IO任务
        print('Hello again! index=%s, thread=%s' % (index, threading.currentThread()))
    
    loop = asyncio.get_event_loop()     # 得到一个事件循环模型
    tasks = [hello(1), hello(2)]        # 初始化任务列表
    loop.run_until_complete(asyncio.wait(tasks))    # 执行任务
    loop.close()                        # 关闭事件循环列表
    

    网络io

    async def get(url):
    async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
    print(url, resp.status)
    print(url, await resp.text())
    
    loop = asyncio.get_event_loop()     # 得到一个事件循环模型
    tasks = [                           # 初始化任务列表
        get("http://zhushou.360.cn/detail/index/soft_id/3283370"),
        get("http://zhushou.360.cn/detail/index/soft_id/3264775"),
        get("http://zhushou.360.cn/detail/index/soft_id/705490")
    ]
    loop.run_until_complete(asyncio.wait(tasks))    # 执行任务
    loop.close()                        # 关闭事件循环列表
    

    线/进程

    这里以进程的multiprocessing模块举例,线程可以使用multiprocessing.dummy,所有的API均相同。

    例子

    import multiprocessing as mp
    
    ############## 直接实例化 ############
    
    def func(number):
        result = number * 2
    
    p = mp.Process(target=func, args=(3, )) #实例化进程对象
    p.start() #运行进程
    
    ############ 类封装 #############
    
    class MyProcess(mp.Process):
        def __init__(self, interval):
            mp.Process.__init__(self)
    
        # 需要重载的函数
        def run(self):
            print('I'm running)
    
    p = MyProcess(1)
    p.start()
    
    #################################
    
    p.terminal() # 主动结束进程
    p.join() #让主进程等待子进程结束
    
    
    # 一些常用的属性
    p.pid #获得进程的id号
    p.name #获得进程名
    p.is_alive() #判断进程是否还存活
    p.daemon = True #设置进程随主进程一起结束
    
    mp.active_children() #获得当前进程的所有子进程
    mp.current_process() #返回正在运行的进程
    os.getpid() #获得当前进程的pid
    

    线程池

    from multiprocessing.dummy import Pool as ThreadPool 
    
    tasks = list()
    
    def do_task(item):
        return item
    
    pool = ThreadPool(3)
    
    ################ 原始操作  #######################
    
    for item in items:
        pool.apply_async(do_task, (item,)) #添加进程,非阻塞,返回执行结果
        pool.apply(do_task, (item,)) #阻塞
    
    ############## map操作  #####################3
    
    results = pool.map(do_task, items)
    
    ################################
    
    pool.close() #关闭进程池后不会有新的进程被创建
    pool.join() #等到结束,必须在close后使用
    

    进程通信

    # Lock(锁)
    # 限制对资源的访问
    
    def func(lock): #使用with
        with lock:
            print('I got lock')
    def func(lock): #不使用with
        lock.acquire() #请求锁
        try:
            print('I got lock')
        finally:
            lock.release() #释放锁
    
    lock = mp.Lock() #申请锁
    p = mp.Process(target=func, args=(lock,))
    p.start()
    
    ############################################
    
    # Semaphore(信号量)
    # 限制资源的最大连接数
    
    def func(s):
        s.aquire() #请求连接
        s.release() #断开连接
    
    s = mp.Semaphore(2) #定义信号量的最大连接数
    for i in range(5):
        p = mp.Process(target=func, arg=(s))
        p.start
    
    ############################################
    
    # Event(事件)
    # 进程间同步
    
    def func(e):
        e.wait() #定义等待时间,默认等待到e.set()为止,阻塞
        e.is_set() #判断消息是否被发出
        print('got')
        
    e = mp.Event()
    p = mp.Process(target=func, args=(e,))
    p.start()
    e.set() #发出消息
    
    ############################################
    
    # Queue(队列)
    # 多进程之间的数据传递
    
    import Queue
    
    Queue.Queue(maxsize = 0)  # 先进先出, maxsize小于等于则不限大小
    Queue.LifoQueue(maxsize = 0)  # 后进先出
    Queue.PriorityQueue(maxsize = 0)  # 构造一个优先级队列
    
    #异常
    Queue.Empty  #当调用非阻塞的get()获取空队列的元素时, 引发异常
    Queue.Full  #当调用非阻塞的put()向满队列中添加元素时, 引发异常
    
    # 生存者消费者模型
    
    def produce(q):
        try:
            data = q.put(data, block=, timeout=) 
            # 若block为False且队列已满,则立即抛出Queue.Full
            # 若block为True进程会阻塞timeout指定时间,直到队列有空间,否则抛出Queue.Full
        except:
    
    def cosume(q):
        try:
            q.get(block=, timeout=) #与上同理
        except:
    
    q = mp.Queue()
    pro = mp.Process(target=produce, args=(q, ))
    cos = mp.Process(target=cosume, args=(q, ))
    pro.start()
    cos.start()
    pro.join()
    cos.join()
    
    ############################################
    
    # Pipe(管道)
    # 多进程之间的数据传递
    
    def func1(pipe):
        while True:
            pipe.send(1)
    def func2(pipe):
        while True:
            pipe.recv() #如果管道内无消息可接受,则会阻塞
    pipe = mp.Pipe(duplex=) #参数默认为True即管道的两边均可收发
    # 返回(conn1, conn2),当参数为False时conn1只能收信息,conn2只能发消息
    p1 = mp.Process(target=func1, args=(pipe[0], ))
    p2 = mp.Process(target=func2, args=(pipe[1], ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    

    并发池

    新的并发池模块concurrent.futures再次封装了并发操作,可以用于量大但简单并发操作。
    进程线程通用关键字换一下就行。

    future对象

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def working(message):
        time.sleep(2)
        return message
    
    pool = ThreadPoolExecutor(max_workers=2)  # 创建一个最大可容纳2个task的线程池
    
    worker1 = pool.submit(working, ("hello"))  # 往线程池里面加入一个task
    worker2 = pool.submit(working, ("world"))  # 往线程池里面加入一个task
    
    # submit 返回了一个future对象,即未完成的操作,我们可以通过调用函数来查看其状态
    
    worker1.done()  # 判断task1是否结束
    
    worker1.result()  # 查看task1返回的结果
    worker2.result()  # 查看task2返回的结果
    

    executor对象

    import concurrent.futures
    
    items = list() # 任务对象
    
    def do_task(item): # 处理函数
        return item
    
    #################### submit #########################
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = {executor.submit(do_task, item): item for item in items}
        for future in concurrent.futures.as_completed(futures):
            item = futures[future]
            result = future.result()
            print(item, result)
    
    #################### map  #########################
    
    # map跟submit的区别在于submit是无序的,而map是有序的
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        for item, result in zip(items, executor.map(do_task, items)):
                print(item, result)
    
    #################### wait  #########################
    
    # wait返回一个元组,包含已完成任务的集合和未完成任务的集合。
    
    pool = ThreadPoolExecutor(5)
    futures = []
    for item in items:
        futures.append(pool.submit(do_task, item))
    
    concurrent.futures.wait(futures, timeout=None, return_when='FIRST_COMPLETED')
    

    return_when参数可选FIRST_COMPLETED, FIRST_EXCEPTIONALL_COMPLETE
    ALL_COMPLETE 会阻塞

    参考

    Python 并行任务技巧
    Python并发编程之线程池/进程池

  • 相关阅读:
    LC 357. Count Numbers with Unique Digits
    LC 851. Loud and Rich
    LC 650. 2 Keys Keyboard
    LC 553. Optimal Division
    LC 672. Bulb Switcher II
    LC 413. Arithmetic Slices
    LC 648. Replace Words
    LC 959. Regions Cut By Slashes
    Spring框架学习之注解配置与AOP思想
    Spring框架学习之高级依赖关系配置(二)
  • 原文地址:https://www.cnblogs.com/nevermoes/p/10000759.html
Copyright © 2011-2022 走看看