# Scenario table for the governance demo. Each entry carries a human-readable
# label ("title"), a tool callable ("device") and the keyword arguments
# ("kwargs") that the runner loop further down is meant to invoke it with.
# NOTE(review): several string values look machine-garbled (for example
# "(e mail protected)" and "/content material/one thing"); as written the shell
# command contains spaces that would presumably split it into several targets.
# Confirm every value against the upstream source before running.
eventualities = (
    # --- database tool -----------------------------------------------------
    {
        "title": "Secure database learn",
        "device": research_db,
        "kwargs": {
            "desk": "prospects",
            "operation": "choose",
            "kind": "choose",
            "sensitivity": "medium",
        },
    },
    {
        "title": "Blocked harmful database motion",
        "device": research_db,
        "kwargs": {
            "desk": "prospects",
            "operation": "drop",
            "kind": "drop_table",
            "sensitivity": "crucial",
        },
    },
    # --- e-mail tool -------------------------------------------------------
    {
        "title": "Exterior e mail requiring approval",
        "device": research_email,
        "kwargs": {
            "to": "(e mail protected)",
            "recipient_domain": "instance.com",
            "topic": "Quarterly replace",
            "physique": "Sharing a non-confidential quarterly replace.",
            "kind": "send_email",
            "sensitivity": "medium",
        },
    },
    {
        "title": "Exterior e mail denied attributable to approval rejection",
        "device": research_email,
        "kwargs": {
            "to": "(e mail protected)",
            "recipient_domain": "instance.com",
            "topic": "Confidential technique",
            "physique": "This incorporates confidential technique.",
            "kind": "send_email",
            "sensitivity": "crucial",
        },
    },
    # --- shell tool --------------------------------------------------------
    {
        "title": "Secure sandbox shell command",
        "device": ops_shell,
        "kwargs": {
            "command": "echo Agent governance is lively",
            "kind": "shell_exec",
            "sensitivity": "low",
        },
    },
    {
        "title": "Harmful shell command blocked",
        "device": ops_shell,
        "kwargs": {
            "command": "rm -rf /content material/one thing",
            "kind": "shell_exec",
            "sensitivity": "crucial",
        },
    },
    # --- second database tool handle ---------------------------------------
    {
        "title": "Low-trust agent blocked from delicate information",
        "device": shadow_db,
        "kwargs": {
            "desk": "executive_compensation",
            "operation": "choose",
            "kind": "choose",
            "sensitivity": "crucial",
        },
    },
    # --- finance tool ------------------------------------------------------
    {
        "title": "Monetary switch requiring approval",
        "device": finance_transfer,
        "kwargs": {
            "quantity": 2500,
            "vacation spot": "vendor-123",
            "kind": "transfer_money",
            "sensitivity": "excessive",
        },
    },
    {
        "title": "Giant monetary switch rejected",
        "device": finance_transfer,
        "kwargs": {
            "quantity": 15000,
            "vacation spot": "vendor-999",
            "kind": "transfer_money",
            "sensitivity": "crucial",
        },
    },
)
# Run every scenario once and record whether the tool call returned or raised.
# `outcomes` must be a list: entries are appended as the loop progresses.
outcomes = []
for scenario in eventualities:
    try:
        # "device" holds the tool callable, "kwargs" its keyword arguments.
        output = scenario["device"](**scenario["kwargs"])
    except Exception as exc:
        # Broad catch kept on purpose: the exception types raised by the tool
        # wrappers are not visible here, and any failure is recorded rather
        # than aborting the remaining scenarios.
        outcomes.append({
            "state of affairs": scenario["title"],
            "standing": "blocked_or_pending",
            "error": str(exc),
        })
    else:
        outcomes.append({
            "state of affairs": scenario["title"],
            "standing": "executed",
            "output": output,
        })
# NOTE(review): the result keys "state of affairs" / "standing" look garbled
# (probably "scenario" / "status"); left untouched in case other code reads them.
# Snapshot the audit log as a DataFrame and show a fixed subset of columns.
audit_df = audit_log.to_dataframe()
# A list is required here: indexing a DataFrame with a tuple is treated as a
# single column key rather than a selection of several columns.
# NOTE(review): column labels "choice" and "cause" look garbled (possibly
# "decision" / "reason"); confirm against what audit_log.to_dataframe() emits.
display_cols = [
    "timestamp",
    "agent_name",
    "tool_name",
    "choice",
    "matched_rule",
    "severity",
    "cause",
    "record_hash",
]
# NOTE(review): `show` is not defined in the visible part of the file; it is
# presumably a notebook display helper — confirm it is in scope.
show(audit_df[display_cols])
# Policy expectations checked by the test loop that follows: each case names
# an agent ("id"), a tool, the action payload handed to the engine and the
# decision string the engine is expected to return ("anticipated").
# NOTE(review): key names and some values look machine-garbled (e.g. "motion",
# "anticipated", "crucial", "enable"); confirm against the policy file.
test_cases = (
    {
        "title": "drop_table should be denied",
        "id": research_agent,
        "tool_name": "query_database",
        "motion": {
            "kind": "drop_table",
            "sensitivity": "crucial",
            "autonomous": True,
        },
        "anticipated": "deny",
    },
    {
        "title": "secure choose ought to be allowed",
        "id": research_agent,
        "tool_name": "query_database",
        "motion": {
            "kind": "choose",
            "sensitivity": "low",
            "autonomous": True,
        },
        "anticipated": "enable",
    },
    {
        "title": "exterior e mail ought to require approval",
        "id": research_agent,
        "tool_name": "send_email",
        "motion": {
            "kind": "send_email",
            "recipient_domain": "instance.com",
            "sensitivity": "medium",
            "autonomous": True,
        },
        "anticipated": "require_approval",
    },
    {
        "title": "low belief delicate entry denied",
        "id": unknown_agent,
        "tool_name": "query_database",
        "motion": {
            "kind": "choose",
            "sensitivity": "crucial",
            "autonomous": True,
        },
        "anticipated": "deny",
    },
    {
        "title": "shell command ought to enter sandbox",
        "id": ops_agent,
        "tool_name": "shell_exec",
        "motion": {
            "kind": "shell_exec",
            "command": "echo whats up",
            "sensitivity": "low",
            "autonomous": True,
        },
        "anticipated": "sandbox",
    },
)
# Evaluate each policy test case directly against the engine and tabulate
# expected vs. actual decisions. A list is used because rows are appended.
test_results = []
for case in test_cases:
    # NOTE(review): the method name `consider`, its keyword names (`id`,
    # `motion`) and the `.choice` attribute are project-defined and look
    # garbled (possibly `evaluate`, `identity`, `action`, `.decision`);
    # confirm against the engine class before relying on this call.
    verdict = engine.consider(
        id=case["id"],
        tool_name=case["tool_name"],
        motion=case["motion"],
    )
    test_results.append({
        "check": case["title"],
        "anticipated": case["anticipated"],
        "precise": verdict.choice,
        "handed": verdict.choice == case["anticipated"],
        "matched_rule": verdict.matched_rule,
    })
test_df = pd.DataFrame(test_results)
show(test_df)
# Kill-switch demo: issue one otherwise low-sensitivity read while the switch
# is active, then always switch it back off.
engine.activate_kill_switch()
try:
    research_db(
        desk="prospects",
        operation="choose",
        kind="choose",
        sensitivity="low",
    )
except Exception:
    # Deliberately ignored: the call is presumably expected to be refused
    # while the kill switch is on, and the refusal is what this step is
    # meant to produce — TODO confirm it is captured by the audit log.
    pass
finally:
    # Release the switch even if the call is interrupted by something that is
    # not an `Exception`, so later cells are not left globally blocked.
    engine.deactivate_kill_switch()
# Re-read the audit log (it has grown since the first snapshot) and build two
# frequency tables from it.
# NOTE(review): the column label "choice" and the generated count label
# "depend" look garbled (possibly "decision" / "count"); kept as written.
audit_df = audit_log.to_dataframe()

# Events per (choice, severity). Group keys must be a list — pandas treats a
# tuple as one single key. dropna=False keeps rows whose severity is missing.
abstract = (
    audit_df
    .groupby(["choice", "severity"], dropna=False)
    .size()
    .reset_index(name="depend")
    .sort_values("depend", ascending=False)
)
show(abstract)

# Events per (agent, choice), ordered by agent name and then by descending
# frequency within each agent.
agent_summary = (
    audit_df
    .groupby(["agent_name", "choice"])
    .size()
    .reset_index(name="depend")
    .sort_values(["agent_name", "depend"], ascending=[True, False])
)
show(agent_summary)
# Bar chart 1: how often each value of the "choice" column occurs.
# NOTE(review): titles and axis labels below look machine-garbled (e.g.
# "Depend" for "Count"); they are runtime strings and are left unchanged.
decision_counts = audit_df["choice"].value_counts()
plt.figure(figsize=(8, 5))
decision_counts.plot(kind="bar")
plt.title("Governance Selections Throughout Agent Actions")
plt.xlabel("Choice")
plt.ylabel("Depend")
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()

# Bar chart 2: events per severity; missing severities are bucketed as "none"
# so they show up as their own bar instead of being dropped by value_counts().
severity_counts = audit_df["severity"].fillna("none").value_counts()
plt.figure(figsize=(8, 5))
severity_counts.plot(kind="bar")
plt.title("Governance Occasions by Severity")
plt.xlabel("Severity")
plt.ylabel("Depend")
plt.xticks(rotation=30)
plt.tight_layout()
plt.show()
# Build a directed graph agent -> tool -> choice -> rule from the audit rows
# and draw it. Repeated rows map to the same node/edge labels, so the graph
# holds one node per distinct label.
G = nx.DiGraph()
for _, row in audit_df.iterrows():
    agent_node = f"Agent: {row['agent_name']}"
    tool_node = f"Instrument: {row['tool_name']}"
    decision_node = f"Choice: {row['choice']}"
    # Rows without a matched rule share a single "default" rule node.
    rule_node = (
        f"Rule: {row['matched_rule']}"
        if pd.notna(row["matched_rule"])
        else "Rule: default"
    )

    G.add_node(agent_node, node_type="agent")
    G.add_node(tool_node, node_type="device")
    G.add_node(decision_node, node_type="choice")
    G.add_node(rule_node, node_type="rule")

    G.add_edge(agent_node, tool_node, relation="calls")
    G.add_edge(tool_node, decision_node, relation="produces")
    G.add_edge(decision_node, rule_node, relation="matched")

plt.figure(figsize=(14, 9))
# Fixed seed keeps the layout reproducible between runs; `k` is the
# spring_layout spacing parameter.
pos = nx.spring_layout(G, seed=42, k=0.8)
nx.draw_networkx_nodes(G, pos, node_size=1800)
nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle="->", arrowsize=15)
nx.draw_networkx_labels(G, pos, font_size=8)
plt.title("Agent Governance Graph: Brokers, Instruments, Selections, and Coverage Guidelines")
plt.axis("off")
plt.tight_layout()
plt.show()
# Export the audit log (JSON + CSV), the policy test results and a copy of the
# policy file into one output directory.
# NOTE(review): the directory path contains a space ("/content material/...")
# and looks garbled (possibly "/content/..."); confirm before running.
EXPORT_DIR = "/content material/agt_tutorial_outputs"
os.makedirs(EXPORT_DIR, exist_ok=True)

audit_json_path = os.path.join(EXPORT_DIR, "tamper_evident_audit_log.json")
audit_csv_path = os.path.join(EXPORT_DIR, "governance_audit_log.csv")
policy_copy_path = os.path.join(EXPORT_DIR, "advanced_agent_policy.yaml")
test_results_path = os.path.join(EXPORT_DIR, "policy_test_results.csv")

with open(audit_json_path, "w", encoding="utf-8") as f:
    # Materialise a list: json cannot serialise a generator, and with
    # default=str a generator would be written out as its repr string.
    # default=str still stringifies any field json cannot encode natively.
    # NOTE(review): `audit_log.data` is a project attribute whose name looks
    # garbled (possibly `.records`); confirm against the audit-log class.
    json.dump([asdict(r) for r in audit_log.data], f, indent=2, default=str)

audit_df.to_csv(audit_csv_path, index=False)
test_df.to_csv(test_results_path, index=False)
shutil.copy(POLICY_PATH, policy_copy_path)
