美文网首页
爬虫---线程、进程

爬虫---线程、进程

作者: riverstation | 来源:发表于2018-07-18 20:50 被阅读24次

多进程

多任务:生活中来看,就是多个任务同时进行,喝酒聊天,开车,手脚并用,唱歌跳舞
电脑中:录屏、sublime、vnc服务端、浏览器打开等
代码中:实现多任务,多进程、多线程
进程:电脑中,每一个软件启动起来都是一个进程,
代码中:没有运行的时候称之为程序,运行起来之后就是一个进程
多进程:进程只有一个,称之为主进程,子进程,要实现两个函数同时执行,就要通过主进程来创建子进程
操作系统实现,只是在进程之间来回切换,切换的非常快,看着像同时执行一样

如何实现?

面向过程:(process)

p = Process(target=xxx, name=xxx, args=(xxx,))
target: 进程启动之后要执行的函数
name: 给进程起个名字
args: 给子进程传递的参数,是一个元组
p.start() 启动进程
p.join() 让主进程等待子进程结束
os.getpid() 获取当前进程id号
os.getppid() 获取父进程的id号

from multiprocessing import Process
import time
import os

# 想让子进程1执行sing函数
def sing(a):
    print('接受的参数为%s' % a)
    # 进程id号
    print('进程-%s-开始运行' % os.getpid())
    print('父进程为%s' % os.getppid())
    for x in range(1, 5):
        print('我在唱小情歌')
        time.sleep(1)

# 想让子进程2执行dance函数
def dance(a):
    print('进程-%s-开始运行' % os.getpid())
    print('父进程为%s' % os.getppid())
    for x in range(1, 5):
        print('我在跳钢管舞')
        time.sleep(1)

def main():
    # 主进程
    print('主进程id号为%s' % os.getpid())
    a = '青花瓷'
    # 创建进程
    p_sing = Process(target=sing, name='唱歌', args=(a,))
    p_dance = Process(target=dance, name='跳舞', args=(a,))
    
    # 启动进程
    p_sing.start()
    p_dance.start()

    # 获取进程名字
    print(p_sing.name)
    print(p_dance.name)

    # 因为主进程中有子进程的信息,所以主进程必须等子进程结束之后再结束
    p_sing.join()
    p_dance.join()

    print('主进程结束')

if __name__ == '__main__':
    main()

面向对象:

from multiprocessing import Process
import time

class SingProcess(Process):
    def __init__(self, a):
        # 如果要重写构造方法,一定得手动调用父类的构造方法
        super().__init__()
        self.a = a

    def run(self):
        print('传递的参数为%s' % self.a)
        for x in range(1, 5):
            print('我在唱小情歌')
            time.sleep(1)

class DanceProcess(Process):
    def run(self):
        for x in range(1, 5):
            print('我在跳钢管舞')
            time.sleep(1)
        

def main():
    a = '现在很多老歌手为什么不唱歌了'
    p_sing = SingProcess(a)
    # 启动进程,进程启动之后默认执行类里面的run方法
    p_sing.start()

    # p_dance = DanceProcess()
    # p_dance.start()

    p_sing.join()
    # p_dance.join()
    print('主进程结束')

if __name__ == '__main__':
    main()

多进程拷贝,拷贝文件夹,假如文件夹里面只有文件。假如有100个文件,那么拷贝的时候先拷贝第一个,然后第二个,然后第三个====
拷贝一个文件就是一个任务,那一共100文件,那难道要开辟100个进程吗?
进程并不是越多越好,切换占用的时间越大

练习:多进程拷贝
拷贝之前记录时间戳,拷贝之后记录时间戳,计算拷贝的时间
多进程拷贝也是一样,哪个时间短
引入进程池,规定能创建几个进程,来了任务,5个进程

进程之间是否共享全局变量

全局变量
进程之间是否共享全局变量,不共享
每一个进程都是单独的代码

from multiprocessing import Process
import os
import time

count = 100

# 该进程用来修改全局变量的值
def change():
    global count
    count += 100
    print('进程%s修改后的值为%s' % (os.getpid(), count))

# 该进程用来读取全局变量的值
def read():
    print('进程%s读取的值为%s' % (os.getpid(), count))

def test(c):
    a = 100
    if c == 'hello':
        a += 100
        time.sleep(2)
        print('进程%s修改a的值为%s' % (os.getpid(), a))
    else:
        time.sleep(5)
        print('进程%s读取a的值为%s' % (os.getpid(), a))

def main():
    '''
    p1 = Process(target=change)
    p1.start()
    time.sleep(2)

    p2 = Process(target=read)
    p2.start()
    '''
    a = 'hello'
    b = 'world'
    p1 = Process(target=test, args=(a, ))
    p2 = Process(target=test, args=(b, ))
    p1.start()
    p2.start()

    p1.join()
    p2.join()

if __name__ == '__main__':
    main()

进程池

from multiprocessing import Process
from multiprocessing import Pool
import os
import time

def test(name):
    print('任务%s正在运行,进程id号为%s' % (name, os.getpid()))
    time.sleep(2)

def main():
    # 创建一个进程池对象
    po = Pool(3)

    # 给进程池添加任务
    lt = ['关羽', '赵云', '张飞', '马超', '黄忠', '吕布', '孙策', '大乔']
    for name in lt:
        po.apply_async(test, args=(name, ))

    # 进程池使用完毕之后,关闭
    po.close()
    # 让主进程等待结束
    po.join()
    print('主进程、进程池全部结束')

if __name__ == '__main__':
    main()

多线程

线程:比如qq。比如暴风影音,比如word
可以同时语音、同时视频、同时聊天,多线程
暴风影音,视频播放、音频播放,多线程
word,打字,拼写检查,等,多线程
多任务的实现:多进程、多线程

主进程-子进程1-子进程2
特点:进程之间没有关系,如果一个进程挂了,不影响其它子进程

进程-主线程-子线程1-子线程2
特点:线程之间有关系,如果一个线程挂了,整个进程就挂了

实现方式:(thread)

面向过程

t = Thread(target=xxx, name=xxx, args=(xxx,))
target: 线程启动之后要执行的函数
name: 线程的名字
args: 给线程传递的参数
t.start(): 启动线程
t.join(): 让主线程等待子线程结束
threading.current_thread().name : 获取线程名字

import threading
import time

def sing(song):
    print('传递过来的参数为%s' % song)
    print('获取线程的名字为%s' % threading.current_thread().name)
    for x in range(1, 5):
        print('我在唱老情歌')
        time.sleep(1)

def dance():
    for x in range(1, 5):
        print('我在跳广场舞')
        time.sleep(1)

def main():
    a = '广岛之恋'
    # 这是一个进程,进程里面有一个主线程,然后主线程创建子线程1(唱歌),子线程2(跳舞)
    t_sing = threading.Thread(target=sing, name='唱歌', args=(a, ))
    t_dance = threading.Thread(target=dance, name='跳舞')

    # 启动线程
    t_sing.start()
    t_dance.start()

    t_sing.join()
    t_dance.join()

    print('主线程、子线程同时结束')

if __name__ == '__main__':
    main()

面向对象

import threading
import time

# 滕王阁序  王勃
# 命硬
# 沁园春-雪
# 出师表

class MyThread(threading.Thread):
    def __init__(self, a):
        super().__init__()
        self.a = a

    def run(self):
        print('传递过来的参数为%s' % self.a)
        for x in range(1, 5):
            print('凤凰台上凤凰游,凤去台空江自流')
            time.sleep(1)

def main():
    a = '落霞与孤鹜齐飞,秋水共长天一色'
    t = MyThread(a)
    t.start()
    t.join()

    print('主线程、子线程全部结束')

if __name__ == '__main__':
    main()

是否共享

全局变量
共享全局变量
局部变量
不共享局部变量
线程安全问题
线程之间可以共享全局变量

import threading
import os
import time

count = 100

# 该线程用来修改全局变量的值
def change():
    global count
    count += 100
    print('线程%s修改后的值为%s' % (threading.current_thread().name, count))

# 该线程用来读取全局变量的值
def read():
    print('线程%s读取的值为%s' % (threading.current_thread().name, count))


def test():
    a = 100
    name = threading.current_thread().name
    if name == 'lala':
        a += 100
        time.sleep(2)
        print('线程%s修改a的值为%s' % (name, a))
    else:
        time.sleep(5)
        print('线程读取a的值为%s' % a)

def main():
    '''
    t1 = threading.Thread(target=change, name='修改thread')
    t1.start()
    time.sleep(2)

    t2 = threading.Thread(target=read, name='读取thread')
    t2.start()
    '''
    t1 = threading.Thread(target=test, name='lala')
    t2 = threading.Thread(target=test)

    t1.start()
    t2.start()

    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

加锁

加锁

lock.acquire()

释放锁

lock.release()

import threading
import time

count = 100
# 创建一把锁
lock = threading.Lock()

def test(number):
    start = time.time()
    # 在这里面修改count的值
    global count
    for x in range(1, 10000000):
        # 加锁
        lock.acquire()
        count += number
        count -= number
        # 释放锁
        lock.release()
    end = time.time()
    # 上厕所,大号,如何解决,加锁
    # 用之前,加锁,用完之后,释放锁
    # 牺牲了效率了

    print('循环计算的时间为%s' % (start - end))

def main():
    t1 = threading.Thread(target=test, args=(3, ))
    t2 = threading.Thread(target=test, args=(5, ))
    t2.start()
    t1.start()
    t1.join()
    t2.join()

    print('主线程中打印的count的值为%s' % count)

if __name__ == '__main__':
    main()

队列

队列:买火车票,电动三轮,特点:先进先出
用在哪?

线程之间使用队列进行交互,让线程解耦合,生产者-消费者模型

线程1-生产数据

交互渠道:队列

线程2-消费数据
while 1:
生产数据
消费数据

队列使用:

            from queue import Queue
    q = Queue(5)
    q.put()         添加元素
    q.put(False)    如果队列已满,添加元素立即抛出异常
    q.put(True, 5)  如果队列已满,添加元素5s之后抛出异常
    q.get()         获取元素
    q.get(False)    如果队列为空,获取元素立即抛出异常
    q.get(True, 5)  如果队列为空,获取元素5s之后抛出异常
    q.empty()       队列是否为空
    q.full()        队列是否已满
    q.qsize()       队列长度
from queue import Queue

# 创建一个队列
# 如果写,代表队列的长度,如果不写,队列长度无限
q = Queue(5)

print(q.empty())
print(q.full())
print(q.qsize())

# 向队列中添加数据
q.put('吴彦祖')
q.put('岳云鹏')
q.put('王宝强')
q.put('黄渤')
q.put('刘德华')
print(q.empty())
print(q.full())
print(q.qsize())
# q.put('古天乐', True, 5)

# 从队列中获取数据
print(q.get())
print(q.get())
print(q.get())
print(q.get())
print(q.get())
# print(q.get(True, 5))

多线程爬虫

分析:
爬虫里面如何分多线程,

循环:
拼接url,发送请求,获取响应
解析响应,保存到文件

涉及到:
采集线程,3个线程同时采集
解析线程,3个线程同时解析
页码队列:里面是要爬取的页码数据
数据队列:采集线程向队列中添加数据
解析线程从队列中获取数据
保存到同一个文件中,锁机制

import threading
from queue import Queue
import time
from lxml import etree
import requests
import json

class CrawlThread(threading.Thread):
    def __init__(self, name, page_queue, data_queue):
        super().__init__()
        self.name = name
        # 保存页码队列
        self.page_queue = page_queue
        self.data_queue = data_queue
        # url
        self.url = 'http://www.fanjian.net/duanzi-{}'
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/67.0.3396.99 Safari/537.36',
        }

    def run(self):
        print('%s线程开始启动' % self.name)
        # 这里面的思路是什么?
        while 1:
            if self.page_queue.empty():
                break
            # 1、从页码队列中获取页码
            page = self.page_queue.get()
            # 2、将url和页码进行拼接
            url = self.url.format(page)
            # 3、发送请求,获取响应
            r = requests.get(url=url, headers=self.headers)
            time.sleep(1)

            # 4、将响应内容放入到数据队列中
            self.data_queue.put(r.text)
            
        print('%s线程结束' % self.name)

class ParseThread(threading.Thread):
    def __init__(self, name, data_queue, lock, fp):
        super().__init__()
        self.name = name
        # 保存数据队列
        self.data_queue = data_queue

        self.lock = lock
        self.fp = fp

    def run(self):
        print('%s线程开始启动' % self.name)
        # 解析线程解析步骤
        while 1:

            if self.data_queue.empty():

                break
            # 1、从数据队列中取出一个数据
            content = self.data_queue.get()
            # 2、解析这个数据

            items = self.parse_content(content)
            # 3、写入到文件中
            string = json.dumps(items, ensure_ascii=False)
            # 加锁
            self.lock.acquire()
            self.fp.write(string)
            # 释放锁
            self.lock.release()
        print('%s线程结束' % self.name)

    # 解析数据函数
    def parse_content(content):
        # 生成tree对象
        tree = etree.HTML(content)
        # 先找到所有的li标签
        li_list = tree.xpath('//li[@class="cont-item"]')
        items = []
        for oli in li_list:
            # 获取头像

            face = oli.xpath('.//div[@class="cont-list-reward"]//img/@data-src')[0]
            # 获取名字
            name = oli.xpath('.//div[@class="cont-list-head"]/a/text()')[0]
            # 获取内容
            text = oli.xpath('.//div[@class="cont-list-main"]/p/text()')[0]
            # 获取时间
            shijian = oli.xpath('.//div[@class="cont-list-info fc-gray"]/text()')[-1]
            item = {
                '头像': face,
                '名字': name,
                '内容': text,
                '时间': shijian,
            }

            # 将字典添加到列表中
            items.append(item)

        return items
        

def create_queue():
    page_queue = Queue()
    data_queue = Queue()
    # 向页码队列中添加页码
    for page in range(1, 11):
        page_queue.put(page)
    return page_queue, data_queue

def main():
    # 做什么?
    # 创建锁
    lock = threading.Lock()
    # 打开文件
    fp = open('duanzi.txt', 'w', encoding='utf8')
    # 创建两个队列
    page_queue, data_queue = create_queue()
    # 创建采集、解析线程
    crawlname_list = ['采集线程1', '采集线程2', '采集线程3']
    parsename_list = ['解析线程1', '解析线程2', '解析线程3']
    # 列表,用来保存所有的采集线程和解析线程
    t_crawl_list = []
    t_parse_list = []
    for crawlname in crawlname_list:
        t_crawl = CrawlThread(crawlname, page_queue, data_queue)
        t_crawl.start()
        # 将对应的采集线程保存起来
        t_crawl_list.append(t_crawl)

    for parsename in parsename_list:
        t_parse = ParseThread(parsename, data_queue, lock, fp)
        # 将对应的解析线程保存起来
        t_parse_list.append(t_parse)
        t_parse.start()

    # 让主线程等待子线程结束之后再结束
    for t_crawl in t_crawl_list:
        t_crawl.join()
    for t_parse in t_parse_list:
        t_parse.join()

    fp.close()
    print('主线程、子线程全部结束')

if __name__ == '__main__':
    main()

# 留给大家了,为什么里面没有写数据呢?

相关文章

网友评论

      本文标题:爬虫---线程、进程

      本文链接:https://www.haomeiwen.com/subject/vmpapftx.html