Skip to content

Commit

Permalink
Merge pull request #17 from 2018-11-27/master
Browse files Browse the repository at this point in the history
1.1.4
  • Loading branch information
2018-11-27 authored Aug 14, 2024
2 parents 9c7a8ec + 8498519 commit d6d9f20
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 17 deletions.
46 changes: 45 additions & 1 deletion systempath/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
────────────────────────────────────────────────────────────────────────────────
Copyright (c) 2022-2024 GQYLPY <http://gqylpy.com>. All rights reserved.
@version: 1.1.3
@version: 1.1.4
@author: 竹永康 <[email protected]>
@source: https://github.com/gqylpy/systempath
Expand Down Expand Up @@ -202,6 +202,10 @@ def dirnamel(self, level: int) -> 'Directory':
follow_symlinks=self.follow_symlinks
)

def ldirname(self, *, level: Optional[int] = None) -> PathType:
"""Cut the path from the left side, and can specify the cutting level
through the parameter `level`, with a default of 1 level."""

@property
def abspath(self) -> PathType:
return self.__class__(
Expand Down Expand Up @@ -886,6 +890,46 @@ def walk(
An optional error handler, for more instructions see `os.walk`.
"""

def search(
self,
slicing: BytesOrStr,
/, *,
level: Optional[int] = None,
omit_dir: Optional[bool] = None,
pure_path: Optional[bool] = None,
shortpath: Optional[bool] = None
) -> Iterator[Union[PathType, PathLink]]:
"""
Search for all paths containing the specified string fragment in the
current directory (and its subdirectories, according to the specified
search depth). It traverses the directory tree, checking whether each
path (which can be the path of a file or subdirectory) contains the
specified slicing string `slicing`. If a matching path is found, it
produces these paths as results.
@param slicing
The path slicing, which can be any part of the path.
@param level
Recursion depth of the directory, default is deepest. An int must be
passed in, any integer less than 1 is considered to be 1, warning
passing decimals can cause depth confusion.
@param omit_dir
Omit all subdirectories when yielding paths. The default is False.
@param pure_path
By default, if the subpath is a directory then yield a `Directory`
object, if the subpath is a file then yield a `File` object. If set
this parameter to True, directly yield the path link string (or
bytes). This parameter is not recommended for use.
@param shortpath
Yield short path link string, delete the `dirpath` from the left end
of the path, used with the parameter `pure_path`. The default is
False.
"""

def copytree(
self,
dst: Union['Directory', PathLink],
Expand Down
72 changes: 56 additions & 16 deletions systempath/i systempath.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ def lchmod(*a, **kw): raise NotImplementedError
def getpwuid(_): raise NotImplementedError
getgrgid = getpwuid

__read_bufsize__ = 1024 * 64
READ_BUFSIZE = 1024 * 64
else:
__read_bufsize__ = 1024 * 1024
READ_BUFSIZE = 1024 * 1024

from os.path import (
basename, dirname, abspath, realpath, relpath,
normpath, expanduser, expandvars,
join, split, splitext, splitdrive,
join, split, splitext, splitdrive, sep,
isabs, exists, isdir, isfile, islink, ismount,
getctime, getmtime, getatime, getsize
)
Expand Down Expand Up @@ -126,6 +126,8 @@ def __getitem__(self, *a): ...

UNIQUE: Final[Annotated[object, 'A unique object.']] = object()

sepb: Final[Annotated[bytes, 'The byte type path separator.']] = sep.encode()


class MasqueradeClass(type):
"""
Expand Down Expand Up @@ -460,6 +462,10 @@ def dirnamel(self, level: int) -> 'Directory':
follow_symlinks=self.follow_symlinks
)

def ldirname(self, *, level: int = 1) -> PathType:
sepx: BytesOrStr = sepb if self.name.__class__ is bytes else sep
return Directory(sepx.join(self.name.split(sepx)[level:]))

@property
def abspath(self) -> PathType:
return self.__class__(
Expand Down Expand Up @@ -868,6 +874,40 @@ def walk(
followlinks=not self.follow_symlinks
)

def search(
self,
slicing: BytesOrStr,
/, *,
level: int = float('inf'),
omit_dir: bool = False,
pure_path: Optional[bool] = None,
shortpath: bool = False
) -> Iterator[Union[PathType, PathLink]]:
slicing: BytesOrStr = normpath(slicing)
nullchar: BytesOrStr = b'' if self.name.__class__ is bytes else ''
dirtree = tree(
self.name, level=level, omit_dir=omit_dir,
pure_path=pure_path, shortpath=shortpath
)
for subpath in dirtree:
pure_subpath = (subpath if pure_path else subpath.name)\
.replace(self.name, nullchar)[1:]
try:
r: bool = slicing in pure_subpath
except TypeError:
if slicing.__class__ is bytes:
slicing: str = slicing.decode()
elif slicing.__class__ is str:
slicing: bytes = slicing.encode()
else:
raise ex.ParameterError(
'parameter "slicing" must be of type bytes or str,'
f'not "{slicing.__class__.__name__}".'
) from None
r: bool = slicing in pure_subpath
if r:
yield subpath

def copytree(
self,
dst: Union['Directory', PathLink],
Expand Down Expand Up @@ -1008,7 +1048,7 @@ def copycontent(
self,
other: Union['File', FileIO],
/, *,
bufsize: int = __read_bufsize__
bufsize: int = READ_BUFSIZE
) -> Union['File', FileIO]:
write, read = (
FileIO(other.name, 'wb') if isinstance(other, File) else other
Expand Down Expand Up @@ -1096,7 +1136,7 @@ def md5(self, salting: bytes = b'') -> str:
read = FileIO(self).read

while True:
content = read(__read_bufsize__)
content = read(READ_BUFSIZE)
if not content:
break
m5.update(content)
Expand Down Expand Up @@ -1200,7 +1240,7 @@ def __ior__(self, other: Union['Content', bytes], /) -> 'Content':
)
read, write = other.rb().read, self.wb().write
while True:
content = read(__read_bufsize__)
content = read(READ_BUFSIZE)
if not content:
break
write(content)
Expand All @@ -1220,7 +1260,7 @@ def __iadd__(self, other: Union['Content', bytes], /) -> 'Content':
if isinstance(other, Content):
read, write = other.rb().read, self.ab().write
while True:
content = read(__read_bufsize__)
content = read(READ_BUFSIZE)
if not content:
break
write(content)
Expand All @@ -1243,24 +1283,24 @@ def __eq__(self, other: Union['Content', bytes], /) -> bool:
return True
read1, read2 = self.rb().read, other.rb().read
while True:
content1 = read1(__read_bufsize__)
content2 = read2(__read_bufsize__)
content1 = read1(READ_BUFSIZE)
content2 = read2(READ_BUFSIZE)
if content1 == content2 == b'':
return True
if content1 != content2:
return False

elif other.__class__ is bytes:
start, end = 0, __read_bufsize__
start, end = 0, READ_BUFSIZE
read1 = self.rb().read
while True:
content1 = read1(__read_bufsize__)
content1 = read1(READ_BUFSIZE)
if content1 == other[start:end] == b'':
return True
if content1 != other[start:end]:
return False
start += __read_bufsize__
end += __read_bufsize__
start += READ_BUFSIZE
end += READ_BUFSIZE

raise TypeError(
'content type to be equality judgment operation can only be '
Expand All @@ -1278,7 +1318,7 @@ def __contains__(self, subcontent: bytes, /) -> bool:
read = self.rb().read

while True:
content = read(__read_bufsize__)
content = read(READ_BUFSIZE)
if not content:
return False
if subcontent in deviation_value + content:
Expand Down Expand Up @@ -1310,7 +1350,7 @@ def copy(
self,
other: Union['Content', FileIO],
/, *,
bufsize: int = __read_bufsize__
bufsize: int = READ_BUFSIZE
) -> None:
write = (other.ab() if isinstance(other, Content) else other).write
read = self.rb().read
Expand All @@ -1332,7 +1372,7 @@ def md5(self, salting: bytes = b'') -> str:
read = self.rb().read

while True:
content = read(__read_bufsize__)
content = read(READ_BUFSIZE)
if not content:
break
m5.update(content)
Expand Down

0 comments on commit d6d9f20

Please sign in to comment.