Jean's Blog

一个专注软件测试开发技术的个人博客

0%

Python多线程和协程

多线程初识

多线程前置

一般的,一个程序或者一个App,默认只在一个进程的一个线程中执行,这个线程称为主线程。如果需要开启至少另外一个线程做任务,那么就要用到今天的学习的——多线程,及高效的协程技术。

下面,先看一个线程相关的例子,需要导入线程相关的模块threading:

1
2
3
4
>>> import threading
>>> t = threading.current_thread()
>>> t
<_MainThread(MainThread, started 4553221632)>

所以,验证了程序默认是在MainThread中执行。

t.getName()获得这个线程的名字;其他常用方法,getName()获得线程id,is_alive()(注意:is_alive()是在3.9以后的版本支持的方法,isAlive()是在3.9版本之前)判断线程是否存活。

1
2
3
4
5
6
>>> t.getName()
'MainThread'
>>> t.ident
4553221632
>>> t.is_alive()
True

从上面看到,有且仅有一个“干活”的主线程,下面就要创建自己的一个线程了。

创建线程

  1. 创建一个线程

    1
    >>> my_thread = threading.Thread()
  1. 创建一个名为my_thread的线程

    1
    >>> my_thread = threading.Thread(name='my_thread')
  1. 创建线程,需要告诉这个线程,能帮助我们做什么。“做什么”是通过参数target传入,参数类型为callable。

    1
    >>> my_thread = threading.Thread(target=print_i, args=(1,))
  1. 启动线程,my_thread线程已经准备完毕,那么就启动start()

    1
    >>> my_thread.start()
  2. 执行结果如下,其中args指定函数print_i需要的参数i,类型为元组。

    1
    打印i:1

以上就是多线的基本知识点。

多线程本质

交替获得CPU时间片

为了更好理解多线程,下面以一个例子说明。

假定计算机是单核的(尽管对于CPython,这个假定有些多余),开辟3个线程,装载到threads中:

1
2
3
4
5
6
7
8
>>> import time
>>> import threading
>>> def print_time():
... for _ in range(5):
... time.sleep(0.1)
... print('当前线程%s,打印结束时间为:%s'%(threading.current_thread().getName(),time.time()))
...
>>> threads = [threading.Thread(name='t%d'%(i,),target=print_time) for i in range(3)]

启动3个线程:

1
2
>>> [t.start() for t in threads]
[None, None, None]

打印结果,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
>>> 当前线程t0,打印结束时间为:1650855307.181777
当前线程t1,打印结束时间为:1650855307.1826298
当前线程t2,打印结束时间为:1650855307.182664
当前线程t0,打印结束时间为:1650855307.286497
当前线程t1,打印结束时间为:1650855307.286579
当前线程t2,打印结束时间为:1650855307.286609
当前线程t0,打印结束时间为:1650855307.390517
当前线程t2,打印结束时间为:1650855307.3905768
当前线程t1,打印结束时间为:1650855307.390622
当前线程t2,打印结束时间为:1650855307.491287
当前线程t0,打印结束时间为:1650855307.491333
当前线程t1,打印结束时间为:1650855307.495244
当前线程t2,打印结束时间为:1650855307.5935402
当前线程t0,打印结束时间为:1650855307.593628
当前线程t1,打印结束时间为:1650855307.59603

从上面的打印输出的结果规律,根据操作系统的调度算法,t0、t1、t2三个线程,轮询获得CPU时间片。

抢夺全局变量

全局变量,被当前进程中所有存货线程共享。这就意味着,抢夺全局变量的问题。

比如下面的例子,创建10个线程,它们都会竞争全局变量a:

1
2
3
4
5
6
7
8
9
>>> import threading
>>> a = 0
>>> def add1():
... global a
... a += 1
... print('%s adds a to 1: %d'%(threading.current_thread().getName(), a))
...
>>> threads = [threading.Thread(name='t%d'%(i,), target=add1) for i in range(10)]
>>> [t.start() for t in threads]

执行结果

1
2
3
4
5
6
7
8
9
10
11
t0 adds a to 1: 1
t1 adds a to 1: 2
t2 adds a to 1: 3
t3 adds a to 1: 4
t4 adds a to 1: 5
t5 adds a to 1: 6
t6 adds a to 1: 7
t7 adds a to 1: 8
t8 adds a to 1: 9
t9 adds a to 1: 10
[None, None, None, None, None, None, None, None, None, None]

每个线程执行一次,a的值被加1,最后a变为10,结果看起来一切正常。

运行上面代码十几遍,看起来一切正常。那么,就能下结论:这段代码是线程安全的么?

在编写多线程程序中,只要有读取和修改全局变的情况,如果不采取措施,那么就一定不是线程安全的。

尽管,有时某些情况的资源竞争,暴露出问题的概率极低。乳沟某个线程修改全局变量a后,其他线程获取的,还是未修改前的值,问题就会暴露。但是,a=a+1这种修改操作,花费的时间太短,短到我们无法想象。线程间轮询执行时,都能获取到最新的、修改后的值。所以,暴露问题的概率就变得很低。

不过,现实中使用多线程,目的也不会仅仅就是为了跑一个a=a+1这种操作。更大可能,线程中执行任务,会消耗一定的时间。所以,怎样编写线程安全的代码,变得非常重要。

暴露问题

数据写入数据库操作,一般会耗费可以感知的时间。为模拟数据写库动作,简化起见,等效地,延长修改变量a的时间,问题很快的暴露出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
>>> import threading
>>> import time
>>> a = 0
>>> def add1():
... global a
... tmp = a+1
... time.sleep(0.2) # 延迟0.2秒,模拟写入所需要时间
... a = tmp
... print('%s adds a to 1:%d'%(threading.current_thread().getName(),a))
...
>>> threads = [threading.Thread(name='t%d'%(i,), target=add1) for i in range(10)]
>>> [t.start() for t in threads]
[None, None, None, None, None, None, None, None, None, None]

运行代码,仅仅一次,问题就很快完全暴露,结果如下:

1
2
3
4
5
6
7
8
9
10
>>> t0 adds a to 1:1
t1 adds a to 1:1
t2 adds a to 1:1
t3 adds a to 1:1
t4 adds a to 1:1
t5 adds a to 1:1
t7 adds a to 1:1
t6 adds a to 1:1
t8 adds a to 1:1
t9 adds a to 1:1

看到10个线程全部运行后,a的值只想当与一个线程执行的结果。为什么?

修改a前,有0.2秒的休眠时间。某个线程被延迟后,CPU立即分配计算资源给其他线程。直到所有线程被分配到计算资源,已经运行玩a=a+1后,根据结果反映出,0.2秒的休眠时间还没耗尽,这样每个线程获取到的a值都是0,所有才出现上面的结果。

以上最核心的三行代码:

1
2
3
tmp = a + 1
time.sleep(0.2) # 延迟0.2秒,模拟写入所需要时间
a = tmp

加锁

Python提供的锁机制,是解决上面问题的方法之一。

某段代码只能单线程执行时,加上锁,其他线程等待,直到被释放后,其他线程再争锁,竞争到所得线程执行代码,再释放锁,重复此过程,直到所有线程都走过一遍竞争和释放锁的过程。

  1. 创建一把锁locka
  2. 通过locka.acquire()获得锁,通过locka.release()释放锁。获得锁和释放锁之间的代码,只能单线程执行。
  3. 创建和开始线程并执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import threading
import time

# 1.创建一把锁
locka = threading.Lock()

a = 0


# 2. 获得锁和释放锁
def add1():
global a
try:
locka.acquire() # 获得锁
tmp = a + 1
time.sleep(0.2) # 演示0.2秒。模拟写入所需时间
a = tmp
finally:
locka.release() # 释放锁
print('%s adds a to 1:%d' % (threading.currentThread().getName(), a))


# 3.创建和开始线程
threads = [threading.Thread(name='t%d' % (i,), target=add1) for i in range(10)]
[t.start() for t in threads]

执行结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/Users/lvjing/PycharmProjects/python_base_project/venv/bin/python /Users/lvjing/PycharmProjects/python_base_project/thread_sample_01.py
t0 adds a to 1:1
t1 adds a to 1:2
t2 adds a to 1:3
t3 adds a to 1:4
t4 adds a to 1:5
t5 adds a to 1:6
t6 adds a to 1:7
t7 adds a to 1:8
t8 adds a to 1:9
t9 adds a to 1:10

Process finished with exit code 0

打印结果一切正常。

但是,仔细想想,这已经是单线程顺序执行,就本案例而言,已经失去多线程的价值。并且,还带来了因为线程创建开销,浪费时间的副作用。除此之外,还有一个很大风险。当程序中只有一把锁,通过try...finally还能确保不发生死锁。但是,当程序中启用多把锁,很容易发生死锁。

考虑使用场合,避免死锁,是多线程开发,需要格外注意一些问题。

高效的协程

在同一个线程中,如果发生以下事情:

  • A函数执行时被中断,传递一些数据给B函数;
  • B函数拿到这些数据后开始执行,执行一段时间后,发送一些数据到A函数;
  • 就这样交替执行……

这种执行调用模式,被称为协程

可以看到,协程是在同一个线程中函数间的切换,而不是线程间的切换,因此执行效率更优,Python的异步操作正式基于高效的协程机制。

下面通过一个例子,加深对协程的理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def A():
a_list = ['1', '2', '3']
for to_b in a_list:
from_b = yield to_b
print('receive %s from B' % (from_b,))
print('do some complex process for A during 200ms')


def B(a):
from_a = a.send(None)
print('response %s from A' % (from_a))
print('B is analysising data from A')
b_list = ['x', 'y', 'z']
try:
for to_a in b_list:
from_a = a.send(to_a)
print('receive %s from B' % (from_a,))
print('B is analysising data from A')
except StopIteration:
print('----from a done----')
finally:
a.close()


# 调用
a = A()
B(a)

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/Users/lvjing/PycharmProjects/python_base_project/venv/bin/python /Users/lvjing/PycharmProjects/python_base_project/thread_sample_02.py
response 1 from A
B is analysising data from A
receive x from B
do some complex process for A during 200ms
receive 2 from B
B is analysising data from A
receive y from B
do some complex process for A during 200ms
receive 3 from B
B is analysising data from A
receive z from B
do some complex process for A during 200ms
----from a done----

Process finished with exit code 0

通过执行结果看到,线程是在同一个线程中,不同函数间交替、协作的执行完成任务。下面,分析下执行过程:

  1. a.send(None)激活A函数,并执行到yield to_b,把变量to_b传递给B函数,A函数中断;
  2. from_a就是上步A函数返回的to_b值,然后执行分析这个值;
  3. 当执行到a.send(to_a)时,B函数将加工后的to_a值发送给A函数;
  4. from_b变量接收来自B函数的发送,然后使用此值做分析200ms后,又将to_b传递给B函数,A函数中断;
  5. 重复2、3、4;
  6. 直到from_a获取不到响应值,函数触发StopIteration异常,程序执行结束。

多线程是抢占时间片的编程模型,通过上面多线程的本质中提到的获得锁和释放锁的机制控制全局变量的读取和修改,容易出现死锁。

但是协程无需使用锁,也就不会发生死锁。同时,利用协程的协作这一特点,高效地完成了原编程模型只能通过多个线程才能完成的任务。