22 changes: 22 additions & 0 deletions .github/workflows/ci.yml
@@ -57,6 +57,11 @@ jobs:
        run: |
          brew install erlang rebar3

      - name: Install numpy (for ML thread-affinity tests)
        run: |
          python3 -m pip install --upgrade pip
          python3 -m pip install numpy

      - name: Verify versions
        run: |
          echo "Erlang/OTP version:"
@@ -112,6 +117,13 @@ jobs:
          usesh: true
          prepare: |
            pkg install -y erlang ${{ matrix.python_pkg }} cmake
            # The numpy package follows the py<noDot>-numpy naming convention.
            # Non-fatal: not every FreeBSD pkg snapshot ships numpy for
            # every Python flavor; the ML suite self-skips when numpy
            # is missing, which is the right behavior here.
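            # (illustrative value) e.g. a matrix.python_pkg of "python311" would map to "py311-numpy"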
            NUMPY_PKG="$(echo '${{ matrix.python_pkg }}' | sed 's/^python/py/')-numpy"
            pkg install -y "$NUMPY_PKG" || \
              echo "WARN: $NUMPY_PKG unavailable; ML cases will skip"
          run: |
            # Set up environment
            export PYTHON_CONFIG=python${{ matrix.python }}-config
@@ -159,6 +171,11 @@ jobs:
          python3 -c "import sys; print('GIL enabled:', sys._is_gil_enabled())"
          python3 -c "import sysconfig; print('Py_GIL_DISABLED:', sysconfig.get_config_var('Py_GIL_DISABLED'))"

      - name: Install numpy (for ML thread-affinity tests)
        run: |
          python3 -m pip install --upgrade pip
          python3 -m pip install numpy

      - name: Set Python library path
        run: |
          PYTHON_LIB=$(python3 -c "import sysconfig; print(sysconfig.get_config_var('LIBDIR'))")
@@ -222,6 +239,11 @@ jobs:
          sudo apt-get update
          sudo apt-get install -y cmake

      - name: Install numpy (for ML thread-affinity tests)
        run: |
          python3 -m pip install --upgrade pip
          python3 -m pip install numpy

      - name: Set Python library path
        run: |
          PYTHON_LIB=$(python3 -c "import sysconfig; print(sysconfig.get_config_var('LIBDIR'))")
2 changes: 1 addition & 1 deletion docs/scalability.md
@@ -21,7 +21,7 @@ py:execution_mode().

### Worker Mode (Default)

Each context gets a dedicated pthread that handles all Python operations. This provides stable thread affinity, which is critical for libraries like numpy, torch, and tensorflow that maintain thread-local state.
Each context gets a dedicated pthread that handles all Python operations. This provides stable thread affinity, which is critical for libraries like numpy, torch, and tensorflow that maintain thread-local state. The regression contract for that guarantee lives in `test/py_ml_libs_SUITE.erl`, which drives real numpy and tensorflow operations through `exec` / `eval` / `call` and across multiple Erlang processes targeting the same context.
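
A minimal sketch of the invariant the suite asserts, using the same `py_context` calls it relies on (`new/1`, `eval/3`, `stop/1`); the variable names here are illustrative:

```erlang
%% Two evaluations through one worker-mode context must run on the same OS thread.
{ok, Ctx} = py_context:new(#{mode => worker}),
GetTid = fun() ->
    {ok, Tid} = py_context:eval(Ctx,
        <<"__import__('threading').get_native_id()">>, #{}),
    Tid
end,
Tid = GetTid(),
Tid = GetTid(),   %% pattern match fails if the worker thread changed
py_context:stop(Ctx).
```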

### OWN_GIL Mode (Python 3.12+)

265 changes: 265 additions & 0 deletions test/py_ml_libs_SUITE.erl
@@ -0,0 +1,265 @@
%%% @doc Regression contract for context thread affinity with real
%%% ML libraries.
%%%
%%% v3.0 fixed numpy / torch / tensorflow segfaults caused by the
%%% executor pool moving calls across OS threads. The fix is per-context
%%% worker pthreads with stable thread affinity. `py_thread_affinity_SUITE'
%%% checks the threading.get_native_id invariants in isolation;
%%% this suite drives actual numpy and tensorflow operations through
%%% exec / eval / call paths to confirm the libraries' thread-local
%%% state survives the round-trip.
%%%
%%% Skip-on-missing for both libraries: cases that need an unavailable
%%% module return {skip, ...} from init_per_testcase. TensorFlow is not
%%% installed on CI (too heavy), so its cases always skip there. OWN_GIL
%%% cases additionally skip on Python < 3.14.
-module(py_ml_libs_SUITE).

-include_lib("common_test/include/ct.hrl").

-export([
    all/0,
    init_per_suite/1,
    end_per_suite/1,
    init_per_testcase/2,
    end_per_testcase/2
]).

-export([
    numpy_basic_ops/1,
    numpy_call_thread_affinity/1,
    numpy_parallel_processes/1,
    numpy_owngil_basic/1,
    tensorflow_basic_ops/1,
    tensorflow_call_thread_affinity/1
]).

all() ->
    [
        numpy_basic_ops,
        numpy_call_thread_affinity,
        numpy_parallel_processes,
        numpy_owngil_basic,
        tensorflow_basic_ops,
        tensorflow_call_thread_affinity
    ].

init_per_suite(Config) ->
    {ok, _} = application:ensure_all_started(erlang_python),
    %% Suppress TensorFlow's chatty C++ logging at import time.
    %% Must be set before TF is imported in any context.
    os:putenv("TF_CPP_MIN_LOG_LEVEL", "3"),
    Config.

end_per_suite(_Config) ->
    ok = application:stop(erlang_python),
    ok.

init_per_testcase(TC, Config) ->
    Mode = case TC of
        numpy_owngil_basic -> owngil;
        _ -> worker
    end,
    case Mode of
        owngil ->
            case py_nif:owngil_supported() of
                false ->
                    {skip, "owngil mode requires Python 3.14+"};
                true ->
                    setup_case(TC, Mode, Config)
            end;
        worker ->
            setup_case(TC, Mode, Config)
    end.

end_per_testcase(_TC, Config) ->
    case proplists:get_value(ctx, Config) of
        undefined -> ok;
        Ctx -> py_context:stop(Ctx)
    end.

%%% ---------------------------------------------------------------------------
%%% Helpers
%%% ---------------------------------------------------------------------------

setup_case(TC, Mode, Config) ->
    case py_context:new(#{mode => Mode}) of
        {ok, Ctx} ->
            case require_module(Ctx, required_module(TC)) of
                ok ->
                    [{ctx, Ctx} | Config];
                {skip, _} = Skip ->
                    py_context:stop(Ctx),
                    Skip
            end;
        {error, Reason} ->
            ct:fail({context_create_failed, Mode, Reason})
    end.

required_module(numpy_basic_ops) -> "numpy";
required_module(numpy_call_thread_affinity) -> "numpy";
required_module(numpy_parallel_processes) -> "numpy";
required_module(numpy_owngil_basic) -> "numpy";
required_module(tensorflow_basic_ops) -> "tensorflow";
required_module(tensorflow_call_thread_affinity) -> "tensorflow".

%% Reflect the import status into a Python variable so we can
%% distinguish "module not installed" (skip) from any other error
%% (let it bubble up). A native-extension crash that surfaces as a
%% non-ImportError must not silently turn into a skip.
require_module(Ctx, Mod) ->
    Code = iolist_to_binary([
        "try:\n",
        "    import ", Mod, "\n",
        "    _import_status = 'ok'\n",
        "except ImportError:\n",
        "    _import_status = 'not_found'\n"
    ]),
    ok = py_context:exec(Ctx, Code),
    {ok, Status} = py_context:eval(Ctx, <<"_import_status">>, #{}),
    case Status of
        <<"ok">> ->
            ok;
        <<"not_found">> ->
            {skip, "Python module " ++ Mod ++ " not available"}
    end.

native_id(Ctx) ->
    {ok, Tid} = py_context:eval(Ctx,
        <<"__import__('threading').get_native_id()">>, #{}),
    Tid.

%%% ---------------------------------------------------------------------------
%%% numpy cases
%%% ---------------------------------------------------------------------------

numpy_basic_ops(Config) ->
    Ctx = ?config(ctx, Config),
    %% Define a numpy-backed function and exercise both call and eval
    %% paths so a thread-state regression in either direction crashes.
    ok = py_context:exec(Ctx, <<
        "import numpy as np\n"
        "def vec_dot_self(xs):\n"
        "    v = np.array(xs, dtype=np.float64)\n"
        "    return float(np.dot(v, v))\n"
    >>),
    {ok, 30.0} = py_context:call(Ctx, '__main__', vec_dot_self,
        [[1.0, 2.0, 3.0, 4.0]]),
    ok = py_context:exec(Ctx,
        <<"_w = vec_dot_self([10.0, 0.0, 0.0])">>),
    {ok, 100.0} = py_context:eval(Ctx, <<"_w">>, #{}),
    ok.

numpy_call_thread_affinity(Config) ->
    Ctx = ?config(ctx, Config),
    ok = py_context:exec(Ctx, <<
        "import numpy as np\n"
        "import threading\n"
        "def numpy_with_tid(xs):\n"
        "    v = np.array(xs, dtype=np.float64)\n"
        "    return (threading.get_native_id(), float(np.sum(v)))\n"
    >>),
    Results = [py_context:call(Ctx, '__main__', numpy_with_tid,
                   [[float(I), float(I + 1), float(I + 2)]])
               || I <- lists:seq(1, 50)],
    Tids = [Tid || {ok, {Tid, _Sum}} <- Results],
    Sums = [Sum || {ok, {_Tid, Sum}} <- Results],
    50 = length(Tids),
    [SingleTid] = lists:usort(Tids),
    true = is_integer(SingleTid),
    %% Every sum must match its expected value, not just the thread id.
    Expected = [3.0 * I + 3.0 || I <- lists:seq(1, 50)],
    Expected = Sums,
    ok.

numpy_parallel_processes(Config) ->
    Ctx = ?config(ctx, Config),
    ok = py_context:exec(Ctx, <<
        "import numpy as np\n"
        "import threading\n"
        "def numpy_dot_with_tid(xs, ys):\n"
        "    a = np.array(xs, dtype=np.float64)\n"
        "    b = np.array(ys, dtype=np.float64)\n"
        "    return (threading.get_native_id(), float(np.dot(a, b)))\n"
    >>),
    Parent = self(),
    N = 8,
    Pids = [spawn_link(fun() ->
                Xs = [float(K * J) || J <- lists:seq(1, 4)],
                Ys = [float(K + J) || J <- lists:seq(1, 4)],
                R = py_context:call(Ctx, '__main__', numpy_dot_with_tid,
                        [Xs, Ys]),
                Parent ! {result, K, R}
            end) || K <- lists:seq(1, N)],
    Results = [receive {result, K, R} -> {K, R} after 5000 -> ct:fail(timeout) end
               || _ <- Pids],
    %% All calls converged on one thread.
    Tids = [Tid || {_K, {ok, {Tid, _Sum}}} <- Results],
    N = length(Tids),
    [SingleTid] = lists:usort(Tids),
    true = is_integer(SingleTid),
    %% Each result matches the expected dot product.
    lists:foreach(fun({K, {ok, {_Tid, Got}}}) ->
        Xs = [float(K * J) || J <- lists:seq(1, 4)],
        Ys = [float(K + J) || J <- lists:seq(1, 4)],
        Expected = lists:sum([X * Y || {X, Y} <- lists:zip(Xs, Ys)]),
        true = abs(Got - Expected) < 1.0e-9
    end, Results),
    %% Drain time + mailbox sanity (no orphan results).
    timer:sleep(50),
    {messages, []} = erlang:process_info(self(), messages),
    ok.

numpy_owngil_basic(Config) ->
    Ctx = ?config(ctx, Config),
    %% Same shape as numpy_basic_ops but inside an OWN_GIL subinterpreter.
    %% Numpy in OWN_GIL was the original v3.0 motivator on Python 3.14.
    ok = py_context:exec(Ctx, <<
        "import numpy as np\n"
        "def vec_dot_self(xs):\n"
        "    v = np.array(xs, dtype=np.float64)\n"
        "    return float(np.dot(v, v))\n"
    >>),
    {ok, 30.0} = py_context:call(Ctx, '__main__', vec_dot_self,
        [[1.0, 2.0, 3.0, 4.0]]),
    {ok, 25.0} = py_context:call(Ctx, '__main__', vec_dot_self, [[5.0]]),
    %% Confirm the thread is stable across owngil calls.
    Tid1 = native_id(Ctx),
    Tid2 = native_id(Ctx),
    Tid1 = Tid2,
    ok.

%%% ---------------------------------------------------------------------------
%%% tensorflow cases
%%% ---------------------------------------------------------------------------

tensorflow_basic_ops(Config) ->
    Ctx = ?config(ctx, Config),
    ok = py_context:exec(Ctx, <<
        "import tensorflow as tf\n"
        "def matmul_22():\n"
        "    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])\n"
        "    return tf.linalg.matmul(a, a).numpy().tolist()\n"
    >>),
    {ok, [[7.0, 10.0], [15.0, 22.0]]} =
        py_context:call(Ctx, '__main__', matmul_22, []),
    ok.

tensorflow_call_thread_affinity(Config) ->
    Ctx = ?config(ctx, Config),
    ok = py_context:exec(Ctx, <<
        "import tensorflow as tf\n"
        "import threading\n"
        "def tf_sum_with_tid(xs):\n"
        "    t = tf.constant(xs, dtype=tf.float64)\n"
        "    return (threading.get_native_id(),\n"
        "            float(tf.math.reduce_sum(t).numpy()))\n"
    >>),
    Results = [py_context:call(Ctx, '__main__', tf_sum_with_tid,
                   [[float(I), float(I + 1)]])
               || I <- lists:seq(1, 20)],
    Tids = [Tid || {ok, {Tid, _Sum}} <- Results],
    20 = length(Tids),
    [SingleTid] = lists:usort(Tids),
    true = is_integer(SingleTid),
    ok.