Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiplexed descendants in MultiplexedPath. #278

Merged
merged 5 commits into from
Feb 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
v5.11.0
=======

* #265: ``MultiplexedPath`` now honors multiple subdirectories
in ``iterdir`` and ``joinpath``.

v5.10.3
=======

Expand Down
69 changes: 36 additions & 33 deletions importlib_resources/_itertools.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,38 @@
from itertools import filterfalse
# from more_itertools 9.0
def only(iterable, default=None, too_long=None):
"""If *iterable* has only one item, return it.
If it has zero items, return *default*.
If it has more than one item, raise the exception given by *too_long*,
which is ``ValueError`` by default.
>>> only([], default='missing')
'missing'
>>> only([1])
1
>>> only([1, 2]) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValueError: Expected exactly one item in iterable, but got 1, 2,
and perhaps more.'
>>> only([1, 2], too_long=TypeError) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
TypeError
Note that :func:`only` attempts to advance *iterable* twice to ensure there
is only one item. See :func:`spy` or :func:`peekable` to check
iterable contents less destructively.
"""
it = iter(iterable)
first_value = next(it, default)

from typing import (
Callable,
Iterable,
Iterator,
Optional,
Set,
TypeVar,
Union,
)

# Type and type variable definitions
_T = TypeVar('_T')
_U = TypeVar('_U')


def unique_everseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = None
) -> Iterator[_T]:
"List unique elements, preserving order. Remember all elements ever seen."
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
# unique_everseen('ABBCcAD', str.lower) --> A B C D
seen: Set[Union[_T, _U]] = set()
seen_add = seen.add
if key is None:
for element in filterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
try:
second_value = next(it)
except StopIteration:
pass
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
msg = (
'Expected exactly one item in iterable, but got {!r}, {!r}, '
'and perhaps more.'.format(first_value, second_value)
)
raise too_long or ValueError(msg)

return first_value
28 changes: 25 additions & 3 deletions importlib_resources/readers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import collections
import itertools
import pathlib
import operator

from . import abc

from ._itertools import unique_everseen
from ._itertools import only
from ._compat import ZipPath


Expand Down Expand Up @@ -69,8 +70,10 @@ def __init__(self, *paths):
raise NotADirectoryError('MultiplexedPath only supports directories')

def iterdir(self):
files = (file for path in self._paths for file in path.iterdir())
return unique_everseen(files, key=operator.attrgetter('name'))
children = (child for path in self._paths for child in path.iterdir())
by_name = operator.attrgetter('name')
groups = itertools.groupby(sorted(children, key=by_name), key=by_name)
return map(self._follow, (locs for name, locs in groups))

def read_bytes(self):
raise FileNotFoundError(f'{self} is not a file')
Expand All @@ -92,6 +95,25 @@ def joinpath(self, *descendants):
# Just return something that will not exist.
return self._paths[0].joinpath(*descendants)

@classmethod
def _follow(cls, children):
"""
Construct a MultiplexedPath if needed.

If children contains a sole element, return it.
Otherwise, return a MultiplexedPath of the items.
Unless one of the items is not a Directory, then return the first.
"""
subdirs, one_dir, one_file = itertools.tee(children, 3)

try:
return only(one_dir)
except ValueError:
try:
return cls(*subdirs)
except NotADirectoryError:
return next(one_file)

def open(self, *args, **kwargs):
raise FileNotFoundError(f'{self} is not a file')

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
a resource
11 changes: 11 additions & 0 deletions importlib_resources/tests/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ def test_join_path_compound(self):
path = MultiplexedPath(self.folder)
assert not path.joinpath('imaginary/foo.py').exists()

def test_join_path_common_subdir(self):
prefix = os.path.abspath(os.path.join(__file__, '..'))
data01 = os.path.join(prefix, 'data01')
data02 = os.path.join(prefix, 'data02')
path = MultiplexedPath(data01, data02)
self.assertIsInstance(path.joinpath('subdirectory'), MultiplexedPath)
self.assertEqual(
str(path.joinpath('subdirectory', 'subsubdir'))[len(prefix) + 1 :],
os.path.join('data02', 'subdirectory', 'subsubdir'),
)

def test_repr(self):
self.assertEqual(
repr(MultiplexedPath(self.folder)),
Expand Down