SQLAlchemy Instrumentation¶
Profilis provides automatic SQLAlchemy query profiling with minimal configuration.
Quick Start¶
from sqlalchemy import create_engine
from profilis.sqlalchemy.instrumentation import instrument_engine
from profilis.core.emitter import Emitter
from profilis.exporters.jsonl import JSONLExporter
from profilis.core.async_collector import AsyncCollector
# Setup Profilis
exporter = JSONLExporter(dir="./logs")
collector = AsyncCollector(exporter)
emitter = Emitter(collector)
# Create SQLAlchemy engine
engine = create_engine("sqlite:///app.db")
# Instrument the engine
instrument_engine(engine, emitter)
# All queries will now be automatically profiled
Async SQLAlchemy Support¶
from sqlalchemy.ext.asyncio import create_async_engine
from profilis.sqlalchemy.instrumentation import instrument_async_engine
# Create async engine
async_engine = create_async_engine("sqlite+aiosqlite:///app.db")
# Instrument the async engine
emitter = Emitter(collector)
instrument_async_engine(async_engine, emitter)
# All async queries will now be automatically profiled
Features¶
Automatic Query Profiling¶
- Query Timing: Measures execution time with microsecond precision
- Row Counts: Tracks number of rows returned/affected
- Query Text: Captures actual SQL queries (with redaction support)
- Async Support: Works with both sync and async engines
- Performance Metrics: Tracks slow queries and bottlenecks
Performance Optimized¶
- Non-blocking: All profiling happens asynchronously
- Minimal Overhead: ≤15µs per query profiling cost
- Batch Processing: Efficient event collection and export
- Memory Efficient: Lightweight event representation
Configuration¶
Basic Instrumentation¶
from profilis.sqlalchemy.instrumentation import instrument_engine
from profilis.core.emitter import Emitter
# Simple instrumentation
emitter = Emitter(collector)
instrument_engine(engine, emitter)
# With custom options
instrument_engine(
engine,
emitter,
redact=True, # Hide sensitive data (default: True)
max_len=200 # Maximum query length (default: 200)
)
Advanced Configuration¶
# Production configuration
instrument_engine(
engine,
emitter,
redact=True, # Always redact in production (default)
max_len=500 # Truncate very long queries
)
What Gets Profiled¶
Database Events (DB)¶
Each SQL query generates a DB
event:
{
"ts_ns": 1703123456789000000,
"trace_id": "trace-abc123",
"span_id": "span-def456",
"kind": "DB",
"query": "SELECT * FROM users WHERE id = ?",
"dur_ns": 5000000,
"rows": 1,
"engine": "sqlite:///app.db"
}
Fields:
- ts_ns
: Timestamp in nanoseconds
- trace_id
: Current trace identifier (if in trace context)
- span_id
: Current span identifier (if in span context)
- query
: SQL query text (may be redacted)
- dur_ns
: Query execution time in nanoseconds
- rows
: Number of rows returned/affected
- engine
: Database engine identifier
Query Metadata (DB_META)¶
Additional query information is recorded:
{
"kind": "DB_META",
"ts_ns": 1703123456789000000,
"trace_id": "trace-abc123",
"span_id": "span-def456",
"query_hash": "abc123def456",
"query_type": "SELECT",
"table": "users",
"parameters": {"id": 123}
}
Integration Patterns¶
With Flask Adapter¶
from flask import Flask
from profilis.flask.adapter import ProfilisFlask
from profilis.sqlalchemy.instrumentation import instrument_engine
from profilis.core.emitter import Emitter
# Setup Flask with Profilis
app = Flask(__name__)
profilis = ProfilisFlask(app, collector=collector)
# Instrument SQLAlchemy
engine = create_engine("sqlite:///app.db")
emitter = Emitter(collector)
instrument_engine(engine, emitter)
# Now Flask requests and SQL queries are automatically correlated
@app.route('/api/users/<user_id>')
def get_user(user_id):
# This query will be automatically profiled and linked to the request
user = engine.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
return {"user": user}
With Function Profiling¶
from profilis.decorators.profile import profile_function
@profile_function(emitter)
def get_user_data(user_id: int):
"""Profile both function execution and database queries"""
# SQLAlchemy queries are automatically profiled
user = engine.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
posts = engine.execute("SELECT * FROM posts WHERE user_id = ?", (user_id,)).fetchall()
return {"user": user, "posts": posts}
With Runtime Context¶
from profilis.runtime import use_span, span_id
def process_user_batch(user_ids: list[int]):
"""Process multiple users with distributed tracing"""
with use_span(trace_id=span_id()):
for user_id in user_ids:
with use_span(span_id=span_id()):
# Each query inherits the trace context
user = engine.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
# Process user data...
Query Redaction¶
Automatic Redaction¶
# Enable query redaction (recommended for production)
instrument_sqlalchemy(
engine,
collector,
redact_queries=True
)
# Queries with sensitive data are automatically redacted
# Original: SELECT * FROM users WHERE email = 'user@example.com' AND password = 'secret'
# Redacted: SELECT * FROM users WHERE email = ? AND password = ?
Custom Redaction Rules¶
import re
from profilis.sqlalchemy.instrumentation import instrument_sqlalchemy
def custom_redactor(query: str) -> str:
"""Custom query redaction logic"""
# Redact email addresses
query = re.sub(r"'[^']*@[^']*'", "?", query)
# Redact credit card numbers
query = re.sub(r"'[0-9]{4}-[0-9]{4}-[0-9]{4}-[0-9]{4}'", "?", query)
return query
instrument_sqlalchemy(
engine,
collector,
redact_queries=True,
query_redactor=custom_redactor
)
Performance Tuning¶
Query Length Limiting¶
# Limit query text length to reduce memory usage
instrument_engine(
engine,
emitter,
max_len=100 # Truncate queries longer than 100 characters
)
Redaction Control¶
# Disable redaction for debugging (not recommended for production)
instrument_engine(
engine,
emitter,
redact=False # Show full query text
)
Note: The current implementation profiles all queries. For high-volume applications, consider implementing sampling at the collector level or using route-based sampling in Flask.
Monitoring and Alerting¶
Slow Query Detection¶
import logging
logger = logging.getLogger(__name__)
def monitor_slow_queries(collector):
"""Monitor for slow database queries"""
# This would be implemented based on your monitoring strategy
pass
# Example: Alert on queries > 1 second
SLOW_QUERY_THRESHOLD = 1_000_000_000 # 1 second in nanoseconds
Query Performance Metrics¶
from collections import defaultdict
import statistics
class QueryAnalyzer:
def __init__(self):
self.query_stats = defaultdict(list)
def analyze_query(self, query: str, duration_ns: int):
"""Analyze query performance"""
self.query_stats[query].append(duration_ns)
# Alert on slow queries
if duration_ns > 1_000_000_000: # 1 second
logger.warning(f"Slow query detected: {query} ({duration_ns/1_000_000:.2f}ms)")
def get_stats(self):
"""Get query performance statistics"""
stats = {}
for query, durations in self.query_stats.items():
stats[query] = {
"count": len(durations),
"avg_ms": statistics.mean(durations) / 1_000_000,
"max_ms": max(durations) / 1_000_000,
"min_ms": min(durations) / 1_000_000
}
return stats
Testing¶
Unit Testing¶
import pytest
from profilis.exporters.console import ConsoleExporter
@pytest.fixture
def test_collector():
"""Test collector for unit tests"""
exporter = ConsoleExporter(pretty=False)
return AsyncCollector(exporter, queue_size=128, flush_interval=0.01)
@pytest.fixture
def test_engine(test_collector):
"""Test engine with Profilis instrumentation"""
engine = create_engine("sqlite:///:memory:")
emitter = Emitter(test_collector)
instrument_engine(engine, emitter)
return engine
def test_query_profiling(test_engine, test_collector):
"""Test that queries are profiled"""
# Execute a query
result = test_engine.execute("SELECT 1").fetchone()
# Verify profiling data was collected
# (Implementation depends on your testing strategy)
assert result[0] == 1
Integration Testing¶
def test_sqlalchemy_integration(test_engine, test_collector):
"""Test SQLAlchemy integration with Profilis"""
# Create test data
test_engine.execute("CREATE TABLE test (id INTEGER, name TEXT)")
test_engine.execute("INSERT INTO test VALUES (1, 'test')")
# Query the data
result = test_engine.execute("SELECT * FROM test WHERE id = 1").fetchone()
# Verify the result
assert result[0] == 1
assert result[1] == 'test'
Troubleshooting¶
Common Issues¶
- No Events Generated: Ensure the collector is properly configured
- Missing Query Text: Check if redaction is enabled
- Performance Impact: Use sampling and filtering for high-volume applications
- Async Engine Issues: Ensure proper async context setup
Debug Mode¶
import os
os.environ['PROFILIS_DEBUG'] = '1'
# This will enable debug logging for SQLAlchemy instrumentation
emitter = Emitter(collector)
instrument_engine(engine, emitter)
Health Checks¶
def check_sqlalchemy_instrumentation(engine, collector):
"""Check if SQLAlchemy instrumentation is working"""
try:
# Execute a simple query
result = engine.execute("SELECT 1").fetchone()
# Check if events were generated
# (Implementation depends on your monitoring strategy)
return True
except Exception as e:
logger.error(f"SQLAlchemy instrumentation check failed: {e}")
return False
Best Practices¶
- Enable Redaction: Always enable query redaction in production (default behavior)
- Limit Query Length: Use
max_len
to prevent very long queries from consuming memory - Monitor Performance: Track query performance metrics over time
- Use Async Engines: For async applications, use
instrument_async_engine()
with async SQLAlchemy engines - Test Thoroughly: Test instrumentation in your specific environment
- Review Queries: Regularly review profiled queries for optimization opportunities
- Correlate with Requests: Use the same collector for both Flask and SQLAlchemy to correlate requests with queries