队列
import queue
# Create a FIFO queue; maxsize is the largest number of items it may hold.
queue_obj = queue.Queue(maxsize=40)
# Fill the queue with the integers 0..39.
for i in range(0, 40):
    queue_obj.put(i)
# Take one item out (FIFO order, so this prints the first value put in).
print(queue_obj.get())
# Number of items currently in the queue.
print(queue_obj.qsize())
# Whether the queue has reached maxsize.
print(queue_obj.full())
# Whether the queue is empty.
print(queue_obj.empty())
# NOTE: queue.Queue is thread-safe; list and dict are not, so protect them
# with a thread lock when they are shared between threads.
多线程爬虫示例
多线程
import queue,threading,requests,json
from lxml import etree
from fake_useragent import UserAgent
def download_article_list_url(task_queue,data_queue):
    """Download every listing-page URL in ``task_queue``.

    Args:
        task_queue: queue.Queue holding the page URLs still to be fetched.
        data_queue: queue.Queue that receives the HTML text of each page
            answered with HTTP status 200.
    """
    req_header = {'User-Agent': UserAgent().random}
    while True:
        # Checking empty() and then calling a blocking get() is racy: another
        # worker may take the last URL in between and this thread would then
        # block forever. Fetch without blocking and stop once drained.
        try:
            req_url = task_queue.get_nowait()
        except queue.Empty:
            break
        response = requests.get(url=req_url, headers=req_header)
        if response.status_code == 200:
            # Store the page source for the parsing stage.
            print(response.url, '获取成功')
            data_queue.put(response.text)
def parse_data_by_queue(data_queue,threadLock):
    """Parse the page sources in ``data_queue`` and append articles to disk.

    Each article found on a page is written as one JSON line to
    ``jobboleartcle.json``.

    Args:
        data_queue: queue.Queue of HTML strings to parse.
        threadLock: lock held while appending to the output file.

    Raises:
        IndexError: if an article block lacks one of the required fields
            (title, publish, catotry, content).
    """
    while True:
        # Non-blocking get avoids hanging when another parser thread empties
        # the queue between an empty() check and a blocking get().
        try:
            html = data_queue.get_nowait()
        except queue.Empty:
            break
        html_element = etree.HTML(html)
        # All article blocks on the listing page.
        article_divs = html_element.xpath('//div[@class="post floated-thumb"]')
        for article_div in article_divs:
            article_dict = {}
            article_dict['title'] = article_div.xpath('.//a[@class="archive-title"]/text()')[0]
            article_dict['publish'] = article_div.xpath('.//div[@class="post-meta"]/p[1]/text()')[0]
            article_dict['catotry'] = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[2]/text()')[0]
            # The comment count is optional; only store it when present.
            commentnum = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[3]/text()')
            if len(commentnum) > 0:
                article_dict['commentnum'] = commentnum[0]
            article_dict['content'] = article_div.xpath('.//span[@class="excerpt"]/p/text()')[0]
            # "with" releases the lock even if the write raises; an explicit
            # UTF-8 encoding is needed because ensure_ascii=False emits raw
            # non-ASCII characters.
            with threadLock:
                with open('jobboleartcle.json', 'a+', encoding='utf-8') as file:
                    print('正在写入', article_dict['title'], threading.current_thread().name)
                    json_str = json.dumps(article_dict, ensure_ascii=False)
                    file.write(json_str + '\n')
if __name__=='__main__':
    take_queue = queue.Queue()
    data_queue = queue.Queue()
    # Lock passed to the parser threads for writing the output file.
    threadLock = threading.Lock()
    # Build the task queue: listing pages 1..29.
    for i in range(1, 30):
        full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(i)
        take_queue.put(full_url)

    # Start four crawl threads that share both queues.
    crawlThreadName = ['崔一', '崔二', '崔三', '崔四']
    crawlThread = []
    for name in crawlThreadName:
        thread = threading.Thread(
            target=download_article_list_url,
            name=name,
            args=(take_queue, data_queue),
        )
        thread.start()
        crawlThread.append(thread)
    # join() so that every request in the task queue has finished before
    # parsing starts (join accepts an optional timeout to bound the wait).
    for thread in crawlThread:
        thread.join()

    # Start four parser threads and wait for them as well.
    parseThreadName = ['崔1', '崔2', '崔3', '崔4']
    parseThread = []
    for name in parseThreadName:
        thread = threading.Thread(
            target=parse_data_by_queue,
            name=name,
            args=(data_queue, threadLock),
        )
        thread.start()
        parseThread.append(thread)
    for thread in parseThread:
        thread.join()
    print('Spider_over')
    print('Main_Thread_over')
自定义线程
import queue,threading,requests,json
from lxml import etree
from fake_useragent import UserAgent
class CrawlThreadt(threading.Thread):
    """Thread that downloads the listing pages named in a task queue."""

    def __init__(self,task_queue,data_queue,threadName):
        """Store the shared queues and a display name.

        Args:
            task_queue: queue.Queue of page URLs still to be fetched.
            data_queue: queue.Queue receiving the HTML of successful pages.
            threadName: label printed in this thread's progress messages.
        """
        super(CrawlThreadt,self).__init__()
        self.task_queue = task_queue
        self.data_queue = data_queue
        self.threadName = threadName
        self.header = {'User-Agent': UserAgent().random}

    def run(self):
        """Fetch URLs until the task queue is drained."""
        while True:
            # A blocking get() after an empty() check can hang forever when
            # another crawler takes the last URL in between, so fetch without
            # blocking and stop on queue.Empty.
            try:
                req_url = self.task_queue.get_nowait()
            except queue.Empty:
                break
            print(self.threadName, '正在下载')
            response = requests.get(url=req_url, headers=self.header)
            if response.status_code == 200:
                print(response.url, '获取成功')
                self.data_queue.put(response.text)
class ParserThreadt(threading.Thread):
    """Thread that parses queued page sources and appends articles to disk."""

    def __init__(self,data_queue,threadLock,threadName):
        """Store the shared queue, the file lock and a display name.

        Args:
            data_queue: queue.Queue of HTML strings to parse.
            threadLock: lock held while appending to the output file.
            threadName: label printed in this thread's progress messages.
        """
        super(ParserThreadt,self).__init__()
        self.data_queue = data_queue
        self.threadLock = threadLock
        self.threadName = threadName

    def run(self):
        """Parse pages until the data queue is drained.

        Each article is written as one JSON line to ``jobboleartcl2e.json``.
        An IndexError propagates if an article block lacks a required field.
        """
        while True:
            # Non-blocking get avoids hanging when another parser empties the
            # queue between an empty() check and a blocking get().
            try:
                html = self.data_queue.get_nowait()
            except queue.Empty:
                break
            print(self.threadName, '正在解析')
            html_element = etree.HTML(html)
            article_divs = html_element.xpath('//div[@class="post floated-thumb"]')
            for article_div in article_divs:
                article_dict = {}
                article_dict['title'] = article_div.xpath('.//a[@class="archive-title"]/text()')[0]
                article_dict['publish'] = article_div.xpath('.//div[@class="post-meta"]/p[1]/text()')[0]
                article_dict['catotry'] = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[2]/text()')[0]
                # The comment count is optional; only store it when present.
                commentnum = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[3]/text()')
                if len(commentnum) > 0:
                    article_dict['commentnum'] = commentnum[0]
                article_dict['content'] = article_div.xpath('.//span[@class="excerpt"]/p/text()')[0]
                # "with" releases the lock even if the write raises; UTF-8 is
                # explicit because ensure_ascii=False emits raw non-ASCII text.
                with self.threadLock:
                    with open('jobboleartcl2e.json', 'a+', encoding='utf-8') as file:
                        print('正在写入', article_dict['title'], threading.current_thread().name)
                        json_str = json.dumps(article_dict, ensure_ascii=False)
                        file.write(json_str + '\n')
if __name__ == '__main__':
    task_queue = queue.Queue()
    data_queue = queue.Queue()
    # Build the task queue: listing pages 1..99.
    for i in range(1, 100):
        full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(i)
        task_queue.put(full_url)

    # Start four crawl threads and wait until all of them have finished.
    crawlThreadName = ['吕1', '吕2', '吕3', '吕4']
    crawlThread = []
    for name in crawlThreadName:
        thread = CrawlThreadt(task_queue, data_queue, name)
        thread.start()
        crawlThread.append(thread)
    for thread in crawlThread:
        thread.join()

    # Lock shared by the parser threads for writing the output file.
    threadLock = threading.Lock()

    # Start four parser threads and wait for them as well.
    parseThreadName = ['娜1', '娜2', '娜3', '娜4']
    parseThread = []
    for name in parseThreadName:
        thread = ParserThreadt(data_queue, threadLock, name)
        thread.start()
        parseThread.append(thread)
    for thread in parseThread:
        thread.join()
    print('over')
    print('main_over')
线程池
#线程池
from concurrent.futures import ThreadPoolExecutor
from lxml import etree
from fake_useragent import UserAgent
import threading,requests,json
def download_list(req_url):
    """Fetch one listing page.

    Args:
        req_url: URL of the page to download.

    Returns:
        A ``(html_text, status_code)`` tuple when the server answers with
        HTTP 200, otherwise ``None``.
    """
    print(req_url, threading.current_thread().name)
    req_header = {'User-Agent': UserAgent().random}
    response = requests.get(url=req_url, headers=req_header)
    if response.status_code == 200:
        # Request succeeded; hand the page over for parsing.
        return response.text, response.status_code
    # Any other status yields no result.
    return None
# Serialises appends to the output file; done-callbacks of different futures
# may run at the same time on several pool threads.
_WRITE_LOCK = threading.Lock()


def download_done(future):
    """Done-callback: parse a downloaded page and append its articles to disk.

    Args:
        future: completed future whose result is either ``None`` (download
            did not return HTTP 200) or an ``(html, status_code)`` tuple.

    Each article is written as one JSON line to ``jobboleartcle2.json``.
    """
    result = future.result()
    if result is None:
        # Nothing was downloaded for this page; subscripting None would raise
        # TypeError, so skip it.
        return
    html, code = result
    print(html, code)
    # Parse the page source.
    html_element = etree.HTML(html)
    article_divs = html_element.xpath('//div[@class="post floated-thumb"]')
    for article_div in article_divs:
        article_dict = {}
        article_dict['title'] = article_div.xpath('.//a[@class="archive-title"]/text()')[0]
        article_dict['catotry'] = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[2]/text()')[0]
        # The comment count is optional; only store it when present.
        commentnum = article_div.xpath('.//div[@class="post-meta"]/p[1]/a[3]/text()')
        if len(commentnum) > 0:
            article_dict['commentnum'] = commentnum[0]
        article_dict['content'] = article_div.xpath('.//span[@class="excerpt"]/p/text()')[0]
        # Write the record locally. UTF-8 is explicit because
        # ensure_ascii=False emits raw non-ASCII characters.
        with _WRITE_LOCK:
            with open('jobboleartcle2.json', 'a+', encoding='utf-8') as file:
                print('正在写入', article_dict['title'], threading.current_thread().name)
                json_str = json.dumps(article_dict, ensure_ascii=False)
                file.write(json_str + '\n')
if __name__ == '__main__':
    # Create the thread pool; max_workers is the maximum number of threads.
    # Leaving the "with" block calls shutdown(), which waits for all
    # submitted downloads (like an internal join()).
    with ThreadPoolExecutor(max_workers=10) as pool:
        for i in range(1, 50):
            full_url = 'http://blog.jobbole.com/all-posts/page/{}/'.format(i)
            handler = pool.submit(download_list, full_url)
            # Parse and store each page as soon as its download completes.
            handler.add_done_callback(download_done)
    print('爬虫结束')
    print('主线程结束')
网友评论