日志异步落库方案设计
2026/5/24大约 6 分钟
日志异步落库方案设计
概述
项目采用 生产者-消费者 架构实现日志的异步持久化。admin 模块负责生产日志消息并发送到 Kafka,async 模块负责消费 Kafka 消息并双写 MySQL 和 Elasticsearch,实现业务请求与日志落库的解耦。
整体架构
flowchart TD
subgraph "admin 模块(生产者)"
A[HTTP 请求] --> B[ApiLoggerFilter]
B --> C[ApiLoggerService.sendApiLogToDBMQ/ESMQ]
C --> D["@Async(\"loggerTaskExecutor\")"]
D --> E[KafkaTemplate.send()]
A --> F[Controller @ApiOperationLog]
F --> G[ApiOperationLogAspect]
G --> H[OperationLoggerService]
H --> I[sendOperationLogToDBMQ/ESMQ]
I --> J["@Async(\"loggerTaskExecutor\")"]
J --> K[KafkaTemplate.send()]
end
subgraph "Kafka Broker"
L["Topics:\n- api-log-db-topic\n- api-log-es-topic\n- operation-log-db-topic\n- operation-log-es-topic"]
end
subgraph "async 模块(消费者)"
M[ApiLoggerDBListener] --> N[MySQL api_log_yyyyMMdd]
O[ApiLoggerESListener] --> P[ES api_log_yyyyMMdd]
Q[OperationLoggerDBListener] --> R[MySQL opl_log_yyyyMMdd]
S[OperationLoggerESListener] --> T[ES opl_log_yyyyMMdd]
end
E --> L
K --> L
L --> M
L --> O
L --> Q
L --> S日志分类与数据流
接口请求日志
接口日志特点
- 入口:
ApiLoggerFilter(OncePerRequestFilter),拦截所有非白名单请求 - 记录内容: 请求路径、请求体、响应体、耗时、来源 IP、操作人、请求状态、异常信息
- 触发时机: 每个 HTTP 请求结束后(
finally块) - 发送方法:
ApiLoggerService.sendApiLogToDBMQ()/sendApiLogToESMQ()
操作审计日志
操作日志特点
- 入口:
ApiOperationLogAspect(AOP 切面),拦截标注了@ApiOperationLog的方法 - 记录内容: 类名、方法名、入参、返回结果、耗时、操作人、来源 IP、异常堆栈
- 触发时机: 方法执行成功/异常后
- 发送方法:
OperationLoggerService.sendOperationLogToDBMQ()/sendOperationLogToESMQ()
数据流差异
| 维度 | MySQL (DB Topic) | Elasticsearch (ES Topic) |
|---|---|---|
| 🗄️ 存储内容 | 结构化字段(主键、路径、状态、耗时等) | 全量数据(含请求体、响应体、异常堆栈) |
| 🔍 用途 | 业务查询、统计分析 | 全文检索、问题排查 |
| 📅 分片策略 | 按日分表 yyyyMMdd | 按日索引 yyyyMMdd |
Kafka 主题设计
查看主题详细配置
| 主题名 | 分区数 | 消息 Key | 用途 |
|---|---|---|---|
api-log-db-topic | 3 | api-log-db-key | 接口日志 → MySQL |
api-log-es-topic | 3 | api-log-es-key | 接口日志 → ES |
operation-log-db-topic | 3 | operation-log-db-key | 操作日志 → MySQL |
operation-log-es-topic | 3 | operation-log-es-key | 操作日志 → ES |
提示: 消息 value 为 JSON 字符串,发送到 partition 0(可通过
kafkaTemplate.send(topic, 0, key, json)指定)。
生产者设计(admin 模块)
异步发送机制
AsyncConfig.java
// AsyncConfig.java — 线程池配置
@Configuration
@EnableAsync
public class AsyncConfig {
@Bean("loggerTaskExecutor")
public Executor loggerTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2); // 核心线程数
executor.setMaxPoolSize(8); // 最大线程数
executor.setQueueCapacity(256); // 等待队列容量
executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy() // 队列满时由调用线程执行
);
return executor;
}
}ApiLoggerServiceImpl.java
// ApiLoggerServiceImpl.java — @Async 方法示例
@Override
@Async("loggerTaskExecutor")
public void sendApiLogToDBMQ(CreateApiLogDBDTO dto) {
sendMessage(dto, LoggerConstant.API_LOG_DB_TOPIC, LoggerConstant.API_LOG_DB_TOPIC_KEY);
}发送失败处理
// BaseLoggerServiceImpl.sendMessage()
kafkaTemplate.send(topic, 0, key, json)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Kafka发送失败 topic={}, key={}", topic, key, ex);
}
});失败处理策略
发送失败时仅记录错误日志,不阻塞业务请求,不抛异常。
服务类继承关系
flowchart TB
A[BaseLoggerService 接口] --> B[BaseLoggerServiceImpl 基础实现]
B --> C[ApiLoggerServiceImpl]
B --> D[OperationLoggerServiceImpl]消费者设计(async 模块)
监听器列表
监听器配置
| 监听器 | 消费主题 | 落库目标 |
|---|---|---|
ApiLoggerDBListener | api-log-db-topic | MySQL api_log_yyyyMMdd |
ApiLoggerESListener | api-log-es-topic | ES api_log_yyyyMMdd |
OperationLoggerDBListener | operation-log-db-topic | MySQL opl_log_yyyyMMdd |
OperationLoggerESListener | operation-log-es-topic | ES opl_log_yyyyMMdd |
反序列化示例
OperationLoggerDBListener.java
// 示例:OperationLoggerDBListener
@KafkaListener(topics = "operation-log-db-topic", groupId = "logger-group")
public void onMessage(String message) {
CreateOperationLogDBDTO dto = objectMapper.readValue(message, CreateOperationLogDBDTO.class);
// 写入 MySQL...
}StringDeserializer 配置
spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer问题分析与解决方案
原始问题:接口响应缓慢
问题根因
根因: ApiLoggerFilter 拦截每个请求后,在请求线程内同步调用 kafkaTemplate.send(),每次请求发送 2 条消息(DB + ES),每条消息需要等待 broker 确认。若 Kafka broker 不可达,默认超时时间高达 120 秒,请求线程被长时间阻塞。
修改方案
解决方案要点
- 异步化: 在
sendApiLogToDBMQ/ESMQ和sendOperationLogToDBMQ/ESMQ方法上添加@Async("loggerTaskExecutor") - 返回值调整:
@Async要求返回void或Future,将所有发送方法的返回值从Boolean改为void - 超时兜底: 在 application.yml 中配置 producer 超时参数,防止极端情况下异步线程也被长时间阻塞
- 失败可见:
sendMessage中添加whenComplete回调,发送失败时输出错误日志
修改前后对比
性能对比
修改前:
请求线程 → sendApiLogToDBMQ() [阻塞] → sendApiLogToESMQ() [阻塞] → 返回响应
每次请求增加 2 次 Kafka 往返延迟修改后:
请求线程 → sendApiLogToDBMQ() [提交到线程池,立即返回]
→ sendApiLogToESMQ() [提交到线程池,立即返回]
→ 返回响应(无额外延迟)
loggerTaskExecutor 线程 → 异步完成 Kafka 发送关键配置
admin 模块 — Kafka Producer
application.yml
spring:
kafka:
bootstrap-servers: 192.168.1.16:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
max.block.ms: 3000 # 发送缓冲区满时最大阻塞时间
delivery.timeout.ms: 5000 # 消息投递总超时
request.timeout.ms: 3000 # 单次请求超时
linger.ms: 5 # 批量发送等待时间(低延迟优先)async 模块 — Kafka Consumer
application.yml
spring:
kafka:
bootstrap-servers: 192.168.1.16:9092
consumer:
group-id: logger-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer异步线程池配置
| 参数 | 值 | 说明 |
|---|---|---|
| corePoolSize | 2 | 核心线程数 |
| maxPoolSize | 8 | 最大线程数 |
| queueCapacity | 256 | 等待队列容量 |
| rejectedExecutionHandler | CallerRunsPolicy | 队列满时由调用线程执行 |
数据库与 ES 索引设计
MySQL 分表策略
分表规则
接口日志和操作日志均按日分表,表名格式 {prefix}_yyyyMMdd:
| 日志类型 | 表名前缀 | 示例 |
|---|---|---|
| 接口日志 | api_log | api_log_20260523 |
| 操作日志 | opl_log | opl_log_20260523 |
消费者写入前检查表是否存在,不存在则自动创建。
ES 索引策略
按日创建索引,索引名格式 {prefix}_yyyyMMdd,写入时指定 createTime 路由到对应索引。
异常处理与兜底
异常处理策略
| 场景 | 处理方式 |
|---|---|
| Kafka broker 不可达 | whenComplete 回调记录错误日志;业务请求不受影响 |
| 异步线程池队列满 | CallerRunsPolicy 降级为同步发送,防止消息丢失 |
| 消息序列化失败 | catch 块输出错误日志;不抛异常 |
| 消费者写入 MySQL 失败 | 消费者内 catch 异常记录日志;Kafka offset 不变,消息可重试 |
| 消费者写入 ES 失败 | 同上 |
| Topic 不存在 | 生产者 whenComplete 报 TimeoutException;手动创建 topic 后恢复 |