# --- Scraped article header (non-Python text), commented out so the file parses ---
# 美文网首页 python随记
# chrome cdp (chrome devtools protocol)
#
# chrome cdp (chrome devtools protocol)
#
# 作者: LCSan | 来源: 发表于2020-05-05 13:01 被阅读0次
#encoding=utf-8
'''
Created on 2019年10月24日

@author: 瞌睡蟲子
'''
import pychrome
import errno
import os
import winreg
import win32api
import re
import base64
from time import sleep
import time
import queue

BROWSER = None


def query_reg(hkey, regPath, arch_key):
    path = None
    try:
        key = winreg.OpenKey(hkey, regPath, 0, winreg.KEY_READ | arch_key)
        path = winreg.QueryValueEx(key, '')
        winreg.CloseKey(key)
        return path[0]
    except OSError as e:
        if e.errno == errno.ENOENT:
            # DisplayName doesn't exist in this skey
            pass
    return path


def check_arch_keys():
    proc_arch = os.environ['PROCESSOR_ARCHITECTURE'].lower()
    proc_arch64 = os.environ['PROCESSOR_ARCHITEW6432'].lower()

    if proc_arch == 'x86' and not proc_arch64:
        arch_keys = {0}
    elif proc_arch == 'x86' or proc_arch == 'amd64':
        arch_keys = {winreg.KEY_WOW64_32KEY, winreg.KEY_WOW64_64KEY}
    else:
        raise Exception("Unhandled arch: %s" % proc_arch)
    return arch_keys


def get_browser_path():
    browsers = {
        "chrome.exe",
        "360chrome.exe",
        "msedge.exe",
        "2345Explorer.exe",
    }
    arch_keys = check_arch_keys()
    path = None
    for browser in browsers:
        regPath = "Software\\Microsoft\\Windows\\CurrentVersion\\App Paths\\" + browser
        for arch_key in arch_keys:
            path = query_reg(winreg.HKEY_LOCAL_MACHINE, regPath, arch_key)
            if path:
                break
            path = query_reg(winreg.HKEY_CURRENT_USER, regPath, arch_key)
            if path:
                break
        if path:
            break
    return path


def openBrowser(path=None, args={}):
    if not path:
        path = get_browser_path()
    if path:
        if "--remote-debugging-port" not in args.keys():
            args["--remote-debugging-port"] = 9222
        param = [k + ("=" + str(args[k]) if args[k] else "") for k in args]
        apps = re.findall("([^\\\\/]+)$", path)
        os.system("taskkill /IM " + apps[0] + " /F")
        sleep(0.5)
        win32api.ShellExecute(0, 'open', path, " ".join(param), '', 1)
        sleep(0.5)
        return path
    else:
        return False


def bindBrowser(url="http://127.0.0.1:9222"):
    global BROWSER
    try:
        BROWSER = pychrome.Browser(url=url)
    except Exception:
        BROWSER = None


def easyBrowser():
    global BROWSER
    while not BROWSER:
        bindBrowser()
        if not BROWSER:
            openBrowser()
            sleep(2)


def start(tab):
    if tab.status != "started":
        tab.start()
        tab.DOM.enable()
        tab.Runtime.enable()
        tab.Network.enable()
        tab.Page.enable()
        tab.IndexedDB.enable()
    return True


def stop(tab):
    return tab.stop()


def wait(tab, timeout=None):
    return tab.wait(timeout)


def waitMessage(tab, timeout=1, waitflag=True):
    start(tab)
    return tab.getMessage(timeout, waitflag)


def clearMessage(tab):
    start(tab)
    tab.clearMessage()


def getTabs():
    global BROWSER
    return BROWSER.list_tab()


def createTab(uri="chrome://newtab/"):
    global BROWSER
    return BROWSER.new_tab(url=uri)


def closeTab(tab):
    global BROWSER
    tab.stop()
    return BROWSER.close_tab(tab)


def findTab(url=None, title=None):
    tabs = getTabs()
    tab = tabs[0]
    start(tab)
    tables = tab.Target.getTargets()
    tables = tables["targetInfos"]
    for table in tables:
        for tb in tabs:
            if table["targetId"] == tb.id:
                table["tab"] = tb
    tab1 = list(filter(lambda x: url and url in x["url"], tables))
    tab2 = list(filter(lambda x: title and title in x["title"], tables))
    if len(tab1) > 0 and len(tab2) > 0:
        re = []
        for tb1 in tab1:
            for tb2 in tab2:
                if tb1["targetId"] == tb2["targetId"]:
                    re.append(tb1["tab"])
        return re
    elif len(tab1) > 0:
        re = []
        for tb1 in tab1:
            re.append(tb1["tab"])
        return re
    else:
        re = []
        for tb1 in tab2:
            re.append(tb1["tab"])
        return re


def activeTab(tab):
    global BROWSER
    return BROWSER.activate_tab(tab)


def getTabInfo(tab):
    start(tab)
    return tab.Target.getTargetInfo(targetId=tab.id)


def goUrl(tab, uri):
    start(tab)
    return tab.Page.navigate(url=uri)


def reloadPage(tab, cache=False):
    start(tab)
    return tab.Page.reload(ignoreCache=cache)


def querySelector(tab, selector, nodeid=None):
    start(tab)
    if not nodeid:
        nodeid = tab.DOM.getDocument()
        nodeid = nodeid["root"]["nodeId"]
    res = tab.DOM.querySelector(nodeId=nodeid, selector=selector)
    return res["nodeId"] if res["nodeId"] > 0 else None


def querySelectorAll(tab, selector, nodeid=None):
    start(tab)
    if not nodeid:
        nodeid = tab.DOM.getDocument()
        nodeid = nodeid["root"]["nodeId"]
    res = tab.DOM.querySelectorAll(nodeId=nodeid, selector=selector)
    return res["nodeIds"]


def setAttribute(tab, nodeid, name, value):
    start(tab)
    return tab.DOM.setAttributeValue(nodeId=nodeid, name=name, value=value)


def getAttributes(tab, nodeid):
    start(tab)
    attr = tab.DOM.getAttributes(nodeId=nodeid)
    attr = attr["attributes"]
    res = {}
    for i in range(0, len(attr), 2):
        res[attr[i]] = attr[i + 1]
    return res


def getHTML(tab, nodeid):
    start(tab)
    html = tab.DOM.getOuterHTML(nodeId=nodeid)
    return html["outerHTML"]


def runJS(tab, expression):
    start(tab)
    res = tab.Runtime.evaluate(expression=expression, returnByValue=True)
    if "exceptionDetails" in res:
        raise Exception(res["exceptionDetails"]["exception"]["description"])
    elif "value" in res:
        res = res["value"]
    return res


def captureScreenshot(tab, path, clip=None):
    start(tab)
    if clip:
        res = tab.Page.captureScreenshot(clip=clip)
    else:
        res = tab.Page.captureScreenshot()
    if res:
        saveBase64File(path, res["data"])


def closeBrowser(tab):
    start(tab)
    return tab.Browser.close()


def clearBrowserCache(tab):
    start(tab)
    return tab.Network.clearBrowserCache()


def clearBrowserCookies(tab):
    start(tab)
    return tab.Network.clearBrowserCookies()


def getResponseBody(tab, requestId):
    start(tab)
    return tab.Network.getResponseBody(requestId=requestId)


def getRequestPostData(tab, requestId):
    start(tab)
    return tab.Network.getRequestPostData(requestId=requestId)


def getCookies(tab):
    start(tab)
    cookies = tab.Page.getCookies()
    return cookies["cookies"]


def setCookies(tab, cookies):
    start(tab)
    return tab.Network.setCookies(cookies=cookies)


def changeUserAgent(tab, userAgent):
    start(tab)
    return tab.Network.setUserAgentOverride(userAgent=userAgent)


def saveBase64File(path, base64code):
    with open(path, 'wb') as f:
        f.write(base64.b64decode(base64code))
    return True


def fetchUrl(tab, reg, timeout=2):
    start_time = time.time()
    name_table = {
        "Network.requestWillBeSent": "general",
        "Network.requestWillBeSentExtraInfo": "request_headers",
        "Network.responseReceivedExtraInfo": "response_headers",
        "Network.loadingFinished": "successed",
        "Network.loadingFailed": "failed"
    }
    res = {}
    while True:
        now = time.time()
        if (now - start_time) >= timeout:
            break
        message = tab.getMessage(1, False)
        if message is not None and message["method"] in name_table.keys():
            start_time = time.time()
            requestId = message["params"]["requestId"]
            if requestId not in res:
                res[requestId] = {}
            if message["method"] == "Network.requestWillBeSent":
                message["params"]["initiator"] = None
            res[requestId][name_table[message["method"]]] = message["params"]
    re = []
    for k, v in res.items():
        if "general" in v and reg in v["general"]["request"]["url"] and "successed" in v:
            v["response_body"] = tab.Network.getResponseBody(requestId=k)
            if v["general"]["request"]["method"] == "POST":
                v["request_post_data"] = tab.Network.getRequestPostData(
                    requestId=k)
            re.append(v)
    return re


def fetchBrowserFileByUrl(tab, reg, path, index=0, timeout=2):
    data = fetchUrl(tab, reg, timeout)
    print(data)
    if index < len(data) and "response_body" in data[index] and data[index][
            "response_body"]["base64Encoded"]:
        return saveBase64File(path, data[index]["response_body"]["body"])
    return False


def getIndexedDbData(tab,
                     securityOrigin,
                     databaseName,
                     objectStoreName,
                     indexName="",
                     skipCount=0,
                     pageSize=100):
    start(tab)
    return tab.IndexedDB.requestData(securityOrigin=securityOrigin,
                                     databaseName=databaseName,
                                     objectStoreName=objectStoreName,
                                     indexName=indexName,
                                     skipCount=skipCount,
                                     pageSize=pageSize)


def importWebScraperSiteMap(tab, siteMapId, sitemapJSON):
    goUrl(
        tab,
        "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/import-sitemap"
    )
    # waitEvent()
    sleep(1)
    runJS(
        tab, r"document.querySelector('textarea#sitemapJSON').value='" +
        str(sitemapJSON) + "';document.querySelector('input#_id').value='" +
        str(siteMapId) +
        "';document.querySelector('button#submit-import-sitemap').click()")


def startWebScraping(tab, siteMapId, reqDelay=2000, loadDelay=2000):
    goUrl(
        tab,
        "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/sitemap/scrape?sitemapId="
        + siteMapId)
    # waitEvent()
    sleep(1)
    runJS(
        tab,
        "document.querySelector('input[name=\\'requestInterval\\']').value=" +
        str(reqDelay) +
        ";document.querySelector('input[name=\\'requestInterval\\']').value=" +
        str(loadDelay) +
        ";document.querySelector('button#submit-scrape-sitemap').click()")


def getWebScraperSiteMap(tab, siteMapId):
    goUrl(
        tab,
        "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/sitemap/export?sitemapId="
        + siteMapId)
    # waitEvent()
    sleep(1)
    return runJS("document.querySelector('textarea').value")


def getWebScraperData(tab,
                      siteMapId,
                      currPage=0,
                      pageSize=100,
                      isList=True,
                      isfilter=True):
    goUrl(
        tab,
        "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/"
    )
    # waitEvent()
    sleep(1)
    data = getIndexedDbData(
        tab, "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn",
        "_pouch_sitemap-data-" + siteMapId, "by-sequence", "", currPage,
        pageSize)
    fts = ["web-scraper-start-url", "web-scraper-order", "next", "next-href"]
    res = []
    if data:
        for value in data["objectStoreDataEntries"]:
            temp = {}
            res.append(temp)
            for val in value["value"]["preview"]["properties"]:
                name = val["name"]
                re = val["value"]
                if (isfilter or isList) and name in fts:
                    pass
                else:
                    temp[name] = re
    if isList:
        return [list(re.values()) for re in res]
    return res


def callChromeDevFun(tab, funName, args=None):
    start(tab)
    return tab.call_method(funName, **args)


if __name__ == '__main__':
    # openBrowser()
    easyBrowser()

    tab=createTab("https://www.baidu.com/")
    data = fetchUrl(tab,"baidu_resultlogo")
    print(data)
    # while True:
    #     message = waitMessage(tab)
    #     print(message)
    # fetchBrowserFileByUrl(tab, "baidu", "C:\\Users\\Administrator\\Desktop\\test.png",0,5)
    # tabs = findTab("http://eservice.ciitc.com.cn/ePolicy/download", None)
    # tab = tabs[0]
    # print(tab)
    # clearMessage(tab)
    # runJS(tab, '''document.querySelector("#code_img").click()''')
    # fetchBrowserFileByUrl(tab, "verification", "d:\\aaa1.jpg")
    # data = fetchUrl(tab,"verification",5)
    # print(data)
    # while True:
    #     message = waitMessage(tab)
    #     print(message)
    # tabs = getTabs()
    # tab = tabs[0]
    # addListener(tab,["Network.requestWillBeSent","Network.responseReceived","Network.loadingFinished"])
    # goUrl(tab,"http://eservice.ciitc.com.cn/ePolicy/download")
    # data = fetchUrl(tab, "verification",2)
    # data = getCookies(tab)
    # print(data)
    # data = getTabInfo(tab)
    # print(data)
    # wait(tab,3)
    # while True:
    #     data = getMessage(tab)
    #     print(data)
    # waitEvent("Page.frameStoppedLoading", 60)
    # wait(tab,3)
    # clearMessage(tab)
    # runJS(tab,'''document.querySelector("#code_img").click()''')
    # fetchBrowserFileByUrl(tab, "http://eservice.ciitc.com.cn/ePolicy/verification.do", "d:\\aaa.jpg", 1)
#     goUrl("https://forum.uibot.com.cn/")
#     sleep(2)
#     data=callChromeDevFun("Target.createTarget", {"url":"https://forum.uibot.com.cn/"})
#     sleep(2)
#     data=callChromeDevFun("Target.getTargets")
#     print(data)
#     sleep(2)
#     data=callChromeDevFun("TTarget.closeTarget",{"targetId":"C3F9BAC8BE7BF4C58CE1274E1101145A"})
#     (targetId=tabid)
#     sleep(2)
#     reloadPage()
#     goUrl("https://forum.uibot.com.cn/")
#     while True:
#         message = waitMessage()
#         print(message)
# body=fetchUrl("user-ffp!loginNew.shtml")
# print(body)
#     fetchBrowserFileByUrl("99409_268-403.jpg", r"d:\aaa1.jpg")
# BROWSER.IndexedDB.enable()
# dbs=BROWSER.IndexedDB.requestDatabaseNames(securityOrigin="chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn")
# print(dbs)
# db=BROWSER.IndexedDB.requestDatabase(securityOrigin="chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn",databaseName="_pouch_sitemap-data-uibot")
# print(db)
#     aa=r'{"_id":"uibot","startUrl":["https://forum.uibot.com.cn/"],"selectors":[{"id":"next","type":"SelectorLink","parentSelectors":["_root","next"],"selector":"li:nth-of-type(n+2) a.page-link","multiple":true,"delay":0},{"id":"ems","type":"SelectorElement","parentSelectors":["_root","next"],"selector":"li.media","multiple":true,"delay":0},{"id":"title","type":"SelectorText","parentSelectors":["ems"],"selector":".subject a","multiple":false,"regex":"","delay":0},{"id":"author","type":"SelectorText","parentSelectors":["ems"],"selector":"div > span.username","multiple":false,"regex":"","delay":0},{"id":"time","type":"SelectorText","parentSelectors":["ems"],"selector":"span.date","multiple":false,"regex":"","delay":0}]}'

#     importWebScraperSiteMap("uibot1",aa)
#     print(data)

# goUrl("https://forum.uibot.com.cn/")
# waitEvent()
# data = BROWSER.Page.getCookies()
# print(data)
# data = BROWSER.Page.printToPDF()
# print(data)
# saveBase64File("d:\\aaa.pdf",data)
# nodeid=querySelector("#sitemapJSON")
# print(nodeid)
# print(getAttributes(nodeid))
# setAttribute(nodeid, "value", '''{"_id":"uibot","startUrl":["https://forum.uibot.com.cn/"],"selectors":[{"id":"next","type":"SelectorLink","parentSelectors":["_root","next"],"selector":"li:nth-of-type(n+2) a.page-link","multiple":true,"delay":0},{"id":"ems","type":"SelectorElement","parentSelectors":["_root","next"],"selector":"li.media","multiple":true,"delay":0},{"id":"title","type":"SelectorText","parentSelectors":["ems"],"selector":".subject a","multiple":false,"regex":"","delay":0},{"id":"author","type":"SelectorText","parentSelectors":["ems"],"selector":"div > span.username","multiple":false,"regex":"","delay":0},{"id":"time","type":"SelectorText","parentSelectors":["ems"],"selector":"span.date","multiple":false,"regex":"","delay":0}]}''')
# nodeid=querySelector("#_id")
# print(nodeid)
# setAttribute(nodeid, "value", "test")
# data=runJS('var a = document.querySelector("#_id");a.value="111";a.value')
# print(data)
# data=BROWSER.Target.getTargets()
# print(data)
# goUrl("https://www.baidu.com/")
# fetchBrowserFileByUrl("logo_top_86d58ae1.png","d:\\aaa.png")
# captureScreenshot("d:\\aa.png",{"x":0,"y":0,"width":1080,"height":100000})
# searchId=BROWSER.DOM.performSearch(query="li",includeUserAgentShadowDOM=False)
# print(searchId)
# searchId=BROWSER.DOM.getSearchResults(searchId=searchId["searchId"],fromIndex=0,toIndex=searchId["resultCount"])
# print(searchId)
# searchId=BROWSER.DOM.pushNodeByPathToFrontend(path="//li/text()")
# print(searchId)
# print(get_tabs())
# go_url("https://forum.uibot.com.cn/")
# wait_event()
# bind_browser()
# go_url("https://daypc.vzsite.top/login.php")
# nodeid=querySelector("li.top_3")
# domid=runJS('''(function(){return "111"})()''')
# print(domid)
# domid=runJS('''111''')
# print(type(domid))
# print(domid)
# domid=runJS('''true''')
# print(type(domid))
# print(domid)
# domid=runJS('''"true"''')
# print(type(domid))
# print(domid)
# domid=runJS('''["1"]''')
# print(type(domid))
# print(domid)
# domid=runJS('''"{'a':1}"''')
# print(type(domid))
# print(domid)
# domid=BROWSER.DOM.getDocument()
# print(domid)
# domid=domid["root"]["nodeId"]
# print(domid)
# nodeid=BROWSER.DOM.querySelectorAll(nodeId=domid,selector="a")
# print(nodeid)
# nodeid=nodeid["nodeIds"]
# print(nodeid)
# print(nodeid)
# html=getHTML(nodeid)
# print(html)
# html=getAttributes(nodeid)
# print(html)
# BROWSER.DOM.setAttributeValue(nodeId=nodeid,name="value",value="111111")
# nodeid=BROWSER.DOM.querySelector(nodeId=domid,selector="#login_box > div:nth-child(3) > div > input")
# nodeid=BROWSER.DOM.querySelector(nodeId=domid,selector="#login_box > div:nth-child(4) > div > input")
#
# messages = filter_event_onloaded("Page.frameStoppedLoading")
#
#
# print(messages)

# print(get_tab_info("5ADC94CBC4D7FC796E9C8E806FCC5E2B"))
# active_tab("7B6F61BA4B5FBC3DDF07A6D518130A91")
# close_tab("7B6F61BA4B5FBC3DDF07A6D518130A91")
# create_tab()
# create_tab("https://www.baidu.com")
# create_tab()
# go_url("https://forum.uibot.com.cn/")
# get_all_send("https://www.baidu.com")
# print(get_tabs())
# print(find_tab(title="新标签页",url="newtab"))
# close_browser()

# --- Scraped article footer (non-Python text), commented out so the file parses ---
# 相关文章
#
# 网友评论
#
#     本文标题: chrome cdp (chrome devtools protocol)
#
#     本文链接: https://www.haomeiwen.com/subject/tokmghtx.html