Skip to content

Commit

Permalink
Add High Memory processor
Browse files Browse the repository at this point in the history
  • Loading branch information
rosswhitfield committed Apr 12, 2024
1 parent dabe4cf commit ace431a
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 1 deletion.
4 changes: 4 additions & 0 deletions postprocessing/processors/reduction_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,7 @@ def __call__(self):
logging.error(f"reduce: {sys.exc_info()[1]}")
self.data["error"] = f"Reduction: {sys.exc_info()[1]} "
self.send(ReductionProcessor.ERROR_QUEUE, json.dumps(self.data))


class ReductionProcessorHighMemory(ReductionProcessor):
_message_queue = "/queue/REDUCTION.HIMEM.DATA_READY"
1 change: 1 addition & 0 deletions tests/integration/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ RUN mkdir -p /opt/postprocessing/log && \
mkdir -p /SNS/EQSANS/IPTS-10674/0/30892/NeXus && \
mkdir -p /SNS/EQSANS/IPTS-10674/shared/autoreduce && \
touch /SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30892_event.nxs && \
touch /SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs && \
mkdir -p /SNS/EQSANS/shared/autoreduce && \
echo "import sys;print(sys.argv[1:])" > /SNS/EQSANS/shared/autoreduce/reduce_EQSANS.py && \
\
Expand Down
3 changes: 2 additions & 1 deletion tests/integration/post_processing.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
"calvera_processor.CalveraProcessor",
"calvera_processor.CalveraReducedProcessor",
"create_reduction_script_processor.CreateReductionScriptProcessor",
"reduction_processor.ReductionProcessor"
"reduction_processor.ReductionProcessor",
"reduction_processor.ReductionProcessorHighMemory"
],
"calvera_ingest_url": "http://not-valid.localhost:12345"
}
49 changes: 49 additions & 0 deletions tests/integration/test_data_ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,52 @@ def test_reduction_error():
assert msg["facility"] == message["facility"]
assert msg["data_file"] == message["data_file"]
assert msg["error"] == "REDUCTION: This is an ERROR!"


def test_reduction_high_memory():
message = {
"run_number": "30893",
"instrument": "EQSANS",
"ipts": "IPTS-10674",
"facility": "SNS",
"data_file": "/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs",
}

conn = stomp.Connection(host_and_ports=[("localhost", 61613)])

listener = stomp.listener.TestListener()
conn.set_listener("", listener)

try:
conn.connect()
except stomp.exception.ConnectFailedException:
pytest.skip("Requires activemq running")

# expect a reduction complete
conn.subscribe("/queue/REDUCTION.COMPLETE", id="123", ack="auto")

# send data ready
conn.send("/queue/REDUCTION.HIMEM.DATA_READY", json.dumps(message).encode())

listener.wait_for_message()

conn.disconnect()

header, body = listener.get_latest_message()

msg = json.loads(body)
assert msg["run_number"] == message["run_number"]
assert msg["instrument"] == message["instrument"]
assert msg["ipts"] == message["ipts"]
assert msg["facility"] == message["facility"]
assert msg["data_file"] == message["data_file"]

# we can also check that the reduction did run by checking the reduction_log
reduction_log = docker_exec_and_cat(
"/SNS/EQSANS/IPTS-10674/shared/autoreduce/reduction_log/EQSANS_30893_event.nxs.log"
)

assert (
reduction_log
== "['/SNS/EQSANS/IPTS-10674/0/30892/NeXus/EQSANS_30893_event.nxs', '/SNS/EQSANS/IPTS-10674/shared/autoreduce/']\n"
)

0 comments on commit ace431a

Please sign in to comment.