Merge pull request #511 from presenton/fix/streaming-retry

refactor: enhance streaming logic with retry mechanism for outline an…
This commit is contained in:
Sudip Parajuli 2026-04-11 14:57:15 +05:45 committed by GitHub
commit 01167c7327
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 280 additions and 159 deletions

View file

@ -6,6 +6,9 @@ import { jsonrepair } from "jsonrepair";
import { RootState } from "@/store/store";
import { getFastAPIUrl } from "@/utils/api";
const MAX_STREAM_RETRIES = 3;
const STREAM_RETRY_DELAY_MS = 1_000;
export const useOutlineStreaming = (presentationId: string | null) => {
@ -22,141 +25,190 @@ export const useOutlineStreaming = (presentationId: string | null) => {
useEffect(() => {
if (!presentationId || outlines.length > 0) return;
let eventSource: EventSource;
let eventSource: EventSource | null = null;
let accumulatedChunks = "";
let retryCount = 0;
let isClosed = false;
let retryTimer: ReturnType<typeof setTimeout> | null = null;
const initializeStream = async () => {
setIsStreaming(true)
setIsLoading(true)
try {
eventSource = new EventSource(`${getFastAPIUrl()}/api/v1/ppt/outlines/stream/${presentationId}`) ;
eventSource.addEventListener("response", (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case "chunk":
//
accumulatedChunks += data.chunk;
//
try {
const repairedJson = jsonrepair(accumulatedChunks);
const partialData = JSON.parse(repairedJson);
if (partialData.slides) {
const nextSlides: { content: string }[] = partialData.slides || [];
// Determine which slide index changed to minimize live parsing
try {
const prev = prevSlidesRef.current || [];
let changedIndex: number | null = null;
const maxLen = Math.max(prev.length, nextSlides.length);
for (let i = 0; i < maxLen; i++) {
const prevContent = prev[i]?.content;
const nextContent = nextSlides[i]?.content;
if (nextContent !== prevContent) {
changedIndex = i;
}
}
// Keep active index stable if no change detected; and ensure non-decreasing
const prevActive = activeIndexRef.current;
let nextActive = changedIndex ?? prevActive;
if (nextActive < prevActive) {
nextActive = prevActive;
}
activeIndexRef.current = nextActive;
setActiveSlideIndex(nextActive);
if (nextActive > highestIndexRef.current) {
highestIndexRef.current = nextActive;
setHighestActiveIndex(nextActive);
}
} catch { }
prevSlidesRef.current = nextSlides;
dispatch(setOutlines(nextSlides));
setIsLoading(false)
}
} catch (error) {
// JSON isn't complete yet, continue accumulating
}
break;
case "complete":
try {
const outlinesData: { content: string }[] = data.presentation.outlines.slides;
dispatch(setOutlines(outlinesData));
setIsStreaming(false)
setIsLoading(false)
setActiveSlideIndex(null)
setHighestActiveIndex(-1)
prevSlidesRef.current = outlinesData;
activeIndexRef.current = -1;
highestIndexRef.current = -1;
eventSource.close();
} catch (error) {
console.error("Error parsing accumulated chunks:", error);
toast.error("Failed to parse presentation data");
eventSource.close();
}
accumulatedChunks = "";
break;
case "closing":
setIsStreaming(false)
setIsLoading(false)
setActiveSlideIndex(null)
setHighestActiveIndex(-1)
activeIndexRef.current = -1;
highestIndexRef.current = -1;
eventSource.close();
break;
case "error":
setIsStreaming(false)
setIsLoading(false)
setActiveSlideIndex(null)
setHighestActiveIndex(-1)
activeIndexRef.current = -1;
highestIndexRef.current = -1;
eventSource.close();
toast.error('Error in outline streaming',
{
description: data.detail || 'Failed to connect to the server. Please try again.',
}
);
break;
}
});
eventSource.onerror = () => {
setIsStreaming(false)
setIsLoading(false)
setActiveSlideIndex(null)
setHighestActiveIndex(-1)
activeIndexRef.current = -1;
highestIndexRef.current = -1;
eventSource.close();
toast.error("Failed to connect to the server. Please try again.");
};
} catch (error) {
setIsStreaming(false)
setIsLoading(false)
setActiveSlideIndex(null)
setHighestActiveIndex(-1)
activeIndexRef.current = -1;
highestIndexRef.current = -1;
toast.error("Failed to initialize connection");
}
};
initializeStream();
return () => {
const closeEventSource = () => {
if (eventSource) {
eventSource.close();
eventSource = null;
}
};
const clearRetryTimer = () => {
if (retryTimer) {
clearTimeout(retryTimer);
retryTimer = null;
}
};
const resetStreamingState = () => {
setIsStreaming(false);
setIsLoading(false);
setActiveSlideIndex(null);
setHighestActiveIndex(-1);
activeIndexRef.current = -1;
highestIndexRef.current = -1;
};
const scheduleRetry = (reason: string): boolean => {
if (retryCount >= MAX_STREAM_RETRIES || isClosed) {
return false;
}
retryCount += 1;
const retryDelay = STREAM_RETRY_DELAY_MS * retryCount;
console.warn(
`Outline stream retry ${retryCount}/${MAX_STREAM_RETRIES}: ${reason}`
);
closeEventSource();
clearRetryTimer();
accumulatedChunks = "";
prevSlidesRef.current = [];
activeIndexRef.current = -1;
highestIndexRef.current = -1;
retryTimer = setTimeout(() => {
if (!isClosed) {
openStream();
}
}, retryDelay);
return true;
};
const openStream = () => {
closeEventSource();
eventSource = new EventSource(`${getFastAPIUrl()}/api/v1/ppt/outlines/stream/${presentationId}`);
eventSource.addEventListener("response", (event) => {
let data: any;
try {
data = JSON.parse(event.data);
} catch {
if (!scheduleRetry("invalid SSE payload")) {
resetStreamingState();
toast.error("Failed to parse outline stream response.");
}
return;
}
switch (data.type) {
case "chunk":
accumulatedChunks += data.chunk;
try {
const repairedJson = jsonrepair(accumulatedChunks);
const partialData = JSON.parse(repairedJson);
if (partialData.slides) {
const nextSlides: { content: string }[] = partialData.slides || [];
try {
const prev = prevSlidesRef.current || [];
let changedIndex: number | null = null;
const maxLen = Math.max(prev.length, nextSlides.length);
for (let i = 0; i < maxLen; i++) {
const prevContent = prev[i]?.content;
const nextContent = nextSlides[i]?.content;
if (nextContent !== prevContent) {
changedIndex = i;
}
}
const prevActive = activeIndexRef.current;
let nextActive = changedIndex ?? prevActive;
if (nextActive < prevActive) {
nextActive = prevActive;
}
activeIndexRef.current = nextActive;
setActiveSlideIndex(nextActive);
if (nextActive > highestIndexRef.current) {
highestIndexRef.current = nextActive;
setHighestActiveIndex(nextActive);
}
} catch {}
prevSlidesRef.current = nextSlides;
dispatch(setOutlines(nextSlides));
setIsLoading(false);
}
} catch (error) {
// JSON isn't complete yet, continue accumulating
}
break;
case "complete":
try {
const outlinesData: { content: string }[] =
data.presentation.outlines.slides;
dispatch(setOutlines(outlinesData));
setIsStreaming(false);
setIsLoading(false);
setActiveSlideIndex(null);
setHighestActiveIndex(-1);
prevSlidesRef.current = outlinesData;
activeIndexRef.current = -1;
highestIndexRef.current = -1;
isClosed = true;
closeEventSource();
clearRetryTimer();
retryCount = 0;
} catch (error) {
if (!scheduleRetry("failed to parse complete payload")) {
resetStreamingState();
toast.error("Failed to parse presentation data");
}
}
accumulatedChunks = "";
break;
case "closing":
setIsStreaming(false);
setIsLoading(false);
setActiveSlideIndex(null);
setHighestActiveIndex(-1);
activeIndexRef.current = -1;
highestIndexRef.current = -1;
isClosed = true;
closeEventSource();
clearRetryTimer();
retryCount = 0;
break;
case "error":
if (!scheduleRetry(data.detail || "server returned stream error")) {
resetStreamingState();
closeEventSource();
toast.error("Error in outline streaming", {
description:
data.detail ||
"Failed to connect to the server. Please try again.",
});
}
break;
}
});
eventSource.onerror = () => {
if (!scheduleRetry("connection lost")) {
resetStreamingState();
closeEventSource();
toast.error("Failed to connect to the server. Please try again.");
}
};
};
setIsStreaming(true);
setIsLoading(true);
openStream();
return () => {
isClosed = true;
closeEventSource();
clearRetryTimer();
};
}, [presentationId, dispatch]);
return { isStreaming, isLoading, activeSlideIndex, highestActiveIndex };

View file

@ -10,6 +10,9 @@ import { toast } from "sonner";
import { MixpanelEvent, trackEvent } from "@/utils/mixpanel";
import { getFastAPIUrl, resolveBackendAssetUrl } from "@/utils/api";
const MAX_STREAM_RETRIES = 3;
const STREAM_RETRY_DELAY_MS = 1_000;
const normalizePresentationAssets = <T,>(input: T): T => {
if (Array.isArray(input)) {
return input.map((item) => normalizePresentationAssets(item)) as T;
@ -41,21 +44,81 @@ export const usePresentationStreaming = (
const previousSlidesLength = useRef(0);
useEffect(() => {
let eventSource: EventSource;
if (!stream) {
fetchUserSlides();
return;
}
let eventSource: EventSource | null = null;
let accumulatedChunks = "";
let retryCount = 0;
let isClosed = false;
let retryTimer: ReturnType<typeof setTimeout> | null = null;
const initializeStream = async () => {
dispatch(setStreaming(true));
dispatch(clearPresentationData());
const closeEventSource = () => {
if (eventSource) {
eventSource.close();
eventSource = null;
}
};
trackEvent(MixpanelEvent.Presentation_Stream_API_Call);
const clearRetryTimer = () => {
if (retryTimer) {
clearTimeout(retryTimer);
retryTimer = null;
}
};
const finalizeFailure = (description: string) => {
closeEventSource();
clearRetryTimer();
setLoading(false);
dispatch(setStreaming(false));
setError(true);
toast.error("Presentation streaming failed", { description });
};
const scheduleRetry = (reason: string): boolean => {
if (retryCount >= MAX_STREAM_RETRIES || isClosed) {
return false;
}
retryCount += 1;
const retryDelay = STREAM_RETRY_DELAY_MS * retryCount;
console.warn(
`Presentation stream retry ${retryCount}/${MAX_STREAM_RETRIES}: ${reason}`
);
closeEventSource();
clearRetryTimer();
accumulatedChunks = "";
previousSlidesLength.current = 0;
retryTimer = setTimeout(() => {
if (!isClosed) {
openStream();
}
}, retryDelay);
return true;
};
const openStream = () => {
closeEventSource();
eventSource = new EventSource(
`${getFastAPIUrl()}/api/v1/ppt/presentation/stream/${presentationId}`
);
eventSource.addEventListener("response", (event) => {
const data = JSON.parse(event.data);
let data: any;
try {
data = JSON.parse(event.data);
} catch {
if (!scheduleRetry("invalid SSE payload")) {
finalizeFailure("Failed to parse stream response.");
}
return;
}
switch (data.type) {
case "chunk":
@ -90,15 +153,19 @@ export const usePresentationStreaming = (
dispatch(setPresentationData(normalizePresentationAssets(data.presentation)));
dispatch(setStreaming(false));
setLoading(false);
eventSource.close();
isClosed = true;
closeEventSource();
clearRetryTimer();
retryCount = 0;
// Remove stream parameter from URL
const newUrl = new URL(window.location.href);
newUrl.searchParams.delete("stream");
window.history.replaceState({}, "", newUrl.toString());
} catch (error) {
eventSource.close();
console.error("Error parsing accumulated chunks:", error);
if (!scheduleRetry("failed to parse complete payload")) {
finalizeFailure("Failed to parse final presentation payload.");
}
}
accumulatedChunks = "";
break;
@ -107,7 +174,10 @@ export const usePresentationStreaming = (
dispatch(setPresentationData(normalizePresentationAssets(data.presentation)));
setLoading(false);
dispatch(setStreaming(false));
eventSource.close();
isClosed = true;
closeEventSource();
clearRetryTimer();
retryCount = 0;
// Remove stream parameter from URL
const newUrl = new URL(window.location.href);
@ -115,38 +185,37 @@ export const usePresentationStreaming = (
window.history.replaceState({}, "", newUrl.toString());
break;
case "error":
eventSource.close();
toast.error("Error in outline streaming", {
description:
if (
!scheduleRetry(
data.detail || "server returned stream error response"
)
) {
finalizeFailure(
data.detail ||
"Failed to connect to the server. Please try again.",
});
setLoading(false);
dispatch(setStreaming(false));
setError(true);
"Failed to connect to the server. Please try again."
);
}
break;
}
});
eventSource.onerror = (error) => {
console.error("EventSource failed:", error);
setLoading(false);
dispatch(setStreaming(false));
setError(true);
eventSource.close();
if (!scheduleRetry("connection lost")) {
finalizeFailure("Failed to connect to the server. Please try again.");
}
};
};
if (stream) {
initializeStream();
} else {
fetchUserSlides();
}
dispatch(setStreaming(true));
dispatch(clearPresentationData());
trackEvent(MixpanelEvent.Presentation_Stream_API_Call);
openStream();
return () => {
if (eventSource) {
eventSource.close();
}
isClosed = true;
closeEventSource();
clearRetryTimer();
};
}, [presentationId, stream, dispatch, setLoading, setError, fetchUserSlides]);
};