美文网首页
用ChatGpt 实现一次简单的 python 新手爬虫

用ChatGpt 实现一次简单的 python 新手爬虫

作者: cgrass | 来源:发表于2023-11-07 11:29 被阅读0次

    前言

    1.适合有一定程序思维的速成者。
    2.适合有其他语言基础的速成者。

    爬虫流程

    1.明确需求
    就是你要从哪个网址拿到什么信息,本次我们需要yelp 的二级网址拿到商户的商户名,电话,地址,访问链接。
    2.分析网址
    查找你所需要的内容的相关熟悉。
    3.开发调试
    vscode 或者 pycharm
    3.输出文档
    输出csv文档

    开始搞活

    1.先看下需求
    需要在yelp网址上查找固定的几个美国州 ['FL','GA','IL','IN','MD','MA','MI', 'NJ','NY','NC','OH','PA','SC','TN','TX','VA']内 "All You Can Eat" l类型的店铺信息.
    如下图所示,具体链接为https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL

    image.png

    可以看到总21页,每页10条,除了正常要获取的10条商户信息外还有,数量不固定的广告位。为了好爬取信息,我这边直接忽略广告位的信息。直接爬取每页最多10条的商户信息。

    2.点击第二页查看下不同页数的 访问规律:
    https://www.yelp.com/search?find_desc=All+You+Can+Eat&find_loc=FL&start=10
    每页start+10

    3.接着查看 我们需要的数据在哪个位置
    打开浏览器的F12 进入调试模式,建议google浏览器


    image.png

    通过初步分析发现我们要的数据在这边,看过去class name都是固定的,也有我们要的列表点击进去的二级url。


    image.png
    或者通过这种方式查找
    image.png
    通过观察发现数据在<Script>标签下面
    image.png

    下面这个是列表的名称和地址


    image.png

    本文通过的是爬取Script标签数据,来获取二级网址。

    4.获取到二级网址后,我们继续分析二级网址内容
    如下图所示,电话,地址都有了。


    image.png

    通过f12 看看这些东西在哪里


    image.png

    看上图就可以直接看出name,telephone,address

    5.东西都查找完了,这个时候,如何提取,如何编写代码呢,对于不熟悉python的人来说,chatGpt就可以派上用场了,哪里报错了,就问chatGpt,边问边搞。
    比如:
    如何获取scrpt 标签中的内容


    image.png

    如何爬取script标签中的指定内容(正则)


    image.png
    只要你懂的问,就有结果。

    接下来我们用chatGpt来边问边写代码.
    先看看代码主体流程

    # 程序结构
    class xxxSpider(object):
        def __init__(self):
            # 定义常用变量,比如url或计数变量等
           
        def get_html(self):
            # 获取响应内容函数,使用随机User-Agent
       
        def parse_html(self):
            # 使用正则表达式来解析页面,提取数据
       
        def write_html(self):
            # 将提取的数据按要求保存,csv、MySQL数据库等
           
        def run(self):
            # 主函数,用来控制整体逻辑
           
    if __name__ == '__main__':
        # 程序开始运行时间
        spider = xxxSpider()
        spider.run()
    

    基本上的爬虫流程就是这样的,定义url,获取url的内容,解析url,把获取到的东西写到文件中存储。

    具体的代码如下:
    csv 文件工具 csvUtil.py

    import csv
    def writeCsv(name,urls):
        with open(name+'.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            #构建字段名称,也就是key
            # 写入表头
            writer.writerow(['url'])
            # 写入每个 URL
            for url in urls:
                writer.writerow([url])
    def initShopCsv(name):
        with open(name+'.csv', 'w', newline='') as csvfile:
            writer = csv.writer(csvfile)
            #构建字段名称,也就是key
            # 写入表头
            writer.writerow(['ShopName',"Phone","Address","Url"])
            
    def writeShopCsv(data,name):
        with open(name+'.csv', 'a', newline='', encoding='utf-8') as csvfile:
            writer = csv.writer(csvfile)
            # 写入每个
            writer.writerow(data)
            print("写入数据:"+str(data))
    

    常量类address_info.py

    states=['FL','GA','IL','IN','MD','MA','MI',
                      'NJ','NY','NC','OH','PA','SC','TN','TX','VA']
    

    主体代码 yelp.py

    from urllib import parse
    import time
    import random
    from fake_useragent import UserAgent
    import requests
    from lxml import etree
    from bs4 import BeautifulSoup
    import re
    import json
    import csvUtil
    from static.address_info import states
    #忽略ssl报错
    requests.packages.urllib3.disable_warnings()
    import concurrent.futures
    
    class openTableSpider(object):
        #1.初始化,设置基地址
        def __init__(self):
            self.url='https://www.yelp.com/search?{}'
         
        #2.搜索店面,加入头部信息,查询时间理应大于当前时间,否则容易被发现不是人工点击
        #查找到收寻的店明的列表信息
        def searchShopHtml(self,url,state,size):
            #参数信息
            params={
                'find_desc':"All You Can Eat",
                'find_loc':state,
                'start':size
            }
            #格式化信息
            full_url=url.format(parse.urlencode(params))
            print(""+full_url)
            ua=UserAgent()
            # req=requests.Request(url=full_url,)
            respone=requests.get(url=full_url,headers={'User-Agent':'AdsBot-Google'},verify=False)
           # 使用 BeautifulSoup 解析 HTML
            soup = BeautifulSoup(respone.text, 'html.parser')
            # 查找所有的 <script> 标签
            script_tags = soup.find_all('script')
            # 输出每个 <script> 标签的内容
            for script in script_tags:
                script_str=script.string
                # 关键字   businessUrl /biz 开头
                if script_str and 'businessUrl' in script_str:
                    # print("开始:"+script_str)
                    #查找到相应的访问url
                    link_match=re.findall(r'"businessUrl":"([^"]+)"',script_str)
                    if link_match:
                        format_urls=[x for x in link_match if "ad_business_id" not in x]
                        # 去重
                        urls = list(set(format_urls))
                        # print(len(urls))
                        print(str(urls))
                        # csvUtil.writeCsv(shopName,link_match)
                        return urls
                       
           
         # 3.解析查找到的每家店的url,并且获取到店面和电话号码,以及地址。
        def parse_Url(self,url,state):
            url="https://www.yelp.com/"+url
         
            print("Start doing "+url)
            ua=UserAgent()
            respone=requests.get(url=url,headers={'User-Agent':ua.random},verify=False)
            # 使用 BeautifulSoup 解析 HTML
            soup = BeautifulSoup(respone.text, 'html.parser')
            # 查找所有的 <script> 标签
            script_tags = soup.find_all('script')
            # 输出每个 <script> 标签的内容
            # 找到name,phone,address 数据的位置,发现在 script标签底下
            # name window.__INITIAL_STATE__   restaurantProfile  restaurant  name   address  
            #  phone=  contactInformation   formattedPhoneNumber
            #  address address":{Phone number
            for script in script_tags:
                script_str=script.string
                address=name=phone=""
                if script_str and 'telephone' in script_str:
                    # print(script_str)
                    m_address = re.search( r'"address":{(.*?)},', script_str)
                    if m_address:
                        # {"line1":"5115 Wilshire Blvd","line2":"","state":"CA","city":"Los Angeles","postCode":"90036","country":"United States","__typename":"Address"},
                        temp=m_address.group(1)
                        # print(temp)
                        try:
                            streetAddress = re.search(r'"streetAddress":"([^"]*)"', temp).group(1)
                            addressLocality = re.search( r'"addressLocality":"([^"]*)"', temp).group(1)
                            addressCountry = re.search( r'"addressCountry":"([^"]*)"', temp).group(1)
                            addressRegion = re.search( r'"addressRegion":"([^"]*)"', temp).group(1)
                            postalCode = re.search(r'"postalCode":"([^"]*)"', temp).group(1)
                            address=streetAddress.replace("\\n","")+","+addressLocality+","+addressRegion+","+postalCode
                            print(address)
                            # flag=any(item.upper() in state.upper() for item in openTableAddress)
                            # # 如果不在地址池中的店,则不记录下来
                            # if flag is False:
                            #     return
                        except  Exception as e:
                            address=streetAddress+","+addressLocality+","+addressRegion+","+postalCode
                            print("地址解析异常:", e)
                        
                    m_name = re.search( r'"name":"(.*?)",', script_str)
                    if m_name:
                        name=m_name.group(1)
                        # print(name)
                    m_phone = re.search( r'telephone":"(.*?)",', script_str)
                    if m_phone:
                        phone=m_phone.group(1)
                        # print(phone)
                    data=[name,phone,address,url] 
                    csvUtil.writeShopCsv(data=data,name=state)                   
            
        # 4.一个一个流程
        def doTransaction(self,state,page):
            urls=self.searchShopHtml(self.url,state,page)
            if not urls:
                return
            print(len(urls))
            while len(urls)%10==0:
                print(len(urls))
                new_urls=[]
                reptry=0
                page+=10
                reptry=0
                #重试10次
                while(reptry<10):
                    try:
                        new_urls = self.searchShopHtml(spider.url,state, page)
                        break
                    except  Exception as e:
                            print("发生了其他异常:", e)
                            # 重试一次
                            reptry+=1
                            time.sleep(random.randint(5,10))
                 # 如果没有新的URLs,即查询结果为空,终止循环
                if not new_urls:
                    break
                urls += new_urls
                time.sleep(random.randint(5,10))                
                
            count=0     
            #重试10次
            for url in urls:
                reptry=0
                while(reptry<10):
                    try:
                        self.parse_Url(url,state) 
                        count+=1
                        print(state+" deal count "+str(count))
                        break
                    except  Exception as e:
                        print("发生了其他异常:", e)
                        reptry+=1
                    finally:
                        time.sleep(random.randint(3,10))     
        
        #5.入口函数
        def run(self):
            
            for state in states:
                  csvUtil.initShopCsv(state)
            # 创建线程池并并发执行任务
            with concurrent.futures.ThreadPoolExecutor() as executor:
            # 使用executor.map并发执行任务,传递参数列表
                executor.map(self.doTransaction, states, [0] * len(states))
            print("All tasks have been submitted.")
            # # 单线程处理
            # threads = []
            # for state in states:
            #      print(state)
            #      csvUtil.initShopCsv(state)
            #      thread = threading.Thread(target=)
            #      thread.start()
            #      threads.append(thread)
           
            # # 等待所有线程完成
            # for thread in threads:
            #     thread.join()
          
    if __name__=='__main__':
        start=time.time()
        spider=openTableSpider() #实例化一个对象spider
        # spider.searchShopHtml(spider.url,"New York",0) #调用入口函数
        # spider.parse_Url("biz/zen-korean-bbq-duluth?osq=All+You+Can+Eat+Buffet","GEORGIA") 
        spider.run()
        end=time.time()
        #查看程序执行时间
        print('执行时间:%.2f'%(end-start)) 
    
    

    具体内容查看注释,有不懂操作的直接问gpt

    相关文章

      网友评论

          本文标题:用ChatGpt 实现一次简单的 python 新手爬虫

          本文链接:https://www.haomeiwen.com/subject/gzqfwdtx.html