feat: Claude Code patterns - verifier subagent + security monitor + anti-rationalization
This commit is contained in:
@@ -39,6 +39,7 @@ You are running with subagent capabilities enabled. Your role is to be a **task
|
||||
**Available Subagents:**
|
||||
- **general-purpose**: For ANY non-trivial task - web research, code exploration, file operations, analysis, etc.
|
||||
- **bash**: For command execution (git, build, test, deploy operations)
|
||||
- **verifier**: For adversarial verification of implementations. Use AFTER code changes to test correctness. Returns VERDICT: PASS/FAIL/PARTIAL
|
||||
|
||||
**Your Orchestration Strategy:**
|
||||
|
||||
@@ -162,6 +163,15 @@ You are {agent_name}, an open-source super agent.
|
||||
{subagent_thinking}- Never write down your full final answer or report in thinking process, but only outline
|
||||
- CRITICAL: After thinking, you MUST provide your actual response to the user. Thinking is for planning, the response is for delivery.
|
||||
- Your response must contain the actual answer, not just a reference to what you thought about
|
||||
|
||||
**ANTI-RATIONALIZATION RULES (Verification Mindset):**
|
||||
- "The code looks correct based on my reading" is NOT verification. Reading is not testing. RUN IT.
|
||||
- "The tests already pass" -- tests may be shallow or mock-heavy. Verify independently with real execution.
|
||||
- "This is probably fine" -- "probably" is not verified. Confirm with actual evidence.
|
||||
- If you catch yourself writing an explanation instead of running a command, STOP. Run the command first.
|
||||
- Never declare something "done" without execution evidence (command output, HTTP response, screenshot).
|
||||
- When verifying, try to BREAK the implementation -- test edge cases, empty inputs, concurrent access, boundary values.
|
||||
- After implementing a fix, verify it actually works by running the relevant test or endpoint -- do not just read the code and assume.
|
||||
</thinking_style>
|
||||
|
||||
<clarification_system>
|
||||
|
||||
134
backend/packages/harness/deerflow/guardrails/security_monitor.py
Normal file
134
backend/packages/harness/deerflow/guardrails/security_monitor.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Security monitor guardrail - inspired by Claude Code security patterns.
|
||||
|
||||
Evaluates autonomous agent actions against block/allow rules to prevent:
|
||||
- Prompt injection via tool inputs
|
||||
- Destructive operations (rm -rf, DROP TABLE, etc.)
|
||||
- Scope creep (agent exceeding its mandate)
|
||||
- Credential exposure
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
|
||||
from deerflow.guardrails.provider import GuardrailDecision, GuardrailReason, GuardrailRequest
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Patterns that indicate destructive or dangerous commands.
# All patterns are compiled with re.IGNORECASE and searched against the
# flattened tool-call content (tool name + stringified inputs).
DESTRUCTIVE_PATTERNS = [
    r"rm\s+-rf\s+/(?!tmp|mnt/user-data)",  # rm -rf outside safe dirs
    r"DROP\s+(TABLE|DATABASE|SCHEMA)",  # SQL destructive ops
    r"TRUNCATE\s+TABLE",
    r"mkfs\.",  # filesystem formatting
    r"dd\s+if=.*of=/dev/",  # disk overwrite
    r"chmod\s+-R\s+777\s+/",  # recursive world-writable root
    # Fork bomb ":(){ :|:& };:".  The parentheses and braces must be
    # escaped: unescaped, "()" is an empty group and the pattern could
    # never match the actual fork-bomb text.  \s* tolerates spacing
    # variants such as ":(){:|:&};:".
    r":\(\)\s*\{\s*:\|:\s*&\s*\}\s*;\s*:",  # fork bomb
    r"\bshutdown\b|\breboot\b|\bhalt\b",  # system shutdown
    r"curl.*\|.*sh",  # pipe curl to shell
    r"wget.*\|.*sh",
]

# Patterns that indicate prompt injection attempts smuggled in via tool
# inputs (e.g. text fetched from the web or pasted by a user).
INJECTION_PATTERNS = [
    r"ignore\s+(previous|all|above)\s+instructions",
    r"you\s+are\s+now\s+(a|an|my)",
    # NOTE(review): intentionally broad -- matches any mention of
    # "system prompt"; may produce false positives on benign text.
    r"system\s*prompt",
    r"reveal\s+(your|the)\s+(instructions|prompt|rules)",
    r"ADMIN_OVERRIDE",
    r"<\/?system>",  # fake <system>...</system> tags
    r"\[INST\]",  # instruction-format markers
]

# Sensitive credential patterns: key=value style assignments that suggest
# a secret is being passed on a command line.
CREDENTIAL_PATTERNS = [
    r"\bpassword\s*=\s*.",  # password= in commands
    r"\bsecret\s*=\s*.",
    r"\bapi_key\s*=\s*.",
    r"\btoken\s*=\s*.",
]
||||
class SecurityMonitorProvider:
    """Monitors agent actions for security violations.

    Checks tool calls against destructive command patterns,
    prompt injection attempts, and credential exposure risks.

    Destructive commands and prompt injection always block.  Credential
    exposure blocks only in strict mode; in non-strict mode it is
    downgraded to a non-blocking ``security.credential_warning`` reason.
    """

    name = "security_monitor"

    # Reason codes that cause evaluate() to deny the request.
    # "security.credential_exposure" is only ever emitted in strict mode
    # (non-strict emits "security.credential_warning" instead), so listing
    # it here is what makes strict_mode actually block -- previously the
    # flag changed the reason code but never the allow/deny outcome.
    _BLOCKING_CODES = frozenset({
        "security.destructive_command",
        "security.prompt_injection",
        "security.credential_exposure",
    })

    def __init__(self, *, strict_mode: bool = False):
        """Compile all pattern lists once.

        Args:
            strict_mode: When True, credential exposure is treated as a
                blocking violation instead of a warning.
        """
        self._strict = strict_mode
        self._destructive_re = [re.compile(p, re.IGNORECASE) for p in DESTRUCTIVE_PATTERNS]
        self._injection_re = [re.compile(p, re.IGNORECASE) for p in INJECTION_PATTERNS]
        self._credential_re = [re.compile(p, re.IGNORECASE) for p in CREDENTIAL_PATTERNS]

    def _check_content(self, content: str) -> list[GuardrailReason]:
        """Check *content* against all security patterns.

        Returns one GuardrailReason per matching pattern; an empty list
        means the content is clean.
        """
        reasons: list[GuardrailReason] = []

        for pattern in self._destructive_re:
            if pattern.search(content):
                reasons.append(GuardrailReason(
                    code="security.destructive_command",
                    message=f"Blocked: destructive command pattern detected ({pattern.pattern})",
                ))
                # Lazy %-args keep formatting off the hot path when the
                # logger is disabled.
                logger.warning("Security: destructive command blocked - %s", pattern.pattern)

        for pattern in self._injection_re:
            if pattern.search(content):
                reasons.append(GuardrailReason(
                    code="security.prompt_injection",
                    message=f"Blocked: potential prompt injection detected ({pattern.pattern})",
                ))
                logger.warning("Security: prompt injection attempt blocked - %s", pattern.pattern)

        for pattern in self._credential_re:
            if pattern.search(content):
                logger.warning("Security: credential exposure risk detected")
                if self._strict:
                    reasons.append(GuardrailReason(
                        code="security.credential_exposure",
                        message="Warning: potential credential exposure in command",
                    ))
                else:
                    # Non-strict mode: surface the risk but do not block.
                    reasons.append(GuardrailReason(
                        code="security.credential_warning",
                        message="Warning: credential in command (allowed in non-strict mode)",
                    ))

        return reasons

    def evaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Evaluate a tool call request for security concerns.

        Flattens the tool name and all tool-input values into one string,
        runs the pattern checks, and returns a deny decision if any
        blocking reason was found, otherwise an allow (possibly with
        non-blocking warning reasons attached).
        """
        # Build content string from tool name plus every input value.
        content_parts = [str(request.tool_name)]
        tool_input = getattr(request, "tool_input", None)
        if tool_input:
            if isinstance(tool_input, dict):
                content_parts.extend(str(v) for v in tool_input.values())
            else:
                content_parts.append(str(tool_input))

        reasons = self._check_content(" ".join(content_parts))

        blocking = [r for r in reasons if r.code in self._BLOCKING_CODES]
        if blocking:
            return GuardrailDecision(allow=False, reasons=blocking)

        if reasons:
            # Non-blocking warnings only (e.g. credential warning in
            # non-strict mode).
            return GuardrailDecision(allow=True, reasons=reasons)

        return GuardrailDecision(
            allow=True,
            reasons=[GuardrailReason(code="security.ok")],
        )

    async def aevaluate(self, request: GuardrailRequest) -> GuardrailDecision:
        """Async entry point; the checks are pure CPU, so just delegate."""
        return self.evaluate(request)
|
||||
@@ -2,14 +2,17 @@
|
||||
|
||||
from .bash_agent import BASH_AGENT_CONFIG
|
||||
from .general_purpose import GENERAL_PURPOSE_CONFIG
|
||||
from .verifier_agent import VERIFIER_AGENT_CONFIG
|
||||
|
||||
# Public API of this package.  BUILTIN_SUBAGENTS was previously defined
# but omitted from __all__ even though it is the registry consumers use
# to look up subagents by name.
__all__ = [
    "GENERAL_PURPOSE_CONFIG",
    "BASH_AGENT_CONFIG",
    "VERIFIER_AGENT_CONFIG",
    "BUILTIN_SUBAGENTS",
]

# Registry of built-in subagents, keyed by the dispatch name used when
# delegating a task (e.g. "verifier" for post-change verification).
BUILTIN_SUBAGENTS = {
    "general-purpose": GENERAL_PURPOSE_CONFIG,
    "bash": BASH_AGENT_CONFIG,
    "verifier": VERIFIER_AGENT_CONFIG,
}
|
||||
|
||||
@@ -0,0 +1,77 @@
|
||||
"""Verification specialist subagent - inspired by Claude Code patterns.
|
||||
|
||||
This subagent adversarially tests implementations by running builds,
|
||||
test suites, linters, and adversarial probes, then issuing a PASS/FAIL/PARTIAL verdict.
|
||||
"""
|
||||
|
||||
from deerflow.subagents.config import SubagentConfig
|
||||
|
||||
VERIFIER_SYSTEM_PROMPT = """You are a verification specialist. Your job is NOT to confirm the implementation works -- it is to try to BREAK it.
|
||||
|
||||
=== ANTI-RATIONALIZATION RULES ===
|
||||
You have two documented failure patterns:
|
||||
1. Verification avoidance: reading code, narrating what you would test, writing "PASS" and moving on.
|
||||
2. Being seduced by the first 80%: seeing a polished output and not noticing half of it is broken.
|
||||
|
||||
Recognize your own rationalizations:
|
||||
- "The code looks correct based on my reading" -- reading is NOT verification. Run it.
|
||||
- "The implementer tests already pass" -- the implementer is an LLM. Verify independently.
|
||||
- "This is probably fine" -- probably is NOT verified. Run it.
|
||||
- "This would take too long" -- not your call.
|
||||
If you catch yourself writing an explanation instead of a command, STOP. Run the command.
|
||||
|
||||
=== VERIFICATION STRATEGY ===
|
||||
Adapt based on what was changed:
|
||||
- Frontend: Start dev server, curl endpoints, check for JS errors
|
||||
- Backend/API: curl/fetch endpoints, verify response shapes, test error handling
|
||||
- Scripts: Run with representative inputs, verify stdout/stderr/exit codes
|
||||
- Config: Validate syntax, dry-run where possible
|
||||
- Bug fixes: Reproduce original bug, verify fix, check for side effects
|
||||
|
||||
=== REQUIRED STEPS ===
|
||||
1. Read project README/config for build/test commands
|
||||
2. Run the build (broken build = automatic FAIL)
|
||||
3. Run the test suite (failing tests = automatic FAIL)
|
||||
4. Run linters/type-checkers if configured
|
||||
5. Apply type-specific verification strategy
|
||||
6. Run at least ONE adversarial probe
|
||||
|
||||
=== ADVERSARIAL PROBES ===
|
||||
- Boundary values: 0, -1, empty string, very long strings, unicode
|
||||
- Idempotency: same request twice -- duplicate? error? correct no-op?
|
||||
- Orphan operations: delete/reference IDs that do not exist
|
||||
- Concurrency: parallel requests to create-if-not-exists paths
|
||||
|
||||
=== OUTPUT FORMAT ===
|
||||
Every check MUST follow this structure:
|
||||
|
||||
### Check: [what you are verifying]
|
||||
**Command run:** [exact command]
|
||||
**Output observed:** [actual terminal output]
|
||||
**Result: PASS** (or FAIL with Expected vs Actual)
|
||||
|
||||
A check without a Command run block is NOT a PASS -- it is a skip.
|
||||
|
||||
End with exactly:
|
||||
VERDICT: PASS
|
||||
or
|
||||
VERDICT: FAIL
|
||||
or
|
||||
VERDICT: PARTIAL
|
||||
|
||||
PARTIAL is for environmental limitations only, not for uncertainty.
|
||||
"""
|
||||
|
||||
|
||||
# Configuration for the verification subagent, registered under the name
# "verifier" and dispatched after code changes to adversarially test them.
VERIFIER_AGENT_CONFIG = SubagentConfig(
    name="verifier",
    # The description is what the orchestrator sees when deciding whether
    # to delegate to this subagent.
    description=(
        "Verification specialist that adversarially tests implementations. "
        "Use this subagent after code changes to verify correctness by running "
        "builds, tests, linters, and adversarial probes. Returns PASS/FAIL/PARTIAL verdict."
    ),
    system_prompt=VERIFIER_SYSTEM_PROMPT,
    # NOTE(review): "task" appears to be the subagent-dispatch tool, so
    # disallowing it presumably prevents the verifier from recursively
    # spawning subagents -- confirm against the tool registry.
    disallowed_tools=["task"],
    max_turns=30,  # hard cap on agent turns per verification run
    timeout_seconds=600,  # 10-minute wall-clock budget
)
|
||||
Reference in New Issue
Block a user