django-patterns by affaan-m/everything-claude-code
npx skills add https://github.com/affaan-m/everything-claude-code --skill django-patterns适用于可扩展、可维护应用程序的生产级 Django 架构模式。
myproject/
├── config/
│ ├── __init__.py
│ ├── settings/
│ │ ├── __init__.py
│ │ ├── base.py # 基础设置
│ │ ├── development.py # 开发环境设置
│ │ ├── production.py # 生产环境设置
│ │ └── test.py # 测试环境设置
│ ├── urls.py
│ ├── wsgi.py
│ └── asgi.py
├── manage.py
└── apps/
├── __init__.py
├── users/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── serializers.py
│ ├── urls.py
│ ├── permissions.py
│ ├── filters.py
│ ├── services.py
│ └── tests/
└── products/
└── ...
# config/settings/base.py
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent.parent
SECRET_KEY = env('DJANGO_SECRET_KEY')
DEBUG = False
ALLOWED_HOSTS = []
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'rest_framework.authtoken',
'corsheaders',
# 本地应用
'apps.users',
'apps.products',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'whitenoise.middleware.WhiteNoiseMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'config.urls'
WSGI_APPLICATION = 'config.wsgi.application'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': env('DB_NAME'),
'USER': env('DB_USER'),
'PASSWORD': env('DB_PASSWORD'),
'HOST': env('DB_HOST'),
'PORT': env('DB_PORT', default='5432'),
}
}
# config/settings/development.py
from .base import *
DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']
DATABASES['default']['NAME'] = 'myproject_dev'
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
# config/settings/production.py
from .base import *
DEBUG = False
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS')
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# 日志记录
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'WARNING',
'class': 'logging.FileHandler',
'filename': '/var/log/django/django.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': True,
},
},
}
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import MinValueValidator, MaxValueValidator
class User(AbstractUser):
"""扩展 AbstractUser 的自定义用户模型。"""
email = models.EmailField(unique=True)
phone = models.CharField(max_length=20, blank=True)
birth_date = models.DateField(null=True, blank=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class Meta:
db_table = 'users'
verbose_name = 'user'
verbose_name_plural = 'users'
ordering = ['-date_joined']
def __str__(self):
return self.email
def get_full_name(self):
return f"{self.first_name} {self.last_name}".strip()
class Product(models.Model):
"""具有适当字段配置的产品模型。"""
name = models.CharField(max_length=200)
slug = models.SlugField(unique=True, max_length=250)
description = models.TextField(blank=True)
price = models.DecimalField(
max_digits=10,
decimal_places=2,
validators=[MinValueValidator(0)]
)
stock = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
category = models.ForeignKey(
'Category',
on_delete=models.CASCADE,
related_name='products'
)
tags = models.ManyToManyField('Tag', blank=True, related_name='products')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'products'
ordering = ['-created_at']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'is_active']),
]
constraints = [
models.CheckConstraint(
check=models.Q(price__gte=0),
name='price_non_negative'
)
]
def __str__(self):
return self.name
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)
from django.db import models
class ProductQuerySet(models.QuerySet):
"""Product 模型的自定义 QuerySet。"""
def active(self):
"""仅返回活跃产品。"""
return self.filter(is_active=True)
def with_category(self):
"""选择关联的类别以避免 N+1 查询。"""
return self.select_related('category')
def with_tags(self):
"""为多对多关系预取标签。"""
return self.prefetch_related('tags')
def in_stock(self):
"""返回库存大于 0 的产品。"""
return self.filter(stock__gt=0)
def search(self, query):
"""通过名称或描述搜索产品。"""
return self.filter(
models.Q(name__icontains=query) |
models.Q(description__icontains=query)
)
class Product(models.Model):
# ... 字段 ...
objects = ProductQuerySet.as_manager() # 使用自定义 QuerySet
# 用法
Product.objects.active().with_category().in_stock()
class ProductManager(models.Manager):
"""复杂查询的自定义管理器。"""
def get_or_none(self, **kwargs):
"""返回对象或 None 而不是 DoesNotExist。"""
try:
return self.get(**kwargs)
except self.model.DoesNotExist:
return None
def create_with_tags(self, name, price, tag_names):
"""创建带有关联标签的产品。"""
product = self.create(name=name, price=price)
tags = [Tag.objects.get_or_create(name=name)[0] for name in tag_names]
product.tags.set(tags)
return product
def bulk_update_stock(self, product_ids, quantity):
"""批量更新多个产品的库存。"""
return self.filter(id__in=product_ids).update(stock=quantity)
# 在模型中
class Product(models.Model):
# ... 字段 ...
custom = ProductManager()
from rest_framework import serializers
from django.contrib.auth.password_validation import validate_password
from .models import Product, User
class ProductSerializer(serializers.ModelSerializer):
"""Product 模型的序列化器。"""
category_name = serializers.CharField(source='category.name', read_only=True)
average_rating = serializers.FloatField(read_only=True)
discount_price = serializers.SerializerMethodField()
class Meta:
model = Product
fields = [
'id', 'name', 'slug', 'description', 'price',
'discount_price', 'stock', 'category_name',
'average_rating', 'created_at'
]
read_only_fields = ['id', 'slug', 'created_at']
def get_discount_price(self, obj):
"""如果适用,计算折扣价格。"""
if hasattr(obj, 'discount') and obj.discount:
return obj.price * (1 - obj.discount.percent / 100)
return obj.price
def validate_price(self, value):
"""确保价格非负。"""
if value < 0:
raise serializers.ValidationError("价格不能为负数。")
return value
class ProductCreateSerializer(serializers.ModelSerializer):
"""创建产品的序列化器。"""
class Meta:
model = Product
fields = ['name', 'description', 'price', 'stock', 'category']
def validate(self, data):
"""多个字段的自定义验证。"""
if data['price'] > 10000 and data['stock'] > 100:
raise serializers.ValidationError(
"不能同时拥有高价值产品和大库存。"
)
return data
class UserRegistrationSerializer(serializers.ModelSerializer):
"""用户注册序列化器。"""
password = serializers.CharField(
write_only=True,
required=True,
validators=[validate_password],
style={'input_type': 'password'}
)
password_confirm = serializers.CharField(write_only=True, style={'input_type': 'password'})
class Meta:
model = User
fields = ['email', 'username', 'password', 'password_confirm']
def validate(self, data):
"""验证密码是否匹配。"""
if data['password'] != data['password_confirm']:
raise serializers.ValidationError({
"password_confirm": "密码字段不匹配。"
})
return data
def create(self, validated_data):
"""使用哈希密码创建用户。"""
validated_data.pop('password_confirm')
password = validated_data.pop('password')
user = User.objects.create(**validated_data)
user.set_password(password)
user.save()
return user
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from django_filters.rest_framework import DjangoFilterBackend
from .models import Product
from .serializers import ProductSerializer, ProductCreateSerializer
from .permissions import IsOwnerOrReadOnly
from .filters import ProductFilter
from .services import ProductService
class ProductViewSet(viewsets.ModelViewSet):
"""Product 模型的 ViewSet。"""
queryset = Product.objects.select_related('category').prefetch_related('tags')
permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_class = ProductFilter
search_fields = ['name', 'description']
ordering_fields = ['price', 'created_at', 'name']
ordering = ['-created_at']
def get_serializer_class(self):
"""根据操作返回适当的序列化器。"""
if self.action == 'create':
return ProductCreateSerializer
return ProductSerializer
def perform_create(self, serializer):
"""使用用户上下文保存。"""
serializer.save(created_by=self.request.user)
@action(detail=False, methods=['get'])
def featured(self, request):
"""返回特色产品。"""
featured = self.queryset.filter(is_featured=True)[:10]
serializer = self.get_serializer(featured, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def purchase(self, request, pk=None):
"""购买产品。"""
product = self.get_object()
service = ProductService()
result = service.purchase(product, request.user)
return Response(result, status=status.HTTP_201_CREATED)
@action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
def my_products(self, request):
"""返回当前用户创建的产品。"""
products = self.queryset.filter(created_by=request.user)
page = self.paginate_queryset(products)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_to_cart(request):
"""将产品添加到用户购物车。"""
product_id = request.data.get('product_id')
quantity = request.data.get('quantity', 1)
try:
product = Product.objects.get(id=product_id)
except Product.DoesNotExist:
return Response(
{'error': '产品未找到'},
status=status.HTTP_404_NOT_FOUND
)
cart, _ = Cart.objects.get_or_create(user=request.user)
CartItem.objects.create(
cart=cart,
product=product,
quantity=quantity
)
return Response({'message': '已添加到购物车'}, status=status.HTTP_201_CREATED)
# apps/orders/services.py
from typing import Optional
from django.db import transaction
from .models import Order, OrderItem
class OrderService:
"""订单相关业务逻辑的服务层。"""
@staticmethod
@transaction.atomic
def create_order(user, cart: Cart) -> Order:
"""从购物车创建订单。"""
order = Order.objects.create(
user=user,
total_price=cart.total_price
)
for item in cart.items.all():
OrderItem.objects.create(
order=order,
product=item.product,
quantity=item.quantity,
price=item.product.price
)
# 清空购物车
cart.items.all().delete()
return order
@staticmethod
def process_payment(order: Order, payment_data: dict) -> bool:
"""处理订单支付。"""
# 与支付网关集成
payment = PaymentGateway.charge(
amount=order.total_price,
token=payment_data['token']
)
if payment.success:
order.status = Order.Status.PAID
order.save()
# 发送确认邮件
OrderService.send_confirmation_email(order)
return True
return False
@staticmethod
def send_confirmation_email(order: Order):
"""发送订单确认邮件。"""
# 邮件发送逻辑
pass
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
@method_decorator(cache_page(60 * 15), name='dispatch') # 15 分钟
class ProductListView(generic.ListView):
model = Product
template_name = 'products/list.html'
context_object_name = 'products'
{% load cache %}
{% cache 500 sidebar %}
... 昂贵的侧边栏内容 ...
{% endcache %}
from django.core.cache import cache
def get_featured_products():
"""获取特色产品并使用缓存。"""
cache_key = 'featured_products'
products = cache.get(cache_key)
if products is None:
products = list(Product.objects.filter(is_featured=True))
cache.set(cache_key, products, timeout=60 * 15) # 15 分钟
return products
from django.core.cache import cache
def get_popular_categories():
cache_key = 'popular_categories'
categories = cache.get(cache_key)
if categories is None:
categories = list(Category.objects.annotate(
product_count=Count('products')
).filter(product_count__gt=10).order_by('-product_count')[:20])
cache.set(cache_key, categories, timeout=60 * 60) # 1 小时
return categories
# apps/users/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .models import Profile
User = get_user_model()
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""创建用户时创建个人资料。"""
if created:
Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
"""保存用户时保存个人资料。"""
instance.profile.save()
# apps/users/apps.py
from django.apps import AppConfig
class UsersConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.users'
def ready(self):
"""应用准备就绪时导入信号。"""
import apps.users.signals
# middleware/active_user_middleware.py
import time
from django.utils.deprecation import MiddlewareMixin
class ActiveUserMiddleware(MiddlewareMixin):
"""跟踪活跃用户的中间件。"""
def process_request(self, request):
"""处理传入请求。"""
if request.user.is_authenticated:
# 更新最后活跃时间
request.user.last_active = timezone.now()
request.user.save(update_fields=['last_active'])
class RequestLoggingMiddleware(MiddlewareMixin):
"""记录请求的中间件。"""
def process_request(self, request):
"""记录请求开始时间。"""
request.start_time = time.time()
def process_response(self, request, response):
"""记录请求持续时间。"""
if hasattr(request, 'start_time'):
duration = time.time() - request.start_time
logger.info(f'{request.method} {request.path} - {response.status_code} - {duration:.3f}s')
return response
# 不好 - N+1 查询
products = Product.objects.all()
for product in products:
print(product.category.name) # 每个产品单独查询
# 好 - 使用 select_related 的单个查询
products = Product.objects.select_related('category').all()
for product in products:
print(product.category.name)
# 好 - 多对多关系的预取
products = Product.objects.prefetch_related('tags').all()
for product in products:
for tag in product.tags.all():
print(tag.name)
class Product(models.Model):
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(unique=True)
category = models.ForeignKey('Category', on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
models.Index(fields=['name']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'created_at']),
]
# 批量创建
Product.objects.bulk_create([
Product(name=f'Product {i}', price=10.00)
for i in range(1000)
])
# 批量更新
products = Product.objects.all()[:100]
for product in products:
product.is_active = True
Product.objects.bulk_update(products, ['is_active'])
# 批量删除
Product.objects.filter(stock=0).delete()
| 模式 | 描述 |
|---|---|
| 拆分设置 | 分离开发/生产/测试设置 |
| 自定义 QuerySet | 可重用的查询方法 |
| 服务层 | 业务逻辑分离 |
| ViewSet | REST API 端点 |
| 序列化器验证 | 请求/响应转换 |
| select_related | 外键优化 |
| prefetch_related | 多对多优化 |
| 缓存优先 | 缓存昂贵操作 |
| 信号 | 事件驱动操作 |
| 中间件 | 请求/响应处理 |
记住:Django 提供了许多快捷方式,但对于生产应用程序,结构和组织比简洁的代码更重要。为可维护性而构建。
每周安装数
812
仓库
GitHub 星标数
69.1K
首次出现时间
2026年2月1日
安全审计
安装于
opencode681
codex670
gemini-cli647
github-copilot607
cursor574
claude-code573
Production-grade Django architecture patterns for scalable, maintainable applications.
myproject/
├── config/
│ ├── __init__.py
│ ├── settings/
│ │ ├── __init__.py
│ │ ├── base.py # Base settings
│ │ ├── development.py # Dev settings
│ │ ├── production.py # Production settings
│ │ └── test.py # Test settings
│ ├── urls.py
│ ├── wsgi.py
│ └── asgi.py
├── manage.py
└── apps/
├── __init__.py
├── users/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── serializers.py
│ ├── urls.py
│ ├── permissions.py
│ ├── filters.py
│ ├── services.py
│ └── tests/
└── products/
└── ...
# config/settings/base.py
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent.parent
SECRET_KEY = env('DJANGO_SECRET_KEY')
DEBUG = False
ALLOWED_HOSTS = []
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'rest_framework.authtoken',
'corsheaders',
# Local apps
'apps.users',
'apps.products',
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'whitenoise.middleware.WhiteNoiseMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'config.urls'
WSGI_APPLICATION = 'config.wsgi.application'
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': env('DB_NAME'),
'USER': env('DB_USER'),
'PASSWORD': env('DB_PASSWORD'),
'HOST': env('DB_HOST'),
'PORT': env('DB_PORT', default='5432'),
}
}
# config/settings/development.py
from .base import *
DEBUG = True
ALLOWED_HOSTS = ['localhost', '127.0.0.1']
DATABASES['default']['NAME'] = 'myproject_dev'
INSTALLED_APPS += ['debug_toolbar']
MIDDLEWARE += ['debug_toolbar.middleware.DebugToolbarMiddleware']
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
# config/settings/production.py
from .base import *
DEBUG = False
ALLOWED_HOSTS = env.list('ALLOWED_HOSTS')
SECURE_SSL_REDIRECT = True
SESSION_COOKIE_SECURE = True
CSRF_COOKIE_SECURE = True
SECURE_HSTS_SECONDS = 31536000
SECURE_HSTS_INCLUDE_SUBDOMAINS = True
SECURE_HSTS_PRELOAD = True
# Logging
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'WARNING',
'class': 'logging.FileHandler',
'filename': '/var/log/django/django.log',
},
},
'loggers': {
'django': {
'handlers': ['file'],
'level': 'WARNING',
'propagate': True,
},
},
}
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import MinValueValidator, MaxValueValidator
class User(AbstractUser):
"""Custom user model extending AbstractUser."""
email = models.EmailField(unique=True)
phone = models.CharField(max_length=20, blank=True)
birth_date = models.DateField(null=True, blank=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['username']
class Meta:
db_table = 'users'
verbose_name = 'user'
verbose_name_plural = 'users'
ordering = ['-date_joined']
def __str__(self):
return self.email
def get_full_name(self):
return f"{self.first_name} {self.last_name}".strip()
class Product(models.Model):
"""Product model with proper field configuration."""
name = models.CharField(max_length=200)
slug = models.SlugField(unique=True, max_length=250)
description = models.TextField(blank=True)
price = models.DecimalField(
max_digits=10,
decimal_places=2,
validators=[MinValueValidator(0)]
)
stock = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
category = models.ForeignKey(
'Category',
on_delete=models.CASCADE,
related_name='products'
)
tags = models.ManyToManyField('Tag', blank=True, related_name='products')
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'products'
ordering = ['-created_at']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'is_active']),
]
constraints = [
models.CheckConstraint(
check=models.Q(price__gte=0),
name='price_non_negative'
)
]
def __str__(self):
return self.name
def save(self, *args, **kwargs):
if not self.slug:
self.slug = slugify(self.name)
super().save(*args, **kwargs)
from django.db import models
class ProductQuerySet(models.QuerySet):
"""Custom QuerySet for Product model."""
def active(self):
"""Return only active products."""
return self.filter(is_active=True)
def with_category(self):
"""Select related category to avoid N+1 queries."""
return self.select_related('category')
def with_tags(self):
"""Prefetch tags for many-to-many relationship."""
return self.prefetch_related('tags')
def in_stock(self):
"""Return products with stock > 0."""
return self.filter(stock__gt=0)
def search(self, query):
"""Search products by name or description."""
return self.filter(
models.Q(name__icontains=query) |
models.Q(description__icontains=query)
)
class Product(models.Model):
# ... fields ...
objects = ProductQuerySet.as_manager() # Use custom QuerySet
# Usage
Product.objects.active().with_category().in_stock()
class ProductManager(models.Manager):
"""Custom manager for complex queries."""
def get_or_none(self, **kwargs):
"""Return object or None instead of DoesNotExist."""
try:
return self.get(**kwargs)
except self.model.DoesNotExist:
return None
def create_with_tags(self, name, price, tag_names):
"""Create product with associated tags."""
product = self.create(name=name, price=price)
tags = [Tag.objects.get_or_create(name=name)[0] for name in tag_names]
product.tags.set(tags)
return product
def bulk_update_stock(self, product_ids, quantity):
"""Bulk update stock for multiple products."""
return self.filter(id__in=product_ids).update(stock=quantity)
# In model
class Product(models.Model):
# ... fields ...
custom = ProductManager()
from rest_framework import serializers
from django.contrib.auth.password_validation import validate_password
from .models import Product, User
class ProductSerializer(serializers.ModelSerializer):
"""Serializer for Product model."""
category_name = serializers.CharField(source='category.name', read_only=True)
average_rating = serializers.FloatField(read_only=True)
discount_price = serializers.SerializerMethodField()
class Meta:
model = Product
fields = [
'id', 'name', 'slug', 'description', 'price',
'discount_price', 'stock', 'category_name',
'average_rating', 'created_at'
]
read_only_fields = ['id', 'slug', 'created_at']
def get_discount_price(self, obj):
"""Calculate discount price if applicable."""
if hasattr(obj, 'discount') and obj.discount:
return obj.price * (1 - obj.discount.percent / 100)
return obj.price
def validate_price(self, value):
"""Ensure price is non-negative."""
if value < 0:
raise serializers.ValidationError("Price cannot be negative.")
return value
class ProductCreateSerializer(serializers.ModelSerializer):
"""Serializer for creating products."""
class Meta:
model = Product
fields = ['name', 'description', 'price', 'stock', 'category']
def validate(self, data):
"""Custom validation for multiple fields."""
if data['price'] > 10000 and data['stock'] > 100:
raise serializers.ValidationError(
"Cannot have high-value products with large stock."
)
return data
class UserRegistrationSerializer(serializers.ModelSerializer):
"""Serializer for user registration."""
password = serializers.CharField(
write_only=True,
required=True,
validators=[validate_password],
style={'input_type': 'password'}
)
password_confirm = serializers.CharField(write_only=True, style={'input_type': 'password'})
class Meta:
model = User
fields = ['email', 'username', 'password', 'password_confirm']
def validate(self, data):
"""Validate passwords match."""
if data['password'] != data['password_confirm']:
raise serializers.ValidationError({
"password_confirm": "Password fields didn't match."
})
return data
def create(self, validated_data):
"""Create user with hashed password."""
validated_data.pop('password_confirm')
password = validated_data.pop('password')
user = User.objects.create(**validated_data)
user.set_password(password)
user.save()
return user
from rest_framework import viewsets, status, filters
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated, IsAdminUser
from django_filters.rest_framework import DjangoFilterBackend
from .models import Product
from .serializers import ProductSerializer, ProductCreateSerializer
from .permissions import IsOwnerOrReadOnly
from .filters import ProductFilter
from .services import ProductService
class ProductViewSet(viewsets.ModelViewSet):
"""ViewSet for Product model."""
queryset = Product.objects.select_related('category').prefetch_related('tags')
permission_classes = [IsAuthenticated, IsOwnerOrReadOnly]
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
filterset_class = ProductFilter
search_fields = ['name', 'description']
ordering_fields = ['price', 'created_at', 'name']
ordering = ['-created_at']
def get_serializer_class(self):
"""Return appropriate serializer based on action."""
if self.action == 'create':
return ProductCreateSerializer
return ProductSerializer
def perform_create(self, serializer):
"""Save with user context."""
serializer.save(created_by=self.request.user)
@action(detail=False, methods=['get'])
def featured(self, request):
"""Return featured products."""
featured = self.queryset.filter(is_featured=True)[:10]
serializer = self.get_serializer(featured, many=True)
return Response(serializer.data)
@action(detail=True, methods=['post'])
def purchase(self, request, pk=None):
"""Purchase a product."""
product = self.get_object()
service = ProductService()
result = service.purchase(product, request.user)
return Response(result, status=status.HTTP_201_CREATED)
@action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
def my_products(self, request):
"""Return products created by current user."""
products = self.queryset.filter(created_by=request.user)
page = self.paginate_queryset(products)
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
from rest_framework.decorators import api_view, permission_classes
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_to_cart(request):
"""Add product to user cart."""
product_id = request.data.get('product_id')
quantity = request.data.get('quantity', 1)
try:
product = Product.objects.get(id=product_id)
except Product.DoesNotExist:
return Response(
{'error': 'Product not found'},
status=status.HTTP_404_NOT_FOUND
)
cart, _ = Cart.objects.get_or_create(user=request.user)
CartItem.objects.create(
cart=cart,
product=product,
quantity=quantity
)
return Response({'message': 'Added to cart'}, status=status.HTTP_201_CREATED)
# apps/orders/services.py
from typing import Optional
from django.db import transaction
from .models import Order, OrderItem
class OrderService:
"""Service layer for order-related business logic."""
@staticmethod
@transaction.atomic
def create_order(user, cart: Cart) -> Order:
"""Create order from cart."""
order = Order.objects.create(
user=user,
total_price=cart.total_price
)
for item in cart.items.all():
OrderItem.objects.create(
order=order,
product=item.product,
quantity=item.quantity,
price=item.product.price
)
# Clear cart
cart.items.all().delete()
return order
@staticmethod
def process_payment(order: Order, payment_data: dict) -> bool:
"""Process payment for order."""
# Integration with payment gateway
payment = PaymentGateway.charge(
amount=order.total_price,
token=payment_data['token']
)
if payment.success:
order.status = Order.Status.PAID
order.save()
# Send confirmation email
OrderService.send_confirmation_email(order)
return True
return False
@staticmethod
def send_confirmation_email(order: Order):
"""Send order confirmation email."""
# Email sending logic
pass
from django.views.decorators.cache import cache_page
from django.utils.decorators import method_decorator
@method_decorator(cache_page(60 * 15), name='dispatch') # 15 minutes
class ProductListView(generic.ListView):
model = Product
template_name = 'products/list.html'
context_object_name = 'products'
{% load cache %}
{% cache 500 sidebar %}
... expensive sidebar content ...
{% endcache %}
from django.core.cache import cache
def get_featured_products():
"""Get featured products with caching."""
cache_key = 'featured_products'
products = cache.get(cache_key)
if products is None:
products = list(Product.objects.filter(is_featured=True))
cache.set(cache_key, products, timeout=60 * 15) # 15 minutes
return products
from django.core.cache import cache
def get_popular_categories():
cache_key = 'popular_categories'
categories = cache.get(cache_key)
if categories is None:
categories = list(Category.objects.annotate(
product_count=Count('products')
).filter(product_count__gt=10).order_by('-product_count')[:20])
cache.set(cache_key, categories, timeout=60 * 60) # 1 hour
return categories
# apps/users/signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.contrib.auth import get_user_model
from .models import Profile
User = get_user_model()
@receiver(post_save, sender=User)
def create_user_profile(sender, instance, created, **kwargs):
"""Create profile when user is created."""
if created:
Profile.objects.create(user=instance)
@receiver(post_save, sender=User)
def save_user_profile(sender, instance, **kwargs):
"""Save profile when user is saved."""
instance.profile.save()
# apps/users/apps.py
from django.apps import AppConfig
class UsersConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'apps.users'
def ready(self):
"""Import signals when app is ready."""
import apps.users.signals
# middleware/active_user_middleware.py
import time
from django.utils.deprecation import MiddlewareMixin
class ActiveUserMiddleware(MiddlewareMixin):
"""Middleware to track active users."""
def process_request(self, request):
"""Process incoming request."""
if request.user.is_authenticated:
# Update last active time
request.user.last_active = timezone.now()
request.user.save(update_fields=['last_active'])
class RequestLoggingMiddleware(MiddlewareMixin):
"""Middleware for logging requests."""
def process_request(self, request):
"""Log request start time."""
request.start_time = time.time()
def process_response(self, request, response):
"""Log request duration."""
if hasattr(request, 'start_time'):
duration = time.time() - request.start_time
logger.info(f'{request.method} {request.path} - {response.status_code} - {duration:.3f}s')
return response
# Bad - N+1 queries
products = Product.objects.all()
for product in products:
print(product.category.name) # Separate query for each product
# Good - Single query with select_related
products = Product.objects.select_related('category').all()
for product in products:
print(product.category.name)
# Good - Prefetch for many-to-many
products = Product.objects.prefetch_related('tags').all()
for product in products:
for tag in product.tags.all():
print(tag.name)
class Product(models.Model):
name = models.CharField(max_length=200, db_index=True)
slug = models.SlugField(unique=True)
category = models.ForeignKey('Category', on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [
models.Index(fields=['name']),
models.Index(fields=['-created_at']),
models.Index(fields=['category', 'created_at']),
]
# Bulk create
Product.objects.bulk_create([
Product(name=f'Product {i}', price=10.00)
for i in range(1000)
])
# Bulk update
products = Product.objects.all()[:100]
for product in products:
product.is_active = True
Product.objects.bulk_update(products, ['is_active'])
# Bulk delete
Product.objects.filter(stock=0).delete()
| Pattern | Description |
|---|---|
| Split settings | Separate dev/prod/test settings |
| Custom QuerySet | Reusable query methods |
| Service Layer | Business logic separation |
| ViewSet | REST API endpoints |
| Serializer validation | Request/response transformation |
| select_related | Foreign key optimization |
| prefetch_related | Many-to-many optimization |
| Cache first | Cache expensive operations |
| Signals | Event-driven actions |
| Middleware | Request/response processing |
Remember: Django provides many shortcuts, but for production applications, structure and organization matter more than concise code. Build for maintainability.
Weekly Installs
812
Repository
GitHub Stars
69.1K
First Seen
Feb 1, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykWarn
Installed on
opencode681
codex670
gemini-cli647
github-copilot607
cursor574
claude-code573
React 组合模式指南:Vercel 组件架构最佳实践,提升代码可维护性
102,200 周安装
AI智能体长期记忆系统 - 精英级架构,融合6种方法,永不丢失上下文
1,200 周安装
AI新闻播客制作技能:实时新闻转对话式播客脚本与音频生成
1,200 周安装
Word文档处理器:DOCX创建、编辑、分析与修订痕迹处理全指南 | 自动化办公解决方案
1,200 周安装
React Router 框架模式指南:全栈开发、文件路由、数据加载与渲染策略
1,200 周安装
Nano Banana AI 图像生成工具:使用 Gemini 3 Pro 生成与编辑高分辨率图像
1,200 周安装
SVG Logo Designer - AI 驱动的专业矢量标识设计工具,生成可缩放品牌标识
1,200 周安装