fix(logs): drop partial line on reconnect, stabilize poll row ids, cover CRLF/dedup (F1-F6)

F1: stop emitting/committing an unfinished line in onEnd/onError reconnect
    paths; since-based reconnect redelivers the full line.
F2: give service/task poll rows positionally-stable ids so track by log.id
    reuses DOM rows and text selection survives the 3s poll.
F3/F4: tests for CRLF stripping and reconnect-dedup across separate chunks.
F5: correct the stale refreshRate comment.
F6: unroll the side-effecting IIFE-in-ternary into if/else.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
claude code agent
2026-06-29 14:08:46 +03:00
parent 960d43e70b
commit f4f296fc05
5 changed files with 93 additions and 20 deletions

View File

@@ -112,6 +112,13 @@ describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
expect(proc.push(full.subarray(0, 10)).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual([]);
});
// F3: a framed line terminated by \r\n must have its trailing CR stripped.
it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(frame('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
});
describe('createLogStreamProcessor (TTY, no headers)', () => {
@@ -128,6 +135,22 @@ describe('createLogStreamProcessor (TTY, no headers)', () => {
expect(proc.push(bytes('tail')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['tail']);
});
// F3: a \r\n line ending must have its trailing CR stripped (non-TTY path).
it('strips the trailing CR from a \\r\\n line ending', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const lines = proc.push(bytes('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
// F3: a remainder carrying a trailing CR (no terminating LF) must have the CR
// stripped on flush. Split across two chunks to exercise the buffered path.
it('strips a trailing CR on flush when the stream ends without a newline', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
expect(proc.push(bytes('hel')).map((l) => l.line)).toEqual([]);
expect(proc.push(bytes('lo\r')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['hello']);
});
});
describe('timestamps (always requested, optionally displayed)', () => {
@@ -166,6 +189,36 @@ describe('timestamps (always requested, optionally displayed)', () => {
const lines = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
expect(lines.map((l) => l.line)).toEqual(['new']);
});
// F4: the real reconnect shape — the redelivered boundary line arrives in its
// OWN chunk (the whole batch is dropped, dropTo === lines.length), so the
// `skipping` flag must stay on and the NEXT chunk must keep dropping dups up
// to the resume point before emitting the first genuinely new line.
it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
const tsA = '2024-01-01T00:00:00.000000001Z';
const tsB = '2024-01-01T00:00:00.000000002Z';
const tsC = '2024-01-01T00:00:00.000000003Z';
const tsD = '2024-01-01T00:00:00.000000004Z';
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: tsB,
});
// (1) only a redelivered boundary line (<= resume point) -> fully consumed
// by dedup; nothing emitted and skipping stays on.
expect(proc.push(bytes(`${tsA} dup-1\n`)).map((l) => l.line)).toEqual([]);
// (2) another dup + the first new line: because skipping was preserved the
// dup is still dropped, and only the new line is returned.
expect(
proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`)).map((l) => l.line)
).toEqual(['new']);
// (3) a later line passes through untouched (no skipping anymore).
expect(proc.push(bytes(`${tsD} after\n`)).map((l) => l.line)).toEqual(['after']);
});
});
describe('stable line ids (append model)', () => {

View File

@@ -228,12 +228,13 @@ export function createLogStreamProcessor({
if (!value || value.length === 0) {
return [];
}
const rawLines = stripHeaders
? (() => {
frameBuf = concatBytes(frameBuf, value);
return demuxFrames();
})()
: takeCompleteLines(value);
let rawLines: string[];
if (stripHeaders) {
frameBuf = concatBytes(frameBuf, value);
rawLines = demuxFrames();
} else {
rawLines = takeCompleteLines(value);
}
return formatBatch(rawLines);
},

View File

@@ -22,8 +22,8 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
'endpoint',
function ($scope, $transition$, $timeout, ContainerService, Notifications, HttpRequestHelper, endpoint) {
$scope.state = {
// refreshRate kept for backwards compat with any external references; the
// transport is now a live stream, not a poll, so it is unused.
// No refreshRate here: container logs are delivered over a live stream now,
// not a 3s poll, so there is nothing to refresh on an interval.
lineCount: 100,
sinceTimestamp: '',
displayTimestamps: false,
@@ -175,18 +175,21 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
abortController.signal
)
.then(function onEnd() {
appendLines(processor.flush());
updateResumePoint(processor);
// The stream always reconnects from here, so discard the processor's
// unfinished trailing remainder instead of flushing it: emitting a
// truncated fragment AND advancing the resume point to its timestamp
// would make Docker re-send the full line under the same `since` and
// dedup drop it — losing the line and leaving the fragment on screen.
// Completed lines already advanced the resume point in onChunk; `since`
// redelivers the full boundary line on reconnect.
scheduleReconnect();
})
.catch(function onError(err) {
if (abortController.signal.aborted) {
return; // intentional abort (pause/destroy/param change)
}
// Flush the trailing partial line before reconnecting (parity with
// onEnd); a truncated final frame is dropped by the processor.
appendLines(processor.flush());
updateResumePoint(processor);
// Drop the unfinished remainder on reconnect (see onEnd): do not flush
// a truncated line nor move the resume point off an incomplete line.
// Notify once per reconnect loop, not on every 3s retry.
if (!stream.errorNotified) {
stream.errorNotified = true;

View File

@@ -43,8 +43,12 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here.
$scope.logs = data;
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -56,7 +60,11 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
function startLogPolling() {
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {

View File

@@ -44,8 +44,12 @@ angular.module('portainer.docker').controller('TaskLogsController', [
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here.
$scope.logs = data;
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -57,7 +61,11 @@ angular.module('portainer.docker').controller('TaskLogsController', [
function startLogPolling() {
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {