Python Bot Templates

OpenAutomate provides a collection of Python templates to help you quickly build automation bots for common scenarios. These templates include best practices, error handling, and integration with the OpenAutomate SDK.

Available Templates

1. Hello World Template

Perfect for getting started and understanding the basic structure.
from openautomateagent import Client
import sys
import time

def main():
    """Run the Hello World bot and report the outcome as an exit code.

    Publishes a start status, walks through three simulated steps (one
    second apart), then prints and logs a greeting.

    Returns:
        int: 0 when the run finishes, 1 if any exception was raised after
        the client was created.
    """
    bot = Client()
    message = "Hello from OpenAutomate! 🤖"

    try:
        bot.update_status("Starting Hello World automation")
        bot.log("Hello World bot started", level="info")

        # Placeholder workload: three steps with a one-second pause each.
        for i in (0, 1, 2):
            bot.update_status(f"Processing step {i+1}/3")
            time.sleep(1)
            bot.log(f"Completed step {i+1}", level="info")

        # The greeting is printed as well as sent to the agent log.
        print(message)
        bot.log(message, level="info")
        bot.update_status("Completed successfully")
    except Exception as e:
        bot.log(f"Error: {str(e)}", level="error")
        bot.update_status("Failed")
        return 1
    else:
        return 0

# Script entry point: run main() and pass its return value to sys.exit()
# so it becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

2. File Processing Template

Automate file operations like reading, processing, and writing files.
from openautomateagent import Client
import sys
import os
from pathlib import Path
import csv
import json

def main():
    """Entry point of the file-processing bot.

    Reads the input folder, output folder and file pattern from assets,
    hands them to ``process_files`` and logs how many files were processed.

    Returns:
        int: 0 on success, 1 if an exception reached this function.
    """
    bot = Client()

    try:
        bot.update_status("Starting file processing")

        # (asset name, value passed as ``default=``), looked up in this order.
        asset_defaults = (
            ("input_folder", "./input"),
            ("output_folder", "./output"),
            ("file_pattern", "*.csv"),
        )
        input_folder, output_folder, file_pattern = [
            bot.get_asset(name, default=fallback)
            for name, fallback in asset_defaults
        ]

        processed_count = process_files(bot, input_folder, output_folder, file_pattern)

        bot.log(f"Successfully processed {processed_count} files", level="info")
        bot.update_status("Completed successfully")
    except Exception as e:
        bot.log(f"File processing failed: {str(e)}", level="error")
        bot.update_status("Failed")
        return 1
    else:
        return 0

def process_files(agent, input_folder, output_folder, pattern):
    """Process every regular file in ``input_folder`` that matches ``pattern``.

    The output directory is created if it does not exist. Each match is
    dispatched on its lower-cased suffix: ``.csv`` goes to
    ``process_csv_file``, ``.json`` to ``process_json_file`` and anything
    else to ``process_text_file``. A failure on one file is logged and does
    not stop the remaining files.

    Args:
        agent: Object providing ``log(message, level=...)`` and
            ``update_status(message, progress=...)``.
        input_folder: Directory searched with ``Path.glob``.
        output_folder: Directory that receives one output file per input
            file, under the same file name.
        pattern: Glob pattern, e.g. ``"*.csv"``.

    Returns:
        int: Number of files that were processed without raising.
    """
    input_path = Path(input_folder)
    output_path = Path(output_folder)

    # Create output directory
    output_path.mkdir(parents=True, exist_ok=True)

    # glob() also yields directories whose names match the pattern; keep
    # only regular files so they are not reported as processing failures.
    # Sorting makes the processing order reproducible between runs.
    files = sorted(path for path in input_path.glob(pattern) if path.is_file())
    total_files = len(files)

    if not files:
        agent.log(f"No files found matching pattern: {pattern}", level="warning")
        return 0

    agent.log(f"Found {total_files} files to process", level="info")

    # Suffix -> handler; suffixes not listed fall back to text processing.
    handlers = {'.csv': process_csv_file, '.json': process_json_file}

    processed_count = 0
    for i, file_path in enumerate(files):
        try:
            # Progress reflects the share of files already handled.
            progress = int((i / total_files) * 100)
            agent.update_status(f"Processing {file_path.name}", progress=progress)

            handler = handlers.get(file_path.suffix.lower(), process_text_file)
            handler(file_path, output_path / file_path.name)

            processed_count += 1
            agent.log(f"Processed: {file_path.name}", level="info")

        except Exception as e:
            # Best effort: record the failure and continue with the next file.
            agent.log(f"Failed to process {file_path.name}: {str(e)}", level="error")

    return processed_count

def process_csv_file(input_file, output_file):
    """Copy a CSV file to ``output_file`` with every text cell upper-cased.

    The input is read completely before the output file is opened.
    """
    with open(input_file, 'r', newline='', encoding='utf-8') as source:
        parsed_rows = list(csv.reader(source))

    def shout(cell):
        # Non-string cells are passed through untouched.
        return cell.upper() if isinstance(cell, str) else cell

    upper_rows = [[shout(cell) for cell in row] for row in parsed_rows]

    with open(output_file, 'w', newline='', encoding='utf-8') as target:
        csv.writer(target).writerows(upper_rows)

def process_json_file(input_file, output_file):
    """Copy a JSON document to ``output_file``, stamping it with a timestamp.

    A top-level object gets a ``processed_at`` key. For a top-level array,
    every element that is an object gets the key and other elements are left
    unchanged. Any other top-level value is written back as-is. All stamped
    objects share the same timestamp.

    Args:
        input_file: Path of the UTF-8 JSON file to read.
        output_file: Path of the file to write (2-space indented JSON).

    Raises:
        json.JSONDecodeError: If the input is not valid JSON.
        OSError: If either file cannot be opened.
    """
    # Local import: ``datetime`` is not among this template's module-level
    # imports, so referencing it without this line raises NameError.
    from datetime import datetime

    with open(input_file, 'r', encoding='utf-8') as infile:
        data = json.load(infile)

    # Take the timestamp once so every item in a list carries the same value.
    processed_at = datetime.now().isoformat()

    if isinstance(data, dict):
        data['processed_at'] = processed_at
    elif isinstance(data, list):
        for item in data:
            if isinstance(item, dict):
                item['processed_at'] = processed_at

    with open(output_file, 'w', encoding='utf-8') as outfile:
        json.dump(data, outfile, indent=2)

def process_text_file(input_file, output_file):
    """Copy a text file, prefixing each line with its 1-based line number.

    The number is zero-padded to at least four digits and followed by
    ``": "``. Line endings are kept as read.
    """
    with open(input_file, 'r', encoding='utf-8') as source:
        lines = source.readlines()

    numbered = [f"{i:04d}: {line}" for i, line in enumerate(lines, 1)]

    with open(output_file, 'w', encoding='utf-8') as target:
        target.writelines(numbered)

# Script entry point: run main() and pass its return value to sys.exit()
# so it becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

3. Database Operations Template

Automate database queries, updates, and data synchronization.
from openautomateagent import Client
import sys
import sqlite3
import pandas as pd
from datetime import datetime

def main():
    """Entry point of the database bot.

    Reads the database path and the requested operation from assets, runs
    the matching routine (``sync``, ``cleanup`` or ``report``) and logs its
    result. An unrecognised operation is treated as a failure.

    Returns:
        int: 0 on success, 1 if an exception reached this function.
    """
    bot = Client()

    try:
        bot.update_status("Starting database operations")

        db_path = bot.get_asset("database_path")
        operation = bot.get_asset("operation", default="sync")

        # Operation name -> routine, checked in this order.
        routines = (
            ("sync", sync_data),
            ("cleanup", cleanup_old_records),
            ("report", generate_report),
        )
        for name, routine in routines:
            if operation == name:
                result = routine(bot, db_path)
                break
        else:
            raise ValueError(f"Unknown operation: {operation}")

        bot.log(f"Database operation completed: {result}", level="info")
        bot.update_status("Completed successfully")
    except Exception as e:
        bot.log(f"Database operation failed: {str(e)}", level="error")
        bot.update_status("Failed")
        return 1
    else:
        return 0

def sync_data(agent, db_path):
    """Copy new or newer rows from ``users_staging`` into ``users``.

    A staging row is written (``INSERT OR REPLACE``) when no row with the
    same id exists in ``users``, when the existing row has no ``updated_at``,
    or when the existing ``updated_at`` compares lower than the staging one.
    A staging row without ``updated_at`` never overwrites an existing row
    that has one. All writes happen in a single transaction.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        db_path: Path of the SQLite database file.

    Returns:
        str: ``"Synced <n> records"``.

    Raises:
        sqlite3.Error: On any database failure; the transaction is rolled
            back and the connection is still closed.
    """
    # Local import: ``contextlib`` is not among this template's
    # module-level imports.
    from contextlib import closing

    agent.update_status("Connecting to database")

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        agent.update_status("Syncing data")

        with conn:  # one transaction: commit on success, roll back on error
            cursor = conn.cursor()

            # Get source data
            cursor.execute("SELECT id, name, email, updated_at FROM users_staging")
            staging_data = cursor.fetchall()

            synced_count = 0
            for user_id, name, email, updated_at in staging_data:
                cursor.execute("SELECT updated_at FROM users WHERE id = ?", (user_id,))
                existing = cursor.fetchone()

                # NULL timestamps are handled explicitly because comparing
                # None with "<" raises TypeError.
                needs_write = (
                    existing is None
                    or existing[0] is None
                    or (updated_at is not None and existing[0] < updated_at)
                )
                if needs_write:
                    cursor.execute("""
                        INSERT OR REPLACE INTO users (id, name, email, updated_at)
                        VALUES (?, ?, ?, ?)
                    """, (user_id, name, email, updated_at))
                    synced_count += 1

    agent.log(f"Synced {synced_count} user records", level="info")

    return f"Synced {synced_count} records"

def cleanup_old_records(agent, db_path):
    """Delete aged rows from the ``logs`` and ``executions`` tables.

    Removes ``logs`` rows whose ``created_at`` is more than 90 days old and
    ``executions`` rows with status ``'completed'`` whose ``completed_at`` is
    more than 30 days old. Both deletes run in one transaction.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        db_path: Path of the SQLite database file.

    Returns:
        str: Summary of how many rows were deleted from each table.

    Raises:
        sqlite3.Error: On any database failure; the transaction is rolled
            back and the connection is still closed.
    """
    # Local import: ``contextlib`` is not among this template's
    # module-level imports.
    from contextlib import closing

    agent.update_status("Cleaning up old records")

    # A sqlite3 connection used as a context manager only commits or rolls
    # back; closing() is what actually releases the connection.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commit both deletes together, or neither
            cursor = conn.cursor()

            # Delete records older than 90 days
            cursor.execute("""
                DELETE FROM logs 
                WHERE created_at < datetime('now', '-90 days')
            """)
            deleted_logs = cursor.rowcount

            # Delete completed executions older than 30 days
            cursor.execute("""
                DELETE FROM executions 
                WHERE status = 'completed' 
                AND completed_at < datetime('now', '-30 days')
            """)
            deleted_executions = cursor.rowcount

    result = f"Deleted {deleted_logs} log records, {deleted_executions} executions"
    agent.log(result, level="info")

    return result

def generate_report(agent, db_path):
    """Write a JSON summary of users and recent executions.

    The report contains the total number of rows in ``users`` and, for
    executions created in the last 7 days, the row count and average
    ``duration`` per status. It is written to
    ``database_report_<YYYYmmdd_HHMMSS>.json`` in the current working
    directory.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        db_path: Path of the SQLite database file.

    Returns:
        str: ``"Report generated: <file name>"``.

    Raises:
        sqlite3.Error / pandas errors: If a query fails.
        OSError: If the report file cannot be written.
    """
    # Local imports: neither ``json`` nor ``contextlib`` is among this
    # template's module-level imports.
    import json
    from contextlib import closing

    agent.update_status("Generating database report")

    # A sqlite3 connection used as a context manager does not close the
    # connection; closing() does. The connection is only needed for the
    # two queries, so it is released before the report is written.
    with closing(sqlite3.connect(db_path)) as conn:
        df_users = pd.read_sql_query("SELECT COUNT(*) as total_users FROM users", conn)
        df_executions = pd.read_sql_query("""
            SELECT 
                status,
                COUNT(*) as count,
                AVG(duration) as avg_duration
            FROM executions 
            WHERE created_at >= datetime('now', '-7 days')
            GROUP BY status
        """, conn)

    # One timestamp for both the report body and the file name.
    generated_at = datetime.now()

    report = {
        'generated_at': generated_at.isoformat(),
        # iloc yields a NumPy integer, which json cannot serialize.
        'total_users': int(df_users.iloc[0]['total_users']),
        'executions_last_7_days': df_executions.to_dict('records')
    }

    # Save report
    report_file = f"database_report_{generated_at.strftime('%Y%m%d_%H%M%S')}.json"
    with open(report_file, 'w', encoding='utf-8') as f:
        json.dump(report, f, indent=2)

    agent.log(f"Report saved to {report_file}", level="info")
    return f"Report generated: {report_file}"

# Script entry point: run main() and pass its return value to sys.exit()
# so it becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

4. Web Automation Template

Automate web interactions using Selenium WebDriver.
from openautomateagent import Client
import sys
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time

def main():
    """Entry point of the Selenium web-automation bot.

    Reads the target URL and login credentials from assets, runs
    ``automate_web_task`` with them and logs its result.

    Returns:
        int: 0 on success, 1 if an exception reached this function.
    """
    bot = Client()

    try:
        bot.update_status("Starting web automation")

        # Assets are looked up in this order; none has a default.
        target_url, username, password = [
            bot.get_asset(key) for key in ("target_url", "username", "password")
        ]

        result = automate_web_task(bot, target_url, username, password)

        bot.log(f"Web automation completed: {result}", level="info")
        bot.update_status("Completed successfully")
    except Exception as e:
        bot.log(f"Web automation failed: {str(e)}", level="error")
        bot.update_status("Failed")
        return 1
    else:
        return 0

def automate_web_task(agent, url, username, password):
    """Open ``url`` in headless Chrome, log in and run the main task.

    The driver is always quit, and a "Web driver closed" status is sent,
    whether the steps succeed or raise.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        url: Page to open first.
        username: Passed to ``login_user``.
        password: Passed to ``login_user``.

    Returns:
        Whatever ``perform_main_task`` returns.
    """
    agent.update_status("Setting up web driver")

    # Chrome command-line switches, applied in this order.
    browser_flags = (
        "--headless",  # Run in background
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--disable-gpu",
        "--window-size=1920,1080",
    )
    chrome_options = Options()
    for flag in browser_flags:
        chrome_options.add_argument(flag)

    driver = webdriver.Chrome(options=chrome_options)
    wait = WebDriverWait(driver, 10)

    try:
        agent.update_status("Navigating to website")
        driver.get(url)
        agent.log(f"Navigated to {url}", level="info")

        agent.update_status("Logging in")
        login_user(driver, wait, username, password)
        agent.log("Successfully logged in", level="info")

        agent.update_status("Performing main task")
        return perform_main_task(driver, wait, agent)
    finally:
        driver.quit()
        agent.update_status("Web driver closed")

def login_user(driver, wait, username, password):
    """Fill in and submit the login form, then wait for the dashboard URL.

    Expects inputs named ``username`` and ``password`` and a
    ``<button type="submit">`` on the current page. Returns once the
    browser URL contains ``"dashboard"``.
    """
    # Only the username field is waited for; the other elements are
    # looked up directly once it is present.
    username_locator = (By.NAME, "username")
    wait.until(EC.presence_of_element_located(username_locator)).send_keys(username)

    driver.find_element(By.NAME, "password").send_keys(password)

    driver.find_element(By.XPATH, "//button[@type='submit']").click()

    # Login counts as finished when the URL changes to the dashboard.
    wait.until(EC.url_contains("dashboard"))

def perform_main_task(driver, wait, agent):
    """Extract the rows of the ``data-table`` element into a CSV file.

    The first ``<tr>`` is skipped as the header, and rows without ``<td>``
    cells are ignored. The cell texts are written to
    ``extracted_data_<unix time>.csv`` in the current working directory.

    Returns:
        str: ``"Extracted <n> rows"``.
    """
    import csv

    agent.update_status("Extracting data from table")

    # Block until the table element is present in the DOM.
    table_locator = (By.CLASS_NAME, "data-table")
    table = wait.until(EC.presence_of_element_located(table_locator))

    rows = table.find_elements(By.TAG_NAME, "tr")
    data = []

    for i, row in enumerate(rows[1:], 1):  # Skip header row
        cells = row.find_elements(By.TAG_NAME, "td")
        if not cells:
            continue

        data.append([cell.text for cell in cells])

        # Progress is relative to the number of non-header rows.
        progress = int((i / (len(rows) - 1)) * 100)
        agent.update_status(f"Extracting row {i}/{len(rows)-1}", progress=progress)

    output_file = f"extracted_data_{int(time.time())}.csv"
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        csv.writer(f).writerows(data)

    agent.log(f"Extracted {len(data)} rows to {output_file}", level="info")
    return f"Extracted {len(data)} rows"

# Script entry point: run main() and pass its return value to sys.exit()
# so it becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

5. API Integration Template

Integrate with external APIs and web services.
from openautomateagent import Client
import sys
import requests
import json
from datetime import datetime
import time

def main():
    """Entry point of the API-integration bot.

    Reads the API base URL, API key and requested operation from assets,
    runs the matching routine (``sync``, ``fetch`` or ``push``) and logs its
    result. An unrecognised operation is treated as a failure.

    Returns:
        int: 0 on success, 1 if an exception reached this function.
    """
    bot = Client()

    try:
        bot.update_status("Starting API integration")

        api_base_url = bot.get_asset("api_base_url")
        api_key = bot.get_asset("api_key")
        operation = bot.get_asset("operation", default="sync")

        # Operation name -> routine, checked in this order.
        routines = (
            ("sync", sync_with_api),
            ("fetch", fetch_data_from_api),
            ("push", push_data_to_api),
        )
        for name, routine in routines:
            if operation == name:
                result = routine(bot, api_base_url, api_key)
                break
        else:
            raise ValueError(f"Unknown operation: {operation}")

        bot.log(f"API integration completed: {result}", level="info")
        bot.update_status("Completed successfully")
    except Exception as e:
        bot.log(f"API integration failed: {str(e)}", level="error")
        bot.update_status("Failed")
        return 1
    else:
        return 0

def sync_with_api(agent, base_url, api_key, timeout=30):
    """Fetch all users from the API and stamp each one with a sync marker.

    Issues ``GET {base_url}/api/users`` and then, for every returned user,
    ``PATCH {base_url}/api/users/<id>`` with ``last_sync`` and
    ``sync_source``. A failure on one user is logged as a warning and does
    not stop the remaining users.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        base_url: API root without a trailing slash.
        api_key: Sent as a Bearer token.
        timeout: Seconds passed to every ``requests`` call so that an
            unresponsive server cannot block the bot indefinitely.

    Returns:
        str: ``"Processed <ok>/<total> users"``.

    Raises:
        requests.RequestException: If the initial user listing fails.
    """
    agent.update_status("Syncing with external API")

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }

    # Fetch data from API
    agent.update_status("Fetching data from API")
    response = requests.get(f"{base_url}/api/users", headers=headers, timeout=timeout)
    response.raise_for_status()

    users = response.json()
    total_users = len(users)
    agent.log(f"Fetched {total_users} users from API", level="info")

    processed_count = 0
    for i, user in enumerate(users):
        # Resolve the id before entering the try block: the error handler
        # below reports it, and must not raise KeyError itself.
        user_id = user.get('id') if isinstance(user, dict) else None
        if user_id is None:
            agent.log(f"Skipping user at index {i}: missing 'id'", level="warning")
            continue

        try:
            progress = int((i / total_users) * 100)
            agent.update_status(f"Processing user {user_id}", progress=progress)

            update_data = {
                'last_sync': datetime.now().isoformat(),
                'sync_source': 'openautomatebot'
            }

            update_response = requests.patch(
                f"{base_url}/api/users/{user_id}",
                headers=headers,
                json=update_data,
                timeout=timeout,
            )
            update_response.raise_for_status()

            processed_count += 1
            time.sleep(0.1)  # Rate limiting

        except Exception as e:
            # Best effort: one failing user must not abort the whole sync.
            agent.log(f"Failed to process user {user_id}: {str(e)}", level="warning")

    return f"Processed {processed_count}/{total_users} users"

def fetch_data_from_api(agent, base_url, api_key, timeout=30):
    """Download all pages of ``/api/data`` and save the items to a JSON file.

    Pages are requested 100 items at a time, starting at page 1, until a
    page has no ``items`` or reports a falsy ``has_next``. The collected
    items are written to ``api_data_<YYYYmmdd_HHMMSS>.json`` in the current
    working directory.

    Args:
        agent: Object providing ``log`` and ``update_status``.
        base_url: API root without a trailing slash.
        api_key: Sent as a Bearer token.
        timeout: Seconds passed to every ``requests`` call so that an
            unresponsive server cannot stall the pagination loop forever.

    Returns:
        str: ``"Fetched <n> items"``.

    Raises:
        requests.RequestException: If any page request fails.
        KeyError: If a response lacks ``items`` or ``has_next``.
    """
    agent.update_status("Fetching data from API")

    headers = {'Authorization': f'Bearer {api_key}'}

    # Fetch data with pagination
    all_data = []
    page = 1

    while True:
        agent.update_status(f"Fetching page {page}")

        response = requests.get(
            f"{base_url}/api/data",
            headers=headers,
            params={'page': page, 'per_page': 100},
            timeout=timeout,
        )
        response.raise_for_status()

        data = response.json()
        if not data['items']:
            break

        all_data.extend(data['items'])
        agent.log(f"Fetched page {page}, total items: {len(all_data)}", level="info")

        if not data['has_next']:
            break

        page += 1
        time.sleep(0.5)  # Rate limiting

    # Save data locally; explicit encoding keeps the file independent of
    # the platform's default.
    output_file = f"api_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(all_data, f, indent=2)

    agent.log(f"Saved {len(all_data)} items to {output_file}", level="info")
    return f"Fetched {len(all_data)} items"

def push_data_to_api(agent, base_url, api_key, timeout=30):
    """Upload the items of a local JSON file to the API in batches of 10.

    The file path comes from the ``input_file`` asset (default
    ``data_to_push.json``) and must contain a JSON array. Each batch is sent
    as ``POST {base_url}/api/data/batch`` with body ``{"items": [...]}``. A
    failed batch is logged as an error and the remaining batches are still
    attempted.

    Args:
        agent: Object providing ``get_asset``, ``log`` and ``update_status``.
        base_url: API root without a trailing slash.
        api_key: Sent as a Bearer token.
        timeout: Seconds passed to every ``requests`` call so that an
            unresponsive server cannot block the bot indefinitely.

    Returns:
        str: ``"Pushed <ok>/<total> items"``.

    Raises:
        OSError: If the input file cannot be opened.
        json.JSONDecodeError: If the input file is not valid JSON.
        ValueError: If the JSON document is not an array.
    """
    agent.update_status("Reading local data")

    # Read data from local file
    input_file = agent.get_asset("input_file", default="data_to_push.json")
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # Batching slices the payload, so anything but a list would fail below
    # with an obscure error; reject it here with a clear message.
    if not isinstance(data, list):
        raise ValueError(
            f"{input_file} must contain a JSON array, got {type(data).__name__}"
        )

    headers = {
        'Authorization': f'Bearer {api_key}',
        'Content-Type': 'application/json'
    }

    # Push data in batches
    batch_size = 10
    total_items = len(data)
    pushed_count = 0

    for i in range(0, total_items, batch_size):
        batch = data[i:i + batch_size]
        batch_number = i // batch_size + 1
        progress = int((i / total_items) * 100)
        agent.update_status(f"Pushing batch {batch_number}", progress=progress)

        try:
            response = requests.post(
                f"{base_url}/api/data/batch",
                headers=headers,
                json={'items': batch},
                timeout=timeout,
            )
            response.raise_for_status()

            pushed_count += len(batch)
            agent.log(f"Pushed batch {batch_number}, total: {pushed_count}", level="info")

            time.sleep(1)  # Rate limiting

        except Exception as e:
            # Best effort: keep going so one bad batch does not lose the rest.
            agent.log(f"Failed to push batch {batch_number}: {str(e)}", level="error")

    return f"Pushed {pushed_count}/{total_items} items"

# Script entry point: run main() and pass its return value to sys.exit()
# so it becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

Using Templates

1. Create Bot from Template

OpenAutomate uses cookiecutter to generate bot projects from templates:

Prerequisites

Install cookiecutter if you haven’t already:
pip install cookiecutter

First Time Setup (From GitHub)

cookiecutter https://github.com/OpenAutomateOrg/openautomate-bot-template

Additional Bots (Using Local Template)

If you’ve already cloned the template repository locally:
# From local path
cookiecutter path/to/openautomate-bot-template

# Or from within the template directory
cookiecutter .

Answer the Prompts

bot_name [MyBot]: FileProcessorBot
bot_description [A Python automation bot]: Processes CSV files automatically
The version is automatically set to 1.0.0 and does not require user input.

2. Customize Template

  1. Navigate to your generated bot project
  2. Install dependencies: pip install -r requirements.txt
  3. Modify bot.py to fit your specific needs
  4. Update the requirements.txt file if you add dependencies
  5. Test your bot locally: python bot.py

3. Deploy Template

  1. Zip your customized bot folder
  2. Upload to OpenAutomate dashboard
  3. Configure any required assets (credentials, settings)
  4. Test execution on your agents

Template Best Practices

Error Handling

  • Always wrap main logic in try/except blocks
  • Log errors with appropriate detail
  • Update status on failures
  • Return proper exit codes

Status Updates

  • Provide regular status updates for long operations
  • Include progress percentages when possible
  • Use descriptive status messages

Asset Management

  • Use assets for configuration and credentials
  • Provide default values where appropriate
  • Validate required assets at startup

Logging

  • Log important events and milestones
  • Use appropriate log levels (info, warning, error)
  • Include context in log messages

Performance

  • Implement rate limiting for API calls
  • Use batch operations when possible
  • Monitor memory usage for large datasets

Creating Custom Templates

You can create your own templates by following these guidelines:
  1. Structure: Follow the standard bot structure
  2. Documentation: Include clear comments and README
  3. Configuration: Use assets for configurable values
  4. Testing: Include test cases and examples
  5. Dependencies: Minimize external dependencies
Share your templates with the community by submitting them to our template repository.