Skip to content

并发编程、锁、任务类型

更新: 2025/2/24 字数: 0 字 时长: 0 分钟

并发编程

现如今,计算机早已使用上了多核心的 CPU,且操作系统基本都支持多任务,这使得计算机可以同时运行多个程序,也可以将一个程序分解为若干个相对独立的子任务,让多个子任务“并行”或“并发”的执行,从而缩短程序的执行时间,也让用户获得更好的体验。

Concept 概念

为了深刻理解程序运行的概念,我们需要先了解核心与任务的关系。购买电脑时经常会听到 8 核、16 核处理器,这里的多少核指的就是 CPU 的核心数。“核心数与指令数”的关系就相当于“人数和任务数”的关系,做个形象的比喻:

  • 单核单任务:一个人,喝完酒,然后抽完烟,最后吃完烤鸭。
    • 解释:单核就是一个人,喝酒就是一个任务,喝完酒后,又执行抽烟这个任务,抽完烟后,又执行吃烤鸭这个任务,最后吃完烤鸭,任务结束。
    • 原理:一个核,在执行完一个任务后,才会去执行下一个任务。
  • 单核多任务:一个人,喝一口酒,抽一口烟,吃一口烤鸭,轮换进行。
    • 解释:单核就是一个人,喝酒就是一个任务,喝酒还没结束,又去执行抽烟这个任务,抽烟还没结束,又去执行吃烤鸭这个任务。
    • 原理:一个核,在执行一个任务过程当中,会切换去执行另一个任务。
  • 多核多任务:多个人,一个人喝酒,一个人抽烟,一个人吃烤鸭。
    • 解释:多核就是多个人,每个人各执行一个喝酒、抽烟、吃烤鸭的任务,互不干扰。
    • 原理:多个核,可以同时执行多个任务(当任务数等于核心数时,效率最高)。

执行过程

根据上面任务执行的过程我们可以分为如下两个概念:

  • 同步(synchronous ):只有当上一个任务完成后,才会执行下一个任务,否则就会一直等待上一个任务的完成,或者说执行下一个任务的动作是建立在上一个任务执行完成的前提上的,上面的“单核单任务”执行过程就是一个同步的过程。

  • 异步(asynchronous):即使上一个任务未完成,也会去执行下一个任务,或者说执行下一个任务的动作和上一个任务是否完成没有任何关系,上面的“单核多任务”执行过程就是一个异步的过程。

执行状态

根据上面任务执行的状态我们可以分为如下两个概念:

  • 阻塞(blocking):在执行当前任务的间隙中,不能执行其他任务,上面的“单核单任务”执行状态就是一个阻塞的效果。

  • 非阻塞(nonblocking):在执行当前任务的间隙中,可以执行其他任务,上面的“单核多任务”执行状态就是一个非阻塞的效果。

执行方式

根据上面任务执行的方式我们可以分为两个概念:

  • 并发(concurrency):在任务执行的方式中,任何一个时刻都只能执行一个任务,在一段时间内通过在不同的任务间来回切换执行,达到执行多任务的效果,上面的“单核多任务”执行方式就是一个并发的效果。比如一个单核处理器,它先执行 A 指令流的指令一段时间,再执行 B 指令流的指令一段时间,再切回到指令流 A 执行一段时间。由于处理器执行指令的速度和切换上下文的速度极快,人们完全感知不到,这就使得宏观上看起来多个指令流在同时运行,但微观上其实只有一个指令流在执行。简单说,微观上(一个时刻)是单个程序顺序执行,宏观上(一个时段)是多个程序运行

  • 并行(parallel):在任务执行的方式中,任何一个时刻都能同时执行多个任务,在一段时间内也能同时执行多个任务,达到执行多任务的效果,上面的“多核多任务”执行方式就是一个并行的效果。比如一个多核处理器,它可以同时执行 A 指令流、B 指令流,减少了上下文切换。实现并行必须要依赖于多核处理器,不论是从宏观上还是微观上,多个指令流都是同一时刻一起执行的。简单说,就是“并排行走”或“同时实行或实施”,不管宏观和微观都是多个程序运行

很多时候,我们并不用严格区分“并发”和“并行”两个词,所以我们有时候把多线程、多进程以及异步 I/O (协程)都视为实现并发编程的手段。不过在实现并发编程时需要一个前提条件,就是任务之间没有关联性,也就是执行下一个任务不依赖于上一个任务的执行结果。例如上面例子中抽烟、喝酒、吃烤鸭就是三个互不关联的任务,因此他们可以使用并发编程来加快执行效率。

Process 进程

进程(Process):计算机操作系统将程序(计算机硬盘中的静态文件上的一段可执行代码)从硬盘加载到内存后,操作系统为其分配资源并运行的一个动态的、有周期的过程。其主要特点如下:

  • 主子进程:计算机在运行任何程序时,都必然会基于程序本身产生一个进程,我们称之为“主进程(Parent Process)”。不仅如此,主进程还可以自行创建多个其他的进程,我们称之为“子进程(Child Process)”。可以说,计算机运行一个程序时,必定会产生一个主进程,且主进程的数量始终只有一个,而子进程的数量可以为零,也可以不为零,具体取决于程序的设计和运行时的需求
  • 独立空间不论是主进程,还是子进程,它们在操作系统中都有自己独立的地址空间。这个地址空间是虚拟的,它并不直接对应于物理内存中的实际位置,而是通过操作系统的内存管理机制(如分页或分段)来映射到物理内存上。这个地址空间包含了进程执行所需的所有代码、数据、堆栈等,保证了进程之间的隔离性,因此进程也是 CPU 资源分配的最小单位。即使多个进程执行相同的代码,它们的地址空间也是相互独立的,因此一个进程中的代码和数据不会影响到其他进程,但进程之间数据通信就不是很方便了
  • 进程 ID不论是主进程,还是子进程,操作系统都会为其分配一个唯一标识符,也就是进程的 ID 号(全称 Process ID,简称 PID),用于区分系统中的不同进程。PID 一般是一个整数,通常从 1 开始(但在某些系统中,如 Windows,可能会有不同的起始值或特殊的 PID 值用于表示系统进程)。需要注意的是,PID 不是进程的地址空间编号,而是操作系统用于管理和跟踪进程的一个内部标识
  • 动态周期在不同的时间运行相同的程序,所产生的进程和 PID 是不同的,这是因为进程占用的内存段、内存大小、运行时间都发生了变化。而且进程是有生命周期的,即从程序开始运行(进入内存并被 CPU 执行)到运行结束(从内存中卸载)为止。
  • 使用特性:多个进程可以充分利用 CPU 的多个核心,在需要算力的任务上表现好,且进程有独立的地址空间,当一个进程崩溃后,不会影响其他的进程,但在进程之间来回切换资源开销大较,且进程之间数据通信不方便。
python
import os

# 输出当前程序的主进程PID
print(f'主进程PID是{os.getpid()}')  # 输出:主进程PID是10432。注释:不同时间运行相同程序,产生的进程是不同的,因此每次运行得到的PID值是不同的。

提醒

进程占用的是 CPU、内存,属于计算机的运行资源。而程序占用的是磁盘空间,属于外存储器,不属于计算机的运行资源。

创建进程|守护进程

Python 内置的 multiprocessing 进程模块提供了 Process 类来创建子进程对象。其中有如下两个重要的参数:

  • target 接收函数对象作为子进程对象的内容。
  • args 接受函数对象传递的参数,必须是元组类型。
python
import os
import time
# multiprocessing导入Process类
from multiprocessing import Process

def coding(a, b, c):
    print('子进程PID是%s' % os.getpid())  # 输出:子进程PID是4328
    print(f'子进程接收参数为{a,b,c}')      # 输出:子进程接收参数为(1, '2', {'c': 3})

if __name__ == '__main__':
    # Process类创建子进程对象,target接收函数对象,args接收传递的元组类型参数
    p1 = Process(target=coding, args=(1, '2', {'c': 3}))
    # 启动子进程p1
    p1.start()

注意

调用 multiprocessing 模块的函数必须在 if __name__ == '__main__': 下执行,不然子进程会自动 import 启动它的这个文件,无限递归创建子进程,进而报错。

上面的程序产生了两个进程,一个是主进程,另一个是主进程创建的子进程,进程之间是独立运行的。假如我们希望子进程随着主进程的结束而退出的话,我们需要将子进程设置为守护进程,设置的方法就是将子进程对象的 daemon 参数赋值为 True 即可。代码如下:

python
import os
import time
from multiprocessing import Process

def coding():
    # 耗时操作
    time.sleep(3)
    print('子进程PID是%s' % os.getpid())

if __name__ == '__main__':
    p1 = Process(target=coding, daemon=True)
    # 启动子进程p1
    p1.start()
    print('主进程PID是%s' % os.getpid())
'''
输出:主进程PID是17188
注释:这里没有输出子进程的PID,只输出了主进程PID,说明主进程先于子进程结束,而子进程是守护线程,随着主进程的结束而退出,因此也就无法输出子进程PID。
'''

注意

在 Windows 系统中,在没有设置守护进程的情况下,通过 PyCharm 启动程序,子进程会随主进程结束而结束。但通过 Terminal 启动程序,主进程在执行完所有代码后,会挂起等待子进程结束。

阻塞进程|多个进程

上面的程序中,子进程存在耗时操作,主进程就会先于子进程结束。假如我们希望主进程等待子进程运行完毕,再执行主进程,就必须阻塞主进程。这里就不能再用守护进程了,而是使用进程对象自带的 join 方法来阻塞主进程,当子进程执行结束后,才执行主进程。代码如下:

python
import os
import time
from multiprocessing import Process

def coding():
    # 耗时操作
    time.sleep(3)
    print('子进程PID是%s' % os.getpid())

if __name__ == '__main__':
    p1 = Process(target=coding)
    p1.start()
    # 阻塞主进程,等待子进程执行结束。
    p1.join()
    print('主进程PID是%s' % os.getpid())
'''
输出:
子进程PID是10312
主进程PID是1156
注释:子进程存在耗时操作,且先于主进程输出,说明主进程被阻塞,等待子进程执行结束,主进程才结束执行。
'''

上面的程序中只创建了一个子进程对象,如果创建了多个子进程对象,且每一个子进程 start 启动后就马上 join 阻塞,就变成了串行执行,和单进程运行没有什么差别了。正确的做法是,先循环执行所有子进程对象 start 启动,再循环执行所有子进程对象 join 阻塞。代码如下:

python
import os
import time
from multiprocessing import Process

def coding():
    # 耗时操作
    time.sleep(3)

if __name__ == '__main__':
    time_1 = time.time()
    # 错误写法(每个对象,start后,马上join)
    p1 = Process(target=coding)
    p2 = Process(target=coding)
    p3 = Process(target=coding)
    for obj in [p1, p2, p3]:
        obj.start()
        obj.join()
    time_2 = time.time()
    print(f'主进程PID是{os.getpid()}, 耗时{int(time_2 - time_1)}')
    # 正确写法(每个对象,都start后,再join)
    p1 = Process(target=coding)
    p2 = Process(target=coding)
    p3 = Process(target=coding)
    for obj in [p1, p2, p3]:
        obj.start()
    for obj in [p1, p2, p3]:
        obj.join()
    time_3 = time.time()
    print(f'主进程PID是{os.getpid()}, 耗时{int(time_3 - time_2)}')
'''
输出:
主进程PID是7656, 耗时23(错误写法,串行执行,耗时较长)
主进程PID是7656, 耗时15(正确写法,并行执行,耗时较短)
'''

存活进程|通信进程

上面我们使用了 join 方法来阻塞主进程来等待子进程结束,假如说不想使用 join 方法来阻塞的话,我们还可以通过使用 is_alive 方法来判断子进程是否存活,进而决定是否执行主进程,修改上面多进程代码如下:

python
import os
import time
from multiprocessing import Process

def coding():
    # 耗时操作
    print(f'子进程PID是{os.getpid()}')
    time.sleep(3)

if __name__ == '__main__':
    time_1 = time.time()
    p1 = Process(target=coding)
    p2 = Process(target=coding)
    p3 = Process(target=coding)
    for obj in [p1, p2, p3]:
        obj.start()
    # 判断所有子进程是否都已结束
    while p1.is_alive() or p2.is_alive() or p3.is_alive():
        pass
    time_2 = time.time()
    print(f'主进程PID是{os.getpid()}, 耗时{int(time_2 - time_1)}')
'''
输出:
子进程PID是30540
子进程PID是31900
子进程PID是12648
主进程PID是24616, 耗时5
注释:循环启动所有子进程,最后通过判断所有子进程是否都已结束来决定是否继续执行主进程。
'''

最后给大家一个任务:启动两个进程,一个输出 Ping ,另一个输出 Pong ,当数量加起来等于 5 个时,就结束程序。听起来是不是非常简单,但实现却有一定难度,例如下面的代码实际执行的结果是 Ping 和 Pong 各输出了 5 个:

python
import time
from multiprocessing import Process

counter = 0

def sub_task(string):
    global counter
    while counter < 5:
        print(string, end='', flush=True)
        counter += 1
        time.sleep(1)

if __name__ == '__main__':
    p1 = Process(target=sub_task, args=('Ping', ))
    p2 = Process(target=sub_task, args=('Pong', ))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
'''
输出:PingPongPingPongPingPongPingPongPingPong
注释:由于每个子进程中都有一个阻塞1秒的操作,这对于创建、启动进程对象和输出来说完全够了,因此每轮输出都是规律的PingPong,如果阻塞时间更短一些,那么输出的前半部分出现Ping概率更大,后半部分输出Pong概率更大。
'''

由于进程之间不能直接通过共享内存的方式交换数据,因此程序在创建子进程时会复制父进程及其所有的数据结构,每个子进程有自己独立的内存空间,这也就意味着两个子进程中各有一个 counter 变量,它们都会从 0 加到 5 。要解决这个问题,可以使用 multiprocessing 模块中的 Queue 类,它是可以被多个进程共享的队列,底层是通过操作系统底层的管道和信号量(semaphore)机制来实现的。代码如下所示:

python
import time
from multiprocessing import Process, Queue

def sub_task(content, queue):
    counter = queue.get()
    while counter < 5:
        print(content, end='', flush=True)
        counter += 1
        queue.put(counter)
        time.sleep(0.5)
        counter = queue.get()

if __name__ == '__main__':
    queue = Queue()
    # 队列为空时,使用get方法会导致阻塞,因此这里先推入了0,避免后续get导致阻塞
    queue.put(0)
    p1 = Process(target=sub_task, args=('Ping', queue))
    p1.start()
    p2 = Process(target=sub_task, args=('Pong', queue))
    p2.start()
    # is_alive()判断进程是否存活
    while p1.is_alive() and p2.is_alive():
        pass
    queue.put(5)
'''
输出:PingPingPongPingPong
注释:通过Queue类的get和put方法让三个进程(主进程和p1、p2)实现了数据的共享,这就是进程间的通信。当队列中取出的值已经等于5时,p1和p2两个进程中的就会有一个结束,从而跳出while循环,这时队列已为空,所以主进程还需要向队列中推入一个5,这样另一个尚未结束的进程也会因为读到这个5的值而结束。
'''

建议

上面案例中的 multiprocessing.Queue 对象的 get 方法在队列为空时是会阻塞的,直到获取到数据才会返回。如果不希望该方法阻塞以及需要指定阻塞的超时时间,可以通过指定 blocktimeout 参数进行设定。

复用进程

事实上,进程的创建和释放都会带来很大的开销,频繁的创建和释放进程通常都不是很好的选择。利用进程池,可以提前准备好若干个进程,在使用的过程中不需要再通过自定义的代码创建和释放进程,而是直接复用进程池中的进程。Python 内置的 concurrent.futures 模块提供了对进程池的支持,代码如下所示:

python
import time
import random
from concurrent.futures import ProcessPoolExecutor, as_completed

def download(filename):
    start = time.time()
    time.sleep(random.randint(3, 6))
    end = time.time()
    return f'下载{filename}, 耗时: {end - start:.3f}秒.'

if __name__ == '__main__':
    start = time.time()
    to_do = []
    # 使用上下文管理器创建一个能同时运行4个进程的进程池对象,它可以自动创建和销毁对象。
    with ProcessPoolExecutor(max_workers=4) as pool:
        filenames = ['pdf', 'avi', 'mp4']
        for filename in filenames:
            to_do.append(pool.submit(download, filename=filename))
    # 执行到这里,说明所有的子进程都已结束,进程池对象也自动销毁了。
    end = time.time()
    # 输出每个子进程的结果
    print([res.result() for res in as_completed(to_do)])
    print( f'总耗时: {end - start:.3f}秒')
'''
输出:
['下载mp4, 耗时: 6.008秒.', '下载pdf, 耗时: 5.002秒.', '下载avi, 耗时: 6.011秒.'] 
总耗时: 8.205秒
注释:因为使用了进程池,这三组任务是并行执行的,因此总耗时并不是三组耗时之和,而是只比耗时最长的任务多一点时间。
'''

建议

需要说明的是,进程池的大小值指的时每次能同时执行的进程数量,和主进程申请的进程数量没有关系。

Thread 线程

线程(Thread):每个进程在启动时,操作系统都会创建一个线程,负责执行程序的入口函数(如 main() 函数)。其主要特点如下:

  • 主子线程线程是对进程更小维度的划分,一个进程里面可以有多个线程,最少也会有一个线程,就是主线程,也是默认的线程,同时也是控制进程生命周期的重要线程。主线程可以创建子线程,并在创建子线程后继续执行它的任务。如果主线程结束,且进程中没有其他非守护线程时,进程就会终止。
  • 共享资源线程没有独立的地址空间,而是多个线程共享同一个进程的资源。打个比方,进程相当于教室,线程就相当于教室里面的学生,他们共用这一间教室,因此线程也是 CPU 调度的最小单位。
  • 使用特性:多个线程共享同一个进程资源,因此线程之间来回切换资源开销较小,由于没有独立的地址空间,线程之间数据通信方便,并发性高,运行效率高。不过也正因为线程没有独立的地址空间,所以不能充分使用多核 CPU,而且线程之间共用资源会存在一定的相互影响,因此多线程比多进程更脆弱。还有就是线程之间按时间片来强制切换,不够灵活。

创建线程|守护线程

Python 内置的 threading 线程模块提供了 Thread 类用来创建子线程对象。其中有两个重要的参数如下:

  • target 接收函数对象作为子线程对象的内容。
  • args 接受函数对象传递的参数,必须是元组类型。
python
import time
import threading

def run(a, b, c):
    print('当前进程中的线程数量:', threading.active_count())        # 输出:当前进程中的线程数量:2
    print('当前线程:', threading.current_thread())                # 输出:当前线程:<Thread(Thread-1, started 26996)>
    print('当前线程名称:', threading.current_thread().name)        # 输出:当前线程名称:Thread-1
    print('当前线程存活:', threading.current_thread().is_alive())  # 输出:当前线程存活:True
    print(f'传递的参数为{a,b,c}')                                   # 输出:传递的参数为(1, '2', {'c': 3})

if __name__ == '__main__':
    print('当前进程中的线程数量:', threading.active_count())        # 输出:当前进程中的线程数量:1
    print('当前线程:', threading.current_thread())                # 输出:当前线程:<_MainThread(MainThread, started 16376)>
    print('当前线程名称:', threading.current_thread().name)        # 输出:当前线程名称:MainThread(主线程)
    print('当前线程存活:', threading.current_thread().is_alive())  # 输出:当前线程存活:True
    # 创建子线程对象t1,target参数接收函数对象run
    t1 = threading.Thread(target=run, args=(1, '2', {'c': 3},))
    # 启动子线程t1
    t1.start()

建议

上面代码如果在 PyCharm 中运行,返回的线程数量可能有 4、5 个,这是因为程序把 PyCharm 这个进程中的其他线程也算在内了,比如就包括了 PyDev 调试器中的 pydevd.Reader 线程。如果想查看当前进程中正在运行的所有线程,可以使用 threading.enumerate() 返回一个包含正在运行的线程的列表。

前面我们讲解“进程”的时候,解释了一个“守护进程”概念,而在”线程“中,也存在着一个“守护线程”的概念。所谓“守护线程”就是在主线程结束的时候,不值得再保留的执行线程。简单的说,守护线程会跟随主线程一起挂掉,而主线程的生命周期就是一个进程的生命周期。如果不理解,我们可以看下面一段简单的代码:

python
import time
from threading import Thread

def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)

if __name__ == '__main__':
    Thread(target=display, args=('1', )).start()
    Thread(target=display, args=('2', )).start()
    time.sleep(5)

上面的代码运行起来之后是不会停止的,因为两个子线程中都有死循环,除非你手动中断代码的执行。如果在创建线程对象时,将 daemon 参数设置为 True,这两个线程就会变成守护线程,那么在其他线程结束时,即便有死循环,两个守护线程也会挂掉,不会再继续执行下去。代码如下所示。

python
import time
from threading import Thread

def display(content):
    while True:
        print(content, end='', flush=True)
        time.sleep(0.1)

if __name__ == '__main__':
    Thread(target=display, args=('1', ), daemon=True).start()
    Thread(target=display, args=('2', ), daemon=True).start()
    time.sleep(5)
'''
注释:我们在主线程中添加了一行time.sleep(5)让主线程休眠5秒,在这个过程中,输出1和2的守护线程会持续运转,直到主线程在5秒后结束,这两个守护线程也被销毁,不再继续运行。
'''

阻塞线程|多个线程

线程虽然共享一个进程的内存、数据,但线程执行的任务是相互独立的,假如子线程执行的任务中存在耗时的操作,那么主线程会先于子线程结束执行。假如我们希望子线程先于主线程结束,就有必要让主线程等待子线程运行完毕,再执行主线程,这就是线程阻塞。这里就不能再用守护线程了,而是使用线程对象自带的 join 方法来阻塞主线程,当子线程执行结束,才会继续执行主线程。代码如下:

python
import time
import threading

def coding():
    # 耗时操作
    time.sleep(3)
    print(f'子线程名称是:{threading.current_thread().name}')

if __name__ == '__main__':
    p1 = threading.Thread(target=coding)
    p1.start()
    # 阻塞主进程,等待子进程执行结束。
    p1.join()
    print(f'主线程名称是:{threading.current_thread().name}')
'''
输出:
子线程名称是:Thread-6
主线程名称是:MainThread
注释:子线程存在耗时操作,且先于主线程输出,说明主线程被阻塞,等待子线程执行结束,主线程才结束执行。
'''

警告

time.sleep() 是一个线程级阻塞操作,在单线程中使用它,意味着整个程序都将被阻塞,而在多线程中使用它,只有执行它的线程将会被阻塞,其他线程仍在进程中运行。

同进程对象一样,如果程序中创建了多个子线程对象,且每一个子线程 start 启动后就马上 join 阻塞,就变成了串行执行,和单线程运行没有什么差别了。正确的做法是,先循环执行所有子线程对象 start 启动,再循环执行所有子线程对象 join 阻塞。代码如下:

python
import time
import threading

def coding():
    # 耗时操作
    time.sleep(3)

if __name__ == '__main__':
    time_1 = time.time()
    # 错误写法(每个对象,start后,马上join)
    p1 = threading.Thread(target=coding)
    p2 = threading.Thread(target=coding)
    p3 = threading.Thread(target=coding)
    for obj in [p1, p2, p3]:
        obj.start()
        obj.join()
    time_2 = time.time()
    print(f'当前线程是{threading.current_thread().name}, 耗时{int(time_2 - time_1)}')
    # 正确写法(每个对象,都start后,再join)
    p1 = threading.Thread(target=coding)
    p2 = threading.Thread(target=coding)
    p3 = threading.Thread(target=coding)
    for obj in [p1, p2, p3]:
        obj.start()
    for obj in [p1, p2, p3]:
        obj.join()
    time_3 = time.time()
    print(f'当前线程是{threading.current_thread().name}, 耗时{int(time_3 - time_2)}')
'''
输出:
当前线程是MainThread, 耗时9(错误写法,串行执行,耗时较长)
当前线程是MainThread, 耗时3(正确写法,并发执行,耗时较短)
'''

提醒

可以看到多线程的花费时间比多进程花费时间更少,其原因是进程的创建和启动对于计算机的资源开销比较大,因此花费的时间也会比较长。

上锁线程|重写线程

先给大家一个任务:启动两个线程,操作同一个初始值为 0 的变量各一百万次,在线程中先执行加 1 操作,再执行减 1 操作。听起来是不是非常简单,是不是觉得结果一定等于 0,例如下面的代码实际执行的结果却不等于 0:

python
import threading

num = 0
def run():
    global num
    for i in range(1000000):
        num += i
        num -= i

if __name__ == '__main__':
    # 创建子线程对象
    t1 = threading.Thread(target=run)
    t2 = threading.Thread(target=run)
    # 启动子线程
    t1.start()
    t2.start()
    # 阻塞主线程,等待子线程t1、t2结束
    t1.join()
    t2.join()
    print(num)  # 输出:-1515072

造成上面结果不等于 0 的原因就是,当多个线程操作同一个变量时,线程操作不一定是串行执行,也就是说上一个线程还没有完成对变量的所有操作时,就切换到下一个线程执行了。对应到上面代码中,线程 t1 还没有完成对变量 num 的全部操作时,就切换到了线程 t2 ,导致线程 t2 中操作的变量 num 存储的仍然是线程 t1 未操作完的数据,从而导致了数据混乱。为了决解数据不一致的情况,就需要给线程加锁,使用锁的要点如下:

  1. 有锁的线程才有执行权,没有锁的线程就只能等待
  2. 有锁的线程操作完后,必须释放锁,其他线程来争抢锁,以获得接下来的执行权
  3. 线程获得锁后,不释放锁,其他线程无休止的等待锁的释放,这种情况称“死锁”
python
import threading

# 创建一个线程锁对象lock
lock = threading.Lock()

num = 0
def run():
    global num
    for i in range(1000000):
        # with 线程锁对象:可以对线程自动上锁、解锁
        with lock:
            num += i
            num -= i

if __name__ == '__main__':
    # 创建子线程对象
    t1 = threading.Thread(target=run)
    t2 = threading.Thread(target=run)
    # 启动子线程
    t1.start()
    t2.start()
    # 阻塞主线程,等待子线程t1、t2结束
    t1.join()
    t2.join()
    print(num)  # 输出:0

上面我们所写的多线程都是通过线程对象结合函数去实现的,如果我们要通过面向对象的方法来实现,就需要继承 Thread 类,具体代码如下:

python
import time
from threading import Thread, Lock

# 创建一个线程锁对象lock
lock = Lock()

class MyThread(Thread):
    def __init__(self, name):
        # 继承Thread类属性
        super().__init__()
        self.name = name

    # 重写run方法(方法名run固定)
    def run(self):
        for i in range(3):
            time.sleep(0.5)
            with lock:
                print(self.name, i)

if __name__ == '__main__':
    t1 = MyThread("t1")
    t2 = MyThread("t2")
    t3 = MyThread("t3")
    t1.start()
    t2.start()
    t3.start()
'''
输出:
t3 0
t2 0
t1 0
t3 1
t1 1
t2 1
t1 2
t3 2
t2 2
'''

复用线程

事实上,线程的创建和释放也会带来一定的开销,频繁的创建和释放线程通常都不是很好的选择。利用线程池,可以提前准备好若干个线程,在使用的过程中不需要再通过自定义的代码创建和释放线程,而是直接复用线程池中的线程。Python 内置的 concurrent.futures 模块提供了对线程池的支持,代码如下所示:

python
import time
import random
from concurrent.futures import ThreadPoolExecutor

def download(file):
    start = time.time()
    time.sleep(random.randint(1, 7))
    end = time.time()
    return f'下载 {file} 耗时: {end - start:.3f}秒'

if __name__ == '__main__':
    # 记录程序开始时间
    start = time.time()
    futures = []
    # 使用上下文管理器创建一个含有3个线程的线程池。
    with ThreadPoolExecutor(max_workers=3) as pool:
        for file in ['pdf', 'avi', 'mp4']:
            # 封装任务并添加到列表中
            futures.append(pool.submit(download, file=file))
        # 输出每个任务的状态
        print('管理器内部: ', [res._state for res in futures])
        time.sleep(3)
        print('暂停三秒后: ', [res._state for res in futures])
    print('管理器外部: ', [res._state for res in futures])
    # 输出每个子线程的结果
    print([res.result() for res in futures])
    # 记录程序结束时间
    end = time.time()
    print(f'总耗时: {end - start:.3f}秒')
'''
输出:
管理器内部:  ['PENDING', 'PENDING', 'PENDING']
暂停三秒后:  ['FINISHED', 'RUNNING', 'FINISHED']
管理器外部:  ['FINISHED', 'FINISHED', 'FINISHED']
['下载 pdf 耗时: 2.006秒', '下载 avi 耗时: 4.006秒', '下载 mp4 耗时: 1.009秒']
总耗时: 4.045秒
'''

上面的案例中使用了线程池来并发的执行三组任务,通过输出我们可以得到如下总结:

  1. 在管理器内部,将封装任务添加到列表中时,每个任务都是 PENDING 待执行状态。
  2. 在管理器内部暂停三秒后,有的任务是 RUNNING 执行状态,有的任务是 FINISHED 完成状态,说明有线程池中有的线程正在执行任务,有的线程已经完成了任务。
  3. 在管理器外部,每个任务都是 FINISHED 完成状态,说明当程序执行到上下文管理器外部时,所有的任务均已执行完成。
  4. 输出的任务结果中可以看到,任务的添加顺序和任务的结果顺序是一样的,其中下载 avi 耗时超过 3 秒,下载 pdf 和下载 mp4 均耗时在 3 秒以内,这就恰好对应了管理器内部暂停三秒后输出中一个任务是 RUNNING 执行状态,另外两个任务是 FINISHED 完成状态。
  5. 从最后输出的总耗时结果来看,执行三组任务的耗时基本等于执行时间最长的任务耗时,相比进程池还是节省了不少时间。

建议

如果线程池中的某个线程在执行过程中出现错误,并且这个错误没有被捕获和处理的话,那么错误会中断该线程的执行。由于每个线程都有自己的执行栈和异常处理上下文,因此其他线程的执行不会受到影响,它们会继续执行直到完成或遇到它们自己的异常。

由于上面的案例中因为需要统计程序总耗时,所以我们需要等待所有任务都完成才开始统计。如果不需要统计总耗时,上面案例这样写会存在以下几点潜在的缺陷:

  1. 效率低下:如果你等待所有任务都完成才开始处理结果,那么你可能会有一个长时间的等待期,特别是当任务执行时间差异很大时。这意味着一些已经完成的任务的结果会被延迟处理,导致整体程序效率降低。
  2. 资源利用不足:在等待所有任务完成期间,程序可能会处于空闲状态,没有充分利用 CPU 或 I/O 资源。特别是当一些任务已经完成而其他任务仍在运行时,这种空闲状态会更加明显。
  3. 实时性差:在某些应用场景中,你可能希望尽快获取并处理任务的结果,以便能够实时地做出响应或进行下一步操作。如果你等待所有任务完成,那么这种实时性就会受到影响。

因此我们希望能够实时处理线程池中完成的任务的结果,这里就需要用到 concurrent.futures.as_completed 函数,该函数返回一个迭代器,一旦某个任务完成,就会立即返回其任务对象,而不必等待其他任务完成,这对于并行或并发地处理多个异步任务非常有用。具体代码如下:

python
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed

def download(file):
    start = time.time()
    time.sleep(random.randint(1, 7))
    end = time.time()
    return f'下载 {file} 耗时: {end - start:.3f}秒'

if __name__ == '__main__':
    futures = []
    # 使用上下文管理器创建一个含有3个线程的线程池。
    with ThreadPoolExecutor(max_workers=3) as pool:
        for file in ['pdf', 'avi', 'mp4']:
            futures.append(pool.submit(download, file=file))
        # as_completed接受一个任务列表
        for res in as_completed(futures):
            print(res.result())
'''
输出:
下载 avi 耗时: 1.006秒
下载 mp4 耗时: 2.003秒
下载 pdf 耗时: 6.005秒
注释:这个案例输出的任务结果顺序和前面案例输出的任务结果顺序是不同的,前面的案例是按照任务的添加顺序进行输出的,而这个案例是按照任务的完成顺序进行输出的。
'''

重要

假如在案例中直接遍历 futures 任务列表,里面任务可能处于 PENDING 待执行状态或 RUNNING 执行状态时,这时调用 .result() 方法会出现阻塞,直到任务变为 FINISHED 完成状态,才会返回任务结果。不过案例中 futures 任务列表外部加了 as_completed 迭代器,它所返回的任务都是 FINISHED 完成状态,因此任务调用 .result() 方法就不会出现阻塞。

警告

值得注意的是,ThreadPoolExecutor 线程池中线程切换实际上是操作系统级别的切换。当线程池中的一个线程完成了它的任务后,它会被操作系统调度器挂起(如果没有其他任务需要执行),而另一个线程(可能是同一个线程池中的另一个线程,也可能是系统中的其他线程,这取决于操作系统的调度策略和当前的系统负载)会被唤醒以执行下一个任务

Coroutine 协程

协程(Coroutine):是一种比线程更轻量级的并发编程机制,它允许在程序执行过程中暂停和恢复某些任务,而不需要像线程那样进行上下文切换。

  • 更小维度:协程又称微线程,是对线程更小维度的划分。总结来说,一个进程可以有多个线程,一个线程可以有多个协程。按维度来划分就是,进程 >= 线程 >= 协程。
  • 极高效率:由于协程在线程内,所以不需要锁机制,也不存在同时写变量冲突,因此协程的执行效率极高,可以支持更高的并发(可达上万次),同时内存开销更小,且程序可以自由控制协程切换,避免了无意义的调度,结合多进程可获得极高的性能,但同时程序自身必须承担调度责任。
  • 对比线程:线程池执行任务时,当一个线程完成任务后,会进行操作系统级别的切换,操作系统可能会切换到 QQ 上,QQ 的任务完成后,可能再切到 WeChat 上,WeChat 的任务完成后,可能再切到线程池上。由于操作系统级别的任务切换非常多,所以线程池的效率并不是非常高。而协程执行任务时,只要任务阻塞就切换到下一个任务上,相当于协程执行的任务始终是不阻塞的,也就始终保持对 CPU 的占用,所以协程的执行效率远高于线程池的执行效率。
  • 使用特性协程是一种高效的并发机制,非常适合 I/O 密集型任务,如网络请求、数据库操作、文件读写等。但不适用于 CPU 密集型任务,因为协程本质上是单线程的,并没有真正的并行执行。总的来说,协程的优势在于减少了系统开销,提高了程序处理大量并发任务时的效率

实现步骤|使用案例

在 Python 中一般是通过异步编程来实现协程,其中使用 asyncawait 关键字来定义和管理协程,还使用 Python 3.4 内置的 asyncio 标准库,它支持异步 I/O,可以实现单线程并发 I/O 操作,这样协程就可以并发执行多个任务,而不会阻塞主线程。具体如何实现一个协程,主要包含以下四个部分:

  • async 关键字:将函数标记为一个异步函数,在调用异步函数时,不会立即执行异步函数,而是返回一个协程对象。
  • await 关键字:当事件中出现 I/O 阻塞操作,将其挂起,交出执行权去执行其他协程对象。或者说,暂停当前协程的执行,用来等待另一个协程的执行结果,并在该协程执行完毕后继续执行。注意该关键字只能在协程内部使用,不能在普通函数中使用。
  • 事件轮询:建立一个不断轮询协程对象的对象,用于判断协程对象是否处于 I/O 阻塞状态,若是 I/O 阻塞状态则轮询至下一个协程对象,若不是 I/O 阻塞状态则交由 CPU 执行。
  • 注册运行:协程的执行由事件轮询控制,事件轮询负责管理协程任务的调度。将协程对象注册到事件轮询上,当满足事件发生的时候,执行相对应的协程对象。

提醒

轮询又叫程控输出入,是一种 CPU 决策如何提供周边设备服务的方式。它主要是由 CPU 定时发出询问,依序询问每一个周边设备是否需要其服务,如果有需要则给予服务,服务结束后再询问下一个周边设备,如此不断周而复始。

首先,我们写一个协程程序并执行它:

python
# 导入asyncio
import asyncio

# async关键字把hello函数标记为异步函数
async def hello():
    print('Hello world!')
    print('Hello again!')

# 执行异步函数返回协程对象
h = hello()
# 获取事件轮询
event_loop = asyncio.get_event_loop()
# 调用事件轮询中的run_until_complete方法执行协程对象直到完成
event_loop.run_until_complete(h)
'''
输出:
Hello world!
Hello again!
'''

建议

上面 asyncio.get_event_loop 获取事件轮询和 event_loop.run_until_complete 执行协程对象,可以简写为 asyncio.run(协程对象) 创建事件轮询并执行协程对象。不过要注意该方法在 Windows 上可能会报 Event loop has closed! 错误,原因在该方法的源码最后一句 loop.close(),正常我们运行完之后 loop 就已经被销毁了,此时再进行 close 关闭操作必然会报错。

接下来,我们创建 3 个协程对象,并在对象中加上 1 秒延迟来模拟 I/O 阻塞,具体代码如下:

python
import time
import asyncio
import threading

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    time.sleep(1)  # 注释:增加1秒延迟。
    print('Hello again! (%s)' % threading.currentThread())

# 用列表保存3个hello协程对象
tasks = [hello() for _ in range(3)]
# 获取事件轮询
loop = asyncio.get_event_loop()
# asyncio.wait等待列表中的协程对象执行完成
loop.run_until_complete(asyncio.wait(tasks))
'''
输出:
Hello world! (<_MainThread(MainThread, started 11160)>)
(1秒延迟)
Hello again! (<_MainThread(MainThread, started 11160)>)
Hello world! (<_MainThread(MainThread, started 11160)>)
(1秒延迟)
Hello again! (<_MainThread(MainThread, started 11160)>)
Hello world! (<_MainThread(MainThread, started 11160)>)
(1秒延迟)
Hello again! (<_MainThread(MainThread, started 11160)>)
'''

首先,我们可以看到三个协程对象的线程 ID 都是一样的,说明是在同一个线程内执行的。其次我们增加了延迟,但发现程序并没有并发的去执行,而是在串行执行,其原因就是整个协程程序中只有一个线程,而 time.sleep(1) 是一个线程级别的阻塞操作,它并不支持异步操作,因此它阻塞了整个程序的运行,导致了协程程序的同步执行。为了实现异步操作,我们需要asyncio 模块自带的 sleep() 方法来模拟 I/O 阻塞,并且还需要加上 await 关键字,来挂起处于阻塞状态的任务,让协程切换到其它不阻塞状态的任务继续执行,于是代码变成了这样:

python
import asyncio
import threading

async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    await asyncio.sleep(1)  # 注释:asyncio.sleep支持异步,await挂起阻塞任务
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello() for _ in range(3)]
loop.run_until_complete(asyncio.wait(tasks))
'''
输出:
Hello world! (<_MainThread(MainThread, started 8420)>)
Hello world! (<_MainThread(MainThread, started 8420)>)
Hello world! (<_MainThread(MainThread, started 8420)>)
(1秒延迟)
Hello again! (<_MainThread(MainThread, started 8420)>)
Hello again! (<_MainThread(MainThread, started 8420)>)
Hello again! (<_MainThread(MainThread, started 8420)>)
注释:执行到await命令时,把asyncio.sleep(1)当作了一个耗时1秒的I/O操作,于是暂停当前的任务执行,并把执行权交出去,继续执行事件循环中其他可以执行的任务,等到任务阻塞结束,执行权交回时,继续往下执行。这也就说明协程执行时一旦阻塞,会将CPU让出而不是让CPU处于闲置状态,这样就大大的提升了CPU的利用率。
'''

image-20231220111850672

重要

协程在执行子程序(函数)过程中是可中断的,它在适当的时候会返回来接着执行,也就是说,所有的子程序都是由一个线程在异步执行,避免了线程之间的切换操作,而多线程本质是多个线程轮换的同步执行,不能真正地实现异步,因此协程的执行效率比多线程还要高。

通过上面的案例,我们简单学习了协程的使用方法,不过在平常的任务当中我们不会像上面这样来写。我们还是以前面的例子来写,具体代码如下:

python
import time
import asyncio

# 标记download函数为异步函数
async def download(file, t):
    print(f'下载{file}')
    # asyncio.sleep会阻塞,前面使用await挂起
    await asyncio.sleep(t)
    print(f'{file}完成')

# 标记main主函数为异步函数
async def main():
    tasks = []
    # 循环遍历任务
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        # 协程对象放入列表
        tasks.append(download(file, 3))
    # asyncio.wait会等待全部的协程对象任务完成,前面使用await挂起
    await asyncio.wait(tasks)

if __name__ == '__main__':
    start = time.time()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
    end = time.time()
    print(f'下载任务耗时:{end-start}秒')
'''
输出:
下载mp4
下载pdf
下载avi
mp4完成
pdf完成
avi完成
下载任务耗时:3.018556594848633秒
'''

值得说明一下的是,在 Python 3.8 以上的环境中执行上面的代码会有 DeprecationWarning: The explicit passing of coroutine objects to asyncio.wait() is deprecated since Python 3.8, and scheduled for removal in Python 3.11. 这串警告,提示我们列表里面保存协程对象的写法最多支持到 Python 3.11,建议我们使用任务对象。修改方式很简单,就是通过 asyncio.create_task(协程对象) 把协程对象包装成任务对象。重写 main 异步函数代码如下:

python
async def main():
    tasks = []
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        # 将协程对象包装成任务对象
        task = asyncio.create_task(download(file, 3))
        # 将任务对象放入tasks列表里面,为了统一处理
        tasks.append(task)
    # 统一等到协程任务执行完毕
    await asyncio.wait(tasks)

拿返回值|输出异常

假如协程对象有返回值,我们该如何获取呢?很简单,还是用上面的案例,只需要在 await asyncio.wait(tasks) 前面赋值两个变量即可,具体什么变量,直接看 asyncio.wait 方法的源码:

image-20240604230519979

  • done 表示结束状态。
  • pending 表示等待状态。
python
import time
import asyncio

async def download(file, t):
    await asyncio.sleep(t)
    # 协程对象返回值
    return f'{file}完成'

async def main():
    tasks = []
    # 循环遍历任务
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        # 将协程对象包装成任务对象
        task = asyncio.create_task(download(file, 3))
        tasks.append(task)
    # 获取返回值
    done, pending = await asyncio.wait(tasks)
    print(done)  # 输出:{<Task finished ... result='pdf完成'>, <Task finished ... result='mp4完成'>, <Task finished ... result='avi完成'>}
    print(pending)  # 输出:set()
    for result in done:
        print(result.result(), end=', ')  # 输出:pdf完成, mp4完成, avi完成

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

这里提醒一下的是,上面案例中的协程对象是谁先完成,谁就先输出,因此协程对象的返回值顺序和协程对象进入列表的顺序是不一样的。假如我们希望协程对象的返回值顺序和协程对象进入列表的顺序一样,就可以使用 asyncio.gather() 方法来执行协程任务。具体代码如下:

python
import asyncio

async def download(file, t):
    await asyncio.sleep(t)
    return f'{file}完成'

async def main():
    tasks = []
    # 循环遍历任务
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        # 将协程对象包装成任务对象
        task = asyncio.create_task(download(file, 3))
        tasks.append(task)
    # 按协程对象的顺序获取返回值
    result = await asyncio.gather(*tasks)
    print(result)  # 输出:['pdf完成', 'avi完成', 'mp4完成']

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

上面的案例中所有的协程对象都执行正常,且都有输出。假如说协程对象在执行过程中报错了,效果怎样呢?具体代码如下:

python
import asyncio

async def download(file, t):
    await asyncio.sleep(t)
    # 报错代码
    print(1/0)
    return f'{file}完成'

async def main():
    tasks = []
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        task = asyncio.create_task(download(file, 3))
        tasks.append(task)
    result = await asyncio.gather(*tasks)
    print(result)

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())

上面的代码,我们在 download 异步函数中添加了错误代码,执行后我们会看到程序直接报错 ZeroDivisionError: division by zero 并退出了。但是我们希望协程对象执行报错时,是输出错误,而不是直接退出,就需要在 asyncio.gather() 方法中添加 return_exceptions=True 参数。具体代码如下:

python
import asyncio

async def download(file, t):
    await asyncio.sleep(t)
    # 报错代码
    print(1/0)
    return f'{file}完成'

async def main():
    tasks = []
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        task = asyncio.create_task(download(file, 3))
        tasks.append(task)
    result = await asyncio.gather(*tasks, return_exceptions=True)
    print(result)  # 输出:[ZeroDivisionError('division by zero'), ZeroDivisionError('division by zero'), ZeroDivisionError('division by zero')]

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
'''
注释:可以总结到,当return_exceptions=True时,如果某个协程对象报错,就返回错误信息,其它协程对象正常执行。当return_exceptions=False时,如果某个协程对象报错,则程序直接退出,所有协程对象停止执行。
'''

并发限制

虽然 asyncio 可以支持非常高的并发量,但过高的并发可能会让电脑卡顿,因此我们就需要对并发量进行限制。代码如下:

python
import asyncio

# 借助Semaphore创建一个信号量对象,限制并发量为2个
semaphore = asyncio.Semaphore(2)

async def download(file, t):
    # 有了信号量的控制,最大协程数量被限制在2个
    async with semaphore:
        print(f'开始下载{file}')
        await asyncio.sleep(t)
        print(f'{file}下载完成')
        return f'{file}完成'

async def main():
    tasks = []
    files = ['pdf', 'avi', 'mp4']
    for file in files:
        task = asyncio.create_task(download(file, 3))
        tasks.append(task)
    result = await asyncio.gather(*tasks, return_exceptions=True)
    print(result)

if __name__ == '__main__':
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main())
'''
输出:
开始下载pdf
开始下载avi
(3秒延迟)
pdf下载完成
avi下载完成
开始下载mp4
(3秒延迟)
mp4下载完成
['pdf完成', 'avi完成', 'mp4完成']
注释:这里可以看到,最大并发量被限制在2个,因此完全执行3个并发任务,一共延迟了6秒。
'''

全局解释器锁

上面我们学习了实现并发编程的几种方式,同时我们也提到每一种的并发编程所适用的任务场景,例如多进程可以充分利用 CPU 的多个核心,在需要算力的任务上表现好,而多线程、异步 I/O (协程)可以进行高效并发,在处理大量并发任务上效率高。为什么它们适用的任务场景有所差异,这里就需要了解一样东西了,那就是“全局解释器锁”。全局解释器锁(Global Interpreter Lock,简称 GIL)是计算机程序设计语言解释器用于同步线程的工具,使得在同一进程内任何时刻仅有一个线程在执行。例如,Python 解释器的官方实现 CPython 里面就有 GIL 的存在,所以在 Python 解释器进程中的任何线程,都只有先获得 GIL,才会获得执行权。这就意味着任何一个时间节点,Python 解释器中只有一个线程处于执行状态,即使在拥有多核 CPU 的环境中使用多线程框架,也只允许一次运行一个线程,这极大的限制了 Python 的性能,所以 GIL 在 Python 众多功能中其声誉可谓是“臭名昭著”

性能瓶颈

因为 GIL 的存在,Python 解释器中始终只有一个线程处于执行状态,因此 GIL 对单线程执行任务来说并没什么显著影响,但是它成为了多线程的性能瓶颈。案例如下:

python
import time
import threading

# 函数run的主要内容是循环,需要大量的运算,属于CPU密集型任务。
def run():
    for _ in range(10**8):
        pass

if __name__ == '__main__':
    # 单线程
    t1 = time.time()
    run()
    run()
    t2 = time.time()
    print(f'单线程{t2-t1}')  # 输出:单线程6.69
    # 多线程
    t1 = time.time()
    r1 = threading.Thread(target=run)
    r2 = threading.Thread(target=run)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    t2 = time.time()
    print(f'多线程{t2-t1}')  # 输出:多线程6.78
'''
注释:针对CPU密集型任务,单线程、多线程消耗的时间是差不多的。单线程在程序运行上是串行同步的,只有上一步程序运行完成后,才会运行下一步程序,否则一直等待。而多线程表面看似两个线程同时在运行,但由于GIL锁的存在,实则是在交替运行,在任意一个时间点只有一个线程处于执行状态,也就是说本该并行的多线程,被GIL强行转成并发的单线程。
'''

伪多线程

经过上面的案例,因此许多人讲 Python 的多线程实际上是“伪多线程”。可虽然是“伪多线程”,但它还是能解决一些比如网络、磁盘等很多 I/O 阻塞问题。当发生阻塞时,此时如果只有一个线程就没法处理其他事,CPU 就处于闲置状态,而多线程至少有机会把一个 CPU 核心跑到 100% 。代码案例如下:

python
import time
import threading
import multiprocessing

# 函数run内容主要是暂停,不需要大量运算,属于I/O密集型任务。
def run():
    for _ in range(3):
        time.sleep(3)  # 注释:阻塞线程3秒。

if __name__ == '__main__':
    # 单线程/单进程
    t1 = time.time()
    run()
    run()
    t2 = time.time()
    print(t2-t1)       # 输出:18.0
    # 多线程
    t1 = time.time()
    r1 = threading.Thread(target=run)
    r2 = threading.Thread(target=run)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    t2 = time.time()
    print(t2-t1)       # 输出:9.0
    # 多进程
    t1 = time.time()
    r1 = multiprocessing.Process(target=run)
    r2 = multiprocessing.Process(target=run)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    t2 = time.time()
    print(t2-t1)       # 输出:9.1
'''
注释:针对I/O密集型任务,单线程执行的方式是同步阻塞,因此在线程阻塞的情况下,什么都不能干,只能等待,而多线程就可以灵活的切换线程,当线程被阻塞时,去执行其他的线程,因此执行时间比单线程少一半。多线程、多进程执行时间差不多,这是因为在线程阻塞的情况下,并发和并行的效率差不多,但并行消耗的资源会比并发消耗的资源更大。
'''

提升性能

既然 Python 解释器是单线程的,那么我们使用多进程,每个进程开启一个 Python 解释器,这样每个 Python 解释器都有自己的 GIL,而且进程之间不会像线程之间出现 GIL 的争抢,这就是为什么多进程能摆脱 GIL 束缚的原因。前面讲到的 multiprocess 库恰好可以在很大程度上弥补 thread 库因 GIL 导致的缺陷,为了方便使用它还完整的复制了一套 thread 所提供的接口方便迁移。案例如下:

python
import time
import threading
import multiprocessing

# 函数run内容主要循环,需要大量的运算,属于CPU密集型任务
def run():
    for _ in range(10**9):
        pass

if __name__ == '__main__':
    # 多线程
    t1 = time.time()
    r1 = threading.Thread(target=run)
    r2 = threading.Thread(target=run)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    t2 = time.time()
    print(t2-t1)  # 输出:45.9
    # 多进程
    t1 = time.time()
    r1 = multiprocessing.Process(target=run)
    r2 = multiprocessing.Process(target=run)
    r1.start()
    r2.start()
    r1.join()
    r2.join()
    t2 = time.time()
    print(t2-t1)  # 输出:23.3
'''
注释:多进程相比多线程的运行时间减少了一半,这是因为多进程可以做到真正的并行,而多线程就是单线程的并发。
'''

任务类型

不同的并发编程所适用的任务场景是不同的,本质上就是所适用的任务类型是不同的。在计算机中,所执行的任务可以分为两种类型,一种是“I/O 密集型”,另一种就是“CPU 密集型”。

  • I/O 密集型任务:指磁盘 I/O、网络 I/O 占主要的任务,输入输出量很大,计算量很小(请求网页、 读写文件等)

    • 任务特点:阻塞操作多,需要等待时间。
    • 适用方法:使用多线程、协程,遇到阻塞操作时,就去执行其他的任务。
  • CPU 密集型任务:指 CPU 计算占主要的任务,输入输出量很小,计算量很大(复杂运算等)

    • 任务特点:需要 CPU 的算力。
    • 适用方法:使用多进程,充分利用 CPU 的多核心来加速运算。

总结来说就是,针对不同类型的任务选择合适的方法,可以最大限度的节约计算机资源和我们的时间