Skip to content

Commit

Permalink
Merge branch 'public_locks'
Browse files Browse the repository at this point in the history
  • Loading branch information
jsbueno committed Nov 16, 2024
2 parents 4a9f5e1 + 9a4fe4e commit 63df7b5
Show file tree
Hide file tree
Showing 11 changed files with 910 additions and 541 deletions.
50 changes: 26 additions & 24 deletions .github/workflows/pypi.yml
Original file line number Diff line number Diff line change
@@ -1,33 +1,35 @@
name: Publish Python 🐍 distribution 📦 to PyPI and TestPyPI

on: push
name: publish


jobs:
build:
name: Build distribution 📦
runs-on: ubuntu-latest
environment:
name: release
name: Build wheels on ${{ matrix.os }}
runs-on: ${{ matrix.os }}
strategy:
matrix:
# macos-13 is an intel runner, macos-14 is apple silicon
os: [ubuntu-latest, ] # windows-latest, macos-13, macos-14]

steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"
- name: Install pypa/build
run: >-
python3 -m
pip install
build
--user
- name: Build a binary wheel and a source tarball
run: python3 -m build
- name: Store the distribution packages
uses: actions/upload-artifact@v4
with:
name: python-package-distributions
path: dist/
- uses: actions/checkout@v4

- name: Build wheels
uses: pypa/[email protected]
# env:
# CIBW_SOME_OPTION: value
# ...
# with:
# package-dir: .
# output-dir: wheelhouse
# config-file: "{package}/pyproject.toml"

- uses: actions/upload-artifact@v4
with:
name: cibw-wheels-${{ matrix.os }}-${{ strategy.job-index }}
path: ./wheelhouse/*.whl

publish-to-pypi:
name: >-
Expand All @@ -37,7 +39,7 @@ jobs:
- build
runs-on: ubuntu-latest
environment:
name: release
name: pypi
url: https://pypi.org/p/extrainterpreters
permissions:
id-token: write # IMPORTANT: mandatory for trusted publishing
Expand All @@ -47,7 +49,7 @@ jobs:
uses: actions/[email protected]
with:
name: python-package-distributions
path: dist/
path: wheelhouse/
- name: Publish distribution 📦 to PyPI
uses: pypa/gh-action-pypi-publish@release/v1

Expand Down
3 changes: 2 additions & 1 deletion src/extrainterpreters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

# Early declarations to avoid circular imports:

__version__ = "0.2-beta3"
__version__ = "0.2.0"


BFSZ = 10_000_000
Expand Down Expand Up @@ -69,6 +69,7 @@ def raw_list_all():
from .base_interpreter import BaseInterpreter
from .queue import SingleQueue, Queue
from .simple_interpreter import SimpleInterpreter as Interpreter
from .lock import Lock, RLock


def list_all():
Expand Down
9 changes: 7 additions & 2 deletions src/extrainterpreters/base_interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,17 @@ def execute(self, func, args=(), kwargs=None):
)
)

def run_string(self, code):
def run_string(self, code, raise_=False):
"""Execs a string of code in associated interpreter
Mostly to mirror interpreters.run_string as a convenient method.
"""
return interpreters.run_string(self.intno, code)
result = interpreters.run_string(self.intno, code)
if result and raise_:
# In Python 3.13+ indicates an exception occured.
# (in Python 3.12, an exception is raised immediatelly)
raise RuntimeError(result)
return result

# currently not working. will raise when the interpreter is destroyed:
# def is_running(self):
Expand Down
227 changes: 227 additions & 0 deletions src/extrainterpreters/lock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
from threading import TIMEOUT_MAX
import time
import sys


from . import running_interpreters

from .remote_array import RemoteArray
from .utils import (
_atomic_byte_lock,
_remote_memory,
_address_and_size,
guard_internal_use,
Field,
StructBase,
ResourceBusyError,
)

class _LockBuffer(StructBase):
lock = Field(1)

TIME_RESOLUTION = sys.getswitchinterval()
DEFAULT_TIMEOUT = 50 * TIME_RESOLUTION
DEFAULT_TTL = 3600
LOCK_BUFFER_SIZE = _LockBuffer._size


class _CrossInterpreterStructLock:
"""Foundations for cross-interpreter lock.
Used internally, coupled wiht larger memory structs
from which it will consume a single byte - it will
ideally lock those structs.
A "struct" should be a StructBase class containing a single-byte
"lock" field, with a proper buffer.
(Keep in mind that when the struct is passed to other interpreters,
if dealocated in the interpreter of origin, the "byte" buffer used
will point to unalocated memory, with certain disaster ahead)
It is also used as base for the public Lock classes bellow:
those can be used in user-code.
"""

def __init__(self, struct, timeout=DEFAULT_TIMEOUT):
if isinstance(struct._data, RemoteArray):
buffer_ptr, size = struct._data._data_for_remote()
else: # bytes, bytearray
buffer_ptr, size = _address_and_size(struct._data) # , struct._offset)
# struct_ptr = buffer_ptr + struct._offset
lock_offset = struct._offset + struct._get_offset_for_field("lock")
if lock_offset >= size:
raise ValueError("Lock address out of bounds for struct buffer")
self._lock_address = buffer_ptr + lock_offset
self._original_timeout = self._timeout = timeout
self._entered = 0

def timeout(self, timeout: None | float):
"""One use only timeout, for the same lock
with lock.timeout(0.5):
...
"""
self._timeout = timeout
return self
def __enter__(self):
try:
return self.acquire(self._timeout)
finally:
self._timeout = self._original_timeout

def acquire(self, timeout):
# Remember: all attributes are "interpreter-local"
# just the bytes in the passed in struct are shared.
if self._entered:
self._entered += 1
return self
if timeout is None or timeout == 0:
if not _atomic_byte_lock(self._lock_address):
raise ResourceBusyError("Couldn't acquire lock")
else:
threshold = time.monotonic() + timeout
while time.monotonic() <= threshold:
if _atomic_byte_lock(self._lock_address):
break
time.sleep(TIME_RESOLUTION * 4)
else:
raise TimeoutError("Timeout trying to acquire lock")
self._entered += 1
return self

def __exit__(self, *args):
self.release()

def release(self):
if not self._entered:
return
self._entered -= 1
if self._entered:
return
buffer = _remote_memory(self._lock_address, 1)
buffer[0] = 0
del buffer
self._entered = 0
self._timeout = self._original_timeout

def __getstate__(self):
state = self.__dict__.copy()
state["_entered"] = 0
return state


class IntRLock:
"""Cross Interpreter re-entrant lock
This will allow re-entrant acquires in the same
interpreter, _even_ if it is being acquired
in another thread in the same interpreter.
It should not be very useful - but
the implementation path code leads here. Prefer the public
"RLock" and "Lock" classes to avoid surprises.
"""

def __init__(self):

# RemoteArray is a somewhat high-level data structure,
# which includes another byte for a lock - just
# to take account of the buffer life-cycle
# across interpreters.

# unfortunatelly, I got no simpler mechanism than that
# to resolve the problem of the Lock object, along
# with the buffer being deleted in its owner interpreter
# while alive in a scondary one.
# (Remotearrays will go to a parking area, waiting until they
# are dereferenced remotely before freeing the memory)

self._buffer = RemoteArray(size=1)
self._buffer._enter_parent()

lock_str = _LockBuffer._from_data(self._buffer)
self._lock = _CrossInterpreterStructLock(lock_str)

def acquire(self, blocking=True, timeout=-1):
if blocking:
timeout = TIMEOUT_MAX if timeout == -1 or not blocking else timeout
self._lock.acquire(timeout)
return True
else:
try:
self._lock.acquire(None)
except ResourceBusyError:
return False
return True

def release(self):
self._lock.__exit__()

def __enter__(self):
self.acquire()
#self._lock.__enter__()
return self

def __exit__(self, *args):
self.release()
#self._lock.__exit__()

def locked(self):
if self._lock._entered:
return True
try:
self._lock.acquire(0)
except ResourceBusyError:
return True
self._lock.release()
return False

def __getstate__(self):
return {"_lock": self._lock, "_buffer": self._buffer}



class RLock(IntRLock):
"""Cross interpreter re-entrant lock, analogous to
threading.RLock
https://docs.python.org/3/library/threading.html#rlock-objects
More specifically: it will allow re-entrancy in
_the same thread_ and _same interpreter_ -
a different thread in the same interpreter will still
be blocked out.
"""


class Lock(IntRLock):
_delay = 4 * TIME_RESOLUTION

def acquire(self, blocking=True, timeout=-1):
locked = self.locked()
if self.locked() and not blocking:
return False
timeout = TIMEOUT_MAX if timeout == -1 else timeout
start = time.monotonic()
retry = False
while time.monotonic() - start < timeout:
if self.locked():
retry = True
else:
try:
self._lock.acquire(0)
except ResourceBusyError:
retry = True

if self._lock._entered > 1:
retry = True
self._lock.release()

if not retry:
return True

time.sleep(self._delay)
retry = False

raise ResourceBusyError("Could not acquire lock")
Loading

0 comments on commit 63df7b5

Please sign in to comment.