diff --git a/src/yc_bench/agent/prompt.py b/src/yc_bench/agent/prompt.py
index 88443c2..9f51047 100644
--- a/src/yc_bench/agent/prompt.py
+++ b/src/yc_bench/agent/prompt.py
@@ -44,7 +44,7 @@ Run multiple tasks concurrently when possible. Accept → assign → dispatch a
 
 ## Key Mechanics
 - **Salary bumps**: completed tasks raise salary for every assigned employee. More employees assigned = higher payroll growth.
-- **Throughput split**: employees on multiple active tasks split their rate (rate/sqrt(N)). Two tasks run at ~71% each.
+- **Throughput split**: employees on multiple active tasks split their rate (rate/N). Two tasks run at 50% each.
 - **Deadlines**: success before deadline = reward + prestige. Failure = prestige penalty, no reward.
 - **Trust**: completing tasks for a client builds trust → less work per task, access to gated tasks. Working for one client erodes trust with others.
 - **Not all clients are reliable.** Check `client history` for failure patterns.
diff --git a/src/yc_bench/agent/runtime/base.py b/src/yc_bench/agent/runtime/base.py
index 2a180ec..7050fd3 100644
--- a/src/yc_bench/agent/runtime/base.py
+++ b/src/yc_bench/agent/runtime/base.py
@@ -11,4 +11,12 @@ class AgentRuntime(ABC):
     def clear_session(self, session_id):
         raise NotImplementedError
 
+    def save_session_messages(self, session_id: str, path) -> None:
+        """Persist session messages for crash recovery. Override in subclass."""
+        pass
+
+    def restore_session_messages(self, session_id: str, path) -> int:
+        """Restore session messages from file. Returns count. Override in subclass."""
+        return 0
+
 __all__ = ["AgentRuntime"]
\ No newline at end of file
diff --git a/src/yc_bench/agent/runtime/litellm_runtime.py b/src/yc_bench/agent/runtime/litellm_runtime.py
index dced6ed..27dcba2 100644
--- a/src/yc_bench/agent/runtime/litellm_runtime.py
+++ b/src/yc_bench/agent/runtime/litellm_runtime.py
@@ -146,6 +146,32 @@ class LiteLLMRuntime(AgentRuntime):
     def clear_session(self, session_id):
         self._sessions.pop(session_id, None)
 
+    def save_session_messages(self, session_id: str, path) -> None:
+        """Persist current session messages to a JSON file for resume."""
+        session = self._sessions.get(session_id)
+        if session is None:
+            return
+        import json as _json
+        from pathlib import Path
+        Path(path).write_text(_json.dumps(session.messages, separators=(",", ":")))
+
+    def restore_session_messages(self, session_id: str, path) -> int:
+        """Load saved messages into a session. Returns number of messages loaded."""
+        import json as _json
+        from pathlib import Path
+        p = Path(path)
+        if not p.exists():
+            return 0
+        try:
+            messages = _json.loads(p.read_text())
+            session = self._get_or_create_session(session_id)
+            session.messages = messages
+            logger.info("Restored %d messages into session %s.", len(messages), session_id)
+            return len(messages)
+        except Exception as exc:
+            logger.warning("Could not restore session from %s: %s", path, exc)
+            return 0
+
     # ------------------------------------------------------------------
     # Internal helpers
     # ------------------------------------------------------------------
diff --git a/src/yc_bench/core/progress.py b/src/yc_bench/core/progress.py
index 2b5ab08..900cd4a 100644
--- a/src/yc_bench/core/progress.py
+++ b/src/yc_bench/core/progress.py
@@ -73,15 +73,13 @@ def _effective_rate_for_task_domain(*, task_id, domain, assignments,
                                     assignment_counts, base_rates):
     """Compute effective rate for one task+domain.
 
-    Throughput split uses sqrt(k) instead of k: two concurrent tasks each run at
-    1/sqrt(2) ≈ 71% speed, not 50%. This makes mild parallelism (2-3 tasks)
-    more efficient than strict sequential.
+    Throughput split is linear: an employee on k concurrent tasks contributes
+    rate/k to each. Total throughput is constant regardless of concurrency —
+    no benefit to parallelizing beyond pipeline flexibility.
 
     Brooks's Law: first 4 employees contribute full rate. Beyond that,
     additional employees contribute at 25% (overcrowding overhead).
     """
-    from math import sqrt
-
     # Collect (employee_id, effective_base) for this task, sorted best-first
     contributions = []
     for a in assignments:
@@ -91,7 +89,7 @@ def _effective_rate_for_task_domain(*, task_id, domain, assignments,
         if k <= 0:
             continue
         base = base_rates.get((a.employee_id, domain), Decimal("0"))
-        split_rate = base / Decimal(str(round(sqrt(k), 6)))
+        split_rate = base / Decimal(k)
         contributions.append(split_rate)
 
     # Sort best contributors first so they get full rate
@@ -264,14 +262,13 @@ def compute_effective_rates(db, company_id):
 
     out = []
     for req in requirements:
-        from math import sqrt
         contributions = []
         for a in assignments_by_task.get(req.task_id, []):
             k = assignment_counts.get(a.employee_id, 0)
             if k <= 0:
                 continue
             base = base_rates.get((a.employee_id, req.domain), Decimal("0"))
-            split_rate = base / Decimal(str(round(sqrt(k), 6)))
+            split_rate = base / Decimal(k)
             contributions.append(split_rate)
 
         contributions.sort(reverse=True)
diff --git a/src/yc_bench/runner/main.py b/src/yc_bench/runner/main.py
index 575bab1..f53fd0d 100644
--- a/src/yc_bench/runner/main.py
+++ b/src/yc_bench/runner/main.py
@@ -277,9 +277,24 @@ def run_benchmark(args):
     # Write live transcript alongside the DB so the streamlit dashboard can read it
     _slug = args.model.replace("/", "_")
     transcript_path = Path("db") / f"{args.config_name}_{args.seed}_{_slug}.transcript.jsonl"
-    if transcript_path.exists():
+    session_messages_path = Path("db") / f"{args.config_name}_{args.seed}_{_slug}.session.json"
+    _is_resume = transcript_path.exists() and session_messages_path.exists()
+    if not _is_resume and transcript_path.exists():
         transcript_path.unlink()
 
+    # Restore session messages on resume
+    if _is_resume:
+        n_restored = runtime.restore_session_messages(session_id, session_messages_path)
+        if n_restored > 0:
+            # Restore run_state turn count from transcript
+            try:
+                prior_lines = transcript_path.read_text().strip().split("\n")
+                prior_turns = len([l for l in prior_lines if l.strip()])
+                run_state.turn_count = prior_turns
+                logger.info("Resumed: %d prior turns, %d session messages.", prior_turns, n_restored)
+            except Exception:
+                pass
+
     def _write_live_transcript(snapshot, rs, commands):
         """Append one JSONL line per turn for the streamlit dashboard."""
         if not rs.transcript:
@@ -296,6 +311,8 @@ def run_benchmark(args):
         }, separators=(",", ":"))
         with open(transcript_path, "a") as f:
             f.write(line + "\n")
+        # Save session messages after every turn for crash recovery
+        runtime.save_session_messages(session_id, session_messages_path)
 
     if use_live:
         from .dashboard import BenchmarkDashboard