feat(git-sync): GitSyncModule orchestrator + config + listener (Phase A.4b/B)

Control plane wiring (plan §5-§11):
- PageService create/update/movePage now honor provenance actor 'git-sync'
  (stamp lastUpdatedSource='git-sync'), closing the A.4a gap.
- EnvironmentService: GIT_SYNC_ENABLED / DATA_DIR / REMOTE_TEMPLATE /
  POLL_INTERVAL_MS / DEBOUNCE_MS / SERVICE_USER_ID (required-if-enabled) /
  SSH_KEY_PATH + validation.
- VaultRegistryService: per-space vault path + cached VaultGit.
- GitSyncOrchestrator: per-space Redis leader-lock (SET NX PX + CAS-Lua release,
  randomUUID instanceId) + in-process mutex; runOnce drives the vendored engine
  PULL (readExisting->computePullActions->applyPullActions) then PUSH (runPush)
  with the bound native GitSyncClient + VaultGit; @Interval poll-safety gated on
  GIT_SYNC_ENABLED; imports plain ScheduleModule (TelemetryModule owns forRoot).
- PageChangeListener: @OnEvent PAGE_* -> per-space debounce -> runOnce, with a
  best-effort lastUpdatedSource==='git-sync' loop-guard.
- GitSyncController: admin POST /api/git-sync/trigger + GET /status (ops/e2e).
- GitSyncModule registered in app.module. Enabled-space enumeration uses
  settings.gitSync.enabled, falling back to all live spaces until Phase C writes
  the flag (master gate = GIT_SYNC_ENABLED).

tsc clean; 713 tests/71 suites pass; dev server hot-reloaded the module (route
live, DI graph boots). Live pull/push round-trip verified next.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-21 15:04:31 +03:00
parent afe1ba8398
commit 901147a224
10 changed files with 794 additions and 0 deletions

View File

@@ -28,6 +28,7 @@ import { ClsModule } from 'nestjs-cls';
import { NoopAuditModule } from './integrations/audit/audit.module';
import { ThrottleModule } from './integrations/throttle/throttle.module';
import { McpModule } from './integrations/mcp/mcp.module';
import { GitSyncModule } from './integrations/git-sync/git-sync.module';
import { AiModule } from './integrations/ai/ai.module';
import { AiChatModule } from './core/ai-chat/ai-chat.module';
@@ -89,6 +90,7 @@ try {
TelemetryModule,
ThrottleModule,
McpModule,
GitSyncModule,
AiModule,
AiChatModule,
...enterpriseModules,

View File

@@ -62,6 +62,14 @@ export function agentSourceFields<S extends string, C extends string>(
sourceKey: S,
chatKey: C,
): Partial<Record<S, ProvenanceSource> & Record<C, string | null>> {
// git-sync data-plane write (plan §8.1): stamp the source 'git-sync' with NO
// aiChatId (it has no internal ai_chats row). Mirrors the agent branch; each
// write has a single actor, so precedence is irrelevant here.
if (provenance?.actor === 'git-sync') {
return { [sourceKey]: 'git-sync' } as Partial<
Record<S, ProvenanceSource> & Record<C, string | null>
>;
}
if (provenance?.actor !== 'agent') return {};
return {
[sourceKey]: 'agent',

View File

@@ -320,4 +320,58 @@ export class EnvironmentService {
.map((o) => o.trim())
.filter(Boolean);
}
// --- git-sync (plan §7.2) -------------------------------------------------
/** Global master switch for the git-sync control plane (default false). */
isGitSyncEnabled(): boolean {
return (
this.configService.get<string>('GIT_SYNC_ENABLED', 'false').toLowerCase() ===
'true'
);
}
/**
* Root directory holding the per-space vault repos. Defaults to
* `<DATA_DIR or ./data>/git-sync`. `DATA_DIR` is read directly (no dedicated
* getter exists in this codebase) so the vault root tracks the data volume.
*/
getGitSyncDataDir(): string {
const explicit = this.configService.get<string>('GIT_SYNC_DATA_DIR');
if (explicit) return explicit;
const dataDir = this.configService.get<string>('DATA_DIR') || './data';
return `${dataDir.replace(/\/+$/, '')}/git-sync`;
}
/** Optional remote template, e.g. `git@host:vault-{spaceId}.git`. */
getGitSyncRemoteTemplate(): string | undefined {
return this.configService.get<string>('GIT_SYNC_REMOTE_TEMPLATE');
}
/** Poll-safety interval in ms (default 15000). */
getGitSyncPollIntervalMs(): number {
return parseInt(
this.configService.get<string>('GIT_SYNC_POLL_INTERVAL_MS', '15000'),
);
}
/** Event debounce window in ms (default 2000). */
getGitSyncDebounceMs(): number {
return parseInt(
this.configService.get<string>('GIT_SYNC_DEBOUNCE_MS', '2000'),
);
}
/**
* The service user id git-sync writes are attributed to. Required when sync is
* enabled (validated in environment.validation.ts); optional otherwise.
*/
getGitSyncServiceUserId(): string | undefined {
return this.configService.get<string>('GIT_SYNC_SERVICE_USER_ID');
}
/** Optional path to the SSH key used for git remote access. */
getGitSyncSshKeyPath(): string | undefined {
return this.configService.get<string>('GIT_SYNC_SSH_KEY_PATH');
}
}

View File

@@ -170,6 +170,41 @@ export class EnvironmentVariables {
},
)
CLICKHOUSE_URL: string;
// --- git-sync (plan §7.2) — all OPTIONAL. The master switch defaults off; a
// required-if-enabled service user id is validated only when sync is on. ---
@IsOptional()
@IsIn(['true', 'false'])
@IsString()
GIT_SYNC_ENABLED: string;
@IsOptional()
@IsString()
GIT_SYNC_DATA_DIR: string;
@IsOptional()
@IsString()
GIT_SYNC_REMOTE_TEMPLATE: string;
@IsOptional()
@IsString()
GIT_SYNC_POLL_INTERVAL_MS: string;
@IsOptional()
@IsString()
GIT_SYNC_DEBOUNCE_MS: string;
// Required when git-sync is enabled: the service user create/move/rename/delete
// are attributed to (plan §7.2). Optional otherwise.
@ValidateIf((obj) => obj.GIT_SYNC_ENABLED === 'true')
@IsNotEmpty()
@IsString()
GIT_SYNC_SERVICE_USER_ID: string;
@IsOptional()
@IsString()
GIT_SYNC_SSH_KEY_PATH: string;
}
export function validate(config: Record<string, any>) {

View File

@@ -0,0 +1,45 @@
/**
* Git-sync control-plane constants (plan §6/§9/§10).
*
* Event/job names are REUSED from the shared event contract (event.contants.ts)
* so the listener subscribes to the exact names the rest of the server emits —
* never a string literal that could drift. The Redis lock-key prefix + TTLs back
* the single-writer leader lock (§9); the debounce default backs the per-space
* event coalescing (§10).
*/
import { EventName } from '../../common/events/event.contants';
/**
* The page lifecycle events the git-sync listener reacts to (plan §10). A change
* to any of these in an enabled space schedules a debounced sync cycle.
* - PAGE_CREATED / PAGE_UPDATED / PAGE_MOVED — structural + content edits;
* - PAGE_CONTENT_UPDATED — the collab body-save job (real name in the enum);
* - PAGE_SOFT_DELETED / PAGE_RESTORED — Trash transitions (deletes are soft);
* - PAGE_MOVED_TO_SPACE — cross-space move (cross-repo, plan §5).
*/
export const GIT_SYNC_PAGE_EVENTS = [
EventName.PAGE_CREATED,
EventName.PAGE_UPDATED,
EventName.PAGE_CONTENT_UPDATED,
EventName.PAGE_MOVED,
EventName.PAGE_MOVED_TO_SPACE,
EventName.PAGE_SOFT_DELETED,
EventName.PAGE_RESTORED,
] as const;
/** Redis key prefix for the per-space leader lock (plan §9). */
export const GIT_SYNC_LOCK_PREFIX = 'git-sync:lock:';
/**
* Leader-lock TTL (ms). Must exceed the maximum expected cycle duration so the
* lock is not lost mid-cycle; on a crash it expires on its own (plan §9). The
* in-process mutex (orchestrator) prevents overlapping cycles on one instance,
* and the Redis lock prevents two instances racing the same space.
*/
export const GIT_SYNC_LOCK_TTL_MS = 5 * 60 * 1000;
/** Default event-debounce window (ms), overridable via GIT_SYNC_DEBOUNCE_MS. */
export const GIT_SYNC_DEBOUNCE_MS_DEFAULT = 2000;
/** Default poll-safety interval (ms), overridable via GIT_SYNC_POLL_INTERVAL_MS. */
export const GIT_SYNC_POLL_INTERVAL_MS_DEFAULT = 15000;

View File

@@ -0,0 +1,92 @@
import {
Body,
Controller,
ForbiddenException,
HttpCode,
HttpStatus,
Post,
Get,
UseGuards,
} from '@nestjs/common';
import { JwtAuthGuard } from '../../common/guards/jwt-auth.guard';
import { AuthUser } from '../../common/decorators/auth-user.decorator';
import { AuthWorkspace } from '../../common/decorators/auth-workspace.decorator';
import { User, Workspace } from '@docmost/db/types/entity.types';
import WorkspaceAbilityFactory from '../../core/casl/abilities/workspace-ability.factory';
import {
WorkspaceCaslAction,
WorkspaceCaslSubject,
} from '../../core/casl/interfaces/workspace-ability.type';
import { EnvironmentService } from '../environment/environment.service';
import {
GitSyncOrchestrator,
GitSyncRunStatus,
} from './services/git-sync.orchestrator';
/** Body for the manual one-shot trigger. */
class TriggerGitSyncDto {
spaceId: string;
}
/**
* Ops/testing endpoints for the git-sync control plane (plan §6). Admin-guarded
* (workspace Manage/Settings, mirroring WorkspaceController) so only workspace
* admins can force a cycle. Mounted under the global `/api` prefix:
* - POST /api/git-sync/trigger { spaceId } — run one cycle now (await result),
* - GET /api/git-sync/status — report whether sync is enabled + config.
*/
@UseGuards(JwtAuthGuard)
@Controller('git-sync')
export class GitSyncController {
constructor(
private readonly orchestrator: GitSyncOrchestrator,
private readonly environmentService: EnvironmentService,
private readonly workspaceAbility: WorkspaceAbilityFactory,
) {}
/** Throw unless the caller is a workspace admin (Manage Settings). */
private assertAdmin(user: User, workspace: Workspace): void {
const ability = this.workspaceAbility.createForUser(user, workspace);
if (
ability.cannot(WorkspaceCaslAction.Manage, WorkspaceCaslSubject.Settings)
) {
throw new ForbiddenException();
}
}
@HttpCode(HttpStatus.OK)
@Post('trigger')
async trigger(
@Body() dto: TriggerGitSyncDto,
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
): Promise<GitSyncRunStatus> {
this.assertAdmin(user, workspace);
// Use the workspace from the request context (never client-supplied).
return this.orchestrator.runOnce(dto.spaceId, workspace.id);
}
@HttpCode(HttpStatus.OK)
@Get('status')
async status(
@AuthUser() user: User,
@AuthWorkspace() workspace: Workspace,
): Promise<{
enabled: boolean;
dataDir: string;
pollIntervalMs: number;
debounceMs: number;
serviceUserConfigured: boolean;
}> {
this.assertAdmin(user, workspace);
return {
enabled: this.environmentService.isGitSyncEnabled(),
dataDir: this.environmentService.getGitSyncDataDir(),
pollIntervalMs: this.environmentService.getGitSyncPollIntervalMs(),
debounceMs: this.environmentService.getGitSyncDebounceMs(),
serviceUserConfigured: Boolean(
this.environmentService.getGitSyncServiceUserId(),
),
};
}
}

View File

@@ -0,0 +1,48 @@
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { DatabaseModule } from '@docmost/db/database.module';
import { EnvironmentModule } from '../environment/environment.module';
import { CollaborationModule } from '../../collaboration/collaboration.module';
import { PageModule } from '../../core/page/page.module';
import { GitmostDataSourceService } from './services/gitmost-datasource.service';
import { GitSyncOrchestrator } from './services/git-sync.orchestrator';
import { VaultRegistryService } from './services/vault-registry.service';
import { PageChangeListener } from './listeners/page-change.listener';
import { GitSyncController } from './git-sync.controller';
/**
* The git-sync control plane (plan §6). Wires the native datasource, the
* orchestrator (poll + leader-lock), the per-space vault registry, the
* event-driven listener, and the admin trigger controller.
*
* Imports:
* - DatabaseModule (global) — PageRepo / SpaceRepo / KyselyDB for the
* datasource + orchestrator queries;
* - EnvironmentModule (global) — EnvironmentService config;
* - CollaborationModule — exports CollaborationGateway for native body writes;
* - PageModule — exports PageService for structural mutations;
* - ScheduleModule (NOT forRoot) — so @Interval is discovered. forRoot() is
* already registered globally by TelemetryModule; importing the plain module
* here avoids a duplicate scheduler registration (plan §6 note).
*
* RedisService is provided by the global RedisModule (app.module) and CASL's
* WorkspaceAbilityFactory by the global CaslModule — both resolve without an
* explicit import here.
*/
@Module({
imports: [
DatabaseModule,
EnvironmentModule,
CollaborationModule,
PageModule,
ScheduleModule,
],
controllers: [GitSyncController],
providers: [
GitmostDataSourceService,
GitSyncOrchestrator,
VaultRegistryService,
PageChangeListener,
],
})
export class GitSyncModule {}

View File

@@ -0,0 +1,143 @@
import { Injectable, Logger } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { PageRepo } from '@docmost/db/repos/page/page.repo';
import { EnvironmentService } from '../../environment/environment.service';
import { GitSyncOrchestrator } from '../services/git-sync.orchestrator';
import { GIT_SYNC_PAGE_EVENTS } from '../git-sync.constants';
/**
* Shape of the page domain events the listener consumes. Different emit sites
* carry different optional fields (page.repo `PageEvent`, `PageMovedEvent`,
* etc.), so this is the intersection we read: a `pageIds` list / single `pageId`,
* the `workspaceId`, and an OPTIONAL `spaceId` (present only on some events). When
* `spaceId` is absent we resolve it from the page row.
*/
interface PageEventLike {
pageIds?: string[];
pageId?: string;
workspaceId?: string;
spaceId?: string;
pages?: { id: string; spaceId: string }[];
node?: { id: string; spaceId: string };
}
/** Per-space debounce bookkeeping. */
interface DebounceEntry {
timer: NodeJS.Timeout;
workspaceId: string;
}
/**
* Event-driven trigger for the git-sync control plane (plan §10). Subscribes to
* the page lifecycle events and, for an enabled space, schedules a DEBOUNCED
* `orchestrator.runOnce(spaceId, workspaceId)` — coalescing a burst of edits into
* a single cycle per space.
*
* Loop-guard (best-effort, plan §10/§8.2): an event whose page row already reads
* `lastUpdatedSource === 'git-sync'` is the orchestrator's OWN write, so we skip
* it to avoid a write -> event -> sync echo. This is the cheap first guard; the
* full bodyHash + updatedAt loop-guard (consuming the push side's
* `PushedPageRecord`) is a later hardening step (plan §8.2) — noted, not built
* here. The poll-safety interval still converges anything this guard drops.
*/
@Injectable()
export class PageChangeListener {
private readonly logger = new Logger(PageChangeListener.name);
private readonly debounce = new Map<string, DebounceEntry>();
constructor(
private readonly environmentService: EnvironmentService,
private readonly orchestrator: GitSyncOrchestrator,
private readonly pageRepo: PageRepo,
) {}
/**
* One handler bound to ALL git-sync page events (the array form of `@OnEvent`).
* Resolves the affected page's space + workspace, applies the cheap loop-guard,
* and schedules the debounced cycle.
*/
@OnEvent(GIT_SYNC_PAGE_EVENTS as unknown as string[])
async handlePageEvent(event: PageEventLike): Promise<void> {
if (!this.environmentService.isGitSyncEnabled()) return;
try {
const pageId = this.firstPageId(event);
if (!pageId) return;
// Prefer a spaceId carried on the event; otherwise read the page row (also
// gives us the loop-guard source). A missing page (hard-deleted) is ignored.
let spaceId = this.eventSpaceId(event, pageId);
let workspaceId = event.workspaceId;
if (!spaceId || !workspaceId) {
const page = await this.pageRepo.findById(pageId, {
includeContent: false,
});
if (!page) return;
spaceId = spaceId ?? page.spaceId;
workspaceId = workspaceId ?? page.workspaceId;
// Loop-guard: skip our own writes (best-effort, plan §8.2).
if (page.lastUpdatedSource === 'git-sync') return;
}
if (!spaceId || !workspaceId) return;
this.schedule(spaceId, workspaceId);
} catch (err) {
this.logger.warn(
`git-sync: failed to handle page event: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
/** Pull the first affected pageId out of the heterogeneous event shapes. */
private firstPageId(event: PageEventLike): string | undefined {
return (
event.pageId ??
event.pageIds?.[0] ??
event.pages?.[0]?.id ??
event.node?.id
);
}
/** A spaceId carried directly on the event, for the given pageId if scoped. */
private eventSpaceId(
event: PageEventLike,
pageId: string,
): string | undefined {
if (event.spaceId) return event.spaceId;
const fromPages = event.pages?.find((p) => p.id === pageId)?.spaceId;
if (fromPages) return fromPages;
if (event.node?.id === pageId) return event.node.spaceId;
return undefined;
}
/**
* Debounce per space: a new event resets the timer so a burst collapses into a
* single cycle. On fire, `runOnce` is enqueued (it internally serializes via the
* in-process mutex + Redis lock, so a still-running cycle is simply skipped and
* the next event reschedules).
*/
private schedule(spaceId: string, workspaceId: string): void {
const existing = this.debounce.get(spaceId);
if (existing) clearTimeout(existing.timer);
const timer = setTimeout(() => {
this.debounce.delete(spaceId);
void this.orchestrator
.runOnce(spaceId, workspaceId)
.catch((err) =>
this.logger.error(
`git-sync: debounced cycle for space ${spaceId} failed: ${
err instanceof Error ? err.message : String(err)
}`,
),
);
}, this.environmentService.getGitSyncDebounceMs());
// Do not keep the event loop alive solely for a pending sync.
timer.unref?.();
this.debounce.set(spaceId, { timer, workspaceId });
}
}

View File

@@ -0,0 +1,323 @@
import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { sql } from 'kysely';
import {
type Settings,
readExisting,
computePullActions,
applyPullActions,
runPush,
} from '@docmost/git-sync';
import { EnvironmentService } from '../../environment/environment.service';
import { GitmostDataSourceService } from './gitmost-datasource.service';
import { VaultRegistryService } from './vault-registry.service';
import {
GIT_SYNC_LOCK_PREFIX,
GIT_SYNC_LOCK_TTL_MS,
} from '../git-sync.constants';
/** A space the poll loop should reconcile: its id + the workspace it lives in. */
interface EnabledSpace {
spaceId: string;
workspaceId: string;
}
/** Small status summary returned by `runOnce` (for the admin trigger + logs). */
export interface GitSyncRunStatus {
spaceId: string;
ran: boolean;
/** Why the cycle did not run (lock held elsewhere, busy, disabled, error). */
skipped?: 'lock-held' | 'in-progress' | 'disabled' | 'no-service-user';
pull?: { written: number; deleted: number; conflict: boolean };
push?: { mode: string; failures: number };
error?: string;
}
/**
* The git-sync control plane (plan §9/§10/§11). Drives the vendored engine in
* process: under a Redis leader lock (single-writer across replicas) plus an
* in-process per-space mutex (no overlapping cycles on one instance), it runs a
* PULL (Docmost -> vault) then a PUSH (vault -> Docmost) for a space.
*
* Enumeration of enabled spaces (plan §10 / Phase-C dependency): the per-space
* UI flag `space.settings.gitSync.enabled` is Phase C and not built yet, so this
* queries spaces whose jsonb flag is already set AND, when none are, treats
* GIT_SYNC_ENABLED as a master switch that enables ALL spaces (so the feature is
* exercisable before the UI lands). Once Phase C writes the flag, only flagged
* spaces sync. The whole loop is gated on GIT_SYNC_ENABLED first.
*/
@Injectable()
export class GitSyncOrchestrator {
private readonly logger = new Logger(GitSyncOrchestrator.name);
private readonly redis: Redis;
/** Unique per process instance — the leader-lock value (CAS on release). */
private readonly instanceId = randomUUID();
/** In-process per-space mutex: spaceIds with a cycle currently running. */
private readonly running = new Set<string>();
constructor(
private readonly environmentService: EnvironmentService,
private readonly dataSource: GitmostDataSourceService,
private readonly vaultRegistry: VaultRegistryService,
redisService: RedisService,
@InjectKysely() private readonly db: KyselyDB,
) {
this.redis = redisService.getOrThrow();
}
// --- Redis leader lock (plan §9) -----------------------------------------
/**
* Acquire per-space leadership: `SET <key> <instanceId> PX <ttl> NX` returns
* 'OK' only when the key did not exist. Any other reply means another replica
* holds it.
*/
private async acquire(spaceId: string): Promise<boolean> {
const ok = await this.redis.set(
GIT_SYNC_LOCK_PREFIX + spaceId,
this.instanceId,
'PX',
GIT_SYNC_LOCK_TTL_MS,
'NX',
);
return ok === 'OK';
}
/**
* Release the lock with a CAS Lua so we only delete it when WE still hold it
* (the value matches our instanceId) — never another replica's lock that took
* over after our TTL expired.
*/
private async release(spaceId: string): Promise<void> {
const lua =
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
try {
await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId);
} catch (err) {
this.logger.warn(
`git-sync: failed to release lock for space ${spaceId}: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
// --- enabled-space enumeration (plan §10) --------------------------------
/**
* Enumerate the spaces the poll loop should reconcile. Prefers the Phase-C
* per-space flag (`settings->'gitSync'->>'enabled' = 'true'`); when NO space
* carries it yet (UI not built), falls back to enumerating ALL live spaces (the
* GIT_SYNC_ENABLED master switch already gates whether we get here at all).
*/
private async enabledSpaces(): Promise<EnabledSpace[]> {
const flagged = await this.db
.selectFrom('spaces')
.select(['id as spaceId', 'workspaceId'])
.where('deletedAt', 'is', null)
.where(sql<boolean>`settings->'gitSync'->>'enabled' = 'true'`)
.execute();
if (flagged.length > 0) return flagged;
// No per-space flag set yet (Phase C UI pending): the master switch enables
// all spaces so the feature can be verified end-to-end before the UI lands.
return this.db
.selectFrom('spaces')
.select(['id as spaceId', 'workspaceId'])
.where('deletedAt', 'is', null)
.execute();
}
// --- one sync cycle for a space (plan §11) -------------------------------
/**
* Build the engine `Settings` for a space. The engine's REST-era fields
* (docmostApiUrl/email/password) are unused on the native path — the
* datasource writes in-process — so they are placeholders; only `vaultPath`,
* `gitRemote`, and the tunables are load-bearing.
*/
private buildSettings(spaceId: string): Settings {
const remoteTemplate = this.environmentService.getGitSyncRemoteTemplate();
const gitRemote = remoteTemplate
? remoteTemplate.replace(/\{spaceId\}/g, spaceId)
: undefined;
return {
docmostApiUrl: 'http://native.local',
docmostEmail: 'native@local',
docmostPassword: 'native',
docmostSpaceId: spaceId,
vaultPath: this.vaultRegistry.vaultPath(spaceId),
gitRemote,
pollIntervalMs: this.environmentService.getGitSyncPollIntervalMs(),
debounceMs: this.environmentService.getGitSyncDebounceMs(),
logLevel: 'info',
};
}
/**
* Run one full PULL + PUSH cycle for a space, under the Redis leader lock and
* the in-process mutex. Never throws — per-space errors are caught, logged, and
* returned in the status so a poll interval is never broken by one bad space.
*/
async runOnce(
spaceId: string,
workspaceId: string,
): Promise<GitSyncRunStatus> {
if (!this.environmentService.isGitSyncEnabled()) {
return { spaceId, ran: false, skipped: 'disabled' };
}
const serviceUserId = this.environmentService.getGitSyncServiceUserId();
if (!serviceUserId) {
this.logger.error(
'git-sync: GIT_SYNC_SERVICE_USER_ID is required when GIT_SYNC_ENABLED — skipping',
);
return { spaceId, ran: false, skipped: 'no-service-user' };
}
// In-process mutex: never run two overlapping cycles for the same space on
// this instance (the Redis lock guards cross-instance, this guards in-proc).
if (this.running.has(spaceId)) {
return { spaceId, ran: false, skipped: 'in-progress' };
}
// Redis leader lock: only the holder runs the cycle (plan §9).
if (!(await this.acquire(spaceId))) {
return { spaceId, ran: false, skipped: 'lock-held' };
}
this.running.add(spaceId);
try {
return await this.driveCycle(spaceId, workspaceId, serviceUserId);
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
this.logger.error(`git-sync: cycle failed for space ${spaceId}: ${message}`);
return { spaceId, ran: false, error: message };
} finally {
this.running.delete(spaceId);
await this.release(spaceId);
}
}
/**
* The actual engine wiring (plan §11). Mirrors the engine's own `main`:
* PULL — readExisting -> computePullActions -> applyPullActions,
* PUSH — runPush (dry-run disabled: a real apply).
* The dependency-object shapes match pull.ts/push.ts exactly (see comments).
*/
private async driveCycle(
spaceId: string,
workspaceId: string,
serviceUserId: string,
): Promise<GitSyncRunStatus> {
const settings = this.buildSettings(spaceId);
const vault = await this.vaultRegistry.getVault(spaceId);
const vaultRoot = settings.vaultPath;
const client = this.dataSource.bind({ workspaceId, userId: serviceUserId });
// Engine state store is git: make sure the repo + branches exist before any
// tracked-file listing or diff (the engine's pull/push assume an inited repo).
await vault.assertGitAvailable();
await vault.ensureRepo();
await vault.ensureBranch('docmost', 'main');
// --- PULL (plan §11.1/§11.2) --------------------------------------------
// readExisting deps (ReadExistingDeps): list tracked *.md + read by relPath.
const existing = await readExisting({
listTracked: () => vault.listTrackedFiles('*.md'),
readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'),
});
const tree = await client.listSpaceTree(spaceId);
const pullActions = computePullActions({
pages: tree.pages,
treeComplete: true,
existing,
});
// applyPullActions deps (ApplyPullActionsDeps): the read-side client subset,
// the vault git subset, and ABSOLUTE-path fs ops (mkdir/writeFile/rm).
const pullResult = await applyPullActions(
{
client,
git: vault,
writeFile: (absPath, text) => writeFile(absPath, text, 'utf8'),
mkdir: (absDir) => mkdir(absDir, { recursive: true }).then(() => undefined),
rm: (absPath) => rm(absPath, { force: true }),
},
pullActions,
vaultRoot,
);
// --- PUSH (plan §11.3) --------------------------------------------------
// runPush deps (PushDeps): settings, the full vault git object (method `this`
// binding must be preserved — pass the object, not bound method refs), a
// makeClient factory returning the push client subset, vault-relative fs
// read/write, and a logger. dryRun:false performs the real Docmost writes.
const pushResult = await runPush(
{
settings,
git: vault,
makeClient: () => client,
readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'),
writeFile: (relPath, text) =>
writeFile(`${vaultRoot}/${relPath}`, text, 'utf8'),
log: (line) => this.logger.log(`git-sync[${spaceId}] ${line}`),
},
{ dryRun: false },
);
return {
spaceId,
ran: true,
pull: {
written: pullResult.written,
deleted: pullResult.deleted,
conflict: pullResult.merge.conflict,
},
push: {
mode: pushResult.mode,
failures: pushResult.failures?.length ?? 0,
},
};
}
// --- poll-safety interval (plan §10) -------------------------------------
/**
* Poll-safety loop: catches events missed by the listener and reconciles after
* downtime. Gated on GIT_SYNC_ENABLED. The interval is a fixed value because
* `@Interval` cannot read config at class-eval time — the body short-circuits
* when disabled. Each enabled space runs under its own lock (overlaps skipped).
*
* ScheduleModule: registered ONCE globally by TelemetryModule
* (ScheduleModule.forRoot()); GitSyncModule imports the plain ScheduleModule so
* @Interval is discovered without a duplicate forRoot (plan §6 note).
*/
@Interval('git-sync-poll', 15000)
async poll(): Promise<void> {
if (!this.environmentService.isGitSyncEnabled()) return;
let spaces: EnabledSpace[];
try {
spaces = await this.enabledSpaces();
} catch (err) {
this.logger.error(
`git-sync: failed to enumerate enabled spaces: ${
err instanceof Error ? err.message : String(err)
}`,
);
return;
}
for (const { spaceId, workspaceId } of spaces) {
// runOnce never throws; a per-space error is logged and returned in status.
await this.runOnce(spaceId, workspaceId);
}
}
}

View File

@@ -0,0 +1,44 @@
import { Injectable, Logger } from '@nestjs/common';
import { mkdir } from 'node:fs/promises';
import { VaultGit } from '@docmost/git-sync';
import { EnvironmentService } from '../../environment/environment.service';
/**
* Resolves the on-disk vault location per space and owns the (lazily created,
* cached) `VaultGit` instance for each one (plan §3/§5).
*
* Topology (plan §5): one git repo per enabled space, rooted at
* `<GIT_SYNC_DATA_DIR>/<spaceId>`. A `VaultGit` is constructed at most once per
* space and reused across cycles — it is a thin, stateless shell-out wrapper, so
* caching it just avoids re-resolving the path and re-running `mkdir`.
*/
@Injectable()
export class VaultRegistryService {
private readonly logger = new Logger(VaultRegistryService.name);
private readonly vaults = new Map<string, VaultGit>();
constructor(private readonly environmentService: EnvironmentService) {}
/** Absolute vault path for a space: `<GIT_SYNC_DATA_DIR>/<spaceId>`. */
vaultPath(spaceId: string): string {
const root = this.environmentService.getGitSyncDataDir().replace(/\/+$/, '');
return `${root}/${spaceId}`;
}
/**
* Get (or lazily construct + cache) the `VaultGit` for a space, ensuring its
* directory exists. `VaultGit.ensureRepo()` is NOT called here — the engine's
* pull/push paths call it (and the branch/ref setup) as their first step; this
* only guarantees the parent dir exists so a fresh space does not ENOENT.
*/
async getVault(spaceId: string): Promise<VaultGit> {
const cached = this.vaults.get(spaceId);
if (cached) return cached;
const path = this.vaultPath(spaceId);
await mkdir(path, { recursive: true });
const vault = new VaultGit(path);
this.vaults.set(spaceId, vault);
return vault;
}
}