From 1366ed11c5e5f3cabf751d491459a9928e030c39 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 09:55:19 +0200 Subject: [PATCH 1/2] Add numpy + tensorflow regression tests for thread affinity The v3.0 fix for numpy/torch/tensorflow segfaults was per-context worker pthreads with stable thread affinity (commit 8a7a68c). py_thread_affinity_SUITE checks threading.get_native_id invariants in isolation; this new suite drives real numpy and tensorflow operations through exec / eval / call and across multiple Erlang processes hitting the same context. Each case spins up a fresh py_context for isolation. Library imports use a Python try/except that flags import status into a variable so missing modules surface as a clean skip while genuine errors still propagate. TensorFlow stays user-skipped on CI; numpy is now installed on every CT-running leg (Linux/macOS matrix, ASan, free-threaded 3.13t, FreeBSD via py-numpy). numpy 2.4 still rejects subinterpreter loading, so numpy_owngil_basic self-skips with that ImportError; the case will start running once numpy adds OWN_GIL support. 
--- .github/workflows/ci.yml | 18 +++ docs/scalability.md | 2 +- test/py_ml_libs_SUITE.erl | 265 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 test/py_ml_libs_SUITE.erl diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9b7206a..af86d48 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,6 +57,11 @@ jobs: run: | brew install erlang rebar3 + - name: Install numpy (for ML thread-affinity tests) + run: | + python3 -m pip install --upgrade pip + python3 -m pip install numpy + - name: Verify versions run: | echo "Erlang/OTP version:" @@ -112,6 +117,9 @@ jobs: usesh: true prepare: | pkg install -y erlang ${{ matrix.python_pkg }} cmake + # numpy package follows the py-numpy convention + NUMPY_PKG="$(echo '${{ matrix.python_pkg }}' | sed 's/^python/py/')-numpy" + pkg install -y "$NUMPY_PKG" run: | # Set up environment export PYTHON_CONFIG=python${{ matrix.python }}-config @@ -159,6 +167,11 @@ jobs: python3 -c "import sys; print('GIL enabled:', sys._is_gil_enabled())" python3 -c "import sysconfig; print('Py_GIL_DISABLED:', sysconfig.get_config_var('Py_GIL_DISABLED'))" + - name: Install numpy (for ML thread-affinity tests) + run: | + python3 -m pip install --upgrade pip + python3 -m pip install numpy + - name: Set Python library path run: | PYTHON_LIB=$(python3 -c "import sysconfig; print(sysconfig.get_config_var('LIBDIR'))") @@ -222,6 +235,11 @@ jobs: sudo apt-get update sudo apt-get install -y cmake + - name: Install numpy (for ML thread-affinity tests) + run: | + python3 -m pip install --upgrade pip + python3 -m pip install numpy + - name: Set Python library path run: | PYTHON_LIB=$(python3 -c "import sysconfig; print(sysconfig.get_config_var('LIBDIR'))") diff --git a/docs/scalability.md b/docs/scalability.md index eef5d21..a5c7eeb 100644 --- a/docs/scalability.md +++ b/docs/scalability.md @@ -21,7 +21,7 @@ py:execution_mode(). 
### Worker Mode (Default) -Each context gets a dedicated pthread that handles all Python operations. This provides stable thread affinity, which is critical for libraries like numpy, torch, and tensorflow that maintain thread-local state. +Each context gets a dedicated pthread that handles all Python operations. This provides stable thread affinity, which is critical for libraries like numpy, torch, and tensorflow that maintain thread-local state. The regression contract for that guarantee lives in `test/py_ml_libs_SUITE.erl`, which drives real numpy and tensorflow operations through `exec` / `eval` / `call` and across multiple Erlang processes targeting the same context. ### OWN_GIL Mode (Python 3.12+) diff --git a/test/py_ml_libs_SUITE.erl b/test/py_ml_libs_SUITE.erl new file mode 100644 index 0000000..44de937 --- /dev/null +++ b/test/py_ml_libs_SUITE.erl @@ -0,0 +1,265 @@ +%%% @doc Regression contract for context thread affinity with real +%%% ML libraries. +%%% +%%% v3.0 fixed numpy / torch / tensorflow segfaults caused by the +%%% executor pool moving calls across OS threads. The fix is per-context +%%% worker pthreads with stable thread affinity. `py_thread_affinity_SUITE' +%%% checks the threading.get_native_id invariants in isolation; +%%% this suite drives actual numpy and tensorflow operations through +%%% exec / eval / call paths to confirm the libraries' thread-local +%%% state survives the round-trip. +%%% +%%% Skip-on-missing for both libraries: cases that need an unavailable +%%% module return {skip, ...} from init_per_testcase. TensorFlow is +%%% always skipped on CI (too heavy to install). Owngil cases additionally +%%% skip on Python <3.14. +-module(py_ml_libs_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). 
+ +-export([ + numpy_basic_ops/1, + numpy_call_thread_affinity/1, + numpy_parallel_processes/1, + numpy_owngil_basic/1, + tensorflow_basic_ops/1, + tensorflow_call_thread_affinity/1 +]). + +all() -> + [ + numpy_basic_ops, + numpy_call_thread_affinity, + numpy_parallel_processes, + numpy_owngil_basic, + tensorflow_basic_ops, + tensorflow_call_thread_affinity + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + %% Suppress TensorFlow's chatty C++ logging at import time. + %% Must be set before TF is imported in any context. + os:putenv("TF_CPP_MIN_LOG_LEVEL", "3"), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(TC, Config) -> + Mode = case TC of + numpy_owngil_basic -> owngil; + _ -> worker + end, + case Mode of + owngil -> + case py_nif:owngil_supported() of + false -> + {skip, "owngil mode requires Python 3.14+"}; + true -> + setup_case(TC, Mode, Config) + end; + worker -> + setup_case(TC, Mode, Config) + end. + +end_per_testcase(_TC, Config) -> + case proplists:get_value(ctx, Config) of + undefined -> ok; + Ctx -> py_context:stop(Ctx) + end. + +%%% --------------------------------------------------------------------------- +%%% Helpers +%%% --------------------------------------------------------------------------- + +setup_case(TC, Mode, Config) -> + case py_context:new(#{mode => Mode}) of + {ok, Ctx} -> + case require_module(Ctx, required_module(TC)) of + ok -> + [{ctx, Ctx} | Config]; + {skip, _} = Skip -> + py_context:stop(Ctx), + Skip + end; + {error, Reason} -> + ct:fail({context_create_failed, Mode, Reason}) + end. + +required_module(numpy_basic_ops) -> "numpy"; +required_module(numpy_call_thread_affinity) -> "numpy"; +required_module(numpy_parallel_processes) -> "numpy"; +required_module(numpy_owngil_basic) -> "numpy"; +required_module(tensorflow_basic_ops) -> "tensorflow"; +required_module(tensorflow_call_thread_affinity) -> "tensorflow". 
+ +%% Reflect the import status into a Python variable so we can +%% distinguish "module not installed" (skip) from any other error +%% (let it bubble up). A native-extension crash that surfaces as a +%% non-ImportError must not silently turn into a skip. +require_module(Ctx, Mod) -> + Code = iolist_to_binary([ + "try:\n", + " import ", Mod, "\n", + " _import_status = 'ok'\n", + "except ImportError:\n", + " _import_status = 'not_found'\n" + ]), + ok = py_context:exec(Ctx, Code), + {ok, Status} = py_context:eval(Ctx, <<"_import_status">>, #{}), + case Status of + <<"ok">> -> + ok; + <<"not_found">> -> + {skip, "Python module " ++ Mod ++ " not available"} + end. + +native_id(Ctx) -> + {ok, Tid} = py_context:eval(Ctx, + <<"__import__('threading').get_native_id()">>, #{}), + Tid. + +%%% --------------------------------------------------------------------------- +%%% numpy cases +%%% --------------------------------------------------------------------------- + +numpy_basic_ops(Config) -> + Ctx = ?config(ctx, Config), + %% Define a numpy-backed function and exercise both call and eval + %% paths so a thread-state regression in either direction crashes. + ok = py_context:exec(Ctx, << + "import numpy as np\n" + "def vec_dot_self(xs):\n" + " v = np.array(xs, dtype=np.float64)\n" + " return float(np.dot(v, v))\n" + >>), + {ok, 30.0} = py_context:call(Ctx, '__main__', vec_dot_self, + [[1.0, 2.0, 3.0, 4.0]]), + ok = py_context:exec(Ctx, + <<"_w = vec_dot_self([10.0, 0.0, 0.0])">>), + {ok, 100.0} = py_context:eval(Ctx, <<"_w">>, #{}), + ok. 
+ +numpy_call_thread_affinity(Config) -> + Ctx = ?config(ctx, Config), + ok = py_context:exec(Ctx, << + "import numpy as np\n" + "import threading\n" + "def numpy_with_tid(xs):\n" + " v = np.array(xs, dtype=np.float64)\n" + " return (threading.get_native_id(), float(np.sum(v)))\n" + >>), + Results = [py_context:call(Ctx, '__main__', numpy_with_tid, + [[float(I), float(I + 1), float(I + 2)]]) + || I <- lists:seq(1, 50)], + Tids = [Tid || {ok, {Tid, _Sum}} <- Results], + Sums = [Sum || {ok, {_Tid, Sum}} <- Results], + 50 = length(Tids), + [SingleTid] = lists:usort(Tids), + true = is_integer(SingleTid), + %% Spot-check a few sums. + Expected = [3.0 * I + 3.0 || I <- lists:seq(1, 50)], + Expected = Sums, + ok. + +numpy_parallel_processes(Config) -> + Ctx = ?config(ctx, Config), + ok = py_context:exec(Ctx, << + "import numpy as np\n" + "import threading\n" + "def numpy_dot_with_tid(xs, ys):\n" + " a = np.array(xs, dtype=np.float64)\n" + " b = np.array(ys, dtype=np.float64)\n" + " return (threading.get_native_id(), float(np.dot(a, b)))\n" + >>), + Parent = self(), + N = 8, + Pids = [spawn_link(fun() -> + Xs = [float(K * J) || J <- lists:seq(1, 4)], + Ys = [float(K + J) || J <- lists:seq(1, 4)], + R = py_context:call(Ctx, '__main__', numpy_dot_with_tid, + [Xs, Ys]), + Parent ! {result, K, R} + end) || K <- lists:seq(1, N)], + Results = [receive {result, K, R} -> {K, R} after 5000 -> ct:fail(timeout) end + || _ <- Pids], + %% All calls converged on one thread. + Tids = [Tid || {_K, {ok, {Tid, _Sum}}} <- Results], + N = length(Tids), + [SingleTid] = lists:usort(Tids), + true = is_integer(SingleTid), + %% Each result matches the expected dot product. + lists:foreach(fun({K, {ok, {_Tid, Got}}}) -> + Xs = [float(K * J) || J <- lists:seq(1, 4)], + Ys = [float(K + J) || J <- lists:seq(1, 4)], + Expected = lists:sum([X * Y || {X, Y} <- lists:zip(Xs, Ys)]), + true = abs(Got - Expected) < 1.0e-9 + end, Results), + %% Drain time + mailbox sanity (no orphan results). 
+ timer:sleep(50), + {messages, []} = erlang:process_info(self(), messages), + ok. + +numpy_owngil_basic(Config) -> + Ctx = ?config(ctx, Config), + %% Same shape as numpy_basic_ops but inside an OWN_GIL subinterpreter. + %% Numpy in OWN_GIL was the original v3.0 motivator on Python 3.14. + ok = py_context:exec(Ctx, << + "import numpy as np\n" + "def vec_dot_self(xs):\n" + " v = np.array(xs, dtype=np.float64)\n" + " return float(np.dot(v, v))\n" + >>), + {ok, 30.0} = py_context:call(Ctx, '__main__', vec_dot_self, + [[1.0, 2.0, 3.0, 4.0]]), + {ok, 25.0} = py_context:call(Ctx, '__main__', vec_dot_self, [[5.0]]), + %% Confirm the thread is stable across owngil calls. + Tid1 = native_id(Ctx), + Tid2 = native_id(Ctx), + Tid1 = Tid2, + ok. + +%%% --------------------------------------------------------------------------- +%%% tensorflow cases +%%% --------------------------------------------------------------------------- + +tensorflow_basic_ops(Config) -> + Ctx = ?config(ctx, Config), + ok = py_context:exec(Ctx, << + "import tensorflow as tf\n" + "def matmul_22():\n" + " a = tf.constant([[1.0, 2.0], [3.0, 4.0]])\n" + " return tf.linalg.matmul(a, a).numpy().tolist()\n" + >>), + {ok, [[7.0, 10.0], [15.0, 22.0]]} = + py_context:call(Ctx, '__main__', matmul_22, []), + ok. + +tensorflow_call_thread_affinity(Config) -> + Ctx = ?config(ctx, Config), + ok = py_context:exec(Ctx, << + "import tensorflow as tf\n" + "import threading\n" + "def tf_sum_with_tid(xs):\n" + " t = tf.constant(xs, dtype=tf.float64)\n" + " return (threading.get_native_id(),\n" + " float(tf.math.reduce_sum(t).numpy()))\n" + >>), + Results = [py_context:call(Ctx, '__main__', tf_sum_with_tid, + [[float(I), float(I + 1)]]) + || I <- lists:seq(1, 20)], + Tids = [Tid || {ok, {Tid, _Sum}} <- Results], + 20 = length(Tids), + [SingleTid] = lists:usort(Tids), + true = is_integer(SingleTid), + ok. 
From 25ca849c526b6de48566548057a6413a012deaae Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 3 May 2026 10:13:19 +0200 Subject: [PATCH 2/2] ci(freebsd): make numpy install non-fatal py312-numpy isn't published in FreeBSD 14.1's pkg repository yet, so pkg install fails the job before the build even starts. The ML SUITE already self-skips when numpy isn't importable, so the right move is to log a warning and move on rather than block CI on an upstream package gap. --- .github/workflows/ci.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index af86d48..e14e356 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -117,9 +117,13 @@ jobs: usesh: true prepare: | pkg install -y erlang ${{ matrix.python_pkg }} cmake - # numpy package follows the py-numpy convention + # numpy package follows the py-numpy convention. + # Non-fatal: not every FreeBSD pkg snapshot ships numpy for + # every Python flavor; the ML SUITE self-skips when numpy + # is missing, which is the right behavior here. NUMPY_PKG="$(echo '${{ matrix.python_pkg }}' | sed 's/^python/py/')-numpy" - pkg install -y "$NUMPY_PKG" + pkg install -y "$NUMPY_PKG" || \ + echo "WARN: $NUMPY_PKG unavailable; ML cases will skip" run: | # Set up environment export PYTHON_CONFIG=python${{ matrix.python }}-config