feat: queue messages during streaming with auto-send, two-stage stop, and editable pending chips in input area

This commit is contained in:
statxc
2026-03-27 03:15:19 +00:00
parent 55f8d67297
commit 19c8dcc76a
7 changed files with 459 additions and 29 deletions

View File

@@ -32,6 +32,9 @@ import {
IconBrandChrome,
IconUser,
} from '@tabler/icons-react'
import { generateId } from 'ai'
import { useMessageQueue } from '@/stores/message-queue-store'
import { QueuedMessageChip } from '@/containers/QueuedMessageBubble'
import { BotIcon } from 'lucide-react'
import { useTranslation } from '@/i18n/react-i18next-compat'
import { useGeneralSetting } from '@/hooks/useGeneralSetting'
@@ -271,6 +274,19 @@ const ChatInput = memo(function ChatInput({
)
const ingestingAny = attachments.some((a) => a.processing)
// Queued messages for this thread (shown as chips in the input area)
const queuedMessages = useMessageQueue(
useShallow((s) => s.getQueue(currentThreadId ?? ''))
)
const queueLength = queuedMessages.length
const removeQueuedMessage = useCallback(
(id: string) => {
useMessageQueue.getState().removeMessage(currentThreadId ?? '', id)
},
[currentThreadId]
)
const lastTransferredThreadId = useRef<string | null>(null)
useEffect(() => {
@@ -375,6 +391,17 @@ const ChatInput = memo(function ChatInput({
// Use onSubmit prop if available (AI SDK), otherwise create thread and navigate
if (onSubmit) {
// When the model is still streaming, queue the message for later
if (isStreaming && currentThreadId) {
useMessageQueue.getState().enqueue(currentThreadId, {
id: generateId(),
text: prompt,
createdAt: Date.now(),
})
setPrompt('')
return
}
const assistant = currentThread?.assistants?.[0]
setCurrentAssistant(assistant)
// Build file parts for AI SDK
@@ -1513,8 +1540,7 @@ const ChatInput = memo(function ChatInput({
<div className="relative">
<div
className={cn(
'relative overflow-hidden p-0.5 rounded-3xl',
isStreaming && 'opacity-70'
'relative overflow-hidden p-0.5 rounded-3xl'
)}
>
{isStreaming && (
@@ -1621,6 +1647,23 @@ const ChatInput = memo(function ChatInput({
</div>
</div>
)}
{queuedMessages.length > 0 && (
<div className="flex flex-col gap-1 px-3 pt-2 pb-0">
{queuedMessages.map((msg) => (
<QueuedMessageChip
key={msg.id}
message={msg}
onEdit={(queued) => {
// Put the text back in the input for editing, remove from queue
setPrompt(queued.text)
removeQueuedMessage(queued.id)
textareaRef.current?.focus()
}}
onRemove={removeQueuedMessage}
/>
))}
</div>
)}
<TextareaAutosize
dir="auto"
ref={textareaRef}
@@ -1641,11 +1684,9 @@ const ChatInput = memo(function ChatInput({
e.nativeEvent.isComposing || e.keyCode === 229
if (e.key === 'Enter' && !e.shiftKey && !isComposing) {
e.preventDefault()
// Submit prompt when the following conditions are met:
// - Enter is pressed without Shift
// - The streaming content has finished
// - Prompt is not empty
if (!isStreaming && prompt.trim() && !ingestingAny) {
// Submit prompt when Enter is pressed without Shift and prompt is not empty.
// If streaming, handleSendMessage will queue the message automatically.
if (prompt.trim() && !ingestingAny) {
handleSendMessage(prompt)
}
// When Shift+Enter is pressed, a new line is added (default behavior)
@@ -2027,29 +2068,42 @@ const ChatInput = memo(function ChatInput({
</div>
)}
{isStreaming ? (
<Button
variant="destructive"
size="icon-sm"
className="rounded-full mr-1 mb-1"
onClick={() => {
if (currentThreadId) stopStreaming(currentThreadId)
}}
>
<IconPlayerStopFilled />
</Button>
) : (
<Button
variant="default"
size="icon-sm"
disabled={!prompt.trim() || ingestingAny}
data-test-id="send-message-button"
onClick={() => handleSendMessage(prompt)}
className="rounded-full mr-1 mb-1"
>
<ArrowRight className="text-primary-fg" />
</Button>
{/* Two-stage stop: first click clears the queue, second click stops streaming */}
{isStreaming && (
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="destructive"
size="icon-sm"
className="rounded-full mr-1 mb-1"
onClick={() => {
if (!currentThreadId) return
const queue = useMessageQueue.getState().getQueue(currentThreadId)
if (queue.length > 0) {
useMessageQueue.getState().clearQueue(currentThreadId)
} else {
stopStreaming(currentThreadId)
}
}}
>
<IconPlayerStopFilled />
</Button>
</TooltipTrigger>
<TooltipContent>
<p>{queueLength > 0 ? `Clear ${queueLength} queued message(s)` : 'Stop generating'}</p>
</TooltipContent>
</Tooltip>
)}
<Button
variant="default"
size="icon-sm"
disabled={!prompt.trim() || ingestingAny}
data-test-id="send-message-button"
onClick={() => handleSendMessage(prompt)}
className="rounded-full mr-1 mb-1"
>
<ArrowRight className="text-primary-fg" />
</Button>
</div>
</div>
</div>

View File

@@ -0,0 +1,39 @@
import { memo } from 'react'
import { IconClock, IconX } from '@tabler/icons-react'
import type { QueuedMessage } from '@/stores/message-queue-store'
type QueuedMessageChipProps = {
message: QueuedMessage
onEdit?: (message: QueuedMessage) => void
onRemove?: (id: string) => void
}
// Compact chip for a queued message, displayed inside the chat input area.
// Click the text to edit it (puts it back in the input), click X to discard.
export const QueuedMessageChip = memo(function QueuedMessageChip({
message,
onEdit,
onRemove,
}: QueuedMessageChipProps) {
return (
<div className="flex items-center gap-1.5 px-2.5 py-1.5 rounded-lg bg-secondary/80 border border-input text-sm max-w-full">
<IconClock size={14} className="shrink-0 text-muted-foreground animate-pulse" />
<span
className="truncate text-foreground/70 cursor-pointer hover:text-foreground transition-colors"
onClick={() => onEdit?.(message)}
title="Click to edit"
>
{message.text}
</span>
{onRemove && (
<button
type="button"
className="shrink-0 text-muted-foreground hover:text-foreground transition-colors"
onClick={() => onRemove(message.id)}
>
<IconX size={14} />
</button>
)}
</div>
)
})

View File

@@ -0,0 +1,42 @@
import { render, screen } from '@testing-library/react'
import { describe, it, expect, vi } from 'vitest'
import { QueuedMessageChip } from '../QueuedMessageBubble'
describe('QueuedMessageChip', () => {
const baseMessage = {
id: 'queued-1',
text: 'This is a queued message',
createdAt: Date.now(),
}
it('renders the message text', () => {
render(<QueuedMessageChip message={baseMessage} />)
expect(screen.getByText('This is a queued message')).toBeInTheDocument()
})
it('renders the pulsing clock icon', () => {
const { container } = render(<QueuedMessageChip message={baseMessage} />)
expect(container.querySelector('.animate-pulse')).toBeInTheDocument()
})
it('calls onRemove with the message id when X is clicked', () => {
const onRemove = vi.fn()
const { container } = render(
<QueuedMessageChip message={baseMessage} onRemove={onRemove} />
)
container.querySelector('button')?.click()
expect(onRemove).toHaveBeenCalledWith('queued-1')
})
it('does not render X button when onRemove is not provided', () => {
const { container } = render(<QueuedMessageChip message={baseMessage} />)
expect(container.querySelector('button')).toBeNull()
})
it('calls onEdit with the full message when text is clicked', () => {
const onEdit = vi.fn()
render(<QueuedMessageChip message={baseMessage} onEdit={onEdit} />)
screen.getByText('This is a queued message').click()
expect(onEdit).toHaveBeenCalledWith(baseMessage)
})
})

View File

@@ -53,6 +53,7 @@ import { ExtensionTypeEnum, VectorDBExtension } from '@janhq/core'
import { ExtensionManager } from '@/lib/extension'
import { Shimmer } from '@/components/ai-elements/shimmer'
import { useAgentMode } from '@/hooks/useAgentMode'
import { useMessageQueue } from '@/stores/message-queue-store'
const CHAT_STATUS = {
STREAMING: 'streaming',
@@ -596,6 +597,23 @@ function ThreadDetail() {
]
)
// Sends a text-only queued message, bypassing attachment processing entirely.
// This prevents stale or new attachments from leaking into auto-sent queue items.
const sendQueuedMessage = useCallback(
async (text: string) => {
const messageId = generateId()
const userMessage = newUserThreadContent(threadId, text, [], messageId)
addMessage(userMessage)
sendMessage({
parts: [{ type: 'text', text }],
id: messageId,
metadata: userMessage.metadata,
})
},
[sendMessage, threadId, addMessage]
)
// Check for and send initial message from sessionStorage
const initialMessageSentRef = useRef(false)
@@ -865,6 +883,42 @@ function ThreadDetail() {
}
}, [status]) // eslint-disable-line react-hooks/exhaustive-deps
// Message queue: auto-send the next queued message when the stream finishes.
// No reactive subscription to the queue here — ChatInput owns the UI.
// We only read the store imperatively when status transitions to 'ready'.
const processingQueueRef = useRef(false)
useEffect(() => {
if (status !== 'ready' || processingQueueRef.current) return
if (sessionData.tools.length > 0) return
const next = useMessageQueue.getState().dequeue(threadId)
if (!next) return
processingQueueRef.current = true
sendQueuedMessage(next.text)
.catch((err) => {
console.error('Failed to send queued message:', err)
})
.finally(() => {
processingQueueRef.current = false
})
}, [status, threadId, sendQueuedMessage, sessionData.tools.length]) // eslint-disable-line react-hooks/exhaustive-deps
// If streaming errors out, discard any queued messages so they don't sit there stuck
useEffect(() => {
if (status === 'error') {
useMessageQueue.getState().clearQueue(threadId)
}
}, [status, threadId])
// Clear the queue when navigating away from this thread
useEffect(() => {
return () => {
useMessageQueue.getState().clearQueue(threadId)
}
}, [threadId])
const threadModel = useMemo(
() => searchThreadModel ?? thread?.model,
[searchThreadModel, thread]

View File

@@ -0,0 +1,160 @@
import { describe, it, expect, beforeEach } from 'vitest'
import { useMessageQueue } from '../message-queue-store'
function resetStore() {
useMessageQueue.setState({ queues: {} })
}
function makeMessage(id: string, text: string) {
return { id, text, createdAt: Date.now() }
}
describe('useMessageQueue', () => {
beforeEach(() => {
resetStore()
})
describe('enqueue', () => {
it('adds a message to an empty queue', () => {
const { enqueue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'hello'))
expect(getQueue('thread-1')).toHaveLength(1)
expect(getQueue('thread-1')[0].text).toBe('hello')
})
it('appends messages in FIFO order', () => {
const { enqueue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'first'))
enqueue('thread-1', makeMessage('m2', 'second'))
enqueue('thread-1', makeMessage('m3', 'third'))
const queue = getQueue('thread-1')
expect(queue).toHaveLength(3)
expect(queue.map((m) => m.text)).toEqual(['first', 'second', 'third'])
})
it('keeps queues isolated per thread', () => {
const { enqueue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'for thread 1'))
enqueue('thread-2', makeMessage('m2', 'for thread 2'))
expect(getQueue('thread-1')).toHaveLength(1)
expect(getQueue('thread-2')).toHaveLength(1)
expect(getQueue('thread-1')[0].text).toBe('for thread 1')
expect(getQueue('thread-2')[0].text).toBe('for thread 2')
})
})
describe('dequeue', () => {
it('removes and returns the first message', () => {
const { enqueue, dequeue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'first'))
enqueue('thread-1', makeMessage('m2', 'second'))
const msg = dequeue('thread-1')
expect(msg?.text).toBe('first')
expect(getQueue('thread-1')).toHaveLength(1)
expect(getQueue('thread-1')[0].text).toBe('second')
})
it('returns undefined when the queue is empty', () => {
const { dequeue } = useMessageQueue.getState()
expect(dequeue('thread-1')).toBeUndefined()
})
it('returns undefined for a non-existent thread', () => {
const { dequeue } = useMessageQueue.getState()
expect(dequeue('non-existent')).toBeUndefined()
})
})
describe('removeMessage', () => {
it('removes a specific message by id', () => {
const { enqueue, removeMessage, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'first'))
enqueue('thread-1', makeMessage('m2', 'second'))
enqueue('thread-1', makeMessage('m3', 'third'))
removeMessage('thread-1', 'm2')
const queue = getQueue('thread-1')
expect(queue).toHaveLength(2)
expect(queue.map((m) => m.text)).toEqual(['first', 'third'])
})
it('is a no-op if the message id does not exist', () => {
const { enqueue, removeMessage, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'first'))
removeMessage('thread-1', 'non-existent')
expect(getQueue('thread-1')).toHaveLength(1)
})
it('is a no-op for a non-existent thread', () => {
const { removeMessage } = useMessageQueue.getState()
removeMessage('non-existent', 'm1')
// Should not throw
})
})
describe('clearQueue', () => {
it('removes all messages for a thread', () => {
const { enqueue, clearQueue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'a'))
enqueue('thread-1', makeMessage('m2', 'b'))
enqueue('thread-1', makeMessage('m3', 'c'))
clearQueue('thread-1')
expect(getQueue('thread-1')).toHaveLength(0)
})
it('does not affect other threads', () => {
const { enqueue, clearQueue, getQueue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'a'))
enqueue('thread-2', makeMessage('m2', 'b'))
clearQueue('thread-1')
expect(getQueue('thread-1')).toHaveLength(0)
expect(getQueue('thread-2')).toHaveLength(1)
})
it('is a no-op for an empty or non-existent queue', () => {
const { clearQueue, getQueue } = useMessageQueue.getState()
// Should not throw
clearQueue('non-existent')
expect(getQueue('non-existent')).toHaveLength(0)
})
})
describe('getQueue', () => {
it('returns an empty array for unknown threads', () => {
const { getQueue } = useMessageQueue.getState()
expect(getQueue('unknown')).toEqual([])
})
it('returns the same reference for empty queues (avoids re-renders)', () => {
const { getQueue } = useMessageQueue.getState()
const a = getQueue('unknown-1')
const b = getQueue('unknown-2')
expect(a).toBe(b)
})
})
describe('sequential dequeue (simulates auto-processing)', () => {
it('processes messages one at a time in order', () => {
const { enqueue, dequeue } = useMessageQueue.getState()
enqueue('thread-1', makeMessage('m1', 'first'))
enqueue('thread-1', makeMessage('m2', 'second'))
enqueue('thread-1', makeMessage('m3', 'third'))
const results: string[] = []
let msg = dequeue('thread-1')
while (msg) {
results.push(msg.text)
msg = dequeue('thread-1')
}
expect(results).toEqual(['first', 'second', 'third'])
})
})
})

View File

@@ -3,6 +3,7 @@ import { create } from "zustand";
import type { Chat, UIMessage } from "@ai-sdk/react";
import type { ChatStatus } from "ai";
import { CustomChatTransport } from "@/lib/custom-chat-transport";
import { useMessageQueue } from "@/stores/message-queue-store";
// import { showChatCompletionToast } from "@/components/toasts/chat-completion-toast";
export type SessionData = {
@@ -197,6 +198,9 @@ export const useChatSessions = create<ChatSessionState>((set, get) => ({
return;
}
// Clear any pending queued messages for this session
useMessageQueue.getState().clearQueue(sessionId);
// Remove from store FIRST - prevents updateStatus from showing toast during cleanup
set((state) => {
if (!state.sessions[sessionId]) {
@@ -225,6 +229,12 @@ export const useChatSessions = create<ChatSessionState>((set, get) => ({
},
clearSessions: () => {
const sessions = get().sessions;
// Clear all queued messages
Object.keys(sessions).forEach((sessionId) => {
useMessageQueue.getState().clearQueue(sessionId);
});
Object.values(sessions).forEach((session) => {
session.unsubscribers.forEach((unsubscribe) => {
try {

View File

@@ -0,0 +1,71 @@
import { create } from 'zustand'
export type QueuedMessage = {
id: string
text: string
createdAt: number
}
// Stable reference for empty queues so selectors don't trigger unnecessary re-renders
const EMPTY_QUEUE: QueuedMessage[] = []
interface MessageQueueState {
// Per-thread message queues
queues: Record<string, QueuedMessage[]>
enqueue: (threadId: string, message: QueuedMessage) => void
dequeue: (threadId: string) => QueuedMessage | undefined
removeMessage: (threadId: string, messageId: string) => void
clearQueue: (threadId: string) => void
getQueue: (threadId: string) => QueuedMessage[]
}
export const useMessageQueue = create<MessageQueueState>((set, get) => ({
queues: {},
enqueue: (threadId, message) => {
set((state) => ({
queues: {
...state.queues,
[threadId]: [...(state.queues[threadId] ?? []), message],
},
}))
},
// Atomically removes and returns the first message from the queue.
// The entire read-and-mutate happens inside set() to avoid stale-closure races.
dequeue: (threadId) => {
let first: QueuedMessage | undefined
set((state) => {
const queue = state.queues[threadId]
if (!queue || queue.length === 0) return state
const [head, ...rest] = queue
first = head
return { queues: { ...state.queues, [threadId]: rest } }
})
return first
},
removeMessage: (threadId, messageId) => {
set((state) => {
const queue = state.queues[threadId]
if (!queue) return state
const filtered = queue.filter((m) => m.id !== messageId)
if (filtered.length === queue.length) return state
return { queues: { ...state.queues, [threadId]: filtered } }
})
},
clearQueue: (threadId) => {
set((state) => {
if (!state.queues[threadId]?.length) return state
const updated = { ...state.queues }
delete updated[threadId]
return { queues: updated }
})
},
getQueue: (threadId) => {
return get().queues[threadId] ?? EMPTY_QUEUE
},
}))