Agent Actions
Submit, track, and manage AI agent actions through the ASCEND governance platform.
Action Evaluation
Basic Evaluation
from ascend import AscendClient, Decision
client = AscendClient(
api_key="owkai_live_xxxxxxxxxxxx",
agent_id="my-agent",
agent_name="My Agent",
)
decision = client.evaluate_action(
action_type="database.query",
resource="customer_database",
)
if decision.decision == Decision.ALLOWED:
print(f"Approved (risk: {decision.risk_score})")
elif decision.decision == Decision.DENIED:
print(f"Denied: {decision.reason}")
elif decision.decision == Decision.PENDING:
print(f"Pending: {decision.approval_request_id}")
Evaluation with Full Context
Provide parameters, context, and risk indicators for more accurate governance:
decision = client.evaluate_action(
action_type="transaction.refund",
resource="stripe_api",
resource_id="ACC-98765",
parameters={
"amount": 50000,
"currency": "USD",
"destination": "external_account",
},
context={
"user_request": "Transfer $50,000 to savings",
"session_id": "sess_abc123",
"ip_address": "192.168.1.100",
},
risk_indicators={
"financial_data": True,
"amount_threshold": "exceeded",
"external_transfer": True,
},
)
evaluate_action() Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `action_type` | `str` | Required | Action type in `category.action` format |
| `resource` | `str` | Required | Resource identifier |
| `parameters` | `dict` | `None` | Action-specific parameters |
| `context` | `dict` | `None` | Session/request context for risk scoring |
| `resource_id` | `str` | `None` | Specific resource instance ID |
| `risk_indicators` | `dict` | `None` | Risk assessment hints |
| `wait_for_decision` | `bool` | `True` | Wait for approval if pending |
| `timeout` | `int` | `None` | Timeout for waiting (seconds) |
Decision Handling
Decision Enum
from ascend import Decision
Decision.ALLOWED # Action is approved
Decision.DENIED # Action is denied by policy
Decision.PENDING # Action requires human approval
AuthorizationDecision Properties
| Property | Type | Description |
|---|---|---|
| `action_id` | `str` | Unique action identifier |
| `decision` | `Decision` | `ALLOWED`, `DENIED`, or `PENDING` |
| `risk_score` | `int` | Risk score (0–100) |
| `risk_level` | `RiskLevel` | `LOW`, `MEDIUM`, `HIGH`, or `CRITICAL` |
| `reason` | `str` | Decision explanation |
| `policy_violations` | `list` | Violated policies |
| `conditions` | `list` | Conditions applied to approval |
| `approval_request_id` | `str` | Approval tracking ID (if pending) |
| `required_approvers` | `list` | Who needs to approve |
Handle All Outcomes
from ascend import Decision
decision = client.evaluate_action(
action_type="data.access",
resource="customer_db",
)
if decision.decision == Decision.ALLOWED:
execute_action()
client.log_action_completed(action_id=decision.action_id)
elif decision.decision == Decision.PENDING:
# Wait with custom timeout
final = client.wait_for_decision(
action_id=decision.action_id,
timeout=300,
poll_interval=5.0,
)
if final.decision == Decision.ALLOWED:
execute_action()
client.log_action_completed(action_id=final.action_id)
elif decision.decision == Decision.DENIED:
log_denial(decision.reason, decision.policy_violations)
Action Completion Logging
After executing an approved action, log the outcome:
Log Success
import time
decision = client.evaluate_action(
action_type="database.query",
resource="production_db",
parameters={"query": "SELECT COUNT(*) FROM users"},
)
if decision.decision == Decision.ALLOWED:
start = time.time()
result = execute_query()
duration = int((time.time() - start) * 1000)
client.log_action_completed(
action_id=decision.action_id,
result={"row_count": result["count"]},
duration_ms=duration,
)
Log Failure
try:
result = execute_query()
except Exception as e:
client.log_action_failed(
action_id=decision.action_id,
error={
"code": "EXECUTION_ERROR",
"message": str(e),
"type": type(e).__name__,
},
)
raise
Approval Workflows
Check Approval Status
if decision.decision == Decision.PENDING:
status = client.check_approval(decision.approval_request_id)
if status["approved"]:
execute_action()
elif status["denied"]:
print(f"Denied by {status['approver']}: {status['comments']}")
Wait for Decision
# Submit without waiting
decision = client.evaluate_action(
action_type="database.delete",
resource="production_db",
wait_for_decision=False,
)
# Wait separately with custom timeout
if decision.decision == Decision.PENDING:
final = client.wait_for_decision(
action_id=decision.action_id,
timeout=300, # 5 minutes
poll_interval=5.0, # Check every 5 seconds
)
Bulk Evaluation
Evaluate multiple actions concurrently (up to 50):
from ascend import AscendClient, AgentAction
actions = [
AgentAction(
agent_id="bot-1", agent_name="Bot",
action_type="data.read", resource="db1",
),
AgentAction(
agent_id="bot-1", agent_name="Bot",
action_type="data.write", resource="db2",
),
]
result = client.evaluate_actions(actions, max_concurrent=5)
print(f"{result.succeeded}/{result.total} succeeded")
for r in result.results:
if r.succeeded:
print(f" {r.action_type}: {r.decision.decision}")
else:
print(f" {r.action_type}: FAILED - {r.error}")
Kill Switch
Check and poll for emergency kill switch status:
# One-time check
status = client.is_blocked()
if status.active:
print(f"Agent blocked: {status.reason}")
# Background polling (daemon thread)
client.start_kill_switch_polling(interval=30)
# Stop polling on shutdown
client.stop_kill_switch_polling()
Enterprise Methods
Policy Evaluation
Evaluate an action against a specific policy:
result = client.evaluate_policy(
policy_id="pol_financial_controls",
action_type="financial.transfer",
resource="stripe_api",
parameters={"amount": 5000},
)
if not result.allowed:
print(f"Blocked by rules: {result.matched_rules}")
Resource Classification
Get the security classification for a resource:
classification = client.get_resource_classification("database")
print(f"Sensitivity: {classification.sensitivity}")
print(f"Compliance: {classification.compliance_tags}")
Audit Log
Query the audit log:
audit = client.query_audit_log(
start_date="2026-03-01",
end_date="2026-03-28",
action_type="financial.transfer",
limit=50,
)
for event in audit.events:
print(f"{event.timestamp}: {event.action_type} - {event.decision}")
Agent Lifecycle
# Send heartbeat
client.heartbeat()
# Get agent health status
status = client.get_agent_status()
print(f"Status: {status.status}, Uptime: {status.uptime_seconds}s")
# Deregister on shutdown
client.deregister()
Best Practices
1. Provide Detailed Context
# Detailed context enables better governance decisions
decision = client.evaluate_action(
action_type="data.access",
resource="customer_database",
resource_id="CUST-12345",
parameters={
"operation": "read",
"fields": ["name", "email", "order_history"],
"purpose": "customer_support",
},
context={
"user_request": "Show order history for refund",
"session_id": "sess_abc123",
},
risk_indicators={
"pii_involved": True,
"financial_data": True,
"data_sensitivity": "medium",
},
)
2. Always Log Outcomes
if decision.decision == Decision.ALLOWED:
try:
result = execute_action()
client.log_action_completed(
action_id=decision.action_id,
result=result,
)
except Exception as e:
client.log_action_failed(
action_id=decision.action_id,
error={"message": str(e)},
)
raise
3. Use Bulk Evaluation for Batch Operations
# Evaluate many actions at once instead of sequential calls
result = client.evaluate_actions(actions, max_concurrent=5)
Next Steps
- Error Handling — Handle errors gracefully
- API Reference — Complete API documentation