From 95bc9fe98d471f0aa21e2370000739895d637668 Mon Sep 17 00:00:00 2001 From: claude code agent 227 Date: Wed, 24 Jun 2026 01:20:38 +0300 Subject: [PATCH] refactor(git-sync): extract SpaceLockService from the orchestrator (PR #119 review, arch #2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The per-space single-writer lock — Redis CAS leader lock (SET NX PX, DEL-CAS and PEXPIRE-CAS Lua), the in-process mutex, the per-process instanceId and the heartbeat — lived inline in GitSyncOrchestrator. Extract it into a dedicated @Injectable() SpaceLockService exposing one narrow surface, withSpaceLock(spaceId, fn), so the lock is the orchestrator's only Redis-lock touch-point and is testable in isolation. The orchestrator now injects SpaceLockService and both consumers (runOnce, ingestExternalPush) go through spaceLock.withSpaceLock — behavior unchanged (same sentinel returns, same 503-on-lock-held contract). Orchestrator drops 591→472 lines. Adds space-lock.service.spec.ts asserting the lock SEMANTICS against a fake Redis (the test-coverage warning from the review): the SET NX/PX args, the DEL-CAS and PEXPIRE-CAS Lua + ARGV[1]=instanceId, plus the lock-held / in-progress / throw-still-releases paths. The orchestrator spec is unchanged in count and stays green (it now builds the real SpaceLockService over its mock Redis). 
Co-Authored-By: Claude Opus 4.8 --- .../integrations/git-sync/git-sync.module.ts | 2 + .../services/git-sync.orchestrator.spec.ts | 8 +- .../services/git-sync.orchestrator.ts | 129 +----------- .../services/space-lock.service.spec.ts | 195 ++++++++++++++++++ .../git-sync/services/space-lock.service.ts | 135 ++++++++++++ 5 files changed, 344 insertions(+), 125 deletions(-) create mode 100644 apps/server/src/integrations/git-sync/services/space-lock.service.spec.ts create mode 100644 apps/server/src/integrations/git-sync/services/space-lock.service.ts diff --git a/apps/server/src/integrations/git-sync/git-sync.module.ts b/apps/server/src/integrations/git-sync/git-sync.module.ts index b16cfd93..cd80a1d2 100644 --- a/apps/server/src/integrations/git-sync/git-sync.module.ts +++ b/apps/server/src/integrations/git-sync/git-sync.module.ts @@ -7,6 +7,7 @@ import { PageModule } from '../../core/page/page.module'; import { AuthModule } from '../../core/auth/auth.module'; import { GitmostDataSourceService } from './services/gitmost-datasource.service'; import { GitSyncOrchestrator } from './services/git-sync.orchestrator'; +import { SpaceLockService } from './services/space-lock.service'; import { VaultRegistryService } from './services/vault-registry.service'; import { PageChangeListener } from './listeners/page-change.listener'; import { GitSyncController } from './git-sync.controller'; @@ -47,6 +48,7 @@ import { GitHttpService } from './http/git-http.service'; providers: [ GitmostDataSourceService, GitSyncOrchestrator, + SpaceLockService, VaultRegistryService, PageChangeListener, // /git smart-HTTP host (the raw Fastify route in main.ts resolves these). 
diff --git a/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.spec.ts b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.spec.ts index 56aade9b..a5b2154d 100644 --- a/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.spec.ts +++ b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.spec.ts @@ -22,6 +22,7 @@ import { runPush, } from '@docmost/git-sync'; import { GitSyncOrchestrator } from './git-sync.orchestrator'; +import { SpaceLockService } from './space-lock.service'; type AnyMock = jest.Mock; @@ -118,12 +119,17 @@ function build(opts: BuildOptions = {}): Built { const db = {}; + // The REAL SpaceLockService, constructed against the mock redis above, so all + // existing lock assertions (lock-held, in-progress, leader lock, release CAS, + // heartbeat) still exercise the same `redis.set`/`redis.eval` mock unchanged. + const spaceLock = new SpaceLockService(redisService as any); + const orchestrator = new GitSyncOrchestrator( env as any, dataSource as any, vaultRegistry as any, scheduler as any, - redisService as any, + spaceLock as any, db as any, ); diff --git a/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts index cb89349a..507e3f5e 100644 --- a/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts +++ b/apps/server/src/integrations/git-sync/services/git-sync.orchestrator.ts @@ -5,9 +5,6 @@ import { OnModuleInit, } from '@nestjs/common'; import { SchedulerRegistry } from '@nestjs/schedule'; -import { RedisService } from '@nestjs-labs/nestjs-ioredis'; -import type { Redis } from 'ioredis'; -import { randomUUID } from 'node:crypto'; import { mkdir, readFile, rm, writeFile } from 'node:fs/promises'; import { dirname } from 'node:path'; import { InjectKysely } from 'nestjs-kysely'; @@ -23,10 +20,7 @@ import { import { EnvironmentService } from 
'../../environment/environment.service'; import { GitmostDataSourceService } from './gitmost-datasource.service'; import { VaultRegistryService } from './vault-registry.service'; -import { - GIT_SYNC_LOCK_PREFIX, - GIT_SYNC_LOCK_TTL_MS, -} from '../git-sync.constants'; +import { SpaceLockService } from './space-lock.service'; /** A space the poll loop should reconcile: its id + the workspace it lives in. */ interface EnabledSpace { @@ -80,11 +74,6 @@ export interface GitSyncRunStatus { @Injectable() export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(GitSyncOrchestrator.name); - private readonly redis: Redis; - /** Unique per process instance — the leader-lock value (CAS on release). */ - private readonly instanceId = randomUUID(); - /** In-process per-space mutex: spaceIds with a cycle currently running. */ - private readonly running = new Set(); /** The registered poll-interval name, or null when none is registered. */ private pollIntervalName: string | null = null; @@ -93,77 +82,9 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy { private readonly dataSource: GitmostDataSourceService, private readonly vaultRegistry: VaultRegistryService, private readonly schedulerRegistry: SchedulerRegistry, - redisService: RedisService, + private readonly spaceLock: SpaceLockService, @InjectKysely() private readonly db: KyselyDB, - ) { - this.redis = redisService.getOrThrow(); - } - - // --- Redis leader lock ----------------------------------------- - - /** - * Acquire per-space leadership: `SET PX NX` returns - * 'OK' only when the key did not exist. Any other reply means another replica - * holds it. 
- */ - private async acquire(spaceId: string): Promise { - const ok = await this.redis.set( - GIT_SYNC_LOCK_PREFIX + spaceId, - this.instanceId, - 'PX', - GIT_SYNC_LOCK_TTL_MS, - 'NX', - ); - return ok === 'OK'; - } - - /** - * Release the lock with a CAS Lua so we only delete it when WE still hold it - * (the value matches our instanceId) — never another replica's lock that took - * over after our TTL expired. - */ - private async release(spaceId: string): Promise { - const lua = - 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; - try { - await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId); - } catch (err) { - this.logger.warn( - `git-sync: failed to release lock for space ${spaceId}: ${ - err instanceof Error ? err.message : String(err) - }`, - ); - } - } - - /** - * CAS-guarded TTL refresh: extend the lock's TTL ONLY while WE still own it - * (the stored value matches our instanceId) — never extend another replica's - * lock that took over after our TTL expired. Used by the heartbeat in - * `withSpaceLock` so a long-running push (client-controlled receive-pack + the - * Docmost cycle) cannot outlive the lock and let a concurrent cycle race the - * working tree. Logs (warn) but never throws — a failed refresh must not break - * the cycle it is protecting. - */ - private async refreshLock(spaceId: string): Promise { - const lua = - 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end'; - try { - await this.redis.eval( - lua, - 1, - GIT_SYNC_LOCK_PREFIX + spaceId, - this.instanceId, - String(GIT_SYNC_LOCK_TTL_MS), - ); - } catch (err) { - this.logger.warn( - `git-sync: failed to refresh lock for space ${spaceId}: ${ - err instanceof Error ? 
err.message : String(err) - }`, - ); - } - } + ) {} // --- enabled-space enumeration -------------------------------- @@ -236,7 +157,7 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy { // when it could not enter — surfaced here as the existing skipped:'in-progress' // / 'lock-held' status so runOnce's observable behavior is unchanged. try { - const result = await this.withSpaceLock(spaceId, () => + const result = await this.spaceLock.withSpaceLock(spaceId, () => this.driveCycle(spaceId, workspaceId, serviceUserId), ); if ('skipped' in result && !('spaceId' in result)) { @@ -250,46 +171,6 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy { } } - /** - * Run `fn` under the per-space lock: the in-process mutex (no overlapping - * cycles on this instance) AND the Redis leader lock (single writer across - * replicas). Returns `fn`'s result, or a skip sentinel when the lock could not - * be acquired — `{ skipped: 'in-progress' }` (this instance is mid-cycle) or - * `{ skipped: 'lock-held' }` (another replica holds the Redis lock). The mutex - * + Redis lock are always released in a `finally`, even when `fn` throws (the - * throw propagates to the caller). This is the single reusable wrapper shared - * by `runOnce` (the poll/admin cycle) and `ingestExternalPush` (a push from a - * git client over HTTP) so both serialize against each other identically. - */ - async withSpaceLock( - spaceId: string, - fn: () => Promise, - ): Promise { - if (this.running.has(spaceId)) { - return { skipped: 'in-progress' }; - } - if (!(await this.acquire(spaceId))) { - return { skipped: 'lock-held' }; - } - this.running.add(spaceId); - // Heartbeat: periodically (≈ TTL/3) extend the lock's TTL while `fn` runs so - // a long push (client-controlled receive-pack + the Docmost cycle) cannot - // outlive the fixed TTL and let a concurrent cycle race the working tree. The - // refresh is CAS-guarded (only extends while WE own it). 
`.unref()` keeps the - // timer from holding the event loop open; it is ALWAYS cleared in `finally`. - const heartbeat = setInterval(() => { - void this.refreshLock(spaceId); - }, Math.max(1, Math.floor(GIT_SYNC_LOCK_TTL_MS / 3))); - heartbeat.unref?.(); - try { - return await fn(); - } finally { - clearInterval(heartbeat); - this.running.delete(spaceId); - await this.release(spaceId); - } - } - /** * Ingest a push that arrived over smart-HTTP (the /git host). Under the SAME * per-space lock the poll cycle uses, it: @@ -324,7 +205,7 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy { } const serviceUserId = this.environmentService.getGitSyncServiceUserId(); - const result = await this.withSpaceLock(spaceId, async () => { + const result = await this.spaceLock.withSpaceLock(spaceId, async () => { // 1) Stream the receive-pack to the client (durable commits land on main). await runReceivePack(); diff --git a/apps/server/src/integrations/git-sync/services/space-lock.service.spec.ts b/apps/server/src/integrations/git-sync/services/space-lock.service.spec.ts new file mode 100644 index 00000000..809ea47e --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/space-lock.service.spec.ts @@ -0,0 +1,195 @@ +// Unit tests for SpaceLockService in ISOLATION. The lock is exercised against a +// fake redis (mock `set`/`eval`) and we assert the exact ARGUMENTS passed to +// redis — the test-coverage gap this refactor (PR #119 #2) closes: acquire uses +// `SET ... PX NX`, release uses a DEL-CAS Lua, and the heartbeat refresh +// uses a PEXPIRE-CAS Lua, all keyed by the same private instanceId. 
+import { Logger } from '@nestjs/common'; +import { SpaceLockService } from './space-lock.service'; +import { + GIT_SYNC_LOCK_PREFIX, + GIT_SYNC_LOCK_TTL_MS, +} from '../git-sync.constants'; + +type AnyMock = jest.Mock; + +interface Built { + service: SpaceLockService; + redis: { set: AnyMock; eval: AnyMock }; +} + +function build(): Built { + const redis = { + // Default: lock acquired. Tests override per-case. + set: jest.fn(async () => 'OK'), + eval: jest.fn(async () => 1), + }; + const redisService = { getOrThrow: jest.fn(() => redis) }; + const service = new SpaceLockService(redisService as any); + return { service, redis }; +} + +/** Drain queued microtasks so awaited continuations inside the lock run. */ +async function flushMicrotasks(): Promise { + await Promise.resolve(); + await Promise.resolve(); + await Promise.resolve(); +} + +beforeEach(() => { + jest.clearAllMocks(); +}); + +describe('SpaceLockService', () => { + describe('acquire (SET NX/PX)', () => { + it('calls redis.set with (prefix+spaceId, , PX, ttl, NX) and reuses the instanceId on release', async () => { + const { service, redis } = build(); + + const result = await service.withSpaceLock('space-1', async () => 'ok'); + expect(result).toBe('ok'); + + // acquire arguments + expect(redis.set).toHaveBeenCalledTimes(1); + const [key, instanceId, px, ttl, nx] = redis.set.mock.calls[0]; + expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1'); + expect(typeof instanceId).toBe('string'); + expect(instanceId.length).toBeGreaterThan(0); + expect(px).toBe('PX'); + expect(ttl).toBe(GIT_SYNC_LOCK_TTL_MS); + expect(nx).toBe('NX'); + + // release (eval) reuses the SAME instanceId as ARGV[1] + expect(redis.eval).toHaveBeenCalledTimes(1); + const [, , relKey, relInstanceId] = redis.eval.mock.calls[0]; + expect(relKey).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1'); + expect(relInstanceId).toBe(instanceId); + }); + }); + + describe('release (DEL-CAS Lua)', () => { + it('returns the fn result and runs a get/del 
CAS-compared release in finally', async () => { + const { service, redis } = build(); + + const result = await service.withSpaceLock('space-1', async () => 42); + expect(result).toBe(42); + + expect(redis.eval).toHaveBeenCalledTimes(1); + const [lua, numKeys, key, instanceId] = redis.eval.mock.calls[0]; + expect(lua).toContain('get'); + expect(lua).toContain('del'); + expect(lua).toContain('== ARGV[1]'); + expect(numKeys).toBe(1); + expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1'); + expect(typeof instanceId).toBe('string'); + }); + }); + + describe('lock held by another replica', () => { + it("returns { skipped: 'lock-held' } without running fn or releasing when set != 'OK'", async () => { + const { service, redis } = build(); + redis.set.mockResolvedValueOnce(null); + const fn = jest.fn(async () => 'ran'); + + const result = await service.withSpaceLock('space-1', fn); + + expect(result).toEqual({ skipped: 'lock-held' }); + expect(fn).not.toHaveBeenCalled(); + // No release: we never acquired it. + expect(redis.eval).not.toHaveBeenCalled(); + }); + }); + + describe('in-process mutex', () => { + it("a second withSpaceLock on the same space mid-flight returns { skipped: 'in-progress' } without a second set", async () => { + const { service, redis } = build(); + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + const first = service.withSpaceLock('space-1', async () => { + await gate; + return 'first'; + }); + // Let the first call acquire + enter the running set. + await flushMicrotasks(); + + const second = await service.withSpaceLock('space-1', async () => 'second'); + expect(second).toEqual({ skipped: 'in-progress' }); + // Only the first call hit redis.set — the mutex short-circuits the second. 
+ expect(redis.set).toHaveBeenCalledTimes(1); + + release(); + await expect(first).resolves.toBe('first'); + }); + }); + + describe('fn throwing', () => { + it('propagates the throw AND still releases (eval) in finally', async () => { + const { service, redis } = build(); + const boom = new Error('boom'); + + await expect( + service.withSpaceLock('space-1', async () => { + throw boom; + }), + ).rejects.toBe(boom); + + // Release still ran despite the throw. + expect(redis.eval).toHaveBeenCalledTimes(1); + const [lua] = redis.eval.mock.calls[0]; + expect(lua).toContain('del'); + }); + }); + + describe('heartbeat refresh (PEXPIRE-CAS Lua)', () => { + it('extends the lock via a pexpire CAS-Lua with the same instanceId while fn is in flight', async () => { + jest.useFakeTimers(); + try { + const { service, redis } = build(); + let release!: () => void; + const gate = new Promise((resolve) => { + release = resolve; + }); + + const run = service.withSpaceLock('space-1', async () => { + await gate; + return 'done'; + }); + // Let acquire resolve and the running.add + setInterval registration run. + await flushMicrotasks(); + + // Capture the instanceId used on acquire so we can assert it is reused. + const instanceId = redis.set.mock.calls[0][1]; + + // Advance past one heartbeat interval (≈ TTL/3) to fire refreshLock. + jest.advanceTimersByTime(Math.floor(GIT_SYNC_LOCK_TTL_MS / 3)); + await flushMicrotasks(); + + // The refresh eval ran (release has not, fn still awaiting the gate). + expect(redis.eval).toHaveBeenCalledTimes(1); + const [lua, numKeys, key, argInstanceId, ttlArg] = + redis.eval.mock.calls[0]; + expect(lua).toContain('pexpire'); + expect(lua).toContain('== ARGV[1]'); + expect(numKeys).toBe(1); + expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1'); + expect(argInstanceId).toBe(instanceId); + expect(ttlArg).toBe(String(GIT_SYNC_LOCK_TTL_MS)); + + // Let fn finish; release runs in finally (second eval, the DEL-CAS). 
+ release(); + await flushMicrotasks(); + await expect(run).resolves.toBe('done'); + expect(redis.eval).toHaveBeenCalledTimes(2); + expect(redis.eval.mock.calls[1][0]).toContain('del'); + } finally { + jest.useRealTimers(); + } + }); + }); +}); + +// Silence the warn logger if a refresh/release path ever logs (defensive). +beforeAll(() => { + jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined); +}); diff --git a/apps/server/src/integrations/git-sync/services/space-lock.service.ts b/apps/server/src/integrations/git-sync/services/space-lock.service.ts new file mode 100644 index 00000000..b0cd999d --- /dev/null +++ b/apps/server/src/integrations/git-sync/services/space-lock.service.ts @@ -0,0 +1,135 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { RedisService } from '@nestjs-labs/nestjs-ioredis'; +import type { Redis } from 'ioredis'; +import { randomUUID } from 'node:crypto'; +import { + GIT_SYNC_LOCK_PREFIX, + GIT_SYNC_LOCK_TTL_MS, +} from '../git-sync.constants'; + +/** + * The per-space lock used by the git-sync control plane: an in-process per-space + * mutex (no overlapping cycles on one instance) PLUS a Redis leader lock + * (single writer across replicas). Extracted from `GitSyncOrchestrator` so the + * locking primitive is a single reusable, independently testable unit + * (PR #119 refactor #2). + */ +@Injectable() +export class SpaceLockService { + private readonly logger = new Logger(SpaceLockService.name); + private readonly redis: Redis; + /** Unique per process instance — the leader-lock value (CAS on release). */ + private readonly instanceId = randomUUID(); + /** In-process per-space mutex: spaceIds with a cycle currently running. 
*/ + private readonly running = new Set(); + + constructor(redisService: RedisService) { + this.redis = redisService.getOrThrow(); + } + + // --- Redis leader lock ----------------------------------------- + + /** + * Acquire per-space leadership: `SET <key> <instanceId> PX <ttl> NX` returns + * 'OK' only when the key did not exist. Any other reply means another replica + * holds it. + */ + private async acquire(spaceId: string): Promise { + const ok = await this.redis.set( + GIT_SYNC_LOCK_PREFIX + spaceId, + this.instanceId, + 'PX', + GIT_SYNC_LOCK_TTL_MS, + 'NX', + ); + return ok === 'OK'; + } + + /** + * Release the lock with a CAS Lua so we only delete it when WE still hold it + * (the value matches our instanceId) — never another replica's lock that took + * over after our TTL expired. + */ + private async release(spaceId: string): Promise { + const lua = + 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end'; + try { + await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId); + } catch (err) { + this.logger.warn( + `git-sync: failed to release lock for space ${spaceId}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + /** + * CAS-guarded TTL refresh: extend the lock's TTL ONLY while WE still own it + * (the stored value matches our instanceId) — never extend another replica's + * lock that took over after our TTL expired. Used by the heartbeat in + * `withSpaceLock` so a long-running push (client-controlled receive-pack + the + * Docmost cycle) cannot outlive the lock and let a concurrent cycle race the + * working tree. Logs (warn) but never throws — a failed refresh must not break + * the cycle it is protecting. 
+ */ + private async refreshLock(spaceId: string): Promise { + const lua = + 'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end'; + try { + await this.redis.eval( + lua, + 1, + GIT_SYNC_LOCK_PREFIX + spaceId, + this.instanceId, + String(GIT_SYNC_LOCK_TTL_MS), + ); + } catch (err) { + this.logger.warn( + `git-sync: failed to refresh lock for space ${spaceId}: ${ + err instanceof Error ? err.message : String(err) + }`, + ); + } + } + + /** + * Run `fn` under the per-space lock: the in-process mutex (no overlapping + * cycles on this instance) AND the Redis leader lock (single writer across + * replicas). Returns `fn`'s result, or a skip sentinel when the lock could not + * be acquired — `{ skipped: 'in-progress' }` (this instance is mid-cycle) or + * `{ skipped: 'lock-held' }` (another replica holds the Redis lock). The mutex + * + Redis lock are always released in a `finally`, even when `fn` throws (the + * throw propagates to the caller). This is the single reusable wrapper shared + * by `runOnce` (the poll/admin cycle) and `ingestExternalPush` (a push from a + * git client over HTTP) so both serialize against each other identically. + */ + async withSpaceLock( + spaceId: string, + fn: () => Promise, + ): Promise { + if (this.running.has(spaceId)) { + return { skipped: 'in-progress' }; + } + if (!(await this.acquire(spaceId))) { + return { skipped: 'lock-held' }; + } + this.running.add(spaceId); + // Heartbeat: periodically (≈ TTL/3) extend the lock's TTL while `fn` runs so + // a long push (client-controlled receive-pack + the Docmost cycle) cannot + // outlive the fixed TTL and let a concurrent cycle race the working tree. The + // refresh is CAS-guarded (only extends while WE own it). `.unref()` keeps the + // timer from holding the event loop open; it is ALWAYS cleared in `finally`. 
+ const heartbeat = setInterval(() => { + void this.refreshLock(spaceId); + }, Math.max(1, Math.floor(GIT_SYNC_LOCK_TTL_MS / 3))); + heartbeat.unref?.(); + try { + return await fn(); + } finally { + clearInterval(heartbeat); + this.running.delete(spaceId); + await this.release(spaceId); + } + } +}