feat: fix workflow after sleep

This commit is contained in:
Nevo David 2026-05-12 23:57:13 +07:00
parent 4ee5231cb2
commit d2c1eabc8b
4 changed files with 469 additions and 9 deletions

View file

@ -76,7 +76,7 @@ export class PostActivity {
for (const post of list) {
await this._temporalService.client
.getRawClient()
.workflow.signalWithStart('postWorkflowV104', {
.workflow.signalWithStart('postWorkflowV105', {
workflowId: `post_${post.id}`,
taskQueue: 'main',
signal: 'poke',
@ -110,10 +110,25 @@ export class PostActivity {
await this._postService.updatePost(id, postId, releaseURL);
}
@ActivityMethod()
async getPost(orgId: string, postId: string) {
if (process.env.STRIPE_SECRET_KEY) {
const subscription = await this._subscriptionService.getSubscription(
orgId
);
if (!subscription) {
return false;
}
}
return this._postService.getPostById(postId, orgId);
}
@ActivityMethod()
async getPostsList(orgId: string, postId: string) {
if (process.env.STRIPE_SECRET_KEY) {
const subscription = await this._subscriptionService.getSubscription(orgId);
const subscription = await this._subscriptionService.getSubscription(
orgId
);
if (!subscription) {
return [];
}
@ -392,10 +407,7 @@ export class PostActivity {
return refresh;
} catch (err) {
await this._refreshIntegrationService.setBetweenSteps(
integration,
cause
);
await this._refreshIntegrationService.setBetweenSteps(integration, cause);
return false;
}
}

View file

@ -2,6 +2,7 @@ export * from './post-workflows/post.workflow.v1.0.1';
export * from './post-workflows/post.workflow.v1.0.2';
export * from './post-workflows/post.workflow.v1.0.3';
export * from './post-workflows/post.workflow.v1.0.4';
export * from './post-workflows/post.workflow.v1.0.5';
export * from './autopost.workflow';
export * from './digest.email.workflow';
export * from './missing.post.workflow';

View file

@ -0,0 +1,438 @@
import { PostActivity } from '@gitroom/orchestrator/activities/post.activity';
import {
ActivityFailure,
ApplicationFailure,
startChild,
proxyActivities,
sleep,
defineSignal,
setHandler,
} from '@temporalio/workflow';
import dayjs from 'dayjs';
import { Integration } from '@prisma/client';
import { capitalize, sortBy } from 'lodash';
import { PostResponse } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
import { TypedSearchAttributes } from '@temporalio/common';
import { postId as postIdSearchParam } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
const proxyTaskQueue = (taskQueue: string) => {
return proxyActivities<PostActivity>({
startToCloseTimeout: '10 minute',
taskQueue,
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
};
const {
getPostsList,
getPost,
inAppNotification,
changeState,
updatePost,
sendWebhooks,
isCommentable,
} = proxyActivities<PostActivity>({
startToCloseTimeout: '10 minute',
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
const poke = defineSignal('poke');
const iterate = Array.from({ length: 5 });
export async function postWorkflowV105({
taskQueue,
postId,
organizationId,
postNow = false,
}: {
taskQueue: string;
postId: string;
organizationId: string;
postNow?: boolean;
}) {
// Dynamic task queue, for concurrency
const {
postSocial,
postComment,
getIntegrationById,
refreshTokenWithCause,
internalPlugs,
globalPlugs,
processInternalPlug,
processPlug,
} = proxyTaskQueue(taskQueue);
let poked = false;
setHandler(poke, () => {
poked = true;
});
const startTime = new Date();
// get all the posts and comments to post
const firstPost = await getPost(organizationId, postId);
// in case doesn't exists for some reason, fail it
if (!firstPost) {
await changeState(postId, 'ERROR', 'No Post');
return;
}
if (!postNow && firstPost.state !== 'QUEUE') {
await changeState(firstPost.id, 'ERROR', 'Already posted', [firstPost]);
return;
}
// if it's a repeatable post, we should ignore this.
if (!postNow) {
await sleep(
dayjs(firstPost.publishDate).isBefore(dayjs())
? 0
: dayjs(firstPost.publishDate).diff(dayjs(), 'millisecond')
);
}
const postsListBefore = await getPostsList(organizationId, postId);
const [post] = postsListBefore;
if (!post) {
await changeState(postId, 'ERROR', 'No Post');
return;
}
// if refresh is needed from last time, let's inform the user
if (post.integration?.refreshNeeded) {
await inAppNotification(
post.organizationId,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because you need to reconnect it. Please enable it and try again.`,
true,
false,
'info'
);
await changeState(
postsListBefore[0].id,
'ERROR',
'Refresh channel needed',
postsListBefore
);
return;
}
// if it's disabled, inform the user
if (post.integration?.disabled) {
await inAppNotification(
post.organizationId,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because it's disabled. Please enable it and try again.`,
true,
false,
'info'
);
await changeState(
postsListBefore[0].id,
'ERROR',
'Channel disabled',
postsListBefore
);
return;
}
// Do we need to post comment for this social?
const toComment: boolean =
postsListBefore.length === 1
? false
: await isCommentable(post.integration);
const postsList = toComment ? postsListBefore : [postsListBefore[0]];
// list of all the saved results
const postsResults: PostResponse[] = [];
// iterate over the posts
for (let i = 0; i < postsList.length; i++) {
const before = postsResults.length;
// this is a small trick to repeat an action in case of token refresh
for (const _ of iterate) {
try {
// first post the main post
if (i === 0) {
postsResults.push(
...(await postSocial(post.integration as Integration, [
postsList[i],
]))
);
// then post the comments if any
} else {
if (postsList[i].delay) {
await sleep(60000 * Math.max(0, Number(postsList[i].delay ?? 0)));
}
postsResults.push(
...(await postComment(
postsResults[0].postId,
postsResults.length === 1
? undefined
: postsResults[i - 1].postId,
post.integration,
[postsList[i]]
))
);
}
// mark post as successful
await updatePost(
postsList[i].id,
postsResults[i].postId,
postsResults[i].releaseURL
);
if (i === 0) {
// send notification on a sucessful post
await inAppNotification(
post.integration.organizationId,
`Your post has been published on ${capitalize(
post.integration.providerIdentifier
)}`,
`Your post has been published on ${capitalize(
post.integration.providerIdentifier
)} at ${postsResults[0].releaseURL}`,
true,
true
);
}
// break the current while to move to the next post
break;
} catch (err) {
// if token refresh is needed, do it and repeat
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCause(
post.integration,
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
await changeState(postsList[0].id, 'ERROR', err, postsList);
return false;
}
post.integration.token = refresh.accessToken;
continue;
}
// for other errors, change state and inform the user if needed
await changeState(postsList[0].id, 'ERROR', err, postsList);
// specific case for bad body errors
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
await inAppNotification(
post.organizationId,
`Error posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
} for ${post?.integration?.name}`,
`An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
}${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`,
true,
false,
'fail'
);
return false;
}
}
}
if (postsResults.length === before) {
// all retries exhausted without success
return false;
}
}
// send webhooks for the post
await sendWebhooks(
postsResults[0].postId,
post.organizationId,
post.integration.id
);
// load internal plugs like repost by other users
const internalPlugsList = await internalPlugs(
post.integration,
JSON.parse(post.settings)
);
// load global plugs, like repost a post if it gets to a certain number of likes
const globalPlugsList = (await globalPlugs(post.integration)).reduce(
(all, current) => {
for (let i = 1; i <= current.totalRuns; i++) {
all.push({
...current,
delay: current.delay * i,
});
}
return all;
},
[]
);
// Check if the post is repeatable
const repeatPost = !post.intervalInDays
? []
: [
{
type: 'repeat-post',
delay:
post.intervalInDays * 24 * 60 * 60 * 1000 -
(new Date().getTime() - startTime.getTime()),
},
];
// Sort all the actions by delay, so we can process them in order
const list = sortBy(
[...internalPlugsList, ...globalPlugsList, ...repeatPost],
'delay'
);
// process all the plugs in order, we are using while because in some cases we need to remove items from the list
while (list.length > 0) {
// get the next to process
const todo = list.shift();
// wait for the delay
await sleep(Math.max(0, Number(todo.delay ?? 0)));
// process internal plug
if (todo.type === 'internal-plug') {
for (const _ of iterate) {
try {
await processInternalPlug({ ...todo, post: postsResults[0].postId });
} catch (err) {
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCause(
await getIntegrationById(organizationId, todo.integration),
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
break;
}
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
}
// process global plug
if (todo.type === 'global') {
for (const _ of iterate) {
try {
const process = await processPlug({
...todo,
postId: postsResults[0].postId,
});
if (process) {
const toDelete = list
.reduce((all, current, index) => {
if (current.plugId === todo.plugId) {
all.push(index);
}
return all;
}, [])
.reverse();
for (const index of toDelete) {
list.splice(index, 1);
}
}
} catch (err) {
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCause(
post.integration,
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
break;
}
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
}
// process repeat post in a new workflow, this is important so the other plugs can keep running
if (todo.type === 'repeat-post') {
await startChild(postWorkflowV105, {
parentClosePolicy: 'ABANDON',
args: [
{
taskQueue,
postId,
organizationId,
postNow: true,
},
],
workflowId: `post_${post.id}_${makeId(10)}`,
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: postId,
},
]),
});
}
}
}

View file

@ -18,7 +18,10 @@ import utc from 'dayjs/plugin/utc';
import { MediaService } from '@gitroom/nestjs-libraries/database/prisma/media/media.service';
import { ShortLinkService } from '@gitroom/nestjs-libraries/short-linking/short.link.service';
import { CreateTagDto } from '@gitroom/nestjs-libraries/dtos/posts/create.tag.dto';
import { minifyPostsList, minifyPosts } from '@gitroom/helpers/utils/posts.list.minify';
import {
minifyPostsList,
minifyPosts,
} from '@gitroom/helpers/utils/posts.list.minify';
import axios from 'axios';
import sharp from 'sharp';
import { UploadFactory } from '@gitroom/nestjs-libraries/upload/upload.factory';
@ -126,6 +129,10 @@ export class PostsService {
return [];
}
async getPostById(postId: string, orgId: string) {
return this._postRepository.getPostById(postId, orgId);
}
async updateReleaseId(orgId: string, postId: string, releaseId: string) {
return this._postRepository.updateReleaseId(postId, orgId, releaseId);
}
@ -707,7 +714,7 @@ export class PostsService {
try {
await this._temporalService.client
.getRawClient()
?.workflow.start('postWorkflowV104', {
?.workflow.start('postWorkflowV105', {
workflowId: `post_${postId}`,
taskQueue: 'main',
workflowIdConflictPolicy: 'TERMINATE_EXISTING',
@ -841,7 +848,9 @@ export class PostsService {
if (action === 'schedule') {
try {
await this.startWorkflow(
getPostById.integration.providerIdentifier.split('-')[0].toLowerCase(),
getPostById.integration.providerIdentifier
.split('-')[0]
.toLowerCase(),
getPostById.id,
orgId,
getPostById.state === 'DRAFT' ? 'DRAFT' : 'QUEUE'