Extract the manual stream-and-flush loop from dockerLocalProxy.ServeHTTP into a behaviour-preserving package-private streamResponse(w, body) helper, and add docker_test.go regression tests for the riskiest path (it runs on every Docker API response): - DeliversFullBodyAndFlushesPerChunk: a >32KB body delivered as several chunks (boundaries not aligned to the 32KB buffer), with the final Read returning (n>0, io.EOF) simultaneously, asserts the streamed body equals the input exactly (no loss/duplication) and that Flush ran more than once (the per-chunk flush is the whole point of the change). - StopsOnWriteErrorWithoutPanic: a writer that errors on first Write (and does not implement http.Flusher, exercising the nil-flusher fallback) breaks the loop after one write without panicking. No production behaviour change — the loop body is identical, only moved. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
162 lines
4.8 KiB
Go
162 lines
4.8 KiB
Go
package factory
|
|
|
|
import (
|
|
"io"
|
|
"net/http"
|
|
"strings"
|
|
|
|
portainer "github.com/portainer/portainer/api"
|
|
"github.com/portainer/portainer/api/crypto"
|
|
"github.com/portainer/portainer/api/http/proxy/factory/docker"
|
|
"github.com/portainer/portainer/api/internal/endpointutils"
|
|
"github.com/portainer/portainer/api/url"
|
|
httperror "github.com/portainer/portainer/pkg/libhttp/error"
|
|
"github.com/portainer/portainer/pkg/libhttp/ssrf"
|
|
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
func (factory *ProxyFactory) newDockerProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
|
if strings.HasPrefix(endpoint.URL, "unix://") || strings.HasPrefix(endpoint.URL, "npipe://") {
|
|
return factory.newDockerLocalProxy(endpoint)
|
|
}
|
|
|
|
return factory.newDockerHTTPProxy(endpoint)
|
|
}
|
|
|
|
func (factory *ProxyFactory) newDockerLocalProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
|
endpointURL, err := url.ParseURL(endpoint.URL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return factory.newOSBasedLocalProxy(endpointURL.Path, endpoint)
|
|
}
|
|
|
|
func (factory *ProxyFactory) newDockerHTTPProxy(endpoint *portainer.Endpoint) (http.Handler, error) {
|
|
rawURL := endpoint.URL
|
|
if endpoint.Type == portainer.EdgeAgentOnDockerEnvironment {
|
|
tunnelAddr, err := factory.reverseTunnelService.TunnelAddr(endpoint)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rawURL = "http://" + tunnelAddr
|
|
}
|
|
|
|
endpointURL, err := url.ParseURL(rawURL)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
endpointURL.Scheme = "http"
|
|
|
|
transportParameters := &docker.TransportParameters{
|
|
Endpoint: endpoint,
|
|
DataStore: factory.dataStore,
|
|
ReverseTunnelService: factory.reverseTunnelService,
|
|
SignatureService: factory.signatureService,
|
|
DockerClientFactory: factory.dockerClientFactory,
|
|
}
|
|
|
|
var innerTransport *http.Transport
|
|
if endpoint.TLSConfig.TLS || endpoint.TLSConfig.TLSSkipVerify {
|
|
tlsConfig, err := crypto.CreateTLSConfigurationFromDisk(endpoint.TLSConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
endpointURL.Scheme = "https"
|
|
|
|
if endpointutils.IsEdgeEndpoint(endpoint) {
|
|
innerTransport = ssrf.NewInternalTransport(tlsConfig)
|
|
} else {
|
|
innerTransport = ssrf.NewTransport(tlsConfig)
|
|
}
|
|
} else if endpointutils.IsEdgeEndpoint(endpoint) {
|
|
innerTransport = ssrf.NewInternalTransport(nil)
|
|
} else {
|
|
innerTransport = ssrf.NewTransport(nil)
|
|
}
|
|
|
|
dockerTransport, err := docker.NewTransport(transportParameters, innerTransport, factory.gitService, factory.snapshotService)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
proxy := NewSingleHostReverseProxyWithHostHeader(endpointURL)
|
|
proxy.Transport = dockerTransport
|
|
return proxy, nil
|
|
}
|
|
|
|
type dockerLocalProxy struct {
|
|
transport *docker.Transport
|
|
}
|
|
|
|
// ServeHTTP is the http.Handler interface implementation
|
|
// for a local (Unix socket or Windows named pipe) Docker proxy.
|
|
func (proxy *dockerLocalProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
// Force URL/domain to http/unixsocket to be able to
|
|
// use http.transport RoundTrip to do the requests via the socket
|
|
r.URL.Scheme = "http"
|
|
r.URL.Host = "unixsocket"
|
|
|
|
res, err := proxy.transport.ProxyDockerRequest(r)
|
|
if err != nil {
|
|
code := http.StatusInternalServerError
|
|
if res != nil && res.StatusCode != 0 {
|
|
code = res.StatusCode
|
|
}
|
|
|
|
httperror.WriteError(w, code, "Unable to proxy the request via the Docker socket", err)
|
|
return
|
|
}
|
|
defer func() {
|
|
if err := res.Body.Close(); err != nil {
|
|
log.Warn().Err(err).Msg("proxy error: failed to close response body")
|
|
}
|
|
}()
|
|
|
|
for k, vv := range res.Header {
|
|
for _, v := range vv {
|
|
w.Header().Add(k, v)
|
|
}
|
|
}
|
|
|
|
w.WriteHeader(res.StatusCode)
|
|
|
|
streamResponse(w, res.Body)
|
|
}
|
|
|
|
// streamResponse copies body to w, flushing after every chunk instead of using
|
|
// io.Copy. io.Copy buffers into the http.ResponseWriter (~2KB) and only flushes
|
|
// when the buffer fills or the handler returns, so low-throughput streaming
|
|
// docker responses (container logs follow, events, stats, attach) would arrive
|
|
// in multi-second batches instead of live; flushing per chunk delivers them as
|
|
// they are produced. Behaviour otherwise matches io.Copy: the n>0 chunk is
|
|
// written BEFORE the readErr check so a simultaneous Read -> (n>0, io.EOF)
|
|
// still emits the final chunk; a write error or a non-EOF read error breaks the
|
|
// loop with a Debug log; and a writer that does not implement http.Flusher
|
|
// degrades to the old buffered behaviour rather than panicking.
|
|
func streamResponse(w http.ResponseWriter, body io.Reader) {
|
|
flusher, _ := w.(http.Flusher)
|
|
buf := make([]byte, 32*1024)
|
|
for {
|
|
n, readErr := body.Read(buf)
|
|
if n > 0 {
|
|
if _, writeErr := w.Write(buf[:n]); writeErr != nil {
|
|
log.Debug().Err(writeErr).Msg("proxy error")
|
|
break
|
|
}
|
|
if flusher != nil {
|
|
flusher.Flush()
|
|
}
|
|
}
|
|
if readErr != nil {
|
|
if readErr != io.EOF {
|
|
log.Debug().Err(readErr).Msg("proxy error")
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|