From c9d5757896acf528da259a414e32e3eb9c3030da Mon Sep 17 00:00:00 2001 From: Tom Boissonnet Date: Wed, 5 Jun 2024 18:05:55 +0100 Subject: [PATCH] Getters for High-Content-Sreening (#101) * getters for HCS containers * list images from run * tests for HCS getters * fix test get_well_id * fix screen creation for tests * cleanup of test data * renamed run to plate_acquisition --- ezomero/__init__.py | 8 ++ ezomero/_gets.py | 264 ++++++++++++++++++++++++++++++++++++++++- tests/conftest.py | 123 ++++++++++++++++--- tests/test_ezimport.py | 19 +++ tests/test_gets.py | 179 ++++++++++++++++++++++++++-- 5 files changed, 559 insertions(+), 34 deletions(-) diff --git a/ezomero/__init__.py b/ezomero/__init__.py index ea8f81d..0d30d8e 100644 --- a/ezomero/__init__.py +++ b/ezomero/__init__.py @@ -27,6 +27,10 @@ get_image_ids, get_project_ids, get_dataset_ids, + get_screen_ids, + get_plate_ids, + get_well_ids, + get_plate_acquisition_ids, get_map_annotation_ids, get_map_annotation, get_file_annotation_ids, @@ -58,6 +62,10 @@ 'get_image_ids', 'get_project_ids', 'get_dataset_ids', + 'get_screen_ids', + 'get_plate_ids', + 'get_well_ids', + 'get_plate_acquisition_ids', 'get_map_annotation_ids', 'get_map_annotation', 'get_file_annotation_ids', diff --git a/ezomero/_gets.py b/ezomero/_gets.py index 2131e3b..f95e8d7 100644 --- a/ezomero/_gets.py +++ b/ezomero/_gets.py @@ -332,6 +332,7 @@ def get_image_ids(conn: BlitzGateway, project: Optional[int] = None, dataset: Optional[int] = None, plate: Optional[int] = None, well: Optional[int] = None, + plate_acquisition: Optional[int] = None, across_groups: Optional[bool] = True) -> List[int]: """Return a list of image ids based on image container @@ -351,6 +352,8 @@ def get_image_ids(conn: BlitzGateway, project: Optional[int] = None, all images contained in all Wells belonging to the specified Plate. well : int, optional ID of Well from which to return image IDs. + plate_acquisition : int, optional + ID of Plate acquisition from which to return image IDs. across_groups : bool, optional Defines cross-group behavior of function - set to ``False`` to disable it. @@ -366,8 +369,8 @@ def get_image_ids(conn: BlitzGateway, project: Optional[int] = None, ``ezomero.set_group`` to specify group prior to passing the `conn` object to this function. - Only one of Project, Dataset, Plate, or Well can be specified. If none of - those are specified, orphaned images are returned. + Only one of Project, Dataset, Plate, Well or Plate acquisition can be + specified. If none of those are specified, orphaned images are returned. Examples -------- @@ -380,12 +383,12 @@ def get_image_ids(conn: BlitzGateway, project: Optional[int] = None, >>> ds_ims = get_image_ids(conn, dataset=448) """ arg_counter = 0 - for arg in [project, dataset, plate, well]: + for arg in [project, dataset, plate, well, plate_acquisition]: if arg is not None: arg_counter += 1 if arg_counter > 1: raise ValueError('Only one of Project/Dataset/Plate/Well' - ' can be specified') + '/PlateAcquisition can be specified') q = conn.getQueryService() params = Parameters() @@ -441,6 +444,18 @@ def get_image_ids(conn: BlitzGateway, project: Optional[int] = None, params, conn.SERVICE_OPTS ) + elif plate_acquisition is not None: + if not isinstance(plate_acquisition, int): + raise TypeError('Plate acquisition ID must be integer') + params.map = {"plate_acquisition": rlong(plate_acquisition)} + results = q.projection( + "SELECT i.id FROM WellSample ws" + " JOIN ws.image i" + " JOIN ws.plateAcquisition pa" + " WHERE pa.id=:plate_acquisition", + params, + conn.SERVICE_OPTS + ) else: results = q.projection( "SELECT i.id FROM Image i" @@ -550,6 +565,247 @@ def get_dataset_ids(conn: BlitzGateway, project: Optional[int] = None, return [r[0].val for r in results] +@do_across_groups +def get_screen_ids(conn: BlitzGateway, + across_groups: Optional[bool] = True) -> List[int]: + """Return a list with IDs for all available Screens. + + Parameters + ---------- + conn : ``omero.gateway.BlitzGateway`` object + OMERO connection. + across_groups : bool, optional + Defines cross-group behavior of function - set to + ``False`` to disable it. + + Returns + ------- + scrn_ids : list of ints + List of screen IDs accessible by current user. + + Examples + -------- + # Return IDs of all screens accessible by current user: + + >>> scrn_ids = get_screen_ids(conn) + """ + + scrn_ids = [] + for s in conn.listScreens(): + scrn_ids.append(s.getId()) + return scrn_ids + + +@do_across_groups +def get_plate_ids(conn: BlitzGateway, screen: Optional[int] = None, + across_groups: Optional[bool] = True) -> List[int]: + """Return a list of plate ids based on screen ID. + + If no screen is specified, function will return orphan plates. + + Parameters + ---------- + conn : ``omero.gateway.BlitzGateway`` object + OMERO connection. + screen : int, optional + ID of Screen from which to return plate IDs. This will return IDs of + all plates contained in the specified Screen. + across_groups : bool, optional + Defines cross-group behavior of function - set to + ``False`` to disable it. + + Returns + ------- + pl_ids : list of ints + List of plates IDs contained in the specified screen. + + Examples + -------- + # Return orphaned plates: + + >>> orphans = get_plate_ids(conn) + + # Return IDs of all plates from Screen with ID 224: + + >>> pl_ids = get_plate_ids(conn, screen=224) + """ + + q = conn.getQueryService() + params = Parameters() + + if screen is not None: + if not isinstance(screen, int): + raise TypeError('Screen ID must be integer') + params.map = {"screen": rlong(screen)} + results = q.projection( + "SELECT p.id FROM Screen s" + " JOIN s.plateLinks spl" + " JOIN spl.child p" + " WHERE s.id=:screen", + params, + conn.SERVICE_OPTS + ) + else: + results = q.projection( + "SELECT p.id FROM Plate p" + " WHERE NOT EXISTS (" + " SELECT spl FROM ScreenPlateLink spl" + " WHERE spl.child=p.id" + " )", + params, + conn.SERVICE_OPTS + ) + return [r[0].val for r in results] + + +@do_across_groups +def get_well_ids(conn: BlitzGateway, screen: Optional[int] = None, + plate: Optional[int] = None, + across_groups: Optional[bool] = True) -> List[int]: + """Return a list of well ids based on a container + + Parameters + ---------- + conn : ``omero.gateway.BlitzGateway`` object + OMERO connection. + screen : int, optional + ID of Screen from which to return well IDs. This will return IDs of + all wells contained in the specified Screen. + plate : int, optional + ID of Plate from which to return well IDs. This will return IDs of + all wells belonging to the specified Plate. + across_groups : bool, optional + Defines cross-group behavior of function - set to + ``False`` to disable it. + + Returns + ------- + wl_ids : list of ints + List of wells IDs contained in the specified container. + + Examples + -------- + # Return IDs of all wells from Screen with ID 224: + + >>> wl_ids = get_well_ids(conn, screen=224) + """ + arg_counter = 0 + for arg in [screen, plate]: + if arg is not None: + arg_counter += 1 + if arg_counter > 1: + raise ValueError('Only one of Screen/Plate' + ' can be specified') + elif arg_counter == 0: + raise ValueError('One of Screen/Plate' + ' must be specified') + + q = conn.getQueryService() + params = Parameters() + + if screen is not None: + if not isinstance(screen, int): + raise TypeError('Screen ID must be integer') + params.map = {"screen": rlong(screen)} + results = q.projection( + "SELECT w.id FROM Screen s" + " JOIN s.plateLinks spl" + " JOIN spl.child p" + " JOIN p.wells w" + " WHERE s.id=:screen", + params, + conn.SERVICE_OPTS + ) + elif plate is not None: + if not isinstance(plate, int): + raise TypeError('Plate ID must be integer') + params.map = {"plate": rlong(plate)} + results = q.projection( + "SELECT w.id FROM Plate p" + " JOIN p.wells w" + " WHERE p.id=:plate", + params, + conn.SERVICE_OPTS + ) + return [r[0].val for r in results] + + +@do_across_groups +def get_plate_acquisition_ids( + conn: BlitzGateway, screen: Optional[int] = None, + plate: Optional[int] = None, + across_groups: Optional[bool] = True +) -> List[int]: + """Return a list of plate acquisition ids based on a container + + Parameters + ---------- + conn : ``omero.gateway.BlitzGateway`` object + OMERO connection. + screen : int, optional + ID of Screen from which to return plate acquisition IDs. + This will return IDs of all plate acquisitions contained + in the specified Screen. + plate : int, optional + ID of Plate from which to return plate acquisition IDs. + This will return IDs of all plate acquisitions belonging + to the specified Plate. + across_groups : bool, optional + Defines cross-group behavior of function - set to + ``False`` to disable it. + + Returns + ------- + plate_acquisition_ids : list of ints + List of plate acquisitions IDs contained in the specified container. + + Examples + -------- + # Return IDs of all plate acquisitions from Screen with ID 224: + + >>> plate_acquisition_ids = get_plate_acquisition_ids(conn, screen=224) + """ + arg_counter = 0 + for arg in [screen, plate]: + if arg is not None: + arg_counter += 1 + if arg_counter > 1: + raise ValueError('Only one of Screen/Plate' + ' can be specified') + elif arg_counter == 0: + raise ValueError('One of Screen/Plate' + ' must be specified') + + q = conn.getQueryService() + params = Parameters() + + if screen is not None: + if not isinstance(screen, int): + raise TypeError('Screen ID must be integer') + params.map = {"screen": rlong(screen)} + results = q.projection( + "SELECT r.id FROM Screen s" + " JOIN s.plateLinks spl" + " JOIN spl.child p" + " JOIN p.plateAcquisitions r" + " WHERE s.id=:screen", + params, + conn.SERVICE_OPTS + ) + elif plate is not None: + if not isinstance(plate, int): + raise TypeError('Plate ID must be integer') + params.map = {"plate": rlong(plate)} + results = q.projection( + "SELECT r.id FROM Plate p" + " JOIN p.plateAcquisitions r" + " WHERE p.id=:plate", + params, + conn.SERVICE_OPTS + ) + return [r[0].val for r in results] + + @do_across_groups def get_map_annotation_ids(conn: BlitzGateway, object_type: str, object_id: int, ns: Optional[str] = None, diff --git a/tests/conftest.py b/tests/conftest.py index 40cda8d..946f3fb 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -9,7 +9,7 @@ from omero.gateway import BlitzGateway from omero.gateway import ScreenWrapper, PlateWrapper from omero.model import ScreenI, PlateI, WellI, WellSampleI, ImageI -from omero.model import ScreenPlateLinkI +from omero.model import ScreenPlateLinkI, PlateAcquisitionI from omero.plugins.sessions import SessionsControl from omero.plugins.user import UserControl from omero.plugins.group import GroupControl @@ -437,6 +437,9 @@ def project_structure(conn, timestamp, image_fixture, users_groups, yield [project_info, dataset_info, image_info] current_group = conn.getGroupFromContext().getId() conn.SERVICE_OPTS.setOmeroGroup(-1) + for dname, did in dataset_info: + conn.deleteObjects("Dataset", [did], deleteAnns=True, + deleteChildren=True, wait=True) for pname, pid in project_info: conn.deleteObjects("Project", [pid], deleteAnns=True, deleteChildren=True, wait=True) @@ -454,7 +457,7 @@ def screen_structure(conn, timestamp, image_fixture): screen.save() screen_id = screen.getId() # Create Plate - plate_name = "plate_" + timestamp + plate_name = "plate1_" + timestamp plate = PlateWrapper(conn, PlateI()) plate.setName(plate_name) plate.save() @@ -464,42 +467,128 @@ def screen_structure(conn, timestamp, image_fixture): link.setChild(PlateI(plate_id, False)) update_service.saveObject(link) + # Create second Plate + plate2_name = "plate2_" + timestamp + plate2 = PlateWrapper(conn, PlateI()) + plate2.setName(plate2_name) + plate2.save() + plate2_id = plate2.getId() + link = ScreenPlateLinkI() + link.setParent(ScreenI(screen_id, False)) + link.setChild(PlateI(plate2_id, False)) + update_service.saveObject(link) + # Create Well (row 1, col 1) - well = WellI() - well.setPlate(PlateI(plate_id, False)) - well.setColumn(rint(1)) - well.setRow(rint(1)) - well.setPlate(PlateI(plate_id, False)) + well1 = WellI() + well1.setPlate(PlateI(plate_id, False)) + well1.setColumn(rint(1)) + well1.setRow(rint(1)) # Create another Well (row 2, col 2) well2 = WellI() well2.setPlate(PlateI(plate_id, False)) well2.setColumn(rint(2)) well2.setRow(rint(2)) - well2.setPlate(PlateI(plate_id, False)) + + # Create Well for second plate + well3 = WellI() + well3.setPlate(PlateI(plate2_id, False)) + well3.setColumn(rint(2)) + well3.setRow(rint(2)) + + # Create PlateAcquisition/Run for plate 1 + run1 = PlateAcquisitionI() + run1.setPlate(PlateI(plate_id, False)) + run2 = PlateAcquisitionI() + run2.setPlate(PlateI(plate_id, False)) + + # Create PlateAcquisition/Run for plate 2 + run3 = PlateAcquisitionI() + run3.setPlate(PlateI(plate2_id, False)) + + well1 = update_service.saveAndReturnObject(well1) + well2 = update_service.saveAndReturnObject(well2) + well3 = update_service.saveAndReturnObject(well3) + well1_id = well1.getId().getValue() + well2_id = well2.getId().getValue() + well3_id = well3.getId().getValue() + + run1 = update_service.saveAndReturnObject(run1) + run2 = update_service.saveAndReturnObject(run2) + run3 = update_service.saveAndReturnObject(run3) + run1_id = run1.getId().getValue() + run2_id = run2.getId().getValue() + run3_id = run3.getId().getValue() # Create Well Sample with Image for both wells ws = WellSampleI() im_id1 = ezomero.post_image(conn, image_fixture, "well image") ws.setImage(ImageI(im_id1, False)) - well.addWellSample(ws) + well1.addWellSample(ws) + run1.addWellSample(ws) ws2 = WellSampleI() im_id2 = ezomero.post_image(conn, image_fixture, "well image2") ws2.setImage(ImageI(im_id2, False)) well2.addWellSample(ws2) - - well_obj = update_service.saveAndReturnObject(well) - well2_obj = update_service.saveAndReturnObject(well2) - - well_id = well_obj.getId().getValue() - well2_id = well2_obj.getId().getValue() - - yield [plate_id, well_id, im_id1, screen_id, well2_id, im_id2] + run1.addWellSample(ws2) + + ws3 = WellSampleI() + im_id3 = ezomero.post_image(conn, image_fixture, "well image3") + ws3.setImage(ImageI(im_id3, False)) + well1.addWellSample(ws3) + run2.addWellSample(ws3) + + ws4 = WellSampleI() + im_id4 = ezomero.post_image(conn, image_fixture, "well image4") + ws4.setImage(ImageI(im_id4, False)) + well2.addWellSample(ws4) + run2.addWellSample(ws4) + + ws5 = WellSampleI() + im_id5 = ezomero.post_image(conn, image_fixture, "well image5") + ws5.setImage(ImageI(im_id5, False)) + well3.addWellSample(ws5) + run3.addWellSample(ws5) + + # One call for each plate is enough to update + well1 = update_service.saveAndReturnObject(well1) + well3 = update_service.saveAndReturnObject(well3) + + # Create OrphanPlate + plate3_name = "plate3_" + timestamp + plate3 = PlateWrapper(conn, PlateI()) + plate3.setName(plate3_name) + plate3.save() + plate3_id = plate3.getId() + + # Create Well fr orphan plate + well4 = WellI() + well4.setPlate(PlateI(plate3_id, False)) + well4.setColumn(rint(1)) + well4.setRow(rint(1)) + + # Create Well Sample with Image + ws6 = WellSampleI() + im_id6 = ezomero.post_image(conn, image_fixture, "well image6") + ws6.setImage(ImageI(im_id6, False)) + well4.addWellSample(ws6) + + well4 = update_service.saveAndReturnObject(well4) + well4_id = well4.getId().getValue() + + yield [screen_id, plate_id, plate2_id, plate3_id, + run1_id, run2_id, run3_id, + well1_id, im_id1, im_id3, + well2_id, im_id2, im_id4, + well3_id, im_id5, + well4_id, im_id6] current_group = conn.getGroupFromContext().getId() conn.SERVICE_OPTS.setOmeroGroup(-1) conn.deleteObjects("Screen", [screen_id], deleteAnns=True, deleteChildren=True, wait=True) + conn.deleteObjects("Plate", [plate3_id], deleteAnns=True, + deleteChildren=True, wait=True) conn.SERVICE_OPTS.setOmeroGroup(current_group) diff --git a/tests/test_ezimport.py b/tests/test_ezimport.py index a6256f6..54e1007 100644 --- a/tests/test_ezimport.py +++ b/tests/test_ezimport.py @@ -4,6 +4,24 @@ # Test imports +def test_cleanup(conn): + ids = ezomero.get_project_ids(conn) + if len(ids) > 0: + conn.deleteObjects("Project", ids, deleteChildren=True) + ids = ezomero.get_dataset_ids(conn) + if len(ids) > 0: + conn.deleteObjects("Dataset", ids, deleteChildren=True) + ids = ezomero.get_image_ids(conn) + if len(ids) > 0: + conn.deleteObjects("Image", ids) + ids = ezomero.get_screen_ids(conn) + if len(ids) > 0: + conn.deleteObjects("Screen", ids, deleteChildren=True) + ids = ezomero.get_plate_ids(conn) + if len(ids) > 0: + conn.deleteObjects("Plate", ids, deleteChildren=True) + + def test_ezimport(conn, monkeypatch): # test simple import, single file @@ -30,6 +48,7 @@ def test_ezimport(conn, monkeypatch): monkeypatch.setattr('sys.stdin', io) id = ezomero.ezimport(conn, fpath) assert len(id) == 2 + conn.deleteObjects("Image", id) # # test simple import, new orphan dataset fpath = "tests/data/test_pyramid.ome.tif" diff --git a/tests/test_gets.py b/tests/test_gets.py index 9046c03..2cf44d2 100644 --- a/tests/test_gets.py +++ b/tests/test_gets.py @@ -229,17 +229,68 @@ def test_get_image_ids(conn, project_structure, screen_structure, bad_im_ids = ezomero.get_image_ids(conn, dataset=999999) assert not bad_im_ids + # screen_id = screen_structure[0] + plate_id = screen_structure[1] + plate2_id = screen_structure[2] + plate3_id = screen_structure[3] + pacq_1_id = screen_structure[4] + pacq_2_id = screen_structure[5] + pacq_3_id = screen_structure[6] + well_id1 = screen_structure[7] + well_im_id1 = screen_structure[8] + well_im_id3 = screen_structure[9] + well_id2 = screen_structure[10] + well_im_id2 = screen_structure[11] + well_im_id4 = screen_structure[12] + well_id3 = screen_structure[13] + well_im_id5 = screen_structure[14] + well_id4 = screen_structure[15] + well_im_id6 = screen_structure[16] + # Based on well ID - well_id = screen_structure[1] - plate_im_id1 = screen_structure[2] - well_im_ids = ezomero.get_image_ids(conn, well=well_id) - assert set(well_im_ids) == set([plate_im_id1]) + well_im_ids = ezomero.get_image_ids(conn, well=well_id1) + assert set(well_im_ids) == set([well_im_id1, well_im_id3]) + well_im_ids = ezomero.get_image_ids(conn, well=well_id2) + assert set(well_im_ids) == set([well_im_id2, well_im_id4]) + well_im_ids = ezomero.get_image_ids(conn, well=well_id3) + assert set(well_im_ids) == set([well_im_id5]) + well_im_ids = ezomero.get_image_ids(conn, well=well_id4) + assert set(well_im_ids) == set([well_im_id6]) + + # Based on plateAcquistion ID + pacq_im_ids = ezomero.get_image_ids(conn, plate_acquisition=pacq_1_id) + assert set(pacq_im_ids) == set([well_im_id1, well_im_id2]) + pacq_im_ids = ezomero.get_image_ids(conn, plate_acquisition=pacq_2_id) + assert set(pacq_im_ids) == set([well_im_id3, well_im_id4]) + pacq_im_ids = ezomero.get_image_ids(conn, plate_acquisition=pacq_3_id) + assert set(pacq_im_ids) == set([well_im_id5]) # Based on plate ID - plate_id = screen_structure[0] - plate_im_id2 = screen_structure[5] plate_im_ids = ezomero.get_image_ids(conn, plate=plate_id) - assert set(plate_im_ids) == set([plate_im_id1, plate_im_id2]) + assert set(plate_im_ids) == set([well_im_id1, well_im_id2, + well_im_id3, well_im_id4]) + plate_im_ids = ezomero.get_image_ids(conn, plate=plate2_id) + assert set(plate_im_ids) == set([well_im_id5]) + plate_im_ids = ezomero.get_image_ids(conn, plate=plate3_id) + assert set(plate_im_ids) == set([well_im_id6]) + + # Based on screen ID: not implemented + # screen_im_ids = ezomero.get_image_ids(conn, screen=screen_id) + # assert set(screen_im_ids) == set([well_im_id1, well_im_id2, + # well_im_id3, well_im_id4]) + + with pytest.raises(TypeError): + _ = ezomero.get_image_ids(conn, project='test') + with pytest.raises(TypeError): + _ = ezomero.get_image_ids(conn, dataset='test') + # with pytest.raises(TypeError): + # _ = ezomero.get_image_ids(conn, screen='test') + with pytest.raises(TypeError): + _ = ezomero.get_image_ids(conn, plate='test') + with pytest.raises(TypeError): + _ = ezomero.get_image_ids(conn, well='test') + with pytest.raises(TypeError): + _ = ezomero.get_image_ids(conn, plate_acquisition='test') def test_get_project_ids(conn, project_structure, users_groups): @@ -282,6 +333,99 @@ def test_get_dataset_ids(conn, project_structure, users_groups): assert not bad_im_ids +def test_get_screen_ids(conn, screen_structure): + + screen_id = screen_structure[0] + + screen_ids = ezomero.get_screen_ids(conn) + assert set(screen_ids) == set([screen_id]) + + +def test_get_plate_ids(conn, screen_structure): + + screen_id = screen_structure[0] + plate_id = screen_structure[1] + plate2_id = screen_structure[2] + orphan_plate_id = screen_structure[3] + + with pytest.raises(TypeError): + _ = ezomero.get_plate_ids(conn, screen='test') + + # Test orphans + orphan_ids = ezomero.get_plate_ids(conn) + assert set(orphan_ids) == set([orphan_plate_id]) + + # Based on screen ID + screen_pl_ids = ezomero.get_plate_ids(conn, screen=screen_id) + assert set(screen_pl_ids) == set([plate_id, plate2_id]) + + # Return nothing on bad input + bad_pl_ids = ezomero.get_plate_ids(conn, screen=999999) + assert not bad_pl_ids + + +def test_get_well_ids(conn, screen_structure): + + screen_id = screen_structure[0] + plate_id = screen_structure[1] + plate2_id = screen_structure[2] + plate3_id = screen_structure[3] + well_id1 = screen_structure[7] + well_id2 = screen_structure[10] + well_id3 = screen_structure[13] + well_id4 = screen_structure[15] + + with pytest.raises(TypeError): + _ = ezomero.get_well_ids(conn, screen='test') + with pytest.raises(TypeError): + _ = ezomero.get_well_ids(conn, plate='test') + + # Based on screen ID + screen_wl_ids = ezomero.get_well_ids(conn, screen=screen_id) + assert set(screen_wl_ids) == set([well_id1, well_id2, well_id3]) + + # Based on plate ID + plate_wl_ids = ezomero.get_well_ids(conn, plate=plate_id) + assert set(plate_wl_ids) == set([well_id1, well_id2]) + plate_wl_ids = ezomero.get_well_ids(conn, plate=plate2_id) + assert set(plate_wl_ids) == set([well_id3]) + plate_wl_ids = ezomero.get_well_ids(conn, plate=plate3_id) + assert set(plate_wl_ids) == set([well_id4]) + + # Return nothing on bad input + bad_ids = ezomero.get_well_ids(conn, screen=999999) + assert not bad_ids + + +def test_get_plate_acquisition_ids(conn, screen_structure): + + screen_id = screen_structure[0] + plate_id = screen_structure[1] + plate2_id = screen_structure[2] + pacq_id1 = screen_structure[4] + pacq_id2 = screen_structure[5] + pacq_id3 = screen_structure[6] + + with pytest.raises(TypeError): + _ = ezomero.get_plate_acquisition_ids(conn, screen='test') + with pytest.raises(TypeError): + _ = ezomero.get_plate_acquisition_ids(conn, plate='test') + + # Based on screen ID + screen_pacq_ids = ezomero.get_plate_acquisition_ids(conn, screen=screen_id) + assert set(screen_pacq_ids) == set([pacq_id1, pacq_id2, pacq_id3]) + + # Based on plate ID + plate_pacq_ids = ezomero.get_plate_acquisition_ids(conn, plate=plate_id) + assert set(plate_pacq_ids) == set([pacq_id1, pacq_id2]) + plate_pacq_ids = ezomero.get_plate_acquisition_ids(conn, plate=plate2_id) + assert set(plate_pacq_ids) == set([pacq_id3]) + + # Return nothing on bad input + bad_ids = ezomero.get_plate_acquisition_ids(conn, screen=999999) + assert not bad_ids + + def test_get_image_ids_params(conn): with pytest.raises(ValueError): _ = ezomero.get_image_ids(conn, project=1, plate=2) @@ -428,13 +572,22 @@ def test_get_file_annotation_and_ids(conn, project_structure, tmp_path): def test_get_well_id(conn, screen_structure): - plate_id = screen_structure[0] - well_id = screen_structure[1] - well2_id = screen_structure[4] - well_id_result = ezomero.get_well_id(conn, plate_id, row=1, column=1) + plate_id = screen_structure[1] + plate2_id = screen_structure[2] + plate3_id = screen_structure[3] + well_id1 = screen_structure[7] + well_id2 = screen_structure[10] + well_id3 = screen_structure[13] + well_id4 = screen_structure[15] + + well1_id_result = ezomero.get_well_id(conn, plate_id, row=1, column=1) well2_id_result = ezomero.get_well_id(conn, plate_id, row=2, column=2) - assert well_id == well_id_result - assert well2_id == well2_id_result + well3_id_result = ezomero.get_well_id(conn, plate2_id, row=2, column=2) + well4_id_result = ezomero.get_well_id(conn, plate3_id, row=1, column=1) + assert well_id1 == well1_id_result + assert well_id2 == well2_id_result + assert well_id3 == well3_id_result + assert well_id4 == well4_id_result assert ezomero.get_well_id(conn, plate_id, row=5, column=9) is None