mirror of
https://github.com/ollama/ollama.git
synced 2026-03-27 02:58:43 +07:00
mlx: fix subprocess log deadlock
The stderr reader used bufio.Scanner which has a 64KB max line size. If the subprocess wrote a line exceeding this limit, the scanner would stop reading, the OS pipe buffer would fill, and the subprocess would deadlock. Replace the scanner with a statusWriter that wraps io.Copy. The writer forwards all stderr to os.Stderr while capturing the last short line (≤256 bytes) for error reporting, avoiding both the deadlock and the need to buffer arbitrarily long lines.
This commit is contained in:
@@ -2,6 +2,7 @@ package mlxrunner
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
@@ -38,12 +39,66 @@ type Client struct {
|
|||||||
memory atomic.Uint64
|
memory atomic.Uint64
|
||||||
done chan error
|
done chan error
|
||||||
client *http.Client
|
client *http.Client
|
||||||
lastErr string
|
status *statusWriter
|
||||||
lastErrLock sync.Mutex
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
cmd *exec.Cmd
|
cmd *exec.Cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// statusWriter captures the last stderr line from the subprocess while
|
||||||
|
// forwarding all output to os.Stderr. Lines longer than maxStatusLen are
|
||||||
|
// truncated to the first maxStatusLen bytes.
|
||||||
|
type statusWriter struct {
|
||||||
|
lastErrMsg string
|
||||||
|
buf []byte
|
||||||
|
discarding bool
|
||||||
|
mu sync.Mutex
|
||||||
|
out *os.File
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxStatusLen = 256
|
||||||
|
|
||||||
|
func (w *statusWriter) Write(b []byte) (int, error) {
|
||||||
|
n, err := w.out.Write(b)
|
||||||
|
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
|
||||||
|
w.buf = append(w.buf, b...)
|
||||||
|
for {
|
||||||
|
i := bytes.IndexByte(w.buf, '\n')
|
||||||
|
if i < 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if !w.discarding {
|
||||||
|
line := bytes.TrimSpace(w.buf[:i])
|
||||||
|
if len(line) > 0 {
|
||||||
|
if len(line) > maxStatusLen {
|
||||||
|
line = line[:maxStatusLen]
|
||||||
|
}
|
||||||
|
w.lastErrMsg = string(line)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
w.buf = w.buf[i+1:]
|
||||||
|
w.discarding = false
|
||||||
|
}
|
||||||
|
// if the buffer grows past maxStatusLen without a newline, keep the front
|
||||||
|
if len(w.buf) > maxStatusLen {
|
||||||
|
if !w.discarding {
|
||||||
|
w.lastErrMsg = string(bytes.TrimSpace(w.buf[:maxStatusLen]))
|
||||||
|
w.discarding = true
|
||||||
|
}
|
||||||
|
w.buf = w.buf[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *statusWriter) getLastErr() string {
|
||||||
|
w.mu.Lock()
|
||||||
|
defer w.mu.Unlock()
|
||||||
|
return w.lastErrMsg
|
||||||
|
}
|
||||||
|
|
||||||
// NewClient prepares a new MLX runner client for LLM models.
|
// NewClient prepares a new MLX runner client for LLM models.
|
||||||
// The subprocess is not started until Load() is called.
|
// The subprocess is not started until Load() is called.
|
||||||
func NewClient(modelName string) (*Client, error) {
|
func NewClient(modelName string) (*Client, error) {
|
||||||
@@ -66,12 +121,6 @@ func NewClient(modelName string) (*Client, error) {
|
|||||||
return c, nil
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) getLastErr() string {
|
|
||||||
c.lastErrLock.Lock()
|
|
||||||
defer c.lastErrLock.Unlock()
|
|
||||||
return c.lastErr
|
|
||||||
}
|
|
||||||
|
|
||||||
// WaitUntilRunning waits for the subprocess to be ready.
|
// WaitUntilRunning waits for the subprocess to be ready.
|
||||||
func (c *Client) WaitUntilRunning(ctx context.Context) error {
|
func (c *Client) WaitUntilRunning(ctx context.Context) error {
|
||||||
timeout := time.After(2 * time.Minute)
|
timeout := time.After(2 * time.Minute)
|
||||||
@@ -83,15 +132,13 @@ func (c *Client) WaitUntilRunning(ctx context.Context) error {
|
|||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
case err := <-c.done:
|
case err := <-c.done:
|
||||||
errMsg := c.getLastErr()
|
if msg := c.status.getLastErr(); msg != "" {
|
||||||
if errMsg != "" {
|
return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, err)
|
||||||
return fmt.Errorf("mlx runner failed: %s (exit: %v)", errMsg, err)
|
|
||||||
}
|
}
|
||||||
return fmt.Errorf("mlx runner exited unexpectedly: %w", err)
|
return fmt.Errorf("mlx runner exited unexpectedly: %w", err)
|
||||||
case <-timeout:
|
case <-timeout:
|
||||||
errMsg := c.getLastErr()
|
if msg := c.status.getLastErr(); msg != "" {
|
||||||
if errMsg != "" {
|
return fmt.Errorf("timeout waiting for mlx runner: %s", msg)
|
||||||
return fmt.Errorf("timeout waiting for mlx runner: %s", errMsg)
|
|
||||||
}
|
}
|
||||||
return errors.New("timeout waiting for mlx runner to start")
|
return errors.New("timeout waiting for mlx runner to start")
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
@@ -348,18 +395,13 @@ func (c *Client) Load(ctx context.Context, _ ml.SystemInfo, gpus []ml.DeviceInfo
|
|||||||
// Forward subprocess stdout/stderr to server logs
|
// Forward subprocess stdout/stderr to server logs
|
||||||
stdout, _ := cmd.StdoutPipe()
|
stdout, _ := cmd.StdoutPipe()
|
||||||
stderr, _ := cmd.StderrPipe()
|
stderr, _ := cmd.StderrPipe()
|
||||||
|
status := &statusWriter{out: os.Stderr}
|
||||||
|
c.status = status
|
||||||
go func() {
|
go func() {
|
||||||
io.Copy(os.Stderr, stdout) //nolint:errcheck
|
io.Copy(os.Stderr, stdout) //nolint:errcheck
|
||||||
}()
|
}()
|
||||||
go func() {
|
go func() {
|
||||||
scanner := bufio.NewScanner(stderr)
|
io.Copy(status, stderr) //nolint:errcheck
|
||||||
for scanner.Scan() {
|
|
||||||
line := scanner.Text()
|
|
||||||
fmt.Fprintln(os.Stderr, line)
|
|
||||||
c.lastErrLock.Lock()
|
|
||||||
c.lastErr = line
|
|
||||||
c.lastErrLock.Unlock()
|
|
||||||
}
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
slog.Info("starting mlx runner subprocess", "model", c.modelName, "port", c.port)
|
slog.Info("starting mlx runner subprocess", "model", c.modelName, "port", c.port)
|
||||||
|
|||||||
Reference in New Issue
Block a user