feat: page update notifications (#2074)
* feat: watchers notification and email preferences * fix: email copy * digests * clean up * fix * clean up * move backlinks queue-up to history processor * fix * fix keys * feat: group notifications * filter * adjust email digest window
This commit is contained in:
@@ -18,12 +18,10 @@ import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
||||
import { Queue } from 'bullmq';
|
||||
import {
|
||||
extractMentions,
|
||||
extractPageMentions,
|
||||
extractUserMentions,
|
||||
} from '../../common/helpers/prosemirror/utils';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
import {
|
||||
IPageBacklinkJob,
|
||||
IPageHistoryJob,
|
||||
IPageMentionNotificationJob,
|
||||
} from '../../integrations/queue/constants/queue.interface';
|
||||
@@ -43,7 +41,6 @@ export class PersistenceExtension implements Extension {
|
||||
constructor(
|
||||
private readonly pageRepo: PageRepo,
|
||||
@InjectKysely() private readonly db: KyselyDB,
|
||||
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
||||
@InjectQueue(QueueName.AI_QUEUE) private aiQueue: Queue,
|
||||
@InjectQueue(QueueName.HISTORY_QUEUE) private historyQueue: Queue,
|
||||
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
||||
@@ -165,13 +162,6 @@ export class PersistenceExtension implements Extension {
|
||||
await this.collabHistory.addContributors(pageId, editingUserIds);
|
||||
|
||||
const mentions = extractMentions(tiptapJson);
|
||||
const pageMentions = extractPageMentions(mentions);
|
||||
|
||||
await this.generalQueue.add(QueueJob.PAGE_BACKLINKS, {
|
||||
pageId: pageId,
|
||||
workspaceId: page.workspaceId,
|
||||
mentions: pageMentions,
|
||||
} as IPageBacklinkJob);
|
||||
|
||||
const userMentions = extractUserMentions(mentions);
|
||||
const oldMentions = page.content ? extractMentions(page.content) : [];
|
||||
|
||||
@@ -1,8 +1,17 @@
|
||||
import { Logger, OnModuleDestroy } from '@nestjs/common';
|
||||
import { OnWorkerEvent, Processor, WorkerHost } from '@nestjs/bullmq';
|
||||
import { Job } from 'bullmq';
|
||||
import { InjectQueue } from '@nestjs/bullmq';
|
||||
import { Job, Queue } from 'bullmq';
|
||||
import { QueueJob, QueueName } from '../../integrations/queue/constants';
|
||||
import { IPageHistoryJob } from '../../integrations/queue/constants/queue.interface';
|
||||
import {
|
||||
IPageBacklinkJob,
|
||||
IPageHistoryJob,
|
||||
IPageUpdateNotificationJob,
|
||||
} from '../../integrations/queue/constants/queue.interface';
|
||||
import {
|
||||
extractMentions,
|
||||
extractPageMentions,
|
||||
} from '../../common/helpers/prosemirror/utils';
|
||||
import { PageHistoryRepo } from '@docmost/db/repos/page/page-history.repo';
|
||||
import { PageRepo } from '@docmost/db/repos/page/page.repo';
|
||||
import { isDeepStrictEqual } from 'node:util';
|
||||
@@ -18,6 +27,8 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
private readonly pageRepo: PageRepo,
|
||||
private readonly collabHistory: CollabHistoryService,
|
||||
private readonly watcherService: WatcherService,
|
||||
@InjectQueue(QueueName.NOTIFICATION_QUEUE) private notificationQueue: Queue,
|
||||
@InjectQueue(QueueName.GENERAL_QUEUE) private generalQueue: Queue,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
@@ -47,8 +58,7 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
!lastHistory ||
|
||||
!isDeepStrictEqual(lastHistory.content, page.content)
|
||||
) {
|
||||
const contributorIds =
|
||||
await this.collabHistory.popContributors(pageId);
|
||||
const contributorIds = await this.collabHistory.popContributors(pageId);
|
||||
|
||||
try {
|
||||
await this.watcherService.addPageWatchers(
|
||||
@@ -61,12 +71,39 @@ export class HistoryProcessor extends WorkerHost implements OnModuleDestroy {
|
||||
await this.pageHistoryRepo.saveHistory(page, { contributorIds });
|
||||
this.logger.debug(`History created for page: ${pageId}`);
|
||||
} catch (err) {
|
||||
await this.collabHistory.addContributors(
|
||||
pageId,
|
||||
contributorIds,
|
||||
);
|
||||
await this.collabHistory.addContributors(pageId, contributorIds);
|
||||
throw err;
|
||||
}
|
||||
|
||||
const mentions = extractMentions(page.content);
|
||||
const pageMentions = extractPageMentions(mentions);
|
||||
|
||||
await this.generalQueue
|
||||
.add(QueueJob.PAGE_BACKLINKS, {
|
||||
pageId,
|
||||
workspaceId: page.workspaceId,
|
||||
mentions: pageMentions,
|
||||
} as IPageBacklinkJob)
|
||||
.catch((err) => {
|
||||
this.logger.error(
|
||||
`Failed to queue backlinks for ${pageId}: ${err.message}`,
|
||||
);
|
||||
});
|
||||
|
||||
if (contributorIds.length > 0 && lastHistory?.content) {
|
||||
await this.notificationQueue
|
||||
.add(QueueJob.PAGE_UPDATED, {
|
||||
pageId,
|
||||
spaceId: page.spaceId,
|
||||
workspaceId: page.workspaceId,
|
||||
actorIds: contributorIds,
|
||||
} as IPageUpdateNotificationJob)
|
||||
.catch((err) => {
|
||||
this.logger.error(
|
||||
`Failed to queue page update notification for ${pageId}: ${err.message}`,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
throw err;
|
||||
|
||||
Reference in New Issue
Block a user