OpenClaw 重构实战:3步优化 WhatsApp 媒体发送状态共享机制
——
OpenClaw 重构实战:3步优化 WhatsApp 媒体发送状态共享机制
一句话总结
本次更新通过重构 WhatsApp 媒体发送状态的共享机制,解决了多模块间状态同步不一致的问题,让 OpenClaw 的 AI Agent 在处理图片、视频等媒体消息时更加稳定可靠。
—
为什么需要这次重构?
在 AI Agent 与 WhatsApp 集成的场景中,媒体消息(图片、音频、视频、文档)的发送状态管理一直是开发者的痛点。当多个模块需要同时追踪同一条媒体消息的发送进度时,状态分散存储会导致以下问题:
- 发送进度不同步,用户看到”发送中”和”已发送”反复跳变
- 重试机制触发混乱,同一媒体可能被重复发送
- 错误处理困难,无法准确定位失败环节
本次 GitHub Commit 59c84f8 的核心改进,正是将分散的媒体发送状态整合为统一可共享的状态源。
—
重构前后的架构对比
重构前:状态孤岛问题
// ❌ 旧方案:每个模块独立维护状态
class WhatsAppMediaSender {
private uploadProgress = 0; // 上传模块状态
private sendStatus = 'pending'; // 发送模块状态
async sendMedia(file) {
// 上传和发送状态无法实时同步给其他模块
await this.uploadToWhatsApp(file);
await this.sendMessage(file);
}
}
// 另一个模块想获取状态?只能轮询或回调,耦合严重
class MessageLogger {
checkStatus() {
return sender.getStatus(); // 可能拿到过期数据
}
}
重构后:集中式状态共享
// ✅ 新方案:共享状态存储(Shared State Store)
interface MediaSendState {
mediaId: string;
stage: 'preparing' | 'uploading' | 'sending' | 'completed' | 'failed';
progress: number; // 0-100
error?: Error;
timestamp: number;
}
// 全局状态管理器,支持订阅式更新
class MediaSendStateManager {
private stateMap = new Map();
private subscribers = new Map>();
// 任何模块都可以订阅特定媒体的状态变化
subscribe(mediaId: string, listener: StateListener): () => void {
if (!this.subscribers.has(mediaId)) {
this.subscribers.set(mediaId, new Set());
}
this.subscribers.get(mediaId)!.add(listener);
// 返回取消订阅函数
return () => this.subscribers.get(mediaId)?.delete(listener);
}
// 状态变更时自动通知所有订阅者
updateState(mediaId: string, update: Partial) {
const current = this.stateMap.get(mediaId) || {} as MediaSendState;
const newState = { ...current, ...update, timestamp: Date.now() };
this.stateMap.set(mediaId, newState);
// 广播给所有订阅者
this.subscribers.get(mediaId)?.forEach(listener => listener(newState));
}
}
—
3步实现状态共享优化
步骤一:定义标准化的状态接口
统一的状态结构是共享的基础。OpenClaw 为 WhatsApp 媒体发送定义了五阶段状态机:
// 完整的状态类型定义
type SendStage =
| 'preparing' // 文件预处理(压缩、格式转换)
| 'uploading' // 上传至 WhatsApp 服务器
| 'sending' // 发送给目标用户
| 'completed' // 成功送达
| 'failed'; // 发送失败,包含错误详情
interface SharedMediaState {
readonly mediaId: string; // 唯一标识
readonly stage: SendStage;
readonly progress: number; // 各阶段的细分进度
readonly retryCount: number; // 当前重试次数
readonly maxRetries: number; // 最大重试次数
readonly errorCode?: string; // 标准化错误码
readonly createdAt: number;
readonly updatedAt: number;
}
步骤二:实现发布-订阅模式的状态管理器
// OpenClaw 核心实现:MediaStateHub.js
class MediaStateHub {
constructor(eventBus) {
this.eventBus = eventBus; // 与 OpenClaw 事件总线集成
this.states = new Map();
}
// 创建新的媒体发送任务
createTask(mediaId, initialData) {
const state = {
mediaId,
stage: 'preparing',
progress: 0,
retryCount: 0,
maxRetries: 3,
...initialData,
createdAt: Date.now(),
updatedAt: Date.now()
};
this.states.set(mediaId, state);
this.broadcast(mediaId, state);
return state;
}
// 原子化状态更新
transition(mediaId, stage, updates = {}) {
const current = this.states.get(mediaId);
if (!current) throw new Error(Media ${mediaId} not found);
// 验证状态转换是否合法
if (!this.isValidTransition(current.stage, stage)) {
console.warn(Invalid transition: ${current.stage} -> ${stage});
return current;
}
const newState = {
...current,
stage,
...updates,
updatedAt: Date.now()
};
this.states.set(mediaId, newState);
this.broadcast(mediaId, newState);
return newState;
}
// 订阅状态变化(支持筛选特定阶段)
on(mediaId, options = {}) {
const { stages, once } = options;
return this.eventBus.subscribe(media:${mediaId}, (state) => {
if (stages && !stages.includes(state.stage)) return;
if (once) this.off(mediaId);
return state;
});
}
broadcast(mediaId, state) {
this.eventBus.emit(media:${mediaId}, state);
this.eventBus.emit('media:all', { mediaId, ...state }); // 全局广播
}
}
步骤三:在 AI Agent 工作流中集成
// 实际使用示例:AI Agent 发送图片并实时反馈进度
async function sendImageWithAgent(agent, userId, imageBuffer) {
const mediaId = generateUUID();
const stateHub = agent.whatsapp.stateHub;
// 1. 创建任务,UI 立即显示"准备中"
stateHub.createTask(mediaId, {
type: 'image',
targetUser: userId,
fileSize: imageBuffer.length
});
// 2. 订阅状态变化,实时更新用户界面
const unsubscribe = stateHub.on(mediaId, {
stages: ['uploading', 'sending', 'completed', 'failed']
}, (state) => {
agent.ui.updateMessageStatus(mediaId, {
text: getStatusText(state.stage),
progress: state.progress,
error: state.errorCode
});
});
try {
// 3. 执行发送,状态自动流转
const processed = await agent.media.process(imageBuffer, {
onProgress: (p) => stateHub.transition(mediaId, 'preparing', { progress: p * 0.2 })
});
const uploadResult = await agent.whatsapp.upload(processed, {
onProgress: (p) => stateHub.transition(mediaId, 'uploading', { progress: 20 + p * 0.5 })
});
await agent.whatsapp.send(userId, uploadResult, {
onProgress: (p) => stateHub.transition(mediaId, 'sending', { progress: 70 + p * 0.3 })
});
// 4. 完成
stateHub.transition(mediaId, 'completed', { progress: 100 });
} catch (error) {
const current = stateHub.get(mediaId);
if (current.retryCount < current.maxRetries) {
// 自动重试,状态显示"重试中"
stateHub.transition(mediaId, 'preparing', {
retryCount: current.retryCount + 1,
progress: 0
});
return sendImageWithAgent(agent, userId, imageBuffer); // 递归重试
} else {
stateHub.transition(mediaId, 'failed', {
errorCode: error.code,
errorMessage: error.message
});
}
} finally {
unsubscribe(); // 清理订阅
}
}
---
性能优化与最佳实践
内存管理:自动清理已完成任务
// 配置自动清理策略
const stateHub = new MediaStateHub(eventBus, {
cleanupPolicy: {
completedAfter: 5 60 1000, // 成功任务保留5分钟
failedAfter: 30 60 1000, // 失败任务保留30分钟(便于调试)
maxActiveTasks: 1000 // 限制并发任务数
}
});
调试支持:状态历史追踪
// 启用状态历史记录(开发环境)
stateHub.enableHistory(mediaId, {
maxEntries: 50,
includeStackTrace: true // 记录每次状态变更的调用栈
});
// 查看完整状态流转
console.log(stateHub.getHistory(mediaId));
// 输出: [{ stage: 'preparing', at: 1699..., stack: ... }, { stage: 'uploading', ... }]
---
常见问题解答 (FAQ)
Q1: 这次重构会影响现有 OpenClaw 项目的兼容性吗?
不会。 本次重构是内部实现优化,对外 API 保持向后兼容。现有使用 whatsapp.sendMedia() 的代码无需修改即可正常工作。如需使用新功能,可通过配置项显式启用:
const agent = new OpenClawAgent({
whatsapp: {
enableSharedState: true // 启用状态共享(默认关闭,下版本将默认开启)
}
});
Q2: 状态共享在多实例部署时如何保持一致?
OpenClaw 的 MediaStateHub 设计为与底层存储解耦。在分布式部署场景中,可通过实现 StateStorageAdapter 接口接入 Redis 等共享存储:
import { RedisStateAdapter } from '@openclaw/adapters';
const stateHub = new MediaStateHub(eventBus, {
storage: new RedisStateAdapter(redisClient, {
keyPrefix: 'openclaw:media:',
ttl: 3600
})
});
Q3: 如何处理 WhatsApp 的速率限制(Rate Limiting)?
共享状态机制天然支持全局速率控制。通过订阅 media:all 事件,可在状态管理器层面实现统一的请求队列:
stateHub.on('media:all', ({ stage }) => {
if (stage === 'uploading') {
rateLimiter.acquire('whatsapp-upload').then(() => {
// 获得配额后才允许进入上传阶段
});
}
});
Q4: 媒体发送失败后的重试策略可以自定义吗?
可以。通过 createTask 时的配置或全局默认值进行设置:
// 单任务配置
stateHub.createTask(mediaId, {
maxRetries: 5,
retryDelay: (attempt) => Math.pow(2, attempt) * 1000, // 指数退避
retryableErrors: ['NETWORK_ERROR', 'TIMEOUT'] // 仅特定错误触发重试
});
Q5: 如何监控生产环境中的媒体发送成功率?
OpenClaw 提供了内置的指标收集接口,可对接 Prometheus 等监控系统:
// 暴露关键指标
stateHub.on('media:all', (state) => {
if (state.stage === 'completed') {
metrics.increment('whatsapp_media_sent_total', { type: state.type });
}
if (state.stage === 'failed') {
metrics.increment('whatsapp_media_failed_total', {
type: state.type,
error: state.errorCode
});
}
});
---
总结与下一步
本次重构通过集中式状态管理解决了 WhatsApp 媒体发送中的状态同步难题,为 OpenClaw 的 AI Agent 提供了更可靠的消息处理能力。关键改进包括:
| 方面 | 改进效果 |
|:---|:---|
| 状态一致性 | 消除多模块间的状态漂移 |
| 可观测性 | 实时追踪每个媒体的全生命周期 |
| 可维护性 | 统一的状态机降低代码复杂度 |
| 扩展性 | 支持分布式部署和自定义存储后端 |
建议下一步行动:
1. 升级至包含本次更新的 OpenClaw 版本
2. 在开发环境启用 enableSharedState 测试现有功能
3. 参考 OpenClaw 文档 配置适合您场景的存储适配器
---
相关阅读
---