Task Management API
RESTful API for task management with authentication, CRUD operations, and real-time updates
Task Management API
A production-ready RESTful API built with FastAPI that provides comprehensive task management capabilities including user authentication, CRUD operations, real-time updates, and advanced filtering.
OSpec Definition
ospec_version: "1.0.0"
id: "task-management-api"
name: "Task Management API"
description: "RESTful API for task management with authentication, real-time updates, and advanced querying capabilities"
outcome_type: "api"
acceptance:
http_endpoints:
# Authentication endpoints
- path: "/auth/register"
method: "POST"
status: 201
request_schema: "schemas/user_register.json"
response_schema: "schemas/user_response.json"
- path: "/auth/login"
method: "POST"
status: 200
request_schema: "schemas/user_login.json"
response_schema: "schemas/auth_response.json"
- path: "/auth/refresh"
method: "POST"
status: 200
auth_required: true
response_schema: "schemas/auth_response.json"
# Task management endpoints
- path: "/tasks"
method: "GET"
status: 200
auth_required: true
response_schema: "schemas/task_list.json"
query_params:
- name: "status"
type: "string"
enum: ["pending", "in_progress", "completed"]
- name: "priority"
type: "string"
enum: ["low", "medium", "high", "urgent"]
- name: "assigned_to"
type: "integer"
- name: "due_date_from"
type: "string"
format: "date"
- name: "due_date_to"
type: "string"
format: "date"
- name: "limit"
type: "integer"
default: 20
- name: "offset"
type: "integer"
default: 0
- path: "/tasks"
method: "POST"
status: 201
auth_required: true
request_schema: "schemas/task_create.json"
response_schema: "schemas/task_response.json"
- path: "/tasks/{task_id}"
method: "GET"
status: 200
auth_required: true
response_schema: "schemas/task_response.json"
- path: "/tasks/{task_id}"
method: "PUT"
status: 200
auth_required: true
request_schema: "schemas/task_update.json"
response_schema: "schemas/task_response.json"
- path: "/tasks/{task_id}"
method: "DELETE"
status: 204
auth_required: true
- path: "/tasks/{task_id}/assign"
method: "POST"
status: 200
auth_required: true
request_schema: "schemas/task_assign.json"
response_schema: "schemas/task_response.json"
# Project endpoints
- path: "/projects"
method: "GET"
status: 200
auth_required: true
response_schema: "schemas/project_list.json"
- path: "/projects"
method: "POST"
status: 201
auth_required: true
request_schema: "schemas/project_create.json"
response_schema: "schemas/project_response.json"
# Health and monitoring
- path: "/health"
method: "GET"
status: 200
response_contains: ["status", "database", "redis"]
- path: "/metrics"
method: "GET"
status: 200
response_format: "prometheus"
performance:
response_time_p95_ms: 200
response_time_p99_ms: 500
throughput_rps: 1000
concurrent_users: 500
database_query_time_p95_ms: 50
cache_hit_ratio: 0.8
real_time:
websocket_endpoint: "/ws"
events:
- "task_created"
- "task_updated"
- "task_assigned"
- "task_completed"
max_connections: 10000
tests:
- file: "tests/unit/**/*.py"
type: "unit"
coverage_threshold: 0.85
- file: "tests/integration/**/*.py"
type: "integration"
coverage_threshold: 0.75
- file: "tests/e2e/**/*.py"
type: "e2e"
coverage_threshold: 0.6
stack:
backend: "FastAPI 0.104+"
database: "PostgreSQL 15+"
cache: "Redis 7+"
auth: "JWT with refresh tokens"
validation: "Pydantic v2"
orm: "SQLAlchemy 2.0"
migration: "Alembic"
task_queue: "Celery with Redis"
websockets: "FastAPI WebSockets"
documentation: "OpenAPI/Swagger"
containerization: "Docker"
monitoring: "Prometheus + Grafana"
guardrails:
tests_required: true
min_test_coverage: 0.8
lint: true
type_check: true
security_scan: true
dependency_check: true
license_whitelist: ["MIT", "Apache-2.0", "BSD-3-Clause"]
human_approval_required: ["database_migration", "production_deploy"]
prompts:
implementer: |
You are building a production-ready REST API with FastAPI.
Key requirements:
- Follow REST principles strictly
- Use proper HTTP status codes
- Implement comprehensive error handling
- Add request/response validation with Pydantic
- Include proper logging with correlation IDs
- Implement rate limiting and security headers
- Use database transactions appropriately
- Add comprehensive API documentation
- Follow Python best practices (PEP 8, type hints)
- Implement proper async/await patterns
reviewer: |
Focus on these critical areas:
1. API Security: Authentication, authorization, input validation
2. Performance: Query optimization, caching, async operations
3. Reliability: Error handling, transaction management, idempotency
4. Standards Compliance: REST principles, OpenAPI spec accuracy
5. Code Quality: Type safety, error handling, logging
scripts:
setup: "scripts/setup.sh"
test: "scripts/test.sh"
lint: "scripts/lint.sh"
build: "scripts/build.sh"
migrate: "scripts/migrate.sh"
seed: "scripts/seed_data.sh"
dev: "scripts/dev.sh"
production: "scripts/production.sh"
secrets:
provider: "hashicorp-vault"
required:
- "DATABASE_URL"
- "REDIS_URL"
- "JWT_SECRET_KEY"
- "JWT_REFRESH_SECRET_KEY"
optional:
- "SENTRY_DSN"
- "SMTP_PASSWORD"
- "MONITORING_TOKEN"
metadata:
estimated_effort: "5-7 days"
complexity: "intermediate"
tags: ["rest-api", "fastapi", "postgresql", "redis", "jwt", "websockets"]
version: "1.0.0"
maintainers:
- name: "API Team"
email: "api-team@company.com"
Features
Core API Features
- ✅ RESTful Architecture: Proper REST principles with resource-based URLs
- ✅ JWT Authentication: Secure authentication with access and refresh tokens
- ✅ CRUD Operations: Complete Create, Read, Update, Delete for tasks and projects
- ✅ Advanced Filtering: Query tasks by status, priority, assignee, date ranges
- ✅ Pagination: Efficient pagination with limit/offset and cursor support
- ✅ Real-time Updates: WebSocket connections for live task updates
- ✅ Role-based Access: Different permissions for users, managers, and admins
Technical Features
- ✅ Async/Await: Full async implementation for maximum performance
- ✅ Database Transactions: ACID compliance with proper transaction management
- ✅ Caching Strategy: Multi-layer caching with Redis for performance
- ✅ Rate Limiting: API rate limiting to prevent abuse
- ✅ Input Validation: Comprehensive request/response validation
- ✅ Error Handling: Structured error responses with correlation IDs
- ✅ API Documentation: Interactive OpenAPI/Swagger documentation
- ✅ Monitoring: Prometheus metrics and health checks
API Architecture
Database Schema
-- Users table
CREATE TABLE users (
id SERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
username VARCHAR(50) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
first_name VARCHAR(100),
last_name VARCHAR(100),
role VARCHAR(20) DEFAULT 'user',
is_active BOOLEAN DEFAULT true,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Projects table
CREATE TABLE projects (
id SERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
description TEXT,
owner_id INTEGER REFERENCES users(id),
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Tasks table
CREATE TABLE tasks (
id SERIAL PRIMARY KEY,
title VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(20) DEFAULT 'pending',
priority VARCHAR(20) DEFAULT 'medium',
project_id INTEGER REFERENCES projects(id),
created_by INTEGER REFERENCES users(id),
assigned_to INTEGER REFERENCES users(id),
due_date TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
);
-- Task comments table
CREATE TABLE task_comments (
id SERIAL PRIMARY KEY,
task_id INTEGER REFERENCES tasks(id),
user_id INTEGER REFERENCES users(id),
comment TEXT NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- Indexes for performance
CREATE INDEX idx_tasks_status ON tasks(status);
CREATE INDEX idx_tasks_priority ON tasks(priority);
CREATE INDEX idx_tasks_assigned_to ON tasks(assigned_to);
CREATE INDEX idx_tasks_due_date ON tasks(due_date);
CREATE INDEX idx_tasks_project_id ON tasks(project_id);
Project Structure
task-management-api/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application setup
│ ├── config.py # Configuration management
│ ├── database.py # Database connection and session
│ ├── models/ # SQLAlchemy models
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── project.py
│ │ ├── task.py
│ │ └── comment.py
│ ├── schemas/ # Pydantic schemas
│ │ ├── __init__.py
│ │ ├── user.py
│ │ ├── project.py
│ │ ├── task.py
│ │ └── common.py
│ ├── routers/ # API route handlers
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── users.py
│ │ ├── projects.py
│ │ ├── tasks.py
│ │ └── websocket.py
│ ├── services/ # Business logic
│ │ ├── __init__.py
│ │ ├── auth_service.py
│ │ ├── task_service.py
│ │ ├── project_service.py
│ │ └── notification_service.py
│ ├── utils/ # Utility functions
│ │ ├── __init__.py
│ │ ├── auth.py
│ │ ├── cache.py
│ │ ├── pagination.py
│ │ └── validators.py
│ └── middleware/ # Custom middleware
│ ├── __init__.py
│ ├── auth.py
│ ├── cors.py
│ ├── rate_limit.py
│ └── logging.py
├── tests/
│ ├── unit/
│ ├── integration/
│ └── e2e/
├── migrations/ # Alembic database migrations
├── scripts/ # Deployment and utility scripts
├── docker/
├── docs/
├── requirements/
│ ├── base.txt
│ ├── development.txt
│ └── production.txt
├── Dockerfile
├── docker-compose.yml
├── pyproject.toml
└── README.md
Key Implementation Examples
Authentication Middleware
# app/middleware/auth.py
from fastapi import HTTPException, status, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from app.config import settings
from app.services.auth_service import AuthService
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
auth_service: AuthService = Depends()
):
"""Extract and validate JWT token to get current user."""
try:
payload = jwt.decode(
credentials.credentials,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM]
)
user_id: int = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user = await auth_service.get_user_by_id(user_id)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def require_role(required_role: str):
"""Decorator to require specific user role."""
def role_checker(current_user = Depends(get_current_user)):
if current_user.role != required_role and current_user.role != "admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return current_user
return role_checker
Task Service with Business Logic
# app/services/task_service.py
from typing import List, Optional
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from app.models.task import Task
from app.models.user import User
from app.schemas.task import TaskCreate, TaskUpdate, TaskFilter
from app.utils.cache import cache_service
from app.services.notification_service import NotificationService
class TaskService:
def __init__(self, db: Session):
self.db = db
self.notification_service = NotificationService()
async def create_task(self, task_data: TaskCreate, created_by_id: int) -> Task:
"""Create a new task with validation and notifications."""
# Validate project access
if task_data.project_id:
project = self.db.query(Project).filter(
Project.id == task_data.project_id
).first()
if not project:
raise ValueError("Project not found")
# Create task
db_task = Task(
**task_data.dict(),
created_by=created_by_id
)
self.db.add(db_task)
self.db.commit()
self.db.refresh(db_task)
# Send notifications
await self.notification_service.notify_task_created(db_task)
# Invalidate cache
await cache_service.delete_pattern(f"tasks:user:{created_by_id}:*")
return db_task
async def get_tasks(
self,
user_id: int,
filters: TaskFilter,
skip: int = 0,
limit: int = 20
) -> List[Task]:
"""Get filtered tasks with caching."""
# Try cache first
cache_key = f"tasks:user:{user_id}:{hash(str(filters))}:{skip}:{limit}"
cached_tasks = await cache_service.get(cache_key)
if cached_tasks:
return cached_tasks
# Build query
query = self.db.query(Task)
# Apply filters
if filters.status:
query = query.filter(Task.status == filters.status)
if filters.priority:
query = query.filter(Task.priority == filters.priority)
if filters.assigned_to:
query = query.filter(Task.assigned_to == filters.assigned_to)
if filters.project_id:
query = query.filter(Task.project_id == filters.project_id)
if filters.due_date_from:
query = query.filter(Task.due_date >= filters.due_date_from)
if filters.due_date_to:
query = query.filter(Task.due_date <= filters.due_date_to)
# Apply user access control
query = query.filter(
or_(
Task.created_by == user_id,
Task.assigned_to == user_id,
Task.project_id.in_(
self.db.query(Project.id).filter(Project.owner_id == user_id)
)
)
)
# Execute query
tasks = query.offset(skip).limit(limit).all()
# Cache results
await cache_service.set(cache_key, tasks, expire=300) # 5 minutes
return tasks
async def update_task(
self,
task_id: int,
task_data: TaskUpdate,
user_id: int
) -> Optional[Task]:
"""Update task with authorization and change tracking."""
task = self.db.query(Task).filter(Task.id == task_id).first()
if not task:
return None
# Check permissions
if not self._can_modify_task(task, user_id):
raise PermissionError("Insufficient permissions to modify task")
# Track changes for notifications
changes = {}
for field, value in task_data.dict(exclude_unset=True).items():
if getattr(task, field) != value:
changes[field] = {
'old': getattr(task, field),
'new': value
}
setattr(task, field, value)
self.db.commit()
self.db.refresh(task)
# Send notifications for important changes
if changes:
await self.notification_service.notify_task_updated(task, changes)
# Invalidate cache
await self._invalidate_task_cache(task_id)
return task
def _can_modify_task(self, task: Task, user_id: int) -> bool:
"""Check if user can modify the task."""
return (
task.created_by == user_id or
task.assigned_to == user_id or
task.project.owner_id == user_id
)
async def _invalidate_task_cache(self, task_id: int):
"""Invalidate all cache entries related to a task."""
await cache_service.delete_pattern(f"tasks:*")
await cache_service.delete(f"task:{task_id}")
WebSocket Real-time Updates
# app/routers/websocket.py
from fastapi import WebSocket, WebSocketDisconnect, Depends
from typing import List
import json
from app.utils.auth import verify_websocket_token
from app.services.notification_service import NotificationService
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
self.user_connections: dict = {}
async def connect(self, websocket: WebSocket, user_id: int):
await websocket.accept()
self.active_connections.append(websocket)
if user_id not in self.user_connections:
self.user_connections[user_id] = []
self.user_connections[user_id].append(websocket)
def disconnect(self, websocket: WebSocket, user_id: int):
self.active_connections.remove(websocket)
if user_id in self.user_connections:
self.user_connections[user_id].remove(websocket)
if not self.user_connections[user_id]:
del self.user_connections[user_id]
async def send_to_user(self, user_id: int, message: dict):
if user_id in self.user_connections:
for connection in self.user_connections[user_id]:
await connection.send_text(json.dumps(message))
async def broadcast_to_project(self, project_id: int, message: dict):
# Get all users in project and send message
# Implementation depends on your project membership logic
pass
manager = ConnectionManager()
@router.websocket("/ws")
async def websocket_endpoint(
websocket: WebSocket,
token: str,
notification_service: NotificationService = Depends()
):
# Verify token and get user
user = await verify_websocket_token(token)
if not user:
await websocket.close(code=4001, reason="Invalid token")
return
await manager.connect(websocket, user.id)
try:
while True:
# Keep connection alive
data = await websocket.receive_text()
# Handle ping/pong for connection health
if data == "ping":
await websocket.send_text("pong")
except WebSocketDisconnect:
manager.disconnect(websocket, user.id)
Performance Optimizations
Database Query Optimization
# app/models/task.py with optimized queries
from sqlalchemy import Index
from sqlalchemy.orm import relationship, selectinload
class Task(Base):
# ... model definition ...
@classmethod
def get_with_relations(cls, db: Session, task_id: int):
"""Optimized query with eager loading."""
return db.query(cls).options(
selectinload(cls.assignee),
selectinload(cls.creator),
selectinload(cls.project),
selectinload(cls.comments).selectinload(TaskComment.user)
).filter(cls.id == task_id).first()
# Database indexes for common queries
task_status_idx = Index('idx_task_status', Task.status)
task_assigned_to_idx = Index('idx_task_assigned_to', Task.assigned_to)
task_due_date_idx = Index('idx_task_due_date', Task.due_date)
task_compound_idx = Index(
'idx_task_compound',
Task.status,
Task.priority,
Task.assigned_to
)
Caching Strategy
# app/utils/cache.py
import redis
import json
import pickle
from typing import Any, Optional
from app.config import settings
class CacheService:
def __init__(self):
self.redis_client = redis.from_url(
settings.REDIS_URL,
encoding="utf-8",
decode_responses=False
)
async def get(self, key: str) -> Optional[Any]:
"""Get cached value."""
try:
value = self.redis_client.get(key)
if value:
return pickle.loads(value)
return None
except Exception as e:
logger.error(f"Cache get error: {e}")
return None
async def set(self, key: str, value: Any, expire: int = 3600) -> bool:
"""Set cached value with expiration."""
try:
serialized_value = pickle.dumps(value)
return self.redis_client.setex(key, expire, serialized_value)
except Exception as e:
logger.error(f"Cache set error: {e}")
return False
async def delete(self, key: str) -> bool:
"""Delete cached value."""
try:
return bool(self.redis_client.delete(key))
except Exception as e:
logger.error(f"Cache delete error: {e}")
return False
async def delete_pattern(self, pattern: str) -> int:
"""Delete all keys matching pattern."""
try:
keys = self.redis_client.keys(pattern)
if keys:
return self.redis_client.delete(*keys)
return 0
except Exception as e:
logger.error(f"Cache delete pattern error: {e}")
return 0
cache_service = CacheService()
API Testing Examples
Unit Tests
# tests/unit/test_task_service.py
import pytest
from unittest.mock import Mock, AsyncMock
from app.services.task_service import TaskService
from app.schemas.task import TaskCreate
from app.models.task import Task
class TestTaskService:
@pytest.fixture
def mock_db(self):
return Mock()
@pytest.fixture
def task_service(self, mock_db):
return TaskService(mock_db)
@pytest.mark.asyncio
async def test_create_task_success(self, task_service, mock_db):
# Arrange
task_data = TaskCreate(
title="Test Task",
description="Test Description",
priority="high"
)
created_by_id = 1
mock_task = Task(
id=1,
title=task_data.title,
description=task_data.description,
priority=task_data.priority,
created_by=created_by_id
)
mock_db.add.return_value = None
mock_db.commit.return_value = None
mock_db.refresh.return_value = None
# Mock notification service
task_service.notification_service = AsyncMock()
# Act
result = await task_service.create_task(task_data, created_by_id)
# Assert
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
mock_db.refresh.assert_called_once()
task_service.notification_service.notify_task_created.assert_called_once()
Integration Tests
# tests/integration/test_task_api.py
import pytest
from httpx import AsyncClient
from app.main import app
from app.database import get_db
from tests.conftest import override_get_db, test_db
@pytest.mark.asyncio
async def test_create_task_endpoint(test_db, authenticated_client):
# Create test data
task_data = {
"title": "Integration Test Task",
"description": "Test task creation via API",
"priority": "medium",
"status": "pending"
}
# Make request
response = await authenticated_client.post("/tasks", json=task_data)
# Assert response
assert response.status_code == 201
data = response.json()
assert data["title"] == task_data["title"]
assert data["description"] == task_data["description"]
assert data["priority"] == task_data["priority"]
assert data["status"] == task_data["status"]
assert "id" in data
assert "created_at" in data
@pytest.mark.asyncio
async def test_get_tasks_with_filters(test_db, authenticated_client):
# Create test tasks
await create_test_tasks(test_db)
# Test filtering by status
response = await authenticated_client.get("/tasks?status=pending")
assert response.status_code == 200
data = response.json()
assert len(data["tasks"]) > 0
assert all(task["status"] == "pending" for task in data["tasks"])
# Test filtering by priority
response = await authenticated_client.get("/tasks?priority=high")
assert response.status_code == 200
data = response.json()
assert all(task["priority"] == "high" for task in data["tasks"])
Performance Benchmarks
Expected performance characteristics:
- Response Time:
- P95: < 200ms for simple queries
- P99: < 500ms for complex queries
- Throughput: 1000+ requests per second
- Concurrent Users: 500+ simultaneous users
- Database: Query times < 50ms (P95)
- Cache Hit Ratio: > 80% for frequently accessed data
Security Features
- JWT Authentication: Secure token-based authentication
- Role-based Authorization: User, manager, and admin roles
- Input Validation: Comprehensive request validation with Pydantic
- Rate Limiting: API rate limiting to prevent abuse
- CORS Configuration: Proper cross-origin resource sharing setup
- Security Headers: Comprehensive security headers
- SQL Injection Prevention: Parameterized queries with SQLAlchemy
- Password Security: Bcrypt hashing with salt
Deployment
Docker Configuration
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements/production.txt .
RUN pip install --no-cache-dir -r production.txt
# Copy application code
COPY . .
# Create non-root user
RUN groupadd -r appuser && useradd -r -g appuser appuser
RUN chown -R appuser:appuser /app
USER appuser
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
CMD curl -f http://localhost:8000/health || exit 1
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Live Demo
Source Code
This comprehensive API example demonstrates how OSpec can generate a production-ready REST API with all the essential features for a real-world application.