Skip to content

Commit 656b45d

Browse files
committed
feat: put blob in containerd content store
1 parent 5b4ffa2 commit 656b45d

File tree

1 file changed

+31
-49
lines changed

1 file changed

+31
-49
lines changed

internal/storage/containerd/blob.go

Lines changed: 31 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ import (
77
"net/http"
88
"time"
99

10+
"github.com/containerd/containerd/v2/client"
1011
"github.com/containerd/containerd/v2/core/content"
11-
"github.com/containerd/containerd/v2/core/leases"
1212
"github.com/containerd/errdefs"
1313
"github.com/distribution/distribution/v3"
1414
"github.com/distribution/reference"
@@ -18,7 +18,7 @@ import (
1818

1919
// blobStore implements distribution.BlobStore backed by containerd image store.
2020
type blobStore struct {
21-
client *Client
21+
client *client.Client
2222
repo reference.Named
2323
}
2424

@@ -57,12 +57,12 @@ func (b *blobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error)
5757
}
5858

5959
// Open returns a reader for the blob.
60+
// TODO
6061
func (b *blobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error) {
61-
ctx = b.client.Context(ctx)
62-
6362
reader, err := b.client.ContentStore().ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
6463
if err != nil {
65-
return nil, convertError(err)
64+
// TODO: convert err if possible
65+
return nil, err
6666
}
6767

6868
// Wrap the ReaderAt as a ReadSeekCloser
@@ -73,65 +73,48 @@ func (b *blobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCl
7373
}, nil
7474
}
7575

76-
// Put stores a blob.
77-
// TODO: implement blob storage as one request
78-
func (b *blobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) {
79-
ctx = b.client.Context(ctx)
80-
81-
dgst := digest.FromBytes(p)
82-
83-
// Create a lease to prevent garbage collection
84-
lease, err := b.client.LeasesService().Create(ctx, leases.WithRandomID())
85-
if err != nil {
86-
return distribution.Descriptor{}, fmt.Errorf("failed to create lease: %w", err)
87-
}
88-
defer b.client.LeasesService().Delete(ctx, lease)
89-
90-
// Write the blob
91-
ref := fmt.Sprintf("%s@%s", b.repo.String(), dgst)
92-
writer, err := b.client.ContentStore().Writer(
93-
ctx,
94-
content.WithRef(ref),
95-
content.WithDescriptor(
96-
ocispec.Descriptor{
97-
MediaType: mediaType,
98-
Digest: dgst,
99-
Size: int64(len(p)),
100-
},
101-
),
102-
)
76+
// Put stores a blob in the containerd content store with the given media type. If the blob already exists,
77+
// it will return the existing descriptor without re-uploading the content. It should be used for small objects,
78+
// such as manifests.
79+
func (b *blobStore) Put(ctx context.Context, mediaType string, blob []byte) (distribution.Descriptor, error) {
80+
writer, err := newBlobWriter(ctx, b.client, b.repo, "")
10381
if err != nil {
104-
return distribution.Descriptor{}, fmt.Errorf("failed to create writer: %w", err)
82+
return distribution.Descriptor{}, err
10583
}
106-
107-
if _, err := writer.Write(p); err != nil {
84+
defer func() {
85+
if err != nil {
86+
// Clean up resources occupied by the writer if an error occurs.
87+
_ = writer.Cancel(ctx)
88+
}
10889
writer.Close()
109-
return distribution.Descriptor{}, fmt.Errorf("failed to write blob: %w", err)
110-
}
90+
}()
11191

112-
if err := writer.Commit(ctx, 0, dgst); err != nil {
113-
if !errdefs.IsAlreadyExists(err) {
114-
return distribution.Descriptor{}, fmt.Errorf("failed to commit blob: %w", err)
115-
}
92+
if _, err = writer.Write(blob); err != nil {
93+
return distribution.Descriptor{}, err
11694
}
11795

118-
return distribution.Descriptor{
96+
desc := distribution.Descriptor{
11997
MediaType: mediaType,
120-
Digest: dgst,
121-
Size: int64(len(p)),
122-
}, nil
98+
Digest: digest.FromBytes(blob),
99+
Size: int64(len(blob)),
100+
}
101+
if desc, err = writer.Commit(ctx, desc); err != nil {
102+
return distribution.Descriptor{}, err
103+
}
104+
105+
return desc, nil
123106
}
124107

125108
// Create creates a blob writer to add a blob to the containerd content store.`
126109
func (b *blobStore) Create(ctx context.Context, _ ...distribution.BlobCreateOption) (
127110
distribution.BlobWriter, error,
128111
) {
129-
return newBlobWriter(ctx, b.client.client, b.repo, "")
112+
return newBlobWriter(ctx, b.client, b.repo, "")
130113
}
131114

132115
// Resume creates a blob writer for resuming an upload with a specific ID.
133116
func (b *blobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) {
134-
return newBlobWriter(ctx, b.client.client, b.repo, id)
117+
return newBlobWriter(ctx, b.client, b.repo, id)
135118
}
136119

137120
// Mount is not supported for simplicity.
@@ -144,9 +127,8 @@ func (b *blobStore) Mount(ctx context.Context, sourceRepo reference.Named, dgst
144127
}
145128

146129
// ServeBlob serves the blob over HTTP.
130+
// TODO
147131
func (b *blobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
148-
ctx = b.client.Context(ctx)
149-
150132
reader, err := b.Open(ctx, dgst)
151133
if err != nil {
152134
return err

0 commit comments

Comments
 (0)