Use GibsonAI to track and manage AI agent decisions and workflows through database operations. Create structured data storage for agent actions, decision logs, and workflow states using natural language database management.

Key Features

Decision Tracking Database

  • Agent Actions: Track all agent actions and decisions in structured format
  • Decision Logs: Store detailed logs of agent decision-making processes
  • Workflow States: Maintain current state of agent workflows
  • Audit Trail: Complete audit trail of agent operations
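
As a rough illustration of what "structured format" can mean here, the sketch below models one decision record as a Python dataclass. The field names mirror the agent_decisions table created later on this page; the `DecisionRecord` class, the field types, and the 0 to 1 range check on the confidence score are assumptions made for the example, not part of GibsonAI.

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Any


@dataclass
class DecisionRecord:
    """One row of the agent_decisions table (the id is left to the database)."""
    agent_id: str
    decision_type: str
    input_data: dict[str, Any]
    output_data: dict[str, Any]
    confidence_score: float
    # Timezone-aware UTC timestamp in ISO 8601 format
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def __post_init__(self):
        # Assumption: confidence is a probability-like value in [0, 1]
        if not 0.0 <= self.confidence_score <= 1.0:
            raise ValueError("confidence_score must be between 0 and 1")


record = DecisionRecord(
    agent_id="agent-1",
    decision_type="customer_response",
    input_data={"ticket_id": 42},
    output_data={"decision": "approved"},
    confidence_score=0.85,
)
payload = asdict(record)  # plain dict, usable as a JSON request body
```

Validating the record before it is sent keeps malformed rows out of the audit trail, whatever validation the API applies on its side.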

Natural Language Database Management

  • Schema Creation: Create database schemas using natural language
  • Table Management: Add and modify tables with simple prompts
  • Query Operations: Query decision data using natural language
  • Relationship Building: Define relationships between workflow entities

Workflow Data APIs

  • REST Endpoints: Auto-generated APIs for workflow data access
  • Query Interface: Natural language query endpoint for complex analysis
  • Real-time Updates: Update workflow states in real time
  • Data Validation: Built-in validation for workflow data

Implementation Examples

Creating Workflow Database Schema

# Using Gibson CLI to create workflow tracking schema
# Create decision tracking tables
gibson modify agent_decisions "Create agent_decisions table with id, agent_id, decision_type, input_data, output_data, confidence_score, timestamp"
gibson modify workflow_states "Create workflow_states table with id, workflow_id, current_state, previous_state, transition_reason, updated_at"
gibson modify action_logs "Create action_logs table with id, agent_id, action_type, parameters, result, status, execution_time"
gibson modify review_flags "Create review_flags table with id, decision_id, flag_reason, priority, status, created_at"

# Generate models and apply changes
gibson code models
gibson merge

Tracking Agent Decisions

import requests
from datetime import datetime

class AgentDecisionTracker:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.gibsonai.com/v1/-"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def log_decision(self, agent_id, decision_type, input_data, output_data, confidence_score):
        """Log agent decision to database"""
        decision_data = {
            "agent_id": agent_id,
            "decision_type": decision_type,
            "input_data": input_data,
            "output_data": output_data,
            "confidence_score": confidence_score,
            "timestamp": datetime.now().isoformat()
        }

        response = requests.post(
            f"{self.base_url}/agent-decisions",
            json=decision_data,
            headers=self.headers
        )

        if response.status_code == 201:
            print(f"Decision logged: {decision_type}")
            return response.json()
        else:
            print(f"Failed to log decision: {response.status_code}")
            return None

    def update_workflow_state(self, workflow_id, new_state, transition_reason):
        """Update workflow state"""
        # First, get current state
        current_response = requests.get(
            f"{self.base_url}/workflow-states/{workflow_id}",
            headers=self.headers
        )

        if current_response.status_code == 200:
            current_data = current_response.json()
            previous_state = current_data.get("current_state")
        else:
            previous_state = None

        # Update state
        state_data = {
            "workflow_id": workflow_id,
            "current_state": new_state,
            "previous_state": previous_state,
            "transition_reason": transition_reason,
            "updated_at": datetime.now().isoformat()
        }

        response = requests.put(
            f"{self.base_url}/workflow-states/{workflow_id}",
            json=state_data,
            headers=self.headers
        )

        if response.status_code == 200:
            print(f"Workflow state updated: {new_state}")
            return response.json()
        else:
            print(f"Failed to update workflow state: {response.status_code}")
            return None

    def log_action(self, agent_id, action_type, parameters, result, status, execution_time):
        """Log agent action"""
        action_data = {
            "agent_id": agent_id,
            "action_type": action_type,
            "parameters": parameters,
            "result": result,
            "status": status,
            "execution_time": execution_time
        }

        response = requests.post(
            f"{self.base_url}/action-logs",
            json=action_data,
            headers=self.headers
        )

        if response.status_code == 201:
            print(f"Action logged: {action_type}")
            return response.json()
        else:
            print(f"Failed to log action: {response.status_code}")
            return None

    def flag_for_review(self, decision_id, flag_reason, priority="medium"):
        """Flag decision for review"""
        flag_data = {
            "decision_id": decision_id,
            "flag_reason": flag_reason,
            "priority": priority,
            "status": "pending",
            "created_at": datetime.now().isoformat()
        }

        response = requests.post(
            f"{self.base_url}/review-flags",
            json=flag_data,
            headers=self.headers
        )

        if response.status_code == 201:
            print(f"Decision flagged for review: {flag_reason}")
            return response.json()
        else:
            print(f"Failed to flag decision: {response.status_code}")
            return None

Querying Decision Data

import requests

class DecisionAnalyzer:
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "https://api.gibsonai.com/v1/-"
        self.headers = {"Authorization": f"Bearer {api_key}"}

    def analyze_low_confidence_decisions(self):
        """Find decisions with low confidence scores"""
        query_request = {
            "query": "Show all decisions with confidence score below 0.7 from the last 24 hours"
        }

        response = requests.post(
            f"{self.base_url}/query",
            json=query_request,
            headers=self.headers
        )

        if response.status_code == 200:
            results = response.json()
            print(f"Found {len(results)} low confidence decisions")
            return results
        else:
            print(f"Query failed: {response.status_code}")
            return None

    def get_agent_performance_metrics(self, agent_id):
        """Get performance metrics for specific agent"""
        query_request = {
            "query": f"Calculate average confidence score, success rate, and decision count for agent {agent_id} over the last 7 days"
        }

        response = requests.post(
            f"{self.base_url}/query",
            json=query_request,
            headers=self.headers
        )

        if response.status_code == 200:
            results = response.json()
            print(f"Performance metrics for agent {agent_id}")
            return results
        else:
            print(f"Query failed: {response.status_code}")
            return None

    def get_workflow_bottlenecks(self):
        """Identify workflow bottlenecks"""
        query_request = {
            "query": "Show workflow states that have been stuck in the same state for more than 4 hours"
        }

        response = requests.post(
            f"{self.base_url}/query",
            json=query_request,
            headers=self.headers
        )

        if response.status_code == 200:
            results = response.json()
            print(f"Found {len(results)} workflow bottlenecks")
            return results
        else:
            print(f"Query failed: {response.status_code}")
            return None

    def get_pending_reviews(self):
        """Get all pending review flags"""
        query_request = {
            "query": "Show all pending review flags ordered by priority and creation date"
        }

        response = requests.post(
            f"{self.base_url}/query",
            json=query_request,
            headers=self.headers
        )

        if response.status_code == 200:
            results = response.json()
            print(f"Found {len(results)} pending reviews")
            return results
        else:
            print(f"Query failed: {response.status_code}")
            return None
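
The four methods above differ only in the question they send, so the shared request logic can be factored out. The following is a minimal sketch, assuming the /query endpoint accepts a JSON body with a `query` field as in the class above. `run_query`, `FakeResponse`, and `fake_post` are hypothetical names, and the stub stands in for `requests.post` so the example runs without network access.

```python
from typing import Any, Callable, Optional


def run_query(
    post: Callable[..., Any],
    base_url: str,
    headers: dict,
    question: str,
    timeout: float = 10.0,
) -> Optional[Any]:
    """POST a natural-language question to {base_url}/query.

    `post` is any callable that accepts the same arguments as requests.post.
    Returns the decoded JSON body, or None on a non-200 response.
    """
    response = post(
        f"{base_url}/query",
        json={"query": question},
        headers=headers,
        timeout=timeout,  # avoid hanging forever on a stalled connection
    )
    if response.status_code != 200:
        return None
    return response.json()


class FakeResponse:
    """Stub response used only to demonstrate the helper offline."""
    status_code = 200

    def json(self):
        return [{"id": 1, "confidence_score": 0.4}]


def fake_post(url, json, headers, timeout):
    return FakeResponse()


rows = run_query(
    fake_post,
    "https://example.invalid",
    {"Authorization": "Bearer <token>"},
    "Show all decisions with confidence score below 0.7 from the last 24 hours",
)
```

In real use, pass `requests.post` as the first argument. Taking the function as a parameter also makes the analyzer straightforward to unit test.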

Agent Workflow Integration

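from datetime import datetime

# Assumes AgentDecisionTracker and DecisionAnalyzer from the sections above are in scope
class WorkflowAgent: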
    def __init__(self, agent_id, api_key):
        self.agent_id = agent_id
        self.decision_tracker = AgentDecisionTracker(api_key)
        self.analyzer = DecisionAnalyzer(api_key)

    def make_decision(self, decision_type, input_data):
        """Make a decision and log it"""
        # Agent decision-making logic here
        output_data = self.process_decision(input_data)
        confidence_score = self.calculate_confidence(input_data, output_data)

        # Log the decision
        decision_record = self.decision_tracker.log_decision(
            self.agent_id,
            decision_type,
            input_data,
            output_data,
            confidence_score
        )

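        # Flag for review if confidence is low. Skip flagging when logging
        # failed, because there is no decision record to reference.
        if decision_record is not None and confidence_score < 0.7: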
            self.decision_tracker.flag_for_review(
                decision_record["id"],
                "Low confidence score",
                "high"
            )

        return output_data

    def execute_action(self, action_type, parameters):
        """Execute an action and log it"""
        start_time = datetime.now()

        try:
            # Execute the action
            result = self.perform_action(action_type, parameters)
            status = "success"
        except Exception as e:
            result = str(e)
            status = "error"

        end_time = datetime.now()
        execution_time = (end_time - start_time).total_seconds()

        # Log the action
        self.decision_tracker.log_action(
            self.agent_id,
            action_type,
            parameters,
            result,
            status,
            execution_time
        )

        return result

    def process_decision(self, input_data):
        """Process decision (placeholder for actual logic)"""
        # Implement actual decision logic here
        return {"decision": "approved", "reason": "meets criteria"}

    def calculate_confidence(self, input_data, output_data):
        """Calculate confidence score (placeholder for actual logic)"""
        # Implement confidence calculation logic here
        return 0.85

    def perform_action(self, action_type, parameters):
        """Perform action (placeholder for actual logic)"""
        # Implement actual action logic here
        return f"Action {action_type} completed successfully"

Customer Service Agent Example

# Create schema for customer service workflow
# gibson modify customer_tickets "Create customer_tickets table with id, customer_id, issue_type, description, priority, status, assigned_agent"
# gibson modify agent_responses "Create agent_responses table with id, ticket_id, agent_id, response_text, response_type, timestamp"
# gibson modify escalation_rules "Create escalation_rules table with id, rule_name, conditions, escalation_action, priority_threshold"
# gibson code models
# gibson merge

class CustomerServiceAgent(WorkflowAgent):
    def handle_customer_ticket(self, ticket_id, customer_message):
        """Handle customer service ticket"""

        # Make decision about response
        input_data = {
            "ticket_id": ticket_id,
            "customer_message": customer_message
        }

        decision = self.make_decision("customer_response", input_data)

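        # Execute response action. The placeholder process_decision() does not
        # return a "response" key, so fall back to an empty string until real
        # decision logic supplies one.
        response_result = self.execute_action("send_response", {
            "ticket_id": ticket_id,
            "response": decision.get("response", "")
        })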

        # Update workflow state
        self.decision_tracker.update_workflow_state(
            ticket_id,
            "awaiting_customer_response",
            "Agent responded to customer"
        )

        return response_result

    def escalate_ticket(self, ticket_id, escalation_reason):
        """Escalate ticket to human agent"""

        # Log escalation decision
        escalation_decision = self.make_decision("escalation", {
            "ticket_id": ticket_id,
            "reason": escalation_reason
        })

        # Execute escalation action
        escalation_result = self.execute_action("escalate_ticket", {
            "ticket_id": ticket_id,
            "escalation_reason": escalation_reason
        })

        # Update workflow state
        self.decision_tracker.update_workflow_state(
            ticket_id,
            "escalated",
            f"Escalated due to: {escalation_reason}"
        )

        return escalation_result

Use Cases

Decision Auditing

Perfect for:

  • Tracking all agent decisions with full context
  • Maintaining audit logs for compliance
  • Analyzing decision patterns and quality
  • Identifying areas for improvement

Workflow Management

Enables:

  • Tracking workflow states and transitions
  • Identifying bottlenecks and inefficiencies
  • Monitoring workflow performance
  • Managing complex multi-step processes

Performance Analysis

Supports:

  • Analyzing agent performance metrics
  • Identifying high-risk decisions
  • Tracking confidence scores and success rates
  • Optimizing agent behavior based on data
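
To make these metrics concrete, here is a small sketch that computes them locally from rows already fetched from the agent_decisions and action_logs tables. It assumes each row is a dictionary with the column names used in the schema above; `summarize_agent` is a hypothetical helper, and treating the success rate as the share of actions with status "success" is one possible definition among several.

```python
from statistics import fmean


def summarize_agent(decisions, actions):
    """Summarize decision and action rows for a single agent.

    decisions: list of dicts with a "confidence_score" key
    actions:   list of dicts with a "status" key ("success" or "error")
    """
    scores = [d["confidence_score"] for d in decisions]
    successes = sum(1 for a in actions if a["status"] == "success")
    return {
        "decision_count": len(decisions),
        # None signals "no data" rather than a misleading zero
        "average_confidence": fmean(scores) if scores else None,
        "success_rate": successes / len(actions) if actions else None,
    }


summary = summarize_agent(
    decisions=[{"confidence_score": 0.5}, {"confidence_score": 1.0}],
    actions=[{"status": "success"}, {"status": "error"}],
)
print(summary)
```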

Quality Assurance

Allows:

  • Flagging decisions for review
  • Tracking review processes
  • Maintaining quality standards
  • Continuous improvement based on feedback
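
A review queue is easier to work through when the most urgent flags come first. The sketch below orders review_flags rows by priority and then by creation time. It assumes the rows are dictionaries with the column names from the schema above and that created_at values share one ISO 8601 format, so they sort correctly as strings. `PRIORITY_ORDER` and `order_pending_flags` are hypothetical names, and "low" is an assumed priority level alongside the "medium" and "high" values used on this page.

```python
# Lower rank sorts first; unknown priorities go to the end of the queue
PRIORITY_ORDER = {"high": 0, "medium": 1, "low": 2}


def order_pending_flags(flags):
    """Return pending flags, highest priority first, oldest first within a priority."""
    pending = [f for f in flags if f["status"] == "pending"]
    return sorted(
        pending,
        key=lambda f: (
            PRIORITY_ORDER.get(f["priority"], len(PRIORITY_ORDER)),
            f["created_at"],
        ),
    )


flags = [
    {"id": 1, "priority": "medium", "status": "pending", "created_at": "2024-01-01T10:00:00"},
    {"id": 2, "priority": "high", "status": "pending", "created_at": "2024-01-02T09:00:00"},
    {"id": 3, "priority": "high", "status": "pending", "created_at": "2024-01-01T08:00:00"},
    {"id": 4, "priority": "high", "status": "resolved", "created_at": "2024-01-01T07:00:00"},
]
queue = order_pending_flags(flags)
print([f["id"] for f in queue])  # [3, 2, 1]
```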

Benefits for AI Agent Workflows

Comprehensive Tracking

  • Full Audit Trail: Complete record of all agent decisions and actions
  • Structured Data: Organized data for easy analysis and reporting
  • Natural Language Queries: Query decision data using natural language
  • Real-time Updates: Track workflow states in real time

Performance Insights

  • Decision Analysis: Analyze decision patterns and quality
  • Confidence Tracking: Monitor confidence scores and success rates
  • Bottleneck Identification: Identify workflow bottlenecks and inefficiencies
  • Performance Metrics: Track agent performance over time

Scalable Architecture

  • Database Management: Easily modify schema as workflow needs evolve
  • API Access: REST APIs for integration with existing systems
  • Natural Language Interface: Use natural language for complex queries
  • Flexible Data Model: Adapt to different workflow requirements

Best Practices

Data Management

  • Consistent Logging: Ensure all decisions and actions are logged consistently
  • Data Quality: Maintain high-quality data for accurate analysis
  • Retention Policies: Implement appropriate data retention policies
  • Security: Secure sensitive workflow data appropriately
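
One way to act on the security point is to redact sensitive fields before input_data or output_data is logged. The following is a minimal sketch of that idea, not a GibsonAI feature: `redact`, `REDACTED`, and the key names in `SENSITIVE_KEYS` are all assumptions that would need to match your own data.

```python
REDACTED = "[REDACTED]"
# Illustrative key names only; adjust to the fields your agents handle
SENSITIVE_KEYS = {"email", "phone", "password", "api_key"}


def redact(value):
    """Return a copy of value with sensitive dictionary entries masked.

    Walks nested dictionaries and lists; other values are returned unchanged.
    """
    if isinstance(value, dict):
        return {
            key: REDACTED if str(key).lower() in SENSITIVE_KEYS else redact(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [redact(item) for item in value]
    return value


input_data = {
    "ticket_id": 42,
    "customer": {"email": "user@example.com", "plan": "pro"},
    "history": [{"password": "hunter2", "note": "reset requested"}],
}
safe_input = redact(input_data)
```

Because `redact` builds new containers instead of mutating its argument, the agent can keep using the original data while only the masked copy is sent to the log.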

Workflow Design

  • Clear States: Define clear workflow states and transitions
  • Decision Criteria: Establish clear criteria for decision-making
  • Escalation Rules: Define when and how to escalate decisions
  • Performance Metrics: Track meaningful performance metrics
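
Clear states and transitions can be enforced in code before a state update is written. The sketch below is one way to do that with a lookup table. The states "awaiting_customer_response" and "escalated" come from the examples on this page; "open", "resolved", the transition table itself, and the `is_valid_transition` helper are hypothetical.

```python
# Hypothetical transition table: each state maps to the states it may move to
ALLOWED_TRANSITIONS = {
    "open": {"awaiting_customer_response", "escalated"},
    "awaiting_customer_response": {"open", "escalated", "resolved"},
    "escalated": {"open", "resolved"},
    "resolved": set(),  # terminal state
}


def is_valid_transition(current_state, new_state):
    """Return True if moving from current_state to new_state is allowed."""
    if current_state is None:
        # No state recorded yet, so any known state is a valid starting point
        return new_state in ALLOWED_TRANSITIONS
    return new_state in ALLOWED_TRANSITIONS.get(current_state, set())


print(is_valid_transition("open", "escalated"))      # True
print(is_valid_transition("resolved", "escalated"))  # False
```

A check like this could run inside update_workflow_state before the PUT request, so that invalid transitions are rejected instead of being recorded.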

Analysis and Improvement

  • Regular Analysis: Regularly analyze decision data for insights
  • Feedback Loops: Implement feedback loops for continuous improvement
  • Pattern Recognition: Identify patterns in decision-making
  • Optimization: Continuously optimize workflows based on data

Getting Started

  1. Design Workflow Schema: Define your workflow and decision tracking schema
  2. Create Database: Use Gibson CLI to create your database schema
  3. Integrate Tracking: Add decision tracking to your agent workflows
  4. Analyze Data: Use natural language queries to analyze decision data
  5. Optimize: Continuously improve workflows based on insights

Gibson CLI Commands

# Create workflow tracking schema
gibson modify table_name "description of workflow table"
gibson code models
gibson merge

# Generate models for workflow integration
gibson code models
gibson code schemas

Ready to implement database-driven workflow tracking for your AI agents? Get started with GibsonAI.