Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 191 additions & 27 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -1432,6 +1432,12 @@ class Head96Information:
"""Z-drive default acceleration (mm/s2)."""
dispensing_drive_speed_default: float = 261.1
"""Dispensing-drive default speed (uL/s)."""
z_speed_range: Tuple[float, float] = (0.25, 100.0)
"""Z-drive speed window (mm/s); unchanged across the 2008/2013/2025 firmware, unlike the
version-resolved y_speed_range."""
z_acceleration_range: Tuple[float, float] = (25.0, 500.0)
"""Z-drive acceleration window (mm/s2); unchanged across the 2008/2013/2025 firmware (the
pre-2010 encoding differs, the physical range does not)."""

# === Encoder resolutions (defaulted device facts). Y/Z are unchanged across firmware; the
# dispensing/squeezer resolutions are the 2013+ generation values (2008-era heads differ). ===
Expand Down Expand Up @@ -8056,7 +8062,9 @@ async def head96_move_to_z_safety(
"requires 96-head firmware version information for safe operation"
)
z_max = self._head96_information.z_range[1]
return await self.head96_move_stop_disk_z(z_max, speed=speed, acceleration=acceleration)
return await self.head96_move_stop_disk_z(
z_max, speed=speed, acceleration=acceleration, retract_on_crash=False
)

@_requires_head96
async def head96_park(
Expand Down Expand Up @@ -8111,17 +8119,25 @@ async def head96_move_x(
async def head96_move_y(
self,
y: float,
speed: float = 300.0,
acceleration: float = 300.0,
speed: Optional[float] = None,
acceleration: Optional[float] = None,
current_protection_limiter: int = 15,
reset_y_parameters: bool = True,
):
"""Move the 96-head to a specified Y-axis coordinate.

An overridden speed/acceleration persists in the drive's volatile register and is inherited by
later moves, so it is reset to the head's default afterwards unless `reset_y_parameters` is False.

Args:
y: Target Y coordinate in mm. Valid range: [93.75, 562.5]
speed: Movement speed in mm/sec. Valid range: [0.78125, 390.625 or 625.0]. Default: 300.0
acceleration: Movement acceleration in mm/sec**2. Valid range: [78.125, 781.25]. Default: 300.0
speed: Movement speed in mm/sec. Valid range: [0.78125, 390.625 or 625.0]; None uses the head's
y_drive_speed_default.
acceleration: Movement acceleration in mm/sec**2. Valid range: [78.125, 781.25]; None uses the
head's y_drive_acceleration_default.
current_protection_limiter: Motor current limit (0-15, hardware units). Default: 15
reset_y_parameters: If True (default), reset an overridden speed/acceleration to the head's
defaults after the move so it does not persist; set False to deliberately keep it.

Returns:
Response from the hardware command.
Expand All @@ -8140,6 +8156,15 @@ async def head96_move_y(
"requires 96-head firmware version information for safe operation"
)

# Reset only what the caller overrode: None means "use the default", which already leaves the
# register at the default, so there is nothing to clean up (no churn on default moves).
restore_speed = reset_y_parameters and speed is not None
restore_acceleration = reset_y_parameters and acceleration is not None
if speed is None:
speed = self._head96_information.y_drive_speed_default
if acceleration is None:
acceleration = self._head96_information.y_drive_acceleration_default

fw_version = self._head96_information.fw_version
y_min, y_max = self._head96_information.y_range
y_speed_min, y_speed_max = self._head96_information.y_speed_range
Expand All @@ -8164,16 +8189,20 @@ async def head96_move_y(
speed_increment = self._head96_y_drive_mm_to_increment(speed)
acceleration_increment = self._head96_y_drive_mm_to_increment(acceleration)

resp = await self.send_command(
module="H0",
command="YA",
ya=f"{y_increment:05}",
yv=f"{speed_increment:05}",
yr=f"{acceleration_increment:05}",
yw=f"{current_protection_limiter:02}",
)

return resp
try:
return await self.send_command(
module="H0",
command="YA",
ya=f"{y_increment:05}",
yv=f"{speed_increment:05}",
yr=f"{acceleration_increment:05}",
yw=f"{current_protection_limiter:02}",
)
finally:
if restore_speed:
await self.head96_set_y_speed(self._head96_information.y_drive_speed_default)
if restore_acceleration:
await self.head96_set_y_acceleration(self._head96_information.y_drive_acceleration_default)

@_requires_head96
async def head96_move_z(
Expand Down Expand Up @@ -8209,12 +8238,19 @@ async def head96_move_stop_disk_z(
speed: Optional[float] = None,
acceleration: Optional[float] = None,
current_protection_limiter: int = 15,
reset_z_parameters: bool = True,
retract_on_crash: bool = True,
):
"""Move the 96-head z-drive (stop disk) to an absolute Z position in mm.

Stop-disk reference, mirroring the single-channel `move_channel_stop_disk_z`: use this for moves
without a tip; for the tip end with a tip on, use `head96_move_tool_z`.

An overridden speed/acceleration persists in the drive's volatile register and would be inherited
by later moves (and C0-level commands), so it is reset to the head's default afterwards unless
`reset_z_parameters` is False. On any firmware error during the move (e.g. the head crashing into
something) the head retracts to Z-safety before the error is re-raised.

Args:
z: Target stop-disk Z in mm. Valid range: Head96Information.z_range (180.5-342.5 mm; FM-STAR
extends it).
Expand All @@ -8223,6 +8259,10 @@ async def head96_move_stop_disk_z(
acceleration: Movement acceleration in mm/sec^2, [25.0, 500.0]; None uses the head's
z_drive_acceleration_default (400 mm/s^2; likewise constant for the Z drive).
current_protection_limiter: Motor current limit (0-15, hardware units). Default: 15
reset_z_parameters: If True (default), reset an overridden speed/acceleration to the head's
defaults after the move so it does not persist; set False to deliberately keep it.
retract_on_crash: If True (default), retract to Z-safety on any firmware error (e.g. a crash)
before re-raising. head96_move_to_z_safety passes False so its own retract cannot recurse.

Returns:
Response from the hardware command.
Expand All @@ -8238,6 +8278,10 @@ async def head96_move_stop_disk_z(
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)
# Reset only what the caller actually overrode: a None means "use the default", which already
# leaves the register at the default, so there is nothing to clean up (no churn on default moves).
restore_speed = reset_z_parameters and speed is not None
restore_acceleration = reset_z_parameters and acceleration is not None
if speed is None:
speed = self._head96_information.z_drive_speed_default
if acceleration is None:
Expand All @@ -8248,9 +8292,15 @@ async def head96_move_stop_disk_z(
# Validate parameters before hardware communication. The Z window is firmware/variant-adaptive
# (FM-STAR extends it), so read it from Head96Information rather than hardcoding the legacy range.
z_min, z_max = self._head96_information.z_range
z_speed_min, z_speed_max = self._head96_information.z_speed_range
z_accel_min, z_accel_max = self._head96_information.z_acceleration_range
assert z_min <= z <= z_max, f"z must be between {z_min} and {z_max} mm"
assert 0.25 <= speed <= 100.0, "speed must be between 0.25 and 100.0 mm/sec"
assert 25.0 <= acceleration <= 500.0, "acceleration must be between 25.0 and 500.0 mm/sec**2"
assert z_speed_min <= speed <= z_speed_max, (
f"speed must be between {z_speed_min} and {z_speed_max} mm/sec"
)
assert z_accel_min <= acceleration <= z_accel_max, (
f"acceleration must be between {z_accel_min} and {z_accel_max} mm/sec**2"
)
assert isinstance(current_protection_limiter, int) and (
0 <= current_protection_limiter <= 15
), "current_protection_limiter must be an integer between 0 and 15"
Expand All @@ -8268,16 +8318,31 @@ async def head96_move_stop_disk_z(
self._head96_z_drive_mm_to_increment(acceleration) * acceleration_multiplier
)

resp = await self.send_command(
module="H0",
command="ZA",
za=f"{z_increment:05}",
zv=f"{speed_increment:05}",
zr=f"{acceleration_increment:06}",
zw=f"{current_protection_limiter:02}",
)

return resp
try:
return await self.send_command(
module="H0",
command="ZA",
za=f"{z_increment:05}",
zv=f"{speed_increment:05}",
zr=f"{acceleration_increment:06}",
zw=f"{current_protection_limiter:02}",
)
except STARFirmwareError:
# Any firmware error here (most importantly a Z-drive crash) can leave the head against an
# obstacle, so retract to Z-safety before re-raising. head96_move_to_z_safety calls back into
# this method with retract_on_crash=False, so the retract cannot recurse into recovery.
if retract_on_crash:
try:
# retract slowly (quarter of max speed) - the head may be in liquid after a crash
await self.head96_move_to_z_safety(speed=self._head96_information.z_speed_range[1] * 0.25)
except STARFirmwareError:
pass # retract failed too; surface the original error below
raise
finally:
if restore_speed:
await self.head96_set_z_speed(self._head96_information.z_drive_speed_default)
if restore_acceleration:
await self.head96_set_z_acceleration(self._head96_information.z_drive_acceleration_default)

@_requires_head96
async def head96_move_tool_z(self, z: float, speed: Optional[float] = None):
Expand Down Expand Up @@ -8322,6 +8387,68 @@ async def head96_move_tool_z(self, z: float, speed: Optional[float] = None):

return await self.head96_move_stop_disk_z(z + tip_overhang, speed=speed)

@_requires_head96
async def head96_set_y_speed(self, speed: float):
"""Set the persistent 96-head Y-drive speed (mm/s) without moving, via H0 AA (save parameter yv).

Subsequent Y moves that don't pass their own speed - including the C0-level 96-head commands -
inherit this until it is changed or the drive re-initialises. Same standalone-set mechanism as
slow_iswap (which sets the iSWAP wv/tv velocities via R0 AA).
"""
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)
y_speed_min, y_speed_max = self._head96_information.y_speed_range
assert y_speed_min <= speed <= y_speed_max, (
f"speed must be between {y_speed_min} and {y_speed_max} mm/sec"
)
return await self.send_command(
module="H0", command="AA", yv=f"{self._head96_y_drive_mm_to_increment(speed):05}"
)

@_requires_head96
async def head96_set_y_acceleration(self, acceleration: float):
"""Set the persistent 96-head Y-drive acceleration (mm/s^2) without moving, via H0 AA (save yr)."""
assert 78.125 <= acceleration <= 781.25, (
"acceleration must be between 78.125 and 781.25 mm/sec**2"
)
return await self.send_command(
module="H0", command="AA", yr=f"{self._head96_y_drive_mm_to_increment(acceleration):05}"
)

@_requires_head96
async def head96_set_z_speed(self, speed: float):
"""Set the persistent 96-head Z-drive speed (mm/s) without moving, via H0 AA (save parameter zv)."""
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)
z_speed_min, z_speed_max = self._head96_information.z_speed_range
assert z_speed_min <= speed <= z_speed_max, (
f"speed must be between {z_speed_min} and {z_speed_max} mm/sec"
)
return await self.send_command(
module="H0", command="AA", zv=f"{self._head96_z_drive_mm_to_increment(speed):05}"
)

@_requires_head96
async def head96_set_z_acceleration(self, acceleration: float):
"""Set the persistent 96-head Z-drive acceleration (mm/s^2) without moving, via H0 AA (save zr).

Applies the same firmware-version acceleration scaling as head96_move_stop_disk_z (pre-2010 x0.001).
"""
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)
z_accel_min, z_accel_max = self._head96_information.z_acceleration_range
assert z_accel_min <= acceleration <= z_accel_max, (
f"acceleration must be between {z_accel_min} and {z_accel_max} mm/sec**2"
)
acceleration_multiplier = 1 if self._head96_information.fw_version.year >= 2010 else 0.001
acceleration_increment = round(
self._head96_z_drive_mm_to_increment(acceleration) * acceleration_multiplier
)
return await self.send_command(module="H0", command="AA", zr=f"{acceleration_increment:06}")

# -------------- 3.10.2 Tip handling using CoRe 96 Head --------------

@need_iswap_parked
Expand Down Expand Up @@ -9277,6 +9404,43 @@ async def _head96_probe_z_max(self) -> float:
await self.send_command(module="C0", command="EV", read_timeout=20)
return await self.head96_request_stop_disk_z()

async def head96_request_y_speed(self) -> float:
"""Request the persistent 96-head Y-drive speed (mm/s), via H0 RA (read parameter yv).

The read counterpart of head96_set_y_speed.
"""
resp = await self.send_command(module="H0", command="RA", ra="yv", fmt="yv#####")
return self._head96_y_drive_increment_to_mm(resp["yv"])

async def head96_request_y_acceleration(self) -> float:
"""Request the persistent 96-head Y-drive acceleration (mm/s^2), via H0 RA (read parameter yr).

The read counterpart of head96_set_y_acceleration.
"""
resp = await self.send_command(module="H0", command="RA", ra="yr", fmt="yr#####")
return self._head96_y_drive_increment_to_mm(resp["yr"])

async def head96_request_z_speed(self) -> float:
"""Request the persistent 96-head Z-drive speed (mm/s), via H0 RA (read parameter zv).

The read counterpart of head96_set_z_speed.
"""
resp = await self.send_command(module="H0", command="RA", ra="zv", fmt="zv#####")
return self._head96_z_drive_increment_to_mm(resp["zv"])

async def head96_request_z_acceleration(self) -> float:
"""Request the persistent 96-head Z-drive acceleration (mm/s^2), via H0 RA (read parameter zr).

The read counterpart of head96_set_z_acceleration; inverts the firmware-version acceleration
scaling that the setter (and head96_move_stop_disk_z) applies.
"""
assert self._head96_information is not None, (
"requires 96-head firmware version information for safe operation"
)
resp = await self.send_command(module="H0", command="RA", ra="zr", fmt="zr######")
acceleration_multiplier = 1 if self._head96_information.fw_version.year >= 2010 else 0.001
return self._head96_z_drive_increment_to_mm(round(resp["zr"] / acceleration_multiplier))

async def request_core_96_head_channel_tadm_status(self):
"""Request CoRe 96 Head channel TADM Status

Expand Down
69 changes: 69 additions & 0 deletions pylabrobot/liquid_handling/backends/hamilton/STAR_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,75 @@ def test_version_resolved_default_falls_back_for_pre_2010_firmware(self):
)


class TestHead96CrashRecovery(unittest.IsolatedAsyncioTestCase):
"""head96_move_stop_disk_z retracts the head to Z-safety on a firmware error then re-raises, and
the retract - which routes back through the same primitive - cannot recurse."""

async def asyncSetUp(self):
self.cb = STARChatterboxBackend()
self.cb.set_deck(STARLetDeck())
await self.cb.setup()
assert self.cb._head96_information is not None
z_min, z_max = self.cb._head96_information.z_range
self.z_target = round((z_min + z_max) / 2, 1)
self.z_safety_za = f"{self.cb._head96_z_drive_mm_to_increment(z_max):05}"

def _crash(self, message):
return STARFirmwareError(
errors={
"CoRe 96 Head": UnknownHamiltonError(
message=message, trace_information=62, raw_response=message, raw_module="H0"
)
},
raw_response=message,
)

async def test_crash_retracts_to_z_safety_then_reraises(self):
"""A ZA firmware error retracts the head to z_range[1] (a second ZA) before the original error
propagates."""
original = self._crash("z drive movement error")
za_targets = []

async def fake_send(module, command, **kwargs):
if command == "ZA":
za_targets.append(kwargs["za"])
if len(za_targets) == 1:
raise original # the move crashes
return {} # the safety retract succeeds
return {} # AA restore etc.

self.cb.send_command = unittest.mock.AsyncMock(side_effect=fake_send)
with self.assertRaises(STARFirmwareError) as ctx:
await self.cb.head96_move_stop_disk_z(self.z_target)

self.assertIs(ctx.exception, original)
move_za = f"{self.cb._head96_z_drive_mm_to_increment(self.z_target):05}"
self.assertEqual(za_targets, [move_za, self.z_safety_za])

async def test_retract_that_also_crashes_does_not_recurse(self):
"""If the safety retract itself errors, exactly two ZA moves are sent (no recursion) and the
ORIGINAL error re-raises, not the retract's."""
original = self._crash("original crash")
retract_err = self._crash("retract crash")
za_count = 0

async def fake_send(module, command, **kwargs):
nonlocal za_count
if command == "ZA":
za_count += 1
if za_count == 1:
raise original
raise retract_err
return {}

self.cb.send_command = unittest.mock.AsyncMock(side_effect=fake_send)
with self.assertRaises(STARFirmwareError) as ctx:
await self.cb.head96_move_stop_disk_z(self.z_target)

self.assertIs(ctx.exception, original)
self.assertEqual(za_count, 2)


class TestiSWAPYMaxBootstrap(unittest.IsolatedAsyncioTestCase):
"""`_iswap_rotation_drive_request_y_max` runs during setup, before
`iswap_information` exists, so it must not read it (regression: it used to,
Expand Down
Loading