perf: add inline-persist fast path for WAC v2 step() (#8807)
This commit is contained in:
@@ -2416,6 +2416,23 @@ class WorkflowCtx:
|
||||
self._counters: dict[str, int] = {}
|
||||
self._pending: list = []
|
||||
self._executing_key: str | None = checkpoint.get("_executing_key")
|
||||
# Reuse a single httpx.AsyncClient across all fast-path step() calls
|
||||
# in this workflow invocation. Instantiating a fresh client per call
|
||||
# allocates a new connection pool each time — on localhost this adds
|
||||
# ~15ms per step, dominating the end-to-end cost. Lazily built so no
|
||||
# client is created for workflows that never hit the fast path.
|
||||
self._inline_http_client: "httpx.AsyncClient | None" = None
|
||||
# Serializes fast-path POSTs across concurrent step() calls within
|
||||
# one workflow invocation. Wraps only the HTTP call, not fn() — so
|
||||
# `asyncio.gather(step("a", fn_a), step("b", fn_b))` still runs the
|
||||
# two fn() bodies in parallel; only the API requests are ordered.
|
||||
# This closes the first-write race window against `SELECT FOR UPDATE`
|
||||
# on a not-yet-created `v2_job_status` row: concurrent POSTs would
|
||||
# both see None (no row to lock) and then overwrite each other's checkpoint because
|
||||
# the helper writes the whole serialized `_checkpoint` object, not
|
||||
# a single `completed_steps[key]`. Lazily built so the ctx can be
|
||||
# constructed outside an event loop (tests do this).
|
||||
self._inline_lock: "_asyncio.Lock | None" = None
|
||||
|
||||
def _alloc_key(self, name: str = "step") -> str:
|
||||
"""Name-based key: ``double`` for first call, ``double_2``, ``double_3`` for subsequent."""
|
||||
@@ -2543,6 +2560,54 @@ class WorkflowCtx:
|
||||
result = await result
|
||||
duration_ms = int((_time_mod.monotonic() - t0) * 1000)
|
||||
|
||||
# Fast path: POST the delta to the new per-job API endpoint and return
|
||||
# the result directly, letting the workflow subprocess continue into
|
||||
# the next step() without unwinding. On any failure — network, auth,
|
||||
# timeout, source-hash mismatch, old backend without the endpoint —
|
||||
# fall through to raising _StepSuspend so the worker takes the legacy
|
||||
# suspend-and-replay path. Gated by WM_WAC_INLINE_FAST_PATH (default
|
||||
# on) so the old behavior stays reachable for A/B testing and rollback.
|
||||
_fast_path_flag = os.environ.get("WM_WAC_INLINE_FAST_PATH", "1").strip().lower()
|
||||
_fast_path_enabled = _fast_path_flag not in ("0", "false", "off", "no")
|
||||
_job_id = os.environ.get("WM_JOB_ID")
|
||||
_workspace = os.environ.get("WM_WORKSPACE")
|
||||
_base = os.environ.get("BASE_INTERNAL_URL")
|
||||
_token = os.environ.get("WM_TOKEN")
|
||||
if _fast_path_enabled and _job_id and _workspace and _base and _token:
|
||||
try:
|
||||
if self._inline_lock is None:
|
||||
self._inline_lock = _asyncio.Lock()
|
||||
# Lock wraps only the POST, not fn() above — concurrent
|
||||
# step() calls run fn() in parallel, then serialize on
|
||||
# the API request.
|
||||
async with self._inline_lock:
|
||||
if self._inline_http_client is None:
|
||||
self._inline_http_client = httpx.AsyncClient(
|
||||
timeout=httpx.Timeout(10.0),
|
||||
headers={
|
||||
"Authorization": f"Bearer {_token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
)
|
||||
_resp = await self._inline_http_client.post(
|
||||
f"{_base}/api/w/{_workspace}/jobs/wac/inline_checkpoint/{_job_id}",
|
||||
json={
|
||||
"key": key,
|
||||
"result": result,
|
||||
"started_at": started_at,
|
||||
"duration_ms": duration_ms,
|
||||
},
|
||||
)
|
||||
_resp.raise_for_status()
|
||||
return result
|
||||
except Exception as _e:
|
||||
logger.info(
|
||||
"WAC v2 inline fast path failed for key %s, falling back to suspend: %s",
|
||||
key,
|
||||
_e,
|
||||
)
|
||||
# fall through to the legacy suspend path
|
||||
|
||||
raise _StepSuspend({
|
||||
"mode": "inline_checkpoint",
|
||||
"steps": [],
|
||||
@@ -2873,7 +2938,23 @@ async def _run_workflow_async(func, checkpoint: dict, input_args: dict):
|
||||
}
|
||||
return {"type": "dispatch", **info}
|
||||
finally:
|
||||
_workflow_ctx.reset(token)
|
||||
# Close the lazily-built fast-path httpx client so we don't emit
|
||||
# asyncio ResourceWarning('unclosed transport') on shutdown and don't
|
||||
# leak connection pools when this coroutine is driven from a
|
||||
# long-lived loop (tests, REPL, embedded callers).
|
||||
#
|
||||
# Wrapped in its own try/finally so that asyncio.CancelledError
|
||||
# (which is a BaseException since Python 3.8) during aclose() does
|
||||
# not skip the _workflow_ctx.reset(token) below.
|
||||
try:
|
||||
if ctx._inline_http_client is not None:
|
||||
try:
|
||||
await ctx._inline_http_client.aclose()
|
||||
except Exception:
|
||||
pass
|
||||
ctx._inline_http_client = None
|
||||
finally:
|
||||
_workflow_ctx.reset(token)
|
||||
|
||||
|
||||
def _run_workflow(func, checkpoint: dict, input_args: dict):
|
||||
|
||||
Reference in New Issue
Block a user