Files
gitmost/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts
claude code agent 227 9d08a0fd39 feat(git-sync): GitSyncModule orchestrator + config + listener (Phase A.4b/B)
Control plane wiring (plan §5-§11):
- PageService create/update/movePage now honor provenance actor 'git-sync'
  (stamp lastUpdatedSource='git-sync'), closing the A.4a gap.
- EnvironmentService: GIT_SYNC_ENABLED / DATA_DIR / REMOTE_TEMPLATE /
  POLL_INTERVAL_MS / DEBOUNCE_MS / SERVICE_USER_ID (required-if-enabled) /
  SSH_KEY_PATH + validation.
- VaultRegistryService: per-space vault path + cached VaultGit.
- GitSyncOrchestrator: per-space Redis leader-lock (SET NX PX + CAS-Lua release,
  randomUUID instanceId) + in-process mutex; runOnce drives the vendored engine
  PULL (readExisting->computePullActions->applyPullActions) then PUSH (runPush)
  with the bound native GitSyncClient + VaultGit; @Interval poll-safety gated on
  GIT_SYNC_ENABLED; imports plain ScheduleModule (TelemetryModule owns forRoot).
- PageChangeListener: @OnEvent PAGE_* -> per-space debounce -> runOnce, with a
  best-effort lastUpdatedSource==='git-sync' loop-guard.
- GitSyncController: admin POST /api/git-sync/trigger + GET /status (ops/e2e).
- GitSyncModule registered in app.module. Enabled-space enumeration uses
  settings.gitSync.enabled, falling back to all live spaces until Phase C writes
  the flag (master gate = GIT_SYNC_ENABLED).

tsc clean; 713 tests/71 suites pass; dev server hot-reloaded the module (route
live, DI graph boots). Live pull/push round-trip verified next.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-24 16:49:59 +03:00

324 lines
12 KiB
TypeScript

import { Injectable, Logger } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import { InjectKysely } from 'nestjs-kysely';
import { KyselyDB } from '@docmost/db/types/kysely.types';
import { sql } from 'kysely';
import {
type Settings,
readExisting,
computePullActions,
applyPullActions,
runPush,
} from '@docmost/git-sync';
import { EnvironmentService } from '../../environment/environment.service';
import { GitmostDataSourceService } from './gitmost-datasource.service';
import { VaultRegistryService } from './vault-registry.service';
import {
GIT_SYNC_LOCK_PREFIX,
GIT_SYNC_LOCK_TTL_MS,
} from '../git-sync.constants';
/**
 * One space the poll loop should reconcile, paired with the workspace that
 * owns it (both are passed straight to `runOnce`).
 */
interface EnabledSpace {
  /** Id of the space to sync. */
  spaceId: string;
  /** Id of the workspace the space lives in. */
  workspaceId: string;
}
/**
 * Compact outcome of a single `runOnce` call, returned to the admin trigger and
 * used for logging.
 */
export interface GitSyncRunStatus {
  /** The space this status refers to. */
  spaceId: string;
  /** `true` only when the full PULL + PUSH cycle completed. */
  ran: boolean;
  /**
   * Set when the cycle was skipped without being attempted: the Redis lock is
   * held elsewhere, a cycle is already running in this process, git-sync is
   * disabled, or no service user is configured.
   */
  skipped?:
    | 'lock-held'
    | 'in-progress'
    | 'disabled'
    | 'no-service-user';
  /** PULL summary (Docmost -> vault); present when the cycle ran. */
  pull?: {
    written: number;
    deleted: number;
    conflict: boolean;
  };
  /** PUSH summary (vault -> Docmost); present when the cycle ran. */
  push?: {
    mode: string;
    failures: number;
  };
  /** Message of the error that aborted the cycle, if any. */
  error?: string;
}
/**
 * The git-sync control plane (plan §9/§10/§11). Drives the vendored engine in
 * process: under a per-space Redis leader lock (intended to give a single
 * writer across replicas) plus an in-process per-space mutex (no overlapping
 * cycles on one instance), it runs a PULL (Docmost -> vault) then a PUSH
 * (vault -> Docmost) for a space.
 *
 * Enumeration of enabled spaces (plan §10 / Phase-C dependency): the per-space
 * UI flag `space.settings.gitSync.enabled` is Phase C and not built yet, so this
 * queries spaces whose jsonb flag is already set AND, when none are, treats
 * GIT_SYNC_ENABLED as a master switch that enables ALL spaces (so the feature is
 * exercisable before the UI lands). Once Phase C writes the flag, only flagged
 * spaces sync. The whole loop is gated on GIT_SYNC_ENABLED first.
 *
 * NOTE(review): the Redis lock is set once with a TTL (GIT_SYNC_LOCK_TTL_MS) and
 * is never extended by this class. If a cycle outlasts the TTL, another replica
 * can acquire the lock while this one is still running — confirm the TTL is
 * sized well above the worst-case cycle duration.
 */
@Injectable()
export class GitSyncOrchestrator {
  private readonly logger = new Logger(GitSyncOrchestrator.name);
  private readonly redis: Redis;
  /** Unique per process instance — the leader-lock value (CAS on release). */
  private readonly instanceId = randomUUID();
  /** In-process per-space mutex: spaceIds with a cycle currently running. */
  private readonly running = new Set<string>();

  constructor(
    private readonly environmentService: EnvironmentService,
    private readonly dataSource: GitmostDataSourceService,
    private readonly vaultRegistry: VaultRegistryService,
    redisService: RedisService,
    @InjectKysely() private readonly db: KyselyDB,
  ) {
    this.redis = redisService.getOrThrow();
  }

  /** Render a caught value as a log-safe message (`Error.message` or `String(value)`). */
  private static describeError(err: unknown): string {
    return err instanceof Error ? err.message : String(err);
  }

  // --- Redis leader lock (plan §9) -----------------------------------------

  /**
   * Acquire per-space leadership: `SET <key> <instanceId> PX <ttl> NX` returns
   * 'OK' only when the key did not exist. Any other reply means the key is
   * already held.
   *
   * @returns `true` when this instance now holds the lock.
   * @throws whatever the Redis client rejects with (e.g. connection errors).
   */
  private async acquire(spaceId: string): Promise<boolean> {
    const reply = await this.redis.set(
      GIT_SYNC_LOCK_PREFIX + spaceId,
      this.instanceId,
      'PX',
      GIT_SYNC_LOCK_TTL_MS,
      'NX',
    );
    return reply === 'OK';
  }

  /**
   * Release the lock with a CAS Lua script so we only delete it when WE still
   * hold it (the value matches our instanceId) — never another replica's lock
   * that took over after our TTL expired.
   *
   * Best-effort: a Redis failure is logged as a warning and swallowed, so this
   * method never rejects.
   */
  private async release(spaceId: string): Promise<void> {
    const lua =
      'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
    try {
      await this.redis.eval(
        lua,
        1,
        GIT_SYNC_LOCK_PREFIX + spaceId,
        this.instanceId,
      );
    } catch (err) {
      this.logger.warn(
        `git-sync: failed to release lock for space ${spaceId}: ${GitSyncOrchestrator.describeError(
          err,
        )}`,
      );
    }
  }

  // --- enabled-space enumeration (plan §10) --------------------------------

  /**
   * Enumerate the spaces the poll loop should reconcile. Prefers the Phase-C
   * per-space flag (`settings->'gitSync'->>'enabled' = 'true'`); when NO space
   * carries it yet (UI not built), falls back to enumerating ALL live spaces (the
   * GIT_SYNC_ENABLED master switch already gates whether we get here at all).
   * Soft-deleted spaces (`deletedAt` set) are excluded in both cases.
   */
  private async enabledSpaces(): Promise<EnabledSpace[]> {
    const flagged = await this.db
      .selectFrom('spaces')
      .select(['id as spaceId', 'workspaceId'])
      .where('deletedAt', 'is', null)
      .where(sql<boolean>`settings->'gitSync'->>'enabled' = 'true'`)
      .execute();
    if (flagged.length > 0) return flagged;

    // No per-space flag set yet (Phase C UI pending): the master switch enables
    // all spaces so the feature can be verified end-to-end before the UI lands.
    return this.db
      .selectFrom('spaces')
      .select(['id as spaceId', 'workspaceId'])
      .where('deletedAt', 'is', null)
      .execute();
  }

  // --- one sync cycle for a space (plan §11) -------------------------------

  /**
   * Build the engine `Settings` for a space. The engine's REST-era fields
   * (docmostApiUrl/email/password) are unused on the native path — the
   * datasource writes in-process — so they are placeholders; only `vaultPath`,
   * `gitRemote`, and the tunables are load-bearing.
   */
  private buildSettings(spaceId: string): Settings {
    const remoteTemplate = this.environmentService.getGitSyncRemoteTemplate();
    // A replacer FUNCTION is used so the id is inserted verbatim: a replacement
    // STRING would interpret `$&`, `$1`, `$$`, ... sequences inside it.
    const gitRemote = remoteTemplate
      ? remoteTemplate.replace(/\{spaceId\}/g, () => spaceId)
      : undefined;
    return {
      docmostApiUrl: 'http://native.local',
      docmostEmail: 'native@local',
      docmostPassword: 'native',
      docmostSpaceId: spaceId,
      vaultPath: this.vaultRegistry.vaultPath(spaceId),
      gitRemote,
      pollIntervalMs: this.environmentService.getGitSyncPollIntervalMs(),
      debounceMs: this.environmentService.getGitSyncDebounceMs(),
      logLevel: 'info',
    };
  }

  /**
   * Run one full PULL + PUSH cycle for a space, under the Redis leader lock and
   * the in-process mutex. Never rejects because of a lock or cycle failure —
   * those errors (including a failure to reach Redis while acquiring the lock)
   * are caught, logged, and returned in the status so a poll interval is never
   * broken by one bad space.
   *
   * @param spaceId     the space to reconcile.
   * @param workspaceId the workspace the space belongs to.
   * @returns a status describing whether the cycle ran, was skipped, or failed.
   */
  async runOnce(
    spaceId: string,
    workspaceId: string,
  ): Promise<GitSyncRunStatus> {
    if (!this.environmentService.isGitSyncEnabled()) {
      return { spaceId, ran: false, skipped: 'disabled' };
    }
    const serviceUserId = this.environmentService.getGitSyncServiceUserId();
    if (!serviceUserId) {
      this.logger.error(
        'git-sync: GIT_SYNC_SERVICE_USER_ID is required when GIT_SYNC_ENABLED — skipping',
      );
      return { spaceId, ran: false, skipped: 'no-service-user' };
    }
    // In-process mutex: never run two overlapping cycles for the same space on
    // this instance (the Redis lock guards cross-instance, this guards in-proc).
    if (this.running.has(spaceId)) {
      return { spaceId, ran: false, skipped: 'in-progress' };
    }
    // Claim the mutex synchronously, BEFORE the first await, so a concurrent
    // call on this instance cannot slip past the check above while we are
    // still waiting on Redis.
    this.running.add(spaceId);
    let lockAcquired = false;
    try {
      // Redis leader lock: only the holder runs the cycle (plan §9). Acquired
      // inside the try so a Redis failure becomes an error status, not a throw.
      lockAcquired = await this.acquire(spaceId);
      if (!lockAcquired) {
        return { spaceId, ran: false, skipped: 'lock-held' };
      }
      return await this.driveCycle(spaceId, workspaceId, serviceUserId);
    } catch (err) {
      const message = GitSyncOrchestrator.describeError(err);
      this.logger.error(
        `git-sync: cycle failed for space ${spaceId}: ${message}`,
      );
      return { spaceId, ran: false, error: message };
    } finally {
      try {
        // Only release a lock this call actually obtained.
        if (lockAcquired) await this.release(spaceId);
      } finally {
        // Clear the mutex last: a same-instance call arriving while the Redis
        // lock is still being released must see 'in-progress', not race it.
        this.running.delete(spaceId);
      }
    }
  }

  /**
   * The actual engine wiring (plan §11). Mirrors the engine's own `main`:
   * PULL — readExisting -> computePullActions -> applyPullActions,
   * PUSH — runPush (dry-run disabled: a real apply).
   * The dependency-object shapes are meant to match the engine's pull/push
   * dependency interfaces (see the inline comments).
   *
   * @throws any error raised by the vault, the datasource client, the engine or
   *         the filesystem; `runOnce` is responsible for catching it.
   */
  private async driveCycle(
    spaceId: string,
    workspaceId: string,
    serviceUserId: string,
  ): Promise<GitSyncRunStatus> {
    const settings = this.buildSettings(spaceId);
    const vault = await this.vaultRegistry.getVault(spaceId);
    const vaultRoot = settings.vaultPath;
    const client = this.dataSource.bind({ workspaceId, userId: serviceUserId });

    // Engine state store is git: make sure the repo + branches exist before any
    // tracked-file listing or diff (the engine's pull/push assume an inited repo).
    await vault.assertGitAvailable();
    await vault.ensureRepo();
    await vault.ensureBranch('docmost', 'main');

    // --- PULL (plan §11.1/§11.2) --------------------------------------------
    // readExisting deps (ReadExistingDeps): list tracked *.md + read by relPath.
    const existing = await readExisting({
      listTracked: () => vault.listTrackedFiles('*.md'),
      readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'),
    });
    const tree = await client.listSpaceTree(spaceId);
    // NOTE(review): treeComplete is hard-coded to true — presumably
    // listSpaceTree always returns the whole tree; confirm against the client.
    const pullActions = computePullActions({
      pages: tree.pages,
      treeComplete: true,
      existing,
    });

    // applyPullActions deps (ApplyPullActionsDeps): the read-side client subset,
    // the vault git subset, and ABSOLUTE-path fs ops (mkdir/writeFile/rm).
    const pullResult = await applyPullActions(
      {
        client,
        git: vault,
        writeFile: (absPath, text) => writeFile(absPath, text, 'utf8'),
        // mkdir resolves to the first created path; the dep resolves to nothing.
        mkdir: async (absDir) => {
          await mkdir(absDir, { recursive: true });
        },
        rm: (absPath) => rm(absPath, { force: true }),
      },
      pullActions,
      vaultRoot,
    );

    // --- PUSH (plan §11.3) --------------------------------------------------
    // runPush deps (PushDeps): settings, the full vault git object (method `this`
    // binding must be preserved — pass the object, not bound method refs), a
    // makeClient factory returning the push client subset, vault-relative fs
    // read/write, and a logger. dryRun:false performs the real Docmost writes.
    const pushResult = await runPush(
      {
        settings,
        git: vault,
        makeClient: () => client,
        readFile: (relPath) => readFile(`${vaultRoot}/${relPath}`, 'utf8'),
        writeFile: (relPath, text) =>
          writeFile(`${vaultRoot}/${relPath}`, text, 'utf8'),
        log: (line) => this.logger.log(`git-sync[${spaceId}] ${line}`),
      },
      { dryRun: false },
    );

    return {
      spaceId,
      ran: true,
      pull: {
        written: pullResult.written,
        deleted: pullResult.deleted,
        conflict: pullResult.merge.conflict,
      },
      push: {
        mode: pushResult.mode,
        failures: pushResult.failures?.length ?? 0,
      },
    };
  }

  // --- poll-safety interval (plan §10) -------------------------------------

  /**
   * Poll-safety loop: catches events missed by the listener and reconciles after
   * downtime. Gated on GIT_SYNC_ENABLED. The interval is a fixed value because
   * `@Interval` cannot read config at class-eval time — the body short-circuits
   * when disabled. Spaces are processed one after another; each runs under its
   * own lock, so a space whose cycle is still in flight is skipped rather than
   * run twice.
   *
   * ScheduleModule: registered ONCE globally by TelemetryModule
   * (ScheduleModule.forRoot()); GitSyncModule imports the plain ScheduleModule so
   * @Interval is discovered without a duplicate forRoot (plan §6 note).
   */
  @Interval('git-sync-poll', 15000)
  async poll(): Promise<void> {
    if (!this.environmentService.isGitSyncEnabled()) return;

    let spaces: EnabledSpace[];
    try {
      spaces = await this.enabledSpaces();
    } catch (err) {
      this.logger.error(
        `git-sync: failed to enumerate enabled spaces: ${GitSyncOrchestrator.describeError(
          err,
        )}`,
      );
      return;
    }

    for (const { spaceId, workspaceId } of spaces) {
      // runOnce reports lock and cycle failures through its returned status
      // instead of throwing, so one bad space cannot abort the loop.
      await this.runOnce(spaceId, workspaceId);
    }
  }
}