Neo4j Instrumentation¶
Profilis provides comprehensive instrumentation for Neo4j graph database operations, supporting both synchronous and asynchronous operations. This allows you to monitor Cypher queries, track performance metrics, and analyze graph database performance.
Features¶
- Session & Transaction Monitoring: Automatic tracking of Neo4j sessions and transactions
- Cypher Query Analysis: Query execution time, result statistics, and performance metrics
- Graph Metrics: Node and relationship creation/modification counts
- Error Tracking: Detailed error information and failure analysis
- Parameter Redaction: Configurable parameter redaction for security
- Query Preview: Customizable query preview length for log readability
Installation¶
Install Profilis with Neo4j support:
pip install profilis[neo4j]
This installs the required dependencies:
- neo4j>=5.14
- Neo4j Python driver
Basic Usage¶
Synchronous Neo4j¶
from profilis.neo4j.instrumentation import Neo4jConfig, instrument_neo4j_module
from profilis.core.async_collector import AsyncCollector
from profilis.core.emitter import Emitter
from profilis.exporters.jsonl import JSONLExporter
import neo4j
from neo4j import GraphDatabase
# Setup Profilis collector
exporter = JSONLExporter(dir="./logs")
collector = AsyncCollector(exporter)
emitter = Emitter(collector)
# Create Neo4j driver with instrumentation
config = Neo4jConfig(vendor_label="neo4j", preview_len=200, redact_cypher=True)
instrument_neo4j_module(neo4j, emitter, config)
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
# Use the driver normally
with driver.session() as session:
# Create nodes
result = session.run(
"CREATE (p:Person {name: $name, age: $age}) RETURN p",
name="Alice", age=30
)
# Query nodes
result = session.run("MATCH (p:Person) WHERE p.age > $min_age RETURN p", min_age=25)
people = list(result)
Asynchronous Neo4j¶
import asyncio
from profilis.neo4j.instrumentation import Neo4jConfig, instrument_neo4j_module
from profilis.core.async_collector import AsyncCollector
from profilis.core.emitter import Emitter
from profilis.exporters.jsonl import JSONLExporter
import neo4j
from neo4j import AsyncGraphDatabase
async def main():
# Setup Profilis collector
exporter = JSONLExporter(dir="./logs")
collector = AsyncCollector(exporter)
emitter = Emitter(collector)
# Create async Neo4j driver with instrumentation
config = Neo4jConfig(vendor_label="neo4j", preview_len=200, redact_cypher=True)
instrument_neo4j_module(neo4j, emitter, config)
driver = AsyncGraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
# Use the driver normally
async with driver.session() as session:
# Create nodes
result = await session.run(
"CREATE (p:Person {name: $name, age: $age}) RETURN p",
name="Bob", age=35
)
# Query nodes
result = await session.run("MATCH (p:Person) WHERE p.age > $min_age RETURN p", min_age=30)
people = await result.data()
if __name__ == "__main__":
asyncio.run(main())
Configuration¶
Neo4jConfig Options¶
from profilis.neo4j.instrumentation import Neo4jConfig, instrument_neo4j_module
config = Neo4jConfig(
vendor_label="neo4j", # Custom vendor label for identification
preview_len=150, # Truncate query previews to 150 characters
redact_cypher=True # Whether to redact Cypher queries
)
instrument_neo4j_module(neo4j, emitter, config)
Configuration Parameters¶
vendor_label
: Custom label to identify the database vendor (default: "neo4j")preview_len
: Maximum length for query previews (default: 200)redact_cypher
: Whether to redact Cypher queries (default: True)
Monitored Operations¶
Profilis automatically tracks the following Neo4j operations:
Session Operations¶
session.run()
- Execute Cypher queriessession.begin_transaction()
- Start transactionssession.close()
- Close sessions
Transaction Operations¶
transaction.run()
- Execute queries within transactionstransaction.commit()
- Commit transactionstransaction.rollback()
- Rollback transactions
Query Types¶
- CREATE - Node and relationship creation
- MATCH - Pattern matching queries
- MERGE - Create or match operations
- DELETE - Node and relationship deletion
- SET - Property updates
- RETURN - Result projection
Metrics Extracted¶
For each Neo4j operation, Profilis extracts:
Query Information¶
- Query Text: The Cypher query being executed (with preview truncation)
- Parameters: Query parameters (with redaction support)
- Database: Target database name
- Transaction State: Whether query is in a transaction
Performance Metrics¶
- Duration: Query execution time in microseconds
- Timestamp: When the query was executed
- Success/Failure: Operation outcome
Result Statistics¶
- Nodes Created: Number of nodes created
- Relationships Created: Number of relationships created
- Properties Set: Number of properties set
- Labels Added: Number of labels added
- Indexes Added: Number of indexes added
- Constraints Added: Number of constraints added
Graph Metrics¶
- Nodes Deleted: Number of nodes deleted
- Relationships Deleted: Number of relationships deleted
- Properties Removed: Number of properties removed
- Labels Removed: Number of labels removed
Error Information¶
- Error Code: Neo4j error code (if operation failed)
- Error Message: Detailed error description
- Error Classification: Error type classification
Example Output¶
Successful Query¶
{
"event_type": "DB_META",
"timestamp": "2025-09-24T10:30:45.123456Z",
"duration_us": 2500,
"query_type": "CREATE",
"query_preview": "CREATE (p:Person {name: $name, age: $age}) RETURN p",
"parameters": {"name": "Alice", "age": 30},
"database": "neo4j",
"result_stats": {
"nodes_created": 1,
"properties_set": 2,
"labels_added": 1
},
"success": true
}
Complex Query¶
{
"event_type": "DB_META",
"timestamp": "2025-09-24T10:30:45.123456Z",
"duration_us": 5000,
"query_type": "MATCH",
"query_preview": "MATCH (p:Person)-[r:KNOWS]->(f:Person) WHERE p.age > $min_age RETURN p, f",
"parameters": {"min_age": 25},
"database": "neo4j",
"result_stats": {
"nodes_returned": 10,
"relationships_returned": 15
},
"success": true
}
Error Handling¶
Profilis provides comprehensive error tracking for Neo4j operations:
{
"event_type": "DB_META",
"timestamp": "2025-09-24T10:30:45.123456Z",
"duration_us": 1000,
"query_type": "CREATE",
"query_preview": "CREATE (p:Person {name: $name}) RETURN p",
"parameters": {"name": "Alice"},
"database": "neo4j",
"success": false,
"error": {
"code": "Neo.ClientError.Statement.SyntaxError",
"message": "Invalid input 'P': expected whitespace, comment or ')' (line 1, column 8)",
"classification": "ClientError"
}
}
Transaction Support¶
Profilis tracks transaction operations and their context:
with driver.session() as session:
with session.begin_transaction() as tx:
# These operations will be tracked within the transaction context
tx.run("CREATE (p:Person {name: 'Alice'})")
tx.run("CREATE (p:Person {name: 'Bob'})")
# Transaction commit/rollback will also be tracked
Integration with Flask¶
from flask import Flask
from profilis.flask.adapter import ProfilisFlask
from profilis.neo4j.instrumentation import Neo4jConfig, instrument_neo4j_module
from profilis.core.emitter import Emitter
import neo4j
from neo4j import GraphDatabase
app = Flask(__name__)
# Setup Profilis
exporter = JSONLExporter(dir="./logs")
collector = AsyncCollector(exporter)
emitter = Emitter(collector)
profilis = ProfilisFlask(app, collector=collector)
# Setup Neo4j with instrumentation
config = Neo4jConfig(vendor_label="neo4j", preview_len=200, redact_cypher=True)
instrument_neo4j_module(neo4j, emitter, config)
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
@app.route('/people')
def get_people():
with driver.session() as session:
result = session.run("MATCH (p:Person) RETURN p LIMIT 10")
people = [record["p"] for record in result]
return {"people": people}
Advanced Features¶
Custom Instrumentation¶
You can instrument specific sessions or transactions:
from profilis.neo4j.instrumentation import instrument_neo4j_session
# Instrument a specific session
with driver.session() as session:
instrument_neo4j_session(session, emitter, config)
result = session.run("MATCH (p:Person) RETURN p")
Tracing Integration¶
Profilis provides tracing context support:
from profilis.runtime.context import get_current_parent_span_id
# Get current tracing context
parent_span_id = get_current_parent_span_id()
# Use in your application's tracing system
Best Practices¶
- Parameter Redaction: Always redact sensitive parameters in production
- Query Optimization: Monitor slow queries and optimize Cypher patterns
- Transaction Management: Use transactions for related operations
- Index Monitoring: Track index usage and creation patterns
- Error Monitoring: Monitor error patterns to identify issues
Troubleshooting¶
Common Issues¶
- Missing Events: Ensure instrumentation is properly applied to the driver
- Performance Impact: Profilis adds minimal overhead (~15µs per operation)
- Memory Usage: Large query previews may increase memory usage
Debug Mode¶
Enable debug logging to troubleshoot instrumentation:
import logging
logging.basicConfig(level=logging.DEBUG)
Examples¶
See the complete examples in the repository:
- examples/example_sync_graphdb.py
- Synchronous Neo4j usage
- examples/example_async_graphdb.py
- Asynchronous Neo4j usage