- 连接时被连接的webdrive 已打开的第一个窗口不能关闭(懒得测试原因)
Input 参数:
command_executor = browser.command_executor._url
session_id = browser.session_id
o_capabilities = browser.desired_capabilities
import time
from selenium.webdriver import Remote
from selenium.webdriver.chrome import options
from selenium.common.exceptions import InvalidArgumentException
def verifiedConnect(func):
def inner(*args, **kwargs):
self = func(*args, **kwargs)
try:
current_window_handle = self.current_window_handle
print("[%s]current_window_handle:"%time.strftime("%x %X"), current_window_handle)
except Exception as e:
self.connect_failed = True
print("[%s]webdrive connect failed. [%s]"%(time.strftime("%x %X"), e.__str__().strip()))
# raise
return self
return inner
class ReuseWebdriver(Remote):
def __init__(self, command_executor, session_id, o_capabilities=None):
self.r_session_id = session_id
self.o_capabilities = o_capabilities
self.connect_failed = False
Remote.__init__(self, command_executor=command_executor, desired_capabilities={})
@verifiedConnect
def start_session(self, capabilities, browser_profile=None):
"""
重写start_session方法
"""
if not isinstance(capabilities, dict):
raise InvalidArgumentException("Capabilities must be a dictionary")
if browser_profile:
if "moz:firefoxOptions" in capabilities:
capabilities["moz:firefoxOptions"]["profile"] = browser_profile.encoded
else:
capabilities.update({'firefox_profile': browser_profile.encoded})
self.capabilities = self.o_capabilities or options.Options().to_capabilities()
self.session_id = self.r_session_id
# if capabilities is none we are probably speaking to
# a W3C endpoint
# Double check to see if we have a W3C Compliant browser
self.w3c = True
self.command_executor.w3c = self.w3c
return self
brower = ReuseWebdriver("http://127.0.0.1:64122", "7732c8d6-2a8c-48c5-afdb-4cc9ff1501aa")
print(brower.execute_script('return sign(arguments[0],arguments[1],arguments[2])', 1, 2,3))
Demo
import time
import os
import json
from selenium import webdriver
from selenium.webdriver import Remote
from selenium.webdriver.chrome import options as chromeOptions
from selenium.webdriver.firefox import options as fireFoxOptions
from selenium.common.exceptions import InvalidArgumentException
firefox_driver_path = r"D:\chromedriver_win32\fireFox/geckodriver.exe"
base_dir = os.path.dirname(os.path.abspath(__file__))
def verifiedConnect(func):
def inner(*args, **kwargs):
self = func(*args, **kwargs)
try:
current_window_handle = self.current_window_handle
print("[%s]current_window_handle:"%time.strftime("%x %X"), current_window_handle)
except Exception as e:
self.connect_failed = True
print("[%s]webdrive connect failed. [%s]"%(time.strftime("%x %X"), e.__str__().strip()))
# raise
return self
return inner
class ReuseWebdriver(Remote):
def __init__(self, command_executor, session_id, o_capabilities=None):
self.r_session_id = session_id
self.o_capabilities = o_capabilities
self.connect_failed = False
Remote.__init__(self, command_executor=command_executor, desired_capabilities={})
@verifiedConnect
def start_session(self, capabilities, browser_profile=None):
"""
重写start_session方法
"""
if not isinstance(capabilities, dict):
raise InvalidArgumentException("Capabilities must be a dictionary")
if browser_profile:
if "moz:firefoxOptions" in capabilities:
capabilities["moz:firefoxOptions"]["profile"] = browser_profile.encoded
else:
capabilities.update({'firefox_profile': browser_profile.encoded})
self.capabilities = self.o_capabilities or fireFoxOptions.Options().to_capabilities()
self.session_id = self.r_session_id
# if capabilities is none we are probably speaking to
# a W3C endpoint
# Double check to see if we have a W3C Compliant browser
self.w3c = True
self.command_executor.w3c = self.w3c
return self
class Browser:
encoding = "utf-8"
fireFox_wsPoint = "fireFox_wsPoint"
fireFox_capabilities = "fireFox_capabilities"
"""可以打开一个chrome,获取ws信息,然后后续的操作都在这一个浏览器上进行"""
chrome_webdriver_js = '''() =>
Object.defineProperties(navigator,{
webdriver:{
get: () => false
}
})
'''
def create_fireFox_ws(self):
options = fireFoxOptions.Options()
options.headless = False # True 无界
browser = webdriver.Firefox(executable_path=firefox_driver_path, options=options)
# 存储web driver连接信息
with open(os.path.join(base_dir, self.fireFox_wsPoint), "w", encoding=self.encoding) as f:
meta = {"command_executor":browser.command_executor._url,"session_id":browser.session_id}
f.write(json.dumps(meta, ensure_ascii=False))
with open(os.path.join(base_dir, self.fireFox_capabilities), "w", encoding=self.encoding) as f:
meta = browser.desired_capabilities
f.write(json.dumps(meta, ensure_ascii=False))
return browser
def connect_fireFox(self):
# 读取连接信息
if not os.path.isfile(self.fireFox_wsPoint):
return None
with open(os.path.join(base_dir, self.fireFox_wsPoint), "r", encoding=self.encoding) as f:
wsPoint = json.loads(f.read().strip())
if not wsPoint: return None
if not os.path.isfile(self.fireFox_capabilities):
capabilities = None
else:
with open(os.path.join(base_dir, self.fireFox_capabilities), "r", encoding=self.encoding) as f:
capabilities = json.loads(f.read().strip()) or None
browser = ReuseWebdriver(command_executor=wsPoint["command_executor"], session_id=wsPoint["session_id"], o_capabilities=capabilities)
if browser.connect_failed: return None
return browser
def get_fireFox_browser(self):
browser = self.connect_fireFox()
if not browser:
browser = self.create_fireFox_ws()
return browser
if __name__ == '__main__':
browser = Browser().get_fireFox_browser()
time.sleep(60 * 60 * 10)
网友评论