Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions irods/data_object.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import ast

Check failure on line 1 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D100

D100: Missing docstring in public module [pydocstyle:undocumented-public-module]
import datetime
import io
import sys
import logging
import os
import ast
import sys

from irods.models import DataObject
from irods.meta import iRODSMetaCollection
Expand Down Expand Up @@ -41,11 +42,30 @@
return "<{}.{} {}>".format(self.__class__.__module__, self.__class__.__name__, self.resource_name)


# Preference order of replica statuses used when ranking replica "fitness":
# earlier in the tuple sorts first.  NOTE(review): assumes the iRODS replica
# status convention (1 = good, 0 = stale, 2/3/4 = intermediate / locked) --
# confirm against server documentation.
_REPL_STATUSES = (1, 0, 2, 3, 4)
# The Unix epoch (UTC).  Subtracting a replica's modify time from this yields
# a timedelta that is smaller (more negative) for more recently modified
# replicas, so ascending sorts put the newest replica first.
_REFERENCE_DATETIME = datetime.datetime(1970, 1, 1, 0, 0, 0, tzinfo=datetime.timezone.utc)


def _REPLICA_NUMBER_SORT_KEY_FN(row):
    """Default replica sort key.

    Orders replica query rows by their replica number, ascending, which is
    the historical ordering of `iRODSDataObject.replicas`.
    """
    replica_number = row[DataObject.replica_number]
    return replica_number

def _REPLICA_FITNESS_SORT_KEY_FN(row):
    """Sort key ranking replicas by "fitness".

    Rows are ordered first by replica status, in the preference order given
    by _REPL_STATUSES (statuses not listed there rank last), and then by
    modification time, newest first (the epoch-minus-mtime timedelta is more
    negative for more recently modified replicas).
    """
    repl_status = int(row[DataObject.replica_status])
    # Single scan via EAFP instead of the original count()-then-index()
    # double scan of the tuple; unknown statuses rank below all known ones.
    try:
        repl_status_rank = _REPL_STATUSES.index(repl_status)
    except ValueError:
        repl_status_rank = sys.maxsize
    return (repl_status_rank, _REFERENCE_DATETIME - row[DataObject.modify_time])


_DEFAULT_SORT_KEY_FN = _REPLICA_NUMBER_SORT_KEY_FN


class iRODSDataObject:
def __init__(self, manager, parent=None, results=None):
def __init__(self, manager, parent=None, results=None, replica_sort_function=None):

Check failure on line 64 in irods/data_object.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff D107

D107: Missing docstring in `__init__` [pydocstyle:undocumented-public-init]
self.manager = manager
if parent and results:
self.collection = parent
results = sorted(results, key=(replica_sort_function or _DEFAULT_SORT_KEY_FN))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only way to support this in a minor release is to provide an opt-in which changes the default behavior.

for attr, value in DataObject.__dict__.items():
if not attr.startswith("_"):
try:
Expand All @@ -54,9 +74,8 @@
# backward compatibility with older schema versions
pass
self.path = self.collection.path + "/" + self.name
replicas = sorted(results, key=lambda r: r[DataObject.replica_number])

# The status quo before iRODS 5
# Copy pre-iRODS 5 fields

replica_args = [
(
Expand All @@ -75,13 +94,13 @@
modify_time=r[DataObject.modify_time],
),
)
for r in replicas
for r in results
]

# Adjust for adding access_time in the iRODS 5 case.

if self.manager.sess.server_version >= (5,):
for n, r in enumerate(replicas):
for n, r in enumerate(results):
replica_args[n][1]['access_time'] = r[DataObject.access_time]
self.replicas = [iRODSReplica(*a, **k) for a, k in replica_args]

Expand Down
33 changes: 19 additions & 14 deletions irods/manager/data_object_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,13 @@
if size is not None and isinstance(open_options, dict):
open_options[kw.DATA_SIZE_KW] = size

def _download(self, obj, local_path, num_threads, updatables=(), **options):
def _download(self, obj_path, local_path, num_threads, updatables=(), **options):
"""Transfer the contents of a data object to a local file.

Called from get() when a local path is named.
"""
if os.path.isdir(local_path):
local_file = os.path.join(local_path, irods_basename(obj))
local_file = os.path.join(local_path, irods_basename(obj_path))

Check failure on line 227 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff PTH118

PTH118: `os.path.join()` should be replaced by `Path` with `/` operator [flake8-use-pathlib:os-path-join]
else:
local_file = local_path

Check failure on line 229 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff SIM108

SIM108: Use ternary operator `local_file = os.path.join(local_path, irods_basename(obj_path)) if os.path.isdir(local_path) else local_path` instead of `if`-`else`-block [flake8-simplify:if-else-block-instead-of-if-exp]

Expand All @@ -233,12 +233,12 @@
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG

data_open_returned_values_ = {}
with self.open(obj, "r", returned_values=data_open_returned_values_, **options) as o:
with self.open(obj_path, "r", returned_values=data_open_returned_values_, **options) as o:
if self.should_parallelize_transfer(num_threads, o, open_options=options.items()):
error = RuntimeError("parallel get failed")
try:
if not self.parallel_get(
(obj, o),
(obj_path, o),
local_file,
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, ""),
Expand All @@ -265,6 +265,8 @@
"""
parent = self.sess.collections.get(irods_dirname(path))

replica_sort_function = options.pop('replica_sort_function', None)

# TODO: optimize
if local_path:
self._download(path, local_path, num_threads=num_threads, updatables=updatables, **options)
Expand All @@ -284,7 +286,7 @@
results = query.all() # get up to max_rows replicas
if len(results) <= 0:
raise ex.DataObjectDoesNotExist()
return iRODSDataObject(self, parent, results)
return iRODSDataObject(self, parent, results, replica_sort_function=replica_sort_function)

@staticmethod
def _resolve_force_put_option(options, default_setting=None, true_value=""):
Expand All @@ -304,36 +306,38 @@
else:
del options[kw.FORCE_FLAG_KW]

def put(
self,
local_path,
irods_path,
return_data_object=False,
num_threads=DEFAULT_NUMBER_OF_THREADS,
updatables=(),
**options,
):
# Decide if a put option should be used and modify options accordingly.
self._resolve_force_put_option(options, default_setting=client_config.data_objects.force_put_by_default)

if self.sess.collections.exists(irods_path):
obj = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))
obj_path = iRODSCollection.normalize_path(irods_path, os.path.basename(local_path))

Check failure on line 322 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff PTH119

PTH119: `os.path.basename()` should be replaced by `Path.name` [flake8-use-pathlib:os-path-basename]
else:
obj = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj):
obj_path = irods_path
if kw.FORCE_FLAG_KW not in options and self.exists(obj_path):
raise ex.OVERWRITE_WITHOUT_FORCE_FLAG
options.pop(kw.FORCE_FLAG_KW, None)

replica_sort_function = options.pop('replica_sort_function', None)

with open(local_path, "rb") as f:
sizelist = []
if self.should_parallelize_transfer(num_threads, f, measured_obj_size=sizelist, open_options=options):
o = deferred_call(self.open, (obj, "w"), options)
o = deferred_call(self.open, (obj_path, "w"), options)
f.close()
error = RuntimeError("parallel put failed")
try:
if not self.parallel_put(
local_path,
(obj, o),
(obj_path, o),
total_bytes=sizelist[0],
num_threads=num_threads,
target_resource_name=options.get(kw.RESC_NAME_KW, "") or options.get(kw.DEST_RESC_NAME_KW, ""),
Expand All @@ -346,7 +350,7 @@
except BaseException as e:
raise error from e
else:
with self.open(obj, "w", **options) as o:
with self.open(obj_path, "w", **options) as o:
# Set operation type to trigger acPostProcForPut
if kw.OPR_TYPE_KW not in options:
options[kw.OPR_TYPE_KW] = 1 # PUT_OPR
Expand All @@ -360,10 +364,10 @@
# Requested to register checksum without verifying, but source replica has a checksum. This can result
# in multiple replicas being marked good with different checksums, which is an inconsistency.
del repl_options[kw.REG_CHKSUM_KW]
self.replicate(obj, **repl_options)
self.replicate(obj_path, **repl_options)

if return_data_object:
return self.get(obj)
return self.get(obj_path, replica_sort_function=replica_sort_function)

Check failure on line 370 in irods/manager/data_object_manager.py

View workflow job for this annotation

GitHub Actions / ruff-lint / ruff-check

Ruff RET503

RET503: Missing explicit `return` at the end of function able to return non-`None` value [flake8-return:implicit-return]

def chksum(self, path, **options):
"""
Expand Down Expand Up @@ -480,6 +484,7 @@
raise ex.DataObjectExistsAtLogicalPath

options = {**options, kw.DATA_TYPE_KW: "generic"}
replica_sort_function = options.pop('replica_sort_function', None)

if resource:
options[kw.DEST_RESC_NAME_KW] = resource
Expand Down Expand Up @@ -508,7 +513,7 @@
desc = response.int_info
conn.close_file(desc)

return self.get(path)
return self.get(path, replica_sort_function=replica_sort_function)

def open_with_FileRaw(self, *arg, **kw_options):
holder = []
Expand Down
47 changes: 37 additions & 10 deletions irods/test/data_obj_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1253,8 +1253,7 @@

# assertions on replicas
self.assertEqual(len(obj.replicas), number_of_replicas)
for i, replica in enumerate(obj.replicas):
self.assertEqual(replica.number, i)
self.assertEqual({repl.number for repl in obj.replicas}, {*range(len(obj.replicas))})

# now trim odd-numbered replicas
# note (see irods/irods#4861): COPIES_KW might disappear in the future
Expand All @@ -1267,10 +1266,7 @@
obj = session.data_objects.get(obj_path)

# check remaining replica numbers
replica_numbers = []
for replica in obj.replicas:
replica_numbers.append(replica.number)
self.assertEqual(replica_numbers, [0, 2, 4, 6])
self.assertEqual({r.number for r in obj.replicas}, {0, 2, 4, 6})

# remove object
obj.unlink(force=True)
Expand Down Expand Up @@ -1728,11 +1724,12 @@
self.assertIsNotNone(obj.replicas[1].__getattribute__(i))

# ensure replica info is sensible
replicas = sorted(obj.replicas, key=lambda repl: repl.number)
for i in range(2):
self.assertEqual(obj.replicas[i].number, i)
self.assertEqual(obj.replicas[i].status, "1")
self.assertEqual(obj.replicas[i].path.split("/")[-1], filename)
self.assertEqual(obj.replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)
self.assertEqual(replicas[i].number, i)
self.assertEqual(replicas[i].status, "1")
self.assertEqual(replicas[i].path.split("/")[-1], filename)
self.assertEqual(replicas[i].resc_hier.split(";")[-1], ufs_resources[i].name)

self.assertEqual(obj.replicas[0].resource_name, ufs_resources[0].name)
if self.sess.server_version < (4, 2, 0):
Expand Down Expand Up @@ -2992,6 +2989,36 @@

test_put__issue_722(self)

@unittest.skipIf(irods.version.version_as_tuple() < (4,),
                 'Relies on backward incompatible changes. Disabled until PRC 4.')
def test_modified_default_sorting_of_replicas__issue_647(self):
    """Verify replicas sort good-before-stale, then newest-modified first (issue #647).

    Creates a data object with three replicas, forces one of them stale by
    appending to a sibling replica, and checks the resulting ordering of
    `data.replicas`.
    """
    # TODO(review): also cover PRC 3 by passing the sorter explicitly, e.g.
    #   self.sess.data_objects.get(data.path, replica_sort_function=<fn>)
    # when irods.version.version_as_tuple() < (4,).
    basename = unique_name(my_function_name(), datetime.now()) + '_dataobj_647'
    with self.create_simple_resc() as newResc1, self.create_simple_resc() as newResc2:
        data = helpers.make_object(self.sess, f'{helpers.home_collection(self.sess)}/{basename}')

        # Precondition for an eventual total of 3 replicas: the initial
        # replica is not on either of the new resources.
        self.assertFalse({repl.resource_name for repl in data.replicas} & {newResc1, newResc2})
        try:
            data.replicate(resource=newResc1)

            # Ensure that one of the replicas is stale, to test proper sorting.
            with data.open('a', **{kw.RESC_NAME_KW: newResc1}) as f:
                f.write(b'.')
            time.sleep(2)

            # Voting should ensure exactly two good replicas of the three.
            data.replicate(resource=newResc2)

            # Refresh replica info from the catalog.
            data = self.sess.data_objects.get(data.path)

            # Prove that at least one replica really is stale (status '0'),
            # so the ordering assertions below are meaningful.
            self.assertIn('0', {repl.status for repl in data.replicas})

            # Test replica sorting: a good, most-recently-modified replica
            # must come first.
            self.assertEqual(data.replicas[0].status, '1')
            self.assertEqual(data.replicas[0].modify_time, data.modify_time)
            self.assertGreater(data.replicas[0].modify_time, data.replicas[1].modify_time)
        finally:
            if data:
                data.unlink(force=True)


if __name__ == "__main__":
# let the tests find the parent irods lib
Expand Down
Loading