Draft
Changes from all commits (82 commits)
3ba7196
Define a 'streamname' and 'streamfile' argument
klendathu2k Sep 9, 2024
77225b5
Parameterize the output filename
klendathu2k Sep 9, 2024
b360159
Single streaming event builder workflow.
klendathu2k Sep 9, 2024
ddd4b30
Run hit unpacked on the single stream outputs.
klendathu2k Sep 10, 2024
bacc9eb
Pull in the clustering step. Still need to rework the input query.
klendathu2k Sep 11, 2024
30ce362
Extend the match with stream name and stream file IF its defined in t…
klendathu2k Oct 15, 2024
9e22241
The output of each job will be different based on the stream name
klendathu2k Oct 16, 2024
ade1181
For optimizing the lookup of existing outputs we will use the dsttype…
klendathu2k Oct 16, 2024
8799f60
...
klendathu2k Oct 16, 2024
fbbb536
... cleanup ...
klendathu2k Oct 16, 2024
a18d1e6
Runnumber, segment number and output file are not used in mapping the…
klendathu2k Oct 16, 2024
f9bc6d6
... cleanup ...
klendathu2k Oct 16, 2024
d895928
A question about how to handle the naming of the production setup.
klendathu2k Oct 16, 2024
bdbd4f9
... cleanup ...
klendathu2k Oct 16, 2024
7d14a24
fetch_production_status ignored the dstname and should never reach th…
klendathu2k Oct 16, 2024
9301fc2
Even if we reached this code, the table already exists. So ...
klendathu2k Oct 16, 2024
eefce5f
We don't need to try to create the table here.
klendathu2k Oct 16, 2024
5d4c1fd
The table exists or the system has not been created properly. The na…
klendathu2k Oct 16, 2024
6ef62a3
Should be better optimized this way.
klendathu2k Oct 16, 2024
df96354
And we can rid ourselves of the unused arguments.
klendathu2k Oct 16, 2024
cc02a85
... cleanup ...
klendathu2k Oct 16, 2024
e4b4e30
file_basename is not used here. Rather we are mapping the dstfile (o…
klendathu2k Oct 16, 2024
71b9266
... simplify ...
klendathu2k Oct 16, 2024
2674679
... cleanup ...
klendathu2k Oct 16, 2024
511d9e5
... cleanup ...
klendathu2k Oct 16, 2024
526fdb8
Cleanup and impose the equal length condition on fc_result and outputs.
klendathu2k Oct 16, 2024
2fe8d39
Not sure why this is showing up as a difference, but commit it separa…
klendathu2k Oct 16, 2024
0223b20
Any job submitted with a stream name will be mapped to the same produ…
klendathu2k Oct 16, 2024
0e831fb
dstname is no longer an argument to this function.
klendathu2k Oct 16, 2024
22355b7
When streamname is provided we expect to need condor substitution for…
klendathu2k Oct 16, 2024
68ad775
Remove unused query.
klendathu2k Oct 16, 2024
bef830f
We no longer have a fixed name... each match varies at the level of s…
klendathu2k Oct 16, 2024
4a92736
... cleanup ...
klendathu2k Oct 16, 2024
a33f04c
Should be better optimized.
klendathu2k Oct 16, 2024
ae3d708
When a streamname is specified, the triplet (run,segment,streamname) …
klendathu2k Oct 16, 2024
d53cf81
Update of the production status depends on the stream name.
klendathu2k Oct 16, 2024
2558f3b
First crack at a 'closeout' dataset.
klendathu2k Oct 23, 2024
49eddf1
Moving the database connections down into the update and fetch functi…
klendathu2k Oct 24, 2024
0e7fa90
At the risk of creating a subroutine, make all status updates follow …
klendathu2k Oct 24, 2024
63a51eb
Ugly as an early return is... this is the logic if I want to log fail…
klendathu2k Oct 24, 2024
b90b732
Define a 'streamname' and 'streamfile' argument
klendathu2k Sep 9, 2024
93a776a
Parameterize the output filename
klendathu2k Sep 9, 2024
6b0112e
Single streaming event builder workflow.
klendathu2k Sep 9, 2024
17f9a27
Run hit unpacked on the single stream outputs.
klendathu2k Sep 10, 2024
3d5dcc5
Pull in the clustering step. Still need to rework the input query.
klendathu2k Sep 11, 2024
202602b
Extend the match with stream name and stream file IF its defined in t…
klendathu2k Oct 15, 2024
a598bcf
The output of each job will be different based on the stream name
klendathu2k Oct 16, 2024
a023725
For optimizing the lookup of existing outputs we will use the dsttype…
klendathu2k Oct 16, 2024
875c18c
...
klendathu2k Oct 16, 2024
f4c3da9
... cleanup ...
klendathu2k Oct 16, 2024
0e6fc63
Runnumber, segment number and output file are not used in mapping the…
klendathu2k Oct 16, 2024
04590f3
... cleanup ...
klendathu2k Oct 16, 2024
a4ed824
A question about how to handle the naming of the production setup.
klendathu2k Oct 16, 2024
52115a2
... cleanup ...
klendathu2k Oct 16, 2024
87818e1
fetch_production_status ignored the dstname and should never reach th…
klendathu2k Oct 16, 2024
ea3d828
Even if we reached this code, the table already exists. So ...
klendathu2k Oct 16, 2024
325a457
We don't need to try to create the table here.
klendathu2k Oct 16, 2024
d1777c6
The table exists or the system has not been created properly. The na…
klendathu2k Oct 16, 2024
487121f
Should be better optimized this way.
klendathu2k Oct 16, 2024
3965df6
And we can rid ourselves of the unused arguments.
klendathu2k Oct 16, 2024
59540de
... cleanup ...
klendathu2k Oct 16, 2024
679d255
file_basename is not used here. Rather we are mapping the dstfile (o…
klendathu2k Oct 16, 2024
3a149c8
... simplify ...
klendathu2k Oct 16, 2024
1b2947a
... cleanup ...
klendathu2k Oct 16, 2024
15d2fb7
... cleanup ...
klendathu2k Oct 16, 2024
615c863
Cleanup and impose the equal length condition on fc_result and outputs.
klendathu2k Oct 16, 2024
2be62bc
Not sure why this is showing up as a difference, but commit it separa…
klendathu2k Oct 16, 2024
86d0c3d
Any job submitted with a stream name will be mapped to the same produ…
klendathu2k Oct 16, 2024
d6c9504
dstname is no longer an argument to this function.
klendathu2k Oct 16, 2024
7e642a2
When streamname is provided we expect to need condor substitution for…
klendathu2k Oct 16, 2024
c6d4442
Remove unused query.
klendathu2k Oct 16, 2024
4da8226
We no longer have a fixed name... each match varies at the level of s…
klendathu2k Oct 16, 2024
933b2fa
... cleanup ...
klendathu2k Oct 16, 2024
8272067
Should be better optimized.
klendathu2k Oct 16, 2024
a9328ec
When a streamname is specified, the triplet (run,segment,streamname) …
klendathu2k Oct 16, 2024
45f14dd
Update of the production status depends on the stream name.
klendathu2k Oct 16, 2024
14a68b9
First crack at a 'closeout' dataset.
klendathu2k Oct 23, 2024
51f5bcc
Merge branch 'single-stream-rebase' of https://github.com/klendathu2k…
klendathu2k Oct 31, 2024
0f86b3e
Modify python path for alma linux
klendathu2k Nov 12, 2024
d8c8e4f
add in cups statistics
klendathu2k Nov 12, 2024
844f165
Saving changes on the local working branch that may not be present in…
pinkenburg Dec 11, 2024
25796cd
Merge branch 'main' into single-stream-rebase
klendathu2k Jan 2, 2025
27 changes: 21 additions & 6 deletions cups.py
@@ -190,7 +190,6 @@ def getLatestId( tablename, dstname, run, seg ):
print("[CUPS FATAL]: cupsid is not defined")
exit(0) # operating without a cupsid is now a fatal error


@subcommand()
def info( args ):
start = datetime.datetime.now(datetime.timezone.utc)
@@ -207,6 +206,24 @@ def info( args ):



def update_production_status( update_query, retries=10, delay=10.0 ):
Review comment (owner/author): This looks like a zombie method... removed in previous PRs, but this PR is bringing it back from the dead.

print(update_query)
for itry in range(0,retries):
time.sleep( delay * (itry + 1 ) * random.random() )
try:
with pyodbc.connect("DSN=ProductionStatusWrite") as statusdb:
curs=statusdb.cursor()
curs.execute(update_query)
curs.commit()
print(f"Applied after {itry+1} attempts")
return
except:
print(f"Failed {itry+1} attempts...")

print("Update failed")
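The retry loop above can be isolated from the database layer. Below is a minimal standalone sketch of the same retry-with-jitter pattern; the `retry_update` name and callable interface here are illustrative, not part of the PR:

```python
import random
import time

def retry_update(execute, retries=10, base_delay=0.01):
    """Retry a flaky zero-argument callable with randomized linear backoff.

    Mirrors the loop in update_production_status above; this helper and
    its interface are illustrative, not part of the PR.
    """
    for itry in range(retries):
        # Same shape as the PR's delay: grows linearly, scaled by a random factor.
        time.sleep(base_delay * (itry + 1) * random.random())
        try:
            execute()
            return itry + 1  # number of attempts used
        except Exception:    # the PR uses a bare except; a narrow type is safer
            pass
    return None              # exhausted all retries

# Simulate a database write that fails twice, then succeeds.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("simulated deadlock")

print(retry_update(flaky))  # 3
```

Separating the retry policy from the `pyodbc` call also makes the policy testable without a DSN configured.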



@subcommand()
def started(args):
"""
@@ -234,7 +251,6 @@ def started(args):

return 'result', ntries, start, finish, ex, nm, sv


@subcommand([
argument( "--nsegments",help="Number of segments produced",dest="nsegments",default=1),
])
@@ -255,13 +271,13 @@ def running(args):
set status='running',running='{timestamp}',nsegments={nsegments}
where id={id_}
"""

curs, ntries, start, finish, ex, nm, sv = dbQuery( cnxn_string_map[ 'statw' ], update )
if curs:
curs.commit()

return 'result', ntries, start, finish, ex, nm, sv


#_______________________________________________________________________________________________________
@subcommand([
argument("-e","--exit",help="Exit code of the payload macro",dest="exit",default=-1),
@@ -286,13 +302,13 @@ def finished(args):
state='finished'
if ec>0:
state='failed'
update = None
if args.inc:
update = f"""
update {tablename}
set status='{state}',ended='{timestamp}',nsegments={ns},exit_code={ec},nevents=nevents+{ne}
where id={id_}
"""
# where dstname='{dstname}' and run={run} and segment={seg} and id={id_}
else:
update = f"""
update {tablename}
@@ -359,7 +375,6 @@ def nevents(args):
set nevents=nevents+{ne}
where id={id_}
"""
# where dstname='{dstname}' and run={run} and segment={seg} and id={id_}
else:
update = f"""
update {tablename}
Expand All @@ -372,7 +387,7 @@ def nevents(args):
curs.commit()

return 'result', ntries, start, finish, ex, nm, sv


@subcommand([
])
30 changes: 9 additions & 21 deletions kaedama.py
@@ -55,23 +55,8 @@
arg_parser.add_argument( '--print-query',dest='printquery',help="Print the query after parameter substitution and exit", action="store_true", default=False )


# TODO: physics/run2pp/ana449_2024p008/”run range”/DST_TRKR_CLUSTER
#_default_filesystem = {
# 'outdir' : "/sphenix/lustre01/sphnxpro/production/$(runtype)/$(runname)/$(name)/$(build)_$(tag)/run_$(rungroup)"
# , 'logdir' : "file:///sphenix/data/data02/sphnxpro/production/$(runtype)/$(runname)/$(name)/$(build)_$(tag)/run_$(rungroup)"
# , 'histdir' : "/sphenix/data/data02/sphnxpro/production/$(runtype)/$(runname)/$(name)/$(build)_$(tag)/run_$(rungroup)"
# , 'condor' : "/tmp/production/$(runtype)/$(runname)/$(name)/$(build)_$(tag)/run_$(rungroup)"
#}


# TODO: physics/run2pp/ana449_2024p008/”run range”/DST_TRKR_CLUSTER
# runtype runname build_tag runrange DST
#_default_filesystem = {
# 'outdir' : "/sphenix/lustre01/sphnxpro/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/$(name)"
# , 'logdir' : "file:///sphenix/data/data02/sphnxpro/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/$(name)"
# , 'histdir' : "/sphenix/data/data02/sphnxpro/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/$(name)"
# , 'condor' : "/tmp/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/$(name)"
#}
arg_parser.add_argument( '--streamname', help="Name of the data stream for single-stream jobs" ) #TODO: May not need these arguments...
arg_parser.add_argument( '--streamfile', help="Filename (not incl run number) for the data stream" )

_default_filesystem = {
'outdir' : "/sphenix/lustre01/sphnxpro/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/{leafdir}"
@@ -80,7 +65,6 @@
, 'condor' : "/tmp/production/$(runtype)/$(runname)/$(build)_$(tag)/run_$(rungroup)/{leafdir}"
}


def sanity_checks( params, inputq ):
result = True

@@ -100,9 +84,9 @@ def sanity_checks( params, inputq ):
#

# Name should be of the form DST_NAME_runXauau
if re.match( "[A-Z][A-Z][A-Z]_([A-Z]+_)+[a-z0-9]+", params['name'] ) == None:
logging.error( f'params.name {params["name"]} does not respect the sPHENIX convention: DST_NAME_run<N>species' )
result = False
#if re.match( "[A-Z][A-Z][A-Z]_([A-Z]+_)+[a-z0-9]+", params['name'] ) == None:
# logging.warn( f'params.name {params["name"]} does not respect the sPHENIX convention: DST_NAME_run<N>species' )
# result = False

# Build and dbtag should not contain a "_"
if re.match("_",params['build']):
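One subtlety in the checks above: `re.match` only tests at the start of the string, so `re.match("_", params['build'])` flags a build name only when it begins with an underscore; `re.search` (or `'_' in build`) is needed to reject an underscore anywhere. A small self-contained sketch (values illustrative):

```python
import re

build = "ana_435"                     # underscore in the middle (illustrative)
assert re.match("_", build) is None   # re.match anchors at position 0: misses it
assert re.search("_", build)          # re.search scans the whole string
assert "_" in build                   # simplest equivalent check

# The DST naming convention commented out above, as a standalone predicate:
DST_RE = re.compile(r"[A-Z][A-Z][A-Z]_([A-Z]+_)+[a-z0-9]+")
assert DST_RE.match("DST_STREAMING_EVENT_run2pp")
assert DST_RE.match("dst_lowercase") is None
```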
@@ -237,6 +221,8 @@ def main():
elif len(args.segments)>=3:
seg_condition = "and segment in ( %s )" % ','.join( args.segments )

streamname = args.streamname
streamfile = args.streamfile

RUNFMT = slurp.RUNFMT
SEGFMT = slurp.SEGFMT
@@ -277,6 +263,8 @@

if params:

params['name']=params['name'].format( **locals() )

if args.mangle_dstname:
params['name']=params['name'].replace('DST',args.mangle_dstname)
logging.info(f"DST name is mangled to {params['name']}")
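The `params['name'].format( **locals() )` line added in this hunk is what expands a templated rule name such as `DST_STREAMING_EVENT_{streamname}_run2pp` (from the YAML rules in this PR) using in-scope variables like `streamname`. A minimal sketch of that substitution; the stream name value here is illustrative:

```python
name_template = "DST_STREAMING_EVENT_{streamname}_run2pp"

def resolve_name(template, **kwargs):
    # Mirrors params['name'].format(**locals()): every {placeholder} in the
    # rule's name is filled from in-scope variables such as streamname.
    return template.format(**kwargs)

print(resolve_name(name_template, streamname="TPC20"))
# DST_STREAMING_EVENT_TPC20_run2pp
```

This is why each stream's job maps to a distinct DST name, and hence a distinct production-status entry, as the commit messages describe.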
202 changes: 202 additions & 0 deletions production-rules/DST_STREAMING_EVENT_run2pp_ana435_2024p007.yaml
@@ -0,0 +1,202 @@
PHYS_DST_SINGLE_STREAMING_EVENT_run2pp:

params:
name: DST_STREAMING_EVENT_{streamname}_run2pp
build: ana.435
build_name: ana435
dbtag: 2024p007
logbase : $(name)_$(build)_$(tag)-$INT(run,{RUNFMT})-$INT(seg,{SEGFMT})
outbase : $(name)_$(build)_$(tag)
script : run_cosmics.sh
payload : ./slurp-examples/sPHENIX/cosmics/
mem : 20480MB
neventsper: 1000
comment : "---"
rsync : "./slurp-examples/sPHENIX/cosmics/*,cups.py,bachi.py,odbc.ini"

input:
db: daqdb
direct_path: /sphenix/lustre01/sphnxpro/{mode}/*/physics/
query: |-
with partialrun as (
select 'daqdb/filelist' as source ,
runnumber ,
0 as segment ,
string_agg( distinct split_part(filename,'/',-1), ' ' ) as files ,
string_agg( distinct split_part(filename,'/',-1) || ':' || firstevent || ':' || lastevent, ' ' ) as fileranges

from filelist
where
(
(filename like '/bbox%/{streamfile}%-0000.evt' and lastevent>2 ) or
(filename like '/bbox%/GL1_physics%-0000.evt' and lastevent>2 )
)
{run_condition}

group by runnumber
having
every(transferred_to_sdcc) and
max(lastevent)>1000 and

sum( case when filename like '/bbox%/GL1_physics%' then 1 else 0 end )>0 and
sum( case when filename like '/bbox%/{streamfile}%' then 1 else 0 end )>0

order by runnumber
),

fullrun as (
select
'daqdb/filelist' as source ,
runnumber ,
0 as segment ,
string_agg( distinct split_part(filename,'/',-1), ' ' ) as files ,
string_agg( distinct split_part(filename,'/',-1) || ':' || firstevent || ':' || lastevent, ' ' ) as fileranges
from
filelist
where
(
(filename like '/bbox%/{streamfile}%.evt' and lastevent>2 ) or
(filename like '/bbox%/GL1_physics%.evt' and lastevent>2 )

)
{run_condition}

group by runnumber
having
every(transferred_to_sdcc) and
max(lastevent)>1000 and

sum( case when filename like '/bbox%/GL1_physics%' then 1 else 0 end )>0 and
sum( case when filename like '/bbox%/{streamfile}%' then 1 else 0 end )>0

order by runnumber
)

select *,'partial run' as runtype from partialrun where runnumber not in ( select runnumber from fullrun )
union all
select *,'full run' as runtype from fullrun where true

;
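The query above is a template, not literal SQL: `{streamfile}` and `{run_condition}` are substituted before the query is sent to the DAQ database. A hedged sketch of that substitution step (names and values illustrative, not the actual slurp implementation):

```python
query_template = """
select runnumber from filelist
where filename like '/bbox%/{streamfile}%-0000.evt'
{run_condition}
"""

streamfile = "TPOT_ebdc39"                # illustrative stream-file prefix
run_condition = "and runnumber >= 49700"  # illustrative run selection

# SQL '%' wildcards pass through str.format untouched; only {braces} expand.
query = query_template.format(streamfile=streamfile, run_condition=run_condition)
print(query)
```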

# TODO: Need to add error checking to make sure that outdir, logdir, etc... are quoted properly. Else, this will cause problems with argument substitution
filesystem:
outdir : "/sphenix/lustre01/sphnxpro/physics/slurp/streaming/physics/$(build)_$(tag)/run_$(rungroup)"
logdir : "file:///sphenix/data/data02/sphnxpro/streaminglogs/$(build)_$(tag)/run_$(rungroup)"
histdir : "/sphenix/data/data02/sphnxpro/streamhist/$(build)_$(tag)/run_$(rungroup)"
condor : "/tmp/testlogs/$(build)_$(tag)/run_$(rungroup)"

#
# Again I note the need to ensure that the arguments are properly specified given the
# definition of the payload script.
#
job:
executable : "{payload}/run_cosmics.sh"
arguments : "$(nevents) {outbase} {logbase} $(run) $(seg) {outdir} $(build) $(tag) $(inputs) $(ranges) {neventsper} {logdir} {comment} {histdir} {PWD} {rsync}"
output_destination : '{logdir}'
log : '{condor}/{logbase}.condor'
accounting_group : "group_sphenix.mdc2"
accounting_group_user : "sphnxpro"
priority : '4000'
request_xferslots: '0'



#_____________________________________________________________________________________________________________________________

PHYS_DST_SINGLE_TRKR_HIT_SET_physics_2024p007:
# DST_EVENT works from a pre-built set of run lists.
params:
name: DST_TRKR_HIT_{streamname}_run2pp
build: new
build_name: new
dbtag: 2024p007
logbase : $(name)_$(build)_$(tag)-$INT(run,{RUNFMT})-$INT(seg,{SEGFMT})
outbase : $(name)_$(build)_$(tag)
script : run.sh
payload : ./slurp-examples/sPHENIX/TrackingProduction/
mem : 2048MB
rsync : "./slurp-examples/sPHENIX/TrackingProduction/*,cups.py,bachi.py,odbc.ini"

input:
db: fc
query: |-
select
'filecatalog/datasets' as source ,
runnumber ,
segment ,
filename as files ,
'X' as fileranges
from
datasets
where
filename like 'DST_STREAMING_EVENT_{streamname}_run2pp_ana435_2024p007%'
{run_condition}
and runnumber>=49700
order by runnumber
{limit_condition}
;
filesystem:
outdir : "/sphenix/lustre01/sphnxpro/physics/slurp/tracking/$(build)_$(tag)/run_$(rungroup)"
logdir : "file:///sphenix/data/data02/sphnxpro/trackinglogs/$(build)_$(tag)/run_$(rungroup)"
histdir : "/sphenix/data/data02/sphnxpro/hitsethist/$(build)_$(tag)/run_$(rungroup)"
condor : "/tmp/trkrogs/$(build)_$(tag)/run_$(rungroup)"

job:
executable : "{payload}/run.sh"
arguments : "$(nevents) {outbase} {logbase} $(run) $(seg) {outdir} $(build) $(tag) $(inputs) $(ranges) {logdir} {histdir} {PWD} {rsync}"
output_destination : '{logdir}'
log : '{condor}/{logbase}.condor'
accounting_group : "group_sphenix.mdc2"
accounting_group_user : "sphnxpro"
priority : '3800'


#_____________________________________________________________________________________________________________________________

DST_TRKR_CLUSTER_SET_run2pp_2024p007:
# DST_EVENT works from a pre-built set of run lists.
params:
name: DST_TRKR_CLUSTER_run2pp
build: new
build_name: new
dbtag: 2024p007
logbase : $(name)_$(build)_$(tag)-$INT(run,{RUNFMT})-$INT(seg,{SEGFMT})
outbase : $(name)_$(build)_$(tag)
script : run_job0.sh
payload : ./slurp-examples/sPHENIX/TrackingProduction/
mem : 2048MB
nevents : 0
rsync : "./slurp-examples/sPHENIX/TrackingProduction/*,cups.py,bachi.py,odbc.ini"

input:
db: fc
query: |-
select
'filecatalog/datasets' as source ,
runnumber ,
segment ,
filename as files ,
'X' as fileranges
from
datasets
where
filename like 'DST_TRKR_HIT_run2pp_new_2024p007%'
{run_condition}
and runnumber>=49700
order by runnumber
{limit_condition}
;
filesystem:
outdir : "/sphenix/lustre01/sphnxpro/physics/slurp/tracking/$(build)_$(tag)/run_$(rungroup)"
logdir : "file:///sphenix/data/data02/sphnxpro/trackinglogs/$(build)_$(tag)/run_$(rungroup)"
histdir : "/sphenix/data/data02/sphnxpro/clusterhist/$(build)_$(tag)/run_$(rungroup)"
condor : "/tmp/trkrlogs/$(build)_$(tag)/run_$(rungroup)"

job:
executable : "{payload}/run_job0.sh"
arguments : "{nevents} {outbase} {logbase} $(run) $(seg) {outdir} $(build) $(tag) $(inputs) $(ranges) {logdir} {histdir} {PWD} {rsync}"
output_destination : '{logdir}'
log : '{condor}/{logbase}.condor'
accounting_group : "group_sphenix.mdc2"
accounting_group_user : "sphnxpro"
priority : '3800'
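Rule files like the one above share a fixed top-level shape per rule: params, input, filesystem, and job. A minimal sketch of a pre-submission sanity check for that shape; the `check_rule` helper and the skeleton values are illustrative, not part of slurp:

```python
# Required top-level sections per rule, matching the YAML above.
REQUIRED = {"params", "input", "filesystem", "job"}

def check_rule(rule):
    # Raise if any required section is absent from the rule mapping.
    missing = REQUIRED - rule.keys()
    if missing:
        raise ValueError(f"rule missing sections: {sorted(missing)}")
    return True

rule = {  # illustrative skeleton of DST_TRKR_CLUSTER_SET_run2pp_2024p007
    "params": {"name": "DST_TRKR_CLUSTER_run2pp", "build": "new", "dbtag": "2024p007"},
    "input": {"db": "fc", "query": "select ..."},
    "filesystem": {"outdir": "/sphenix/lustre01/..."},  # truncated, illustrative
    "job": {"executable": "{payload}/run_job0.sh"},
}
assert check_rule(rule)
```

Catching a missing section before submission is cheaper than discovering it through a failed condor job.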
2 changes: 2 additions & 0 deletions setup
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ export PATH=${PATH}:${HOME}/bin
export ODBCINI=./odbc.ini

if [[ $OS =~ "Alma" ]]; then

export PATH=/usr/bin:${PATH}
export PYTHONPATH=/opt/sphenix/core/lib/python3.9/site-packages
alias python=/usr/bin/python
@@ -17,3 +18,4 @@ echo Using $(python --version)



