fix(logs): drop partial line on reconnect, stabilize poll row ids, cover CRLF/dedup (F1-F6)
F1: stop emitting/committing an unfinished line in onEnd/onError reconnect
paths; since-based reconnect redelivers the full line.
F2: give service/task poll rows positionally-stable ids so track by log.id
reuses DOM rows and text selection survives the 3s poll.
F3/F4: tests for CRLF stripping and reconnect-dedup across separate chunks.
F5: correct the stale refreshRate comment.
F6: unroll the side-effecting IIFE-in-ternary into if/else.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -112,6 +112,13 @@ describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
|
||||
expect(proc.push(full.subarray(0, 10)).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual([]);
|
||||
});
|
||||
|
||||
// F3: a framed line terminated by \r\n must have its trailing CR stripped.
|
||||
it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: true });
|
||||
const lines = proc.push(frame('with-crlf\r\n'));
|
||||
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('createLogStreamProcessor (TTY, no headers)', () => {
|
||||
@@ -128,6 +135,22 @@ describe('createLogStreamProcessor (TTY, no headers)', () => {
|
||||
expect(proc.push(bytes('tail')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual(['tail']);
|
||||
});
|
||||
|
||||
// F3: a \r\n line ending must have its trailing CR stripped (non-TTY path).
|
||||
it('strips the trailing CR from a \\r\\n line ending', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
const lines = proc.push(bytes('with-crlf\r\n'));
|
||||
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
|
||||
});
|
||||
|
||||
// F3: a remainder carrying a trailing CR (no terminating LF) must have the CR
|
||||
// stripped on flush. Split across two chunks to exercise the buffered path.
|
||||
it('strips a trailing CR on flush when the stream ends without a newline', () => {
|
||||
const proc = createLogStreamProcessor({ stripHeaders: false });
|
||||
expect(proc.push(bytes('hel')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.push(bytes('lo\r')).map((l) => l.line)).toEqual([]);
|
||||
expect(proc.flush().map((l) => l.line)).toEqual(['hello']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('timestamps (always requested, optionally displayed)', () => {
|
||||
@@ -166,6 +189,36 @@ describe('timestamps (always requested, optionally displayed)', () => {
|
||||
const lines = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
|
||||
expect(lines.map((l) => l.line)).toEqual(['new']);
|
||||
});
|
||||
|
||||
// F4: the real reconnect shape — the redelivered boundary line arrives in its
|
||||
// OWN chunk (the whole batch is dropped, dropTo === lines.length), so the
|
||||
// `skipping` flag must stay on and the NEXT chunk must keep dropping dups up
|
||||
// to the resume point before emitting the first genuinely new line.
|
||||
it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
|
||||
const tsA = '2024-01-01T00:00:00.000000001Z';
|
||||
const tsB = '2024-01-01T00:00:00.000000002Z';
|
||||
const tsC = '2024-01-01T00:00:00.000000003Z';
|
||||
const tsD = '2024-01-01T00:00:00.000000004Z';
|
||||
const proc = createLogStreamProcessor({
|
||||
stripHeaders: false,
|
||||
withTimestamps: false,
|
||||
streamHasTimestamps: true,
|
||||
skipUntilTimestamp: tsB,
|
||||
});
|
||||
|
||||
// (1) only a redelivered boundary line (<= resume point) -> fully consumed
|
||||
// by dedup; nothing emitted and skipping stays on.
|
||||
expect(proc.push(bytes(`${tsA} dup-1\n`)).map((l) => l.line)).toEqual([]);
|
||||
|
||||
// (2) another dup + the first new line: because skipping was preserved the
|
||||
// dup is still dropped, and only the new line is returned.
|
||||
expect(
|
||||
proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`)).map((l) => l.line)
|
||||
).toEqual(['new']);
|
||||
|
||||
// (3) a later line passes through untouched (no skipping anymore).
|
||||
expect(proc.push(bytes(`${tsD} after\n`)).map((l) => l.line)).toEqual(['after']);
|
||||
});
|
||||
});
|
||||
|
||||
describe('stable line ids (append model)', () => {
|
||||
|
||||
@@ -228,12 +228,13 @@ export function createLogStreamProcessor({
|
||||
if (!value || value.length === 0) {
|
||||
return [];
|
||||
}
|
||||
const rawLines = stripHeaders
|
||||
? (() => {
|
||||
frameBuf = concatBytes(frameBuf, value);
|
||||
return demuxFrames();
|
||||
})()
|
||||
: takeCompleteLines(value);
|
||||
let rawLines: string[];
|
||||
if (stripHeaders) {
|
||||
frameBuf = concatBytes(frameBuf, value);
|
||||
rawLines = demuxFrames();
|
||||
} else {
|
||||
rawLines = takeCompleteLines(value);
|
||||
}
|
||||
return formatBatch(rawLines);
|
||||
},
|
||||
|
||||
|
||||
@@ -22,8 +22,8 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
|
||||
'endpoint',
|
||||
function ($scope, $transition$, $timeout, ContainerService, Notifications, HttpRequestHelper, endpoint) {
|
||||
$scope.state = {
|
||||
// refreshRate kept for backwards compat with any external references; the
|
||||
// transport is now a live stream, not a poll, so it is unused.
|
||||
// No refreshRate here: container logs are delivered over a live stream now,
|
||||
// not a 3s poll, so there is nothing to refresh on an interval.
|
||||
lineCount: 100,
|
||||
sinceTimestamp: '',
|
||||
displayTimestamps: false,
|
||||
@@ -175,18 +175,21 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
|
||||
abortController.signal
|
||||
)
|
||||
.then(function onEnd() {
|
||||
appendLines(processor.flush());
|
||||
updateResumePoint(processor);
|
||||
// The stream always reconnects from here, so discard the processor's
|
||||
// unfinished trailing remainder instead of flushing it: emitting a
|
||||
// truncated fragment AND advancing the resume point to its timestamp
|
||||
// would make Docker re-send the full line under the same `since` and
|
||||
// dedup drop it — losing the line and leaving the fragment on screen.
|
||||
// Completed lines already advanced the resume point in onChunk; `since`
|
||||
// redelivers the full boundary line on reconnect.
|
||||
scheduleReconnect();
|
||||
})
|
||||
.catch(function onError(err) {
|
||||
if (abortController.signal.aborted) {
|
||||
return; // intentional abort (pause/destroy/param change)
|
||||
}
|
||||
// Flush the trailing partial line before reconnecting (parity with
|
||||
// onEnd); a truncated final frame is dropped by the processor.
|
||||
appendLines(processor.flush());
|
||||
updateResumePoint(processor);
|
||||
// Drop the unfinished remainder on reconnect (see onEnd): do not flush
|
||||
// a truncated line nor move the resume point off an incomplete line.
|
||||
// Notify once per reconnect loop, not on every 3s retry.
|
||||
if (!stream.errorNotified) {
|
||||
stream.errorNotified = true;
|
||||
|
||||
@@ -43,8 +43,12 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
|
||||
// the viewer re-renders every row each poll (a live text selection
|
||||
// can collapse). The append-only live stream that fixes this exists
|
||||
// only for container logs (issue #2); converting service/task logs
|
||||
// to a live stream is out of scope here.
|
||||
$scope.logs = data;
|
||||
// to a live stream is out of scope here. Assign positionally-stable
|
||||
// ids (0..N) so `track by log.id` reuses rows across polls (like the
|
||||
// old `track by $index`) and a live text selection survives.
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
@@ -56,7 +60,11 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
|
||||
function startLogPolling() {
|
||||
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// Positionally-stable ids so `track by log.id` reuses rows (see the
|
||||
// poll handler above).
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
setUpdateRepeater();
|
||||
})
|
||||
.catch(function error(err) {
|
||||
|
||||
@@ -44,8 +44,12 @@ angular.module('portainer.docker').controller('TaskLogsController', [
|
||||
// the viewer re-renders every row each poll (a live text selection
|
||||
// can collapse). The append-only live stream that fixes this exists
|
||||
// only for container logs (issue #2); converting service/task logs
|
||||
// to a live stream is out of scope here.
|
||||
$scope.logs = data;
|
||||
// to a live stream is out of scope here. Assign positionally-stable
|
||||
// ids (0..N) so `track by log.id` reuses rows across polls (like the
|
||||
// old `track by $index`) and a live text selection survives.
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
})
|
||||
.catch(function error(err) {
|
||||
stopRepeater();
|
||||
@@ -57,7 +61,11 @@ angular.module('portainer.docker').controller('TaskLogsController', [
|
||||
function startLogPolling() {
|
||||
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
|
||||
.then(function success(data) {
|
||||
$scope.logs = data;
|
||||
// Positionally-stable ids so `track by log.id` reuses rows (see the
|
||||
// poll handler above).
|
||||
$scope.logs = data.map(function (line, i) {
|
||||
return { ...line, id: i };
|
||||
});
|
||||
setUpdateRepeater();
|
||||
})
|
||||
.catch(function error(err) {
|
||||
|
||||
Reference in New Issue
Block a user