From d8a20311b609b75274be0e03759c8044f7430710 Mon Sep 17 00:00:00 2001 From: Keith Kraus Date: Tue, 28 Apr 2026 12:18:26 -0400 Subject: [PATCH 01/10] Add CUDA process checkpointing helpers --- cuda_core/cuda/core/__init__.py | 2 +- cuda_core/cuda/core/checkpoint.py | 182 ++++++++++++++++ cuda_core/docs/source/api.rst | 16 ++ cuda_core/docs/source/release/1.0.0-notes.rst | 5 +- cuda_core/tests/test_checkpoint.py | 203 ++++++++++++++++++ 5 files changed, 406 insertions(+), 2 deletions(-) create mode 100644 cuda_core/cuda/core/checkpoint.py create mode 100644 cuda_core/tests/test_checkpoint.py diff --git a/cuda_core/cuda/core/__init__.py b/cuda_core/cuda/core/__init__.py index dfd52accea3..3152c9ceacf 100644 --- a/cuda_core/cuda/core/__init__.py +++ b/cuda_core/cuda/core/__init__.py @@ -28,7 +28,7 @@ def _import_versioned_module(): del _import_versioned_module -from cuda.core import system, utils +from cuda.core import checkpoint, system, utils from cuda.core._device import Device from cuda.core._event import Event, EventOptions from cuda.core._graphics import GraphicsResource diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py new file mode 100644 index 00000000000..ad0be778974 --- /dev/null +++ b/cuda_core/cuda/core/checkpoint.py @@ -0,0 +1,182 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from collections.abc import Mapping as _Mapping +from dataclasses import dataclass as _dataclass +from enum import IntEnum as _IntEnum +from typing import Any as _Any + +from cuda.core._utils.cuda_utils import handle_return as _handle_cuda_return + +try: + from cuda.bindings import driver as _driver +except ImportError: + from cuda import cuda as _driver + + +class ProcessState(_IntEnum): + """ + CUDA checkpoint state for a process. + """ + + RUNNING = 0 + LOCKED = 1 + CHECKPOINTED = 2 + FAILED = 3 + + +@_dataclass(frozen=True) +class Process: + """ + CUDA process that can be locked, checkpointed, restored, and unlocked. + + Parameters + ---------- + pid : int + Process ID of the CUDA process. + """ + + pid: int + + def __post_init__(self): + _check_pid(self.pid) + + @property + def state(self) -> ProcessState: + """ + CUDA checkpoint state for this process. + """ + driver = _get_driver() + state = _handle_return(driver, driver.cuCheckpointProcessGetState(self.pid)) + return ProcessState(int(state)) + + @property + def restore_thread_id(self) -> int: + """ + CUDA restore thread ID for this process. + """ + driver = _get_driver() + return _handle_return(driver, driver.cuCheckpointProcessGetRestoreThreadId(self.pid)) + + def lock(self, timeout_ms: int = 0) -> None: + """ + Lock this process, blocking further CUDA API calls. + + Parameters + ---------- + timeout_ms : int, optional + Timeout in milliseconds. A value of 0 indicates no timeout. + """ + driver = _get_driver() + args = driver.CUcheckpointLockArgs() + args.timeoutMs = _check_timeout_ms(timeout_ms) + _handle_return(driver, driver.cuCheckpointProcessLock(self.pid, args)) + + def checkpoint(self) -> None: + """ + Checkpoint the GPU memory contents of this locked process. + """ + driver = _get_driver() + _handle_return(driver, driver.cuCheckpointProcessCheckpoint(self.pid, None)) + + def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: + """ + Restore this checkpointed process. 
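+
+        A successful restore returns this process to the locked state; call
+        :meth:`unlock` afterwards to allow CUDA API calls to resume.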
+ + Parameters + ---------- + gpu_mapping : mapping, optional + GPU UUID remapping from each checkpointed GPU UUID to the GPU UUID + to restore onto. If provided, the mapping must contain every + checkpointed GPU UUID. + """ + driver = _get_driver() + args = _make_restore_args(driver, gpu_mapping) + _handle_return(driver, driver.cuCheckpointProcessRestore(self.pid, args)) + + def unlock(self) -> None: + """ + Unlock this locked process so it can resume CUDA API calls. + """ + driver = _get_driver() + _handle_return(driver, driver.cuCheckpointProcessUnlock(self.pid, None)) + + +def _get_driver(): + required = ( + "cuCheckpointProcessCheckpoint", + "cuCheckpointProcessGetRestoreThreadId", + "cuCheckpointProcessGetState", + "cuCheckpointProcessLock", + "cuCheckpointProcessRestore", + "cuCheckpointProcessUnlock", + "CUcheckpointGpuPair", + "CUcheckpointLockArgs", + "CUcheckpointRestoreArgs", + ) + missing = [name for name in required if not hasattr(_driver, name)] + if missing: + raise RuntimeError( + f"CUDA checkpointing requires cuda.bindings with CUDA checkpoint API support. Missing: {', '.join(missing)}" + ) + return _driver + + +def _handle_return(driver, result): + err = result[0] + not_supported_errors = ( + getattr(driver.CUresult, "CUDA_ERROR_NOT_FOUND", None), + getattr(driver.CUresult, "CUDA_ERROR_NOT_SUPPORTED", None), + ) + if err in not_supported_errors: + raise RuntimeError( + "CUDA checkpointing is not supported by the installed NVIDIA driver. " + "Upgrade to a driver version with CUDA checkpoint API support." + ) + + return _handle_cuda_return(result) + + +def _check_pid(pid: int) -> int: + if isinstance(pid, bool) or not isinstance(pid, int): + raise TypeError("pid must be an int") + if pid <= 0: + raise ValueError("pid must be a positive int") + return pid + + +def _check_timeout_ms(timeout_ms: int) -> int: + if isinstance(timeout_ms, bool) or not isinstance(timeout_ms, int): + raise TypeError("timeout_ms must be an int") + if timeout_ms < 0: + raise ValueError("timeout_ms must be >= 0") + return timeout_ms + + +def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): + if gpu_mapping is None: + return None + if not isinstance(gpu_mapping, _Mapping): + raise TypeError("gpu_mapping must be a mapping from checkpointed GPU UUID to restore GPU UUID") + + pairs = [] + for old_uuid, new_uuid in gpu_mapping.items(): + pair = driver.CUcheckpointGpuPair() + pair.oldUuid = old_uuid + pair.newUuid = new_uuid + pairs.append(pair) + + if not pairs: + return None + + args = driver.CUcheckpointRestoreArgs() + args.gpuPairs = pairs + args.gpuPairsCount = len(pairs) + return args + + +__all__ = [ + "Process", + "ProcessState", +] diff --git a/cuda_core/docs/source/api.rst b/cuda_core/docs/source/api.rst index 88780732d54..5d7efdb2d17 100644 --- a/cuda_core/docs/source/api.rst +++ b/cuda_core/docs/source/api.rst @@ -174,6 +174,22 @@ CUDA compilation toolchain LinkerOptions +CUDA process checkpointing +-------------------------- + +.. autosummary:: + :toctree: generated/ + + :template: class.rst + + checkpoint.Process + +.. 
autosummary:: + :toctree: generated/ + + checkpoint.ProcessState + + CUDA system information and NVIDIA Management Library (NVML) ------------------------------------------------------------ diff --git a/cuda_core/docs/source/release/1.0.0-notes.rst b/cuda_core/docs/source/release/1.0.0-notes.rst index 34eff571005..13e1430ee23 100644 --- a/cuda_core/docs/source/release/1.0.0-notes.rst +++ b/cuda_core/docs/source/release/1.0.0-notes.rst @@ -16,7 +16,10 @@ Highlights New features ------------ -- TBD +- Added the :mod:`cuda.core.checkpoint` module for CUDA process checkpointing, + including process state queries, lock/checkpoint/restore/unlock operations, + and GPU UUID remapping support for restore. + (`#1343 `__) Fixes and enhancements diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py new file mode 100644 index 00000000000..c461fb6e6e0 --- /dev/null +++ b/cuda_core/tests/test_checkpoint.py @@ -0,0 +1,203 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from enum import IntEnum + +import pytest + +from cuda.core import checkpoint + + +class _DriverProcessState(IntEnum): + CU_PROCESS_STATE_RUNNING = 0 + CU_PROCESS_STATE_LOCKED = 1 + CU_PROCESS_STATE_CHECKPOINTED = 2 + CU_PROCESS_STATE_FAILED = 3 + + +class _DriverResult(IntEnum): + CUDA_SUCCESS = 0 + CUDA_ERROR_NOT_FOUND = 500 + CUDA_ERROR_NOT_SUPPORTED = 801 + + +class _Uuid: + pass + + +class _CheckpointGpuPair: + def __init__(self): + self.oldUuid = None + self.newUuid = None + + +class _CheckpointLockArgs: + def __init__(self): + self.timeoutMs = None + + +class _CheckpointRestoreArgs: + def __init__(self): + self.gpuPairs = None + self.gpuPairsCount = None + + +class _Driver: + CUresult = _DriverResult + CUprocessState = _DriverProcessState + CUcheckpointGpuPair = _CheckpointGpuPair + CUcheckpointLockArgs = _CheckpointLockArgs + CUcheckpointRestoreArgs = _CheckpointRestoreArgs + + def __init__(self): + self.calls = [] + + def cuCheckpointProcessGetState(self, pid): + self.calls.append(("get_state", pid)) + return (0, self.CUprocessState.CU_PROCESS_STATE_CHECKPOINTED) + + def cuCheckpointProcessGetRestoreThreadId(self, pid): + self.calls.append(("get_restore_thread_id", pid)) + return (0, 123) + + def cuCheckpointProcessLock(self, pid, args): + self.calls.append(("lock", pid, args)) + return (0,) + + def cuCheckpointProcessCheckpoint(self, pid, args): + self.calls.append(("checkpoint", pid, args)) + return (0,) + + def cuCheckpointProcessRestore(self, pid, args): + self.calls.append(("restore", pid, args)) + return (0,) + + def cuCheckpointProcessUnlock(self, pid, args): + self.calls.append(("unlock", pid, args)) + return (0,) + + +@pytest.fixture +def checkpoint_driver(monkeypatch): + driver = _Driver() + monkeypatch.setattr(checkpoint, "_get_driver", lambda: driver) + + def handle_return(driver, result): + if len(result) == 1: + return None + return result[1] + + monkeypatch.setattr(checkpoint, "_handle_return", handle_return) + return driver + + +def test_public_checkpoint_symbols(): + assert checkpoint.ProcessState.CHECKPOINTED == 2 + assert "Process" in checkpoint.__all__ + assert "ProcessState" in checkpoint.__all__ + for name in ("Any", "Mapping", "IntEnum", "dataclass", "handle_return"): + assert not hasattr(checkpoint, name) + + +def test_process_state(checkpoint_driver): + state = checkpoint.Process(42).state + + assert state is checkpoint.ProcessState.CHECKPOINTED + assert checkpoint_driver.calls 
== [("get_state", 42)] + + +def test_process_restore_thread_id(checkpoint_driver): + tid = checkpoint.Process(42).restore_thread_id + + assert tid == 123 + assert checkpoint_driver.calls == [("get_restore_thread_id", 42)] + + +def test_process_lock_sets_timeout_ms(checkpoint_driver): + checkpoint.Process(42).lock(timeout_ms=500) + + opname, pid, args = checkpoint_driver.calls[0] + assert opname == "lock" + assert pid == 42 + assert isinstance(args, _CheckpointLockArgs) + assert args.timeoutMs == 500 + + +def test_process_checkpoint_and_unlock_pass_null_args(checkpoint_driver): + process = checkpoint.Process(42) + process.checkpoint() + process.unlock() + + assert checkpoint_driver.calls == [ + ("checkpoint", 42, None), + ("unlock", 42, None), + ] + + +def test_process_restore_accepts_gpu_uuid_mapping(checkpoint_driver): + old_uuid = _Uuid() + new_uuid = _Uuid() + + checkpoint.Process(42).restore(gpu_mapping={old_uuid: new_uuid}) + + opname, pid, args = checkpoint_driver.calls[0] + assert opname == "restore" + assert pid == 42 + assert isinstance(args, _CheckpointRestoreArgs) + assert args.gpuPairsCount == 1 + assert len(args.gpuPairs) == 1 + assert args.gpuPairs[0].oldUuid is old_uuid + assert args.gpuPairs[0].newUuid is new_uuid + + +def test_process_restore_empty_gpu_mapping_uses_null_args(checkpoint_driver): + checkpoint.Process(42).restore(gpu_mapping={}) + + assert checkpoint_driver.calls == [("restore", 42, None)] + + +@pytest.mark.parametrize( + ("args", "error_type", "match"), + [ + (("123",), TypeError, "pid must be an int"), + ((True,), TypeError, "pid must be an int"), + ((0,), ValueError, "pid must be a positive int"), + ], +) +def test_process_rejects_invalid_pid(checkpoint_driver, args, error_type, match): + with pytest.raises(error_type, match=match): + checkpoint.Process(*args) + + +@pytest.mark.parametrize( + ("timeout_ms", "error_type", "match"), + [ + (-1, ValueError, "timeout_ms must be >= 0"), + (1.5, TypeError, "timeout_ms must be an int"), + (True, TypeError, "timeout_ms must be an int"), + ], +) +def test_process_lock_rejects_invalid_timeout(checkpoint_driver, timeout_ms, error_type, match): + with pytest.raises(error_type, match=match): + checkpoint.Process(42).lock(timeout_ms=timeout_ms) + + +def test_process_restore_rejects_invalid_gpu_mapping(checkpoint_driver): + with pytest.raises(TypeError, match="gpu_mapping must be a mapping"): + checkpoint.Process(42).restore(gpu_mapping=[object()]) + + +@pytest.mark.parametrize( + "error_name", + [ + "CUDA_ERROR_NOT_FOUND", + "CUDA_ERROR_NOT_SUPPORTED", + ], +) +def test_checkpoint_apis_reject_unsupported_driver(error_name): + driver = _Driver() + result = (getattr(driver.CUresult, error_name),) + + with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): + checkpoint._handle_return(driver, result) From 4992921dfe046ad1b2beabcece1261e5f6686c0d Mon Sep 17 00:00:00 2001 From: Keith Kraus Date: Wed, 29 Apr 2026 15:40:27 -0400 Subject: [PATCH 02/10] Address checkpoint review feedback --- cuda_core/cuda/core/checkpoint.py | 113 ++++++++++++------ cuda_core/docs/source/api.rst | 35 +++++- cuda_core/docs/source/release/1.0.0-notes.rst | 4 +- cuda_core/tests/test_checkpoint.py | 102 +++++++++++++--- 4 files changed, 197 insertions(+), 57 deletions(-) diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py index ad0be778974..1333a8f0e43 100644 --- a/cuda_core/cuda/core/checkpoint.py +++ b/cuda_core/cuda/core/checkpoint.py @@ -3,11 +3,12 @@ # SPDX-License-Identifier: Apache-2.0 from 
collections.abc import Mapping as _Mapping -from dataclasses import dataclass as _dataclass -from enum import IntEnum as _IntEnum from typing import Any as _Any +from typing import Literal as _Literal from cuda.core._utils.cuda_utils import handle_return as _handle_cuda_return +from cuda.core._utils.version import binding_version as _binding_version +from cuda.core._utils.version import driver_version as _driver_version try: from cuda.bindings import driver as _driver @@ -15,18 +16,30 @@ from cuda import cuda as _driver -class ProcessState(_IntEnum): - """ - CUDA checkpoint state for a process. - """ +ProcessStateT = _Literal["running", "locked", "checkpointed", "failed"] + +_PROCESS_STATE_NAMES: dict[int, ProcessStateT] = { + 0: "running", + 1: "locked", + 2: "checkpointed", + 3: "failed", +} - RUNNING = 0 - LOCKED = 1 - CHECKPOINTED = 2 - FAILED = 3 +_REQUIRED_BINDING_ATTRS = ( + "cuCheckpointProcessCheckpoint", + "cuCheckpointProcessGetRestoreThreadId", + "cuCheckpointProcessGetState", + "cuCheckpointProcessLock", + "cuCheckpointProcessRestore", + "cuCheckpointProcessUnlock", + "CUcheckpointGpuPair", + "CUcheckpointLockArgs", + "CUcheckpointRestoreArgs", +) +_REQUIRED_DRIVER_VERSION = (12, 8, 0) +_driver_capability_checked = False -@_dataclass(frozen=True) class Process: """ CUDA process that can be locked, checkpointed, restored, and unlocked. @@ -37,19 +50,23 @@ class Process: Process ID of the CUDA process. """ - pid: int + __slots__ = ("pid",) - def __post_init__(self): - _check_pid(self.pid) + def __init__(self, pid: int): + self.pid = _check_pid(pid) @property - def state(self) -> ProcessState: + def state(self) -> ProcessStateT: """ CUDA checkpoint state for this process. """ driver = _get_driver() - state = _handle_return(driver, driver.cuCheckpointProcessGetState(self.pid)) - return ProcessState(int(state)) + state = _call_driver(driver, driver.cuCheckpointProcessGetState, self.pid) + state_value = int(state) + try: + return _PROCESS_STATE_NAMES[state_value] + except KeyError as e: + raise RuntimeError(f"Unknown CUDA checkpoint process state: {state_value}") from e @property def restore_thread_id(self) -> int: @@ -57,7 +74,7 @@ def restore_thread_id(self) -> int: CUDA restore thread ID for this process. """ driver = _get_driver() - return _handle_return(driver, driver.cuCheckpointProcessGetRestoreThreadId(self.pid)) + return _call_driver(driver, driver.cuCheckpointProcessGetRestoreThreadId, self.pid) def lock(self, timeout_ms: int = 0) -> None: """ @@ -71,14 +88,14 @@ def lock(self, timeout_ms: int = 0) -> None: driver = _get_driver() args = driver.CUcheckpointLockArgs() args.timeoutMs = _check_timeout_ms(timeout_ms) - _handle_return(driver, driver.cuCheckpointProcessLock(self.pid, args)) + _call_driver(driver, driver.cuCheckpointProcessLock, self.pid, args) def checkpoint(self) -> None: """ Checkpoint the GPU memory contents of this locked process. 
""" driver = _get_driver() - _handle_return(driver, driver.cuCheckpointProcessCheckpoint(self.pid, None)) + _call_driver(driver, driver.cuCheckpointProcessCheckpoint, self.pid, None) def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: """ @@ -93,36 +110,63 @@ def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: """ driver = _get_driver() args = _make_restore_args(driver, gpu_mapping) - _handle_return(driver, driver.cuCheckpointProcessRestore(self.pid, args)) + _call_driver(driver, driver.cuCheckpointProcessRestore, self.pid, args) def unlock(self) -> None: """ Unlock this locked process so it can resume CUDA API calls. """ driver = _get_driver() - _handle_return(driver, driver.cuCheckpointProcessUnlock(self.pid, None)) + _call_driver(driver, driver.cuCheckpointProcessUnlock, self.pid, None) def _get_driver(): - required = ( - "cuCheckpointProcessCheckpoint", - "cuCheckpointProcessGetRestoreThreadId", - "cuCheckpointProcessGetState", - "cuCheckpointProcessLock", - "cuCheckpointProcessRestore", - "cuCheckpointProcessUnlock", - "CUcheckpointGpuPair", - "CUcheckpointLockArgs", - "CUcheckpointRestoreArgs", - ) - missing = [name for name in required if not hasattr(_driver, name)] + global _driver_capability_checked + if _driver_capability_checked: + return _driver + + binding_ver = _binding_version() + if not _binding_version_supports_checkpoint(binding_ver): + raise RuntimeError( + "CUDA checkpointing requires cuda.bindings with CUDA checkpoint API support. " + f"Found cuda.bindings {'.'.join(str(part) for part in binding_ver[:3])}." + ) + + missing = [name for name in _REQUIRED_BINDING_ATTRS if not hasattr(_driver, name)] if missing: raise RuntimeError( f"CUDA checkpointing requires cuda.bindings with CUDA checkpoint API support. Missing: {', '.join(missing)}" ) + + driver_ver = _driver_version() + if driver_ver < _REQUIRED_DRIVER_VERSION: + raise RuntimeError( + "CUDA checkpointing is not supported by the installed NVIDIA driver. " + "Upgrade to a driver version with CUDA checkpoint API support." + ) + + _driver_capability_checked = True return _driver +def _binding_version_supports_checkpoint(version) -> bool: + major, minor, patch = version[:3] + return (major == 12 and (minor, patch) >= (8, 0)) or (major == 13 and (minor, patch) >= (0, 2)) or major > 13 + + +def _call_driver(driver, func, *args): + try: + result = func(*args) + except RuntimeError as e: + if "cuCheckpointProcess" in str(e) and "not found" in str(e): + raise RuntimeError( + "CUDA checkpointing is not supported by the installed NVIDIA driver. " + "Upgrade to a driver version with CUDA checkpoint API support." + ) from e + raise + return _handle_return(driver, result) + + def _handle_return(driver, result): err = result[0] not_supported_errors = ( @@ -178,5 +222,4 @@ def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): __all__ = [ "Process", - "ProcessState", ] diff --git a/cuda_core/docs/source/api.rst b/cuda_core/docs/source/api.rst index 5d7efdb2d17..2762f8ca541 100644 --- a/cuda_core/docs/source/api.rst +++ b/cuda_core/docs/source/api.rst @@ -177,17 +177,42 @@ CUDA compilation toolchain CUDA process checkpointing -------------------------- -.. autosummary:: - :toctree: generated/ +The :mod:`cuda.core.checkpoint` module wraps the CUDA driver process +checkpoint APIs. 
These APIs are intended for Linux process checkpoint and +restore workflows, and require a CUDA driver with checkpoint API support and +a ``cuda-bindings`` version that exposes those driver entry points. - :template: class.rst +A checkpoint workflow operates on a CUDA process by process ID. The typical +sequence is to lock the process, capture its GPU memory state, restore it +when needed, and then unlock it so CUDA API calls can resume: - checkpoint.Process +.. code-block:: python + + from cuda.core import checkpoint + + process = checkpoint.Process(pid) + process.lock(timeout_ms=5000) + process.checkpoint() + process.restore() + process.unlock() + +``Process.state`` returns one of ``"running"``, ``"locked"``, +``"checkpointed"``, or ``"failed"``. Restore may optionally remap GPUs by +passing ``gpu_mapping`` from each checkpointed GPU UUID to the GPU UUID that +should be used during restore. A successful restore returns the process to +the locked state; call ``Process.unlock`` after restore to allow CUDA API +calls to resume. + +The CUDA driver requires restore to run from the process restore thread. +Use ``Process.restore_thread_id`` to discover that thread before calling +``Process.restore`` from a checkpoint coordinator. .. autosummary:: :toctree: generated/ - checkpoint.ProcessState + :template: class.rst + + checkpoint.Process CUDA system information and NVIDIA Management Library (NVML) ------------------------------------------------------------ diff --git a/cuda_core/docs/source/release/1.0.0-notes.rst b/cuda_core/docs/source/release/1.0.0-notes.rst index 13e1430ee23..f5d3645c3d6 100644 --- a/cuda_core/docs/source/release/1.0.0-notes.rst +++ b/cuda_core/docs/source/release/1.0.0-notes.rst @@ -17,8 +17,8 @@ New features ------------ - Added the :mod:`cuda.core.checkpoint` module for CUDA process checkpointing, - including process state queries, lock/checkpoint/restore/unlock operations, - and GPU UUID remapping support for restore. + including string-based process state queries, lock/checkpoint/restore/unlock + operations, and GPU UUID remapping support for restore. 
(`#1343 `__) diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index c461fb6e6e0..d92a0c632ab 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -9,14 +9,14 @@ from cuda.core import checkpoint -class _DriverProcessState(IntEnum): +class _MockDriverProcessState(IntEnum): CU_PROCESS_STATE_RUNNING = 0 CU_PROCESS_STATE_LOCKED = 1 CU_PROCESS_STATE_CHECKPOINTED = 2 CU_PROCESS_STATE_FAILED = 3 -class _DriverResult(IntEnum): +class _MockDriverResult(IntEnum): CUDA_SUCCESS = 0 CUDA_ERROR_NOT_FOUND = 500 CUDA_ERROR_NOT_SUPPORTED = 801 @@ -43,19 +43,20 @@ def __init__(self): self.gpuPairsCount = None -class _Driver: - CUresult = _DriverResult - CUprocessState = _DriverProcessState +class _MockDriver: + CUresult = _MockDriverResult + CUprocessState = _MockDriverProcessState CUcheckpointGpuPair = _CheckpointGpuPair CUcheckpointLockArgs = _CheckpointLockArgs CUcheckpointRestoreArgs = _CheckpointRestoreArgs - def __init__(self): + def __init__(self, process_state=_MockDriverProcessState.CU_PROCESS_STATE_CHECKPOINTED): self.calls = [] + self.process_state = process_state def cuCheckpointProcessGetState(self, pid): self.calls.append(("get_state", pid)) - return (0, self.CUprocessState.CU_PROCESS_STATE_CHECKPOINTED) + return (0, self.process_state) def cuCheckpointProcessGetRestoreThreadId(self, pid): self.calls.append(("get_restore_thread_id", pid)) @@ -80,7 +81,7 @@ def cuCheckpointProcessUnlock(self, pid, args): @pytest.fixture def checkpoint_driver(monkeypatch): - driver = _Driver() + driver = _MockDriver() monkeypatch.setattr(checkpoint, "_get_driver", lambda: driver) def handle_return(driver, result): @@ -93,17 +94,27 @@ def handle_return(driver, result): def test_public_checkpoint_symbols(): - assert checkpoint.ProcessState.CHECKPOINTED == 2 - assert "Process" in checkpoint.__all__ - assert "ProcessState" in checkpoint.__all__ - for name in ("Any", "Mapping", "IntEnum", "dataclass", "handle_return"): + assert set(checkpoint.ProcessStateT.__args__) == {"running", "locked", "checkpointed", "failed"} + assert checkpoint.__all__ == ["Process"] + for name in ("Any", "Mapping", "Literal", "IntEnum", "dataclass", "handle_return", "ProcessState"): assert not hasattr(checkpoint, name) -def test_process_state(checkpoint_driver): +@pytest.mark.parametrize( + ("process_state", "expected"), + [ + (_MockDriverProcessState.CU_PROCESS_STATE_RUNNING, "running"), + (_MockDriverProcessState.CU_PROCESS_STATE_LOCKED, "locked"), + (_MockDriverProcessState.CU_PROCESS_STATE_CHECKPOINTED, "checkpointed"), + (_MockDriverProcessState.CU_PROCESS_STATE_FAILED, "failed"), + ], +) +def test_process_state(checkpoint_driver, process_state, expected): + checkpoint_driver.process_state = process_state + state = checkpoint.Process(42).state - assert state is checkpoint.ProcessState.CHECKPOINTED + assert state == expected assert checkpoint_driver.calls == [("get_state", 42)] @@ -196,8 +207,69 @@ def test_process_restore_rejects_invalid_gpu_mapping(checkpoint_driver): ], ) def test_checkpoint_apis_reject_unsupported_driver(error_name): - driver = _Driver() + driver = _MockDriver() result = (getattr(driver.CUresult, error_name),) with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): checkpoint._handle_return(driver, result) + + +def test_get_driver_caches_capability_check(monkeypatch): + calls = {"binding_version": 0, "driver_version": 0} + + def binding_version(): + calls["binding_version"] += 1 + return (13, 0, 2) + + def 
driver_version(): + calls["driver_version"] += 1 + return (12, 8, 0) + + driver = _MockDriver() + monkeypatch.setattr(checkpoint, "_driver", driver) + monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) + monkeypatch.setattr(checkpoint, "_binding_version", binding_version) + monkeypatch.setattr(checkpoint, "_driver_version", driver_version) + + assert checkpoint._get_driver() is driver + assert checkpoint._get_driver() is driver + assert calls == {"binding_version": 1, "driver_version": 1} + + +@pytest.mark.parametrize("binding_version", [(12, 7, 0), (13, 0, 1)]) +def test_get_driver_rejects_unsupported_binding_version(monkeypatch, binding_version): + monkeypatch.setattr(checkpoint, "_driver", _MockDriver()) + monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) + monkeypatch.setattr(checkpoint, "_binding_version", lambda: binding_version) + + with pytest.raises(RuntimeError, match="CUDA checkpointing requires cuda.bindings"): + checkpoint._get_driver() + + +def test_get_driver_rejects_missing_binding_symbols(monkeypatch): + monkeypatch.setattr(checkpoint, "_driver", object()) + monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) + monkeypatch.setattr(checkpoint, "_binding_version", lambda: (13, 0, 2)) + + with pytest.raises(RuntimeError, match="Missing: cuCheckpointProcessCheckpoint"): + checkpoint._get_driver() + + +def test_get_driver_rejects_unsupported_driver_version(monkeypatch): + monkeypatch.setattr(checkpoint, "_driver", _MockDriver()) + monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) + monkeypatch.setattr(checkpoint, "_binding_version", lambda: (13, 0, 2)) + monkeypatch.setattr(checkpoint, "_driver_version", lambda: (12, 7, 0)) + + with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): + checkpoint._get_driver() + + +def test_checkpoint_apis_translate_missing_runtime_symbol(): + driver = _MockDriver() + + def missing_checkpoint_symbol(): + raise RuntimeError('Function "cuCheckpointProcessLock" not found') + + with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): + checkpoint._call_driver(driver, missing_checkpoint_symbol) From 5afd43c97855844fafcdb4df814a5a880ffbd8a4 Mon Sep 17 00:00:00 2001 From: Leo Fang Date: Sat, 2 May 2026 03:34:06 +0000 Subject: [PATCH 03/10] Rewrite checkpoint tests: replace mocks with real GPU tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the entire mock-based test suite with real GPU tests that exercise the CUDA driver checkpoint API directly: - Input validation: pid type/range, public symbol checks - Lifecycle (single GPU): state transitions at every stage (running→locked→checkpointed→locked→running), restore_thread_id, lock/unlock, lock with timeout, full checkpoint-restore cycle - GPU migration: rotation mapping and same-chip swap following the r580-migration-api.c pattern; gracefully skip when the driver does not support migration (CUDA_ERROR_INVALID_VALUE — NVBug 5437334) The self_process fixture wraps os.getpid() and safety-unlocks on teardown if the test fails mid-lifecycle. 
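
For reference, a minimal sketch of the lifecycle the new tests drive
(assuming a Linux host with a checkpoint-capable driver and bindings;
the comments show the state strings reported by Process.state):

    import os
    from cuda.core import checkpoint

    proc = checkpoint.Process(os.getpid())
    proc.lock()        # "running" -> "locked"
    proc.checkpoint()  # "locked" -> "checkpointed"
    proc.restore()     # "checkpointed" -> "locked"
    proc.unlock()      # "locked" -> "running"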
Co-Authored-By: Claude Opus 4.6 (1M context) --- cuda_core/tests/test_checkpoint.py | 487 ++++++++++++++--------------- 1 file changed, 237 insertions(+), 250 deletions(-) diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index d92a0c632ab..82a6827efaf 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -2,274 +2,261 @@ # # SPDX-License-Identifier: Apache-2.0 -from enum import IntEnum - -import pytest - -from cuda.core import checkpoint - - -class _MockDriverProcessState(IntEnum): - CU_PROCESS_STATE_RUNNING = 0 - CU_PROCESS_STATE_LOCKED = 1 - CU_PROCESS_STATE_CHECKPOINTED = 2 - CU_PROCESS_STATE_FAILED = 3 - - -class _MockDriverResult(IntEnum): - CUDA_SUCCESS = 0 - CUDA_ERROR_NOT_FOUND = 500 - CUDA_ERROR_NOT_SUPPORTED = 801 - - -class _Uuid: - pass - - -class _CheckpointGpuPair: - def __init__(self): - self.oldUuid = None - self.newUuid = None - - -class _CheckpointLockArgs: - def __init__(self): - self.timeoutMs = None - - -class _CheckpointRestoreArgs: - def __init__(self): - self.gpuPairs = None - self.gpuPairsCount = None - - -class _MockDriver: - CUresult = _MockDriverResult - CUprocessState = _MockDriverProcessState - CUcheckpointGpuPair = _CheckpointGpuPair - CUcheckpointLockArgs = _CheckpointLockArgs - CUcheckpointRestoreArgs = _CheckpointRestoreArgs - - def __init__(self, process_state=_MockDriverProcessState.CU_PROCESS_STATE_CHECKPOINTED): - self.calls = [] - self.process_state = process_state - - def cuCheckpointProcessGetState(self, pid): - self.calls.append(("get_state", pid)) - return (0, self.process_state) - - def cuCheckpointProcessGetRestoreThreadId(self, pid): - self.calls.append(("get_restore_thread_id", pid)) - return (0, 123) - - def cuCheckpointProcessLock(self, pid, args): - self.calls.append(("lock", pid, args)) - return (0,) - - def cuCheckpointProcessCheckpoint(self, pid, args): - self.calls.append(("checkpoint", pid, args)) - return (0,) - - def cuCheckpointProcessRestore(self, pid, args): - self.calls.append(("restore", pid, args)) - return (0,) - - def cuCheckpointProcessUnlock(self, pid, args): - self.calls.append(("unlock", pid, args)) - return (0,) - - -@pytest.fixture -def checkpoint_driver(monkeypatch): - driver = _MockDriver() - monkeypatch.setattr(checkpoint, "_get_driver", lambda: driver) - - def handle_return(driver, result): - if len(result) == 1: - return None - return result[1] - - monkeypatch.setattr(checkpoint, "_handle_return", handle_return) - return driver - - -def test_public_checkpoint_symbols(): - assert set(checkpoint.ProcessStateT.__args__) == {"running", "locked", "checkpointed", "failed"} - assert checkpoint.__all__ == ["Process"] - for name in ("Any", "Mapping", "Literal", "IntEnum", "dataclass", "handle_return", "ProcessState"): - assert not hasattr(checkpoint, name) - - -@pytest.mark.parametrize( - ("process_state", "expected"), - [ - (_MockDriverProcessState.CU_PROCESS_STATE_RUNNING, "running"), - (_MockDriverProcessState.CU_PROCESS_STATE_LOCKED, "locked"), - (_MockDriverProcessState.CU_PROCESS_STATE_CHECKPOINTED, "checkpointed"), - (_MockDriverProcessState.CU_PROCESS_STATE_FAILED, "failed"), - ], -) -def test_process_state(checkpoint_driver, process_state, expected): - checkpoint_driver.process_state = process_state - - state = checkpoint.Process(42).state - - assert state == expected - assert checkpoint_driver.calls == [("get_state", 42)] - - -def test_process_restore_thread_id(checkpoint_driver): - tid = checkpoint.Process(42).restore_thread_id - - assert 
tid == 123 - assert checkpoint_driver.calls == [("get_restore_thread_id", 42)] - - -def test_process_lock_sets_timeout_ms(checkpoint_driver): - checkpoint.Process(42).lock(timeout_ms=500) - - opname, pid, args = checkpoint_driver.calls[0] - assert opname == "lock" - assert pid == 42 - assert isinstance(args, _CheckpointLockArgs) - assert args.timeoutMs == 500 - - -def test_process_checkpoint_and_unlock_pass_null_args(checkpoint_driver): - process = checkpoint.Process(42) - process.checkpoint() - process.unlock() - - assert checkpoint_driver.calls == [ - ("checkpoint", 42, None), - ("unlock", 42, None), - ] - - -def test_process_restore_accepts_gpu_uuid_mapping(checkpoint_driver): - old_uuid = _Uuid() - new_uuid = _Uuid() - - checkpoint.Process(42).restore(gpu_mapping={old_uuid: new_uuid}) +# Real GPU tests for cuda.core.checkpoint — no mocks. +# +# Lifecycle tests self-checkpoint the current process (os.getpid()) and +# exercise lock / checkpoint / restore / unlock through the real driver. +# +# Migration tests attempt GPU UUID remapping following the pattern from +# NVIDIA/cuda-checkpoint r580-migration-api.c. They require ≥2 GPUs of +# the same chip type and a driver that supports migration; the tests skip +# gracefully when the hardware or driver cannot satisfy this. - opname, pid, args = checkpoint_driver.calls[0] - assert opname == "restore" - assert pid == 42 - assert isinstance(args, _CheckpointRestoreArgs) - assert args.gpuPairsCount == 1 - assert len(args.gpuPairs) == 1 - assert args.gpuPairs[0].oldUuid is old_uuid - assert args.gpuPairs[0].newUuid is new_uuid +import os +import sys +import pytest -def test_process_restore_empty_gpu_mapping_uses_null_args(checkpoint_driver): - checkpoint.Process(42).restore(gpu_mapping={}) +try: + from cuda.bindings import driver +except ImportError: + from cuda import cuda as driver - assert checkpoint_driver.calls == [("restore", 42, None)] +from cuda.core import Device, checkpoint +from cuda.core._utils.cuda_utils import CUDAError, handle_return -@pytest.mark.parametrize( - ("args", "error_type", "match"), - [ - (("123",), TypeError, "pid must be an int"), - ((True,), TypeError, "pid must be an int"), - ((0,), ValueError, "pid must be a positive int"), - ], -) -def test_process_rejects_invalid_pid(checkpoint_driver, args, error_type, match): - with pytest.raises(error_type, match=match): - checkpoint.Process(*args) - - -@pytest.mark.parametrize( - ("timeout_ms", "error_type", "match"), - [ - (-1, ValueError, "timeout_ms must be >= 0"), - (1.5, TypeError, "timeout_ms must be an int"), - (True, TypeError, "timeout_ms must be an int"), - ], -) -def test_process_lock_rejects_invalid_timeout(checkpoint_driver, timeout_ms, error_type, match): - with pytest.raises(error_type, match=match): - checkpoint.Process(42).lock(timeout_ms=timeout_ms) +# -- Skip condition ------------------------------------------------------- - -def test_process_restore_rejects_invalid_gpu_mapping(checkpoint_driver): - with pytest.raises(TypeError, match="gpu_mapping must be a mapping"): - checkpoint.Process(42).restore(gpu_mapping=[object()]) +def _checkpoint_available(): + """Return True if the checkpoint API is usable on this system.""" + try: + checkpoint._get_driver() + return True + except RuntimeError: + return False -@pytest.mark.parametrize( - "error_name", - [ - "CUDA_ERROR_NOT_FOUND", - "CUDA_ERROR_NOT_SUPPORTED", - ], +needs_checkpoint = pytest.mark.skipif( + sys.platform != "linux" or not _checkpoint_available(), + reason="CUDA checkpoint API requires Linux and a 
supported driver/bindings", ) -def test_checkpoint_apis_reject_unsupported_driver(error_name): - driver = _MockDriver() - result = (getattr(driver.CUresult, error_name),) - - with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): - checkpoint._handle_return(driver, result) - -def test_get_driver_caches_capability_check(monkeypatch): - calls = {"binding_version": 0, "driver_version": 0} - def binding_version(): - calls["binding_version"] += 1 - return (13, 0, 2) +# -- Helpers --------------------------------------------------------------- - def driver_version(): - calls["driver_version"] += 1 - return (12, 8, 0) +def _get_context_device_uuid(): + """Return the UUID string of the device owning the current CUDA context.""" + dev_id = int(handle_return(driver.cuCtxGetDevice())) + return Device(dev_id).uuid - driver = _MockDriver() - monkeypatch.setattr(checkpoint, "_driver", driver) - monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) - monkeypatch.setattr(checkpoint, "_binding_version", binding_version) - monkeypatch.setattr(checkpoint, "_driver_version", driver_version) - assert checkpoint._get_driver() is driver - assert checkpoint._get_driver() is driver - assert calls == {"binding_version": 1, "driver_version": 1} +def _build_rotation_mapping(devices): + """GPU i UUID -> GPU (i+1) % N UUID for every visible device. + Returns a dict of CUuuid -> CUuuid suitable for Process.restore(). + """ + n = len(devices) + mapping = {} + for i in range(n): + old_uuid = handle_return(driver.cuDeviceGetUuid(devices[i].device_id)) + new_uuid = handle_return(driver.cuDeviceGetUuid(devices[(i + 1) % n].device_id)) + mapping[old_uuid] = new_uuid + return mapping -@pytest.mark.parametrize("binding_version", [(12, 7, 0), (13, 0, 1)]) -def test_get_driver_rejects_unsupported_binding_version(monkeypatch, binding_version): - monkeypatch.setattr(checkpoint, "_driver", _MockDriver()) - monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) - monkeypatch.setattr(checkpoint, "_binding_version", lambda: binding_version) - - with pytest.raises(RuntimeError, match="CUDA checkpointing requires cuda.bindings"): - checkpoint._get_driver() +def _find_same_chip_pair(devices): + """Return (i, j) indices of two devices with the same name, or None.""" + seen = {} + for i, dev in enumerate(devices): + name = dev.name + if name in seen: + return (seen[name], i) + seen[name] = i + return None -def test_get_driver_rejects_missing_binding_symbols(monkeypatch): - monkeypatch.setattr(checkpoint, "_driver", object()) - monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) - monkeypatch.setattr(checkpoint, "_binding_version", lambda: (13, 0, 2)) - - with pytest.raises(RuntimeError, match="Missing: cuCheckpointProcessCheckpoint"): - checkpoint._get_driver() +# -- Fixtures -------------------------------------------------------------- -def test_get_driver_rejects_unsupported_driver_version(monkeypatch): - monkeypatch.setattr(checkpoint, "_driver", _MockDriver()) - monkeypatch.setattr(checkpoint, "_driver_capability_checked", False) - monkeypatch.setattr(checkpoint, "_binding_version", lambda: (13, 0, 2)) - monkeypatch.setattr(checkpoint, "_driver_version", lambda: (12, 7, 0)) - - with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): - checkpoint._get_driver() - - -def test_checkpoint_apis_translate_missing_runtime_symbol(): - driver = _MockDriver() - - def missing_checkpoint_symbol(): - raise RuntimeError('Function "cuCheckpointProcessLock" not 
found') - - with pytest.raises(RuntimeError, match="CUDA checkpointing is not supported"): - checkpoint._call_driver(driver, missing_checkpoint_symbol) +@pytest.fixture +def self_process(init_cuda): + """checkpoint.Process wrapping os.getpid(), with safety unlock on teardown.""" + proc = checkpoint.Process(os.getpid()) + yield proc + # Ensure the process is not left locked if the test fails mid-lifecycle. + try: + st = proc.state + if st == "checkpointed": + proc.restore() + proc.unlock() + elif st == "locked": + proc.unlock() + except Exception: + pass + + +# -- Input validation (no GPU / driver needed) ----------------------------- + +class TestInputValidation: + @pytest.mark.parametrize( + ("args", "error_type", "match"), + [ + (("abc",), TypeError, "pid must be an int"), + ((True,), TypeError, "pid must be an int"), + ((0,), ValueError, "pid must be a positive int"), + ((-1,), ValueError, "pid must be a positive int"), + ], + ) + def test_process_rejects_invalid_pid(self, args, error_type, match): + with pytest.raises(error_type, match=match): + checkpoint.Process(*args) + + def test_public_symbols(self): + assert checkpoint.__all__ == ["Process"] + + +# -- Lifecycle (single GPU, real driver) ----------------------------------- + +@needs_checkpoint +class TestCheckpointLifecycle: + def test_initial_state_is_running(self, self_process): + assert self_process.state == "running" + + def test_restore_thread_id_is_positive(self, self_process): + tid = self_process.restore_thread_id + assert isinstance(tid, int) + assert tid > 0 + + def test_lock_unlock(self, self_process): + self_process.lock() + assert self_process.state == "locked" + self_process.unlock() + assert self_process.state == "running" + + def test_lock_default_timeout(self, self_process): + """lock() with the default timeout_ms=0 (no timeout).""" + self_process.lock() + assert self_process.state == "locked" + self_process.unlock() + + def test_lock_with_timeout(self, self_process): + self_process.lock(timeout_ms=5000) + assert self_process.state == "locked" + self_process.unlock() + + def test_full_cycle_no_migration(self, self_process): + """lock -> checkpoint -> restore -> unlock, verify state at each step.""" + self_process.lock() + assert self_process.state == "locked" + + self_process.checkpoint() + assert self_process.state == "checkpointed" + + self_process.restore() + assert self_process.state == "locked" # restore leaves process locked + + self_process.unlock() + assert self_process.state == "running" + + +# -- GPU migration (>= 2 same-chip GPUs, real driver) --------------------- + +@needs_checkpoint +class TestCheckpointGpuMigration: + """GPU UUID remapping tests following the r580-migration-api.c pattern. + + These tests require at least two GPUs of the same chip type and a + driver that supports checkpoint migration. They skip when the + hardware cannot satisfy this (e.g. heterogeneous GPUs, or a driver + build where migration returns CUDA_ERROR_INVALID_VALUE — see + NVBug 5437334). + """ + + @staticmethod + def _try_migration(proc, gpu_mapping): + """Attempt a single checkpoint-restore with migration. + + Returns True on success. Skips the test if the driver rejects + the migration with CUDA_ERROR_INVALID_VALUE (known limitation + on some architectures / driver versions). + """ + proc.lock() + proc.checkpoint() + try: + proc.restore(gpu_mapping=gpu_mapping) + except (CUDAError, RuntimeError) as exc: + # Recover: restore without migration, then unlock. 
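+            # A plain restore() (no gpu_mapping) returns the process to its
+            # original GPUs, so the unlock() below leaves it runnable.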
+ proc.restore() + proc.unlock() + if "INVALID_VALUE" in str(exc): + pytest.skip( + "Driver does not support GPU migration on this hardware " + "(CUDA_ERROR_INVALID_VALUE — see NVBug 5437334)" + ) + raise + proc.unlock() + return True + + def test_rotation_migrates_context(self, self_process): + """Rotate context through all GPUs and back to the origin. + + Builds a rotation mapping (device i -> device (i+1) % N) for + every visible device and performs N rotations. After each step + the context device UUID is checked. After N steps the context + should be back on the original device. + """ + devices = Device.get_all_devices() + if len(devices) < 2: + pytest.skip("GPU migration tests require at least 2 GPUs") + if _find_same_chip_pair(devices) is None: + pytest.skip("GPU migration requires at least 2 GPUs of the same chip type") + + gpu_mapping = _build_rotation_mapping(devices) + uuid_origin = _get_context_device_uuid() + + for step in range(len(devices)): + expected_uuid = devices[(step + 1) % len(devices)].uuid + + self._try_migration(self_process, gpu_mapping) + + assert _get_context_device_uuid() == expected_uuid, ( + f"Step {step}: expected UUID {expected_uuid}, " + f"got {_get_context_device_uuid()}" + ) + + # After N rotations, back at the origin. + assert _get_context_device_uuid() == uuid_origin + + def test_swap_identical_gpus(self, self_process): + """Swap context between two GPUs of the same chip type. + + Sets the context on one of the pair members so that a successful + migration is observable (the context UUID changes). + """ + devices = Device.get_all_devices() + pair = _find_same_chip_pair(devices) + if pair is None: + pytest.skip("No two GPUs of the same chip type found") + + i, j = pair + # Place context on device i so the swap is observable. + devices[i].set_current() + + # Build an identity mapping, then swap the pair. + n = len(devices) + uuids_cu = [handle_return(driver.cuDeviceGetUuid(devices[k].device_id)) for k in range(n)] + gpu_mapping = {uuids_cu[k]: uuids_cu[k] for k in range(n)} + gpu_mapping[uuids_cu[i]] = uuids_cu[j] + gpu_mapping[uuids_cu[j]] = uuids_cu[i] + + assert _get_context_device_uuid() == devices[i].uuid + + self._try_migration(self_process, gpu_mapping) + uuid_after = _get_context_device_uuid() + + if uuid_after == devices[i].uuid: + pytest.skip( + "Driver accepted GPU swap but migration is a no-op " + "on this hardware/driver version" + ) + assert uuid_after == devices[j].uuid From f67a5e64893c254b41d31f5a529f268800e82cca Mon Sep 17 00:00:00 2001 From: Leo Fang Date: Sat, 2 May 2026 04:19:20 +0000 Subject: [PATCH 04/10] Accept Device.uuid strings in gpu_mapping; use cuda.core APIs in tests - checkpoint._make_restore_args now accepts UUID strings (as returned by Device.uuid) in addition to CUuuid objects, via a new _as_cuuuid helper that converts "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" strings to CUuuid using ctypes. - Tests no longer import cuda.bindings.driver; all device queries use cuda.core.Device (Device().uuid for current device, Device.uuid for mapping keys/values, Device.get_all_devices() for enumeration). 
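
For example, swapping two GPUs when exactly two are visible (pid is a
placeholder for the target CUDA process ID):

    from cuda.core import Device, checkpoint

    d0, d1 = Device.get_all_devices()[:2]
    proc = checkpoint.Process(pid)
    proc.lock()
    proc.checkpoint()
    proc.restore(gpu_mapping={d0.uuid: d1.uuid, d1.uuid: d0.uuid})
    proc.unlock()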
Co-Authored-By: Claude Opus 4.6 (1M context) --- cuda_core/cuda/core/checkpoint.py | 25 +++++++++++++++-- cuda_core/tests/test_checkpoint.py | 45 ++++++++++-------------------- 2 files changed, 37 insertions(+), 33 deletions(-) diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py index 1333a8f0e43..01a4ac77af2 100644 --- a/cuda_core/cuda/core/checkpoint.py +++ b/cuda_core/cuda/core/checkpoint.py @@ -207,8 +207,8 @@ def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): pairs = [] for old_uuid, new_uuid in gpu_mapping.items(): pair = driver.CUcheckpointGpuPair() - pair.oldUuid = old_uuid - pair.newUuid = new_uuid + pair.oldUuid = _as_cuuuid(driver, old_uuid) + pair.newUuid = _as_cuuuid(driver, new_uuid) pairs.append(pair) if not pairs: @@ -220,6 +220,27 @@ def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): return args +def _as_cuuuid(driver, value): + """Convert *value* to a ``CUuuid``. + + Accepts a ``CUuuid`` instance (returned as-is) or a UUID string in + the ``"xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"`` format returned by + :attr:`Device.uuid`. + """ + if isinstance(value, str): + import ctypes + + raw = bytes.fromhex(value.replace("-", "")) + if len(raw) != 16: + raise ValueError( + "GPU UUID string must be 32 hex characters " + f"(with optional hyphens), got {value!r}" + ) + buf = ctypes.create_string_buffer(raw, 16) + return driver.CUuuid(ctypes.addressof(buf)) + return value + + __all__ = [ "Process", ] diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index 82a6827efaf..60906651a40 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -17,13 +17,8 @@ import pytest -try: - from cuda.bindings import driver -except ImportError: - from cuda import cuda as driver - from cuda.core import Device, checkpoint -from cuda.core._utils.cuda_utils import CUDAError, handle_return +from cuda.core._utils.cuda_utils import CUDAError # -- Skip condition ------------------------------------------------------- @@ -45,24 +40,14 @@ def _checkpoint_available(): # -- Helpers --------------------------------------------------------------- -def _get_context_device_uuid(): - """Return the UUID string of the device owning the current CUDA context.""" - dev_id = int(handle_return(driver.cuCtxGetDevice())) - return Device(dev_id).uuid - - def _build_rotation_mapping(devices): """GPU i UUID -> GPU (i+1) % N UUID for every visible device. - Returns a dict of CUuuid -> CUuuid suitable for Process.restore(). + Returns a ``{str: str}`` dict of UUID strings suitable for + :meth:`~checkpoint.Process.restore`. 
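+
+    For example, with three devices A, B and C the returned mapping is
+    ``{A.uuid: B.uuid, B.uuid: C.uuid, C.uuid: A.uuid}``.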
""" n = len(devices) - mapping = {} - for i in range(n): - old_uuid = handle_return(driver.cuDeviceGetUuid(devices[i].device_id)) - new_uuid = handle_return(driver.cuDeviceGetUuid(devices[(i + 1) % n].device_id)) - mapping[old_uuid] = new_uuid - return mapping + return {devices[i].uuid: devices[(i + 1) % n].uuid for i in range(n)} def _find_same_chip_pair(devices): @@ -212,20 +197,20 @@ def test_rotation_migrates_context(self, self_process): pytest.skip("GPU migration requires at least 2 GPUs of the same chip type") gpu_mapping = _build_rotation_mapping(devices) - uuid_origin = _get_context_device_uuid() + uuid_origin = Device().uuid for step in range(len(devices)): expected_uuid = devices[(step + 1) % len(devices)].uuid self._try_migration(self_process, gpu_mapping) - assert _get_context_device_uuid() == expected_uuid, ( + assert Device().uuid == expected_uuid, ( f"Step {step}: expected UUID {expected_uuid}, " - f"got {_get_context_device_uuid()}" + f"got {Device().uuid}" ) # After N rotations, back at the origin. - assert _get_context_device_uuid() == uuid_origin + assert Device().uuid == uuid_origin def test_swap_identical_gpus(self, self_process): """Swap context between two GPUs of the same chip type. @@ -242,17 +227,15 @@ def test_swap_identical_gpus(self, self_process): # Place context on device i so the swap is observable. devices[i].set_current() - # Build an identity mapping, then swap the pair. - n = len(devices) - uuids_cu = [handle_return(driver.cuDeviceGetUuid(devices[k].device_id)) for k in range(n)] - gpu_mapping = {uuids_cu[k]: uuids_cu[k] for k in range(n)} - gpu_mapping[uuids_cu[i]] = uuids_cu[j] - gpu_mapping[uuids_cu[j]] = uuids_cu[i] + # Build an identity mapping, then swap the pair (using UUID strings). + gpu_mapping = {d.uuid: d.uuid for d in devices} + gpu_mapping[devices[i].uuid] = devices[j].uuid + gpu_mapping[devices[j].uuid] = devices[i].uuid - assert _get_context_device_uuid() == devices[i].uuid + assert Device().uuid == devices[i].uuid self._try_migration(self_process, gpu_mapping) - uuid_after = _get_context_device_uuid() + uuid_after = Device().uuid if uuid_after == devices[i].uuid: pytest.skip( From 245e7a4ffae85c98d8565695bf4c1d8f16742006 Mon Sep 17 00:00:00 2001 From: Leo Fang Date: Sat, 2 May 2026 04:52:42 +0000 Subject: [PATCH 05/10] Apply pre-commit formatting fixes Ruff import sorting, ruff format, and noqa annotation for best-effort teardown in the self_process fixture. 
Co-Authored-By: Claude Opus 4.6 (1M context) --- cuda_core/cuda/core/checkpoint.py | 5 +---- cuda_core/tests/test_checkpoint.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py index 01a4ac77af2..26637417c4a 100644 --- a/cuda_core/cuda/core/checkpoint.py +++ b/cuda_core/cuda/core/checkpoint.py @@ -232,10 +232,7 @@ def _as_cuuuid(driver, value): raw = bytes.fromhex(value.replace("-", "")) if len(raw) != 16: - raise ValueError( - "GPU UUID string must be 32 hex characters " - f"(with optional hyphens), got {value!r}" - ) + raise ValueError(f"GPU UUID string must be 32 hex characters (with optional hyphens), got {value!r}") buf = ctypes.create_string_buffer(raw, 16) return driver.CUuuid(ctypes.addressof(buf)) return value diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index 60906651a40..bea7242fd22 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -20,9 +20,9 @@ from cuda.core import Device, checkpoint from cuda.core._utils.cuda_utils import CUDAError - # -- Skip condition ------------------------------------------------------- + def _checkpoint_available(): """Return True if the checkpoint API is usable on this system.""" try: @@ -40,6 +40,7 @@ def _checkpoint_available(): # -- Helpers --------------------------------------------------------------- + def _build_rotation_mapping(devices): """GPU i UUID -> GPU (i+1) % N UUID for every visible device. @@ -63,6 +64,7 @@ def _find_same_chip_pair(devices): # -- Fixtures -------------------------------------------------------------- + @pytest.fixture def self_process(init_cuda): """checkpoint.Process wrapping os.getpid(), with safety unlock on teardown.""" @@ -76,12 +78,13 @@ def self_process(init_cuda): proc.unlock() elif st == "locked": proc.unlock() - except Exception: + except Exception: # noqa: S110 — best-effort teardown, nothing useful to log pass # -- Input validation (no GPU / driver needed) ----------------------------- + class TestInputValidation: @pytest.mark.parametrize( ("args", "error_type", "match"), @@ -102,6 +105,7 @@ def test_public_symbols(self): # -- Lifecycle (single GPU, real driver) ----------------------------------- + @needs_checkpoint class TestCheckpointLifecycle: def test_initial_state_is_running(self, self_process): @@ -146,6 +150,7 @@ def test_full_cycle_no_migration(self, self_process): # -- GPU migration (>= 2 same-chip GPUs, real driver) --------------------- + @needs_checkpoint class TestCheckpointGpuMigration: """GPU UUID remapping tests following the r580-migration-api.c pattern. @@ -204,10 +209,7 @@ def test_rotation_migrates_context(self, self_process): self._try_migration(self_process, gpu_mapping) - assert Device().uuid == expected_uuid, ( - f"Step {step}: expected UUID {expected_uuid}, " - f"got {Device().uuid}" - ) + assert Device().uuid == expected_uuid, f"Step {step}: expected UUID {expected_uuid}, got {Device().uuid}" # After N rotations, back at the origin. 
assert Device().uuid == uuid_origin @@ -238,8 +240,5 @@ def test_swap_identical_gpus(self, self_process): uuid_after = Device().uuid if uuid_after == devices[i].uuid: - pytest.skip( - "Driver accepted GPU swap but migration is a no-op " - "on this hardware/driver version" - ) + pytest.skip("Driver accepted GPU swap but migration is a no-op on this hardware/driver version") assert uuid_after == devices[j].uuid From 7c7f0e568d2b3326dbd17950edbba647e7b4b907 Mon Sep 17 00:00:00 2001 From: Leo Fang Date: Sun, 3 May 2026 00:08:17 +0000 Subject: [PATCH 06/10] Restore original device in self_process fixture teardown The swap migration test calls set_current() on a different device. Record the initial device from init_cuda and restore it on teardown so tests are side-effect free. Co-Authored-By: Claude Opus 4.6 (1M context) --- cuda_core/tests/test_checkpoint.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index bea7242fd22..a12c0dc2392 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -67,7 +67,12 @@ def _find_same_chip_pair(devices): @pytest.fixture def self_process(init_cuda): - """checkpoint.Process wrapping os.getpid(), with safety unlock on teardown.""" + """checkpoint.Process wrapping os.getpid(), with safety unlock on teardown. + + Records the initial device so tests that call ``set_current()`` on a + different device (e.g. migration tests) are side-effect free. + """ + original_device = init_cuda proc = checkpoint.Process(os.getpid()) yield proc # Ensure the process is not left locked if the test fails mid-lifecycle. @@ -80,6 +85,8 @@ def self_process(init_cuda): proc.unlock() except Exception: # noqa: S110 — best-effort teardown, nothing useful to log pass + # Restore the original device so init_cuda's teardown pops the right context. 
+ original_device.set_current() # -- Input validation (no GPU / driver needed) ----------------------------- From 8192df672894b3606cd0fb31a2099d2a6b0d5eb0 Mon Sep 17 00:00:00 2001 From: Keith Kraus Date: Mon, 4 May 2026 11:54:22 -0400 Subject: [PATCH 07/10] Address checkpoint review follow-ups --- cuda_core/cuda/core/checkpoint.py | 47 ++++++++++++++------------ cuda_core/cuda/core/typing.py | 5 +++ cuda_core/docs/source/api.rst | 34 ++++++++++++++----- cuda_core/docs/source/api_private.rst | 1 + cuda_core/tests/test_checkpoint.py | 40 +++++++++++++++------- cuda_core/tests/test_typing_imports.py | 4 +++ 6 files changed, 89 insertions(+), 42 deletions(-) diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py index 26637417c4a..c811da7c081 100644 --- a/cuda_core/cuda/core/checkpoint.py +++ b/cuda_core/cuda/core/checkpoint.py @@ -2,13 +2,14 @@ # # SPDX-License-Identifier: Apache-2.0 +import ctypes as _ctypes from collections.abc import Mapping as _Mapping from typing import Any as _Any -from typing import Literal as _Literal from cuda.core._utils.cuda_utils import handle_return as _handle_cuda_return from cuda.core._utils.version import binding_version as _binding_version from cuda.core._utils.version import driver_version as _driver_version +from cuda.core.typing import ProcessStateT as _ProcessStateT try: from cuda.bindings import driver as _driver @@ -16,14 +17,12 @@ from cuda import cuda as _driver -ProcessStateT = _Literal["running", "locked", "checkpointed", "failed"] - -_PROCESS_STATE_NAMES: dict[int, ProcessStateT] = { - 0: "running", - 1: "locked", - 2: "checkpointed", - 3: "failed", -} +_PROCESS_STATE_NAME_ATTRS: tuple[tuple[str, _ProcessStateT], ...] = ( + ("CU_PROCESS_STATE_RUNNING", "running"), + ("CU_PROCESS_STATE_LOCKED", "locked"), + ("CU_PROCESS_STATE_CHECKPOINTED", "checkpointed"), + ("CU_PROCESS_STATE_FAILED", "failed"), +) _REQUIRED_BINDING_ATTRS = ( "cuCheckpointProcessCheckpoint", @@ -34,6 +33,7 @@ "cuCheckpointProcessUnlock", "CUcheckpointGpuPair", "CUcheckpointLockArgs", + "CUprocessState", "CUcheckpointRestoreArgs", ) _REQUIRED_DRIVER_VERSION = (12, 8, 0) @@ -56,16 +56,17 @@ def __init__(self, pid: int): self.pid = _check_pid(pid) @property - def state(self) -> ProcessStateT: + def state(self) -> _ProcessStateT: """ CUDA checkpoint state for this process. """ driver = _get_driver() state = _call_driver(driver, driver.cuCheckpointProcessGetState, self.pid) - state_value = int(state) + state_names = _get_process_state_names(driver) try: - return _PROCESS_STATE_NAMES[state_value] + return state_names[state] except KeyError as e: + state_value = int(state) raise RuntimeError(f"Unknown CUDA checkpoint process state: {state_value}") from e @property @@ -105,8 +106,8 @@ def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: ---------- gpu_mapping : mapping, optional GPU UUID remapping from each checkpointed GPU UUID to the GPU UUID - to restore onto. If provided, the mapping must contain every - checkpointed GPU UUID. + to restore onto. For migration workflows, provide mappings for + every CUDA-visible GPU. 
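+            Keys and values may be ``CUuuid`` objects or the UUID strings
+            returned by :attr:`Device.uuid`.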
""" driver = _get_driver() args = _make_restore_args(driver, gpu_mapping) @@ -154,6 +155,10 @@ def _binding_version_supports_checkpoint(version) -> bool: return (major == 12 and (minor, patch) >= (8, 0)) or (major == 13 and (minor, patch) >= (0, 2)) or major > 13 +def _get_process_state_names(driver) -> dict[_Any, _ProcessStateT]: + return {getattr(driver.CUprocessState, attr): state_name for attr, state_name in _PROCESS_STATE_NAME_ATTRS} + + def _call_driver(driver, func, *args): try: result = func(*args) @@ -207,8 +212,9 @@ def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): pairs = [] for old_uuid, new_uuid in gpu_mapping.items(): pair = driver.CUcheckpointGpuPair() - pair.oldUuid = _as_cuuuid(driver, old_uuid) - pair.newUuid = _as_cuuuid(driver, new_uuid) + buffers = [] + pair.oldUuid = _as_cuuuid(driver, old_uuid, buffers) + pair.newUuid = _as_cuuuid(driver, new_uuid, buffers) pairs.append(pair) if not pairs: @@ -220,7 +226,7 @@ def _make_restore_args(driver, gpu_mapping: _Mapping[_Any, _Any] | None): return args -def _as_cuuuid(driver, value): +def _as_cuuuid(driver, value, buffers): """Convert *value* to a ``CUuuid``. Accepts a ``CUuuid`` instance (returned as-is) or a UUID string in @@ -228,13 +234,12 @@ def _as_cuuuid(driver, value): :attr:`Device.uuid`. """ if isinstance(value, str): - import ctypes - raw = bytes.fromhex(value.replace("-", "")) if len(raw) != 16: raise ValueError(f"GPU UUID string must be 32 hex characters (with optional hyphens), got {value!r}") - buf = ctypes.create_string_buffer(raw, 16) - return driver.CUuuid(ctypes.addressof(buf)) + buf = _ctypes.create_string_buffer(raw, 16) + buffers.append(buf) + return driver.CUuuid(_ctypes.addressof(buf)) return value diff --git a/cuda_core/cuda/core/typing.py b/cuda_core/cuda/core/typing.py index a66ab1881fb..e95331d463f 100644 --- a/cuda_core/cuda/core/typing.py +++ b/cuda_core/cuda/core/typing.py @@ -4,10 +4,15 @@ """Public type aliases and protocols used in cuda.core API signatures.""" +from typing import Literal as _Literal + from cuda.core._memory._buffer import DevicePointerT from cuda.core._stream import IsStreamT +ProcessStateT = _Literal["running", "locked", "checkpointed", "failed"] + __all__ = [ "DevicePointerT", "IsStreamT", + "ProcessStateT", ] diff --git a/cuda_core/docs/source/api.rst b/cuda_core/docs/source/api.rst index 2762f8ca541..03c120288f0 100644 --- a/cuda_core/docs/source/api.rst +++ b/cuda_core/docs/source/api.rst @@ -182,30 +182,48 @@ checkpoint APIs. These APIs are intended for Linux process checkpoint and restore workflows, and require a CUDA driver with checkpoint API support and a ``cuda-bindings`` version that exposes those driver entry points. -A checkpoint workflow operates on a CUDA process by process ID. The typical -sequence is to lock the process, capture its GPU memory state, restore it -when needed, and then unlock it so CUDA API calls can resume: +Checkpointing is typically driven by a coordinator process acting on a target +CUDA process, similar to attaching a debugger or sending a signal. The target +process is identified by process ID. Linux and the CUDA driver enforce process +permissions; checkpointing another user's process may require elevated +permissions such as ``CAP_SYS_PTRACE`` or administrator privileges. + +The CUDA checkpoint APIs prepare CUDA-managed GPU state for process-level +checkpoint and restore. 
They do not capture the CPU process image by +themselves; full process checkpoint workflows still need a CPU-side process +checkpointing tool such as CRIU. A minimal coordinator-side sequence looks like +this: .. code-block:: python + import os + from cuda.core import checkpoint - process = checkpoint.Process(pid) + target_pid = os.getpid() # or the PID of another CUDA process + process = checkpoint.Process(target_pid) process.lock(timeout_ms=5000) process.checkpoint() + + # Capture or restore the CPU process image outside cuda.core. + process.restore() process.unlock() ``Process.state`` returns one of ``"running"``, ``"locked"``, ``"checkpointed"``, or ``"failed"``. Restore may optionally remap GPUs by passing ``gpu_mapping`` from each checkpointed GPU UUID to the GPU UUID that -should be used during restore. A successful restore returns the process to -the locked state; call ``Process.unlock`` after restore to allow CUDA API -calls to resume. +should be used during restore. For migration workflows, provide mappings for +every CUDA-visible GPU. The mapping may use ``CUuuid`` objects or the UUID +strings returned by :attr:`Device.uuid`. A successful restore returns the +process to the locked state; call ``Process.unlock`` after restore to allow +CUDA API calls to resume. The CUDA driver requires restore to run from the process restore thread. Use ``Process.restore_thread_id`` to discover that thread before calling -``Process.restore`` from a checkpoint coordinator. +``Process.restore`` from a checkpoint coordinator. Restore also requires +persistence mode to be enabled or ``cuInit`` to have been called before +execution. .. autosummary:: :toctree: generated/ diff --git a/cuda_core/docs/source/api_private.rst b/cuda_core/docs/source/api_private.rst index 141773967e8..3db572d619d 100644 --- a/cuda_core/docs/source/api_private.rst +++ b/cuda_core/docs/source/api_private.rst @@ -17,6 +17,7 @@ CUDA runtime :toctree: generated/ typing.DevicePointerT + typing.ProcessStateT _memory._virtual_memory_resource.VirtualMemoryAllocationTypeT _memory._virtual_memory_resource.VirtualMemoryLocationTypeT _memory._virtual_memory_resource.VirtualMemoryGranularityT diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index a12c0dc2392..843be68b3a0 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -14,6 +14,7 @@ import os import sys +from contextlib import suppress import pytest @@ -62,6 +63,15 @@ def _find_same_chip_pair(devices): return None +def _run_or_skip_unsupported(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except RuntimeError as exc: + if "CUDA checkpointing is not supported" in str(exc): + pytest.skip(str(exc)) + raise + + # -- Fixtures -------------------------------------------------------------- @@ -78,13 +88,16 @@ def self_process(init_cuda): # Ensure the process is not left locked if the test fails mid-lifecycle. try: st = proc.state - if st == "checkpointed": + except Exception: + st = None + if st == "checkpointed": + with suppress(Exception): proc.restore() + with suppress(Exception): proc.unlock() - elif st == "locked": + elif st == "locked": + with suppress(Exception): proc.unlock() - except Exception: # noqa: S110 — best-effort teardown, nothing useful to log - pass # Restore the original device so init_cuda's teardown pops the right context. 
original_device.set_current() @@ -108,6 +121,7 @@ def test_process_rejects_invalid_pid(self, args, error_type, match): def test_public_symbols(self): assert checkpoint.__all__ == ["Process"] + assert not hasattr(checkpoint, "ProcessStateT") # -- Lifecycle (single GPU, real driver) ----------------------------------- @@ -124,31 +138,31 @@ def test_restore_thread_id_is_positive(self, self_process): assert tid > 0 def test_lock_unlock(self, self_process): - self_process.lock() + _run_or_skip_unsupported(self_process.lock) assert self_process.state == "locked" self_process.unlock() assert self_process.state == "running" def test_lock_default_timeout(self, self_process): """lock() with the default timeout_ms=0 (no timeout).""" - self_process.lock() + _run_or_skip_unsupported(self_process.lock) assert self_process.state == "locked" self_process.unlock() def test_lock_with_timeout(self, self_process): - self_process.lock(timeout_ms=5000) + _run_or_skip_unsupported(self_process.lock, timeout_ms=5000) assert self_process.state == "locked" self_process.unlock() def test_full_cycle_no_migration(self, self_process): """lock -> checkpoint -> restore -> unlock, verify state at each step.""" - self_process.lock() + _run_or_skip_unsupported(self_process.lock) assert self_process.state == "locked" - self_process.checkpoint() + _run_or_skip_unsupported(self_process.checkpoint) assert self_process.state == "checkpointed" - self_process.restore() + _run_or_skip_unsupported(self_process.restore) assert self_process.state == "locked" # restore leaves process locked self_process.unlock() @@ -177,10 +191,10 @@ def _try_migration(proc, gpu_mapping): the migration with CUDA_ERROR_INVALID_VALUE (known limitation on some architectures / driver versions). """ - proc.lock() - proc.checkpoint() + _run_or_skip_unsupported(proc.lock) + _run_or_skip_unsupported(proc.checkpoint) try: - proc.restore(gpu_mapping=gpu_mapping) + _run_or_skip_unsupported(proc.restore, gpu_mapping=gpu_mapping) except (CUDAError, RuntimeError) as exc: # Recover: restore without migration, then unlock. proc.restore() diff --git a/cuda_core/tests/test_typing_imports.py b/cuda_core/tests/test_typing_imports.py index c05e3ae3b37..2e207d55d8b 100644 --- a/cuda_core/tests/test_typing_imports.py +++ b/cuda_core/tests/test_typing_imports.py @@ -10,10 +10,12 @@ def test_typing_module_imports(): from cuda.core.typing import ( DevicePointerT, IsStreamT, + ProcessStateT, ) assert DevicePointerT is not None assert IsStreamT is not None + assert set(ProcessStateT.__args__) == {"running", "locked", "checkpointed", "failed"} def test_typing_matches_private_definitions(): @@ -23,7 +25,9 @@ def test_typing_matches_private_definitions(): from cuda.core.typing import ( DevicePointerT, IsStreamT, + ProcessStateT, ) assert DevicePointerT is _DevicePointerT assert IsStreamT is _IsStreamT + assert set(ProcessStateT.__args__) == {"running", "locked", "checkpointed", "failed"} From fbb8037d124ca2d727ea358019e299cf16fcee2b Mon Sep 17 00:00:00 2001 From: Leo Fang Date: Mon, 4 May 2026 17:22:04 +0000 Subject: [PATCH 08/10] Skip checkpoint lifecycle/migration tests in CI cuCheckpointProcessCheckpoint hangs on CI runners (ephemeral VM + container), causing all CUDA 13.x test jobs to time out. Skip the tests that call into the checkpoint driver when the CI environment variable is set. Input validation tests still run everywhere. 
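The guard keys off the conventional CI environment variable, so the skipped
tests can still be run by hand on a machine where it is exported by unsetting
it for the invocation (one option among several):

    env -u CI pytest cuda_core/tests/test_checkpoint.py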
Co-Authored-By: Claude Opus 4.6 (1M context) --- cuda_core/tests/test_checkpoint.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index 843be68b3a0..52eee112ebe 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -34,8 +34,8 @@ def _checkpoint_available(): needs_checkpoint = pytest.mark.skipif( - sys.platform != "linux" or not _checkpoint_available(), - reason="CUDA checkpoint API requires Linux and a supported driver/bindings", + sys.platform != "linux" or os.environ.get("CI") is not None or not _checkpoint_available(), + reason="CUDA checkpoint API requires Linux, a supported driver/bindings, and a non-CI environment", ) From 8f798f487bf19068febc803aa25a8d6414d55bde Mon Sep 17 00:00:00 2001 From: Keith Kraus Date: Mon, 4 May 2026 16:36:45 -0400 Subject: [PATCH 09/10] Isolate checkpoint lifecycle tests --- cuda_core/tests/test_checkpoint.py | 326 +++++++++++++++++++++-------- 1 file changed, 233 insertions(+), 93 deletions(-) diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index 52eee112ebe..6d95ef3cb54 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -4,8 +4,9 @@ # Real GPU tests for cuda.core.checkpoint — no mocks. # -# Lifecycle tests self-checkpoint the current process (os.getpid()) and -# exercise lock / checkpoint / restore / unlock through the real driver. +# Lifecycle tests exercise lightweight state/lock operations in-process and +# mutating checkpoint / restore cycles through an isolated coordinator/target +# process pair. # # Migration tests attempt GPU UUID remapping following the pattern from # NVIDIA/cuda-checkpoint r580-migration-api.c. They require ≥2 GPUs of @@ -13,13 +14,15 @@ # gracefully when the hardware or driver cannot satisfy this. import os +import signal +import subprocess import sys +import textwrap from contextlib import suppress import pytest -from cuda.core import Device, checkpoint -from cuda.core._utils.cuda_utils import CUDAError +from cuda.core import checkpoint # -- Skip condition ------------------------------------------------------- @@ -42,18 +45,65 @@ def _checkpoint_available(): # -- Helpers --------------------------------------------------------------- -def _build_rotation_mapping(devices): - """GPU i UUID -> GPU (i+1) % N UUID for every visible device. +def _run_or_skip_unsupported(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except RuntimeError as exc: + if "CUDA checkpointing is not supported" in str(exc): + pytest.skip(str(exc)) + raise - Returns a ``{str: str}`` dict of UUID strings suitable for - :meth:`~checkpoint.Process.restore`. 
- """ + +_SCENARIO_SKIP_EXIT_CODE = 77 + +_SCENARIO_COMMON = r""" +import subprocess +import sys +from contextlib import suppress + +from cuda.core import Device, checkpoint +from cuda.core._utils.cuda_utils import CUDAError + +EXIT_SKIP = 77 + +TARGET_SCRIPT = r''' +import sys + +from cuda.core import Device + +device_index = int(sys.argv[1]) +Device(device_index).set_current() +print(f"READY:{Device().uuid}", flush=True) + +for line in sys.stdin: + command = line.strip() + if command == "uuid": + print(f"UUID:{Device().uuid}", flush=True) + elif command == "exit": + break +''' + + +def skip(reason): + print(f"SKIP: {reason}", flush=True) + raise SystemExit(EXIT_SKIP) + + +def run_or_skip_unsupported(func, *args, **kwargs): + try: + return func(*args, **kwargs) + except RuntimeError as exc: + if "CUDA checkpointing is not supported" in str(exc): + skip(str(exc)) + raise + + +def build_rotation_mapping(devices): n = len(devices) return {devices[i].uuid: devices[(i + 1) % n].uuid for i in range(n)} -def _find_same_chip_pair(devices): - """Return (i, j) indices of two devices with the same name, or None.""" +def find_same_chip_pair(devices): seen = {} for i, dev in enumerate(devices): name = dev.name @@ -63,13 +113,105 @@ def _find_same_chip_pair(devices): return None -def _run_or_skip_unsupported(func, *args, **kwargs): +def read_prefixed(target, prefix): + line = target.stdout.readline() + if not line: + stderr = target.stderr.read() + raise RuntimeError(f"checkpoint target exited before {prefix!r}; stderr:\n{stderr}") + line = line.strip() + if not line.startswith(prefix): + raise RuntimeError(f"expected target output prefix {prefix!r}, got {line!r}") + return line[len(prefix):] + + +def start_target(device_index=0): + target = subprocess.Popen( + [sys.executable, "-c", TARGET_SCRIPT, str(device_index)], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) try: - return func(*args, **kwargs) - except RuntimeError as exc: - if "CUDA checkpointing is not supported" in str(exc): - pytest.skip(str(exc)) + ready_uuid = read_prefixed(target, "READY:") + except Exception: + stop_target(target) raise + return target, ready_uuid + + +def stop_target(target): + if target.poll() is None: + with suppress(Exception): + target.stdin.write("exit\n") + target.stdin.flush() + try: + target.wait(timeout=5) + except subprocess.TimeoutExpired: + target.kill() + target.wait() + + +def target_uuid(target): + target.stdin.write("uuid\n") + target.stdin.flush() + return read_prefixed(target, "UUID:") + + +def checkpoint_restore(proc, gpu_mapping=None): + run_or_skip_unsupported(proc.lock, timeout_ms=5000) + run_or_skip_unsupported(proc.checkpoint) + try: + run_or_skip_unsupported(proc.restore, gpu_mapping=gpu_mapping) + except (CUDAError, RuntimeError) as exc: + with suppress(Exception): + proc.restore() + with suppress(Exception): + proc.unlock() + if "INVALID_VALUE" in str(exc): + skip( + "Driver does not support GPU migration on this hardware " + "(CUDA_ERROR_INVALID_VALUE; see NVBug 5437334)" + ) + raise + proc.unlock() +""" + + +def _run_checkpoint_scenario_or_skip(body: str, *, timeout: int = 90) -> None: + """Run mutating checkpoint/restore scenarios out-of-process. + + The CUDA checkpoint APIs can block inside the driver when a runner exposes + symbols but the platform path cannot complete checkpoint/restore. Running + the scenario in its own process group lets the parent test skip that runner + cleanly instead of hanging the entire CI job. 
+ """ + script = _SCENARIO_COMMON + "\n" + textwrap.dedent(body) + proc = subprocess.Popen( # noqa: S603 - controlled test subprocess using this Python executable. + [sys.executable, "-c", script], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + start_new_session=True, + ) + try: + stdout, stderr = proc.communicate(timeout=timeout) + except subprocess.TimeoutExpired: + with suppress(ProcessLookupError): + os.killpg(proc.pid, signal.SIGKILL) + stdout, stderr = proc.communicate() + pytest.skip( + f"CUDA checkpoint scenario timed out after {timeout}s; driver/hardware did not complete " + f"checkpoint/restore.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ) + + if proc.returncode == _SCENARIO_SKIP_EXIT_CODE: + reason = stdout.strip() or stderr.strip() or "CUDA checkpoint scenario skipped" + pytest.skip(reason) + if proc.returncode != 0: + pytest.fail( + f"CUDA checkpoint scenario failed with exit code {proc.returncode}.\nstdout:\n{stdout}\nstderr:\n{stderr}" + ) # -- Fixtures -------------------------------------------------------------- @@ -154,19 +296,28 @@ def test_lock_with_timeout(self, self_process): assert self_process.state == "locked" self_process.unlock() - def test_full_cycle_no_migration(self, self_process): + def test_full_cycle_no_migration(self): """lock -> checkpoint -> restore -> unlock, verify state at each step.""" - _run_or_skip_unsupported(self_process.lock) - assert self_process.state == "locked" + _run_checkpoint_scenario_or_skip( + """ + target, _ = start_target() + proc = checkpoint.Process(target.pid) + try: + run_or_skip_unsupported(proc.lock, timeout_ms=5000) + assert proc.state == "locked" - _run_or_skip_unsupported(self_process.checkpoint) - assert self_process.state == "checkpointed" + run_or_skip_unsupported(proc.checkpoint) + assert proc.state == "checkpointed" - _run_or_skip_unsupported(self_process.restore) - assert self_process.state == "locked" # restore leaves process locked + run_or_skip_unsupported(proc.restore) + assert proc.state == "locked" # restore leaves process locked - self_process.unlock() - assert self_process.state == "running" + proc.unlock() + assert proc.state == "running" + finally: + stop_target(target) + """ + ) # -- GPU migration (>= 2 same-chip GPUs, real driver) --------------------- @@ -183,32 +334,7 @@ class TestCheckpointGpuMigration: NVBug 5437334). """ - @staticmethod - def _try_migration(proc, gpu_mapping): - """Attempt a single checkpoint-restore with migration. - - Returns True on success. Skips the test if the driver rejects - the migration with CUDA_ERROR_INVALID_VALUE (known limitation - on some architectures / driver versions). - """ - _run_or_skip_unsupported(proc.lock) - _run_or_skip_unsupported(proc.checkpoint) - try: - _run_or_skip_unsupported(proc.restore, gpu_mapping=gpu_mapping) - except (CUDAError, RuntimeError) as exc: - # Recover: restore without migration, then unlock. - proc.restore() - proc.unlock() - if "INVALID_VALUE" in str(exc): - pytest.skip( - "Driver does not support GPU migration on this hardware " - "(CUDA_ERROR_INVALID_VALUE — see NVBug 5437334)" - ) - raise - proc.unlock() - return True - - def test_rotation_migrates_context(self, self_process): + def test_rotation_migrates_context(self): """Rotate context through all GPUs and back to the origin. Builds a rotation mapping (device i -> device (i+1) % N) for @@ -216,50 +342,64 @@ def test_rotation_migrates_context(self, self_process): the context device UUID is checked. After N steps the context should be back on the original device. 
""" - devices = Device.get_all_devices() - if len(devices) < 2: - pytest.skip("GPU migration tests require at least 2 GPUs") - if _find_same_chip_pair(devices) is None: - pytest.skip("GPU migration requires at least 2 GPUs of the same chip type") - - gpu_mapping = _build_rotation_mapping(devices) - uuid_origin = Device().uuid - - for step in range(len(devices)): - expected_uuid = devices[(step + 1) % len(devices)].uuid - - self._try_migration(self_process, gpu_mapping) - - assert Device().uuid == expected_uuid, f"Step {step}: expected UUID {expected_uuid}, got {Device().uuid}" - - # After N rotations, back at the origin. - assert Device().uuid == uuid_origin - - def test_swap_identical_gpus(self, self_process): + _run_checkpoint_scenario_or_skip( + """ + devices = Device.get_all_devices() + if len(devices) < 2: + skip("GPU migration tests require at least 2 GPUs") + if find_same_chip_pair(devices) is None: + skip("GPU migration requires at least 2 GPUs of the same chip type") + + gpu_mapping = build_rotation_mapping(devices) + target, uuid_origin = start_target(0) + proc = checkpoint.Process(target.pid) + try: + for step in range(len(devices)): + expected_uuid = devices[(step + 1) % len(devices)].uuid + checkpoint_restore(proc, gpu_mapping=gpu_mapping) + observed_uuid = target_uuid(target) + assert observed_uuid == expected_uuid, ( + f"Step {step}: expected UUID {expected_uuid}, got {observed_uuid}" + ) + + assert target_uuid(target) == uuid_origin + finally: + stop_target(target) + """, + timeout=180, + ) + + def test_swap_identical_gpus(self): """Swap context between two GPUs of the same chip type. Sets the context on one of the pair members so that a successful migration is observable (the context UUID changes). """ - devices = Device.get_all_devices() - pair = _find_same_chip_pair(devices) - if pair is None: - pytest.skip("No two GPUs of the same chip type found") - - i, j = pair - # Place context on device i so the swap is observable. - devices[i].set_current() - - # Build an identity mapping, then swap the pair (using UUID strings). 
- gpu_mapping = {d.uuid: d.uuid for d in devices} - gpu_mapping[devices[i].uuid] = devices[j].uuid - gpu_mapping[devices[j].uuid] = devices[i].uuid - - assert Device().uuid == devices[i].uuid - - self._try_migration(self_process, gpu_mapping) - uuid_after = Device().uuid - - if uuid_after == devices[i].uuid: - pytest.skip("Driver accepted GPU swap but migration is a no-op on this hardware/driver version") - assert uuid_after == devices[j].uuid + _run_checkpoint_scenario_or_skip( + """ + devices = Device.get_all_devices() + pair = find_same_chip_pair(devices) + if pair is None: + skip("No two GPUs of the same chip type found") + + i, j = pair + gpu_mapping = {d.uuid: d.uuid for d in devices} + gpu_mapping[devices[i].uuid] = devices[j].uuid + gpu_mapping[devices[j].uuid] = devices[i].uuid + + target, uuid_before = start_target(i) + proc = checkpoint.Process(target.pid) + try: + assert uuid_before == devices[i].uuid + + checkpoint_restore(proc, gpu_mapping=gpu_mapping) + uuid_after = target_uuid(target) + + if uuid_after == devices[i].uuid: + skip("Driver accepted GPU swap but migration is a no-op on this hardware/driver version") + assert uuid_after == devices[j].uuid + finally: + stop_target(target) + """, + timeout=120, + ) From 376acc7f18241115ebacc748d58ed1dde7c0e73a Mon Sep 17 00:00:00 2001 From: Keith Kraus Date: Mon, 4 May 2026 19:11:16 -0400 Subject: [PATCH 10/10] Address checkpoint review follow-ups --- cuda_core/cuda/core/checkpoint.py | 30 +++--- cuda_core/docs/source/api.rst | 7 +- cuda_core/tests/test_checkpoint.py | 153 +++++++++++++++++------------ 3 files changed, 111 insertions(+), 79 deletions(-) diff --git a/cuda_core/cuda/core/checkpoint.py b/cuda_core/cuda/core/checkpoint.py index c811da7c081..b5831f030ed 100644 --- a/cuda_core/cuda/core/checkpoint.py +++ b/cuda_core/cuda/core/checkpoint.py @@ -50,10 +50,17 @@ class Process: Process ID of the CUDA process. """ - __slots__ = ("pid",) + __slots__ = ("_pid",) def __init__(self, pid: int): - self.pid = _check_pid(pid) + self._pid = _check_pid(pid) + + @property + def pid(self) -> int: + """ + Process ID of the CUDA process. + """ + return self._pid @property def state(self) -> _ProcessStateT: @@ -61,7 +68,7 @@ def state(self) -> _ProcessStateT: CUDA checkpoint state for this process. """ driver = _get_driver() - state = _call_driver(driver, driver.cuCheckpointProcessGetState, self.pid) + state = _call_driver(driver, driver.cuCheckpointProcessGetState, self._pid) state_names = _get_process_state_names(driver) try: return state_names[state] @@ -75,7 +82,7 @@ def restore_thread_id(self) -> int: CUDA restore thread ID for this process. """ driver = _get_driver() - return _call_driver(driver, driver.cuCheckpointProcessGetRestoreThreadId, self.pid) + return _call_driver(driver, driver.cuCheckpointProcessGetRestoreThreadId, self._pid) def lock(self, timeout_ms: int = 0) -> None: """ @@ -89,14 +96,14 @@ def lock(self, timeout_ms: int = 0) -> None: driver = _get_driver() args = driver.CUcheckpointLockArgs() args.timeoutMs = _check_timeout_ms(timeout_ms) - _call_driver(driver, driver.cuCheckpointProcessLock, self.pid, args) + _call_driver(driver, driver.cuCheckpointProcessLock, self._pid, args) def checkpoint(self) -> None: """ Checkpoint the GPU memory contents of this locked process. 
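
        The process must already be locked (see :meth:`lock`); on success,
        :attr:`state` becomes ``"checkpointed"``.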
""" driver = _get_driver() - _call_driver(driver, driver.cuCheckpointProcessCheckpoint, self.pid, None) + _call_driver(driver, driver.cuCheckpointProcessCheckpoint, self._pid, None) def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: """ @@ -107,18 +114,20 @@ def restore(self, gpu_mapping: _Mapping[_Any, _Any] | None = None) -> None: gpu_mapping : mapping, optional GPU UUID remapping from each checkpointed GPU UUID to the GPU UUID to restore onto. For migration workflows, provide mappings for - every CUDA-visible GPU. + every GPU visible to the kernel-mode driver. User-space masking + such as ``CUDA_VISIBLE_DEVICES`` does not reduce this mapping + requirement. """ driver = _get_driver() args = _make_restore_args(driver, gpu_mapping) - _call_driver(driver, driver.cuCheckpointProcessRestore, self.pid, args) + _call_driver(driver, driver.cuCheckpointProcessRestore, self._pid, args) def unlock(self) -> None: """ Unlock this locked process so it can resume CUDA API calls. """ driver = _get_driver() - _call_driver(driver, driver.cuCheckpointProcessUnlock, self.pid, None) + _call_driver(driver, driver.cuCheckpointProcessUnlock, self._pid, None) def _get_driver(): @@ -169,10 +178,7 @@ def _call_driver(driver, func, *args): "Upgrade to a driver version with CUDA checkpoint API support." ) from e raise - return _handle_return(driver, result) - -def _handle_return(driver, result): err = result[0] not_supported_errors = ( getattr(driver.CUresult, "CUDA_ERROR_NOT_FOUND", None), diff --git a/cuda_core/docs/source/api.rst b/cuda_core/docs/source/api.rst index 03c120288f0..667417bce28 100644 --- a/cuda_core/docs/source/api.rst +++ b/cuda_core/docs/source/api.rst @@ -214,8 +214,11 @@ this: ``"checkpointed"``, or ``"failed"``. Restore may optionally remap GPUs by passing ``gpu_mapping`` from each checkpointed GPU UUID to the GPU UUID that should be used during restore. For migration workflows, provide mappings for -every CUDA-visible GPU. The mapping may use ``CUuuid`` objects or the UUID -strings returned by :attr:`Device.uuid`. A successful restore returns the +every GPU visible to the NVIDIA kernel-mode driver at checkpoint time. +User-space masking such as ``CUDA_VISIBLE_DEVICES`` does not reduce this +mapping requirement, so applications that rely on user-space GPU masking may +not be valid migration targets. The mapping may use ``CUuuid`` objects or the +UUID strings returned by :attr:`Device.uuid`. A successful restore returns the process to the locked state; call ``Process.unlock`` after restore to allow CUDA API calls to resume. diff --git a/cuda_core/tests/test_checkpoint.py b/cuda_core/tests/test_checkpoint.py index 6d95ef3cb54..d3adb87deb6 100644 --- a/cuda_core/tests/test_checkpoint.py +++ b/cuda_core/tests/test_checkpoint.py @@ -4,14 +4,14 @@ # Real GPU tests for cuda.core.checkpoint — no mocks. # -# Lifecycle tests exercise lightweight state/lock operations in-process and -# mutating checkpoint / restore cycles through an isolated coordinator/target -# process pair. +# Driver-backed lifecycle tests run through an isolated coordinator/target +# process pair so hangs can be timed out without wedging the pytest process. # # Migration tests attempt GPU UUID remapping following the pattern from # NVIDIA/cuda-checkpoint r580-migration-api.c. They require ≥2 GPUs of -# the same chip type and a driver that supports migration; the tests skip -# gracefully when the hardware or driver cannot satisfy this. 
+# the same chip type, an unmasked CUDA device view, and a driver that supports +# migration; the tests skip gracefully when the hardware or driver cannot +# satisfy this. import os import signal @@ -37,26 +37,18 @@ def _checkpoint_available(): needs_checkpoint = pytest.mark.skipif( - sys.platform != "linux" or os.environ.get("CI") is not None or not _checkpoint_available(), - reason="CUDA checkpoint API requires Linux, a supported driver/bindings, and a non-CI environment", + sys.platform != "linux" or not _checkpoint_available(), + reason="CUDA checkpoint API requires Linux and a supported driver/bindings", ) # -- Helpers --------------------------------------------------------------- -def _run_or_skip_unsupported(func, *args, **kwargs): - try: - return func(*args, **kwargs) - except RuntimeError as exc: - if "CUDA checkpointing is not supported" in str(exc): - pytest.skip(str(exc)) - raise - - _SCENARIO_SKIP_EXIT_CODE = 77 _SCENARIO_COMMON = r""" +import os import subprocess import sys from contextlib import suppress @@ -214,36 +206,6 @@ def _run_checkpoint_scenario_or_skip(body: str, *, timeout: int = 90) -> None: ) -# -- Fixtures -------------------------------------------------------------- - - -@pytest.fixture -def self_process(init_cuda): - """checkpoint.Process wrapping os.getpid(), with safety unlock on teardown. - - Records the initial device so tests that call ``set_current()`` on a - different device (e.g. migration tests) are side-effect free. - """ - original_device = init_cuda - proc = checkpoint.Process(os.getpid()) - yield proc - # Ensure the process is not left locked if the test fails mid-lifecycle. - try: - st = proc.state - except Exception: - st = None - if st == "checkpointed": - with suppress(Exception): - proc.restore() - with suppress(Exception): - proc.unlock() - elif st == "locked": - with suppress(Exception): - proc.unlock() - # Restore the original device so init_cuda's teardown pops the right context. 
-    original_device.set_current()
-
-
 # -- Input validation (no GPU / driver needed) -----------------------------


@@ -265,36 +227,87 @@ def test_public_symbols(self):
         assert checkpoint.__all__ == ["Process"]
         assert not hasattr(checkpoint, "ProcessStateT")

+    def test_pid_is_read_only(self):
+        proc = checkpoint.Process(1)
+        assert proc.pid == 1
+        with pytest.raises(AttributeError):
+            proc.pid = 2
+

 # -- Lifecycle (single GPU, real driver) -----------------------------------


 @needs_checkpoint
 class TestCheckpointLifecycle:
-    def test_initial_state_is_running(self, self_process):
-        assert self_process.state == "running"
+    def test_initial_state_is_running(self):
+        _run_checkpoint_scenario_or_skip(
+            """
+            target, _ = start_target()
+            proc = checkpoint.Process(target.pid)
+            try:
+                assert proc.state == "running"
+            finally:
+                stop_target(target)
+            """
+        )

-    def test_restore_thread_id_is_positive(self, self_process):
-        tid = self_process.restore_thread_id
-        assert isinstance(tid, int)
-        assert tid > 0
+    def test_restore_thread_id_is_positive(self):
+        _run_checkpoint_scenario_or_skip(
+            """
+            target, _ = start_target()
+            proc = checkpoint.Process(target.pid)
+            try:
+                tid = proc.restore_thread_id
+                assert isinstance(tid, int)
+                assert tid > 0
+            finally:
+                stop_target(target)
+            """
+        )

-    def test_lock_unlock(self, self_process):
-        _run_or_skip_unsupported(self_process.lock)
-        assert self_process.state == "locked"
-        self_process.unlock()
-        assert self_process.state == "running"
+    def test_lock_unlock(self):
+        _run_checkpoint_scenario_or_skip(
+            """
+            target, _ = start_target()
+            proc = checkpoint.Process(target.pid)
+            try:
+                run_or_skip_unsupported(proc.lock)
+                assert proc.state == "locked"
+                proc.unlock()
+                assert proc.state == "running"
+            finally:
+                stop_target(target)
+            """
+        )

-    def test_lock_default_timeout(self, self_process):
+    def test_lock_default_timeout(self):
         """lock() with the default timeout_ms=0 (no timeout)."""
-        _run_or_skip_unsupported(self_process.lock)
-        assert self_process.state == "locked"
-        self_process.unlock()
+        _run_checkpoint_scenario_or_skip(
+            """
+            target, _ = start_target()
+            proc = checkpoint.Process(target.pid)
+            try:
+                run_or_skip_unsupported(proc.lock)
+                assert proc.state == "locked"
+                proc.unlock()
+            finally:
+                stop_target(target)
+            """
+        )

-    def test_lock_with_timeout(self, self_process):
-        _run_or_skip_unsupported(self_process.lock, timeout_ms=5000)
-        assert self_process.state == "locked"
-        self_process.unlock()
+    def test_lock_with_timeout(self):
+        _run_checkpoint_scenario_or_skip(
+            """
+            target, _ = start_target()
+            proc = checkpoint.Process(target.pid)
+            try:
+                run_or_skip_unsupported(proc.lock, timeout_ms=5000)
+                assert proc.state == "locked"
+                proc.unlock()
+            finally:
+                stop_target(target)
+            """
+        )

     def test_full_cycle_no_migration(self):
         """lock -> checkpoint -> restore -> unlock, verify state at each step."""
@@ -345,6 +358,11 @@ def test_rotation_migrates_context(self):
         _run_checkpoint_scenario_or_skip(
             """
             devices = Device.get_all_devices()
+            if "CUDA_VISIBLE_DEVICES" in os.environ:
+                skip(
+                    "GPU migration tests require an unmasked CUDA device view because "
+                    "the checkpoint mapping must cover every GPU visible to the kernel-mode driver"
+                )
             if len(devices) < 2:
                 skip("GPU migration tests require at least 2 GPUs")
             if find_same_chip_pair(devices) is None:
@@ -378,6 +396,11 @@ def test_swap_identical_gpus(self):
         _run_checkpoint_scenario_or_skip(
             """
             devices = Device.get_all_devices()
+            if "CUDA_VISIBLE_DEVICES" in os.environ:
+                skip(
+                    "GPU migration tests require an unmasked CUDA device view because "
+                    "the checkpoint mapping must cover every GPU visible to the kernel-mode driver"
+                )
             pair = find_same_chip_pair(devices)
             if pair is None:
                 skip("No two GPUs of the same chip type found")