Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8ad42a1a45 | ||
|
|
9768d7bb99 | ||
|
|
da6933c218 | ||
|
|
d520aec159 | ||
|
|
f4f296fc05 | ||
|
|
960d43e70b | ||
|
|
c3cdb8007e | ||
|
|
8ca0608b21 |
@@ -10,7 +10,7 @@
|
||||
label-class="'col-sm-2'"
|
||||
checked="$ctrl.state.logCollection"
|
||||
label="'Auto-refresh logs'"
|
||||
tooltip="'Disabling this option allows you to pause the log collection process and the auto-scrolling.'"
|
||||
tooltip="'Disabling this option pauses log collection. Auto-scroll follows the bottom of the log and pauses on its own when you scroll up.'"
|
||||
on-change="($ctrl.handleLogsCollectionChange)"
|
||||
></por-switch-field>
|
||||
</div>
|
||||
@@ -85,10 +85,10 @@
|
||||
|
||||
<div class="row" style="height: 54%">
|
||||
<div class="col-sm-12" style="height: 100%">
|
||||
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll" force-glue>
|
||||
<div ng-repeat="log in $ctrl.state.filteredLogs = ($ctrl.data | filter:{ 'line': $ctrl.state.search }) track by $index" class="line" ng-if="log.line"><p class="inner_line" ng-click="$ctrl.selectLine(log.line)" ng-class="{ 'line_selected': $ctrl.state.selectedLines.indexOf(log.line) > -1 }"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
|
||||
<div ng-if="!$ctrl.state.filteredLogs.length" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
|
||||
<div ng-if="$ctrl.state.filteredLogs.length === 1 && !$ctrl.state.filteredLogs[0].line" class="line"><p class="inner_line">No logs available</p></div>
|
||||
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll">
|
||||
<div ng-repeat="log in $ctrl.state.filteredLogs track by log.id" class="line" ng-if="log.line"><p class="inner_line" ng-click="$ctrl.selectLine(log.line)" ng-class="{ 'line_selected': $ctrl.state.selectedLines.indexOf(log.line) > -1 }"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
|
||||
<div ng-if="!$ctrl.state.filteredLogs.length && $ctrl.state.search" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
|
||||
<div ng-if="!$ctrl.state.filteredLogs.length && !$ctrl.state.search" class="line"><p class="inner_line">No logs available</p></div>
|
||||
</pre>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -27,11 +27,34 @@ angular.module('portainer.docker').controller('LogViewerController', [
|
||||
this.handleLogsCollectionChange = handleLogsCollectionChange.bind(this);
|
||||
this.handleLogsWrapLinesChange = handleLogsWrapLinesChange.bind(this);
|
||||
this.handleDisplayTimestampsChange = handleDisplayTimestampsChange.bind(this);
|
||||
this.applyFilter = applyFilter.bind(this);
|
||||
|
||||
this.$onInit = function () {
|
||||
this.applyFilter();
|
||||
// Compute the filtered list in the controller (not in the template) so we
|
||||
// do not rebuild `filteredLogs` on every digest. `$watchCollection` only
|
||||
// fires when lines are actually appended; combined with `track by log.id`
|
||||
// in the template, already-rendered rows are never re-bound — so a live
|
||||
// text selection survives incoming log lines.
|
||||
$scope.$watchCollection(() => this.data, this.applyFilter);
|
||||
$scope.$watch(() => this.state.search, this.applyFilter);
|
||||
};
|
||||
|
||||
function applyFilter() {
|
||||
const data = this.data || [];
|
||||
const search = (this.state.search || '').toLowerCase();
|
||||
this.state.filteredLogs = search
|
||||
? data.filter((log) => log.line && log.line.toLowerCase().indexOf(search) > -1)
|
||||
: data;
|
||||
}
|
||||
|
||||
function handleLogsCollectionChange(enabled) {
|
||||
$scope.$evalAsync(() => {
|
||||
// Decouple Live (log collection) from auto-scroll: pausing the stream no
|
||||
// longer also forces auto-scroll off, and auto-scroll is driven by
|
||||
// scroll-glue (it disengages when the user scrolls up, re-engages at the
|
||||
// bottom) so reading/selecting is never yanked around.
|
||||
this.state.logCollection = enabled;
|
||||
this.state.autoScroll = enabled;
|
||||
this.logCollectionChange(enabled);
|
||||
});
|
||||
}
|
||||
|
||||
26
app/docker/helpers/logHelper/formatJSONLogs.test.ts
Normal file
26
app/docker/helpers/logHelper/formatJSONLogs.test.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
import { formatJSONLine } from './formatJSONLogs';
|
||||
|
||||
describe('formatJSONLine 0=d 1=e guard', () => {
|
||||
it('renders a bare JSON string as plain text (not index=char pairs)', () => {
|
||||
const raw = '"hello"';
|
||||
const lines = formatJSONLine(raw);
|
||||
expect(lines).toHaveLength(1);
|
||||
// faithfully passes the raw line through instead of iterating its chars
|
||||
expect(lines[0].line).toBe(raw);
|
||||
// would previously have produced "0=h 1=e 2=l ..." via Object.keys on a string
|
||||
expect(lines[0].line).not.toContain('0=h');
|
||||
});
|
||||
|
||||
it('renders a bare JSON array as plain text', () => {
|
||||
const raw = '[1,2,3]';
|
||||
const lines = formatJSONLine(raw);
|
||||
expect(lines).toHaveLength(1);
|
||||
expect(lines[0].line).toBe(raw);
|
||||
});
|
||||
|
||||
it('still parses a real JSON-object log line', () => {
|
||||
const lines = formatJSONLine('{"level":"info","message":"hi"}');
|
||||
expect(lines[0].line).toContain('hi');
|
||||
expect(lines[0].line).not.toContain('0=');
|
||||
});
|
||||
});
|
||||
@@ -1,6 +1,11 @@
|
||||
import { without } from 'lodash';
|
||||
|
||||
import { FormattedLine, Span, JSONLogs, TIMESTAMP_LENGTH } from './types';
|
||||
import {
|
||||
FormattedLineContent,
|
||||
Span,
|
||||
JSONLogs,
|
||||
TIMESTAMP_LENGTH,
|
||||
} from './types';
|
||||
import {
|
||||
formatCaller,
|
||||
formatKeyValuePair,
|
||||
@@ -17,14 +22,23 @@ function removeKnownKeys(keys: string[]) {
|
||||
export function formatJSONLine(
|
||||
rawText: string,
|
||||
withTimestamps?: boolean
|
||||
): FormattedLine[] {
|
||||
): FormattedLineContent[] {
|
||||
const spans: Span[] = [];
|
||||
const lines: FormattedLine[] = [];
|
||||
const lines: FormattedLineContent[] = [];
|
||||
let line = '';
|
||||
|
||||
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;
|
||||
|
||||
const json: JSONLogs = JSON.parse(text);
|
||||
const parsed: unknown = JSON.parse(text);
|
||||
// Only treat the line as a structured JSON log when it actually parses to a
|
||||
// plain object. A line serialized as a bare JSON string (e.g. `"hello"`)
|
||||
// parses to a string, whose `Object.keys` is `['0','1',...]` — which used to
|
||||
// render as `0=h 1=e ...`. Fall back to the plain-text path in that case.
|
||||
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
|
||||
const plain = rawText;
|
||||
return [{ line: plain, spans: [{ text: plain }] }];
|
||||
}
|
||||
const json = parsed as JSONLogs;
|
||||
const { time, level, caller, message, stack_trace: stackTrace } = json;
|
||||
const keys = removeKnownKeys(Object.keys(json));
|
||||
|
||||
|
||||
29
app/docker/helpers/logHelper/formatLogs.test.ts
Normal file
29
app/docker/helpers/logHelper/formatLogs.test.ts
Normal file
@@ -0,0 +1,29 @@
|
||||
import { formatLogs } from './formatLogs';
|
||||
import { resetLineIdSequence } from './lineId';
|
||||
|
||||
beforeEach(() => {
|
||||
resetLineIdSequence();
|
||||
});
|
||||
|
||||
describe('formatLogs stable ids', () => {
|
||||
it('gives every line a unique id within a call', () => {
|
||||
const lines = formatLogs('one\ntwo\nthree\n');
|
||||
const ids = lines.map((l) => l.id);
|
||||
expect(new Set(ids).size).toBe(ids.length);
|
||||
});
|
||||
|
||||
it('continues the id sequence across calls (so appended chunks never collide)', () => {
|
||||
const first = formatLogs('a\nb\n');
|
||||
const second = formatLogs('c\nd\n');
|
||||
const allIds = [...first, ...second].map((l) => l.id);
|
||||
expect(new Set(allIds).size).toBe(allIds.length);
|
||||
// every id in the second batch is greater than every id in the first
|
||||
const maxFirst = Math.max(...first.map((l) => l.id));
|
||||
expect(Math.min(...second.map((l) => l.id))).toBeGreaterThan(maxFirst);
|
||||
});
|
||||
|
||||
it('preserves the parsed text content', () => {
|
||||
const lines = formatLogs('hello\nworld\n');
|
||||
expect(lines.map((l) => l.line)).toEqual(['hello', 'world']);
|
||||
});
|
||||
});
|
||||
@@ -9,7 +9,14 @@ import {
|
||||
} from './colors';
|
||||
import { formatJSONLine } from './formatJSONLogs';
|
||||
import { formatZerologLogs, ZerologRegex } from './formatZerologLogs';
|
||||
import { Token, Span, TIMESTAMP_LENGTH, FormattedLine } from './types';
|
||||
import { nextLineId } from './lineId';
|
||||
import {
|
||||
Token,
|
||||
Span,
|
||||
TIMESTAMP_LENGTH,
|
||||
FormattedLine,
|
||||
FormattedLineContent,
|
||||
} from './types';
|
||||
|
||||
type FormatOptions = {
|
||||
stripHeaders?: boolean;
|
||||
@@ -43,7 +50,7 @@ export function formatLogs(
|
||||
}
|
||||
|
||||
const tokens: Token[][] = tokenize(logs);
|
||||
const formattedLogs: FormattedLine[] = [];
|
||||
const formattedLogs: FormattedLineContent[] = [];
|
||||
|
||||
let fgColor: string | undefined;
|
||||
let bgColor: string | undefined;
|
||||
@@ -118,9 +125,20 @@ export function formatLogs(
|
||||
formattedLogs.push({ line, spans });
|
||||
}
|
||||
|
||||
return formattedLogs;
|
||||
// Assign the stable id centrally, once, here — so every line (plain, JSON,
|
||||
// zerolog, stack-trace) gets a monotonic id regardless of which formatter
|
||||
// produced it. Enables `track by log.id` in the viewer template.
|
||||
return formattedLogs.map(
|
||||
(content): FormattedLine => ({ id: nextLineId(), ...content })
|
||||
);
|
||||
}
|
||||
|
||||
// Strips Docker's 8-byte multiplexed-stream headers from a non-TTY log buffer:
|
||||
// drops the leading header, then every header that follows a newline. This
|
||||
// text-level strip is only safe on a fully-buffered body (the AngularJS polling
|
||||
// services for container/service/task logs) where every line starts on a frame
|
||||
// boundary. The live-stream path (logStream.ts) demuxes frames at the byte
|
||||
// level instead and calls formatLogs WITHOUT `stripHeaders`.
|
||||
function stripHeadersFunc(logs: string) {
|
||||
return logs.substring(8).replace(/\r?\n(.{8})/g, '\n');
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
formatTime,
|
||||
} from './formatters';
|
||||
import {
|
||||
FormattedLine,
|
||||
FormattedLineContent,
|
||||
JSONStackTrace,
|
||||
Level,
|
||||
Span,
|
||||
@@ -50,7 +50,7 @@ type Pair = {
|
||||
|
||||
export function formatZerologLogs(rawText: string, withTimestamps?: boolean) {
|
||||
const spans: Span[] = [];
|
||||
const lines: FormattedLine[] = [];
|
||||
const lines: FormattedLineContent[] = [];
|
||||
let line = '';
|
||||
|
||||
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;
|
||||
|
||||
@@ -1,7 +1,13 @@
|
||||
import { format } from 'date-fns';
|
||||
import { takeRight } from 'lodash';
|
||||
|
||||
import { Span, Level, Colors, JSONStackTrace, FormattedLine } from './types';
|
||||
import {
|
||||
Span,
|
||||
Level,
|
||||
Colors,
|
||||
JSONStackTrace,
|
||||
FormattedLineContent,
|
||||
} from './types';
|
||||
|
||||
const spaceSpan: Span = { text: ' ' };
|
||||
|
||||
@@ -141,7 +147,7 @@ export function formatKeyValuePair(
|
||||
|
||||
export function formatStackTrace(
|
||||
stackTrace: JSONStackTrace | undefined,
|
||||
lines: FormattedLine[]
|
||||
lines: FormattedLineContent[]
|
||||
) {
|
||||
if (stackTrace) {
|
||||
stackTrace.forEach(({ func, line: lineNumber, source }) => {
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
export { formatLogs } from './formatLogs';
|
||||
export { concatLogsToString } from './concatLogsToString';
|
||||
export { NEW_LINE_BREAKER } from './constants';
|
||||
export { createLogStreamProcessor } from './logStream';
|
||||
export type { LogStreamProcessor } from './logStream';
|
||||
export { rfc3339ToUnixNanoSince } from './sinceTimestamp';
|
||||
|
||||
17
app/docker/helpers/logHelper/lineId.ts
Normal file
17
app/docker/helpers/logHelper/lineId.ts
Normal file
@@ -0,0 +1,17 @@
|
||||
// Monotonically increasing, process-wide sequence used to give every rendered
|
||||
// log line a stable id. Because it never resets during a session, lines parsed
|
||||
// from later stream chunks always get ids greater than earlier ones, so the
|
||||
// AngularJS `track by log.id` repeat treats already-rendered rows as unchanged
|
||||
// and never rewrites their DOM text nodes (which is what was collapsing text
|
||||
// selections on every poll). See logHelper/types.ts (FormattedLine).
|
||||
let sequence = 0;
|
||||
|
||||
export function nextLineId() {
|
||||
|
||||
return sequence++;
|
||||
}
|
||||
|
||||
// Test-only helper to make id assignments deterministic between unit tests.
|
||||
export function resetLineIdSequence() {
|
||||
sequence = 0;
|
||||
}
|
||||
344
app/docker/helpers/logHelper/logStream.test.ts
Normal file
344
app/docker/helpers/logHelper/logStream.test.ts
Normal file
@@ -0,0 +1,344 @@
|
||||
import { createLogStreamProcessor } from './logStream';
|
||||
import { resetLineIdSequence } from './lineId';
|
||||
|
||||
const encoder = new TextEncoder();
|
||||
|
||||
function bytes(s: string): Uint8Array {
|
||||
return encoder.encode(s);
|
||||
}
|
||||
|
||||
// Build a real Docker multiplexed-stream frame: 1 byte stream-type, 3 zero
|
||||
// bytes, a 4-byte big-endian payload length, then the payload bytes.
|
||||
function frame(payload: Uint8Array | string, streamType = 1): Uint8Array {
|
||||
const body = typeof payload === 'string' ? bytes(payload) : payload;
|
||||
const out = new Uint8Array(8 + body.length);
|
||||
out[0] = streamType;
|
||||
const len = body.length;
|
||||
out[4] = (len >>> 24) & 0xff;
|
||||
out[5] = (len >>> 16) & 0xff;
|
||||
out[6] = (len >>> 8) & 0xff;
|
||||
out[7] = len & 0xff;
|
||||
out.set(body, 8);
|
||||
return out;
|
||||
}
|
||||
|
||||
function concat(...parts: Uint8Array[]): Uint8Array {
|
||||
const total = parts.reduce((n, p) => n + p.length, 0);
|
||||
const out = new Uint8Array(total);
|
||||
let off = 0;
|
||||
parts.forEach((p) => {
|
||||
out.set(p, off);
|
||||
off += p.length;
|
||||
});
|
||||
return out;
|
||||
}
|
||||
|
||||
beforeEach(() => {
|
||||
resetLineIdSequence();
|
||||
});
|
||||
|
||||
describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
|
||||
it('emits only completed lines and carries the partial remainder forward', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
|
||||
// first chunk: one full frame then a frame whose line has no newline yet
|
||||
const first = proc.push(concat(frame('hello\n'), frame('wor')));
|
||||
expect(first.map((l) => l.line)).toEqual(['hello']);
|
||||
|
||||
// completing the partial line + a new one (across two frames)
|
||||
const second = proc.push(concat(frame('ld\n'), frame('bye\n')));
|
||||
expect(second.map((l) => l.line)).toEqual(['world', 'bye']);
|
||||
});
|
||||
|
||||
it('returns nothing until a newline arrives, then flushes the trailing partial', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
|
||||
expect(proc.push(frame('partial')).map((l) => l.line)).toEqual([]);
|
||||
// stream ended without a trailing newline -> flush yields the remainder
|
||||
expect(proc.flush().map((l) => l.line)).toEqual(['partial']);
|
||||
});
|
||||
|
||||
it('strips the frame header from every framed line', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const lines = proc.push(concat(frame('line-1\n'), frame('line-2\n')));
|
||||
expect(lines.map((l) => l.line)).toEqual(['line-1', 'line-2']);
|
||||
});
|
||||
|
||||
// F1: a frame whose payload length low-byte is 0x0a (10). A text-level
|
||||
// demuxer that splits on '\n' before stripping headers would land inside the
|
||||
// header and desync, mis-rendering "123456789\n" down to "9".
|
||||
it('handles a frame whose length low-byte is 0x0a (payload of exactly 10 bytes)', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const lines = proc.push(frame('123456789\n')); // payload length = 10 = 0x0a
|
||||
expect(lines.map((l) => l.line)).toEqual(['123456789']);
|
||||
});
|
||||
|
||||
// F2 (boundary): a chunk boundary falling between the 8-byte header and its
|
||||
// payload.
|
||||
it('handles a chunk split between the header and the payload', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const full = frame('abc\n');
|
||||
expect(proc.push(full.subarray(0, 8)).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.push(full.subarray(8)).map((l) => l.line)).toEqual(['abc']);
|
||||
});
|
||||
|
||||
// F2: a length field with a byte >= 0x80 (200 = 0xC8), split across chunks.
|
||||
// UTF-8-decoding the header would mangle 0xC8 and slip the offset.
|
||||
it('handles a large frame (length byte >= 0x80) split across chunks', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const payload = `${'x'.repeat(199)}\n`; // 200 bytes -> length byte 0xC8
|
||||
const full = frame(payload);
|
||||
expect(full[7]).toBe(0xc8);
|
||||
expect(proc.push(full.subarray(0, 5)).map((l) => l.line)).toEqual([]);
|
||||
const rest = proc.push(full.subarray(5));
|
||||
expect(rest.map((l) => l.line)).toEqual(['x'.repeat(199)]);
|
||||
});
|
||||
|
||||
// F2 (multibyte): a multibyte UTF-8 char split across two chunks must be
|
||||
// reassembled before decoding (not turned into U+FFFD).
|
||||
it('reassembles a multibyte UTF-8 char split across two chunks', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const full = frame('aéb\n'); // é = 0xC3 0xA9
|
||||
const eIdx = full.indexOf(0xc3);
|
||||
expect(proc.push(full.subarray(0, eIdx + 1)).map((l) => l.line)).toEqual(
|
||||
[]
|
||||
);
|
||||
const rest = proc.push(full.subarray(eIdx + 1));
|
||||
expect(rest.map((l) => l.line)).toEqual(['aéb']);
|
||||
});
|
||||
|
||||
it('drops a truncated final frame on flush instead of emitting garbage', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const full = frame('never\n');
|
||||
// only the header + 2 payload bytes arrive, then the stream ends
|
||||
expect(proc.push(full.subarray(0, 10)).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual([]);
|
||||
});
|
||||
|
||||
// F3: a framed line terminated by \r\n must have its trailing CR stripped.
|
||||
it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const lines = proc.push(frame('with-crlf\r\n'));
|
||||
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('createLogStreamProcessor (TTY, no headers)', () => {
|
||||
it('passes lines through unchanged', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
const lines = proc.push(bytes('plain line\n'));
|
||||
expect(lines.map((l) => l.line)).toEqual(['plain line']);
|
||||
});
|
||||
|
||||
it('carries a partial line across chunks and flushes the remainder', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
expect(proc.push(bytes('par')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.push(bytes('tial\n')).map((l) => l.line)).toEqual(['partial']);
|
||||
expect(proc.push(bytes('tail')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual(['tail']);
|
||||
});
|
||||
|
||||
// F3: a \r\n line ending must have its trailing CR stripped (non-TTY path).
|
||||
it('strips the trailing CR from a \\r\\n line ending', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
const lines = proc.push(bytes('with-crlf\r\n'));
|
||||
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
|
||||
});
|
||||
|
||||
// F3: a remainder carrying a trailing CR (no terminating LF) must have the CR
|
||||
// stripped on flush. Split across two chunks to exercise the buffered path.
|
||||
it('strips a trailing CR on flush when the stream ends without a newline', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
expect(proc.push(bytes('hel')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.push(bytes('lo\r')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual(['hello']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('timestamps (always requested, optionally displayed)', () => {
|
||||
const ts1 = '2024-01-01T00:00:00.000000001Z';
|
||||
const ts2 = '2024-01-01T00:00:00.000000002Z';
|
||||
|
||||
it('strips the RFC3339 prefix when timestamps are hidden', () => {
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: false,
|
||||
streamHasTimestamps: true,
|
||||
});
|
||||
const lines = proc.push(bytes(`${ts1} hello\n`));
|
||||
expect(lines.map((l) => l.line)).toEqual(['hello']);
|
||||
expect(proc.getLastTimestamp()).toBe(ts1);
|
||||
});
|
||||
|
||||
it('keeps the RFC3339 prefix when timestamps are shown', () => {
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: true,
|
||||
streamHasTimestamps: true,
|
||||
});
|
||||
const lines = proc.push(bytes(`${ts1} hello\n`));
|
||||
expect(lines.map((l) => l.line)).toEqual([`${ts1} hello`]);
|
||||
});
|
||||
|
||||
it('drops lines re-delivered at/before the reconnect resume point', () => {
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: false,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: ts1,
|
||||
// the boundary line we had already shown (Docker re-delivers it inclusively)
|
||||
skipBoundaryContents: [`${ts1} old`],
|
||||
});
|
||||
// ts1 is the inclusive boundary Docker re-delivers -> dropped; ts2 is new
|
||||
const lines = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
|
||||
expect(lines.map((l) => l.line)).toEqual(['new']);
|
||||
});
|
||||
|
||||
// F4: the real reconnect shape — the redelivered boundary line arrives in its
|
||||
// OWN chunk (the whole batch is dropped, dropTo === lines.length), so the
|
||||
// `skipping` flag must stay on and the NEXT chunk must keep dropping dups up
|
||||
// to the resume point before emitting the first genuinely new line.
|
||||
it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
|
||||
const tsA = '2024-01-01T00:00:00.000000001Z';
|
||||
const tsB = '2024-01-01T00:00:00.000000002Z';
|
||||
const tsC = '2024-01-01T00:00:00.000000003Z';
|
||||
const tsD = '2024-01-01T00:00:00.000000004Z';
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: false,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: tsB,
|
||||
// dup-2 carries the resume timestamp and was already shown; dup-1 is before
|
||||
// it and is dropped by the chronological compare.
|
||||
skipBoundaryContents: [`${tsB} dup-2`],
|
||||
});
|
||||
|
||||
// (1) only a redelivered boundary line (<= resume point) -> fully consumed
|
||||
// by dedup; nothing emitted and skipping stays on.
|
||||
expect(proc.push(bytes(`${tsA} dup-1\n`)).map((l) => l.line)).toEqual([]);
|
||||
|
||||
// (2) another dup + the first new line: because skipping was preserved the
|
||||
// dup is still dropped, and only the new line is returned.
|
||||
expect(
|
||||
proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`)).map((l) => l.line)
|
||||
).toEqual(['new']);
|
||||
|
||||
// (3) a later line passes through untouched (no skipping anymore).
|
||||
expect(proc.push(bytes(`${tsD} after\n`)).map((l) => l.line)).toEqual([
|
||||
'after',
|
||||
]);
|
||||
});
|
||||
|
||||
// F1: two lines A and B share the exact same nanosecond timestamp; the
|
||||
// connection drops after only A was shown. On reconnect `since=T` re-delivers
|
||||
// BOTH. A `<= timestamp` dedup would drop both and LOSE B. Content-exact dedup
|
||||
// (the boundary set is just [A]) drops A and keeps B.
|
||||
it('does not lose a new line that shares the boundary timestamp with a duplicate', () => {
|
||||
const tsShared = '2024-01-01T00:00:00.000000005Z';
|
||||
const tsNext = '2024-01-01T00:00:00.000000006Z';
|
||||
|
||||
// First session: only line A (at tsShared) was delivered before the drop.
|
||||
const first = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: true,
|
||||
streamHasTimestamps: true,
|
||||
});
|
||||
expect(first.push(bytes(`${tsShared} A\n`)).map((l) => l.line)).toEqual([
|
||||
`${tsShared} A`,
|
||||
]);
|
||||
expect(first.getLastTimestamp()).toBe(tsShared);
|
||||
// the boundary set handed to the reconnecting processor is exactly [A]
|
||||
expect(first.getBoundaryLines()).toEqual([`${tsShared} A`]);
|
||||
|
||||
// Reconnect: Docker re-delivers A (duplicate) AND B (new, same timestamp).
|
||||
const second = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: true,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: first.getLastTimestamp(),
|
||||
skipBoundaryContents: first.getBoundaryLines(),
|
||||
});
|
||||
const redelivered = second.push(
|
||||
bytes(`${tsShared} A\n${tsShared} B\n${tsNext} C\n`)
|
||||
);
|
||||
// A (genuine duplicate) is dropped; B (new, shares the boundary ts) survives.
|
||||
expect(redelivered.map((l) => l.line)).toEqual([
|
||||
`${tsShared} B`,
|
||||
`${tsNext} C`,
|
||||
]);
|
||||
// and B is now part of the next boundary set alongside any A-twin.
|
||||
expect(second.getBoundaryLines()).toEqual([`${tsNext} C`]);
|
||||
});
|
||||
|
||||
// F1: a genuine duplicate at the boundary timestamp (nothing new shares it) is
|
||||
// still dropped — the normal reconnect case is unchanged.
|
||||
it('still drops a genuine duplicate at the boundary timestamp', () => {
|
||||
const tsBoundary = '2024-01-01T00:00:00.000000007Z';
|
||||
const tsAfter = '2024-01-01T00:00:00.000000008Z';
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: false,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: tsBoundary,
|
||||
skipBoundaryContents: [`${tsBoundary} only`],
|
||||
});
|
||||
const lines = proc.push(bytes(`${tsBoundary} only\n${tsAfter} fresh\n`));
|
||||
expect(lines.map((l) => l.line)).toEqual(['fresh']);
|
||||
});
|
||||
|
||||
// F10: a SECOND reconnect at the same nanosecond timestamp must not re-emit
|
||||
// duplicates. Lines A and B share ts T; the connection drops after only A was
|
||||
// shown. Reconnect #1 (boundary set [A]) redelivers A,B -> drops A, keeps B,
|
||||
// and — because the processor seeds its boundary state from the resume
|
||||
// params — its boundary set becomes [A, B] (A is NOT forgotten). If a second
|
||||
// reconnect happens at the same T before any newer-ts line arrives, it carries
|
||||
// [A, B] forward and drops BOTH redelivered lines (no duplicate). Without the
|
||||
// seeding, reconnect #1's boundary set would be just [B], and reconnect #2
|
||||
// would fail to recognise A as a duplicate and re-emit both A and B.
|
||||
it('does not duplicate lines on a second reconnect at the same nanosecond timestamp', () => {
|
||||
const tsShared = '2024-01-01T00:00:00.000000005Z';
|
||||
|
||||
// Reconnect #1: resume at tsShared, having already shown A.
|
||||
const first = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: true,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: tsShared,
|
||||
skipBoundaryContents: [`${tsShared} A`],
|
||||
});
|
||||
// Docker redelivers A (duplicate) and B (new, same timestamp).
|
||||
const round1 = first.push(bytes(`${tsShared} A\n${tsShared} B\n`));
|
||||
expect(round1.map((l) => l.line)).toEqual([`${tsShared} B`]);
|
||||
// A must NOT be forgotten: the boundary set now carries both A and B.
|
||||
expect(first.getBoundaryLines()).toEqual([
|
||||
`${tsShared} A`,
|
||||
`${tsShared} B`,
|
||||
]);
|
||||
|
||||
// Reconnect #2 at the same timestamp, before any newer-ts line arrived,
|
||||
// carrying reconnect #1's boundary set forward.
|
||||
const second = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: true,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: first.getLastTimestamp(),
|
||||
skipBoundaryContents: first.getBoundaryLines(),
|
||||
});
|
||||
// Docker redelivers A and B again; both are genuine duplicates now.
|
||||
const round2 = second.push(bytes(`${tsShared} A\n${tsShared} B\n`));
|
||||
expect(round2.map((l) => l.line)).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('stable line ids (append model)', () => {
|
||||
it('assigns unique, strictly increasing ids across successive chunks', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
const a = proc.push(bytes('a\nb\n'));
|
||||
const b = proc.push(bytes('c\n'));
|
||||
|
||||
const ids = [...a, ...b].map((l) => l.id);
|
||||
expect(new Set(ids).size).toBe(ids.length); // all unique
|
||||
const sorted = [...ids].sort((x, y) => x - y);
|
||||
expect(ids).toEqual(sorted); // monotonically increasing in arrival order
|
||||
});
|
||||
});
|
||||
332
app/docker/helpers/logHelper/logStream.ts
Normal file
332
app/docker/helpers/logHelper/logStream.ts
Normal file
@@ -0,0 +1,332 @@
|
||||
import { formatLogs } from './formatLogs';
|
||||
import { FormattedLine, TIMESTAMP_LENGTH } from './types';
|
||||
|
||||
type ProcessorOptions = {
|
||||
/** Non-TTY container: demux Docker's 8-byte multiplexed-stream frame headers. */
|
||||
stripHeaders?: boolean;
|
||||
/** Display the RFC3339 timestamp prefix (user's "Display timestamps" toggle). */
|
||||
withTimestamps?: boolean;
|
||||
/**
|
||||
* The raw stream always carries an RFC3339 timestamp prefix because the
|
||||
* controller requests `timestamps=1` internally (so it can resume from the
|
||||
* exact log timestamp on reconnect). When `withTimestamps` is false we strip
|
||||
* the prefix before formatting so it is not displayed.
|
||||
*/
|
||||
streamHasTimestamps?: boolean;
|
||||
/**
|
||||
* Reconnect dedup: Docker's `since` filter is inclusive, so on reconnect it
|
||||
* re-delivers the boundary line(s) at the resume timestamp. This is the resume
|
||||
* point (the RFC3339 timestamp of the last line we saw). Re-delivered lines
|
||||
* before it are dropped; lines exactly at it are matched against
|
||||
* `skipBoundaryContents` (see below).
|
||||
*/
|
||||
skipUntilTimestamp?: string;
|
||||
/**
|
||||
* Reconnect dedup, exact-content matching. The lines we had already shown that
|
||||
* carry the resume timestamp (`skipUntilTimestamp`). On reconnect Docker
|
||||
* re-delivers every line at that timestamp; we drop only those whose exact
|
||||
* content matches one of these (the genuine duplicates) and KEEP any line that
|
||||
* merely shares the timestamp but is new (different content) — otherwise a
|
||||
* second line sharing the boundary nanosecond would be lost.
|
||||
*/
|
||||
skipBoundaryContents?: string[];
|
||||
};
|
||||
|
||||
// Docker multiplexed-stream frame header: 1 byte stream-type, 3 zero bytes,
|
||||
// then a 4-byte big-endian payload length, followed by exactly `length` payload
|
||||
// bytes. See the Docker Engine API container "logs" (non-TTY) documentation.
|
||||
const FRAME_HEADER_SIZE = 8;
|
||||
const FRAME_LENGTH_OFFSET = 4;
|
||||
|
||||
// Defensive only: a single frame payload larger than this almost certainly means
|
||||
// the byte stream desynced/corrupted. This is NOT a cap on legitimate lines
|
||||
// (real log lines are orders of magnitude smaller); it only prevents unbounded
|
||||
// buffering if we ever lose frame alignment.
|
||||
const MAX_FRAME_PAYLOAD = 100 * 1024 * 1024;
|
||||
|
||||
// Width of Docker's fixed RFC3339Nano timestamp prefix emitted with
|
||||
// `timestamps=1`: "2006-01-02T15:04:05.000000000Z" — 30 chars; index 30 is the
|
||||
// separating space (TIMESTAMP_LENGTH = 31 = 30 + space).
|
||||
const TS_WIDTH = TIMESTAMP_LENGTH - 1;
|
||||
const TS_PREFIX = /^\d{4}-\d{2}-\d{2}T/;
|
||||
|
||||
const LF = 0x0a;
|
||||
const CR = 0x0d;
|
||||
|
||||
// `subarray()` widens the backing buffer to ArrayBufferLike under TS 5.7+; use a
|
||||
// permissive byte-array alias so buffered slices assign cleanly.
|
||||
type Bytes = Uint8Array<ArrayBufferLike>;
|
||||
|
||||
function concatBytes(a: Bytes, b: Bytes): Bytes {
|
||||
if (a.length === 0) {
|
||||
return b;
|
||||
}
|
||||
if (b.length === 0) {
|
||||
return a;
|
||||
}
|
||||
const out = new Uint8Array(a.length + b.length);
|
||||
out.set(a, 0);
|
||||
out.set(b, a.length);
|
||||
return out;
|
||||
}
|
||||
|
||||
function readUint32BE(buf: Bytes, offset: number): number {
|
||||
// `* 0x1000000` instead of `<< 24` to keep the high byte unsigned.
|
||||
return (
|
||||
buf[offset] * 0x1000000 +
|
||||
(buf[offset + 1] << 16) +
|
||||
(buf[offset + 2] << 8) +
|
||||
buf[offset + 3]
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stateful processor for an incremental (HTTP-streamed) container log response.
|
||||
*
|
||||
* A streaming response arrives in arbitrary byte chunks that split frames, log
|
||||
* lines and even multibyte UTF-8 characters at random points. For non-TTY
|
||||
* containers the body is Docker's binary multiplexed stream: each log message is
|
||||
* framed with an 8-byte header carrying the payload length. This processor
|
||||
* demuxes at the BYTE level — never on `\n` and never by UTF-8-decoding header
|
||||
* bytes — because the low byte of a frame's length field can itself be `0x0a`
|
||||
* and header bytes can be `>= 0x80` (both would corrupt a text-level demux):
|
||||
*
|
||||
* - it buffers raw bytes and parses complete frames by their declared length,
|
||||
* carrying any partial trailing frame forward;
|
||||
* - it concatenates payload bytes across frames, splits them into complete
|
||||
* lines on the `0x0a` byte, and UTF-8-decodes each complete line as a whole
|
||||
* (so a multibyte char split across frames/chunks is already reassembled);
|
||||
* - because headers are removed here, the completed text lines are formatted by
|
||||
* `formatLogs` WITHOUT its `stripHeaders` option (otherwise it would strip 8
|
||||
* more characters of real content).
|
||||
*
|
||||
* Each emitted line carries a stable, monotonically increasing `id`, so the
|
||||
* viewer can `track by log.id` and append without re-binding existing rows.
|
||||
*/
|
||||
export function createLogStreamProcessor({
|
||||
stripHeaders,
|
||||
withTimestamps,
|
||||
streamHasTimestamps,
|
||||
skipUntilTimestamp,
|
||||
skipBoundaryContents,
|
||||
}: ProcessorOptions = {}) {
|
||||
// Unparsed bytes of the multiplexed frame stream (non-TTY only).
|
||||
let frameBuf: Bytes = new Uint8Array(0);
|
||||
// Decoded-payload bytes not yet terminated by a newline.
|
||||
let lineBuf: Bytes = new Uint8Array(0);
|
||||
// Reconnect dedup: while true, drop re-delivered lines up to the resume point.
|
||||
let skipping = !!skipUntilTimestamp;
|
||||
// Copy of the boundary line contents still pending a duplicate match; each
|
||||
// matched re-delivered line consumes (splices out) one entry.
|
||||
const pendingBoundary: string[] = skipBoundaryContents
|
||||
? skipBoundaryContents.slice()
|
||||
: [];
|
||||
// RFC3339 timestamp of the last complete line we saw (next resume point).
|
||||
// Seeded from the resume point so the boundary set carried in via
|
||||
// `skipBoundaryContents` is preserved: a surviving line at the resume
|
||||
// timestamp is APPENDED to the already-known boundary lines (not treated as a
|
||||
// fresh timestamp), so a second reconnect at the same nanosecond still knows
|
||||
// every line we have shown at it and drops them all instead of re-emitting
|
||||
// duplicates. The first NEWER timestamp resets the set as usual.
|
||||
let lastTimestamp: string | undefined = skipUntilTimestamp;
|
||||
// Exact content of the line(s) at `lastTimestamp` we have emitted so far — the
|
||||
// boundary set handed to the next processor for content-exact reconnect dedup.
|
||||
// Seeded from the inbound boundary set (see `lastTimestamp` above).
|
||||
let boundaryLines: string[] = skipBoundaryContents
|
||||
? skipBoundaryContents.slice()
|
||||
: [];
|
||||
|
||||
const decoder = new TextDecoder();
|
||||
|
||||
// Extract complete newline-terminated lines from the accumulated payload
|
||||
// bytes, decoding each whole line at once. A `0x0a` byte can never be part of
|
||||
// a multibyte UTF-8 sequence, so a line boundary never cuts a character.
|
||||
function takeCompleteLines(payload: Bytes): string[] {
|
||||
lineBuf = concatBytes(lineBuf, payload);
|
||||
const lines: string[] = [];
|
||||
let start = 0;
|
||||
for (let i = 0; i < lineBuf.length; i += 1) {
|
||||
if (lineBuf[i] === LF) {
|
||||
let end = i;
|
||||
// drop a trailing CR for \r\n line endings
|
||||
if (end > start && lineBuf[end - 1] === CR) {
|
||||
end -= 1;
|
||||
}
|
||||
lines.push(decoder.decode(lineBuf.subarray(start, end)));
|
||||
start = i + 1;
|
||||
}
|
||||
}
|
||||
lineBuf = lineBuf.subarray(start);
|
||||
return lines;
|
||||
}
|
||||
|
||||
// Pull every complete frame payload out of frameBuf and return the decoded
|
||||
// complete lines they contain. An incomplete trailing frame stays buffered.
|
||||
function demuxFrames(): string[] {
|
||||
const lines: string[] = [];
|
||||
for (;;) {
|
||||
if (frameBuf.length < FRAME_HEADER_SIZE) {
|
||||
break;
|
||||
}
|
||||
const payloadLength = readUint32BE(frameBuf, FRAME_LENGTH_OFFSET);
|
||||
if (payloadLength > MAX_FRAME_PAYLOAD) {
|
||||
// Corrupt/desynced stream: drop everything buffered to avoid OOM. Surface
|
||||
// it so an unexpected reset is diagnosable rather than silently swallowed.
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn(
|
||||
`logStream: frame payload length ${payloadLength} exceeds ${MAX_FRAME_PAYLOAD}; resetting desynced buffer`
|
||||
);
|
||||
frameBuf = new Uint8Array(0);
|
||||
break;
|
||||
}
|
||||
if (frameBuf.length - FRAME_HEADER_SIZE < payloadLength) {
|
||||
break; // wait for the rest of the payload
|
||||
}
|
||||
const payload = frameBuf.subarray(
|
||||
FRAME_HEADER_SIZE,
|
||||
FRAME_HEADER_SIZE + payloadLength
|
||||
);
|
||||
lines.push(...takeCompleteLines(payload));
|
||||
frameBuf = frameBuf.subarray(FRAME_HEADER_SIZE + payloadLength);
|
||||
}
|
||||
return lines;
|
||||
}
|
||||
|
||||
// Apply reconnect-dedup + timestamp handling, then format a batch of complete
|
||||
// text lines into rendered lines.
|
||||
function formatBatch(rawLines: string[]): FormattedLine[] {
|
||||
let lines = rawLines;
|
||||
|
||||
if (streamHasTimestamps) {
|
||||
if (skipping && lines.length) {
|
||||
// Drop re-delivered lines up to the resume point. Lines strictly before
|
||||
// it are duplicates (lexical compare is chronological for the zero-padded
|
||||
// UTC format). Lines exactly at the resume timestamp are dropped ONLY
|
||||
// when their exact content matches a boundary line we already showed —
|
||||
// so a NEW line sharing the same nanosecond timestamp is not lost.
|
||||
const boundary = skipUntilTimestamp as string;
|
||||
let dropTo = 0;
|
||||
while (dropTo < lines.length) {
|
||||
const line = lines[dropTo];
|
||||
if (!TS_PREFIX.test(line)) {
|
||||
break; // cannot classify without a timestamp; stop dropping
|
||||
}
|
||||
const ts = line.substring(0, TS_WIDTH);
|
||||
const boundaryIdx =
|
||||
ts === boundary ? pendingBoundary.indexOf(line) : -1;
|
||||
if (ts < boundary) {
|
||||
// re-delivered line before the resume point
|
||||
dropTo += 1;
|
||||
} else if (boundaryIdx !== -1) {
|
||||
// exact duplicate of an already-shown boundary line
|
||||
pendingBoundary.splice(boundaryIdx, 1);
|
||||
dropTo += 1;
|
||||
} else {
|
||||
// ts > boundary, or a new line that merely shares the boundary
|
||||
// timestamp -> keep it and everything after.
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (dropTo > 0) {
|
||||
lines = lines.slice(dropTo);
|
||||
}
|
||||
if (lines.length) {
|
||||
skipping = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Track the resume timestamp and the exact content of the line(s) at it,
|
||||
// walking the kept lines in order (each line still has its prefix here).
|
||||
// Timestamps are monotonically non-decreasing, so the trailing run sharing
|
||||
// the newest timestamp is the boundary set the next processor dedups on.
|
||||
for (let i = 0; i < lines.length; i += 1) {
|
||||
const line = lines[i];
|
||||
if (TS_PREFIX.test(line)) {
|
||||
const ts = line.substring(0, TS_WIDTH);
|
||||
if (ts !== lastTimestamp) {
|
||||
lastTimestamp = ts;
|
||||
boundaryLines = [line];
|
||||
} else {
|
||||
boundaryLines.push(line);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// When the user has timestamps hidden, strip the prefix we requested
|
||||
// internally so it is not displayed.
|
||||
if (!withTimestamps) {
|
||||
lines = lines.map((line) =>
|
||||
TS_PREFIX.test(line) ? line.substring(TIMESTAMP_LENGTH) : line
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (!lines.length) {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Rejoin into a single batch so ANSI colour state carries across lines
|
||||
// within the batch (matches the previous whole-batch formatLogs call).
|
||||
// Headers are already stripped at the byte layer, so do NOT pass
|
||||
// stripHeaders here.
|
||||
const batch = `${lines.join('\n')}\n`;
|
||||
return formatLogs(batch, { withTimestamps });
|
||||
}
|
||||
|
||||
return {
|
||||
/**
|
||||
* Append a raw byte chunk and return the newly completed, formatted lines.
|
||||
* Returns an empty array when the chunk did not complete any line.
|
||||
*/
|
||||
push(value: Bytes): FormattedLine[] {
|
||||
if (!value || value.length === 0) {
|
||||
return [];
|
||||
}
|
||||
let rawLines: string[];
|
||||
if (stripHeaders) {
|
||||
frameBuf = concatBytes(frameBuf, value);
|
||||
rawLines = demuxFrames();
|
||||
} else {
|
||||
rawLines = takeCompleteLines(value);
|
||||
}
|
||||
return formatBatch(rawLines);
|
||||
},
|
||||
|
||||
/**
|
||||
* Flush any buffered partial line (e.g. when the stream ends without a
|
||||
* trailing newline). A partial/truncated final frame is dropped so we never
|
||||
* emit garbage.
|
||||
*/
|
||||
flush(): FormattedLine[] {
|
||||
// Drop any incomplete trailing frame.
|
||||
frameBuf = new Uint8Array(0);
|
||||
if (lineBuf.length === 0) {
|
||||
return [];
|
||||
}
|
||||
let bytes: Bytes = lineBuf;
|
||||
// drop a trailing CR for \r\n line endings
|
||||
if (bytes.length > 0 && bytes[bytes.length - 1] === CR) {
|
||||
bytes = bytes.subarray(0, bytes.length - 1);
|
||||
}
|
||||
lineBuf = new Uint8Array(0);
|
||||
return formatBatch([decoder.decode(bytes)]);
|
||||
},
|
||||
|
||||
/** RFC3339 timestamp of the last complete line seen, for reconnect resume. */
|
||||
getLastTimestamp(): string | undefined {
|
||||
return lastTimestamp;
|
||||
},
|
||||
|
||||
/**
|
||||
* Exact content of the line(s) carrying the last-seen timestamp. Pass to the
|
||||
* next processor as `skipBoundaryContents` so reconnect dedup drops only the
|
||||
* genuine duplicates Docker re-delivers, never a new line that happens to
|
||||
* share the boundary nanosecond.
|
||||
*/
|
||||
getBoundaryLines(): string[] {
|
||||
return boundaryLines.slice();
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
export type LogStreamProcessor = ReturnType<typeof createLogStreamProcessor>;
|
||||
31
app/docker/helpers/logHelper/sinceTimestamp.test.ts
Normal file
31
app/docker/helpers/logHelper/sinceTimestamp.test.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import { rfc3339ToUnixNanoSince } from './sinceTimestamp';
|
||||
|
||||
// Integer Unix seconds of the shared second-precision prefix, computed the same
|
||||
// way the parser does (Date.parse of the "...:05Z" prefix, in UTC).
|
||||
const unix = Math.floor(Date.parse('2024-01-01T00:00:00Z') / 1000);
|
||||
|
||||
describe('rfc3339ToUnixNanoSince', () => {
|
||||
it('keeps a standard fixed-width nanosecond fraction', () => {
|
||||
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.000000005Z')).toBe(
|
||||
`${unix}.000000005`
|
||||
);
|
||||
});
|
||||
|
||||
it('emits a zero fraction when the timestamp has no fractional part', () => {
|
||||
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00Z')).toBe(
|
||||
`${unix}.000000000`
|
||||
);
|
||||
});
|
||||
|
||||
it('right-pads a sub-nanosecond-width fraction to 9 digits', () => {
|
||||
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.5Z')).toBe(
|
||||
`${unix}.500000000`
|
||||
);
|
||||
});
|
||||
|
||||
it('truncates a fraction longer than 9 digits to nanosecond precision', () => {
|
||||
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.1234567890123Z')).toBe(
|
||||
`${unix}.123456789`
|
||||
);
|
||||
});
|
||||
});
|
||||
26
app/docker/helpers/logHelper/sinceTimestamp.ts
Normal file
26
app/docker/helpers/logHelper/sinceTimestamp.ts
Normal file
@@ -0,0 +1,26 @@
|
||||
// Build Docker's `since` param as a Unix timestamp with a nanosecond fraction
|
||||
// ("<seconds>.<nanos>") from an RFC3339Nano timestamp. Both the initial connect
|
||||
// and reconnect use this one form so we never mix unix-seconds and RFC3339 —
|
||||
// some proxies / pinned API versions accept them inconsistently. The fraction
|
||||
// needs string precision (a JS number cannot hold nanoseconds), hence `since`
|
||||
// is returned as a string.
|
||||
//
|
||||
// Docker emits a fixed-width RFC3339Nano prefix: "2006-01-02T15:04:05.000000000Z".
|
||||
// We parse the second-precision prefix in UTC for the integer seconds, then take
|
||||
// the fractional digits verbatim: short fractions (e.g. ".5") are right-padded
|
||||
// to 9 digits, longer ones are truncated to 9 (nanosecond precision).
|
||||
export function rfc3339ToUnixNanoSince(rfc3339: string): string {
|
||||
const seconds = Math.floor(
|
||||
Date.parse(`${rfc3339.substring(0, 19)}Z`) / 1000
|
||||
);
|
||||
const dot = rfc3339.indexOf('.');
|
||||
const nanos =
|
||||
dot >= 0
|
||||
? rfc3339
|
||||
.substring(dot + 1)
|
||||
.replace(/[^0-9]/g, '')
|
||||
.padEnd(9, '0')
|
||||
.substring(0, 9)
|
||||
: '000000000';
|
||||
return `${seconds}.${nanos}`;
|
||||
}
|
||||
@@ -36,11 +36,21 @@ export type Span = {
|
||||
fontWeight?: FontWeight;
|
||||
};
|
||||
|
||||
export type FormattedLine = {
|
||||
// A rendered log line without its stable id. Internal formatters build these,
|
||||
// and `formatLogs` assigns the `id` once, centrally, on the way out.
|
||||
export type FormattedLineContent = {
|
||||
spans: Span[];
|
||||
line: string;
|
||||
};
|
||||
|
||||
// A rendered log line with a stable, monotonically increasing id. The id lets
|
||||
// the AngularJS template use `track by log.id`, so already-rendered rows are
|
||||
// never re-bound (their text nodes are not rewritten) — which is what keeps a
|
||||
// live text selection from collapsing while new lines stream in.
|
||||
export type FormattedLine = FormattedLineContent & {
|
||||
id: number;
|
||||
};
|
||||
|
||||
export const TIMESTAMP_LENGTH = 31; // 30 for timestamp + 1 for trailing space
|
||||
|
||||
export const Colors = {
|
||||
|
||||
@@ -1,84 +1,303 @@
|
||||
import moment from 'moment';
|
||||
|
||||
import { streamContainerLogs } from '@/react/docker/containers/containers.service';
|
||||
import { createLogStreamProcessor, rfc3339ToUnixNanoSince } from '@/docker/helpers/logHelper';
|
||||
|
||||
// Hard cap on how many lines we keep in the DOM/buffer during a long live
|
||||
// stream. We trim from the head (oldest lines) so a selection anchored in the
|
||||
// tail is never disturbed. The user-facing `lineCount` (tail) is a *request*
|
||||
// for how many historical lines to fetch, not this in-memory limit.
|
||||
const MAX_LOG_LINES = 5000;
|
||||
// Delay before reconnecting after the stream ends or errors (container stopped,
|
||||
// network blip, proxy hiccup). Avoids hammering a stopped container.
|
||||
const RECONNECT_DELAY_MS = 3000;
|
||||
// Default tail (historical-backfill) request used on the initial connect and
|
||||
// whenever the user clears/invalidates the "Lines" field — without it a cleared
|
||||
// number input becomes null, `tail` is dropped, and Docker re-downloads the
|
||||
// entire history of a chatty container on every parameter change.
|
||||
const DEFAULT_LINE_COUNT = 100;
|
||||
|
||||
angular.module('portainer.docker').controller('ContainerLogsController', [
|
||||
'$scope',
|
||||
'$transition$',
|
||||
'$interval',
|
||||
'$timeout',
|
||||
'ContainerService',
|
||||
'Notifications',
|
||||
'HttpRequestHelper',
|
||||
'endpoint',
|
||||
function ($scope, $transition$, $interval, ContainerService, Notifications, HttpRequestHelper, endpoint) {
|
||||
function ($scope, $transition$, $timeout, ContainerService, Notifications, HttpRequestHelper, endpoint) {
|
||||
$scope.state = {
|
||||
refreshRate: 3,
|
||||
lineCount: 100,
|
||||
// No refreshRate here: container logs are delivered over a live stream now,
|
||||
// not a 3s poll, so there is nothing to refresh on an interval.
|
||||
lineCount: DEFAULT_LINE_COUNT,
|
||||
sinceTimestamp: '',
|
||||
displayTimestamps: false,
|
||||
};
|
||||
|
||||
// Live-stream bookkeeping (not on $scope.state to avoid digest churn).
|
||||
var stream = {
|
||||
abortController: null,
|
||||
reconnectTimer: null,
|
||||
// RFC3339 timestamp of the last log line we received. Used as `since` on
|
||||
// reconnect so we resume from the exact log position (not the client
|
||||
// wall-clock) and neither duplicate nor lose lines across a reconnect.
|
||||
lastTimestamp: '',
|
||||
// Exact content of the line(s) at `lastTimestamp` we already rendered.
|
||||
// Handed to the reconnecting processor so it drops only genuine duplicates
|
||||
// Docker re-delivers, never a new line that shares the boundary nanosecond.
|
||||
boundaryLines: [],
|
||||
// The processor of the in-flight stream, so an intentional pause can flush
|
||||
// its buffered partial last line for display.
|
||||
processor: null,
|
||||
// How many cosmetic lines an intentional-pause flush appended; removed on
|
||||
// resume before Docker re-delivers them in full (see pauseStream/startStream).
|
||||
pausedFlushCount: 0,
|
||||
// false while the stream is intentionally paused (Live toggle off) or the
|
||||
// view is being destroyed — suppresses auto-reconnect.
|
||||
active: false,
|
||||
skipHeaders: false,
|
||||
// Whether we already surfaced the current reconnect-loop error, so the 3s
|
||||
// reconnect loop does not spam a notification on every attempt.
|
||||
errorNotified: false,
|
||||
};
|
||||
|
||||
// Live toggle (the "Auto-refresh logs" switch in the viewer).
|
||||
$scope.changeLogCollection = function (logCollectionStatus) {
|
||||
if (!logCollectionStatus) {
|
||||
stopRepeater();
|
||||
pauseStream(true);
|
||||
} else {
|
||||
setUpdateRepeater(!$scope.container.Config.Tty);
|
||||
// Resume without wiping the buffer (pause promises to keep it) and
|
||||
// continue from the last timestamp we saw.
|
||||
startStream(false);
|
||||
}
|
||||
};
|
||||
|
||||
$scope.$on('$destroy', function () {
|
||||
stopRepeater();
|
||||
stopStream();
|
||||
});
|
||||
|
||||
function stopRepeater() {
|
||||
var repeater = $scope.repeater;
|
||||
if (angular.isDefined(repeater)) {
|
||||
$interval.cancel(repeater);
|
||||
function clearReconnectTimer() {
|
||||
if (stream.reconnectTimer) {
|
||||
$timeout.cancel(stream.reconnectTimer);
|
||||
stream.reconnectTimer = null;
|
||||
}
|
||||
}
|
||||
|
||||
function setUpdateRepeater(skipHeaders) {
|
||||
var refreshRate = $scope.state.refreshRate;
|
||||
$scope.repeater = $interval(function () {
|
||||
ContainerService.logs(
|
||||
endpoint.Id,
|
||||
$transition$.params().id,
|
||||
1,
|
||||
1,
|
||||
$scope.state.displayTimestamps ? 1 : 0,
|
||||
moment($scope.state.sinceTimestamp).unix(),
|
||||
$scope.state.lineCount,
|
||||
skipHeaders
|
||||
)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
Notifications.error('Failure', err, 'Unable to retrieve container logs');
|
||||
});
|
||||
}, refreshRate * 1000);
|
||||
function abortInFlight() {
|
||||
if (stream.abortController) {
|
||||
stream.abortController.abort();
|
||||
stream.abortController = null;
|
||||
}
|
||||
}
|
||||
|
||||
function startLogPolling(skipHeaders) {
|
||||
ContainerService.logs(
|
||||
// Pause: stop streaming but keep the current buffer on screen.
|
||||
//
|
||||
// `flushPartial` is set only on an intentional pause (Live -> off): with no
|
||||
// reconnect to re-deliver it, the buffered unfinished last line would stay
|
||||
// hidden until resume, so we flush it for display. We do NOT advance the
|
||||
// resume point past it (it is incomplete); on resume Docker re-delivers that
|
||||
// line in full from `since`, and startStream() first strips these cosmetic
|
||||
// flushed lines so the buffer never shows a stale partial twin. On reconnect
|
||||
// (flushPartial omitted) the partial is correctly discarded, unchanged.
|
||||
function pauseStream(flushPartial) {
|
||||
stream.active = false;
|
||||
clearReconnectTimer();
|
||||
abortInFlight();
|
||||
if (flushPartial && stream.processor) {
|
||||
const tail = stream.processor.flush();
|
||||
if (tail.length) {
|
||||
appendLines(tail);
|
||||
stream.pausedFlushCount += tail.length;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the last `count` lines from the buffer (the cosmetic lines an
|
||||
// intentional-pause flush appended), about to be re-delivered in full.
|
||||
function removeTailLines(count) {
|
||||
if (count <= 0) {
|
||||
return;
|
||||
}
|
||||
$scope.$applyAsync(function () {
|
||||
const start = Math.max(0, $scope.logs.length - count);
|
||||
$scope.logs.splice(start, count);
|
||||
});
|
||||
}
|
||||
|
||||
// Full teardown on view destroy: no flush (the view is going away).
|
||||
function stopStream() {
|
||||
pauseStream(false);
|
||||
}
|
||||
|
||||
function appendLines(lines) {
|
||||
if (!lines.length) {
|
||||
return;
|
||||
}
|
||||
$scope.$applyAsync(function () {
|
||||
Array.prototype.push.apply($scope.logs, lines);
|
||||
const overflow = $scope.logs.length - MAX_LOG_LINES;
|
||||
if (overflow > 0) {
|
||||
// trim oldest lines; tail (and any selection in it) is untouched
|
||||
$scope.logs.splice(0, overflow);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// After feeding the processor, advance the reconnect resume point to the
|
||||
// timestamp of the last line it parsed.
|
||||
function updateResumePoint(processor) {
|
||||
const ts = processor.getLastTimestamp();
|
||||
if (ts) {
|
||||
stream.lastTimestamp = ts;
|
||||
// Remember the exact boundary line(s) at this timestamp for content-exact
|
||||
// reconnect dedup (so a new line sharing the nanosecond is not dropped).
|
||||
stream.boundaryLines = processor.getBoundaryLines();
|
||||
}
|
||||
}
|
||||
|
||||
// `since` is built from the last log timestamp via rfc3339ToUnixNanoSince
|
||||
// (imported from logHelper) as a single "<seconds>.<nanos>" form — see that
|
||||
// helper for why we never mix unix-seconds and RFC3339.
|
||||
|
||||
// Connect (or reconnect) the live stream.
|
||||
// `resetBuffer` clears the on-screen buffer (used on first connect / param
|
||||
// changes); reconnects after a drop keep the buffer and resume via `since`.
|
||||
function startStream(resetBuffer) {
|
||||
pauseStream(false);
|
||||
stream.active = true;
|
||||
|
||||
if (resetBuffer) {
|
||||
$scope.logs.length = 0;
|
||||
stream.lastTimestamp = '';
|
||||
stream.boundaryLines = [];
|
||||
stream.pausedFlushCount = 0;
|
||||
stream.errorNotified = false;
|
||||
} else if (stream.pausedFlushCount) {
|
||||
// Resuming after an intentional pause: drop the cosmetic partial line(s)
|
||||
// we flushed for display; Docker re-delivers them in full from `since`.
|
||||
removeTailLines(stream.pausedFlushCount);
|
||||
stream.pausedFlushCount = 0;
|
||||
}
|
||||
|
||||
const resuming = !!stream.lastTimestamp;
|
||||
|
||||
const processor = createLogStreamProcessor({
|
||||
stripHeaders: stream.skipHeaders,
|
||||
withTimestamps: $scope.state.displayTimestamps,
|
||||
// We always request timestamps from Docker (see params below) so we can
|
||||
// resume from the exact log timestamp on reconnect; the processor parses
|
||||
// them and strips the prefix when the user has timestamps hidden.
|
||||
streamHasTimestamps: true,
|
||||
// On reconnect, drop lines Docker re-delivers at/before the resume point
|
||||
// (its `since` filter is inclusive); content-exact match on the boundary
|
||||
// line(s) so a new line sharing the boundary timestamp is not lost.
|
||||
skipUntilTimestamp: resuming ? stream.lastTimestamp : undefined,
|
||||
skipBoundaryContents: resuming ? stream.boundaryLines : undefined,
|
||||
});
|
||||
stream.processor = processor;
|
||||
|
||||
const abortController = new AbortController();
|
||||
stream.abortController = abortController;
|
||||
|
||||
// On a reconnect resume from the exact timestamp of the last line we saw;
|
||||
// on the initial connect honour the user's "Fetch since" selection. Both
|
||||
// are sent as a single "<unix>.<nanos>" form (see rfc3339ToUnixNanoSince).
|
||||
let since;
|
||||
if (stream.lastTimestamp) {
|
||||
since = rfc3339ToUnixNanoSince(stream.lastTimestamp);
|
||||
} else if ($scope.state.sinceTimestamp) {
|
||||
since = `${moment($scope.state.sinceTimestamp).unix()}.000000000`;
|
||||
} else {
|
||||
since = undefined; // omitted by the service -> Docker tails from now
|
||||
}
|
||||
|
||||
// Substitute the default when the "Lines" field is cleared/invalid (a
|
||||
// cleared number input is null), otherwise tail would be dropped and Docker
|
||||
// would re-download the whole history.
|
||||
const tailLineCount = $scope.state.lineCount > 0 ? $scope.state.lineCount : DEFAULT_LINE_COUNT;
|
||||
|
||||
const params = {
|
||||
stdout: true,
|
||||
stderr: true,
|
||||
follow: true,
|
||||
// Always request timestamps so we can resume precisely on reconnect; the
|
||||
// processor hides them from display when the user toggle is off.
|
||||
timestamps: true,
|
||||
// tail is a historical-backfill request: apply it only on the initial
|
||||
// connect. On a reconnect we resume from `since`, so re-applying tail
|
||||
// would re-deliver the tail window. (0 is dropped by the service.)
|
||||
tail: resuming ? 0 : tailLineCount,
|
||||
since,
|
||||
};
|
||||
|
||||
streamContainerLogs(
|
||||
endpoint.Id,
|
||||
$transition$.params().id,
|
||||
1,
|
||||
1,
|
||||
$scope.state.displayTimestamps ? 1 : 0,
|
||||
moment($scope.state.sinceTimestamp).unix(),
|
||||
$scope.state.lineCount,
|
||||
skipHeaders
|
||||
params,
|
||||
function onChunk(bytes) {
|
||||
const lines = processor.push(bytes);
|
||||
appendLines(lines);
|
||||
updateResumePoint(processor);
|
||||
if (lines.length) {
|
||||
// got data again -> allow a fresh error notification next failure
|
||||
stream.errorNotified = false;
|
||||
}
|
||||
},
|
||||
abortController.signal
|
||||
)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
setUpdateRepeater(skipHeaders);
|
||||
.then(function onEnd() {
|
||||
// The stream always reconnects from here, so discard the processor's
|
||||
// unfinished trailing remainder instead of flushing it: emitting a
|
||||
// truncated fragment AND advancing the resume point to its timestamp
|
||||
// would make Docker re-send the full line under the same `since` and
|
||||
// dedup drop it — losing the line and leaving the fragment on screen.
|
||||
// Completed lines already advanced the resume point in onChunk; `since`
|
||||
// redelivers the full boundary line on reconnect.
|
||||
scheduleReconnect();
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
Notifications.error('Failure', err, 'Unable to retrieve container logs');
|
||||
.catch(function onError(err) {
|
||||
if (abortController.signal.aborted) {
|
||||
return; // intentional abort (pause/destroy/param change)
|
||||
}
|
||||
// Drop the unfinished remainder on reconnect (see onEnd): do not flush
|
||||
// a truncated line nor move the resume point off an incomplete line.
|
||||
// Notify once per reconnect loop, not on every 3s retry.
|
||||
if (!stream.errorNotified) {
|
||||
stream.errorNotified = true;
|
||||
Notifications.error('Failure', err, 'Unable to stream container logs');
|
||||
}
|
||||
scheduleReconnect();
|
||||
});
|
||||
}
|
||||
|
||||
function scheduleReconnect() {
|
||||
if (!stream.active) {
|
||||
return;
|
||||
}
|
||||
clearReconnectTimer();
|
||||
stream.reconnectTimer = $timeout(function () {
|
||||
if (stream.active) {
|
||||
startStream(false);
|
||||
}
|
||||
}, RECONNECT_DELAY_MS);
|
||||
}
|
||||
|
||||
// Restart the stream from scratch when a fetch parameter changes (tail
|
||||
// count, since, timestamps). Each replaces the buffer with a fresh request.
|
||||
function restartOnParamChange() {
|
||||
if (stream.active || stream.abortController) {
|
||||
startStream(true);
|
||||
}
|
||||
}
|
||||
$scope.$watch('state.lineCount', onParamWatch);
|
||||
$scope.$watch('state.sinceTimestamp', onParamWatch);
|
||||
$scope.$watch('state.displayTimestamps', onParamWatch);
|
||||
function onParamWatch(newVal, oldVal) {
|
||||
if (newVal !== oldVal) {
|
||||
restartOnParamChange();
|
||||
}
|
||||
}
|
||||
|
||||
function initView() {
|
||||
HttpRequestHelper.setPortainerAgentTargetHeader($transition$.params().nodeName);
|
||||
ContainerService.container(endpoint.Id, $transition$.params().id)
|
||||
@@ -90,7 +309,11 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
|
||||
$scope.logsEnabled = logsEnabled;
|
||||
|
||||
if (logsEnabled) {
|
||||
startLogPolling(!container.Config.Tty);
|
||||
// initialise an (empty but defined) buffer so the viewer renders
|
||||
// immediately, then start the live stream
|
||||
$scope.logs = [];
|
||||
stream.skipHeaders = !container.Config.Tty;
|
||||
startStream(true);
|
||||
}
|
||||
})
|
||||
.catch(function error(err) {
|
||||
|
||||
@@ -38,7 +38,17 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
|
||||
$scope.repeater = $interval(function () {
|
||||
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// NOTE: service logs still poll and replace the whole array. Because
|
||||
// formatLogs assigns fresh line ids per poll, `track by log.id` in
|
||||
// the viewer re-renders every row each poll (a live text selection
|
||||
// can collapse). The append-only live stream that fixes this exists
|
||||
// only for container logs (issue #2); converting service/task logs
|
||||
// to a live stream is out of scope here. Assign positionally-stable
|
||||
// ids (0..N) so `track by log.id` reuses rows across polls (like the
|
||||
// old `track by $index`) and a live text selection survives.
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
@@ -50,7 +60,11 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
|
||||
function startLogPolling() {
|
||||
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// Positionally-stable ids so `track by log.id` reuses rows (see the
|
||||
// poll handler above).
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
setUpdateRepeater();
|
||||
})
|
||||
.catch(function error(err) {
|
||||
|
||||
@@ -39,7 +39,17 @@ angular.module('portainer.docker').controller('TaskLogsController', [
|
||||
$scope.repeater = $interval(function () {
|
||||
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// NOTE: task logs still poll and replace the whole array. Because
|
||||
// formatLogs assigns fresh line ids per poll, `track by log.id` in
|
||||
// the viewer re-renders every row each poll (a live text selection
|
||||
// can collapse). The append-only live stream that fixes this exists
|
||||
// only for container logs (issue #2); converting service/task logs
|
||||
// to a live stream is out of scope here. Assign positionally-stable
|
||||
// ids (0..N) so `track by log.id` reuses rows across polls (like the
|
||||
// old `track by $index`) and a live text selection survives.
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
@@ -51,7 +61,11 @@ angular.module('portainer.docker').controller('TaskLogsController', [
|
||||
function startLogPolling() {
|
||||
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// Positionally-stable ids so `track by log.id` reuses rows (see the
|
||||
// poll handler above).
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
setUpdateRepeater();
|
||||
})
|
||||
.catch(function error(err) {
|
||||
|
||||
@@ -1,8 +1,17 @@
|
||||
import _ from 'lodash';
|
||||
import { InternalAxiosRequestConfig } from 'axios';
|
||||
|
||||
import { EnvironmentId } from '@/react/portainer/environments/types';
|
||||
import PortainerError from '@/portainer/error';
|
||||
import axios, { parseAxiosError } from '@/portainer/services/axios/axios';
|
||||
import axios, {
|
||||
agentTargetHeader,
|
||||
parseAxiosError,
|
||||
} from '@/portainer/services/axios/axios';
|
||||
import {
|
||||
portainerAgentManagerOperation,
|
||||
portainerAgentTargetHeader,
|
||||
} from '@/portainer/services/http-request.helper';
|
||||
import { dockerMaxAPIVersionInterceptor } from '@/portainer/services/dockerMaxApiVersionInterceptor';
|
||||
|
||||
import { withAgentTargetHeader } from '../proxy/queries/utils';
|
||||
import { buildDockerProxyUrl } from '../proxy/queries/buildDockerProxyUrl';
|
||||
@@ -190,3 +199,124 @@ export async function getContainerLogs(
|
||||
throw parseAxiosError(e, 'Unable to get container logs');
|
||||
}
|
||||
}
|
||||
|
||||
export type StreamLogsParams = ContainerLogsParams & {
|
||||
/** follow=1 — keep the connection open and stream new lines as they arrive */
|
||||
follow?: boolean;
|
||||
};
|
||||
|
||||
// Memoize the Docker max-API-version pinning per proxy path for the session. The
|
||||
// pinning interceptor only rewrites the URL based on the environment's Docker API
|
||||
// version (cached and stable for the session), so resolving it once avoids an
|
||||
// extra `/version` round-trip on every reconnect. Caching the Promise also
|
||||
// dedupes concurrent reconnect attempts.
|
||||
const pinnedLogPathCache = new Map<string, Promise<string>>();
|
||||
|
||||
function resolvePinnedLogPath(path: string): Promise<string> {
|
||||
let cached = pinnedLogPathCache.get(path);
|
||||
if (!cached) {
|
||||
cached = dockerMaxAPIVersionInterceptor({
|
||||
url: path,
|
||||
} as InternalAxiosRequestConfig).then((config) => config.url ?? path);
|
||||
pinnedLogPathCache.set(path, cached);
|
||||
}
|
||||
return cached;
|
||||
}
|
||||
|
||||
/**
|
||||
* Live-tail a container's logs over HTTP.
|
||||
*
|
||||
* Unlike `getContainerLogs` (axios, buffers the whole body), this uses `fetch`
|
||||
* so we can read the response body as a `ReadableStream` and surface the raw
|
||||
* byte chunks to the caller as they arrive (`follow=1`). Bytes are handed over
|
||||
* undecoded so the caller can demux Docker's binary multiplexed frames at the
|
||||
* byte level (UTF-8-decoding the whole stream would corrupt frame headers). The
|
||||
* backend already
|
||||
* streams `follow=1` transparently through the Docker proxy — including for
|
||||
* Agent/Edge environments — so no backend change is needed.
|
||||
*
|
||||
* Auth: the API uses an httpOnly JWT cookie (SameSite=Strict), so a same-origin
|
||||
* `fetch` with `credentials: 'include'` carries it automatically — no
|
||||
* `Authorization` header. CSRF middleware only guards mutating methods; logs is
|
||||
* a GET, so it is unaffected. The agent-target / manager-operation headers
|
||||
* (normally added by the axios `agentInterceptor`) are replicated here so the
|
||||
* stream also resolves the correct node on Agent/Edge environments.
|
||||
*
|
||||
* The caller drives lifetime via `signal`: aborting it (unmount, container
|
||||
* switch, pausing Live) cancels the in-flight fetch and ends the read loop.
|
||||
*/
|
||||
export async function streamContainerLogs(
|
||||
environmentId: EnvironmentId,
|
||||
containerId: ContainerId,
|
||||
params: StreamLogsParams,
|
||||
onChunk: (bytes: Uint8Array) => void,
|
||||
signal: AbortSignal
|
||||
): Promise<void> {
|
||||
const path = buildDockerProxyUrl(
|
||||
environmentId,
|
||||
'containers',
|
||||
containerId,
|
||||
'logs'
|
||||
);
|
||||
|
||||
// The fetch path bypasses the axios request interceptors, so apply the same
|
||||
// Docker max-API-version pinning axios applies to getContainerLogs. Resolve it
|
||||
// once per session (memoized below) instead of hitting `/version` on every 3s
|
||||
// reconnect — the pinning only depends on the environment's Docker API version,
|
||||
// which is stable for the session.
|
||||
const effectivePath = await resolvePinnedLogPath(path);
|
||||
|
||||
const query = new URLSearchParams();
|
||||
// _.pickBy drops undefined/0/'' the same way the axios path does
|
||||
Object.entries(_.pickBy(params)).forEach(([key, value]) => {
|
||||
query.set(key, String(typeof value === 'boolean' ? Number(value) : value));
|
||||
});
|
||||
|
||||
// axios baseURL is 'api'; mirror it here since fetch doesn't share it.
|
||||
const url = `api${effectivePath}?${query.toString()}`;
|
||||
|
||||
const headers: Record<string, string> = {};
|
||||
const target = portainerAgentTargetHeader();
|
||||
if (target) {
|
||||
headers[agentTargetHeader] = target;
|
||||
}
|
||||
if (portainerAgentManagerOperation()) {
|
||||
headers['X-PortainerAgent-ManagerOperation'] = '1';
|
||||
}
|
||||
|
||||
const response = await fetch(url, {
|
||||
signal,
|
||||
credentials: 'include',
|
||||
headers,
|
||||
});
|
||||
|
||||
if (!response.ok) {
|
||||
throw new PortainerError(
|
||||
`Unable to stream container logs (HTTP ${response.status})`
|
||||
);
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
return;
|
||||
}
|
||||
|
||||
const reader = response.body.getReader();
|
||||
|
||||
try {
|
||||
for (;;) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
if (value) {
|
||||
// Hand over the raw bytes undecoded; the caller demuxes Docker's binary
|
||||
// frames and decodes complete lines itself. UTF-8 chars split across
|
||||
// chunks are reassembled there before decoding, so no decoder flush is
|
||||
// needed here. The caller flushes its own trailing partial line on end.
|
||||
onChunk(value);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
reader.releaseLock();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,6 +54,9 @@ export type ContainerLogsParams = {
|
||||
stdout?: boolean;
|
||||
stderr?: boolean;
|
||||
timestamps?: boolean;
|
||||
since?: number;
|
||||
// Unix timestamp. The live-stream path sends a "<seconds>.<nanos>" string so a
|
||||
// reconnect can resume at exact nanosecond precision (which a number cannot
|
||||
// hold); the buffered axios path passes a plain number.
|
||||
since?: number | string;
|
||||
tail?: number;
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user