Compare commits

...

8 Commits

Author SHA1 Message Date
claude code agent
8ad42a1a45 fix(logs): seed reconnect boundary state from resume params; extract+test since parser (F10,F11)
F10: the content-exact reconnect dedup rebuilt boundaryLines only from lines
     surviving the current batch, so after one reconnect at a shared timestamp it
     forgot the dropped line — a SECOND reconnect at the same nanosecond then
     re-emitted both as duplicates. Seed lastTimestamp/boundaryLines from
     skipUntilTimestamp/skipBoundaryContents so the boundary set accumulates all
     lines ever seen at the resume ts. Regression test (fails before, passes after).
F11: extract rfc3339ToUnixNanoSince into a testable logHelper module (sinceTimestamp.ts)
     and cover it (standard ns, no fraction, sub-9 pad, >9 truncate); the controller
     imports the single shared function.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 20:42:32 +03:00
claude code agent
9768d7bb99 fix(logs): content-exact reconnect dedup; flush partial on pause; unify since; lines fallback (F1-F5)
Maintainer pre-merge review follow-up:
F1: dedup reconnect redeliveries by EXACT boundary-line content, not just
    timestamp <= resume — a new line that merely shares the boundary nanosecond
    with a redelivered duplicate is no longer dropped (skipBoundaryContents +
    pendingBoundary). Test proves line B survives while a real dup is dropped.
F2: flush the buffered partial line on intentional pause (not reconnect) and
    strip those cosmetic lines on resume so since re-delivers the full line with
    no stale-partial twin; resume point is not advanced past the partial.
F3: unify the since param to <unix>.<nanos> for initial and reconnect.
F4: fall back to 100 lines when the Lines field is cleared (avoid tail=all).
F5: memoize the API-version pin per session; warn on frame desync.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 20:05:18 +03:00
claude code agent
da6933c218 refactor(logs): collapse no-op ternary, drop speculative export, fix stale comment (F8,F9,F10)
F8: formatJSONLogs plain-text fallback — both arms of `withTimestamps ? rawText
    : text` yield rawText (text === rawText when !withTimestamps), so use rawText.
F9: controllerLogsController comment referenced the old 'Live logs' label removed
    by F7 — update it to 'Auto-refresh logs'.
F10: stripHeadersFunc has no external importers — drop the speculative export.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 15:37:37 +03:00
claude code agent
d520aec159 docs(logs): neutral auto-refresh toggle label/tooltip in shared log viewer (F7)
logViewer.html is shared by the container, service and task log views, but only
the container view is a live HTTP stream — service/task still poll. Revert the
toggle wording to a mode-neutral 'Auto-refresh logs' / 'pauses log collection'
so it is accurate for both, keeping the added auto-scroll clarification.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 14:42:19 +03:00
claude code agent
f4f296fc05 fix(logs): drop partial line on reconnect, stabilize poll row ids, cover CRLF/dedup (F1-F6)
F1: stop emitting/committing an unfinished line in onEnd/onError reconnect
    paths; since-based reconnect redelivers the full line.
F2: give service/task poll rows positionally-stable ids so track by log.id
    reuses DOM rows and text selection survives the 3s poll.
F3/F4: tests for CRLF stripping and reconnect-dedup across separate chunks.
F5: correct the stale refreshRate comment.
F6: unroll the side-effecting IIFE-in-ternary into if/else.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 14:08:46 +03:00
claude code agent
960d43e70b fix(logs): byte-level frame demux + reconnect/since + notify throttle (F1-F8)
Container log live-stream review fixes (frontend only):

- F1/F2: demux Docker's multiplexed (non-TTY) stream at the BYTE level by
  frame length, decoding only payloads. Previously the stream was text-decoded
  whole and cut on '\n' before stripping 8-byte headers, which desynced when a
  length low-byte was 0x0a or a header byte was >= 0x80. streamContainerLogs
  now hands the processor raw Uint8Array chunks; createLogStreamProcessor is
  rewritten to parse frames, concatenate payloads, split lines on 0x0a, and
  UTF-8-decode complete lines. formatLogs is called without stripHeaders so
  headers are not stripped twice. Added explicit byte-frame tests.
- F3: request timestamps=1 internally and resume reconnects from the parsed
  RFC3339 timestamp of the last line (not client wall-clock); strip the prefix
  before display when the user's timestamps toggle is off; dedup the inclusive
  `since` boundary lines on reconnect.
- F4: run the fetch stream URL through dockerMaxAPIVersionInterceptor so it
  matches the axios getContainerLogs version pinning.
- F5: notify on stream error once per reconnect loop, not every 3s retry.
- F6: resuming Live no longer wipes the buffer (startStream(false)) and
  continues from `since`.
- F7: service/task logs still poll; documented the re-render limitation
  (out of scope: issue #2 is container logs).
- F8: flush the trailing partial line on the error path too (parity with onEnd).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:32:16 +03:00
claude code agent
c3cdb8007e feat(logs): live HTTP stream + append-only render for container logs
Replace the 3s $interval polling of container logs with a live HTTP
stream, and stop re-writing already-rendered lines (fixes selection bug).

- streamContainerLogs (containers.service.ts): fetch + ReadableStream
  reader with follow=1, same-origin credentials:'include' (httpOnly JWT
  cookie; CSRF only guards mutations), agent-target / manager-operation
  headers replicated for Agent/Edge, AbortSignal-driven lifetime.
- containerLogsController: stream instead of poll; append parsed lines
  into the buffer (push, never replace), cap at 5000 lines trimming from
  the head; AbortController on pause/destroy/param-change; reconnect with
  3s backoff resuming from `since` (dropping tail) on stream end/error;
  Live toggle pauses/resumes the stream; tail/since/timestamps changes
  restart the stream.
- log-viewer: `track by log.id` (was $index), filtering moved out of the
  template into the controller (applyFilter via $watchCollection), removed
  inert force-glue, decoupled auto-scroll from log collection, relabelled
  "Auto-refresh logs" -> "Live logs", clearer empty states.

Backend unchanged (logs already stream transparently through the Docker
proxy). Shared task/service log views keep working via the new id'd lines.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:03:23 +03:00
claude code agent
8ca0608b21 fix(logs): stable line ids, JSON string guard, streaming demuxer helper
Foundation for append-only log rendering and HTTP log streaming.

- FormattedLine gains a stable, monotonically increasing `id` (new
  lineId.ts sequence), assigned centrally in formatLogs. Internal
  formatters now return id-less FormattedLineContent. This lets the
  viewer use `track by log.id` so already-rendered rows are never
  re-bound (fixes the text-selection collapse).
- formatJSONLine: runtime guard so a bare JSON string/array log line
  falls back to plain text instead of rendering Object.keys as
  `0=h 1=e ...`.
- createLogStreamProcessor: stateful demuxer that buffers streamed text,
  emits only complete lines (carrying the partial remainder), and reuses
  formatLogs/stripHeadersFunc for Docker 8-byte frame demux.
- Unit tests for the demuxer, stable-id assignment and the JSON guard.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:03:10 +03:00
20 changed files with 1334 additions and 71 deletions

View File

@@ -10,7 +10,7 @@
label-class="'col-sm-2'"
checked="$ctrl.state.logCollection"
label="'Auto-refresh logs'"
tooltip="'Disabling this option allows you to pause the log collection process and the auto-scrolling.'"
tooltip="'Disabling this option pauses log collection. Auto-scroll follows the bottom of the log and pauses on its own when you scroll up.'"
on-change="($ctrl.handleLogsCollectionChange)"
></por-switch-field>
</div>
@@ -85,10 +85,10 @@
<div class="row" style="height: 54%">
<div class="col-sm-12" style="height: 100%">
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll" force-glue>
<div ng-repeat="log in $ctrl.state.filteredLogs = ($ctrl.data | filter:{ 'line': $ctrl.state.search }) track by $index" class="line" ng-if="log.line"><p class="inner_line" ng-click="$ctrl.selectLine(log.line)" ng-class="{ 'line_selected': $ctrl.state.selectedLines.indexOf(log.line) > -1 }"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
<div ng-if="!$ctrl.state.filteredLogs.length" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
<div ng-if="$ctrl.state.filteredLogs.length === 1 && !$ctrl.state.filteredLogs[0].line" class="line"><p class="inner_line">No logs available</p></div>
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll">
<div ng-repeat="log in $ctrl.state.filteredLogs track by log.id" class="line" ng-if="log.line"><p class="inner_line" ng-click="$ctrl.selectLine(log.line)" ng-class="{ 'line_selected': $ctrl.state.selectedLines.indexOf(log.line) > -1 }"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
<div ng-if="!$ctrl.state.filteredLogs.length && $ctrl.state.search" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
<div ng-if="!$ctrl.state.filteredLogs.length && !$ctrl.state.search" class="line"><p class="inner_line">No logs available</p></div>
</pre>
</div>
</div>

View File

@@ -27,11 +27,34 @@ angular.module('portainer.docker').controller('LogViewerController', [
this.handleLogsCollectionChange = handleLogsCollectionChange.bind(this);
this.handleLogsWrapLinesChange = handleLogsWrapLinesChange.bind(this);
this.handleDisplayTimestampsChange = handleDisplayTimestampsChange.bind(this);
this.applyFilter = applyFilter.bind(this);
this.$onInit = function () {
this.applyFilter();
// Compute the filtered list in the controller (not in the template) so we
// do not rebuild `filteredLogs` on every digest. `$watchCollection` only
// fires when lines are actually appended; combined with `track by log.id`
// in the template, already-rendered rows are never re-bound — so a live
// text selection survives incoming log lines.
$scope.$watchCollection(() => this.data, this.applyFilter);
$scope.$watch(() => this.state.search, this.applyFilter);
};
function applyFilter() {
const data = this.data || [];
const search = (this.state.search || '').toLowerCase();
this.state.filteredLogs = search
? data.filter((log) => log.line && log.line.toLowerCase().indexOf(search) > -1)
: data;
}
function handleLogsCollectionChange(enabled) {
$scope.$evalAsync(() => {
// Decouple Live (log collection) from auto-scroll: pausing the stream no
// longer also forces auto-scroll off, and auto-scroll is driven by
// scroll-glue (it disengages when the user scrolls up, re-engages at the
// bottom) so reading/selecting is never yanked around.
this.state.logCollection = enabled;
this.state.autoScroll = enabled;
this.logCollectionChange(enabled);
});
}

View File

@@ -0,0 +1,26 @@
import { formatJSONLine } from './formatJSONLogs';
describe('formatJSONLine 0=d 1=e guard', () => {
it('renders a bare JSON string as plain text (not index=char pairs)', () => {
const raw = '"hello"';
const lines = formatJSONLine(raw);
expect(lines).toHaveLength(1);
// faithfully passes the raw line through instead of iterating its chars
expect(lines[0].line).toBe(raw);
// would previously have produced "0=h 1=e 2=l ..." via Object.keys on a string
expect(lines[0].line).not.toContain('0=h');
});
it('renders a bare JSON array as plain text', () => {
const raw = '[1,2,3]';
const lines = formatJSONLine(raw);
expect(lines).toHaveLength(1);
expect(lines[0].line).toBe(raw);
});
it('still parses a real JSON-object log line', () => {
const lines = formatJSONLine('{"level":"info","message":"hi"}');
expect(lines[0].line).toContain('hi');
expect(lines[0].line).not.toContain('0=');
});
});

View File

@@ -1,6 +1,11 @@
import { without } from 'lodash';
import { FormattedLine, Span, JSONLogs, TIMESTAMP_LENGTH } from './types';
import {
FormattedLineContent,
Span,
JSONLogs,
TIMESTAMP_LENGTH,
} from './types';
import {
formatCaller,
formatKeyValuePair,
@@ -17,14 +22,23 @@ function removeKnownKeys(keys: string[]) {
export function formatJSONLine(
rawText: string,
withTimestamps?: boolean
): FormattedLine[] {
): FormattedLineContent[] {
const spans: Span[] = [];
const lines: FormattedLine[] = [];
const lines: FormattedLineContent[] = [];
let line = '';
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;
const json: JSONLogs = JSON.parse(text);
const parsed: unknown = JSON.parse(text);
// Only treat the line as a structured JSON log when it actually parses to a
// plain object. A line serialized as a bare JSON string (e.g. `"hello"`)
// parses to a string, whose `Object.keys` is `['0','1',...]` — which used to
// render as `0=h 1=e ...`. Fall back to the plain-text path in that case.
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
const plain = rawText;
return [{ line: plain, spans: [{ text: plain }] }];
}
const json = parsed as JSONLogs;
const { time, level, caller, message, stack_trace: stackTrace } = json;
const keys = removeKnownKeys(Object.keys(json));

View File

@@ -0,0 +1,29 @@
import { formatLogs } from './formatLogs';
import { resetLineIdSequence } from './lineId';
beforeEach(() => {
resetLineIdSequence();
});
describe('formatLogs stable ids', () => {
it('gives every line a unique id within a call', () => {
const lines = formatLogs('one\ntwo\nthree\n');
const ids = lines.map((l) => l.id);
expect(new Set(ids).size).toBe(ids.length);
});
it('continues the id sequence across calls (so appended chunks never collide)', () => {
const first = formatLogs('a\nb\n');
const second = formatLogs('c\nd\n');
const allIds = [...first, ...second].map((l) => l.id);
expect(new Set(allIds).size).toBe(allIds.length);
// every id in the second batch is greater than every id in the first
const maxFirst = Math.max(...first.map((l) => l.id));
expect(Math.min(...second.map((l) => l.id))).toBeGreaterThan(maxFirst);
});
it('preserves the parsed text content', () => {
const lines = formatLogs('hello\nworld\n');
expect(lines.map((l) => l.line)).toEqual(['hello', 'world']);
});
});

View File

@@ -9,7 +9,14 @@ import {
} from './colors';
import { formatJSONLine } from './formatJSONLogs';
import { formatZerologLogs, ZerologRegex } from './formatZerologLogs';
import { Token, Span, TIMESTAMP_LENGTH, FormattedLine } from './types';
import { nextLineId } from './lineId';
import {
Token,
Span,
TIMESTAMP_LENGTH,
FormattedLine,
FormattedLineContent,
} from './types';
type FormatOptions = {
stripHeaders?: boolean;
@@ -43,7 +50,7 @@ export function formatLogs(
}
const tokens: Token[][] = tokenize(logs);
const formattedLogs: FormattedLine[] = [];
const formattedLogs: FormattedLineContent[] = [];
let fgColor: string | undefined;
let bgColor: string | undefined;
@@ -118,9 +125,20 @@ export function formatLogs(
formattedLogs.push({ line, spans });
}
return formattedLogs;
// Assign the stable id centrally, once, here — so every line (plain, JSON,
// zerolog, stack-trace) gets a monotonic id regardless of which formatter
// produced it. Enables `track by log.id` in the viewer template.
return formattedLogs.map(
(content): FormattedLine => ({ id: nextLineId(), ...content })
);
}
// Strips Docker's 8-byte multiplexed-stream headers from a non-TTY log buffer:
// drops the leading header, then every header that follows a newline. This
// text-level strip is only safe on a fully-buffered body (the AngularJS polling
// services for container/service/task logs) where every line starts on a frame
// boundary. The live-stream path (logStream.ts) demuxes frames at the byte
// level instead and calls formatLogs WITHOUT `stripHeaders`.
function stripHeadersFunc(logs: string) {
return logs.substring(8).replace(/\r?\n(.{8})/g, '\n');
}

View File

@@ -7,7 +7,7 @@ import {
formatTime,
} from './formatters';
import {
FormattedLine,
FormattedLineContent,
JSONStackTrace,
Level,
Span,
@@ -50,7 +50,7 @@ type Pair = {
export function formatZerologLogs(rawText: string, withTimestamps?: boolean) {
const spans: Span[] = [];
const lines: FormattedLine[] = [];
const lines: FormattedLineContent[] = [];
let line = '';
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;

View File

@@ -1,7 +1,13 @@
import { format } from 'date-fns';
import { takeRight } from 'lodash';
import { Span, Level, Colors, JSONStackTrace, FormattedLine } from './types';
import {
Span,
Level,
Colors,
JSONStackTrace,
FormattedLineContent,
} from './types';
const spaceSpan: Span = { text: ' ' };
@@ -141,7 +147,7 @@ export function formatKeyValuePair(
export function formatStackTrace(
stackTrace: JSONStackTrace | undefined,
lines: FormattedLine[]
lines: FormattedLineContent[]
) {
if (stackTrace) {
stackTrace.forEach(({ func, line: lineNumber, source }) => {

View File

@@ -1,3 +1,6 @@
export { formatLogs } from './formatLogs';
export { concatLogsToString } from './concatLogsToString';
export { NEW_LINE_BREAKER } from './constants';
export { createLogStreamProcessor } from './logStream';
export type { LogStreamProcessor } from './logStream';
export { rfc3339ToUnixNanoSince } from './sinceTimestamp';

View File

@@ -0,0 +1,17 @@
// Monotonically increasing, process-wide sequence used to give every rendered
// log line a stable id. Because it never resets during a session, lines parsed
// from later stream chunks always get ids greater than earlier ones, so the
// AngularJS `track by log.id` repeat treats already-rendered rows as unchanged
// and never rewrites their DOM text nodes (which is what was collapsing text
// selections on every poll). See logHelper/types.ts (FormattedLine).
let sequence = 0;
export function nextLineId() {
return sequence++;
}
// Test-only helper to make id assignments deterministic between unit tests.
export function resetLineIdSequence() {
sequence = 0;
}

View File

@@ -0,0 +1,344 @@
import { createLogStreamProcessor } from './logStream';
import { resetLineIdSequence } from './lineId';
const encoder = new TextEncoder();
function bytes(s: string): Uint8Array {
return encoder.encode(s);
}
// Build a real Docker multiplexed-stream frame: 1 byte stream-type, 3 zero
// bytes, a 4-byte big-endian payload length, then the payload bytes.
function frame(payload: Uint8Array | string, streamType = 1): Uint8Array {
const body = typeof payload === 'string' ? bytes(payload) : payload;
const out = new Uint8Array(8 + body.length);
out[0] = streamType;
const len = body.length;
out[4] = (len >>> 24) & 0xff;
out[5] = (len >>> 16) & 0xff;
out[6] = (len >>> 8) & 0xff;
out[7] = len & 0xff;
out.set(body, 8);
return out;
}
function concat(...parts: Uint8Array[]): Uint8Array {
const total = parts.reduce((n, p) => n + p.length, 0);
const out = new Uint8Array(total);
let off = 0;
parts.forEach((p) => {
out.set(p, off);
off += p.length;
});
return out;
}
beforeEach(() => {
resetLineIdSequence();
});
describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
it('emits only completed lines and carries the partial remainder forward', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
// first chunk: one full frame then a frame whose line has no newline yet
const first = proc.push(concat(frame('hello\n'), frame('wor')));
expect(first.map((l) => l.line)).toEqual(['hello']);
// completing the partial line + a new one (across two frames)
const second = proc.push(concat(frame('ld\n'), frame('bye\n')));
expect(second.map((l) => l.line)).toEqual(['world', 'bye']);
});
it('returns nothing until a newline arrives, then flushes the trailing partial', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
expect(proc.push(frame('partial')).map((l) => l.line)).toEqual([]);
// stream ended without a trailing newline -> flush yields the remainder
expect(proc.flush().map((l) => l.line)).toEqual(['partial']);
});
it('strips the frame header from every framed line', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(concat(frame('line-1\n'), frame('line-2\n')));
expect(lines.map((l) => l.line)).toEqual(['line-1', 'line-2']);
});
// F1: a frame whose payload length low-byte is 0x0a (10). A text-level
// demuxer that splits on '\n' before stripping headers would land inside the
// header and desync, mis-rendering "123456789\n" down to "9".
it('handles a frame whose length low-byte is 0x0a (payload of exactly 10 bytes)', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(frame('123456789\n')); // payload length = 10 = 0x0a
expect(lines.map((l) => l.line)).toEqual(['123456789']);
});
// F2 (boundary): a chunk boundary falling between the 8-byte header and its
// payload.
it('handles a chunk split between the header and the payload', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('abc\n');
expect(proc.push(full.subarray(0, 8)).map((l) => l.line)).toEqual([]);
expect(proc.push(full.subarray(8)).map((l) => l.line)).toEqual(['abc']);
});
// F2: a length field with a byte >= 0x80 (200 = 0xC8), split across chunks.
// UTF-8-decoding the header would mangle 0xC8 and slip the offset.
it('handles a large frame (length byte >= 0x80) split across chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const payload = `${'x'.repeat(199)}\n`; // 200 bytes -> length byte 0xC8
const full = frame(payload);
expect(full[7]).toBe(0xc8);
expect(proc.push(full.subarray(0, 5)).map((l) => l.line)).toEqual([]);
const rest = proc.push(full.subarray(5));
expect(rest.map((l) => l.line)).toEqual(['x'.repeat(199)]);
});
// F2 (multibyte): a multibyte UTF-8 char split across two chunks must be
// reassembled before decoding (not turned into U+FFFD).
it('reassembles a multibyte UTF-8 char split across two chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('aéb\n'); // é = 0xC3 0xA9
const eIdx = full.indexOf(0xc3);
expect(proc.push(full.subarray(0, eIdx + 1)).map((l) => l.line)).toEqual(
[]
);
const rest = proc.push(full.subarray(eIdx + 1));
expect(rest.map((l) => l.line)).toEqual(['aéb']);
});
it('drops a truncated final frame on flush instead of emitting garbage', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('never\n');
// only the header + 2 payload bytes arrive, then the stream ends
expect(proc.push(full.subarray(0, 10)).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual([]);
});
// F3: a framed line terminated by \r\n must have its trailing CR stripped.
it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(frame('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
});
describe('createLogStreamProcessor (TTY, no headers)', () => {
it('passes lines through unchanged', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const lines = proc.push(bytes('plain line\n'));
expect(lines.map((l) => l.line)).toEqual(['plain line']);
});
it('carries a partial line across chunks and flushes the remainder', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
expect(proc.push(bytes('par')).map((l) => l.line)).toEqual([]);
expect(proc.push(bytes('tial\n')).map((l) => l.line)).toEqual(['partial']);
expect(proc.push(bytes('tail')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['tail']);
});
// F3: a \r\n line ending must have its trailing CR stripped (non-TTY path).
it('strips the trailing CR from a \\r\\n line ending', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const lines = proc.push(bytes('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
// F3: a remainder carrying a trailing CR (no terminating LF) must have the CR
// stripped on flush. Split across two chunks to exercise the buffered path.
it('strips a trailing CR on flush when the stream ends without a newline', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
expect(proc.push(bytes('hel')).map((l) => l.line)).toEqual([]);
expect(proc.push(bytes('lo\r')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['hello']);
});
});
describe('timestamps (always requested, optionally displayed)', () => {
const ts1 = '2024-01-01T00:00:00.000000001Z';
const ts2 = '2024-01-01T00:00:00.000000002Z';
it('strips the RFC3339 prefix when timestamps are hidden', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
});
const lines = proc.push(bytes(`${ts1} hello\n`));
expect(lines.map((l) => l.line)).toEqual(['hello']);
expect(proc.getLastTimestamp()).toBe(ts1);
});
it('keeps the RFC3339 prefix when timestamps are shown', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
});
const lines = proc.push(bytes(`${ts1} hello\n`));
expect(lines.map((l) => l.line)).toEqual([`${ts1} hello`]);
});
it('drops lines re-delivered at/before the reconnect resume point', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: ts1,
// the boundary line we had already shown (Docker re-delivers it inclusively)
skipBoundaryContents: [`${ts1} old`],
});
// ts1 is the inclusive boundary Docker re-delivers -> dropped; ts2 is new
const lines = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
expect(lines.map((l) => l.line)).toEqual(['new']);
});
// F4: the real reconnect shape — the redelivered boundary line arrives in its
// OWN chunk (the whole batch is dropped, dropTo === lines.length), so the
// `skipping` flag must stay on and the NEXT chunk must keep dropping dups up
// to the resume point before emitting the first genuinely new line.
it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
const tsA = '2024-01-01T00:00:00.000000001Z';
const tsB = '2024-01-01T00:00:00.000000002Z';
const tsC = '2024-01-01T00:00:00.000000003Z';
const tsD = '2024-01-01T00:00:00.000000004Z';
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: tsB,
// dup-2 carries the resume timestamp and was already shown; dup-1 is before
// it and is dropped by the chronological compare.
skipBoundaryContents: [`${tsB} dup-2`],
});
// (1) only a redelivered boundary line (<= resume point) -> fully consumed
// by dedup; nothing emitted and skipping stays on.
expect(proc.push(bytes(`${tsA} dup-1\n`)).map((l) => l.line)).toEqual([]);
// (2) another dup + the first new line: because skipping was preserved the
// dup is still dropped, and only the new line is returned.
expect(
proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`)).map((l) => l.line)
).toEqual(['new']);
// (3) a later line passes through untouched (no skipping anymore).
expect(proc.push(bytes(`${tsD} after\n`)).map((l) => l.line)).toEqual([
'after',
]);
});
// F1: two lines A and B share the exact same nanosecond timestamp; the
// connection drops after only A was shown. On reconnect `since=T` re-delivers
// BOTH. A `<= timestamp` dedup would drop both and LOSE B. Content-exact dedup
// (the boundary set is just [A]) drops A and keeps B.
it('does not lose a new line that shares the boundary timestamp with a duplicate', () => {
const tsShared = '2024-01-01T00:00:00.000000005Z';
const tsNext = '2024-01-01T00:00:00.000000006Z';
// First session: only line A (at tsShared) was delivered before the drop.
const first = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
});
expect(first.push(bytes(`${tsShared} A\n`)).map((l) => l.line)).toEqual([
`${tsShared} A`,
]);
expect(first.getLastTimestamp()).toBe(tsShared);
// the boundary set handed to the reconnecting processor is exactly [A]
expect(first.getBoundaryLines()).toEqual([`${tsShared} A`]);
// Reconnect: Docker re-delivers A (duplicate) AND B (new, same timestamp).
const second = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: first.getLastTimestamp(),
skipBoundaryContents: first.getBoundaryLines(),
});
const redelivered = second.push(
bytes(`${tsShared} A\n${tsShared} B\n${tsNext} C\n`)
);
// A (genuine duplicate) is dropped; B (new, shares the boundary ts) survives.
expect(redelivered.map((l) => l.line)).toEqual([
`${tsShared} B`,
`${tsNext} C`,
]);
// and B is now part of the next boundary set alongside any A-twin.
expect(second.getBoundaryLines()).toEqual([`${tsNext} C`]);
});
// F1: a genuine duplicate at the boundary timestamp (nothing new shares it) is
// still dropped — the normal reconnect case is unchanged.
it('still drops a genuine duplicate at the boundary timestamp', () => {
const tsBoundary = '2024-01-01T00:00:00.000000007Z';
const tsAfter = '2024-01-01T00:00:00.000000008Z';
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: tsBoundary,
skipBoundaryContents: [`${tsBoundary} only`],
});
const lines = proc.push(bytes(`${tsBoundary} only\n${tsAfter} fresh\n`));
expect(lines.map((l) => l.line)).toEqual(['fresh']);
});
// F10: a SECOND reconnect at the same nanosecond timestamp must not re-emit
// duplicates. Lines A and B share ts T; the connection drops after only A was
// shown. Reconnect #1 (boundary set [A]) redelivers A,B -> drops A, keeps B,
// and — because the processor seeds its boundary state from the resume
// params — its boundary set becomes [A, B] (A is NOT forgotten). If a second
// reconnect happens at the same T before any newer-ts line arrives, it carries
// [A, B] forward and drops BOTH redelivered lines (no duplicate). Without the
// seeding, reconnect #1's boundary set would be just [B], and reconnect #2
// would fail to recognise A as a duplicate and re-emit both A and B.
it('does not duplicate lines on a second reconnect at the same nanosecond timestamp', () => {
const tsShared = '2024-01-01T00:00:00.000000005Z';
// Reconnect #1: resume at tsShared, having already shown A.
const first = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: tsShared,
skipBoundaryContents: [`${tsShared} A`],
});
// Docker redelivers A (duplicate) and B (new, same timestamp).
const round1 = first.push(bytes(`${tsShared} A\n${tsShared} B\n`));
expect(round1.map((l) => l.line)).toEqual([`${tsShared} B`]);
// A must NOT be forgotten: the boundary set now carries both A and B.
expect(first.getBoundaryLines()).toEqual([
`${tsShared} A`,
`${tsShared} B`,
]);
// Reconnect #2 at the same timestamp, before any newer-ts line arrived,
// carrying reconnect #1's boundary set forward.
const second = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: first.getLastTimestamp(),
skipBoundaryContents: first.getBoundaryLines(),
});
// Docker redelivers A and B again; both are genuine duplicates now.
const round2 = second.push(bytes(`${tsShared} A\n${tsShared} B\n`));
expect(round2.map((l) => l.line)).toEqual([]);
});
});
describe('stable line ids (append model)', () => {
it('assigns unique, strictly increasing ids across successive chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const a = proc.push(bytes('a\nb\n'));
const b = proc.push(bytes('c\n'));
const ids = [...a, ...b].map((l) => l.id);
expect(new Set(ids).size).toBe(ids.length); // all unique
const sorted = [...ids].sort((x, y) => x - y);
expect(ids).toEqual(sorted); // monotonically increasing in arrival order
});
});

View File

@@ -0,0 +1,332 @@
import { formatLogs } from './formatLogs';
import { FormattedLine, TIMESTAMP_LENGTH } from './types';
type ProcessorOptions = {
/** Non-TTY container: demux Docker's 8-byte multiplexed-stream frame headers. */
stripHeaders?: boolean;
/** Display the RFC3339 timestamp prefix (user's "Display timestamps" toggle). */
withTimestamps?: boolean;
/**
* The raw stream always carries an RFC3339 timestamp prefix because the
* controller requests `timestamps=1` internally (so it can resume from the
* exact log timestamp on reconnect). When `withTimestamps` is false we strip
* the prefix before formatting so it is not displayed.
*/
streamHasTimestamps?: boolean;
/**
* Reconnect dedup: Docker's `since` filter is inclusive, so on reconnect it
* re-delivers the boundary line(s) at the resume timestamp. This is the resume
* point (the RFC3339 timestamp of the last line we saw). Re-delivered lines
* before it are dropped; lines exactly at it are matched against
* `skipBoundaryContents` (see below).
*/
skipUntilTimestamp?: string;
/**
* Reconnect dedup, exact-content matching. The lines we had already shown that
* carry the resume timestamp (`skipUntilTimestamp`). On reconnect Docker
* re-delivers every line at that timestamp; we drop only those whose exact
* content matches one of these (the genuine duplicates) and KEEP any line that
* merely shares the timestamp but is new (different content) — otherwise a
* second line sharing the boundary nanosecond would be lost.
*/
skipBoundaryContents?: string[];
};
// Docker multiplexed-stream frame header: 1 byte stream-type, 3 zero bytes,
// then a 4-byte big-endian payload length, followed by exactly `length` payload
// bytes. See the Docker Engine API container "logs" (non-TTY) documentation.
const FRAME_HEADER_SIZE = 8;
const FRAME_LENGTH_OFFSET = 4;
// Defensive only: a single frame payload larger than this almost certainly means
// the byte stream desynced/corrupted. This is NOT a cap on legitimate lines
// (real log lines are orders of magnitude smaller); it only prevents unbounded
// buffering if we ever lose frame alignment.
const MAX_FRAME_PAYLOAD = 100 * 1024 * 1024;
// Width of Docker's fixed RFC3339Nano timestamp prefix emitted with
// `timestamps=1`: "2006-01-02T15:04:05.000000000Z" — 30 chars; index 30 is the
// separating space (TIMESTAMP_LENGTH = 31 = 30 + space).
const TS_WIDTH = TIMESTAMP_LENGTH - 1;
const TS_PREFIX = /^\d{4}-\d{2}-\d{2}T/;
const LF = 0x0a;
const CR = 0x0d;
// `subarray()` widens the backing buffer to ArrayBufferLike under TS 5.7+; use a
// permissive byte-array alias so buffered slices assign cleanly.
type Bytes = Uint8Array<ArrayBufferLike>;
function concatBytes(a: Bytes, b: Bytes): Bytes {
if (a.length === 0) {
return b;
}
if (b.length === 0) {
return a;
}
const out = new Uint8Array(a.length + b.length);
out.set(a, 0);
out.set(b, a.length);
return out;
}
function readUint32BE(buf: Bytes, offset: number): number {
// `* 0x1000000` instead of `<< 24` to keep the high byte unsigned.
return (
buf[offset] * 0x1000000 +
(buf[offset + 1] << 16) +
(buf[offset + 2] << 8) +
buf[offset + 3]
);
}
/**
* Stateful processor for an incremental (HTTP-streamed) container log response.
*
* A streaming response arrives in arbitrary byte chunks that split frames, log
* lines and even multibyte UTF-8 characters at random points. For non-TTY
* containers the body is Docker's binary multiplexed stream: each log message is
* framed with an 8-byte header carrying the payload length. This processor
* demuxes at the BYTE level — never on `\n` and never by UTF-8-decoding header
* bytes — because the low byte of a frame's length field can itself be `0x0a`
* and header bytes can be `>= 0x80` (both would corrupt a text-level demux):
*
* - it buffers raw bytes and parses complete frames by their declared length,
* carrying any partial trailing frame forward;
* - it concatenates payload bytes across frames, splits them into complete
* lines on the `0x0a` byte, and UTF-8-decodes each complete line as a whole
* (so a multibyte char split across frames/chunks is already reassembled);
* - because headers are removed here, the completed text lines are formatted by
* `formatLogs` WITHOUT its `stripHeaders` option (otherwise it would strip 8
* more characters of real content).
*
* Each emitted line carries a stable, monotonically increasing `id`, so the
* viewer can `track by log.id` and append without re-binding existing rows.
*/
export function createLogStreamProcessor({
stripHeaders,
withTimestamps,
streamHasTimestamps,
skipUntilTimestamp,
skipBoundaryContents,
}: ProcessorOptions = {}) {
// Unparsed bytes of the multiplexed frame stream (non-TTY only).
let frameBuf: Bytes = new Uint8Array(0);
// Decoded-payload bytes not yet terminated by a newline.
let lineBuf: Bytes = new Uint8Array(0);
// Reconnect dedup: while true, drop re-delivered lines up to the resume point.
let skipping = !!skipUntilTimestamp;
// Copy of the boundary line contents still pending a duplicate match; each
// matched re-delivered line consumes (splices out) one entry.
const pendingBoundary: string[] = skipBoundaryContents
? skipBoundaryContents.slice()
: [];
// RFC3339 timestamp of the last complete line we saw (next resume point).
// Seeded from the resume point so the boundary set carried in via
// `skipBoundaryContents` is preserved: a surviving line at the resume
// timestamp is APPENDED to the already-known boundary lines (not treated as a
// fresh timestamp), so a second reconnect at the same nanosecond still knows
// every line we have shown at it and drops them all instead of re-emitting
// duplicates. The first NEWER timestamp resets the set as usual.
let lastTimestamp: string | undefined = skipUntilTimestamp;
// Exact content of the line(s) at `lastTimestamp` we have emitted so far — the
// boundary set handed to the next processor for content-exact reconnect dedup.
// Seeded from the inbound boundary set (see `lastTimestamp` above).
let boundaryLines: string[] = skipBoundaryContents
? skipBoundaryContents.slice()
: [];
const decoder = new TextDecoder();
// Extract complete newline-terminated lines from the accumulated payload
// bytes, decoding each whole line at once. A `0x0a` byte can never be part of
// a multibyte UTF-8 sequence, so a line boundary never cuts a character.
function takeCompleteLines(payload: Bytes): string[] {
lineBuf = concatBytes(lineBuf, payload);
const lines: string[] = [];
let start = 0;
for (let i = 0; i < lineBuf.length; i += 1) {
if (lineBuf[i] === LF) {
let end = i;
// drop a trailing CR for \r\n line endings
if (end > start && lineBuf[end - 1] === CR) {
end -= 1;
}
lines.push(decoder.decode(lineBuf.subarray(start, end)));
start = i + 1;
}
}
lineBuf = lineBuf.subarray(start);
return lines;
}
// Pull every complete frame payload out of frameBuf and return the decoded
// complete lines they contain. An incomplete trailing frame stays buffered.
function demuxFrames(): string[] {
const lines: string[] = [];
for (;;) {
if (frameBuf.length < FRAME_HEADER_SIZE) {
break;
}
const payloadLength = readUint32BE(frameBuf, FRAME_LENGTH_OFFSET);
if (payloadLength > MAX_FRAME_PAYLOAD) {
// Corrupt/desynced stream: drop everything buffered to avoid OOM. Surface
// it so an unexpected reset is diagnosable rather than silently swallowed.
// eslint-disable-next-line no-console
console.warn(
`logStream: frame payload length ${payloadLength} exceeds ${MAX_FRAME_PAYLOAD}; resetting desynced buffer`
);
frameBuf = new Uint8Array(0);
break;
}
if (frameBuf.length - FRAME_HEADER_SIZE < payloadLength) {
break; // wait for the rest of the payload
}
const payload = frameBuf.subarray(
FRAME_HEADER_SIZE,
FRAME_HEADER_SIZE + payloadLength
);
lines.push(...takeCompleteLines(payload));
frameBuf = frameBuf.subarray(FRAME_HEADER_SIZE + payloadLength);
}
return lines;
}
// Apply reconnect-dedup + timestamp handling, then format a batch of complete
// text lines into rendered lines.
function formatBatch(rawLines: string[]): FormattedLine[] {
let lines = rawLines;
if (streamHasTimestamps) {
if (skipping && lines.length) {
// Drop re-delivered lines up to the resume point. Lines strictly before
// it are duplicates (lexical compare is chronological for the zero-padded
// UTC format). Lines exactly at the resume timestamp are dropped ONLY
// when their exact content matches a boundary line we already showed —
// so a NEW line sharing the same nanosecond timestamp is not lost.
const boundary = skipUntilTimestamp as string;
let dropTo = 0;
while (dropTo < lines.length) {
const line = lines[dropTo];
if (!TS_PREFIX.test(line)) {
break; // cannot classify without a timestamp; stop dropping
}
const ts = line.substring(0, TS_WIDTH);
const boundaryIdx =
ts === boundary ? pendingBoundary.indexOf(line) : -1;
if (ts < boundary) {
// re-delivered line before the resume point
dropTo += 1;
} else if (boundaryIdx !== -1) {
// exact duplicate of an already-shown boundary line
pendingBoundary.splice(boundaryIdx, 1);
dropTo += 1;
} else {
// ts > boundary, or a new line that merely shares the boundary
// timestamp -> keep it and everything after.
break;
}
}
if (dropTo > 0) {
lines = lines.slice(dropTo);
}
if (lines.length) {
skipping = false;
}
}
// Track the resume timestamp and the exact content of the line(s) at it,
// walking the kept lines in order (each line still has its prefix here).
// Timestamps are monotonically non-decreasing, so the trailing run sharing
// the newest timestamp is the boundary set the next processor dedups on.
for (let i = 0; i < lines.length; i += 1) {
const line = lines[i];
if (TS_PREFIX.test(line)) {
const ts = line.substring(0, TS_WIDTH);
if (ts !== lastTimestamp) {
lastTimestamp = ts;
boundaryLines = [line];
} else {
boundaryLines.push(line);
}
}
}
// When the user has timestamps hidden, strip the prefix we requested
// internally so it is not displayed.
if (!withTimestamps) {
lines = lines.map((line) =>
TS_PREFIX.test(line) ? line.substring(TIMESTAMP_LENGTH) : line
);
}
}
if (!lines.length) {
return [];
}
// Rejoin into a single batch so ANSI colour state carries across lines
// within the batch (matches the previous whole-batch formatLogs call).
// Headers are already stripped at the byte layer, so do NOT pass
// stripHeaders here.
const batch = `${lines.join('\n')}\n`;
return formatLogs(batch, { withTimestamps });
}
return {
/**
* Append a raw byte chunk and return the newly completed, formatted lines.
* Returns an empty array when the chunk did not complete any line.
*/
push(value: Bytes): FormattedLine[] {
if (!value || value.length === 0) {
return [];
}
let rawLines: string[];
if (stripHeaders) {
frameBuf = concatBytes(frameBuf, value);
rawLines = demuxFrames();
} else {
rawLines = takeCompleteLines(value);
}
return formatBatch(rawLines);
},
/**
* Flush any buffered partial line (e.g. when the stream ends without a
* trailing newline). A partial/truncated final frame is dropped so we never
* emit garbage.
*/
flush(): FormattedLine[] {
// Drop any incomplete trailing frame.
frameBuf = new Uint8Array(0);
if (lineBuf.length === 0) {
return [];
}
let bytes: Bytes = lineBuf;
// drop a trailing CR for \r\n line endings
if (bytes.length > 0 && bytes[bytes.length - 1] === CR) {
bytes = bytes.subarray(0, bytes.length - 1);
}
lineBuf = new Uint8Array(0);
return formatBatch([decoder.decode(bytes)]);
},
/** RFC3339 timestamp of the last complete line seen, for reconnect resume. */
getLastTimestamp(): string | undefined {
return lastTimestamp;
},
/**
* Exact content of the line(s) carrying the last-seen timestamp. Pass to the
* next processor as `skipBoundaryContents` so reconnect dedup drops only the
* genuine duplicates Docker re-delivers, never a new line that happens to
* share the boundary nanosecond.
*/
getBoundaryLines(): string[] {
return boundaryLines.slice();
},
};
}
export type LogStreamProcessor = ReturnType<typeof createLogStreamProcessor>;

View File

@@ -0,0 +1,31 @@
import { rfc3339ToUnixNanoSince } from './sinceTimestamp';
// Integer Unix seconds of the shared second-precision prefix, computed the same
// way the parser does (Date.parse of the "...:05Z" prefix, in UTC).
const unix = Math.floor(Date.parse('2024-01-01T00:00:00Z') / 1000);
describe('rfc3339ToUnixNanoSince', () => {
it('keeps a standard fixed-width nanosecond fraction', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.000000005Z')).toBe(
`${unix}.000000005`
);
});
it('emits a zero fraction when the timestamp has no fractional part', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00Z')).toBe(
`${unix}.000000000`
);
});
it('right-pads a sub-nanosecond-width fraction to 9 digits', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.5Z')).toBe(
`${unix}.500000000`
);
});
it('truncates a fraction longer than 9 digits to nanosecond precision', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.1234567890123Z')).toBe(
`${unix}.123456789`
);
});
});

View File

@@ -0,0 +1,26 @@
// Build Docker's `since` param as a Unix timestamp with a nanosecond fraction
// ("<seconds>.<nanos>") from an RFC3339Nano timestamp. Both the initial connect
// and reconnect use this one form so we never mix unix-seconds and RFC3339 —
// some proxies / pinned API versions accept them inconsistently. The fraction
// needs string precision (a JS number cannot hold nanoseconds), hence `since`
// is returned as a string.
//
// Docker emits a fixed-width RFC3339Nano prefix: "2006-01-02T15:04:05.000000000Z".
// We parse the second-precision prefix in UTC for the integer seconds, then take
// the fractional digits verbatim: short fractions (e.g. ".5") are right-padded
// to 9 digits, longer ones are truncated to 9 (nanosecond precision).
export function rfc3339ToUnixNanoSince(rfc3339: string): string {
const seconds = Math.floor(
Date.parse(`${rfc3339.substring(0, 19)}Z`) / 1000
);
const dot = rfc3339.indexOf('.');
const nanos =
dot >= 0
? rfc3339
.substring(dot + 1)
.replace(/[^0-9]/g, '')
.padEnd(9, '0')
.substring(0, 9)
: '000000000';
return `${seconds}.${nanos}`;
}

View File

@@ -36,11 +36,21 @@ export type Span = {
fontWeight?: FontWeight;
};
export type FormattedLine = {
// A rendered log line without its stable id. Internal formatters build these,
// and `formatLogs` assigns the `id` once, centrally, on the way out.
export type FormattedLineContent = {
spans: Span[];
line: string;
};
// A rendered log line with a stable, monotonically increasing id. The id lets
// the AngularJS template use `track by log.id`, so already-rendered rows are
// never re-bound (their text nodes are not rewritten) — which is what keeps a
// live text selection from collapsing while new lines stream in.
export type FormattedLine = FormattedLineContent & {
id: number;
};
export const TIMESTAMP_LENGTH = 31; // 30 for timestamp + 1 for trailing space
export const Colors = {

View File

@@ -1,84 +1,303 @@
import moment from 'moment';
import { streamContainerLogs } from '@/react/docker/containers/containers.service';
import { createLogStreamProcessor, rfc3339ToUnixNanoSince } from '@/docker/helpers/logHelper';
// Hard cap on how many lines we keep in the DOM/buffer during a long live
// stream. We trim from the head (oldest lines) so a selection anchored in the
// tail is never disturbed. The user-facing `lineCount` (tail) is a *request*
// for how many historical lines to fetch, not this in-memory limit.
const MAX_LOG_LINES = 5000;
// Delay before reconnecting after the stream ends or errors (container stopped,
// network blip, proxy hiccup). Avoids hammering a stopped container.
const RECONNECT_DELAY_MS = 3000;
// Default tail (historical-backfill) request used on the initial connect and
// whenever the user clears/invalidates the "Lines" field — without it a cleared
// number input becomes null, `tail` is dropped, and Docker re-downloads the
// entire history of a chatty container on every parameter change.
const DEFAULT_LINE_COUNT = 100;
angular.module('portainer.docker').controller('ContainerLogsController', [
'$scope',
'$transition$',
'$interval',
'$timeout',
'ContainerService',
'Notifications',
'HttpRequestHelper',
'endpoint',
function ($scope, $transition$, $interval, ContainerService, Notifications, HttpRequestHelper, endpoint) {
function ($scope, $transition$, $timeout, ContainerService, Notifications, HttpRequestHelper, endpoint) {
$scope.state = {
refreshRate: 3,
lineCount: 100,
// No refreshRate here: container logs are delivered over a live stream now,
// not a 3s poll, so there is nothing to refresh on an interval.
lineCount: DEFAULT_LINE_COUNT,
sinceTimestamp: '',
displayTimestamps: false,
};
// Live-stream bookkeeping (not on $scope.state to avoid digest churn).
var stream = {
abortController: null,
reconnectTimer: null,
// RFC3339 timestamp of the last log line we received. Used as `since` on
// reconnect so we resume from the exact log position (not the client
// wall-clock) and neither duplicate nor lose lines across a reconnect.
lastTimestamp: '',
// Exact content of the line(s) at `lastTimestamp` we already rendered.
// Handed to the reconnecting processor so it drops only genuine duplicates
// Docker re-delivers, never a new line that shares the boundary nanosecond.
boundaryLines: [],
// The processor of the in-flight stream, so an intentional pause can flush
// its buffered partial last line for display.
processor: null,
// How many cosmetic lines an intentional-pause flush appended; removed on
// resume before Docker re-delivers them in full (see pauseStream/startStream).
pausedFlushCount: 0,
// false while the stream is intentionally paused (Live toggle off) or the
// view is being destroyed — suppresses auto-reconnect.
active: false,
skipHeaders: false,
// Whether we already surfaced the current reconnect-loop error, so the 3s
// reconnect loop does not spam a notification on every attempt.
errorNotified: false,
};
// Live toggle (the "Auto-refresh logs" switch in the viewer).
$scope.changeLogCollection = function (logCollectionStatus) {
if (!logCollectionStatus) {
stopRepeater();
pauseStream(true);
} else {
setUpdateRepeater(!$scope.container.Config.Tty);
// Resume without wiping the buffer (pause promises to keep it) and
// continue from the last timestamp we saw.
startStream(false);
}
};
$scope.$on('$destroy', function () {
stopRepeater();
stopStream();
});
function stopRepeater() {
var repeater = $scope.repeater;
if (angular.isDefined(repeater)) {
$interval.cancel(repeater);
function clearReconnectTimer() {
if (stream.reconnectTimer) {
$timeout.cancel(stream.reconnectTimer);
stream.reconnectTimer = null;
}
}
function setUpdateRepeater(skipHeaders) {
var refreshRate = $scope.state.refreshRate;
$scope.repeater = $interval(function () {
ContainerService.logs(
endpoint.Id,
$transition$.params().id,
1,
1,
$scope.state.displayTimestamps ? 1 : 0,
moment($scope.state.sinceTimestamp).unix(),
$scope.state.lineCount,
skipHeaders
)
.then(function success(data) {
$scope.logs = data;
})
.catch(function error(err) {
stopRepeater();
Notifications.error('Failure', err, 'Unable to retrieve container logs');
});
}, refreshRate * 1000);
function abortInFlight() {
if (stream.abortController) {
stream.abortController.abort();
stream.abortController = null;
}
}
function startLogPolling(skipHeaders) {
ContainerService.logs(
// Pause: stop streaming but keep the current buffer on screen.
//
// `flushPartial` is set only on an intentional pause (Live -> off): with no
// reconnect to re-deliver it, the buffered unfinished last line would stay
// hidden until resume, so we flush it for display. We do NOT advance the
// resume point past it (it is incomplete); on resume Docker re-delivers that
// line in full from `since`, and startStream() first strips these cosmetic
// flushed lines so the buffer never shows a stale partial twin. On reconnect
// (flushPartial omitted) the partial is correctly discarded, unchanged.
function pauseStream(flushPartial) {
stream.active = false;
clearReconnectTimer();
abortInFlight();
if (flushPartial && stream.processor) {
const tail = stream.processor.flush();
if (tail.length) {
appendLines(tail);
stream.pausedFlushCount += tail.length;
}
}
}
// Remove the last `count` lines from the buffer (the cosmetic lines an
// intentional-pause flush appended), about to be re-delivered in full.
function removeTailLines(count) {
if (count <= 0) {
return;
}
$scope.$applyAsync(function () {
const start = Math.max(0, $scope.logs.length - count);
$scope.logs.splice(start, count);
});
}
// Full teardown on view destroy: no flush (the view is going away).
function stopStream() {
pauseStream(false);
}
function appendLines(lines) {
if (!lines.length) {
return;
}
$scope.$applyAsync(function () {
Array.prototype.push.apply($scope.logs, lines);
const overflow = $scope.logs.length - MAX_LOG_LINES;
if (overflow > 0) {
// trim oldest lines; tail (and any selection in it) is untouched
$scope.logs.splice(0, overflow);
}
});
}
// After feeding the processor, advance the reconnect resume point to the
// timestamp of the last line it parsed.
function updateResumePoint(processor) {
const ts = processor.getLastTimestamp();
if (ts) {
stream.lastTimestamp = ts;
// Remember the exact boundary line(s) at this timestamp for content-exact
// reconnect dedup (so a new line sharing the nanosecond is not dropped).
stream.boundaryLines = processor.getBoundaryLines();
}
}
// `since` is built from the last log timestamp via rfc3339ToUnixNanoSince
// (imported from logHelper) as a single "<seconds>.<nanos>" form — see that
// helper for why we never mix unix-seconds and RFC3339.
// Connect (or reconnect) the live stream.
// `resetBuffer` clears the on-screen buffer (used on first connect / param
// changes); reconnects after a drop keep the buffer and resume via `since`.
function startStream(resetBuffer) {
pauseStream(false);
stream.active = true;
if (resetBuffer) {
$scope.logs.length = 0;
stream.lastTimestamp = '';
stream.boundaryLines = [];
stream.pausedFlushCount = 0;
stream.errorNotified = false;
} else if (stream.pausedFlushCount) {
// Resuming after an intentional pause: drop the cosmetic partial line(s)
// we flushed for display; Docker re-delivers them in full from `since`.
removeTailLines(stream.pausedFlushCount);
stream.pausedFlushCount = 0;
}
const resuming = !!stream.lastTimestamp;
const processor = createLogStreamProcessor({
stripHeaders: stream.skipHeaders,
withTimestamps: $scope.state.displayTimestamps,
// We always request timestamps from Docker (see params below) so we can
// resume from the exact log timestamp on reconnect; the processor parses
// them and strips the prefix when the user has timestamps hidden.
streamHasTimestamps: true,
// On reconnect, drop lines Docker re-delivers at/before the resume point
// (its `since` filter is inclusive); content-exact match on the boundary
// line(s) so a new line sharing the boundary timestamp is not lost.
skipUntilTimestamp: resuming ? stream.lastTimestamp : undefined,
skipBoundaryContents: resuming ? stream.boundaryLines : undefined,
});
stream.processor = processor;
const abortController = new AbortController();
stream.abortController = abortController;
// On a reconnect resume from the exact timestamp of the last line we saw;
// on the initial connect honour the user's "Fetch since" selection. Both
// are sent as a single "<unix>.<nanos>" form (see rfc3339ToUnixNanoSince).
let since;
if (stream.lastTimestamp) {
since = rfc3339ToUnixNanoSince(stream.lastTimestamp);
} else if ($scope.state.sinceTimestamp) {
since = `${moment($scope.state.sinceTimestamp).unix()}.000000000`;
} else {
since = undefined; // omitted by the service -> Docker tails from now
}
// Substitute the default when the "Lines" field is cleared/invalid (a
// cleared number input is null), otherwise tail would be dropped and Docker
// would re-download the whole history.
const tailLineCount = $scope.state.lineCount > 0 ? $scope.state.lineCount : DEFAULT_LINE_COUNT;
const params = {
stdout: true,
stderr: true,
follow: true,
// Always request timestamps so we can resume precisely on reconnect; the
// processor hides them from display when the user toggle is off.
timestamps: true,
// tail is a historical-backfill request: apply it only on the initial
// connect. On a reconnect we resume from `since`, so re-applying tail
// would re-deliver the tail window. (0 is dropped by the service.)
tail: resuming ? 0 : tailLineCount,
since,
};
streamContainerLogs(
endpoint.Id,
$transition$.params().id,
1,
1,
$scope.state.displayTimestamps ? 1 : 0,
moment($scope.state.sinceTimestamp).unix(),
$scope.state.lineCount,
skipHeaders
params,
function onChunk(bytes) {
const lines = processor.push(bytes);
appendLines(lines);
updateResumePoint(processor);
if (lines.length) {
// got data again -> allow a fresh error notification next failure
stream.errorNotified = false;
}
},
abortController.signal
)
.then(function success(data) {
$scope.logs = data;
setUpdateRepeater(skipHeaders);
.then(function onEnd() {
// The stream always reconnects from here, so discard the processor's
// unfinished trailing remainder instead of flushing it: emitting a
// truncated fragment AND advancing the resume point to its timestamp
// would make Docker re-send the full line under the same `since` and
// dedup drop it — losing the line and leaving the fragment on screen.
// Completed lines already advanced the resume point in onChunk; `since`
// redelivers the full boundary line on reconnect.
scheduleReconnect();
})
.catch(function error(err) {
stopRepeater();
Notifications.error('Failure', err, 'Unable to retrieve container logs');
.catch(function onError(err) {
if (abortController.signal.aborted) {
return; // intentional abort (pause/destroy/param change)
}
// Drop the unfinished remainder on reconnect (see onEnd): do not flush
// a truncated line nor move the resume point off an incomplete line.
// Notify once per reconnect loop, not on every 3s retry.
if (!stream.errorNotified) {
stream.errorNotified = true;
Notifications.error('Failure', err, 'Unable to stream container logs');
}
scheduleReconnect();
});
}
function scheduleReconnect() {
if (!stream.active) {
return;
}
clearReconnectTimer();
stream.reconnectTimer = $timeout(function () {
if (stream.active) {
startStream(false);
}
}, RECONNECT_DELAY_MS);
}
// Restart the stream from scratch when a fetch parameter changes (tail
// count, since, timestamps). Each replaces the buffer with a fresh request.
function restartOnParamChange() {
if (stream.active || stream.abortController) {
startStream(true);
}
}
$scope.$watch('state.lineCount', onParamWatch);
$scope.$watch('state.sinceTimestamp', onParamWatch);
$scope.$watch('state.displayTimestamps', onParamWatch);
function onParamWatch(newVal, oldVal) {
if (newVal !== oldVal) {
restartOnParamChange();
}
}
function initView() {
HttpRequestHelper.setPortainerAgentTargetHeader($transition$.params().nodeName);
ContainerService.container(endpoint.Id, $transition$.params().id)
@@ -90,7 +309,11 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
$scope.logsEnabled = logsEnabled;
if (logsEnabled) {
startLogPolling(!container.Config.Tty);
// initialise an (empty but defined) buffer so the viewer renders
// immediately, then start the live stream
$scope.logs = [];
stream.skipHeaders = !container.Config.Tty;
startStream(true);
}
})
.catch(function error(err) {

View File

@@ -38,7 +38,17 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
$scope.repeater = $interval(function () {
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// NOTE: service logs still poll and replace the whole array. Because
// formatLogs assigns fresh line ids per poll, `track by log.id` in
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -50,7 +60,11 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
function startLogPolling() {
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {

View File

@@ -39,7 +39,17 @@ angular.module('portainer.docker').controller('TaskLogsController', [
$scope.repeater = $interval(function () {
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// NOTE: task logs still poll and replace the whole array. Because
// formatLogs assigns fresh line ids per poll, `track by log.id` in
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -51,7 +61,11 @@ angular.module('portainer.docker').controller('TaskLogsController', [
function startLogPolling() {
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {

View File

@@ -1,8 +1,17 @@
import _ from 'lodash';
import { InternalAxiosRequestConfig } from 'axios';
import { EnvironmentId } from '@/react/portainer/environments/types';
import PortainerError from '@/portainer/error';
import axios, { parseAxiosError } from '@/portainer/services/axios/axios';
import axios, {
agentTargetHeader,
parseAxiosError,
} from '@/portainer/services/axios/axios';
import {
portainerAgentManagerOperation,
portainerAgentTargetHeader,
} from '@/portainer/services/http-request.helper';
import { dockerMaxAPIVersionInterceptor } from '@/portainer/services/dockerMaxApiVersionInterceptor';
import { withAgentTargetHeader } from '../proxy/queries/utils';
import { buildDockerProxyUrl } from '../proxy/queries/buildDockerProxyUrl';
@@ -190,3 +199,124 @@ export async function getContainerLogs(
throw parseAxiosError(e, 'Unable to get container logs');
}
}
export type StreamLogsParams = ContainerLogsParams & {
/** follow=1 — keep the connection open and stream new lines as they arrive */
follow?: boolean;
};
// Memoize the Docker max-API-version pinning per proxy path for the session. The
// pinning interceptor only rewrites the URL based on the environment's Docker API
// version (cached and stable for the session), so resolving it once avoids an
// extra `/version` round-trip on every reconnect. Caching the Promise also
// dedupes concurrent reconnect attempts.
const pinnedLogPathCache = new Map<string, Promise<string>>();
function resolvePinnedLogPath(path: string): Promise<string> {
let cached = pinnedLogPathCache.get(path);
if (!cached) {
cached = dockerMaxAPIVersionInterceptor({
url: path,
} as InternalAxiosRequestConfig).then((config) => config.url ?? path);
pinnedLogPathCache.set(path, cached);
}
return cached;
}
/**
* Live-tail a container's logs over HTTP.
*
* Unlike `getContainerLogs` (axios, buffers the whole body), this uses `fetch`
* so we can read the response body as a `ReadableStream` and surface the raw
* byte chunks to the caller as they arrive (`follow=1`). Bytes are handed over
* undecoded so the caller can demux Docker's binary multiplexed frames at the
* byte level (UTF-8-decoding the whole stream would corrupt frame headers). The
* backend already
* streams `follow=1` transparently through the Docker proxy — including for
* Agent/Edge environments — so no backend change is needed.
*
* Auth: the API uses an httpOnly JWT cookie (SameSite=Strict), so a same-origin
* `fetch` with `credentials: 'include'` carries it automatically — no
* `Authorization` header. CSRF middleware only guards mutating methods; logs is
* a GET, so it is unaffected. The agent-target / manager-operation headers
* (normally added by the axios `agentInterceptor`) are replicated here so the
* stream also resolves the correct node on Agent/Edge environments.
*
* The caller drives lifetime via `signal`: aborting it (unmount, container
* switch, pausing Live) cancels the in-flight fetch and ends the read loop.
*/
export async function streamContainerLogs(
environmentId: EnvironmentId,
containerId: ContainerId,
params: StreamLogsParams,
onChunk: (bytes: Uint8Array) => void,
signal: AbortSignal
): Promise<void> {
const path = buildDockerProxyUrl(
environmentId,
'containers',
containerId,
'logs'
);
// The fetch path bypasses the axios request interceptors, so apply the same
// Docker max-API-version pinning axios applies to getContainerLogs. Resolve it
// once per session (memoized below) instead of hitting `/version` on every 3s
// reconnect — the pinning only depends on the environment's Docker API version,
// which is stable for the session.
const effectivePath = await resolvePinnedLogPath(path);
const query = new URLSearchParams();
// _.pickBy drops undefined/0/'' the same way the axios path does
Object.entries(_.pickBy(params)).forEach(([key, value]) => {
query.set(key, String(typeof value === 'boolean' ? Number(value) : value));
});
// axios baseURL is 'api'; mirror it here since fetch doesn't share it.
const url = `api${effectivePath}?${query.toString()}`;
const headers: Record<string, string> = {};
const target = portainerAgentTargetHeader();
if (target) {
headers[agentTargetHeader] = target;
}
if (portainerAgentManagerOperation()) {
headers['X-PortainerAgent-ManagerOperation'] = '1';
}
const response = await fetch(url, {
signal,
credentials: 'include',
headers,
});
if (!response.ok) {
throw new PortainerError(
`Unable to stream container logs (HTTP ${response.status})`
);
}
if (!response.body) {
return;
}
const reader = response.body.getReader();
try {
for (;;) {
const { value, done } = await reader.read();
if (done) {
break;
}
if (value) {
// Hand over the raw bytes undecoded; the caller demuxes Docker's binary
// frames and decodes complete lines itself. UTF-8 chars split across
// chunks are reassembled there before decoding, so no decoder flush is
// needed here. The caller flushes its own trailing partial line on end.
onChunk(value);
}
}
} finally {
reader.releaseLock();
}
}

View File

@@ -54,6 +54,9 @@ export type ContainerLogsParams = {
stdout?: boolean;
stderr?: boolean;
timestamps?: boolean;
since?: number;
// Unix timestamp. The live-stream path sends a "<seconds>.<nanos>" string so a
// reconnect can resume at exact nanosecond precision (which a number cannot
// hold); the buffered axios path passes a plain number.
since?: number | string;
tail?: number;
};