Directus Backend Architecture by gumpen-app/directapp
npx skills add https://github.com/gumpen-app/directapp --skill 'Directus Backend Architecture'

此技能提供对 Directus 后端架构的深入专业知识,涵盖 API 端点扩展、钩子系统、服务层、流程与自动化、数据库操作、身份验证和性能优化。掌握 TypeScript/Node.js 后端,以构建可扩展、安全且高效的 Directus 应用程序。
directus/
├── api/ # 核心 API 实现
│ ├── src/
│ │ ├── services/ # 业务逻辑层
│ │ ├── controllers/ # 请求处理器
│ │ ├── middleware/ # Express 中间件
│ │ ├── database/ # 数据库抽象
│ │ ├── utils/ # 辅助函数
│ │ └── types/ # TypeScript 定义
├── extensions/
│ ├── endpoints/ # 自定义 API 端点
│ ├── hooks/ # 事件钩子
│ ├── operations/ # 流程操作
│ └── services/ # 自定义服务
└── shared/ # 共享工具
npx create-directus-extension@latest
# 选择:
# > endpoint
# > my-custom-api
# > typescript
// src/index.ts
import { defineEndpoint } from '@directus/extensions-sdk';
import { Router } from 'express';
import Joi from 'joi';
/**
 * Custom endpoint extension mounted under /custom.
 * Exposes analytics aggregation, article creation, log cleanup and an SSE stream.
 */
export default defineEndpoint((router, context) => {
  const { services, database, getSchema, env, logger, emitter } = context;
  const { ItemsService, MailService } = services;

  // Input validation schema for POST /process.
  const createSchema = Joi.object({
    title: Joi.string().required().min(3).max(255),
    content: Joi.string().required(),
    status: Joi.string().valid('draft', 'published').default('draft'),
    tags: Joi.array().items(Joi.string()),
    metadata: Joi.object(),
  });

  // GET /custom/analytics — 30-day aggregates over completed orders.
  router.get('/analytics', async (req, res, next) => {
    try {
      // Reject unauthenticated requests: the raw Knex query below bypasses
      // Directus permissions entirely.
      if (!req.accountability?.user) {
        return res.status(401).json({
          error: '未授权',
        });
      }
      // Raw Knex for the aggregation (MySQL date functions — assumes MySQL/MariaDB).
      const results = await database
        .select(
          database.raw('DATE(created_at) as date'),
          database.raw('COUNT(*) as count'),
          database.raw('AVG(amount) as avg_amount')
        )
        .from('orders')
        .where('status', 'completed')
        .whereRaw('created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)')
        .groupBy(database.raw('DATE(created_at)'))
        .orderBy('date', 'desc');
      // FIX: guard the division — an empty result set previously yielded NaN.
      const analytics = {
        daily: results,
        total: results.reduce((sum, day) => sum + day.count, 0),
        average: results.length > 0
          ? results.reduce((sum, day) => sum + day.avg_amount, 0) / results.length
          : 0,
        period: '30_days',
      };
      return res.json({
        data: analytics,
      });
    } catch (error) {
      logger.error('分析端点错误:', error);
      return next(error);
    }
  });

  // POST /custom/process — create an article plus its tags atomically.
  router.post('/process', async (req, res, next) => {
    try {
      // Validate input.
      const { error, value } = createSchema.validate(req.body);
      if (error) {
        return res.status(400).json({
          error: '验证错误',
          details: error.details,
        });
      }
      const schema = await getSchema();
      // FIX: run BOTH writes on the transaction connection. Previously the
      // article was created by a service WITHOUT `knex: trx`, so it committed
      // outside the transaction and a failing tag insert left an orphan.
      const result = await database.transaction(async (trx) => {
        const itemsService = new ItemsService('articles', {
          schema,
          accountability: req.accountability,
          knex: trx,
        });
        const article = await itemsService.createOne({
          ...value,
          author: req.accountability?.user,
        }, { emitEvents: false });
        if (value.tags && value.tags.length > 0) {
          const tagsService = new ItemsService('article_tags', {
            schema,
            accountability: req.accountability,
            knex: trx,
          });
          await tagsService.createMany(
            value.tags.map(tag => ({
              article_id: article.id,
              tag_name: tag,
            }))
          );
        }
        // Custom event so other extensions can react.
        emitter.emitAction('custom.article.created', {
          article,
          user: req.accountability?.user,
        });
        return article;
      });
      // Best-effort notification: the article is already committed, so a mail
      // failure must not fail the request (previously it produced a 500).
      if (env.EMAIL_NOTIFICATIONS === 'true') {
        try {
          const mailService = new MailService({ schema });
          await mailService.send({
            to: env.ADMIN_EMAIL,
            subject: `新文章:${result.title}`,
            html: `
<h2>新文章已发布</h2>
<p><strong>标题:</strong> ${result.title}</p>
<p><strong>作者:</strong> ${req.accountability?.user}</p>
<p><strong>状态:</strong> ${result.status}</p>
`,
          });
        } catch (mailError) {
          logger.error('通知邮件发送失败:', mailError);
        }
      }
      return res.status(201).json({
        data: result,
      });
    } catch (error) {
      logger.error('处理端点错误:', error);
      return next(error);
    }
  });

  // DELETE /custom/cleanup/:days — purge old log rows (admin only).
  router.delete('/cleanup/:days', async (req, res, next) => {
    try {
      // FIX: `accountability.role` holds a role UUID, not the string 'admin';
      // the original comparison denied every real administrator. Use the
      // `admin` flag Directus sets on the accountability object.
      if (req.accountability?.admin !== true) {
        return res.status(403).json({
          error: '禁止访问',
        });
      }
      const days = parseInt(req.params.days, 10);
      if (isNaN(days) || days < 1) {
        return res.status(400).json({
          error: '无效的天数参数',
        });
      }
      // Perform cleanup (parameterized interval — no injection via :days).
      const deleted = await database('logs')
        .where('created_at', '<', database.raw(`DATE_SUB(NOW(), INTERVAL ? DAY)`, [days]))
        .delete();
      logger.info(`清理了 ${deleted} 条旧日志条目`);
      return res.json({
        data: {
          deleted,
          message: `移除了 ${deleted} 条超过 ${days} 天的日志条目`,
        },
      });
    } catch (error) {
      logger.error('清理端点错误:', error);
      return next(error);
    }
  });

  // GET /custom/stream — Server-Sent Events feed of item create/update actions.
  router.get('/stream', (req, res) => {
    // FIX: the stream previously leaked every item event to anonymous clients.
    if (!req.accountability?.user) {
      return res.status(401).json({ error: '未授权' });
    }
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });
    // Initial handshake event.
    res.write('data: {"type":"connected"}\n\n');
    const handler = (data: any) => {
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    };
    emitter.onAction('items.create', handler);
    emitter.onAction('items.update', handler);
    // Unsubscribe on disconnect to avoid a listener leak per client.
    req.on('close', () => {
      emitter.offAction('items.create', handler);
      emitter.offAction('items.update', handler);
    });
  });
});
// src/index.ts
import { defineHook } from '@directus/extensions-sdk';
import { createHash } from 'crypto';
import axios from 'axios';
/**
 * Hook extension: article enrichment, order webhooks/auditing, user update
 * hardening, startup checks and scheduled maintenance tasks.
 */
export default defineHook(({ filter, action, init, schedule }, context) => {
  const { services, database, getSchema, env, logger } = context;
  const { ItemsService, ActivityService } = services;

  // Filter hook — runs before the create hits the database.
  filter('items.create', async (payload, meta, context) => {
    // Auto-generate slug and derived metadata for articles.
    if (meta.collection === 'articles') {
      if (!payload.slug && payload.title) {
        payload.slug = generateSlug(payload.title);
      }
      payload.word_count = countWords(payload.content || '');
      payload.reading_time = Math.ceil(payload.word_count / 200);
      // Generate excerpt if not provided.
      if (!payload.excerpt && payload.content) {
        payload.excerpt = generateExcerpt(payload.content, 160);
      }
      // Uniqueness check runs WITHOUT accountability on purpose: it must see
      // every article regardless of the current user's read permissions.
      const schema = await getSchema();
      const articlesService = new ItemsService('articles', {
        schema,
        knex: database,
      });
      const existing = await articlesService.readByQuery({
        filter: { slug: { _eq: payload.slug } },
        limit: 1,
      });
      if (existing.length > 0) {
        payload.slug = `${payload.slug}-${Date.now()}`;
      }
    }
    return payload;
  });

  // Action hook — runs after the create is committed.
  action('items.create', async ({ payload, key, collection }, context) => {
    try {
      if (collection === 'orders') {
        // Forward the order to an external webhook when configured.
        if (env.EXTERNAL_API_URL) {
          await axios.post(`${env.EXTERNAL_API_URL}/webhook/order`, {
            order_id: key,
            ...payload,
          }, {
            headers: {
              'X-API-Key': env.EXTERNAL_API_KEY,
            },
            // FIX: bound the outbound call — a hung webhook previously
            // blocked this action handler indefinitely.
            timeout: 10000,
          });
        }
        // Audit-log entry for the new order.
        const schema = await getSchema();
        const activityService = new ActivityService({
          schema,
          accountability: context.accountability,
        });
        await activityService.createOne({
          action: 'create',
          collection: 'orders',
          item: key,
          comment: `订单 #${key} 已创建,总计:${payload.total}`,
        });
        // Update aggregate counters.
        await updateStatistics('orders', 'created');
      }
    } catch (error) {
      logger.error('订单创建钩子错误:', error);
      // Deliberate swallow: side effects must not roll back the main create.
    }
  });

  // Filter hook for updates — hardening for the users collection.
  filter('items.update', async (payload, meta, context) => {
    const { collection, keys } = meta;
    if (collection === 'users') {
      // Hash sensitive data (see SECURITY note on hashPassword below).
      if (payload.password) {
        payload.password = await hashPassword(payload.password);
      }
      // Track who changed what, and when.
      payload.last_modified = new Date().toISOString();
      payload.modified_by = context.accountability?.user;
      // Validate email changes before they reach the database.
      if (payload.email) {
        const isValid = validateEmail(payload.email);
        if (!isValid) {
          throw new Error('无效的邮箱格式');
        }
      }
    }
    return payload;
  });

  // Action hook for deletions — payload is the array of deleted keys.
  action('items.delete', async ({ collection, payload }, context) => {
    if (collection === 'files') {
      // Clean up resources associated with each deleted file.
      for (const fileId of payload) {
        await cleanupFileResources(fileId);
      }
      logger.info(`文件已删除:${payload.join(', ')}`);
    }
  });

  // Init hook — runs once on startup.
  // NOTE(review): loadConfiguration/registerValidators are not defined in this
  // file — confirm they are provided elsewhere or startup will crash.
  init('app.before', async () => {
    logger.info('正在初始化自定义钩子...');
    await verifyCustomTables();
    await loadConfiguration();
    registerValidators();
    logger.info('自定义钩子初始化成功');
  });

  // Schedule hook — daily at midnight.
  schedule('0 0 * * *', async () => {
    logger.info('正在运行每日清理...');
    const schema = await getSchema();
    // Remove expired sessions.
    const deleted = await database('directus_sessions')
      .where('expires', '<', new Date())
      .delete();
    logger.info(`清理了 ${deleted} 个过期会话`);
    // NOTE(review): generateDailyReport/updateSearchIndex are not defined in
    // this file — confirm they exist.
    await generateDailyReport(schema);
    await updateSearchIndex();
  });

  // Schedule hook — health check every 5 minutes.
  schedule('*/5 * * * *', async () => {
    const health = await checkSystemHealth();
    if (!health.healthy) {
      logger.error('系统健康检查失败:', health.issues);
      await sendHealthAlert(health);
    }
  });

  // ---- helpers -------------------------------------------------------------

  /** URL-safe slug from a title (lowercase, dash-separated, trimmed). */
  function generateSlug(title: string): string {
    return title
      .toLowerCase()
      .replace(/[^\w\s-]/g, '')
      .replace(/[\s_-]+/g, '-')
      .replace(/^-+|-+$/g, '');
  }

  /** Whitespace-separated word count. FIX: '' previously counted as 1 word. */
  function countWords(text: string): number {
    const trimmed = text.trim();
    return trimmed === '' ? 0 : trimmed.split(/\s+/).length;
  }

  /** Strip HTML tags and truncate to maxLength with an ellipsis. */
  function generateExcerpt(content: string, maxLength: number): string {
    const stripped = content.replace(/<[^>]*>/g, '');
    if (stripped.length <= maxLength) return stripped;
    return stripped.substring(0, maxLength).trim() + '...';
  }

  // SECURITY(review): unsalted SHA-256 with a static pepper is NOT a safe
  // password hash (no per-user salt, no work factor) and does not match the
  // argon2 hashes Directus itself produces. Kept byte-compatible because
  // existing stored hashes depend on it — migrate to argon2/bcrypt with a
  // proper data migration rather than swapping the algorithm in place.
  async function hashPassword(password: string): Promise<string> {
    const hash = createHash('sha256');
    hash.update(password + env.KEY);
    return hash.digest('hex');
  }

  /** Lightweight email shape check (not full RFC 5322 validation). */
  function validateEmail(email: string): boolean {
    const regex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
    return regex.test(email);
  }

  /** Remove thumbnails and purge CDN cache for a deleted file. */
  async function cleanupFileResources(fileId: string): Promise<void> {
    await database('directus_files')
      .where('folder', fileId)
      .delete();
    if (env.CDN_URL) {
      await axios.delete(`${env.CDN_URL}/purge/${fileId}`);
    }
  }

  /** Create the extension's auxiliary tables if they are missing. */
  async function verifyCustomTables(): Promise<void> {
    const requiredTables = ['custom_logs', 'custom_statistics'];
    for (const table of requiredTables) {
      const exists = await database.schema.hasTable(table);
      if (!exists) {
        logger.warn(`正在创建缺失的表:${table}`);
        await database.schema.createTable(table, (t) => {
          t.uuid('id').primary();
          t.jsonb('data');
          t.timestamps(true, true);
        });
      }
    }
  }

  /** Upsert a daily counter row (Postgres: gen_random_uuid / ON CONFLICT). */
  // NOTE(review): onConflict with a raw DATE(date) expression requires a
  // matching expression index on custom_statistics — confirm it exists.
  async function updateStatistics(type: string, action: string): Promise<void> {
    await database('custom_statistics')
      .insert({
        id: database.raw('gen_random_uuid()'),
        type,
        action,
        count: 1,
        date: new Date(),
      })
      .onConflict(['type', 'action', database.raw('DATE(date)')])
      .merge({
        count: database.raw('custom_statistics.count + 1'),
      });
  }

  /**
   * Probe database connectivity and heap usage.
   * FIX: the result object is now explicitly typed — the original inferred
   * `issues: never[]` / `metrics: {}` and did not compile under strict mode.
   */
  async function checkSystemHealth(): Promise<{
    healthy: boolean;
    issues: string[];
    metrics: Record<string, string>;
  }> {
    const health: {
      healthy: boolean;
      issues: string[];
      metrics: Record<string, string>;
    } = {
      healthy: true,
      issues: [],
      metrics: {},
    };
    // Database connectivity probe.
    try {
      await database.raw('SELECT 1');
      health.metrics.database = '已连接';
    } catch (error) {
      health.healthy = false;
      health.issues.push('数据库连接失败');
    }
    // Heap usage check (threshold: 512 MB).
    const memUsage = process.memoryUsage();
    const heapUsedMB = Math.round(memUsage.heapUsed / 1024 / 1024);
    health.metrics.memory = `${heapUsedMB}MB`;
    if (heapUsedMB > 512) {
      health.healthy = false;
      health.issues.push(`内存使用过高:${heapUsedMB}MB`);
    }
    return health;
  }
});
// src/index.ts
import { defineOperationApi } from '@directus/extensions-sdk';
// Configuration for the batch-processor flow operation (see defineOperationApp).
type Options = {
  collection: string; // target collection name
  batchSize: number; // items fetched per page
  operation: 'archive' | 'delete' | 'export'; // what to do with each matched item
  conditions: Record<string, any>; // Directus filter object selecting the items
};
/**
 * Flow operation: process items of a collection in pages, applying
 * archive / delete / export to each, with per-item error isolation.
 */
export default defineOperationApi<Options>({
  id: 'batch-processor',
  handler: async ({ collection, batchSize, operation, conditions }, context) => {
    const { services, getSchema, logger } = context;
    const { ItemsService } = services;
    const schema = await getSchema();
    // FIX: `{ role: 'admin' }` does not grant admin rights in Directus (role
    // is a UUID). Use accountability null — the documented system/bypass
    // context for flow operations.
    const itemsService = new ItemsService(collection, {
      schema,
      accountability: null,
    });

    let processed = 0;
    let offset = 0;
    let hasMore = true;
    const results = [];

    while (hasMore) {
      // Fetch the next page.
      const items = await itemsService.readByQuery({
        filter: conditions,
        limit: batchSize,
        offset,
      });
      if (items.length === 0) {
        hasMore = false;
        break;
      }

      // Track deletions: deleted rows shift later rows down, so the offset
      // must only skip past rows that remain. The original used
      // `offset: processed`, which skipped half the matches when deleting.
      let removedThisBatch = 0;

      for (const item of items) {
        try {
          switch (operation) {
            case 'archive':
              await itemsService.updateOne(item.id, {
                status: 'archived',
                archived_at: new Date(),
              });
              results.push({ id: item.id, status: 'archived' });
              break;
            case 'delete':
              await itemsService.deleteOne(item.id);
              removedThisBatch++;
              results.push({ id: item.id, status: 'deleted' });
              break;
            case 'export': {
              // Braces scope the const to this case (lexical-declaration rule).
              const exportData = transformForExport(item);
              results.push(exportData);
              break;
            }
          }
          processed++;
        } catch (error) {
          logger.error(`处理项目 ${item.id} 失败:`, error);
          results.push({ id: item.id, status: 'error', error: (error as Error).message });
        }
      }

      // Advance past surviving rows only (failed deletes stay behind and are
      // skipped, which also prevents an infinite retry loop).
      offset += items.length - removedThisBatch;

      // Short page means we have seen everything.
      if (items.length < batchSize) {
        hasMore = false;
      }
      // Throttle between pages to avoid overloading the database.
      await new Promise(resolve => setTimeout(resolve, 100));
    }

    return {
      processed,
      results,
      summary: {
        total: processed,
        successful: results.filter(r => r.status !== 'error').length,
        failed: results.filter(r => r.status === 'error').length,
      },
    };
  },
});
/**
 * Shape an item for export output.
 *
 * @param item - arbitrary item record; only `id`, `title`, `created_at` are read
 * @returns `{ id, title, created, data }` where `data` is the full item as JSON
 *          and `created` is an ISO timestamp or null when the source value is
 *          missing/unparseable.
 */
function transformForExport(item: any): any {
  // FIX: toISOString() on an Invalid Date throws a RangeError; a single item
  // with a missing/bad created_at previously aborted the whole export batch.
  const createdDate = item?.created_at != null ? new Date(item.created_at) : null;
  return {
    id: item.id,
    title: item.title || '未命名',
    created: createdDate !== null && !isNaN(createdDate.getTime())
      ? createdDate.toISOString()
      : null,
    data: JSON.stringify(item),
  };
}
// src/flow-app.ts
import { defineOperationApp } from '@directus/extensions-sdk';
// App-side (panel) definition for the batch-processor flow operation.
// Pairs with defineOperationApi({ id: 'batch-processor', ... }).
export default defineOperationApp({
  id: 'batch-processor',
  name: '批量处理器',
  icon: 'library_books',
  description: '使用各种操作批量处理项目',
  // Summary shown on the operation card in the flow editor.
  overview: ({ collection, operation, batchSize }) => [
    {
      label: '集合',
      text: collection,
    },
    {
      label: '操作',
      text: operation,
    },
    {
      label: '批次大小',
      text: String(batchSize),
    },
  ],
  // Configuration fields rendered in the operation's options drawer.
  options: [
    {
      field: 'collection',
      name: '集合',
      type: 'string',
      meta: {
        width: 'half',
        // Collection picker populated from the current schema.
        interface: 'system-collection',
      },
    },
    {
      field: 'operation',
      name: '操作',
      type: 'string',
      meta: {
        width: 'half',
        interface: 'select-dropdown',
        options: {
          // Values must match the Options['operation'] union in the API file.
          choices: [
            { text: '归档', value: 'archive' },
            { text: '删除', value: 'delete' },
            { text: '导出', value: 'export' },
          ],
        },
      },
    },
    {
      field: 'batchSize',
      name: '批次大小',
      type: 'integer',
      meta: {
        width: 'half',
        interface: 'input',
      },
      schema: {
        // Page size used by the API handler when none is configured.
        default_value: 100,
      },
    },
    {
      field: 'conditions',
      name: '过滤条件',
      type: 'json',
      meta: {
        width: 'full',
        // Raw JSON editor for the Directus filter object.
        interface: 'input-code',
        options: {
          language: 'json',
        },
      },
    },
  ],
});
// src/services/analytics.service.ts
import { BaseService } from '@directus/api/services';
import { Knex } from 'knex';
/**
 * Custom service for recording and querying analytics events
 * (table: analytics_events, aggregates: analytics_aggregates).
 * SQL uses Postgres features (DATE_TRUNC, gen_random_uuid, CTEs).
 */
export class AnalyticsService extends BaseService {
  private knex: Knex;
  private tableName = 'analytics_events';

  constructor(options: any) {
    super(options);
    // Accept either a transaction handle or the base connection.
    this.knex = options.knex || options.database;
  }

  /** Insert one raw event and bump the daily aggregate row. */
  async trackEvent(event: {
    category: string;
    action: string;
    label?: string;
    value?: number;
    userId?: string;
    metadata?: Record<string, any>;
  }): Promise<void> {
    await this.knex(this.tableName).insert({
      id: this.knex.raw('gen_random_uuid()'),
      category: event.category,
      action: event.action,
      label: event.label,
      value: event.value,
      user_id: event.userId,
      metadata: JSON.stringify(event.metadata || {}),
      created_at: new Date(),
      // NOTE(review): assumes BaseService exposes accountability — confirm.
      session_id: this.accountability?.session,
      ip_address: this.accountability?.ip,
    });
    await this.updateAggregates(event);
  }

  /** Aggregate counts/users/values per period within a date range. */
  async getMetrics(options: {
    startDate: Date;
    endDate: Date;
    groupBy: 'hour' | 'day' | 'week' | 'month';
    category?: string;
  }): Promise<any[]> {
    // FIX: groupBy was interpolated directly into the SQL string. The type is
    // a union, but nothing enforces it at runtime (JS callers, deserialized
    // input) — whitelist it and pass it as a binding instead.
    const validGroups = ['hour', 'day', 'week', 'month'];
    if (!validGroups.includes(options.groupBy)) {
      throw new Error(`Invalid groupBy: ${options.groupBy}`);
    }
    const query = this.knex(this.tableName)
      .select(
        this.knex.raw('DATE_TRUNC(?, created_at) as period', [options.groupBy]),
        'category',
        'action',
        this.knex.raw('COUNT(*) as count'),
        this.knex.raw('COUNT(DISTINCT user_id) as unique_users'),
        this.knex.raw('AVG(value) as avg_value')
      )
      .whereBetween('created_at', [options.startDate, options.endDate])
      .groupBy('period', 'category', 'action')
      .orderBy('period', 'desc');
    if (options.category) {
      query.where('category', options.category);
    }
    return await query;
  }

  /** Chronological event trail for a single user. */
  async getUserJourney(userId: string): Promise<any[]> {
    return await this.knex(this.tableName)
      .where('user_id', userId)
      .orderBy('created_at', 'asc')
      .select('category', 'action', 'label', 'created_at', 'metadata');
  }

  /**
   * Distinct-user counts per funnel step; each step only counts users who
   * also performed the previous step (any time, order not enforced).
   */
  async getFunnelAnalysis(steps: string[]): Promise<Record<string, number>> {
    // FIX: was `const results = {}` — indexing an empty object literal does
    // not compile under strict mode; type the accumulator explicitly.
    const results: Record<string, number> = {};
    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      const query = this.knex(this.tableName)
        .countDistinct('user_id as users')
        .where('action', step);
      if (i > 0) {
        const previousStep = steps[i - 1];
        // Subquery: users who completed the previous step.
        query.whereIn('user_id', function () {
          this.select('user_id')
            .from('analytics_events')
            .where('action', previousStep);
        });
      }
      const result = await query.first();
      results[step] = Number(result?.users ?? 0);
    }
    return results;
  }

  /** Day-by-day retention for the cohort that first appeared on cohortDate. */
  async getCohortAnalysis(cohortDate: Date): Promise<any> {
    const cohortQuery = `
      WITH cohort_users AS (
        SELECT DISTINCT user_id
        FROM ${this.tableName}
        WHERE DATE(created_at) = DATE(?)
      ),
      retention_data AS (
        SELECT
          DATE_PART('day', ae.created_at - ?) as days_since_cohort,
          COUNT(DISTINCT ae.user_id) as retained_users
        FROM ${this.tableName} ae
        INNER JOIN cohort_users cu ON ae.user_id = cu.user_id
        WHERE ae.created_at >= ?
        GROUP BY days_since_cohort
      )
      SELECT * FROM retention_data
      ORDER BY days_since_cohort
    `;
    return await this.knex.raw(cohortQuery, [cohortDate, cohortDate, cohortDate]);
  }

  /** Upsert the per-day counter row for this event's (category, action). */
  private async updateAggregates(event: any): Promise<void> {
    const date = new Date().toISOString().split('T')[0];
    await this.knex('analytics_aggregates')
      .insert({
        id: this.knex.raw('gen_random_uuid()'),
        date,
        category: event.category,
        action: event.action,
        count: 1,
        sum_value: event.value || 0,
      })
      .onConflict(['date', 'category', 'action'])
      .merge({
        count: this.knex.raw('analytics_aggregates.count + 1'),
        sum_value: this.knex.raw('analytics_aggregates.sum_value + ?', [event.value || 0]),
      });
  }

  /** Delete raw events older than daysToKeep; returns the deleted row count. */
  async cleanupOldEvents(daysToKeep: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - daysToKeep);
    return await this.knex(this.tableName)
      .where('created_at', '<', cutoffDate)
      .delete();
  }
}
// 复杂数据库查询
import { Knex } from 'knex';
/**
 * Advanced query helpers (Postgres-specific SQL: recursive CTEs, window
 * functions, full-text search, ON CONFLICT upserts).
 */
export class DatabaseOperations {
  constructor(private database: Knex) {}

  /**
   * Recursive CTE over `categories`, returning the tree under parentId
   * (or all roots when parentId is null), depth-limited to 10 levels.
   */
  async getHierarchy(parentId: string | null): Promise<any[]> {
    // Only the operator is interpolated; the value itself is a binding.
    const query = `
      WITH RECURSIVE category_tree AS (
        -- anchor: the requested level
        SELECT id, name, parent_id, 0 as level, ARRAY[id] as path
        FROM categories
        WHERE parent_id ${parentId ? '= ?' : 'IS NULL'}
        UNION ALL
        -- recursive step: children
        SELECT c.id, c.name, c.parent_id, ct.level + 1, ct.path || c.id
        FROM categories c
        INNER JOIN category_tree ct ON c.parent_id = ct.id
        WHERE ct.level < 10 -- guard against cycles / runaway recursion
      )
      SELECT * FROM category_tree
      ORDER BY path
    `;
    const bindings = parentId ? [parentId] : [];
    const result = await this.database.raw(query, bindings);
    return result.rows;
  }

  /** Running total, 7-row moving average and amount rank over daily_sales. */
  async getRunningTotals(startDate: Date, endDate: Date): Promise<any[]> {
    const query = `
      SELECT
        date,
        amount,
        SUM(amount) OVER (ORDER BY date) as running_total,
        AVG(amount) OVER (
          ORDER BY date
          ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as moving_avg_7d,
        ROW_NUMBER() OVER (ORDER BY amount DESC) as rank_by_amount
      FROM daily_sales
      WHERE date BETWEEN ? AND ?
      ORDER BY date
    `;
    const result = await this.database.raw(query, [startDate, endDate]);
    return result.rows;
  }

  /** Ranked full-text search over articles with highlighted excerpts. */
  async searchContent(searchTerm: string): Promise<any[]> {
    const query = `
      SELECT
        id,
        title,
        content,
        ts_rank(search_vector, query) as relevance,
        ts_headline(
          'english',
          content,
          query,
          'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=20'
        ) as excerpt
      FROM articles,
        plainto_tsquery('english', ?) as query
      WHERE search_vector @@ query
      ORDER BY relevance DESC
      LIMIT 20
    `;
    const result = await this.database.raw(query, [searchTerm]);
    return result.rows;
  }

  /**
   * Chunked upsert with ON CONFLICT (id) DO UPDATE.
   * Merges every non-id/non-created_at column present in the chunk from
   * EXCLUDED; updated_at is always refreshed.
   */
  async batchUpsert(table: string, records: any[]): Promise<void> {
    // FIX: record keys were concatenated straight into SQL via
    // 'EXCLUDED.' + key — an injection vector when records come from
    // untrusted input. Restrict keys to plain identifiers.
    const identifier = /^[A-Za-z_][A-Za-z0-9_]*$/;
    const chunkSize = 1000;
    for (let i = 0; i < records.length; i += chunkSize) {
      const chunk = records.slice(i, i + chunkSize);
      const mergeColumns = chunk.reduce((acc, record) => {
        Object.keys(record).forEach(key => {
          if (key !== 'id' && key !== 'created_at') {
            if (!identifier.test(key)) {
              throw new Error(`Unsafe column name in batchUpsert: ${key}`);
            }
            acc[key] = this.database.raw('EXCLUDED.' + key);
          }
        });
        return acc;
      }, {} as Record<string, any>);
      await this.database(table)
        .insert(chunk)
        .onConflict('id')
        .merge({
          updated_at: this.database.fn.now(),
          ...mergeColumns,
        });
    }
  }

  /**
   * Keyset (cursor) pagination: fetches limit+1 rows to detect a next page
   * and encodes the last row's orderBy value as a base64 cursor.
   * NOTE(review): assumes orderBy values are unique and string-comparable —
   * ties will be skipped; confirm the column has a unique constraint.
   */
  async getCursorPagination(options: {
    table: string;
    cursor?: string;
    limit: number;
    orderBy: string;
  }): Promise<{ data: any[]; nextCursor: string | null }> {
    let query = this.database(options.table)
      .orderBy(options.orderBy)
      .limit(options.limit + 1);
    if (options.cursor) {
      const decodedCursor = Buffer.from(options.cursor, 'base64').toString();
      query = query.where(options.orderBy, '>', decodedCursor);
    }
    const results = await query;
    const hasMore = results.length > options.limit;
    const data = hasMore ? results.slice(0, -1) : results;
    // FIX: Buffer.from(value) on a number allocates a buffer of that SIZE and
    // throws on a Date — stringify the column value before encoding.
    const nextCursor = hasMore
      ? Buffer.from(String(data[data.length - 1][options.orderBy])).toString('base64')
      : null;
    return { data, nextCursor };
  }
}
// src/auth-provider.ts
import jwt from 'jsonwebtoken';
import bcrypt from 'bcrypt';
import { BaseAuthProvider } from '@directus/api/auth';
/**
 * Custom auth provider: bcrypt password login with JWT access tokens and
 * opaque, database-backed refresh tokens (directus_sessions).
 */
export class CustomAuthProvider extends BaseAuthProvider {
  // Syntactically valid bcrypt hash used to equalize timing when the email
  // is unknown (prevents account enumeration via response-time differences).
  private static readonly DUMMY_HASH =
    '$2b$10$N9qo8uLOickgx2ZMRZoMyeIjZAgcfl7p92ldGxad68LJZdL17lhWy';

  /**
   * Validate credentials and issue tokens.
   * @throws Error('无效的凭据') on unknown email or wrong password
   * @throws Error('账户未激活') when the account is not active
   */
  async login(credentials: { email: string; password: string }) {
    const user = await this.knex('directus_users')
      .where('email', credentials.email)
      .first();

    // FIX: the original returned immediately when the email was unknown,
    // skipping the bcrypt work — a measurable timing difference that let an
    // attacker enumerate registered emails. Always run a comparison.
    const validPassword = await bcrypt.compare(
      credentials.password,
      user ? user.password : CustomAuthProvider.DUMMY_HASH
    );

    if (!user || !validPassword) {
      if (user) {
        // Record the failed attempt for lockout/alerting.
        await this.logFailedAttempt(user.id);
      }
      // Same message for both cases — never reveal which part was wrong.
      throw new Error('无效的凭据');
    }

    // Reject suspended/invited/locked accounts even with a correct password.
    if (user.status !== 'active') {
      throw new Error('账户未激活');
    }

    // Issue tokens: short-lived JWT + opaque refresh token in the DB.
    const accessToken = this.generateAccessToken(user);
    const refreshToken = await this.generateRefreshToken(user);

    // NOTE(review): last_page is hard-coded to '/dashboard' — confirm intended.
    await this.knex('directus_users')
      .where('id', user.id)
      .update({
        last_access: new Date(),
        last_page: '/dashboard',
      });

    return {
      accessToken,
      refreshToken,
      user: this.sanitizeUser(user),
    };
  }

  /**
   * Verify a JWT AND confirm a live session row exists for it.
   * @throws Error('无效的令牌') on any failure (signature, expiry, session)
   */
  async verify(token: string) {
    try {
      const payload = jwt.verify(token, this.secret) as any;
      // Token must also correspond to an unexpired session (allows revocation).
      const session = await this.knex('directus_sessions')
        .where('token', token)
        .where('expires', '>', new Date())
        .first();
      if (!session) {
        throw new Error('会话已过期');
      }
      return {
        id: payload.id,
        role: payload.role,
        app_access: payload.app_access,
      };
    } catch (error) {
      // Collapse all failure modes into one opaque error for callers.
      throw new Error('无效的令牌');
    }
  }

  /** Short-lived (15 min) signed access token carrying identity claims. */
  private generateAccessToken(user: any): string {
    return jwt.sign(
      {
        id: user.id,
        role: user.role,
        app_access: user.app_access,
        email: user.email,
      },
      this.secret,
      {
        expiresIn: '15m',
        issuer: 'directus',
      }
    );
  }

  /** Opaque refresh token persisted to directus_sessions (7-day expiry). */
  // NOTE(review): generateRandomToken is not defined in this file — confirm
  // the base class provides it and that it uses a CSPRNG.
  private async generateRefreshToken(user: any): Promise<string> {
    const token = this.generateRandomToken();
    await this.knex('directus_sessions').insert({
      token,
      user: user.id,
      expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000), // 7 days
      ip: this.request?.ip,
      user_agent: this.request?.headers['user-agent'],
    });
    return token;
  }

  /** Strip the password hash before returning the user to callers. */
  private sanitizeUser(user: any): any {
    const { password, ...sanitized } = user;
    return sanitized;
  }
}
// src/cache/cache.service.ts
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';
export class CacheService {
private redis: Redis;
private memoryCache: LRUCache<string, any>;
// Wire up the two cache tiers. Redis is the shared, cross-instance tier;
// the LRU map is a small per-process hot layer in front of it.
constructor() {
  // Redis for distributed caching (connection settings from the environment).
  this.redis = new Redis({
    host: process.env.REDIS_HOST || 'localhost',
    port: parseInt(process.env.REDIS_PORT || '6379'),
    password: process.env.REDIS_PASSWORD,
  });
  // In-memory cache for hot data: capped at 500 entries.
  this.memoryCache = new LRUCache({
    max: 500,
    ttl: 1000 * 60 * 5, // 5-minute TTL per entry
  });
}
async get(key: string): Promise<any | null> {
// 首先检查内存缓存
const memResult = this.memoryCache.get(key);
if (memResult) return memResult;
// 检查 Redis
const redisResult = await this.redis.get(key);
if (redisResult) {
const data = JSON.parse(redisResult);
// 填充内存缓存
this.memoryCache.set(key, data);
return data;
}
return null;
}
async set(key: string, value: any, ttl?: number): Promise<void> {
const serialized = JSON.stringify(value);
// 在两个缓存中都设置
this.memoryCache.set(key, value);
if (ttl) {
await this
This skill provides deep expertise in Directus backend architecture, covering API endpoint extensions, hook systems, service layers, flows and automation, database operations, authentication, and performance optimization. Master the TypeScript/Node.js backend to build scalable, secure, and efficient Directus applications.
directus/
├── api/ # Core API implementation
│ ├── src/
│ │ ├── services/ # Business logic layer
│ │ ├── controllers/ # Request handlers
│ │ ├── middleware/ # Express middleware
│ │ ├── database/ # Database abstraction
│ │ ├── utils/ # Helper functions
│ │ └── types/ # TypeScript definitions
├── extensions/
│ ├── endpoints/ # Custom API endpoints
│ ├── hooks/ # Event hooks
│ ├── operations/ # Flow operations
│ └── services/ # Custom services
└── shared/ # Shared utilities
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
npx create-directus-extension@latest
# Select:
# > endpoint
# > my-custom-api
# > typescript
// src/index.ts
import { defineEndpoint } from '@directus/extensions-sdk';
import { Router } from 'express';
import Joi from 'joi';
/**
 * Custom endpoint extension mounted under /custom.
 * Exposes analytics aggregation, article creation, log cleanup and an SSE stream.
 */
export default defineEndpoint((router, context) => {
  const { services, database, getSchema, env, logger, emitter } = context;
  const { ItemsService, MailService } = services;

  // Input validation schema for POST /process.
  const createSchema = Joi.object({
    title: Joi.string().required().min(3).max(255),
    content: Joi.string().required(),
    status: Joi.string().valid('draft', 'published').default('draft'),
    tags: Joi.array().items(Joi.string()),
    metadata: Joi.object(),
  });

  // GET /custom/analytics — 30-day aggregates over completed orders.
  router.get('/analytics', async (req, res, next) => {
    try {
      // Reject unauthenticated requests: the raw Knex query below bypasses
      // Directus permissions entirely.
      if (!req.accountability?.user) {
        return res.status(401).json({
          error: 'Unauthorized',
        });
      }
      // Raw Knex for the aggregation (MySQL date functions — assumes MySQL/MariaDB).
      const results = await database
        .select(
          database.raw('DATE(created_at) as date'),
          database.raw('COUNT(*) as count'),
          database.raw('AVG(amount) as avg_amount')
        )
        .from('orders')
        .where('status', 'completed')
        .whereRaw('created_at >= DATE_SUB(NOW(), INTERVAL 30 DAY)')
        .groupBy(database.raw('DATE(created_at)'))
        .orderBy('date', 'desc');
      // FIX: guard the division — an empty result set previously yielded NaN.
      const analytics = {
        daily: results,
        total: results.reduce((sum, day) => sum + day.count, 0),
        average: results.length > 0
          ? results.reduce((sum, day) => sum + day.avg_amount, 0) / results.length
          : 0,
        period: '30_days',
      };
      return res.json({
        data: analytics,
      });
    } catch (error) {
      logger.error('Analytics endpoint error:', error);
      return next(error);
    }
  });

  // POST /custom/process — create an article plus its tags atomically.
  router.post('/process', async (req, res, next) => {
    try {
      // Validate input.
      const { error, value } = createSchema.validate(req.body);
      if (error) {
        return res.status(400).json({
          error: 'Validation Error',
          details: error.details,
        });
      }
      const schema = await getSchema();
      // FIX: run BOTH writes on the transaction connection. Previously the
      // article was created by a service WITHOUT `knex: trx`, so it committed
      // outside the transaction and a failing tag insert left an orphan.
      const result = await database.transaction(async (trx) => {
        const itemsService = new ItemsService('articles', {
          schema,
          accountability: req.accountability,
          knex: trx,
        });
        const article = await itemsService.createOne({
          ...value,
          author: req.accountability?.user,
        }, { emitEvents: false });
        if (value.tags && value.tags.length > 0) {
          const tagsService = new ItemsService('article_tags', {
            schema,
            accountability: req.accountability,
            knex: trx,
          });
          await tagsService.createMany(
            value.tags.map(tag => ({
              article_id: article.id,
              tag_name: tag,
            }))
          );
        }
        // Custom event so other extensions can react.
        emitter.emitAction('custom.article.created', {
          article,
          user: req.accountability?.user,
        });
        return article;
      });
      // Best-effort notification: the article is already committed, so a mail
      // failure must not fail the request (previously it produced a 500).
      if (env.EMAIL_NOTIFICATIONS === 'true') {
        try {
          const mailService = new MailService({ schema });
          await mailService.send({
            to: env.ADMIN_EMAIL,
            subject: `New Article: ${result.title}`,
            html: `
<h2>New Article Published</h2>
<p><strong>Title:</strong> ${result.title}</p>
<p><strong>Author:</strong> ${req.accountability?.user}</p>
<p><strong>Status:</strong> ${result.status}</p>
`,
          });
        } catch (mailError) {
          logger.error('Notification email failed:', mailError);
        }
      }
      return res.status(201).json({
        data: result,
      });
    } catch (error) {
      logger.error('Process endpoint error:', error);
      return next(error);
    }
  });

  // DELETE /custom/cleanup/:days — purge old log rows (admin only).
  router.delete('/cleanup/:days', async (req, res, next) => {
    try {
      // FIX: `accountability.role` holds a role UUID, not the string 'admin';
      // the original comparison denied every real administrator. Use the
      // `admin` flag Directus sets on the accountability object.
      if (req.accountability?.admin !== true) {
        return res.status(403).json({
          error: 'Forbidden',
        });
      }
      const days = parseInt(req.params.days, 10);
      if (isNaN(days) || days < 1) {
        return res.status(400).json({
          error: 'Invalid days parameter',
        });
      }
      // Perform cleanup (parameterized interval — no injection via :days).
      const deleted = await database('logs')
        .where('created_at', '<', database.raw(`DATE_SUB(NOW(), INTERVAL ? DAY)`, [days]))
        .delete();
      logger.info(`Cleaned up ${deleted} old log entries`);
      return res.json({
        data: {
          deleted,
          message: `Removed ${deleted} log entries older than ${days} days`,
        },
      });
    } catch (error) {
      logger.error('Cleanup endpoint error:', error);
      return next(error);
    }
  });

  // GET /custom/stream — Server-Sent Events feed of item create/update actions.
  router.get('/stream', (req, res) => {
    // FIX: the stream previously leaked every item event to anonymous clients.
    if (!req.accountability?.user) {
      return res.status(401).json({ error: 'Unauthorized' });
    }
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });
    // Initial handshake event.
    res.write('data: {"type":"connected"}\n\n');
    const handler = (data: any) => {
      res.write(`data: ${JSON.stringify(data)}\n\n`);
    };
    emitter.onAction('items.create', handler);
    emitter.onAction('items.update', handler);
    // Unsubscribe on disconnect to avoid a listener leak per client.
    req.on('close', () => {
      emitter.offAction('items.create', handler);
      emitter.offAction('items.update', handler);
    });
  });
});
// src/index.ts
import { defineHook } from '@directus/extensions-sdk';
import { createHash } from 'crypto';
import axios from 'axios';
export default defineHook(({ filter, action, init, schedule }, context) => {
const { services, database, getSchema, env, logger } = context;
const { ItemsService, ActivityService } = services;
// Filter hook - runs before database operations
filter('items.create', async (payload, meta, context) => {
// Auto-generate slugs for articles
if (meta.collection === 'articles') {
if (!payload.slug && payload.title) {
payload.slug = generateSlug(payload.title);
}
// Add metadata
payload.word_count = countWords(payload.content || '');
payload.reading_time = Math.ceil(payload.word_count / 200);
// Generate excerpt if not provided
if (!payload.excerpt && payload.content) {
payload.excerpt = generateExcerpt(payload.content, 160);
}
// Validate uniqueness
const schema = await getSchema();
const articlesService = new ItemsService('articles', {
schema,
knex: database,
});
const existing = await articlesService.readByQuery({
filter: { slug: { _eq: payload.slug } },
limit: 1,
});
if (existing.length > 0) {
payload.slug = `${payload.slug}-${Date.now()}`;
}
}
return payload;
});
// Action hook - runs after database operations
action('items.create', async ({ payload, key, collection }, context) => {
try {
if (collection === 'orders') {
// Send to external API
if (env.EXTERNAL_API_URL) {
await axios.post(`${env.EXTERNAL_API_URL}/webhook/order`, {
order_id: key,
...payload,
}, {
headers: {
'X-API-Key': env.EXTERNAL_API_KEY,
},
});
}
// Create audit log
const schema = await getSchema();
const activityService = new ActivityService({
schema,
accountability: context.accountability,
});
await activityService.createOne({
action: 'create',
collection: 'orders',
item: key,
comment: `Order #${key} created with total: ${payload.total}`,
});
// Update statistics
await updateStatistics('orders', 'created');
}
} catch (error) {
logger.error('Order creation hook error:', error);
// Don't throw - let the main operation succeed
}
});
// Filter hook for updates
filter('items.update', async (payload, meta, context) => {
const { collection, keys } = meta;
if (collection === 'users') {
// Hash sensitive data
if (payload.password) {
payload.password = await hashPassword(payload.password);
}
// Track changes
payload.last_modified = new Date().toISOString();
payload.modified_by = context.accountability?.user;
// Validate email changes
if (payload.email) {
const isValid = validateEmail(payload.email);
if (!isValid) {
throw new Error('Invalid email format');
}
}
}
return payload;
});
// Action hook for deletions
action('items.delete', async ({ collection, payload }, context) => {
if (collection === 'files') {
// Clean up associated resources
for (const fileId of payload) {
await cleanupFileResources(fileId);
}
// Log deletion
logger.info(`Files deleted: ${payload.join(', ')}`);
}
});
// Init hook - runs on startup
init('app.before', async () => {
logger.info('Initializing custom hooks...');
// Verify database tables
await verifyCustomTables();
// Load configuration
await loadConfiguration();
// Register custom validators
registerValidators();
logger.info('Custom hooks initialized successfully');
});
// Schedule hook - runs on cron schedule
// Scheduled hook: daily maintenance at midnight (server time).
schedule('0 0 * * *', async () => {
  logger.info('Running daily cleanup...');
  const schema = await getSchema();

  // Purge sessions whose expiry timestamp has passed.
  const removedCount = await database('directus_sessions')
    .where('expires', '<', new Date())
    .delete();
  logger.info(`Cleaned ${removedCount} expired sessions`);

  await generateDailyReport(schema);
  await updateSearchIndex();
});
// Schedule hook - every 5 minutes
// Scheduled hook: lightweight health probe every five minutes.
schedule('*/5 * * * *', async () => {
  const health = await checkSystemHealth();
  if (health.healthy) return;

  logger.error('System health check failed:', health.issues);
  await sendHealthAlert(health);
});
// Helper functions
/**
 * Build a URL-safe slug from a title: lowercase, strip punctuation,
 * collapse runs of whitespace/underscores/hyphens to single hyphens,
 * and trim hyphens from both ends.
 */
function generateSlug(title: string): string {
  const lowered = title.toLowerCase();
  const stripped = lowered.replace(/[^\w\s-]/g, '');
  const hyphenated = stripped.replace(/[\s_-]+/g, '-');
  return hyphenated.replace(/^-+|-+$/g, '');
}
/**
 * Count whitespace-separated words in a string.
 *
 * Fix: returns 0 for empty or whitespace-only input — the previous
 * implementation split '' into [''] and reported 1 word.
 */
function countWords(text: string): number {
  const trimmed = text.trim();
  return trimmed === '' ? 0 : trimmed.split(/\s+/).length;
}
/**
 * Produce a plain-text excerpt: strip HTML tags, then truncate to at
 * most `maxLength` characters, appending '...' only when truncated.
 */
function generateExcerpt(content: string, maxLength: number): string {
  const plain = content.replace(/<[^>]*>/g, '');
  return plain.length <= maxLength
    ? plain
    : `${plain.substring(0, maxLength).trim()}...`;
}
/**
 * Hash a password as hex SHA-256 of password + env.KEY.
 *
 * NOTE(review): SHA-256 with a static pepper and no per-user salt is
 * unsuitable for password storage — identical passwords hash
 * identically and the digest is fast to brute-force. Prefer
 * bcrypt/scrypt/argon2. Kept byte-compatible here to avoid
 * invalidating hashes already stored in this format.
 */
async function hashPassword(password: string): Promise<string> {
  return createHash('sha256')
    .update(`${password}${env.KEY}`)
    .digest('hex');
}
/**
 * Lightweight structural email check (local@domain.tld).
 * Not RFC 5322 complete — only guards against obvious garbage.
 */
function validateEmail(email: string): boolean {
  return /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email);
}
/**
 * Remove resources associated with a deleted file: child rows in
 * directus_files, then the CDN cache entry when a CDN is configured.
 *
 * NOTE(review): matching directus_files on `folder` = fileId reads like
 * a parent/folder lookup rather than a thumbnail lookup — confirm this
 * is the intended relationship. Errors from the CDN purge propagate to
 * the caller.
 */
async function cleanupFileResources(fileId: string): Promise<void> {
  await database('directus_files').where('folder', fileId).delete();

  if (env.CDN_URL) {
    await axios.delete(`${env.CDN_URL}/purge/${fileId}`);
  }
}
/**
 * Ensure the extension's own tables exist, creating any missing ones
 * with a minimal (uuid id, jsonb data, timestamps) shape.
 */
async function verifyCustomTables(): Promise<void> {
  for (const tableName of ['custom_logs', 'custom_statistics']) {
    if (await database.schema.hasTable(tableName)) continue;

    logger.warn(`Creating missing table: ${tableName}`);
    await database.schema.createTable(tableName, (t) => {
      t.uuid('id').primary();
      t.jsonb('data');
      t.timestamps(true, true);
    });
  }
}
/**
 * Upsert a daily counter row in custom_statistics: insert a fresh
 * (type, action, today) row with count 1, or bump the existing day's
 * count on conflict. Relies on a Postgres unique index covering
 * (type, action, DATE(date)).
 */
async function updateStatistics(type: string, action: string): Promise<void> {
  const freshRow = {
    id: database.raw('gen_random_uuid()'),
    type,
    action,
    count: 1,
    date: new Date(),
  };

  await database('custom_statistics')
    .insert(freshRow)
    .onConflict(['type', 'action', database.raw('DATE(date)')])
    .merge({ count: database.raw('custom_statistics.count + 1') });
}
/**
 * Probe basic system health: database connectivity and heap usage.
 *
 * Returns a report whose `healthy` flag flips to false when the
 * database is unreachable or heap usage exceeds the 512MB soft ceiling.
 *
 * Fix: the result is now properly typed — the previous untyped literal
 * inferred `issues` as never[] and disallowed property assignment on
 * the empty `metrics` object under strict mode, and the function
 * returned `Promise<any>`. The narrower return type is
 * backward-compatible for existing callers.
 */
async function checkSystemHealth(): Promise<{
  healthy: boolean;
  issues: string[];
  metrics: Record<string, string>;
}> {
  const health = {
    healthy: true,
    issues: [] as string[],
    metrics: {} as Record<string, string>,
  };

  // Database connectivity: a trivial round-trip query.
  try {
    await database.raw('SELECT 1');
    health.metrics.database = 'connected';
  } catch {
    health.healthy = false;
    health.issues.push('Database connection failed');
  }

  // Heap usage check against the 512MB soft ceiling.
  const heapUsedMB = Math.round(process.memoryUsage().heapUsed / 1024 / 1024);
  health.metrics.memory = `${heapUsedMB}MB`;
  if (heapUsedMB > 512) {
    health.healthy = false;
    health.issues.push(`High memory usage: ${heapUsedMB}MB`);
  }

  return health;
}
});
// src/index.ts
import { defineOperationApi } from '@directus/extensions-sdk';
// Configuration for the batch-processor flow operation.
type Options = {
  collection: string;                          // target collection name
  batchSize: number;                           // items fetched per page
  operation: 'archive' | 'delete' | 'export';  // what to do with each item
  conditions: Record<string, any>;             // Directus filter applied to the read query
};
export default defineOperationApi<Options>({
  id: 'batch-processor',
  /**
   * Process every item matching `conditions` in pages of `batchSize`,
   * applying one of: archive (status flag + timestamp), delete, or
   * export (transform into a plain export record). Returns counts plus
   * per-item results.
   *
   * Fixes vs. the previous version:
   * - Pagination no longer uses the success count as the read offset.
   *   Deleting items shrinks the filtered set, so advancing the offset
   *   skipped records; and a page of persistently failing items kept
   *   the offset stuck, looping forever.
   * - Permissions bypass uses `admin: true` — Directus checks the
   *   accountability `admin` flag; `role: 'admin'` is just an
   *   (invalid) role id and grants nothing.
   */
  handler: async ({ collection, batchSize, operation, conditions }, context) => {
    const { services, getSchema, logger } = context;
    const { ItemsService } = services;

    const schema = await getSchema();
    const itemsService = new ItemsService(collection, {
      schema,
      // Flow-driven batch work runs with full access.
      accountability: { admin: true, role: null },
    });

    let processed = 0;
    let offset = 0; // read cursor, maintained independently of successes
    const results: any[] = [];

    for (;;) {
      const items = await itemsService.readByQuery({
        filter: conditions,
        limit: batchSize,
        offset,
      });
      if (items.length === 0) break;

      let pageFailures = 0;
      for (const item of items) {
        try {
          switch (operation) {
            case 'archive':
              await itemsService.updateOne(item.id, {
                status: 'archived',
                archived_at: new Date(),
              });
              results.push({ id: item.id, status: 'archived' });
              break;
            case 'delete':
              await itemsService.deleteOne(item.id);
              results.push({ id: item.id, status: 'deleted' });
              break;
            case 'export': {
              // Braced: a bare `const` in a case clause is illegal
              // under no-case-declarations.
              results.push(transformForExport(item));
              break;
            }
          }
          processed++;
        } catch (error) {
          const message = error instanceof Error ? error.message : String(error);
          logger.error(`Failed to process item ${item.id}:`, error);
          results.push({ id: item.id, status: 'error', error: message });
          pageFailures++;
        }
      }

      // Deleted rows vanish from the filtered set while failed rows stay
      // at its head, so after a 'delete' page the cursor only advances
      // past the survivors; other operations advance past the whole page.
      // NOTE(review): 'archive' can also shrink the set if `conditions`
      // filters on status — confirm against real flow configurations.
      offset += operation === 'delete' ? pageFailures : items.length;

      if (items.length < batchSize) break;

      // Small pause between pages to avoid hammering the database.
      await new Promise((resolve) => setTimeout(resolve, 100));
    }

    return {
      processed,
      results,
      summary: {
        total: processed,
        successful: results.filter((r) => r.status !== 'error').length,
        failed: results.filter((r) => r.status === 'error').length,
      },
    };
  },
});
/**
 * Shape an item into a flat export record: id, title (defaulting to
 * 'Untitled'), ISO creation timestamp, and the full item as JSON.
 *
 * Fix: a missing or unparseable `created_at` previously produced an
 * Invalid Date whose toISOString() threw a RangeError; it now yields
 * `created: null`. Valid inputs are unchanged.
 */
function transformForExport(item: any): any {
  const created = item.created_at != null ? new Date(item.created_at) : null;
  return {
    id: item.id,
    title: item.title || 'Untitled',
    created:
      created !== null && !Number.isNaN(created.getTime())
        ? created.toISOString()
        : null,
    data: JSON.stringify(item),
  };
}
// src/flow-app.ts
import { defineOperationApp } from '@directus/extensions-sdk';
// App (panel) definition for the batch-processor flow operation: controls
// how the operation is presented and configured in the Directus Flows
// editor. Its `id` must match the companion defineOperationApi handler.
export default defineOperationApp({
  id: 'batch-processor',
  name: 'Batch Processor',
  icon: 'library_books',
  description: 'Process items in batches with various operations',
  // Summary chips rendered on the operation card in the flow canvas.
  overview: ({ collection, operation, batchSize }) => [
    {
      label: 'Collection',
      text: collection,
    },
    {
      label: 'Operation',
      text: operation,
    },
    {
      label: 'Batch Size',
      text: String(batchSize),
    },
  ],
  // Configuration form fields shown when editing the operation.
  options: [
    {
      field: 'collection',
      name: 'Collection',
      type: 'string',
      meta: {
        width: 'half',
        // Collection picker populated from the current schema.
        interface: 'system-collection',
      },
    },
    {
      field: 'operation',
      name: 'Operation',
      type: 'string',
      meta: {
        width: 'half',
        interface: 'select-dropdown',
        options: {
          // Values must match the Options['operation'] union in the API half.
          choices: [
            { text: 'Archive', value: 'archive' },
            { text: 'Delete', value: 'delete' },
            { text: 'Export', value: 'export' },
          ],
        },
      },
    },
    {
      field: 'batchSize',
      name: 'Batch Size',
      type: 'integer',
      meta: {
        width: 'half',
        interface: 'input',
      },
      schema: {
        // Page size used by the handler when none is configured.
        default_value: 100,
      },
    },
    {
      field: 'conditions',
      name: 'Filter Conditions',
      type: 'json',
      meta: {
        width: 'full',
        // Raw JSON editor; the value is passed as a Directus filter.
        interface: 'input-code',
        options: {
          language: 'json',
        },
      },
    },
  ],
});
// src/services/analytics.service.ts
import { BaseService } from '@directus/api/services';
import { Knex } from 'knex';
/**
 * Event-tracking service over the analytics_events table with daily
 * rollups, funnel/cohort queries, and retention cleanup.
 * SQL is Postgres-specific (gen_random_uuid, DATE_TRUNC, DATE_PART).
 */
export class AnalyticsService extends BaseService {
  private knex: Knex;
  private tableName = 'analytics_events';

  constructor(options: any) {
    super(options);
    // Accept either `knex` or `database`, depending on how the service
    // container wires dependencies.
    this.knex = options.knex || options.database;
  }

  /**
   * Record a single analytics event and bump the daily aggregate row.
   * Session id and client IP are taken from the accountability context.
   */
  async trackEvent(event: {
    category: string;
    action: string;
    label?: string;
    value?: number;
    userId?: string;
    metadata?: Record<string, any>;
  }): Promise<void> {
    await this.knex(this.tableName).insert({
      id: this.knex.raw('gen_random_uuid()'),
      category: event.category,
      action: event.action,
      label: event.label,
      value: event.value,
      user_id: event.userId,
      metadata: JSON.stringify(event.metadata || {}),
      created_at: new Date(),
      session_id: this.accountability?.session,
      ip_address: this.accountability?.ip,
    });
    await this.updateAggregates(event);
  }

  /**
   * Aggregate event counts per time bucket / category / action.
   *
   * Fix: `groupBy` is now bound as a DATE_TRUNC parameter instead of
   * being interpolated into the SQL string. It is already constrained
   * by the union type, but interpolating values into raw SQL is a
   * habit worth eliminating.
   */
  async getMetrics(options: {
    startDate: Date;
    endDate: Date;
    groupBy: 'hour' | 'day' | 'week' | 'month';
    category?: string;
  }): Promise<any[]> {
    const query = this.knex(this.tableName)
      .select(
        this.knex.raw('DATE_TRUNC(?, created_at) as period', [options.groupBy]),
        'category',
        'action',
        this.knex.raw('COUNT(*) as count'),
        this.knex.raw('COUNT(DISTINCT user_id) as unique_users'),
        this.knex.raw('AVG(value) as avg_value')
      )
      .whereBetween('created_at', [options.startDate, options.endDate])
      .groupBy('period', 'category', 'action')
      .orderBy('period', 'desc');

    if (options.category) {
      query.where('category', options.category);
    }

    return await query;
  }

  /** Chronological event trail for one user. */
  async getUserJourney(userId: string): Promise<any[]> {
    return await this.knex(this.tableName)
      .where('user_id', userId)
      .orderBy('created_at', 'asc')
      .select('category', 'action', 'label', 'created_at', 'metadata');
  }

  /**
   * Count distinct users reaching each step, where step N only counts
   * users who also performed step N-1.
   *
   * Fixes: the inner subquery now uses this.tableName (it previously
   * hard-coded 'analytics_events'), and the pg driver's string COUNT
   * result is coerced to a number.
   */
  async getFunnelAnalysis(steps: string[]): Promise<any> {
    const table = this.tableName; // `this` is shadowed inside the knex callback
    const results: Record<string, number> = {};

    for (let i = 0; i < steps.length; i++) {
      const step = steps[i];
      const query = this.knex(table)
        .countDistinct('user_id as users')
        .where('action', step);

      if (i > 0) {
        const previousStep = steps[i - 1];
        // Only count users who completed the previous step.
        query.whereIn('user_id', function () {
          this.select('user_id').from(table).where('action', previousStep);
        });
      }

      const row = await query.first();
      // node-postgres returns bigint counts as strings.
      results[step] = Number(row?.users ?? 0);
    }
    return results;
  }

  /**
   * Day-by-day retention for the cohort of users first seen on
   * `cohortDate`.
   *
   * NOTE(review): returns the raw driver result (callers on pg must
   * read `.rows`); left unchanged to avoid breaking existing callers.
   */
  async getCohortAnalysis(cohortDate: Date): Promise<any> {
    const cohortQuery = `
      WITH cohort_users AS (
        SELECT DISTINCT user_id
        FROM ${this.tableName}
        WHERE DATE(created_at) = DATE(?)
      ),
      retention_data AS (
        SELECT
          DATE_PART('day', ae.created_at - ?) as days_since_cohort,
          COUNT(DISTINCT ae.user_id) as retained_users
        FROM ${this.tableName} ae
        INNER JOIN cohort_users cu ON ae.user_id = cu.user_id
        WHERE ae.created_at >= ?
        GROUP BY days_since_cohort
      )
      SELECT * FROM retention_data
      ORDER BY days_since_cohort
    `;
    return await this.knex.raw(cohortQuery, [cohortDate, cohortDate, cohortDate]);
  }

  /**
   * Upsert the per-day rollup row for this event.
   *
   * NOTE(review): the migration in this repo also installs a DB trigger
   * performing the same increment on insert into analytics_events — if
   * both are active, every event is counted twice. Confirm which
   * mechanism owns the rollup.
   */
  private async updateAggregates(event: any): Promise<void> {
    const date = new Date().toISOString().split('T')[0];
    await this.knex('analytics_aggregates')
      .insert({
        id: this.knex.raw('gen_random_uuid()'),
        date,
        category: event.category,
        action: event.action,
        count: 1,
        sum_value: event.value || 0,
      })
      .onConflict(['date', 'category', 'action'])
      .merge({
        count: this.knex.raw('analytics_aggregates.count + 1'),
        sum_value: this.knex.raw('analytics_aggregates.sum_value + ?', [event.value || 0]),
      });
  }

  /** Delete events older than `daysToKeep` days; returns rows removed. */
  async cleanupOldEvents(daysToKeep: number): Promise<number> {
    const cutoffDate = new Date();
    cutoffDate.setDate(cutoffDate.getDate() - daysToKeep);
    return await this.knex(this.tableName)
      .where('created_at', '<', cutoffDate)
      .delete();
  }
}
// Complex database queries
import { Knex } from 'knex';
/**
 * Grab-bag of advanced Knex/Postgres query helpers: recursive CTEs,
 * window functions, full-text search, batch upserts, and keyset
 * pagination.
 */
export class DatabaseOperations {
  constructor(private database: Knex) {}

  /**
   * Fetch a category subtree via a recursive CTE.
   * @param parentId subtree root id, or null to expand from top-level
   *   categories. Depth is capped at 10 levels to guard against cyclic
   *   parent links.
   */
  async getHierarchy(parentId: string | null): Promise<any[]> {
    const query = `
      WITH RECURSIVE category_tree AS (
        -- Anchor: top-level categories
        SELECT id, name, parent_id, 0 as level, ARRAY[id] as path
        FROM categories
        WHERE parent_id ${parentId ? '= ?' : 'IS NULL'}
        UNION ALL
        -- Recursive: child categories
        SELECT c.id, c.name, c.parent_id, ct.level + 1, ct.path || c.id
        FROM categories c
        INNER JOIN category_tree ct ON c.parent_id = ct.id
        WHERE ct.level < 10 -- Prevent infinite recursion
      )
      SELECT * FROM category_tree
      ORDER BY path
    `;
    const bindings = parentId ? [parentId] : [];
    const result = await this.database.raw(query, bindings);
    return result.rows;
  }

  /**
   * Daily sales with a running total, 7-day moving average, and rank by
   * amount, computed with window functions.
   */
  async getRunningTotals(startDate: Date, endDate: Date): Promise<any[]> {
    const query = `
      SELECT
        date,
        amount,
        SUM(amount) OVER (ORDER BY date) as running_total,
        AVG(amount) OVER (
          ORDER BY date
          ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as moving_avg_7d,
        ROW_NUMBER() OVER (ORDER BY amount DESC) as rank_by_amount
      FROM daily_sales
      WHERE date BETWEEN ? AND ?
      ORDER BY date
    `;
    const result = await this.database.raw(query, [startDate, endDate]);
    return result.rows;
  }

  /**
   * Full-text search over articles with relevance ranking and
   * <mark>-highlighted excerpts. The search term is bound, never
   * interpolated.
   */
  async searchContent(searchTerm: string): Promise<any[]> {
    const query = `
      SELECT
        id,
        title,
        content,
        ts_rank(search_vector, query) as relevance,
        ts_headline(
          'english',
          content,
          query,
          'StartSel=<mark>, StopSel=</mark>, MaxWords=50, MinWords=20'
        ) as excerpt
      FROM articles,
        plainto_tsquery('english', ?) as query
      WHERE search_vector @@ query
      ORDER BY relevance DESC
      LIMIT 20
    `;
    const result = await this.database.raw(query, [searchTerm]);
    return result.rows;
  }

  /**
   * Upsert records in chunks of 1000; on conflict, every column except
   * id/created_at is refreshed from EXCLUDED and updated_at is bumped.
   *
   * Fix: column names are bound as identifiers (`EXCLUDED.??`) rather
   * than string-concatenated into raw SQL.
   * NOTE(review): the merge map is the union of keys across the whole
   * chunk — records with heterogeneous keys would reference columns
   * absent from some rows; callers should pass uniformly-shaped records.
   */
  async batchUpsert(table: string, records: any[]): Promise<void> {
    const chunkSize = 1000;
    for (let i = 0; i < records.length; i += chunkSize) {
      const chunk = records.slice(i, i + chunkSize);
      await this.database(table)
        .insert(chunk)
        .onConflict('id')
        .merge({
          updated_at: this.database.fn.now(),
          ...chunk.reduce((acc, record) => {
            Object.keys(record).forEach((key) => {
              if (key !== 'id' && key !== 'created_at') {
                acc[key] = this.database.raw('EXCLUDED.??', [key]);
              }
            });
            return acc;
          }, {}),
        });
    }
  }

  /**
   * Keyset ("cursor") pagination: fetch limit+1 rows to detect a next
   * page, and encode the last row's sort value as a base64 cursor.
   * Assumes `orderBy` names a unique, ascending-sortable column.
   *
   * Fix: the cursor value is stringified before Buffer.from — passing a
   * number previously allocated an N-byte zero buffer (Buffer.from(5)
   * semantics) instead of encoding the value.
   */
  async getCursorPagination(options: {
    table: string;
    cursor?: string;
    limit: number;
    orderBy: string;
  }): Promise<{ data: any[]; nextCursor: string | null }> {
    let query = this.database(options.table)
      .orderBy(options.orderBy)
      .limit(options.limit + 1);

    if (options.cursor) {
      const decodedCursor = Buffer.from(options.cursor, 'base64').toString();
      query = query.where(options.orderBy, '>', decodedCursor);
    }

    const results = await query;
    const hasMore = results.length > options.limit;
    const data = hasMore ? results.slice(0, options.limit) : results;
    const nextCursor = hasMore
      ? Buffer.from(String(data[data.length - 1][options.orderBy])).toString('base64')
      : null;
    return { data, nextCursor };
  }
}
// src/auth-provider.ts
import jwt from 'jsonwebtoken';
import bcrypt from 'bcrypt';
import { BaseAuthProvider } from '@directus/api/auth';
/**
 * Email/password auth provider issuing short-lived JWT access tokens and
 * DB-backed opaque refresh tokens.
 *
 * NOTE(review): `knex`, `secret`, `request`, and `generateRandomToken`
 * are supplied by BaseAuthProvider, which is not visible in this file —
 * their contracts are inferred from usage only.
 */
export class CustomAuthProvider extends BaseAuthProvider {
  /**
   * Validate credentials and return { accessToken, refreshToken, user }.
   *
   * NOTE(review): an unknown email throws immediately, skipping the
   * bcrypt compare — the response-time difference enables user
   * enumeration (a constant-time approach compares against a dummy
   * hash). Also the status check runs *after* the password check, so
   * inactive accounts still accrue failed-attempt logs.
   */
  async login(credentials: { email: string; password: string }) {
    const user = await this.knex('directus_users')
      .where('email', credentials.email)
      .first();
    if (!user) {
      // Same message as the wrong-password path so the response does
      // not reveal whether the email exists.
      throw new Error('Invalid credentials');
    }
    const validPassword = await bcrypt.compare(
      credentials.password,
      user.password
    );
    if (!validPassword) {
      // Record the failure (presumably feeds lockout/rate limiting).
      await this.logFailedAttempt(user.id);
      throw new Error('Invalid credentials');
    }
    // Check if account is locked
    if (user.status !== 'active') {
      throw new Error('Account is not active');
    }
    // Generate tokens
    const accessToken = this.generateAccessToken(user);
    const refreshToken = await this.generateRefreshToken(user);
    // Update last login
    await this.knex('directus_users')
      .where('id', user.id)
      .update({
        last_access: new Date(),
        last_page: '/dashboard',
      });
    return {
      accessToken,
      refreshToken,
      user: this.sanitizeUser(user),
    };
  }

  /**
   * Verify a token: check the JWT signature, then require a matching
   * unexpired row in directus_sessions.
   *
   * NOTE(review): only refresh tokens are ever inserted into
   * directus_sessions (see generateRefreshToken below), so looking the
   * *access* token up there appears to always fail — confirm the
   * intended token flow with callers.
   */
  async verify(token: string) {
    try {
      const payload = jwt.verify(token, this.secret) as any;
      // Check if token is still valid in database
      const session = await this.knex('directus_sessions')
        .where('token', token)
        .where('expires', '>', new Date())
        .first();
      if (!session) {
        throw new Error('Session expired');
      }
      return {
        id: payload.id,
        role: payload.role,
        app_access: payload.app_access,
      };
    } catch (error) {
      // All verification failures (bad signature, expiry, missing
      // session row) collapse into one generic error.
      throw new Error('Invalid token');
    }
  }

  /** Short-lived (15 min) signed JWT carrying the user's identity claims. */
  private generateAccessToken(user: any): string {
    return jwt.sign(
      {
        id: user.id,
        role: user.role,
        app_access: user.app_access,
        email: user.email,
      },
      this.secret,
      {
        expiresIn: '15m',
        issuer: 'directus',
      }
    );
  }

  /**
   * Create an opaque refresh token and persist it (with client IP and
   * user agent) in directus_sessions with a 7-day expiry.
   */
  private async generateRefreshToken(user: any): Promise<string> {
    const token = this.generateRandomToken();
    await this.knex('directus_sessions').insert({
      token,
      user: user.id,
      expires: new Date(Date.now() + 7 * 24 * 60 * 60 * 1000), // 7 days
      ip: this.request?.ip,
      user_agent: this.request?.headers['user-agent'],
    });
    return token;
  }

  /** Return a copy of the user record with the password hash removed. */
  private sanitizeUser(user: any): any {
    const { password, ...sanitized } = user;
    return sanitized;
  }
}
// src/cache/cache.service.ts
import Redis from 'ioredis';
import { LRUCache } from 'lru-cache';
/**
 * Two-tier cache: an in-process LRU for hot keys backed by Redis for
 * cross-instance sharing. Values must be JSON-serializable.
 */
export class CacheService {
  private redis: Redis;
  private memoryCache: LRUCache<string, any>;

  constructor() {
    // Redis for distributed cache.
    this.redis = new Redis({
      host: process.env.REDIS_HOST || 'localhost',
      port: parseInt(process.env.REDIS_PORT || '6379'),
      password: process.env.REDIS_PASSWORD,
    });
    // Memory cache for hot data.
    this.memoryCache = new LRUCache({
      max: 500,
      ttl: 1000 * 60 * 5, // 5 minutes
    });
  }

  /**
   * Look a key up in memory first, then Redis (promoting Redis hits into
   * the memory tier). Returns null on miss.
   *
   * Fix: hit detection uses `has()` / an explicit null check instead of
   * truthiness, so cached falsy values (0, '', false) are real hits.
   * A stored JSON `null` remains indistinguishable from a miss.
   */
  async get(key: string): Promise<any | null> {
    if (this.memoryCache.has(key)) {
      return this.memoryCache.get(key);
    }
    const redisResult = await this.redis.get(key);
    if (redisResult !== null) {
      const data = JSON.parse(redisResult);
      this.memoryCache.set(key, data);
      return data;
    }
    return null;
  }

  /**
   * Write a value to both tiers. `ttl` (seconds) applies to Redis only;
   * the memory tier keeps its fixed 5-minute TTL.
   */
  async set(key: string, value: any, ttl?: number): Promise<void> {
    const serialized = JSON.stringify(value);
    this.memoryCache.set(key, value);
    if (ttl) {
      await this.redis.setex(key, ttl, serialized);
    } else {
      await this.redis.set(key, serialized);
    }
  }

  /**
   * Drop every key containing `pattern` from both tiers.
   *
   * NOTE(review): Redis KEYS scans the whole keyspace and blocks the
   * server — prefer SCAN/scanStream for large production keyspaces.
   */
  async invalidate(pattern: string): Promise<void> {
    for (const key of this.memoryCache.keys()) {
      if (key.includes(pattern)) {
        this.memoryCache.delete(key);
      }
    }
    const keys = await this.redis.keys(`*${pattern}*`);
    if (keys.length > 0) {
      await this.redis.del(...keys);
    }
  }

  /**
   * Read-through helper: return the cached value for `key`, or compute
   * it with `callback`, cache it for `ttl` seconds, and return it.
   *
   * Fix: the cache-hit test is `!== null` (previously truthiness, which
   * recomputed cached falsy values on every call).
   */
  async remember<T>(
    key: string,
    ttl: number,
    callback: () => Promise<T>
  ): Promise<T> {
    const cached = await this.get(key);
    if (cached !== null) return cached;
    const fresh = await callback();
    await this.set(key, fresh, ttl);
    return fresh;
  }
}
// src/optimization/query-optimizer.ts
export class QueryOptimizer {
constructor(private knex: Knex) {}
// Optimize N+1 queries with dataloader pattern
async batchLoadRelations<T>(
items: T[],
relation: string,
foreignKey: string
): Promise<Map<string, any[]>> {
const ids = items.map(item => item[foreignKey]);
const relations = await this.knex(relation)
.whereIn(foreignKey, ids)
.select();
// Group by foreign key
const grouped = new Map<string, any[]>();
for (const rel of relations) {
const key = rel[foreignKey];
if (!grouped.has(key)) {
grouped.set(key, []);
}
grouped.get(key)!.push(rel);
}
return grouped;
}
// Index hints for specific queries
async optimizedSearch(table: string, conditions: any): Promise<any[]> {
return await this.knex.raw(`
SELECT /*+ INDEX(${table} idx_search) */ *
FROM ${table}
WHERE ?
ORDER BY created_at DESC
LIMIT 100
`, [conditions]);
}
// Query result streaming for large datasets
streamLargeDataset(table: string, batchSize: number = 1000) {
return this.knex(table)
.select()
.stream({ highWaterMark: batchSize });
}
// Explain query execution plan
async explainQuery(query: Knex.QueryBuilder): Promise<any> {
const sql = query.toSQL();
const result = await this.knex.raw(`EXPLAIN ANALYZE ${sql.sql}`, sql.bindings);
return result.rows;
}
}
// test/endpoints.test.ts
import { describe, it, expect, beforeAll, afterAll } from 'vitest';
import request from 'supertest';
import { createDirectus } from '@directus/sdk';
// Integration tests for the custom endpoints (vitest + supertest).
// NOTE(review): createTestApp, getAuthToken, and cleanupTestData are not
// defined in this excerpt — presumably shared test helpers; confirm.
describe('Custom Endpoints', () => {
  let app;
  let authToken;
  // Boot one test app instance and obtain a token for the whole suite.
  beforeAll(async () => {
    app = await createTestApp();
    authToken = await getAuthToken();
  });
  // Remove whatever data the suite created.
  afterAll(async () => {
    await cleanupTestData();
  });
  describe('POST /custom/process', () => {
    // Happy path: a valid payload yields 201 with the stored article.
    it('should create article with valid data', async () => {
      const response = await request(app)
        .post('/custom/process')
        .set('Authorization', `Bearer ${authToken}`)
        .send({
          title: 'Test Article',
          content: 'Test content',
          status: 'draft',
          tags: ['test', 'api'],
        });
      expect(response.status).toBe(201);
      expect(response.body.data).toHaveProperty('id');
      expect(response.body.data.title).toBe('Test Article');
    });
    // Schema validation: missing required fields must produce a 400.
    it('should validate input data', async () => {
      const response = await request(app)
        .post('/custom/process')
        .set('Authorization', `Bearer ${authToken}`)
        .send({
          // Missing required fields
          status: 'invalid',
        });
      expect(response.status).toBe(400);
      expect(response.body.error).toBe('Validation Error');
    });
    // No Authorization header must produce a 401.
    it('should handle unauthorized requests', async () => {
      const response = await request(app)
        .post('/custom/process')
        .send({ title: 'Test' });
      expect(response.status).toBe(401);
    });
  });
  describe('GET /custom/analytics', () => {
    // Shape check on the analytics summary payload.
    it('should return analytics data', async () => {
      const response = await request(app)
        .get('/custom/analytics')
        .set('Authorization', `Bearer ${authToken}`);
      expect(response.status).toBe(200);
      expect(response.body.data).toHaveProperty('daily');
      expect(response.body.data).toHaveProperty('total');
      expect(response.body.data).toHaveProperty('average');
    });
    // Date-range filtering via query params.
    it('should respect date filters', async () => {
      const response = await request(app)
        .get('/custom/analytics')
        .query({
          start_date: '2024-01-01',
          end_date: '2024-01-31',
        })
        .set('Authorization', `Bearer ${authToken}`);
      expect(response.status).toBe(200);
      expect(response.body.data.period).toBe('30_days');
    });
  });
});
// migrations/001_create_analytics_tables.ts
import { Knex } from 'knex';
/**
 * Create analytics_events (raw events) and analytics_aggregates (daily
 * rollups), plus a Postgres trigger that maintains the rollups on every
 * event insert.
 *
 * NOTE(review): AnalyticsService.updateAggregates performs the same
 * increment in application code — if both paths are active, every event
 * is counted twice. Confirm which mechanism should own the rollup.
 */
export async function up(knex: Knex): Promise<void> {
  // Create analytics events table
  await knex.schema.createTable('analytics_events', (table) => {
    table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
    table.string('category', 50).notNullable();
    table.string('action', 50).notNullable();
    table.string('label', 100);
    table.decimal('value', 10, 2);
    table.uuid('user_id').references('id').inTable('directus_users');
    table.jsonb('metadata');
    table.string('session_id', 100);
    // 45 chars fits IPv6 including IPv4-mapped forms.
    table.string('ip_address', 45);
    table.timestamp('created_at').defaultTo(knex.fn.now());
    // Indexes for performance
    table.index(['category', 'action']);
    table.index('user_id');
    table.index('created_at');
    table.index(['category', 'created_at']);
  });
  // Create aggregates table
  await knex.schema.createTable('analytics_aggregates', (table) => {
    table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
    table.date('date').notNullable();
    table.string('category', 50).notNullable();
    table.string('action', 50).notNullable();
    table.integer('count').defaultTo(0);
    table.decimal('sum_value', 12, 2).defaultTo(0);
    table.integer('unique_users').defaultTo(0);
    table.timestamps(true, true);
    // Composite unique constraint — also the ON CONFLICT target below.
    table.unique(['date', 'category', 'action']);
    table.index('date');
  });
  // Trigger keeps the per-day aggregate row in sync with raw inserts
  // (Postgres-specific: plpgsql function + ON CONFLICT upsert).
  await knex.raw(`
    CREATE OR REPLACE FUNCTION update_analytics_aggregates()
    RETURNS TRIGGER AS $$
    BEGIN
      INSERT INTO analytics_aggregates (date, category, action, count, sum_value)
      VALUES (DATE(NEW.created_at), NEW.category, NEW.action, 1, COALESCE(NEW.value, 0))
      ON CONFLICT (date, category, action)
      DO UPDATE SET
        count = analytics_aggregates.count + 1,
        sum_value = analytics_aggregates.sum_value + COALESCE(NEW.value, 0);
      RETURN NEW;
    END;
    $$ LANGUAGE plpgsql;
    CREATE TRIGGER analytics_events_aggregate
    AFTER INSERT ON analytics_events
    FOR EACH ROW
    EXECUTE FUNCTION update_analytics_aggregates();
  `);
}
/**
 * Reverse the analytics migration. Objects are dropped in reverse
 * dependency order: trigger, then its function, then the tables.
 */
export async function down(knex: Knex): Promise<void> {
  await knex.raw('DROP TRIGGER IF EXISTS analytics_events_aggregate ON analytics_events');
  await knex.raw('DROP FUNCTION IF EXISTS update_analytics_aggregates()');
  await knex.schema.dropTableIfExists('analytics_aggregates');
  await knex.schema.dropTableIfExists('analytics_events');
}
// src/logger/custom-logger.ts
import winston from 'winston';
import { ElasticsearchTransport } from 'winston-elasticsearch';
export class CustomLogger {
private logger: winston.Logger;
constructor() {
this.logger = winston.createLogger({
level: process.env.LOG_LEVEL || 'info',
format: winston.format.combine(
winston.format.timestamp(),
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: {
service: 'directus-backend',
environment: process.env.NODE_ENV,
},
transports: [
// Console transport
new winston.transports.Console({
format: winston.format.combine(
winston.format.colorize(),
winston.format.simple()
),
}),
// File transport
new winston.transports.File({
filename: 'logs/error.log',
level: 'error',
maxsize: 5242880, // 5MB
maxFiles: 5,
}),
new winston.transports.File({
filename: 'logs/combined.log',
maxsize: 5242880,
maxFiles: 5,
}),
// Elasticsearch transport for centralized logging
new ElasticsearchTransport({
level: 'info',
clientOpts: {
node: process.env.ELASTICSEARCH_URL || 'http://localhost:9200',
},
index: 'directus-logs',
}),
],
});
}
info(message: string, meta?: any): void {
this.logger.info(message, meta);
}
error(message: string, error?: Error, meta?: any): void {
this.logger.error(message, {
...meta,
error: {
message: error?.message,
stack: error?.stack,
name: error?.name,
},
});
}
warn(message: string, meta?: any): void {
this.logger.warn(message, meta);
}
debug(message: string, meta?: any): void {
this.logger.debug(message, meta);
}
// Performance logging
logPerformance(operation: string, duration: number, meta?: any): void {
this.logger.info(`Performance: ${operation}`, {
...meta,
duration_ms: duration,
slow: duration > 1000,
});
}
// Audit logging
logAudit(action: string, userId: string, details: any): void {
this.logger.info(`Audit: ${action}`, {
audit: true,
user_id: userId,
action,
details,
timestamp: new Date().toISOString(),
});
}
}
Weekly Installs
–
Repository
First Seen
–
Security Audits
飞书OpenAPI Explorer:探索和调用未封装的飞书原生API接口
19,800 周安装