mirror of
https://github.com/ollama/ollama.git
synced 2026-03-27 02:58:43 +07:00
llm, mlxrunner: fix done channel value consumed by first receiver
Receiving from a buffered chan error consumes the value, so only the
first caller (WaitUntilRunning, HasExited, or Close) sees the signal.
Subsequent receivers block or take the wrong branch. Replace with a
closed chan struct{} which can be received from any number of times,
and store the error in a separate field.
This commit is contained in:
@@ -87,7 +87,8 @@ type LlamaServer interface {
 type llmServer struct {
 	port int
 	cmd  *exec.Cmd
-	done chan error // Channel to signal when the process exits
+	done    chan struct{} // closed when the process exits
+	doneErr error         // valid after done is closed
 	status    *StatusWriter
 	options   api.Options
 	modelPath string
@@ -280,7 +281,7 @@ func NewLlamaServer(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, modelPath st
 		sem:         semaphore.NewWeighted(int64(numParallel)),
 		totalLayers: f.KV().BlockCount() + 1,
 		loadStart:   time.Now(),
-		done:        make(chan error, 1),
+		done:        make(chan struct{}),
 	}

 	if err != nil {
@@ -304,10 +305,11 @@ func NewLlamaServer(systemInfo ml.SystemInfo, gpus []ml.DeviceInfo, modelPath st
 			if strings.Contains(s.status.LastErrMsg, "unknown model") {
 				s.status.LastErrMsg = "this model is not supported by your version of Ollama. You may need to upgrade"
 			}
-			s.done <- errors.New(s.status.LastErrMsg)
+			s.doneErr = errors.New(s.status.LastErrMsg)
 		} else {
-			s.done <- err
+			s.doneErr = err
 		}
+		close(s.done)
 	}()

 	if tok != nil {
@@ -1356,8 +1358,8 @@ func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
 		case <-ctx.Done():
 			slog.Warn("client connection closed before server finished loading, aborting load")
 			return fmt.Errorf("timed out waiting for llama runner to start: %w", ctx.Err())
-		case err := <-s.done:
-			return fmt.Errorf("llama runner process has terminated: %w", err)
+		case <-s.done:
+			return fmt.Errorf("llama runner process has terminated: %w", s.doneErr)
 		default:
 		}
 		if time.Now().After(stallTimer) {
@@ -37,7 +37,8 @@ type Client struct {
 	modelName     string
 	contextLength atomic.Int64
 	memory        atomic.Uint64
-	done          chan error
+	done          chan struct{}
+	doneErr       error // valid after done is closed
 	client        *http.Client
 	status        *statusWriter
 	mu            sync.Mutex
@@ -108,7 +109,7 @@ func NewClient(modelName string) (*Client, error) {

 	c := &Client{
 		modelName: modelName,
-		done:      make(chan error, 1),
+		done:      make(chan struct{}),
 		client:    &http.Client{Timeout: 10 * time.Minute},
 	}

@@ -131,11 +132,11 @@ func (c *Client) WaitUntilRunning(ctx context.Context) error {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case err := <-c.done:
+		case <-c.done:
 			if msg := c.status.getLastErr(); msg != "" {
-				return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, err)
+				return fmt.Errorf("mlx runner failed: %s (exit: %v)", msg, c.doneErr)
 			}
-			return fmt.Errorf("mlx runner exited unexpectedly: %w", err)
+			return fmt.Errorf("mlx runner exited unexpectedly: %w", c.doneErr)
 		case <-timeout:
 			if msg := c.status.getLastErr(); msg != "" {
 				return fmt.Errorf("timeout waiting for mlx runner: %s", msg)
@@ -411,8 +412,8 @@ func (c *Client) Load(ctx context.Context, _ ml.SystemInfo, gpus []ml.DeviceInfo

 	// Reap subprocess when it exits
 	go func() {
-		err := cmd.Wait()
-		c.done <- err
+		c.doneErr = cmd.Wait()
+		close(c.done)
 	}()

 	return nil, nil

Reference in New Issue
Block a user