diff --git a/apps/orchestrator/src/activities/post.activity.ts b/apps/orchestrator/src/activities/post.activity.ts index ffd74981..47ec9589 100644 --- a/apps/orchestrator/src/activities/post.activity.ts +++ b/apps/orchestrator/src/activities/post.activity.ts @@ -4,6 +4,7 @@ import { ActivityMethod, TemporalService, } from 'nestjs-temporal-core'; +import * as Sentry from '@sentry/nestjs'; import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service'; import { NotificationService, @@ -76,7 +77,7 @@ export class PostActivity { for (const post of list) { await this._temporalService.client .getRawClient() - .workflow.signalWithStart('postWorkflowV105', { + .workflow.signalWithStart('postWorkflowV106', { workflowId: `post_${post.id}`, taskQueue: 'main', signal: 'poke', @@ -413,4 +414,227 @@ export class PostActivity { return false; } } + + @ActivityMethod() + async postSocialV106(integration: Integration, posts: Post[]) { + const ctx = { + provider: integration.providerIdentifier, + integrationId: integration.id, + organizationId: integration.organizationId, + postIds: posts.map((p) => p.id), + }; + + try { + if (process.env.STRIPE_SECRET_KEY) { + const subscription = await this._subscriptionService.getSubscription( + integration.organizationId + ); + + if (!subscription) { + throw new Error( + 'No active subscription found for this organization.' + ); + } + } + + const getIntegration = this._integrationManager.getSocialIntegration( + integration.providerIdentifier + ); + + const newPosts = await this._postService.updateTags( + integration.organizationId, + posts + ); + + Sentry.logger.info('Posting to social channel', ctx); + + const postNow = await getIntegration.post( + integration.internalId, + integration.token, + await Promise.all( + (newPosts || []).map(async (p) => ({ + id: p.id, + message: stripHtmlValidation( + getIntegration.editor, + p.content, + true, + false, + !/<\/?[a-z][\s\S]*>/i.test(p.content), + getIntegration.mentionFormat + ), + settings: JSON.parse(p.settings || '{}'), + media: await this._postService.updateMedia( + p.id, + JSON.parse(p.image || '[]'), + getIntegration?.convertToJPEG || false + ), + })) + ), + integration + ); + + Sentry.logger.info('Posted to social channel successfully', { + ...ctx, + published: postNow.length, + }); + + await this._temporalService.client + .getRawClient() + .workflow.start('streakWorkflow', { + args: [{ organizationId: integration.organizationId }], + workflowId: `streak_${integration.organizationId}`, + taskQueue: 'main', + workflowIdConflictPolicy: 'TERMINATE_EXISTING', + typedSearchAttributes: new TypedSearchAttributes([ + { + key: organizationId, + value: integration.organizationId, + }, + ]), + }); + + return postNow; + } catch (err) { + Sentry.logger.error('Failed to post to social channel', { + ...ctx, + error: err instanceof Error ? err.message : String(err), + errorType: err instanceof Error ? err.constructor.name : typeof err, + }); + Sentry.captureException(err, { + tags: { + provider: integration.providerIdentifier, + activity: 'postSocial', + }, + contexts: { post: ctx }, + }); + throw err; + } + } + + @ActivityMethod() + async postCommentV106( + postId: string, + lastPostId: string | undefined, + integration: Integration, + posts: Post[] + ) { + const ctx = { + provider: integration.providerIdentifier, + integrationId: integration.id, + organizationId: integration.organizationId, + parentPostId: postId, + postIds: posts.map((p) => p.id), + }; + + try { + const getIntegration = this._integrationManager.getSocialIntegration( + integration.providerIdentifier + ); + + const newPosts = await this._postService.updateTags( + integration.organizationId, + posts + ); + + Sentry.logger.info('Posting comment to social channel', ctx); + + const result = await getIntegration.comment( + integration.internalId, + postId, + lastPostId, + integration.token, + await Promise.all( + (newPosts || []).map(async (p) => ({ + id: p.id, + message: stripHtmlValidation( + getIntegration.editor, + p.content, + true, + false, + !/<\/?[a-z][\s\S]*>/i.test(p.content), + getIntegration.mentionFormat + ), + settings: JSON.parse(p.settings || '{}'), + media: await this._postService.updateMedia( + p.id, + JSON.parse(p.image || '[]'), + getIntegration?.convertToJPEG || false + ), + })) + ), + integration + ); + + Sentry.logger.info('Posted comment successfully', ctx); + + return result; + } catch (err) { + Sentry.logger.error('Failed to post comment to social channel', { + ...ctx, + error: err instanceof Error ? err.message : String(err), + errorType: err instanceof Error ? err.constructor.name : typeof err, + }); + Sentry.captureException(err, { + tags: { + provider: integration.providerIdentifier, + activity: 'postComment', + }, + contexts: { post: ctx }, + }); + throw err; + } + } + + @ActivityMethod() + async refreshTokenWithCauseV106( + integration: Integration, + cause: string + ): Promise { + const getIntegration = this._integrationManager.getSocialIntegration( + integration.providerIdentifier + ); + + const ctx = { + provider: integration.providerIdentifier, + integrationId: integration.id, + organizationId: integration.organizationId, + cause, + }; + + try { + Sentry.logger.info('Refreshing integration token', ctx); + + const refresh = await this._refreshIntegrationService.refresh( + integration, + cause + ); + if (!refresh) { + Sentry.logger.warn('Token refresh returned no credentials', ctx); + return false; + } + + if (getIntegration.refreshWait) { + await timer(10000); + } + + Sentry.logger.info('Refreshed integration token successfully', ctx); + + return refresh; + } catch (err) { + Sentry.logger.error('Failed to refresh integration token', { + ...ctx, + error: err instanceof Error ? err.message : String(err), + errorType: err instanceof Error ? err.constructor.name : typeof err, + }); + Sentry.captureException(err, { + tags: { + provider: integration.providerIdentifier, + activity: 'refreshTokenWithCause', + }, + contexts: { integration: ctx }, + }); + await this._refreshIntegrationService.setBetweenSteps(integration, cause); + return false; + } + } } diff --git a/apps/orchestrator/src/workflows/index.ts b/apps/orchestrator/src/workflows/index.ts index ad6e8c45..8b9edc94 100644 --- a/apps/orchestrator/src/workflows/index.ts +++ b/apps/orchestrator/src/workflows/index.ts @@ -3,6 +3,7 @@ export * from './post-workflows/post.workflow.v1.0.2'; export * from './post-workflows/post.workflow.v1.0.3'; export * from './post-workflows/post.workflow.v1.0.4'; export * from './post-workflows/post.workflow.v1.0.5'; +export * from './post-workflows/post.workflow.v1.0.6'; export * from './autopost.workflow'; export * from './digest.email.workflow'; export * from './missing.post.workflow'; diff --git a/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.6.ts b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.6.ts new file mode 100644 index 00000000..da746ffc --- /dev/null +++ b/apps/orchestrator/src/workflows/post-workflows/post.workflow.v1.0.6.ts @@ -0,0 +1,438 @@ +import { PostActivity } from '@gitroom/orchestrator/activities/post.activity'; +import { + ActivityFailure, + ApplicationFailure, + startChild, + proxyActivities, + sleep, + defineSignal, + setHandler, +} from '@temporalio/workflow'; +import dayjs from 'dayjs'; +import { Integration } from '@prisma/client'; +import { capitalize, sortBy } from 'lodash'; +import { PostResponse } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface'; +import { makeId } from '@gitroom/nestjs-libraries/services/make.is'; +import { TypedSearchAttributes } from '@temporalio/common'; +import { postId as postIdSearchParam } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute'; + +const proxyTaskQueue = (taskQueue: string) => { + return proxyActivities({ + startToCloseTimeout: '10 minute', + taskQueue, + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, + }); +}; + +const { + getPostsList, + getPost, + inAppNotification, + changeState, + updatePost, + sendWebhooks, + isCommentable, +} = proxyActivities({ + startToCloseTimeout: '10 minute', + retry: { + maximumAttempts: 3, + backoffCoefficient: 1, + initialInterval: '2 minutes', + }, +}); + +const poke = defineSignal('poke'); + +const iterate = Array.from({ length: 5 }); + +export async function postWorkflowV106({ + taskQueue, + postId, + organizationId, + postNow = false, +}: { + taskQueue: string; + postId: string; + organizationId: string; + postNow?: boolean; +}) { + // Dynamic task queue, for concurrency + const { + postSocialV106, + postCommentV106, + getIntegrationById, + refreshTokenWithCauseV106, + internalPlugs, + globalPlugs, + processInternalPlug, + processPlug, + } = proxyTaskQueue(taskQueue); + + let poked = false; + setHandler(poke, () => { + poked = true; + }); + + const startTime = new Date(); + // get all the posts and comments to post + const firstPost = await getPost(organizationId, postId); + + // in case doesn't exists for some reason, fail it + if (!firstPost) { + await changeState(postId, 'ERROR', 'No Post'); + return; + } + + if (!postNow && firstPost.state !== 'QUEUE') { + await changeState(firstPost.id, 'ERROR', 'Already posted', [firstPost]); + return; + } + + // if it's a repeatable post, we should ignore this. + if (!postNow) { + await sleep( + dayjs(firstPost.publishDate).isBefore(dayjs()) + ? 0 + : dayjs(firstPost.publishDate).diff(dayjs(), 'millisecond') + ); + } + + const postsListBefore = await getPostsList(organizationId, postId); + const [post] = postsListBefore; + + if (!post) { + await changeState(postId, 'ERROR', 'No Post'); + return; + } + + // if refresh is needed from last time, let's inform the user + if (post.integration?.refreshNeeded) { + await inAppNotification( + post.organizationId, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because you need to reconnect it. Please enable it and try again.`, + true, + false, + 'info' + ); + + await changeState( + postsListBefore[0].id, + 'ERROR', + 'Refresh channel needed', + postsListBefore + ); + return; + } + + // if it's disabled, inform the user + if (post.integration?.disabled) { + await inAppNotification( + post.organizationId, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`, + `We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because it's disabled. Please enable it and try again.`, + true, + false, + 'info' + ); + + await changeState( + postsListBefore[0].id, + 'ERROR', + 'Channel disabled', + postsListBefore + ); + return; + } + + // Do we need to post comment for this social? + const toComment: boolean = + postsListBefore.length === 1 + ? false + : await isCommentable(post.integration); + + const postsList = toComment ? postsListBefore : [postsListBefore[0]]; + + // list of all the saved results + const postsResults: PostResponse[] = []; + + // iterate over the posts + for (let i = 0; i < postsList.length; i++) { + const before = postsResults.length; + // this is a small trick to repeat an action in case of token refresh + for (const _ of iterate) { + try { + // first post the main post + if (i === 0) { + postsResults.push( + ...(await postSocialV106(post.integration as Integration, [ + postsList[i], + ])) + ); + + // then post the comments if any + } else { + if (postsList[i].delay) { + await sleep(60000 * Math.max(0, Number(postsList[i].delay ?? 0))); + } + + postsResults.push( + ...(await postCommentV106( + postsResults[0].postId, + postsResults.length === 1 + ? undefined + : postsResults[i - 1].postId, + post.integration, + [postsList[i]] + )) + ); + } + + // mark post as successful + await updatePost( + postsList[i].id, + postsResults[i].postId, + postsResults[i].releaseURL + ); + + if (i === 0) { + // send notification on a sucessful post + await inAppNotification( + post.integration.organizationId, + `Your post has been published on ${capitalize( + post.integration.providerIdentifier + )}`, + `Your post has been published on ${capitalize( + post.integration.providerIdentifier + )} at ${postsResults[0].releaseURL}`, + true, + true + ); + } + + // break the current while to move to the next post + break; + } catch (err) { + // if token refresh is needed, do it and repeat + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCauseV106( + post.integration, + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + await changeState(postsList[0].id, 'ERROR', err, postsList); + return false; + } + + post.integration.token = refresh.accessToken; + continue; + } + + // for other errors, change state and inform the user if needed + await changeState(postsList[0].id, 'ERROR', err, postsList); + + // specific case for bad body errors + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + await inAppNotification( + post.organizationId, + `Error posting${i === 0 ? ' ' : ' comments '}on ${ + post.integration?.providerIdentifier + } for ${post?.integration?.name}`, + `An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${ + post.integration?.providerIdentifier + }${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`, + true, + false, + 'fail' + ); + return false; + } + } + } + + if (postsResults.length === before) { + // all retries exhausted without success + return false; + } + } + + // send webhooks for the post + await sendWebhooks( + postsResults[0].postId, + post.organizationId, + post.integration.id + ); + + // load internal plugs like repost by other users + const internalPlugsList = await internalPlugs( + post.integration, + JSON.parse(post.settings) + ); + + // load global plugs, like repost a post if it gets to a certain number of likes + const globalPlugsList = (await globalPlugs(post.integration)).reduce( + (all, current) => { + for (let i = 1; i <= current.totalRuns; i++) { + all.push({ + ...current, + delay: current.delay * i, + }); + } + + return all; + }, + [] + ); + + // Check if the post is repeatable + const repeatPost = !post.intervalInDays + ? [] + : [ + { + type: 'repeat-post', + delay: + post.intervalInDays * 24 * 60 * 60 * 1000 - + (new Date().getTime() - startTime.getTime()), + }, + ]; + + // Sort all the actions by delay, so we can process them in order + const list = sortBy( + [...internalPlugsList, ...globalPlugsList, ...repeatPost], + 'delay' + ); + + // process all the plugs in order, we are using while because in some cases we need to remove items from the list + while (list.length > 0) { + // get the next to process + const todo = list.shift(); + + // wait for the delay + await sleep(Math.max(0, Number(todo.delay ?? 0))); + + // process internal plug + if (todo.type === 'internal-plug') { + for (const _ of iterate) { + try { + await processInternalPlug({ ...todo, post: postsResults[0].postId }); + } catch (err) { + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCauseV106( + await getIntegrationById(organizationId, todo.integration), + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + break; + } + + continue; + } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; + } + break; + } + } + + // process global plug + if (todo.type === 'global') { + for (const _ of iterate) { + try { + const process = await processPlug({ + ...todo, + postId: postsResults[0].postId, + }); + if (process) { + const toDelete = list + .reduce((all, current, index) => { + if (current.plugId === todo.plugId) { + all.push(index); + } + + return all; + }, []) + .reverse(); + + for (const index of toDelete) { + list.splice(index, 1); + } + } + } catch (err) { + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'refresh_token' + ) { + const refresh = await refreshTokenWithCauseV106( + post.integration, + err?.cause?.message || '' + ); + if (!refresh || !refresh.accessToken) { + break; + } + + continue; + } + + if ( + err instanceof ActivityFailure && + err.cause instanceof ApplicationFailure && + err.cause.type === 'bad_body' + ) { + break; + } + + continue; + } + + break; + } + } + + // process repeat post in a new workflow, this is important so the other plugs can keep running + if (todo.type === 'repeat-post') { + await startChild(postWorkflowV106, { + parentClosePolicy: 'ABANDON', + args: [ + { + taskQueue, + postId, + organizationId, + postNow: true, + }, + ], + workflowId: `post_${post.id}_${makeId(10)}`, + typedSearchAttributes: new TypedSearchAttributes([ + { + key: postIdSearchParam, + value: postId, + }, + ]), + }); + } + } +} diff --git a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts index 0e8100ef..0d633e67 100644 --- a/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts +++ b/libraries/nestjs-libraries/src/database/prisma/posts/posts.service.ts @@ -722,7 +722,7 @@ export class PostsService { try { await this._temporalService.client .getRawClient() - ?.workflow.start('postWorkflowV105', { + ?.workflow.start('postWorkflowV106', { workflowId: `post_${postId}`, taskQueue: 'main', workflowIdConflictPolicy: 'TERMINATE_EXISTING',