diff --git a/newsfragments/659.feature b/newsfragments/659.feature new file mode 100644 index 000000000..c374981ed --- /dev/null +++ b/newsfragments/659.feature @@ -0,0 +1 @@ +Add format class to read Jungfrau4M serial images from beamline ID29 at ESRF diff --git a/src/dxtbx/format/FormatHDF5ESRFJungfrau4M.py b/src/dxtbx/format/FormatHDF5ESRFJungfrau4M.py new file mode 100644 index 000000000..64f41f5f5 --- /dev/null +++ b/src/dxtbx/format/FormatHDF5ESRFJungfrau4M.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import sys + +import h5py + +from scitbx.array_family import flex + +from dxtbx import flumpy +from dxtbx.format.FormatHDF5 import FormatHDF5 + + +class FormatHDF5ESRFJungfrau4M(FormatHDF5): + + # A class to understand still-shot images from ESRF collected on a Jungfrau 4M. + + _cached_mask = None + + @staticmethod + def understand(image_file): + with h5py.File(image_file, "r") as h5_handle: + if len(h5_handle) != 1: + return False + key = list(h5_handle.keys())[0] + if "instrument" not in h5_handle[key]: + return False + # instrument name is of form jungfrau4m_rr[X]_smx where X is empty, 4 or another number + instrument = list(h5_handle[key]["instrument"].keys())[0] + if not instrument.startswith("jungfrau4m_rr"): + return False + if not instrument.endswith("smx"): + return False + return True + + def _start(self): + super()._start() + image_file = self.get_image_file() + self._h5_handle = h5py.File(image_file, "r") + self.key = list(self._h5_handle.keys())[0] + self.instrument_name = list(self._h5_handle[self.key]["instrument"].keys())[0] + instrument = self._h5_handle[self.key]["instrument"][self.instrument_name] + self.n_images = instrument["data"].shape[0] + self.adus_per_photon = instrument["detector_information"]["adus_per_photon"] + self.image_size = tuple(instrument["data"].shape[1:]) + wavelength = instrument["beam"]["incident_wavelength"][()] + x_pixel_size = ( + instrument["detector_information"]["x_pixel_size"][()] * 1000 + ) # convert m to mm + y_pixel_size = ( + instrument["detector_information"]["y_pixel_size"][()] * 1000 + ) # convert m to mm + distance = ( + instrument["detector_information"]["detector_distance"][()] * 1000 + ) # convert m to mm + beam_center_x = instrument["detector_information"]["beam_center_x"][()] # in px + beam_center_y = instrument["detector_information"]["beam_center_y"][()] # in px + + beam_center_x *= x_pixel_size + beam_center_y *= y_pixel_size + trusted_range = ( + instrument["detector_information"]["underload_value"][()], + instrument["detector_information"]["saturation_value"][()], + ) + exposure_time = instrument["acquisition"]["exposure_time"][()] + + self._detector_model = self._detector_factory.simple( + sensor="UNKNOWN", + distance=distance, + beam_centre=( + beam_center_x, + beam_center_y, + ), + fast_direction="+x", + slow_direction="-y", + pixel_size=( + x_pixel_size, + y_pixel_size, + ), + image_size=(self.image_size[1], self.image_size[0]), + trusted_range=trusted_range, + mask=self.get_static_mask(), + ) + self._beam_model = self._beam_factory.simple(wavelength) + self._scan_model = self._scan_factory.make_scan( + image_range=(1, self.n_images), + exposure_times=exposure_time, + oscillation=(0.0, 0.0), + epochs=list(range(self.n_images)), + ) + # Add a placeholder goniometer model, which has no practical effect on processing as the oscillation is 0. + # Some dxtbx format logic assumes both or neither scan + goniometer are None for still images + self._goniometer_model = self._goniometer_factory.known_axis((0, 1, 0)) + + def get_raw_data(self, index=None): + if index is None: + index = 0 + # data can be int32 with adus_per_photon != 1.0 or float16 with adus_per_photon == 1.0 + data = ( + ( + self._h5_handle[self.key]["measurement"]["data"][index] + / self.adus_per_photon + ) + if self.adus_per_photon != 1.0 + else self._h5_handle[self.key]["measurement"]["data"][index] + ) + return flex.double(data.astype(float)) + + def get_num_images(self): + return self.n_images + + def get_beam(self, index=None): + return self._beam(index) + + def _beam(self, index=None): + return self._beam_model + + def get_detector(self, index=None): + return self._detector(index) + + def get_static_mask(self): + if FormatHDF5ESRFJungfrau4M._cached_mask is None: + mask = self._h5_handle[self.key]["instrument"][self.instrument_name][ + "detector_information" + ]["pixel_mask"] + mask = flumpy.from_numpy(mask[()]) + mask_array = mask == 0 + FormatHDF5ESRFJungfrau4M._cached_mask = mask_array + return FormatHDF5ESRFJungfrau4M._cached_mask + + def _detector(self, index=None): + return self._detector_model + + def _goniometer(self): + return self._goniometer_model + + def _scan(self): + return self._scan_model + + +if __name__ == "__main__": + for arg in sys.argv[1:]: + print(FormatHDF5ESRFJungfrau4M.understand(arg))