From d7c176ab91724751df0eb74ee1df1f8571b515ce Mon Sep 17 00:00:00 2001 From: Jesse Gross Date: Wed, 18 Mar 2026 16:30:43 -0700 Subject: [PATCH] llm, mlxrunner: fix done channel value consumed by first receiver Receiving from a buffered chan error consumes the value, so only the first caller (WaitUntilRunning, HasExited, or Close) sees the signal. Subsequent receivers block or take the wrong branch. Replace with a closed chan struct{} which can be received from any number of times, and store the error in a separate field. --- llm/server.go | 14 ++++++++------ x/mlxrunner/client.go | 15 ++++++++------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/llm/server.go b/llm/server.go index 291dd47fe..e8fa4cce0 100644 --- a/llm/server.go +++ b/llm/server.go @@ -87,7 +87,8 @@ type LlamaServer interface { type llmServer struct { port int cmd *exec.Cmd - done chan error // Channel to signal when the process exits + done chan struct{} // closed when the process exits + doneErr error // valid after done is closed status *StatusWriter options api.Options modelPath string @@ -280,7 +281,7 @@ func NewLlamaServer(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, modelPath st sem: semaphore.NewWeighted(int64(numParallel)), totalLayers: f.KV().BlockCount() + 1, loadStart: time.Now(), - done: make(chan error, 1), + done: make(chan struct{}), } if err != nil { @@ -304,10 +305,11 @@ func NewLlamaServer(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, modelPath st if strings.Contains(s.status.LastErrMsg, "unknown model") { s.status.LastErrMsg = "this model is not supported by your version of Ollama. You may need to upgrade" } - s.done <- errors.New(s.status.LastErrMsg) + s.doneErr = errors.New(s.status.LastErrMsg) } else { - s.done <- err + s.doneErr = err } + close(s.done) }() if tok != nil { @@ -1356,8 +1358,8 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error { case <-ctx.Done(): slog.Warn("client connection closed before server finished loading, aborting load") return fmt.Errorf("timed out waiting for llama runner to start: %w", ctx.Err()) - case err := <-s.done: - return fmt.Errorf("llama runner process has terminated: %w", err) + case <-s.done: + return fmt.Errorf("llama runner process has terminated: %w", s.doneErr) default: } if time.Now().After(stallTimer) { diff --git a/x/mlxrunner/client.go b/x/mlxrunner/client.go index 824f20939..989581113 100644 --- a/x/mlxrunner/client.go +++ b/x/mlxrunner/client.go @@ -37,7 +37,8 @@ type Client struct { modelName string contextLength atomic.Int64 memory atomic.Uint64 - done chan error + done chan struct{} + doneErr error // valid after done is closed client *http.Client status *statusWriter mu sync.Mutex @@ -108,7 +109,7 @@ func NewClient(modelName string) (*Client, error) { c := &Client{ modelName: modelName, - done: make(chan error, 1), + done: make(chan struct{}), client: &http.Client{Timeout: 10 * time.Minute}, } @@ -131,11 +132,11 @@ func (c *Client) WaitUntilRunning(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case err := <-c.done: + case <-c.done: if msg := c.status.getLastErr(); msg != "" { - return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, err) + return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, c.doneErr) } - return fmt.Errorf("mlx runner exited unexpectedly: %w", err) + return fmt.Errorf("mlx runner exited unexpectedly: %w", c.doneErr) case <-timeout: if msg := c.status.getLastErr(); msg != "" { return fmt.Errorf("timeout waiting for mlx runner: %s", msg) @@ -411,8 +412,8 @@ func (c *Client) Load(ctx context.Context, _ ml.SystemInfo, gpus []ml.DeviceInfo // Reap subprocess when it exits go func() { - err := cmd.Wait() - c.done <- err + c.doneErr = cmd.Wait() + close(c.done) }() return nil, nil