美文网首页
selenium 连接已打开的webdriver

selenium 连接已打开的webdriver

作者: 领悟悟悟 | 来源:发表于2020-04-30 14:08 被阅读0次
    1. 连接时被连接的webdrive 已打开的第一个窗口不能关闭(懒得测试原因)
    Input 参数:
      command_executor = browser.command_executor._url
      session_id = browser.session_id
      o_capabilities = browser.desired_capabilities
    
    import time
    
    from selenium.webdriver import Remote
    from selenium.webdriver.chrome import options
    from selenium.common.exceptions import InvalidArgumentException
    
    def verifiedConnect(func):
        def inner(*args, **kwargs):
            self = func(*args, **kwargs)
            try:
                current_window_handle = self.current_window_handle
                print("[%s]current_window_handle:"%time.strftime("%x %X"), current_window_handle)
            except Exception as e:
                self.connect_failed = True
                print("[%s]webdrive connect failed. [%s]"%(time.strftime("%x %X"), e.__str__().strip()))
                # raise
            return self
        return inner
    
    class ReuseWebdriver(Remote):
        def __init__(self, command_executor, session_id, o_capabilities=None):
            self.r_session_id = session_id
            self.o_capabilities = o_capabilities
            self.connect_failed = False
    
            Remote.__init__(self, command_executor=command_executor, desired_capabilities={})
    
        @verifiedConnect
        def start_session(self, capabilities, browser_profile=None):
            """
            重写start_session方法
            """
            if not isinstance(capabilities, dict):
                raise InvalidArgumentException("Capabilities must be a dictionary")
            if browser_profile:
                if "moz:firefoxOptions" in capabilities:
                    capabilities["moz:firefoxOptions"]["profile"] = browser_profile.encoded
                else:
                    capabilities.update({'firefox_profile': browser_profile.encoded})
    
            self.capabilities = self.o_capabilities or options.Options().to_capabilities()
            self.session_id = self.r_session_id
    
            # if capabilities is none we are probably speaking to
            # a W3C endpoint
    
            # Double check to see if we have a W3C Compliant browser
            self.w3c = True
            self.command_executor.w3c = self.w3c
    
            return self
    
    brower = ReuseWebdriver("http://127.0.0.1:64122", "7732c8d6-2a8c-48c5-afdb-4cc9ff1501aa")
    print(brower.execute_script('return sign(arguments[0],arguments[1],arguments[2])', 1, 2,3))
    

    Demo

    import time
    import os
    import json
    
    from selenium import webdriver
    from selenium.webdriver import Remote
    from selenium.webdriver.chrome import options as chromeOptions
    from selenium.webdriver.firefox import options as fireFoxOptions
    from selenium.common.exceptions import InvalidArgumentException
    
    firefox_driver_path = r"D:\chromedriver_win32\fireFox/geckodriver.exe"
    base_dir = os.path.dirname(os.path.abspath(__file__))
    
    def verifiedConnect(func):
        def inner(*args, **kwargs):
            self = func(*args, **kwargs)
            try:
                current_window_handle = self.current_window_handle
                print("[%s]current_window_handle:"%time.strftime("%x %X"), current_window_handle)
            except Exception as e:
                self.connect_failed = True
                print("[%s]webdrive connect failed. [%s]"%(time.strftime("%x %X"), e.__str__().strip()))
                # raise
            return self
        return inner
    
    class ReuseWebdriver(Remote):
        def __init__(self, command_executor, session_id, o_capabilities=None):
            self.r_session_id = session_id
            self.o_capabilities = o_capabilities
            self.connect_failed = False
    
            Remote.__init__(self, command_executor=command_executor, desired_capabilities={})
    
        @verifiedConnect
        def start_session(self, capabilities, browser_profile=None):
            """
            重写start_session方法
            """
            if not isinstance(capabilities, dict):
                raise InvalidArgumentException("Capabilities must be a dictionary")
            if browser_profile:
                if "moz:firefoxOptions" in capabilities:
                    capabilities["moz:firefoxOptions"]["profile"] = browser_profile.encoded
                else:
                    capabilities.update({'firefox_profile': browser_profile.encoded})
    
            self.capabilities = self.o_capabilities or fireFoxOptions.Options().to_capabilities()
            self.session_id = self.r_session_id
    
            # if capabilities is none we are probably speaking to
            # a W3C endpoint
    
            # Double check to see if we have a W3C Compliant browser
            self.w3c = True
            self.command_executor.w3c = self.w3c
    
            return self
    
    
    class Browser:
        encoding = "utf-8"
        fireFox_wsPoint = "fireFox_wsPoint"
        fireFox_capabilities = "fireFox_capabilities"
        """可以打开一个chrome,获取ws信息,然后后续的操作都在这一个浏览器上进行"""
        chrome_webdriver_js = '''() =>
           Object.defineProperties(navigator,{
             webdriver:{
               get: () => false
             }
           })
        '''
    
        def create_fireFox_ws(self):
            options = fireFoxOptions.Options()
            options.headless = False # True 无界
            browser = webdriver.Firefox(executable_path=firefox_driver_path, options=options)
    
            # 存储web driver连接信息
            with open(os.path.join(base_dir, self.fireFox_wsPoint), "w", encoding=self.encoding) as f:
                meta = {"command_executor":browser.command_executor._url,"session_id":browser.session_id}
                f.write(json.dumps(meta, ensure_ascii=False))
            with open(os.path.join(base_dir, self.fireFox_capabilities), "w", encoding=self.encoding) as f:
                meta = browser.desired_capabilities
                f.write(json.dumps(meta, ensure_ascii=False))
    
            return browser
    
        def connect_fireFox(self):
            # 读取连接信息
            if not os.path.isfile(self.fireFox_wsPoint):
                return None
            with open(os.path.join(base_dir, self.fireFox_wsPoint), "r", encoding=self.encoding) as f:
                wsPoint = json.loads(f.read().strip())
            if not wsPoint: return None
            if not os.path.isfile(self.fireFox_capabilities):
                capabilities = None
            else:
                with open(os.path.join(base_dir, self.fireFox_capabilities), "r", encoding=self.encoding) as f:
                    capabilities = json.loads(f.read().strip()) or None
    
            browser = ReuseWebdriver(command_executor=wsPoint["command_executor"], session_id=wsPoint["session_id"], o_capabilities=capabilities)
            if browser.connect_failed: return None
    
            return browser
    
        def get_fireFox_browser(self):
            browser = self.connect_fireFox()
            if not browser:
                browser = self.create_fireFox_ws()
            return browser
    
    if __name__ == '__main__':
        browser = Browser().get_fireFox_browser()
        time.sleep(60 * 60 * 10)
    

    相关文章

      网友评论

          本文标题:selenium 连接已打开的webdriver

          本文链接:https://www.haomeiwen.com/subject/ckmzwhtx.html