Skip to content

Commit

Permalink
DIRAC management (#103)
Browse files Browse the repository at this point in the history
* More gracefully handle exceptions when initializing DIRAC

---------

Co-authored-by: Jean-Philippe Lenain <[email protected]>
  • Loading branch information
jlenain and jlenain authored Feb 2, 2024
1 parent a31bad2 commit 8cb6ff9
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 71 deletions.
20 changes: 12 additions & 8 deletions src/nectarchain/data/management.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,3 @@
# The DIRAC magic 2 lines !
try:
import DIRAC

DIRAC.initialize()
except ImportError:
pass

import glob
import logging
import os
Expand All @@ -24,6 +16,18 @@

__all__ = ["DataManagement"]

# The DIRAC magic 2 lines !
# DIRAC initialization is best-effort: a missing DIRAC installation or a failed
# initialization is reported as a warning instead of being raised.
try:
    import DIRAC

    DIRAC.initialize()
except ImportError:
    log.warning("DIRAC probably not installed")
except Exception as e:
    # Deliberately broad: any initialization failure is logged, not propagated.
    log.warning(f"DIRAC could not be properly initialized: {e}")


class DataManagement:
@staticmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

function usage ()
{
echo "Usage: `basename $0` -r <run number>"
echo "Usage: $(basename $0) -r <run number>"
}

function help ()
Expand Down Expand Up @@ -55,6 +55,7 @@ function exit_script() {
# Some cleanup before leaving:
# [ -d $CONTAINER ] && rm -rf $CONTAINER
# [ -f $CONTAINER ] && rm -f $CONTAINER
[ -d $NECTARCAMDATA ] && rm -rf $NECTARCAMDATA
[ -d $OUTDIR ] && rm -rf $OUTDIR
[ -f ${OUTDIR}.tar.gz ] && rm -f ${OUTDIR}.tar.gz
[ -d ${OUTDIR} ] && rm -rf ${OUTDIR}
Expand All @@ -63,22 +64,29 @@ function exit_script() {
exit $return_code
}

# Directory gathering the input files (SQLite DB files and .fits.fz runs):
export NECTARCAMDATA="$PWD/runs"
# "mkdir -p" already succeeds when the directory exists, so no "[ ! -d ... ] &&"
# guard is used: with such a guard, an already existing directory makes the test
# fail and the "||" branch would call exit_script.
mkdir -p "$NECTARCAMDATA" || exit_script $?
mv nectarcam*.sqlite NectarCAM.Run*.fits.fz "$NECTARCAMDATA"/.

# Halim's DQM code needs to use a specific output directory:
export NECTARDIR="$PWD/$OUTDIR"
mkdir -p "$NECTARDIR" || exit_script $?
# mv nectarcam*.sqlite NectarCAM.Run*.fits.fz $NECTARDIR/.

LISTRUNS=""
for run in $PWD/NectarCAM.Run${runnb}.*.fits.fz; do
LISTRUNS="$LISTRUNS $(basename $run)"
done
#LISTRUNS=""
#for run in $NECTARCAMDATA/NectarCAM.Run${runnb}.*.fits.fz; do
# LISTRUNS="$LISTRUNS $(basename $run)"
#done

# Create a wrapper BASH script with cleaned environment, see https://redmine.cta-observatory.org/issues/51483
cat > $WRAPPER <<EOF
#!/bin/env bash
echo "Cleaning environment \$CLEANED_ENV"
[ -z "\$CLEANED_ENV" ] && exec /bin/env -i CLEANED_ENV="Done" HOME=\${HOME} SHELL=/bin/bash /bin/bash -l "\$0" "\$@"
# From https://github.com/DIRACGrid/COMDIRAC/wiki/Injob
# initialize job for COMDIRAC commands
export DCOMMANDS_CONFIG_DIR=$PWD
dconfig --guess
dinit --fromProxy
# Some environment variables related to python, to be passed to container, be it for old Singularity version or recent Apptainer ones:
export SINGULARITYENV_MPLCONFIGDIR=/tmp
Expand All @@ -102,7 +110,8 @@ fi
echo
echo "Running"
# Instantiate the nectarchain Singularity image, run our DQM example run within it:
cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_calib.py $PWD $NECTARDIR -i $LISTRUNS"
# cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_dqm.py --r0 $NECTARCAMDATA $NECTARDIR -i $LISTRUNS"
cmd="\$CALLER exec --home $PWD $CONTAINER /opt/conda/envs/nectarchain/bin/python /opt/cta/nectarchain/src/nectarchain/dqm/start_dqm.py --r0 -r $runnb $NECTARCAMDATA $NECTARDIR"
echo \$cmd
eval \$cmd
EOF
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,64 +4,72 @@
# Time-stamp: "2023-05-30 13:09:04 jlenain"

import argparse
import sys
import logging
import sys
from time import sleep

# The magic DIRAC 2 lines
import DIRAC

# astropy imports
from astropy import time
from astropy import units as u

DIRAC.initialize()

# DIRAC imports
from DIRAC.Interfaces.API.Dirac import Dirac
from DIRAC.Interfaces.API.Job import Job
from DIRAC.Resources.Catalog.FileCatalogClient import FileCatalogClient

logging.basicConfig(format='[%(levelname)s] %(message)s')
logging.basicConfig(format="[%(levelname)s] %(message)s")
logger = logging.getLogger(__name__)

dirac = Dirac()

# Option and argument parser
parser = argparse.ArgumentParser()
parser.add_argument('-d', '--date',
default=None,
help='date for which NectarCAM runs should be processed',
type=str)
parser.add_argument('-r', '--run',
default=None,
help='only process a specific run (optional)',
type=str)
parser.add_argument('--dry-run',
action='store_true',
default=False,
help='dry run (does not actually submit jobs)')
parser.add_argument('--log',
default='info',
help='debug output',
type=str)
parser.add_argument(
"-d",
"--date",
default=None,
help="date for which NectarCAM runs should be processed",
type=str,
)
parser.add_argument(
"-r", "--run", default=None, help="only process a specific run (optional)", type=str
)
parser.add_argument(
"--dry-run",
action="store_true",
default=False,
help="dry run (does not actually submit jobs)",
)
parser.add_argument("--log", default="info", help="debug output", type=str)
args = parser.parse_args()

logger.setLevel(args.log.upper())

if args.date is None:
logger.critical('A date should be provided, in a format astropy.time.Time compliant. E.g. "2022-04-01".')
logger.critical(
'A date should be provided, in a format astropy.time.Time compliant. E.g. "2022-04-01".'
)
sys.exit(1)

executable_wrapper="dqm_processor.sh"
executable_wrapper = "dqm_processor.sh"

## Possible massive job processing via loop on run numbers:
# for run in ['2720', '3277', '...']:

## or from DIRAC FileCatalog directory listing:
processDate = time.Time(args.date)
dfcDir = f'/vo.cta.in2p3.fr/nectarcam/{processDate.ymdhms[0]}/{processDate.ymdhms[0]}{str(processDate.ymdhms[1]).zfill(2)}{str(processDate.ymdhms[2]).zfill(2)}'
dfcDir = f"/vo.cta.in2p3.fr/nectarcam/{processDate.ymdhms[0]}/{processDate.ymdhms[0]}{str(processDate.ymdhms[1]).zfill(2)}{str(processDate.ymdhms[2]).zfill(2)}"

# The relevant DB file may be stored in the directory corresponding to the day after:
processDateTomorrow = processDate + 1. * u.day
dfcDirTomorrow = f'/vo.cta.in2p3.fr/nectarcam/{processDateTomorrow.ymdhms[0]}/{processDateTomorrow.ymdhms[0]}{str(processDateTomorrow.ymdhms[1]).zfill(2)}{str(processDateTomorrow.ymdhms[2]).zfill(2)}'
processDateTomorrow = processDate + 1.0 * u.day
dfcDirTomorrow = f"/vo.cta.in2p3.fr/nectarcam/{processDateTomorrow.ymdhms[0]}/{processDateTomorrow.ymdhms[0]}{str(processDateTomorrow.ymdhms[1]).zfill(2)}{str(processDateTomorrow.ymdhms[2]).zfill(2)}"

# Sometimes, for unkown reason, the connection to the DFC can fail, try a few times:
# Sometimes, for unknown reason, the connection to the DFC can fail, try a few times:
sleep_time = 2
num_retries = 3
for x in range(0, num_retries):
Expand All @@ -76,77 +84,91 @@
else:
break
if not dfc:
logger.fatal(f'Connection to FileCatalogClient failed, aborting...')
# Plain string (no placeholders to interpolate), logged with critical() like the
# other fatal conditions of this script.
logger.critical("Connection to FileCatalogClient failed, aborting...")
sys.exit(1)

infos = dfc.listDirectory(dfcDir)
infosTomorrow = dfc.listDirectory(dfcDirTomorrow)
if not infos['OK'] or not infos['Value']['Successful']:
logger.critical(f"Could not properly retrieve the file metadata for {dfcDir} ... Exiting !")
if not infos["OK"] or not infos["Value"]["Successful"]:
logger.critical(
f"Could not properly retrieve the file metadata for {dfcDir} ... Exiting !"
)
sys.exit(1)
if not infosTomorrow['OK'] or not infosTomorrow['Value']['Successful']:
logger.warning(f"Could not properly retrieve the file metadata for {dfcDirTomorrow} ... Continuing !")
meta = infos['Value']['Successful'][dfcDir]
if not infosTomorrow["OK"] or not infosTomorrow["Value"]["Successful"]:
logger.warning(
f"Could not properly retrieve the file metadata for {dfcDirTomorrow} ... Continuing !"
)
meta = infos["Value"]["Successful"][dfcDir]
try:
metaTomorrow = infosTomorrow['Value']['Successful'][dfcDirTomorrow]
metaTomorrow = infosTomorrow["Value"]["Successful"][dfcDirTomorrow]
except KeyError:
metaTomorrow = None

runlist = []

sqlfilelist = []
for f in meta['Files']:
if f.endswith('.fits.fz'):
run = f.split('NectarCAM.Run')[1].split('.')[0]
for f in meta["Files"]:
if f.endswith(".fits.fz"):
run = f.split("NectarCAM.Run")[1].split(".")[0]
# "run" is produced by str.split() and is thus always a string, never None:
# only the uniqueness check is needed.
if run not in runlist:
    runlist.append(run)
if f.endswith('.sqlite'):
if f.endswith(".sqlite"):
sqlfilelist.append(f)
if metaTomorrow:
for f in metaTomorrow['Files']:
if f.endswith('.sqlite'):
for f in metaTomorrow["Files"]:
if f.endswith(".sqlite"):
sqlfilelist.append(f)
if args.run is not None:
if args.run not in runlist:
logger.critical(f'Your specified run {args.run} was not found in {dfcDir}, aborting...')
logger.critical(
f"Your specified run {args.run} was not found in {dfcDir}, aborting..."
)
sys.exit(1)
runlist = [args.run]
logger.info(f'Found runs {runlist} in {dfcDir}')

logger.info(f"Found runs {runlist} in {dfcDir}")

if len(sqlfilelist) == 0:
logger.critical('Could not find any SQLite file in {dfcDir} nor in {dfcDirTomorrow}, aborting...')
# f-string so that the directory names are actually interpolated in the message.
logger.critical(
    f"Could not find any SQLite file in {dfcDir} nor in {dfcDirTomorrow}, aborting..."
)
sys.exit(1)
logger.info(f'Found SQLite files {sqlfilelist} in {dfcDir} and {dfcDirTomorrow}')
logger.info(f"Found SQLite files {sqlfilelist} in {dfcDir} and {dfcDirTomorrow}")

# Now, submit the DIRAC jobs:
# for run in ['2721']:
for run in runlist:
j = Job()
# j.setExecutable(f'{executable_wrapper}', '<SOME POSSIBLE ARGUMENTS such as run number>')
j.setExecutable(f'{executable_wrapper}', f'-r {run}')
j.setExecutable(f"{executable_wrapper}", f"-r {run}")
# Force job to be run from a given Computing Element:
# j.setDestination('LCG.GRIF.fr')
j.setName(f'NectarCAM DQM run {run}')
j.setJobGroup('NectarCAM DQM')
sandboxlist = [f'{executable_wrapper}']
for f in meta['Files']:
if f.endswith('.fits.fz') and f'NectarCAM.Run{run}' in f:
sandboxlist.append(f'LFN:{f}')
j.setName(f"NectarCAM DQM run {run}")
j.setJobGroup("NectarCAM DQM")
sandboxlist = [f"{executable_wrapper}"]
for f in meta["Files"]:
if f.endswith(".fits.fz") and f"NectarCAM.Run{run}" in f:
sandboxlist.append(f"LFN:{f}")
for s in sqlfilelist:
sandboxlist.append(f'LFN:{s}')
sandboxlist.append(f"LFN:{s}")
if len(sandboxlist) < 2:
logger.critical(f'''Misformed sandboxlist, actual data .fits.fz files missing:
logger.critical(
f"""Misformed sandboxlist, actual data .fits.fz files missing:
{sandboxlist}
Aborting...
''')
"""
)
sys.exit(1)
logger.info(f'''Submitting job for run {run}, with the following InputSandbox:
logger.info(
f"""Submitting job for run {run}, with the following InputSandbox:
{sandboxlist}
''')
"""
)
j.setInputSandbox(sandboxlist)

if not args.dry_run:
res = dirac.submitJob(j) # , mode='local') # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
res = dirac.submitJob(
    j
)  # , mode='local')  # for local execution, simulating a DIRAC job on the local machine, instead of submitting it to a DIRAC Computing Element
# DIRAC calls return a dict carrying an "OK" flag (as checked for listDirectory
# in this script); "Value" is only read once the call is known to have succeeded.
if res["OK"]:
    logger.info(f"Submission Result: {res['Value']}")
else:
    logger.error(
        f"Submission of job for run {run} failed: {res.get('Message')}"
    )

0 comments on commit 8cb6ff9

Please sign in to comment.