diff --git a/web-app/src/containers/ChatInput.tsx b/web-app/src/containers/ChatInput.tsx index 82427a6d4..7f45b60ed 100644 --- a/web-app/src/containers/ChatInput.tsx +++ b/web-app/src/containers/ChatInput.tsx @@ -32,6 +32,9 @@ import { IconBrandChrome, IconUser, } from '@tabler/icons-react' +import { generateId } from 'ai' +import { useMessageQueue } from '@/stores/message-queue-store' +import { QueuedMessageChip } from '@/containers/QueuedMessageBubble' import { BotIcon } from 'lucide-react' import { useTranslation } from '@/i18n/react-i18next-compat' import { useGeneralSetting } from '@/hooks/useGeneralSetting' @@ -271,6 +274,19 @@ const ChatInput = memo(function ChatInput({ ) const ingestingAny = attachments.some((a) => a.processing) + // Queued messages for this thread (shown as chips in the input area) + const queuedMessages = useMessageQueue( + useShallow((s) => s.getQueue(currentThreadId ?? '')) + ) + const queueLength = queuedMessages.length + + const removeQueuedMessage = useCallback( + (id: string) => { + useMessageQueue.getState().removeMessage(currentThreadId ?? '', id) + }, + [currentThreadId] + ) + const lastTransferredThreadId = useRef(null) useEffect(() => { @@ -375,6 +391,17 @@ const ChatInput = memo(function ChatInput({ // Use onSubmit prop if available (AI SDK), otherwise create thread and navigate if (onSubmit) { + // When the model is still streaming, queue the message for later + if (isStreaming && currentThreadId) { + useMessageQueue.getState().enqueue(currentThreadId, { + id: generateId(), + text: prompt, + createdAt: Date.now(), + }) + setPrompt('') + return + } + const assistant = currentThread?.assistants?.[0] setCurrentAssistant(assistant) // Build file parts for AI SDK @@ -1513,8 +1540,7 @@ const ChatInput = memo(function ChatInput({
{isStreaming && ( @@ -1621,6 +1647,23 @@ const ChatInput = memo(function ChatInput({
)} + {queuedMessages.length > 0 && ( +
+ {queuedMessages.map((msg) => ( + { + // Put the text back in the input for editing, remove from queue + setPrompt(queued.text) + removeQueuedMessage(queued.id) + textareaRef.current?.focus() + }} + onRemove={removeQueuedMessage} + /> + ))} +
+ )} )} - {isStreaming ? ( - - ) : ( - + {/* Two-stage stop: first click clears the queue, second click stops streaming */} + {isStreaming && ( + + + + + +

{queueLength > 0 ? `Clear ${queueLength} queued message(s)` : 'Stop generating'}

+
+
)} + diff --git a/web-app/src/containers/QueuedMessageBubble.tsx b/web-app/src/containers/QueuedMessageBubble.tsx new file mode 100644 index 000000000..66e7690db --- /dev/null +++ b/web-app/src/containers/QueuedMessageBubble.tsx @@ -0,0 +1,39 @@ +import { memo } from 'react' +import { IconClock, IconX } from '@tabler/icons-react' +import type { QueuedMessage } from '@/stores/message-queue-store' + +type QueuedMessageChipProps = { + message: QueuedMessage + onEdit?: (message: QueuedMessage) => void + onRemove?: (id: string) => void +} + +// Compact chip for a queued message, displayed inside the chat input area. +// Click the text to edit it (puts it back in the input), click X to discard. +export const QueuedMessageChip = memo(function QueuedMessageChip({ + message, + onEdit, + onRemove, +}: QueuedMessageChipProps) { + return ( +
+ + onEdit?.(message)} + title="Click to edit" + > + {message.text} + + {onRemove && ( + + )} +
+ ) +}) diff --git a/web-app/src/containers/__tests__/QueuedMessageBubble.test.tsx b/web-app/src/containers/__tests__/QueuedMessageBubble.test.tsx new file mode 100644 index 000000000..d90cce32c --- /dev/null +++ b/web-app/src/containers/__tests__/QueuedMessageBubble.test.tsx @@ -0,0 +1,42 @@ +import { render, screen } from '@testing-library/react' +import { describe, it, expect, vi } from 'vitest' +import { QueuedMessageChip } from '../QueuedMessageBubble' + +describe('QueuedMessageChip', () => { + const baseMessage = { + id: 'queued-1', + text: 'This is a queued message', + createdAt: Date.now(), + } + + it('renders the message text', () => { + render() + expect(screen.getByText('This is a queued message')).toBeInTheDocument() + }) + + it('renders the pulsing clock icon', () => { + const { container } = render() + expect(container.querySelector('.animate-pulse')).toBeInTheDocument() + }) + + it('calls onRemove with the message id when X is clicked', () => { + const onRemove = vi.fn() + const { container } = render( + + ) + container.querySelector('button')?.click() + expect(onRemove).toHaveBeenCalledWith('queued-1') + }) + + it('does not render X button when onRemove is not provided', () => { + const { container } = render() + expect(container.querySelector('button')).toBeNull() + }) + + it('calls onEdit with the full message when text is clicked', () => { + const onEdit = vi.fn() + render() + screen.getByText('This is a queued message').click() + expect(onEdit).toHaveBeenCalledWith(baseMessage) + }) +}) diff --git a/web-app/src/routes/threads/$threadId.tsx b/web-app/src/routes/threads/$threadId.tsx index d2af43836..d1de8bd09 100644 --- a/web-app/src/routes/threads/$threadId.tsx +++ b/web-app/src/routes/threads/$threadId.tsx @@ -53,6 +53,7 @@ import { ExtensionTypeEnum, VectorDBExtension } from '@janhq/core' import { ExtensionManager } from '@/lib/extension' import { Shimmer } from '@/components/ai-elements/shimmer' import { useAgentMode } from '@/hooks/useAgentMode' +import { useMessageQueue } from '@/stores/message-queue-store' const CHAT_STATUS = { STREAMING: 'streaming', @@ -596,6 +597,23 @@ function ThreadDetail() { ] ) + // Sends a text-only queued message, bypassing attachment processing entirely. + // This prevents stale or new attachments from leaking into auto-sent queue items. + const sendQueuedMessage = useCallback( + async (text: string) => { + const messageId = generateId() + const userMessage = newUserThreadContent(threadId, text, [], messageId) + addMessage(userMessage) + + sendMessage({ + parts: [{ type: 'text', text }], + id: messageId, + metadata: userMessage.metadata, + }) + }, + [sendMessage, threadId, addMessage] + ) + // Check for and send initial message from sessionStorage const initialMessageSentRef = useRef(false) @@ -865,6 +883,42 @@ function ThreadDetail() { } }, [status]) // eslint-disable-line react-hooks/exhaustive-deps + // Message queue: auto-send the next queued message when the stream finishes. + // No reactive subscription to the queue here — ChatInput owns the UI. + // We only read the store imperatively when status transitions to 'ready'. + const processingQueueRef = useRef(false) + + useEffect(() => { + if (status !== 'ready' || processingQueueRef.current) return + if (sessionData.tools.length > 0) return + + const next = useMessageQueue.getState().dequeue(threadId) + if (!next) return + + processingQueueRef.current = true + sendQueuedMessage(next.text) + .catch((err) => { + console.error('Failed to send queued message:', err) + }) + .finally(() => { + processingQueueRef.current = false + }) + }, [status, threadId, sendQueuedMessage, sessionData.tools.length]) // eslint-disable-line react-hooks/exhaustive-deps + + // If streaming errors out, discard any queued messages so they don't sit there stuck + useEffect(() => { + if (status === 'error') { + useMessageQueue.getState().clearQueue(threadId) + } + }, [status, threadId]) + + // Clear the queue when navigating away from this thread + useEffect(() => { + return () => { + useMessageQueue.getState().clearQueue(threadId) + } + }, [threadId]) + const threadModel = useMemo( () => searchThreadModel ?? thread?.model, [searchThreadModel, thread] diff --git a/web-app/src/stores/__tests__/message-queue-store.test.ts b/web-app/src/stores/__tests__/message-queue-store.test.ts new file mode 100644 index 000000000..c58351c52 --- /dev/null +++ b/web-app/src/stores/__tests__/message-queue-store.test.ts @@ -0,0 +1,160 @@ +import { describe, it, expect, beforeEach } from 'vitest' +import { useMessageQueue } from '../message-queue-store' + +function resetStore() { + useMessageQueue.setState({ queues: {} }) +} + +function makeMessage(id: string, text: string) { + return { id, text, createdAt: Date.now() } +} + +describe('useMessageQueue', () => { + beforeEach(() => { + resetStore() + }) + + describe('enqueue', () => { + it('adds a message to an empty queue', () => { + const { enqueue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'hello')) + + expect(getQueue('thread-1')).toHaveLength(1) + expect(getQueue('thread-1')[0].text).toBe('hello') + }) + + it('appends messages in FIFO order', () => { + const { enqueue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'first')) + enqueue('thread-1', makeMessage('m2', 'second')) + enqueue('thread-1', makeMessage('m3', 'third')) + + const queue = getQueue('thread-1') + expect(queue).toHaveLength(3) + expect(queue.map((m) => m.text)).toEqual(['first', 'second', 'third']) + }) + + it('keeps queues isolated per thread', () => { + const { enqueue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'for thread 1')) + enqueue('thread-2', makeMessage('m2', 'for thread 2')) + + expect(getQueue('thread-1')).toHaveLength(1) + expect(getQueue('thread-2')).toHaveLength(1) + expect(getQueue('thread-1')[0].text).toBe('for thread 1') + expect(getQueue('thread-2')[0].text).toBe('for thread 2') + }) + }) + + describe('dequeue', () => { + it('removes and returns the first message', () => { + const { enqueue, dequeue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'first')) + enqueue('thread-1', makeMessage('m2', 'second')) + + const msg = dequeue('thread-1') + expect(msg?.text).toBe('first') + expect(getQueue('thread-1')).toHaveLength(1) + expect(getQueue('thread-1')[0].text).toBe('second') + }) + + it('returns undefined when the queue is empty', () => { + const { dequeue } = useMessageQueue.getState() + expect(dequeue('thread-1')).toBeUndefined() + }) + + it('returns undefined for a non-existent thread', () => { + const { dequeue } = useMessageQueue.getState() + expect(dequeue('non-existent')).toBeUndefined() + }) + }) + + describe('removeMessage', () => { + it('removes a specific message by id', () => { + const { enqueue, removeMessage, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'first')) + enqueue('thread-1', makeMessage('m2', 'second')) + enqueue('thread-1', makeMessage('m3', 'third')) + + removeMessage('thread-1', 'm2') + const queue = getQueue('thread-1') + expect(queue).toHaveLength(2) + expect(queue.map((m) => m.text)).toEqual(['first', 'third']) + }) + + it('is a no-op if the message id does not exist', () => { + const { enqueue, removeMessage, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'first')) + + removeMessage('thread-1', 'non-existent') + expect(getQueue('thread-1')).toHaveLength(1) + }) + + it('is a no-op for a non-existent thread', () => { + const { removeMessage } = useMessageQueue.getState() + removeMessage('non-existent', 'm1') + // Should not throw + }) + }) + + describe('clearQueue', () => { + it('removes all messages for a thread', () => { + const { enqueue, clearQueue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'a')) + enqueue('thread-1', makeMessage('m2', 'b')) + enqueue('thread-1', makeMessage('m3', 'c')) + + clearQueue('thread-1') + expect(getQueue('thread-1')).toHaveLength(0) + }) + + it('does not affect other threads', () => { + const { enqueue, clearQueue, getQueue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'a')) + enqueue('thread-2', makeMessage('m2', 'b')) + + clearQueue('thread-1') + expect(getQueue('thread-1')).toHaveLength(0) + expect(getQueue('thread-2')).toHaveLength(1) + }) + + it('is a no-op for an empty or non-existent queue', () => { + const { clearQueue, getQueue } = useMessageQueue.getState() + // Should not throw + clearQueue('non-existent') + expect(getQueue('non-existent')).toHaveLength(0) + }) + }) + + describe('getQueue', () => { + it('returns an empty array for unknown threads', () => { + const { getQueue } = useMessageQueue.getState() + expect(getQueue('unknown')).toEqual([]) + }) + + it('returns the same reference for empty queues (avoids re-renders)', () => { + const { getQueue } = useMessageQueue.getState() + const a = getQueue('unknown-1') + const b = getQueue('unknown-2') + expect(a).toBe(b) + }) + }) + + describe('sequential dequeue (simulates auto-processing)', () => { + it('processes messages one at a time in order', () => { + const { enqueue, dequeue } = useMessageQueue.getState() + enqueue('thread-1', makeMessage('m1', 'first')) + enqueue('thread-1', makeMessage('m2', 'second')) + enqueue('thread-1', makeMessage('m3', 'third')) + + const results: string[] = [] + let msg = dequeue('thread-1') + while (msg) { + results.push(msg.text) + msg = dequeue('thread-1') + } + + expect(results).toEqual(['first', 'second', 'third']) + }) + }) +}) diff --git a/web-app/src/stores/chat-session-store.ts b/web-app/src/stores/chat-session-store.ts index 1a92545b4..73fcdb978 100644 --- a/web-app/src/stores/chat-session-store.ts +++ b/web-app/src/stores/chat-session-store.ts @@ -3,6 +3,7 @@ import { create } from "zustand"; import type { Chat, UIMessage } from "@ai-sdk/react"; import type { ChatStatus } from "ai"; import { CustomChatTransport } from "@/lib/custom-chat-transport"; +import { useMessageQueue } from "@/stores/message-queue-store"; // import { showChatCompletionToast } from "@/components/toasts/chat-completion-toast"; export type SessionData = { @@ -197,6 +198,9 @@ export const useChatSessions = create((set, get) => ({ return; } + // Clear any pending queued messages for this session + useMessageQueue.getState().clearQueue(sessionId); + // Remove from store FIRST - prevents updateStatus from showing toast during cleanup set((state) => { if (!state.sessions[sessionId]) { @@ -225,6 +229,12 @@ export const useChatSessions = create((set, get) => ({ }, clearSessions: () => { const sessions = get().sessions; + + // Clear all queued messages + Object.keys(sessions).forEach((sessionId) => { + useMessageQueue.getState().clearQueue(sessionId); + }); + Object.values(sessions).forEach((session) => { session.unsubscribers.forEach((unsubscribe) => { try { diff --git a/web-app/src/stores/message-queue-store.ts b/web-app/src/stores/message-queue-store.ts new file mode 100644 index 000000000..f6975fdb9 --- /dev/null +++ b/web-app/src/stores/message-queue-store.ts @@ -0,0 +1,71 @@ +import { create } from 'zustand' + +export type QueuedMessage = { + id: string + text: string + createdAt: number +} + +// Stable reference for empty queues so selectors don't trigger unnecessary re-renders +const EMPTY_QUEUE: QueuedMessage[] = [] + +interface MessageQueueState { + // Per-thread message queues + queues: Record + + enqueue: (threadId: string, message: QueuedMessage) => void + dequeue: (threadId: string) => QueuedMessage | undefined + removeMessage: (threadId: string, messageId: string) => void + clearQueue: (threadId: string) => void + getQueue: (threadId: string) => QueuedMessage[] +} + +export const useMessageQueue = create((set, get) => ({ + queues: {}, + + enqueue: (threadId, message) => { + set((state) => ({ + queues: { + ...state.queues, + [threadId]: [...(state.queues[threadId] ?? []), message], + }, + })) + }, + + // Atomically removes and returns the first message from the queue. + // The entire read-and-mutate happens inside set() to avoid stale-closure races. + dequeue: (threadId) => { + let first: QueuedMessage | undefined + set((state) => { + const queue = state.queues[threadId] + if (!queue || queue.length === 0) return state + const [head, ...rest] = queue + first = head + return { queues: { ...state.queues, [threadId]: rest } } + }) + return first + }, + + removeMessage: (threadId, messageId) => { + set((state) => { + const queue = state.queues[threadId] + if (!queue) return state + const filtered = queue.filter((m) => m.id !== messageId) + if (filtered.length === queue.length) return state + return { queues: { ...state.queues, [threadId]: filtered } } + }) + }, + + clearQueue: (threadId) => { + set((state) => { + if (!state.queues[threadId]?.length) return state + const updated = { ...state.queues } + delete updated[threadId] + return { queues: updated } + }) + }, + + getQueue: (threadId) => { + return get().queues[threadId] ?? EMPTY_QUEUE + }, +}))