F10: the content-exact reconnect dedup rebuilt boundaryLines only from lines
surviving the current batch, so after one reconnect at a shared timestamp it
forgot the dropped line — a SECOND reconnect at the same nanosecond then
re-emitted both as duplicates. Seed lastTimestamp/boundaryLines from
skipUntilTimestamp/skipBoundaryContents so the boundary set accumulates all
lines ever seen at the resume ts. Regression test (fails before, passes after).
F11: extract rfc3339ToUnixNanoSince into a testable logHelper module (sinceTimestamp.ts)
and cover it (standard ns, no fraction, sub-9 pad, >9 truncate); the controller
imports the single shared function.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
345 lines
15 KiB
TypeScript
345 lines
15 KiB
TypeScript
import { createLogStreamProcessor } from './logStream';
|
|
import { resetLineIdSequence } from './lineId';
|
|
|
|
// Single TextEncoder instance reused by the `bytes` helper.
const encoder = new TextEncoder();

/**
 * Encodes a string with the shared TextEncoder.
 *
 * @param s text to encode
 * @returns the UTF-8 bytes of `s`
 */
function bytes(s: string): Uint8Array {
  const utf8 = encoder.encode(s);
  return utf8;
}
|
|
|
|
/**
 * Builds one Docker multiplexed-stream frame around `payload`.
 *
 * Layout: byte 0 is the stream type, bytes 1-3 stay zero, bytes 4-7 hold the
 * payload length as a big-endian unsigned 32-bit integer, and the payload
 * itself follows from byte 8 onward. A string payload is encoded via `bytes`.
 *
 * @param payload frame body, either raw bytes or text
 * @param streamType value stored in the first header byte (defaults to 1)
 * @returns a new array holding the 8-byte header followed by the payload
 */
function frame(payload: Uint8Array | string, streamType = 1): Uint8Array {
  const HEADER_SIZE = 8;
  const data = typeof payload === 'string' ? bytes(payload) : payload;
  const framed = new Uint8Array(HEADER_SIZE + data.length);
  framed[0] = streamType;
  // DataView#setUint32 writes big-endian unless asked otherwise.
  new DataView(framed.buffer).setUint32(4, data.length);
  framed.set(data, HEADER_SIZE);
  return framed;
}
|
|
|
|
/**
 * Joins the given byte arrays, in argument order, into one new Uint8Array.
 *
 * @param parts arrays to join; may be empty
 * @returns a freshly allocated array containing every part back to back
 */
function concat(...parts: Uint8Array[]): Uint8Array {
  let total = 0;
  for (const part of parts) {
    total += part.length;
  }

  const joined = new Uint8Array(total);
  let cursor = 0;
  for (const part of parts) {
    joined.set(part, cursor);
    cursor += part.length;
  }
  return joined;
}
|
|
|
|
// Calls resetLineIdSequence (imported from ./lineId) before every test;
// presumably so generated line ids start from the same point in each test.
beforeEach(function resetIds(): void {
  resetLineIdSequence();
});
|
|
|
|
describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
  // Reduces emitted entries to their `line` text so expectations stay short.
  const textOf = <T extends { line: unknown }>(entries: readonly T[]) =>
    entries.map((entry) => entry.line);

  it('emits only completed lines and carries the partial remainder forward', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });

    // Chunk 1: one complete framed line, then a frame whose line is still open.
    const firstChunk = concat(frame('hello\n'), frame('wor'));
    expect(textOf(proc.push(firstChunk))).toEqual(['hello']);

    // Chunk 2: the frame that closes the open line, plus one more full line.
    const secondChunk = concat(frame('ld\n'), frame('bye\n'));
    expect(textOf(proc.push(secondChunk))).toEqual(['world', 'bye']);
  });

  it('returns nothing until a newline arrives, then flushes the trailing partial', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });

    const beforeNewline = proc.push(frame('partial'));
    expect(textOf(beforeNewline)).toEqual([]);

    // The stream ended without a trailing newline: flush yields the remainder.
    const flushed = proc.flush();
    expect(textOf(flushed)).toEqual(['partial']);
  });

  it('strips the frame header from every framed line', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const twoFrames = concat(frame('line-1\n'), frame('line-2\n'));
    expect(textOf(proc.push(twoFrames))).toEqual(['line-1', 'line-2']);
  });

  // F1: the payload is exactly 10 bytes, so the low byte of the length field
  // is 0x0a. A text-level demuxer that splits on '\n' before stripping headers
  // would cut inside the header and desync, rendering "123456789\n" as "9".
  it('handles a frame whose length low-byte is 0x0a (payload of exactly 10 bytes)', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const tenBytePayload = '123456789\n'; // length 10 === 0x0a
    expect(textOf(proc.push(frame(tenBytePayload)))).toEqual(['123456789']);
  });

  // F2 (boundary): the chunk boundary falls exactly between the 8-byte header
  // and its payload.
  it('handles a chunk split between the header and the payload', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const whole = frame('abc\n');
    const headerOnly = whole.subarray(0, 8);
    const payloadOnly = whole.subarray(8);

    expect(textOf(proc.push(headerOnly))).toEqual([]);
    expect(textOf(proc.push(payloadOnly))).toEqual(['abc']);
  });

  // F2: the length field contains a byte >= 0x80 (200 = 0xC8) and the header
  // itself is split across chunks. UTF-8-decoding the header would mangle
  // 0xC8 and shift the offset.
  it('handles a large frame (length byte >= 0x80) split across chunks', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const body = 'x'.repeat(199);
    const whole = frame(`${body}\n`); // 200 payload bytes -> length byte 0xC8
    expect(whole[7]).toBe(0xc8);

    expect(textOf(proc.push(whole.subarray(0, 5)))).toEqual([]);
    expect(textOf(proc.push(whole.subarray(5)))).toEqual([body]);
  });

  // F2 (multibyte): a multibyte UTF-8 character cut in half by the chunk
  // boundary must be reassembled before decoding instead of becoming U+FFFD.
  it('reassembles a multibyte UTF-8 char split across two chunks', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const whole = frame('aéb\n'); // é encodes as 0xC3 0xA9
    const cut = whole.indexOf(0xc3) + 1; // split right after the lead byte

    expect(textOf(proc.push(whole.subarray(0, cut)))).toEqual([]);
    expect(textOf(proc.push(whole.subarray(cut)))).toEqual(['aéb']);
  });

  it('drops a truncated final frame on flush instead of emitting garbage', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const whole = frame('never\n');

    // Only the header and the first 2 payload bytes arrive before the stream ends.
    const truncated = whole.subarray(0, 10);
    expect(textOf(proc.push(truncated))).toEqual([]);
    expect(textOf(proc.flush())).toEqual([]);
  });

  // F3: a framed line terminated by \r\n must lose its trailing CR.
  it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
    const proc = createLogStreamProcessor({ stripHeaders: true });
    const emitted = proc.push(frame('with-crlf\r\n'));
    expect(textOf(emitted)).toEqual(['with-crlf']);
  });
});
|
|
|
|
describe('createLogStreamProcessor (TTY, no headers)', () => {
  // Reduces emitted entries to their `line` text so expectations stay short.
  const textOf = <T extends { line: unknown }>(entries: readonly T[]) =>
    entries.map((entry) => entry.line);

  it('passes lines through unchanged', () => {
    const proc = createLogStreamProcessor({ stripHeaders: false });
    const emitted = proc.push(bytes('plain line\n'));
    expect(textOf(emitted)).toEqual(['plain line']);
  });

  it('carries a partial line across chunks and flushes the remainder', () => {
    const proc = createLogStreamProcessor({ stripHeaders: false });

    // The line only completes once its newline arrives in the second chunk.
    expect(textOf(proc.push(bytes('par')))).toEqual([]);
    expect(textOf(proc.push(bytes('tial\n')))).toEqual(['partial']);

    // An unterminated tail is held back until flush.
    expect(textOf(proc.push(bytes('tail')))).toEqual([]);
    expect(textOf(proc.flush())).toEqual(['tail']);
  });

  // F3: a \r\n line ending must have its trailing CR stripped on the unframed
  // (TTY, stripHeaders: false) path; the framed twin of this test lives in the
  // non-TTY suite.
  it('strips the trailing CR from a \\r\\n line ending', () => {
    const proc = createLogStreamProcessor({ stripHeaders: false });
    const emitted = proc.push(bytes('with-crlf\r\n'));
    expect(textOf(emitted)).toEqual(['with-crlf']);
  });

  // F3: a remainder that ends in CR with no terminating LF must have the CR
  // stripped on flush. The input is split across two chunks to exercise the
  // buffered path.
  it('strips a trailing CR on flush when the stream ends without a newline', () => {
    const proc = createLogStreamProcessor({ stripHeaders: false });
    expect(textOf(proc.push(bytes('hel')))).toEqual([]);
    expect(textOf(proc.push(bytes('lo\r')))).toEqual([]);
    expect(textOf(proc.flush())).toEqual(['hello']);
  });
});
|
|
|
|
describe('timestamps (always requested, optionally displayed)', () => {
  const ts1 = '2024-01-01T00:00:00.000000001Z';
  const ts2 = '2024-01-01T00:00:00.000000002Z';

  // Reduces emitted entries to their `line` text so expectations stay short.
  const textOf = <T extends { line: unknown }>(entries: readonly T[]) =>
    entries.map((entry) => entry.line);

  it('strips the RFC3339 prefix when timestamps are hidden', () => {
    const proc = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: false,
      streamHasTimestamps: true,
    });

    const emitted = proc.push(bytes(`${ts1} hello\n`));
    expect(textOf(emitted)).toEqual(['hello']);
    // The timestamp is still tracked even though it is not displayed.
    expect(proc.getLastTimestamp()).toBe(ts1);
  });

  it('keeps the RFC3339 prefix when timestamps are shown', () => {
    const proc = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: true,
      streamHasTimestamps: true,
    });

    const emitted = proc.push(bytes(`${ts1} hello\n`));
    expect(textOf(emitted)).toEqual([`${ts1} hello`]);
  });

  it('drops lines re-delivered at/before the reconnect resume point', () => {
    const proc = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: false,
      streamHasTimestamps: true,
      skipUntilTimestamp: ts1,
      // The boundary line that was already shown (Docker re-delivers it
      // inclusively).
      skipBoundaryContents: [`${ts1} old`],
    });

    // ts1 is the inclusive boundary Docker re-delivers -> dropped; ts2 is new.
    const emitted = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
    expect(textOf(emitted)).toEqual(['new']);
  });

  // F4: the real reconnect shape. The redelivered line arrives in its OWN
  // chunk (the whole batch is dropped, dropTo === lines.length), so the
  // `skipping` flag must stay on and the NEXT chunk must keep dropping
  // duplicates up to the resume point before the first genuinely new line is
  // emitted.
  it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
    const tsA = '2024-01-01T00:00:00.000000001Z';
    const tsB = '2024-01-01T00:00:00.000000002Z';
    const tsC = '2024-01-01T00:00:00.000000003Z';
    const tsD = '2024-01-01T00:00:00.000000004Z';

    const proc = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: false,
      streamHasTimestamps: true,
      skipUntilTimestamp: tsB,
      // dup-2 carries the resume timestamp and was already shown; dup-1 is
      // older and is dropped by the chronological compare.
      skipBoundaryContents: [`${tsB} dup-2`],
    });

    // (1) Only a redelivered line at or before the resume point: fully
    // consumed by dedup, nothing emitted, skipping stays on.
    const onlyDup = proc.push(bytes(`${tsA} dup-1\n`));
    expect(textOf(onlyDup)).toEqual([]);

    // (2) Another duplicate plus the first new line: skipping was preserved,
    // so the duplicate is still dropped and only the new line comes back.
    const dupThenNew = proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`));
    expect(textOf(dupThenNew)).toEqual(['new']);

    // (3) A later line passes through untouched (skipping is over).
    const later = proc.push(bytes(`${tsD} after\n`));
    expect(textOf(later)).toEqual(['after']);
  });

  // F1: lines A and B share the exact same nanosecond timestamp and the
  // connection drops after only A was shown. On reconnect `since=T`
  // re-delivers BOTH. A `<= timestamp` dedup would drop both and LOSE B;
  // content-exact dedup (the boundary set is just [A]) drops A and keeps B.
  it('does not lose a new line that shares the boundary timestamp with a duplicate', () => {
    const tsShared = '2024-01-01T00:00:00.000000005Z';
    const tsNext = '2024-01-01T00:00:00.000000006Z';

    // First session: only line A (at tsShared) was delivered before the drop.
    const first = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: true,
      streamHasTimestamps: true,
    });
    const shown = first.push(bytes(`${tsShared} A\n`));
    expect(textOf(shown)).toEqual([`${tsShared} A`]);
    expect(first.getLastTimestamp()).toBe(tsShared);
    // The boundary set handed to the reconnecting processor is exactly [A].
    expect(first.getBoundaryLines()).toEqual([`${tsShared} A`]);

    // Reconnect: Docker re-delivers A (duplicate) AND B (new, same timestamp).
    const second = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: true,
      streamHasTimestamps: true,
      skipUntilTimestamp: first.getLastTimestamp(),
      skipBoundaryContents: first.getBoundaryLines(),
    });
    const redelivered = second.push(
      bytes(`${tsShared} A\n${tsShared} B\n${tsNext} C\n`)
    );

    // A (genuine duplicate) is dropped; B (new, shares the boundary timestamp)
    // survives, followed by C.
    expect(textOf(redelivered)).toEqual([`${tsShared} B`, `${tsNext} C`]);
    // C carries a newer timestamp than the shared one, so the boundary set for
    // any further reconnect is expected to be just [C].
    expect(second.getBoundaryLines()).toEqual([`${tsNext} C`]);
  });

  // F1: a genuine duplicate at the boundary timestamp (nothing new shares it)
  // is still dropped; the normal reconnect case is unchanged.
  it('still drops a genuine duplicate at the boundary timestamp', () => {
    const tsBoundary = '2024-01-01T00:00:00.000000007Z';
    const tsAfter = '2024-01-01T00:00:00.000000008Z';

    const proc = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: false,
      streamHasTimestamps: true,
      skipUntilTimestamp: tsBoundary,
      skipBoundaryContents: [`${tsBoundary} only`],
    });

    const emitted = proc.push(bytes(`${tsBoundary} only\n${tsAfter} fresh\n`));
    expect(textOf(emitted)).toEqual(['fresh']);
  });

  // F10: a SECOND reconnect at the same nanosecond timestamp must not re-emit
  // duplicates. Lines A and B share timestamp T and the connection drops after
  // only A was shown. Reconnect #1 (boundary set [A]) receives A,B again,
  // drops A and keeps B; because the processor seeds its boundary state from
  // the resume parameters, its boundary set becomes [A, B] (A is NOT
  // forgotten). If a second reconnect happens at the same T before any
  // newer-timestamp line arrives, it carries [A, B] forward and drops BOTH
  // redelivered lines. Without the seeding, reconnect #1's boundary set would
  // be just [B], and reconnect #2 would fail to recognise A as a duplicate and
  // re-emit both A and B.
  it('does not duplicate lines on a second reconnect at the same nanosecond timestamp', () => {
    const tsShared = '2024-01-01T00:00:00.000000005Z';

    // Reconnect #1: resume at tsShared, having already shown A.
    const first = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: true,
      streamHasTimestamps: true,
      skipUntilTimestamp: tsShared,
      skipBoundaryContents: [`${tsShared} A`],
    });

    // Docker redelivers A (duplicate) and B (new, same timestamp).
    const round1 = first.push(bytes(`${tsShared} A\n${tsShared} B\n`));
    expect(textOf(round1)).toEqual([`${tsShared} B`]);
    // A must NOT be forgotten: the boundary set now carries both A and B.
    expect(first.getBoundaryLines()).toEqual([
      `${tsShared} A`,
      `${tsShared} B`,
    ]);

    // Reconnect #2 at the same timestamp, before any newer-timestamp line
    // arrived, carrying reconnect #1's boundary set forward.
    const second = createLogStreamProcessor({
      stripHeaders: false,
      withTimestamps: true,
      streamHasTimestamps: true,
      skipUntilTimestamp: first.getLastTimestamp(),
      skipBoundaryContents: first.getBoundaryLines(),
    });

    // Docker redelivers A and B again; both are genuine duplicates now.
    const round2 = second.push(bytes(`${tsShared} A\n${tsShared} B\n`));
    expect(textOf(round2)).toEqual([]);
  });
});
|
|
|
|
describe('stable line ids (append model)', () => {
  it('assigns unique, strictly increasing ids across successive chunks', () => {
    const proc = createLogStreamProcessor({ stripHeaders: false });
    const firstBatch = proc.push(bytes('a\nb\n'));
    const secondBatch = proc.push(bytes('c\n'));

    // Ids in arrival order, across both pushes.
    const ids = [...firstBatch, ...secondBatch].map(({ id }) => id);

    // No id appears twice...
    expect(new Set(ids).size).toBe(ids.length);

    // ...and arrival order already equals ascending numeric order.
    const ascending = ids.slice().sort((lo, hi) => lo - hi);
    expect(ids).toEqual(ascending);
  });
});
|