Update CDP Mode

This commit is contained in:
Michael Mintz 2024-11-15 01:17:03 -05:00
parent 5ccee49f90
commit 86a856234f
6 changed files with 106 additions and 44 deletions

View File

@ -599,6 +599,9 @@ def uc_open_with_cdp_mode(driver, url=None):
cdp.select = CDPM.select
cdp.select_all = CDPM.select_all
cdp.find_elements = CDPM.find_elements
cdp.find_visible_elements = CDPM.find_visible_elements
cdp.click_nth_element = CDPM.click_nth_element
cdp.click_nth_visible_element = CDPM.click_nth_visible_element
cdp.click_link = CDPM.click_link
cdp.tile_windows = CDPM.tile_windows
cdp.get_all_cookies = CDPM.get_all_cookies

View File

@ -3,6 +3,7 @@ import os
import shutil
import sys
import time
from contextlib import suppress
from seleniumbase import config as sb_config
from seleniumbase.config import settings
from seleniumbase.fixtures import constants
@ -281,14 +282,13 @@ def log_test_failure_data(test, test_logpath, driver, browser, url=None):
sb_config._report_time = the_time
sb_config._report_traceback = traceback_message
sb_config._report_exception = exc_message
try:
with suppress(Exception):
if not os.path.exists(test_logpath):
os.makedirs(test_logpath)
except Exception:
pass
log_file = codecs.open(basic_file_path, "w+", "utf-8")
log_file.writelines("\r\n".join(data_to_save))
log_file.close()
with suppress(Exception):
log_file = codecs.open(basic_file_path, "w+", encoding="utf-8")
log_file.writelines("\r\n".join(data_to_save))
log_file.close()
def log_skipped_test_data(test, test_logpath, driver, browser, reason):
@ -297,16 +297,12 @@ def log_skipped_test_data(test, test_logpath, driver, browser, reason):
browser_version = None
driver_version = None
driver_name = None
try:
with suppress(Exception):
browser_version = get_browser_version(driver)
except Exception:
pass
try:
with suppress(Exception):
driver_name, driver_version = get_driver_name_and_version(
driver, browser
)
except Exception:
pass
if browser_version:
headless = ""
if test.headless and browser in ["chrome", "edge", "firefox"]:
@ -368,13 +364,11 @@ def log_page_source(test_logpath, driver, source=None):
"unresponsive, or closed prematurely!</h4>"
)
)
try:
with suppress(Exception):
if not os.path.exists(test_logpath):
os.makedirs(test_logpath)
except Exception:
pass
html_file_path = os.path.join(test_logpath, html_file_name)
html_file = codecs.open(html_file_path, "w+", "utf-8")
html_file = codecs.open(html_file_path, "w+", encoding="utf-8")
html_file.write(page_source)
html_file.close()
@ -543,7 +537,7 @@ def log_folder_setup(log_path, archive_logs=False):
try:
os.makedirs(log_path)
except Exception:
pass # Should only be reachable during multi-threaded runs
pass # Only reachable during multi-threaded runs
else:
saved_folder = "%s/../%s/" % (log_path, constants.Logs.SAVED)
archived_folder = os.path.realpath(saved_folder) + "/"
@ -551,7 +545,7 @@ def log_folder_setup(log_path, archive_logs=False):
try:
os.makedirs(archived_folder)
except Exception:
pass # Should only be reachable during multi-threaded runs
pass # Only reachable during multi-threaded runs
archived_logs = "%slogs_%s" % (archived_folder, int(time.time()))
if len(os.listdir(log_path)) > 0:
try:

View File

@ -252,6 +252,45 @@ class CDPMethods():
def find_elements(self, selector, timeout=settings.SMALL_TIMEOUT):
return self.select_all(selector, timeout=timeout)
def find_visible_elements(self, selector, timeout=settings.SMALL_TIMEOUT):
visible_elements = []
elements = self.select_all(selector, timeout=timeout)
for element in elements:
with suppress(Exception):
position = element.get_position()
if (position.width != 0 or position.height != 0):
visible_elements.append(element)
return visible_elements
def click_nth_element(self, selector, number):
elements = self.select_all(selector)
if len(elements) < number:
raise Exception(
"Not enough matching {%s} elements to "
"click number %s!" % (selector, number)
)
number = number - 1
if number < 0:
number = 0
element = elements[number]
element.click()
def click_nth_visible_element(self, selector, number):
"""Finds all matching page elements and clicks the nth visible one.
Example: self.click_nth_visible_element('[type="checkbox"]', 5)
(Clicks the 5th visible checkbox on the page.)"""
elements = self.find_visible_elements(selector)
if len(elements) < number:
raise Exception(
"Not enough matching {%s} elements to "
"click number %s!" % (selector, number)
)
number = number - 1
if number < 0:
number = 0
element = elements[number]
element.click()
def click_link(self, link_text):
self.find_elements_by_text(link_text, "a")[0].click()
@ -479,18 +518,36 @@ class CDPMethods():
self.__slow_mode_pause_if_set()
self.loop.run_until_complete(self.page.wait())
def click_visible_elements(self, selector):
def click_visible_elements(self, selector, limit=0):
"""Finds all matching page elements and clicks visible ones in order.
If a click reloads or opens a new page, the clicking will stop.
If no matching elements appear, an Exception will be raised.
If "limit" is set and > 0, will only click that many elements.
Also clicks elements that become visible from previous clicks.
Works best for actions such as clicking all checkboxes on a page.
Example: self.click_visible_elements('input[type="checkbox"]')"""
elements = self.select_all(selector)
click_count = 0
for element in elements:
if limit and limit > 0 and click_count >= limit:
return
try:
position = element.get_position()
if (position.width != 0 or position.height != 0):
width = 0
height = 0
try:
position = element.get_position()
width = position.width
height = position.height
except Exception:
continue
if (width != 0 or height != 0):
element.click()
click_count += 1
time.sleep(0.0375)
self.__slow_mode_pause_if_set()
self.loop.run_until_complete(self.page.wait())
except Exception:
pass
break
def mouse_click(self, selector, timeout=settings.SMALL_TIMEOUT):
"""(Attempt simulating a mouse click)"""
@ -1238,6 +1295,7 @@ class CDPMethods():
def gui_drag_and_drop(self, drag_selector, drop_selector, timeframe=0.35):
self.__slow_mode_pause_if_set()
self.bring_active_window_to_front()
x1, y1 = self.get_gui_element_center(drag_selector)
self.__add_light_pause()
x2, y2 = self.get_gui_element_center(drop_selector)
@ -1327,10 +1385,14 @@ class CDPMethods():
def gui_hover_element(self, selector, timeframe=0.25):
self.__slow_mode_pause_if_set()
x, y = self.get_gui_element_center(selector)
self.__add_light_pause()
self.__gui_hover_x_y(x, y, timeframe=timeframe)
self.__slow_mode_pause_if_set()
element_rect = self.get_gui_element_rect(selector)
width = element_rect["width"]
height = element_rect["height"]
if width > 0 and height > 0:
x, y = self.get_gui_element_center(selector)
self.bring_active_window_to_front()
self.__gui_hover_x_y(x, y, timeframe=timeframe)
self.__slow_mode_pause_if_set()
self.loop.run_until_complete(self.page.wait())
def gui_hover_and_click(self, hover_selector, click_selector):
@ -1338,6 +1400,7 @@ class CDPMethods():
constants.MultiBrowser.PYAUTOGUILOCK
)
with gui_lock:
self.bring_active_window_to_front()
self.gui_hover_element(hover_selector)
time.sleep(0.15)
self.gui_hover_element(click_selector)

View File

@ -2166,7 +2166,6 @@ class BaseCase(unittest.TestCase):
if limit and limit > 0 and len(elements) > limit:
elements = elements[:limit]
return elements
self.wait_for_ready_state_complete()
time.sleep(0.05)
elements = self.driver.find_elements(by=by, value=selector)
@ -2178,6 +2177,11 @@ class BaseCase(unittest.TestCase):
"""Returns a list of matching WebElements that are visible.
If "limit" is set and > 0, will only return that many elements."""
selector, by = self.__recalculate_selector(selector, by)
if self.__is_cdp_swap_needed():
elements = self.cdp.find_visible_elements(selector)
if limit and limit > 0 and len(elements) > limit:
elements = elements[:limit]
return elements
self.wait_for_ready_state_complete()
time.sleep(0.05)
return page_actions.find_visible_elements(
@ -2201,7 +2205,7 @@ class BaseCase(unittest.TestCase):
timeout = self.__get_new_timeout(timeout)
selector, by = self.__recalculate_selector(selector, by)
if self.__is_cdp_swap_needed():
self.cdp.click_visible_elements(selector)
self.cdp.click_visible_elements(selector, limit)
return
self.wait_for_ready_state_complete()
if self.__needs_minimum_wait():
@ -2283,13 +2287,16 @@ class BaseCase(unittest.TestCase):
):
"""Finds all matching page elements and clicks the nth visible one.
Example: self.click_nth_visible_element('[type="checkbox"]', 5)
(Clicks the 5th visible checkbox on the page.)"""
(Clicks the 5th visible checkbox on the page.)"""
self.__check_scope()
if not timeout:
timeout = settings.SMALL_TIMEOUT
if self.timeout_multiplier and timeout == settings.SMALL_TIMEOUT:
timeout = self.__get_new_timeout(timeout)
selector, by = self.__recalculate_selector(selector, by)
if self.__is_cdp_swap_needed():
self.cdp.click_nth_visible_element(selector, number)
return
self.wait_for_ready_state_complete()
self.wait_for_element_present(selector, by=by, timeout=timeout)
elements = self.find_visible_elements(selector, by=by)
@ -2897,6 +2904,9 @@ class BaseCase(unittest.TestCase):
drop_selector, drop_by = self.__recalculate_selector(
drop_selector, drop_by
)
if self.__is_cdp_swap_needed():
self.cdp.gui_drag_and_drop(drag_selector, drop_selector)
return
drag_element = self.wait_for_element_clickable(
drag_selector, by=drag_by, timeout=timeout
)
@ -15435,7 +15445,8 @@ class BaseCase(unittest.TestCase):
elif hasattr(self, "_using_sb_fixture") and self._using_sb_fixture:
test_id = sb_config._latest_display_id
test_id = test_id.replace(".py::", ".").replace("::", ".")
test_id = test_id.replace("/", ".").replace(" ", "_")
test_id = test_id.replace("/", ".").replace("\\", ".")
test_id = test_id.replace(" ", "_")
# Linux filename length limit for `codecs.open(filename)` = 255
# 255 - len("latest_logs/") - len("/basic_test_info.txt") = 223
if len(test_id) <= 223:
@ -16132,11 +16143,7 @@ class BaseCase(unittest.TestCase):
# This test already called tearDown()
return
if hasattr(self, "recorder_mode") and self.recorder_mode:
if self.undetectable:
try:
self.driver.window_handles
except Exception:
self.driver.connect()
page_actions._reconnect_if_disconnected(self.driver)
try:
self.__process_recorded_actions()
except Exception as e:
@ -16177,12 +16184,7 @@ class BaseCase(unittest.TestCase):
)
raise Exception(message)
# *** Start tearDown() officially ***
if self.undetectable:
try:
self.driver.window_handles
except Exception:
with suppress(Exception):
self.driver.connect()
page_actions._reconnect_if_disconnected(self.driver)
self.__slow_mode_pause_if_active()
has_exception = self.__has_exception()
sb_config._has_exception = has_exception

View File

@ -376,7 +376,7 @@ class Mobile:
class UC:
RECONNECT_TIME = 2.4 # Seconds
CDP_MODE_OPEN_WAIT = 0.9 # Seconds
EXTRA_WINDOWS_WAIT = 0.2 # Seconds
EXTRA_WINDOWS_WAIT = 0.3 # Seconds
class ValidBrowsers:

View File

@ -1202,7 +1202,7 @@ def SB(
from seleniumbase.core import download_helper
from seleniumbase.core import proxy_helper
log_helper.log_folder_setup(constants.Logs.LATEST + "/")
log_helper.log_folder_setup(constants.Logs.LATEST + os.sep)
log_helper.clear_empty_logs()
download_helper.reset_downloads_folder()
if not sb_config.multi_proxy:
@ -1228,7 +1228,7 @@ def SB(
the_traceback = traceback.format_exc().strip()
try:
p2 = the_traceback.split(', in ')[1].split('", line ')[0]
filename = p2.split("/")[-1]
filename = p2.split(os.sep)[-1]
sb.cm_filename = filename
except Exception:
sb.cm_filename = None