Python Bot Templates
OpenAutomate provides a collection of Python templates to help you quickly build automation bots for common scenarios. These templates include best practices, error handling, and integration with the OpenAutomate SDK.Available Templates
1. Hello World Template
Perfect for getting started and understanding the basic structure.Copy
from openautomateagent import Client
import sys
import time
def main():
"""Simple Hello World automation bot."""
agent = Client()
try:
agent.update_status("Starting Hello World automation")
agent.log("Hello World bot started", level="info")
# Simulate some work
for i in range(3):
agent.update_status(f"Processing step {i+1}/3")
time.sleep(1)
agent.log(f"Completed step {i+1}", level="info")
# Final message
message = "Hello from OpenAutomate! 🤖"
print(message)
agent.log(message, level="info")
agent.update_status("Completed successfully")
return 0
except Exception as e:
agent.log(f"Error: {str(e)}", level="error")
agent.update_status("Failed")
return 1
if __name__ == "__main__":
sys.exit(main())
2. File Processing Template
Automate file operations like reading, processing, and writing files.Copy
from openautomateagent import Client
import sys
import os
from pathlib import Path
import csv
import json
def main():
"""File processing automation bot."""
agent = Client()
try:
agent.update_status("Starting file processing")
# Get configuration from assets
input_folder = agent.get_asset("input_folder", default="./input")
output_folder = agent.get_asset("output_folder", default="./output")
file_pattern = agent.get_asset("file_pattern", default="*.csv")
# Process files
processed_count = process_files(agent, input_folder, output_folder, file_pattern)
agent.log(f"Successfully processed {processed_count} files", level="info")
agent.update_status("Completed successfully")
return 0
except Exception as e:
agent.log(f"File processing failed: {str(e)}", level="error")
agent.update_status("Failed")
return 1
def process_files(agent, input_folder, output_folder, pattern):
"""Process files matching the pattern."""
input_path = Path(input_folder)
output_path = Path(output_folder)
# Create output directory
output_path.mkdir(parents=True, exist_ok=True)
# Find files to process
files = list(input_path.glob(pattern))
total_files = len(files)
if total_files == 0:
agent.log(f"No files found matching pattern: {pattern}", level="warning")
return 0
agent.log(f"Found {total_files} files to process", level="info")
processed_count = 0
for i, file_path in enumerate(files):
try:
progress = int((i / total_files) * 100)
agent.update_status(f"Processing {file_path.name}", progress=progress)
# Process based on file type
if file_path.suffix.lower() == '.csv':
process_csv_file(file_path, output_path / file_path.name)
elif file_path.suffix.lower() == '.json':
process_json_file(file_path, output_path / file_path.name)
else:
# Generic text processing
process_text_file(file_path, output_path / file_path.name)
processed_count += 1
agent.log(f"Processed: {file_path.name}", level="info")
except Exception as e:
agent.log(f"Failed to process {file_path.name}: {str(e)}", level="error")
return processed_count
def process_csv_file(input_file, output_file):
"""Process CSV file - example: convert to uppercase."""
with open(input_file, 'r', newline='', encoding='utf-8') as infile:
reader = csv.reader(infile)
rows = list(reader)
# Process data (example: uppercase all text)
processed_rows = []
for row in rows:
processed_row = [cell.upper() if isinstance(cell, str) else cell for cell in row]
processed_rows.append(processed_row)
with open(output_file, 'w', newline='', encoding='utf-8') as outfile:
writer = csv.writer(outfile)
writer.writerows(processed_rows)
def process_json_file(input_file, output_file):
"""Process JSON file - example: add timestamp."""
with open(input_file, 'r', encoding='utf-8') as infile:
data = json.load(infile)
# Add processing timestamp
if isinstance(data, dict):
data['processed_at'] = datetime.now().isoformat()
elif isinstance(data, list):
for item in data:
if isinstance(item, dict):
item['processed_at'] = datetime.now().isoformat()
with open(output_file, 'w', encoding='utf-8') as outfile:
json.dump(data, outfile, indent=2)
def process_text_file(input_file, output_file):
"""Process text file - example: add line numbers."""
with open(input_file, 'r', encoding='utf-8') as infile:
lines = infile.readlines()
with open(output_file, 'w', encoding='utf-8') as outfile:
for i, line in enumerate(lines, 1):
outfile.write(f"{i:04d}: {line}")
if __name__ == "__main__":
sys.exit(main())
3. Database Operations Template
Automate database queries, updates, and data synchronization.Copy
from openautomateagent import Client
import sys
import sqlite3
import pandas as pd
from datetime import datetime
def main():
"""Database operations automation bot."""
agent = Client()
try:
agent.update_status("Starting database operations")
# Get database configuration
db_path = agent.get_asset("database_path")
operation = agent.get_asset("operation", default="sync")
# Perform database operations
if operation == "sync":
result = sync_data(agent, db_path)
elif operation == "cleanup":
result = cleanup_old_records(agent, db_path)
elif operation == "report":
result = generate_report(agent, db_path)
else:
raise ValueError(f"Unknown operation: {operation}")
agent.log(f"Database operation completed: {result}", level="info")
agent.update_status("Completed successfully")
return 0
except Exception as e:
agent.log(f"Database operation failed: {str(e)}", level="error")
agent.update_status("Failed")
return 1
def sync_data(agent, db_path):
"""Synchronize data between tables."""
agent.update_status("Connecting to database")
with sqlite3.connect(db_path) as conn:
agent.update_status("Syncing data")
# Example: Sync user data
cursor = conn.cursor()
# Get source data
cursor.execute("SELECT id, name, email, updated_at FROM users_staging")
staging_data = cursor.fetchall()
synced_count = 0
for user_id, name, email, updated_at in staging_data:
# Check if user exists in main table
cursor.execute("SELECT updated_at FROM users WHERE id = ?", (user_id,))
existing = cursor.fetchone()
if not existing or existing[0] < updated_at:
# Insert or update user
cursor.execute("""
INSERT OR REPLACE INTO users (id, name, email, updated_at)
VALUES (?, ?, ?, ?)
""", (user_id, name, email, updated_at))
synced_count += 1
conn.commit()
agent.log(f"Synced {synced_count} user records", level="info")
return f"Synced {synced_count} records"
def cleanup_old_records(agent, db_path):
"""Clean up old records from database."""
agent.update_status("Cleaning up old records")
with sqlite3.connect(db_path) as conn:
cursor = conn.cursor()
# Delete records older than 90 days
cursor.execute("""
DELETE FROM logs
WHERE created_at < datetime('now', '-90 days')
""")
deleted_logs = cursor.rowcount
# Delete completed executions older than 30 days
cursor.execute("""
DELETE FROM executions
WHERE status = 'completed'
AND completed_at < datetime('now', '-30 days')
""")
deleted_executions = cursor.rowcount
conn.commit()
result = f"Deleted {deleted_logs} log records, {deleted_executions} executions"
agent.log(result, level="info")
return result
def generate_report(agent, db_path):
"""Generate database report."""
agent.update_status("Generating database report")
with sqlite3.connect(db_path) as conn:
# Generate summary statistics
df_users = pd.read_sql_query("SELECT COUNT(*) as total_users FROM users", conn)
df_executions = pd.read_sql_query("""
SELECT
status,
COUNT(*) as count,
AVG(duration) as avg_duration
FROM executions
WHERE created_at >= datetime('now', '-7 days')
GROUP BY status
""", conn)
# Create report
report = {
'generated_at': datetime.now().isoformat(),
'total_users': df_users.iloc[0]['total_users'],
'executions_last_7_days': df_executions.to_dict('records')
}
# Save report
report_file = f"database_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(report_file, 'w') as f:
json.dump(report, f, indent=2)
agent.log(f"Report saved to {report_file}", level="info")
return f"Report generated: {report_file}"
if __name__ == "__main__":
sys.exit(main())
4. Web Automation Template
Automate web interactions using Selenium WebDriver.Copy
from openautomateagent import Client
import sys
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
import time
def main():
"""Web automation bot using Selenium."""
agent = Client()
try:
agent.update_status("Starting web automation")
# Get configuration
target_url = agent.get_asset("target_url")
username = agent.get_asset("username")
password = agent.get_asset("password")
# Perform web automation
result = automate_web_task(agent, target_url, username, password)
agent.log(f"Web automation completed: {result}", level="info")
agent.update_status("Completed successfully")
return 0
except Exception as e:
agent.log(f"Web automation failed: {str(e)}", level="error")
agent.update_status("Failed")
return 1
def automate_web_task(agent, url, username, password):
"""Perform web automation task."""
agent.update_status("Setting up web driver")
# Configure Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless") # Run in background
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
chrome_options.add_argument("--window-size=1920,1080")
driver = webdriver.Chrome(options=chrome_options)
wait = WebDriverWait(driver, 10)
try:
agent.update_status("Navigating to website")
driver.get(url)
agent.log(f"Navigated to {url}", level="info")
# Login process
agent.update_status("Logging in")
login_user(driver, wait, username, password)
agent.log("Successfully logged in", level="info")
# Perform main task
agent.update_status("Performing main task")
result = perform_main_task(driver, wait, agent)
return result
finally:
driver.quit()
agent.update_status("Web driver closed")
def login_user(driver, wait, username, password):
"""Handle user login."""
# Wait for and fill username
username_field = wait.until(
EC.presence_of_element_located((By.NAME, "username"))
)
username_field.send_keys(username)
# Fill password
password_field = driver.find_element(By.NAME, "password")
password_field.send_keys(password)
# Click login button
login_button = driver.find_element(By.XPATH, "//button[@type='submit']")
login_button.click()
# Wait for login to complete
wait.until(EC.url_contains("dashboard"))
def perform_main_task(driver, wait, agent):
"""Perform the main automation task."""
# Example: Extract data from a table
agent.update_status("Extracting data from table")
# Wait for table to load
table = wait.until(
EC.presence_of_element_located((By.CLASS_NAME, "data-table"))
)
# Extract table data
rows = table.find_elements(By.TAG_NAME, "tr")
data = []
for i, row in enumerate(rows[1:], 1): # Skip header row
cells = row.find_elements(By.TAG_NAME, "td")
if cells:
row_data = [cell.text for cell in cells]
data.append(row_data)
# Update progress
progress = int((i / (len(rows) - 1)) * 100)
agent.update_status(f"Extracting row {i}/{len(rows)-1}", progress=progress)
# Save extracted data
import csv
output_file = f"extracted_data_{int(time.time())}.csv"
with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerows(data)
agent.log(f"Extracted {len(data)} rows to {output_file}", level="info")
return f"Extracted {len(data)} rows"
if __name__ == "__main__":
sys.exit(main())
5. API Integration Template
Integrate with external APIs and web services.Copy
from openautomateagent import Client
import sys
import requests
import json
from datetime import datetime
import time
def main():
"""API integration automation bot."""
agent = Client()
try:
agent.update_status("Starting API integration")
# Get API configuration
api_base_url = agent.get_asset("api_base_url")
api_key = agent.get_asset("api_key")
operation = agent.get_asset("operation", default="sync")
# Perform API operations
if operation == "sync":
result = sync_with_api(agent, api_base_url, api_key)
elif operation == "fetch":
result = fetch_data_from_api(agent, api_base_url, api_key)
elif operation == "push":
result = push_data_to_api(agent, api_base_url, api_key)
else:
raise ValueError(f"Unknown operation: {operation}")
agent.log(f"API integration completed: {result}", level="info")
agent.update_status("Completed successfully")
return 0
except Exception as e:
agent.log(f"API integration failed: {str(e)}", level="error")
agent.update_status("Failed")
return 1
def sync_with_api(agent, base_url, api_key):
"""Synchronize data with external API."""
agent.update_status("Syncing with external API")
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
# Fetch data from API
agent.update_status("Fetching data from API")
response = requests.get(f"{base_url}/api/users", headers=headers)
response.raise_for_status()
users = response.json()
agent.log(f"Fetched {len(users)} users from API", level="info")
# Process each user
processed_count = 0
for i, user in enumerate(users):
try:
progress = int((i / len(users)) * 100)
agent.update_status(f"Processing user {user['id']}", progress=progress)
# Update user data
update_data = {
'last_sync': datetime.now().isoformat(),
'sync_source': 'openautomatebot'
}
update_response = requests.patch(
f"{base_url}/api/users/{user['id']}",
headers=headers,
json=update_data
)
update_response.raise_for_status()
processed_count += 1
time.sleep(0.1) # Rate limiting
except Exception as e:
agent.log(f"Failed to process user {user['id']}: {str(e)}", level="warning")
return f"Processed {processed_count}/{len(users)} users"
def fetch_data_from_api(agent, base_url, api_key):
"""Fetch data from API and save locally."""
agent.update_status("Fetching data from API")
headers = {'Authorization': f'Bearer {api_key}'}
# Fetch data with pagination
all_data = []
page = 1
while True:
agent.update_status(f"Fetching page {page}")
response = requests.get(
f"{base_url}/api/data",
headers=headers,
params={'page': page, 'per_page': 100}
)
response.raise_for_status()
data = response.json()
if not data['items']:
break
all_data.extend(data['items'])
agent.log(f"Fetched page {page}, total items: {len(all_data)}", level="info")
if not data['has_next']:
break
page += 1
time.sleep(0.5) # Rate limiting
# Save data locally
output_file = f"api_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(output_file, 'w') as f:
json.dump(all_data, f, indent=2)
agent.log(f"Saved {len(all_data)} items to {output_file}", level="info")
return f"Fetched {len(all_data)} items"
def push_data_to_api(agent, base_url, api_key):
"""Push local data to API."""
agent.update_status("Reading local data")
# Read data from local file
input_file = agent.get_asset("input_file", default="data_to_push.json")
with open(input_file, 'r') as f:
data = json.load(f)
headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json'
}
# Push data in batches
batch_size = 10
pushed_count = 0
for i in range(0, len(data), batch_size):
batch = data[i:i + batch_size]
progress = int((i / len(data)) * 100)
agent.update_status(f"Pushing batch {i//batch_size + 1}", progress=progress)
try:
response = requests.post(
f"{base_url}/api/data/batch",
headers=headers,
json={'items': batch}
)
response.raise_for_status()
pushed_count += len(batch)
agent.log(f"Pushed batch {i//batch_size + 1}, total: {pushed_count}", level="info")
time.sleep(1) # Rate limiting
except Exception as e:
agent.log(f"Failed to push batch {i//batch_size + 1}: {str(e)}", level="error")
return f"Pushed {pushed_count}/{len(data)} items"
if __name__ == "__main__":
sys.exit(main())
Using Templates
1. Create Bot from Template
OpenAutomate uses cookiecutter to generate bot projects from templates:Prerequisites
Install cookiecutter if you haven’t already:Copy
pip install cookiecutter
First Time Setup (From GitHub)
Copy
cookiecutter https://github.com/OpenAutomateOrg/openautomate-bot-template
Additional Bots (Using Local Template)
If you’ve already cloned the template repository locally:Copy
# From local path
cookiecutter path/to/openautomate-bot-template
# Or from within the template directory
cookiecutter .
Answer the Prompts
Copy
bot_name [MyBot]: FileProcessorBot
bot_description [A Python automation bot]: Processes CSV files automatically
1.0.0
and does not require user input.
2. Customize Template
- Navigate to your generated bot project
- Install dependencies:
pip install -r requirements.txt
- Modify
bot.py
to fit your specific needs - Update the
requirements.txt
file if you add dependencies - Test your bot locally:
python bot.py
3. Deploy Template
- Zip your customized bot folder
- Upload to OpenAutomate dashboard
- Configure any required assets (credentials, settings)
- Test execution on your agents
Template Best Practices
Error Handling
- Always wrap main logic in try-catch blocks
- Log errors with appropriate detail
- Update status on failures
- Return proper exit codes
Status Updates
- Provide regular status updates for long operations
- Include progress percentages when possible
- Use descriptive status messages
Asset Management
- Use assets for configuration and credentials
- Provide default values where appropriate
- Validate required assets at startup
Logging
- Log important events and milestones
- Use appropriate log levels (info, warning, error)
- Include context in log messages
Performance
- Implement rate limiting for API calls
- Use batch operations when possible
- Monitor memory usage for large datasets
Creating Custom Templates
You can create your own templates by following these guidelines:- Structure: Follow the standard bot structure
- Documentation: Include clear comments and README
- Configuration: Use assets for configurable values
- Testing: Include test cases and examples
- Dependencies: Minimize external dependencies