refactor(git-sync): extract SpaceLockService from the orchestrator (PR #119 review, arch #2)

The per-space single-writer lock — Redis CAS leader lock (SET NX PX, DEL-CAS and
PEXPIRE-CAS Lua), the in-process mutex, the per-process instanceId and the
heartbeat — lived inline in GitSyncOrchestrator. Extract it into a dedicated
@Injectable() SpaceLockService exposing one narrow surface, withSpaceLock(spaceId,
fn), so the lock is the orchestrator's only Redis-lock touch-point and is testable
in isolation. The orchestrator now injects SpaceLockService and both consumers
(runOnce, ingestExternalPush) go through spaceLock.withSpaceLock — behavior
unchanged (same sentinel returns, same 503-on-lock-held contract). Orchestrator
drops 591→472 lines.

Adds space-lock.service.spec.ts asserting the lock SEMANTICS against a fake Redis
(the test-coverage warning from the review): the SET NX/PX args, the DEL-CAS and
PEXPIRE-CAS Lua + ARGV[1]=instanceId, plus the lock-held / in-progress / throw-
still-releases paths. The orchestrator spec is unchanged in count and stays green
(it now builds the real SpaceLockService over its mock Redis).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
claude code agent 227
2026-06-24 01:20:38 +03:00
parent 1709ffb8de
commit 893ab75a3b
5 changed files with 344 additions and 125 deletions

View File

@@ -7,6 +7,7 @@ import { PageModule } from '../../core/page/page.module';
import { AuthModule } from '../../core/auth/auth.module';
import { GitmostDataSourceService } from './services/gitmost-datasource.service';
import { GitSyncOrchestrator } from './services/git-sync.orchestrator';
import { SpaceLockService } from './services/space-lock.service';
import { VaultRegistryService } from './services/vault-registry.service';
import { PageChangeListener } from './listeners/page-change.listener';
import { GitSyncController } from './git-sync.controller';
@@ -47,6 +48,7 @@ import { GitHttpService } from './http/git-http.service';
providers: [
GitmostDataSourceService,
GitSyncOrchestrator,
SpaceLockService,
VaultRegistryService,
PageChangeListener,
// /git smart-HTTP host (the raw Fastify route in main.ts resolves these).

View File

@@ -22,6 +22,7 @@ import {
runPush,
} from '@docmost/git-sync';
import { GitSyncOrchestrator } from './git-sync.orchestrator';
import { SpaceLockService } from './space-lock.service';
type AnyMock = jest.Mock;
@@ -118,12 +119,17 @@ function build(opts: BuildOptions = {}): Built {
const db = {};
// The REAL SpaceLockService, constructed against the mock redis above, so all
// existing lock assertions (lock-held, in-progress, leader lock, release CAS,
// heartbeat) still exercise the same `redis.set`/`redis.eval` mock unchanged.
const spaceLock = new SpaceLockService(redisService as any);
const orchestrator = new GitSyncOrchestrator(
env as any,
dataSource as any,
vaultRegistry as any,
scheduler as any,
redisService as any,
spaceLock as any,
db as any,
);

View File

@@ -5,9 +5,6 @@ import {
OnModuleInit,
} from '@nestjs/common';
import { SchedulerRegistry } from '@nestjs/schedule';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { randomUUID } from 'node:crypto';
import { mkdir, readFile, rm, writeFile } from 'node:fs/promises';
import { dirname } from 'node:path';
import { InjectKysely } from 'nestjs-kysely';
@@ -23,10 +20,7 @@ import {
import { EnvironmentService } from '../../environment/environment.service';
import { GitmostDataSourceService } from './gitmost-datasource.service';
import { VaultRegistryService } from './vault-registry.service';
import {
GIT_SYNC_LOCK_PREFIX,
GIT_SYNC_LOCK_TTL_MS,
} from '../git-sync.constants';
import { SpaceLockService } from './space-lock.service';
/** A space the poll loop should reconcile: its id + the workspace it lives in. */
interface EnabledSpace {
@@ -80,11 +74,6 @@ export interface GitSyncRunStatus {
@Injectable()
export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(GitSyncOrchestrator.name);
private readonly redis: Redis;
/** Unique per process instance — the leader-lock value (CAS on release). */
private readonly instanceId = randomUUID();
/** In-process per-space mutex: spaceIds with a cycle currently running. */
private readonly running = new Set<string>();
/** The registered poll-interval name, or null when none is registered. */
private pollIntervalName: string | null = null;
@@ -93,77 +82,9 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy {
private readonly dataSource: GitmostDataSourceService,
private readonly vaultRegistry: VaultRegistryService,
private readonly schedulerRegistry: SchedulerRegistry,
redisService: RedisService,
private readonly spaceLock: SpaceLockService,
@InjectKysely() private readonly db: KyselyDB,
) {
this.redis = redisService.getOrThrow();
}
// --- Redis leader lock -----------------------------------------
/**
* Acquire per-space leadership: `SET <key> <instanceId> PX <ttl> NX` returns
* 'OK' only when the key did not exist. Any other reply means another replica
* holds it.
*/
private async acquire(spaceId: string): Promise<boolean> {
const ok = await this.redis.set(
GIT_SYNC_LOCK_PREFIX + spaceId,
this.instanceId,
'PX',
GIT_SYNC_LOCK_TTL_MS,
'NX',
);
return ok === 'OK';
}
/**
* Release the lock with a CAS Lua so we only delete it when WE still hold it
* (the value matches our instanceId) — never another replica's lock that took
* over after our TTL expired.
*/
private async release(spaceId: string): Promise<void> {
const lua =
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
try {
await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId);
} catch (err) {
this.logger.warn(
`git-sync: failed to release lock for space ${spaceId}: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
/**
* CAS-guarded TTL refresh: extend the lock's TTL ONLY while WE still own it
* (the stored value matches our instanceId) — never extend another replica's
* lock that took over after our TTL expired. Used by the heartbeat in
* `withSpaceLock` so a long-running push (client-controlled receive-pack + the
* Docmost cycle) cannot outlive the lock and let a concurrent cycle race the
* working tree. Logs (warn) but never throws — a failed refresh must not break
* the cycle it is protecting.
*/
private async refreshLock(spaceId: string): Promise<void> {
const lua =
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end';
try {
await this.redis.eval(
lua,
1,
GIT_SYNC_LOCK_PREFIX + spaceId,
this.instanceId,
String(GIT_SYNC_LOCK_TTL_MS),
);
} catch (err) {
this.logger.warn(
`git-sync: failed to refresh lock for space ${spaceId}: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
) {}
// --- enabled-space enumeration --------------------------------
@@ -236,7 +157,7 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy {
// when it could not enter — surfaced here as the existing skipped:'in-progress'
// / 'lock-held' status so runOnce's observable behavior is unchanged.
try {
const result = await this.withSpaceLock(spaceId, () =>
const result = await this.spaceLock.withSpaceLock(spaceId, () =>
this.driveCycle(spaceId, workspaceId, serviceUserId),
);
if ('skipped' in result && !('spaceId' in result)) {
@@ -250,46 +171,6 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy {
}
}
/**
* Run `fn` under the per-space lock: the in-process mutex (no overlapping
* cycles on this instance) AND the Redis leader lock (single writer across
* replicas). Returns `fn`'s result, or a skip sentinel when the lock could not
* be acquired — `{ skipped: 'in-progress' }` (this instance is mid-cycle) or
* `{ skipped: 'lock-held' }` (another replica holds the Redis lock). The mutex
* + Redis lock are always released in a `finally`, even when `fn` throws (the
* throw propagates to the caller). This is the single reusable wrapper shared
* by `runOnce` (the poll/admin cycle) and `ingestExternalPush` (a push from a
* git client over HTTP) so both serialize against each other identically.
*/
async withSpaceLock<T>(
spaceId: string,
fn: () => Promise<T>,
): Promise<T | { skipped: 'lock-held' | 'in-progress' }> {
if (this.running.has(spaceId)) {
return { skipped: 'in-progress' };
}
if (!(await this.acquire(spaceId))) {
return { skipped: 'lock-held' };
}
this.running.add(spaceId);
// Heartbeat: periodically (≈ TTL/3) extend the lock's TTL while `fn` runs so
// a long push (client-controlled receive-pack + the Docmost cycle) cannot
// outlive the fixed TTL and let a concurrent cycle race the working tree. The
// refresh is CAS-guarded (only extends while WE own it). `.unref()` keeps the
// timer from holding the event loop open; it is ALWAYS cleared in `finally`.
const heartbeat = setInterval(() => {
void this.refreshLock(spaceId);
}, Math.max(1, Math.floor(GIT_SYNC_LOCK_TTL_MS / 3)));
heartbeat.unref?.();
try {
return await fn();
} finally {
clearInterval(heartbeat);
this.running.delete(spaceId);
await this.release(spaceId);
}
}
/**
* Ingest a push that arrived over smart-HTTP (the /git host). Under the SAME
* per-space lock the poll cycle uses, it:
@@ -324,7 +205,7 @@ export class GitSyncOrchestrator implements OnModuleInit, OnModuleDestroy {
}
const serviceUserId = this.environmentService.getGitSyncServiceUserId();
const result = await this.withSpaceLock(spaceId, async () => {
const result = await this.spaceLock.withSpaceLock(spaceId, async () => {
// 1) Stream the receive-pack to the client (durable commits land on main).
await runReceivePack();

View File

@@ -0,0 +1,195 @@
// Unit tests for SpaceLockService in ISOLATION. The lock is exercised against a
// fake redis (mock `set`/`eval`) and we assert the exact ARGUMENTS passed to
// redis — the test-coverage gap this refactor (PR #119 #2) closes: acquire uses
// `SET ... PX <ttl> NX`, release uses a DEL-CAS Lua, and the heartbeat refresh
// uses a PEXPIRE-CAS Lua, all keyed by the same private instanceId.
import { Logger } from '@nestjs/common';
import { SpaceLockService } from './space-lock.service';
import {
GIT_SYNC_LOCK_PREFIX,
GIT_SYNC_LOCK_TTL_MS,
} from '../git-sync.constants';
type AnyMock = jest.Mock;
interface Built {
service: SpaceLockService;
redis: { set: AnyMock; eval: AnyMock };
}
function build(): Built {
const redis = {
// Default: lock acquired. Tests override per-case.
set: jest.fn(async () => 'OK'),
eval: jest.fn(async () => 1),
};
const redisService = { getOrThrow: jest.fn(() => redis) };
const service = new SpaceLockService(redisService as any);
return { service, redis };
}
/** Drain queued microtasks so awaited continuations inside the lock run. */
async function flushMicrotasks(): Promise<void> {
await Promise.resolve();
await Promise.resolve();
await Promise.resolve();
}
beforeEach(() => {
jest.clearAllMocks();
});
describe('SpaceLockService', () => {
describe('acquire (SET NX/PX)', () => {
it('calls redis.set with (prefix+spaceId, <instanceId>, PX, ttl, NX) and reuses the instanceId on release', async () => {
const { service, redis } = build();
const result = await service.withSpaceLock('space-1', async () => 'ok');
expect(result).toBe('ok');
// acquire arguments
expect(redis.set).toHaveBeenCalledTimes(1);
const [key, instanceId, px, ttl, nx] = redis.set.mock.calls[0];
expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1');
expect(typeof instanceId).toBe('string');
expect(instanceId.length).toBeGreaterThan(0);
expect(px).toBe('PX');
expect(ttl).toBe(GIT_SYNC_LOCK_TTL_MS);
expect(nx).toBe('NX');
// release (eval) reuses the SAME instanceId as ARGV[1]
expect(redis.eval).toHaveBeenCalledTimes(1);
const [, , relKey, relInstanceId] = redis.eval.mock.calls[0];
expect(relKey).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1');
expect(relInstanceId).toBe(instanceId);
});
});
describe('release (DEL-CAS Lua)', () => {
it('returns the fn result and runs a get/del CAS-compared release in finally', async () => {
const { service, redis } = build();
const result = await service.withSpaceLock('space-1', async () => 42);
expect(result).toBe(42);
expect(redis.eval).toHaveBeenCalledTimes(1);
const [lua, numKeys, key, instanceId] = redis.eval.mock.calls[0];
expect(lua).toContain('get');
expect(lua).toContain('del');
expect(lua).toContain('== ARGV[1]');
expect(numKeys).toBe(1);
expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1');
expect(typeof instanceId).toBe('string');
});
});
describe('lock held by another replica', () => {
it("returns { skipped: 'lock-held' } without running fn or releasing when set != 'OK'", async () => {
const { service, redis } = build();
redis.set.mockResolvedValueOnce(null);
const fn = jest.fn(async () => 'ran');
const result = await service.withSpaceLock('space-1', fn);
expect(result).toEqual({ skipped: 'lock-held' });
expect(fn).not.toHaveBeenCalled();
// No release: we never acquired it.
expect(redis.eval).not.toHaveBeenCalled();
});
});
describe('in-process mutex', () => {
it("a second withSpaceLock on the same space mid-flight returns { skipped: 'in-progress' } without a second set", async () => {
const { service, redis } = build();
let release!: () => void;
const gate = new Promise<void>((resolve) => {
release = resolve;
});
const first = service.withSpaceLock('space-1', async () => {
await gate;
return 'first';
});
// Let the first call acquire + enter the running set.
await flushMicrotasks();
const second = await service.withSpaceLock('space-1', async () => 'second');
expect(second).toEqual({ skipped: 'in-progress' });
// Only the first call hit redis.set — the mutex short-circuits the second.
expect(redis.set).toHaveBeenCalledTimes(1);
release();
await expect(first).resolves.toBe('first');
});
});
describe('fn throwing', () => {
it('propagates the throw AND still releases (eval) in finally', async () => {
const { service, redis } = build();
const boom = new Error('boom');
await expect(
service.withSpaceLock('space-1', async () => {
throw boom;
}),
).rejects.toBe(boom);
// Release still ran despite the throw.
expect(redis.eval).toHaveBeenCalledTimes(1);
const [lua] = redis.eval.mock.calls[0];
expect(lua).toContain('del');
});
});
describe('heartbeat refresh (PEXPIRE-CAS Lua)', () => {
it('extends the lock via a pexpire CAS-Lua with the same instanceId while fn is in flight', async () => {
jest.useFakeTimers();
try {
const { service, redis } = build();
let release!: () => void;
const gate = new Promise<void>((resolve) => {
release = resolve;
});
const run = service.withSpaceLock('space-1', async () => {
await gate;
return 'done';
});
// Let acquire resolve and the running.add + setInterval registration run.
await flushMicrotasks();
// Capture the instanceId used on acquire so we can assert it is reused.
const instanceId = redis.set.mock.calls[0][1];
// Advance past one heartbeat interval (≈ TTL/3) to fire refreshLock.
jest.advanceTimersByTime(Math.floor(GIT_SYNC_LOCK_TTL_MS / 3));
await flushMicrotasks();
// The refresh eval ran (release has not, fn still awaiting the gate).
expect(redis.eval).toHaveBeenCalledTimes(1);
const [lua, numKeys, key, argInstanceId, ttlArg] =
redis.eval.mock.calls[0];
expect(lua).toContain('pexpire');
expect(lua).toContain('== ARGV[1]');
expect(numKeys).toBe(1);
expect(key).toBe(GIT_SYNC_LOCK_PREFIX + 'space-1');
expect(argInstanceId).toBe(instanceId);
expect(ttlArg).toBe(String(GIT_SYNC_LOCK_TTL_MS));
// Let fn finish; release runs in finally (second eval, the DEL-CAS).
release();
await flushMicrotasks();
await expect(run).resolves.toBe('done');
expect(redis.eval).toHaveBeenCalledTimes(2);
expect(redis.eval.mock.calls[1][0]).toContain('del');
} finally {
jest.useRealTimers();
}
});
});
});
// Silence the warn logger if a refresh/release path ever logs (defensive).
beforeAll(() => {
jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => undefined);
});

View File

@@ -0,0 +1,135 @@
import { Injectable, Logger } from '@nestjs/common';
import { RedisService } from '@nestjs-labs/nestjs-ioredis';
import type { Redis } from 'ioredis';
import { randomUUID } from 'node:crypto';
import {
GIT_SYNC_LOCK_PREFIX,
GIT_SYNC_LOCK_TTL_MS,
} from '../git-sync.constants';
/**
* The per-space lock used by the git-sync control plane: an in-process per-space
* mutex (no overlapping cycles on one instance) PLUS a Redis leader lock
* (single writer across replicas). Extracted from `GitSyncOrchestrator` so the
* locking primitive is a single reusable, independently testable unit
* (PR #119 refactor #2).
*/
@Injectable()
export class SpaceLockService {
private readonly logger = new Logger(SpaceLockService.name);
private readonly redis: Redis;
/** Unique per process instance — the leader-lock value (CAS on release). */
private readonly instanceId = randomUUID();
/** In-process per-space mutex: spaceIds with a cycle currently running. */
private readonly running = new Set<string>();
constructor(redisService: RedisService) {
this.redis = redisService.getOrThrow();
}
// --- Redis leader lock -----------------------------------------
/**
* Acquire per-space leadership: `SET <key> <instanceId> PX <ttl> NX` returns
* 'OK' only when the key did not exist. Any other reply means another replica
* holds it.
*/
private async acquire(spaceId: string): Promise<boolean> {
const ok = await this.redis.set(
GIT_SYNC_LOCK_PREFIX + spaceId,
this.instanceId,
'PX',
GIT_SYNC_LOCK_TTL_MS,
'NX',
);
return ok === 'OK';
}
/**
* Release the lock with a CAS Lua so we only delete it when WE still hold it
* (the value matches our instanceId) — never another replica's lock that took
* over after our TTL expired.
*/
private async release(spaceId: string): Promise<void> {
const lua =
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end';
try {
await this.redis.eval(lua, 1, GIT_SYNC_LOCK_PREFIX + spaceId, this.instanceId);
} catch (err) {
this.logger.warn(
`git-sync: failed to release lock for space ${spaceId}: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
/**
* CAS-guarded TTL refresh: extend the lock's TTL ONLY while WE still own it
* (the stored value matches our instanceId) — never extend another replica's
* lock that took over after our TTL expired. Used by the heartbeat in
* `withSpaceLock` so a long-running push (client-controlled receive-pack + the
* Docmost cycle) cannot outlive the lock and let a concurrent cycle race the
* working tree. Logs (warn) but never throws — a failed refresh must not break
* the cycle it is protecting.
*/
private async refreshLock(spaceId: string): Promise<void> {
const lua =
'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("pexpire", KEYS[1], ARGV[2]) else return 0 end';
try {
await this.redis.eval(
lua,
1,
GIT_SYNC_LOCK_PREFIX + spaceId,
this.instanceId,
String(GIT_SYNC_LOCK_TTL_MS),
);
} catch (err) {
this.logger.warn(
`git-sync: failed to refresh lock for space ${spaceId}: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}
/**
* Run `fn` under the per-space lock: the in-process mutex (no overlapping
* cycles on this instance) AND the Redis leader lock (single writer across
* replicas). Returns `fn`'s result, or a skip sentinel when the lock could not
* be acquired — `{ skipped: 'in-progress' }` (this instance is mid-cycle) or
* `{ skipped: 'lock-held' }` (another replica holds the Redis lock). The mutex
* + Redis lock are always released in a `finally`, even when `fn` throws (the
* throw propagates to the caller). This is the single reusable wrapper shared
* by `runOnce` (the poll/admin cycle) and `ingestExternalPush` (a push from a
* git client over HTTP) so both serialize against each other identically.
*/
async withSpaceLock<T>(
spaceId: string,
fn: () => Promise<T>,
): Promise<T | { skipped: 'lock-held' | 'in-progress' }> {
if (this.running.has(spaceId)) {
return { skipped: 'in-progress' };
}
if (!(await this.acquire(spaceId))) {
return { skipped: 'lock-held' };
}
this.running.add(spaceId);
// Heartbeat: periodically (≈ TTL/3) extend the lock's TTL while `fn` runs so
// a long push (client-controlled receive-pack + the Docmost cycle) cannot
// outlive the fixed TTL and let a concurrent cycle race the working tree. The
// refresh is CAS-guarded (only extends while WE own it). `.unref()` keeps the
// timer from holding the event loop open; it is ALWAYS cleared in `finally`.
const heartbeat = setInterval(() => {
void this.refreshLock(spaceId);
}, Math.max(1, Math.floor(GIT_SYNC_LOCK_TTL_MS / 3)));
heartbeat.unref?.();
try {
return await fn();
} finally {
clearInterval(heartbeat);
this.running.delete(spaceId);
await this.release(spaceId);
}
}
}