diff --git a/config/dpkg/control b/config/dpkg/control index 4c8d4672..6d665b79 100644 --- a/config/dpkg/control +++ b/config/dpkg/control @@ -9,7 +9,7 @@ Homepage: https://github.com/log2timeline/dfvfs Package: python3-dfvfs Architecture: all -Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20220112), libfshfs-python3 (>= 20220113), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220113), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libphdi-python3 (>= 20220110), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211113), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} +Depends: libbde-python3 (>= 20140531), libewf-python3 (>= 20131210), libfsapfs-python3 (>= 20201107), libfsext-python3 (>= 20220112), libfshfs-python3 (>= 20220114), libfsntfs-python3 (>= 20211229), libfsxfs-python3 (>= 20220113), libfvde-python3 (>= 20160719), libfwnt-python3 (>= 20210717), libluksde-python3 (>= 20200101), libmodi-python3 (>= 20210405), libphdi-python3 (>= 20220110), libqcow-python3 (>= 20201213), libsigscan-python3 (>= 20191221), libsmdev-python3 (>= 20140529), libsmraw-python3 (>= 20140612), libvhdi-python3 (>= 20201014), libvmdk-python3 (>= 20140421), libvsgpt-python3 (>= 20211115), libvshadow-python3 (>= 20160109), libvslvm-python3 (>= 20160109), python3-cffi-backend (>= 1.9.1), python3-cryptography (>= 2.0.2), python3-dfdatetime (>= 20211113), python3-dtfabric (>= 20170524), python3-idna (>= 2.5), python3-pytsk3 (>= 20210419), python3-pyxattr (>= 0.7.2), python3-yaml (>= 3.10), ${misc:Depends} Description: Python 3 module of dfVFS dfVFS, or Digital Forensics Virtual File System, provides read-only access to file-system objects from various storage media types and file formats. The goal diff --git a/dependencies.ini b/dependencies.ini index 7334cd8b..14c45848 100644 --- a/dependencies.ini +++ b/dependencies.ini @@ -66,7 +66,7 @@ version_property: get_version() [pyfshfs] dpkg_name: libfshfs-python3 l2tbinaries_name: libfshfs -minimum_version: 20220113 +minimum_version: 20220114 pypi_name: libfshfs-python rpm_name: libfshfs-python3 version_property: get_version() diff --git a/dfvfs/file_io/hfs_file_io.py b/dfvfs/file_io/hfs_file_io.py index 1f09639d..ef5119e0 100644 --- a/dfvfs/file_io/hfs_file_io.py +++ b/dfvfs/file_io/hfs_file_io.py @@ -4,7 +4,6 @@ import os from dfvfs.file_io import file_io -from dfvfs.lib import errors from dfvfs.resolver import resolver @@ -20,10 +19,12 @@ def __init__(self, resolver_context, path_spec): """ super(HFSFile, self).__init__(resolver_context, path_spec) self._file_system = None + self._fshfs_data_stream = None self._fshfs_file_entry = None def _Close(self): """Closes the file-like object.""" + self._fshfs_data_stream = None self._fshfs_file_entry = None self._file_system = None @@ -42,10 +43,7 @@ def _Open(self, mode='rb'): OSError: if the file-like object could not be opened. PathSpecError: if the path specification is incorrect. """ - data_stream = getattr(self._path_spec, 'data_stream', None) - if data_stream: - raise errors.NotSupported( - 'Open data stream: {0:s} not supported.'.format(data_stream)) + data_stream_name = getattr(self._path_spec, 'data_stream', None) self._file_system = resolver.Resolver.OpenFileSystem( self._path_spec, resolver_context=self._resolver_context) @@ -54,10 +52,18 @@ def _Open(self, mode='rb'): if not file_entry: raise IOError('Unable to open file entry.') + fshfs_data_stream = None fshfs_file_entry = file_entry.GetHFSFileEntry() if not fshfs_file_entry: raise IOError('Unable to open HFS file entry.') + if data_stream_name == 'rsrc': + fshfs_data_stream = fshfs_file_entry.get_resource_fork() + elif data_stream_name: + raise IOError('Unable to open data stream: {0:s}.'.format( + data_stream_name)) + + self._fshfs_data_stream = fshfs_data_stream self._fshfs_file_entry = fshfs_file_entry # Note: that the following functions do not follow the style guide @@ -84,6 +90,8 @@ def read(self, size=None): if not self._is_open: raise IOError('Not opened.') + if self._fshfs_data_stream: + return self._fshfs_data_stream.read(size=size) return self._fshfs_file_entry.read(size=size) def seek(self, offset, whence=os.SEEK_SET): @@ -101,7 +109,10 @@ def seek(self, offset, whence=os.SEEK_SET): if not self._is_open: raise IOError('Not opened.') - self._fshfs_file_entry.seek(offset, whence) + if self._fshfs_data_stream: + self._fshfs_data_stream.seek(offset, whence) + else: + self._fshfs_file_entry.seek(offset, whence) def get_offset(self): """Retrieves the current offset into the file-like object. @@ -116,6 +127,8 @@ def get_offset(self): if not self._is_open: raise IOError('Not opened.') + if self._fshfs_data_stream: + return self._fshfs_data_stream.get_offset() return self._fshfs_file_entry.get_offset() def get_size(self): @@ -131,4 +144,6 @@ def get_size(self): if not self._is_open: raise IOError('Not opened.') + if self._fshfs_data_stream: + return self._fshfs_data_stream.get_size() return self._fshfs_file_entry.get_size() diff --git a/dfvfs/file_io/ntfs_file_io.py b/dfvfs/file_io/ntfs_file_io.py index 51b05a0e..6f4eef7b 100644 --- a/dfvfs/file_io/ntfs_file_io.py +++ b/dfvfs/file_io/ntfs_file_io.py @@ -41,7 +41,7 @@ def _Open(self, mode='rb'): OSError: if the file-like object could not be opened. PathSpecError: if the path specification is incorrect. """ - data_stream = getattr(self._path_spec, 'data_stream', None) + data_stream_name = getattr(self._path_spec, 'data_stream', None) self._file_system = resolver.Resolver.OpenFileSystem( self._path_spec, resolver_context=self._resolver_context) @@ -55,12 +55,12 @@ def _Open(self, mode='rb'): if not fsntfs_file_entry: raise IOError('Unable to open NTFS file entry.') - if data_stream: + if data_stream_name: fsntfs_data_stream = fsntfs_file_entry.get_alternate_data_stream_by_name( - data_stream) + data_stream_name) if not fsntfs_data_stream: raise IOError('Unable to open data stream: {0:s}.'.format( - data_stream)) + data_stream_name)) elif not fsntfs_file_entry.has_default_data_stream(): raise IOError('Missing default data stream.') diff --git a/dfvfs/file_io/tsk_file_io.py b/dfvfs/file_io/tsk_file_io.py index a0d0997f..fdd25cfc 100644 --- a/dfvfs/file_io/tsk_file_io.py +++ b/dfvfs/file_io/tsk_file_io.py @@ -5,6 +5,7 @@ import pytsk3 +from dfvfs.lib import errors from dfvfs.file_io import file_io from dfvfs.resolver import resolver @@ -41,11 +42,12 @@ def _Open(self, mode='rb'): Raises: AccessError: if the access to open the file was denied. + BackEndError: if pytsk3 returns a non UTF-8 formatted name. IOError: if the file-like object could not be opened. OSError: if the file-like object could not be opened. PathSpecError: if the path specification is incorrect. """ - data_stream = getattr(self._path_spec, 'data_stream', None) + data_stream_name = getattr(self._path_spec, 'data_stream', None) file_system = resolver.Resolver.OpenFileSystem( self._path_spec, resolver_context=self._resolver_context) @@ -82,35 +84,47 @@ def _Open(self, mode='rb'): raise IOError( 'Missing attribute type in file.info.meta (pytsk3.TSK_FS_META).') - if data_stream: - for attribute in tsk_file: - if getattr(attribute, 'info', None) is None: + if data_stream_name: + for pytsk_attribute in tsk_file: + if getattr(pytsk_attribute, 'info', None) is None: continue - # The value of the attribute name will be None for the default - # data stream. - attribute_name = getattr(attribute.info, 'name', None) - if attribute_name is None: - attribute_name = '' - - else: + attribute_name = getattr(pytsk_attribute.info, 'name', None) + if attribute_name: try: # pytsk3 returns an UTF-8 encoded byte string. attribute_name = attribute_name.decode('utf8') except UnicodeError: - # Continue here since we cannot represent the attribute name. - continue - - attribute_type = getattr(attribute.info, 'type', None) - if attribute_name == data_stream and attribute_type in ( - pytsk3.TSK_FS_ATTR_TYPE_HFS_DEFAULT, - pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA, - pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA): - tsk_attribute = attribute + raise errors.BackEndError( + 'pytsk3 returned a non UTF-8 formatted name.') + + attribute_type = getattr(pytsk_attribute.info, 'type', None) + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA and ( + not data_stream_name and not attribute_name): + tsk_attribute = pytsk_attribute + break + + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC and ( + data_stream_name == 'rsrc'): + tsk_attribute = pytsk_attribute + break + + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA and ( + (not data_stream_name and not attribute_name) or + (data_stream_name == attribute_name)): + tsk_attribute = pytsk_attribute + break + + # The data stream is returned as a name-less attribute of type + # pytsk3.TSK_FS_ATTR_TYPE_DEFAULT. + if (attribute_type == pytsk3.TSK_FS_ATTR_TYPE_DEFAULT and + not data_stream_name and not attribute_name): + tsk_attribute = pytsk_attribute break if tsk_attribute is None: - raise IOError('Unable to open data stream: {0:s}.'.format(data_stream)) + raise IOError('Unable to open data stream: {0:s}.'.format( + data_stream_name)) if (not tsk_attribute and tsk_file.info.meta.type != pytsk3.TSK_FS_META_TYPE_REG): diff --git a/dfvfs/path/hfs_path_spec.py b/dfvfs/path/hfs_path_spec.py index 9e17ac6b..75849075 100644 --- a/dfvfs/path/hfs_path_spec.py +++ b/dfvfs/path/hfs_path_spec.py @@ -10,6 +10,8 @@ class HFSPathSpec(path_spec.PathSpec): """HFS path specification implementation. Attributes: + data_stream (str): data stream name, where None indicates the default + data stream. identifier (int): catalog node identifier (CNID). location (str): location. """ @@ -17,12 +19,15 @@ class HFSPathSpec(path_spec.PathSpec): TYPE_INDICATOR = definitions.TYPE_INDICATOR_HFS def __init__( - self, identifier=None, location=None, parent=None, **kwargs): + self, data_stream=None, identifier=None, location=None, parent=None, + **kwargs): """Initializes a path specification. Note that an HFS path specification must have a parent. Args: + data_stream (Optional[str]): data stream name, where None indicates + the default data stream. identifier (Optional[int]): catalog node identifier (CNID). location (Optional[str]): location. parent (Optional[PathSpec]): parent path specification. @@ -34,6 +39,7 @@ def __init__( raise ValueError('Missing identifier and location, or parent value.') super(HFSPathSpec, self).__init__(parent=parent, **kwargs) + self.data_stream = data_stream self.identifier = identifier self.location = location @@ -42,6 +48,8 @@ def comparable(self): """str: comparable representation of the path specification.""" string_parts = [] + if self.data_stream: + string_parts.append('data stream: {0:s}'.format(self.data_stream)) if self.identifier is not None: string_parts.append('identifier: {0:d}'.format(self.identifier)) if self.location is not None: diff --git a/dfvfs/vfs/apfs_file_entry.py b/dfvfs/vfs/apfs_file_entry.py index febd21e7..0d1a0618 100644 --- a/dfvfs/vfs/apfs_file_entry.py +++ b/dfvfs/vfs/apfs_file_entry.py @@ -211,6 +211,10 @@ def GetAPFSFileEntry(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ diff --git a/dfvfs/vfs/ext_file_entry.py b/dfvfs/vfs/ext_file_entry.py index 1e83d848..a54c6060 100644 --- a/dfvfs/vfs/ext_file_entry.py +++ b/dfvfs/vfs/ext_file_entry.py @@ -248,6 +248,10 @@ def size(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ diff --git a/dfvfs/vfs/file_entry.py b/dfvfs/vfs/file_entry.py index cb2b7aca..fb20cabe 100644 --- a/dfvfs/vfs/file_entry.py +++ b/dfvfs/vfs/file_entry.py @@ -311,6 +311,10 @@ def GetDataStream(self, name, case_sensitive=True): def GetExtents(self, data_stream_name=''): # pylint: disable=unused-argument """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ diff --git a/dfvfs/vfs/hfs_data_stream.py b/dfvfs/vfs/hfs_data_stream.py new file mode 100644 index 00000000..37add57c --- /dev/null +++ b/dfvfs/vfs/hfs_data_stream.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +"""The HFS data stream implementation.""" + +from dfvfs.vfs import data_stream + + +class HFSDataStream(data_stream.DataStream): + """File system data stream that uses pyfshfs.""" + + def __init__(self, fshfs_data_stream): + """Initializes the data stream. + + Args: + fshfs_data_stream (pyfshfs.data_stream): HFS data stream. + """ + super(HFSDataStream, self).__init__() + self._fshfs_data_stream = fshfs_data_stream + self._name = '' + + if fshfs_data_stream: + self._name = 'rsrc' + + @property + def name(self): + """str: name.""" + return self._name + + def IsDefault(self): + """Determines if the data stream is the default (data fork) data stream. + + Returns: + bool: True if the data stream is the default (data fork) data stream. + """ + return not self._fshfs_data_stream diff --git a/dfvfs/vfs/hfs_file_entry.py b/dfvfs/vfs/hfs_file_entry.py index f47eec83..a21600df 100644 --- a/dfvfs/vfs/hfs_file_entry.py +++ b/dfvfs/vfs/hfs_file_entry.py @@ -1,16 +1,20 @@ # -*- coding: utf-8 -*- """The HFS file entry implementation.""" +import copy + from dfdatetime import hfs_time as dfdatetime_hfs_time from dfdatetime import posix_time as dfdatetime_posix_time from dfvfs.lib import definitions from dfvfs.lib import errors from dfvfs.path import hfs_path_spec +from dfvfs.resolver import resolver from dfvfs.vfs import attribute from dfvfs.vfs import extent from dfvfs.vfs import file_entry from dfvfs.vfs import hfs_attribute +from dfvfs.vfs import hfs_data_stream from dfvfs.vfs import hfs_directory @@ -84,6 +88,26 @@ def _GetAttributes(self): return self._attributes + def _GetDataStreams(self): + """Retrieves the data streams. + + Returns: + list[HFSDataStream]: data streams. + """ + if self._data_streams is None: + self._data_streams = [] + + if self.entry_type == definitions.FILE_ENTRY_TYPE_FILE: + data_stream = hfs_data_stream.HFSDataStream(None) + self._data_streams.append(data_stream) + + fshfs_data_stream = self._fshfs_file_entry.get_resource_fork() + if fshfs_data_stream: + data_stream = hfs_data_stream.HFSDataStream(fshfs_data_stream) + self._data_streams.append(data_stream) + + return self._data_streams + def _GetDirectory(self): """Retrieves a directory. @@ -225,6 +249,10 @@ def size(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ @@ -244,8 +272,47 @@ def GetExtents(self, data_stream_name=''): extent_type=extent_type, offset=extent_offset, size=extent_size) extents.append(data_stream_extent) + elif data_stream_name == 'rsrc': + fshfs_data_stream = self._fshfs_file_entry.get_resource_fork() + if fshfs_data_stream: + for extent_index in range(fshfs_data_stream.number_of_extents): + extent_offset, extent_size, extent_flags = ( + fshfs_data_stream.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) + return extents + def GetFileObject(self, data_stream_name=''): + """Retrieves a file-like object of a specific data stream. + + Args: + data_stream_name (Optional[str]): name of the data stream, where an empty + string represents the default data stream. + + Returns: + FileIO: a file-like object or None if not available. + """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE or ( + data_stream_name and data_stream_name != 'rsrc'): + return None + + # Make sure to make the changes on a copy of the path specification, so we + # do not alter self.path_spec. + path_spec = copy.deepcopy(self.path_spec) + if data_stream_name: + setattr(path_spec, 'data_stream', data_stream_name) + + return resolver.Resolver.OpenFileObject( + path_spec, resolver_context=self._resolver_context) + def GetHFSFileEntry(self): """Retrieves the HFS file entry. diff --git a/dfvfs/vfs/ntfs_data_stream.py b/dfvfs/vfs/ntfs_data_stream.py index c0131bb2..4ba6f801 100644 --- a/dfvfs/vfs/ntfs_data_stream.py +++ b/dfvfs/vfs/ntfs_data_stream.py @@ -8,18 +8,18 @@ class NTFSDataStream(data_stream.DataStream): """File system data stream that uses pyfsntfs.""" def __init__(self, fsntfs_data_stream): - """Initializes the data stream object. + """Initializes the data stream. Args: fsntfs_data_stream (pyfsntfs.data_stream): NTFS data stream. """ super(NTFSDataStream, self).__init__() - self._fsntfs_data_stream = fsntfs_data_stream + self._name = getattr(fsntfs_data_stream, 'name', None) or '' @property def name(self): """str: name.""" - return getattr(self._fsntfs_data_stream, 'name', '') + return self._name def IsDefault(self): """Determines if the data stream is the default data stream. @@ -27,4 +27,4 @@ def IsDefault(self): Returns: bool: True if the data stream is the default data stream. """ - return not self._fsntfs_data_stream + return not bool(self._name) diff --git a/dfvfs/vfs/ntfs_file_entry.py b/dfvfs/vfs/ntfs_file_entry.py index ea17df03..38fd23bc 100644 --- a/dfvfs/vfs/ntfs_file_entry.py +++ b/dfvfs/vfs/ntfs_file_entry.py @@ -235,6 +235,10 @@ def size(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ diff --git a/dfvfs/vfs/tsk_data_stream.py b/dfvfs/vfs/tsk_data_stream.py index 4d36eb6b..feba20a6 100644 --- a/dfvfs/vfs/tsk_data_stream.py +++ b/dfvfs/vfs/tsk_data_stream.py @@ -9,32 +9,34 @@ class TSKDataStream(data_stream.DataStream): """File system data stream that uses pytsk3.""" - def __init__(self, file_system, pytsk_attribute): + def __init__(self, pytsk_attribute): """Initializes a data stream. Args: - file_system (TSKFileSystem): file system. pytsk_attribute (pytsk3.Attribute): TSK attribute. """ super(TSKDataStream, self).__init__() - self._file_system = file_system - self._tsk_attribute = pytsk_attribute + self._name = '' - @property - def name(self): - """str: name.""" - if self._tsk_attribute: + if pytsk_attribute: # The value of the attribute name will be None for the default # data stream. - attribute_name = getattr(self._tsk_attribute.info, 'name', None) - if attribute_name: + attribute_name = getattr(pytsk_attribute.info, 'name', None) + attribute_type = getattr(pytsk_attribute.info, 'type', None) + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC: + self._name = 'rsrc' + + elif attribute_name: try: # pytsk3 returns an UTF-8 encoded byte string. - return attribute_name.decode('utf8') + self._name = attribute_name.decode('utf8') except UnicodeError: pass - return '' + @property + def name(self): + """str: name.""" + return self._name def IsDefault(self): """Determines if the data stream is the default data stream. @@ -42,15 +44,4 @@ def IsDefault(self): Returns: bool: True if the data stream is the default data stream, false if not. """ - if not self._tsk_attribute or not self._file_system: - return True - - if self._file_system.IsHFS(): - attribute_type = getattr(self._tsk_attribute.info, 'type', None) - return attribute_type in ( - pytsk3.TSK_FS_ATTR_TYPE_HFS_DEFAULT, pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA) - - if self._file_system.IsNTFS(): - return not bool(self.name) - - return True + return not bool(self._name) diff --git a/dfvfs/vfs/tsk_file_entry.py b/dfvfs/vfs/tsk_file_entry.py index 69571fda..faddeef1 100644 --- a/dfvfs/vfs/tsk_file_entry.py +++ b/dfvfs/vfs/tsk_file_entry.py @@ -365,12 +365,13 @@ def _GetDataStreams(self): """ if self._data_streams is None: if self._file_system.IsHFS(): - known_data_attribute_types = [ + known_data_attribute_types = ( pytsk3.TSK_FS_ATTR_TYPE_HFS_DEFAULT, - pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA] + pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA, + pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC) elif self._file_system.IsNTFS(): - known_data_attribute_types = [pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA] + known_data_attribute_types = (pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA, ) else: known_data_attribute_types = None @@ -382,7 +383,7 @@ def _GetDataStreams(self): if not known_data_attribute_types: if tsk_fs_meta_type == pytsk3.TSK_FS_META_TYPE_REG: - data_stream = tsk_data_stream.TSKDataStream(self._file_system, None) + data_stream = tsk_data_stream.TSKDataStream(None) self._data_streams.append(data_stream) else: @@ -397,8 +398,7 @@ def _GetDataStreams(self): attribute_type = getattr(pytsk_attribute.info, 'type', None) if attribute_type in known_data_attribute_types: - data_stream = tsk_data_stream.TSKDataStream( - self._file_system, pytsk_attribute) + data_stream = tsk_data_stream.TSKDataStream(pytsk_attribute) self._data_streams.append(data_stream) return self._data_streams @@ -698,6 +698,10 @@ def size(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. @@ -705,46 +709,53 @@ def GetExtents(self, data_stream_name=''): BackEndError: if pytsk3 returns a non UTF-8 formatted name or no file system block size or data stream size. """ - data_pytsk_attribute = None + data_attribute = None for pytsk_attribute in self._tsk_file: - if getattr(pytsk_attribute, 'info', None): - attribute_type = getattr(pytsk_attribute.info, 'type', None) - - name = getattr(pytsk_attribute.info, 'name', None) - if name: - try: - # pytsk3 returns an UTF-8 encoded byte string. - name = name.decode('utf8') - except UnicodeError: - raise errors.BackEndError( - 'pytsk3 returned a non UTF-8 formatted name.') - - if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA and ( - not name and not data_stream_name): - data_pytsk_attribute = pytsk_attribute - break + if not getattr(pytsk_attribute, 'info', None): + continue - if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA and ( - (not name and not data_stream_name) or (name == data_stream_name)): - data_pytsk_attribute = pytsk_attribute - break + attribute_name = getattr(pytsk_attribute.info, 'name', None) + if attribute_name: + try: + # pytsk3 returns an UTF-8 encoded byte string. + attribute_name = attribute_name.decode('utf8') + except UnicodeError: + raise errors.BackEndError( + 'pytsk3 returned a non UTF-8 formatted name.') - # The data stream is returned as a name-less attribute of type - # pytsk3.TSK_FS_ATTR_TYPE_DEFAULT. - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - attribute_type == pytsk3.TSK_FS_ATTR_TYPE_DEFAULT and - not name and not data_stream_name): - data_pytsk_attribute = pytsk_attribute - break + attribute_type = getattr(pytsk_attribute.info, 'type', None) + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA and ( + not data_stream_name and not attribute_name): + data_attribute = pytsk_attribute + break + + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC and ( + data_stream_name == 'rsrc'): + data_attribute = pytsk_attribute + break + + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA and ( + (not data_stream_name and not attribute_name) or + (data_stream_name == attribute_name)): + data_attribute = pytsk_attribute + break + + # The data stream is returned as a name-less attribute of type + # pytsk3.TSK_FS_ATTR_TYPE_DEFAULT. + if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and + attribute_type == pytsk3.TSK_FS_ATTR_TYPE_DEFAULT and + not data_stream_name and not attribute_name): + data_attribute = pytsk_attribute + break extents = [] - if data_pytsk_attribute: + if data_attribute: tsk_file_system = self._file_system.GetFsInfo() block_size = getattr(tsk_file_system.info, 'block_size', None) if not block_size: raise errors.BackEndError('pytsk3 returned no file system block size.') - data_stream_size = getattr(data_pytsk_attribute.info, 'size', None) + data_stream_size = getattr(data_attribute.info, 'size', None) if data_stream_size is None: raise errors.BackEndError('pytsk3 returned no data stream size.') @@ -754,14 +765,14 @@ def GetExtents(self, data_stream_name=''): data_stream_number_of_blocks += 1 total_number_of_blocks = 0 - for pytsk_attr_run in data_pytsk_attribute: - if pytsk_attr_run.flags & pytsk3.TSK_FS_ATTR_RUN_FLAG_SPARSE: + for tsk_attr_run in data_attribute: + if tsk_attr_run.flags & pytsk3.TSK_FS_ATTR_RUN_FLAG_SPARSE: extent_type = definitions.EXTENT_TYPE_SPARSE else: extent_type = definitions.EXTENT_TYPE_DATA - extent_offset = pytsk_attr_run.addr * block_size - extent_size = pytsk_attr_run.len + extent_offset = tsk_attr_run.addr * block_size + extent_size = tsk_attr_run.len # Note that the attribute data runs can be larger than the actual # allocated size. @@ -802,13 +813,8 @@ def GetFileObject(self, data_stream_name=''): # these differently when opened and the correct behavior seems to be # treating this as the default (nameless) fork instead. For context libtsk # 4.5.0 is unable to read the data steam and yields an error. - if self._file_system.IsHFS() and data_stream_name == 'DECOMP': - data_stream_name = '' - - setattr(path_spec, 'data_stream', data_stream_name) - - if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: - return None + if not self._file_system.IsHFS() or data_stream_name != 'DECOMP': + setattr(path_spec, 'data_stream', data_stream_name) return resolver.Resolver.OpenFileObject( path_spec, resolver_context=self._resolver_context) diff --git a/dfvfs/vfs/xfs_file_entry.py b/dfvfs/vfs/xfs_file_entry.py index 26d382f8..24b8a985 100644 --- a/dfvfs/vfs/xfs_file_entry.py +++ b/dfvfs/vfs/xfs_file_entry.py @@ -213,6 +213,10 @@ def size(self): def GetExtents(self, data_stream_name=''): """Retrieves extents of a specific data stream. + Args: + data_stream_name (Optional[str]): data stream name, where an empty + string represents the default data stream. + Returns: list[Extent]: extents of the data stream. """ diff --git a/requirements.txt b/requirements.txt index b6bef401..8a950eae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ libbde-python >= 20140531 libewf-python >= 20131210 libfsapfs-python >= 20201107 libfsext-python >= 20220112 -libfshfs-python >= 20220113 +libfshfs-python >= 20220114 libfsntfs-python >= 20211229 libfsxfs-python >= 20220113 libfvde-python >= 20160719 diff --git a/setup.cfg b/setup.cfg index a7fcd686..63d87665 100644 --- a/setup.cfg +++ b/setup.cfg @@ -21,7 +21,7 @@ requires = libbde-python3 >= 20140531 libewf-python3 >= 20131210 libfsapfs-python3 >= 20201107 libfsext-python3 >= 20220112 - libfshfs-python3 >= 20220113 + libfshfs-python3 >= 20220114 libfsntfs-python3 >= 20211229 libfsxfs-python3 >= 20220113 libfvde-python3 >= 20160719 diff --git a/tests/file_io/hfs_file_io.py b/tests/file_io/hfs_file_io.py index b4895002..105042f2 100644 --- a/tests/file_io/hfs_file_io.py +++ b/tests/file_io/hfs_file_io.py @@ -81,6 +81,15 @@ def testRead(self): self._TestRead(file_object) + def testReadResourceFork(self): + """Test the read functionality on a resource fork.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, data_stream='rsrc', identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_object = hfs_file_io.HFSFile(self._resolver_context, path_spec) + + self._TestReadResourceFork(file_object) + if __name__ == '__main__': unittest.main() diff --git a/tests/file_io/test_lib.py b/tests/file_io/test_lib.py index 6b986b1a..71c7ea38 100644 --- a/tests/file_io/test_lib.py +++ b/tests/file_io/test_lib.py @@ -215,6 +215,26 @@ def _TestRead(self, file_object): # TODO: add boundary scenarios. + def _TestReadResourceFork(self, file_object): + """Test the read functionality on a resource fork. + + Args: + file_object (FileIO): file-like object. + """ + file_object.Open() + + expected_buffer = b'My resource fork' + + read_buffer = file_object.read(size=16) + self.assertEqual(read_buffer, expected_buffer) + + file_object.seek(-8, os.SEEK_END) + + expected_buffer = b'ce fork\n' + + read_buffer = file_object.read(size=16) + self.assertEqual(read_buffer, expected_buffer) + class ImageFileTestCase(shared_test_lib.BaseTestCase): """The unit test case for storage media image based test data.""" diff --git a/tests/file_io/tsk_file_io.py b/tests/file_io/tsk_file_io.py index 7c47371c..e3577ca0 100644 --- a/tests/file_io/tsk_file_io.py +++ b/tests/file_io/tsk_file_io.py @@ -193,6 +193,15 @@ def testRead(self): self._TestRead(file_object) + def testReadResourceFork(self): + """Test the read functionality on a resource fork.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, data_stream='rsrc', inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_object = tsk_file_io.TSKFile(self._resolver_context, path_spec) + + self._TestReadResourceFork(file_object) + class TSKFileTestNTFS(test_lib.NTFSImageFileTestCase): """Tests the SleuthKit (TSK) file-like object on NTFS.""" diff --git a/tests/path/hfs_path_spec.py b/tests/path/hfs_path_spec.py index 48a4824f..c7483551 100644 --- a/tests/path/hfs_path_spec.py +++ b/tests/path/hfs_path_spec.py @@ -19,6 +19,11 @@ def testInitialize(self): self.assertIsNotNone(path_spec) + path_spec = hfs_path_spec.HFSPathSpec( + data_stream='test', location='/test', parent=self._path_spec) + + self.assertIsNotNone(path_spec) + path_spec = hfs_path_spec.HFSPathSpec( identifier=1, parent=self._path_spec) @@ -56,6 +61,19 @@ def testComparable(self): self.assertEqual(path_spec.comparable, expected_comparable) + path_spec = hfs_path_spec.HFSPathSpec( + data_stream='test', location='/test', parent=self._path_spec) + + self.assertIsNotNone(path_spec) + + expected_comparable = '\n'.join([ + 'type: TEST', + 'type: HFS, data stream: test, location: /test', + '']) + + self.assertEqual(path_spec.comparable, expected_comparable) + + path_spec = hfs_path_spec.HFSPathSpec( identifier=1, parent=self._path_spec) diff --git a/tests/vfs/hfs_data_stream.py b/tests/vfs/hfs_data_stream.py new file mode 100644 index 00000000..5e752476 --- /dev/null +++ b/tests/vfs/hfs_data_stream.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +"""Tests for the data stream implementation using pyfshfs.""" + +import unittest + +from dfvfs.vfs import hfs_data_stream + +from tests import test_lib as shared_test_lib + + +class HFSDataStreamTest(shared_test_lib.BaseTestCase): + """Tests the HFS data stream.""" + + def testName(self): + """Test the name property.""" + test_data_stream = hfs_data_stream.HFSDataStream(None) + self.assertEqual(test_data_stream.name, '') + + def testIsDefault(self): + """Test the IsDefault function.""" + test_data_stream = hfs_data_stream.HFSDataStream(None) + result = test_data_stream.IsDefault() + self.assertTrue(result) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/vfs/hfs_file_entry.py b/tests/vfs/hfs_file_entry.py index d2047b0a..e3a26872 100644 --- a/tests/vfs/hfs_file_entry.py +++ b/tests/vfs/hfs_file_entry.py @@ -53,6 +53,103 @@ def testInitialize(self): self.assertIsNotNone(file_entry) + def testGetAttributes(self): + """Tests the _GetAttributes function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_FILE, + location='/a_directory/a_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertIsNone(file_entry._attributes) + + file_entry._GetAttributes() + self.assertIsNotNone(file_entry._attributes) + self.assertEqual(len(file_entry._attributes), 1) + + test_attribute = file_entry._attributes[0] + self.assertIsInstance(test_attribute, hfs_attribute.HFSExtendedAttribute) + self.assertEqual(test_attribute.name, 'myxattr') + + test_attribute_value_data = test_attribute.read() + self.assertEqual(test_attribute_value_data, b'My extended attribute') + + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + + def testGetStat(self): + """Tests the _GetStat function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_object = file_entry._GetStat() + + self.assertIsNotNone(stat_object) + self.assertEqual(stat_object.type, stat_object.TYPE_FILE) + self.assertEqual(stat_object.size, 22) + + self.assertEqual(stat_object.mode, 0o644) + self.assertEqual(stat_object.uid, 501) + self.assertEqual(stat_object.gid, 20) + + self.assertEqual(stat_object.atime, 1642144782) + self.assertFalse(hasattr(stat_object, 'atime_nano')) + + self.assertEqual(stat_object.ctime, 1642144782) + self.assertFalse(hasattr(stat_object, 'ctime_nano')) + + self.assertEqual(stat_object.crtime, 1642144782) + self.assertFalse(hasattr(stat_object, 'crtime_nano')) + + self.assertEqual(stat_object.mtime, 1642144782) + self.assertFalse(hasattr(stat_object, 'mtime_nano')) + + def testGetStatAttribute(self): + """Tests the _GetStatAttribute function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + stat_attribute = file_entry._GetStatAttribute() + + self.assertIsNotNone(stat_attribute) + self.assertEqual(stat_attribute.group_identifier, 20) + self.assertEqual(stat_attribute.inode_number, 21) + self.assertEqual(stat_attribute.mode, 0o100644) + # TODO: implement number of hard links support in pyfshfs + # self.assertEqual(stat_attribute.number_of_links, 1) + self.assertEqual(stat_attribute.owner_identifier, 501) + self.assertEqual(stat_attribute.size, 22) + self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE) + def testAccessTime(self): """Test the access_time property.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -101,6 +198,52 @@ def testCreationTime(self): self.assertIsNotNone(file_entry) self.assertIsNotNone(file_entry.creation_time) + def testDataStreams(self): + """Tests the data_streams property.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 1) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['']) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_DIRECTORY, + location='/a_directory', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 0) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, []) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 2) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['', 'rsrc']) + def testModificationTime(self): """Test the modification_time property.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -125,61 +268,44 @@ def testSize(self): self.assertIsNotNone(file_entry) self.assertEqual(file_entry.size, 22) - def testGetAttributes(self): - """Tests the _GetAttributes function.""" + def testSubFileEntries(self): + """Tests the number_of_sub_file_entries and sub_file_entries properties.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_FILE, - location='/a_directory/a_file', parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_HFS, location='/', + parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - self.assertIsNone(file_entry._attributes) + self.assertEqual(file_entry.number_of_sub_file_entries, 6) - file_entry._GetAttributes() - self.assertIsNotNone(file_entry._attributes) - self.assertEqual(len(file_entry._attributes), 1) + expected_sub_file_entry_names = [ + '.fseventsd', + '.HFS+ Private Directory Data\r', + 'a_directory', + 'a_link', + 'passwords.txt', + '\u2400\u2400\u2400\u2400HFS+ Private Data'] - test_attribute = file_entry._attributes[0] - self.assertIsInstance(test_attribute, hfs_attribute.HFSExtendedAttribute) - self.assertEqual(test_attribute.name, 'myxattr') + sub_file_entry_names = [] + for sub_file_entry in file_entry.sub_file_entries: + sub_file_entry_names.append(sub_file_entry.name) - test_attribute_value_data = test_attribute.read() - self.assertEqual(test_attribute_value_data, b'My extended attribute') + self.assertEqual( + len(sub_file_entry_names), len(expected_sub_file_entry_names)) + self.assertEqual( + sorted(sub_file_entry_names), sorted(expected_sub_file_entry_names)) - def testGetStat(self): - """Tests the _GetStat function.""" + # Test a path specification without a location. path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, - identifier=self._IDENTIFIER_ANOTHER_FILE, - location='/a_directory/another_file', + definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_DIRECTORY, parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - stat_object = file_entry._GetStat() - - self.assertIsNotNone(stat_object) - self.assertEqual(stat_object.type, stat_object.TYPE_FILE) - self.assertEqual(stat_object.size, 22) - - self.assertEqual(stat_object.mode, 0o644) - self.assertEqual(stat_object.uid, 501) - self.assertEqual(stat_object.gid, 20) - - self.assertEqual(stat_object.atime, 1642144782) - self.assertFalse(hasattr(stat_object, 'atime_nano')) - - self.assertEqual(stat_object.ctime, 1642144782) - self.assertFalse(hasattr(stat_object, 'ctime_nano')) - - self.assertEqual(stat_object.crtime, 1642144782) - self.assertFalse(hasattr(stat_object, 'crtime_nano')) - - self.assertEqual(stat_object.mtime, 1642144782) - self.assertFalse(hasattr(stat_object, 'mtime_nano')) + self.assertEqual(file_entry.number_of_sub_file_entries, 3) - def testGetStatAttribute(self): - """Tests the _GetStatAttribute function.""" + def testGetDataStream(self): + """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_ANOTHER_FILE, @@ -188,17 +314,17 @@ def testGetStatAttribute(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - stat_attribute = file_entry._GetStatAttribute() + data_stream = file_entry.GetDataStream('') + self.assertIsNotNone(data_stream) - self.assertIsNotNone(stat_attribute) - self.assertEqual(stat_attribute.group_identifier, 20) - self.assertEqual(stat_attribute.inode_number, 21) - self.assertEqual(stat_attribute.mode, 0o100644) - # TODO: implement number of hard links support in pyfshfs - # self.assertEqual(stat_attribute.number_of_links, 1) - self.assertEqual(stat_attribute.owner_identifier, 501) - self.assertEqual(stat_attribute.size, 22) - self.assertEqual(stat_attribute.type, stat_attribute.TYPE_FILE) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_stream = file_entry.GetDataStream('rsrc') + self.assertIsNotNone(data_stream) def testGetExtents(self): """Tests the GetExtents function.""" @@ -226,6 +352,19 @@ def testGetExtents(self): extents = file_entry.GetExtents() self.assertEqual(len(extents), 0) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + extents = file_entry.GetExtents(data_stream_name='rsrc') + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 1142784) + self.assertEqual(extents[0].size, 4096) + def testGetFileEntryByPathSpec(self): """Tests the GetFileEntryByPathSpec function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -235,6 +374,44 @@ def testGetFileEntryByPathSpec(self): self.assertIsNotNone(file_entry) + def testGetFileObject(self): + """Tests the GetFileObject function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, + identifier=self._IDENTIFIER_ANOTHER_FILE, + location='/a_directory/another_file', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + file_object = file_entry.GetFileObject() + self.assertIsNotNone(file_object) + + self.assertEqual(file_object.get_size(), 22) + + file_object = file_entry.GetFileObject(data_stream_name='bogus') + self.assertIsNone(file_object) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_DIRECTORY, + location='/a_directory', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + file_object = file_entry.GetFileObject() + self.assertIsNone(file_object) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + file_object = file_entry.GetFileObject(data_stream_name='rsrc') + self.assertIsNotNone(file_object) + + self.assertEqual(file_object.get_size(), 17) + def testGetLinkedFileEntry(self): """Tests the GetLinkedFileEntry function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -320,88 +497,6 @@ def testIsFunctions(self): self.assertFalse(file_entry.IsPipe()) self.assertFalse(file_entry.IsSocket()) - def testSubFileEntries(self): - """Tests the number_of_sub_file_entries and sub_file_entries properties.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, location='/', - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_sub_file_entries, 6) - - expected_sub_file_entry_names = [ - '.fseventsd', - '.HFS+ Private Directory Data\r', - 'a_directory', - 'a_link', - 'passwords.txt', - '\u2400\u2400\u2400\u2400HFS+ Private Data'] - - sub_file_entry_names = [] - for sub_file_entry in file_entry.sub_file_entries: - sub_file_entry_names.append(sub_file_entry.name) - - self.assertEqual( - len(sub_file_entry_names), len(expected_sub_file_entry_names)) - self.assertEqual( - sorted(sub_file_entry_names), sorted(expected_sub_file_entry_names)) - - # Test a path specification without a location. - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_DIRECTORY, - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_sub_file_entries, 3) - - def testDataStreams(self): - """Tests the data streams functionality.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, - identifier=self._IDENTIFIER_ANOTHER_FILE, - location='/a_directory/another_file', - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_data_streams, 1) - - data_stream_names = [] - for data_stream in file_entry.data_streams: - data_stream_names.append(data_stream.name) - - self.assertEqual(data_stream_names, ['']) - - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, identifier=self._IDENTIFIER_A_DIRECTORY, - location='/a_directory', parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_data_streams, 0) - - data_stream_names = [] - for data_stream in file_entry.data_streams: - data_stream_names.append(data_stream.name) - - self.assertEqual(data_stream_names, []) - - def testGetDataStream(self): - """Tests the GetDataStream function.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, - identifier=self._IDENTIFIER_ANOTHER_FILE, - location='/a_directory/another_file', - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) - self.assertIsNotNone(data_stream) - if __name__ == '__main__': unittest.main() diff --git a/tests/vfs/ntfs_data_stream.py b/tests/vfs/ntfs_data_stream.py index b08d14ba..2d4bfb9a 100644 --- a/tests/vfs/ntfs_data_stream.py +++ b/tests/vfs/ntfs_data_stream.py @@ -20,7 +20,8 @@ def testName(self): def testIsDefault(self): """Test the IsDefault function.""" test_data_stream = ntfs_data_stream.NTFSDataStream(None) - self.assertTrue(test_data_stream.IsDefault()) + result = test_data_stream.IsDefault() + self.assertTrue(result) if __name__ == '__main__': diff --git a/tests/vfs/ntfs_file_entry.py b/tests/vfs/ntfs_file_entry.py index 7a43751b..45a9187f 100644 --- a/tests/vfs/ntfs_file_entry.py +++ b/tests/vfs/ntfs_file_entry.py @@ -20,6 +20,7 @@ class NTFSFileEntryTest(shared_test_lib.BaseTestCase): _MFT_ENTRY_A_DIRECTORY = 64 _MFT_ENTRY_A_FILE = 65 + _MFT_ENTRY_ANOTHER_FILE = 67 _MFT_ENTRY_PASSWORDS_TXT = 66 def setUp(self): @@ -52,7 +53,27 @@ def testIntialize(self): self.assertIsNotNone(file_entry) # TODO: add tests for _GetAttributes - # TODO: add tests for _GetDataStreams + + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\another_file', + mft_entry=self._MFT_ENTRY_ANOTHER_FILE, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -646,15 +667,14 @@ def testDataStream(self): def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', - mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\another_file', + mft_entry=self._MFT_ENTRY_ANOTHER_FILE, parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '') data_stream = file_entry.GetDataStream('bogus') self.assertIsNone(data_stream) @@ -665,10 +685,9 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '$Info' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('$Info') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '$Info') def testGetSecurityDescriptor(self): """Tests the GetSecurityDescriptor function.""" diff --git a/tests/vfs/tsk_data_stream.py b/tests/vfs/tsk_data_stream.py index 0f95e911..2e5bcc90 100644 --- a/tests/vfs/tsk_data_stream.py +++ b/tests/vfs/tsk_data_stream.py @@ -4,11 +4,7 @@ import unittest -from dfvfs.lib import definitions -from dfvfs.path import factory as path_spec_factory -from dfvfs.resolver import context from dfvfs.vfs import tsk_data_stream -from dfvfs.vfs import tsk_file_system from tests import test_lib as shared_test_lib @@ -16,38 +12,14 @@ class TSKDataStreamTest(shared_test_lib.BaseTestCase): """Tests the SleuthKit (TSK) data stream.""" - def setUp(self): - """Sets up the needed objects used throughout the test.""" - self._resolver_context = context.Context() - test_path = self._GetTestFilePath(['ext2.raw']) - self._SkipIfPathNotExists(test_path) - - test_os_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_OS, location=test_path) - self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) - self._tsk_path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, location='/', - parent=self._raw_path_spec) - - self._file_system = tsk_file_system.TSKFileSystem( - self._resolver_context, self._tsk_path_spec) - self._file_system.Open() - - def tearDown(self): - """Cleans up the needed objects used throughout the test.""" - self._resolver_context.Empty() - def testName(self): """Test the name property.""" - test_data_stream = tsk_data_stream.TSKDataStream( - self._file_system, None) + test_data_stream = tsk_data_stream.TSKDataStream(None) self.assertEqual(test_data_stream.name, '') def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = tsk_data_stream.TSKDataStream( - self._file_system, None) + test_data_stream = tsk_data_stream.TSKDataStream(None) self.assertTrue(test_data_stream.IsDefault()) diff --git a/tests/vfs/tsk_file_entry.py b/tests/vfs/tsk_file_entry.py index 763d59a6..f05fde26 100644 --- a/tests/vfs/tsk_file_entry.py +++ b/tests/vfs/tsk_file_entry.py @@ -7,6 +7,7 @@ import pytsk3 from dfvfs.lib import definitions +from dfvfs.lib import errors from dfvfs.path import factory as path_spec_factory from dfvfs.resolver import context from dfvfs.vfs import tsk_attribute @@ -158,7 +159,17 @@ def testGetAttributes(self): # No extended attributes are returned. # Also see: https://github.com/py4n6/pytsk/issues/79. - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -351,8 +362,8 @@ def testGetFileObject(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - file_object = file_entry.GetFileObject() - self.assertIsNone(file_object) + with self.assertRaises(errors.BackEndError): + file_entry.GetFileObject() def testGetLinkedFileEntry(self): """Tests the GetLinkedFileEntry function.""" @@ -512,8 +523,7 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) @@ -569,7 +579,17 @@ def testGetAttributes(self): self.assertIsNotNone(file_entry._attributes) self.assertEqual(len(file_entry._attributes), 0) - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -740,8 +760,8 @@ def testGetFileObject(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - file_object = file_entry.GetFileObject() - self.assertIsNone(file_object) + with self.assertRaises(errors.BackEndError): + file_entry.GetFileObject() # TODO: add tests for GetLinkedFileEntry @@ -891,8 +911,7 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) @@ -956,7 +975,26 @@ def testGetAttributes(self): test_attribute_value_data = test_attribute.read() self.assertEqual(test_attribute_value_data, b'My extended attribute') - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -1117,6 +1155,28 @@ def testGetExtents(self): extents = file_entry.GetExtents() self.assertEqual(len(extents), 0) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + extents = file_entry.GetExtents(data_stream_name='rsrc') + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 1142784) + self.assertEqual(extents[0].size, 4096) + + def testGetFileEntryByPathSpec(self): + """Tests the GetFileEntryByPathSpec function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + + self.assertIsNotNone(file_entry) + def testGetFileObject(self): """Tests the GetFileObject function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -1139,18 +1199,20 @@ def testGetFileObject(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - file_object = file_entry.GetFileObject() - self.assertIsNone(file_object) + with self.assertRaises(errors.BackEndError): + file_entry.GetFileObject() - def testGetFileEntryByPathSpec(self): - """Tests the GetFileEntryByPathSpec function.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, inode=self._INODE_ANOTHER_FILE, - parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) + file_object = file_entry.GetFileObject(data_stream_name='rsrc') + self.assertIsNotNone(file_object) + + self.assertEqual(file_object.get_size(), 17) + def testGetLinkedFileEntry(self): """Tests the GetLinkedFileEntry function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -1305,6 +1367,20 @@ def testDataStreams(self): self.assertEqual(data_stream_names, []) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 2) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['', 'rsrc']) + def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -1313,8 +1389,16 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') + self.assertIsNotNone(data_stream) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_stream = file_entry.GetDataStream('rsrc') self.assertIsNotNone(data_stream) @@ -1368,7 +1452,26 @@ def testGetAttributes(self): self.assertEqual( test_attribute.attribute_type, pytsk3.TSK_FS_ATTR_TYPE_NTFS_SI) - # TODO: add tests for _GetDataStreams + def testGetDataStreams(self): + """Tests the _GetDataStreams function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 1) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=10, location='/$UpCase', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + data_streams = file_entry._GetDataStreams() + self.assertEqual(len(data_streams), 2) + # TODO: add tests for _GetDirectory # TODO: add tests for _GetLink @@ -1560,15 +1663,14 @@ def testDataStream(self): def testGetDataStream(self): """Tests the retrieve data stream functionality.""" path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_A_FILE, - location='/a_directory/a_file', parent=self._raw_path_spec) + definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '') data_stream = file_entry.GetDataStream('bogus') self.assertIsNone(data_stream) @@ -1579,10 +1681,9 @@ def testGetDataStream(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - data_stream_name = '$Info' - data_stream = file_entry.GetDataStream(data_stream_name) + data_stream = file_entry.GetDataStream('$Info') self.assertIsNotNone(data_stream) - self.assertEqual(data_stream.name, data_stream_name) + self.assertEqual(data_stream.name, '$Info') def testGetExtents(self): """Tests the GetExtents function.""" @@ -1636,8 +1737,8 @@ def testGetFileObject(self): file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) self.assertIsNotNone(file_entry) - file_object = file_entry.GetFileObject() - self.assertIsNone(file_object) + with self.assertRaises(errors.BackEndError): + file_entry.GetFileObject() if __name__ == '__main__':