From 9aaf5810cf55a842bd043305ee064bc719549127 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Sun, 21 Jun 2026 15:04:31 +0300 Subject: [PATCH] feat(git-sync): GitSyncModule orchestrator + config + listener (Phase A.4b/B) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Control plane wiring (plan §5-§11): - PageService create/update/movePage now honor provenance actor 'git-sync' (stamp lastUpdatedSource='git-sync'), closing the A.4a gap. - EnvironmentService: GIT_SYNC_ENABLED / DATA_DIR / REMOTE_TEMPLATE / POLL_INTERVAL_MS / DEBOUNCE_MS / SERVICE_USER_ID (required-if-enabled) / SSH_KEY_PATH + validation. - VaultRegistryService: per-space vault path + cached VaultGit. - GitSyncOrchestrator: per-space Redis leader-lock (SET NX PX + CAS-Lua release, randomUUID instanceId) + in-process mutex; runOnce drives the vendored engine PULL (readExisting->computePullActions->applyPullActions) then PUSH (runPush) with the bound native GitSyncClient + VaultGit; @Interval poll-safety gated on GIT_SYNC_ENABLED; imports plain ScheduleModule (TelemetryModule owns forRoot). - PageChangeListener: @OnEvent PAGE_* -> per-space debounce -> runOnce, with a best-effort lastUpdatedSource==='git-sync' loop-guard. - GitSyncController: admin POST /api/git-sync/trigger + GET /status (ops/e2e). - GitSyncModule registered in app.module. Enabled-space enumeration uses settings.gitSync.enabled, falling back to all live spaces until Phase C writes the flag (master gate = GIT_SYNC_ENABLED). tsc clean; 713 tests/71 suites pass; dev server hot-reloaded the module (route live, DI graph boots). Live pull/push round-trip verified next. Co-Authored-By: Claude Opus 4.8 --- apps/server/src/app.module.ts | 2 + .../decorators/auth-provenance.decorator.ts | 8 + .../environment/environment.service.ts | 54 +++ .../environment/environment.validation.ts | 35 ++ .../git-sync/git-sync.constants.ts | 45 +++ .../git-sync/git-sync.controller.ts | 92 +++++ .../integrations/git-sync/git-sync.module.ts | 48 +++ .../listeners/page-change.listener.ts | 143 ++++++++ .../services/git-sync.orchestrator.ts | 323 ++++++++++++++++++ .../services/vault-registry.service.ts | 44 +++ 10 files changed, 794 insertions(+) create mode 100644 apps/server/src/integrations/git-sync/git-sync.constants.ts create mode 100644 apps/server/src/integrations/git-sync/git-sync.controller.ts create mode 100644 apps/server/src/integrations/git-sync/git-sync.module.ts create mode 100644 apps/server/src/integrations/git-sync/listeners/page-change.listener.ts create mode 100644 apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts create mode 100644 apps/server/src/integrations/git-sync/services/vault-registry.service.ts diff --git a/apps/server/src/app.module.ts b/apps/server/src/app.module.ts index 926d5802..5fe95a7a 100644 --- a/apps/server/src/app.module.ts +++ b/apps/server/src/app.module.ts @@ -28,6 +28,7 @@ import { ClsModule } from 'nestjs-cls'; import { NoopAuditModule } from './integrations/audit/audit.module'; import { ThrottleModule } from './integrations/throttle/throttle.module'; import { McpModule } from './integrations/mcp/mcp.module'; +import { GitSyncModule } from './integrations/git-sync/git-sync.module'; import { AiModule } from './integrations/ai/ai.module'; import { AiChatModule } from './core/ai-chat/ai-chat.module'; @@ -89,6 +90,7 @@ try { TelemetryModule, ThrottleModule, McpModule, + GitSyncModule, AiModule, AiChatModule, ...enterpriseModules, diff --git a/apps/server/src/common/decorators/auth-provenance.decorator.ts b/apps/server/src/common/decorators/auth-provenance.decorator.ts index 22947ee8..e7358dd8 100644 --- a/apps/server/src/common/decorators/auth-provenance.decorator.ts +++ b/apps/server/src/common/decorators/auth-provenance.decorator.ts @@ -62,6 +62,14 @@ export function agentSourceFields( sourceKey: S, chatKey: C, ): Partial & Record> { + // git-sync data-plane write (plan §8.1): stamp the source 'git-sync' with NO + // aiChatId (it has no internal ai_chats row). Mirrors the agent branch; each + // write has a single actor, so precedence is irrelevant here. + if (provenance?.actor === 'git-sync') { + return { [sourceKey]: 'git-sync' } as Partial< + Record & Record + >; + } if (provenance?.actor !== 'agent') return {}; return { [sourceKey]: 'agent', diff --git a/apps/server/src/integrations/environment/environment.service.ts b/apps/server/src/integrations/environment/environment.service.ts index 6bbc6dba..5cff4764 100644 --- a/apps/server/src/integrations/environment/environment.service.ts +++ b/apps/server/src/integrations/environment/environment.service.ts @@ -320,4 +320,58 @@ export class EnvironmentService { .map((o) => o.trim()) .filter(Boolean); } + + // --- git-sync (plan §7.2) ------------------------------------------------- + + /** Global master switch for the git-sync control plane (default false). */ + isGitSyncEnabled(): boolean { + return ( + this.configService.get('GIT_SYNC_ENABLED', 'false').toLowerCase() === + 'true' + ); + } + + /** + * Root directory holding the per-space vault repos. Defaults to + * `/git-sync`. `DATA_DIR` is read directly (no dedicated + * getter exists in this codebase) so the vault root tracks the data volume. + */ + getGitSyncDataDir(): string { + const explicit = this.configService.get('GIT_SYNC_DATA_DIR'); + if (explicit) return explicit; + const dataDir = this.configService.get('DATA_DIR') || './data'; + return `${dataDir.replace(/\/+$/, '')}/git-sync`; + } + + /** Optional remote template, e.g. `git@host:vault-{spaceId}.git`. */ + getGitSyncRemoteTemplate(): string | undefined { + return this.configService.get('GIT_SYNC_REMOTE_TEMPLATE'); + } + + /** Poll-safety interval in ms (default 15000). */ + getGitSyncPollIntervalMs(): number { + return parseInt( + this.configService.get('GIT_SYNC_POLL_INTERVAL_MS', '15000'), + ); + } + + /** Event debounce window in ms (default 2000). */ + getGitSyncDebounceMs(): number { + return parseInt( + this.configService.get('GIT_SYNC_DEBOUNCE_MS', '2000'), + ); + } + + /** + * The service user id git-sync writes are attributed to. Required when sync is + * enabled (validated in environment.validation.ts); optional otherwise. + */ + getGitSyncServiceUserId(): string | undefined { + return this.configService.get('GIT_SYNC_SERVICE_USER_ID'); + } + + /** Optional path to the SSH key used for git remote access. */ + getGitSyncSshKeyPath(): string | undefined { + return this.configService.get('GIT_SYNC_SSH_KEY_PATH'); + } } diff --git a/apps/server/src/integrations/environment/environment.validation.ts b/apps/server/src/integrations/environment/environment.validation.ts index ef3c420c..923efa66 100644 --- a/apps/server/src/integrations/environment/environment.validation.ts +++ b/apps/server/src/integrations/environment/environment.validation.ts @@ -170,6 +170,41 @@ export class EnvironmentVariables { }, ) CLICKHOUSE_URL: string; + + // --- git-sync (plan §7.2) — all OPTIONAL. The master switch defaults off; a + // required-if-enabled service user id is validated only when sync is on. --- + + @IsOptional() + @IsIn(['true', 'false']) + @IsString() + GIT_SYNC_ENABLED: string; + + @IsOptional() + @IsString() + GIT_SYNC_DATA_DIR: string; + + @IsOptional() + @IsString() + GIT_SYNC_REMOTE_TEMPLATE: string; + + @IsOptional() + @IsString() + GIT_SYNC_POLL_INTERVAL_MS: string; + + @IsOptional() + @IsString() + GIT_SYNC_DEBOUNCE_MS: string; + + // Required when git-sync is enabled: the service user create/move/rename/delete + // are attributed to (plan §7.2). Optional otherwise. + @ValidateIf((obj) => obj.GIT_SYNC_ENABLED === 'true') + @IsNotEmpty() + @IsString() + GIT_SYNC_SERVICE_USER_ID: string; + + @IsOptional() + @IsString() + GIT_SYNC_SSH_KEY_PATH: string; } export function validate(config: Record) { diff --git a/apps/server/src/integrations/git-sync/git-sync.constants.ts b/apps/server/src/integrations/git-sync/git-sync.constants.ts new file mode 100644 index 00000000..4941a3e0 --- /dev/null +++ b/apps/server/src/integrations/git-sync/git-sync.constants.ts @@ -0,0 +1,45 @@ +/** + * Git-sync control-plane constants (plan §6/§9/§10). + * + * Event/job names are REUSED from the shared event contract (event.contants.ts) + * so the listener subscribes to the exact names the rest of the server emits — + * never a string literal that could drift. The Redis lock-key prefix + TTLs back + * the single-writer leader lock (§9); the debounce default backs the per-space + * event coalescing (§10). + */ +import { EventName } from '../../common/events/event.contants'; + +/** + * The page lifecycle events the git-sync listener reacts to (plan §10). A change + * to any of these in an enabled space schedules a debounced sync cycle. + * - PAGE_CREATED / PAGE_UPDATED / PAGE_MOVED — structural + content edits; + * - PAGE_CONTENT_UPDATED — the collab body-save job (real name in the enum); + * - PAGE_SOFT_DELETED / PAGE_RESTORED — Trash transitions (deletes are soft); + * - PAGE_MOVED_TO_SPACE — cross-space move (cross-repo, plan §5). + */ +export const GIT_SYNC_PAGE_EVENTS = [ + EventName.PAGE_CREATED, + EventName.PAGE_UPDATED, + EventName.PAGE_CONTENT_UPDATED, + EventName.PAGE_MOVED, + EventName.PAGE_MOVED_TO_SPACE, + EventName.PAGE_SOFT_DELETED, + EventName.PAGE_RESTORED, +] as const; + +/** Redis key prefix for the per-space leader lock (plan §9). */ +export const GIT_SYNC_LOCK_PREFIX = 'git-sync:lock:'; + +/** + * Leader-lock TTL (ms). Must exceed the maximum expected cycle duration so the + * lock is not lost mid-cycle; on a crash it expires on its own (plan §9). The + * in-process mutex (orchestrator) prevents overlapping cycles on one instance, + * and the Redis lock prevents two instances racing the same space. + */ +export const GIT_SYNC_LOCK_TTL_MS = 5 * 60 * 1000; + +/** Default event-debounce window (ms), overridable via GIT_SYNC_DEBOUNCE_MS. */ +export const GIT_SYNC_DEBOUNCE_MS_DEFAULT = 2000; + +/** Default poll-safety interval (ms), overridable via GIT_SYNC_POLL_INTERVAL_MS. */ +export const GIT_SYNC_POLL_INTERVAL_MS_DEFAULT = 15000; diff --git a/apps/server/src/integrations/git-sync/git-sync.controller.ts b/apps/server/src/integrations/git-sync/git-sync.controller.ts new file mode 100644 index 00000000..4c8a5267 --- /dev/null +++ b/apps/server/src/integrations/git-sync/git-sync.controller.ts @@ -0,0 +1,92 @@ +import { + Body, + Controller, + ForbiddenException, + HttpCode, + HttpStatus, + Post, + Get, + UseGuards, +} from '@nestjs/common'; +import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard'; +import { AuthUser } from '../../common/decorators/auth-user.decorator'; +import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator'; +import { User, Workspace } from '@docmost/db/types/entity.types'; +import WorkspaceAbilityFactory from '../../core/casl/abilities/workspace-ability.factory'; +import { + WorkspaceCaslAction, + WorkspaceCaslSubject, +} from '../../core/casl/interfaces/workspace-ability.type'; +import { EnvironmentService } from '../environment/environment.service'; +import { + GitSyncOrchestrator, + GitSyncRunStatus, +} from './services/git-sync.orchestrator'; + +/** Body for the manual one-shot trigger. */ +class TriggerGitSyncDto { + spaceId: string; +} + +/** + * Ops/testing endpoints for the git-sync control plane (plan §6). Admin-guarded + * (workspace Manage/Settings, mirroring WorkspaceController) so only workspace + * admins can force a cycle. Mounted under the global `/api` prefix: + * - POST /api/git-sync/trigger { spaceId } — run one cycle now (await result), + * - GET /api/git-sync/status — report whether sync is enabled + config. + */ +@UseGuards(JwtAuthGuard) +@Controller('git-sync') +export class GitSyncController { + constructor( + private readonly orchestrator: GitSyncOrchestrator, + private readonly environmentService: EnvironmentService, + private readonly workspaceAbility: WorkspaceAbilityFactory, + ) {} + + /** Throw unless the caller is a workspace admin (Manage Settings). */ + private assertAdmin(user: User, workspace: Workspace): void { + const ability = this.workspaceAbility.createForUser(user, workspace); + if ( + ability.cannot(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Settings) + ) { + throw new ForbiddenException(); + } + } + + @HttpCode(HttpStatus.OK) + @Post('trigger') + async trigger( + @Body() dto: TriggerGitSyncDto, + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + ): Promise { + this.assertAdmin(user, workspace); + // Use the workspace from the request context (never client-supplied). + return this.orchestrator.runOnce(dto.spaceId, workspace.id); + } + + @HttpCode(HttpStatus.OK) + @Get('status') + async status( + @AuthUser() user: User, + @AuthWorkspace() workspace: Workspace, + ): Promise<{ + enabled: boolean; + dataDir: string; + pollIntervalMs: number; + debounceMs: number; + serviceUserConfigured: boolean; + }> { + this.assertAdmin(user, workspace); + return { + enabled: this.environmentService.isGitSyncEnabled(), + dataDir: this.environmentService.getGitSyncDataDir(), + pollIntervalMs: this.environmentService.getGitSyncPollIntervalMs(), + debounceMs: this.environmentService.getGitSyncDebounceMs(), + serviceUserConfigured: Boolean( + this.environmentService.getGitSyncServiceUserId(), + ), + }; + } +} diff --git a/apps/server/src/integrations/git-sync/git-sync.module.ts b/apps/server/src/integrations/git-sync/git-sync.module.ts new file mode 100644 index 00000000..dbf5ba19 --- /dev/null +++ b/apps/server/src/integrations/git-sync/git-sync.module.ts @@ -0,0 +1,48 @@ +import { Module } from '@nestjs/common'; +import { ScheduleModule } from '@nestjs/schedule'; +import { DatabaseModule } from '@docmost/db/database.module'; +import { EnvironmentModule } from '../environment/environment.module'; +import { CollaborationModule } from '../../collaboration/collaboration.module'; +import { PageModule } from '../../core/page/page.module'; +import { GitmostDataSourceService } from './services/gitmost-datasource.service'; +import { GitSyncOrchestrator } from './services/git-sync.orchestrator'; +import { VaultRegistryService } from './services/vault-registry.service'; +import { PageChangeListener } from './listeners/page-change.listener'; +import { GitSyncController } from './git-sync.controller'; + +/** + * The git-sync control plane (plan §6). Wires the native datasource, the + * orchestrator (poll + leader-lock), the per-space vault registry, the + * event-driven listener, and the admin trigger controller. + * + * Imports: + * - DatabaseModule (global) — PageRepo / SpaceRepo / KyselyDB for the + * datasource + orchestrator queries; + * - EnvironmentModule (global) — EnvironmentService config; + * - CollaborationModule — exports CollaborationGateway for native body writes; + * - PageModule — exports PageService for structural mutations; + * - ScheduleModule (NOT forRoot) — so @Interval is discovered. forRoot() is + * already registered globally by TelemetryModule; importing the plain module + * here avoids a duplicate scheduler registration (plan §6 note). + * + * RedisService is provided by the global RedisModule (app.module) and CASL's + * WorkspaceAbilityFactory by the global CaslModule — both resolve without an + * explicit import here. + */ +@Module({ + imports: [ + DatabaseModule, + EnvironmentModule, + CollaborationModule, + PageModule, + ScheduleModule, + ], + controllers: [GitSyncController], + providers: [ + GitmostDataSourceService, + GitSyncOrchestrator, + VaultRegistryService, + PageChangeListener, + ], +}) +export class GitSyncModule {} diff --git a/apps/server/src/integrations/git-sync/listeners/page-change.listener.ts b/apps/server/src/integrations/git-sync/listeners/page-change.listener.ts new file mode 100644 index 00000000..9f5789f7 --- /dev/null +++ b/apps/server/src/integrations/git-sync/listeners/page-change.listener.ts @@ -0,0 +1,143 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { OnEvent } from '@nestjs/event-emitter'; +import { PageRepo } from '@docmost/db/repos/page/page.repo'; +import { EnvironmentService } from '../../environment/environment.service'; +import { GitSyncOrchestrator } from '../services/git-sync.orchestrator'; +import { GIT_SYNC_PAGE_EVENTS } from '../git-sync.constants'; + +/** + * Shape of the page domain events the listener consumes. Different emit sites + * carry different optional fields (page.repo `PageEvent`, `PageMovedEvent`, + * etc.), so this is the intersection we read: a `pageIds` list / single `pageId`, + * the `workspaceId`, and an OPTIONAL `spaceId` (present only on some events). When + * `spaceId` is absent we resolve it from the page row. + */ +interface PageEventLike { + pageIds?: string[]; + pageId?: string; + workspaceId?: string; + spaceId?: string; + pages?: { id: string; spaceId: string }[]; + node?: { id: string; spaceId: string }; +} + +/** Per-space debounce bookkeeping. */ +interface DebounceEntry { + timer: NodeJS.Timeout; + workspaceId: string; +} + +/** + * Event-driven trigger for the git-sync control plane (plan §10). Subscribes to + * the page lifecycle events and, for an enabled space, schedules a DEBOUNCED + * `orchestrator.runOnce(spaceId, workspaceId)` — coalescing a burst of edits into + * a single cycle per space. + * + * Loop-guard (best-effort, plan §10/§8.2): an event whose page row already reads + * `lastUpdatedSource === 'git-sync'` is the orchestrator's OWN write, so we skip + * it to avoid a write -> event -> sync echo. This is the cheap first guard; the + * full bodyHash + updatedAt loop-guard (consuming the push side's + * `PushedPageRecord`) is a later hardening step (plan §8.2) — noted, not built + * here. The poll-safety interval still converges anything this guard drops. + */ +@Injectable() +export class PageChangeListener { + private readonly logger = new Logger(PageChangeListener.name); + private readonly debounce = new Map(); + + constructor( + private readonly environmentService: EnvironmentService, + private readonly orchestrator: GitSyncOrchestrator, + private readonly pageRepo: PageRepo, + ) {} + + /** + * One handler bound to ALL git-sync page events (the array form of `@OnEvent`). + * Resolves the affected page's space + workspace, applies the cheap loop-guard, + * and schedules the debounced cycle. + */ + @OnEvent(GIT_SYNC_PAGE_EVENTS as unknown as string[]) + async handlePageEvent(event: PageEventLike): Promise { + if (!this.environmentService.isGitSyncEnabled()) return; + + try { + const pageId = this.firstPageId(event); + if (!pageId) return; + + // Prefer a spaceId carried on the event; otherwise read the page row (also + // gives us the loop-guard source). A missing page (hard-deleted) is ignored. + let spaceId = this.eventSpaceId(event, pageId); + let workspaceId = event.workspaceId; + + if (!spaceId || !workspaceId) { + const page = await this.pageRepo.findById(pageId, { + includeContent: false, + }); + if (!page) return; + spaceId = spaceId ?? page.spaceId; + workspaceId = workspaceId ?? page.workspaceId; + // Loop-guard: skip our own writes (best-effort, plan §8.2). + if (page.lastUpdatedSource === 'git-sync') return; + } + + if (!spaceId || !workspaceId) return; + this.schedule(spaceId, workspaceId); + } catch (err) { + this.logger.warn( + `git-sync: failed to handle page event: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + /** Pull the first affected pageId out of the heterogeneous event shapes. */ + private firstPageId(event: PageEventLike): string | undefined { + return ( + event.pageId ?? + event.pageIds?.[0] ?? + event.pages?.[0]?.id ?? + event.node?.id + ); + } + + /** A spaceId carried directly on the event, for the given pageId if scoped. */ + private eventSpaceId( + event: PageEventLike, + pageId: string, + ): string | undefined { + if (event.spaceId) return event.spaceId; + const fromPages = event.pages?.find((p) => p.id === pageId)?.spaceId; + if (fromPages) return fromPages; + if (event.node?.id === pageId) return event.node.spaceId; + return undefined; + } + + /** + * Debounce per space: a new event resets the timer so a burst collapses into a + * single cycle. On fire, `runOnce` is enqueued (it internally serializes via the + * in-process mutex + Redis lock, so a still-running cycle is simply skipped and + * the next event reschedules). + */ + private schedule(spaceId: string, workspaceId: string): void { + const existing = this.debounce.get(spaceId); + if (existing) clearTimeout(existing.timer); + + const timer = setTimeout(() => { + this.debounce.delete(spaceId); + void this.orchestrator + .runOnce(spaceId, workspaceId) + .catch((err) => + this.logger.error( + `git-sync: debounced cycle for space ${spaceId} failed: ${ + err instanceof Error ? err.message : String(err) + }`, + ), + ); + }, this.environmentService.getGitSyncDebounceMs()); + + // Do not keep the event loop alive solely for a pending sync. + timer.unref?.(); + this.debounce.set(spaceId, { timer, workspaceId }); + } +} diff --git a/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts new file mode 100644 index 00000000..2f34395f --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts @@ -0,0 +1,323 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Interval } from '@nestjs/schedule'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; +import { randomUUID } from 'node:crypto'; +import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; +import { dirname } from 'node:path'; +import { InjectKysely } from 'nestjs-kysely'; +import { KyselyDB } from '@docmost/db/types/kysely.types'; +import { sql } from 'kysely'; +import { + type Settings, + readExisting, + computePullActions, + applyPullActions, + runPush, +} from '@docmost/git-sync'; +import { EnvironmentService } from '../../environment/environment.service'; +import { GitmostDataSourceService } from './gitmost-datasource.service'; +import { VaultRegistryService } from './vault-registry.service'; +import { + GIT_SYNC_LOCK_PREFIX, + GIT_SYNC_LOCK_TTL_MS, +} from '../git-sync.constants'; + +/** A space the poll loop should reconcile: its id + the workspace it lives in. */ +interface EnabledSpace { + spaceId: string; + workspaceId: string; +} + +/** Small status summary returned by `runOnce` (for the admin trigger + logs). */ +export interface GitSyncRunStatus { + spaceId: string; + ran: boolean; + /** Why the cycle did not run (lock held elsewhere, busy, disabled, error). */ + skipped?: 'lock-held' | 'in-progress' | 'disabled' | 'no-service-user'; + pull?: { written: number; deleted: number; conflict: boolean }; + push?: { mode: string; failures: number }; + error?: string; +} + +/** + * The git-sync control plane (plan §9/§10/§11). Drives the vendored engine in + * process: under a Redis leader lock (single-writer across replicas) plus an + * in-process per-space mutex (no overlapping cycles on one instance), it runs a + * PULL (Docmost -> vault) then a PUSH (vault -> Docmost) for a space. + * + * Enumeration of enabled spaces (plan §10 / Phase-C dependency): the per-space + * UI flag `space.settings.gitSync.enabled` is Phase C and not built yet, so this + * queries spaces whose jsonb flag is already set AND, when none are, treats + * GIT_SYNC_ENABLED as a master switch that enables ALL spaces (so the feature is + * exercisable before the UI lands). Once Phase C writes the flag, only flagged + * spaces sync. The whole loop is gated on GIT_SYNC_ENABLED first. + */ +@Injectable() +export class GitSyncOrchestrator { + private readonly logger = new Logger(GitSyncOrchestrator.name); + private readonly redis: Redis; + /** Unique per process instance — the leader-lock value (CAS on release). */ + private readonly instanceId = randomUUID(); + /** In-process per-space mutex: spaceIds with a cycle currently running. */ + private readonly running = new Set(); + + constructor( + private readonly environmentService: EnvironmentService, + private readonly dataSource: GitmostDataSourceService, + private readonly vaultRegistry: VaultRegistryService, + redisService: RedisService, + @InjectKysely() private readonly db: KyselyDB, + ) { + this.redis = redisService.getOrThrow(); + } + + // --- Redis leader lock (plan §9) ----------------------------------------- + + /** + * Acquire per-space leadership: `SET PX NX` returns + * 'OK' only when the key did not exist. Any other reply means another replica + * holds it. + */ + private async acquire(spaceId: string): Promise { + const ok = await this.redis.set( + GIT_SYNC_LOCK_PREFIX + spaceId, + this.instanceId, + 'PX', + GIT_SYNC_LOCK_TTL_MS, + 'NX', + ); + return ok === 'OK'; + } + + /** + * Release the lock with a CAS Lua so we only delete it when WE still hold it + * (the value matches our instanceId) — never another replica's lock that took + * over after our TTL expired. + */ + private async release(spaceId: string): Promise { + const lua = + 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; + try { + await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId); + } catch (err) { + this.logger.warn( + `git-sync: failed to release lock for space ${spaceId}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + // --- enabled-space enumeration (plan §10) -------------------------------- + + /** + * Enumerate the spaces the poll loop should reconcile. Prefers the Phase-C + * per-space flag (`settings->'gitSync'->>'enabled' = 'true'`); when NO space + * carries it yet (UI not built), falls back to enumerating ALL live spaces (the + * GIT_SYNC_ENABLED master switch already gates whether we get here at all). + */ + private async enabledSpaces(): Promise { + const flagged = await this.db + .selectFrom('spaces') + .select(['id as spaceId', 'workspaceId']) + .where('deletedAt', 'is', null) + .where(sql`settings->'gitSync'->>'enabled' = 'true'`) + .execute(); + + if (flagged.length > 0) return flagged; + + // No per-space flag set yet (Phase C UI pending): the master switch enables + // all spaces so the feature can be verified end-to-end before the UI lands. + return this.db + .selectFrom('spaces') + .select(['id as spaceId', 'workspaceId']) + .where('deletedAt', 'is', null) + .execute(); + } + + // --- one sync cycle for a space (plan §11) ------------------------------- + + /** + * Build the engine `Settings` for a space. The engine's REST-era fields + * (docmostApiUrl/email/password) are unused on the native path — the + * datasource writes in-process — so they are placeholders; only `vaultPath`, + * `gitRemote`, and the tunables are load-bearing. + */ + private buildSettings(spaceId: string): Settings { + const remoteTemplate = this.environmentService.getGitSyncRemoteTemplate(); + const gitRemote = remoteTemplate + ? remoteTemplate.replace(/\{spaceId\}/g, spaceId) + : undefined; + return { + docmostApiUrl: 'http://native.local', + docmostEmail: 'native@local', + docmostPassword: 'native', + docmostSpaceId: spaceId, + vaultPath: this.vaultRegistry.vaultPath(spaceId), + gitRemote, + pollIntervalMs: this.environmentService.getGitSyncPollIntervalMs(), + debounceMs: this.environmentService.getGitSyncDebounceMs(), + logLevel: 'info', + }; + } + + /** + * Run one full PULL + PUSH cycle for a space, under the Redis leader lock and + * the in-process mutex. Never throws — per-space errors are caught, logged, and + * returned in the status so a poll interval is never broken by one bad space. + */ + async runOnce( + spaceId: string, + workspaceId: string, + ): Promise { + if (!this.environmentService.isGitSyncEnabled()) { + return { spaceId, ran: false, skipped: 'disabled' }; + } + const serviceUserId = this.environmentService.getGitSyncServiceUserId(); + if (!serviceUserId) { + this.logger.error( + 'git-sync: GIT_SYNC_SERVICE_USER_ID is required when GIT_SYNC_ENABLED — skipping', + ); + return { spaceId, ran: false, skipped: 'no-service-user' }; + } + + // In-process mutex: never run two overlapping cycles for the same space on + // this instance (the Redis lock guards cross-instance, this guards in-proc). + if (this.running.has(spaceId)) { + return { spaceId, ran: false, skipped: 'in-progress' }; + } + + // Redis leader lock: only the holder runs the cycle (plan §9). + if (!(await this.acquire(spaceId))) { + return { spaceId, ran: false, skipped: 'lock-held' }; + } + + this.running.add(spaceId); + try { + return await this.driveCycle(spaceId, workspaceId, serviceUserId); + } catch (err) { + const message = err instanceof Error ? err.message : String(err); + this.logger.error(`git-sync: cycle failed for space ${spaceId}: ${message}`); + return { spaceId, ran: false, error: message }; + } finally { + this.running.delete(spaceId); + await this.release(spaceId); + } + } + + /** + * The actual engine wiring (plan §11). Mirrors the engine's own `main`: + * PULL — readExisting -> computePullActions -> applyPullActions, + * PUSH — runPush (dry-run disabled: a real apply). + * The dependency-object shapes match pull.ts/push.ts exactly (see comments). + */ + private async driveCycle( + spaceId: string, + workspaceId: string, + serviceUserId: string, + ): Promise { + const settings = this.buildSettings(spaceId); + const vault = await this.vaultRegistry.getVault(spaceId); + const vaultRoot = settings.vaultPath; + const client = this.dataSource.bind({ workspaceId, userId: serviceUserId }); + + // Engine state store is git: make sure the repo + branches exist before any + // tracked-file listing or diff (the engine's pull/push assume an inited repo). + await vault.assertGitAvailable(); + await vault.ensureRepo(); + await vault.ensureBranch('docmost', 'main'); + + // --- PULL (plan §11.1/§11.2) -------------------------------------------- + // readExisting deps (ReadExistingDeps): list tracked *.md + read by relPath. + const existing = await readExisting({ + listTracked: () => vault.listTrackedFiles('*.md'), + readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'), + }); + + const tree = await client.listSpaceTree(spaceId); + const pullActions = computePullActions({ + pages: tree.pages, + treeComplete: true, + existing, + }); + + // applyPullActions deps (ApplyPullActionsDeps): the read-side client subset, + // the vault git subset, and ABSOLUTE-path fs ops (mkdir/writeFile/rm). + const pullResult = await applyPullActions( + { + client, + git: vault, + writeFile: (absPath, text) => writeFile(absPath, text, 'utf8'), + mkdir: (absDir) => mkdir(absDir, { recursive: true }).then(() => undefined), + rm: (absPath) => rm(absPath, { force: true }), + }, + pullActions, + vaultRoot, + ); + + // --- PUSH (plan §11.3) -------------------------------------------------- + // runPush deps (PushDeps): settings, the full vault git object (method `this` + // binding must be preserved — pass the object, not bound method refs), a + // makeClient factory returning the push client subset, vault-relative fs + // read/write, and a logger. dryRun:false performs the real Docmost writes. + const pushResult = await runPush( + { + settings, + git: vault, + makeClient: () => client, + readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'), + writeFile: (relPath, text) => + writeFile(`${vaultRoot}/${relPath}`, text, 'utf8'), + log: (line) => this.logger.log(`git-sync[${spaceId}] ${line}`), + }, + { dryRun: false }, + ); + + return { + spaceId, + ran: true, + pull: { + written: pullResult.written, + deleted: pullResult.deleted, + conflict: pullResult.merge.conflict, + }, + push: { + mode: pushResult.mode, + failures: pushResult.failures?.length ?? 0, + }, + }; + } + + // --- poll-safety interval (plan §10) ------------------------------------- + + /** + * Poll-safety loop: catches events missed by the listener and reconciles after + * downtime. Gated on GIT_SYNC_ENABLED. The interval is a fixed value because + * `@Interval` cannot read config at class-eval time — the body short-circuits + * when disabled. Each enabled space runs under its own lock (overlaps skipped). + * + * ScheduleModule: registered ONCE globally by TelemetryModule + * (ScheduleModule.forRoot()); GitSyncModule imports the plain ScheduleModule so + * @Interval is discovered without a duplicate forRoot (plan §6 note). + */ + @Interval('git-sync-poll', 15000) + async poll(): Promise { + if (!this.environmentService.isGitSyncEnabled()) return; + let spaces: EnabledSpace[]; + try { + spaces = await this.enabledSpaces(); + } catch (err) { + this.logger.error( + `git-sync: failed to enumerate enabled spaces: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + return; + } + for (const { spaceId, workspaceId } of spaces) { + // runOnce never throws; a per-space error is logged and returned in status. + await this.runOnce(spaceId, workspaceId); + } + } +} diff --git a/apps/server/src/integrations/git-sync/services/vault-registry.service.ts b/apps/server/src/integrations/git-sync/services/vault-registry.service.ts new file mode 100644 index 00000000..b7a50637 --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/vault-registry.service.ts @@ -0,0 +1,44 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { mkdir } from 'node:fs/promises'; +import { VaultGit } from '@docmost/git-sync'; +import { EnvironmentService } from '../../environment/environment.service'; + +/** + * Resolves the on-disk vault location per space and owns the (lazily created, + * cached) `VaultGit` instance for each one (plan §3/§5). + * + * Topology (plan §5): one git repo per enabled space, rooted at + * `/`. A `VaultGit` is constructed at most once per + * space and reused across cycles — it is a thin, stateless shell-out wrapper, so + * caching it just avoids re-resolving the path and re-running `mkdir`. + */ +@Injectable() +export class VaultRegistryService { + private readonly logger = new Logger(VaultRegistryService.name); + private readonly vaults = new Map(); + + constructor(private readonly environmentService: EnvironmentService) {} + + /** Absolute vault path for a space: `/`. */ + vaultPath(spaceId: string): string { + const root = this.environmentService.getGitSyncDataDir().replace(/\/+$/, ''); + return `${root}/${spaceId}`; + } + + /** + * Get (or lazily construct + cache) the `VaultGit` for a space, ensuring its + * directory exists. `VaultGit.ensureRepo()` is NOT called here — the engine's + * pull/push paths call it (and the branch/ref setup) as their first step; this + * only guarantees the parent dir exists so a fresh space does not ENOENT. + */ + async getVault(spaceId: string): Promise { + const cached = this.vaults.get(spaceId); + if (cached) return cached; + + const path = this.vaultPath(spaceId); + await mkdir(path, { recursive: true }); + const vault = new VaultGit(path); + this.vaults.set(spaceId, vault); + return vault; + } +}