Compare commits

..

11 Commits

Author SHA1 Message Date
claude code agent
7257ae52d8 test(logs): cover the docker proxy stream/flush loop (F1)
Extract the manual stream-and-flush loop from dockerLocalProxy.ServeHTTP
into a behaviour-preserving package-private streamResponse(w, body) helper,
and add docker_test.go regression tests for the riskiest path (it runs on
every Docker API response):

- DeliversFullBodyAndFlushesPerChunk: a >32KB body delivered as several
  chunks (boundaries not aligned to the 32KB buffer), with the final Read
  returning (n>0, io.EOF) simultaneously, asserts the streamed body equals
  the input exactly (no loss/duplication) and that Flush ran more than once
  (the per-chunk flush is the whole point of the change).
- StopsOnWriteErrorWithoutPanic: a writer that errors on first Write (and
  does not implement http.Flusher, exercising the nil-flusher fallback)
  breaks the loop after one write without panicking.

No production behaviour change — the loop body is identical, only moved.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 02:58:48 +03:00
claude code agent
637e96f236 fix(logs): flush docker proxy stream per chunk; trim log-viewer settings UI
Backend (the "logs arrive every ~5s / pipe clogged" bug):
- dockerLocalProxy.ServeHTTP streamed the docker socket response via
  io.Copy, which buffers ~2KB into the ResponseWriter and only flushes
  when full or on handler return. Low-throughput streaming endpoints
  (container logs follow=1, events, stats, attach) therefore arrived in
  multi-second batches. Stream manually and Flush() after each chunk so
  they are delivered live. Behaviour is otherwise identical to io.Copy
  (full-write contract, EOF handling, Debug error logging); hijacked
  attach/exec go through a separate websocket handler, unaffected.
- NewSingleHostReverseProxyWithHostHeader: set FlushInterval = -1 so the
  remote-endpoint path streams live too.

Frontend (maintainer UI asks):
- Remove the line-selection mechanic entirely (Copy-selected-lines and
  Unselect buttons, selectLine/copySelection/clearSelection, selectedLines
  state, line_selected highlight): selecting/copying is mouse-native. Copy
  (all visible) and Download stay.
- Rename the unclear "Fetch" since-selector label to "Since".
- Move the settings controls into the widget header (rd-widget-header
  default transclude slot) so they share one row with the "Log viewer
  settings" title, reclaiming vertical space for the log pane.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 02:05:02 +03:00
claude code agent
343d36834a feat(logs): drop the auto-refresh toggle (always live) and compact settings to one row
Per maintainer request: remove the 'Auto-refresh logs' toggle entirely — logs are
now always collected (container always streams, service/task always poll). Drops
state.logCollection and its whole cascade (handleLogsCollectionChange, the
logCollectionChange binding, changeLogCollection in all three view controllers,
the log-collection-change attribute) and the now-dead manual flush-on-pause
machinery (pausedFlushCount / removeTailLines / the flush branch); pauseStream is
kept for $destroy/reconnect teardown, and the stream/poll start unconditionally.
Collapse the seven stacked settings rows into a single compact flex row
(wrap-lines, timestamps, fetch, lines, search, actions) — bindings unchanged.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-30 00:14:04 +03:00
claude code agent
8ad42a1a45 fix(logs): seed reconnect boundary state from resume params; extract+test since parser (F10,F11)
F10: the content-exact reconnect dedup rebuilt boundaryLines only from lines
     surviving the current batch, so after one reconnect at a shared timestamp it
     forgot the dropped line — a SECOND reconnect at the same nanosecond then
     re-emitted both as duplicates. Seed lastTimestamp/boundaryLines from
     skipUntilTimestamp/skipBoundaryContents so the boundary set accumulates all
     lines ever seen at the resume ts. Regression test (fails before, passes after).
F11: extract rfc3339ToUnixNanoSince into a testable logHelper module (sinceTimestamp.ts)
     and cover it (standard ns, no fraction, sub-9 pad, >9 truncate); the controller
     imports the single shared function.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 20:42:32 +03:00
claude code agent
9768d7bb99 fix(logs): content-exact reconnect dedup; flush partial on pause; unify since; lines fallback (F1-F5)
Maintainer pre-merge review follow-up:
F1: dedup reconnect redeliveries by EXACT boundary-line content, not just
    timestamp <= resume — a new line that merely shares the boundary nanosecond
    with a redelivered duplicate is no longer dropped (skipBoundaryContents +
    pendingBoundary). Test proves line B survives while a real dup is dropped.
F2: flush the buffered partial line on intentional pause (not reconnect) and
    strip those cosmetic lines on resume so since re-delivers the full line with
    no stale-partial twin; resume point is not advanced past the partial.
F3: unify the since param to <unix>.<nanos> for initial and reconnect.
F4: fall back to 100 lines when the Lines field is cleared (avoid tail=all).
F5: memoize the API-version pin per session; warn on frame desync.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 20:05:18 +03:00
claude code agent
da6933c218 refactor(logs): collapse no-op ternary, drop speculative export, fix stale comment (F8,F9,F10)
F8: formatJSONLogs plain-text fallback — both arms of `withTimestamps ? rawText
    : text` yield rawText (text === rawText when !withTimestamps), so use rawText.
F9: controllerLogsController comment referenced the old 'Live logs' label removed
    by F7 — update it to 'Auto-refresh logs'.
F10: stripHeadersFunc has no external importers — drop the speculative export.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 15:37:37 +03:00
claude code agent
d520aec159 docs(logs): neutral auto-refresh toggle label/tooltip in shared log viewer (F7)
logViewer.html is shared by the container, service and task log views, but only
the container view is a live HTTP stream — service/task still poll. Revert the
toggle wording to a mode-neutral 'Auto-refresh logs' / 'pauses log collection'
so it is accurate for both, keeping the added auto-scroll clarification.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 14:42:19 +03:00
claude code agent
f4f296fc05 fix(logs): drop partial line on reconnect, stabilize poll row ids, cover CRLF/dedup (F1-F6)
F1: stop emitting/committing an unfinished line in onEnd/onError reconnect
    paths; since-based reconnect redelivers the full line.
F2: give service/task poll rows positionally-stable ids so track by log.id
    reuses DOM rows and text selection survives the 3s poll.
F3/F4: tests for CRLF stripping and reconnect-dedup across separate chunks.
F5: correct the stale refreshRate comment.
F6: unroll the side-effecting IIFE-in-ternary into if/else.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 14:08:46 +03:00
claude code agent
960d43e70b fix(logs): byte-level frame demux + reconnect/since + notify throttle (F1-F8)
Container log live-stream review fixes (frontend only):

- F1/F2: demux Docker's multiplexed (non-TTY) stream at the BYTE level by
  frame length, decoding only payloads. Previously the stream was text-decoded
  whole and cut on '\n' before stripping 8-byte headers, which desynced when a
  length low-byte was 0x0a or a header byte was >= 0x80. streamContainerLogs
  now hands the processor raw Uint8Array chunks; createLogStreamProcessor is
  rewritten to parse frames, concatenate payloads, split lines on 0x0a, and
  UTF-8-decode complete lines. formatLogs is called without stripHeaders so
  headers are not stripped twice. Added explicit byte-frame tests.
- F3: request timestamps=1 internally and resume reconnects from the parsed
  RFC3339 timestamp of the last line (not client wall-clock); strip the prefix
  before display when the user's timestamps toggle is off; dedup the inclusive
  `since` boundary lines on reconnect.
- F4: run the fetch stream URL through dockerMaxAPIVersionInterceptor so it
  matches the axios getContainerLogs version pinning.
- F5: notify on stream error once per reconnect loop, not every 3s retry.
- F6: resuming Live no longer wipes the buffer (startStream(false)) and
  continues from `since`.
- F7: service/task logs still poll; documented the re-render limitation
  (out of scope: issue #2 is container logs).
- F8: flush the trailing partial line on the error path too (parity with onEnd).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:32:16 +03:00
claude code agent
c3cdb8007e feat(logs): live HTTP stream + append-only render for container logs
Replace the 3s $interval polling of container logs with a live HTTP
stream, and stop re-writing already-rendered lines (fixes selection bug).

- streamContainerLogs (containers.service.ts): fetch + ReadableStream
  reader with follow=1, same-origin credentials:'include' (httpOnly JWT
  cookie; CSRF only guards mutations), agent-target / manager-operation
  headers replicated for Agent/Edge, AbortSignal-driven lifetime.
- containerLogsController: stream instead of poll; append parsed lines
  into the buffer (push, never replace), cap at 5000 lines trimming from
  the head; AbortController on pause/destroy/param-change; reconnect with
  3s backoff resuming from `since` (dropping tail) on stream end/error;
  Live toggle pauses/resumes the stream; tail/since/timestamps changes
  restart the stream.
- log-viewer: `track by log.id` (was $index), filtering moved out of the
  template into the controller (applyFilter via $watchCollection), removed
  inert force-glue, decoupled auto-scroll from log collection, relabelled
  "Auto-refresh logs" -> "Live logs", clearer empty states.

Backend unchanged (logs already stream transparently through the Docker
proxy). Shared task/service log views keep working via the new id'd lines.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:03:23 +03:00
claude code agent
8ca0608b21 fix(logs): stable line ids, JSON string guard, streaming demuxer helper
Foundation for append-only log rendering and HTTP log streaming.

- FormattedLine gains a stable, monotonically increasing `id` (new
  lineId.ts sequence), assigned centrally in formatLogs. Internal
  formatters now return id-less FormattedLineContent. This lets the
  viewer use `track by log.id` so already-rendered rows are never
  re-bound (fixes the text-selection collapse).
- formatJSONLine: runtime guard so a bare JSON string/array log line
  falls back to plain text instead of rendering Object.keys as
  `0=h 1=e ...`.
- createLogStreamProcessor: stateful demuxer that buffers streamed text,
  emits only complete lines (carrying the partial remainder), and reuses
  formatLogs/stripHeadersFunc for Docker 8-byte frame demux.
- Unit tests for the demuxer, stable-id assignment and the JSON guard.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 07:03:10 +03:00
27 changed files with 1453 additions and 180 deletions

View File

@@ -124,7 +124,38 @@ func (proxy *dockerLocalProxy) ServeHTTP(w http.ResponseWriter, r *http.Request)
w.WriteHeader(res.StatusCode)
if _, err := io.Copy(w, res.Body); err != nil {
log.Debug().Err(err).Msg("proxy error")
streamResponse(w, res.Body)
}
// streamResponse copies body to w, flushing after every chunk instead of using
// io.Copy. io.Copy buffers into the http.ResponseWriter (~2KB) and only flushes
// when the buffer fills or the handler returns, so low-throughput streaming
// docker responses (container logs follow, events, stats, attach) would arrive
// in multi-second batches instead of live; flushing per chunk delivers them as
// they are produced. Behaviour otherwise matches io.Copy: the n>0 chunk is
// written BEFORE the readErr check so a simultaneous Read -> (n>0, io.EOF)
// still emits the final chunk; a write error or a non-EOF read error breaks the
// loop with a Debug log; and a writer that does not implement http.Flusher
// degrades to the old buffered behaviour rather than panicking.
func streamResponse(w http.ResponseWriter, body io.Reader) {
flusher, _ := w.(http.Flusher)
buf := make([]byte, 32*1024)
for {
n, readErr := body.Read(buf)
if n > 0 {
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
log.Debug().Err(writeErr).Msg("proxy error")
break
}
if flusher != nil {
flusher.Flush()
}
}
if readErr != nil {
if readErr != io.EOF {
log.Debug().Err(readErr).Msg("proxy error")
}
break
}
}
}

View File

@@ -0,0 +1,111 @@
package factory
import (
"bytes"
"errors"
"io"
"net/http"
"testing"
"github.com/stretchr/testify/require"
)
// chunkedReader yields a fixed list of chunks across successive Read calls. A
// chunk larger than the caller's buffer is returned in pieces; once the LAST
// chunk has been fully consumed, the final Read returns its bytes together with
// io.EOF in the SAME call — the (n>0, io.EOF) case streamResponse must handle so
// the tail of a streamed Docker response is not dropped.
type chunkedReader struct {
chunks [][]byte
i int
}
func (r *chunkedReader) Read(p []byte) (int, error) {
if r.i >= len(r.chunks) {
return 0, io.EOF
}
c := r.chunks[r.i]
n := copy(p, c)
if n < len(c) {
// Buffer too small for the whole chunk: keep the remainder for the next
// Read and report no error yet.
r.chunks[r.i] = c[n:]
return n, nil
}
r.i++
if r.i >= len(r.chunks) {
// Last chunk fully delivered: hand back the data AND io.EOF at once.
return n, io.EOF
}
return n, nil
}
// countingFlushWriter is a minimal http.ResponseWriter (with http.Flusher) that
// accumulates the body and counts Flush() calls.
type countingFlushWriter struct {
buf bytes.Buffer
header http.Header
flushCount int
}
func (w *countingFlushWriter) Header() http.Header {
if w.header == nil {
w.header = http.Header{}
}
return w.header
}
func (w *countingFlushWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }
func (w *countingFlushWriter) WriteHeader(int) {}
func (w *countingFlushWriter) Flush() { w.flushCount++ }
// errOnWriteWriter fails on the first Write and does NOT implement http.Flusher,
// so it also exercises streamResponse's nil-flusher fallback path.
type errOnWriteWriter struct {
header http.Header
calls int
}
func (w *errOnWriteWriter) Header() http.Header {
if w.header == nil {
w.header = http.Header{}
}
return w.header
}
func (w *errOnWriteWriter) Write(p []byte) (int, error) {
w.calls++
return 0, errors.New("write failed")
}
func (w *errOnWriteWriter) WriteHeader(int) {}
func TestStreamResponse_DeliversFullBodyAndFlushesPerChunk(t *testing.T) {
t.Parallel()
// A body larger than streamResponse's 32KB internal buffer, made of three
// distinct regions so any loss/duplication/reordering is detectable. The
// chunk boundaries do not align with the 32KB buffer, exercising both the
// partial-read path and the final (n>0, io.EOF) read.
regionA := bytes.Repeat([]byte("A"), 40000)
regionB := bytes.Repeat([]byte("B"), 20000)
regionC := bytes.Repeat([]byte("C"), 5000)
want := bytes.Join([][]byte{regionA, regionB, regionC}, nil)
r := &chunkedReader{chunks: [][]byte{
append([]byte(nil), regionA...),
append([]byte(nil), regionB...),
append([]byte(nil), regionC...),
}}
w := &countingFlushWriter{}
streamResponse(w, r)
require.Equal(t, want, w.buf.Bytes(), "streamed body must equal the input exactly (no loss/duplication)")
require.Greater(t, w.flushCount, 1, "must flush more than once for a multi-chunk body (the whole point of the change)")
}
func TestStreamResponse_StopsOnWriteErrorWithoutPanic(t *testing.T) {
t.Parallel()
r := &chunkedReader{chunks: [][]byte{[]byte("hello"), []byte("world")}}
w := &errOnWriteWriter{}
require.NotPanics(t, func() { streamResponse(w, r) })
require.Equal(t, 1, w.calls, "must break the loop on the first write error, not keep writing")
}

View File

@@ -38,6 +38,9 @@ var allowedHeaders = map[string]struct{}{
func NewSingleHostReverseProxyWithHostHeader(target *url.URL) *httputil.ReverseProxy {
proxy := &httputil.ReverseProxy{Rewrite: createRewriteFn(target)}
// FlushInterval = -1 flushes each write immediately so streaming docker responses (logs follow, events, stats, attach) are delivered live; harmless for normal JSON responses (single write).
proxy.FlushInterval = -1
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
httperror.WriteError(w, http.StatusBadGateway, "Proxy failure", err)
}

View File

@@ -4,7 +4,6 @@ angular.module('portainer.docker').component('logViewer', {
bindings: {
data: '=',
displayTimestamps: '=',
logCollectionChange: '<',
sinceTimestamp: '=',
lineCount: '=',
resourceName: '<',

View File

@@ -1,59 +1,37 @@
<div class="row">
<div class="col-sm-12">
<rd-widget>
<rd-widget-header icon="file" title-text="Log viewer settings"></rd-widget-header>
<rd-widget-body>
<form class="form-horizontal">
<div class="form-group">
<div class="col-sm-12">
<por-switch-field
label-class="'col-sm-2'"
checked="$ctrl.state.logCollection"
label="'Auto-refresh logs'"
tooltip="'Disabling this option allows you to pause the log collection process and the auto-scrolling.'"
on-change="($ctrl.handleLogsCollectionChange)"
></por-switch-field>
</div>
</div>
<div class="form-group">
<div class="col-sm-12">
<por-switch-field label-class="'col-sm-2'" checked="$ctrl.state.wrapLines" label="'Wrap lines'" on-change="($ctrl.handleLogsWrapLinesChange)"></por-switch-field>
</div>
</div>
<div class="form-group">
<div class="col-sm-12">
<por-switch-field
label-class="'col-sm-2'"
checked="$ctrl.displayTimestamps"
label="'Display timestamps'"
on-change="($ctrl.handleDisplayTimestampsChange)"
></por-switch-field>
</div>
</div>
<div class="form-group">
<label for="logs_since" class="col-sm-2 control-label text-left"> Fetch </label>
<div class="col-sm-2">
<select class="form-control" ng-model="$ctrl.sinceTimestamp" id="logs_since">
<!-- The controls bar is projected into the widget header (default transclude
slot) so the "Log viewer settings" title and the controls share one
horizontal line, freeing vertical space for the log pane below. -->
<rd-widget-header icon="file" title-text="Log viewer settings">
<form class="form-horizontal !m-0">
<!-- Compact single-row settings bar: every control sits inline and
wraps onto extra rows only on narrow widths. -->
<div class="flex flex-wrap items-center justify-end gap-x-4 gap-y-2">
<por-switch-field label-class="'!w-auto'" checked="$ctrl.state.wrapLines" label="'Wrap lines'" on-change="($ctrl.handleLogsWrapLinesChange)"></por-switch-field>
<por-switch-field
label-class="'!w-auto'"
checked="$ctrl.displayTimestamps"
label="'Display timestamps'"
on-change="($ctrl.handleDisplayTimestampsChange)"
></por-switch-field>
<div class="flex items-center gap-x-2">
<label for="logs_since" class="control-label !w-auto whitespace-nowrap !p-0 text-left"> Since </label>
<select class="form-control !w-auto" ng-model="$ctrl.sinceTimestamp" id="logs_since">
<option selected value="">All logs</option>
<option ng-repeat="dt in $ctrl.state.availableSinceDatetime" ng-value="dt.value" title="{{ dt.value }}">{{ dt.desc }}</option>
</select>
</div>
</div>
<div class="form-group">
<label for="logs_search" class="col-sm-2 control-label text-left"> Search </label>
<div class="col-sm-8">
<input class="form-control" type="text" name="logs_search" ng-model="$ctrl.state.search" ng-change="$ctrl.state.selectedLines.length = 0;" placeholder="Filter..." />
<div class="flex items-center gap-x-2">
<label for="lines_count" class="control-label !w-auto whitespace-nowrap !p-0 text-left"> Lines </label>
<input class="form-control !w-24" type="number" name="lines_count" ng-model="$ctrl.lineCount" placeholder="Lines..." />
</div>
</div>
<div class="form-group">
<label for="lines_count" class="col-sm-2 control-label text-left"> Lines </label>
<div class="col-sm-8">
<input class="form-control" type="number" name="lines_count" ng-model="$ctrl.lineCount" placeholder="Enter no of lines..." />
<div class="flex items-center gap-x-2" style="min-width: 160px">
<label for="logs_search" class="control-label !w-auto whitespace-nowrap !p-0 text-left"> Search </label>
<input class="form-control flex-1" type="text" name="logs_search" ng-model="$ctrl.state.search" placeholder="Filter..." />
</div>
</div>
<div class="form-group" ng-if="$ctrl.state.copySupported">
<label class="col-sm-2 control-label text-left"> Actions </label>
<div class="col-sm-10">
<div class="flex flex-wrap items-center gap-1" ng-if="$ctrl.state.copySupported">
<button class="btn btn-primary btn-sm" type="button" ng-click="$ctrl.downloadLogs()" style="margin-left: 0" data-cy="download-logs-button"
><pr-icon icon="'download'"></pr-icon> Download logs</button
>
@@ -63,32 +41,23 @@
ng-disabled="($ctrl.state.filteredLogs.length === 1 && !$ctrl.state.filteredLogs[0].line) || !$ctrl.state.filteredLogs.length"
><pr-icon icon="'copy'" class-name="'space-right'"></pr-icon>Copy</button
>
<button
class="btn btn-primary btn-sm"
ng-click="$ctrl.copySelection()"
ng-disabled="($ctrl.state.filteredLogs.length === 1 && !$ctrl.state.filteredLogs[0].line) || !$ctrl.state.filteredLogs.length || !$ctrl.state.selectedLines.length"
><pr-icon icon="'copy'" class-name="'space-right'"></pr-icon>Copy selected lines</button
>
<button class="btn btn-primary btn-sm" ng-click="$ctrl.clearSelection()" ng-disabled="$ctrl.state.selectedLines.length === 0"
><pr-icon icon="'x'" class-name="'space-right'"></pr-icon>Unselect</button
>
<span>
<pr-icon id="refreshRateChange" icon="'check'" mode="'success'" style="display: none"></pr-icon>
</span>
</div>
</div>
</form>
</rd-widget-body>
</rd-widget-header>
</rd-widget>
</div>
</div>
<div class="row" style="height: 54%">
<div class="col-sm-12" style="height: 100%">
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll" force-glue>
<div ng-repeat="log in $ctrl.state.filteredLogs = ($ctrl.data | filter:{ 'line': $ctrl.state.search }) track by $index" class="line" ng-if="log.line"><p class="inner_line" ng-click="$ctrl.selectLine(log.line)" ng-class="{ 'line_selected': $ctrl.state.selectedLines.indexOf(log.line) > -1 }"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
<div ng-if="!$ctrl.state.filteredLogs.length" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
<div ng-if="$ctrl.state.filteredLogs.length === 1 && !$ctrl.state.filteredLogs[0].line" class="line"><p class="inner_line">No logs available</p></div>
<pre ng-class="{ wrap_lines: $ctrl.state.wrapLines }" class="log_viewer" scroll-glue="$ctrl.state.autoScroll">
<div ng-repeat="log in $ctrl.state.filteredLogs track by log.id" class="line" ng-if="log.line"><p class="inner_line"><span ng-repeat="span in log.spans track by $index" ng-style="{ 'color': span.fgColor, 'background-color': span.bgColor, 'font-weight': span.fontWeight }">{{ span.text }}</span></p></div>
<div ng-if="!$ctrl.state.filteredLogs.length && $ctrl.state.search" class="line"><p class="inner_line">No log line matching the '{{ $ctrl.state.search }}' filter</p></div>
<div ng-if="!$ctrl.state.filteredLogs.length && !$ctrl.state.search" class="line"><p class="inner_line">No logs available</p></div>
</pre>
</div>
</div>

View File

@@ -16,24 +16,31 @@ angular.module('portainer.docker').controller('LogViewerController', [
{ desc: 'Last 10 minutes', value: moment().subtract(10, 'minutes').format() },
],
copySupported: clipboard.supported,
logCollection: true,
autoScroll: true,
wrapLines: true,
search: '',
filteredLogs: [],
selectedLines: [],
};
this.handleLogsCollectionChange = handleLogsCollectionChange.bind(this);
this.handleLogsWrapLinesChange = handleLogsWrapLinesChange.bind(this);
this.handleDisplayTimestampsChange = handleDisplayTimestampsChange.bind(this);
this.applyFilter = applyFilter.bind(this);
function handleLogsCollectionChange(enabled) {
$scope.$evalAsync(() => {
this.state.logCollection = enabled;
this.state.autoScroll = enabled;
this.logCollectionChange(enabled);
});
this.$onInit = function () {
this.applyFilter();
// Compute the filtered list in the controller (not in the template) so we
// do not rebuild `filteredLogs` on every digest. `$watchCollection` only
// fires when lines are actually appended; combined with `track by log.id`
// in the template, already-rendered rows are never re-bound — so a live
// text selection survives incoming log lines.
$scope.$watchCollection(() => this.data, this.applyFilter);
$scope.$watch(() => this.state.search, this.applyFilter);
};
function applyFilter() {
const data = this.data || [];
const search = (this.state.search || '').toLowerCase();
this.state.filteredLogs = search ? data.filter((log) => log.line && log.line.toLowerCase().indexOf(search) > -1) : data;
}
function handleLogsWrapLinesChange(enabled) {
@@ -54,25 +61,6 @@ angular.module('portainer.docker').controller('LogViewerController', [
$('#refreshRateChange').fadeOut(2000);
};
this.copySelection = function () {
clipboard.copyText(this.state.selectedLines.join(NEW_LINE_BREAKER));
$('#refreshRateChange').show();
$('#refreshRateChange').fadeOut(2000);
};
this.clearSelection = function () {
this.state.selectedLines = [];
};
this.selectLine = function (line) {
var idx = this.state.selectedLines.indexOf(line);
if (idx === -1) {
this.state.selectedLines.push(line);
} else {
this.state.selectedLines.splice(idx, 1);
}
};
this.downloadLogs = function () {
const logsAsString = concatLogsToString(this.state.filteredLogs);
const data = new Blob([logsAsString]);

View File

@@ -0,0 +1,26 @@
import { formatJSONLine } from './formatJSONLogs';
describe('formatJSONLine 0=d 1=e guard', () => {
it('renders a bare JSON string as plain text (not index=char pairs)', () => {
const raw = '"hello"';
const lines = formatJSONLine(raw);
expect(lines).toHaveLength(1);
// faithfully passes the raw line through instead of iterating its chars
expect(lines[0].line).toBe(raw);
// would previously have produced "0=h 1=e 2=l ..." via Object.keys on a string
expect(lines[0].line).not.toContain('0=h');
});
it('renders a bare JSON array as plain text', () => {
const raw = '[1,2,3]';
const lines = formatJSONLine(raw);
expect(lines).toHaveLength(1);
expect(lines[0].line).toBe(raw);
});
it('still parses a real JSON-object log line', () => {
const lines = formatJSONLine('{"level":"info","message":"hi"}');
expect(lines[0].line).toContain('hi');
expect(lines[0].line).not.toContain('0=');
});
});

View File

@@ -1,6 +1,11 @@
import { without } from 'lodash';
import { FormattedLine, Span, JSONLogs, TIMESTAMP_LENGTH } from './types';
import {
FormattedLineContent,
Span,
JSONLogs,
TIMESTAMP_LENGTH,
} from './types';
import {
formatCaller,
formatKeyValuePair,
@@ -17,14 +22,23 @@ function removeKnownKeys(keys: string[]) {
export function formatJSONLine(
rawText: string,
withTimestamps?: boolean
): FormattedLine[] {
): FormattedLineContent[] {
const spans: Span[] = [];
const lines: FormattedLine[] = [];
const lines: FormattedLineContent[] = [];
let line = '';
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;
const json: JSONLogs = JSON.parse(text);
const parsed: unknown = JSON.parse(text);
// Only treat the line as a structured JSON log when it actually parses to a
// plain object. A line serialized as a bare JSON string (e.g. `"hello"`)
// parses to a string, whose `Object.keys` is `['0','1',...]` — which used to
// render as `0=h 1=e ...`. Fall back to the plain-text path in that case.
if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) {
const plain = rawText;
return [{ line: plain, spans: [{ text: plain }] }];
}
const json = parsed as JSONLogs;
const { time, level, caller, message, stack_trace: stackTrace } = json;
const keys = removeKnownKeys(Object.keys(json));

View File

@@ -0,0 +1,29 @@
import { formatLogs } from './formatLogs';
import { resetLineIdSequence } from './lineId';
beforeEach(() => {
resetLineIdSequence();
});
describe('formatLogs stable ids', () => {
it('gives every line a unique id within a call', () => {
const lines = formatLogs('one\ntwo\nthree\n');
const ids = lines.map((l) => l.id);
expect(new Set(ids).size).toBe(ids.length);
});
it('continues the id sequence across calls (so appended chunks never collide)', () => {
const first = formatLogs('a\nb\n');
const second = formatLogs('c\nd\n');
const allIds = [...first, ...second].map((l) => l.id);
expect(new Set(allIds).size).toBe(allIds.length);
// every id in the second batch is greater than every id in the first
const maxFirst = Math.max(...first.map((l) => l.id));
expect(Math.min(...second.map((l) => l.id))).toBeGreaterThan(maxFirst);
});
it('preserves the parsed text content', () => {
const lines = formatLogs('hello\nworld\n');
expect(lines.map((l) => l.line)).toEqual(['hello', 'world']);
});
});

View File

@@ -9,7 +9,14 @@ import {
} from './colors';
import { formatJSONLine } from './formatJSONLogs';
import { formatZerologLogs, ZerologRegex } from './formatZerologLogs';
import { Token, Span, TIMESTAMP_LENGTH, FormattedLine } from './types';
import { nextLineId } from './lineId';
import {
Token,
Span,
TIMESTAMP_LENGTH,
FormattedLine,
FormattedLineContent,
} from './types';
type FormatOptions = {
stripHeaders?: boolean;
@@ -43,7 +50,7 @@ export function formatLogs(
}
const tokens: Token[][] = tokenize(logs);
const formattedLogs: FormattedLine[] = [];
const formattedLogs: FormattedLineContent[] = [];
let fgColor: string | undefined;
let bgColor: string | undefined;
@@ -118,9 +125,20 @@ export function formatLogs(
formattedLogs.push({ line, spans });
}
return formattedLogs;
// Assign the stable id centrally, once, here — so every line (plain, JSON,
// zerolog, stack-trace) gets a monotonic id regardless of which formatter
// produced it. Enables `track by log.id` in the viewer template.
return formattedLogs.map(
(content): FormattedLine => ({ id: nextLineId(), ...content })
);
}
// Strips Docker's 8-byte multiplexed-stream headers from a non-TTY log buffer:
// drops the leading header, then every header that follows a newline. This
// text-level strip is only safe on a fully-buffered body (the AngularJS polling
// services for container/service/task logs) where every line starts on a frame
// boundary. The live-stream path (logStream.ts) demuxes frames at the byte
// level instead and calls formatLogs WITHOUT `stripHeaders`.
function stripHeadersFunc(logs: string) {
return logs.substring(8).replace(/\r?\n(.{8})/g, '\n');
}

View File

@@ -7,7 +7,7 @@ import {
formatTime,
} from './formatters';
import {
FormattedLine,
FormattedLineContent,
JSONStackTrace,
Level,
Span,
@@ -50,7 +50,7 @@ type Pair = {
export function formatZerologLogs(rawText: string, withTimestamps?: boolean) {
const spans: Span[] = [];
const lines: FormattedLine[] = [];
const lines: FormattedLineContent[] = [];
let line = '';
const text = withTimestamps ? rawText.substring(TIMESTAMP_LENGTH) : rawText;

View File

@@ -1,7 +1,13 @@
import { format } from 'date-fns';
import { takeRight } from 'lodash';
import { Span, Level, Colors, JSONStackTrace, FormattedLine } from './types';
import {
Span,
Level,
Colors,
JSONStackTrace,
FormattedLineContent,
} from './types';
const spaceSpan: Span = { text: ' ' };
@@ -141,7 +147,7 @@ export function formatKeyValuePair(
export function formatStackTrace(
stackTrace: JSONStackTrace | undefined,
lines: FormattedLine[]
lines: FormattedLineContent[]
) {
if (stackTrace) {
stackTrace.forEach(({ func, line: lineNumber, source }) => {

View File

@@ -1,3 +1,6 @@
export { formatLogs } from './formatLogs';
export { concatLogsToString } from './concatLogsToString';
export { NEW_LINE_BREAKER } from './constants';
export { createLogStreamProcessor } from './logStream';
export type { LogStreamProcessor } from './logStream';
export { rfc3339ToUnixNanoSince } from './sinceTimestamp';

View File

@@ -0,0 +1,17 @@
// Monotonically increasing, process-wide sequence used to give every rendered
// log line a stable id. Because it never resets during a session, lines parsed
// from later stream chunks always get ids greater than earlier ones, so the
// AngularJS `track by log.id` repeat treats already-rendered rows as unchanged
// and never rewrites their DOM text nodes (which is what was collapsing text
// selections on every poll). See logHelper/types.ts (FormattedLine).
let sequence = 0;
export function nextLineId() {
return sequence++;
}
// Test-only helper to make id assignments deterministic between unit tests.
export function resetLineIdSequence() {
sequence = 0;
}

View File

@@ -0,0 +1,344 @@
import { createLogStreamProcessor } from './logStream';
import { resetLineIdSequence } from './lineId';
const encoder = new TextEncoder();
function bytes(s: string): Uint8Array {
return encoder.encode(s);
}
// Build a real Docker multiplexed-stream frame: 1 byte stream-type, 3 zero
// bytes, a 4-byte big-endian payload length, then the payload bytes.
function frame(payload: Uint8Array | string, streamType = 1): Uint8Array {
const body = typeof payload === 'string' ? bytes(payload) : payload;
const out = new Uint8Array(8 + body.length);
out[0] = streamType;
const len = body.length;
out[4] = (len >>> 24) & 0xff;
out[5] = (len >>> 16) & 0xff;
out[6] = (len >>> 8) & 0xff;
out[7] = len & 0xff;
out.set(body, 8);
return out;
}
function concat(...parts: Uint8Array[]): Uint8Array {
const total = parts.reduce((n, p) => n + p.length, 0);
const out = new Uint8Array(total);
let off = 0;
parts.forEach((p) => {
out.set(p, off);
off += p.length;
});
return out;
}
beforeEach(() => {
resetLineIdSequence();
});
describe('createLogStreamProcessor (non-TTY, byte-level frame demux)', () => {
it('emits only completed lines and carries the partial remainder forward', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
// first chunk: one full frame then a frame whose line has no newline yet
const first = proc.push(concat(frame('hello\n'), frame('wor')));
expect(first.map((l) => l.line)).toEqual(['hello']);
// completing the partial line + a new one (across two frames)
const second = proc.push(concat(frame('ld\n'), frame('bye\n')));
expect(second.map((l) => l.line)).toEqual(['world', 'bye']);
});
it('returns nothing until a newline arrives, then flushes the trailing partial', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
expect(proc.push(frame('partial')).map((l) => l.line)).toEqual([]);
// stream ended without a trailing newline -> flush yields the remainder
expect(proc.flush().map((l) => l.line)).toEqual(['partial']);
});
it('strips the frame header from every framed line', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(concat(frame('line-1\n'), frame('line-2\n')));
expect(lines.map((l) => l.line)).toEqual(['line-1', 'line-2']);
});
// F1: a frame whose payload length low-byte is 0x0a (10). A text-level
// demuxer that splits on '\n' before stripping headers would land inside the
// header and desync, mis-rendering "123456789\n" down to "9".
it('handles a frame whose length low-byte is 0x0a (payload of exactly 10 bytes)', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(frame('123456789\n')); // payload length = 10 = 0x0a
expect(lines.map((l) => l.line)).toEqual(['123456789']);
});
// F2 (boundary): a chunk boundary falling between the 8-byte header and its
// payload.
it('handles a chunk split between the header and the payload', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('abc\n');
expect(proc.push(full.subarray(0, 8)).map((l) => l.line)).toEqual([]);
expect(proc.push(full.subarray(8)).map((l) => l.line)).toEqual(['abc']);
});
// F2: a length field with a byte >= 0x80 (200 = 0xC8), split across chunks.
// UTF-8-decoding the header would mangle 0xC8 and slip the offset.
it('handles a large frame (length byte >= 0x80) split across chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const payload = `${'x'.repeat(199)}\n`; // 200 bytes -> length byte 0xC8
const full = frame(payload);
expect(full[7]).toBe(0xc8);
expect(proc.push(full.subarray(0, 5)).map((l) => l.line)).toEqual([]);
const rest = proc.push(full.subarray(5));
expect(rest.map((l) => l.line)).toEqual(['x'.repeat(199)]);
});
// F2 (multibyte): a multibyte UTF-8 char split across two chunks must be
// reassembled before decoding (not turned into U+FFFD).
it('reassembles a multibyte UTF-8 char split across two chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('aéb\n'); // é = 0xC3 0xA9
const eIdx = full.indexOf(0xc3);
expect(proc.push(full.subarray(0, eIdx + 1)).map((l) => l.line)).toEqual(
[]
);
const rest = proc.push(full.subarray(eIdx + 1));
expect(rest.map((l) => l.line)).toEqual(['aéb']);
});
it('drops a truncated final frame on flush instead of emitting garbage', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const full = frame('never\n');
// only the header + 2 payload bytes arrive, then the stream ends
expect(proc.push(full.subarray(0, 10)).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual([]);
});
// F3: a framed line terminated by \r\n must have its trailing CR stripped.
it('strips the trailing CR from a \\r\\n line ending (framed)', () => {
const proc = createLogStreamProcessor({ stripHeaders: true });
const lines = proc.push(frame('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
});
describe('createLogStreamProcessor (TTY, no headers)', () => {
it('passes lines through unchanged', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const lines = proc.push(bytes('plain line\n'));
expect(lines.map((l) => l.line)).toEqual(['plain line']);
});
it('carries a partial line across chunks and flushes the remainder', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
expect(proc.push(bytes('par')).map((l) => l.line)).toEqual([]);
expect(proc.push(bytes('tial\n')).map((l) => l.line)).toEqual(['partial']);
expect(proc.push(bytes('tail')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['tail']);
});
// F3: a \r\n line ending must have its trailing CR stripped (non-TTY path).
it('strips the trailing CR from a \\r\\n line ending', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const lines = proc.push(bytes('with-crlf\r\n'));
expect(lines.map((l) => l.line)).toEqual(['with-crlf']);
});
// F3: a remainder carrying a trailing CR (no terminating LF) must have the CR
// stripped on flush. Split across two chunks to exercise the buffered path.
it('strips a trailing CR on flush when the stream ends without a newline', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
expect(proc.push(bytes('hel')).map((l) => l.line)).toEqual([]);
expect(proc.push(bytes('lo\r')).map((l) => l.line)).toEqual([]);
expect(proc.flush().map((l) => l.line)).toEqual(['hello']);
});
});
describe('timestamps (always requested, optionally displayed)', () => {
const ts1 = '2024-01-01T00:00:00.000000001Z';
const ts2 = '2024-01-01T00:00:00.000000002Z';
it('strips the RFC3339 prefix when timestamps are hidden', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
});
const lines = proc.push(bytes(`${ts1} hello\n`));
expect(lines.map((l) => l.line)).toEqual(['hello']);
expect(proc.getLastTimestamp()).toBe(ts1);
});
it('keeps the RFC3339 prefix when timestamps are shown', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
});
const lines = proc.push(bytes(`${ts1} hello\n`));
expect(lines.map((l) => l.line)).toEqual([`${ts1} hello`]);
});
it('drops lines re-delivered at/before the reconnect resume point', () => {
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: ts1,
// the boundary line we had already shown (Docker re-delivers it inclusively)
skipBoundaryContents: [`${ts1} old`],
});
// ts1 is the inclusive boundary Docker re-delivers -> dropped; ts2 is new
const lines = proc.push(bytes(`${ts1} old\n${ts2} new\n`));
expect(lines.map((l) => l.line)).toEqual(['new']);
});
// F4: the real reconnect shape — the redelivered boundary line arrives in its
// OWN chunk (the whole batch is dropped, dropTo === lines.length), so the
// `skipping` flag must stay on and the NEXT chunk must keep dropping dups up
// to the resume point before emitting the first genuinely new line.
it('keeps dropping reconnect dups when the redelivered line arrives in its own chunk', () => {
const tsA = '2024-01-01T00:00:00.000000001Z';
const tsB = '2024-01-01T00:00:00.000000002Z';
const tsC = '2024-01-01T00:00:00.000000003Z';
const tsD = '2024-01-01T00:00:00.000000004Z';
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: tsB,
// dup-2 carries the resume timestamp and was already shown; dup-1 is before
// it and is dropped by the chronological compare.
skipBoundaryContents: [`${tsB} dup-2`],
});
// (1) only a redelivered boundary line (<= resume point) -> fully consumed
// by dedup; nothing emitted and skipping stays on.
expect(proc.push(bytes(`${tsA} dup-1\n`)).map((l) => l.line)).toEqual([]);
// (2) another dup + the first new line: because skipping was preserved the
// dup is still dropped, and only the new line is returned.
expect(
proc.push(bytes(`${tsB} dup-2\n${tsC} new\n`)).map((l) => l.line)
).toEqual(['new']);
// (3) a later line passes through untouched (no skipping anymore).
expect(proc.push(bytes(`${tsD} after\n`)).map((l) => l.line)).toEqual([
'after',
]);
});
// F1: two lines A and B share the exact same nanosecond timestamp; the
// connection drops after only A was shown. On reconnect `since=T` re-delivers
// BOTH. A `<= timestamp` dedup would drop both and LOSE B. Content-exact dedup
// (the boundary set is just [A]) drops A and keeps B.
it('does not lose a new line that shares the boundary timestamp with a duplicate', () => {
const tsShared = '2024-01-01T00:00:00.000000005Z';
const tsNext = '2024-01-01T00:00:00.000000006Z';
// First session: only line A (at tsShared) was delivered before the drop.
const first = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
});
expect(first.push(bytes(`${tsShared} A\n`)).map((l) => l.line)).toEqual([
`${tsShared} A`,
]);
expect(first.getLastTimestamp()).toBe(tsShared);
// the boundary set handed to the reconnecting processor is exactly [A]
expect(first.getBoundaryLines()).toEqual([`${tsShared} A`]);
// Reconnect: Docker re-delivers A (duplicate) AND B (new, same timestamp).
const second = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: first.getLastTimestamp(),
skipBoundaryContents: first.getBoundaryLines(),
});
const redelivered = second.push(
bytes(`${tsShared} A\n${tsShared} B\n${tsNext} C\n`)
);
// A (genuine duplicate) is dropped; B (new, shares the boundary ts) survives.
expect(redelivered.map((l) => l.line)).toEqual([
`${tsShared} B`,
`${tsNext} C`,
]);
// and B is now part of the next boundary set alongside any A-twin.
expect(second.getBoundaryLines()).toEqual([`${tsNext} C`]);
});
// F1: a genuine duplicate at the boundary timestamp (nothing new shares it) is
// still dropped — the normal reconnect case is unchanged.
it('still drops a genuine duplicate at the boundary timestamp', () => {
const tsBoundary = '2024-01-01T00:00:00.000000007Z';
const tsAfter = '2024-01-01T00:00:00.000000008Z';
const proc = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: false,
streamHasTimestamps: true,
skipUntilTimestamp: tsBoundary,
skipBoundaryContents: [`${tsBoundary} only`],
});
const lines = proc.push(bytes(`${tsBoundary} only\n${tsAfter} fresh\n`));
expect(lines.map((l) => l.line)).toEqual(['fresh']);
});
// F10: a SECOND reconnect at the same nanosecond timestamp must not re-emit
// duplicates. Lines A and B share ts T; the connection drops after only A was
// shown. Reconnect #1 (boundary set [A]) redelivers A,B -> drops A, keeps B,
// and — because the processor seeds its boundary state from the resume
// params — its boundary set becomes [A, B] (A is NOT forgotten). If a second
// reconnect happens at the same T before any newer-ts line arrives, it carries
// [A, B] forward and drops BOTH redelivered lines (no duplicate). Without the
// seeding, reconnect #1's boundary set would be just [B], and reconnect #2
// would fail to recognise A as a duplicate and re-emit both A and B.
it('does not duplicate lines on a second reconnect at the same nanosecond timestamp', () => {
const tsShared = '2024-01-01T00:00:00.000000005Z';
// Reconnect #1: resume at tsShared, having already shown A.
const first = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: tsShared,
skipBoundaryContents: [`${tsShared} A`],
});
// Docker redelivers A (duplicate) and B (new, same timestamp).
const round1 = first.push(bytes(`${tsShared} A\n${tsShared} B\n`));
expect(round1.map((l) => l.line)).toEqual([`${tsShared} B`]);
// A must NOT be forgotten: the boundary set now carries both A and B.
expect(first.getBoundaryLines()).toEqual([
`${tsShared} A`,
`${tsShared} B`,
]);
// Reconnect #2 at the same timestamp, before any newer-ts line arrived,
// carrying reconnect #1's boundary set forward.
const second = createLogStreamProcessor({
stripHeaders: false,
withTimestamps: true,
streamHasTimestamps: true,
skipUntilTimestamp: first.getLastTimestamp(),
skipBoundaryContents: first.getBoundaryLines(),
});
// Docker redelivers A and B again; both are genuine duplicates now.
const round2 = second.push(bytes(`${tsShared} A\n${tsShared} B\n`));
expect(round2.map((l) => l.line)).toEqual([]);
});
});
describe('stable line ids (append model)', () => {
it('assigns unique, strictly increasing ids across successive chunks', () => {
const proc = createLogStreamProcessor({ stripHeaders: false });
const a = proc.push(bytes('a\nb\n'));
const b = proc.push(bytes('c\n'));
const ids = [...a, ...b].map((l) => l.id);
expect(new Set(ids).size).toBe(ids.length); // all unique
const sorted = [...ids].sort((x, y) => x - y);
expect(ids).toEqual(sorted); // monotonically increasing in arrival order
});
});

View File

@@ -0,0 +1,332 @@
import { formatLogs } from './formatLogs';
import { FormattedLine, TIMESTAMP_LENGTH } from './types';
type ProcessorOptions = {
/** Non-TTY container: demux Docker's 8-byte multiplexed-stream frame headers. */
stripHeaders?: boolean;
/** Display the RFC3339 timestamp prefix (user's "Display timestamps" toggle). */
withTimestamps?: boolean;
/**
* The raw stream always carries an RFC3339 timestamp prefix because the
* controller requests `timestamps=1` internally (so it can resume from the
* exact log timestamp on reconnect). When `withTimestamps` is false we strip
* the prefix before formatting so it is not displayed.
*/
streamHasTimestamps?: boolean;
/**
* Reconnect dedup: Docker's `since` filter is inclusive, so on reconnect it
* re-delivers the boundary line(s) at the resume timestamp. This is the resume
* point (the RFC3339 timestamp of the last line we saw). Re-delivered lines
* before it are dropped; lines exactly at it are matched against
* `skipBoundaryContents` (see below).
*/
skipUntilTimestamp?: string;
/**
* Reconnect dedup, exact-content matching. The lines we had already shown that
* carry the resume timestamp (`skipUntilTimestamp`). On reconnect Docker
* re-delivers every line at that timestamp; we drop only those whose exact
* content matches one of these (the genuine duplicates) and KEEP any line that
* merely shares the timestamp but is new (different content) — otherwise a
* second line sharing the boundary nanosecond would be lost.
*/
skipBoundaryContents?: string[];
};
// Docker multiplexed-stream frame header: 1 byte stream-type, 3 zero bytes,
// then a 4-byte big-endian payload length, followed by exactly `length` payload
// bytes. See the Docker Engine API container "logs" (non-TTY) documentation.
const FRAME_HEADER_SIZE = 8;
const FRAME_LENGTH_OFFSET = 4;
// Defensive only: a single frame payload larger than this almost certainly means
// the byte stream desynced/corrupted. This is NOT a cap on legitimate lines
// (real log lines are orders of magnitude smaller); it only prevents unbounded
// buffering if we ever lose frame alignment.
const MAX_FRAME_PAYLOAD = 100 * 1024 * 1024;
// Width of Docker's fixed RFC3339Nano timestamp prefix emitted with
// `timestamps=1`: "2006-01-02T15:04:05.000000000Z" — 30 chars; index 30 is the
// separating space (TIMESTAMP_LENGTH = 31 = 30 + space).
const TS_WIDTH = TIMESTAMP_LENGTH - 1;
const TS_PREFIX = /^\d{4}-\d{2}-\d{2}T/;
const LF = 0x0a;
const CR = 0x0d;
// `subarray()` widens the backing buffer to ArrayBufferLike under TS 5.7+; use a
// permissive byte-array alias so buffered slices assign cleanly.
type Bytes = Uint8Array<ArrayBufferLike>;
function concatBytes(a: Bytes, b: Bytes): Bytes {
if (a.length === 0) {
return b;
}
if (b.length === 0) {
return a;
}
const out = new Uint8Array(a.length + b.length);
out.set(a, 0);
out.set(b, a.length);
return out;
}
function readUint32BE(buf: Bytes, offset: number): number {
// `* 0x1000000` instead of `<< 24` to keep the high byte unsigned.
return (
buf[offset] * 0x1000000 +
(buf[offset + 1] << 16) +
(buf[offset + 2] << 8) +
buf[offset + 3]
);
}
/**
* Stateful processor for an incremental (HTTP-streamed) container log response.
*
* A streaming response arrives in arbitrary byte chunks that split frames, log
* lines and even multibyte UTF-8 characters at random points. For non-TTY
* containers the body is Docker's binary multiplexed stream: each log message is
* framed with an 8-byte header carrying the payload length. This processor
* demuxes at the BYTE level — never on `\n` and never by UTF-8-decoding header
* bytes — because the low byte of a frame's length field can itself be `0x0a`
* and header bytes can be `>= 0x80` (both would corrupt a text-level demux):
*
* - it buffers raw bytes and parses complete frames by their declared length,
* carrying any partial trailing frame forward;
* - it concatenates payload bytes across frames, splits them into complete
* lines on the `0x0a` byte, and UTF-8-decodes each complete line as a whole
* (so a multibyte char split across frames/chunks is already reassembled);
* - because headers are removed here, the completed text lines are formatted by
* `formatLogs` WITHOUT its `stripHeaders` option (otherwise it would strip 8
* more characters of real content).
*
* Each emitted line carries a stable, monotonically increasing `id`, so the
* viewer can `track by log.id` and append without re-binding existing rows.
*/
export function createLogStreamProcessor({
stripHeaders,
withTimestamps,
streamHasTimestamps,
skipUntilTimestamp,
skipBoundaryContents,
}: ProcessorOptions = {}) {
// Unparsed bytes of the multiplexed frame stream (non-TTY only).
let frameBuf: Bytes = new Uint8Array(0);
// Decoded-payload bytes not yet terminated by a newline.
let lineBuf: Bytes = new Uint8Array(0);
// Reconnect dedup: while true, drop re-delivered lines up to the resume point.
let skipping = !!skipUntilTimestamp;
// Copy of the boundary line contents still pending a duplicate match; each
// matched re-delivered line consumes (splices out) one entry.
const pendingBoundary: string[] = skipBoundaryContents
? skipBoundaryContents.slice()
: [];
// RFC3339 timestamp of the last complete line we saw (next resume point).
// Seeded from the resume point so the boundary set carried in via
// `skipBoundaryContents` is preserved: a surviving line at the resume
// timestamp is APPENDED to the already-known boundary lines (not treated as a
// fresh timestamp), so a second reconnect at the same nanosecond still knows
// every line we have shown at it and drops them all instead of re-emitting
// duplicates. The first NEWER timestamp resets the set as usual.
let lastTimestamp: string | undefined = skipUntilTimestamp;
// Exact content of the line(s) at `lastTimestamp` we have emitted so far — the
// boundary set handed to the next processor for content-exact reconnect dedup.
// Seeded from the inbound boundary set (see `lastTimestamp` above).
let boundaryLines: string[] = skipBoundaryContents
? skipBoundaryContents.slice()
: [];
const decoder = new TextDecoder();
// Extract complete newline-terminated lines from the accumulated payload
// bytes, decoding each whole line at once. A `0x0a` byte can never be part of
// a multibyte UTF-8 sequence, so a line boundary never cuts a character.
function takeCompleteLines(payload: Bytes): string[] {
lineBuf = concatBytes(lineBuf, payload);
const lines: string[] = [];
let start = 0;
for (let i = 0; i < lineBuf.length; i += 1) {
if (lineBuf[i] === LF) {
let end = i;
// drop a trailing CR for \r\n line endings
if (end > start && lineBuf[end - 1] === CR) {
end -= 1;
}
lines.push(decoder.decode(lineBuf.subarray(start, end)));
start = i + 1;
}
}
lineBuf = lineBuf.subarray(start);
return lines;
}
// Pull every complete frame payload out of frameBuf and return the decoded
// complete lines they contain. An incomplete trailing frame stays buffered.
function demuxFrames(): string[] {
const lines: string[] = [];
for (;;) {
if (frameBuf.length < FRAME_HEADER_SIZE) {
break;
}
const payloadLength = readUint32BE(frameBuf, FRAME_LENGTH_OFFSET);
if (payloadLength > MAX_FRAME_PAYLOAD) {
// Corrupt/desynced stream: drop everything buffered to avoid OOM. Surface
// it so an unexpected reset is diagnosable rather than silently swallowed.
// eslint-disable-next-line no-console
console.warn(
`logStream: frame payload length ${payloadLength} exceeds ${MAX_FRAME_PAYLOAD}; resetting desynced buffer`
);
frameBuf = new Uint8Array(0);
break;
}
if (frameBuf.length - FRAME_HEADER_SIZE < payloadLength) {
break; // wait for the rest of the payload
}
const payload = frameBuf.subarray(
FRAME_HEADER_SIZE,
FRAME_HEADER_SIZE + payloadLength
);
lines.push(...takeCompleteLines(payload));
frameBuf = frameBuf.subarray(FRAME_HEADER_SIZE + payloadLength);
}
return lines;
}
// Apply reconnect-dedup + timestamp handling, then format a batch of complete
// text lines into rendered lines.
function formatBatch(rawLines: string[]): FormattedLine[] {
let lines = rawLines;
if (streamHasTimestamps) {
if (skipping && lines.length) {
// Drop re-delivered lines up to the resume point. Lines strictly before
// it are duplicates (lexical compare is chronological for the zero-padded
// UTC format). Lines exactly at the resume timestamp are dropped ONLY
// when their exact content matches a boundary line we already showed —
// so a NEW line sharing the same nanosecond timestamp is not lost.
const boundary = skipUntilTimestamp as string;
let dropTo = 0;
while (dropTo < lines.length) {
const line = lines[dropTo];
if (!TS_PREFIX.test(line)) {
break; // cannot classify without a timestamp; stop dropping
}
const ts = line.substring(0, TS_WIDTH);
const boundaryIdx =
ts === boundary ? pendingBoundary.indexOf(line) : -1;
if (ts < boundary) {
// re-delivered line before the resume point
dropTo += 1;
} else if (boundaryIdx !== -1) {
// exact duplicate of an already-shown boundary line
pendingBoundary.splice(boundaryIdx, 1);
dropTo += 1;
} else {
// ts > boundary, or a new line that merely shares the boundary
// timestamp -> keep it and everything after.
break;
}
}
if (dropTo > 0) {
lines = lines.slice(dropTo);
}
if (lines.length) {
skipping = false;
}
}
// Track the resume timestamp and the exact content of the line(s) at it,
// walking the kept lines in order (each line still has its prefix here).
// Timestamps are monotonically non-decreasing, so the trailing run sharing
// the newest timestamp is the boundary set the next processor dedups on.
for (let i = 0; i < lines.length; i += 1) {
const line = lines[i];
if (TS_PREFIX.test(line)) {
const ts = line.substring(0, TS_WIDTH);
if (ts !== lastTimestamp) {
lastTimestamp = ts;
boundaryLines = [line];
} else {
boundaryLines.push(line);
}
}
}
// When the user has timestamps hidden, strip the prefix we requested
// internally so it is not displayed.
if (!withTimestamps) {
lines = lines.map((line) =>
TS_PREFIX.test(line) ? line.substring(TIMESTAMP_LENGTH) : line
);
}
}
if (!lines.length) {
return [];
}
// Rejoin into a single batch so ANSI colour state carries across lines
// within the batch (matches the previous whole-batch formatLogs call).
// Headers are already stripped at the byte layer, so do NOT pass
// stripHeaders here.
const batch = `${lines.join('\n')}\n`;
return formatLogs(batch, { withTimestamps });
}
return {
/**
* Append a raw byte chunk and return the newly completed, formatted lines.
* Returns an empty array when the chunk did not complete any line.
*/
push(value: Bytes): FormattedLine[] {
if (!value || value.length === 0) {
return [];
}
let rawLines: string[];
if (stripHeaders) {
frameBuf = concatBytes(frameBuf, value);
rawLines = demuxFrames();
} else {
rawLines = takeCompleteLines(value);
}
return formatBatch(rawLines);
},
/**
* Flush any buffered partial line (e.g. when the stream ends without a
* trailing newline). A partial/truncated final frame is dropped so we never
* emit garbage.
*/
flush(): FormattedLine[] {
// Drop any incomplete trailing frame.
frameBuf = new Uint8Array(0);
if (lineBuf.length === 0) {
return [];
}
let bytes: Bytes = lineBuf;
// drop a trailing CR for \r\n line endings
if (bytes.length > 0 && bytes[bytes.length - 1] === CR) {
bytes = bytes.subarray(0, bytes.length - 1);
}
lineBuf = new Uint8Array(0);
return formatBatch([decoder.decode(bytes)]);
},
/** RFC3339 timestamp of the last complete line seen, for reconnect resume. */
getLastTimestamp(): string | undefined {
return lastTimestamp;
},
/**
* Exact content of the line(s) carrying the last-seen timestamp. Pass to the
* next processor as `skipBoundaryContents` so reconnect dedup drops only the
* genuine duplicates Docker re-delivers, never a new line that happens to
* share the boundary nanosecond.
*/
getBoundaryLines(): string[] {
return boundaryLines.slice();
},
};
}
export type LogStreamProcessor = ReturnType<typeof createLogStreamProcessor>;

View File

@@ -0,0 +1,31 @@
import { rfc3339ToUnixNanoSince } from './sinceTimestamp';
// Integer Unix seconds of the shared second-precision prefix, computed the same
// way the parser does (Date.parse of the "...:05Z" prefix, in UTC).
const unix = Math.floor(Date.parse('2024-01-01T00:00:00Z') / 1000);
describe('rfc3339ToUnixNanoSince', () => {
it('keeps a standard fixed-width nanosecond fraction', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.000000005Z')).toBe(
`${unix}.000000005`
);
});
it('emits a zero fraction when the timestamp has no fractional part', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00Z')).toBe(
`${unix}.000000000`
);
});
it('right-pads a sub-nanosecond-width fraction to 9 digits', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.5Z')).toBe(
`${unix}.500000000`
);
});
it('truncates a fraction longer than 9 digits to nanosecond precision', () => {
expect(rfc3339ToUnixNanoSince('2024-01-01T00:00:00.1234567890123Z')).toBe(
`${unix}.123456789`
);
});
});

View File

@@ -0,0 +1,26 @@
// Build Docker's `since` param as a Unix timestamp with a nanosecond fraction
// ("<seconds>.<nanos>") from an RFC3339Nano timestamp. Both the initial connect
// and reconnect use this one form so we never mix unix-seconds and RFC3339 —
// some proxies / pinned API versions accept them inconsistently. The fraction
// needs string precision (a JS number cannot hold nanoseconds), hence `since`
// is returned as a string.
//
// Docker emits a fixed-width RFC3339Nano prefix: "2006-01-02T15:04:05.000000000Z".
// We parse the second-precision prefix in UTC for the integer seconds, then take
// the fractional digits verbatim: short fractions (e.g. ".5") are right-padded
// to 9 digits, longer ones are truncated to 9 (nanosecond precision).
export function rfc3339ToUnixNanoSince(rfc3339: string): string {
const seconds = Math.floor(
Date.parse(`${rfc3339.substring(0, 19)}Z`) / 1000
);
const dot = rfc3339.indexOf('.');
const nanos =
dot >= 0
? rfc3339
.substring(dot + 1)
.replace(/[^0-9]/g, '')
.padEnd(9, '0')
.substring(0, 9)
: '000000000';
return `${seconds}.${nanos}`;
}

View File

@@ -36,11 +36,21 @@ export type Span = {
fontWeight?: FontWeight;
};
export type FormattedLine = {
// A rendered log line without its stable id. Internal formatters build these,
// and `formatLogs` assigns the `id` once, centrally, on the way out.
export type FormattedLineContent = {
spans: Span[];
line: string;
};
// A rendered log line with a stable, monotonically increasing id. The id lets
// the AngularJS template use `track by log.id`, so already-rendered rows are
// never re-bound (their text nodes are not rewritten) — which is what keeps a
// live text selection from collapsing while new lines stream in.
export type FormattedLine = FormattedLineContent & {
id: number;
};
export const TIMESTAMP_LENGTH = 31; // 30 for timestamp + 1 for trailing space
export const Colors = {

View File

@@ -1,84 +1,254 @@
import moment from 'moment';
import { streamContainerLogs } from '@/react/docker/containers/containers.service';
import { createLogStreamProcessor, rfc3339ToUnixNanoSince } from '@/docker/helpers/logHelper';
// Hard cap on how many lines we keep in the DOM/buffer during a long live
// stream. We trim from the head (oldest lines) so a selection anchored in the
// tail is never disturbed. The user-facing `lineCount` (tail) is a *request*
// for how many historical lines to fetch, not this in-memory limit.
const MAX_LOG_LINES = 5000;
// Delay before reconnecting after the stream ends or errors (container stopped,
// network blip, proxy hiccup). Avoids hammering a stopped container.
const RECONNECT_DELAY_MS = 3000;
// Default tail (historical-backfill) request used on the initial connect and
// whenever the user clears/invalidates the "Lines" field — without it a cleared
// number input becomes null, `tail` is dropped, and Docker re-downloads the
// entire history of a chatty container on every parameter change.
const DEFAULT_LINE_COUNT = 100;
angular.module('portainer.docker').controller('ContainerLogsController', [
'$scope',
'$transition$',
'$interval',
'$timeout',
'ContainerService',
'Notifications',
'HttpRequestHelper',
'endpoint',
function ($scope, $transition$, $interval, ContainerService, Notifications, HttpRequestHelper, endpoint) {
function ($scope, $transition$, $timeout, ContainerService, Notifications, HttpRequestHelper, endpoint) {
$scope.state = {
refreshRate: 3,
lineCount: 100,
// No refreshRate here: container logs are delivered over a live stream now,
// not a 3s poll, so there is nothing to refresh on an interval.
lineCount: DEFAULT_LINE_COUNT,
sinceTimestamp: '',
displayTimestamps: false,
};
$scope.changeLogCollection = function (logCollectionStatus) {
if (!logCollectionStatus) {
stopRepeater();
} else {
setUpdateRepeater(!$scope.container.Config.Tty);
}
// Live-stream bookkeeping (not on $scope.state to avoid digest churn).
var stream = {
abortController: null,
reconnectTimer: null,
// RFC3339 timestamp of the last log line we received. Used as `since` on
// reconnect so we resume from the exact log position (not the client
// wall-clock) and neither duplicate nor lose lines across a reconnect.
lastTimestamp: '',
// Exact content of the line(s) at `lastTimestamp` we already rendered.
// Handed to the reconnecting processor so it drops only genuine duplicates
// Docker re-delivers, never a new line that shares the boundary nanosecond.
boundaryLines: [],
// false while the stream is being torn down (view destroyed / reconnect
// teardown) — suppresses auto-reconnect.
active: false,
skipHeaders: false,
// Whether we already surfaced the current reconnect-loop error, so the 3s
// reconnect loop does not spam a notification on every attempt.
errorNotified: false,
};
$scope.$on('$destroy', function () {
stopRepeater();
stopStream();
});
function stopRepeater() {
var repeater = $scope.repeater;
if (angular.isDefined(repeater)) {
$interval.cancel(repeater);
function clearReconnectTimer() {
if (stream.reconnectTimer) {
$timeout.cancel(stream.reconnectTimer);
stream.reconnectTimer = null;
}
}
function setUpdateRepeater(skipHeaders) {
var refreshRate = $scope.state.refreshRate;
$scope.repeater = $interval(function () {
ContainerService.logs(
endpoint.Id,
$transition$.params().id,
1,
1,
$scope.state.displayTimestamps ? 1 : 0,
moment($scope.state.sinceTimestamp).unix(),
$scope.state.lineCount,
skipHeaders
)
.then(function success(data) {
$scope.logs = data;
})
.catch(function error(err) {
stopRepeater();
Notifications.error('Failure', err, 'Unable to retrieve container logs');
});
}, refreshRate * 1000);
function abortInFlight() {
if (stream.abortController) {
stream.abortController.abort();
stream.abortController = null;
}
}
function startLogPolling(skipHeaders) {
ContainerService.logs(
// Stop streaming but keep the current buffer on screen. Used for the
// reconnect/teardown path: aborts the in-flight request and cancels any
// pending reconnect so a fresh connect (or view destroy) starts clean.
function pauseStream() {
stream.active = false;
clearReconnectTimer();
abortInFlight();
}
// Full teardown on view destroy.
function stopStream() {
pauseStream();
}
function appendLines(lines) {
if (!lines.length) {
return;
}
$scope.$applyAsync(function () {
Array.prototype.push.apply($scope.logs, lines);
const overflow = $scope.logs.length - MAX_LOG_LINES;
if (overflow > 0) {
// trim oldest lines; tail (and any selection in it) is untouched
$scope.logs.splice(0, overflow);
}
});
}
// After feeding the processor, advance the reconnect resume point to the
// timestamp of the last line it parsed.
function updateResumePoint(processor) {
const ts = processor.getLastTimestamp();
if (ts) {
stream.lastTimestamp = ts;
// Remember the exact boundary line(s) at this timestamp for content-exact
// reconnect dedup (so a new line sharing the nanosecond is not dropped).
stream.boundaryLines = processor.getBoundaryLines();
}
}
// `since` is built from the last log timestamp via rfc3339ToUnixNanoSince
// (imported from logHelper) as a single "<seconds>.<nanos>" form — see that
// helper for why we never mix unix-seconds and RFC3339.
// Connect (or reconnect) the live stream.
// `resetBuffer` clears the on-screen buffer (used on first connect / param
// changes); reconnects after a drop keep the buffer and resume via `since`.
function startStream(resetBuffer) {
pauseStream();
stream.active = true;
if (resetBuffer) {
$scope.logs.length = 0;
stream.lastTimestamp = '';
stream.boundaryLines = [];
stream.errorNotified = false;
}
const resuming = !!stream.lastTimestamp;
const processor = createLogStreamProcessor({
stripHeaders: stream.skipHeaders,
withTimestamps: $scope.state.displayTimestamps,
// We always request timestamps from Docker (see params below) so we can
// resume from the exact log timestamp on reconnect; the processor parses
// them and strips the prefix when the user has timestamps hidden.
streamHasTimestamps: true,
// On reconnect, drop lines Docker re-delivers at/before the resume point
// (its `since` filter is inclusive); content-exact match on the boundary
// line(s) so a new line sharing the boundary timestamp is not lost.
skipUntilTimestamp: resuming ? stream.lastTimestamp : undefined,
skipBoundaryContents: resuming ? stream.boundaryLines : undefined,
});
const abortController = new AbortController();
stream.abortController = abortController;
// On a reconnect resume from the exact timestamp of the last line we saw;
// on the initial connect honour the user's "Fetch since" selection. Both
// are sent as a single "<unix>.<nanos>" form (see rfc3339ToUnixNanoSince).
let since;
if (stream.lastTimestamp) {
since = rfc3339ToUnixNanoSince(stream.lastTimestamp);
} else if ($scope.state.sinceTimestamp) {
since = `${moment($scope.state.sinceTimestamp).unix()}.000000000`;
} else {
since = undefined; // omitted by the service -> Docker tails from now
}
// Substitute the default when the "Lines" field is cleared/invalid (a
// cleared number input is null), otherwise tail would be dropped and Docker
// would re-download the whole history.
const tailLineCount = $scope.state.lineCount > 0 ? $scope.state.lineCount : DEFAULT_LINE_COUNT;
const params = {
stdout: true,
stderr: true,
follow: true,
// Always request timestamps so we can resume precisely on reconnect; the
// processor hides them from display when the user toggle is off.
timestamps: true,
// tail is a historical-backfill request: apply it only on the initial
// connect. On a reconnect we resume from `since`, so re-applying tail
// would re-deliver the tail window. (0 is dropped by the service.)
tail: resuming ? 0 : tailLineCount,
since,
};
streamContainerLogs(
endpoint.Id,
$transition$.params().id,
1,
1,
$scope.state.displayTimestamps ? 1 : 0,
moment($scope.state.sinceTimestamp).unix(),
$scope.state.lineCount,
skipHeaders
params,
function onChunk(bytes) {
const lines = processor.push(bytes);
appendLines(lines);
updateResumePoint(processor);
if (lines.length) {
// got data again -> allow a fresh error notification next failure
stream.errorNotified = false;
}
},
abortController.signal
)
.then(function success(data) {
$scope.logs = data;
setUpdateRepeater(skipHeaders);
.then(function onEnd() {
// The stream always reconnects from here, so discard the processor's
// unfinished trailing remainder instead of flushing it: emitting a
// truncated fragment AND advancing the resume point to its timestamp
// would make Docker re-send the full line under the same `since` and
// dedup drop it — losing the line and leaving the fragment on screen.
// Completed lines already advanced the resume point in onChunk; `since`
// redelivers the full boundary line on reconnect.
scheduleReconnect();
})
.catch(function error(err) {
stopRepeater();
Notifications.error('Failure', err, 'Unable to retrieve container logs');
.catch(function onError(err) {
if (abortController.signal.aborted) {
return; // intentional abort (pause/destroy/param change)
}
// Drop the unfinished remainder on reconnect (see onEnd): do not flush
// a truncated line nor move the resume point off an incomplete line.
// Notify once per reconnect loop, not on every 3s retry.
if (!stream.errorNotified) {
stream.errorNotified = true;
Notifications.error('Failure', err, 'Unable to stream container logs');
}
scheduleReconnect();
});
}
function scheduleReconnect() {
if (!stream.active) {
return;
}
clearReconnectTimer();
stream.reconnectTimer = $timeout(function () {
if (stream.active) {
startStream(false);
}
}, RECONNECT_DELAY_MS);
}
// Restart the stream from scratch when a fetch parameter changes (tail
// count, since, timestamps). Each replaces the buffer with a fresh request.
function restartOnParamChange() {
if (stream.active || stream.abortController) {
startStream(true);
}
}
$scope.$watch('state.lineCount', onParamWatch);
$scope.$watch('state.sinceTimestamp', onParamWatch);
$scope.$watch('state.displayTimestamps', onParamWatch);
function onParamWatch(newVal, oldVal) {
if (newVal !== oldVal) {
restartOnParamChange();
}
}
function initView() {
HttpRequestHelper.setPortainerAgentTargetHeader($transition$.params().nodeName);
ContainerService.container(endpoint.Id, $transition$.params().id)
@@ -90,7 +260,11 @@ angular.module('portainer.docker').controller('ContainerLogsController', [
$scope.logsEnabled = logsEnabled;
if (logsEnabled) {
startLogPolling(!container.Config.Tty);
// initialise an (empty but defined) buffer so the viewer renders
// immediately, then start the live stream
$scope.logs = [];
stream.skipHeaders = !container.Config.Tty;
startStream(true);
}
})
.catch(function error(err) {

View File

@@ -15,7 +15,6 @@
<log-viewer
data="logs"
ng-if="logs && logsEnabled"
log-collection-change="changeLogCollection"
display-timestamps="state.displayTimestamps"
line-count="state.lineCount"
since-timestamp="state.sinceTimestamp"

View File

@@ -14,14 +14,6 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
displayTimestamps: false,
};
$scope.changeLogCollection = function (logCollectionStatus) {
if (!logCollectionStatus) {
stopRepeater();
} else {
setUpdateRepeater();
}
};
$scope.$on('$destroy', function () {
stopRepeater();
});
@@ -38,7 +30,17 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
$scope.repeater = $interval(function () {
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// NOTE: service logs still poll and replace the whole array. Because
// formatLogs assigns fresh line ids per poll, `track by log.id` in
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -50,7 +52,11 @@ angular.module('portainer.docker').controller('ServiceLogsController', [
function startLogPolling() {
ServiceService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {

View File

@@ -18,7 +18,6 @@
<log-viewer
data="logs"
ng-if="logs"
log-collection-change="changeLogCollection"
display-timestamps="state.displayTimestamps"
line-count="state.lineCount"
since-timestamp="state.sinceTimestamp"

View File

@@ -15,14 +15,6 @@ angular.module('portainer.docker').controller('TaskLogsController', [
displayTimestamps: false,
};
$scope.changeLogCollection = function (logCollectionStatus) {
if (!logCollectionStatus) {
stopRepeater();
} else {
setUpdateRepeater();
}
};
$scope.$on('$destroy', function () {
stopRepeater();
});
@@ -39,7 +31,17 @@ angular.module('portainer.docker').controller('TaskLogsController', [
$scope.repeater = $interval(function () {
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// NOTE: task logs still poll and replace the whole array. Because
// formatLogs assigns fresh line ids per poll, `track by log.id` in
// the viewer re-renders every row each poll (a live text selection
// can collapse). The append-only live stream that fixes this exists
// only for container logs (issue #2); converting service/task logs
// to a live stream is out of scope here. Assign positionally-stable
// ids (0..N) so `track by log.id` reuses rows across polls (like the
// old `track by $index`) and a live text selection survives.
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
})
.catch(function error(err) {
stopRepeater();
@@ -51,7 +53,11 @@ angular.module('portainer.docker').controller('TaskLogsController', [
function startLogPolling() {
TaskService.logs($transition$.params().id, 1, 1, $scope.state.displayTimestamps ? 1 : 0, moment($scope.state.sinceTimestamp).unix(), $scope.state.lineCount)
.then(function success(data) {
$scope.logs = data;
// Positionally-stable ids so `track by log.id` reuses rows (see the
// poll handler above).
$scope.logs = data.map(function (line, i) {
return { ...line, id: i };
});
setUpdateRepeater();
})
.catch(function error(err) {

View File

@@ -19,7 +19,6 @@
<log-viewer
data="logs"
ng-if="logs"
log-collection-change="changeLogCollection"
display-timestamps="state.displayTimestamps"
line-count="state.lineCount"
since-timestamp="state.sinceTimestamp"

View File

@@ -1,8 +1,17 @@
import _ from 'lodash';
import { InternalAxiosRequestConfig } from 'axios';
import { EnvironmentId } from '@/react/portainer/environments/types';
import PortainerError from '@/portainer/error';
import axios, { parseAxiosError } from '@/portainer/services/axios/axios';
import axios, {
agentTargetHeader,
parseAxiosError,
} from '@/portainer/services/axios/axios';
import {
portainerAgentManagerOperation,
portainerAgentTargetHeader,
} from '@/portainer/services/http-request.helper';
import { dockerMaxAPIVersionInterceptor } from '@/portainer/services/dockerMaxApiVersionInterceptor';
import { withAgentTargetHeader } from '../proxy/queries/utils';
import { buildDockerProxyUrl } from '../proxy/queries/buildDockerProxyUrl';
@@ -190,3 +199,124 @@ export async function getContainerLogs(
throw parseAxiosError(e, 'Unable to get container logs');
}
}
export type StreamLogsParams = ContainerLogsParams & {
/** follow=1 — keep the connection open and stream new lines as they arrive */
follow?: boolean;
};
// Memoize the Docker max-API-version pinning per proxy path for the session. The
// pinning interceptor only rewrites the URL based on the environment's Docker API
// version (cached and stable for the session), so resolving it once avoids an
// extra `/version` round-trip on every reconnect. Caching the Promise also
// dedupes concurrent reconnect attempts.
const pinnedLogPathCache = new Map<string, Promise<string>>();
function resolvePinnedLogPath(path: string): Promise<string> {
let cached = pinnedLogPathCache.get(path);
if (!cached) {
cached = dockerMaxAPIVersionInterceptor({
url: path,
} as InternalAxiosRequestConfig).then((config) => config.url ?? path);
pinnedLogPathCache.set(path, cached);
}
return cached;
}
/**
* Live-tail a container's logs over HTTP.
*
* Unlike `getContainerLogs` (axios, buffers the whole body), this uses `fetch`
* so we can read the response body as a `ReadableStream` and surface the raw
* byte chunks to the caller as they arrive (`follow=1`). Bytes are handed over
* undecoded so the caller can demux Docker's binary multiplexed frames at the
* byte level (UTF-8-decoding the whole stream would corrupt frame headers). The
* backend already
* streams `follow=1` transparently through the Docker proxy — including for
* Agent/Edge environments — so no backend change is needed.
*
* Auth: the API uses an httpOnly JWT cookie (SameSite=Strict), so a same-origin
* `fetch` with `credentials: 'include'` carries it automatically — no
* `Authorization` header. CSRF middleware only guards mutating methods; logs is
* a GET, so it is unaffected. The agent-target / manager-operation headers
* (normally added by the axios `agentInterceptor`) are replicated here so the
* stream also resolves the correct node on Agent/Edge environments.
*
* The caller drives lifetime via `signal`: aborting it (unmount, container
* switch, pausing Live) cancels the in-flight fetch and ends the read loop.
*/
export async function streamContainerLogs(
environmentId: EnvironmentId,
containerId: ContainerId,
params: StreamLogsParams,
onChunk: (bytes: Uint8Array) => void,
signal: AbortSignal
): Promise<void> {
const path = buildDockerProxyUrl(
environmentId,
'containers',
containerId,
'logs'
);
// The fetch path bypasses the axios request interceptors, so apply the same
// Docker max-API-version pinning axios applies to getContainerLogs. Resolve it
// once per session (memoized below) instead of hitting `/version` on every 3s
// reconnect — the pinning only depends on the environment's Docker API version,
// which is stable for the session.
const effectivePath = await resolvePinnedLogPath(path);
const query = new URLSearchParams();
// _.pickBy drops undefined/0/'' the same way the axios path does
Object.entries(_.pickBy(params)).forEach(([key, value]) => {
query.set(key, String(typeof value === 'boolean' ? Number(value) : value));
});
// axios baseURL is 'api'; mirror it here since fetch doesn't share it.
const url = `api${effectivePath}?${query.toString()}`;
const headers: Record<string, string> = {};
const target = portainerAgentTargetHeader();
if (target) {
headers[agentTargetHeader] = target;
}
if (portainerAgentManagerOperation()) {
headers['X-PortainerAgent-ManagerOperation'] = '1';
}
const response = await fetch(url, {
signal,
credentials: 'include',
headers,
});
if (!response.ok) {
throw new PortainerError(
`Unable to stream container logs (HTTP ${response.status})`
);
}
if (!response.body) {
return;
}
const reader = response.body.getReader();
try {
for (;;) {
const { value, done } = await reader.read();
if (done) {
break;
}
if (value) {
// Hand over the raw bytes undecoded; the caller demuxes Docker's binary
// frames and decodes complete lines itself. UTF-8 chars split across
// chunks are reassembled there before decoding, so no decoder flush is
// needed here. The caller flushes its own trailing partial line on end.
onChunk(value);
}
}
} finally {
reader.releaseLock();
}
}

View File

@@ -54,6 +54,9 @@ export type ContainerLogsParams = {
stdout?: boolean;
stderr?: boolean;
timestamps?: boolean;
since?: number;
// Unix timestamp. The live-stream path sends a "<seconds>.<nanos>" string so a
// reconnect can resume at exact nanosecond precision (which a number cannot
// hold); the buffered axios path passes a plain number.
since?: number | string;
tail?: number;
};