Skip to content

Commit

Permalink
- add move lock
Browse files Browse the repository at this point in the history
- improve blocking code within asyncio
  • Loading branch information
kaliiiiiiiiii committed Apr 12, 2024
1 parent c6877b5 commit e1e001f
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 20 deletions.
24 changes: 13 additions & 11 deletions cdp_patches/input/async_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def __init__(self, pid: Optional[int] = None, browser: Optional[async_browsers]
self.browser = browser
self._scale_factor = scale_factor or self._scale_factor
self.emulate_behaviour = emulate_behaviour or self.emulate_behaviour
self._move_lock = asyncio.Lock()

def __await__(self) -> Generator[None, Any, AsyncInput]:
yield from self.__ainit__().__await__()
Expand Down Expand Up @@ -84,7 +85,7 @@ async def _wait_for_window(self) -> None:
max_wait = time.time() + self.window_timeout
while time.time() < max_wait:
try:
if self._base.get_window():
if await self._base.async_get_window():
return
except WindowErrors:
pass
Expand Down Expand Up @@ -137,19 +138,20 @@ async def up(self, button: Literal["left", "right", "middle"], x: Union[int, flo
self.last_x, self.last_y = x, y

async def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
x, y = int(x), int(y)
async with self._move_lock:
x, y = int(x), int(y)

if self.emulate_behaviour and emulate_behaviour:
humanized_points = HumanizeMouseTrajectory((self.last_x, self.last_y), (x, y))
if self.emulate_behaviour and emulate_behaviour:
humanized_points = HumanizeMouseTrajectory((self.last_x, self.last_y), (x, y))

# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
await self._sleep_timeout(timeout=timeout)
# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
await self._sleep_timeout(timeout=timeout)

else:
self._base.move(x=x, y=y)
self.last_x, self.last_y = x, y
else:
self._base.move(x=x, y=y)
self.last_x, self.last_y = x, y

async def scroll(self, direction: Literal["up", "down", "left", "right"], amount: int) -> None:
warnings.warn("Scrolling using CDP-Patches is discouraged as Scroll Inputs dont leak the CDP Domain.", UserWarning)
Expand Down
36 changes: 36 additions & 0 deletions cdp_patches/input/os_base/linux.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import re
import subprocess
Expand Down Expand Up @@ -90,6 +91,7 @@ class LinuxBase:
def __init__(self, pid: int, scale_factor: float) -> None:
self.pid = pid
self.scale_factor = scale_factor
self._loop = asyncio.get_event_loop()

display_env = os.getenv("DISPLAY")
self.display = display.Display(display_env)
Expand Down Expand Up @@ -130,6 +132,40 @@ def search_windows_by_pid(query_tree, pid: int):

raise ValueError(f"No windows found for PID: {self.pid}")

async def async_get_window(self) -> Any:
name_atom = self.display.get_atom("WM_NAME", only_if_exists=True)
pid_atom = self.display.get_atom("_NET_WM_PID", only_if_exists=True)
res_windows: List[Window] = []

# Getting all WindowIds by PID by recursively searching through all windows under the root window query tree
def search_windows_by_pid(query_tree, pid: int):
for window in query_tree.children:
window_pid = window.get_property(pid_atom, 0, 0, pow(2, 32) - 1)
if window_pid and window_pid.value[0] == pid:
res_windows.append(window)
if window.query_tree().children:
search_windows_by_pid(window.query_tree(), pid)

await self._loop.run_in_executor(None, lambda: search_windows_by_pid(self.display.screen().root.query_tree(), self.pid))
assert res_windows, ValueError(f"No windows found for PID: {self.pid}")

for window in res_windows:
# Getting necessary window properties
title = window.get_property(name_atom, 0, 0, pow(2, 32) - 1).value
min_height = window.get_wm_normal_hints().min_height
# parent_offset_coords = window.translate_coords(window.query_tree().parent, 0, 0)
# window_x, window_y = parent_offset_coords.x, parent_offset_coords.y

# Filter out non-browser windows, for example the Taskbar or Info Bars
if (b"google-chrome" in title) or (title == b"chrome") or (
min_height == 0): # or (window_x == window_y) or not all((window_x, window_y))
continue

self.browser_window = window
return self.browser_window

raise ValueError(f"No windows found for PID: {self.pid}")

def _offset_toolbar_height(self) -> Tuple[int, int]:
# Get Window Location
root_offset_coords = self.browser_window.translate_coords(self.browser_window.query_tree().root, 0, 0)
Expand Down
17 changes: 17 additions & 0 deletions cdp_patches/input/os_base/windows.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
import warnings
from typing import Literal
Expand All @@ -22,6 +23,7 @@ class WindowsBase:
def __init__(self, pid: int, scale_factor: float) -> None:
self.pid = pid
self.scale_factor = scale_factor
self._loop = asyncio.get_event_loop()

def get_window(self) -> WindowSpecification:
win32_app = application.Application(backend="win32")
Expand All @@ -38,6 +40,21 @@ def get_window(self) -> WindowSpecification:
self.browser_window = child
return self.browser_window

async def async_get_window(self) -> WindowSpecification:
win32_app = application.Application(backend="win32")
await self._loop.run_in_executor(None, lambda: win32_app.connect(process=self.pid))

self.browser_window: WindowSpecification = win32_app.top_window()
self.hwnd = self.browser_window.handle
# Perform Window Checks
await self._loop.run_in_executor(None, lambda:self.browser_window.verify_actionable())
assert self.browser_window.is_normal()

for child in self.browser_window.iter_children():
if child.element_info.class_name == "Chrome_RenderWidgetHostHWND":
self.browser_window = child
return self.browser_window

def down(self, button: Literal["left", "right", "middle"], x: int, y: int) -> None:
self.browser_window.press_mouse(button=button, coords=(int(x * self.scale_factor), int(y * self.scale_factor)))

Expand Down
21 changes: 12 additions & 9 deletions cdp_patches/input/sync_input.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random
import re
import sys
import threading
import time
import warnings
from typing import Literal, Optional, Union
Expand Down Expand Up @@ -47,6 +48,7 @@ class SyncInput:
def __init__(self, pid: Optional[int] = None, browser: Optional[sync_browsers] = None, scale_factor: Optional[float] = 1.0, emulate_behaviour: Optional[bool] = True) -> None:
self._scale_factor = scale_factor or self._scale_factor
self.emulate_behaviour = emulate_behaviour or self.emulate_behaviour
self._move_lock = threading.Lock()

if browser:
self.pid = get_sync_browser_pid(browser)
Expand Down Expand Up @@ -132,18 +134,19 @@ def up(self, button: Literal["left", "right", "middle"], x: Union[int, float], y
self.last_x, self.last_y = x, y

def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
x, y = int(x), int(y)
with self._move_lock:
x, y = int(x), int(y)

if self.emulate_behaviour and emulate_behaviour:
humanized_points = HumanizeMouseTrajectory((self.last_x, self.last_y), (x, y))
if self.emulate_behaviour and emulate_behaviour:
humanized_points = HumanizeMouseTrajectory((self.last_x, self.last_y), (x, y))

# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
self._sleep_timeout(timeout=timeout)
# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
self._sleep_timeout(timeout=timeout)

self._base.move(x=x, y=y)
self.last_x, self.last_y = x, y
self._base.move(x=x, y=y)
self.last_x, self.last_y = x, y

def scroll(self, direction: Literal["up", "down", "left", "right"], amount: int) -> None:
warnings.warn("Scrolling using CDP-Patches is discouraged as Scroll Inputs dont leak the CDP Domain.", UserWarning)
Expand Down

0 comments on commit e1e001f

Please sign in to comment.