美文网首页
python 爬虫从 0 到 1,实现识别验证码登录、会话保持、

python 爬虫从 0 到 1,实现识别验证码登录、会话保持、

作者: bit_拳倾天下 | 来源:发表于2021-09-06 19:29 被阅读0次

    源码:bit-fist-crawler

    初学 python,还有很多问题待优化,欢迎和我一样的小白一起研究,也欢迎大佬路过指点!

    1. 介绍

    用于从爬取某系统数据,同步到数据库

    • 数学验证码识别与自动计算并登录
    • 会话保持,验证码识别错误、session 过期,自动重试
    • 架构 python、docker、mysql

    2. 安装教程

    2.1. docker 打包、推送

    确保本地 docker 为启动状态

    打开 cmd 或 powershell,cd 到 Dockerfile 同级目录下,执行 docker 打包:

    docker build -t bit-fist-crawler:0.1 .
    docker tag bit-fist-crawler:0.1 harbor.test.com/cztl/bit-fist-crawler:0.1
    dokcer push harbor.test.com/cztl/bit-fist-crawler:0.1
    

    harbor.test.com 替换成自己的 docker 仓库

    2.2. 服务器配置信息

    docker-compose 配置文件 docker-compose-crawler.yml

    version: '2'
    services:
      bit-fist-crawler:
        image: harbor.test.com/cztl/bit-fist-crawler:0.1
        container_name: bit-fist-crawler
        volumes:
          - "/data/crawler/config:/home/project/config"
          - "/data/crawler/log:/home/project/logs"
        restart: always
        ports:
          - 11423:11423
        mem_limit: 512m
        networks:
          - chuanzangNetwork
        logging:
          options:
            max-size: "10m"
            max-file: "10" 
    networks:
      chuanzangNetwork: 
        external: true
    

    其中 "/data/crawler/config","/data/crawler/log" 分别用于挂载配置文件路径和日志文件路径。

    在服务器上创建文件夹,用于存放配置文件覆盖代码中的 /config

    sudo mkdir /data/crawler/config
    

    将 config.txt 上传到该文件夹

    2.3. 启动

    sudo docker-compose -f docker-compose-crawler.yml up -d
    

    启动后,日志挂载在 /data/crawler/log

    3. 实现过程

    3.0. 背景

    开发项目时,遇到了需要用爬虫爬数据的需求。目标网站是个需要登录验证的网站,不能直接获取接口或页面。用户名、账号对方已经给到我们,剩下的是就是通过验证,然后获取数据。
    用 java 应该也是可以实现的,但是想借这个契机学习一下 python,所以决定用 python 实现。于是乎在菜鸟学了几天 Python 3 教程,真是入门级的好去处^-^。
    看了好多文章,最后实现了这个从 0 到 1 的过程,很开心。

    初学 python,代码并不是很规范,文件夹也是随便建的。还有很多问题待优化,欢迎和我一样的小白一起研究,也欢迎大佬路过指点!

    源码地址:bit-fist-crawler

    3.1. 验证码识别

    识别验证码就是这个项目的核心了,第一次接触,还是挺费劲的。这个功能是使用 opencv-python 这个库来做图形处理的。
    代码主要在 src/utils/captcha_util.py 中。验证码各式各样,每种验证码的图形处理都会有不同的处理过程,所以这部分代码只能参考思路。

    我的验证码是这样的:

    验证码

    识别出 5+1,然后计算结果 6 就是最终结果。好在这个验证码都是个位运算,而且后面的 “=?”固定,不需要识别,所以最终只要识别前三个字符就行。

    这部分用到这两个库:

    pip install opencv-python
    pip install numpy
    

    opencv-python 处理图片的方式,其实是将图片数字化。众所周知,图片是由一个个像素组成,每个像素又是由 r,g,b 三种颜色组成,每种颜色从 0-255 代表其亮度。这样一来就可以用一个三维数组来表示一张图片,前面两维表示坐标,第三维表示色道值。

    我的验证码是 60*160 的图片,所以读取之后得到的是 60*160*3 的数组.

    识别出字符,需要排除干扰因素,把最需要并且最简单部分交给程序处理。

    对于这个验证码,我的处理方式是:

    1. 灰度化:排除颜色信息
    import cv2
    import numpy as np
    # 打开图片,image_path 是验证码下载的文件路径:src/img/cztl-web-captcha.jpeg,每次下载重写这个图片
    img = cv2.imread(image_path)
    # 灰度处理 或者 分离通道,这两个都可以得到灰度图,可根据世界效果选;分离通道返回值依次是 b,g,r,可根据世界效果挑选,本例使用的是 g
    # img_gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    b, img_gray, r = cv2.split(img)
    
    灰度化

    灰度化过后,图片中只剩下黑白灰,可以理解成只有一个色道,这样三维数组就变成一个 60*160 的二维数组了。

    1. 二值化:使图片中只有黑白两种颜色
      因为现在图片有很多灰色地带,会比较模糊,会为识别图片造成较大影响,为排除这个因素,需要进行二值化,使其边界清晰。
    # 二值化,大于阈值 80 的都转化成 255(白),否则是 0(黑)
    ret, img_inv = cv2.threshold(img_gray, 80, 255, cv2.THRESH_BINARY_INV)
    
    二值化

    二值化之后的二维数组中,只有两种值,0 和 255.

    1. 透视拉伸:这该死的验证码是倾斜的,所以要做一下拉伸,尽量使字符摆正(但是这个验证码,每张验证码的倾斜程度都不同,这是比较蛋疼的部分,也是我的程序会误判的元凶之一,也没找到很好的解决办法)
    # 透视拉伸
    img_dst = img_perspective(img_inv)
    
    def img_perspective(img):
        """
        图片透视拉伸
        :param img: 源图片
        :return: 拉伸后的图片
        """
        pos1 = np.float32([[0, 0], [135, 0], [30, 60], [160, 60]])
        pos2 = np.float32([[25, 0], [160, 0], [30, 60], [160, 60]])
        mm = cv2.getPerspectiveTransform(pos1, pos2)
        return cv2.warpPerspective(img, mm, (160, 60))
    
    拉伸点位

    pos1 是当前图片中的四个点为像素坐标,按照左上,右上,左下,右下顺序排列。pos2 是想要调整到的目标点位。


    透视拉伸
    1. 切割图片:把验证码的前三个字符切割成图片,由于会发生字符粘连的情况,所以需要额外进行判断和切割
    # 查找轮廓
    contours, hierarchy = cv2.findContours(img_dst, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # 画出矩形边界,x、y边框起始点的坐标,w、h为宽高
    cv2.rectangle(img_dst, (x, y), (x + w, y + h), (255, 255, 255), thickness=1)
    
    矩形边框

    画出矩形边界,已办用于在调整的时候用,可以看到边框,方便修改。二标注的时候需要把字符切成一张张小图片

    box = np.int0([[x, y], [x + w, y], [x + w, y + h], [x, y + h]])
    cv2.drawContours(img_dst, [box], 0, (0, 0, 255), 2)
    roi = img_dst[box[0][1]:box[3][1], box[0][0]:box[1][0]]
    roi_std = cv2.resize(roi, (30, 30))  # 将字符图片统一调整为30x30的图片大小
    

    但是有时候会出现字符粘连的情况,像下图就是前两个字符粘在一起,没法按照轮廓切割。


    字符粘连

    对于字符粘连的问题,我的方式简单粗暴,按照宽度平均分割

    def get_rect_contours(img_dst):
        """
        获取矩形边界列表,按照x坐标从左向右排序
        :param img_dst:
        :return:
        """
        contours, hierarchy = cv2.findContours(img_dst, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        rects = []
        for contour in contours:
            x, y, w, h = cv2.boundingRect(contour)
            # 排除问号
            if w < 10 and x > 100 or x > 110 and h > 10:
                continue
            # 排除等号
            if 2 < w / h < 6 and x > 90:
                continue
            # 两个字符粘在一起,通常是第二个数字和运算符粘连 或第二个数字和等号粘连 或第一个数字和运算符粘连
            if 28 < w < 50:
                w = 20
                if x < 80:
                    rects.append((x + 20, y, w, h))
            # 三个字符粘连,出现在运算符、第二个数字、等号之间 或第一个数字、运算符、第二个数字之间
            if w > 50:
                rects.append((x, y, 20, h))
                rects.append((x + 20, y, 20, h))
                if x < 50:
                    rects.append((x + 40, y, w - 40, h))
                continue
            # '*' 被划分太细的情况放弃
            if w < 10 and h < 10:
                continue
            # 矩形切图
            rects.append((x, y, w, h))
        rects.sort(key=None, reverse=False)
        return rects
    
    1. 人工标注
    def mark_img(roi, timestamp):
        """
        人工标注切图
        :param roi:
        :param timestamp:
        :return:
        """
        print("PS:对每张切图输入对应的字符(用于标记切图),回车跳过当前切图,点击关闭退出人工标记切图")
        cv2.imshow("image", roi)
        key = cv2.waitKey(0)
        if key == 27: # 点击关闭,退出
            sys.exit()
        if key == 13: # 回车跳过当前
            return
        char = chr(key)
        print("您输入的key是:", char)
        filename = "%s/%s_%s.jpg" % (img_lib_path, timestamp, char)
        cv2.imwrite(filename, roi)
    

    这个过程就是不断的加载新的验证码,把图片切出来,人工的查看每张切图,用时间戳、下划线、识别字符命名图片,保存起来,用作训练数据。这部分比较无脑,但是要重复好多次,保存足够的训练数据。

    1. 训练

    所谓训练就是利用已存在的数据(img_lib中的图片),归纳总结出一个规律性的、可借鉴的模型来,后面就可以根据这个模型判断识别新的验证码。

    def train_machine():
        """
        机器训练
        :return: id_label_map, model
        """
        # TODO 后续可尝试将返回值缓存和持久化
        filenames = os.listdir(img_lib_path)
        samples = np.empty((0, 900))
        labels = []
        for filename in filenames:
            filepath = "%s/%s" % (img_lib_path, filename)
            label = filename.split(".")[0].split("_")[-1]
            labels.append(label)
            im = cv2.imread(filepath, cv2.IMREAD_GRAYSCALE)
            roi_std = cv2.resize(im, (30, 30))
            sample = roi_std.reshape((1, 900)).astype(np.float32)
            samples = np.append(samples, sample, 0)
        samples = samples.astype(np.float32)
        unique_labels = list(set(labels))
        unique_ids = list(range(len(unique_labels)))
        label_id_map = dict(zip(unique_labels, unique_ids))
        id_label_map = dict(zip(unique_ids, unique_labels))
        label_ids = list(map(lambda x: label_id_map[x], labels))
        label_ids = np.array(label_ids).reshape((-1, 1)).astype(np.float32)
        model = cv2.ml.KNearest_create()
        model.train(samples, cv2.ml.ROW_SAMPLE, label_ids)
        return id_label_map, model
    

    这个过程是把 img_lib 下的所有图片都用 cv2.imread() 读进来,保存在 samples 中,然后切割文件名,把下划线之后的字符(也就是图片对应的字符,此处称之为 label)保存在 labels 中。samples 和 labels 是一一对应的关系。

    id_label_map 的 value 是 labels 去重的集合,key 是角标,id_label_map 用于后续对应查找 label。
    label_ids 是 img_lib 中所有图片依次对应 id_label_map 的 key。

    1. 识别

    识别的过程,主要用到的是 model 这个对象,拿到新的验证码图片之后,依次和 samples 中的图片(二维数组)进行对比,然后找到最接近的图片,返回这张图片对应的 label_ids 的值。最后用这个值去 id_label_map 中找出对应的 label,即识别到的字符。
    说到底,是数学问题啊~

    id_label_map, model = train_machine()
    for image in images:
        sample = image.reshape((1, 900)).astype(np.float32)
        # 找出最相似的图片
        ret, results, neighbours, distances = model.findNearest(sample, k=3)
        # 找出该图片对应的 label_ids 中的值
        label_id = int(results[0, 0])
        # 找出对应的 label,这就是识别结果
        label = id_label_map[label_id]
        cv2.imshow("image", image)
        key = cv2.waitKey(0)
        if key == 27:
            sys.exit()
        if key == 13:
            return
        correct_char = chr(key)
        print("您输入的key是:%s,机器识别的key是:%s" % (correct_char, label))
    

    这部分代码是搬砖来的,个人觉得 train_machine() 可以稍微简化一下的,训练数据时,label_ids 如果存的是 id_label_map 的值,而不是 key 的话,后面就返回一个 model 就行了,后面识别的 results[0, 0] 直接就是我们想要的结果,也不需要在取 id_label_map 中找了。

    3.2. session 保持及自动重试

    验证码识别的最终目的是登陆,获取校验所需的信息,从而在之后的请求能通过校验。本例系统使用的是 session、cookie机制,所以此处最开始用到 http.cookiejar 保存 cookie 的方式请求。

    import requests
    import http.cookiejar
    
    # 设置一个cookie处理器,它负责从服务器下载cookie到本地,并且在发送请求时带上本地的cookie
        cj = http.cookiejar.CookieJar()
        cookie_support = request.HTTPCookieProcessor(cj)
        opener = request.build_opener(cookie_support, request.HTTPHandler)
    
        request.install_opener(opener)
        raw_data = {"figure": figure, "username": username, "password": password, "imgId": imgId, "code": code}
        post_data = parse.urlencode(raw_data).encode('utf-8')
        cookie = input("输入cookie:")
        headers = {"Cookie": cookie}
        req = request.Request(url=login_url, data=post_data, method='POST')
        # 打开登录主页面(目的是从页面下载cookie,这样我们在再送post数据时就有cookie了,否则发送不成功)
        response = request.urlopen(req)
    

    但是这种方式行不通,因为每次请求都是一个新的会话,导致没次都识别到的验证码都和后端不对应。

    解决办法是使用 session = requests.session(),requests库的session会话对象可以跨请求保持某些参数,就是比如你使用session成功的登录了某个网站,再次使用该session对象请求该网站的其他网页都会默认使用该session之前使用的cookie等参数。

    import requests
    session = requests.session()
    wrong_title = "铁路工程管理平台--登录"
    
    
    def request(url, method=request_method.GET, headers=None, data=None):
        """
        请求数据
        :param url:
        :param method:
        :param headers:
        :param data:
        :return:
        """
        response = session.request(method, url, headers=headers, data=data)
        return response
    

    这样就解决了会话保持的问题,但是还有个问题,session迟早会过期,所以还需要加上 session 过期自动重新登录的功能。
    这个系统 session 过期或登陆失败,都会重定向会登录页,所以此处用网页 title 判断是否 session 过期。所以增加了以下方法,这样 session 过期或者验证码识别错了导致登录失败,都可以自动重新登录。

    
    def request_retry(url, method=request_method.GET, headers=None, data=None):
        """
        请求数据
        :param url:
        :param method:
        :param headers:
        :param data:
        :return:
        """
        response = session.request(method, url, headers=headers, data=data)
        if response.headers.get("Content-Type") == "text/html;charset=UTF-8":
            title = get_tile(response.text)
            if title == wrong_title:
                login()
                response = session.request(method, url, headers=headers, data=data)
        return response
    
    

    3.3 定时任务和爬取数据

    定时任务用到了 apscheduler 库

    from apscheduler.schedulers.blocking import BlockingScheduler
    

    创建一个 BlockingScheduler 对象,然后把需要定时的方法放入参数中。此处设置的是在 7 点到 23 点之间,每半小时更新一次。

    def main():
        # 登录,收集数据
        login.login()
        data_collector()
        # 定时收集数据
        task = BlockingScheduler()
        task.add_job(data_collector, "cron", hour="7-23", minute="*/30")
        task.start()
    
    
    def data_collector():
        """
        数据收集
        :return:
        """
        logging.info("开始定时任务")
        # 同步进度信息
        progress_collector.get_construct_points()
        logging.info("定时任务完成")
    

    所爬数据分两种,一种是响应格式为 json,另一种响应格式为 html 页面。
    json 格式的数据可以借助 json 库转化下,直接取出想要的字段即可,例如:

    def get_retry_std(url, headers=None, data=None):
        """
        get方法请求数据,返回json,只适合 responses 是标准输出的情况
        :param url:
        :param headers:
        :param data:
        :return:
        """
        response = get_retry(url, headers=headers, data=data)
        if response.headers.get("Content-Type") != "text/plain;charset=UTF-8":
            log.error("Content-Type 必须是text/plain;charset=UTF-8")
        if response.status_code != 200:
            log.error("请求异常,状态码:", response.status_code)
        response_body = json.loads(response.text)
        return response_body["result"]
    

    html 页面的数据需要借助 lxml 解析 dom,最后获取想要的 dom 的数据即可,例如:

    from lxml import etree
    def get_tile(text):
        """
        根据文本获取html页面title
        :param text:
        :return:
        """
        tree = etree.HTML(text)
        return tree.xpath("//title")[0].text
    

    4. 使用说明

    4.1. 服务器上启动

    直接运行 docker 镜像即可

    4.2. 本地使用

    本地启动 starter.py 即可,半小时刷新一次。

    test_main.py 中 test_analyse_accuracy() 方法用于测试识别精确度,自定义循环次数,需要每次手动输入用于判断机器识别是否正确。输出结果:

    ...
    您输入的key是:-,机器识别的key是:-
    您输入的key是:7,机器识别的key是:7
    您输入的key是:1,机器识别的key是:1
    
    
    共测试58次,正确率百分之94.83
    

    test_bound_result() 方法,用于画矩形边框,辅助调整切割图片。
    test_mark_images() 人工标注图片,标注好的切图存于 img_lib 中,用作训练数据。

    配置文件中的内容需要 [mysql] 自行补充, [crawler] 部分仅适用于本例,需根据实际情况定。

    遗留问题

    1. 误判率较高,原因有三:
      字符倾斜角度不一,拉伸困难
      字符粘连,平均分割图片并不能准确分割字符
      ‘*’ 识别率低,因为 ‘*’ 会被识别成几个小点
    2. TODO:训练结果可以序列化保存,避免每次重新计算,从而不用再依赖 img_lib 库

    参考文章

    Python 3 教程

    用Python识别验证码

    opencv-python 入门篇

    opencv-python 指南

    相关文章

      网友评论

          本文标题:python 爬虫从 0 到 1,实现识别验证码登录、会话保持、

          本文链接:https://www.haomeiwen.com/subject/rvkywltx.html