#encoding=utf-8
'''
pychrome-based helpers for driving a local Chrome/Chromium browser over the
DevTools Protocol: locate/launch a browser, work with tabs (DOM, JS, network
capture, screenshots, IndexedDB) and a few Web Scraper extension shortcuts.
Created on 2019-10-24
@author: 瞌睡蟲子
'''
import pychrome
import errno
import os
import winreg
import win32api
import re
import base64
from time import sleep
import time
BROWSER = None
def query_reg(hkey, regPath, arch_key):
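    '''Read the default value of regPath under hkey using the given WOW64 view
    (arch_key); returns the path string, or None if the key does not exist.'''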
path = None
try:
key = winreg.OpenKey(hkey, regPath, 0, winreg.KEY_READ | arch_key)
path = winreg.QueryValueEx(key, '')
winreg.CloseKey(key)
return path[0]
except OSError as e:
if e.errno == errno.ENOENT:
# the key does not exist under this registry view
pass
return path
def check_arch_keys():
    proc_arch = os.environ['PROCESSOR_ARCHITECTURE'].lower()
    # PROCESSOR_ARCHITEW6432 is only set for 32-bit processes on 64-bit Windows
    proc_arch64 = os.environ.get('PROCESSOR_ARCHITEW6432', '').lower()
if proc_arch == 'x86' and not proc_arch64:
arch_keys = {0}
elif proc_arch == 'x86' or proc_arch == 'amd64':
arch_keys = {winreg.KEY_WOW64_32KEY, winreg.KEY_WOW64_64KEY}
else:
raise Exception("Unhandled arch: %s" % proc_arch)
return arch_keys
def get_browser_path():
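    '''Look up an installed browser executable through the registry "App Paths"
    keys (HKLM and HKCU, 32- and 64-bit views); returns the path or None.'''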
    # candidate browsers, checked in order of preference (a tuple keeps the order deterministic)
    browsers = (
        "chrome.exe",
        "360chrome.exe",
        "msedge.exe",
        "2345Explorer.exe",
    )
arch_keys = check_arch_keys()
path = None
for browser in browsers:
regPath = "Software\\Microsoft\\Windows\\CurrentVersion\\App Paths\\" + browser
for arch_key in arch_keys:
path = query_reg(winreg.HKEY_LOCAL_MACHINE, regPath, arch_key)
if path:
break
path = query_reg(winreg.HKEY_CURRENT_USER, regPath, arch_key)
if path:
break
if path:
break
return path
def openBrowser(path=None, args=None):
    # avoid a mutable default argument; args maps a command-line switch to its value (or None)
    if args is None:
        args = {}
    if not path:
        path = get_browser_path()
    if path:
        if "--remote-debugging-port" not in args:
            args["--remote-debugging-port"] = 9222
param = [k + ("=" + str(args[k]) if args[k] else "") for k in args]
apps = re.findall("([^\\\\/]+)$", path)
os.system("taskkill /IM " + apps[0] + " /F")
sleep(0.5)
win32api.ShellExecute(0, 'open', path, " ".join(param), '', 1)
sleep(0.5)
return path
else:
return False
def bindBrowser(url="http://127.0.0.1:9222"):
    global BROWSER
    try:
        browser = pychrome.Browser(url=url)
        # the constructor alone does not connect; probe the endpoint before binding
        browser.list_tab()
        BROWSER = browser
    except Exception:
        BROWSER = None
def easyBrowser():
global BROWSER
while not BROWSER:
bindBrowser()
if not BROWSER:
openBrowser()
sleep(2)
def start(tab):
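    '''If the tab session is not started yet, start it and enable the DOM,
    Runtime, Network, Page and IndexedDB domains used by the helpers below.'''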
if tab.status != "started":
tab.start()
tab.DOM.enable()
tab.Runtime.enable()
tab.Network.enable()
tab.Page.enable()
tab.IndexedDB.enable()
return True
def stop(tab):
return tab.stop()
def wait(tab, timeout=None):
return tab.wait(timeout)
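# NOTE: waitMessage, clearMessage and fetchUrl depend on tab.getMessage() and
# tab.clearMessage(), which are not part of the stock pychrome Tab API; they
# assume a customized/forked pychrome build that queues incoming CDP events.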
def waitMessage(tab, timeout=1, waitflag=True):
start(tab)
return tab.getMessage(timeout, waitflag)
def clearMessage(tab):
start(tab)
tab.clearMessage()
def getTabs():
global BROWSER
return BROWSER.list_tab()
def createTab(uri="chrome://newtab/"):
global BROWSER
return BROWSER.new_tab(url=uri)
def closeTab(tab):
global BROWSER
tab.stop()
return BROWSER.close_tab(tab)
def findTab(url=None, title=None):
    '''Return the pychrome tabs whose target URL and/or title contain the given substrings.'''
    tabs = getTabs()
    tab = tabs[0]
    start(tab)
    targets = tab.Target.getTargets()["targetInfos"]
    # attach the matching pychrome Tab object to each target it belongs to
    for target in targets:
        for tb in tabs:
            if target["targetId"] == tb.id:
                target["tab"] = tb
    by_url = list(filter(lambda x: url and url in x["url"], targets))
    by_title = list(filter(lambda x: title and title in x["title"], targets))
    result = []
    if by_url and by_title:
        for t1 in by_url:
            for t2 in by_title:
                if t1["targetId"] == t2["targetId"] and "tab" in t1:
                    result.append(t1["tab"])
    elif by_url:
        result = [t["tab"] for t in by_url if "tab" in t]
    else:
        result = [t["tab"] for t in by_title if "tab" in t]
    return result
def activeTab(tab):
global BROWSER
return BROWSER.activate_tab(tab)
def getTabInfo(tab):
start(tab)
return tab.Target.getTargetInfo(targetId=tab.id)
def goUrl(tab, uri):
start(tab)
return tab.Page.navigate(url=uri)
def reloadPage(tab, cache=False):
start(tab)
return tab.Page.reload(ignoreCache=cache)
def querySelector(tab, selector, nodeid=None):
start(tab)
if not nodeid:
nodeid = tab.DOM.getDocument()
nodeid = nodeid["root"]["nodeId"]
res = tab.DOM.querySelector(nodeId=nodeid, selector=selector)
return res["nodeId"] if res["nodeId"] > 0 else None
def querySelectorAll(tab, selector, nodeid=None):
start(tab)
if not nodeid:
nodeid = tab.DOM.getDocument()
nodeid = nodeid["root"]["nodeId"]
res = tab.DOM.querySelectorAll(nodeId=nodeid, selector=selector)
return res["nodeIds"]
def setAttribute(tab, nodeid, name, value):
start(tab)
return tab.DOM.setAttributeValue(nodeId=nodeid, name=name, value=value)
def getAttributes(tab, nodeid):
start(tab)
attr = tab.DOM.getAttributes(nodeId=nodeid)
attr = attr["attributes"]
res = {}
for i in range(0, len(attr), 2):
res[attr[i]] = attr[i + 1]
return res
def getHTML(tab, nodeid):
start(tab)
html = tab.DOM.getOuterHTML(nodeId=nodeid)
return html["outerHTML"]
def runJS(tab, expression):
    start(tab)
    res = tab.Runtime.evaluate(expression=expression, returnByValue=True)
    if "exceptionDetails" in res:
        raise Exception(res["exceptionDetails"]["exception"]["description"])
    # the evaluated value is nested under res["result"]["value"]
    result = res.get("result", {})
    return result.get("value", result)
def captureScreenshot(tab, path, clip=None):
start(tab)
if clip:
res = tab.Page.captureScreenshot(clip=clip)
else:
res = tab.Page.captureScreenshot()
if res:
saveBase64File(path, res["data"])
def closeBrowser(tab):
start(tab)
return tab.Browser.close()
def clearBrowserCache(tab):
start(tab)
return tab.Network.clearBrowserCache()
def clearBrowserCookies(tab):
start(tab)
return tab.Network.clearBrowserCookies()
def getResponseBody(tab, requestId):
start(tab)
return tab.Network.getResponseBody(requestId=requestId)
def getRequestPostData(tab, requestId):
start(tab)
return tab.Network.getRequestPostData(requestId=requestId)
def getCookies(tab):
    start(tab)
    # Page.getCookies is deprecated; Network.getCookies covers the same use case
    cookies = tab.Network.getCookies()
    return cookies["cookies"]
def setCookies(tab, cookies):
start(tab)
return tab.Network.setCookies(cookies=cookies)
def changeUserAgent(tab, userAgent):
start(tab)
return tab.Network.setUserAgentOverride(userAgent=userAgent)
def saveBase64File(path, base64code):
with open(path, 'wb') as f:
f.write(base64.b64decode(base64code))
return True
def fetchUrl(tab, reg, timeout=2):
    '''Collect captured network events and return the requests whose URL contains reg.'''
    start(tab)  # make sure the Network domain is enabled before reading events
    start_time = time.time()
    name_table = {
        "Network.requestWillBeSent": "general",
        "Network.requestWillBeSentExtraInfo": "request_headers",
        "Network.responseReceivedExtraInfo": "response_headers",
        "Network.loadingFinished": "finished",
        "Network.loadingFailed": "failed"
    }
    res = {}
    while True:
        if (time.time() - start_time) >= timeout:
            break
        message = tab.getMessage(1, False)
        if message is not None and message["method"] in name_table:
            start_time = time.time()
            requestId = message["params"]["requestId"]
            if requestId not in res:
                res[requestId] = {}
            if message["method"] == "Network.requestWillBeSent":
                # the initiator payload can be large and is not needed here
                message["params"]["initiator"] = None
            res[requestId][name_table[message["method"]]] = message["params"]
    results = []
    for k, v in res.items():
        if "general" in v and reg in v["general"]["request"]["url"] and "finished" in v:
            v["response_body"] = tab.Network.getResponseBody(requestId=k)
            if v["general"]["request"]["method"] == "POST":
                v["request_post_data"] = tab.Network.getRequestPostData(
                    requestId=k)
            results.append(v)
    return results
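# Usage sketch for fetchUrl (the URL and filter string below are only
# illustrative, not part of the original script):
#   tab = createTab("https://example.com/")
#   hits = fetchUrl(tab, "example.com", timeout=5)
#   for hit in hits:
#       print(hit["general"]["request"]["url"])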
def fetchBrowserFileByUrl(tab, reg, path, index=0, timeout=2):
data = fetchUrl(tab, reg, timeout)
if index < len(data) and "response_body" in data[index] and data[index][
"response_body"]["base64Encoded"]:
return saveBase64File(path, data[index]["response_body"]["body"])
return False
def getIndexedDbData(tab,
securityOrigin,
databaseName,
objectStoreName,
indexName="",
skipCount=0,
pageSize=100):
start(tab)
return tab.IndexedDB.requestData(securityOrigin=securityOrigin,
databaseName=databaseName,
objectStoreName=objectStoreName,
indexName=indexName,
skipCount=skipCount,
pageSize=pageSize)
def importWebScraperSiteMap(tab, siteMapId, sitemapJSON):
goUrl(
tab,
"chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/import-sitemap"
)
# waitEvent()
sleep(1)
runJS(
tab, r"document.querySelector('textarea#sitemapJSON').value='" +
str(sitemapJSON) + "';document.querySelector('input#_id').value='" +
str(siteMapId) +
"';document.querySelector('button#submit-import-sitemap').click()")
def startWebScraping(tab, siteMapId, reqDelay=2000, loadDelay=2000):
goUrl(
tab,
"chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/sitemap/scrape?sitemapId="
+ siteMapId)
# waitEvent()
sleep(1)
    # NOTE: the original script set input[name='requestInterval'] twice; the
    # second assignment is assumed to target the page-load delay field
    # ('pageLoadDelay' in the Web Scraper extension UI).
    runJS(
        tab,
        "document.querySelector('input[name=\\'requestInterval\\']').value=" +
        str(reqDelay) +
        ";document.querySelector('input[name=\\'pageLoadDelay\\']').value=" +
        str(loadDelay) +
        ";document.querySelector('button#submit-scrape-sitemap').click()")
def getWebScraperSiteMap(tab, siteMapId):
goUrl(
tab,
"chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/sitemap/export?sitemapId="
+ siteMapId)
# waitEvent()
sleep(1)
    return runJS(tab, "document.querySelector('textarea').value")
def getWebScraperData(tab,
siteMapId,
currPage=0,
pageSize=100,
isList=True,
isfilter=True):
goUrl(
tab,
"chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn/devtools_web_scraper_panel.html#/"
)
# waitEvent()
sleep(1)
data = getIndexedDbData(
tab, "chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn",
"_pouch_sitemap-data-" + siteMapId, "by-sequence", "", currPage,
pageSize)
fts = ["web-scraper-start-url", "web-scraper-order", "next", "next-href"]
res = []
if data:
for value in data["objectStoreDataEntries"]:
temp = {}
res.append(temp)
            for val in value["value"]["preview"]["properties"]:
                name = val["name"]
                if (isfilter or isList) and name in fts:
                    continue
                temp[name] = val["value"]
    if isList:
        return [list(row.values()) for row in res]
return res
def callChromeDevFun(tab, funName, args=None):
    start(tab)
    if args is None:
        args = {}
    return tab.call_method(funName, **args)
if __name__ == '__main__':
# openBrowser()
easyBrowser()
    tab = createTab("https://www.baidu.com/")
    data = fetchUrl(tab, "baidu_resultlogo")
print(data)
# while True:
# message = waitMessage(tab)
# print(message)
# fetchBrowserFileByUrl(tab, "baidu", "C:\\Users\\Administrator\\Desktop\\test.png",0,5)
# tabs = findTab("http://eservice.ciitc.com.cn/ePolicy/download", None)
# tab = tabs[0]
# print(tab)
# clearMessage(tab)
# runJS(tab, '''document.querySelector("#code_img").click()''')
# fetchBrowserFileByUrl(tab, "verification", "d:\\aaa1.jpg")
# data = fetchUrl(tab,"verification",5)
# print(data)
# while True:
# message = waitMessage(tab)
# print(message)
# tabs = getTabs()
# tab = tabs[0]
# addListener(tab,["Network.requestWillBeSent","Network.responseReceived","Network.loadingFinished"])
# goUrl(tab,"http://eservice.ciitc.com.cn/ePolicy/download")
# data = fetchUrl(tab, "verification",2)
# data = getCookies(tab)
# print(data)
# data = getTabInfo(tab)
# print(data)
# wait(tab,3)
# while True:
# data = getMessage(tab)
# print(data)
# waitEvent("Page.frameStoppedLoading", 60)
# wait(tab,3)
# clearMessage(tab)
# runJS(tab,'''document.querySelector("#code_img").click()''')
# fetchBrowserFileByUrl(tab, "http://eservice.ciitc.com.cn/ePolicy/verification.do", "d:\\aaa.jpg", 1)
# goUrl("https://forum.uibot.com.cn/")
# sleep(2)
# data=callChromeDevFun("Target.createTarget", {"url":"https://forum.uibot.com.cn/"})
# sleep(2)
# data=callChromeDevFun("Target.getTargets")
# print(data)
# sleep(2)
# data=callChromeDevFun("TTarget.closeTarget",{"targetId":"C3F9BAC8BE7BF4C58CE1274E1101145A"})
# (targetId=tabid)
# sleep(2)
# reloadPage()
# goUrl("https://forum.uibot.com.cn/")
# while True:
# message = waitMessage()
# print(message)
# body=fetchUrl("user-ffp!loginNew.shtml")
# print(body)
# fetchBrowserFileByUrl("99409_268-403.jpg", r"d:\aaa1.jpg")
# BROWSER.IndexedDB.enable()
# dbs=BROWSER.IndexedDB.requestDatabaseNames(securityOrigin="chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn")
# print(dbs)
# db=BROWSER.IndexedDB.requestDatabase(securityOrigin="chrome-extension://jnhgnonknehpejjnehehllkliplmbmhn",databaseName="_pouch_sitemap-data-uibot")
# print(db)
# aa=r'{"_id":"uibot","startUrl":["https://forum.uibot.com.cn/"],"selectors":[{"id":"next","type":"SelectorLink","parentSelectors":["_root","next"],"selector":"li:nth-of-type(n+2) a.page-link","multiple":true,"delay":0},{"id":"ems","type":"SelectorElement","parentSelectors":["_root","next"],"selector":"li.media","multiple":true,"delay":0},{"id":"title","type":"SelectorText","parentSelectors":["ems"],"selector":".subject a","multiple":false,"regex":"","delay":0},{"id":"author","type":"SelectorText","parentSelectors":["ems"],"selector":"div > span.username","multiple":false,"regex":"","delay":0},{"id":"time","type":"SelectorText","parentSelectors":["ems"],"selector":"span.date","multiple":false,"regex":"","delay":0}]}'
# importWebScraperSiteMap("uibot1",aa)
# print(data)
# goUrl("https://forum.uibot.com.cn/")
# waitEvent()
# data = BROWSER.Page.getCookies()
# print(data)
# data = BROWSER.Page.printToPDF()
# print(data)
# saveBase64File("d:\\aaa.pdf",data)
# nodeid=querySelector("#sitemapJSON")
# print(nodeid)
# print(getAttributes(nodeid))
# setAttribute(nodeid, "value", '''{"_id":"uibot","startUrl":["https://forum.uibot.com.cn/"],"selectors":[{"id":"next","type":"SelectorLink","parentSelectors":["_root","next"],"selector":"li:nth-of-type(n+2) a.page-link","multiple":true,"delay":0},{"id":"ems","type":"SelectorElement","parentSelectors":["_root","next"],"selector":"li.media","multiple":true,"delay":0},{"id":"title","type":"SelectorText","parentSelectors":["ems"],"selector":".subject a","multiple":false,"regex":"","delay":0},{"id":"author","type":"SelectorText","parentSelectors":["ems"],"selector":"div > span.username","multiple":false,"regex":"","delay":0},{"id":"time","type":"SelectorText","parentSelectors":["ems"],"selector":"span.date","multiple":false,"regex":"","delay":0}]}''')
# nodeid=querySelector("#_id")
# print(nodeid)
# setAttribute(nodeid, "value", "test")
# data=runJS('var a = document.querySelector("#_id");a.value="111";a.value')
# print(data)
# data=BROWSER.Target.getTargets()
# print(data)
# goUrl("https://www.baidu.com/")
# fetchBrowserFileByUrl("logo_top_86d58ae1.png","d:\\aaa.png")
# captureScreenshot("d:\\aa.png",{"x":0,"y":0,"width":1080,"height":100000})
# searchId=BROWSER.DOM.performSearch(query="li",includeUserAgentShadowDOM=False)
# print(searchId)
# searchId=BROWSER.DOM.getSearchResults(searchId=searchId["searchId"],fromIndex=0,toIndex=searchId["resultCount"])
# print(searchId)
# searchId=BROWSER.DOM.pushNodeByPathToFrontend(path="//li/text()")
# print(searchId)
# print(get_tabs())
# go_url("https://forum.uibot.com.cn/")
# wait_event()
# bind_browser()
# go_url("https://daypc.vzsite.top/login.php")
# nodeid=querySelector("li.top_3")
# domid=runJS('''(function(){return "111"})()''')
# print(domid)
# domid=runJS('''111''')
# print(type(domid))
# print(domid)
# domid=runJS('''true''')
# print(type(domid))
# print(domid)
# domid=runJS('''"true"''')
# print(type(domid))
# print(domid)
# domid=runJS('''["1"]''')
# print(type(domid))
# print(domid)
# domid=runJS('''"{'a':1}"''')
# print(type(domid))
# print(domid)
# domid=BROWSER.DOM.getDocument()
# print(domid)
# domid=domid["root"]["nodeId"]
# print(domid)
# nodeid=BROWSER.DOM.querySelectorAll(nodeId=domid,selector="a")
# print(nodeid)
# nodeid=nodeid["nodeIds"]
# print(nodeid)
# print(nodeid)
# html=getHTML(nodeid)
# print(html)
# html=getAttributes(nodeid)
# print(html)
# BROWSER.DOM.setAttributeValue(nodeId=nodeid,name="value",value="111111")
# nodeid=BROWSER.DOM.querySelector(nodeId=domid,selector="#login_box > div:nth-child(3) > div > input")
# nodeid=BROWSER.DOM.querySelector(nodeId=domid,selector="#login_box > div:nth-child(4) > div > input")
#
# messages = filter_event_onloaded("Page.frameStoppedLoading")
#
#
# print(messages)
# print(get_tab_info("5ADC94CBC4D7FC796E9C8E806FCC5E2B"))
# active_tab("7B6F61BA4B5FBC3DDF07A6D518130A91")
# close_tab("7B6F61BA4B5FBC3DDF07A6D518130A91")
# create_tab()
# create_tab("https://www.baidu.com")
# create_tab()
# go_url("https://forum.uibot.com.cn/")
# get_all_send("https://www.baidu.com")
# print(get_tabs())
# print(find_tab(title="新标签页",url="newtab"))
# close_browser()