Skip to content

Commit

Permalink
Manually resolves links relatively to root_dir, and prevent escape
Browse files Browse the repository at this point in the history
This patch is a followup of #311.

It appeared that we were not resolving paths when reading from files.
This means that a symbolic link present under `root_dir` could be
blindly followed _outside_ of `root_dir`, possibly leading to host own
materials.
  • Loading branch information
Samuel FORESTIER committed Mar 18, 2022
1 parent 65eda6f commit 2d0b5f1
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 7 deletions.
75 changes: 68 additions & 7 deletions src/distro/distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"""

import argparse
import errno
import json
import logging
import os
Expand Down Expand Up @@ -766,8 +767,10 @@ def __init__(

# NOTE: The idea is to respect order **and** have it set
# at all times for API backwards compatibility.
if os.path.isfile(etc_dir_os_release_file) or not os.path.isfile(
usr_lib_os_release_file
if (
self.root_dir is not None
or os.path.isfile(etc_dir_os_release_file)
or not os.path.isfile(usr_lib_os_release_file)
):
self.os_release_file = etc_dir_os_release_file
else:
Expand Down Expand Up @@ -1078,6 +1081,57 @@ def uname_attr(self, attribute: str) -> str:
"""
return self._uname_info.get(attribute, "")

@staticmethod
def __abs_path_join(root_path: str, abs_path: str, /) -> str:
rel_path = os.path.splitdrive(abs_path)[1].lstrip(os.sep)
if os.altsep is not None:
rel_path = rel_path.lstrip(os.altsep)

return os.path.join(root_path, rel_path)

def __resolve_chroot_symlink(self, link_location: str) -> str:
# only resolve symlink if root_dir is set and link is under it
if (
self.root_dir is None
or os.path.commonprefix([self.root_dir, link_location]) != self.root_dir
):
return link_location

seen_paths = []
while True:
try:
resolved = os.readlink(link_location)
except OSError: # includes case "not a symlink"
if (
os.path.commonprefix(
[
os.path.realpath(self.root_dir),
os.path.realpath(link_location),
]
)
!= os.path.realpath(self.root_dir)
):
raise FileNotFoundError(
f"{link_location} resolves outside of {self.root_dir}."
) from None

return link_location

if os.path.isabs(resolved):
# i.e. absolute path (regarding to the chroot), that we need to
# "move" back inside the chroot.
resolved = self.__abs_path_join(self.root_dir, resolved)
else:
# i.e. relative path that we make absolute
resolved = os.path.join(os.path.dirname(link_location), resolved)

# prevent symlinks infinite loop
if resolved in seen_paths:
raise OSError(errno.ELOOP, os.strerror(errno.ELOOP), resolved)

seen_paths.append(link_location)
link_location = resolved

@cached_property
def _os_release_info(self) -> Dict[str, str]:
"""
Expand All @@ -1086,10 +1140,13 @@ def _os_release_info(self) -> Dict[str, str]:
Returns:
A dictionary containing all information items.
"""
if os.path.isfile(self.os_release_file):
with open(self.os_release_file, encoding="utf-8") as release_file:
try:
with open(
self.__resolve_chroot_symlink(self.os_release_file), encoding="utf-8"
) as release_file:
return self._parse_os_release_content(release_file)
return {}
except FileNotFoundError:
return {}

@staticmethod
def _parse_os_release_content(lines: TextIO) -> Dict[str, str]:
Expand Down Expand Up @@ -1254,7 +1311,11 @@ def _distro_release_info(self) -> Dict[str, str]:
basename
for basename in os.listdir(self.etc_dir)
if basename not in _DISTRO_RELEASE_IGNORE_BASENAMES
and os.path.isfile(os.path.join(self.etc_dir, basename))
and os.path.isfile(
self.__resolve_chroot_symlink(
os.path.join(self.etc_dir, basename)
)
)
]
# We sort for repeatability in cases where there are multiple
# distro specific files; e.g. CentOS, Oracle, Enterprise all
Expand Down Expand Up @@ -1301,7 +1362,7 @@ def _parse_distro_release_file(self, filepath: str) -> Dict[str, str]:
A dictionary containing all information items.
"""
try:
with open(filepath, encoding="utf-8") as fp:
with open(self.__resolve_chroot_symlink(filepath), encoding="utf-8") as fp:
# Only parse the first line. For instance, on SLES there
# are multiple lines. We don't want them...
return self._parse_distro_release_content(fp.readline())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=absolutesymlinks
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ID=rootdirnonescape
30 changes: 30 additions & 0 deletions tests/test_distro.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.

import ast
import errno
import io
import json
import os
Expand Down Expand Up @@ -701,6 +702,35 @@ def test_empty_release(self) -> None:
desired_outcome = {"id": "empty"}
self._test_outcome(desired_outcome)

def test_absolutesymlinks(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "absolutesymlinks")
)
desired_outcome = {"id": "absolutesymlinks"}
self._test_outcome(desired_outcome)

def test_rootdirescape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "rootdirescape")
)
with pytest.raises(FileNotFoundError, match=r"resolves outside"):
self.distro.id()

def test_rootdirnonescape(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "rootdirnonescape")
)
desired_outcome = {"id": "rootdirnonescape"}
self._test_outcome(desired_outcome)

def test_symlinksloop(self) -> None:
self.distro = distro.LinuxDistribution(
root_dir=os.path.join(TESTDISTROS, "distro", "symlinksloop")
)
with pytest.raises(OSError) as excinfo:
self.distro.id()
assert excinfo.value.errno == errno.ELOOP

def test_dontincludeuname(self) -> None:
self._setup_for_distro(os.path.join(TESTDISTROS, "distro", "dontincludeuname"))

Expand Down

0 comments on commit 2d0b5f1

Please sign in to comment.