Implemented safe rerun; fixed skill division bug

This commit is contained in:
Muyu He 2026-03-23 19:14:47 -07:00
parent 2f38babba6
commit f1d5f63aaa
5 changed files with 58 additions and 10 deletions

View file

@ -44,7 +44,7 @@ Run multiple tasks concurrently when possible. Accept → assign → dispatch a
## Key Mechanics ## Key Mechanics
- **Salary bumps**: completed tasks raise salary for every assigned employee. More employees assigned = higher payroll growth. - **Salary bumps**: completed tasks raise salary for every assigned employee. More employees assigned = higher payroll growth.
- **Throughput split**: employees on multiple active tasks split their rate (rate/sqrt(N)). Two tasks run at ~71% each. - **Throughput split**: employees on multiple active tasks split their rate (rate/N). Two tasks run at 50% each.
- **Deadlines**: success before deadline = reward + prestige. Failure = prestige penalty, no reward. - **Deadlines**: success before deadline = reward + prestige. Failure = prestige penalty, no reward.
- **Trust**: completing tasks for a client builds trust → less work per task, access to gated tasks. Working for one client erodes trust with others. - **Trust**: completing tasks for a client builds trust → less work per task, access to gated tasks. Working for one client erodes trust with others.
- **Not all clients are reliable.** Check `client history` for failure patterns. - **Not all clients are reliable.** Check `client history` for failure patterns.

View file

@ -11,4 +11,12 @@ class AgentRuntime(ABC):
def clear_session(self, session_id): def clear_session(self, session_id):
raise NotImplementedError raise NotImplementedError
def save_session_messages(self, session_id: str, path) -> None:
    """Persist session messages for crash recovery.

    The base runtime keeps no history, so this is a deliberate no-op;
    runtimes that track per-session messages override it.
    """
    return None
def restore_session_messages(self, session_id: str, path) -> int:
    """Restore session messages from file. Returns count.

    The base runtime has nothing to restore, so it reports zero
    messages; persisting runtimes override this.
    """
    return 0
__all__ = ["AgentRuntime"] __all__ = ["AgentRuntime"]

View file

@ -146,6 +146,32 @@ class LiteLLMRuntime(AgentRuntime):
def clear_session(self, session_id): def clear_session(self, session_id):
self._sessions.pop(session_id, None) self._sessions.pop(session_id, None)
def save_session_messages(self, session_id: str, path) -> None:
    """Persist current session messages to a JSON file for resume.

    Silently returns when *session_id* is unknown. The file is written
    atomically (temp file then ``os.replace``) so that a crash or kill
    mid-write cannot leave a truncated JSON file behind — this method
    exists for crash recovery, so the save itself must be crash-safe.
    """
    session = self._sessions.get(session_id)
    if session is None:
        return
    import json as _json
    import os as _os
    from pathlib import Path
    target = Path(path)
    # Write to a sibling temp file, then rename into place. os.replace is
    # atomic on POSIX and Windows for same-filesystem paths.
    tmp = target.with_suffix(target.suffix + ".tmp")
    tmp.write_text(_json.dumps(session.messages, separators=(",", ":")))
    _os.replace(tmp, target)
def restore_session_messages(self, session_id: str, path) -> int:
    """Load saved messages into a session. Returns number of messages loaded.

    Returns 0 when the file is missing, unparseable, or does not contain
    a JSON list — a corrupt save file must not clobber the live session
    with a non-list value (the original code would happily assign e.g. a
    dict or string to ``session.messages``).
    """
    import json as _json
    from pathlib import Path
    p = Path(path)
    if not p.exists():
        return 0
    try:
        messages = _json.loads(p.read_text())
        # Guard against corrupt/foreign files before touching the session.
        if not isinstance(messages, list):
            raise ValueError(
                f"expected a JSON list, got {type(messages).__name__}"
            )
        session = self._get_or_create_session(session_id)
        session.messages = messages
        logger.info("Restored %d messages into session %s.", len(messages), session_id)
        return len(messages)
    except Exception as exc:
        # Best-effort resume: log and start fresh rather than crash the run.
        logger.warning("Could not restore session from %s: %s", path, exc)
        return 0
# ------------------------------------------------------------------ # ------------------------------------------------------------------
# Internal helpers # Internal helpers
# ------------------------------------------------------------------ # ------------------------------------------------------------------

View file

@ -73,15 +73,13 @@ def _effective_rate_for_task_domain(*, task_id, domain, assignments,
assignment_counts, base_rates): assignment_counts, base_rates):
"""Compute effective rate for one task+domain. """Compute effective rate for one task+domain.
Throughput split uses sqrt(k) instead of k: two concurrent tasks each run at Throughput split is linear: an employee on k concurrent tasks contributes
1/sqrt(2) 71% speed, not 50%. This makes mild parallelism (2-3 tasks) rate/k to each. Total throughput is constant regardless of concurrency
more efficient than strict sequential. no benefit to parallelizing beyond pipeline flexibility.
Brooks's Law: first 4 employees contribute full rate. Beyond that, Brooks's Law: first 4 employees contribute full rate. Beyond that,
additional employees contribute at 25% (overcrowding overhead). additional employees contribute at 25% (overcrowding overhead).
""" """
from math import sqrt
# Collect (employee_id, effective_base) for this task, sorted best-first # Collect (employee_id, effective_base) for this task, sorted best-first
contributions = [] contributions = []
for a in assignments: for a in assignments:
@ -91,7 +89,7 @@ def _effective_rate_for_task_domain(*, task_id, domain, assignments,
if k <= 0: if k <= 0:
continue continue
base = base_rates.get((a.employee_id, domain), Decimal("0")) base = base_rates.get((a.employee_id, domain), Decimal("0"))
split_rate = base / Decimal(str(round(sqrt(k), 6))) split_rate = base / Decimal(k)
contributions.append(split_rate) contributions.append(split_rate)
# Sort best contributors first so they get full rate # Sort best contributors first so they get full rate
@ -264,14 +262,13 @@ def compute_effective_rates(db, company_id):
out = [] out = []
for req in requirements: for req in requirements:
from math import sqrt
contributions = [] contributions = []
for a in assignments_by_task.get(req.task_id, []): for a in assignments_by_task.get(req.task_id, []):
k = assignment_counts.get(a.employee_id, 0) k = assignment_counts.get(a.employee_id, 0)
if k <= 0: if k <= 0:
continue continue
base = base_rates.get((a.employee_id, req.domain), Decimal("0")) base = base_rates.get((a.employee_id, req.domain), Decimal("0"))
split_rate = base / Decimal(str(round(sqrt(k), 6))) split_rate = base / Decimal(k)
contributions.append(split_rate) contributions.append(split_rate)
contributions.sort(reverse=True) contributions.sort(reverse=True)

View file

@ -277,9 +277,24 @@ def run_benchmark(args):
# Write live transcript alongside the DB so the streamlit dashboard can read it # Write live transcript alongside the DB so the streamlit dashboard can read it
_slug = args.model.replace("/", "_") _slug = args.model.replace("/", "_")
transcript_path = Path("db") / f"{args.config_name}_{args.seed}_{_slug}.transcript.jsonl" transcript_path = Path("db") / f"{args.config_name}_{args.seed}_{_slug}.transcript.jsonl"
if transcript_path.exists(): session_messages_path = Path("db") / f"{args.config_name}_{args.seed}_{_slug}.session.json"
_is_resume = transcript_path.exists() and session_messages_path.exists()
if not _is_resume and transcript_path.exists():
transcript_path.unlink() transcript_path.unlink()
# Restore session messages on resume
if _is_resume:
n_restored = runtime.restore_session_messages(session_id, session_messages_path)
if n_restored > 0:
# Restore run_state turn count from transcript
try:
prior_lines = transcript_path.read_text().strip().split("\n")
prior_turns = len([l for l in prior_lines if l.strip()])
run_state.turn_count = prior_turns
logger.info("Resumed: %d prior turns, %d session messages.", prior_turns, n_restored)
except Exception:
pass
def _write_live_transcript(snapshot, rs, commands): def _write_live_transcript(snapshot, rs, commands):
"""Append one JSONL line per turn for the streamlit dashboard.""" """Append one JSONL line per turn for the streamlit dashboard."""
if not rs.transcript: if not rs.transcript:
@ -296,6 +311,8 @@ def run_benchmark(args):
}, separators=(",", ":")) }, separators=(",", ":"))
with open(transcript_path, "a") as f: with open(transcript_path, "a") as f:
f.write(line + "\n") f.write(line + "\n")
# Save session messages after every turn for crash recovery
runtime.save_session_messages(session_id, session_messages_path)
if use_live: if use_live:
from .dashboard import BenchmarkDashboard from .dashboard import BenchmarkDashboard