Skip to content

Commit

Permalink
CoW: Use weakref callbacks to track dead references
Browse files (browse the repository at this point in the history)
Co-authored-by: José Lucas Silva Mayer <[email protected]>
  • Loading branch information
wangwillian0 and josemayer committed Dec 5, 2023
1 parent 593fa85 commit 98addad
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 33 deletions.
49 changes: 25 additions & 24 deletions pandas/_libs/internals.pyx
Original file line number | Diff line number | Diff line change
Expand Up @@ -886,29 +886,33 @@ cdef class BlockValuesRefs:
"""
cdef:
public list referenced_blocks
public int clear_counter
public int dead_counter
object __weakref__
object _weakref_cb

def __cinit__(self, blk: Block | None = None) -> None:
def _weakref_cb(
item: weakref.ref,
selfref: weakref.ref = weakref.ref(self)
) -> None:
self = selfref()
if self is not None:
self.dead_counter += 1
if self.dead_counter > 256:
if self.dead_counter > len(self.referenced_blocks) // 2:
self._clear_dead_references()
self._weakref_cb = _weakref_cb
if blk is not None:
self.referenced_blocks = [weakref.ref(blk)]
self.referenced_blocks = [weakref.ref(blk, self._weakref_cb)]
else:
self.referenced_blocks = []
self.clear_counter = 500 # set reasonably high

def _clear_dead_references(self, force=False) -> None:
# Use exponential backoff to decide when we want to clear references
# if force=False. Clearing for every insertion causes slowdowns if
# all these objects stay alive, e.g. df.items() for wide DataFrames
# see GH#55245 and GH#55008
if force or len(self.referenced_blocks) > self.clear_counter:
self.referenced_blocks = [
ref for ref in self.referenced_blocks if ref() is not None
]
nr_of_refs = len(self.referenced_blocks)
if nr_of_refs < self.clear_counter // 2:
self.clear_counter = max(self.clear_counter // 2, 500)
elif nr_of_refs > self.clear_counter:
self.clear_counter = max(self.clear_counter * 2, nr_of_refs)

def _clear_dead_references(self) -> None:
old_len = len(self.referenced_blocks)
self.referenced_blocks = [
ref for ref in self.referenced_blocks if ref() is not None
]
self.dead_counter = self.dead_counter - (old_len - len(self.referenced_blocks))

def add_reference(self, blk: Block) -> None:
"""Adds a new reference to our reference collection.
Expand All @@ -918,8 +922,7 @@ cdef class BlockValuesRefs:
blk : Block
The block that the new references should point to.
"""
self._clear_dead_references()
self.referenced_blocks.append(weakref.ref(blk))
self.referenced_blocks.append(weakref.ref(blk, self._weakref_cb))

def add_index_reference(self, index: object) -> None:
"""Adds a new reference to our reference collection when creating an index.
Expand All @@ -929,8 +932,7 @@ cdef class BlockValuesRefs:
index : Index
The index that the new reference should point to.
"""
self._clear_dead_references()
self.referenced_blocks.append(weakref.ref(index))
self.referenced_blocks.append(weakref.ref(index, self._weakref_cb))

def has_reference(self) -> bool:
"""Checks if block has foreign references.
Expand All @@ -942,6 +944,5 @@ cdef class BlockValuesRefs:
-------
bool
"""
self._clear_dead_references(force=True)
# Checking for more references than block pointing to itself
return len(self.referenced_blocks) > 1
return len(self.referenced_blocks) - self.dead_counter > 1
31 changes: 22 additions & 9 deletions pandas/tests/copy_view/test_internals.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -121,31 +121,44 @@ def test_iset_splits_blocks_inplace(using_copy_on_write, locs, arr, dtype):
assert not np.shares_memory(get_array(df, col), get_array(df2, col))


def test_exponential_backoff():
# GH#55518
def test_clear_dead_references():
# GH#55539
df = DataFrame({"a": [1, 2, 3]})
for i in range(490):
df.copy(deep=False)

assert len(df._mgr.blocks[0].refs.referenced_blocks) == 491
assert (
len(df._mgr.blocks[0].refs.referenced_blocks)
- df._mgr.blocks[0].refs.dead_counter
== 1
)

df = DataFrame({"a": [1, 2, 3]})
dfs = [df.copy(deep=False) for i in range(510)]

for i in range(20):
df.copy(deep=False)
assert len(df._mgr.blocks[0].refs.referenced_blocks) == 531
assert df._mgr.blocks[0].refs.clear_counter == 1000
assert (
len(df._mgr.blocks[0].refs.referenced_blocks)
- df._mgr.blocks[0].refs.dead_counter
== 511
)

for i in range(500):
df.copy(deep=False)

# Don't reduce since we still have over 500 objects alive
assert df._mgr.blocks[0].refs.clear_counter == 1000
assert (
len(df._mgr.blocks[0].refs.referenced_blocks)
- df._mgr.blocks[0].refs.dead_counter
== 511
)

dfs = dfs[:300]
for i in range(500):
df.copy(deep=False)

# Reduce since there are less than 500 objects alive
assert df._mgr.blocks[0].refs.clear_counter == 500
assert (
len(df._mgr.blocks[0].refs.referenced_blocks)
- df._mgr.blocks[0].refs.dead_counter
== 301
)

0 comments on commit 98addad

Please sign in to comment.