Files
docker-docs/graph/push_v2.go
Tonis Tiigi fb4a725692 Don’t overwrite layer checksum on push
After v1.8.3 layer checksum is used for image ID
validation. Rewriting the checksums on push would
mean that next pulls will get different image IDs
and pulls may fail if its detected that same
manifest digest can now point to new image ID.

Fixes #17178

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
2015-10-19 11:44:16 -07:00

275 lines
7.2 KiB
Go

package graph
import (
"fmt"
"io"
"io/ioutil"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progressreader"
"github.com/docker/docker/pkg/streamformatter"
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/utils"
"golang.org/x/net/context"
)
type v2Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
repo distribution.Repository
// layersPushed is the set of layers known to exist on the remote side.
// This avoids redundant queries when pushing multiple tags that
// involve the same layers.
layersPushed map[digest.Digest]bool
}
func (p *v2Pusher) Push() (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
return false, p.pushV2Repository(p.config.Tag)
}
func (p *v2Pusher) getImageTags(askedTag string) ([]string, error) {
logrus.Debugf("Checking %q against %#v", askedTag, p.localRepo)
if len(askedTag) > 0 {
if _, ok := p.localRepo[askedTag]; !ok || utils.DigestReference(askedTag) {
return nil, fmt.Errorf("Tag does not exist for %s", askedTag)
}
return []string{askedTag}, nil
}
var tags []string
for tag := range p.localRepo {
if !utils.DigestReference(tag) {
tags = append(tags, tag)
}
}
return tags, nil
}
func (p *v2Pusher) pushV2Repository(tag string) error {
localName := p.repoInfo.LocalName
if _, found := p.poolAdd("push", localName); found {
return fmt.Errorf("push or pull %s is already in progress", localName)
}
defer p.poolRemove("push", localName)
tags, err := p.getImageTags(tag)
if err != nil {
return fmt.Errorf("error getting tags for %s: %s", localName, err)
}
if len(tags) == 0 {
return fmt.Errorf("no tags to push for %s", localName)
}
for _, tag := range tags {
if err := p.pushV2Tag(tag); err != nil {
return err
}
}
return nil
}
func (p *v2Pusher) pushV2Tag(tag string) error {
logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
layerID, exists := p.localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layersSeen := make(map[string]bool)
layer, err := p.graph.Get(layerID)
if err != nil {
return err
}
m := &manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: p.repo.Name(),
Tag: tag,
Architecture: layer.Architecture,
FSLayers: []manifest.FSLayer{},
History: []manifest.History{},
}
var metadata runconfig.Config
if layer != nil && layer.Config != nil {
metadata = *layer.Config
}
out := p.config.OutStream
for ; layer != nil; layer, err = p.graph.GetParent(layer) {
if err != nil {
return err
}
// break early if layer has already been seen in this image,
// this prevents infinite loops on layers which loopback, this
// cannot be prevented since layer IDs are not merkle hashes
// TODO(dmcgowan): throw error if no valid use case is found
if layersSeen[layer.ID] {
break
}
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
var exists bool
dgst, err := p.graph.GetLayerDigest(layer.ID)
switch err {
case nil:
if p.layersPushed[dgst] {
exists = true
// break out of switch, it is already known that
// the push is not needed and therefore doing a
// stat is unnecessary
break
}
_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
switch err {
case nil:
exists = true
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
case distribution.ErrBlobUnknown:
// nop
default:
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
case ErrDigestNotSet:
// nop
case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
return fmt.Errorf("error getting image checksum: %v", err)
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then fetch it.
if !exists {
var pushDigest digest.Digest
if pushDigest, err = p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
return err
}
if dgst == "" {
// Cache new checksum
if err := p.graph.SetLayerDigest(layer.ID, pushDigest); err != nil {
return err
}
}
dgst = pushDigest
}
// read v1Compatibility config, generate new if needed
jsonData, err := p.graph.GenerateV1CompatibilityChain(layer.ID)
if err != nil {
return err
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
p.layersPushed[dgst] = true
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
signed, err := manifest.Sign(m, p.trustKey)
if err != nil {
return err
}
manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
if err != nil {
return err
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
}
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return err
}
return manSvc.Put(signed)
}
func (p *v2Pusher) pushV2Image(bs distribution.BlobService, img *image.Image) (digest.Digest, error) {
out := p.config.OutStream
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Preparing", nil))
image, err := p.graph.Get(img.ID)
if err != nil {
return "", err
}
arch, err := p.graph.TarLayer(image)
if err != nil {
return "", err
}
defer arch.Close()
// Send the layer
layerUpload, err := bs.Create(context.Background())
if err != nil {
return "", err
}
defer layerUpload.Close()
digester := digest.Canonical.New()
tee := io.TeeReader(arch, digester.Hash())
reader := progressreader.New(progressreader.Config{
In: ioutil.NopCloser(tee), // we'll take care of close here.
Out: out,
Formatter: p.sf,
// TODO(stevvooe): This may cause a size reporting error. Try to get
// this from tar-split or elsewhere. The main issue here is that we
// don't want to buffer to disk *just* to calculate the size.
Size: img.Size,
NewLines: false,
ID: stringid.TruncateID(img.ID),
Action: "Pushing",
})
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushing", nil))
nn, err := io.Copy(layerUpload, reader)
if err != nil {
return "", err
}
dgst := digester.Digest()
if _, err := layerUpload.Commit(context.Background(), distribution.Descriptor{Digest: dgst}); err != nil {
return "", err
}
logrus.Debugf("uploaded layer %s (%s), %d bytes", img.ID, dgst, nn)
out.Write(p.sf.FormatProgress(stringid.TruncateID(img.ID), "Pushed", nil))
return dgst, nil
}