presenton/electron/servers/fastapi/models/sse_response.py
2026-02-20 12:02:23 +05:45

40 lines
878 B
Python

import json
from pydantic import BaseModel
class SSEResponse(BaseModel):
event: str
data: str
def to_string(self):
return f"event: {self.event}\ndata: {self.data}\n\n"
class SSEStatusResponse(BaseModel):
status: str
def to_string(self):
return SSEResponse(
event="response", data=json.dumps({"type": "status", "status": self.status})
).to_string()
class SSEErrorResponse(BaseModel):
detail: str
def to_string(self):
return SSEResponse(
event="response", data=json.dumps({"type": "error", "detail": self.detail})
).to_string()
class SSECompleteResponse(BaseModel):
key: str
value: object
def to_string(self):
return SSEResponse(
event="response",
data=json.dumps({"type": "complete", self.key: self.value}),
).to_string()