A Python Scraper Project: Buying a Second-Hand Home on Lianjia (链家)

Author: thirsd | Published 2020-05-02 11:28

    After more than six years of work, I finally found myself in a position to buy a home. These notes were made while collecting housing information.

    Beike (贝壳) is a powerful platform, but it cannot easily match on several criteria at once (neighborhood, commute distance, schools, floor area), so searching there by hand remains costly.

    This project therefore collects second-hand housing data from Lianjia (链家). It first narrows down neighborhoods (小区) by price, then picks the ones worth following by school quality and distance to the workplace, and finally builds a shortlist of homes by floor area, total price, and layout, so that house-hunting effort is concentrated on a handful of key neighborhoods and listings.

    Of course, anyone can adjust the criteria to their own needs.

    I. Environment Setup

    1.1 Base Environment

    1.1.1 Python

    Official site: https://www.python.org/

    Official documentation: https://www.python.org/doc/

    1.1.2 requests (fetching pages)

    Official documentation: https://cn.python-requests.org/zh_CN/latest/

    1.1.3 BeautifulSoup (extracting information)

    Official documentation: https://www.crummy.com/software/BeautifulSoup/bs4/doc/index.zh.html

    Common usage examples:

    from bs4 import BeautifulSoup
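    # Sample HTML, assumed here for illustration so the snippets below run;
    # the expected outputs quoted in the comments come from the original
    # reference and will vary with the exact document.
    a = """<title>标题</title><body>
    <h1>标题1</h1><h2>标题2</h2><h2>标题3</h2>
    <h><a href="https://example.com">链接</a></h>
    <p id="p1">段落1</p><p id="p2" class="p3">段落2</p><p class="p3" id="pp">文章</p>
    <p class="first">文字1</p><p class="second">文字2</p>
    <ul class="list1"><li>列表1第1项</li><li>列表1第2项</li></ul>
    </body>"""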
    soup = BeautifulSoup(a, "html.parser")
    soup.title.text # '标题'
    
    # 1. Extracting tags
    # 1.1 Extract a unique tag
    soup.h1
    soup.find('h1')
    soup.find_all('h1')[0]
    # 1.2 Extract multiple tags
    soup.find_all('h2')
    # [<h2>标题2</h2>, <h2>标题3</h2>]
    soup.find_all(['h1','h2'])
    # [<h1>标题1</h1>, <h2>标题2</h2>, <h2>标题3</h2>]
    # 1.3 Match tag names with a regular expression
    import re
    soup.find_all(re.compile('^h'))
    # [<h1>标题1</h1>, <h2>标题2</h2>, <h2>标题3</h2>]
    
    # 2. Matching attributes
    # 2.1 Pass the attribute name as a keyword argument; this fails for some
    #     names, e.g. attributes spelled like a-b
    soup.find_all('p', id='p1') # the common case
    soup.find_all('p', class_='p3') # class is a reserved word, so append an underscore
    # 2.2 The most general form: the attrs dict
    soup.find_all('p', attrs={'class':'p3'}) # matches any tag carrying this attribute value, not only that attribute
    soup.find_all('p', attrs={'class':'p3','id':'pp'}) # match several attributes at once
    soup.find_all('p', attrs={'class':'p3','id':False}) # require that an attribute be absent
    soup.find_all('p', attrs={'id':['p1','p2']}) # attribute value is p1 or p2
    soup.find_all('p', attrs={'class':True}) # any tag that has a class attribute
    # 2.3 Match attribute values with a regular expression
    import re
    soup.find_all('p', attrs={'id':re.compile('^p')}) # regular-expression match
    
    
    # 3. Matching on tag text
    # 3.1 By regular expression
    import re
    soup.find_all('p', text=re.compile('段落'))
    soup.find_all('p', text=True)
    # 3.2 By passing a function
    def nothing(c):
        return c not in ['段落1','段落2','文章']
    soup.find_all('p', text=nothing)
    
    # a filter function may also take the tag itself:
    def has_class_but_no_id(tag):
        return tag.has_attr('class') and not tag.has_attr('id')
    soup.find_all(has_class_but_no_id)
    
    # 4. Extracting content
    # 4.1 Tag text
    soup.h.text # works through nested tags as well
    soup.h.a.text # this works too
    soup.body.text # with several children: '\n标题\n段落1\n段落2\n'
    # 4.2 Attribute values of a tag
    # extract like a dict lookup; the following two forms are equivalent
    soup.h.a['href']
    soup.h.a.get('href')
    
    # 5. Tag metadata (for a tag object i, e.g. one yielded by find_all)
    print(i.name) # the tag name
    print(i.attrs) # all attributes, as a dict
    print(i.has_attr('href')) # whether the tag has a given attribute
    
    # 6. Combined examples
    soup.find('p', attrs={'class':'first'}).text # '文字1'
    soup.find_all('p') # [<p class="first">文字1</p>, <p class="second">文字2</p>], then extract the text of each
    soup.find('ul', attrs={'class':'list1'}).find_all('li') # [<li>列表1第1项</li>, <li>列表1第2项</li>]
    
    
    # Code reference: https://zhuanlan.zhihu.com/p/35354532
    

    1.1.4 Geolocation (Baidu Maps API)

    Official documentation: http://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding

    Approach 1:

    # assumes `requests` is imported and `logger` is configured at module level
    def geocodeB(address):
        base = "http://api.map.baidu.com/geocoder?address=%s&output=json&key=yourak&city=上海" % address
        response = requests.get(base)
        if response.status_code == 200:
            answer = response.json()
            if "location" in answer['result'] and "level" in answer['result']:
                return (address,
                        # round(answer['result']['location']['lng'], 5),
                        answer['result']['location']['lng'],
                        # round(answer['result']['location']['lat'], 5),
                        answer['result']['location']['lat'],
                        answer['result']["level"])
            else:
                logger.error("geocodeB %s warning:%s" % (address, answer))
                return None
        else:
            logger.error("geocodeB %s Error" % address)
            return None
    

    Approach 2:

    def geocodeB2(address):
        from urllib.request import urlopen, quote
        from urllib.parse import quote_plus
        import hashlib, json
        # a GET request, e.g. http://api.map.baidu.com/geocoder/v2/?address=百度大厦&output=json&ak=yourak
        queryStr = '/geocoder/v2/?address=%s&city=上海&output=json&ak=$yourak$' % address
    
        # percent-encode queryStr; the reserved characters listed in `safe` are left as-is
        encodedStr = quote(queryStr, safe="/:=&?#+!$,;'@()*[]")
    
        # append the security key (sk) directly at the end
        rawStr = encodedStr + '$yoursn$'
        sn = hashlib.md5(quote_plus(rawStr).encode("utf8")).hexdigest()
    
        url = 'http://api.map.baidu.com%s&sn=%s' % (encodedStr, sn)
        req = urlopen(url)
        res = req.read().decode()  # decode the response bytes into a str
        answer = json.loads(res)  # parse the JSON payload
        if "location" in answer['result'] and "level" in answer['result']:
            return answer['result']['location']['lat'], answer['result']['location']['lng']
        else:
            logger.error("geocodeB %s warning:%s" % (address, answer))
            return None
    
    

    Approach 3:

    def geocode_by_baidu(address):
        from geopy.geocoders import baidu
        apikey = '$yourak$'  # apply for a key at http://lbsyun.baidu.com/apiconsole/key?application=key
        sn = '$yoursn$'
        g = baidu.Baidu(api_key=apikey, security_key=sn, timeout=200)
        a = g.geocode(address)
        # return (round(a.latitude, 6), round(a.longitude, 6))
        return a.latitude, a.longitude
    

    1.1.5 Distance Between Coordinates (geopy)

    # x and y are (lat, lng) tuples
    def get_distance(x, y):
        from geopy.distance import geodesic
        return round(geodesic(x, y).km, 3)
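    
    # Hypothetical usage: geodesic distance in km between two (lat, lng) points.
    # The coordinates are placeholders; obtain real ones via the geocoders in 1.1.4.
    print(get_distance((31.2354, 121.5273), (31.2304, 121.4737)))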
    

    1.1.6 Handling Lazy Loading and Scroll Loading (Selenium)

    Selenium is a tool for testing web applications.

    Selenium tests run directly in the browser, just as a real user would operate it. Supported browsers include IE (7, 8, 9, 10, 11), Firefox, Safari, Chrome, Opera, and more.

    Here, the scraper drives Selenium from Python to simulate a normal user browsing the site.

    1.2 Main Challenges

    1.2.1 Lazy loading

    Reference: http://www.selenium.org.cn/

    1.2.2 Scroll loading
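
    For pages that only render more items as the user scrolls, one workable approach is to scroll to the bottom repeatedly until the page height stops growing, then hand the final HTML to BeautifulSoup. A minimal sketch (assuming Chrome plus a matching chromedriver; the URL is illustrative):

    import time
    from selenium import webdriver
    
    driver = webdriver.Chrome()
    driver.get("https://m.ke.com/sh/xiaoqu/pudong/")
    last_height = driver.execute_script("return document.body.scrollHeight")
    while True:
        driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
        time.sleep(2)  # give lazily loaded content time to arrive
        new_height = driver.execute_script("return document.body.scrollHeight")
        if new_height == last_height:  # page stopped growing: no more content
            break
        last_height = new_height
    html = driver.page_source  # parse with BeautifulSoup as in 1.1.3
    driver.quit()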

    1.2.3 IP rate limiting

    Reference: Python scraping | obtaining and using proxy IPs
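
    When an IP gets throttled, requests can be routed through a proxy pool. A minimal sketch with requests (the proxy endpoint is a placeholder; substitute addresses from your own pool):

    import requests
    
    proxies = {
        "http": "http://10.10.1.10:3128",   # placeholder proxy endpoint
        "https": "http://10.10.1.10:3128",
    }
    r = requests.get("https://m.ke.com/sh/xiaoqu/pudong/", proxies=proxies, timeout=10)

    This is also why the collection functions in part III record already-handled pages in their output CSVs: after a ban, the script can simply be re-run and resume where it stopped.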

    II. Preparation

    2.1 Requirements Analysis

    My home-buying requirements:

    Budget: 4,000,000 RMB, at most 4,500,000

    Schools: at least a tier-2 school district

    Layout: two bedrooms or more

    Building age: built after 1990

    Area: at least 60 m²

    Commute: at most 1 hour by public transit from 世纪大道 (Century Avenue)

    These translate into the following pipeline:

    1. Schools: filter on the education score from each neighborhood guide to fix the set of candidate neighborhoods.

    2. Filter that set further by (a) the prices of the homes in each neighborhood, against the budget, and (b) each neighborhood's location, dropping those whose commute distance disqualifies them.

    3. For every neighborhood that passes, fetch its list of homes and decide which neighborhoods to track closely.

    Notes:

    1. Why not fetch homes directly? A single listing cannot tell you whether the school requirement is met, and going from listings to neighborhoods and then to schools would waste far more time, since listings outnumber neighborhoods by orders of magnitude.

    2. The budget and area requirements together imply a unit price, which pre-filters neighborhoods: 4,500,000 ÷ 60 m² = 75,000 RMB/m², exactly the upper bound of the 50,000-75,000 band (bp5ep7.5) used in the URLs below.

    2.2 Analyzing Page URLs

    2.2.1 Fetching the neighborhood list

    1. URL structure of the neighborhood list

    Lianjia only serves the first 100 pages of results, and all of Shanghai clearly spans more than 100 pages, so neighborhoods are fetched district by district:

    https://m.ke.com/sh/xiaoqu/hongkou/bp5ep7.5pg%s/

    where

    1. bp5ep7.5 selects unit prices of 50,000-75,000 RMB/m² (bp = begin price, ep = end price);
    2. pg%s is the page number.

    2. Determining whether a neighborhood has a review

    In the neighborhood list fetched in step 1, the presence of a 小区攻略 (neighborhood guide) tag indicates whether review information exists for that neighborhood.

    Note: not every neighborhood exposes an education score.

    Example link: https://m.ke.com/sh/xiaoqu/5011000016009/, which shows the neighborhood's overall score.

    2.2.2 Fetching a neighborhood's guide (攻略)

    The guide URL for a neighborhood is:

    https://m.ke.com/sh/xiaoqu/5011000016009/gonglueV2.html?click_source=m_resblock_detail#review

    Each neighborhood has an overall score plus per-category scores: building quality (建筑品质), unit layout (户型设计), transport (交通条件), education (教育质量), retail (商业环境), landscaping (花园景观), and property management (物业管理).

    Anyone can filter neighborhoods on whichever score categories fit their own needs.

    For example, since schools are my priority, education is the main filter: I require an education score of at least 8, and at least 6.5 on everything else; a sketch of this filter follows.
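
    A sketch of that rule, written as the kind of row-level predicate that handle_houselist_by_gonglue (section 3.3.2) accepts through its filter_func parameter. The field names match the guide CSV produced in section 3.2; the empty-string guard for missing scores is my own convention, not Lianjia's:

    def my_filter(row):
        def score(key):
            return float(row[key]) if row[key] else 0.0
        others = ["jianzhu_score", "huxing_score", "jiaotong_score",
                  "shangye_score", "jingguan_score", "wuye_score"]
        return score("jiaoyu_score") >= 8.0 and all(score(k) >= 6.5 for k in others)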

    2.2.3 Fetching a neighborhood's listings

    The listing pages for a neighborhood:

    Base URL: https://m.ke.com/sh/ershoufang/c5011000016009/

    With filters: https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000016009
    where bp350ep450 selects a total price of 3,500,000-4,500,000 RMB; l2l3 selects 2- or 3-bedroom layouts; ba67ea70 selects a floor area of 67-70 m²; and c5011000016009 selects the neighborhood with id 5011000016009.
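
    To make the URL scheme explicit, here is a small helper (the function name and default values are mine, not Lianjia's) that assembles a filtered listing URL from these segments; it reproduces the URL used by get_houselist_by_xiaoqu in section 3.3.1:

    def build_ershoufang_url(xiaoqu_id, bp=350, ep=450, rooms=(2, 3), ba=60, ea=90):
        """Compose a filtered m.ke.com listing URL for one neighborhood."""
        price = "bp%sep%s" % (bp, ep)               # total price band, in units of 10,000 RMB
        layout = "".join("l%d" % r for r in rooms)  # l2l3 = 2- or 3-bedroom
        area = "ba%sea%s" % (ba, ea)                # floor-area band in m²
        return "https://m.ke.com/sh/ershoufang/%s%s%sc%s" % (price, layout, area, xiaoqu_id)
    
    # build_ershoufang_url("5011000016009")
    # -> 'https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba60ea90c5011000016009'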

    III. Implementation

    3.1 Fetching Neighborhoods

    def get_xiaoqu_list(self, area, save_path):
        page_size = 100
        # only Shanghai is collected, so multi-city handling is omitted
        fieldnames = ['area', 'page', 'xiaoqu_id', 'url', 'name', "brief", "loc", "build_type", "build_year", "price",
                      "have_gonglue"]
    
        # if the CSV does not exist, create it with just a header row;
        # if it does, record the pages already processed
        # (needed because IP limits may force several runs)
        handled_list = []
        if os.path.isfile(save_path):
            with open(save_path, encoding='utf-8-sig') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    handled_list.append("%s_%s" % (row['area'], row['page']))
        else:
            with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
    
        handled_set = set(handled_list)
        logger.info(
            "get_xiaoqu_list, have handled:%s " % (len(handled_set)))
    
        # walk the result pages for one Shanghai district
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            for page_num in range(1, page_size + 1):  # pages are 1-based; range(1, page_size) would miss page 100
                # e.g. https://m.ke.com/sh/xiaoqu/pudong/bp5ep7.5pg10/
                url = "https://m.ke.com/sh/xiaoqu/%s/bp5ep7.5pg%s/" % (area, str(page_num))
    
                if "%s_%s" % (area, page_num) in handled_set:
                    logger.info("%s has been handled." % url)
                    continue
                else:
                    logger.info(url)
    
                # fetch the page
                r = requests.get(url=url, headers=self.page_headers)
                html = r.content
                lj = BeautifulSoup(html, 'html.parser')
                page_items = lj.find_all('li', attrs={'class': 'pictext'})
    
                # parse the neighborhood entries on this page
                if len(page_items) > 0:
                    for item in page_items:
                        xiaoqu_url = item.a.get('href')
                        xiaoqu_id = xiaoqu_url.split("/")[-2]
                        xiaoqu_gonglue = item.find_all("p", attrs={"class": "gonglue_title"})
                        if len(xiaoqu_gonglue) == 0:
                            is_gonglue = 0
                        else:
                            is_gonglue = 1
                        xiaoqu_info = item.find_all("div", attrs={"class": "item_list"})[0]
                        xiaoqu_name = xiaoqu_info.find_all("div", attrs={"class": "item_main"})[0].string
                        xiaoqu_brief = xiaoqu_info.find_all("div", attrs={"class": "item_other"})[0].string.strip(
                            "\n\r \"")
                        xiaoqu_brief = " ".join(xiaoqu_brief.split())
                        xiaoqu_loc = xiaoqu_brief.split()[0]
                        build_type = xiaoqu_brief.split()[1]
                        build_year = re.search(r' (?P<build_year>\d{1,})年建成', xiaoqu_brief, re.I)
                        if build_year:
                            xiaoqu_build = build_year.group("build_year")
                        else:
                            xiaoqu_build = ""
                        xiaoqu_price = xiaoqu_info.find_all("span", attrs={"class": "price_total"})[0].em.string
    
                        xiaoqu_dict = {
                            "area": area,
                            "page": page_num,
                            "xiaoqu_id": xiaoqu_id,
                            "url": xiaoqu_url,
                            "name": xiaoqu_name,
                            "brief": xiaoqu_brief,
                            "loc": xiaoqu_loc,
                            "build_type": build_type,
                            "build_year": xiaoqu_build,
                            "price": xiaoqu_price,
                            "have_gonglue": is_gonglue
                        }
                        writer.writerow(xiaoqu_dict)
    
                else:
                    # an empty page indicates the last page has been passed
                    break
                handled_set.update({"%s_%s" % (area, page_num)})
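
    A sketch of how this method might be driven (assuming it lives on a scraper class, hypothetically named LianjiaSpider here, whose __init__ sets self.page_headers):

    spider = LianjiaSpider()
    # district slugs as they appear in the m.ke.com URLs
    for area in ["pudong", "hongkou", "huangpu", "xuhui"]:
        spider.get_xiaoqu_list(area, save_path="xiaoqu_list.csv")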
    

    3.2 From the Neighborhood List, Collect the Neighborhoods That Have Guides

    3.2.1 Fetch detailed guide information from a single page

    # fetch the guide information for the neighborhood with the given id
    def get_xiaoqu_gonglue_dict(self, id):
        url = "https://m.ke.com/sh/xiaoqu/%s/gonglueV2.html?click_source=m_resblock_detail#review" % id
        logger.info(url)
    
        # load the page for this url, e.g.
        # https://m.ke.com/sh/xiaoqu/5011000007603/gonglueV2.html?click_source=m_resblock_detail#review
        html = requests.get(url=url, headers=self.page_headers).content
        lj = BeautifulSoup(html, 'html.parser')
        loc_node = lj.find('div', attrs={'class': 'head_location'})
        if loc_node is not None:
            loc_name = loc_node.string
        else:
            loc_name = ""
        cpt_content = lj.find_all('div', attrs={'id': 'review'})[0]
    
        total_score = cpt_content.find('div', attrs={'class': "review_score"}).get_text().replace("综合测评得分", "")
        review_txt = ""
        if cpt_content.find('div', attrs={'class': "review_txt_box"}) is not None:
            review_txt = cpt_content.find('div', attrs={'class': "review_txt_box"}).get_text().strip(" \n\r")
    
        review_list_txt = cpt_content.find('ul', attrs={'class': "review_list"})
        review_list = review_list_txt.find_all('li')
        other = ""
        jianzhu_score = huxing_score = jiaotong_score = shangye_score = jiaoyu_score = jingguan_score = wuye_score = ""
        for item in review_list:
            key = item.span.string
            value = item.progress.get('value')
            if key == "建筑品质":
                jianzhu_score = value
            elif key == "户型设计":
                huxing_score = value
            elif key == "交通条件":
                jiaotong_score = value
            elif key == "教育质量":
                jiaoyu_score = value
            elif key == "商业环境":
                shangye_score = value
            elif key == "花园景观":
                jingguan_score = value
            elif key == "物业管理":
                wuye_score = value
            else:
                other = " %s:%s " % (key, value)
    
        peitao_node = lj.find('div', attrs={"class": "box peitao card_box"})
        map_api_node = peitao_node.find('img') if peitao_node is not None else None
        if map_api_node is not None:
            map_api = map_api_node.get('src')
        else:
            map_api = ""
    
        def get_geo_from_mapapi(map_api):
            geo = re.search(r'center=(?P<lng>[\d.]+),(?P<lat>[\d.]+)', map_api, re.I)
            if geo:
                lat = geo.group("lat")
                lng = geo.group("lng")
            else:
                lat = lng = None
            return lat, lng
    
        lat, lng = get_geo_from_mapapi(map_api)
    
        gonglue_dict = {
            "xiaoqu_id": id,
            "loc_name": loc_name,
            "total_score": total_score,
            "review_txt": review_txt if review_txt is not None else "",
            "jianzhu_score": jianzhu_score if jianzhu_score is not None else "",
            "huxing_score": huxing_score if huxing_score is not None else "",
            "jiaotong_score": jiaotong_score if jiaotong_score is not None else "",
            "jiaoyu_score": jiaoyu_score if jiaoyu_score is not None else "",
            "shangye_score": shangye_score if shangye_score is not None else "",
            "jingguan_score": jingguan_score if jingguan_score is not None else "",
            "wuye_score": wuye_score if wuye_score is not None else "",
            "map_api": map_api,
            "lng": lng if lng is not None else "",
            "lat": lat if lat is not None else "",
            "other": other
        }
        return gonglue_dict
    

    3.2.2 Generate guide records for the whole list

    # walk the neighborhood list from step 1 and fetch each guide
    def handle_gonglue_by_xiaoqu(self, file_path, save_path, if_distance=False, local_geo=None):
        # validate arguments
        if if_distance and local_geo is None:
            logger.error("in handle_gonglue_by_xiaoqu, if if_distance is True, local_geo can't be None")
            exit(1)
    
        # build the list of neighborhoods that have a guide
        url_list = []
        with open(file_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if row['have_gonglue'] == "1":
                    url_list.append(row['xiaoqu_id'])
    
        # if the guide CSV already exists, record the neighborhoods already processed
        handled_list = []
        fieldnames = ['xiaoqu_id', 'loc_name', 'total_score', "review_txt", "jianzhu_score", "huxing_score",
                      "jiaotong_score", "jiaoyu_score", "shangye_score", "jingguan_score", "wuye_score",
                      "map_api", "lat", "lng", "distance", "other"]
        if os.path.isfile(save_path):
            with open(save_path, encoding='utf-8-sig') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    handled_list.append(row['xiaoqu_id'])
        else:
            # otherwise create an empty CSV containing only the header row
            with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
    
        handled_set = set(handled_list)
        logger.info("handle_gonglue_by_xiaoqu, the length of url_list: %s" % len(url_list))
    
        # fetch the guide information for each neighborhood
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            for xiaoqu_id in url_list:
                if xiaoqu_id not in handled_set:
                    gonglue_dict = self.get_xiaoqu_gonglue_dict(id=xiaoqu_id)
                    if if_distance:
                        distance = get_distance((gonglue_dict["lat"], gonglue_dict["lng"]), local_geo)
                        gonglue_dict["distance"] = distance
                    writer.writerow(gonglue_dict)
                    handled_set.update({xiaoqu_id})
                else:
                    logger.info("xiaoqu %s is handled" % xiaoqu_id)
    

    3.3 From the Guide List, Build the Shortlist of Homes

    3.3.1 Fetch the listings of a single neighborhood

    # fetch the homes in the given neighborhood that satisfy the filters
    def get_houselist_by_xiaoqu(self, xiaoqu_id):
        # e.g. https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba67ea70c5011000009590
        # bp350ep450: total price band, 3,500,000-4,500,000 RMB
        # l2l3: 2- or 3-bedroom layouts
        # ba67ea70: floor area 67-70 m²
        # c5011000009590: neighborhood id
        url = "https://m.ke.com/sh/ershoufang/bp350ep450l2l3ba60ea90c%s" % xiaoqu_id
        html = requests.get(url=url, headers=self.page_headers).content
    
        house_list = []
        lj = BeautifulSoup(html, 'html.parser')
        # the page holds several lists: the current search results plus recommendations from other neighborhoods
        view_body = lj.find('div', attrs={'class': 'list-view-section-body'})
        item_list = view_body.find_all('div', attrs={'class': 'lj-track', 'data-click-event': 'SearchClick'})
        for item in item_list:
            house_body = item.find("div", attrs={'class': 'kem__house-tile-ershou'})
            house_id = house_body.get("data-id")
            logger.info("handle house_id:%s" % house_id)
    
            house_txt = house_body.find("div", attrs={'class': 'house-text'})
            house_title = house_txt.find("div", attrs={"class": 'house-title'}).text
            house_desc = house_txt.find("div", attrs={"class": 'house-desc'}).string
            house_price_total = house_txt.find("span", attrs={"class": "price-total"}).strong.string
            house_price_unit = house_txt.find("span", attrs={"class": "price-unit"}).string.strip("元/平")
    
            house_dict = {
                "xiaoqu_id": xiaoqu_id,
                "house_id": house_id,
                "title": house_title,
                "desc": house_desc,
                "price_total": house_price_total,
                "price_unit": house_price_unit
            }
            house_list.append(house_dict)
        return house_list
    

    3.3.2 Generate the home list from the guide list

    # pull the neighborhoods of interest from the guide list, then fetch each one's listings
    def handle_houselist_by_gonglue(self, file_path, save_path, filter_func=None):
        xiaoqu_list = []
        with open(file_path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                if filter_func is not None:
                    if filter_func(row):
                        # add the neighborhood's id to the work list
                        xiaoqu_list.append((row["xiaoqu_id"], row["loc_name"], row["distance"]))
                else:
                    xiaoqu_list.append((row["xiaoqu_id"], row["loc_name"], row["distance"]))
    
        handled_list = []
        fieldnames = ['xiaoqu_id', 'xiaoqu_name', 'distance', 'house_id', 'title', "desc", "price_total", "price_unit"]
        if os.path.isfile(save_path):
            with open(save_path, encoding='utf-8-sig') as csvfile:
                reader = csv.DictReader(csvfile)
                for row in reader:
                    handled_list.append(row['xiaoqu_id'])
        else:
            # otherwise create an empty CSV containing only the header row
            with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
    
        handled_set = set(handled_list)
        logger.info(
            "handle_houselist_by_gonglue, to be handled: %s, have handled:%s " % (len(xiaoqu_list), len(handled_set)))
    
        with open(save_path, "a+", newline='\n', encoding='utf-8-sig') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            for xiaoqu_id, xiaoqu_loc_name, distance in xiaoqu_list:
                if xiaoqu_id not in handled_set:
                    logger.info("handle xiaoqu:%s" % xiaoqu_id)
                    house_list = self.get_houselist_by_xiaoqu(xiaoqu_id)
                    if len(house_list) > 0:
                        for house_dict in house_list:
                            house_dict["xiaoqu_name"] = xiaoqu_loc_name
                            house_dict["distance"] = distance
                            writer.writerow(house_dict)
                    else:
                        house_dict = {
                            "xiaoqu_id": xiaoqu_id,
                            "xiaoqu_name": xiaoqu_loc_name,
                            "distance": distance
                        }
                        writer.writerow(house_dict)
                        logger.info("xiaoqu %s (%s) has no matching homes." % (xiaoqu_id, xiaoqu_loc_name))
    
                    handled_set.update({xiaoqu_id})
                else:
                    logger.info("%s is handled" % xiaoqu_id)
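
    Continuing from the get_xiaoqu_list loop sketched at the end of section 3.1, the remaining two stages chain together as follows (the CSV paths and workplace coordinates are assumptions; my_filter is the score predicate sketched in section 2.2.2):

    # 2. fetch guides and compute each neighborhood's distance to the workplace
    work_geo = (31.2354, 121.5273)  # assumed (lat, lng) near 世纪大道; geocode your own reference point
    spider.handle_gonglue_by_xiaoqu("xiaoqu_list.csv", "gonglue_list.csv",
                                    if_distance=True, local_geo=work_geo)
    # 3. build the home shortlist, keeping only neighborhoods that pass my_filter
    spider.handle_houselist_by_gonglue("gonglue_list.csv", "house_list.csv",
                                       filter_func=my_filter)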
    
