并发基本概念
并发和并行区别
并行,parallel,同时做某些事,可以互不干扰的同一个时刻做几件事
并发,concurrency也是同时做某些事,但是强调,一个时段内有事情要处理。
举例: 高速公路的车道,双向4车道,所有车辆(数据)可以互不干扰的在自己的车道上奔跑(传输)。
在同一个时刻,每条车道上可能同时有车辆在跑,是同时发生的概念,这是并行。
在一段时间内,有这么多车要通过,这是并发。
并发的解决
1、队列、缓冲区
假设只有一个窗口,陆续涌入食堂的人,排队打菜是比较好的方式。
所以,排队(队列)是一种天然解决并发的办法。
排队就是把人排成 队列,先进先出,解决了资源使用的问题。
排成的队列,其实就是一个缓冲地带,就是 缓冲区。
假设女生优先,每次都从这个队伍中优先选出女生出来先打饭,这就是 优先队列。
例如queue模块的类Queue、LifoQueue、PriorityQueue(小顶堆实现)。
2、争抢
只开一个窗口,有可能没有秩序,也就是谁挤进去就给谁打饭。
挤到窗口的人占据窗口,直到打到饭菜离开。
其他人继续争抢,会有一个人占据着窗口,可以视为锁定窗口,窗口就不能为其他人提供服务了。这是一种 锁机 制。
谁抢到资源就上锁,排他性的锁,其他人只能等候。
争抢也是一种高并发解决方案,但是,这样可能不好,因为有可能有人很长时间抢不到
3、预处理
如果排长队的原因,是由于每个人打菜等候时间长,因为要吃的菜没有,需要现做,没打着饭不走开,锁定着窗 口。
食堂可以提前统计大多数人最爱吃的菜品,将最爱吃的80%的热门菜,提前做好,保证供应,20%的冷门菜,现 做。
这样大多数人,就算锁定窗口,也很快打到饭菜走了,快速释放窗口。
一种提前加载用户需要的数据的思路,预处理 思想,缓存常用。
4、并行
成百上千人同时来吃饭,一个队伍搞不定的,多开打饭窗口形成多个队列,如同开多个车道一样,并行打菜。 开窗口就得扩大食堂,得多雇人在每一个窗口提供服务,造成 成本上升。
日常可以通过购买更多服务器,或多开进程、线程实现并行处理,来解决并发问题。
注意这些都是 水平扩展 思想。
注:
如果线程在单CPU上处理,就不是真并行了。
但是多数服务器都是多CPU的,服务的部署往往是多机的、分布式的,这都是并行处理。
5、提速
提高单个窗口的打饭速度,也是解决并发的方式。
打饭人员提高工作技能,或为单个窗口配备更多的服务人员,都是提速的办法。
提高单个CPU性能,或单个服务器安装更多的CPU。
这是一种 垂直扩展 思想。
6、消息中间件
上地、西二旗地铁 站外 的九曲回肠的走廊,缓冲人流,进去之后再多口安检进站。
常见的消息中间件有RabbitMQ、ActiveMQ(Apache)、RocketMQ(阿里Apache)、kafka(Apache)等。 当然还有其他手段解决并发问题,但是已经列举除了最常用的解决方案,一般来说不同的并发场景用不同的策略,
而策略可能是多种方式的优化组合。
例如多开食堂(多地),也可以把食堂建设到宿舍生活区(就近),所以说,技术来源于生活。
进程和线程
在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实 际运作单位。一个程序的执行实例就是一个进程。
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位, 是操作系统结构的基础。
进程和程序的关系
程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放 着指令和数据(资源),它也是线程的容器。
Linux进程有父进程、子进程,Windows的进程是平等关系。
线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。
一个标准的线程由线程ID,当前指令指针(PC)、寄存器集合和堆栈组成。
在许多系统中,创建一个线程比创建一个进程快10-100倍。
进程、线程的理解
现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。
进程就是独立的王国,进程间不可以随便的共享数据。
线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。 线程的状态
状态
就绪(Ready)
运行(Running)
阻塞(Blocked)
终止(Terminated)
含义
线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被 其他线程抢占
线程正在运行.
线程等待外部事件发生而无法运行,如I/O操作.
线程完成,或退出,或被取消.
Python中的进程和线程.
进程会启动一个解释器进程,线程共享一个解释器进程。 Python的线程开发.
Python的线程开发使用标准库threading.
进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。 主线程是第一个启动的线程。
父线程:如果线程A中启动了一个线程B,A就是B的父线程。 子线程:B就是A的子线程。
线程启动
import threading
# 最简单的线程程序
def worker():
print("I'm working") print('Fineshed')
含义
线程调用的对象,就是目标函数 为线程起个名字
为目标函数传递实参,元组 为目标函数关键字传参,字典
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() #
启动
通过threading.Thread创建一个线程对象,target是目标函数,可以使用name为线程指定名称。 但是线程没有启动,需要调用start方法。
线程之所以执行函数,是因为线程中就是要执行代码的,而最简单的封装就是函数,所以还是函数调用。 函数执行完,线程也就退出了。
那么,如果不让线程退出,或者让线程一直工作怎么办呢?
import threading
import time
def worker():
while True:
time.sleep(1)
print("I'm working")
print('Fineshed')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() #
启动
线程退出
Python没有提供线程退出的方法,线程在下面情况时退出
1、线程函数内语句执行完毕
2、线程函数中抛出未处理的异常
import threading
import time
def worker():
count = 0
while True:
if count > 5:
# raise RuntimeError(count) # 抛异常
# return # 函数返回
break
time.sleep(1)
count += 1
print('i am working')
print('finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
print('===end===')
Python的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。 线程的传参
import threading
import time
def add(x, y):
print('{} + {} = {}'.format(x, y, x + y, threading.current_thread().ident))
t1 = threading.Thread(target=add, name='add', args=(4, 5))
t1.start()
time.sleep(2)
t2 = threading.Thread(target=add, name='add', args=(6,), kwargs={'y':7}) t2.start()
time.sleep(2)
t3 = threading.Thread(target=add, name='add', kwargs={'x':8, 'y':9})
t3.start()
线程传参和函数传参没什么区别,本质上就是函数传参。
threading的属性和方法
名称
current_thread()
main_thread()
active_count()
enumerate()
get_ident()
含义
返回当前线程对象
返回主线程对象
当前处于alive状态的线程个数
返回所有活着的线程的列表,不包括已经终止的线程和未开始的线程 返回当前线程的ID,非0整数
active_count、enumerate方法返回的值还包括主线程。
import threading
import time
def showtreadinfo():
print('current thread = {}\nmain thread = {}\nactive count = {}'. format( threading.current_thread(), threading.main_thread(), threading.active_count()
))
def worker():
showtreadinfo()
for i in range(5):
time.sleep(1)
print('i am working')
print('finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
showtreadinfo()
time.sleep(1)
t.start() # 启动
print('===end===')
Thread实例的属性和方法
名称
name
ident
is_alive()
含义
只是一个名字,只是个标识,名称可以重名。getName()、setName()获取、设置这个名词 线程ID,它是非0整数。线程启动后才会有ID,否则为None。线程退出,此ID依旧可以访问。
此ID可以重复使用
返回线程是否活着
注意:线程的name这是一个名称,可以重复;ID必须唯一,但可以在线程退出后再利用。 import threading
import time
def worker():
for i in range(5):
time.sleep(1)
print(\'i am working\')
print(\'finished\')
t = threading.Thread(target=worker, name=\'worker\') # 线程对象
print(t.name, t.ident)
time.sleep(1)
t.start() # 启动
print(\'===end===\')
while True:
time.sleep(1)
print(\'{} {} {}\'.format(t.name, t.ident,
\'alive\' if t.is_alive() else \'dead\'))
if not t.is_alive():
print(\'{} restart\'.format(t.name))
t.start() # 线程重启??
名称
start()
run()
含义
启动线程。每一个线程必须且只能执行该方法一次 运行线程函数
为了演示,派生一个Thread的子类 start方法
import threading
import time
def worker():
for i in range(5):
time.sleep(1)
print(\'i am working\')
print(\'finished\')
class MyThread(threading.Thread):
def start(self):
print(\'start~~~~\')
super().start()
def run(self):
print('run
~~')super().run()
t = MyThread(target=worker, name=\'worker\') t.start()
# 运行结果
start~~~~
run~~~~~~
i am working
i am working
......
run方法
import threading
import time
def worker():
for i in range(5):
time.sleep(1)
print(\'i am working\')
print(\'finished\')
class MyThread(threading.Thread):
def start(self):
print(\'start~~~~\')
super().start()
def run(self):
print(\'run~~~~~~\')
super().run()
t = MyThread(target=worker, name=\'worker\') #t.start()
t.run() # run方法能多次执行吗?为什么
# 运行结果
run~~~~~~
i am working
......
start()方法会调用run()方法,而run()方法可以运行函数。
这两个方法看似功能重复了,这么看来留一个方法就可以了。是这样吗?
start和run的区别
在线程函数中,增加打印线程的名字的语句,看看能看到什么信息。
import threading
import time
def worker():
print(threading.enumerate()) # 增加这一句
for i in range(5):
time.sleep(1)
print(\'i am working\')
print(\'finished\')
class MyThread(threading.Thread):
def start(self):
print(\'start~~~~\')
super().start()
def run(self):
print(\'run~~~~~~\')
super().run()
t = MyThread(target=worker, name=\'worker\')
#t.start()
t.run() # 分别执行start或者run方法
使用start方法启动线程,启动了一个新的线程,名字叫做worker运行。但是使用run方法的,并没有启动新的线 程,就是在主线程中调用了一个普通的函数而已。
因此,启动线程请使用start方法,且对于这个线程来说,start方法只能调用一次。(设置_started属性实现) 多线程
顾名思义,多个线程,一个进程中如果有多个线程运行,就是多线程,实现一种并发。
import threading
import time
def worker():
t = threading.current_thread()
for i in range(5):
time.sleep(1)
print(\'i am working\', t.name, t.ident)
print(\'finished\')
class MyThread(threading.Thread):
def start(self):
print(\'start~~~~\')
super().start()
def run(self):
print(\'run~~~~~~\')
super().run()
t1 = MyThread(target=worker, name=\'worker1\')
t2 = MyThread(target=worker, name=\'worker2\')
t1.start()
t2.start()
可以看到worker1和work2交替执行,改成run方法试试看
import threading
import time
def worker():
t = threading.current_thread()
for i in range(5):
time.sleep(1)
print(\'i am working\', t.name, t.ident)
print(\'finished\')
class MyThread(threading.Thread):
def start(self):
print(\'start~~~~\')
super().start()
def run(self):
print(\'run~~~~~~\')
super().run()
t1 = MyThread(target=worker, name=\'worker1\')
t2 = MyThread(target=worker, name=\'worker2\')
# t1.start()
# t2.start()
t1.run()
t2.run()
没有开新的线程,这就是普通函数调用,所以执行完t1.run(),然后执行t2.run(),这里就不是多线程。 当使用start方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。
一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。
一个进程至少有一个主线程。
其他线程称为工作线程。
线程安全
IPython中演示,python命令行、pycharm都不能演示出效果
import threading
def worker():
for x in range(100):
print(\"{} is running.\".format(threading.current_thread().name))
for x in range(1, 8): # 可以增加线程数
name = \"worker{}\".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
看代码,应该是一行行打印,但是很多字符串打在了一起,为什么?
说明,print函数被打断了,被线程切换打断了。print函数分两步,第一步打印字符串,第二步打印换行符,就在 这之间,发生了线程的切换。
这说明print函数是线程不安全的。
线程安全
线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的
上例中,本以为print应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换行, 而且发生这种情况的时机不确定,所以,print函数不是线程安全函数。
如果是这样,多线程编程的时候,print输出日志,不能保证一个输出一定后面立即换行了,怎么办?
1、不让print打印换行
import threading
def worker():
for x in range(100):
print(\"{} is running.\n\".format(threading.current_thread().name), end=\'\')
for x in range(1, 8):
name = \"worker{}\".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
字符串是不可变的类型,它可以作为一个整体不可分割输出。end=''就不在让print输出换行了。 2、使用logging
标准库里面的logging模块,日志处理模块,线程安全的,生成环境代码都使用logging import threading
import logging
def worker():
for x in range(100):
logging.warning(\'{} is running.\'.format(threading.current_thread().name)) #print(\'{} is running.\n\'.format(threading.current_thread().name), end=\'\')
for x in range(1, 8): # 可以增加线程
name = \'worker{}\'.format(x)
t = threading.Thread(target=worker, name=name)
t.start()
daemon线程和non-daemon线程
注意:这里的daemon不是Linux中的守护进程
Python中,构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好。 # 源码Thread的__init__方法中
if daemon is not None:
self._daemonic = daemon # 用户设定bool值
else:
self._daemonic = current_thread().daemon
线程daemon属性,如果设定就是用户的设置,否则就取当前线程的daemon值。
主线程是non-daemon线程,即daemon = False。
class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name=\"MainThread\", daemon=False)
import time
import threading
def foo():
time.sleep(5)
for i in range(20):
print(i)
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=False)
t.start()
print(\'Main Thread Exits\')
发现线程t依然执行,主线程已经执行完,但是一直等着线程t。 修改为 t = threading.Thread(target=foo, daemon=True) 试一试 程序立即结束了,主线程根本没有等线程t。
名称
daemon属性
isDaemon()
setDaemon
含义
表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常 是否是daemon线程
设置为daemon线程,必须在start方法之前设置
看一个例子,看看主线程何时结束daemon线程
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print(\'{} working\'.format(name))
# 主线程 是non-daemon线程
t1 = threading.Thread(target=worker, args=(\'t1\', 5), daemon=True) # 调换5和10看看效果
t1.start()
t2 = threading.Thread(target=worker, args=(\'t2\', 10), daemon=False)
t2.start()
print(\'Main Thread Exits\')
上例说明,如果除主线程之外还有non-daemon线程的时候,主线程退出时,也不会杀掉所有daemon线程,直到 所有non-daemon线程全部结束,如果还有daemon线程,主线程需要退出(主线程退出也可以理解为最后一个 non-daemon线程也要退出了),会结束所有daemon线程,程序退出。
总结
线程具有一个daemon属性,可以手动设置为True或False,也可以不设置,则取默认值None。
如果不设置daemon,就取当前线程的daemon来设置它。
主线程是non-daemon线程,即daemon = False。
从主线程创建的所有线程的不设置daemon属性,则默认都是daemon = False,也就是non-daemon 线程。 Python程序在没有活着的non-daemon线程运行时,程序退出,也就是除主线程之外剩下的只能都是daemon线 程,主线程才能退出,否则主线程就只能等待。
join方法
先看一个简单的例子,看看效果
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print(\'{} working\'.format(name))
t1 = threading.Thread(target=worker, args=(\'t1\', 3), daemon=True)
t1.start()
t1.join()# 设置join,取消join对比一下
print(\'Main Thread Exits\')
使用了join方法后,daemon线程执行完了,主线程才退出了。
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print(\'{} working\'.format(name))
t1 = threading.Thread(target=worker, args=(\'t1\', 10), daemon=True)
t1.start()
t1.join(2)
print(\'~~~~~~~~~~~\')
t1.join(2)
print(\'~~~~~~~~~~~\')
print(\'Main Thread Exits\')
join(timeout=None),是线程的标准方法之一。
一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止。
一个线程可以被join多次。
timeout参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束。
调用谁的join方法,就是join谁,就要等谁。
daemon线程应用场景
简单来说就是,本来并没有 daemon thread,为了简化程序员的工作,让他们不用去记录和管理那些后台线程, 创造了一个 daemon thread 的概念。这个概念唯一的作用就是,当你把一个线程设置为 daemon,它可以会随主 线程的退出而退出。
主要应用场景有:
1、后台任务。如发送心跳包、监控,这种场景最多。
2、主线程工作才有用的线程。如主线程中维护这公共的资源,主线程已经清理了,准备退出,而工作线程使用这 些资源工作也没有意义了,一起退出最合适。
3、随时可以被终止的线程
如果主线程退出,想所有其它工作线程一起退出,就使用daemon=True来创建工作线程。
比如,开启一个线程定时判断WEB服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程 退出一起退出。这种daemon线程一旦创建,就可以忘记它了,只用关心主线程什么时候退出就行了。 daemon线程,简化了程序员手动关闭线程的工作。
如果在non-daemon线程A中,对另一个daemon线程B使用了join方法,这个线程B设置成daemon就没有什么意 义了,因为non-daemon线程A总是要等待B。
如果在一个daemon线程C中,对另一个daemon线程D使用了join方法,只能说明C要等待D,主线程退出,C和D 不管是否结束,也不管它们谁等谁,都要被杀掉。
举例
import time
import threading
def worker1(name):
while True:
time.sleep(1)
print(\'{} working\'.format(name), threading.current_thread().isDaemon())
def worker2(name):
current = threading.current_thread()
print(\"{}\'s daemon = {}\".format(name, current.isDaemon()))
t2 = threading.Thread(target=worker1, args=(\'t2\',))
t2.start()
t1 = threading.Thread(target=worker2, args=(\'t1\',), daemon=True)
t1.start()
time.sleep(4)
print(\'Main Thread Exits\')
上例,只要主线程要退出,2个工作线程都结束。
可以使用join,让线程结束不了,怎么做?
import time
import threading
def worker1(name):
while True:
time.sleep(1)
print(\'{} working\'.format(name), threading.current_thread().isDaemon())
def worker2(name):
current = threading.current_thread()
print(\"{}\'s daemon = {}\".format(name, current.isDaemon()))
t2 = threading.Thread(target=worker1, args=(\'t2\',))
t2.start()
t2.join()
t1 = threading.Thread(target=worker2, args=(\'t1\',), daemon=True)
t1.start()
t1.join()
time.sleep(4)
print(\'Main Thread Exits\')
threading.local类
import threading
import time
def worker():
x = 0
for i in range(100):
time.sleep(0.0001)
x += 1
print(threading.current_thread(), x)
for i in range(10):
threading.Thread(target=worker).start()
上例使用多线程,每个线程完成不同的计算任务。
x是局部变量,可以看出每一个线程的x是独立的,互不干扰的,为什么?
能否改造成使用全局变量完成。
import threading
import time
class A:
def __init__(self):
self.x = 0
# 全局对象
global_data = A()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
上例虽然使用了全局对象,但是线程之间互相干扰,导致了不期望的结果。
能不能既使用全局对象,还能保持每个线程使用不同的数据呢?
python提供 threading.local 类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储的数据 其他线程看不见。
import threading
import time
# 全局对象
global_data = threading.local()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
结果显示和使用局部变量的效果一样。
再看threading.local的例子
import threading
X = \'abc\'
global_data = threading.local() # 注意这个对象所处的线程
global_data.x = 100
print(global_data, type(global_data), global_data.x)
def worker():
print(X)
print(global_data)
print(global_data.x)
print(\'in worker\')
worker() # 普通函数调用
print(\'-\' * 30)
threading.Thread(target=worker).start() # 启动一个线程
从运行结果来看,另起一个线程打印 global_data.x 出错了。
AttributeError: \'_thread._local\' object has no attribute \'x\'
但是,global_data打印没有出错,说明看到global_data,但是global_data中的x看不到,这个x不能跨线程。 threading.local类构建了一个大字典,存放所有线程相关的字典,定义如下:
{ id(Thread) -> (ref(Thread), thread-local dict) }
每一线程实例的id为key,元组为value。
value中2部分为,线程对象引用,每个线程自己的字典。
本质
运行时,threading.local实例处在不同的线程中,就从大字典中找到当前线程相关键值对中的字典,覆盖 threading.local实例的 __dict__ 。
这样就可以在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安 全。