2 changes: 1 addition & 1 deletion cuda_core/cuda/core/_context.pxd
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# SPDX-License-Identifier: Apache-2.0

13 changes: 6 additions & 7 deletions cuda_core/cuda/core/_device.pyx
@@ -1394,14 +1394,12 @@ class Device:
cdef Context ctx = self._context
return cyEvent._init(cyEvent, self._device_id, ctx._h_context, options, True)

def allocate(self, size, stream: Stream | GraphBuilder | None = None) -> Buffer:
def allocate(self, size, *, stream: Stream | GraphBuilder) -> Buffer:
"""Allocate device memory from a specified stream.

Allocates device memory of `size` bytes on the specified `stream`
using the memory resource currently associated with this Device.

Parameter `stream` is optional, using a default stream by default.

Note
----
Device must be initialized.
@@ -1410,9 +1408,10 @@
----------
size : int
Number of bytes to allocate.
stream : :obj:`~_stream.Stream`, optional
The stream establishing the stream ordering semantic.
Default value of `None` uses default stream.
stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`
Keyword-only. The stream establishing the stream ordering semantic.
Must be passed explicitly; pass ``self.default_stream`` to use
the default stream.

Returns
-------
@@ -1421,7 +1420,7 @@

"""
self._check_context_initialized()
return self.memory_resource.allocate(size, stream)
return self.memory_resource.allocate(size, stream=stream)

def sync(self):
"""Synchronize the device.
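For context on the API change above, here is a minimal usage sketch of the new keyword-only convention. This is a sketch under assumptions: the top-level `cuda.core` import path and `Device.create_stream` come from the library's public API, not from this diff.

```python
from cuda.core import Device  # import path assumed from this repo's layout

dev = Device(0)
dev.set_current()

# `stream` is now keyword-only and required; the default stream must be
# requested explicitly instead of being implied by omission.
buf = dev.allocate(1024, stream=dev.default_stream)

# Stream-ordered allocation on a user-created stream works the same way.
s = dev.create_stream()
buf2 = dev.allocate(2048, stream=s)
```

Calls that previously relied on `dev.allocate(size)` now fail with a `TypeError`, which surfaces the stream-ordering contract at every call site.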
13 changes: 7 additions & 6 deletions cuda_core/cuda/core/_graphics.pyx
@@ -12,7 +12,7 @@ from cuda.core._resource_handles cimport (
as_intptr,
)
from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
from cuda.core._stream cimport Stream, Stream_accept, default_stream
from cuda.core._stream cimport Stream, Stream_accept
from cuda.core._utils.cuda_utils cimport HANDLE_RETURN

__all__ = ['GraphicsResource']
@@ -206,7 +206,7 @@ cdef class GraphicsResource:
return None
return self._mapped_buffer

def map(self, *, stream: Stream | None = None) -> Buffer:
def map(self, *, stream: Stream) -> Buffer:
"""Map this graphics resource for CUDA access.

After mapping, a CUDA device pointer into the underlying graphics
@@ -220,9 +220,10 @@

Parameters
----------
stream : :class:`~cuda.core.Stream`, optional
The CUDA stream on which to perform the mapping. If ``None``,
the current default stream is used.
stream : :class:`~cuda.core.Stream`
Keyword-only. The CUDA stream on which to perform the mapping.
Must be passed explicitly; pass ``device.default_stream`` to use
the default stream.

Returns
-------
@@ -248,7 +249,7 @@
if self._get_mapped_buffer() is not None:
raise RuntimeError("GraphicsResource is already mapped")

s_obj = default_stream() if stream is None else Stream_accept(stream)
s_obj = Stream_accept(stream)
raw = as_cu(self._handle)
cy_stream = as_cu(s_obj._h_stream)
with nogil:
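A hedged sketch of the updated `map()` call. Constructing a `GraphicsResource` (e.g. registering an OpenGL or Vulkan buffer) is outside this diff, so `res` below stands in for an already-registered resource, and the `unmap` signature is an assumption:

```python
from cuda.core import Device  # import path assumed

# `res` is an already-registered GraphicsResource (construction not shown).
dev = Device(0)
dev.set_current()

buf = res.map(stream=dev.default_stream)  # keyword-only and required
# ... launch kernels that read/write `buf` on that stream ...
res.unmap(stream=dev.default_stream)      # assumed symmetric signature
```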
2 changes: 1 addition & 1 deletion cuda_core/cuda/core/_layout.pyx
@@ -460,7 +460,7 @@ cdef class _StridedLayout:
required_size = layout.required_size_in_bytes()
# allocate the memory on the device
device.set_current()
mem = device.allocate(required_size)
mem = device.allocate(required_size, stream=device.default_stream)
# create a view on the newly allocated device memory
b_view = StridedMemoryView.from_buffer(mem, layout, a_view.dtype)
return b_view
67 changes: 47 additions & 20 deletions cuda_core/cuda/core/_memory/_buffer.pyx
@@ -24,7 +24,7 @@ from cuda.core._resource_handles cimport (
)
from cuda.core.typing import DevicePointerType

from cuda.core._stream cimport Stream, Stream_accept
from cuda.core._stream cimport Stream, Stream_accept, default_stream
from cuda.core._utils.cuda_utils cimport HANDLE_RETURN, _parse_fill_value

import sys
@@ -49,12 +49,24 @@ cdef void _mr_dealloc_callback(
size_t size,
const StreamHandle& h_stream,
) noexcept:
"""Called by the C++ deleter to deallocate via MemoryResource.deallocate."""
"""Called by the C++ deleter to deallocate via MemoryResource.deallocate.

This is the C++ teardown path: there is no Python caller frame from
which to obtain a stream. If the device-pointer handle was created
without ``set_deallocation_stream`` being called (e.g. buffers minted
via ``Buffer.from_handle(ptr, size, mr=mr)`` from DLPack import,
third-party adapters, or other foreign sources), ``h_stream`` is
empty here. Stream-ordered MR ``deallocate`` overrides reject
``stream=None`` (issue #2001), so without a fallback the destructor
would print a warning and leak the allocation. Fall back to the
legacy/per-thread default stream so the free still happens; this is
the unique exception to the "no implicit default-stream fallback"
policy because the teardown has no other source of truth.
"""
cdef Stream stream
try:
stream = None
if h_stream:
stream = Stream._from_handle(Stream, h_stream)
mr.deallocate(int(ptr), size, stream)
stream = Stream._from_handle(Stream, h_stream) if h_stream else default_stream()
mr.deallocate(int(ptr), size, stream=stream)
Comment thread (leofang, Member; marked as resolved):

My apologies. It seems my bot was hallucinating? I thought we had already guaranteed that we always have a deallocation stream. There should be no way that we would need the default stream here. @Andy-Jost could you confirm?

Below is bot-generated:


For pool allocations via _MemPool.allocate(): the invariant holds — h_stream is always non-zero. In deviceptr_alloc_from_pool() (resource_handles.cpp:719), the stream is stored in the DevicePtrBox at creation and captured by the shared_ptr deleter. Same for Buffer.from_ipc_descriptor(). So for the normal async MR allocation path, the default_stream() fallback is unreachable.

The one path that breaks the invariant: Buffer.from_handle(ptr, size, mr=some_mr). This calls deviceptr_create_with_mr() (resource_handles.cpp:847) which creates the box with StreamHandle{} (empty). No stream parameter is accepted by from_handle(). So if someone constructs a buffer via from_handle(mr=pool_mr), the callback sees h_stream=0.

So the question is: is Buffer.from_handle(mr=pool_mr) with an async MR a legitimate use case? If not, the default_stream() fallback papers over something that should be an error. If yes, then from_handle() arguably should accept an optional stream when mr is provided.

For the test Andy added (test_mr_dealloc_callback_falls_back_to_default_stream), it constructs a buffer via Buffer.from_handle(1, 1024, mr=mr) — which is exactly the from_handle path, not the pool allocation path. This confirms the fallback only matters for from_handle, not for normal pool allocations.

Should this be flagged in the review? The options seem to be:

  • a) Keep the fallback but document it's specifically for the from_handle(mr=...) edge case, not for pool allocations
  • b) Make from_handle() accept an optional stream param when mr is provided, and treat missing-stream-on-async-MR as an error in the callback instead

except Exception as exc:
print(f"Warning: mr.deallocate() failed during Buffer destruction: {exc}",
file=sys.stderr)
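
To make the edge case discussed in the thread above concrete, here is a hedged repro mirroring the PR's own test (the thread notes that `test_mr_dealloc_callback_falls_back_to_default_stream` constructs `Buffer.from_handle(1, 1024, mr=mr)`); the toy resource is illustrative, and a current context is assumed so `default_stream()` can resolve:

```python
from cuda.core import Device  # assumed top-level path
from cuda.core._memory._buffer import Buffer, MemoryResource  # private path, per this diff

class RecordingMR(MemoryResource):
    """Toy resource that records which stream deallocation lands on."""
    def allocate(self, size, *, stream):
        raise NotImplementedError  # not exercised by this repro
    def deallocate(self, ptr, size, *, stream):
        print(f"deallocate(ptr={ptr}, size={size}) on {stream}")

Device(0).set_current()  # default_stream() needs a current context
mr = RecordingMR()
buf = Buffer.from_handle(1, 1024, mr=mr)  # no deallocation stream recorded
del buf  # C++ deleter fires with an empty h_stream -> default-stream fallback
```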
@@ -119,7 +131,11 @@ cdef class Buffer:

@staticmethod
def _reduce_helper(mr, ipc_descriptor):
return Buffer.from_ipc_descriptor(mr, ipc_descriptor)
# The parent process's stream is not portable across processes, so the
# pickle path cannot thread an explicit stream through. Seed the
# imported buffer's deallocation with the current context's default
# stream; the receiver can override via buffer.close(stream).
return Buffer.from_ipc_descriptor(mr, ipc_descriptor, stream=default_stream())
Comment thread (leofang; marked as resolved).

def __reduce__(self):
# Must not serialize the parent's stream!
@@ -158,9 +174,20 @@
@classmethod
def from_ipc_descriptor(
cls, mr: DeviceMemoryResource | PinnedMemoryResource, ipc_descriptor: IPCBufferDescriptor,
stream: Stream = None
*, stream: Stream
) -> Buffer:
"""Import a buffer that was exported from another process."""
"""Import a buffer that was exported from another process.

Parameters
----------
mr : :obj:`~_memory.DeviceMemoryResource` | :obj:`~_memory.PinnedMemoryResource`
The IPC-enabled memory resource matching the exporting process.
ipc_descriptor : :obj:`~_memory.IPCBufferDescriptor`
The descriptor exported from another process.
stream : :obj:`~_stream.Stream`
Keyword-only. The stream used for asynchronous deallocation when
the buffer is closed or garbage collected.
"""
return _ipc.Buffer_from_ipc_descriptor(cls, mr, ipc_descriptor, stream)

@property
@@ -215,7 +242,7 @@ cdef class Buffer:
if self._memory_resource is None:
raise ValueError("a destination buffer must be provided (this "
"buffer does not have a memory_resource)")
dst = self._memory_resource.allocate(src_size, s)
dst = self._memory_resource.allocate(src_size, stream=s)

cdef size_t dst_size = dst._size
if dst_size != src_size:
@@ -490,17 +517,17 @@ cdef class MemoryResource:
resource's respective property.)
"""

def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer:
def allocate(self, size_t size, *, stream: Stream | GraphBuilder) -> Buffer:
"""Allocate a buffer of the requested size.

Parameters
----------
size : int
The size of the buffer to allocate, in bytes.
stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional
The stream on which to perform the allocation asynchronously.
If None, it is up to each memory resource implementation to decide
and document the behavior.
stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`
Keyword-only. The stream on which to perform the allocation
asynchronously. Must be passed explicitly; pass
``device.default_stream`` to use the default stream.

Returns
-------
Expand All @@ -510,7 +537,7 @@ cdef class MemoryResource:
"""
raise TypeError("MemoryResource.allocate must be implemented by subclasses.")

def deallocate(self, ptr: DevicePointerType, size_t size, stream: Stream | GraphBuilder | None = None):
def deallocate(self, ptr: DevicePointerType, size_t size, *, stream: Stream | GraphBuilder):
"""Deallocate a buffer previously allocated by this resource.

Parameters
Expand All @@ -519,10 +546,10 @@ cdef class MemoryResource:
The pointer or handle to the buffer to deallocate.
size : int
The size of the buffer to deallocate, in bytes.
stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`, optional
The stream on which to perform the deallocation asynchronously.
If None, it is up to each memory resource implementation to decide
and document the behavior.
stream : :obj:`~_stream.Stream` | :obj:`~graph.GraphBuilder`
Keyword-only. The stream on which to perform the deallocation
asynchronously. Must be passed explicitly; pass
``device.default_stream`` to use the default stream.
"""
raise TypeError("MemoryResource.deallocate must be implemented by subclasses.")

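For implementers, a minimal sketch of a conforming stream-ordered subclass under the new keyword-only signatures. The class name is illustrative; `cuMemAllocAsync`/`cuMemFreeAsync` from `cuda.bindings`, the `Stream.handle` property, and the private `Buffer._init` helper (used by `_legacy.py` in this PR) are assumptions about the surrounding API:

```python
from cuda.bindings import driver
from cuda.core._memory._buffer import Buffer, MemoryResource
from cuda.core._utils.cuda_utils import raise_if_driver_error

class StreamOrderedResource(MemoryResource):
    """Illustrative stream-ordered resource; rejects a missing stream,
    matching the behavior referenced in issue #2001."""

    def allocate(self, size, *, stream):
        if stream is None:
            raise ValueError("allocate() requires an explicit stream")
        err, ptr = driver.cuMemAllocAsync(size, stream.handle)
        raise_if_driver_error(err)
        return Buffer._init(ptr, size, self)

    def deallocate(self, ptr, size, *, stream):
        if stream is None:
            raise ValueError("deallocate() requires an explicit stream")
        (err,) = driver.cuMemFreeAsync(ptr, stream.handle)
        raise_if_driver_error(err)
```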
14 changes: 7 additions & 7 deletions cuda_core/cuda/core/_memory/_graph_memory_resource.pyx
@@ -14,7 +14,7 @@ from cuda.core._resource_handles cimport (
as_cu,
)

from cuda.core._stream cimport default_stream, Stream_accept, Stream
from cuda.core._stream cimport Stream_accept, Stream
from cuda.core._utils.cuda_utils cimport HANDLE_RETURN

from functools import cache
@@ -104,19 +104,19 @@ cdef class cyGraphMemoryResource(MemoryResource):
def __cinit__(self, int device_id):
self._device_id = device_id

def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer:
def allocate(self, size_t size, *, stream: Stream | GraphBuilder) -> Buffer:
"""
Allocate a buffer of the requested size. See documentation for :obj:`~_memory.MemoryResource`.
"""
stream = Stream_accept(stream) if stream is not None else default_stream()
return GMR_allocate(self, size, <Stream> stream)
cdef Stream s = Stream_accept(stream)
return GMR_allocate(self, size, s)

def deallocate(self, ptr: "DevicePointerType", size_t size, stream: Stream | GraphBuilder | None = None):
def deallocate(self, ptr: "DevicePointerType", size_t size, *, stream: Stream | GraphBuilder):
"""
Deallocate a buffer of the requested size. See documentation for :obj:`~_memory.MemoryResource`.
"""
stream = Stream_accept(stream) if stream is not None else default_stream()
return GMR_deallocate(ptr, size, <Stream> stream)
cdef Stream s = Stream_accept(stream)
return GMR_deallocate(ptr, size, s)

def close(self):
"""No operation (provided for compatibility)."""
8 changes: 2 additions & 6 deletions cuda_core/cuda/core/_memory/_ipc.pyx
@@ -7,7 +7,7 @@ cimport cpython
from cuda.bindings cimport cydriver
from cuda.core._memory._buffer cimport Buffer, Buffer_from_deviceptr_handle
from cuda.core._memory._memory_pool cimport _MemPool
from cuda.core._stream cimport Stream
from cuda.core._stream cimport Stream, Stream_accept
from cuda.core._resource_handles cimport (
DevicePtrHandle,
create_fd_handle,
@@ -19,7 +19,6 @@ from cuda.core._resource_handles cimport (
as_py,
)

from cuda.core._stream cimport default_stream
from cuda.core._utils.cuda_utils cimport HANDLE_RETURN
from cuda.core._utils.cuda_utils import check_multiprocessing_start_method

@@ -171,10 +170,7 @@ cdef Buffer Buffer_from_ipc_descriptor(
"""Import a buffer that was exported from another process."""
if not mr.is_ipc_enabled:
raise RuntimeError("Memory resource is not IPC-enabled")
if stream is None:
# Note: match this behavior to _MemPool.allocate()
stream = default_stream()
cdef Stream s = <Stream>stream
cdef Stream s = Stream_accept(stream)
cdef DevicePtrHandle h_ptr = deviceptr_import_ipc(
mr._h_pool,
ipc_descriptor.payload_ptr(),
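A hedged sketch of the importer side under the new signature; the exporter-side calls that produce `ipc_descriptor` (and the IPC-enabled `mr`) are outside this diff:

```python
from cuda.core import Device  # assumed top-level path
from cuda.core._memory._buffer import Buffer  # private path, per this diff

# Importer process: `mr` is an IPC-enabled memory resource matching the
# exporter, and `ipc_descriptor` arrived via pickle or another channel.
dev = Device(0)
dev.set_current()

buf = Buffer.from_ipc_descriptor(mr, ipc_descriptor, stream=dev.default_stream)
# The stream seeds the imported buffer's asynchronous deallocation; per the
# comment in _buffer.pyx above, the receiver can override it via buf.close(stream).
```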
43 changes: 28 additions & 15 deletions cuda_core/cuda/core/_memory/_legacy.py
@@ -8,6 +8,7 @@

if TYPE_CHECKING:
from cuda.core._memory._buffer import DevicePointerType
from cuda.core._stream import Stream

from cuda.core._memory._buffer import Buffer, MemoryResource
from cuda.core._utils.cuda_utils import (
@@ -27,33 +28,38 @@ class LegacyPinnedMemoryResource(MemoryResource):

# TODO: support creating this MR with flags that are later passed to cuMemHostAlloc?

def allocate(self, size, stream=None) -> Buffer:
def allocate(self, size, *, stream: Stream | None = None) -> Buffer:
"""Allocate a buffer of the requested size.

``cuMemAllocHost`` is synchronous, so this resource ignores any
supplied stream. The argument is accepted (and validated when
non-``None``) for interface conformance with stream-ordered
memory resources.

Parameters
----------
size : int
The size of the buffer to allocate, in bytes.
stream : Stream, optional
Currently ignored
Keyword-only. Validated when provided but otherwise unused.

Returns
-------
Buffer
The allocated buffer object, which is accessible on both host and device.
"""
if stream is None:
from cuda.core._stream import default_stream
from cuda.core._stream import Stream_accept

stream = default_stream()
if stream is not None:
Stream_accept(stream)
if size:
err, ptr = driver.cuMemAllocHost(size)
raise_if_driver_error(err)
else:
ptr = 0
return Buffer._init(ptr, size, self)

def deallocate(self, ptr: DevicePointerType, size, stream):
def deallocate(self, ptr: DevicePointerType, size, *, stream: Stream | None = None):
"""Deallocate a buffer previously allocated by this resource.

Parameters
@@ -62,11 +68,14 @@ def deallocate(self, ptr: DevicePointerType, size, stream):
The pointer or handle to the buffer to deallocate.
size : int
The size of the buffer to deallocate, in bytes.
stream : Stream
The stream on which to perform the deallocation synchronously.
stream : Stream, optional
Keyword-only. If provided, ``stream.sync()`` is called before the
host allocation is freed. ``None`` skips the sync.
"""
from cuda.core._stream import Stream_accept

if stream is not None:
stream.sync()
Stream_accept(stream).sync()

if size:
(err,) = driver.cuMemFreeHost(ptr)
@@ -96,21 +105,25 @@ def __init__(self, device_id):

self._device_id = Device(device_id).device_id

def allocate(self, size, stream=None) -> Buffer:
if stream is None:
from cuda.core._stream import default_stream
def allocate(self, size, *, stream: Stream | None = None) -> Buffer:
# cuMemAlloc is synchronous; stream is accepted (and validated)
# for interface conformance but not used.
from cuda.core._stream import Stream_accept

stream = default_stream()
if stream is not None:
Stream_accept(stream)
if size:
err, ptr = driver.cuMemAlloc(size)
raise_if_driver_error(err)
else:
ptr = 0
return Buffer._init(ptr, size, self)

def deallocate(self, ptr, size, stream):
def deallocate(self, ptr, size, *, stream: Stream | None = None):
from cuda.core._stream import Stream_accept

if stream is not None:
stream.sync()
Stream_accept(stream).sync()
if size:
(err,) = driver.cuMemFree(ptr)
raise_if_driver_error(err)
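Because the legacy resources are synchronous, they are the one public spot where `stream` stays optional; a short sketch (the import path is assumed from this PR's file layout, and `Buffer.close` accepting an optional stream is an assumption):

```python
from cuda.core._memory._legacy import LegacyPinnedMemoryResource

mr = LegacyPinnedMemoryResource()
buf = mr.allocate(4096)  # cuMemAllocHost is synchronous; no stream needed
# ... fill `buf` from the host, pass it to kernels, etc. ...
buf.close()  # stream=None path: frees immediately, no sync performed
```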