Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXT Filesystem Components #337

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions ofrak_core/ofrak/core/extfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
import tempfile
from dataclasses import dataclass
from subprocess import CalledProcessError

from ofrak import Unpacker, Resource
from ofrak.core import (
GenericBinary,
FilesystemRoot,
File,
Folder,
SpecialFileType,
MagicDescriptionIdentifier,
)
from ofrak.model.component_model import ComponentExternalTool, ComponentConfig

_DEBUGFS = ComponentExternalTool(
"debugfs", "https://e2fsprogs.sourceforge.net/", "-V", brew_package="e2fsprogs"
)


@dataclass
class ExtFilesystem(GenericBinary, FilesystemRoot):
pass


@dataclass
class Ext2Filesystem(ExtFilesystem):
"""
Linux EXT2 filesystem.
"""


@dataclass
class Ext3Filesystem(ExtFilesystem):
"""
Linux EXT3 filesystem.
"""


@dataclass
class Ext4Filesystem(ExtFilesystem):
"""
Linux EXT4 filesystem.
"""


class ExtUnpacker(Unpacker[None]):
"""
Unpack a Linux EXT filesystem.
"""

targets = (ExtFilesystem,)
children = (File, Folder, SpecialFileType)
external_dependencies = (_DEBUGFS,)

async def unpack(self, resource: Resource, config: ComponentConfig = None) -> None:
with tempfile.NamedTemporaryFile(suffix=".extfs") as temp_fs_file:
temp_fs_file.write(await resource.get_data())
temp_fs_file.flush()

with tempfile.TemporaryDirectory() as temp_dir:
command = [
"debugfs",
"-R",
f"rdump / {temp_dir}",
temp_fs_file.name,
]
proc = await asyncio.create_subprocess_exec(
*command,
)
returncode = await proc.wait()
if returncode:
raise CalledProcessError(returncode=returncode, cmd=command)

fs_view = await resource.view_as(ExtFilesystem)
await fs_view.initialize_from_disk(temp_dir)


MagicDescriptionIdentifier.register(Ext2Filesystem, lambda s: "ext2 filesystem" in s.lower())
MagicDescriptionIdentifier.register(Ext3Filesystem, lambda s: "ext3 filesystem" in s.lower())
MagicDescriptionIdentifier.register(Ext4Filesystem, lambda s: "ext4 filesystem" in s.lower())
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext2.1024.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext2.2048.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext2.4096.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext3.1024.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext3.2048.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext3.4096.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext4.1024.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext4.2048.img
Git LFS file not shown
3 changes: 3 additions & 0 deletions ofrak_core/test_ofrak/components/assets/ext4.4096.img
Git LFS file not shown
186 changes: 186 additions & 0 deletions ofrak_core/test_ofrak/components/test_extfs_component.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import os.path
from dataclasses import dataclass
from typing import Dict

import pytest
from ofrak.core.filesystem import File

from ofrak.resource import Resource

from ofrak import OFRAKContext
from ofrak.core.extfs import *
from pytest_ofrak.patterns.unpack_verify import (
UnpackAndVerifyPattern,
UnpackAndVerifyTestCase,
)
import test_ofrak.components


@dataclass
class ExtUnpackerTestCase(UnpackAndVerifyTestCase[str, bytes]):
filename: str


EXT2_UNPACKER_TEST_CASES = [
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext2.1024.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext2.2048.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext2.4096.img",
),
]

EXT3_UNPACKER_TEST_CASES = [
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext3.1024.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext3.2048.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext3.4096.img",
),
]

EXT4_UNPACKER_TEST_CASES = [
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext4.1024.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext4.2048.img",
),
ExtUnpackerTestCase(
"Single text file",
{"apple.txt": b"apple\n", "banana.txt": b"banana\n", "cherry.txt": b"cherry\n"},
set(),
"ext4.4096.img",
),
]


class _TestExtUnpackModifyPack(UnpackAndVerifyPattern):
@pytest.fixture
async def root_resource(
self,
unpack_verify_test_case: ExtUnpackerTestCase,
ofrak_context: OFRAKContext,
test_id: str,
) -> Resource:
asset_path = os.path.join(
test_ofrak.components.ASSETS_DIR, unpack_verify_test_case.filename
)
with open(asset_path, "rb") as f:
data = f.read()
return await ofrak_context.create_root_resource(test_id, data, tags=(File,))

async def unpack(self, root_resource: Resource):
await root_resource.unpack_recursively()

async def verify_descendant(self, unpacked_descendant: bytes, specified_result: bytes):
assert unpacked_descendant == specified_result

async def unpack_verify_test_case(self, request) -> ExtUnpackerTestCase:
raise NotImplementedError

async def get_descendants_to_verify(self, unpacked_root_resource: Resource) -> Dict[str, bytes]:
raise NotImplementedError


class TestExt2UnpackModifyPack(_TestExtUnpackModifyPack):
@pytest.fixture(params=EXT2_UNPACKER_TEST_CASES, ids=lambda tc: tc.label)
async def unpack_verify_test_case(self, request) -> ExtUnpackerTestCase:
return request.param

async def get_descendants_to_verify(self, unpacked_root_resource: Resource) -> Dict[str, bytes]:
ext2_view = await unpacked_root_resource.view_as(Ext2Filesystem)
children = await ext2_view.list_dir()
return {
name: await child.resource.get_data()
for name, child in children.items()
if child.is_file()
}


class TestExt3UnpackModifyPack(_TestExtUnpackModifyPack):
@pytest.fixture
async def root_resource(
self,
unpack_verify_test_case: ExtUnpackerTestCase,
ofrak_context: OFRAKContext,
test_id: str,
) -> Resource:
asset_path = os.path.join(
test_ofrak.components.ASSETS_DIR, unpack_verify_test_case.filename
)
with open(asset_path, "rb") as f:
data = f.read()
return await ofrak_context.create_root_resource(test_id, data, tags=(Ext3Filesystem,))

@pytest.fixture(params=EXT3_UNPACKER_TEST_CASES, ids=lambda tc: tc.label)
async def unpack_verify_test_case(self, request) -> ExtUnpackerTestCase:
return request.param

async def get_descendants_to_verify(self, unpacked_root_resource: Resource) -> Dict[str, bytes]:
ext3_view = await unpacked_root_resource.view_as(Ext3Filesystem)
children = await ext3_view.list_dir()
return {
name: await child.resource.get_data()
for name, child in children.items()
if child.is_file()
}


class TestExt4UnpackModifyPack(_TestExtUnpackModifyPack):
@pytest.fixture
async def root_resource(
self,
unpack_verify_test_case: ExtUnpackerTestCase,
ofrak_context: OFRAKContext,
test_id: str,
) -> Resource:
asset_path = os.path.join(
test_ofrak.components.ASSETS_DIR, unpack_verify_test_case.filename
)
with open(asset_path, "rb") as f:
data = f.read()
return await ofrak_context.create_root_resource(test_id, data, tags=(Ext4Filesystem,))

@pytest.fixture(params=EXT4_UNPACKER_TEST_CASES, ids=lambda tc: tc.label)
async def unpack_verify_test_case(self, request) -> ExtUnpackerTestCase:
return request.param

async def get_descendants_to_verify(self, unpacked_root_resource: Resource) -> Dict[str, bytes]:
ext4_view = await unpacked_root_resource.view_as(Ext4Filesystem)
children = await ext4_view.list_dir()
return {
name: await child.resource.get_data()
for name, child in children.items()
if child.is_file()
}