Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cups.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ def getLatestId( tablename, dstname, run, seg ):

cache="cups.cache"

cupsid=os.getenv('cupsid')
if cupsid and tablename=='production_status':
return cupsid

result = 0
query=f"""
select id,dstname from {tablename} where run={run} and segment={seg} order by id desc limit {MAXDSTNAMES};
Expand Down Expand Up @@ -181,7 +185,6 @@ def started(args):
execution_node='{node}'
where id={id_}
"""
#where dstname='{dstname}' and run={run} and segment={seg} and id={id_}

if args.verbose:
print(update)
Expand Down
78 changes: 47 additions & 31 deletions slurp.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def fetch_invalid_run_entry( dstname, run, seg ):
def getLatestId( tablename, dstname, run, seg ):

cache="cups.cache"

# We are limiting to the list of all productions for a given run,segment pair.

result = 0
Expand Down Expand Up @@ -362,17 +362,40 @@ def getLatestId( tablename, dstname, run, seg ):

def update_production_status( matching, setup, condor, state ):

# Condor map contains a dictionary keyed on the "output" field of the job description.
# The map contains the cluster ID, the process ID, the arguments, and the output log.
# (This is the condor.stdout log...)
condor_map = {}
for ad in condor:
clusterId = ad['ClusterId']
procId = ad['ProcId']
out = ad['Out'].split('/')[-1] # discard anything that looks like a filepath
ulog = ad['UserLog'].split('/')[-1]
key = ulog.split('.')[0].lower() # lowercase b/c referenced by file basename

condor_map[key]= { 'ClusterId':clusterId, 'ProcId':procId, 'Out':out, 'UserLog':ulog }

name = sphenix_dstname( setup.name, setup.build, setup.dbtag )

for m in matching:
run = int(m['run'])
segment = int(m['seg'])

key = sphenix_base_filename( setup.name, setup.build, setup.dbtag, run, segment )

try:
cluster = condor_map[ key.lower() ][ 'ClusterId' ]
process = condor_map[ key.lower() ][ 'ProcId' ]
except KeyError:
ERROR("Key Error getting cluster and/or process number from the class ads map.")
ERROR(f" key={key}")
#pprint.pprint( condor_map )
ERROR("Assuming this is an issue with condor, setting cluster=0, process=0 and trying to continue...")
cluster=0
process=0

dsttype=setup.name
dstname=setup.name+'_'+setup.build.replace(".","")+'_'+setup.dbtag
#dstfile=dstname+'-%08i-%04i'%(run,segment)
dstfile=( dstname + '-' + RUNFMT + '-' + SEGFMT ) % (run,segment)

# 1s time resolution
Expand All @@ -382,15 +405,15 @@ def update_production_status( matching, setup, condor, state ):

update=f"""
update production_status
set status='{state}',{state}='{timestamp}'
set status='{state}',{state}='{timestamp}',cluster={cluster},process={process}
where id={id_}
"""

statusdbw.execute(update)

statusdbw.commit()

def insert_production_status( matching, setup, condor, state ):
def insert_production_status( matching, setup, condor=[], state='submitting' ):

# Condor map contains a dictionary keyed on the "output" field of the job description.
# The map contains the cluster ID, the process ID, the arguments, and the output log.
Expand All @@ -401,11 +424,8 @@ def insert_production_status( matching, setup, condor, state ):
procId = ad['ProcId']
out = ad['Out'].split('/')[-1] # discard anything that looks like a filepath
ulog = ad['UserLog'].split('/')[-1]
#args = ad['Args']
#key = out.split('.')[0].lower() # lowercase b/c referenced by file basename
key = ulog.split('.')[0].lower() # lowercase b/c referenced by file basename

#condor_map[key]= { 'ClusterId':clusterId, 'ProcId':procId, 'Out':out, 'Args':args, 'UserLog':ulog }
condor_map[key]= { 'ClusterId':clusterId, 'ProcId':procId, 'Out':out, 'UserLog':ulog }


Expand Down Expand Up @@ -437,10 +457,6 @@ def insert_production_status( matching, setup, condor, state ):
cluster = condor_map[ key.lower() ][ 'ClusterId' ]
process = condor_map[ key.lower() ][ 'ProcId' ]
except KeyError:
ERROR("Key Error getting cluster and/or process number from the class ads map.")
ERROR(f" key={key}")
pprint.pprint( condor_map )
ERROR("Assuming this is an issue with condor, setting cluster=0, process=0 and trying to continue...")
cluster = 0
process = 0

Expand All @@ -454,6 +470,7 @@ def insert_production_status( matching, setup, condor, state ):
insert=f"""
insert into production_status
(dsttype, dstname, dstfile, run, segment, nsegments, inputs, prod_id, cluster, process, status, submitting, nevents, submission_host )

values ('{dsttype}','{dstname}','{dstfile}',{run},{segment},0,'{dstfileinput}',{prod_id},{cluster},{process},'{status}', '{timestamp}', 0, '{node}' )
"""

Expand All @@ -470,10 +487,17 @@ def insert_production_status( matching, setup, condor, state ):
(dsttype, dstname, dstfile, run, segment, nsegments, inputs, prod_id, cluster, process, status, submitting, nevents, submission_host )
values
{insvals}

returning id
"""
statusdbw.execute(insert)
statusdbw.commit()

result=[ int(x.id) for x in statusdbw.fetchall() ]

return result





Expand Down Expand Up @@ -529,6 +553,9 @@ def submit( rule, maxjobs, **kwargs ):
INFO("Get the job dictionary")
jobd = rule.job.dict()

# Append $(cupsid) as the last argument
jobd['arguments'] = jobd['arguments'] + ' $(cupsid)'


#
# Make target output directories. We abuse the python string formatting facility
Expand Down Expand Up @@ -605,9 +632,12 @@ def submit( rule, maxjobs, **kwargs ):

run_submit_loop=30
schedd_query = None

# for run_submit_loop in [120,180,300,600]:
# try:

# Insert jobs into the production status table and add the ID to the dictionary
INFO("... insert")
cupsids = insert_production_status( matching, setup, [], state="submitting" )
for i,m in zip(cupsids,mymatching):
m['cupsid']=str(i)

INFO("Submitting the jobs to the cluster")
submit_result = schedd.submit(submit_job, itemdata=iter(mymatching)) # submit one job for each item in the itemdata
Expand All @@ -617,30 +647,16 @@ def submit( rule, maxjobs, **kwargs ):
projection=["ClusterId", "ProcId", "Out", "UserLog", "Args" ]
)

# break # success... break past the else clause
# except htcondor.HTCondorIOError:
#
# WARN(f"Could not submit jobs to condor. Retry in {run_submit_loop} seconds")
# time.sleep( run_submit_loop )
#
# else:
# # Executes after final iteration
# ERROR(f"ERROR: could not submit jobs to condor after several retries")



# Update DB IFF we have a valid submission
INFO("Insert and update the production_status")
if ( schedd_query ):

INFO("... insert")
insert_production_status( matching, setup, schedd_query, state="submitted" )

INFO("... result")
result = submit_result.cluster()
result = submit_result.cluster()

#INFO("... update")
#update_production_status( matching, setup, schedd_query, state="submitted" )
INFO("... update")
update_production_status( matching, setup, schedd_query, state="submitted" )


else:
Expand Down