From 94b9fcc5e0671e22d6c89617d1fa88a5416a5ff6 Mon Sep 17 00:00:00 2001 From: sudipnext Date: Sat, 11 Apr 2026 14:49:05 +0545 Subject: [PATCH] refactor: enhance streaming logic with retry mechanism for outline and presentation hooks --- .../outline/hooks/useOutlineStreaming.ts | 310 ++++++++++-------- .../hooks/usePresentationStreaming.ts | 129 ++++++-- 2 files changed, 280 insertions(+), 159 deletions(-) diff --git a/electron/servers/nextjs/app/(presentation-generator)/outline/hooks/useOutlineStreaming.ts b/electron/servers/nextjs/app/(presentation-generator)/outline/hooks/useOutlineStreaming.ts index 0a556d2a..f816378d 100644 --- a/electron/servers/nextjs/app/(presentation-generator)/outline/hooks/useOutlineStreaming.ts +++ b/electron/servers/nextjs/app/(presentation-generator)/outline/hooks/useOutlineStreaming.ts @@ -6,6 +6,9 @@ import { jsonrepair } from "jsonrepair"; import { RootState } from "@/store/store"; import { getFastAPIUrl } from "@/utils/api"; +const MAX_STREAM_RETRIES = 3; +const STREAM_RETRY_DELAY_MS = 1_000; + export const useOutlineStreaming = (presentationId: string | null) => { @@ -22,141 +25,190 @@ export const useOutlineStreaming = (presentationId: string | null) => { useEffect(() => { if (!presentationId || outlines.length > 0) return; - let eventSource: EventSource; + let eventSource: EventSource | null = null; let accumulatedChunks = ""; + let retryCount = 0; + let isClosed = false; + let retryTimer: ReturnType | null = null; - const initializeStream = async () => { - setIsStreaming(true) - setIsLoading(true) - try { - eventSource = new EventSource(`${getFastAPIUrl()}/api/v1/ppt/outlines/stream/${presentationId}`) ; - - eventSource.addEventListener("response", (event) => { - const data = JSON.parse(event.data); - switch (data.type) { - case "chunk": - // - accumulatedChunks += data.chunk; - // - try { - const repairedJson = jsonrepair(accumulatedChunks); - const partialData = JSON.parse(repairedJson); - - if (partialData.slides) { - const nextSlides: { content: string }[] = partialData.slides || []; - // Determine which slide index changed to minimize live parsing - try { - const prev = prevSlidesRef.current || []; - let changedIndex: number | null = null; - const maxLen = Math.max(prev.length, nextSlides.length); - for (let i = 0; i < maxLen; i++) { - const prevContent = prev[i]?.content; - const nextContent = nextSlides[i]?.content; - if (nextContent !== prevContent) { - changedIndex = i; - } - } - // Keep active index stable if no change detected; and ensure non-decreasing - const prevActive = activeIndexRef.current; - let nextActive = changedIndex ?? prevActive; - if (nextActive < prevActive) { - nextActive = prevActive; - } - activeIndexRef.current = nextActive; - setActiveSlideIndex(nextActive); - - if (nextActive > highestIndexRef.current) { - highestIndexRef.current = nextActive; - setHighestActiveIndex(nextActive); - } - } catch { } - - prevSlidesRef.current = nextSlides; - dispatch(setOutlines(nextSlides)); - setIsLoading(false) - } - } catch (error) { - // JSON isn't complete yet, continue accumulating - } - break; - - case "complete": - - try { - const outlinesData: { content: string }[] = data.presentation.outlines.slides; - dispatch(setOutlines(outlinesData)); - setIsStreaming(false) - setIsLoading(false) - setActiveSlideIndex(null) - setHighestActiveIndex(-1) - prevSlidesRef.current = outlinesData; - activeIndexRef.current = -1; - highestIndexRef.current = -1; - eventSource.close(); - } catch (error) { - console.error("Error parsing accumulated chunks:", error); - toast.error("Failed to parse presentation data"); - eventSource.close(); - } - accumulatedChunks = ""; - break; - - case "closing": - - setIsStreaming(false) - setIsLoading(false) - setActiveSlideIndex(null) - setHighestActiveIndex(-1) - activeIndexRef.current = -1; - highestIndexRef.current = -1; - eventSource.close(); - break; - case "error": - - setIsStreaming(false) - setIsLoading(false) - setActiveSlideIndex(null) - setHighestActiveIndex(-1) - activeIndexRef.current = -1; - highestIndexRef.current = -1; - eventSource.close(); - toast.error('Error in outline streaming', - { - description: data.detail || 'Failed to connect to the server. Please try again.', - } - ); - break; - } - }); - - eventSource.onerror = () => { - - setIsStreaming(false) - setIsLoading(false) - setActiveSlideIndex(null) - setHighestActiveIndex(-1) - activeIndexRef.current = -1; - highestIndexRef.current = -1; - eventSource.close(); - toast.error("Failed to connect to the server. Please try again."); - }; - } catch (error) { - - setIsStreaming(false) - setIsLoading(false) - setActiveSlideIndex(null) - setHighestActiveIndex(-1) - activeIndexRef.current = -1; - highestIndexRef.current = -1; - toast.error("Failed to initialize connection"); - } - }; - initializeStream(); - return () => { + const closeEventSource = () => { if (eventSource) { eventSource.close(); + eventSource = null; } }; + + const clearRetryTimer = () => { + if (retryTimer) { + clearTimeout(retryTimer); + retryTimer = null; + } + }; + + const resetStreamingState = () => { + setIsStreaming(false); + setIsLoading(false); + setActiveSlideIndex(null); + setHighestActiveIndex(-1); + activeIndexRef.current = -1; + highestIndexRef.current = -1; + }; + + const scheduleRetry = (reason: string): boolean => { + if (retryCount >= MAX_STREAM_RETRIES || isClosed) { + return false; + } + + retryCount += 1; + const retryDelay = STREAM_RETRY_DELAY_MS * retryCount; + console.warn( + `Outline stream retry ${retryCount}/${MAX_STREAM_RETRIES}: ${reason}` + ); + + closeEventSource(); + clearRetryTimer(); + accumulatedChunks = ""; + prevSlidesRef.current = []; + activeIndexRef.current = -1; + highestIndexRef.current = -1; + + retryTimer = setTimeout(() => { + if (!isClosed) { + openStream(); + } + }, retryDelay); + + return true; + }; + + const openStream = () => { + closeEventSource(); + eventSource = new EventSource(`${getFastAPIUrl()}/api/v1/ppt/outlines/stream/${presentationId}`); + + eventSource.addEventListener("response", (event) => { + let data: any; + try { + data = JSON.parse(event.data); + } catch { + if (!scheduleRetry("invalid SSE payload")) { + resetStreamingState(); + toast.error("Failed to parse outline stream response."); + } + return; + } + + switch (data.type) { + case "chunk": + accumulatedChunks += data.chunk; + try { + const repairedJson = jsonrepair(accumulatedChunks); + const partialData = JSON.parse(repairedJson); + + if (partialData.slides) { + const nextSlides: { content: string }[] = partialData.slides || []; + try { + const prev = prevSlidesRef.current || []; + let changedIndex: number | null = null; + const maxLen = Math.max(prev.length, nextSlides.length); + for (let i = 0; i < maxLen; i++) { + const prevContent = prev[i]?.content; + const nextContent = nextSlides[i]?.content; + if (nextContent !== prevContent) { + changedIndex = i; + } + } + const prevActive = activeIndexRef.current; + let nextActive = changedIndex ?? prevActive; + if (nextActive < prevActive) { + nextActive = prevActive; + } + activeIndexRef.current = nextActive; + setActiveSlideIndex(nextActive); + + if (nextActive > highestIndexRef.current) { + highestIndexRef.current = nextActive; + setHighestActiveIndex(nextActive); + } + } catch {} + + prevSlidesRef.current = nextSlides; + dispatch(setOutlines(nextSlides)); + setIsLoading(false); + } + } catch (error) { + // JSON isn't complete yet, continue accumulating + } + break; + + case "complete": + try { + const outlinesData: { content: string }[] = + data.presentation.outlines.slides; + dispatch(setOutlines(outlinesData)); + setIsStreaming(false); + setIsLoading(false); + setActiveSlideIndex(null); + setHighestActiveIndex(-1); + prevSlidesRef.current = outlinesData; + activeIndexRef.current = -1; + highestIndexRef.current = -1; + isClosed = true; + closeEventSource(); + clearRetryTimer(); + retryCount = 0; + } catch (error) { + if (!scheduleRetry("failed to parse complete payload")) { + resetStreamingState(); + toast.error("Failed to parse presentation data"); + } + } + accumulatedChunks = ""; + break; + + case "closing": + setIsStreaming(false); + setIsLoading(false); + setActiveSlideIndex(null); + setHighestActiveIndex(-1); + activeIndexRef.current = -1; + highestIndexRef.current = -1; + isClosed = true; + closeEventSource(); + clearRetryTimer(); + retryCount = 0; + break; + case "error": + if (!scheduleRetry(data.detail || "server returned stream error")) { + resetStreamingState(); + closeEventSource(); + toast.error("Error in outline streaming", { + description: + data.detail || + "Failed to connect to the server. Please try again.", + }); + } + break; + } + }); + + eventSource.onerror = () => { + if (!scheduleRetry("connection lost")) { + resetStreamingState(); + closeEventSource(); + toast.error("Failed to connect to the server. Please try again."); + } + }; + }; + + setIsStreaming(true); + setIsLoading(true); + openStream(); + + return () => { + isClosed = true; + closeEventSource(); + clearRetryTimer(); + }; }, [presentationId, dispatch]); return { isStreaming, isLoading, activeSlideIndex, highestActiveIndex }; diff --git a/electron/servers/nextjs/app/(presentation-generator)/presentation/hooks/usePresentationStreaming.ts b/electron/servers/nextjs/app/(presentation-generator)/presentation/hooks/usePresentationStreaming.ts index 2bfdc7c6..dc4cb380 100644 --- a/electron/servers/nextjs/app/(presentation-generator)/presentation/hooks/usePresentationStreaming.ts +++ b/electron/servers/nextjs/app/(presentation-generator)/presentation/hooks/usePresentationStreaming.ts @@ -10,6 +10,9 @@ import { toast } from "sonner"; import { MixpanelEvent, trackEvent } from "@/utils/mixpanel"; import { getFastAPIUrl, resolveBackendAssetUrl } from "@/utils/api"; +const MAX_STREAM_RETRIES = 3; +const STREAM_RETRY_DELAY_MS = 1_000; + const normalizePresentationAssets = (input: T): T => { if (Array.isArray(input)) { return input.map((item) => normalizePresentationAssets(item)) as T; @@ -41,21 +44,81 @@ export const usePresentationStreaming = ( const previousSlidesLength = useRef(0); useEffect(() => { - let eventSource: EventSource; + if (!stream) { + fetchUserSlides(); + return; + } + + let eventSource: EventSource | null = null; let accumulatedChunks = ""; + let retryCount = 0; + let isClosed = false; + let retryTimer: ReturnType | null = null; - const initializeStream = async () => { - dispatch(setStreaming(true)); - dispatch(clearPresentationData()); + const closeEventSource = () => { + if (eventSource) { + eventSource.close(); + eventSource = null; + } + }; - trackEvent(MixpanelEvent.Presentation_Stream_API_Call); + const clearRetryTimer = () => { + if (retryTimer) { + clearTimeout(retryTimer); + retryTimer = null; + } + }; + const finalizeFailure = (description: string) => { + closeEventSource(); + clearRetryTimer(); + setLoading(false); + dispatch(setStreaming(false)); + setError(true); + toast.error("Presentation streaming failed", { description }); + }; + + const scheduleRetry = (reason: string): boolean => { + if (retryCount >= MAX_STREAM_RETRIES || isClosed) { + return false; + } + + retryCount += 1; + const retryDelay = STREAM_RETRY_DELAY_MS * retryCount; + console.warn( + `Presentation stream retry ${retryCount}/${MAX_STREAM_RETRIES}: ${reason}` + ); + + closeEventSource(); + clearRetryTimer(); + accumulatedChunks = ""; + previousSlidesLength.current = 0; + + retryTimer = setTimeout(() => { + if (!isClosed) { + openStream(); + } + }, retryDelay); + + return true; + }; + + const openStream = () => { + closeEventSource(); eventSource = new EventSource( `${getFastAPIUrl()}/api/v1/ppt/presentation/stream/${presentationId}` ); eventSource.addEventListener("response", (event) => { - const data = JSON.parse(event.data); + let data: any; + try { + data = JSON.parse(event.data); + } catch { + if (!scheduleRetry("invalid SSE payload")) { + finalizeFailure("Failed to parse stream response."); + } + return; + } switch (data.type) { case "chunk": @@ -90,15 +153,19 @@ export const usePresentationStreaming = ( dispatch(setPresentationData(normalizePresentationAssets(data.presentation))); dispatch(setStreaming(false)); setLoading(false); - eventSource.close(); + isClosed = true; + closeEventSource(); + clearRetryTimer(); + retryCount = 0; // Remove stream parameter from URL const newUrl = new URL(window.location.href); newUrl.searchParams.delete("stream"); window.history.replaceState({}, "", newUrl.toString()); } catch (error) { - eventSource.close(); - console.error("Error parsing accumulated chunks:", error); + if (!scheduleRetry("failed to parse complete payload")) { + finalizeFailure("Failed to parse final presentation payload."); + } } accumulatedChunks = ""; break; @@ -107,7 +174,10 @@ export const usePresentationStreaming = ( dispatch(setPresentationData(normalizePresentationAssets(data.presentation))); setLoading(false); dispatch(setStreaming(false)); - eventSource.close(); + isClosed = true; + closeEventSource(); + clearRetryTimer(); + retryCount = 0; // Remove stream parameter from URL const newUrl = new URL(window.location.href); @@ -115,38 +185,37 @@ export const usePresentationStreaming = ( window.history.replaceState({}, "", newUrl.toString()); break; case "error": - eventSource.close(); - toast.error("Error in outline streaming", { - description: + if ( + !scheduleRetry( + data.detail || "server returned stream error response" + ) + ) { + finalizeFailure( data.detail || - "Failed to connect to the server. Please try again.", - }); - setLoading(false); - dispatch(setStreaming(false)); - setError(true); + "Failed to connect to the server. Please try again." + ); + } break; } }); eventSource.onerror = (error) => { console.error("EventSource failed:", error); - setLoading(false); - dispatch(setStreaming(false)); - setError(true); - eventSource.close(); + if (!scheduleRetry("connection lost")) { + finalizeFailure("Failed to connect to the server. Please try again."); + } }; }; - if (stream) { - initializeStream(); - } else { - fetchUserSlides(); - } + dispatch(setStreaming(true)); + dispatch(clearPresentationData()); + trackEvent(MixpanelEvent.Presentation_Stream_API_Call); + openStream(); return () => { - if (eventSource) { - eventSource.close(); - } + isClosed = true; + closeEventSource(); + clearRetryTimer(); }; }, [presentationId, stream, dispatch, setLoading, setError, fetchUserSlides]); };