From ff4667488ab46357ba9cae2c0558c88fe0b4135e Mon Sep 17 00:00:00 2001 From: Iulian Meghea Date: Thu, 30 Apr 2026 12:56:54 +0000 Subject: [PATCH] Sort offers by price within availability groups in _get_job_plan (#3839) The sort in _get_job_plan only sorted by availability, preserving the input order from heapq.merge. Since backend offers are not guaranteed to be price-sorted (gpuhunt explicitly documents its results as 'not strictly sorted by the price'), cheap offers from one backend could be buried behind expensive offers from another. Add price as a secondary sort key so that available offers are sorted cheapest-first, matching the behavior of the dstack offer path which explicitly sorts by price in get_backend_offers_in_run_candidate_fleets. --- .../_internal/server/services/runs/plan.py | 2 +- .../server/services/runs/test_plan.py | 158 +++++++++++++++++- 2 files changed, 158 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 1fcf3e7bd4..43ccbac889 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -832,7 +832,7 @@ def _get_job_plan( job_offers.extend(offer for _, offer in instance_offers) if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE: job_offers.extend(offer for _, offer in backend_offers) - job_offers.sort(key=lambda offer: not offer.availability.is_available()) + job_offers.sort(key=lambda offer: (not offer.availability.is_available(), offer.price)) remove_job_spec_sensitive_info(job.job_spec) return JobPlan( job_spec=job.job_spec, diff --git a/src/tests/_internal/server/services/runs/test_plan.py b/src/tests/_internal/server/services/runs/test_plan.py index 5836319bc1..fcfea7aa9c 100644 --- a/src/tests/_internal/server/services/runs/test_plan.py +++ b/src/tests/_internal/server/services/runs/test_plan.py @@ -1,17 +1,22 @@ import copy -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, Mock import pytest from sqlalchemy.ext.asyncio import AsyncSession +from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.configurations import TaskConfiguration from dstack._internal.core.models.fleets import FleetNodesSpec, InstanceGroupPlacement from dstack._internal.core.models.instances import InstanceAvailability +from dstack._internal.core.models.profiles import CreationPolicy, Profile +from dstack._internal.core.models.resources import ResourcesSpec +from dstack._internal.core.models.runs import Job, JobSpec, Requirements from dstack._internal.server.services.jobs import get_jobs_from_run_spec from dstack._internal.server.services.runs.plan import ( _freeze_offer_identity_value, _get_backend_offer_identity, _get_backend_offers_in_fleet, + _get_job_plan, ) from dstack._internal.server.testing.common import ( create_fleet, @@ -113,3 +118,154 @@ async def test_keeps_unconstrained_offers_for_non_empty_cluster_fleet_without_el get_offers_by_requirements_mock.await_args.kwargs["master_job_provisioning_data"] is None ) + + +class TestGetJobPlan: + def _make_job(self): + job_spec = JobSpec( + job_num=0, + job_name="test-job", + commands=[":"], + env={}, + home_dir="/root", + image_name="scratch", + max_duration=None, + registry_auth=None, + requirements=Requirements(resources=ResourcesSpec()), + retry=None, + working_dir=None, + ) + return Job(job_spec=job_spec, job_submissions=[]) + + def test_sorts_available_offers_by_price(self) -> None: + cheap_available = get_instance_offer_with_availability( + backend=BackendType.VASTAI, price=0.37, availability=InstanceAvailability.AVAILABLE + ) + mid_available = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, price=0.98, availability=InstanceAvailability.AVAILABLE + ) + expensive_available = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, price=1.49, availability=InstanceAvailability.AVAILABLE + ) + profile = Profile(name="test", creation_policy=CreationPolicy.REUSE_OR_CREATE) + job = self._make_job() + + plan = _get_job_plan( + instance_offers=[], + backend_offers=[ + (Mock(), expensive_available), + (Mock(), cheap_available), + (Mock(), mid_available), + ], + profile=profile, + job=job, + max_offers=None, + ) + + assert [o.price for o in plan.offers] == [0.37, 0.98, 1.49] + + def test_sorts_not_available_offers_by_price(self) -> None: + cheap_na = get_instance_offer_with_availability( + price=0.37, availability=InstanceAvailability.NOT_AVAILABLE + ) + expensive_na = get_instance_offer_with_availability( + price=1.49, availability=InstanceAvailability.NOT_AVAILABLE + ) + profile = Profile(name="test", creation_policy=CreationPolicy.REUSE_OR_CREATE) + job = self._make_job() + + plan = _get_job_plan( + instance_offers=[], + backend_offers=[ + (Mock(), expensive_na), + (Mock(), cheap_na), + ], + profile=profile, + job=job, + max_offers=None, + ) + + assert [o.price for o in plan.offers] == [0.37, 1.49] + + def test_sorts_mixed_availability_by_availability_then_price(self) -> None: + available_expensive = get_instance_offer_with_availability( + price=1.49, availability=InstanceAvailability.AVAILABLE + ) + available_cheap = get_instance_offer_with_availability( + price=0.98, availability=InstanceAvailability.AVAILABLE + ) + na_expensive = get_instance_offer_with_availability( + price=2.00, availability=InstanceAvailability.NOT_AVAILABLE + ) + na_cheap = get_instance_offer_with_availability( + price=0.37, availability=InstanceAvailability.NOT_AVAILABLE + ) + profile = Profile(name="test", creation_policy=CreationPolicy.REUSE_OR_CREATE) + job = self._make_job() + + plan = _get_job_plan( + instance_offers=[], + backend_offers=[ + (Mock(), available_expensive), + (Mock(), na_expensive), + (Mock(), available_cheap), + (Mock(), na_cheap), + ], + profile=profile, + job=job, + max_offers=None, + ) + + assert [o.price for o in plan.offers] == [0.98, 1.49, 0.37, 2.00] + + def test_sorts_unsorted_multi_backend_offers_by_price(self) -> None: + runpod_expensive = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, price=0.98, availability=InstanceAvailability.AVAILABLE + ) + runpod_mid = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, price=1.49, availability=InstanceAvailability.AVAILABLE + ) + vastai_cheap = get_instance_offer_with_availability( + backend=BackendType.VASTAI, price=0.37, availability=InstanceAvailability.AVAILABLE + ) + vastai_mid = get_instance_offer_with_availability( + backend=BackendType.VASTAI, price=1.22, availability=InstanceAvailability.AVAILABLE + ) + profile = Profile(name="test", creation_policy=CreationPolicy.REUSE_OR_CREATE) + job = self._make_job() + + plan = _get_job_plan( + instance_offers=[], + backend_offers=[ + (Mock(), runpod_expensive), + (Mock(), runpod_mid), + (Mock(), vastai_cheap), + (Mock(), vastai_mid), + ], + profile=profile, + job=job, + max_offers=None, + ) + + assert [o.price for o in plan.offers] == [0.37, 0.98, 1.22, 1.49] + + def test_instance_offers_and_backend_offers_sorted_by_availability_then_price(self) -> None: + idle_expensive = get_instance_offer_with_availability( + price=0.98, availability=InstanceAvailability.IDLE + ) + backend_cheap = get_instance_offer_with_availability( + price=0.37, availability=InstanceAvailability.AVAILABLE + ) + profile = Profile(name="test", creation_policy=CreationPolicy.REUSE_OR_CREATE) + job = self._make_job() + instance_model = Mock() + + plan = _get_job_plan( + instance_offers=[(instance_model, idle_expensive)], + backend_offers=[(Mock(), backend_cheap)], + profile=profile, + job=job, + max_offers=None, + ) + + assert [o.price for o in plan.offers] == [0.37, 0.98]