Use GibsonAI to track and manage AI agent decisions and workflows through database operations. Create structured data storage for agent actions, decision logs, and workflow states using natural language database management.
MCP Integration
Connect through MCP server for workflow management
Database Management
Explore database management features
CLI Tools
Use the CLI for database operations
Key Features
Decision Tracking Database
- Agent Actions: Track all agent actions and decisions in structured format
- Decision Logs: Store detailed logs of agent decision-making processes
- Workflow States: Maintain current state of agent workflows
- Audit Trail: Complete audit trail of agent operations
Natural Language Database Management
- Schema Creation: Create database schemas using natural language
- Table Management: Add and modify tables with simple prompts
- Query Operations: Query decision data using natural language
- Relationship Building: Define relationships between workflow entities
Workflow Data APIs
- REST Endpoints: Auto-generated APIs for workflow data access
- Query Interface: Natural language query endpoint for complex analysis
- Real-time Updates: Update workflow states in real-time
- Data Validation: Built-in validation for workflow data
Implementation Examples
Creating Workflow Database Schema
# Using Gibson CLI to create workflow tracking schema
# Create decision tracking tables
# gibson modify agent_decisions "Create agent_decisions table with id, agent_id, decision_type, input_data, output_data, confidence_score, timestamp"
# gibson modify workflow_states "Create workflow_states table with id, workflow_id, current_state, previous_state, transition_reason, updated_at"
# gibson modify action_logs "Create action_logs table with id, agent_id, action_type, parameters, result, status, execution_time"
# gibson modify review_flags "Create review_flags table with id, decision_id, flag_reason, priority, status, created_at"
# Generate models and apply changes
# gibson code models
# gibson merge
Tracking Agent Decisions
import requests
from datetime import datetime
class AgentDecisionTracker:
    """Record agent decisions, actions, workflow states and review flags.

    Each public method sends one or two HTTP requests to the GibsonAI REST
    endpoints below ``base_url`` and returns the parsed JSON body on the
    expected success status, or ``None`` otherwise.
    """

    def __init__(self, api_key, timeout=10):
        """Store credentials and request settings.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Seconds passed as ``timeout`` to every ``requests``
                call. Without it a stalled connection would block the
                calling agent indefinitely.
        """
        self.api_key = api_key
        self.base_url = "https://api.gibsonai.com/v1/-"
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.timeout = timeout

    def _create(self, resource, payload, success_message, failure_label):
        """POST ``payload`` to ``resource``; return the JSON body on 201, else None."""
        response = requests.post(
            f"{self.base_url}/{resource}",
            json=payload,
            headers=self.headers,
            timeout=self.timeout,
        )
        if response.status_code == 201:
            print(success_message)
            return response.json()
        print(f"Failed to {failure_label}: {response.status_code}")
        return None

    def log_decision(self, agent_id, decision_type, input_data, output_data, confidence_score):
        """Log agent decision to database.

        Returns:
            The parsed JSON response when the API answers 201, otherwise
            ``None`` (callers must handle the ``None`` case).
        """
        decision_data = {
            "agent_id": agent_id,
            "decision_type": decision_type,
            "input_data": input_data,
            "output_data": output_data,
            "confidence_score": confidence_score,
            "timestamp": datetime.now().isoformat(),
        }
        return self._create(
            "agent-decisions",
            decision_data,
            f"Decision logged: {decision_type}",
            "log decision",
        )

    def update_workflow_state(self, workflow_id, new_state, transition_reason):
        """Update workflow state.

        Reads the stored record first so the outgoing update can carry the
        prior ``current_state`` as ``previous_state``; if that lookup does
        not return 200, ``previous_state`` is sent as ``None``.

        Returns:
            The parsed JSON response when the PUT answers 200, else ``None``.
        """
        state_url = f"{self.base_url}/workflow-states/{workflow_id}"

        # First, get current state
        current_response = requests.get(
            state_url,
            headers=self.headers,
            timeout=self.timeout,
        )
        if current_response.status_code == 200:
            previous_state = current_response.json().get("current_state")
        else:
            previous_state = None

        # Update state
        state_data = {
            "workflow_id": workflow_id,
            "current_state": new_state,
            "previous_state": previous_state,
            "transition_reason": transition_reason,
            "updated_at": datetime.now().isoformat(),
        }
        response = requests.put(
            state_url,
            json=state_data,
            headers=self.headers,
            timeout=self.timeout,
        )
        if response.status_code == 200:
            print(f"Workflow state updated: {new_state}")
            return response.json()
        print(f"Failed to update workflow state: {response.status_code}")
        return None

    def log_action(self, agent_id, action_type, parameters, result, status, execution_time):
        """Log agent action.

        Returns:
            The parsed JSON response when the API answers 201, else ``None``.
        """
        action_data = {
            "agent_id": agent_id,
            "action_type": action_type,
            "parameters": parameters,
            "result": result,
            "status": status,
            "execution_time": execution_time,
        }
        return self._create(
            "action-logs",
            action_data,
            f"Action logged: {action_type}",
            "log action",
        )

    def flag_for_review(self, decision_id, flag_reason, priority="medium"):
        """Flag decision for review.

        The flag is always created with ``status`` set to ``"pending"``.

        Returns:
            The parsed JSON response when the API answers 201, else ``None``.
        """
        flag_data = {
            "decision_id": decision_id,
            "flag_reason": flag_reason,
            "priority": priority,
            "status": "pending",
            "created_at": datetime.now().isoformat(),
        }
        return self._create(
            "review-flags",
            flag_data,
            f"Decision flagged for review: {flag_reason}",
            "flag decision",
        )
Querying Decision Data
class DecisionAnalyzer:
    """Run natural-language analysis queries against the ``/query`` endpoint.

    Every public method posts a fixed (or lightly parameterized) English
    query and returns the parsed JSON response on HTTP 200, or ``None``
    after printing the failing status code.
    """

    def __init__(self, api_key, timeout=10):
        """Store credentials and request settings.

        Args:
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Seconds passed as ``timeout`` to every ``requests``
                call, so a stalled connection cannot block the caller
                indefinitely.
        """
        self.api_key = api_key
        self.base_url = "https://api.gibsonai.com/v1/-"
        self.headers = {"Authorization": f"Bearer {api_key}"}
        self.timeout = timeout

    def _run_query(self, query):
        """POST ``query`` to the query endpoint.

        Returns:
            ``(True, parsed_json)`` on HTTP 200, otherwise ``(False, None)``
            after printing the status code.
        """
        response = requests.post(
            f"{self.base_url}/query",
            json={"query": query},
            headers=self.headers,
            timeout=self.timeout,
        )
        if response.status_code == 200:
            return True, response.json()
        print(f"Query failed: {response.status_code}")
        return False, None

    def analyze_low_confidence_decisions(self):
        """Find decisions with low confidence scores."""
        ok, results = self._run_query(
            "Show all decisions with confidence score below 0.7 from the last 24 hours"
        )
        if not ok:
            return None
        print(f"Found {len(results)} low confidence decisions")
        return results

    def get_agent_performance_metrics(self, agent_id):
        """Get performance metrics for specific agent."""
        ok, results = self._run_query(
            f"Calculate average confidence score, success rate, and decision count for agent {agent_id} over the last 7 days"
        )
        if not ok:
            return None
        print(f"Performance metrics for agent {agent_id}")
        return results

    def get_workflow_bottlenecks(self):
        """Identify workflow bottlenecks."""
        ok, results = self._run_query(
            "Show workflow states that have been stuck in the same state for more than 4 hours"
        )
        if not ok:
            return None
        print(f"Found {len(results)} workflow bottlenecks")
        return results

    def get_pending_reviews(self):
        """Get all pending review flags."""
        ok, results = self._run_query(
            "Show all pending review flags ordered by priority and creation date"
        )
        if not ok:
            return None
        print(f"Found {len(results)} pending reviews")
        return results
Agent Workflow Integration
class WorkflowAgent:
    """Base agent that logs every decision and action it takes.

    Subclasses are expected to replace the three placeholder hooks
    (``process_decision``, ``calculate_confidence``, ``perform_action``)
    with real logic.
    """

    def __init__(self, agent_id, api_key):
        """Create the tracker and analyzer this agent reports through."""
        self.agent_id = agent_id
        self.decision_tracker = AgentDecisionTracker(api_key)
        self.analyzer = DecisionAnalyzer(api_key)

    def make_decision(self, decision_type, input_data):
        """Make a decision and log it.

        Decisions whose confidence is below 0.7 are flagged for review with
        priority ``"high"``, provided the decision was logged and the log
        record carries an ``id`` to attach the flag to.

        Returns:
            The output of ``process_decision`` for ``input_data``.
        """
        # Agent decision-making logic here
        output_data = self.process_decision(input_data)
        confidence_score = self.calculate_confidence(input_data, output_data)

        # Log the decision
        decision_record = self.decision_tracker.log_decision(
            self.agent_id,
            decision_type,
            input_data,
            output_data,
            confidence_score,
        )

        # Flag for review if confidence is low
        if confidence_score < 0.7:
            # log_decision returns None when the API call fails; indexing
            # that result would raise TypeError and lose the decision output.
            decision_id = (
                decision_record.get("id")
                if isinstance(decision_record, dict)
                else None
            )
            if decision_id is None:
                print("Cannot flag decision for review: decision was not logged")
            else:
                self.decision_tracker.flag_for_review(
                    decision_id,
                    "Low confidence score",
                    "high",
                )
        return output_data

    def execute_action(self, action_type, parameters):
        """Execute an action and log it.

        Any exception raised by ``perform_action`` is converted into a logged
        ``"error"`` status whose result is the exception text, so the action
        log also captures failures.

        Returns:
            The action result, or the exception message on failure.
        """
        start_time = datetime.now()
        try:
            # Execute the action
            result = self.perform_action(action_type, parameters)
            status = "success"
        except Exception as e:
            result = str(e)
            status = "error"
        execution_time = (datetime.now() - start_time).total_seconds()

        # Log the action
        self.decision_tracker.log_action(
            self.agent_id,
            action_type,
            parameters,
            result,
            status,
            execution_time,
        )
        return result

    def process_decision(self, input_data):
        """Process decision (placeholder for actual logic)."""
        # Implement actual decision logic here
        return {"decision": "approved", "reason": "meets criteria"}

    def calculate_confidence(self, input_data, output_data):
        """Calculate confidence score (placeholder for actual logic)."""
        # Implement confidence calculation logic here
        return 0.85

    def perform_action(self, action_type, parameters):
        """Perform action (placeholder for actual logic)."""
        # Implement actual action logic here
        return f"Action {action_type} completed successfully"
Customer Service Agent Example
# Create schema for customer service workflow
# gibson modify customer_tickets "Create customer_tickets table with id, customer_id, issue_type, description, priority, status, assigned_agent"
# gibson modify agent_responses "Create agent_responses table with id, ticket_id, agent_id, response_text, response_type, timestamp"
# gibson modify escalation_rules "Create escalation_rules table with id, rule_name, conditions, escalation_action, priority_threshold"
# gibson code models
# gibson merge
class CustomerServiceAgent(WorkflowAgent):
    """Workflow agent specialised for customer service tickets."""

    def process_decision(self, input_data):
        """Process decision (placeholder for actual logic).

        Extends the inherited decision with a ``"response"`` entry, which
        ``handle_customer_ticket`` reads. The inherited placeholder does not
        provide that key, so relying on it alone raises ``KeyError``.
        Replace the placeholder text with real response generation.
        """
        decision = super().process_decision(input_data)
        decision.setdefault(
            "response",
            "Thank you for contacting us. We are looking into your request.",
        )
        return decision

    def handle_customer_ticket(self, ticket_id, customer_message):
        """Handle customer service ticket.

        Logs a ``customer_response`` decision, sends the decided response,
        then moves the ticket's workflow to ``awaiting_customer_response``.

        Returns:
            The result of the ``send_response`` action.
        """
        # Make decision about response
        input_data = {
            "ticket_id": ticket_id,
            "customer_message": customer_message,
        }
        decision = self.make_decision("customer_response", input_data)

        # Execute response action
        response_result = self.execute_action("send_response", {
            "ticket_id": ticket_id,
            "response": decision["response"],
        })

        # Update workflow state
        self.decision_tracker.update_workflow_state(
            ticket_id,
            "awaiting_customer_response",
            "Agent responded to customer",
        )
        return response_result

    def escalate_ticket(self, ticket_id, escalation_reason):
        """Escalate ticket to human agent.

        Returns:
            The result of the ``escalate_ticket`` action.
        """
        # Log escalation decision (called for its logging side effect only)
        self.make_decision("escalation", {
            "ticket_id": ticket_id,
            "reason": escalation_reason,
        })

        # Execute escalation action
        escalation_result = self.execute_action("escalate_ticket", {
            "ticket_id": ticket_id,
            "escalation_reason": escalation_reason,
        })

        # Update workflow state
        self.decision_tracker.update_workflow_state(
            ticket_id,
            "escalated",
            f"Escalated due to: {escalation_reason}",
        )
        return escalation_result
Use Cases
Decision Auditing
Perfect for:
- Tracking all agent decisions with full context
- Maintaining audit logs for compliance
- Analyzing decision patterns and quality
- Identifying areas for improvement
Workflow Management
Enable:
- Tracking workflow states and transitions
- Identifying bottlenecks and inefficiencies
- Monitoring workflow performance
- Managing complex multi-step processes
Performance Analysis
Support:
- Analyzing agent performance metrics
- Identifying high-risk decisions
- Tracking confidence scores and success rates
- Optimizing agent behavior based on data
Quality Assurance
Allow:
- Flagging decisions for review
- Tracking review processes
- Maintaining quality standards
- Continuous improvement based on feedback
Benefits for AI Agent Workflows
Comprehensive Tracking
- Full Audit Trail: Complete record of all agent decisions and actions
- Structured Data: Organized data for easy analysis and reporting
- Natural Language Queries: Query decision data using natural language
- Real-time Updates: Track workflow states in real-time
Performance Insights
- Decision Analysis: Analyze decision patterns and quality
- Confidence Tracking: Monitor confidence scores and success rates
- Bottleneck Identification: Identify workflow bottlenecks and inefficiencies
- Performance Metrics: Track agent performance over time
Scalable Architecture
- Database Management: Easily modify schema as workflow needs evolve
- API Access: REST APIs for integration with existing systems
- Natural Language Interface: Use natural language for complex queries
- Flexible Data Model: Adapt to different workflow requirements
Best Practices
Data Management
- Consistent Logging: Ensure all decisions and actions are logged consistently
- Data Quality: Maintain high-quality data for accurate analysis
- Retention Policies: Implement appropriate data retention policies
- Security: Secure sensitive workflow data appropriately
Workflow Design
- Clear States: Define clear workflow states and transitions
- Decision Criteria: Establish clear criteria for decision-making
- Escalation Rules: Define when and how to escalate decisions
- Performance Metrics: Track meaningful performance metrics
Analysis and Improvement
- Regular Analysis: Regularly analyze decision data for insights
- Feedback Loops: Implement feedback loops for continuous improvement
- Pattern Recognition: Identify patterns in decision-making
- Optimization: Continuously optimize workflows based on data
Getting Started
- Design Workflow Schema: Define your workflow and decision tracking schema
- Create Database: Use Gibson CLI to create your database schema
- Integrate Tracking: Add decision tracking to your agent workflows
- Analyze Data: Use natural language queries to analyze decision data
- Optimize: Continuously improve workflows based on insights
Gibson CLI Commands
# Create workflow tracking schema
gibson modify table_name "description of workflow table"
gibson code models
gibson merge
# Generate models for workflow integration
gibson code models
gibson code schemas
Ready to implement database-driven workflow tracking for your AI agents? Get started with GibsonAI.