refactor(chat): execution graph optimize (#873)

Co-authored-by: Haze <hazeone@users.noreply.github.com>
This commit is contained in:
Haze
2026-04-19 19:36:33 +08:00
committed by GitHub
parent 2f03aa1fad
commit 1b2dccee6e
24 changed files with 1444 additions and 536 deletions

View File

@@ -1,6 +1,6 @@
/**
* Chat State Store
* Manages chat messages, sessions, streaming, and thinking state.
* Manages chat messages, sessions, and streaming state.
* Communicates with OpenClaw Gateway via renderer WebSocket RPC.
*/
import { create } from 'zustand';
@@ -93,6 +93,13 @@ function buildChatEventDedupeKey(eventState: string, event: Record<string, unkno
const runId = event.runId != null ? String(event.runId) : '';
const sessionKey = event.sessionKey != null ? String(event.sessionKey) : '';
const seq = event.seq != null ? String(event.seq) : '';
// Some gateways emit multiple `delta` updates without a monotonically
// increasing `seq`. Deduping those by just `runId + sessionKey + state`
// collapses legitimate stream progression, so only seq-backed deltas are
// safe to dedupe generically.
if (eventState === 'delta' && !seq) {
return null;
}
if (runId || sessionKey || seq || eventState) {
return [runId, sessionKey, seq, eventState].join('|');
}
@@ -204,45 +211,7 @@ function compactProgressiveTextParts(parts: string[]): string[] {
}
function normalizeLiveContentBlocks(content: ContentBlock[]): ContentBlock[] {
const normalized: ContentBlock[] = [];
let textBuffer: string[] = [];
let thinkingBuffer: string[] = [];
const flushTextBuffer = () => {
for (const part of compactProgressiveTextParts(textBuffer)) {
normalized.push({ type: 'text', text: part });
}
textBuffer = [];
};
const flushThinkingBuffer = () => {
for (const part of compactProgressiveTextParts(thinkingBuffer)) {
normalized.push({ type: 'thinking', thinking: part });
}
thinkingBuffer = [];
};
for (const block of content) {
if (block.type === 'text' && block.text) {
textBuffer.push(block.text);
continue;
}
if (block.type === 'thinking' && block.thinking) {
thinkingBuffer.push(block.thinking);
continue;
}
flushTextBuffer();
flushThinkingBuffer();
normalized.push(block);
}
flushTextBuffer();
flushThinkingBuffer();
return normalized;
return content.map((block) => ({ ...block }));
}
function normalizeStreamingMessage(message: unknown): unknown {
@@ -1199,7 +1168,6 @@ export const useChatStore = create<ChatState>((set, get) => ({
sessionLabels: {},
sessionLastActivity: {},
showThinking: true,
thinkingLevel: null,
// ── Load sessions via sessions.list ──
@@ -2269,10 +2237,6 @@ export const useChatStore = create<ChatState>((set, get) => ({
}
},
// ── Toggle thinking visibility ──
toggleThinking: () => set((s) => ({ showThinking: !s.showThinking })),
// ── Refresh: reload history + sessions ──
refresh: async () => {

View File

@@ -109,44 +109,7 @@ function compactProgressiveTextParts(parts: string[]): string[] {
}
function normalizeLiveContentBlocks(content: ContentBlock[]): ContentBlock[] {
const normalized: ContentBlock[] = [];
let textBuffer: string[] = [];
let thinkingBuffer: string[] = [];
const flushTextBuffer = () => {
for (const part of compactProgressiveTextParts(textBuffer)) {
normalized.push({ type: 'text', text: part });
}
textBuffer = [];
};
const flushThinkingBuffer = () => {
for (const part of compactProgressiveTextParts(thinkingBuffer)) {
normalized.push({ type: 'thinking', thinking: part });
}
thinkingBuffer = [];
};
for (const block of content) {
if (block.type === 'text' && block.text) {
textBuffer.push(block.text);
continue;
}
if (block.type === 'thinking' && block.thinking) {
thinkingBuffer.push(block.thinking);
continue;
}
flushTextBuffer();
flushThinkingBuffer();
normalized.push(block);
}
flushTextBuffer();
flushThinkingBuffer();
return normalized;
return content.map((block) => ({ ...block }));
}
function normalizeStreamingMessage(message: unknown): unknown {

View File

@@ -21,7 +21,6 @@ export const initialChatState: Pick<
| 'currentAgentId'
| 'sessionLabels'
| 'sessionLastActivity'
| 'showThinking'
| 'thinkingLevel'
> = {
messages: [],
@@ -43,7 +42,6 @@ export const initialChatState: Pick<
sessionLabels: {},
sessionLastActivity: {},
showThinking: true,
thinkingLevel: null,
};
@@ -61,7 +59,6 @@ export function createChatActions(
| 'sendMessage'
| 'abortRun'
| 'handleChatEvent'
| 'toggleThinking'
| 'refresh'
| 'clearError'
> {

View File

@@ -1,9 +1,7 @@
import type { ChatGet, ChatSet, RuntimeActions } from './store-api';
export function createRuntimeUiActions(set: ChatSet, get: ChatGet): Pick<RuntimeActions, 'toggleThinking' | 'refresh' | 'clearError'> {
export function createRuntimeUiActions(set: ChatSet, get: ChatGet): Pick<RuntimeActions, 'refresh' | 'clearError'> {
return {
toggleThinking: () => set((s) => ({ showThinking: !s.showThinking })),
// ── Refresh: reload history + sessions ──
refresh: async () => {

View File

@@ -14,5 +14,5 @@ export type SessionHistoryActions = Pick<
export type RuntimeActions = Pick<
ChatState,
'sendMessage' | 'abortRun' | 'handleChatEvent' | 'toggleThinking' | 'refresh' | 'clearError'
'sendMessage' | 'abortRun' | 'handleChatEvent' | 'refresh' | 'clearError'
>;

View File

@@ -85,7 +85,6 @@ export interface ChatState {
sessionLastActivity: Record<string, number>;
// Thinking
showThinking: boolean;
thinkingLevel: string | null;
// Actions
@@ -108,7 +107,6 @@ export interface ChatState {
) => Promise<void>;
abortRun: () => Promise<void>;
handleChatEvent: (event: Record<string, unknown>) => void;
toggleThinking: () => void;
refresh: () => Promise<void>;
clearError: () => void;
}