在一个进程的内部,要同时干多件事,就需要同时运行多个“子任务”,我们把进程内的这些“子任务”叫做线程
线程通常叫做轻型的进程。线程是共享内存空间的并发执行的多任务,每一个线程都共享一个进程的资源
线程是最小的执行单元,而进程由至少一个线程组成。如何调度进程和线程,完全由操作系统决定,程序自己不能决定什么时候执行,执行多长时间
模块
1、_thread模块 低级模块
2、threading模块 高级模块,对_thread进行了封装
启动一个线程
import threading,time
def run(num):
    """Worker body for the demo thread.

    Logs the start, prints ``num`` between two 2-second sleeps, then logs
    the end. Both log lines carry the name of the executing thread.
    """
    pause_seconds = 2
    print("子线程(%s)开始" % (threading.current_thread().name))
    # The sleeps stand in for the real work of the thread.
    time.sleep(pause_seconds)
    print("打印", num)
    time.sleep(pause_seconds)
    print("子线程(%s)结束" % (threading.current_thread().name))
if __name__ == "__main__":
    # Every process starts with one thread (the main thread), which may
    # start further threads. current_thread() returns the Thread object
    # of the caller.
    main_name = threading.current_thread().name  # MainThread
    print("主线程(%s)启动" % (main_name))
    # Give the worker an explicit name instead of the default one.
    t = threading.Thread(target=run, args=(1,), name="runThread")
    t.start()
    # Block until the worker has finished.
    t.join()
    print("主线程(%s)结束" % (threading.current_thread().name))
封装进线程对象
# 多线程模块
import threading
import time
import random
import os
# 定义一个线程的子类
class Job(threading.Thread):
    """Thread subclass that simulates downloading a named task."""

    def __init__(self, name):
        super().__init__()
        # Assigned after Thread.__init__ so the thread carries the task name.
        self.name = name

    def run(self):
        """Log the start, sleep a random 3-8 seconds, then log the end."""
        print("开始下载{0}".format(self.name))
        # The download time is simulated by a random whole number of seconds.
        time.sleep(random.randint(3, 8))
        print("下载结束{0}".format(self.name))
# Create all three download threads first, then start them. The main
# thread does not join them; it goes straight on to print the pid.
task1, task2, task3 = (Job(name=label) for label in ("task1", "task2", "task3"))
for job in (task1, task2, task3):
    job.start()
print("主进程的pid是:{pid}".format(pid=os.getpid()))
线程间共享数据
import threading
'''
多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在每个进程中,互不影响。而多线程中,所有变量都由所有线程共享。所以,任何一个变量都可以被任意一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时修改一个变量,容易把内容改乱了。
'''
# Shared counter that every worker thread reads and writes.
num = 0


def run(n, iterations=10000000):
    """Add ``n`` to the global ``num`` and subtract it again, repeatedly.

    Run from a single thread this leaves ``num`` unchanged. The two
    updates are deliberately NOT protected by a lock: each one is a
    separate read-modify-write, so when several threads call this
    function concurrently an update can be lost and ``num`` can end up
    non-zero. That is the effect this example exists to show.

    Args:
        n: Amount added and then subtracted in every round.
        iterations: Number of add/subtract rounds. Defaults to the
            10000000 rounds that used to be hard-coded.
    """
    global num
    for _ in range(iterations):
        num = num + n
        num = num - n
if __name__ == "__main__":
    # Two threads update the shared counter with different deltas.
    t1 = threading.Thread(target=run, args=(6,))
    t2 = threading.Thread(target=run, args=(9,))
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()
    print("num =", num)
'''
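一种可能的交错执行(num 初始为 0):
线程1 num = num + 6   -> num = 6
线程2 num = num + 9   -> 线程2在线程1写入之前已经读到 0,写回 9,覆盖了 6
线程1 num = num - 6   -> 9 - 6 = 3
线程2 num = num - 9   -> 3 - 9 = -6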
'''
线程锁解决线程混乱
'''
两个线程同时工作,一个存钱,一个取钱
可能导致数据异常
思路:加锁,保证同一时刻只有一个线程修改共享数据
'''
import threading
# Lock that guards every update of ``num``.
lock = threading.Lock()
# Shared counter modified by all worker threads.
num = 0


def run(n, iterations=10000000):
    """Add ``n`` to the global ``num`` and subtract it again, under ``lock``.

    Holding the lock makes each add/subtract pair execute as a unit, so
    concurrent callers cannot interleave inside it and ``num`` is back to
    its starting value when all callers have finished.

    Trade-offs noted by the original author: the locked section runs
    effectively single-threaded, which costs throughput, and a program
    that uses several locks can deadlock when threads hold one lock
    while waiting for another.

    Args:
        n: Amount added and then subtracted in every round.
        iterations: Number of add/subtract rounds. Defaults to the
            10000000 rounds that used to be hard-coded.
    """
    global num
    for _ in range(iterations):
        # ``with lock`` is the short form of
        #     lock.acquire(); try: ...; finally: lock.release()
        # The release must be guaranteed: if an exception escaped the
        # guarded code and release() were skipped, every other thread
        # waiting for the lock would block forever.
        with lock:
            num = num + n
            num = num - n
if __name__ == "__main__":
    # Two threads update the shared counter with different deltas.
    t1 = threading.Thread(target=run, args=(6,))
    t2 = threading.Thread(target=run, args=(9,))
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()
    print("num =", num)
守护线程
import threading
import time
def fetch(url):
    """Print the list of live threads, then sleep for 5 seconds.

    ``url`` is accepted but not used by the body.
    """
    alive_threads = threading.enumerate()
    print(alive_threads)
    time.sleep(5)
if __name__ == "__main__":
    # daemon=True: when the main thread ends the worker ends with it, so
    # the 5-second sleep inside fetch is not waited for.
    t1 = threading.Thread(
        target=fetch,
        args=('http://www.baidu.com',),
        daemon=True,
    )
    t1.start()
ThreadLocal
import threading
num = 0
# One module-level thread-local object. Every thread gets its own
# independent storage behind it: each thread can read and write
# attributes on ``local`` without affecting any other thread.
local = threading.local()


def run(x, n):
    """Add ``n`` to ``x`` and subtract it again on the local name only.

    Nothing is returned and the caller's value is not modified.
    """
    raised = x + n
    x = raised - n
def func(n):
    """Copy ``num`` into this thread's ``local.x``, call ``run`` one million
    times with it, then print the thread name and ``local.x``.
    """
    # ``local.x`` is per-thread: each thread sees only its own value.
    local.x = num
    rounds = 1000000
    for _ in range(rounds):
        run(local.x, n)
    print("%s-%d"%(threading.current_thread().name, local.x))
if __name__ == "__main__":
    # Run func on two threads with different deltas, then show the global.
    t1 = threading.Thread(target=func, args=(6,))
    t2 = threading.Thread(target=func, args=(9,))
    for worker in (t1, t2):
        worker.start()
    for worker in (t1, t2):
        worker.join()
    print("num =", num)
#作用:为每个线程绑定一个数据库链接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源
客户端与服务器间的数据交互
# server.py
import socket
import threading
# Listening TCP/IPv4 socket of the demo server (backlog of 5).
server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
listen_address = ('10.0.142.171', 8081)
server.bind(listen_address)
server.listen(5)
def run(ck):
    """Serve one connected client until it closes the connection.

    Every received message is printed and answered with a fixed reply.
    The socket passed in is closed when the function returns.

    Args:
        ck: Connected socket of the client handled by this thread. The
            handler uses this argument only, never the module-level
            ``clientSocket``, which the accept loop rebinds for each new
            client.
    """
    with ck:
        while True:
            data = ck.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                break
            print("客户端说:" + data.decode("utf-8"))
            ck.sendall("sunck is a goood man".encode("utf-8"))
print("服务器启动成功,等待客户端的链接")
# Accept clients forever; each connection is handled on its own thread.
while True:
    clientSocket, clientAddress = server.accept()
    threading.Thread(target=run, args=(clientSocket,)).start()
# client.py
import socket
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# The port has to match the one the server binds (8081); the previous
# value 9876 did not match the server shown in this file.
client.connect(("10.0.142.171", 8081))
# Request/response loop: send one line of user input, print the reply.
while True:
    data = input("请输入给服务器发送的数据")
    client.send(data.encode("utf-8"))
    info = client.recv(1024)
    if not info:
        # Empty read: the server closed the connection.
        break
    print("服务器说:", info.decode("utf-8"))
client.close()
# client1.py
import socket
client = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server_address = ("10.0.142.171", 8081)
client.connect(server_address)
# Request/response loop: send one line of user input, print the reply.
while True:
    data = input("请输入给服务器发送的数据")
    payload = data.encode("utf-8")
    client.send(payload)
    info = client.recv(1024)
    print("服务器说:", info.decode("utf-8"))
# server.py
import tkinter
import socket
import threading
# Root window of the chat-server GUI.
win = tkinter.Tk()
win.geometry("400x400+200+20")
win.title("QQ服务器")
# Maps the name of each connected user to that user's socket.
users = {}


def run(ck, ca):
    """Register one chat client and relay its messages until it disconnects.

    The first packet received is taken as the user name. Every later
    packet is expected to look like ``recipient:message`` and is
    forwarded to the recipient's socket as ``<sender>说+<message>``.
    Only the first colon separates recipient from message, so the
    message text itself may contain colons. Packets without a colon or
    addressed to an unknown user are ignored.

    When the client disconnects its entry is removed from ``users`` and
    its socket is closed.

    Args:
        ck: Connected socket of the client.
        ca: Address of the client (not used by the body).
    """
    print("***********")
    sender = ck.recv(1024).decode("utf-8")
    users[sender] = ck
    print(users)
    try:
        while True:
            rData = ck.recv(1024)
            if not rData:
                # An empty read means the client closed the connection.
                break
            # Example packet: "xym:sunck is a goodman"
            recipient, separator, message = rData.decode("utf-8").partition(":")
            target = users.get(recipient)
            if not separator or target is None:
                continue
            try:
                target.send((sender + "说+" + message).encode("utf-8"))
            except OSError:
                # The recipient's socket is no longer usable; keep serving
                # this sender.
                continue
    finally:
        # Remove the entry only if it still refers to this connection.
        if users.get(sender) is ck:
            del users[sender]
        ck.close()
def start():
    """Open the listening socket described by the GUI fields and accept
    clients forever.

    The address comes from the ``eip`` / ``eport`` variables. Every
    accepted connection is handed to ``run`` on a new thread.

    NOTE(review): this function writes to the ``text`` widget; if it is
    run on a non-main thread that may not be safe with Tk - confirm.
    """
    host = eip.get()
    port_text = eport.get()
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind((host, int(port_text)))
    listener.listen(10)
    text.insert(tkinter.INSERT, "服务器启动成功")
    while True:
        ck, ca = listener.accept()
        threading.Thread(target=run, args=(ck, ca)).start()
def startServer():
    """Button callback: run ``start`` on a new thread."""
    threading.Thread(target=start).start()
# Build the server window. Widget creation and .grid() are kept on
# separate lines: grid() returns None, so chaining the two would bind
# None to the widget name instead of the widget.
lableIp = tkinter.Label(win, text="ip")
lableIp.grid(row=0, column=0)
lablePort = tkinter.Label(win, text="port")
lablePort.grid(row=1, column=0)

# Variables backing the ip / port entry fields.
eip = tkinter.Variable()
eport = tkinter.Variable()
entryIp = tkinter.Entry(win, textvariable=eip)
entryIp.grid(row=0, column=1)
entryPort = tkinter.Entry(win, textvariable=eport)
entryPort.grid(row=1, column=1)

button = tkinter.Button(win, text="启动", command=startServer)
button.grid(row=2, column=0)

# Text area used for status output.
text = tkinter.Text(win, width=30, height=10)
text.grid(row=3, column=0)

win.mainloop()
# client.py client2.py ....
import tkinter
import socket
import threading
# Root window of the chat-client GUI.
win = tkinter.Tk()
win.geometry("400x400+200+20")
win.title("sunck")
# Socket connected to the server; None until connectServer() has run.
ck = None
def getInfo():
    """Receive messages from the server socket ``ck`` and append them to
    the ``text`` widget until the connection ends.

    The loop stops when ``recv`` returns no data (the peer closed the
    connection) or raises ``OSError``; without that check it would spin
    forever on empty reads after a disconnect.
    """
    while True:
        try:
            data = ck.recv(1024)
        except OSError:
            break
        if not data:
            break
        text.insert(tkinter.INSERT, data.decode("utf-8"))
def connectServer():
    """Button callback: connect to the server named in the GUI fields.

    Sends the user name as the first packet, stores the socket in the
    global ``ck`` and starts a thread running ``getInfo``.
    """
    global ck
    host = eip.get()
    port_text = eport.get()
    user_name = euser.get()
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host, int(port_text)))
    sock.send(user_name.encode("utf-8"))
    ck = sock
    # Incoming data is awaited on a separate thread.
    threading.Thread(target=getInfo).start()
def sendMail():
    """Button callback: send ``<friend>:<message>`` over the socket ``ck``,
    using the contents of the friend and message fields.
    """
    recipient = efriend.get()
    body = esend.get()
    packet = recipient + ":" + body
    ck.send(packet.encode("utf-8"))
# Build the client window. Widget creation and .grid() are kept on
# separate lines: grid() returns None, so chaining the two would bind
# None to the widget name instead of the widget.
lableUser = tkinter.Label(win, text="userName")
lableUser.grid(row=0, column=0)
euser = tkinter.Variable()
entryUser = tkinter.Entry(win, textvariable=euser)
entryUser.grid(row=0, column=1)

lableIp = tkinter.Label(win, text="ip")
lableIp.grid(row=1, column=0)
eip = tkinter.Variable()
entryIp = tkinter.Entry(win, textvariable=eip)
entryIp.grid(row=1, column=1)

lablePort = tkinter.Label(win, text="port")
lablePort.grid(row=2, column=0)
eport = tkinter.Variable()
entryPort = tkinter.Entry(win, textvariable=eport)
entryPort.grid(row=2, column=1)

button = tkinter.Button(win, text="连接", command=connectServer)
button.grid(row=3, column=0)

# Text area that shows received messages.
text = tkinter.Text(win, width=30, height=5)
text.grid(row=4, column=0)

# Message field, friend-name field and the send button.
esend = tkinter.Variable()
entrySend = tkinter.Entry(win, textvariable=esend)
entrySend.grid(row=5, column=0)
efriend = tkinter.Variable()
entryFriend = tkinter.Entry(win, textvariable=efriend)
entryFriend.grid(row=6, column=0)
button2 = tkinter.Button(win, text="发送", command=sendMail)
button2.grid(row=6, column=1)

win.mainloop()
信号量控制线程数量
import threading, time
# Semaphore with three permits: at most three threads hold it at once.
sem = threading.Semaphore(3)


def run():
    """While holding ``sem``, print the thread name with a counter 0..4,
    pausing one second after each line.
    """
    with sem:
        for step in range(5):
            print("%s--%d"%(threading.current_thread().name, step))
            time.sleep(1)
if __name__ == "__main__":
    # Launch five worker threads.
    for _ in range(5):
        worker = threading.Thread(target=run)
        worker.start()
凑够一定数量才能一起执行
import threading, time
# Barrier with three parties: waiters are released in groups of three.
bar = threading.Barrier(3)


def run():
    """Print a start line, sleep one second, wait at ``bar``, then print an
    end line.
    """
    print("%s--start"%(threading.current_thread().name))
    time.sleep(1)
    # Blocks until three threads in total are waiting on the barrier.
    bar.wait()
    print("%s--end" % (threading.current_thread().name))
if __name__ == "__main__":
    # Launch five worker threads.
    # NOTE(review): if ``run`` waits on a 3-party barrier, only three of
    # the five threads can pass it; the other two would wait forever -
    # confirm this is the intended demonstration.
    for _ in range(5):
        worker = threading.Thread(target=run)
        worker.start()
定时线程
import threading
def run():
    """Print a fixed message; used as the timer callback."""
    message = "sunck is a good man"
    print(message)
# A Timer is a thread that calls ``run`` after a 5-second delay.
t = threading.Timer(interval=5, function=run)
t.start()
# Wait for the timer thread to finish before printing the final line.
t.join()
print("父线程结束")
线程通信
import threading, time
def func():
    """Start a worker thread driven by an Event and return that Event.

    The worker loops five times; in each round it blocks until the event
    is set, clears it again, and prints a numbered message.
    """
    event = threading.Event()

    def _waiter():
        for round_no in range(5):
            # Block until the event is triggered, then reset it so the
            # next round blocks again.
            event.wait()
            event.clear()
            print("sunck is a good man!!%d"%round_no)

    threading.Thread(target=_waiter).start()
    return event
e = func()
# Trigger the event five times, two seconds apart.
for _ in range(5):
    time.sleep(2)
    e.set()
生产者与消费者
import threading,queue,time,random
#生产者
def product(id, q, count=None, delay=3):
    """Producer: put random integers in ``[0, 10000]`` on the queue.

    The producer does not call ``q.task_done()``. That call belongs to
    whoever processed an item fetched with ``get()``; calling it here as
    well would decrement the unfinished-task counter twice per item and
    eventually raise ``ValueError``.

    Args:
        id: Number identifying this producer in the log line.
        q: ``queue.Queue`` that receives the produced values.
        count: Number of items to produce. ``None`` (the default) keeps
            producing forever, as before.
        delay: Seconds to sleep after each item (default 3, as before).
    """
    produced = 0
    while count is None or produced < count:
        num = random.randint(0, 10000)
        q.put(num)
        print("生产者%d生产了%d数据放入了队列" % (id, num))
        produced += 1
        time.sleep(delay)
#消费者
def customer(id, q, delay=2):
    """Consumer: take items from the queue until a ``None`` sentinel arrives.

    Every item fetched, including the sentinel, is acknowledged with
    ``q.task_done()`` so that ``q.join()`` can return.

    Args:
        id: Number identifying this consumer in the log line.
        q: ``queue.Queue`` to consume from.
        delay: Seconds to sleep per consumed item (default 2, as before).
    """
    while True:
        item = q.get()
        try:
            if item is None:
                break
            print("消费者%d消费了%d数据" % (id, item))
            time.sleep(delay)
        finally:
            # Runs for normal items and for the sentinel alike.
            q.task_done()
if __name__ == "__main__":
    # Queue shared by all producers and consumers.
    q = queue.Queue()
    # Four producers ...
    for producer_id in range(4):
        threading.Thread(target=product, args=(producer_id, q)).start()
    # ... and three consumers.
    for consumer_id in range(3):
        threading.Thread(target=customer, args=(consumer_id, q)).start()
线程调度
import threading,time
# Condition variable that coordinates the two printing threads.
cond = threading.Condition()
# Next number that may be printed; read and written only while holding cond.
_next_value = 0


def _print_in_turn(first):
    """Print ``first, first + 2, ...`` below 10, each only when it is the next number due."""
    global _next_value
    with cond:
        for i in range(first, 10, 2):
            # Waiting on explicit state instead of a bare wait()/notify()
            # pair means a notification sent before the other thread has
            # started waiting cannot be lost, so the start order of the
            # two threads no longer matters.
            cond.wait_for(lambda: _next_value == i)
            print(threading.current_thread().name, i)
            # Wrap back to 0 after 9 so the pair can be run again.
            _next_value = (i + 1) % 10
            cond.notify_all()


def run1():
    """Print the even numbers 0..8, alternating with ``run2``."""
    _print_in_turn(0)


def run2():
    """Print the odd numbers 1..9, alternating with ``run1``."""
    _print_in_turn(1)
# Start one thread per printer function, run1 first.
for printer in (run1, run2):
    threading.Thread(target=printer).start()
多线程爬虫
#!/usr/bin/env python
from queue import Queue
import threading
import requests
import re
# Site root; also used as the prefix for the relative links that are queued.
start_url = "http://www.geyanw.com"
# Queue holding the URLs that are still to be crawled.
urls_queue = Queue()
# Number of concurrent downloader threads.
DOWNLOADER_NUM = 10
# List of the downloader threads that have been started.
thread_pool = []
def fetch(url):
    """Download ``url`` and return the page decoded as gb2312 text.

    The HTTP status is checked before decoding. (The previous status
    check was misspelled and sat in an ``else`` branch that could never
    run, because the ``try`` body always returned first.)

    Args:
        url: Address of the page to download.

    Returns:
        The page content as ``str``, or ``None`` when the request fails,
        the server answers with an error status, or the body cannot be
        decoded; the error is printed in that case.
    """
    try:
        # A timeout keeps a stalled server from blocking the thread forever.
        r = requests.get(url, timeout=10)
        r.raise_for_status()
        return r.content.decode('gb2312')
    except (requests.RequestException, UnicodeDecodeError) as e:
        print(e)
        return None
def parse(html):
    """Extract site-relative links from a first-level page and queue them.

    The first five matches are printed and put on ``urls_queue`` as
    absolute URLs (prefixed with ``start_url``).

    Args:
        html: Page source as ``str``. ``None`` or an empty string (for
            example after a failed download) is accepted and queues
            nothing.
    """
    if not html:
        return
    pattern = re.compile(r'href="(/[a-z0-9-/]+(.html)?)"')
    # findall yields one (path, suffix) tuple per match; only path is used.
    for path, _suffix in pattern.findall(html)[:5]:
        print(path)
        urls_queue.put(start_url + path)
def parse_detail(html):
    """Parse a detail page and print its ``<title>`` element.

    Args:
        html: Source of the detail page. ``None`` or an empty string (for
            example after a failed download) is skipped.
    """
    if not html:
        return
    from bs4 import BeautifulSoup
    # Name the parser explicitly so the result does not depend on which
    # optional parsers happen to be installed.
    soup = BeautifulSoup(html, "html.parser")
    print(soup.title)
def downloader():
    """Take URLs from ``urls_queue`` until it is empty, downloading and
    parsing each one as a detail page.

    The queue is read with ``get_nowait()``. Checking ``empty()`` and
    then calling a blocking ``get()`` is racy with several threads:
    another thread can take the last URL in between, and this thread
    would then block forever.

    Returns:
        None
    """
    from queue import Empty

    while True:
        try:
            url = urls_queue.get_nowait()
        except Empty:
            return
        if url is not None:
            print(url)
            html = fetch(url)
            parse_detail(html)
def main():
    """Seed the URL queue from the start page, then crawl it with
    ``DOWNLOADER_NUM`` downloader threads and wait for all of them.
    """
    # The queue is initialised on the main thread.
    parse(fetch(start_url))
    # Start the downloader threads and remember them in thread_pool.
    for _ in range(DOWNLOADER_NUM):
        worker = threading.Thread(target=downloader)
        worker.start()
        thread_pool.append(worker)
    # Block until every thread in the pool has finished.
    for worker in thread_pool:
        worker.join()


if __name__ == '__main__':
    main()
网友评论