美文网首页
基于pyppeteer实现迷你Web测试框架

基于pyppeteer实现迷你Web测试框架

作者: 汪少SZ | 来源:发表于2019-06-20 19:28 被阅读0次
简介:

Puppeteer(中文翻译”木偶”) 是 Google Chrome 团队官方的无界面(Headless)Chrome 工具,它是一个 Node 库,提供了一个高级的 API 来控制 DevTools协议上的无头版 Chrome 。也可以配置为使用完整(非无头)的 Chrome。Chrome 素来在浏览器界稳执牛耳,因此,Chrome Headless 必将成为 web 应用自动化测试的行业标杆。使用 Puppeteer,相当于同时具有 Linux 和 Chrome 双端的操作能力,应用场景可谓非常之多。此仓库的建立,即是尝试各种折腾使用 GoogleChrome Puppeteer;以期在好玩的同时,学到更多有意思的操作。

Pyppeteer是基于Puppeteer协议,用python实现的异步框架。本人在此基础上,做了简易封装。

核心代码:

ExtendPageOrFrame类定义了更方便的方法:

class ExtendPageOrFrame(pyppeteer.page.Page):
    def __init__(self, page_frame_obj):
        self.page = page_frame_obj
        self.all_pending_requests = []
        self.all_finished_requests = []

    def __getattr__(self, name):
        if name not in ['page', 'all_pending_requests', 'all_finished_requests',
                        'click', 'set_value', 'set_value',
                        'set_el_radio', 'get_el_radio', 'waitForAllXhrFinished']:
            return getattr(self.page, name)
        else:
            return getattr(self, name)

    async def click(self, selector_or_xpath, idx=0, wait_after=1):
        if selector_or_xpath is None:
            return
        elif selector_or_xpath.startswith('/'):
            elem = await self.page.xpath(selector_or_xpath)
            await elem[idx].click()
        else:
            await self.page.click(selector_or_xpath)
        await self.page.waitFor(wait_after * 1000)

    async def get_value(self, selector_or_xpath, idx=0):
        if selector_or_xpath is None:
            return
        else:
            if selector_or_xpath.startswith('/'):
                elements = await self.page.xpath(selector_or_xpath)
                element = elements[idx]
            else:
                element = await self.page.querySelector(selector_or_xpath)

            tag_name = await self.page.evaluate('(element) => element.tagName', element)
            # print(tag_name)
            if tag_name == 'INPUT':
                ele_type = await self.page.evaluate('(element) => element.getAttribute("type")', element)
                if ele_type == 'checkbox':
                    # print(ele_type)
                    value = await self.page.evaluate('(element) => element.checked', element)
                else:
                    value = await self.page.evaluate('(element) => element.value', element)
            elif tag_name == 'SELECT':
                value = await self.page.evaluate('''(element) => {
                var index=element.selectedIndex;
                return element.options[index].text;
                }''', element)
            else:
                value = await self.page.evaluate('(element) => element.textContent', element)
            return value

    async def set_value(self, selector_or_xpath, value, idx=0):
        if selector_or_xpath is None:
            return
        else:
            if selector_or_xpath.startswith('/'):
                elements = await self.page.xpath(selector_or_xpath)
                element = elements[idx]
            else:
                element = await self.page.querySelector(selector_or_xpath)

            tag_name = await self.page.evaluate('(element) => element.tagName', element)

            if tag_name == 'INPUT':
                ele_type = await self.page.evaluate('(element) => element.getAttribute("type")', element)
                if ele_type == 'checkbox':
                    if value is True:
                        await self.page.evaluate('(element) => {element.checked=true;}', element)
                    else:
                        await self.page.evaluate('(element) => {element.checked=false;}', element)
                else:
                    await element.type(value)
            elif tag_name == 'SELECT':
                await self.page.evaluate('''(element, value) => {
                    for(var i=0; i<element.options.length; i++){  
                            if(element.options[i].innerHTML == value){  
                                element.options[i].selected = true;  
                                break;  
                            }  
                        }
                    }''', element, value)
            else:
                pass

    async def set_el_radio(self, text, idx=0, wait_after=1):
        radios = await self.page.xpath(
            '//label[contains(@class,"el-radio")]/span[@class="el-radio__label" and contains(text(), "{}")]'.format(
                text))
        await radios[idx].click()
        await self.page.waitFor(wait_after * 1000)

    async def get_el_radio(self, idx=0):
        value = await self.get_value(
            '//label[contains(@class,"el-radio") and contains(@class,"is-checked")]/span[@class="el-radio__label"]',
            idx)
        return value.strip()

    async def wait_for_all_xhr_finished(self):
        def request_handler(r):
            if r.resourceType == 'xhr':
                self.all_pending_requests.append(r)

        def response_handler(fr):
            if fr.resourceType == 'xhr':
                self.all_finished_requests.append(fr)

        self.page.on('request', request_handler)
        self.page.on('requestfinished', response_handler)

        last_several_cnt = []
        loop_times = 0
        max_tries = 6

        while 1:
            loop_times += 1
            if len(last_several_cnt) == max_tries:
                last_several_cnt.pop(0)
            cur_num = len(self.all_pending_requests)
            last_several_cnt.append(cur_num)
            if len(set(last_several_cnt)) == 1 and loop_times >= max_tries:
                break
            await asyncio.sleep(0.1)
        # print('last 10 requests number:', last_several_cnt)
        while 1:
            if len(self.all_pending_requests) == len(self.all_finished_requests):
                break
            else:
                await asyncio.sleep(0.1)
        self.all_pending_requests = []
        self.all_finished_requests = []
        self.remove_listener('request', request_handler)
        self.remove_listener('requestfinished', response_handler)

BaseWebPage类启动浏览器、初始化

class BaseWebPage(object):
    def __init__(self, start_url, headless=False):
        self.url = start_url
        self.headless = headless

    async def navigate(self):
        browser = await pyppeteer.launch(headless=self.headless, args=['--start-maximized'])
        page = (await browser.pages())[0]

        page = ExtendPageOrFrame(page)
        current_screen = await page.evaluate('''() => {
        return {
            width: window.screen.availWidth,
            height: window.screen.availHeight,
            };
        }''')
        await page.setViewport(current_screen)

        await page.goto(self.url)
        await page.wait_for_all_xhr_finished()
        return page, browser

测试代码(使用上述自定义类):

async def test_async():
    page, browser = await BaseWebPage('http://ip:port/static/html/login.html').navigate()

    await page.set_value('input[name=username]', '')
    print(await page.get_value('input[name=username]'))
    await page.type('input[name=password]', '')
    await page.click('button[type=submit]', wait_after=3)

    # await page.waitForNavigation({'waitUntil': 'networkidle2'})

    await page.click('//span[text()=\'策略中心\']')
    await page.click('//a[text()=\'优先拣选策略\']', wait_after=3)

    iframe = await page.xpath('//iframe[@class="iframe"]')
    frame = await iframe[0].contentFrame()

    frame = ExtendPageOrFrame(frame)

    await frame.click('//table[@class="el-table__body"]/tbody/tr[1]/td[12]/div/button/span[contains(text(),"编辑")]')

    await frame.set_el_radio('每天')

    print(await frame.get_el_radio())

    await asyncio.sleep(2)

    await browser.close()


def test():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_async())


if __name__ == '__main__':
    test()

相关文章

网友评论

      本文标题:基于pyppeteer实现迷你Web测试框架

      本文链接:https://www.haomeiwen.com/subject/nixsqctx.html