From 823e9785a32210369414a794a3b3da93842a3850 Mon Sep 17 00:00:00 2001 From: Joachim Metz Date: Fri, 14 Jan 2022 22:55:33 +0100 Subject: [PATCH] Added GetExtents method to data stream #597 (#638) --- dfvfs/vfs/apfs_file_entry.py | 39 ++-- dfvfs/vfs/data_stream.py | 25 ++- dfvfs/vfs/ext_file_entry.py | 39 ++-- dfvfs/vfs/file_entry.py | 12 +- dfvfs/vfs/hfs_data_stream.py | 40 ++-- dfvfs/vfs/hfs_file_entry.py | 59 ++---- dfvfs/vfs/ntfs_data_stream.py | 42 +++-- dfvfs/vfs/ntfs_file_entry.py | 58 ++---- dfvfs/vfs/tsk_data_stream.py | 71 ++++++-- dfvfs/vfs/tsk_file_entry.py | 53 ++---- dfvfs/vfs/xfs_file_entry.py | 39 ++-- tests/vfs/data_stream.py | 13 +- tests/vfs/hfs_data_stream.py | 91 +++++++++- tests/vfs/hfs_file_entry.py | 13 -- tests/vfs/ntfs_data_stream.py | 95 +++++++++- tests/vfs/ntfs_file_entry.py | 332 +++++++++++++++++----------------- tests/vfs/tsk_data_stream.py | 304 ++++++++++++++++++++++++++++++- tests/vfs/tsk_file_entry.py | 17 -- 18 files changed, 902 insertions(+), 440 deletions(-) diff --git a/dfvfs/vfs/apfs_file_entry.py b/dfvfs/vfs/apfs_file_entry.py index 0d1a0618..09d5850f 100644 --- a/dfvfs/vfs/apfs_file_entry.py +++ b/dfvfs/vfs/apfs_file_entry.py @@ -208,31 +208,28 @@ def GetAPFSFileEntry(self): """ return self._fsapfs_file_entry - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: + return [] + extents = [] - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - not data_stream_name): - for extent_index in range(self._fsapfs_file_entry.number_of_extents): - extent_offset, extent_size, extent_flags = ( - self._fsapfs_file_entry.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) + for extent_index in range(self._fsapfs_file_entry.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fsapfs_file_entry.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) return extents diff --git a/dfvfs/vfs/data_stream.py b/dfvfs/vfs/data_stream.py index ffa4fbb8..b50e9d1e 100644 --- a/dfvfs/vfs/data_stream.py +++ b/dfvfs/vfs/data_stream.py @@ -5,13 +5,30 @@ class DataStream(object): """Data stream interface.""" - # The data stream object should not have a reference to its - # file entry since that will create a cyclic reference. + def __init__(self, file_entry): + """Initializes the data stream. + + Args: + file_entry (FileEntry): file entry. + """ + super(DataStream, self).__init__() + self._file_entry = file_entry + self._name = '' @property def name(self): """str: name.""" - return '' + return self._name + + def GetExtents(self): + """Retrieves the extents. + + Returns: + list[Extent]: the extents of the data stream. + """ + if self._file_entry: + return self._file_entry.GetExtents() + return [] def IsDefault(self): """Determines if the data stream is the default data stream. @@ -19,4 +36,4 @@ def IsDefault(self): Returns: bool: True if the data stream is the default data stream. """ - return True + return not bool(self._name) diff --git a/dfvfs/vfs/ext_file_entry.py b/dfvfs/vfs/ext_file_entry.py index a54c6060..2f1bf458 100644 --- a/dfvfs/vfs/ext_file_entry.py +++ b/dfvfs/vfs/ext_file_entry.py @@ -245,31 +245,28 @@ def size(self): """int: size of the file entry in bytes or None if not available.""" return self._fsext_file_entry.size - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: + return [] + extents = [] - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - not data_stream_name): - for extent_index in range(self._fsext_file_entry.number_of_extents): - extent_offset, extent_size, extent_flags = ( - self._fsext_file_entry.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) + for extent_index in range(self._fsext_file_entry.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fsext_file_entry.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) return extents diff --git a/dfvfs/vfs/file_entry.py b/dfvfs/vfs/file_entry.py index fb20cabe..8d8eeb0b 100644 --- a/dfvfs/vfs/file_entry.py +++ b/dfvfs/vfs/file_entry.py @@ -82,7 +82,7 @@ def _GetDataStreams(self): # It is assumed that non-file file entries do not have data streams. if self.entry_type == definitions.FILE_ENTRY_TYPE_FILE: - data_stream_object = data_stream.DataStream() + data_stream_object = data_stream.DataStream(self) self._data_streams.append(data_stream_object) return self._data_streams @@ -308,15 +308,11 @@ def GetDataStream(self, name, case_sensitive=True): return matching_data_stream - def GetExtents(self, data_stream_name=''): # pylint: disable=unused-argument - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ return [] diff --git a/dfvfs/vfs/hfs_data_stream.py b/dfvfs/vfs/hfs_data_stream.py index 37add57c..25c5e11c 100644 --- a/dfvfs/vfs/hfs_data_stream.py +++ b/dfvfs/vfs/hfs_data_stream.py @@ -1,34 +1,48 @@ # -*- coding: utf-8 -*- """The HFS data stream implementation.""" +from dfvfs.lib import definitions from dfvfs.vfs import data_stream +from dfvfs.vfs import extent class HFSDataStream(data_stream.DataStream): """File system data stream that uses pyfshfs.""" - def __init__(self, fshfs_data_stream): - """Initializes the data stream. + def __init__(self, file_entry, fshfs_data_stream): + """Initializes a HFS data stream. Args: + file_entry (FileEntry): file entry. fshfs_data_stream (pyfshfs.data_stream): HFS data stream. """ - super(HFSDataStream, self).__init__() + super(HFSDataStream, self).__init__(file_entry) self._fshfs_data_stream = fshfs_data_stream - self._name = '' if fshfs_data_stream: self._name = 'rsrc' - @property - def name(self): - """str: name.""" - return self._name - - def IsDefault(self): - """Determines if the data stream is the default (data fork) data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - bool: True if the data stream is the default (data fork) data stream. + list[Extent]: the extents of the data stream. """ - return not self._fshfs_data_stream + if not self._fshfs_data_stream: + return super(HFSDataStream, self).GetExtents() + + extents = [] + for extent_index in range(self._fshfs_data_stream.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fshfs_data_stream.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) + + return extents diff --git a/dfvfs/vfs/hfs_file_entry.py b/dfvfs/vfs/hfs_file_entry.py index a21600df..1ddb1b37 100644 --- a/dfvfs/vfs/hfs_file_entry.py +++ b/dfvfs/vfs/hfs_file_entry.py @@ -98,12 +98,12 @@ def _GetDataStreams(self): self._data_streams = [] if self.entry_type == definitions.FILE_ENTRY_TYPE_FILE: - data_stream = hfs_data_stream.HFSDataStream(None) + data_stream = hfs_data_stream.HFSDataStream(self, None) self._data_streams.append(data_stream) fshfs_data_stream = self._fshfs_file_entry.get_resource_fork() if fshfs_data_stream: - data_stream = hfs_data_stream.HFSDataStream(fshfs_data_stream) + data_stream = hfs_data_stream.HFSDataStream(self, fshfs_data_stream) self._data_streams.append(data_stream) return self._data_streams @@ -246,47 +246,28 @@ def size(self): """int: size of the file entry in bytes or None if not available.""" return self._fshfs_file_entry.size - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: + return [] + extents = [] - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - not data_stream_name): - for extent_index in range(self._fshfs_file_entry.number_of_extents): - extent_offset, extent_size, extent_flags = ( - self._fshfs_file_entry.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) - - elif data_stream_name == 'rsrc': - fshfs_data_stream = self._fshfs_file_entry.get_resource_fork() - if fshfs_data_stream: - for extent_index in range(fshfs_data_stream.number_of_extents): - extent_offset, extent_size, extent_flags = ( - fshfs_data_stream.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) + for extent_index in range(self._fshfs_file_entry.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fshfs_file_entry.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) return extents diff --git a/dfvfs/vfs/ntfs_data_stream.py b/dfvfs/vfs/ntfs_data_stream.py index 4ba6f801..566ad24a 100644 --- a/dfvfs/vfs/ntfs_data_stream.py +++ b/dfvfs/vfs/ntfs_data_stream.py @@ -1,30 +1,48 @@ # -*- coding: utf-8 -*- """The NTFS data stream implementation.""" +from dfvfs.lib import definitions from dfvfs.vfs import data_stream +from dfvfs.vfs import extent class NTFSDataStream(data_stream.DataStream): """File system data stream that uses pyfsntfs.""" - def __init__(self, fsntfs_data_stream): - """Initializes the data stream. + def __init__(self, file_entry, fsntfs_data_stream): + """Initializes a NTFS data stream. Args: + file_entry (FileEntry): file entry. fsntfs_data_stream (pyfsntfs.data_stream): NTFS data stream. """ - super(NTFSDataStream, self).__init__() - self._name = getattr(fsntfs_data_stream, 'name', None) or '' + super(NTFSDataStream, self).__init__(file_entry) + self._fsntfs_data_stream = fsntfs_data_stream - @property - def name(self): - """str: name.""" - return self._name + if fsntfs_data_stream: + self._name = fsntfs_data_stream.name - def IsDefault(self): - """Determines if the data stream is the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - bool: True if the data stream is the default data stream. + list[Extent]: the extents of the data stream. """ - return not bool(self._name) + if not self._fsntfs_data_stream: + return super(NTFSDataStream, self).GetExtents() + + extents = [] + for extent_index in range(self._fsntfs_data_stream.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fsntfs_data_stream.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) + + return extents diff --git a/dfvfs/vfs/ntfs_file_entry.py b/dfvfs/vfs/ntfs_file_entry.py index 38fd23bc..1ddeb1e5 100644 --- a/dfvfs/vfs/ntfs_file_entry.py +++ b/dfvfs/vfs/ntfs_file_entry.py @@ -96,11 +96,11 @@ def _GetDataStreams(self): if self._data_streams is None: self._data_streams = [] if self._fsntfs_file_entry.has_default_data_stream(): - data_stream = ntfs_data_stream.NTFSDataStream(None) + data_stream = ntfs_data_stream.NTFSDataStream(self, None) self._data_streams.append(data_stream) for fsntfs_data_stream in self._fsntfs_file_entry.alternate_data_streams: - data_stream = ntfs_data_stream.NTFSDataStream(fsntfs_data_stream) + data_stream = ntfs_data_stream.NTFSDataStream(self, fsntfs_data_stream) self._data_streams.append(data_stream) return self._data_streams @@ -232,49 +232,25 @@ def size(self): """int: size of the file entry in bytes or None if not available.""" return getattr(self._fsntfs_file_entry, 'size', None) - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ extents = [] - if not data_stream_name: - for extent_index in range(self._fsntfs_file_entry.number_of_extents): - extent_offset, extent_size, extent_flags = ( - self._fsntfs_file_entry.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) - - else: - fsntfs_data_stream = ( - self._fsntfs_file_entry.get_alternate_data_stream_by_name( - data_stream_name)) - - if fsntfs_data_stream: - for extent_index in range(fsntfs_data_stream.number_of_extents): - extent_offset, extent_size, extent_flags = ( - fsntfs_data_stream.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) + for extent_index in range(self._fsntfs_file_entry.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fsntfs_file_entry.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) return extents diff --git a/dfvfs/vfs/tsk_data_stream.py b/dfvfs/vfs/tsk_data_stream.py index feba20a6..63d079d5 100644 --- a/dfvfs/vfs/tsk_data_stream.py +++ b/dfvfs/vfs/tsk_data_stream.py @@ -3,20 +3,24 @@ import pytsk3 +from dfvfs.lib import definitions +from dfvfs.lib import errors from dfvfs.vfs import data_stream +from dfvfs.vfs import extent class TSKDataStream(data_stream.DataStream): """File system data stream that uses pytsk3.""" - def __init__(self, pytsk_attribute): + def __init__(self, file_entry, pytsk_attribute): """Initializes a data stream. Args: + file_entry (FileEntry): file entry. pytsk_attribute (pytsk3.Attribute): TSK attribute. """ - super(TSKDataStream, self).__init__() - self._name = '' + super(TSKDataStream, self).__init__(file_entry) + self._tsk_attribute = pytsk_attribute if pytsk_attribute: # The value of the attribute name will be None for the default @@ -33,15 +37,58 @@ def __init__(self, pytsk_attribute): except UnicodeError: pass - @property - def name(self): - """str: name.""" - return self._name - - def IsDefault(self): - """Determines if the data stream is the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - bool: True if the data stream is the default data stream, false if not. + list[Extent]: the extents of the data stream. + + Raises: + BackEndError: if pytsk3 returns no file system block size or data stream + size. """ - return not bool(self._name) + if not self._tsk_attribute: + return super(TSKDataStream, self).GetExtents() + + extents = [] + file_system = self._file_entry.GetFileSystem() + tsk_file_system = file_system.GetFsInfo() + block_size = getattr(tsk_file_system.info, 'block_size', None) + if not block_size: + raise errors.BackEndError('pytsk3 returned no file system block size.') + + data_stream_size = getattr(self._tsk_attribute.info, 'size', None) + if data_stream_size is None: + raise errors.BackEndError('pytsk3 returned no data stream size.') + + data_stream_number_of_blocks, remainder = divmod( + data_stream_size, block_size) + if remainder: + data_stream_number_of_blocks += 1 + + total_number_of_blocks = 0 + for tsk_attr_run in self._tsk_attribute: + if tsk_attr_run.flags & pytsk3.TSK_FS_ATTR_RUN_FLAG_SPARSE: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + extent_offset = tsk_attr_run.addr * block_size + extent_size = tsk_attr_run.len + + # Note that the attribute data runs can be larger than the actual + # allocated size. + if total_number_of_blocks + extent_size > data_stream_number_of_blocks: + extent_size = data_stream_number_of_blocks - total_number_of_blocks + + total_number_of_blocks += extent_size + extent_size *= block_size + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) + + if total_number_of_blocks >= data_stream_number_of_blocks: + break + + return extents diff --git a/dfvfs/vfs/tsk_file_entry.py b/dfvfs/vfs/tsk_file_entry.py index faddeef1..14d6aa4e 100644 --- a/dfvfs/vfs/tsk_file_entry.py +++ b/dfvfs/vfs/tsk_file_entry.py @@ -383,7 +383,7 @@ def _GetDataStreams(self): if not known_data_attribute_types: if tsk_fs_meta_type == pytsk3.TSK_FS_META_TYPE_REG: - data_stream = tsk_data_stream.TSKDataStream(None) + data_stream = tsk_data_stream.TSKDataStream(self, None) self._data_streams.append(data_stream) else: @@ -398,7 +398,7 @@ def _GetDataStreams(self): attribute_type = getattr(pytsk_attribute.info, 'type', None) if attribute_type in known_data_attribute_types: - data_stream = tsk_data_stream.TSKDataStream(pytsk_attribute) + data_stream = tsk_data_stream.TSKDataStream(self, pytsk_attribute) self._data_streams.append(data_stream) return self._data_streams @@ -695,56 +695,33 @@ def size(self): """int: size of the file entry in bytes or None if not available.""" return getattr(self._tsk_file.info.meta, 'size', None) - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. Raises: - BackEndError: if pytsk3 returns a non UTF-8 formatted name or no file - system block size or data stream size. + BackEndError: if pytsk3 returns no file system block size or data stream + size. """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: + return [] + data_attribute = None for pytsk_attribute in self._tsk_file: if not getattr(pytsk_attribute, 'info', None): continue attribute_name = getattr(pytsk_attribute.info, 'name', None) - if attribute_name: - try: - # pytsk3 returns an UTF-8 encoded byte string. - attribute_name = attribute_name.decode('utf8') - except UnicodeError: - raise errors.BackEndError( - 'pytsk3 returned a non UTF-8 formatted name.') - attribute_type = getattr(pytsk_attribute.info, 'type', None) - if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA and ( - not data_stream_name and not attribute_name): - data_attribute = pytsk_attribute - break - - if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC and ( - data_stream_name == 'rsrc'): - data_attribute = pytsk_attribute - break - - if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA and ( - (not data_stream_name and not attribute_name) or - (data_stream_name == attribute_name)): - data_attribute = pytsk_attribute - break # The data stream is returned as a name-less attribute of type - # pytsk3.TSK_FS_ATTR_TYPE_DEFAULT. - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - attribute_type == pytsk3.TSK_FS_ATTR_TYPE_DEFAULT and - not data_stream_name and not attribute_name): + # pytsk3.TSK_FS_ATTR_TYPE_DEFAULT, pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA or + # pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA + if not attribute_name and attribute_type in ( + pytsk3.TSK_FS_ATTR_TYPE_DEFAULT, pytsk3.TSK_FS_ATTR_TYPE_HFS_DATA, + pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA): data_attribute = pytsk_attribute break diff --git a/dfvfs/vfs/xfs_file_entry.py b/dfvfs/vfs/xfs_file_entry.py index 24b8a985..0f191b93 100644 --- a/dfvfs/vfs/xfs_file_entry.py +++ b/dfvfs/vfs/xfs_file_entry.py @@ -210,31 +210,28 @@ def size(self): """int: size of the file entry in bytes or None if not available.""" return self._fsxfs_file_entry.size - def GetExtents(self, data_stream_name=''): - """Retrieves extents of a specific data stream. - - Args: - data_stream_name (Optional[str]): data stream name, where an empty - string represents the default data stream. + def GetExtents(self): + """Retrieves the extents. Returns: - list[Extent]: extents of the data stream. + list[Extent]: the extents. """ + if self.entry_type != definitions.FILE_ENTRY_TYPE_FILE: + return [] + extents = [] - if (self.entry_type == definitions.FILE_ENTRY_TYPE_FILE and - not data_stream_name): - for extent_index in range(self._fsxfs_file_entry.number_of_extents): - extent_offset, extent_size, extent_flags = ( - self._fsxfs_file_entry.get_extent(extent_index)) - - if extent_flags & 0x1: - extent_type = definitions.EXTENT_TYPE_SPARSE - else: - extent_type = definitions.EXTENT_TYPE_DATA - - data_stream_extent = extent.Extent( - extent_type=extent_type, offset=extent_offset, size=extent_size) - extents.append(data_stream_extent) + for extent_index in range(self._fsxfs_file_entry.number_of_extents): + extent_offset, extent_size, extent_flags = ( + self._fsxfs_file_entry.get_extent(extent_index)) + + if extent_flags & 0x1: + extent_type = definitions.EXTENT_TYPE_SPARSE + else: + extent_type = definitions.EXTENT_TYPE_DATA + + data_stream_extent = extent.Extent( + extent_type=extent_type, offset=extent_offset, size=extent_size) + extents.append(data_stream_extent) return extents diff --git a/tests/vfs/data_stream.py b/tests/vfs/data_stream.py index 5d19bc73..a4a2eb14 100644 --- a/tests/vfs/data_stream.py +++ b/tests/vfs/data_stream.py @@ -14,13 +14,20 @@ class DataStreamTest(shared_test_lib.BaseTestCase): def testName(self): """Test the name property.""" - test_data_stream = data_stream.DataStream() + test_data_stream = data_stream.DataStream(None) self.assertEqual(test_data_stream.name, '') + def testGetExtents(self): + """Test the GetExtents function.""" + test_data_stream = data_stream.DataStream(None) + extents = test_data_stream.GetExtents() + self.assertEqual(extents, []) + def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = data_stream.DataStream() - self.assertTrue(test_data_stream.IsDefault()) + test_data_stream = data_stream.DataStream(None) + result = test_data_stream.IsDefault() + self.assertTrue(result) if __name__ == '__main__': diff --git a/tests/vfs/hfs_data_stream.py b/tests/vfs/hfs_data_stream.py index 5e752476..5d8ca358 100644 --- a/tests/vfs/hfs_data_stream.py +++ b/tests/vfs/hfs_data_stream.py @@ -4,7 +4,11 @@ import unittest +from dfvfs.lib import definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import context from dfvfs.vfs import hfs_data_stream +from dfvfs.vfs import hfs_file_system from tests import test_lib as shared_test_lib @@ -12,17 +16,100 @@ class HFSDataStreamTest(shared_test_lib.BaseTestCase): """Tests the HFS data stream.""" + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['hfsplus.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._hfs_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, location='/', + parent=self._raw_path_spec) + + self._file_system = hfs_file_system.HFSFileSystem( + self._resolver_context, self._hfs_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + def testName(self): """Test the name property.""" - test_data_stream = hfs_data_stream.HFSDataStream(None) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = hfs_data_stream.HFSDataStream(file_entry, None) self.assertEqual(test_data_stream.name, '') + fshfs_file_entry = file_entry.GetHFSFileEntry() + self.assertIsNotNone(fshfs_file_entry) + + fshfs_data_stream = fshfs_file_entry.get_resource_fork() + + test_data_stream = hfs_data_stream.HFSDataStream( + file_entry, fshfs_data_stream) + self.assertEqual(test_data_stream.name, 'rsrc') + + def testGetExtents(self): + """Test the GetExtents function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = hfs_data_stream.HFSDataStream(file_entry, None) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 0) + + fshfs_file_entry = file_entry.GetHFSFileEntry() + self.assertIsNotNone(fshfs_file_entry) + + fshfs_data_stream = fshfs_file_entry.get_resource_fork() + + test_data_stream = hfs_data_stream.HFSDataStream( + file_entry, fshfs_data_stream) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 1142784) + self.assertEqual(extents[0].size, 4096) + def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = hfs_data_stream.HFSDataStream(None) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_HFS, identifier=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = hfs_data_stream.HFSDataStream(file_entry, None) + result = test_data_stream.IsDefault() self.assertTrue(result) + fshfs_file_entry = file_entry.GetHFSFileEntry() + self.assertIsNotNone(fshfs_file_entry) + + fshfs_data_stream = fshfs_file_entry.get_resource_fork() + + test_data_stream = hfs_data_stream.HFSDataStream( + file_entry, fshfs_data_stream) + + result = test_data_stream.IsDefault() + self.assertFalse(result) + if __name__ == '__main__': unittest.main() diff --git a/tests/vfs/hfs_file_entry.py b/tests/vfs/hfs_file_entry.py index e3a26872..5cdc5803 100644 --- a/tests/vfs/hfs_file_entry.py +++ b/tests/vfs/hfs_file_entry.py @@ -352,19 +352,6 @@ def testGetExtents(self): extents = file_entry.GetExtents() self.assertEqual(len(extents), 0) - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_HFS, identifier=25, - location='/a_directory/a_resourcefork', parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - extents = file_entry.GetExtents(data_stream_name='rsrc') - self.assertEqual(len(extents), 1) - - self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) - self.assertEqual(extents[0].offset, 1142784) - self.assertEqual(extents[0].size, 4096) - def testGetFileEntryByPathSpec(self): """Tests the GetFileEntryByPathSpec function.""" path_spec = path_spec_factory.Factory.NewPathSpec( diff --git a/tests/vfs/ntfs_data_stream.py b/tests/vfs/ntfs_data_stream.py index 2d4bfb9a..94f3a700 100644 --- a/tests/vfs/ntfs_data_stream.py +++ b/tests/vfs/ntfs_data_stream.py @@ -4,7 +4,11 @@ import unittest +from dfvfs.lib import definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import context from dfvfs.vfs import ntfs_data_stream +from dfvfs.vfs import ntfs_file_system from tests import test_lib as shared_test_lib @@ -12,17 +16,104 @@ class NTFSDataStreamTest(shared_test_lib.BaseTestCase): """Tests the NTFS data stream.""" + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['ntfs.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._ntfs_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\', + parent=self._raw_path_spec) + + self._file_system = ntfs_file_system.NTFSFileSystem( + self._resolver_context, self._ntfs_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + def testName(self): """Test the name property.""" - test_data_stream = ntfs_data_stream.NTFSDataStream(None) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = ntfs_data_stream.NTFSDataStream(file_entry, None) self.assertEqual(test_data_stream.name, '') + fsntfs_file_entry = file_entry.GetNTFSFileEntry() + self.assertIsNotNone(fsntfs_file_entry) + + fsntfs_data_stream = fsntfs_file_entry.get_alternate_data_stream_by_name( + '$Info') + + test_data_stream = ntfs_data_stream.NTFSDataStream( + file_entry, fsntfs_data_stream) + self.assertEqual(test_data_stream.name, '$Info') + + def testGetExtents(self): + """Test the GetExtents function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = ntfs_data_stream.NTFSDataStream(file_entry, None) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 823296) + self.assertEqual(extents[0].size, 131072) + + fsntfs_file_entry = file_entry.GetNTFSFileEntry() + self.assertIsNotNone(fsntfs_file_entry) + + fsntfs_data_stream = fsntfs_file_entry.get_alternate_data_stream_by_name( + '$Info') + + test_data_stream = ntfs_data_stream.NTFSDataStream( + file_entry, fsntfs_data_stream) + + extents = test_data_stream.GetExtents() + # No extents are returned for a resident $DATA attribute. + self.assertEqual(len(extents), 0) + def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = ntfs_data_stream.NTFSDataStream(None) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = ntfs_data_stream.NTFSDataStream(file_entry, None) + result = test_data_stream.IsDefault() self.assertTrue(result) + fsntfs_file_entry = file_entry.GetNTFSFileEntry() + self.assertIsNotNone(fsntfs_file_entry) + + fsntfs_data_stream = fsntfs_file_entry.get_alternate_data_stream_by_name( + '$Info') + + test_data_stream = ntfs_data_stream.NTFSDataStream( + file_entry, fsntfs_data_stream) + + result = test_data_stream.IsDefault() + self.assertFalse(result) + if __name__ == '__main__': unittest.main() diff --git a/tests/vfs/ntfs_file_entry.py b/tests/vfs/ntfs_file_entry.py index 45a9187f..ef53af6d 100644 --- a/tests/vfs/ntfs_file_entry.py +++ b/tests/vfs/ntfs_file_entry.py @@ -131,6 +131,81 @@ def testAccessTime(self): self.assertIsNotNone(file_entry) self.assertIsNotNone(file_entry.access_time) + def testAttributes(self): + """Test the attributes properties.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', + mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_attributes, 4) + + attributes = list(file_entry.attributes) + attribute = attributes[0] + + self.assertEqual( + attribute.type_indicator, + definitions.ATTRIBUTE_TYPE_NTFS_STANDARD_INFORMATION) + + self.assertIsNotNone(attribute.access_time) + self.assertIsNotNone(attribute.creation_time) + self.assertIsNotNone(attribute.modification_time) + self.assertIsNotNone(attribute.entry_modification_time) + + date_time_string = ( + attribute.modification_time.CopyToDateTimeStringISO8601()) + self.assertEqual(date_time_string, '2019-08-31T10:22:59.9581788+00:00') + + attribute = attributes[1] + + self.assertEqual( + attribute.type_indicator, definitions.ATTRIBUTE_TYPE_NTFS_FILE_NAME) + + self.assertIsNotNone(attribute.access_time) + self.assertIsNotNone(attribute.creation_time) + self.assertIsNotNone(attribute.modification_time) + self.assertIsNotNone(attribute.entry_modification_time) + + date_time_string = ( + attribute.access_time.CopyToDateTimeStringISO8601()) + self.assertEqual(date_time_string, '2019-08-31T10:22:59.9567496+00:00') + + attribute = attributes[2] + + self.assertEqual( + attribute.type_indicator, + definitions.ATTRIBUTE_TYPE_NTFS_SECURITY_DESCRIPTOR) + + security_descriptor = attribute.security_descriptor + self.assertIsNotNone(security_descriptor) + + security_identifier = security_descriptor.owner + self.assertIsNotNone(security_identifier) + self.assertEqual(security_identifier.string, 'S-1-5-32-544') + + security_identifier = security_descriptor.group + self.assertIsNotNone(security_identifier) + self.assertEqual(security_identifier.string, 'S-1-5-32-544') + + access_control_list = security_descriptor.system_acl + self.assertIsNone(access_control_list) + + access_control_list = security_descriptor.discretionary_acl + self.assertIsNotNone(access_control_list) + self.assertEqual(access_control_list.number_of_entries, 1) + + access_control_entry = access_control_list.get_entry(0) + self.assertIsNotNone(access_control_entry) + + self.assertEqual(access_control_entry.type, 0) + self.assertEqual(access_control_entry.flags, 3) + self.assertEqual(access_control_entry.access_mask, 0x1f01ff) + + security_identifier = access_control_entry.security_identifier + self.assertIsNotNone(security_identifier) + self.assertEqual(security_identifier.string, 'S-1-1-0') + def testChangeTime(self): """Test the change_time property.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -151,6 +226,51 @@ def testCreationTime(self): self.assertIsNotNone(file_entry) self.assertIsNotNone(file_entry.creation_time) + def testDataStream(self): + """Tests the data streams properties.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', + mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 1) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, ['']) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\a_directory', + mft_entry=self._MFT_ENTRY_A_DIRECTORY, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 0) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + self.assertEqual(data_stream_names, []) + + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_data_streams, 2) + + data_stream_names = [] + for data_stream in file_entry.data_streams: + data_stream_names.append(data_stream.name) + + expected_data_stream_names = sorted(['', '$Info']) + self.assertEqual(sorted(data_stream_names), expected_data_stream_names) + def testModificationTime(self): """Test the modification_time property.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -181,6 +301,50 @@ def testSize(self): self.assertIsNotNone(file_entry) self.assertEqual(file_entry.size, 116) + def testSubFileEntries(self): + """Test the sub file entries properties.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, location='\\', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_sub_file_entries, 14) + + expected_sub_file_entry_names = [ + '$AttrDef', + '$BadClus', + '$Bitmap', + '$Boot', + '$Extend', + '$LogFile', + '$MFT', + '$MFTMirr', + '$Secure', + '$UpCase', + '$Volume', + 'a_directory', + 'a_link', + 'passwords.txt'] + + sub_file_entry_names = [] + for sub_file_entry in file_entry.sub_file_entries: + sub_file_entry_names.append(sub_file_entry.name) + + self.assertEqual( + len(sub_file_entry_names), len(expected_sub_file_entry_names)) + self.assertEqual( + sorted(sub_file_entry_names), sorted(expected_sub_file_entry_names)) + + # Test a path specification without a location. + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_NTFS, mft_attribute=1, + mft_entry=self._MFT_ENTRY_A_DIRECTORY, parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + self.assertEqual(file_entry.number_of_sub_file_entries, 2) + def testGetExtents(self): """Tests the GetExtents function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -196,10 +360,6 @@ def testGetExtents(self): self.assertEqual(extents[0].offset, 823296) self.assertEqual(extents[0].size, 131072) - extents = file_entry.GetExtents(data_stream_name='$Info') - # No extents are returned for data store in the $DATA attribute. - self.assertEqual(len(extents), 0) - path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_NTFS, location='\\a_directory', mft_entry=self._MFT_ENTRY_A_DIRECTORY, parent=self._raw_path_spec) @@ -500,170 +660,6 @@ def testIsVirtual(self): self.assertIsNotNone(file_entry) self.assertFalse(file_entry.IsVirtual()) - def testSubFileEntries(self): - """Test the sub file entries properties.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\', - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_sub_file_entries, 14) - - expected_sub_file_entry_names = [ - '$AttrDef', - '$BadClus', - '$Bitmap', - '$Boot', - '$Extend', - '$LogFile', - '$MFT', - '$MFTMirr', - '$Secure', - '$UpCase', - '$Volume', - 'a_directory', - 'a_link', - 'passwords.txt'] - - sub_file_entry_names = [] - for sub_file_entry in file_entry.sub_file_entries: - sub_file_entry_names.append(sub_file_entry.name) - - self.assertEqual( - len(sub_file_entry_names), len(expected_sub_file_entry_names)) - self.assertEqual( - sorted(sub_file_entry_names), sorted(expected_sub_file_entry_names)) - - # Test a path specification without a location. - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, mft_attribute=1, - mft_entry=self._MFT_ENTRY_A_DIRECTORY, parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_sub_file_entries, 2) - - def testAttributes(self): - """Test the attributes properties.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', - mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_attributes, 4) - - attributes = list(file_entry.attributes) - attribute = attributes[0] - - self.assertEqual( - attribute.type_indicator, - definitions.ATTRIBUTE_TYPE_NTFS_STANDARD_INFORMATION) - - self.assertIsNotNone(attribute.access_time) - self.assertIsNotNone(attribute.creation_time) - self.assertIsNotNone(attribute.modification_time) - self.assertIsNotNone(attribute.entry_modification_time) - - date_time_string = ( - attribute.modification_time.CopyToDateTimeStringISO8601()) - self.assertEqual(date_time_string, '2019-08-31T10:22:59.9581788+00:00') - - attribute = attributes[1] - - self.assertEqual( - attribute.type_indicator, definitions.ATTRIBUTE_TYPE_NTFS_FILE_NAME) - - self.assertIsNotNone(attribute.access_time) - self.assertIsNotNone(attribute.creation_time) - self.assertIsNotNone(attribute.modification_time) - self.assertIsNotNone(attribute.entry_modification_time) - - date_time_string = ( - attribute.access_time.CopyToDateTimeStringISO8601()) - self.assertEqual(date_time_string, '2019-08-31T10:22:59.9567496+00:00') - - attribute = attributes[2] - - self.assertEqual( - attribute.type_indicator, - definitions.ATTRIBUTE_TYPE_NTFS_SECURITY_DESCRIPTOR) - - security_descriptor = attribute.security_descriptor - self.assertIsNotNone(security_descriptor) - - security_identifier = security_descriptor.owner - self.assertIsNotNone(security_identifier) - self.assertEqual(security_identifier.string, 'S-1-5-32-544') - - security_identifier = security_descriptor.group - self.assertIsNotNone(security_identifier) - self.assertEqual(security_identifier.string, 'S-1-5-32-544') - - access_control_list = security_descriptor.system_acl - self.assertIsNone(access_control_list) - - access_control_list = security_descriptor.discretionary_acl - self.assertIsNotNone(access_control_list) - self.assertEqual(access_control_list.number_of_entries, 1) - - access_control_entry = access_control_list.get_entry(0) - self.assertIsNotNone(access_control_entry) - - self.assertEqual(access_control_entry.type, 0) - self.assertEqual(access_control_entry.flags, 3) - self.assertEqual(access_control_entry.access_mask, 0x1f01ff) - - security_identifier = access_control_entry.security_identifier - self.assertIsNotNone(security_identifier) - self.assertEqual(security_identifier.string, 'S-1-1-0') - - def testDataStream(self): - """Tests the data streams properties.""" - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\a_directory\\a_file', - mft_entry=self._MFT_ENTRY_A_FILE, parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_data_streams, 1) - - data_stream_names = [] - for data_stream in file_entry.data_streams: - data_stream_names.append(data_stream.name) - - self.assertEqual(data_stream_names, ['']) - - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\a_directory', - mft_entry=self._MFT_ENTRY_A_DIRECTORY, parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_data_streams, 0) - - data_stream_names = [] - for data_stream in file_entry.data_streams: - data_stream_names.append(data_stream.name) - - self.assertEqual(data_stream_names, []) - - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_NTFS, location='\\$UpCase', mft_entry=10, - parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - self.assertEqual(file_entry.number_of_data_streams, 2) - - data_stream_names = [] - for data_stream in file_entry.data_streams: - data_stream_names.append(data_stream.name) - - expected_data_stream_names = sorted(['', '$Info']) - self.assertEqual(sorted(data_stream_names), expected_data_stream_names) - def testGetDataStream(self): """Tests the GetDataStream function.""" path_spec = path_spec_factory.Factory.NewPathSpec( diff --git a/tests/vfs/tsk_data_stream.py b/tests/vfs/tsk_data_stream.py index 2e5bcc90..df6432a9 100644 --- a/tests/vfs/tsk_data_stream.py +++ b/tests/vfs/tsk_data_stream.py @@ -4,23 +4,317 @@ import unittest +import pytsk3 + +from dfvfs.lib import definitions +from dfvfs.path import factory as path_spec_factory +from dfvfs.resolver import context from dfvfs.vfs import tsk_data_stream +from dfvfs.vfs import tsk_file_system from tests import test_lib as shared_test_lib -class TSKDataStreamTest(shared_test_lib.BaseTestCase): - """Tests the SleuthKit (TSK) data stream.""" +class TSKDataStreamTestExt2(shared_test_lib.BaseTestCase): + """Tests the SleuthKit (TSK) data stream on ext2.""" + + _INODE_ANOTHER_FILE = 15 + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['ext2.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._tsk_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, location='/', + parent=self._raw_path_spec) + + self._file_system = tsk_file_system.TSKFileSystem( + self._resolver_context, self._tsk_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + + def testName(self): + """Test the name property.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_EXT, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + self.assertEqual(test_data_stream.name, '') + + def testGetExtents(self): + """Test the GetExtents function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_EXT, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 527360) + self.assertEqual(extents[0].size, 1024) + + def testIsDefault(self): + """Test the IsDefault function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_EXT, inode=self._INODE_ANOTHER_FILE, + location='/a_directory/another_file', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + result = test_data_stream.IsDefault() + self.assertTrue(result) + + +class TSKDataStreamTestHFSPlus(shared_test_lib.BaseTestCase): + """Tests the SleuthKit (TSK) data stream on HFS+.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['hfsplus.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._tsk_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, location='/', + parent=self._raw_path_spec) + + self._file_system = tsk_file_system.TSKFileSystem( + self._resolver_context, self._tsk_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + + def _GetResourceForkAttribute(self, tsk_file): + """Retrieves an resource fork attribute. + + Args: + tsk_file (pytsk3.File): TSK file. + + Returns: + pytsk3.Attribute: TSK attribute or None. + """ + for tsk_attribute in tsk_file: + if getattr(tsk_attribute, 'info', None): + attribute_type = getattr(tsk_attribute.info, 'type', None) + if attribute_type == pytsk3.TSK_FS_ATTR_TYPE_HFS_RSRC: + return tsk_attribute + + return None def testName(self): """Test the name property.""" - test_data_stream = tsk_data_stream.TSKDataStream(None) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) self.assertEqual(test_data_stream.name, '') + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetResourceForkAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + self.assertEqual(test_data_stream.name, 'rsrc') + + def testGetExtents(self): + """Test the GetExtents function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 0) + + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetResourceForkAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 1142784) + self.assertEqual(extents[0].size, 4096) + def testIsDefault(self): """Test the IsDefault function.""" - test_data_stream = tsk_data_stream.TSKDataStream(None) - self.assertTrue(test_data_stream.IsDefault()) + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=25, + location='/a_directory/a_resourcefork', parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + result = test_data_stream.IsDefault() + self.assertTrue(result) + + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetResourceForkAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + + result = test_data_stream.IsDefault() + self.assertFalse(result) + + +class TSKDataStreamTestNTFS(shared_test_lib.BaseTestCase): + """Tests the SleuthKit (TSK) data stream on NTFS.""" + + def setUp(self): + """Sets up the needed objects used throughout the test.""" + self._resolver_context = context.Context() + test_path = self._GetTestFilePath(['ntfs.raw']) + self._SkipIfPathNotExists(test_path) + + test_os_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_OS, location=test_path) + self._raw_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_RAW, parent=test_os_path_spec) + self._tsk_path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, location='/', + parent=self._raw_path_spec) + + self._file_system = tsk_file_system.TSKFileSystem( + self._resolver_context, self._tsk_path_spec) + self._file_system.Open() + + def tearDown(self): + """Cleans up the needed objects used throughout the test.""" + self._resolver_context.Empty() + + def _GetAlternateDataStreamAttribute(self, tsk_file): + """Retrieves an ADS attribute. + + Args: + tsk_file (pytsk3.File): TSK file. + + Returns: + pytsk3.Attribute: TSK attribute or None. + """ + for tsk_attribute in tsk_file: + if getattr(tsk_attribute, 'info', None): + attribute_name = getattr(tsk_attribute.info, 'name', None) + attribute_type = getattr(tsk_attribute.info, 'type', None) + if (attribute_type == pytsk3.TSK_FS_ATTR_TYPE_NTFS_DATA and + attribute_name): + return tsk_attribute + + return None + + def testName(self): + """Test the name property.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=10, location='/$UpCase', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + self.assertEqual(test_data_stream.name, '') + + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetAlternateDataStreamAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + self.assertEqual(test_data_stream.name, '$Info') + + def testGetExtents(self): + """Test the GetExtents function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=10, location='/$UpCase', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + extents = test_data_stream.GetExtents() + self.assertEqual(len(extents), 1) + + self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) + self.assertEqual(extents[0].offset, 823296) + self.assertEqual(extents[0].size, 131072) + + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetAlternateDataStreamAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + + extents = test_data_stream.GetExtents() + # No extents are returned for a resident $DATA attribute. + self.assertEqual(len(extents), 0) + + def testIsDefault(self): + """Test the IsDefault function.""" + path_spec = path_spec_factory.Factory.NewPathSpec( + definitions.TYPE_INDICATOR_TSK, inode=10, location='/$UpCase', + parent=self._raw_path_spec) + file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) + self.assertIsNotNone(file_entry) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, None) + + result = test_data_stream.IsDefault() + self.assertTrue(result) + + tsk_file = file_entry.GetTSKFile() + self.assertIsNotNone(tsk_file) + + tsk_attribute = self._GetAlternateDataStreamAttribute(tsk_file) + self.assertIsNotNone(tsk_attribute) + + test_data_stream = tsk_data_stream.TSKDataStream(file_entry, tsk_attribute) + + result = test_data_stream.IsDefault() + self.assertFalse(result) if __name__ == '__main__': diff --git a/tests/vfs/tsk_file_entry.py b/tests/vfs/tsk_file_entry.py index f05fde26..199e6ee2 100644 --- a/tests/vfs/tsk_file_entry.py +++ b/tests/vfs/tsk_file_entry.py @@ -1155,19 +1155,6 @@ def testGetExtents(self): extents = file_entry.GetExtents() self.assertEqual(len(extents), 0) - path_spec = path_spec_factory.Factory.NewPathSpec( - definitions.TYPE_INDICATOR_TSK, inode=25, - location='/a_directory/a_resourcefork', parent=self._raw_path_spec) - file_entry = self._file_system.GetFileEntryByPathSpec(path_spec) - self.assertIsNotNone(file_entry) - - extents = file_entry.GetExtents(data_stream_name='rsrc') - self.assertEqual(len(extents), 1) - - self.assertEqual(extents[0].extent_type, definitions.EXTENT_TYPE_DATA) - self.assertEqual(extents[0].offset, 1142784) - self.assertEqual(extents[0].size, 4096) - def testGetFileEntryByPathSpec(self): """Tests the GetFileEntryByPathSpec function.""" path_spec = path_spec_factory.Factory.NewPathSpec( @@ -1700,10 +1687,6 @@ def testGetExtents(self): self.assertEqual(extents[0].offset, 823296) self.assertEqual(extents[0].size, 131072) - extents = file_entry.GetExtents(data_stream_name='$Info') - # No extents are returned for data store in the $DATA attribute. - self.assertEqual(len(extents), 0) - path_spec = path_spec_factory.Factory.NewPathSpec( definitions.TYPE_INDICATOR_TSK, inode=self._MFT_ENTRY_A_DIRECTORY, location='/a_directory', parent=self._raw_path_spec)