refactor(gateway): remove phase completion timer logic and update run completion handling
Eliminate the phase completion timer and its associated logic from the Gateway. The handling of run completion is now solely based on Gateway phase events and streaming final events. This change simplifies the code and ensures that the state transitions are more reliable, as run completion is no longer inferred from the timer. Additionally, update the runtime send actions to finalize the sending state immediately after the chat.send RPC completes, ensuring accurate state management during agent conversations.
This commit is contained in:
@@ -223,8 +223,19 @@ export function createRuntimeSendActions(set: ChatSet, get: ChatGet): Pick<Runti
|
|||||||
if (!result.success) {
|
if (!result.success) {
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
set({ error: result.error || 'Failed to send message', sending: false });
|
set({ error: result.error || 'Failed to send message', sending: false });
|
||||||
} else if (result.result?.runId) {
|
} else {
|
||||||
set({ activeRunId: result.result.runId });
|
if (result.result?.runId) {
|
||||||
|
set({ activeRunId: result.result.runId });
|
||||||
|
}
|
||||||
|
// The chat.send RPC blocks until the entire agent conversation
|
||||||
|
// finishes. If sending is still true (streaming events haven't
|
||||||
|
// finalized it yet), finalize now — this is the authoritative
|
||||||
|
// signal that the run is complete.
|
||||||
|
if (get().sending) {
|
||||||
|
clearHistoryPoll();
|
||||||
|
set({ sending: false, activeRunId: null, pendingFinal: false, lastUserMessageAt: null });
|
||||||
|
get().loadHistory(true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
clearHistoryPoll();
|
clearHistoryPoll();
|
||||||
|
|||||||
@@ -19,20 +19,6 @@ let lastLoadSessionsAt = 0;
|
|||||||
let lastLoadHistoryAt = 0;
|
let lastLoadHistoryAt = 0;
|
||||||
let cronRepairTriggeredThisSession = false;
|
let cronRepairTriggeredThisSession = false;
|
||||||
|
|
||||||
// Grace timer for agent phase completion. The Gateway sends phase "end"
|
|
||||||
// after each tool-execution round, not just at the end of the entire run.
|
|
||||||
// We delay `sending=false` to give the next sub-run's "started" event
|
|
||||||
// time to arrive. If a new streaming event or started phase cancels the
|
|
||||||
// timer, sending stays true seamlessly.
|
|
||||||
let _phaseCompletionTimer: ReturnType<typeof setTimeout> | null = null;
|
|
||||||
const PHASE_COMPLETION_GRACE_MS = 5_000;
|
|
||||||
function clearPhaseCompletionTimer(): void {
|
|
||||||
if (_phaseCompletionTimer) {
|
|
||||||
clearTimeout(_phaseCompletionTimer);
|
|
||||||
_phaseCompletionTimer = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
interface GatewayHealth {
|
interface GatewayHealth {
|
||||||
ok: boolean;
|
ok: boolean;
|
||||||
error?: string;
|
error?: string;
|
||||||
@@ -144,7 +130,6 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec
|
|||||||
if (hasChatData) {
|
if (hasChatData) {
|
||||||
// Any streaming data cancels the phase-completion grace timer — the
|
// Any streaming data cancels the phase-completion grace timer — the
|
||||||
// run is still producing output (or a new sub-run has started).
|
// run is still producing output (or a new sub-run has started).
|
||||||
clearPhaseCompletionTimer();
|
|
||||||
const normalizedEvent: Record<string, unknown> = {
|
const normalizedEvent: Record<string, unknown> = {
|
||||||
...data,
|
...data,
|
||||||
runId: p.runId ?? data.runId,
|
runId: p.runId ?? data.runId,
|
||||||
@@ -166,7 +151,6 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec
|
|||||||
const runId = p.runId ?? data.runId;
|
const runId = p.runId ?? data.runId;
|
||||||
const sessionKey = p.sessionKey ?? data.sessionKey;
|
const sessionKey = p.sessionKey ?? data.sessionKey;
|
||||||
if (phase === 'started' && runId != null && sessionKey != null) {
|
if (phase === 'started' && runId != null && sessionKey != null) {
|
||||||
clearPhaseCompletionTimer();
|
|
||||||
import('./chat')
|
import('./chat')
|
||||||
.then(({ useChatStore }) => {
|
.then(({ useChatStore }) => {
|
||||||
const state = useChatStore.getState();
|
const state = useChatStore.getState();
|
||||||
@@ -206,29 +190,11 @@ function handleGatewayNotification(notification: { method?: string; params?: Rec
|
|||||||
if (matchesCurrentSession || matchesActiveRun) {
|
if (matchesCurrentSession || matchesActiveRun) {
|
||||||
maybeLoadHistory(state);
|
maybeLoadHistory(state);
|
||||||
}
|
}
|
||||||
if ((matchesCurrentSession || matchesActiveRun) && state.sending) {
|
// Note: we do NOT set sending=false here. The Gateway sends
|
||||||
// The Gateway sends phase "end" after each tool-execution round,
|
// phase "end" after each tool-execution round (sub-run), not only
|
||||||
// not only when the entire conversation finishes. Delay the
|
// when the entire conversation finishes. Run completion is
|
||||||
// sending=false so a subsequent sub-run's "started" event or
|
// determined by the chat.send RPC returning (runtime-send-actions)
|
||||||
// streaming delta can cancel it and keep the UI in the active state.
|
// or a streaming "final" event with output (runtime-event-handlers).
|
||||||
clearPhaseCompletionTimer();
|
|
||||||
_phaseCompletionTimer = setTimeout(() => {
|
|
||||||
_phaseCompletionTimer = null;
|
|
||||||
const current = useChatStore.getState();
|
|
||||||
// Only finalize if still in the same run and no new streaming data arrived.
|
|
||||||
if (current.sending && !current.streamingMessage) {
|
|
||||||
useChatStore.setState({
|
|
||||||
sending: false,
|
|
||||||
activeRunId: null,
|
|
||||||
pendingFinal: false,
|
|
||||||
lastUserMessageAt: null,
|
|
||||||
error: null,
|
|
||||||
});
|
|
||||||
// Reload history to get the final state.
|
|
||||||
maybeLoadHistory(current);
|
|
||||||
}
|
|
||||||
}, PHASE_COMPLETION_GRACE_MS);
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
.catch(() => {});
|
.catch(() => {});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user