Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EXT Filesystem Components #337

Merged
Merged
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions ofrak_core/ofrak/core/extfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import asyncio
import tempfile
from dataclasses import dataclass
from subprocess import CalledProcessError

from ofrak import Unpacker, Resource
from ofrak.core import (
GenericBinary,
FilesystemRoot,
File,
Folder,
SpecialFileType,
MagicDescriptionIdentifier,
)
from ofrak.model.component_model import ComponentExternalTool, CC

DEBUGFS = ComponentExternalTool(
rbs-jacob marked this conversation as resolved.
Show resolved Hide resolved
"debugfs", "https://e2fsprogs.sourceforge.net/", "-V", brew_package="e2fsprogs"
)


@dataclass
class ExtFilesystem(GenericBinary, FilesystemRoot):
pass


@dataclass
class Ext2Filesystem(ExtFilesystem):
"""
Linux EXT2 filesystem.
"""


@dataclass
class Ext3Filesystem(ExtFilesystem):
"""
Linux EXT3 filesystem.
"""


@dataclass
class Ext4Filesystem(ExtFilesystem):
"""
Linux EXT4 filesystem.
"""


class ExtUnpacker(Unpacker[None]):
"""
Unpack a Linux EXT filesystem.
"""

targets = (ExtFilesystem,)
children = (File, Folder, SpecialFileType)
external_dependencies = (DEBUGFS,)
rbs-jacob marked this conversation as resolved.
Show resolved Hide resolved

async def unpack(self, resource: Resource, config: CC = None) -> None:
with tempfile.NamedTemporaryFile(suffix=".extfs") as temp_fs_file:
temp_fs_file.write(await resource.get_data())
temp_fs_file.flush()

with tempfile.TemporaryDirectory() as temp_dir:
command = [
"debugfs",
"-R",
f"rdump / {temp_dir}",
temp_fs_file.name,
]
proc = await asyncio.create_subprocess_exec(
*command,
)
returncode = await proc.wait()
if returncode:
raise CalledProcessError(returncode=returncode, cmd=command)

fs_view = await resource.view_as(ExtFilesystem)
await fs_view.initialize_from_disk(temp_dir)


MagicDescriptionIdentifier.register(Ext2Filesystem, lambda s: "ext2 filesystem" in s.lower())
MagicDescriptionIdentifier.register(Ext3Filesystem, lambda s: "ext3 filesystem" in s.lower())
MagicDescriptionIdentifier.register(Ext4Filesystem, lambda s: "ext4 filesystem" in s.lower())
Loading