From 0ff7d724ff5a807c420dfe6f9bd1870c546ea426 Mon Sep 17 00:00:00 2001 From: Jesse Gross Date: Tue, 17 Mar 2026 17:02:51 -0700 Subject: [PATCH] mlx: fix subprocess log deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The stderr reader used bufio.Scanner which has a 64KB max line size. If the subprocess wrote a line exceeding this limit, the scanner would stop reading, the OS pipe buffer would fill, and the subprocess would deadlock. Replace the scanner with a statusWriter that wraps io.Copy. The writer forwards all stderr to os.Stderr while capturing the last short line (≤256 bytes) for error reporting, avoiding both the deadlock and the need to buffer arbitrarily long lines. --- x/mlxrunner/client.go | 86 ++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 22 deletions(-) diff --git a/x/mlxrunner/client.go b/x/mlxrunner/client.go index 5eff1bd02..824f20939 100644 --- a/x/mlxrunner/client.go +++ b/x/mlxrunner/client.go @@ -2,6 +2,7 @@ package mlxrunner import ( "bufio" + "bytes" "context" "encoding/json" "errors" @@ -38,12 +39,66 @@ type Client struct { memory atomic.Uint64 done chan error client *http.Client - lastErr string - lastErrLock sync.Mutex + status *statusWriter mu sync.Mutex cmd *exec.Cmd } +// statusWriter captures the last stderr line from the subprocess while +// forwarding all output to os.Stderr. Lines longer than maxStatusLen are +// truncated to the first maxStatusLen bytes. +type statusWriter struct { + lastErrMsg string + buf []byte + discarding bool + mu sync.Mutex + out *os.File +} + +const maxStatusLen = 256 + +func (w *statusWriter) Write(b []byte) (int, error) { + n, err := w.out.Write(b) + + w.mu.Lock() + defer w.mu.Unlock() + + w.buf = append(w.buf, b...) + for { + i := bytes.IndexByte(w.buf, '\n') + if i < 0 { + break + } + if !w.discarding { + line := bytes.TrimSpace(w.buf[:i]) + if len(line) > 0 { + if len(line) > maxStatusLen { + line = line[:maxStatusLen] + } + w.lastErrMsg = string(line) + } + } + w.buf = w.buf[i+1:] + w.discarding = false + } + // if the buffer grows past maxStatusLen without a newline, keep the front + if len(w.buf) > maxStatusLen { + if !w.discarding { + w.lastErrMsg = string(bytes.TrimSpace(w.buf[:maxStatusLen])) + w.discarding = true + } + w.buf = w.buf[:0] + } + + return n, err +} + +func (w *statusWriter) getLastErr() string { + w.mu.Lock() + defer w.mu.Unlock() + return w.lastErrMsg +} + // NewClient prepares a new MLX runner client for LLM models. // The subprocess is not started until Load() is called. func NewClient(modelName string) (*Client, error) { @@ -66,12 +121,6 @@ func NewClient(modelName string) (*Client, error) { return c, nil } -func (c *Client) getLastErr() string { - c.lastErrLock.Lock() - defer c.lastErrLock.Unlock() - return c.lastErr -} - // WaitUntilRunning waits for the subprocess to be ready. func (c *Client) WaitUntilRunning(ctx context.Context) error { timeout := time.After(2 * time.Minute) @@ -83,15 +132,13 @@ func (c *Client) WaitUntilRunning(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() case err := <-c.done: - errMsg := c.getLastErr() - if errMsg != "" { - return fmt.Errorf("mlx runner failed: %s (exit: %v)", errMsg, err) + if msg := c.status.getLastErr(); msg != "" { + return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, err) } return fmt.Errorf("mlx runner exited unexpectedly: %w", err) case <-timeout: - errMsg := c.getLastErr() - if errMsg != "" { - return fmt.Errorf("timeout waiting for mlx runner: %s", errMsg) + if msg := c.status.getLastErr(); msg != "" { + return fmt.Errorf("timeout waiting for mlx runner: %s", msg) } return errors.New("timeout waiting for mlx runner to start") case <-ticker.C: @@ -348,18 +395,13 @@ func (c *Client) Load(ctx context.Context, _ ml.SystemInfo, gpus []ml.DeviceInfo // Forward subprocess stdout/stderr to server logs stdout, _ := cmd.StdoutPipe() stderr, _ := cmd.StderrPipe() + status := &statusWriter{out: os.Stderr} + c.status = status go func() { io.Copy(os.Stderr, stdout) //nolint:errcheck }() go func() { - scanner := bufio.NewScanner(stderr) - for scanner.Scan() { - line := scanner.Text() - fmt.Fprintln(os.Stderr, line) - c.lastErrLock.Lock() - c.lastErr = line - c.lastErrLock.Unlock() - } + io.Copy(status, stderr) //nolint:errcheck }() slog.Info("starting mlx runner subprocess", "model", c.modelName, "port", c.port)