package transfer import ( "bytes" "cmp" "context" "errors" "fmt" "io" "log/slog" "net/http" "net/url" "os" "path/filepath" "time" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" ) type uploader struct { client *http.Client baseURL string srcDir string repository string // Repository path for blob URLs (e.g., "library/model") token *string getToken func(context.Context, AuthChallenge) (string, error) userAgent string progress *progressTracker logger *slog.Logger } func upload(ctx context.Context, opts UploadOptions) error { if len(opts.Blobs) == 0 && len(opts.Manifest) == 0 { return nil } token := opts.Token u := &uploader{ client: cmp.Or(opts.Client, defaultClient), baseURL: opts.BaseURL, srcDir: opts.SrcDir, repository: cmp.Or(opts.Repository, "library/_"), token: &token, getToken: opts.GetToken, userAgent: cmp.Or(opts.UserAgent, defaultUserAgent), logger: opts.Logger, } if len(opts.Blobs) > 0 { // Phase 1: Fast parallel HEAD checks to find which blobs need uploading needsUpload := make([]bool, len(opts.Blobs)) { sem := semaphore.NewWeighted(128) // High concurrency for HEAD checks g, gctx := errgroup.WithContext(ctx) for i, blob := range opts.Blobs { g.Go(func() error { if err := sem.Acquire(gctx, 1); err != nil { return err } defer sem.Release(1) exists, err := u.exists(gctx, blob) if err != nil { return err } if !exists { needsUpload[i] = true } else if u.logger != nil { u.logger.Debug("blob exists", "digest", blob.Digest) } return nil }) } if err := g.Wait(); err != nil { return err } } // Filter to only blobs that need uploading var toUpload []Blob var total int64 for i, blob := range opts.Blobs { if needsUpload[i] { toUpload = append(toUpload, blob) total += blob.Size } } if len(toUpload) == 0 { if u.logger != nil { u.logger.Debug("all blobs exist, nothing to upload") } } else { // Phase 2: Upload blobs that don't exist u.progress = newProgressTracker(total, opts.Progress) concurrency := cmp.Or(opts.Concurrency, DefaultUploadConcurrency) sem := semaphore.NewWeighted(int64(concurrency)) g, gctx := errgroup.WithContext(ctx) for _, blob := range toUpload { g.Go(func() error { if err := sem.Acquire(gctx, 1); err != nil { return err } defer sem.Release(1) return u.upload(gctx, blob) }) } if err := g.Wait(); err != nil { return err } } } if len(opts.Manifest) > 0 && opts.ManifestRef != "" && opts.Repository != "" { return u.pushManifest(ctx, opts.Repository, opts.ManifestRef, opts.Manifest) } return nil } func (u *uploader) upload(ctx context.Context, blob Blob) error { var lastErr error var n int64 for attempt := range maxRetries { if attempt > 0 { if err := backoff(ctx, attempt, time.Second< 0 { r.n += int64(n) r.tracker.add(int64(n)) } return n, err }