Files
HyunjunJeon af5fbfabec 문서 추가: Context Engineering 문서 추가 및 deepagents_sourcecode 한국어 번역
- Context_Engineering.md: 에이전트 컨텍스트 엔지니어링 개념 정리 문서 추가
- Context_Engineering_Research.ipynb: 연구 노트북 업데이트
- deepagents_sourcecode/: docstring과 주석을 한국어로 번역
2026-01-11 17:55:52 +09:00

799 lines
29 KiB
Python
Executable File

#!/usr/bin/env python3
"""jobs 디렉토리의 trial 실행 결과를 분석합니다.
Analyze job trials from a jobs directory.
Scans through trial directories, extracts trajectory data and success metrics.
"""
import argparse
import asyncio
import json
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Optional
from deepagents import create_deep_agent
def scan_dataset_for_solutions(dataset_path: Path) -> dict[str, Path]:
    """Map every task found under a dataset directory to its solve.sh script.

    The dataset is walked as ``{dataset_path}/{hash}/{task_name}``; a task is
    recorded only when ``solution/solve.sh`` exists inside its directory.
    Directories are visited in sorted order so the result is reproducible:
    when the same task name appears under several hash directories, the one
    from the last hash directory (in sorted order) wins.

    Args:
        dataset_path: Path to the dataset directory (e.g., terminal-bench/)

    Returns:
        Dictionary mapping task names to their solution/solve.sh paths.
        Example: {"chess-best-move": Path("terminal-bench/7bFm.../chess-best-move/solution/solve.sh")}
        The dictionary is empty (and a warning is printed) when
        ``dataset_path`` does not exist or is not a directory.
    """
    task_to_solution: dict[str, Path] = {}

    if not dataset_path.exists():
        print(f"Warning: Dataset path {dataset_path} does not exist")
        return task_to_solution
    if not dataset_path.is_dir():
        # iterdir() on a regular file would raise NotADirectoryError.
        print(f"Warning: Dataset path {dataset_path} is not a directory")
        return task_to_solution

    # iterdir() yields entries in arbitrary order; sorting makes the
    # "last one wins" rule below deterministic.
    for hash_dir in sorted(dataset_path.iterdir()):
        if not hash_dir.is_dir():
            continue
        for task_dir in sorted(hash_dir.iterdir()):
            if not task_dir.is_dir():
                continue
            # A valid task directory is one that ships solution/solve.sh.
            solution_path = task_dir / "solution" / "solve.sh"
            if solution_path.exists():
                task_to_solution[task_dir.name] = solution_path

    return task_to_solution
def find_task_directory(trial_dir: Path, task_name: str, task_source: str) -> Optional[Path]:
    """Find the task directory for a given trial.

    The task source directory is looked up two levels above the trial
    directory (``trial_dir.parent.parent / task_source``) and is searched as
    ``{task_source}/{hash}/{task_name}``. Hash directories are visited in
    sorted order, so when several of them contain the task the same one is
    returned on every run.

    Args:
        trial_dir: Path to the trial directory
        task_name: Name of the task (from config.json)
        task_source: Source of the task (e.g., "terminal-bench")

    Returns:
        Path to the task directory if found, None otherwise (including when
        the task source path is missing or is not a directory)
    """
    jobs_root = trial_dir.parent.parent
    task_source_dir = jobs_root / task_source

    # is_dir() is False for both a missing path and a regular file, so
    # iterdir() below can never be called on a non-directory.
    if not task_source_dir.is_dir():
        return None

    for hash_dir in sorted(task_source_dir.iterdir()):
        if not hash_dir.is_dir():
            continue
        task_dir = hash_dir / task_name
        if task_dir.exists():
            return task_dir

    return None
class TrialStatus(Enum):
    """Outcome category of a trial run.

    Each member's value is the lowercase string form of its name.
    """

    # Neither a reward nor an exception has been recorded for the trial.
    PENDING = "pending"
    # The trial earned a passing reward.
    COMPLETED = "completed"
    # The trial raised an exception or earned a non-passing reward.
    FAILED = "failed"
@dataclass
class Trial:
    """Record describing one trial run and the artifacts found for it.

    Only ``trial_id`` and ``status`` are required; every other field defaults
    to None, meaning the corresponding artifact or value is not available.
    """

    # Identifier of the trial (analyze_trial uses the trial directory's name).
    trial_id: str
    # Overall outcome category of the trial.
    status: TrialStatus
    # True/False as parsed from the reward file; None when there is no reward.
    reward: Optional[bool] = None
    # Location of the agent trajectory file, when present.
    trajectory_path: Optional[Path] = None
    # Location of the verifier reward file, when present.
    reward_path: Optional[Path] = None
    # Location of the exception file, when present.
    exception_path: Optional[Path] = None
    # Location of the reference solution script, when one was found.
    solution_path: Optional[Path] = None
    # Directory the trial was read from.
    trial_dir: Optional[Path] = None
    # Tool name -> number of calls, as counted from the trajectory.
    tool_usage: Optional[dict[str, int]] = None
async def parse_reward(reward_path: Path) -> bool:
    """Report whether a reward file marks the trial as successful.

    Args:
        reward_path: Path to the reward file to read

    Returns:
        True only when the file's text, with surrounding whitespace removed,
        is exactly "1"; False for any other content
    """
    return reward_path.read_text().strip() == "1"
def _read_json_object(path: Path) -> Optional[dict]:
    """Return the JSON object stored at *path*, or None if it cannot be used.

    None is returned when the file is missing, cannot be read, is not valid
    JSON, or its top-level value is not an object.
    """
    if not path.exists():
        return None
    try:
        with open(path, "r") as f:
            data = json.load(f)
    except (OSError, ValueError, RecursionError):
        # ValueError covers both JSON syntax errors and text decoding errors.
        return None
    return data if isinstance(data, dict) else None


def _dict_or_empty(value: object) -> dict:
    """Return *value* when it is a dict, otherwise an empty dict (e.g. for JSON null)."""
    return value if isinstance(value, dict) else {}


def extract_task_metadata(trial_dir: Path) -> dict:
    """Extract task metadata from a trial's config.json and result.json.

    Reading is best-effort: a file that is missing or unusable simply
    contributes no keys. A nested section that is absent or null (for
    example ``"verifier_result": null``) is treated as empty, so the
    remaining fields of that file are still reported.

    Args:
        trial_dir: Path to the trial directory

    Returns:
        Dictionary containing task metadata. From config.json: "task_name",
        "task_source", "git_url", "git_commit_id" (each "" when absent).
        From result.json: "reward" (0.0 when absent), "started_at" and
        "finished_at" ("" when absent).
    """
    metadata: dict = {}

    config = _read_json_object(trial_dir / "config.json")
    if config is not None:
        task = _dict_or_empty(config.get("task"))
        metadata["task_name"] = task.get("path", "")
        metadata["task_source"] = task.get("source", "")
        metadata["git_url"] = task.get("git_url", "")
        metadata["git_commit_id"] = task.get("git_commit_id", "")

    result = _read_json_object(trial_dir / "result.json")
    if result is not None:
        # Either level may be null rather than missing; both must fall back to
        # the default instead of aborting and losing the timestamps below.
        verifier_result = _dict_or_empty(result.get("verifier_result"))
        rewards = _dict_or_empty(verifier_result.get("rewards"))
        metadata["reward"] = rewards.get("reward", 0.0)
        metadata["started_at"] = result.get("started_at", "")
        metadata["finished_at"] = result.get("finished_at", "")

    return metadata
def extract_task_instructions(trajectory_path: Path) -> Optional[str]:
    """Return the message of the first user step in a trajectory file.

    Args:
        trajectory_path: Path to the trajectory JSON file

    Returns:
        The "message" of the first step whose "source" is "user" ("" when
        that step has no message). None when there is no user step or when
        the file cannot be read or does not have the expected structure.
    """
    try:
        with open(trajectory_path, "r") as handle:
            steps = json.load(handle).get("steps", [])
        # Lazily walk the steps; only the first user step is ever evaluated.
        user_messages = (
            entry.get("message", "") for entry in steps if entry.get("source") == "user"
        )
        return next(user_messages, None)
    except Exception:
        return None
def count_tool_usage(trajectory_path: Path) -> dict[str, int]:
    """Tally how often each tool is called in a trajectory.

    Args:
        trajectory_path: Path to the trajectory.json file in ATIF format

    Returns:
        Dictionary mapping tool names to their usage counts. A tool call
        without a "function_name" is counted under "unknown". An empty
        dictionary is returned when the file cannot be read or does not have
        the expected structure.
    """
    try:
        with open(trajectory_path, "r") as handle:
            steps = json.load(handle).get("steps", [])

        usage: dict[str, int] = {}
        for entry in steps:
            # Steps without tool calls (missing, null or empty) add nothing.
            for call in entry.get("tool_calls") or ():
                name = call.get("function_name", "unknown")
                usage[name] = usage.get(name, 0) + 1
        return usage
    except Exception:
        return {}
def get_task_name_from_trial(trial_dir: Path) -> Optional[str]:
    """Read the task name recorded in a trial's config.json.

    Args:
        trial_dir: Path to the trial directory

    Returns:
        The value stored under task -> path in config.json ("" when that
        entry is absent). None when config.json is missing, unreadable, or
        does not have the expected structure.
    """
    config_file = trial_dir / "config.json"
    if not config_file.exists():
        return None

    try:
        with open(config_file, "r") as handle:
            task_section = json.load(handle).get("task", {})
        return task_section.get("path", "")
    except Exception:
        return None
def enrich_trials_with_solutions(
    trials: list[Trial], solution_mapping: dict[str, Path]
) -> list[Trial]:
    """Attach known solution paths to trials, in place.

    For every trial that has a trial directory, the task name is read from
    that directory's config.json; when the name is a key of
    ``solution_mapping`` the trial's ``solution_path`` is overwritten with the
    mapped path. Trials without a match are left untouched.

    Args:
        trials: List of Trial objects to enrich
        solution_mapping: Dictionary mapping task names to solution paths

    Returns:
        The same list object that was passed in, for convenience
    """
    for entry in trials:
        if not entry.trial_dir:
            continue

        name = get_task_name_from_trial(entry.trial_dir)
        if not name or name not in solution_mapping:
            continue

        entry.solution_path = solution_mapping[name]

    return trials
async def analyze_trial(
    trial_dir: Path, solution_mapping: Optional[dict[str, Path]] = None
) -> Optional[Trial]:
    """Build a Trial record from the artifacts found in one trial directory.

    A Trial is produced even when the trajectory or reward file is missing,
    so that incomplete trials can still be reported.

    The reference solution is taken from ``solution_mapping`` when the
    trial's task name is listed there; otherwise the task directory is
    searched for via config.json and its ``solution/solve.sh`` is used. The
    solution path is only kept if the file actually exists.

    Status rules:
    - FAILED: exception.txt exists, or the reward is False
    - COMPLETED: the reward is True
    - PENDING: anything else (no reward, no exception)

    Args:
        trial_dir: Path to the trial directory
        solution_mapping: Optional mapping from task names to solution paths

    Returns:
        The Trial describing this directory
    """
    agent_trajectory = trial_dir / "agent" / "trajectory.json"
    reward_file = trial_dir / "verifier" / "reward.txt"
    exception_file = trial_dir / "exception.txt"
    config_file = trial_dir / "config.json"

    # Preferred source: the pre-computed task-name -> solution mapping.
    solution_file = None
    if solution_mapping:
        mapped_name = get_task_name_from_trial(trial_dir)
        if mapped_name and mapped_name in solution_mapping:
            solution_file = solution_mapping[mapped_name]

    # Fallback: locate the task directory on disk using config.json.
    if not solution_file and config_file.exists():
        try:
            with open(config_file, "r") as handle:
                task_section = json.load(handle).get("task", {})
            name = task_section.get("path", "")
            source = task_section.get("source", "")
            located = find_task_directory(trial_dir, name, source) if name and source else None
            if located:
                solution_file = located / "solution" / "solve.sh"
        except Exception:
            pass

    has_trajectory = agent_trajectory.exists()
    has_reward = reward_file.exists()
    has_exception = exception_file.exists()
    has_solution = solution_file and solution_file.exists()

    # The reward is a pass only when the file holds exactly "1".
    reward_value: Optional[bool] = (
        reward_file.read_text().strip() == "1" if has_reward else None
    )

    # An exception always means failure, regardless of any reward.
    if has_exception or reward_value is False:
        status = TrialStatus.FAILED
    elif reward_value:
        status = TrialStatus.COMPLETED
    else:
        status = TrialStatus.PENDING

    return Trial(
        trial_id=trial_dir.name,
        status=status,
        reward=reward_value,
        trajectory_path=agent_trajectory if has_trajectory else None,
        reward_path=reward_file if has_reward else None,
        exception_path=exception_file if has_exception else None,
        solution_path=solution_file if has_solution else None,
        trial_dir=trial_dir,
        tool_usage=count_tool_usage(agent_trajectory) if has_trajectory else None,
    )
async def scan_jobs_directory(
    jobs_dir: Path, solution_mapping: Optional[dict[str, Path]] = None
) -> list[Trial]:
    """Scan the jobs directory and extract all trial metadata.

    Every sub-directory of ``jobs_dir`` is treated as one trial. Trials are
    analyzed in sorted directory order so repeated runs report them in the
    same order.

    Args:
        jobs_dir: Path to the jobs directory containing trial subdirectories
        solution_mapping: Optional pre-computed mapping from task names to solution paths.
            If not provided, solutions will be searched for individually.

    Returns:
        One Trial per trial directory. The list is empty (and an error is
        printed) when ``jobs_dir`` does not exist or is not a directory.
    """
    if not jobs_dir.exists():
        print(f"Error: Directory {jobs_dir} does not exist")
        return []
    if not jobs_dir.is_dir():
        # iterdir() on a regular file would raise NotADirectoryError.
        print(f"Error: {jobs_dir} is not a directory")
        return []

    # iterdir() order is arbitrary; sort for a stable, reproducible report.
    trial_dirs: list[Path] = sorted(d for d in jobs_dir.iterdir() if d.is_dir())
    print(f"Found {len(trial_dirs)} trial directories")

    trials: list[Trial] = []
    for trial_dir in trial_dirs:
        trial = await analyze_trial(trial_dir, solution_mapping=solution_mapping)
        # analyze_trial is declared Optional; keep the list free of None so
        # consumers can rely on every element being a Trial.
        if trial is not None:
            trials.append(trial)

    return trials
def print_summary(trials: list[Trial]) -> None:
    """Print a human-readable report of the analyzed trials to stdout.

    The report has up to three sections: overall counts and success rates,
    tool usage aggregated over all trials (only when any trial has tool
    usage data), and per-trial details ordered COMPLETED, FAILED, PENDING.

    Args:
        trials: The trials to report on
    """
    rule = "=" * 80

    def count_with(state: TrialStatus) -> int:
        return sum(1 for t in trials if t.status == state)

    # --- Overall counts -----------------------------------------------------
    print("\n" + rule)
    print("ANALYSIS SUMMARY")
    print(rule)
    print(f"Total trials: {len(trials)}")

    completed = count_with(TrialStatus.COMPLETED)
    failed = count_with(TrialStatus.FAILED)
    pending = count_with(TrialStatus.PENDING)
    print(f"Completed: {completed}")
    print(f"Failed: {failed}")
    print(f"Pending: {pending}")

    # Success rates are only meaningful when there is something to divide by.
    decided = completed + failed
    if decided > 0:
        print(f"Success rate (excluding pending): {completed / decided * 100:.1f}%")
    if len(trials) > 0:
        print(f"Success rate (of all trials): {completed / len(trials) * 100:.1f}%")

    # --- Tool usage aggregated over every trial -------------------------------
    combined: dict[str, int] = {}
    reporting = 0
    for entry in trials:
        if not entry.tool_usage:
            continue
        reporting += 1
        for name, uses in entry.tool_usage.items():
            combined[name] = combined.get(name, 0) + uses

    if combined:
        print("\n" + rule)
        print("OVERALL TOOL USAGE")
        print(rule)
        print(f"Trials with tool usage data: {reporting}/{len(trials)}")
        print("\nTool usage across all trials:")
        # Most used first; ties broken alphabetically.
        for name, uses in sorted(combined.items(), key=lambda item: (-item[1], item[0])):
            print(f" {name}: {uses}")

    # --- Per-trial details ----------------------------------------------------
    print("\n" + rule)
    print("TRIAL DETAILS")
    print(rule)

    rank = {TrialStatus.COMPLETED: 0, TrialStatus.FAILED: 1, TrialStatus.PENDING: 2}
    labels = {TrialStatus.COMPLETED: "✓ COMPLETED", TrialStatus.FAILED: "✗ FAILED"}

    for entry in sorted(trials, key=lambda t: rank[t.status]):
        label = labels.get(entry.status, "⋯ PENDING")
        print(f"\n{label} | {entry.trial_id}")

        trajectory_text = entry.trajectory_path if entry.trajectory_path else "MISSING"
        print(f" Trajectory: {trajectory_text}")

        reward_text = entry.reward_path if entry.reward_path else "MISSING"
        print(f" Reward file: {reward_text}")

        if entry.exception_path and entry.exception_path.exists():
            try:
                # Only the tail of the exception text (at most 100 characters) is shown.
                tail = entry.exception_path.read_text()[-100:]
                print(f" Exception: ...{tail}")
            except Exception:
                print(" Exception: [Error reading exception file]")

        if entry.tool_usage:
            # Most used first; ties broken alphabetically.
            ranked = sorted(entry.tool_usage.items(), key=lambda item: (-item[1], item[0]))
            print(" Tool usage: " + ", ".join(f"{tool}: {count}" for tool, count in ranked))
# System prompt given to the trajectory-analysis agent created in
# analyze_failed_trial. It explains the trial status values, the reference
# solution, the trajectory format, and the structure of the report the agent
# is asked to produce. The text is sent to the model as-is.
ANALYSIS_PROMPT = """\
# Trajectory Analysis Prompt
You are analyzing an agent execution trajectory. Your goal is to identify what happened during execution and, if the trial failed, determine why.
## IMPORTANT: Trial Status
The trial status will be explicitly provided to you. This status is the ground truth:
- **FAILED**: The agent did not successfully complete the task (reward = 0 or exception occurred)
- **PENDING**: The trial has not finished executing yet
- **COMPLETED**: The agent successfully completed the task (reward = 1)
**If the status is FAILED, then something went wrong, even if the agent reported success or the trajectory appears successful.** Your job is to identify what went wrong by carefully examining the details.
## Reference Solution
A reference solution script (solve.sh) will be provided when available. This script shows the correct approach to solving the task. Use this to:
- Compare the agent's approach against the known working solution
- Identify where the agent's actions diverged from the correct approach
- Understand what steps or commands the agent missed or executed incorrectly
- Determine if the agent used different tools/methods that led to failure
## Trajectory Format
The trajectory is in ATIF (Agent Trajectory Interchange Format) with sequential steps:
- `source`: Who generated the step (system/user/agent)
- `message`: The content of the step
- `tool_calls`: (if present) Tools the agent attempted to use
- `observation`: (if present) Results from tool execution
## Analysis Task
Review the trajectory with careful attention to subtle details and provide:
### 1. FAILURE IDENTIFICATION (for FAILED trials)
**Start by comparing the user's request to the agent's actual actions:**
- What exactly did the user ask for? (Quote the specific request)
- What exactly did the agent do? (Quote the actual tool calls and parameters)
- If a reference solution is provided, how does the agent's approach differ from it?
- Are there any discrepancies between what was requested and what was executed?
**Then identify:**
- **Failure Step**: Which step number failed or where did things go wrong?
- **What Failed**: Describe what went wrong (tool error, incorrect logic, incomplete execution, subtle mistakes, etc.)
- **Error Details**: Quote any error messages or failure indicators
- **Subtle Issues**: Look for problems that aren't obvious errors - small differences in parameters, values, or execution that don't match the request
**Special Case: Max Iterations Reached**
If the agent failed due to reaching the maximum iteration/recursion limit:
- **Evaluate Progress**: Was the agent making sensible progress toward the solution?
- **Direction Assessment**: Were the agent's actions moving it closer to completing the task?
- **Correctness**: Despite not finishing, were the steps taken correct and logical?
- **Compare to Solution**: If a reference solution is provided, was the agent following a similar approach?
- **Estimate Completion**: How close was the agent to completing the task when it hit the limit?
- **Root Cause**: Was the limit hit due to:
- Agent making good progress but task simply required more steps?
- Agent spinning in circles or repeating ineffective actions?
- Agent pursuing a suboptimal approach that would take too many steps?
- Agent getting stuck on a subtask or error recovery loop?
### 2. EXECUTION ANALYSIS
- **What the Agent Did**: Trace the agent's actions step by step
- **What Was Expected**: Based on the user's request and reference solution (if provided), what should have happened?
- **Where It Went Wrong**: Identify the specific point where the agent's actions diverged from what was needed
- **Tool Usage**: Examine all tool parameters carefully - verify they match what the user requested
### 3. ROOT CAUSE
Determine the underlying cause:
- Is this incorrect tool usage (wrong tool or wrong parameters)?
- Is this a logical/reasoning error (agent made wrong decision)?
- Is this a tool execution error (tool failed or returned error)?
- Is this incomplete execution (agent stopped too early)?
- Is this a resource/permission error?
- Is this agent confusion about the task requirements?
- Is this a subtle parameter mismatch (values that look correct but differ from the request)?
### 4. SUGGESTED IMPROVEMENTS
If clear from the trajectory, suggest:
- What the agent should have done differently (reference the solution script if available)
- Which component or capability needs improvement
- How to prevent this type of failure
## Guidelines
- **Pay close attention to details**: Even if the agent reported success, if the trial failed, find what went wrong
- **Use the reference solution**: When provided, compare the agent's approach systematically against it
- Look for subtle issues like path mistakes, incorrect values, or logical errors
- Be concise but specific
- Quote exact error messages when present
- Focus on actionable insights
- Identify patterns in agent behavior that led to failure
- Don't assume the agent is correct just because it reported success
""" # noqa: E501
def _message_content_to_text(content: object) -> str:
    """Collapse a chat message's content into plain text.

    Content that is already a string is returned unchanged. For a list of
    content blocks, string items and the "text" entry of dict items are
    concatenated; other blocks are ignored. Anything else is passed to str().
    """
    if isinstance(content, str):
        return content
    if isinstance(content, list):
        parts: list[str] = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and isinstance(block.get("text"), str):
                parts.append(block["text"])
        return "".join(parts)
    return str(content)


async def analyze_failed_trial(trial: Trial, analyze_pending: bool = False) -> Optional[str]:
    """Run deep agent analysis on a failed or pending trial trajectory.

    The agent receives the trial status, the reference solution (when the
    trial has one on disk) and the full trajectory as pretty-printed JSON.

    Args:
        trial: The trial to analyze
        analyze_pending: If True, analyze pending trials in addition to failed ones

    Returns:
        Analysis result as a string. None when the trial is skipped:
        completed trials, pending trials unless ``analyze_pending`` is set,
        and trials whose trajectory file is missing.

    Raises:
        OSError, ValueError: If the trajectory file exists but cannot be read
            or is not valid JSON.
    """
    # Cheap skip checks come first so that no agent is built for trials that
    # will not be analyzed.
    if trial.status == TrialStatus.COMPLETED:
        return None
    if trial.status == TrialStatus.PENDING and not analyze_pending:
        return None
    if not trial.trajectory_path or not trial.trajectory_path.exists():
        return None

    # Load and re-serialize the trajectory so the prompt carries indented JSON.
    with open(trial.trajectory_path, "r") as f:
        trajectory_data = json.load(f)
    trajectory_json = json.dumps(trajectory_data, indent=2)

    # Read the solution script if available
    solution_content = None
    if trial.solution_path and trial.solution_path.exists():
        solution_content = trial.solution_path.read_text()

    # The status is stated explicitly; the prompt treats it as ground truth.
    status_desc = "failed" if trial.status == TrialStatus.FAILED else "pending"
    status_upper = trial.status.value.upper()
    user_message = f"**TRIAL STATUS: {status_upper}**\n\n"

    if solution_content:
        user_message += (
            f"**REFERENCE SOLUTION (solve.sh):**\n\n```bash\n{solution_content}\n```\n\n"
        )
    else:
        user_message += "**REFERENCE SOLUTION:** Not provided\n\n"

    user_message += (
        f"Please analyze this {status_desc} agent trajectory:\n\n```json\n{trajectory_json}\n```\n"
    )

    # NOTE: invoke() is a synchronous call, so it runs to completion before
    # control returns to the event loop.
    analysis_agent = create_deep_agent(tools=[], system_prompt=ANALYSIS_PROMPT)
    result = analysis_agent.invoke({"messages": [{"role": "user", "content": user_message}]})

    # The final message holds the analysis; normalize it to plain text because
    # message content is not guaranteed to be a single string.
    return _message_content_to_text(result["messages"][-1].content)
async def write_trial_analysis(
    trial: Trial,
    trial_dir: Path,
    output_dir: Path,
    summary_only: bool = False,
    analyze_pending: bool = False,
) -> Optional[Path]:
    """Analyze a failed or pending trial and write the results to a file.

    The report is a Markdown file named ``{trial_id}.md`` inside
    ``output_dir`` (created if necessary). It contains the task metadata, the
    task instructions when they can be extracted from the trajectory, and
    either the LLM analysis or a note that it was skipped.

    Args:
        trial: The trial to analyze
        trial_dir: Path to the trial directory
        output_dir: Directory where analysis files should be written
        summary_only: If True, skip LLM analysis and only write metadata summary
        analyze_pending: If True, analyze pending trials in addition to failed ones

    Returns:
        Path to the written analysis file, or None if analysis was skipped
        (completed trial, pending trial without ``analyze_pending``, or no
        analysis could be produced while ``summary_only`` is False)
    """
    # Skip completed trials
    if trial.status == TrialStatus.COMPLETED:
        return None
    # Skip pending trials unless explicitly requested
    if trial.status == TrialStatus.PENDING and not analyze_pending:
        return None

    metadata = extract_task_metadata(trial_dir)

    task_instructions = None
    if trial.trajectory_path:
        task_instructions = extract_task_instructions(trial.trajectory_path)

    # Run the LLM analysis unless summary_only is True
    analysis = None
    if not summary_only:
        analysis = await analyze_failed_trial(trial, analyze_pending=analyze_pending)
        if not analysis:
            # No analysis available (e.g., missing trajectory): skip this trial.
            return None

    # Assemble the whole report in memory first, so that a failure while
    # formatting cannot leave a half-written file behind.
    sections = [
        f"# Analysis: {trial.trial_id}\n\n",
        "## Task Metadata\n\n",
        f"- **Trial ID**: {trial.trial_id}\n",
        f"- **Status**: {trial.status.value}\n",
        f"- **Task Name**: {metadata.get('task_name', 'N/A')}\n",
        f"- **Task Source**: {metadata.get('task_source', 'N/A')}\n",
        f"- **Reward**: {metadata.get('reward', 0.0)}\n",
    ]

    # Optional metadata lines are emitted only when the value is non-empty.
    optional_fields = (
        ("git_url", "Git URL"),
        ("git_commit_id", "Git Commit"),
        ("started_at", "Started"),
        ("finished_at", "Finished"),
    )
    for key, label in optional_fields:
        if metadata.get(key):
            sections.append(f"- **{label}**: {metadata[key]}\n")

    if task_instructions:
        sections += ["\n## Task Instructions\n\n", "```\n", task_instructions, "\n```\n"]

    if analysis:
        sections += ["\n## Failure Analysis\n\n", analysis, "\n"]
    elif summary_only:
        sections += [
            "\n## Analysis\n\n",
            "*Summary only mode - detailed LLM analysis skipped*\n",
        ]

    report = "".join(sections)

    output_dir.mkdir(parents=True, exist_ok=True)
    output_file = output_dir / f"{trial.trial_id}.md"
    # Explicit UTF-8: the report may contain characters the platform's
    # default encoding cannot represent.
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(report)

    return output_file
def _trial_to_json(trial: Trial) -> dict:
    """Flatten a Trial into JSON-serializable values.

    Paths become strings (or None) and the status becomes its string value.
    """

    def as_text(path: Optional[Path]) -> Optional[str]:
        return str(path) if path is not None else None

    return {
        "trial_id": trial.trial_id,
        "status": trial.status.value,
        "reward": trial.reward,
        "trajectory_path": as_text(trial.trajectory_path),
        "reward_path": as_text(trial.reward_path),
        "exception_path": as_text(trial.exception_path),
        "solution_path": as_text(trial.solution_path),
        "trial_dir": as_text(trial.trial_dir),
        "tool_usage": trial.tool_usage,
    }


async def main():
    """Command-line entry point.

    Parses the arguments, scans the jobs directory, reports the trials, and,
    when ``--output-dir`` is given, writes one analysis file per failed (and
    optionally pending) trial.

    By default the report is the human-readable summary. With ``--json`` the
    trials are printed to stdout as a JSON array instead, and all progress
    and diagnostic messages are sent to stderr so that stdout contains only
    the JSON document.
    """
    # Needed only by this entry point, hence imported locally.
    import contextlib
    import sys

    parser = argparse.ArgumentParser(description="Analyze job trials from a jobs directory")
    parser.add_argument(
        "jobs_dir", type=Path, help="Path to the jobs directory (e.g., jobs-terminal-bench/)"
    )
    parser.add_argument(
        "--dataset",
        "-d",
        type=Path,
        help="Path to the dataset directory (e.g., terminal-bench/) to scan for solution files",
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        help="Output directory for detailed analysis files (one per failed/pending trial)",
    )
    parser.add_argument(
        "--summary-only",
        action="store_true",
        help="Only print summary, skip detailed LLM analysis of trials",
    )
    parser.add_argument(
        "--analyze-pending",
        action="store_true",
        help="Analyze pending trials in addition to failed trials",
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON instead of human-readable format",
    )
    args = parser.parse_args()

    # Helper functions print progress themselves, so in JSON mode stdout is
    # redirected wholesale while they run. Without --json this is a no-op.
    progress_stream = sys.stderr if args.json else sys.stdout

    with contextlib.redirect_stdout(progress_stream):
        # Scan dataset for solutions if provided
        solution_mapping = None
        if args.dataset:
            print(f"Scanning dataset directory: {args.dataset}")
            solution_mapping = scan_dataset_for_solutions(args.dataset)
            print(f"Found {len(solution_mapping)} tasks with solutions\n")

        # Scan and analyze all trials
        trials = await scan_jobs_directory(args.jobs_dir, solution_mapping=solution_mapping)

    if args.json:
        print(json.dumps([_trial_to_json(t) for t in trials], indent=2))
    else:
        print_summary(trials)

    # Detailed per-trial analysis only runs when an output directory is given.
    if not args.output_dir:
        return

    with contextlib.redirect_stdout(progress_stream):
        trials_to_analyze = [
            t
            for t in trials
            if t.status == TrialStatus.FAILED
            or (args.analyze_pending and t.status == TrialStatus.PENDING)
        ]

        if not trials_to_analyze:
            status_desc = "failed or pending" if args.analyze_pending else "failed"
            print(f"\nNo {status_desc} trials to analyze.")
            return

        banner = "=" * 80
        analysis_mode = "SUMMARY" if args.summary_only else "DEEP ANALYSIS"
        trial_types = "FAILED/PENDING" if args.analyze_pending else "FAILED"
        print(f"\n{banner}")
        print(f"RUNNING {analysis_mode} ON {trial_types} TRIALS")
        print(banner)
        print(f"Processing {len(trials_to_analyze)} trials...")
        print(f"Output directory: {args.output_dir}")
        if args.summary_only:
            print("Mode: Summary only (LLM analysis disabled)")
        if args.analyze_pending:
            print("Mode: Including pending trials")
        print()

        for i, trial in enumerate(trials_to_analyze, 1):
            status_label = trial.status.value.upper()
            print(f"[{i}/{len(trials_to_analyze)}] Analyzing {trial.trial_id} ({status_label})...")

            if trial.trial_dir is None:
                print(f" Warning: No trial directory found for {trial.trial_id}")
                continue

            # One trial failing must not stop the remaining analyses, so any
            # error is reported and the loop moves on.
            try:
                output_file = await write_trial_analysis(
                    trial,
                    trial.trial_dir,
                    args.output_dir,
                    summary_only=args.summary_only,
                    analyze_pending=args.analyze_pending,
                )
                if output_file:
                    print(f" ✓ Analysis written to: {output_file}")
                else:
                    print(" ✗ Skipped (no trajectory or already completed)")
            except Exception as e:
                print(f" ✗ Error: {e}")

        print(f"\n{banner}")
        print(f"Analysis complete. Results saved to: {args.output_dir}")
        print(banner)
# Run the async entry point only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())