Skip to content

Commit

Permalink
Allow dynamic scale factor (+ get on startup)
Browse files Browse the repository at this point in the history
Runtime Patched
  • Loading branch information
Vinyzu committed Mar 25, 2024
1 parent abde9a6 commit 144bd95
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 22 deletions.
18 changes: 14 additions & 4 deletions cdp_patches/input/async_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class AsyncInput:
pid: Optional[int]
_base: Union[WindowsBase]
window_timeout: int = 30
scale_factor: float = 1.0
_scale_factor: float = 1.0
timeout: float = 0.01
typing_speed: int = 50
last_x: int = 0
Expand All @@ -31,7 +31,7 @@ class AsyncInput:
def __init__(self, pid: Optional[int] = None, browser: Optional[async_browsers] = None, scale_factor: Optional[float] = 1.0, emulate_behaviour: Optional[bool] = True) -> None:
self.pid = pid
self.browser = browser
self.scale_factor = scale_factor or self.scale_factor
self._scale_factor = scale_factor or self._scale_factor
self.emulate_behaviour = emulate_behaviour or self.emulate_behaviour

def __await__(self) -> Generator[None, Any, AsyncInput]:
Expand All @@ -41,12 +41,12 @@ def __await__(self) -> Generator[None, Any, AsyncInput]:
async def __ainit__(self) -> None:
if self.browser:
self.pid = await get_async_browser_pid(self.browser)
self.scale_factor = await get_async_scale_factor(self.browser)
self._scale_factor = await get_async_scale_factor(self.browser)
elif not self.pid:
raise ValueError("You must provide a pid or a browser")

if os.name == "nt":
self._base = WindowsBase(self.pid, self.scale_factor)
self._base = WindowsBase(self.pid, self._scale_factor)
else:
# mind to change typing of `self.base` property when implementing
raise NotImplementedError(f"pyinput not implemented yet for {os.name}")
Expand All @@ -58,6 +58,16 @@ async def __ainit__(self) -> None:
def base(self) -> WindowsBase:
return self._base

@property
def scale_factor(self) -> float:
return self._scale_factor

@scale_factor.setter
def scale_factor(self, scale_value) -> None:
self._scale_factor = scale_value
if self._base:
self._base.scale_factor = scale_value

async def _wait_for_window(self) -> None:
max_wait = time.time() + self.window_timeout
while time.time() < max_wait:
Expand Down
134 changes: 120 additions & 14 deletions cdp_patches/input/browsers.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from contextlib import suppress
from datetime import datetime
from typing import Dict, List, TypedDict, Union

from playwright.async_api import Browser as AsyncBrowser
from playwright.async_api import BrowserContext as AsyncContext
from playwright.async_api import Error as AsyncError
from playwright.async_api import Error as SyncError
from playwright.sync_api import Browser as SyncBrowser
from playwright.sync_api import BrowserContext as SyncContext
from selenium import webdriver
Expand Down Expand Up @@ -109,38 +113,140 @@ async def get_async_browser_pid(browser: async_browsers) -> int:
# Scale Factor
# Selenium & Selenium Driverless
def get_sync_selenium_scale_factor(driver: Union[webdriver.Chrome, driverless_sync_webdriver.Chrome]) -> int:
if isinstance(driver, driverless_sync_webdriver.Chrome):
_scale_factor: int = driver.execute_script("return window.devicePixelRatio", unique_context=True)
return _scale_factor

scale_factor: int = driver.execute_script("return window.devicePixelRatio")
return scale_factor


async def get_async_selenium_scale_factor(driver: driverless_async_webdriver.Chrome) -> int:
scale_factor: int = await driver.execute_script("return window.devicePixelRatio")
scale_factor: int = await driver.execute_script("return window.devicePixelRatio", unique_context=True)
return scale_factor


# Playwright
# Playwright with Runtime Patching
def get_sync_playwright_scale_factor(browser: Union[SyncContext, SyncBrowser]) -> int:
if isinstance(browser, SyncContext) and any(browser.pages):
page = browser.pages[0]
close_context, close_page = False, False
if isinstance(browser, SyncContext):
context = browser
elif isinstance(browser, SyncBrowser):
if any(browser.contexts):
context = browser.contexts[0]
else:
context = browser.new_context()
close_context = True
else:
raise ValueError("Invalid browser type.")

if any(context.pages):
page = context.pages[0]
else:
page = context.new_page()
close_page = True
cdp_session = context.new_cdp_session(page)

time1 = datetime.now()
while (datetime.now() - time1).seconds <= 10:
try:
page_frame_tree = cdp_session.send("Page.getFrameTree")
page_id = page_frame_tree["frameTree"]["frame"]["id"]

isolated_world = cdp_session.send("Page.createIsolatedWorld", {"frameId": page_id, "grantUniveralAccess": True, "worldName": "Shimmy shimmy yay, shimmy yay, shimmy ya"})
isolated_exec_id = isolated_world["executionContextId"]
break
except SyncError as e:
if e.message == "Protocol error (Page.createIsolatedWorld): Invalid parameters":
pass
else:
raise e
else:
raise TimeoutError("Page.createIsolatedWorld did not initialize properly within 30 seconds.")

time2 = datetime.now()
while (datetime.now() - time2).seconds <= 10:
try:
scale_factor_eval = cdp_session.send("Runtime.evaluate", {"expression": "window.devicePixelRatio", "contextId": isolated_exec_id})
scale_factor: int = scale_factor_eval["result"]["value"]
break
except SyncError as e:
if e.message == "Protocol error (Runtime.evaluate): Cannot find context with specified id":
pass
else:
raise e
else:
page = browser.new_page()
raise TimeoutError("Runtime.evaluate did not run properly within 30 seconds.")

scale_factor: int = page.evaluate("window.devicePixelRatio")
if not (isinstance(browser, SyncContext) and any(browser.pages)):
page.close()
with suppress(SyncError):
if close_page:
page.close()

with suppress(SyncError):
if close_context:
context.close()

return scale_factor


async def get_async_playwright_scale_factor(browser: Union[AsyncContext, AsyncBrowser]) -> int:
if isinstance(browser, AsyncContext) and any(browser.pages):
page = browser.pages[0]
close_context, close_page = False, False
if isinstance(browser, AsyncContext):
context = browser
elif isinstance(browser, AsyncBrowser):
if any(browser.contexts):
context = browser.contexts[0]
else:
context = await browser.new_context()
close_context = True
else:
page = await browser.new_page()
raise ValueError("Invalid browser type.")

if any(context.pages):
page = context.pages[0]
else:
page = await context.new_page()
close_page = True
cdp_session = await context.new_cdp_session(page)

time1 = datetime.now()
while (datetime.now() - time1).seconds <= 10:
try:
page_frame_tree = await cdp_session.send("Page.getFrameTree")
page_id = page_frame_tree["frameTree"]["frame"]["id"]

isolated_world = await cdp_session.send("Page.createIsolatedWorld", {"frameId": page_id, "grantUniveralAccess": True, "worldName": "Shimmy shimmy yay, shimmy yay, shimmy ya"})
isolated_exec_id = isolated_world["executionContextId"]
break
except AsyncError as e:
if e.message == "Protocol error (Page.createIsolatedWorld): Invalid parameters":
pass
else:
raise e
else:
raise TimeoutError("Page.createIsolatedWorld did not initialize properly within 30 seconds.")

time2 = datetime.now()
while (datetime.now() - time2).seconds <= 10:
try:
scale_factor_eval = await cdp_session.send("Runtime.evaluate", {"expression": "window.devicePixelRatio", "contextId": isolated_exec_id})
scale_factor: int = scale_factor_eval["result"]["value"]
break
except AsyncError as e:
if e.message == "Protocol error (Runtime.evaluate): Cannot find context with specified id":
pass
else:
raise e
else:
raise TimeoutError("Runtime.evaluate did not run properly within 30 seconds.")

with suppress(SyncError):
if close_page:
await page.close()

scale_factor: int = await page.evaluate("window.devicePixelRatio")
if not (isinstance(browser, AsyncContext) and any(browser.pages)):
await page.close()
with suppress(SyncError):
if close_context:
await context.close()

return scale_factor

Expand Down
18 changes: 14 additions & 4 deletions cdp_patches/input/sync_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ class SyncInput:
pid: Optional[int]
_base: Union[WindowsBase]
window_timeout: int = 30
scale_factor: float = 1.0
_scale_factor: float = 1.0
timeout: float = 0.01
typing_speed: int = 50
last_x: int = 0
last_y: int = 0
selective_modifiers_regex = re.compile(r"{[^{}]*}|.")

def __init__(self, pid: Optional[int] = None, browser: Optional[sync_browsers] = None, scale_factor: Optional[float] = 1.0, emulate_behaviour: Optional[bool] = True) -> None:
self.scale_factor = scale_factor or self.scale_factor
self._scale_factor = scale_factor or self._scale_factor
self.emulate_behaviour = emulate_behaviour or self.emulate_behaviour

if browser:
self.pid = get_sync_browser_pid(browser)
self.scale_factor = get_sync_scale_factor(browser)
self._scale_factor = get_sync_scale_factor(browser)
elif pid:
self.pid = pid
else:
raise ValueError("You must provide a pid or a browser")

if os.name == "nt":
self._base = WindowsBase(self.pid, self.scale_factor)
self._base = WindowsBase(self.pid, self._scale_factor)
else:
raise NotImplementedError(f"pyinput not implemented yet for {os.name}")
self._wait_for_window()
Expand All @@ -49,6 +49,16 @@ def __init__(self, pid: Optional[int] = None, browser: Optional[sync_browsers] =
def base(self):
return self._base

@property
def scale_factor(self) -> float:
return self._scale_factor

@scale_factor.setter
def scale_factor(self, scale_value) -> None:
self._scale_factor = scale_value
if self._base:
self._base.scale_factor = scale_value

def _wait_for_window(self) -> None:
max_wait = time.time() + self.window_timeout
while time.time() < max_wait:
Expand Down

0 comments on commit 144bd95

Please sign in to comment.