From d2c1eabc8b18dcd58c21ecd54d774ea4c0c9f3ce Mon Sep 17 00:00:00 2001 From: Nevo David Date: Tue, 12 May 2026 23:57:13 +0700 Subject: [PATCH] feat: fix workflow after sleep --- .../src/activities/post.activity.ts | 24 +- apps/orchestrator/src/workflows/index.ts | 1 + .../post-workflows/post.workflow.v1.0.5.ts | 438 ++++++++++++++++++ .../database/prisma/posts/posts.service.ts | 15 +- 4 files changed, 469 insertions(+), 9 deletions(-) create mode 100644 apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.5.ts diff --git a/apps/orchestrator/src/activities/post.activity.ts b/apps/orchestrator/src/activities/post.activity.ts index fce1c74a..e2a02ce8 100644 --- a/apps/orchestrator/src/activities/post.activity.ts +++ b/apps/orchestrator/src/activities/post.activity.ts @@ -76,7 +76,7 @@ export class PostActivity { for (const post of list) { await this._temporalService.client .getRawClient() - .workflow.signalWithStart('postWorkflowV104', { + .workflow.signalWithStart('postWorkflowV105', { workflowId: `post_${post.id}`, taskQueue: 'main', signal: 'poke', @@ -110,10 +110,25 @@ export class PostActivity { await this._postService.updatePost(id, postId, releaseURL); } + @ActivityMethod() + async getPost(orgId: string, postId: string) { + if (process.env.STRIPE_SECRET_KEY) { + const subscription = await this._subscriptionService.getSubscription( + orgId + ); + if (!subscription) { + return false; + } + } + return this._postService.getPostById(postId, orgId); + } + @ActivityMethod() async getPostsList(orgId: string, postId: string) { if (process.env.STRIPE_SECRET_KEY) { - const subscription = await this._subscriptionService.getSubscription(orgId); + const subscription = await this._subscriptionService.getSubscription( + orgId + ); if (!subscription) { return []; } @@ -392,10 +407,7 @@ export class PostActivity { return refresh; } catch (err) { - await this._refreshIntegrationService.setBetweenSteps( - integration, - cause - ); + await this._refreshIntegrationService.setBetweenSteps(integration, cause); return false; } } diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index b4da83e8..ad6e8c45 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -2,6 +2,7 @@ export * from './post-workflows/post.workflow.v1.0.1'; export * from './post-workflows/post.workflow.v1.0.2'; export * from './post-workflows/post.workflow.v1.0.3'; export * from './post-workflows/post.workflow.v1.0.4'; +export * from './post-workflows/post.workflow.v1.0.5'; export * from './autopost.workflow'; export * from './digest.email.workflow'; export * from './missing.post.workflow'; diff --git a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.5.ts b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.5.ts new file mode 100644 index 00000000..21b7c05b --- /dev/null +++ b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.5.ts @@ -0,0 +1,438 @@ +import { PostActivity } from '@gitroom/orchestrator/activities/post.activity'; +import { + ActivityFailure, + ApplicationFailure, + startChild, + proxyActivities, + sleep, + defineSignal, + setHandler, +} from '@temporalio/workflow'; +import dayjs from 'dayjs'; +import { Integration } from '@prisma/client'; +import { capitalize, sortBy } from 'lodash'; +import { PostResponse } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; +import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; +import { TypedSearchAttributes } from '@temporalio/common'; +import { postId as postIdSearchParam } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; + +const proxyTaskQueue = (taskQueue: string) => { + return proxyActivities({ + startToCloseTimeout: '10 minute', + taskQueue, + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, + }); +}; + +const { + getPostsList, + getPost, + inAppNotification, + changeState, + updatePost, + sendWebhooks, + isCommentable, +} = proxyActivities({ + startToCloseTimeout: '10 minute', + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, +}); + +const poke = defineSignal('poke'); + +const iterate = Array.from({ length: 5 }); + +export async function postWorkflowV105({ + taskQueue, + postId, + organizationId, + postNow = false, +}: { + taskQueue: string; + postId: string; + organizationId: string; + postNow?: boolean; +}) { + // Dynamic task queue, for concurrency + const { + postSocial, + postComment, + getIntegrationById, + refreshTokenWithCause, + internalPlugs, + globalPlugs, + processInternalPlug, + processPlug, + } = proxyTaskQueue(taskQueue); + + let poked = false; + setHandler(poke, () => { + poked = true; + }); + + const startTime = new Date(); + // get all the posts and comments to post + const firstPost = await getPost(organizationId, postId); + + // in case doesn't exists for some reason, fail it + if (!firstPost) { + await changeState(postId, 'ERROR', 'No Post'); + return; + } + + if (!postNow && firstPost.state !== 'QUEUE') { + await changeState(firstPost.id, 'ERROR', 'Already posted', [firstPost]); + return; + } + + // if it's a repeatable post, we should ignore this. + if (!postNow) { + await sleep( + dayjs(firstPost.publishDate).isBefore(dayjs()) + ? 0 + : dayjs(firstPost.publishDate).diff(dayjs(), 'millisecond') + ); + } + + const postsListBefore = await getPostsList(organizationId, postId); + const [post] = postsListBefore; + + if (!post) { + await changeState(postId, 'ERROR', 'No Post'); + return; + } + + // if refresh is needed from last time, let's inform the user + if (post.integration?.refreshNeeded) { + await inAppNotification( + post.organizationId, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because you need to reconnect it. Please enable it and try again.`, + true, + false, + 'info' + ); + + await changeState( + postsListBefore[0].id, + 'ERROR', + 'Refresh channel needed', + postsListBefore + ); + return; + } + + // if it's disabled, inform the user + if (post.integration?.disabled) { + await inAppNotification( + post.organizationId, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because it's disabled. Please enable it and try again.`, + true, + false, + 'info' + ); + + await changeState( + postsListBefore[0].id, + 'ERROR', + 'Channel disabled', + postsListBefore + ); + return; + } + + // Do we need to post comment for this social? + const toComment: boolean = + postsListBefore.length === 1 + ? false + : await isCommentable(post.integration); + + const postsList = toComment ? postsListBefore : [postsListBefore[0]]; + + // list of all the saved results + const postsResults: PostResponse[] = []; + + // iterate over the posts + for (let i = 0; i < postsList.length; i++) { + const before = postsResults.length; + // this is a small trick to repeat an action in case of token refresh + for (const _ of iterate) { + try { + // first post the main post + if (i === 0) { + postsResults.push( + ...(await postSocial(post.integration as Integration, [ + postsList[i], + ])) + ); + + // then post the comments if any + } else { + if (postsList[i].delay) { + await sleep(60000 * Math.max(0, Number(postsList[i].delay ?? 0))); + } + + postsResults.push( + ...(await postComment( + postsResults[0].postId, + postsResults.length === 1 + ? undefined + : postsResults[i - 1].postId, + post.integration, + [postsList[i]] + )) + ); + } + + // mark post as successful + await updatePost( + postsList[i].id, + postsResults[i].postId, + postsResults[i].releaseURL + ); + + if (i === 0) { + // send notification on a sucessful post + await inAppNotification( + post.integration.organizationId, + `Your post has been published on ${capitalize( + post.integration.providerIdentifier + )}`, + `Your post has been published on ${capitalize( + post.integration.providerIdentifier + )} at ${postsResults[0].releaseURL}`, + true, + true + ); + } + + // break the current while to move to the next post + break; + } catch (err) { + // if token refresh is needed, do it and repeat + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCause( + post.integration, + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + await changeState(postsList[0].id, 'ERROR', err, postsList); + return false; + } + + post.integration.token = refresh.accessToken; + continue; + } + + // for other errors, change state and inform the user if needed + await changeState(postsList[0].id, 'ERROR', err, postsList); + + // specific case for bad body errors + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + await inAppNotification( + post.organizationId, + `Error posting${i === 0 ? ' ' : ' comments '}on ${ + post.integration?.providerIdentifier + } for ${post?.integration?.name}`, + `An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${ + post.integration?.providerIdentifier + }${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`, + true, + false, + 'fail' + ); + return false; + } + } + } + + if (postsResults.length === before) { + // all retries exhausted without success + return false; + } + } + + // send webhooks for the post + await sendWebhooks( + postsResults[0].postId, + post.organizationId, + post.integration.id + ); + + // load internal plugs like repost by other users + const internalPlugsList = await internalPlugs( + post.integration, + JSON.parse(post.settings) + ); + + // load global plugs, like repost a post if it gets to a certain number of likes + const globalPlugsList = (await globalPlugs(post.integration)).reduce( + (all, current) => { + for (let i = 1; i <= current.totalRuns; i++) { + all.push({ + ...current, + delay: current.delay * i, + }); + } + + return all; + }, + [] + ); + + // Check if the post is repeatable + const repeatPost = !post.intervalInDays + ? [] + : [ + { + type: 'repeat-post', + delay: + post.intervalInDays * 24 * 60 * 60 * 1000 - + (new Date().getTime() - startTime.getTime()), + }, + ]; + + // Sort all the actions by delay, so we can process them in order + const list = sortBy( + [...internalPlugsList, ...globalPlugsList, ...repeatPost], + 'delay' + ); + + // process all the plugs in order, we are using while because in some cases we need to remove items from the list + while (list.length > 0) { + // get the next to process + const todo = list.shift(); + + // wait for the delay + await sleep(Math.max(0, Number(todo.delay ?? 0))); + + // process internal plug + if (todo.type === 'internal-plug') { + for (const _ of iterate) { + try { + await processInternalPlug({ ...todo, post: postsResults[0].postId }); + } catch (err) { + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCause( + await getIntegrationById(organizationId, todo.integration), + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + break; + } + + continue; + } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; + } + break; + } + } + + // process global plug + if (todo.type === 'global') { + for (const _ of iterate) { + try { + const process = await processPlug({ + ...todo, + postId: postsResults[0].postId, + }); + if (process) { + const toDelete = list + .reduce((all, current, index) => { + if (current.plugId === todo.plugId) { + all.push(index); + } + + return all; + }, []) + .reverse(); + + for (const index of toDelete) { + list.splice(index, 1); + } + } + } catch (err) { + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCause( + post.integration, + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + break; + } + + continue; + } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; + } + + break; + } + } + + // process repeat post in a new workflow, this is important so the other plugs can keep running + if (todo.type === 'repeat-post') { + await startChild(postWorkflowV105, { + parentClosePolicy: 'ABANDON', + args: [ + { + taskQueue, + postId, + organizationId, + postNow: true, + }, + ], + workflowId: `post_${post.id}_${makeId(10)}`, + typedSearchAttributes: new TypedSearchAttributes([ + { + key: postIdSearchParam, + value: postId, + }, + ]), + }); + } + } +} diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index 41caa601..7b6b8285 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -18,7 +18,10 @@ import utc from 'dayjs/plugin/utc'; import { MediaService } from '@gitroom/nestjs-libraries/database/prisma/media/media.service'; import { ShortLinkService } from '@gitroom/nestjs-libraries/short-linking/short.link.service'; import { CreateTagDto } from '@gitroom/nestjs-libraries/dtos/posts/create.tag.dto'; -import { minifyPostsList, minifyPosts } from '@gitroom/helpers/utils/posts.list.minify'; +import { + minifyPostsList, + minifyPosts, +} from '@gitroom/helpers/utils/posts.list.minify'; import axios from 'axios'; import sharp from 'sharp'; import { UploadFactory } from '@gitroom/nestjs-libraries/upload/upload.factory'; @@ -126,6 +129,10 @@ export class PostsService { return []; } + async getPostById(postId: string, orgId: string) { + return this._postRepository.getPostById(postId, orgId); + } + async updateReleaseId(orgId: string, postId: string, releaseId: string) { return this._postRepository.updateReleaseId(postId, orgId, releaseId); } @@ -707,7 +714,7 @@ export class PostsService { try { await this._temporalService.client .getRawClient() - ?.workflow.start('postWorkflowV104', { + ?.workflow.start('postWorkflowV105', { workflowId: `post_${postId}`, taskQueue: 'main', workflowIdConflictPolicy: 'TERMINATE_EXISTING', @@ -841,7 +848,9 @@ export class PostsService { if (action === 'schedule') { try { await this.startWorkflow( - getPostById.integration.providerIdentifier.split('-')[0].toLowerCase(), + getPostById.integration.providerIdentifier + .split('-')[0] + .toLowerCase(), getPostById.id, orgId, getPostById.state === 'DRAFT' ? 'DRAFT' : 'QUEUE'