From b5bdf84d328b638c699756c8cad81aaa39fb12b3 Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Wed, 15 Apr 2026 08:02:05 -0400 Subject: [PATCH 1/7] [_746] a reasonable default sort function for data replicas --- irods/data_object.py | 23 ++++++++++++++++++---- irods/manager/data_object_manager.py | 29 ++++++++++++++++------------ 2 files changed, 36 insertions(+), 16 deletions(-) diff --git a/irods/data_object.py b/irods/data_object.py index 409f7f33e..fc37ccc6c 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -1,8 +1,9 @@ +import ast +import datetime import io -import sys import logging import os -import ast +import sys from irods.models import DataObject from irods.meta import iRODSMetaCollection @@ -41,8 +42,22 @@ def __repr__(self): return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name) +_REPL_STATUSES = (1, 0, 2, 3, 4) +_REFERENCE_DATETIME = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + +def _DEFAULT_SORT_KEY_FN(row): + repl_status = int(row[DataObject.replica_status]) + + repl_status_rank = ( + _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) + else sys.maxsize + ) + + return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time]) + + class iRODSDataObject: - def __init__(self, manager, parent=None, results=None): + def __init__(self, manager, parent=None, results=None, replica_sort_function=None): self.manager = manager if parent and results: self.collection = parent @@ -54,7 +69,7 @@ def __init__(self, manager, parent=None, results=None): # backward compatibility with older schema versions pass self.path = self.collection.path + "/" + self.name - replicas = sorted(results, key=lambda r: r[DataObject.replica_number]) + replicas = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) # The status quo before iRODS 5 diff --git a/irods/manager/data_object_manager.py b/irods/manager/data_object_manager.py index 6c2a6fe2d..ef11f9a31 100644 --- a/irods/manager/data_object_manager.py +++ b/irods/manager/data_object_manager.py @@ -233,12 +233,12 @@ def _download(self, obj, local_path, num_threads, updatables=(), **options): raise ex.OVERWRITE_WITHOUT_FORCE_FLAG data_open_returned_values_ = {} - with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o: + with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o: if self.should_parallelize_transfer(num_threads, o, open_options=options.items()): error = RuntimeError("parallel get failed") try: if not self.parallel_get( - (obj, o), + (obj_path, o), local_file, num_threads=num_threads, target_resource_name=options.get(kw.RESC_NAME_KW, ""), @@ -265,6 +265,8 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda """ parent = self.sess.collections.get(irods_dirname(path)) + replica_sort_function = options.pop('replica_sort_function', None) + # TODO: optimize if local_path: self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options) @@ -284,7 +286,7 @@ def get(self, path, local_path=None, num_threads=DEFAULT_NUMBER_OF_THREADS, upda results = query.all() # get up to max_rows replicas if len(results) <= 0: raise ex.DataObjectDoesNotExist() - return iRODSDataObject(self, parent, results) + return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function) @staticmethod def _resolve_force_put_option(options, default_setting=None, true_value=""): @@ -317,23 +319,25 @@ def put( self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default) if self.sess.collections.exists(irods_path): - obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) + obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path)) else: - obj = irods_path - if kw.FORCE_FLAG_KW not in options and self.exists(obj): + obj_path = irods_path + if kw.FORCE_FLAG_KW not in options and self.exists(obj_path): raise ex.OVERWRITE_WITHOUT_FORCE_FLAG options.pop(kw.FORCE_FLAG_KW, None) + replica_sort_function = options.pop('replica_sort_function',None) + with open(local_path, "rb") as f: sizelist = [] if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options): - o = deferred_call(self.open, (obj, "w"), options) + o = deferred_call(self.open, (obj_path, "w"), options) f.close() error = RuntimeError("parallel put failed") try: if not self.parallel_put( local_path, - (obj, o), + (obj_path, o), total_bytes=sizelist[0], num_threads=num_threads, target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""), @@ -346,7 +350,7 @@ def put( except BaseException as e: raise error from e else: - with self.open(obj, "w", **options) as o: + with self.open(obj_path, "w", **options) as o: # Set operation type to trigger acPostProcForPut if kw.OPR_TYPE_KW not in options: options[kw.OPR_TYPE_KW] = 1 # PUT_OPR @@ -360,10 +364,10 @@ def put( # Requested to register checksum without verifying, but source replica has a checksum. This can result # in multiple replicas being marked good with different checksums, which is an inconsistency. del repl_options[kw.REG_CHKSUM_KW] - self.replicate(obj, **repl_options) + self.replicate(obj_path, **repl_options) if return_data_object: - return self.get(obj) + return self.get(obj_path, replica_sort_function=replica_sort_function) def chksum(self, path, **options): """ @@ -480,6 +484,7 @@ def create( raise ex.DataObjectExistsAtLogicalPath options = {**options, kw.DATA_TYPE_KW: "generic"} + replica_sort_function = options.pop('replica_sort_function',None) if resource: options[kw.DEST_RESC_NAME_KW] = resource @@ -508,7 +513,7 @@ def create( desc = response.int_info conn.close_file(desc) - return self.get(path) + return self.get(path, replica_sort_function=replica_sort_function) def open_with_FileRaw(self, *arg, **kw_options): holder = [] From 93dae32f3b56b85b8dca723169e0a94a9059a3ff Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Wed, 15 Apr 2026 13:32:50 -0400 Subject: [PATCH 2/7] correction obj_path --- irods/manager/data_object_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/irods/manager/data_object_manager.py b/irods/manager/data_object_manager.py index ef11f9a31..4e40879c7 100644 --- a/irods/manager/data_object_manager.py +++ b/irods/manager/data_object_manager.py @@ -218,13 +218,13 @@ def should_parallelize_transfer( if size is not None and isinstance(open_options, dict): open_options[kw.DATA_SIZE_KW] = size - def _download(self, obj, local_path, num_threads, updatables=(), **options): + def _download(self, obj_path, local_path, num_threads, updatables=(), **options): """Transfer the contents of a data object to a local file. Called from get() when a local path is named. """ if os.path.isdir(local_path): - local_file = os.path.join(local_path, irods_basename(obj)) + local_file = os.path.join(local_path, irods_basename(obj_path)) else: local_file = local_path From ff2c8dfa0af44908e5d3e807667c55cd06c75cf6 Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Wed, 15 Apr 2026 14:28:21 -0400 Subject: [PATCH 3/7] fix tests --- irods/test/data_obj_test.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py index f214950a2..13f40c909 100644 --- a/irods/test/data_obj_test.py +++ b/irods/test/data_obj_test.py @@ -1251,10 +1251,12 @@ def test_replica_number(self): # refresh object obj = session.data_objects.get(obj_path) - # assertions on replicas + # assertions on replicas self.assertEqual(len(obj.replicas), number_of_replicas) - for i, replica in enumerate(obj.replicas): - self.assertEqual(replica.number, i) + self.assertEqual( + {repl.number for repl in obj.replicas}, + {*range(len(obj.replicas))} + ) # now trim odd-numbered replicas # note (see irods/irods#4861): COPIES_KW might disappear in the future @@ -1267,10 +1269,7 @@ def test_replica_number(self): obj = session.data_objects.get(obj_path) # check remaining replica numbers - replica_numbers = [] - for replica in obj.replicas: - replica_numbers.append(replica.number) - self.assertEqual(replica_numbers, [0, 2, 4, 6]) + self.assertEqual({r.number for r in obj.replicas}, {0, 2, 4, 6}) # remove object obj.unlink(force=True) @@ -1728,11 +1727,12 @@ def test_get_data_objects(self): self.assertIsNotNone(obj.replicas[1].__getattribute__(i)) # ensure replica info is sensible + replicas = sorted(obj.replicas, key=lambda repl: repl.number) for i in range(2): - self.assertEqual(obj.replicas[i].number, i) - self.assertEqual(obj.replicas[i].status, "1") - self.assertEqual(obj.replicas[i].path.split("/")[-1], filename) - self.assertEqual(obj.replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name) + self.assertEqual(replicas[i].number, i) + self.assertEqual(replicas[i].status, "1") + self.assertEqual(replicas[i].path.split("/")[-1], filename) + self.assertEqual(replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name) self.assertEqual(obj.replicas[0].resource_name, ufs_resources[0].name) if self.sess.server_version < (4, 2, 0): From d2292165ef0e599da7178a433d97ddaee77082fe Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Thu, 16 Apr 2026 01:09:53 -0400 Subject: [PATCH 4/7] sort at the right place in the code --- irods/data_object.py | 9 +++++---- irods/test/data_obj_test.py | 31 ++++++++++++++++++++++++++++++- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/irods/data_object.py b/irods/data_object.py index fc37ccc6c..4778b116a 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -47,7 +47,7 @@ def __repr__(self): def _DEFAULT_SORT_KEY_FN(row): repl_status = int(row[DataObject.replica_status]) - + repl_status_rank = ( _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) else sys.maxsize @@ -61,6 +61,8 @@ def __init__(self, manager, parent=None, results=None, replica_sort_function=Non self.manager = manager if parent and results: self.collection = parent + if results: + results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) for attr, value in DataObject.__dict__.items(): if not attr.startswith("_"): try: @@ -69,7 +71,6 @@ def __init__(self, manager, parent=None, results=None, replica_sort_function=Non # backward compatibility with older schema versions pass self.path = self.collection.path + "/" + self.name - replicas = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) # The status quo before iRODS 5 @@ -90,13 +91,13 @@ def __init__(self, manager, parent=None, results=None, replica_sort_function=Non modify_time=r[DataObject.modify_time], ), ) - for r in replicas + for r in results ] # Adjust for adding access_time in the iRODS 5 case. if self.manager.sess.server_version >= (5,): - for n, r in enumerate(replicas): + for n, r in enumerate(results): replica_args[n][1]['access_time'] = r[DataObject.access_time] self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args] diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py index 13f40c909..e17d9a0e7 100644 --- a/irods/test/data_obj_test.py +++ b/irods/test/data_obj_test.py @@ -1251,7 +1251,7 @@ def test_replica_number(self): # refresh object obj = session.data_objects.get(obj_path) - # assertions on replicas + # assertions on replicas self.assertEqual(len(obj.replicas), number_of_replicas) self.assertEqual( {repl.number for repl in obj.replicas}, @@ -2992,6 +2992,35 @@ def test_handling_of_termination_signals_during_multithread_put__issue_722(self) test_put__issue_722(self) + def test_default_sorting_of_replicas__issue_647(self): + basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647' + with self.create_simple_resc() as newResc1, self.create_simple_resc() as newResc2: + data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}') + + # Precondition for an eventual total of 3 replicas: initial data replica is not on either of the new resources. + self.assertFalse({repl.resource_name for repl in data.replicas} & {newResc1, newResc2}) + try: + data.replicate(resource=newResc1) + + # Ensure that one of the replicas is stale, to test proper sorting. + with data.open('a',**{kw.RESC_NAME_KW:newResc1}) as f: + f.write(b'.') + time.sleep(2) + + # Voting should ensure exactly two good replicas of the three. + data.replicate(resource=newResc2) + + # refresh replicas + data = self.sess.data_objects.get(data.path) + + # Test replica sorting. + self.assertEqual(data.replicas[0].status, '1') + self.assertEqual(data.replicas[0].modify_time, data.modify_time) + self.assertGreater(data.replicas[0].modify_time, data.replicas[1].modify_time) + finally: + if data: + data.unlink(force=True) + if __name__ == "__main__": # let the tests find the parent irods lib From 06c70d215081edc8aa2c40ebe18e7eb5b171b65c Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Thu, 16 Apr 2026 01:17:19 -0400 Subject: [PATCH 5/7] concise --- irods/data_object.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/irods/data_object.py b/irods/data_object.py index 4778b116a..ecff2975d 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -61,8 +61,7 @@ def __init__(self, manager, parent=None, results=None, replica_sort_function=Non self.manager = manager if parent and results: self.collection = parent - if results: - results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) + results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN)) for attr, value in DataObject.__dict__.items(): if not attr.startswith("_"): try: @@ -72,7 +71,7 @@ def __init__(self, manager, parent=None, results=None, replica_sort_function=Non pass self.path = self.collection.path + "/" + self.name - # The status quo before iRODS 5 + # Copy pre-iRODS 5 fields replica_args = [ ( From 4f9e624dee198278fcd9706b9d8316e8afb55b2d Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Thu, 16 Apr 2026 03:37:34 -0400 Subject: [PATCH 6/7] ruff formatting --- irods/data_object.py | 6 ++---- irods/manager/data_object_manager.py | 4 ++-- irods/test/data_obj_test.py | 7 ++----- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/irods/data_object.py b/irods/data_object.py index ecff2975d..30d2e9457 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -45,13 +45,11 @@ def __repr__(self): _REPL_STATUSES = (1, 0, 2, 3, 4) _REFERENCE_DATETIME = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) + def _DEFAULT_SORT_KEY_FN(row): repl_status = int(row[DataObject.replica_status]) - repl_status_rank = ( - _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) - else sys.maxsize - ) + repl_status_rank = _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) else sys.maxsize return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time]) diff --git a/irods/manager/data_object_manager.py b/irods/manager/data_object_manager.py index 4e40879c7..9d8286bda 100644 --- a/irods/manager/data_object_manager.py +++ b/irods/manager/data_object_manager.py @@ -326,7 +326,7 @@ def put( raise ex.OVERWRITE_WITHOUT_FORCE_FLAG options.pop(kw.FORCE_FLAG_KW, None) - replica_sort_function = options.pop('replica_sort_function',None) + replica_sort_function = options.pop('replica_sort_function', None) with open(local_path, "rb") as f: sizelist = [] @@ -484,7 +484,7 @@ def create( raise ex.DataObjectExistsAtLogicalPath options = {**options, kw.DATA_TYPE_KW: "generic"} - replica_sort_function = options.pop('replica_sort_function',None) + replica_sort_function = options.pop('replica_sort_function', None) if resource: options[kw.DEST_RESC_NAME_KW] = resource diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py index e17d9a0e7..1f0285a4d 100644 --- a/irods/test/data_obj_test.py +++ b/irods/test/data_obj_test.py @@ -1253,10 +1253,7 @@ def test_replica_number(self): # assertions on replicas self.assertEqual(len(obj.replicas), number_of_replicas) - self.assertEqual( - {repl.number for repl in obj.replicas}, - {*range(len(obj.replicas))} - ) + self.assertEqual({repl.number for repl in obj.replicas}, {*range(len(obj.replicas))}) # now trim odd-numbered replicas # note (see irods/irods#4861): COPIES_KW might disappear in the future @@ -3003,7 +3000,7 @@ def test_default_sorting_of_replicas__issue_647(self): data.replicate(resource=newResc1) # Ensure that one of the replicas is stale, to test proper sorting. - with data.open('a',**{kw.RESC_NAME_KW:newResc1}) as f: + with data.open('a', **{kw.RESC_NAME_KW: newResc1}) as f: f.write(b'.') time.sleep(2) From a9c4e99886b74831e9d102542fdb8271c9ad4531 Mon Sep 17 00:00:00 2001 From: d-w-moore Date: Thu, 16 Apr 2026 10:59:42 -0400 Subject: [PATCH 7/7] return to old sort order --- irods/data_object.py | 8 +++++++- irods/test/data_obj_test.py | 3 ++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/irods/data_object.py b/irods/data_object.py index 30d2e9457..c9df283e7 100644 --- a/irods/data_object.py +++ b/irods/data_object.py @@ -46,7 +46,10 @@ def __repr__(self): _REFERENCE_DATETIME = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc) -def _DEFAULT_SORT_KEY_FN(row): +def _REPLICA_NUMBER_SORT_KEY_FN(row): + return row[DataObject.replica_number] + +def _REPLICA_FITNESS_SORT_KEY_FN(row): repl_status = int(row[DataObject.replica_status]) repl_status_rank = _REPL_STATUSES.index(repl_status) if _REPL_STATUSES.count(repl_status) else sys.maxsize @@ -54,6 +57,9 @@ def _DEFAULT_SORT_KEY_FN(row): return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time]) +_DEFAULT_SORT_KEY_FN = _REPLICA_NUMBER_SORT_KEY_FN + + class iRODSDataObject: def __init__(self, manager, parent=None, results=None, replica_sort_function=None): self.manager = manager diff --git a/irods/test/data_obj_test.py b/irods/test/data_obj_test.py index 1f0285a4d..8a52d0729 100644 --- a/irods/test/data_obj_test.py +++ b/irods/test/data_obj_test.py @@ -2989,7 +2989,8 @@ def test_handling_of_termination_signals_during_multithread_put__issue_722(self) test_put__issue_722(self) - def test_default_sorting_of_replicas__issue_647(self): + @unittest.skipIf(irods.version.version_as_tuple() < (4,), 'too soon for this test.') + def test_modified_default_sorting_of_replicas__issue_647(self): basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647' with self.create_simple_resc() as newResc1, self.create_simple_resc() as newResc2: data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}')