fix(chat): prevent aborted runs from reactivating conversation state (#916)
This commit is contained in:
@@ -82,6 +82,7 @@ export function Chat() {
|
|||||||
const streamingMessage = useChatStore((s) => s.streamingMessage);
|
const streamingMessage = useChatStore((s) => s.streamingMessage);
|
||||||
const streamingTools = useChatStore((s) => s.streamingTools);
|
const streamingTools = useChatStore((s) => s.streamingTools);
|
||||||
const pendingFinal = useChatStore((s) => s.pendingFinal);
|
const pendingFinal = useChatStore((s) => s.pendingFinal);
|
||||||
|
const activeRunId = useChatStore((s) => s.activeRunId);
|
||||||
const sendMessage = useChatStore((s) => s.sendMessage);
|
const sendMessage = useChatStore((s) => s.sendMessage);
|
||||||
const abortRun = useChatStore((s) => s.abortRun);
|
const abortRun = useChatStore((s) => s.abortRun);
|
||||||
const clearError = useChatStore((s) => s.clearError);
|
const clearError = useChatStore((s) => s.clearError);
|
||||||
@@ -280,8 +281,12 @@ export function Chat() {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
const runStillExecutingTools = hasToolActivity && !hasFinalReply;
|
const runStillExecutingTools = hasToolActivity && !hasFinalReply;
|
||||||
|
// runStillExecutingTools bridges the brief gap between tool rounds when
|
||||||
|
// Gateway temporarily clears sending. However, after an explicit abort
|
||||||
|
// (which clears activeRunId), we must NOT keep the run "open" — so we
|
||||||
|
// gate it on activeRunId being present.
|
||||||
const isLatestOpenRun = nextUserIndex === -1
|
const isLatestOpenRun = nextUserIndex === -1
|
||||||
&& (sending || pendingFinal || hasAnyStreamContent || runStillExecutingTools);
|
&& (sending || pendingFinal || hasAnyStreamContent || (runStillExecutingTools && !!activeRunId));
|
||||||
const replyIndexOffset = findReplyMessageIndex(segmentMessages, isLatestOpenRun);
|
const replyIndexOffset = findReplyMessageIndex(segmentMessages, isLatestOpenRun);
|
||||||
const replyIndex = replyIndexOffset === -1 ? null : idx + 1 + replyIndexOffset;
|
const replyIndex = replyIndexOffset === -1 ? null : idx + 1 + replyIndexOffset;
|
||||||
|
|
||||||
|
|||||||
@@ -23,6 +23,12 @@ let _historyPollTimer: ReturnType<typeof setTimeout> | null = null;
|
|||||||
// before committing the error to give the recovery path a chance.
|
// before committing the error to give the recovery path a chance.
|
||||||
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;
|
let _errorRecoveryTimer: ReturnType<typeof setTimeout> | null = null;
|
||||||
|
|
||||||
|
// Track the last run ID that was explicitly aborted by the user.
|
||||||
|
// Prevents lingering Gateway events from the aborted run from re-arming
|
||||||
|
// the sending state after abortRun clears it.
|
||||||
|
let _lastAbortedRunId: string | null = null;
|
||||||
|
const _blockedRunEvents = new Map<string, Record<string, unknown>[]>();
|
||||||
|
|
||||||
function clearErrorRecoveryTimer(): void {
|
function clearErrorRecoveryTimer(): void {
|
||||||
if (_errorRecoveryTimer) {
|
if (_errorRecoveryTimer) {
|
||||||
clearTimeout(_errorRecoveryTimer);
|
clearTimeout(_errorRecoveryTimer);
|
||||||
@@ -1020,6 +1026,27 @@ function getLastChatEventAt(): number {
|
|||||||
return _lastChatEventAt;
|
return _lastChatEventAt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function setLastAbortedRunId(id: string | null): void {
|
||||||
|
_lastAbortedRunId = id;
|
||||||
|
}
|
||||||
|
|
||||||
|
function getLastAbortedRunId(): string | null {
|
||||||
|
return _lastAbortedRunId;
|
||||||
|
}
|
||||||
|
|
||||||
|
function queueBlockedRunEvent(runId: string, event: Record<string, unknown>): void {
|
||||||
|
const events = _blockedRunEvents.get(runId) ?? [];
|
||||||
|
events.push({ ...event });
|
||||||
|
if (events.length > 100) events.shift();
|
||||||
|
_blockedRunEvents.set(runId, events);
|
||||||
|
}
|
||||||
|
|
||||||
|
function takeBlockedRunEvents(runId: string): Record<string, unknown>[] {
|
||||||
|
const events = _blockedRunEvents.get(runId) ?? [];
|
||||||
|
_blockedRunEvents.delete(runId);
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
export {
|
export {
|
||||||
toMs,
|
toMs,
|
||||||
clearErrorRecoveryTimer,
|
clearErrorRecoveryTimer,
|
||||||
@@ -1050,4 +1077,8 @@ export {
|
|||||||
setErrorRecoveryTimer,
|
setErrorRecoveryTimer,
|
||||||
setLastChatEventAt,
|
setLastChatEventAt,
|
||||||
getLastChatEventAt,
|
getLastChatEventAt,
|
||||||
|
setLastAbortedRunId,
|
||||||
|
getLastAbortedRunId,
|
||||||
|
queueBlockedRunEvent,
|
||||||
|
takeBlockedRunEvents,
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { clearHistoryPoll, setLastChatEventAt } from './helpers';
|
import { clearHistoryPoll, getLastAbortedRunId, queueBlockedRunEvent, setLastAbortedRunId, setLastChatEventAt } from './helpers';
|
||||||
import type { ChatGet, ChatSet, RuntimeActions } from './store-api';
|
import type { ChatGet, ChatSet, RuntimeActions } from './store-api';
|
||||||
import { handleRuntimeEventState } from './runtime-event-handlers';
|
import { handleRuntimeEventState } from './runtime-event-handlers';
|
||||||
|
|
||||||
@@ -16,6 +16,28 @@ export function createRuntimeEventActions(set: ChatSet, get: ChatGet): Pick<Runt
|
|||||||
// Only process events for the active run (or if no active run set)
|
// Only process events for the active run (or if no active run set)
|
||||||
if (activeRunId && runId && runId !== activeRunId) return;
|
if (activeRunId && runId && runId !== activeRunId) return;
|
||||||
|
|
||||||
|
// Reject lingering events from a run that the user explicitly aborted.
|
||||||
|
// The 'aborted' confirmation event is allowed through to finalize state.
|
||||||
|
// '*' is a wildcard meaning "abort was requested before we knew the runId".
|
||||||
|
const lastAbortedRunId = getLastAbortedRunId();
|
||||||
|
if (lastAbortedRunId && runId && (lastAbortedRunId === '*' || runId === lastAbortedRunId)) {
|
||||||
|
if (eventState === 'aborted' && lastAbortedRunId === '*') {
|
||||||
|
// Gateway confirmed which run was aborted. Narrow the wildcard so
|
||||||
|
// later unrelated runs can be adopted while this run stays blocked.
|
||||||
|
setLastAbortedRunId(runId);
|
||||||
|
}
|
||||||
|
// Let the 'aborted' event fall through to handleRuntimeEventState
|
||||||
|
// which properly clears all state. Other wildcard-blocked events may
|
||||||
|
// belong to a newer send whose runId has not returned yet, so keep a
|
||||||
|
// bounded queue and replay only if that runId becomes the active run.
|
||||||
|
if (eventState !== 'aborted') {
|
||||||
|
if (lastAbortedRunId === '*' && !activeRunId && get().sending) {
|
||||||
|
queueBlockedRunEvent(runId, event);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
setLastChatEventAt(Date.now());
|
setLastChatEventAt(Date.now());
|
||||||
|
|
||||||
// Defensive: if state is missing but we have a message, try to infer state.
|
// Defensive: if state is missing but we have a message, try to infer state.
|
||||||
|
|||||||
@@ -3,9 +3,12 @@ import { useAgentsStore } from '@/stores/agents';
|
|||||||
import {
|
import {
|
||||||
clearErrorRecoveryTimer,
|
clearErrorRecoveryTimer,
|
||||||
clearHistoryPoll,
|
clearHistoryPoll,
|
||||||
|
getLastAbortedRunId,
|
||||||
getLastChatEventAt,
|
getLastChatEventAt,
|
||||||
setHistoryPollTimer,
|
setHistoryPollTimer,
|
||||||
setLastChatEventAt,
|
setLastChatEventAt,
|
||||||
|
setLastAbortedRunId,
|
||||||
|
takeBlockedRunEvents,
|
||||||
upsertImageCacheEntry,
|
upsertImageCacheEntry,
|
||||||
} from './helpers';
|
} from './helpers';
|
||||||
import type { ChatSession, RawMessage } from './types';
|
import type { ChatSession, RawMessage } from './types';
|
||||||
@@ -39,6 +42,8 @@ function ensureSessionEntry(sessions: ChatSession[], sessionKey: string): ChatSe
|
|||||||
return [...sessions, { key: sessionKey, displayName: sessionKey }];
|
return [...sessions, { key: sessionKey, displayName: sessionKey }];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let sendGeneration = 0;
|
||||||
|
|
||||||
export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<RuntimeActions, 'sendMessage' | 'abortRun'> {
|
export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<RuntimeActions, 'sendMessage' | 'abortRun'> {
|
||||||
return {
|
return {
|
||||||
sendMessage: async (
|
sendMessage: async (
|
||||||
@@ -48,6 +53,7 @@ export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<Runti
|
|||||||
) => {
|
) => {
|
||||||
const trimmed = text.trim();
|
const trimmed = text.trim();
|
||||||
if (!trimmed && (!attachments || attachments.length === 0)) return;
|
if (!trimmed && (!attachments || attachments.length === 0)) return;
|
||||||
|
const currentSendGeneration = ++sendGeneration;
|
||||||
|
|
||||||
const targetSessionKey = resolveMainSessionKeyForAgent(targetAgentId) ?? get().currentSessionKey;
|
const targetSessionKey = resolveMainSessionKeyForAgent(targetAgentId) ?? get().currentSessionKey;
|
||||||
if (targetSessionKey !== get().currentSessionKey) {
|
if (targetSessionKey !== get().currentSessionKey) {
|
||||||
@@ -220,13 +226,43 @@ export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<Runti
|
|||||||
|
|
||||||
console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`);
|
console.log(`[sendMessage] RPC result: success=${result.success}, runId=${result.result?.runId || 'none'}`);
|
||||||
|
|
||||||
|
const returnedRunId = result.result?.runId;
|
||||||
|
if (returnedRunId && currentSendGeneration !== sendGeneration) {
|
||||||
|
// This send was stopped or superseded while the RPC was in flight.
|
||||||
|
// If the stop happened before activeRunId was known, narrow the
|
||||||
|
// wildcard abort marker to the concrete runId we just learned.
|
||||||
|
// Keep '*' while a newer send is still pending its runId so early
|
||||||
|
// events from that newer run cannot re-arm the UI before ownership
|
||||||
|
// is established.
|
||||||
|
const lastAbortedRunId = getLastAbortedRunId();
|
||||||
|
if (!get().sending && (!lastAbortedRunId || lastAbortedRunId === '*' || lastAbortedRunId === returnedRunId)) {
|
||||||
|
setLastAbortedRunId(returnedRunId);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (currentSendGeneration !== sendGeneration) return;
|
||||||
|
|
||||||
if (!result.success) {
|
if (!result.success) {
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
set({ error: result.error || 'Failed to send message', sending: false });
|
set({ error: result.error || 'Failed to send message', sending: false });
|
||||||
} else if (result.result?.runId) {
|
} else if (returnedRunId && get().sending) {
|
||||||
set({ activeRunId: result.result.runId });
|
set({ activeRunId: returnedRunId });
|
||||||
|
// Now that we have a valid activeRunId for the new run, the
|
||||||
|
// activeRunId guard will filter stale events from the old run.
|
||||||
|
// Safe to clear the abort marker.
|
||||||
|
setLastAbortedRunId(null);
|
||||||
|
const blockedEvents = takeBlockedRunEvents(returnedRunId);
|
||||||
|
if (blockedEvents.length > 0) {
|
||||||
|
queueMicrotask(() => {
|
||||||
|
for (const blockedEvent of blockedEvents) {
|
||||||
|
get().handleChatEvent(blockedEvent);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
if (currentSendGeneration !== sendGeneration) return;
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
set({ error: String(err), sending: false });
|
set({ error: String(err), sending: false });
|
||||||
}
|
}
|
||||||
@@ -235,10 +271,16 @@ export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<Runti
|
|||||||
// ── Abort active run ──
|
// ── Abort active run ──
|
||||||
|
|
||||||
abortRun: async () => {
|
abortRun: async () => {
|
||||||
|
sendGeneration += 1;
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
clearErrorRecoveryTimer();
|
clearErrorRecoveryTimer();
|
||||||
const { currentSessionKey } = get();
|
const { currentSessionKey, activeRunId } = get();
|
||||||
set({ sending: false, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
|
// Mark the run as aborted BEFORE clearing state, so the event handler
|
||||||
|
// rejects any lingering Gateway events from this run. Use wildcard '*'
|
||||||
|
// when activeRunId is not yet known (user stopped before chat.send
|
||||||
|
// returned a runId) to block ALL run events from re-arming sending.
|
||||||
|
setLastAbortedRunId(activeRunId || '*');
|
||||||
|
set({ sending: false, activeRunId: null, streamingText: '', streamingMessage: null, pendingFinal: false, lastUserMessageAt: null, pendingToolImages: [] });
|
||||||
set({ streamingTools: [] });
|
set({ streamingTools: [] });
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -250,6 +292,9 @@ export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<Runti
|
|||||||
} catch (err) {
|
} catch (err) {
|
||||||
set({ error: String(err) });
|
set({ error: String(err) });
|
||||||
}
|
}
|
||||||
|
// Reload history to pick up final transcript state from Gateway,
|
||||||
|
// which resolves hasFinalReply and clears hasActiveExecutionGraph.
|
||||||
|
void get().loadHistory(true);
|
||||||
},
|
},
|
||||||
|
|
||||||
// ── Handle incoming chat events from Gateway ──
|
// ── Handle incoming chat events from Gateway ──
|
||||||
|
|||||||
Reference in New Issue
Block a user