Compare commits

...
Sign in to create a new pull request.

1 commit

Author SHA1 Message Date
Enno Gelhaus
81018396ab
feat: add sentry implementation for posting
Some checks failed
Build / build (22.12.0) (push) Has been cancelled
2026-05-15 21:08:32 +02:00
4 changed files with 665 additions and 2 deletions

View file

@ -4,6 +4,7 @@ import {
ActivityMethod,
TemporalService,
} from 'nestjs-temporal-core';
import * as Sentry from '@sentry/nestjs';
import { PostsService } from '@gitroom/nestjs-libraries/database/prisma/posts/posts.service';
import {
NotificationService,
@ -76,7 +77,7 @@ export class PostActivity {
for (const post of list) {
await this._temporalService.client
.getRawClient()
.workflow.signalWithStart('postWorkflowV105', {
.workflow.signalWithStart('postWorkflowV106', {
workflowId: `post_${post.id}`,
taskQueue: 'main',
signal: 'poke',
@ -413,4 +414,227 @@ export class PostActivity {
return false;
}
}
@ActivityMethod()
async postSocialV106(integration: Integration, posts: Post[]) {
const ctx = {
provider: integration.providerIdentifier,
integrationId: integration.id,
organizationId: integration.organizationId,
postIds: posts.map((p) => p.id),
};
try {
if (process.env.STRIPE_SECRET_KEY) {
const subscription = await this._subscriptionService.getSubscription(
integration.organizationId
);
if (!subscription) {
throw new Error(
'No active subscription found for this organization.'
);
}
}
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
);
const newPosts = await this._postService.updateTags(
integration.organizationId,
posts
);
Sentry.logger.info('Posting to social channel', ctx);
const postNow = await getIntegration.post(
integration.internalId,
integration.token,
await Promise.all(
(newPosts || []).map(async (p) => ({
id: p.id,
message: stripHtmlValidation(
getIntegration.editor,
p.content,
true,
false,
!/<\/?[a-z][\s\S]*>/i.test(p.content),
getIntegration.mentionFormat
),
settings: JSON.parse(p.settings || '{}'),
media: await this._postService.updateMedia(
p.id,
JSON.parse(p.image || '[]'),
getIntegration?.convertToJPEG || false
),
}))
),
integration
);
Sentry.logger.info('Posted to social channel successfully', {
...ctx,
published: postNow.length,
});
await this._temporalService.client
.getRawClient()
.workflow.start('streakWorkflow', {
args: [{ organizationId: integration.organizationId }],
workflowId: `streak_${integration.organizationId}`,
taskQueue: 'main',
workflowIdConflictPolicy: 'TERMINATE_EXISTING',
typedSearchAttributes: new TypedSearchAttributes([
{
key: organizationId,
value: integration.organizationId,
},
]),
});
return postNow;
} catch (err) {
Sentry.logger.error('Failed to post to social channel', {
...ctx,
error: err instanceof Error ? err.message : String(err),
errorType: err instanceof Error ? err.constructor.name : typeof err,
});
Sentry.captureException(err, {
tags: {
provider: integration.providerIdentifier,
activity: 'postSocial',
},
contexts: { post: ctx },
});
throw err;
}
}
@ActivityMethod()
async postCommentV106(
postId: string,
lastPostId: string | undefined,
integration: Integration,
posts: Post[]
) {
const ctx = {
provider: integration.providerIdentifier,
integrationId: integration.id,
organizationId: integration.organizationId,
parentPostId: postId,
postIds: posts.map((p) => p.id),
};
try {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
);
const newPosts = await this._postService.updateTags(
integration.organizationId,
posts
);
Sentry.logger.info('Posting comment to social channel', ctx);
const result = await getIntegration.comment(
integration.internalId,
postId,
lastPostId,
integration.token,
await Promise.all(
(newPosts || []).map(async (p) => ({
id: p.id,
message: stripHtmlValidation(
getIntegration.editor,
p.content,
true,
false,
!/<\/?[a-z][\s\S]*>/i.test(p.content),
getIntegration.mentionFormat
),
settings: JSON.parse(p.settings || '{}'),
media: await this._postService.updateMedia(
p.id,
JSON.parse(p.image || '[]'),
getIntegration?.convertToJPEG || false
),
}))
),
integration
);
Sentry.logger.info('Posted comment successfully', ctx);
return result;
} catch (err) {
Sentry.logger.error('Failed to post comment to social channel', {
...ctx,
error: err instanceof Error ? err.message : String(err),
errorType: err instanceof Error ? err.constructor.name : typeof err,
});
Sentry.captureException(err, {
tags: {
provider: integration.providerIdentifier,
activity: 'postComment',
},
contexts: { post: ctx },
});
throw err;
}
}
@ActivityMethod()
async refreshTokenWithCauseV106(
integration: Integration,
cause: string
): Promise<false | AuthTokenDetails> {
const getIntegration = this._integrationManager.getSocialIntegration(
integration.providerIdentifier
);
const ctx = {
provider: integration.providerIdentifier,
integrationId: integration.id,
organizationId: integration.organizationId,
cause,
};
try {
Sentry.logger.info('Refreshing integration token', ctx);
const refresh = await this._refreshIntegrationService.refresh(
integration,
cause
);
if (!refresh) {
Sentry.logger.warn('Token refresh returned no credentials', ctx);
return false;
}
if (getIntegration.refreshWait) {
await timer(10000);
}
Sentry.logger.info('Refreshed integration token successfully', ctx);
return refresh;
} catch (err) {
Sentry.logger.error('Failed to refresh integration token', {
...ctx,
error: err instanceof Error ? err.message : String(err),
errorType: err instanceof Error ? err.constructor.name : typeof err,
});
Sentry.captureException(err, {
tags: {
provider: integration.providerIdentifier,
activity: 'refreshTokenWithCause',
},
contexts: { integration: ctx },
});
await this._refreshIntegrationService.setBetweenSteps(integration, cause);
return false;
}
}
}

View file

@ -3,6 +3,7 @@ export * from './post-workflows/post.workflow.v1.0.2';
export * from './post-workflows/post.workflow.v1.0.3';
export * from './post-workflows/post.workflow.v1.0.4';
export * from './post-workflows/post.workflow.v1.0.5';
export * from './post-workflows/post.workflow.v1.0.6';
export * from './autopost.workflow';
export * from './digest.email.workflow';
export * from './missing.post.workflow';

View file

@ -0,0 +1,438 @@
import { PostActivity } from '@gitroom/orchestrator/activities/post.activity';
import {
ActivityFailure,
ApplicationFailure,
startChild,
proxyActivities,
sleep,
defineSignal,
setHandler,
} from '@temporalio/workflow';
import dayjs from 'dayjs';
import { Integration } from '@prisma/client';
import { capitalize, sortBy } from 'lodash';
import { PostResponse } from '@gitroom/nestjs-libraries/integrations/social/social.integrations.interface';
import { makeId } from '@gitroom/nestjs-libraries/services/make.is';
import { TypedSearchAttributes } from '@temporalio/common';
import { postId as postIdSearchParam } from '@gitroom/nestjs-libraries/temporal/temporal.search.attribute';
const proxyTaskQueue = (taskQueue: string) => {
return proxyActivities<PostActivity>({
startToCloseTimeout: '10 minute',
taskQueue,
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
};
const {
getPostsList,
getPost,
inAppNotification,
changeState,
updatePost,
sendWebhooks,
isCommentable,
} = proxyActivities<PostActivity>({
startToCloseTimeout: '10 minute',
retry: {
maximumAttempts: 3,
backoffCoefficient: 1,
initialInterval: '2 minutes',
},
});
const poke = defineSignal('poke');
const iterate = Array.from({ length: 5 });
export async function postWorkflowV106({
taskQueue,
postId,
organizationId,
postNow = false,
}: {
taskQueue: string;
postId: string;
organizationId: string;
postNow?: boolean;
}) {
// Dynamic task queue, for concurrency
const {
postSocialV106,
postCommentV106,
getIntegrationById,
refreshTokenWithCauseV106,
internalPlugs,
globalPlugs,
processInternalPlug,
processPlug,
} = proxyTaskQueue(taskQueue);
let poked = false;
setHandler(poke, () => {
poked = true;
});
const startTime = new Date();
// get all the posts and comments to post
const firstPost = await getPost(organizationId, postId);
// in case doesn't exists for some reason, fail it
if (!firstPost) {
await changeState(postId, 'ERROR', 'No Post');
return;
}
if (!postNow && firstPost.state !== 'QUEUE') {
await changeState(firstPost.id, 'ERROR', 'Already posted', [firstPost]);
return;
}
// if it's a repeatable post, we should ignore this.
if (!postNow) {
await sleep(
dayjs(firstPost.publishDate).isBefore(dayjs())
? 0
: dayjs(firstPost.publishDate).diff(dayjs(), 'millisecond')
);
}
const postsListBefore = await getPostsList(organizationId, postId);
const [post] = postsListBefore;
if (!post) {
await changeState(postId, 'ERROR', 'No Post');
return;
}
// if refresh is needed from last time, let's inform the user
if (post.integration?.refreshNeeded) {
await inAppNotification(
post.organizationId,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because you need to reconnect it. Please enable it and try again.`,
true,
false,
'info'
);
await changeState(
postsListBefore[0].id,
'ERROR',
'Refresh channel needed',
postsListBefore
);
return;
}
// if it's disabled, inform the user
if (post.integration?.disabled) {
await inAppNotification(
post.organizationId,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name}`,
`We couldn't post to ${post.integration?.providerIdentifier} for ${post?.integration?.name} because it's disabled. Please enable it and try again.`,
true,
false,
'info'
);
await changeState(
postsListBefore[0].id,
'ERROR',
'Channel disabled',
postsListBefore
);
return;
}
// Do we need to post comment for this social?
const toComment: boolean =
postsListBefore.length === 1
? false
: await isCommentable(post.integration);
const postsList = toComment ? postsListBefore : [postsListBefore[0]];
// list of all the saved results
const postsResults: PostResponse[] = [];
// iterate over the posts
for (let i = 0; i < postsList.length; i++) {
const before = postsResults.length;
// this is a small trick to repeat an action in case of token refresh
for (const _ of iterate) {
try {
// first post the main post
if (i === 0) {
postsResults.push(
...(await postSocialV106(post.integration as Integration, [
postsList[i],
]))
);
// then post the comments if any
} else {
if (postsList[i].delay) {
await sleep(60000 * Math.max(0, Number(postsList[i].delay ?? 0)));
}
postsResults.push(
...(await postCommentV106(
postsResults[0].postId,
postsResults.length === 1
? undefined
: postsResults[i - 1].postId,
post.integration,
[postsList[i]]
))
);
}
// mark post as successful
await updatePost(
postsList[i].id,
postsResults[i].postId,
postsResults[i].releaseURL
);
if (i === 0) {
// send notification on a sucessful post
await inAppNotification(
post.integration.organizationId,
`Your post has been published on ${capitalize(
post.integration.providerIdentifier
)}`,
`Your post has been published on ${capitalize(
post.integration.providerIdentifier
)} at ${postsResults[0].releaseURL}`,
true,
true
);
}
// break the current while to move to the next post
break;
} catch (err) {
// if token refresh is needed, do it and repeat
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCauseV106(
post.integration,
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
await changeState(postsList[0].id, 'ERROR', err, postsList);
return false;
}
post.integration.token = refresh.accessToken;
continue;
}
// for other errors, change state and inform the user if needed
await changeState(postsList[0].id, 'ERROR', err, postsList);
// specific case for bad body errors
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
await inAppNotification(
post.organizationId,
`Error posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
} for ${post?.integration?.name}`,
`An error occurred while posting${i === 0 ? ' ' : ' comments '}on ${
post.integration?.providerIdentifier
}${err?.cause?.message ? `: ${err?.cause?.message}` : ``}`,
true,
false,
'fail'
);
return false;
}
}
}
if (postsResults.length === before) {
// all retries exhausted without success
return false;
}
}
// send webhooks for the post
await sendWebhooks(
postsResults[0].postId,
post.organizationId,
post.integration.id
);
// load internal plugs like repost by other users
const internalPlugsList = await internalPlugs(
post.integration,
JSON.parse(post.settings)
);
// load global plugs, like repost a post if it gets to a certain number of likes
const globalPlugsList = (await globalPlugs(post.integration)).reduce(
(all, current) => {
for (let i = 1; i <= current.totalRuns; i++) {
all.push({
...current,
delay: current.delay * i,
});
}
return all;
},
[]
);
// Check if the post is repeatable
const repeatPost = !post.intervalInDays
? []
: [
{
type: 'repeat-post',
delay:
post.intervalInDays * 24 * 60 * 60 * 1000 -
(new Date().getTime() - startTime.getTime()),
},
];
// Sort all the actions by delay, so we can process them in order
const list = sortBy(
[...internalPlugsList, ...globalPlugsList, ...repeatPost],
'delay'
);
// process all the plugs in order, we are using while because in some cases we need to remove items from the list
while (list.length > 0) {
// get the next to process
const todo = list.shift();
// wait for the delay
await sleep(Math.max(0, Number(todo.delay ?? 0)));
// process internal plug
if (todo.type === 'internal-plug') {
for (const _ of iterate) {
try {
await processInternalPlug({ ...todo, post: postsResults[0].postId });
} catch (err) {
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCauseV106(
await getIntegrationById(organizationId, todo.integration),
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
break;
}
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
}
// process global plug
if (todo.type === 'global') {
for (const _ of iterate) {
try {
const process = await processPlug({
...todo,
postId: postsResults[0].postId,
});
if (process) {
const toDelete = list
.reduce((all, current, index) => {
if (current.plugId === todo.plugId) {
all.push(index);
}
return all;
}, [])
.reverse();
for (const index of toDelete) {
list.splice(index, 1);
}
}
} catch (err) {
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'refresh_token'
) {
const refresh = await refreshTokenWithCauseV106(
post.integration,
err?.cause?.message || ''
);
if (!refresh || !refresh.accessToken) {
break;
}
continue;
}
if (
err instanceof ActivityFailure &&
err.cause instanceof ApplicationFailure &&
err.cause.type === 'bad_body'
) {
break;
}
continue;
}
break;
}
}
// process repeat post in a new workflow, this is important so the other plugs can keep running
if (todo.type === 'repeat-post') {
await startChild(postWorkflowV106, {
parentClosePolicy: 'ABANDON',
args: [
{
taskQueue,
postId,
organizationId,
postNow: true,
},
],
workflowId: `post_${post.id}_${makeId(10)}`,
typedSearchAttributes: new TypedSearchAttributes([
{
key: postIdSearchParam,
value: postId,
},
]),
});
}
}
}

View file

@ -722,7 +722,7 @@ export class PostsService {
try {
await this._temporalService.client
.getRawClient()
?.workflow.start('postWorkflowV105', {
?.workflow.start('postWorkflowV106', {
workflowId: `post_${postId}`,
taskQueue: 'main',
workflowIdConflictPolicy: 'TERMINATE_EXISTING',