From 89966a694b54f81510f06a35b1406d56a2f2c8c5 Mon Sep 17 00:00:00 2001 From: Barney Gale Date: Sat, 30 Sep 2023 15:45:01 +0100 Subject: [PATCH] GH-89812: Add `pathlib._PathBase` (#106337) Add private `pathlib._PathBase` class. This will be used by an experimental PyPI package to incubate a `tarfile.TarPath` class. Co-authored-by: Adam Turner <9087854+AA-Turner@users.noreply.github.com> --- Lib/pathlib.py | 446 ++++++++++++++---- Lib/test/test_pathlib.py | 400 +++++++++++++--- ...3-07-03-20-23-56.gh-issue-89812.cFkDOE.rst | 2 + 3 files changed, 687 insertions(+), 161 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst diff --git a/Lib/pathlib.py b/Lib/pathlib.py index bd5f61b0b7c878..e6be9061013a8a 100644 --- a/Lib/pathlib.py +++ b/Lib/pathlib.py @@ -5,6 +5,7 @@ operating systems. """ +import contextlib import fnmatch import functools import io @@ -15,10 +16,19 @@ import sys import warnings from _collections_abc import Sequence -from errno import ENOENT, ENOTDIR, EBADF, ELOOP +from errno import ENOENT, ENOTDIR, EBADF, ELOOP, EINVAL from stat import S_ISDIR, S_ISLNK, S_ISREG, S_ISSOCK, S_ISBLK, S_ISCHR, S_ISFIFO from urllib.parse import quote_from_bytes as urlquote_from_bytes +try: + import pwd +except ImportError: + pwd = None +try: + import grp +except ImportError: + grp = None + __all__ = [ "UnsupportedOperation", @@ -30,6 +40,9 @@ # Internals # +# Maximum number of symlinks to follow in _PathBase.resolve() +_MAX_SYMLINKS = 40 + # Reference for Windows paths can be found at # https://learn.microsoft.com/en-gb/windows/win32/fileio/naming-a-file . _WIN_RESERVED_NAMES = frozenset( @@ -292,6 +305,11 @@ class PurePath: # The `_hash` slot stores the hash of the case-normalized string # path. It's set when `__hash__()` is called for the first time. '_hash', + + # The '_resolving' slot stores a boolean indicating whether the path + # is being processed by `_PathBase.resolve()`. This prevents duplicate + # work from occurring when `resolve()` calls `stat()` or `readlink()`. + '_resolving', ) pathmod = os.path @@ -331,6 +349,7 @@ def __init__(self, *args): f"not {type(path).__name__!r}") paths.append(path) self._raw_paths = paths + self._resolving = False def with_segments(self, *pathsegments): """Construct a new path object from any number of path-like objects. @@ -416,7 +435,7 @@ def __repr__(self): return "{}({!r})".format(self.__class__.__name__, self.as_posix()) def as_uri(self): - """Return the path as a 'file' URI.""" + """Return the path as a URI.""" if not self.is_absolute(): raise ValueError("relative path can't be expressed as a file URI") @@ -691,7 +710,9 @@ def parent(self): tail = self._tail if not tail: return self - return self._from_parsed_parts(drv, root, tail[:-1]) + path = self._from_parsed_parts(drv, root, tail[:-1]) + path._resolving = self._resolving + return path @property def parents(self): @@ -776,23 +797,35 @@ class PureWindowsPath(PurePath): # Filesystem-accessing classes -class Path(PurePath): - """PurePath subclass that can make system calls. +class _PathBase(PurePath): + """Base class for concrete path objects. - Path represents a filesystem path but unlike PurePath, also offers - methods to do system calls on path objects. Depending on your system, - instantiating a Path will return either a PosixPath or a WindowsPath - object. You can also instantiate a PosixPath or WindowsPath directly, - but cannot instantiate a WindowsPath on a POSIX system or vice versa. + This class provides dummy implementations for many methods that derived + classes can override selectively; the default implementations raise + UnsupportedOperation. The most basic methods, such as stat() and open(), + directly raise UnsupportedOperation; these basic methods are called by + other methods such as is_dir() and read_text(). + + The Path class derives this class to implement local filesystem paths. + Users may derive their own classes to implement virtual filesystem paths, + such as paths in archive files or on remote storage systems. """ __slots__ = () + __bytes__ = None + __fspath__ = None # virtual paths have no local file system representation + + def _unsupported(self, method_name): + msg = f"{type(self).__name__}.{method_name}() is unsupported" + if isinstance(self, Path): + msg += " on this system" + raise UnsupportedOperation(msg) def stat(self, *, follow_symlinks=True): """ Return the result of the stat() system call on this path, like os.stat() does. """ - return os.stat(self, follow_symlinks=follow_symlinks) + self._unsupported("stat") def lstat(self): """ @@ -859,7 +892,21 @@ def is_mount(self): """ Check if this path is a mount point """ - return os.path.ismount(self) + # Need to exist and be a dir + if not self.exists() or not self.is_dir(): + return False + + try: + parent_dev = self.parent.stat().st_dev + except OSError: + return False + + dev = self.stat().st_dev + if dev != parent_dev: + return True + ino = self.stat().st_ino + parent_ino = self.parent.stat().st_ino + return ino == parent_ino def is_symlink(self): """ @@ -880,7 +927,10 @@ def is_junction(self): """ Whether this path is a junction. """ - return os.path.isjunction(self) + # Junctions are a Windows-only feature, not present in POSIX nor the + # majority of virtual filesystems. There is no cross-platform idiom + # to check for junctions (using stat().st_mode). + return False def is_block_device(self): """ @@ -964,9 +1014,7 @@ def open(self, mode='r', buffering=-1, encoding=None, Open the file pointed by this path and return a file object, as the built-in open() function does. """ - if "b" not in mode: - encoding = io.text_encoding(encoding) - return io.open(self, mode, buffering, encoding, errors, newline) + self._unsupported("open") def read_bytes(self): """ @@ -1009,13 +1057,12 @@ def iterdir(self): The children are yielded in arbitrary order, and the special entries '.' and '..' are not included. """ - return (self._make_child_relpath(name) for name in os.listdir(self)) + self._unsupported("iterdir") def _scandir(self): - # bpo-24132: a future version of pathlib will support subclassing of - # pathlib.Path to customize how the filesystem is accessed. This - # includes scandir(), which is used to implement glob(). - return os.scandir(self) + # Emulate os.scandir(), which returns an object that can be used as a + # context manager. This method is called by walk() and glob(). + return contextlib.nullcontext(self.iterdir()) def _make_child_relpath(self, name): sep = self.pathmod.sep @@ -1144,13 +1191,13 @@ def walk(self, top_down=True, on_error=None, follow_symlinks=False): # blow up for a minor reason when (say) a thousand readable # directories are still left to visit. That logic is copied here. try: - scandir_it = path._scandir() + scandir_obj = path._scandir() except OSError as error: if on_error is not None: on_error(error) continue - with scandir_it: + with scandir_obj as scandir_it: dirnames = [] filenames = [] for entry in scandir_it: @@ -1172,17 +1219,13 @@ def walk(self, top_down=True, on_error=None, follow_symlinks=False): paths += [path._make_child_relpath(d) for d in reversed(dirnames)] - def __init__(self, *args, **kwargs): - if kwargs: - msg = ("support for supplying keyword arguments to pathlib.PurePath " - "is deprecated and scheduled for removal in Python {remove}") - warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14)) - super().__init__(*args) + def absolute(self): + """Return an absolute version of this path + No normalization or symlink resolution is performed. - def __new__(cls, *args, **kwargs): - if cls is Path: - cls = WindowsPath if os.name == 'nt' else PosixPath - return object.__new__(cls) + Use resolve() to resolve symlinks and remove '..' segments. + """ + self._unsupported("absolute") @classmethod def cwd(cls): @@ -1193,18 +1236,264 @@ def cwd(cls): # os.path.abspath('.') == os.getcwd(). return cls().absolute() + def expanduser(self): + """ Return a new path with expanded ~ and ~user constructs + (as returned by os.path.expanduser) + """ + self._unsupported("expanduser") + @classmethod def home(cls): - """Return a new path pointing to the user's home directory (as - returned by os.path.expanduser('~')). + """Return a new path pointing to expanduser('~'). """ return cls("~").expanduser() + def readlink(self): + """ + Return the path to which the symbolic link points. + """ + self._unsupported("readlink") + readlink._supported = False + + def _split_stack(self): + """ + Split the path into a 2-tuple (anchor, parts), where *anchor* is the + uppermost parent of the path (equivalent to path.parents[-1]), and + *parts* is a reversed list of parts following the anchor. + """ + return self._from_parsed_parts(self.drive, self.root, []), self._tail[::-1] + + def resolve(self, strict=False): + """ + Make the path absolute, resolving all symlinks on the way and also + normalizing it. + """ + if self._resolving: + return self + try: + path = self.absolute() + except UnsupportedOperation: + path = self + + # If the user has *not* overridden the `readlink()` method, then symlinks are unsupported + # and (in non-strict mode) we can improve performance by not calling `stat()`. + querying = strict or getattr(self.readlink, '_supported', True) + link_count = 0 + stat_cache = {} + target_cache = {} + path, parts = path._split_stack() + while parts: + part = parts.pop() + if part == '..': + if not path._tail: + if path.root: + # Delete '..' segment immediately following root + continue + elif path._tail[-1] != '..': + # Delete '..' segment and its predecessor + path = path.parent + continue + # Join the current part onto the path. + path_parent = path + path = path._make_child_relpath(part) + if querying and part != '..': + path._resolving = True + try: + st = stat_cache.get(path) + if st is None: + st = stat_cache[path] = path.stat(follow_symlinks=False) + if S_ISLNK(st.st_mode): + # Like Linux and macOS, raise OSError(errno.ELOOP) if too many symlinks are + # encountered during resolution. + link_count += 1 + if link_count >= _MAX_SYMLINKS: + raise OSError(ELOOP, "Too many symbolic links in path", str(path)) + target = target_cache.get(path) + if target is None: + target = target_cache[path] = path.readlink() + target, target_parts = target._split_stack() + # If the symlink target is absolute (like '/etc/hosts'), set the current + # path to its uppermost parent (like '/'). If not, the symlink target is + # relative to the symlink parent, which we recorded earlier. + path = target if target.root else path_parent + # Add the symlink target's reversed tail parts (like ['hosts', 'etc']) to + # the stack of unresolved path parts. + parts.extend(target_parts) + elif parts and not S_ISDIR(st.st_mode): + raise NotADirectoryError(ENOTDIR, "Not a directory", str(path)) + except OSError: + if strict: + raise + else: + querying = False + path._resolving = False + return path + + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + self._unsupported("symlink_to") + + def hardlink_to(self, target): + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + self._unsupported("hardlink_to") + + def touch(self, mode=0o666, exist_ok=True): + """ + Create this file with the given access mode, if it doesn't exist. + """ + self._unsupported("touch") + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + """ + Create a new directory at this given path. + """ + self._unsupported("mkdir") + + def rename(self, target): + """ + Rename this path to the target path. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + self._unsupported("rename") + + def replace(self, target): + """ + Rename this path to the target path, overwriting if that path exists. + + The target path may be absolute or relative. Relative paths are + interpreted relative to the current working directory, *not* the + directory of the Path object. + + Returns the new Path instance pointing to the target path. + """ + self._unsupported("replace") + + def chmod(self, mode, *, follow_symlinks=True): + """ + Change the permissions of the path, like os.chmod(). + """ + self._unsupported("chmod") + + def lchmod(self, mode): + """ + Like chmod(), except if the path points to a symlink, the symlink's + permissions are changed, rather than its target's. + """ + self.chmod(mode, follow_symlinks=False) + + def unlink(self, missing_ok=False): + """ + Remove this file or link. + If the path is a directory, use rmdir() instead. + """ + self._unsupported("unlink") + + def rmdir(self): + """ + Remove this directory. The directory must be empty. + """ + self._unsupported("rmdir") + + def owner(self): + """ + Return the login name of the file owner. + """ + self._unsupported("owner") + + def group(self): + """ + Return the group name of the file gid. + """ + self._unsupported("group") + + def as_uri(self): + """Return the path as a URI.""" + self._unsupported("as_uri") + + +class Path(_PathBase): + """PurePath subclass that can make system calls. + + Path represents a filesystem path but unlike PurePath, also offers + methods to do system calls on path objects. Depending on your system, + instantiating a Path will return either a PosixPath or a WindowsPath + object. You can also instantiate a PosixPath or WindowsPath directly, + but cannot instantiate a WindowsPath on a POSIX system or vice versa. + """ + __slots__ = () + __bytes__ = PurePath.__bytes__ + __fspath__ = PurePath.__fspath__ + as_uri = PurePath.as_uri + + def __init__(self, *args, **kwargs): + if kwargs: + msg = ("support for supplying keyword arguments to pathlib.PurePath " + "is deprecated and scheduled for removal in Python {remove}") + warnings._deprecated("pathlib.PurePath(**kwargs)", msg, remove=(3, 14)) + super().__init__(*args) + + def __new__(cls, *args, **kwargs): + if cls is Path: + cls = WindowsPath if os.name == 'nt' else PosixPath + return object.__new__(cls) + + def stat(self, *, follow_symlinks=True): + """ + Return the result of the stat() system call on this path, like + os.stat() does. + """ + return os.stat(self, follow_symlinks=follow_symlinks) + + def is_mount(self): + """ + Check if this path is a mount point + """ + return os.path.ismount(self) + + def is_junction(self): + """ + Whether this path is a junction. + """ + return os.path.isjunction(self) + + def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + """ + Open the file pointed by this path and return a file object, as + the built-in open() function does. + """ + if "b" not in mode: + encoding = io.text_encoding(encoding) + return io.open(self, mode, buffering, encoding, errors, newline) + + def iterdir(self): + """Yield path objects of the directory contents. + + The children are yielded in arbitrary order, and the + special entries '.' and '..' are not included. + """ + return (self._make_child_relpath(name) for name in os.listdir(self)) + + def _scandir(self): + return os.scandir(self) + def absolute(self): - """Return an absolute version of this path by prepending the current - working directory. No normalization or symlink resolution is performed. + """Return an absolute version of this path + No normalization or symlink resolution is performed. - Use resolve() to get the canonical path to a file. + Use resolve() to resolve symlinks and remove '..' segments. """ if self.is_absolute(): return self @@ -1232,34 +1521,26 @@ def resolve(self, strict=False): return self.with_segments(os.path.realpath(self, strict=strict)) - def owner(self): - """ - Return the login name of the file owner. - """ - try: - import pwd + if pwd: + def owner(self): + """ + Return the login name of the file owner. + """ return pwd.getpwuid(self.stat().st_uid).pw_name - except ImportError: - raise UnsupportedOperation("Path.owner() is unsupported on this system") - - def group(self): - """ - Return the group name of the file gid. - """ - try: - import grp + if grp: + def group(self): + """ + Return the group name of the file gid. + """ return grp.getgrgid(self.stat().st_gid).gr_name - except ImportError: - raise UnsupportedOperation("Path.group() is unsupported on this system") - def readlink(self): - """ - Return the path to which the symbolic link points. - """ - if not hasattr(os, "readlink"): - raise UnsupportedOperation("os.readlink() not available on this system") - return self.with_segments(os.readlink(self)) + if hasattr(os, "readlink"): + def readlink(self): + """ + Return the path to which the symbolic link points. + """ + return self.with_segments(os.readlink(self)) def touch(self, mode=0o666, exist_ok=True): """ @@ -1306,13 +1587,6 @@ def chmod(self, mode, *, follow_symlinks=True): """ os.chmod(self, mode, follow_symlinks=follow_symlinks) - def lchmod(self, mode): - """ - Like chmod(), except if the path points to a symlink, the symlink's - permissions are changed, rather than its target's. - """ - self.chmod(mode, follow_symlinks=False) - def unlink(self, missing_ok=False): """ Remove this file or link. @@ -1356,24 +1630,22 @@ def replace(self, target): os.replace(self, target) return self.with_segments(target) - def symlink_to(self, target, target_is_directory=False): - """ - Make this path a symlink pointing to the target path. - Note the order of arguments (link, target) is the reverse of os.symlink. - """ - if not hasattr(os, "symlink"): - raise UnsupportedOperation("os.symlink() not available on this system") - os.symlink(target, self, target_is_directory) - - def hardlink_to(self, target): - """ - Make this path a hard link pointing to the same file as *target*. - - Note the order of arguments (self, target) is the reverse of os.link's. - """ - if not hasattr(os, "link"): - raise UnsupportedOperation("os.link() not available on this system") - os.link(target, self) + if hasattr(os, "symlink"): + def symlink_to(self, target, target_is_directory=False): + """ + Make this path a symlink pointing to the target path. + Note the order of arguments (link, target) is the reverse of os.symlink. + """ + os.symlink(target, self, target_is_directory) + + if hasattr(os, "link"): + def hardlink_to(self, target): + """ + Make this path a hard link pointing to the same file as *target*. + + Note the order of arguments (self, target) is the reverse of os.link's. + """ + os.link(target, self) def expanduser(self): """ Return a new path with expanded ~ and ~user constructs diff --git a/Lib/test/test_pathlib.py b/Lib/test/test_pathlib.py index 484a5e6c3bd64d..319148e9065a65 100644 --- a/Lib/test/test_pathlib.py +++ b/Lib/test/test_pathlib.py @@ -1582,14 +1582,172 @@ def test_group(self): # -# Tests for the concrete classes. +# Tests for the virtual classes. # -class PathTest(unittest.TestCase): - """Tests for the FS-accessing functionalities of the Path classes.""" +class PathBaseTest(PurePathTest): + cls = pathlib._PathBase - cls = pathlib.Path - can_symlink = os_helper.can_symlink() + def test_unsupported_operation(self): + P = self.cls + p = self.cls() + e = pathlib.UnsupportedOperation + self.assertRaises(e, p.stat) + self.assertRaises(e, p.lstat) + self.assertRaises(e, p.exists) + self.assertRaises(e, p.samefile, 'foo') + self.assertRaises(e, p.is_dir) + self.assertRaises(e, p.is_file) + self.assertRaises(e, p.is_mount) + self.assertRaises(e, p.is_symlink) + self.assertRaises(e, p.is_block_device) + self.assertRaises(e, p.is_char_device) + self.assertRaises(e, p.is_fifo) + self.assertRaises(e, p.is_socket) + self.assertRaises(e, p.open) + self.assertRaises(e, p.read_bytes) + self.assertRaises(e, p.read_text) + self.assertRaises(e, p.write_bytes, b'foo') + self.assertRaises(e, p.write_text, 'foo') + self.assertRaises(e, p.iterdir) + self.assertRaises(e, p.glob, '*') + self.assertRaises(e, p.rglob, '*') + self.assertRaises(e, lambda: list(p.walk())) + self.assertRaises(e, p.absolute) + self.assertRaises(e, P.cwd) + self.assertRaises(e, p.expanduser) + self.assertRaises(e, p.home) + self.assertRaises(e, p.readlink) + self.assertRaises(e, p.symlink_to, 'foo') + self.assertRaises(e, p.hardlink_to, 'foo') + self.assertRaises(e, p.mkdir) + self.assertRaises(e, p.touch) + self.assertRaises(e, p.rename, 'foo') + self.assertRaises(e, p.replace, 'foo') + self.assertRaises(e, p.chmod, 0o755) + self.assertRaises(e, p.lchmod, 0o755) + self.assertRaises(e, p.unlink) + self.assertRaises(e, p.rmdir) + self.assertRaises(e, p.owner) + self.assertRaises(e, p.group) + self.assertRaises(e, p.as_uri) + + def test_as_uri_common(self): + e = pathlib.UnsupportedOperation + self.assertRaises(e, self.cls().as_uri) + + def test_fspath_common(self): + self.assertRaises(TypeError, os.fspath, self.cls()) + + def test_as_bytes_common(self): + self.assertRaises(TypeError, bytes, self.cls()) + + def test_matches_path_api(self): + our_names = {name for name in dir(self.cls) if name[0] != '_'} + path_names = {name for name in dir(pathlib.Path) if name[0] != '_'} + self.assertEqual(our_names, path_names) + for attr_name in our_names: + our_attr = getattr(self.cls, attr_name) + path_attr = getattr(pathlib.Path, attr_name) + self.assertEqual(our_attr.__doc__, path_attr.__doc__) + + +class DummyPathIO(io.BytesIO): + """ + Used by DummyPath to implement `open('w')` + """ + + def __init__(self, files, path): + super().__init__() + self.files = files + self.path = path + + def close(self): + self.files[self.path] = self.getvalue() + super().close() + + +class DummyPath(pathlib._PathBase): + """ + Simple implementation of PathBase that keeps files and directories in + memory. + """ + _files = {} + _directories = {} + _symlinks = {} + + def stat(self, *, follow_symlinks=True): + if follow_symlinks: + path = str(self.resolve()) + else: + path = str(self.parent.resolve() / self.name) + if path in self._files: + st_mode = stat.S_IFREG + elif path in self._directories: + st_mode = stat.S_IFDIR + elif path in self._symlinks: + st_mode = stat.S_IFLNK + else: + raise FileNotFoundError(errno.ENOENT, "Not found", str(self)) + return os.stat_result((st_mode, hash(str(self)), 0, 0, 0, 0, 0, 0, 0, 0)) + + def open(self, mode='r', buffering=-1, encoding=None, + errors=None, newline=None): + if buffering != -1: + raise NotImplementedError + path_obj = self.resolve() + path = str(path_obj) + name = path_obj.name + parent = str(path_obj.parent) + if path in self._directories: + raise IsADirectoryError(errno.EISDIR, "Is a directory", path) + + text = 'b' not in mode + mode = ''.join(c for c in mode if c not in 'btU') + if mode == 'r': + if path not in self._files: + raise FileNotFoundError(errno.ENOENT, "File not found", path) + stream = io.BytesIO(self._files[path]) + elif mode == 'w': + if parent not in self._directories: + raise FileNotFoundError(errno.ENOENT, "File not found", parent) + stream = DummyPathIO(self._files, path) + self._files[path] = b'' + self._directories[parent].add(name) + else: + raise NotImplementedError + if text: + stream = io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline) + return stream + + def iterdir(self): + path = str(self.resolve()) + if path in self._files: + raise NotADirectoryError(errno.ENOTDIR, "Not a directory", path) + elif path in self._directories: + return (self / name for name in self._directories[path]) + else: + raise FileNotFoundError(errno.ENOENT, "File not found", path) + + def mkdir(self, mode=0o777, parents=False, exist_ok=False): + try: + self._directories[str(self.parent)].add(self.name) + self._directories[str(self)] = set() + except KeyError: + if not parents or self.parent == self: + raise FileNotFoundError(errno.ENOENT, "File not found", str(self.parent)) from None + self.parent.mkdir(parents=True, exist_ok=True) + self.mkdir(mode, parents=False, exist_ok=exist_ok) + except FileExistsError: + if not exist_ok: + raise + + +class DummyPathTest(unittest.TestCase): + """Tests for PathBase methods that use stat(), open() and iterdir().""" + + cls = DummyPath + can_symlink = False # (BASE) # | @@ -1612,37 +1770,38 @@ class PathTest(unittest.TestCase): # def setUp(self): - def cleanup(): - os.chmod(join('dirE'), 0o777) - os_helper.rmtree(BASE) - self.addCleanup(cleanup) - os.mkdir(BASE) - os.mkdir(join('dirA')) - os.mkdir(join('dirB')) - os.mkdir(join('dirC')) - os.mkdir(join('dirC', 'dirD')) - os.mkdir(join('dirE')) - with open(join('fileA'), 'wb') as f: - f.write(b"this is file A\n") - with open(join('dirB', 'fileB'), 'wb') as f: - f.write(b"this is file B\n") - with open(join('dirC', 'fileC'), 'wb') as f: - f.write(b"this is file C\n") - with open(join('dirC', 'novel.txt'), 'wb') as f: - f.write(b"this is a novel\n") - with open(join('dirC', 'dirD', 'fileD'), 'wb') as f: - f.write(b"this is file D\n") - os.chmod(join('dirE'), 0) - if self.can_symlink: - # Relative symlinks. - os.symlink('fileA', join('linkA')) - os.symlink('non-existing', join('brokenLink')) - os.symlink('dirB', join('linkB'), target_is_directory=True) - os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True) - # This one goes upwards, creating a loop. - os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True) - # Broken symlink (pointing to itself). - os.symlink('brokenLinkLoop', join('brokenLinkLoop')) + # note: this must be kept in sync with `PathTest.setUp()` + cls = self.cls + cls._files.clear() + cls._directories.clear() + cls._symlinks.clear() + join = cls.pathmod.join + cls._files.update({ + join(BASE, 'fileA'): b'this is file A\n', + join(BASE, 'dirB', 'fileB'): b'this is file B\n', + join(BASE, 'dirC', 'fileC'): b'this is file C\n', + join(BASE, 'dirC', 'dirD', 'fileD'): b'this is file D\n', + join(BASE, 'dirC', 'novel.txt'): b'this is a novel\n', + }) + cls._directories.update({ + BASE: {'dirA', 'dirB', 'dirC', 'dirE', 'fileA'}, + join(BASE, 'dirA'): set(), + join(BASE, 'dirB'): {'fileB'}, + join(BASE, 'dirC'): {'dirD', 'fileC', 'novel.txt'}, + join(BASE, 'dirC', 'dirD'): {'fileD'}, + join(BASE, 'dirE'): {}, + }) + dirname = BASE + while True: + dirname, basename = cls.pathmod.split(dirname) + if not basename: + break + cls._directories[dirname] = {basename} + + def tempdir(self): + path = self.cls(BASE).with_name('tmp-dirD') + path.mkdir() + return path def assertFileNotFound(self, func, *args, **kwargs): with self.assertRaises(FileNotFoundError) as cm: @@ -1991,9 +2150,11 @@ def test_rglob_symlink_loop(self): def test_glob_many_open_files(self): depth = 30 P = self.cls - base = P(BASE) / 'deep' - p = P(base, *(['d']*depth)) - p.mkdir(parents=True) + p = base = P(BASE) / 'deep' + p.mkdir() + for _ in range(depth): + p /= 'd' + p.mkdir() pattern = '/'.join(['*'] * depth) iters = [base.glob(pattern) for j in range(100)] for it in iters: @@ -2080,6 +2241,7 @@ def test_readlink(self): self.assertEqual((P / 'brokenLink').readlink(), self.cls('non-existing')) self.assertEqual((P / 'linkB').readlink(), self.cls('dirB')) + self.assertEqual((P / 'linkB' / 'linkD').readlink(), self.cls('../dirB')) with self.assertRaises(OSError): (P / 'fileA').readlink() @@ -2128,7 +2290,7 @@ def test_resolve_common(self): self._check_resolve_relative(p, P(BASE, 'dirB', 'fileB', 'foo', 'in', 'spam'), False) p = P(BASE, 'dirA', 'linkC', '..', 'foo', 'in', 'spam') - if os.name == 'nt': + if os.name == 'nt' and isinstance(p, pathlib.Path): # In Windows, if linkY points to dirB, 'dirA\linkY\..' # resolves to 'dirA' without resolving linkY first. self._check_resolve_relative(p, P(BASE, 'dirA', 'foo', 'in', @@ -2138,9 +2300,7 @@ def test_resolve_common(self): # resolves to 'dirB/..' first before resolving to parent of dirB. self._check_resolve_relative(p, P(BASE, 'foo', 'in', 'spam'), False) # Now create absolute symlinks. - d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD', - dir=os.getcwd())) - self.addCleanup(os_helper.rmtree, d) + d = self.tempdir() P(BASE, 'dirA', 'linkX').symlink_to(d) P(BASE, str(d), 'linkY').symlink_to(join('dirB')) p = P(BASE, 'dirA', 'linkX', 'linkY', 'fileB') @@ -2150,7 +2310,7 @@ def test_resolve_common(self): self._check_resolve_relative(p, P(BASE, 'dirB', 'foo', 'in', 'spam'), False) p = P(BASE, 'dirA', 'linkX', 'linkY', '..', 'foo', 'in', 'spam') - if os.name == 'nt': + if os.name == 'nt' and isinstance(p, pathlib.Path): # In Windows, if linkY points to dirB, 'dirA\linkY\..' # resolves to 'dirA' without resolving linkY first. self._check_resolve_relative(p, P(d, 'foo', 'in', 'spam'), False) @@ -2174,6 +2334,38 @@ def test_resolve_dot(self): # Non-strict self.assertEqual(r.resolve(strict=False), p / '3' / '4') + def _check_symlink_loop(self, *args): + path = self.cls(*args) + with self.assertRaises(OSError) as cm: + path.resolve(strict=True) + self.assertEqual(cm.exception.errno, errno.ELOOP) + + def test_resolve_loop(self): + if not self.can_symlink: + self.skipTest("symlinks required") + if os.name == 'nt' and issubclass(self.cls, pathlib.Path): + self.skipTest("symlink loops work differently with concrete Windows paths") + # Loops with relative symlinks. + self.cls(BASE, 'linkX').symlink_to('linkX/inside') + self._check_symlink_loop(BASE, 'linkX') + self.cls(BASE, 'linkY').symlink_to('linkY') + self._check_symlink_loop(BASE, 'linkY') + self.cls(BASE, 'linkZ').symlink_to('linkZ/../linkZ') + self._check_symlink_loop(BASE, 'linkZ') + # Non-strict + p = self.cls(BASE, 'linkZ', 'foo') + self.assertEqual(p.resolve(strict=False), p) + # Loops with absolute symlinks. + self.cls(BASE, 'linkU').symlink_to(join('linkU/inside')) + self._check_symlink_loop(BASE, 'linkU') + self.cls(BASE, 'linkV').symlink_to(join('linkV')) + self._check_symlink_loop(BASE, 'linkV') + self.cls(BASE, 'linkW').symlink_to(join('linkW/../linkW')) + self._check_symlink_loop(BASE, 'linkW') + # Non-strict + q = self.cls(BASE, 'linkW', 'foo') + self.assertEqual(q.resolve(strict=False), q) + def test_stat(self): statA = self.cls(BASE).joinpath('fileA').stat() statB = self.cls(BASE).joinpath('dirB', 'fileB').stat() @@ -2382,6 +2574,10 @@ def _check_complex_symlinks(self, link0_target): self.assertEqualNormCase(str(p), BASE) # Resolve relative paths. + try: + self.cls().absolute() + except pathlib.UnsupportedOperation: + return old_path = os.getcwd() os.chdir(BASE) try: @@ -2409,6 +2605,92 @@ def test_complex_symlinks_relative(self): def test_complex_symlinks_relative_dot_dot(self): self._check_complex_symlinks(os.path.join('dirA', '..')) + +class DummyPathWithSymlinks(DummyPath): + def readlink(self): + path = str(self.parent.resolve() / self.name) + if path in self._symlinks: + return self.with_segments(self._symlinks[path]) + elif path in self._files or path in self._directories: + raise OSError(errno.EINVAL, "Not a symlink", path) + else: + raise FileNotFoundError(errno.ENOENT, "File not found", path) + + def symlink_to(self, target, target_is_directory=False): + self._directories[str(self.parent)].add(self.name) + self._symlinks[str(self)] = str(target) + + +class DummyPathWithSymlinksTest(DummyPathTest): + cls = DummyPathWithSymlinks + can_symlink = True + + def setUp(self): + super().setUp() + cls = self.cls + join = cls.pathmod.join + cls._symlinks.update({ + join(BASE, 'linkA'): 'fileA', + join(BASE, 'linkB'): 'dirB', + join(BASE, 'dirA', 'linkC'): join('..', 'dirB'), + join(BASE, 'dirB', 'linkD'): join('..', 'dirB'), + join(BASE, 'brokenLink'): 'non-existing', + join(BASE, 'brokenLinkLoop'): 'brokenLinkLoop', + }) + cls._directories[BASE].update({'linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'}) + cls._directories[join(BASE, 'dirA')].add('linkC') + cls._directories[join(BASE, 'dirB')].add('linkD') + + +# +# Tests for the concrete classes. +# + +class PathTest(DummyPathTest): + """Tests for the FS-accessing functionalities of the Path classes.""" + cls = pathlib.Path + can_symlink = os_helper.can_symlink() + + def setUp(self): + # note: this must be kept in sync with `DummyPathTest.setUp()` + def cleanup(): + os.chmod(join('dirE'), 0o777) + os_helper.rmtree(BASE) + self.addCleanup(cleanup) + os.mkdir(BASE) + os.mkdir(join('dirA')) + os.mkdir(join('dirB')) + os.mkdir(join('dirC')) + os.mkdir(join('dirC', 'dirD')) + os.mkdir(join('dirE')) + with open(join('fileA'), 'wb') as f: + f.write(b"this is file A\n") + with open(join('dirB', 'fileB'), 'wb') as f: + f.write(b"this is file B\n") + with open(join('dirC', 'fileC'), 'wb') as f: + f.write(b"this is file C\n") + with open(join('dirC', 'novel.txt'), 'wb') as f: + f.write(b"this is a novel\n") + with open(join('dirC', 'dirD', 'fileD'), 'wb') as f: + f.write(b"this is file D\n") + os.chmod(join('dirE'), 0) + if self.can_symlink: + # Relative symlinks. + os.symlink('fileA', join('linkA')) + os.symlink('non-existing', join('brokenLink')) + os.symlink('dirB', join('linkB'), target_is_directory=True) + os.symlink(os.path.join('..', 'dirB'), join('dirA', 'linkC'), target_is_directory=True) + # This one goes upwards, creating a loop. + os.symlink(os.path.join('..', 'dirB'), join('dirB', 'linkD'), target_is_directory=True) + # Broken symlink (pointing to itself). + os.symlink('brokenLinkLoop', join('brokenLinkLoop')) + + def tempdir(self): + d = os_helper._longpath(tempfile.mkdtemp(suffix='-dirD', + dir=os.getcwd())) + self.addCleanup(os_helper.rmtree, d) + return d + def test_concrete_class(self): if self.cls is pathlib.Path: expected = pathlib.WindowsPath if os.name == 'nt' else pathlib.PosixPath @@ -3178,12 +3460,6 @@ def test_absolute(self): self.assertEqual(str(P('//a').absolute()), '//a') self.assertEqual(str(P('//a/b').absolute()), '//a/b') - def _check_symlink_loop(self, *args): - path = self.cls(*args) - with self.assertRaises(OSError) as cm: - path.resolve(strict=True) - self.assertEqual(cm.exception.errno, errno.ELOOP) - @unittest.skipIf( is_emscripten or is_wasi, "umask is not implemented on Emscripten/WASI." @@ -3230,30 +3506,6 @@ def test_touch_mode(self): st = os.stat(join('masked_new_file')) self.assertEqual(stat.S_IMODE(st.st_mode), 0o750) - def test_resolve_loop(self): - if not self.can_symlink: - self.skipTest("symlinks required") - # Loops with relative symlinks. - os.symlink('linkX/inside', join('linkX')) - self._check_symlink_loop(BASE, 'linkX') - os.symlink('linkY', join('linkY')) - self._check_symlink_loop(BASE, 'linkY') - os.symlink('linkZ/../linkZ', join('linkZ')) - self._check_symlink_loop(BASE, 'linkZ') - # Non-strict - p = self.cls(BASE, 'linkZ', 'foo') - self.assertEqual(p.resolve(strict=False), p) - # Loops with absolute symlinks. - os.symlink(join('linkU/inside'), join('linkU')) - self._check_symlink_loop(BASE, 'linkU') - os.symlink(join('linkV'), join('linkV')) - self._check_symlink_loop(BASE, 'linkV') - os.symlink(join('linkW/../linkW'), join('linkW')) - self._check_symlink_loop(BASE, 'linkW') - # Non-strict - q = self.cls(BASE, 'linkW', 'foo') - self.assertEqual(q.resolve(strict=False), q) - def test_glob(self): P = self.cls p = P(BASE) diff --git a/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst b/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst new file mode 100644 index 00000000000000..a4221fc4ca900b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-07-03-20-23-56.gh-issue-89812.cFkDOE.rst @@ -0,0 +1,2 @@ +Add private ``pathlib._PathBase`` class, which provides experimental support +for virtual filesystems, and may be made public in a future version of Python.