Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…ches into dev
  • Loading branch information
Vinyzu committed Aug 16, 2024
2 parents 0c19da2 + ac8ff09 commit 6856ac1
Show file tree
Hide file tree
Showing 19 changed files with 232 additions and 735 deletions.
674 changes: 0 additions & 674 deletions LICENSE

This file was deleted.

2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pip install cdp-patches[automation_linting]
### Concept: Input Domain Leaks
Bypass CDP Leaks in [Input](https://chromedevtools.github.io/devtools-protocol/tot/Input/) domains

[![Brotector Banner](https://github.com/Kaliiiiiiiiii-Vinyzu/CDP-Patches/assets/50874994/fdbe831d-cb39-479d-ba0a-fea7f29fe90a)](https://github.com/kaliiiiiiiiii/brotector)

For an interaction event `e`, the page coordinates won't ever equal the screen coordinates, unless Chrome is in fullscreen.
However, all `CDP` input commands just set it the same by default (see [crbug#1477537](https://bugs.chromium.org/p/chromium/issues/detail?id=1477537)).
```js
Expand Down
1 change: 1 addition & 0 deletions cdp_patches/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
elif system_name == "Linux":
is_windows = False
else:
is_windows = False
warnings.warn("Unknown system (You´re probably using MacOS, which is currently not supported).", RuntimeWarning)

__all__ = ["VERSION", "is_windows"]
36 changes: 22 additions & 14 deletions cdp_patches/input/async_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from cdp_patches import is_windows
from cdp_patches.input.exceptions import WindowClosedException
from cdp_patches.input.utils import _mk_kwargs

if is_windows:
from pywinauto.application import ProcessNotFoundError
Expand Down Expand Up @@ -51,7 +52,7 @@ class AsyncInput:
def __init__(
self, pid: Optional[int] = None, browser: Optional[async_browsers] = None, scale_factor: Optional[float] = 1.0, emulate_behaviour: Optional[bool] = True, window_timeout: Optional[float] = 30.0
) -> None:
if platform.system() not in ('Windows', 'Linux'):
if platform.system() not in ("Windows", "Linux"):
raise SystemError("Unknown system (You´re probably using MacOS, which is currently not supported).")

self.pid = pid
Expand Down Expand Up @@ -113,43 +114,50 @@ async def _sleep_timeout(self, timeout: Optional[float] = None) -> None:

await asyncio.sleep(timeout)

async def click(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
async def click(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
) -> None:
x, y = int(x), int(y)

await self.down(button=button, x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout)
await self.down(button=button, x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout, pressed=pressed)
if self.emulate_behaviour and emulate_behaviour:
await self._sleep_timeout(timeout=timeout)
await self.up(button=button, x=x, y=y)
await self.up(button=button, x=x, y=y, pressed=pressed)
self.last_x, self.last_y = x, y

async def double_click(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
) -> None:
x, y = int(x), int(y)

await self.click(button=button, x=x, y=y, timeout=timeout, emulate_behaviour=emulate_behaviour)
if self.emulate_behaviour and emulate_behaviour:
await self._sleep_timeout(random.uniform(0.14, 0.21))
# await self._sleep_timeout(timeout=timeout)
await self.click(button=button, x=x, y=y, emulate_behaviour=False, timeout=timeout)
await self.click(button=button, x=x, y=y, emulate_behaviour=False, timeout=timeout, pressed=pressed)

self.last_x, self.last_y = x, y

async def down(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
async def down(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
) -> None:
x, y = int(x), int(y)

if self.emulate_behaviour and emulate_behaviour:
await self.move(x=x, y=y, timeout=timeout, emulate_behaviour=emulate_behaviour)
self._base.down(button=button, x=x, y=y)
await self.move(x=x, y=y, timeout=timeout, emulate_behaviour=emulate_behaviour, pressed=pressed)
kwargs = _mk_kwargs(pressed)
self._base.down(button=button, x=x, y=y, **kwargs)
self.last_x, self.last_y = x, y

async def up(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float]) -> None:
async def up(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "") -> None:
x, y = int(x), int(y)

self._base.up(button=button, x=x, y=y)
kwargs = _mk_kwargs(pressed)
self._base.up(button=button, x=x, y=y, **kwargs)
self.last_x, self.last_y = x, y

async def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
async def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None, pressed: str = "") -> None:
kwargs = _mk_kwargs(pressed)
async with self._move_lock:
x, y = int(x), int(y)

Expand All @@ -158,11 +166,11 @@ async def move(self, x: Union[int, float], y: Union[int, float], emulate_behavio

# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
self._base.move(x=int(human_x), y=int(human_y), **kwargs)
await self._sleep_timeout(timeout=timeout)

else:
self._base.move(x=x, y=y)
self._base.move(x=x, y=y, **kwargs)
self.last_x, self.last_y = x, y

async def scroll(self, direction: Literal["up", "down", "left", "right"], amount: int) -> None:
Expand Down
6 changes: 3 additions & 3 deletions cdp_patches/input/browsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
try:
from selenium.webdriver import Chrome as SeleniumChrome
except ImportError:
SeleniumChrome: Type["SeleniumChrome"] = "SeleniumChrome" # type: ignore[no-redef]
SeleniumChrome: Type["SeleniumChrome"] = type("SeleniumChrome", (object,), {}) # type: ignore[no-redef]

try:
from selenium_driverless.sync.webdriver import Chrome as DriverlessSyncChrome
from selenium_driverless.webdriver import Chrome as DriverlessAsyncChrome
except ImportError:
DriverlessAsyncChrome: Type["DriverlessAsyncChrome"] = "DriverlessAsyncChrome" # type: ignore[no-redef]
DriverlessSyncChrome: Type["DriverlessSyncChrome"] = "DriverlessSyncChrome" # type: ignore[no-redef]
DriverlessAsyncChrome: Type["DriverlessAsyncChrome"] = type("DriverlessAsyncChrome", (object,), {}) # type: ignore[no-redef]
DriverlessSyncChrome: Type["DriverlessSyncChrome"] = type("DriverlessSyncChrome", (object,), {}) # type: ignore[no-redef]

all_browsers = Union[AsyncContext, AsyncBrowser, SyncContext, SyncBrowser, BotrightContext, SeleniumChrome, DriverlessAsyncChrome, DriverlessSyncChrome]
sync_browsers = Union[SeleniumChrome, SyncContext, SyncBrowser, DriverlessSyncChrome]
Expand Down
18 changes: 10 additions & 8 deletions cdp_patches/input/os_base/windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import ctypes
import re
import warnings
from typing import Literal, Union, List
from typing import List, Literal, Union

from pywinauto import application, timings
from pywinauto.application import WindowSpecification
Expand All @@ -24,7 +24,7 @@
warnings.filterwarnings("ignore", category=UserWarning, message="32-bit application should be automated using 32-bit Python (you use 64-bit Python)")


def get_top_window(app:application.Application, windows=List[Union[WindowSpecification, HwndWrapper]]) -> WindowSpecification:
def get_top_window(app: application.Application, windows=List[Union[WindowSpecification, HwndWrapper]]) -> WindowSpecification:
if windows is None:
windows = app
# win32_app.top_window(), but without timeout
Expand Down Expand Up @@ -127,17 +127,19 @@ async def async_get_window(self, timeout: float = 1) -> WindowSpecification:

return self.browser_window

def down(self, button: Literal["left", "right", "middle"], x: int, y: int) -> None:
def down(self, button: Literal["left", "right", "middle"], x: int, y: int, pressed: str = "") -> None:
if not pressed:
pressed = button
self.ensure_window()
self.browser_window.press_mouse(button=button, coords=(int(x * self.scale_factor), int(y * self.scale_factor)))
self.browser_window.press_mouse(button=button, pressed=pressed, coords=(int(x * self.scale_factor), int(y * self.scale_factor)))

def up(self, button: Literal["left", "right", "middle"], x: int, y: int) -> None:
def up(self, button: Literal["left", "right", "middle"], x: int, y: int, pressed: str = "") -> None:
self.ensure_window()
self.browser_window.release_mouse(button=button, coords=(int(x * self.scale_factor), int(y * self.scale_factor)))
self.browser_window.release_mouse(button=button, pressed=pressed, coords=(int(x * self.scale_factor), int(y * self.scale_factor)))

def move(self, x: int, y: int) -> None:
def move(self, x: int, y: int, pressed: str = "") -> None:
self.ensure_window()
self.browser_window.move_mouse(coords=(int(x * self.scale_factor), int(y * self.scale_factor)), pressed="left")
self.browser_window.move_mouse(coords=(int(x * self.scale_factor), int(y * self.scale_factor)), pressed=pressed)

def scroll(self, direction: Literal["up", "down", "left", "right"], amount: int) -> None:
self.ensure_window()
Expand Down
38 changes: 23 additions & 15 deletions cdp_patches/input/sync_input.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from cdp_patches import is_windows
from cdp_patches.input.exceptions import WindowClosedException
from cdp_patches.input.utils import _mk_kwargs

if is_windows:
from pywinauto.application import ProcessNotFoundError
Expand Down Expand Up @@ -50,7 +51,7 @@ class SyncInput:
def __init__(
self, pid: Optional[int] = None, browser: Optional[sync_browsers] = None, scale_factor: Optional[float] = 1.0, emulate_behaviour: Optional[bool] = True, window_timeout: Optional[float] = 30.0
) -> None:
if platform.system() not in ('Windows', 'Linux'):
if platform.system() not in ("Windows", "Linux"):
raise SystemError("Unknown system (You´re probably using MacOS, which is currently not supported).")

self._scale_factor = scale_factor or self._scale_factor
Expand Down Expand Up @@ -111,41 +112,48 @@ def _sleep_timeout(self, timeout: Optional[float] = None) -> None:
# while time.perf_counter() - start < timeout:
# pass

def click(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = 0.07) -> None:
def click(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = 0.07
) -> None:
x, y = int(x), int(y)

self.down(button=button, x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout)
self.down(button=button, x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout, pressed=pressed)
if self.emulate_behaviour and emulate_behaviour:
self._sleep_timeout(timeout=timeout)
self.up(button=button, x=x, y=y)
self.up(button=button, x=x, y=y, pressed=pressed)
self.last_x, self.last_y = x, y

def double_click(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
def double_click(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
) -> None:
x, y = int(x), int(y)

self.click(button=button, x=x, y=y, timeout=timeout, emulate_behaviour=emulate_behaviour)
self.click(button=button, x=x, y=y, timeout=timeout, emulate_behaviour=emulate_behaviour, pressed=pressed)
if emulate_behaviour and self.emulate_behaviour:
self._sleep_timeout(random.uniform(0.14, 0.21))
# self._sleep_timeout(timeout=timeout)
self.click(button=button, x=x, y=y, emulate_behaviour=False, timeout=timeout)
self.click(button=button, x=x, y=y, emulate_behaviour=False, timeout=timeout, pressed=pressed)

self.last_x, self.last_y = x, y

def down(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
def down(
self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None
) -> None:
x, y = int(x), int(y)

if self.emulate_behaviour and emulate_behaviour:
self.move(x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout)
self._base.down(button=button, x=x, y=y)
self.move(x=x, y=y, emulate_behaviour=emulate_behaviour, timeout=timeout, pressed=pressed)
self._base.down(button=button, x=x, y=y, **_mk_kwargs(pressed))
self.last_x, self.last_y = x, y

def up(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float]) -> None:
def up(self, button: Literal["left", "right", "middle"], x: Union[int, float], y: Union[int, float], pressed: str = "") -> None:
x, y = int(x), int(y)

self._base.up(button=button, x=x, y=y)
self._base.up(button=button, x=x, y=y, **_mk_kwargs(pressed))
self.last_x, self.last_y = x, y

def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
def move(self, x: Union[int, float], y: Union[int, float], pressed: str = "", emulate_behaviour: Optional[bool] = True, timeout: Optional[float] = None) -> None:
kwargs = _mk_kwargs(pressed)
with self._move_lock:
x, y = int(x), int(y)

Expand All @@ -154,10 +162,10 @@ def move(self, x: Union[int, float], y: Union[int, float], emulate_behaviour: Op

# Move Mouse to new random locations
for i, (human_x, human_y) in enumerate(humanized_points.points):
self._base.move(x=int(human_x), y=int(human_y))
self._base.move(x=int(human_x), y=int(human_y), **kwargs)
self._sleep_timeout(timeout=timeout)

self._base.move(x=x, y=y)
self._base.move(x=x, y=y, **kwargs)
self.last_x, self.last_y = x, y

def scroll(self, direction: Literal["up", "down", "left", "right"], amount: int) -> None:
Expand Down
12 changes: 12 additions & 0 deletions cdp_patches/input/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Optional

from cdp_patches import is_windows


def _mk_kwargs(pressed: Optional[str]) -> dict[str, str]:
kwargs = {}
if pressed is not None:
if not is_windows:
raise NotImplementedError("specifying pressed buttons currently only supported for windows")
kwargs["pressed"] = pressed
return kwargs
21 changes: 10 additions & 11 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
# This requirements are for development and testing only, not for production.
flake8==7.0.0
pytest==8.1.1
pytest_asyncio==0.23.6
mypy==1.9.0
types-setuptools==69.2.0.20240317
black==24.3.0
flake8==7.1.0
pytest==8.2.2
pytest_asyncio==0.23.7
mypy==1.10.1
types-setuptools==69.5.*
black==24.4.2
isort==5.13.2
playwright==1.42.0
selenium==4.18.1
selenium_driverless==1.8.0.2
playwright==1.44.0
selenium==4.22.0
selenium_driverless==1.9.3.1
twisted==24.3.0
types-requests==2.31.*
webdriver_manager==4.0.1
types-requests==2.31.*
6 changes: 6 additions & 0 deletions tests/assets/input/mouse-helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@
for (let i = 0; i < 5; i++)
box.classList.toggle('button-' + i, buttons & (1 << i));
}

window.last_hover_elem = undefined
document.body.addEventListener("mousemove", ()=>{
var elem = document.querySelector('button:hover')
if (typeof elem !== 'undefined'){window.last_hover_elem = elem}
})
})();
30 changes: 27 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
import os
import signal
import subprocess
import tempfile
from typing import AsyncGenerator, Generator, List

import pytest
Expand All @@ -7,14 +11,13 @@
from playwright.sync_api import Page as SyncPage
from playwright.sync_api import sync_playwright
from selenium import webdriver as selenium_webdriver
from selenium.webdriver.chrome.service import Service as SeleniumChromeService
from selenium_driverless import webdriver as async_webdriver
from selenium_driverless.sync import webdriver as sync_webdriver
from webdriver_manager.chrome import ChromeDriverManager

from cdp_patches.input import AsyncInput, SyncInput

from .server import Server, test_server
from .utils import find_chrome_executable, random_port

flags: List[str] = [
"--incognito",
Expand Down Expand Up @@ -146,7 +149,7 @@ def selenium_driver() -> Generator[selenium_webdriver.Chrome, None, None]:
# start url at about:blank
options.add_argument("about:blank")

with selenium_webdriver.Chrome(options, service=SeleniumChromeService(ChromeDriverManager().install())) as driver:
with selenium_webdriver.Chrome(options) as driver:
driver.sync_input = SyncInput(browser=driver)
yield driver

Expand All @@ -160,3 +163,24 @@ async def async_driver() -> AsyncGenerator[async_webdriver.Chrome, None]:
async with async_webdriver.Chrome(options) as driver:
driver.async_input = await AsyncInput(browser=driver)
yield driver


@pytest.fixture
def chrome_proc() -> Generator[subprocess.Popen[bytes], None, None]:
with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tempdir:
path = find_chrome_executable()
proc = subprocess.Popen([path, f"--remote-debugging-port={random_port()}", f"--user-data-dir={tempdir}", "--no-first-run"])
try:
yield proc
finally:
if os.name == "posix":
os.killpg(os.getpgid(proc.pid), signal.SIGTERM) # type: ignore[attr-defined]
else:
proc.terminate()
try:
proc.wait(10)
except subprocess.TimeoutExpired:
if os.name == "posix":
os.killpg(os.getpgid(proc.pid), signal.SIGKILL) # type: ignore[attr-defined]
else:
proc.kill()
4 changes: 2 additions & 2 deletions tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import threading
from contextlib import closing
from http import HTTPStatus
from pathlib import Path
from typing import Any, Callable, Dict, Generator, Generic, Optional, Set, Tuple, TypeVar, cast
from urllib.parse import urlparse

from playwright._impl._path_utils import get_file_dirname
from twisted.internet import reactor as _twisted_reactor
from twisted.internet.selectreactor import SelectReactor
from twisted.web import http

_dirname = get_file_dirname()
_dirname = Path(__file__).parent.absolute()
reactor = cast(SelectReactor, _twisted_reactor)


Expand Down
Loading

0 comments on commit 6856ac1

Please sign in to comment.