Skip to content

Commit 8fa67a6

Browse files
ktocksequix
authored and committed
Improve file read performance
Benchmarking in the community showed that file read performance is low, especially on random and parallel reads. This commit solves this with the following fixes: - minimizing slice allocation on the file-read execution path by leveraging sync.Pool, - minimizing memory copies and disk I/O by allowing a partial range of a blob to be fetched from the cache, and - minimizing the locked region in the cache. Signed-off-by: Kohei Tokunaga <[email protected]>
1 parent 738da1a commit 8fa67a6

File tree

9 files changed

+584
-262
lines changed

9 files changed

+584
-262
lines changed

cache/cache.go

Lines changed: 232 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package cache
1818

1919
import (
20+
"bytes"
2021
"fmt"
2122
"io"
22-
"io/ioutil"
2323
"os"
2424
"path/filepath"
2525
"sync"
@@ -28,117 +28,211 @@ import (
2828
"github.com/pkg/errors"
2929
)
3030

31+
const (
32+
defaultMaxLRUCacheEntry = 10
33+
defaultMaxCacheFds = 10
34+
)
35+
36+
type DirectoryCacheConfig struct {
37+
MaxLRUCacheEntry int `toml:"max_lru_cache_entry"`
38+
MaxCacheFds int `toml:"max_cache_fds"`
39+
SyncAdd bool `toml:"sync_add"`
40+
}
41+
3142
// TODO: contents validation.
3243

3344
type BlobCache interface {
34-
Fetch(blobHash string) ([]byte, error)
35-
Add(blobHash string, p []byte)
45+
Add(key string, p []byte, opts ...Option)
46+
FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error)
3647
}
3748

38-
type dirOpt struct {
39-
syncAdd bool
49+
type cacheOpt struct {
50+
direct bool
4051
}
4152

42-
type DirOption func(o *dirOpt) *dirOpt
53+
type Option func(o *cacheOpt) *cacheOpt
4354

44-
func SyncAdd() DirOption {
45-
return func(o *dirOpt) *dirOpt {
46-
o.syncAdd = true
55+
// When Direct option is specified for FetchAt and Add methods, these operation
56+
// won't use on-memory caches. When you know that the targeting value won't be
57+
// used immediately, you can prevent the limited space of on-memory caches from
58+
// being polluted by these unimportant values.
59+
func Direct() Option {
60+
return func(o *cacheOpt) *cacheOpt {
61+
o.direct = true
4762
return o
4863
}
4964
}
5065

51-
func NewDirectoryCache(directory string, memCacheSize int, opts ...DirOption) (BlobCache, error) {
52-
opt := &dirOpt{}
53-
for _, o := range opts {
54-
opt = o(opt)
66+
func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache, error) {
67+
maxEntry := config.MaxLRUCacheEntry
68+
if maxEntry == 0 {
69+
maxEntry = defaultMaxLRUCacheEntry
70+
}
71+
maxFds := config.MaxCacheFds
72+
if maxFds == 0 {
73+
maxFds = defaultMaxCacheFds
5574
}
5675
if err := os.MkdirAll(directory, os.ModePerm); err != nil {
5776
return nil, err
5877
}
5978
dc := &directoryCache{
60-
cache: lru.New(memCacheSize),
79+
cache: newObjectCache(maxEntry),
80+
fileCache: newObjectCache(maxFds),
6181
directory: directory,
82+
bufPool: sync.Pool{
83+
New: func() interface{} {
84+
return new(bytes.Buffer)
85+
},
86+
},
6287
}
63-
if opt.syncAdd {
64-
dc.syncAdd = true
88+
dc.cache.finalize = func(value interface{}) {
89+
dc.bufPool.Put(value)
6590
}
91+
dc.fileCache.finalize = func(value interface{}) {
92+
value.(*os.File).Close()
93+
}
94+
dc.syncAdd = config.SyncAdd
6695
return dc, nil
6796
}
6897

6998
// directoryCache is a cache implementation which backend is a directory.
7099
type directoryCache struct {
71-
cache *lru.Cache
72-
cacheMu sync.Mutex
100+
cache *objectCache
101+
fileCache *objectCache
73102
directory string
74-
syncAdd bool
75-
fileMu sync.Mutex
103+
104+
bufPool sync.Pool
105+
106+
syncAdd bool
76107
}
77108

78-
func (dc *directoryCache) Fetch(blobHash string) (p []byte, err error) {
79-
dc.cacheMu.Lock()
80-
defer dc.cacheMu.Unlock()
109+
func (dc *directoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
110+
opt := &cacheOpt{}
111+
for _, o := range opts {
112+
opt = o(opt)
113+
}
81114

82-
if cache, ok := dc.cache.Get(blobHash); ok {
83-
p, ok := cache.([]byte)
84-
if ok {
85-
return p, nil
115+
if !opt.direct {
116+
// Get data from memory
117+
if b, done, ok := dc.cache.get(key); ok {
118+
defer done()
119+
data := b.(*bytes.Buffer).Bytes()
120+
if int64(len(data)) < offset {
121+
return 0, fmt.Errorf("invalid offset %d exceeds chunk size %d",
122+
offset, len(data))
123+
}
124+
return copy(p, data[offset:]), nil
86125
}
87-
}
88126

89-
c := filepath.Join(dc.directory, blobHash[:2], blobHash)
90-
if _, err := os.Stat(c); err != nil {
91-
return nil, errors.Wrapf(err, "Missed cache %q", c)
127+
// Get data from disk. If the file is already opened, use it.
128+
if f, done, ok := dc.fileCache.get(key); ok {
129+
defer done()
130+
return f.(*os.File).ReadAt(p, offset)
131+
}
92132
}
93133

94-
file, err := os.Open(c)
134+
// Open the cache file and read the target region
135+
// TODO: If the target cache is write-in-progress, should we wait for the completion
136+
// or simply report the cache miss?
137+
file, err := os.Open(dc.cachePath(key))
95138
if err != nil {
96-
return nil, errors.Wrapf(err, "Failed to Open cached blob file %q", c)
139+
return 0, errors.Wrapf(err, "failed to open blob file for %q", key)
140+
}
141+
if n, err = file.ReadAt(p, offset); err == io.EOF {
142+
err = nil
97143
}
98-
defer file.Close()
99144

100-
if p, err = ioutil.ReadAll(file); err != nil && err != io.EOF {
101-
return nil, errors.Wrapf(err, "failed to read cached data %q", c)
145+
// Cache the opened file for future use. If "direct" option is specified, this
146+
// won't be done. This option is useful for preventing file cache from being
147+
// polluted by data that won't be accessed immediately.
148+
if opt.direct || !dc.fileCache.add(key, file) {
149+
file.Close()
102150
}
103-
dc.cache.Add(blobHash, p)
104151

105-
return
152+
// TODO: should we cache the entire file data on memory?
153+
// but making I/O (possibly huge) on every fetching
154+
// might be costly.
155+
156+
return n, err
106157
}
107158

108-
func (dc *directoryCache) Add(blobHash string, p []byte) {
109-
// Copy the original data for avoiding the cached contents to be edited accidentally
110-
p2 := make([]byte, len(p))
111-
copy(p2, p)
112-
p = p2
159+
func (dc *directoryCache) Add(key string, p []byte, opts ...Option) {
160+
opt := &cacheOpt{}
161+
for _, o := range opts {
162+
opt = o(opt)
163+
}
113164

114-
dc.cacheMu.Lock()
115-
dc.cache.Add(blobHash, p)
116-
dc.cacheMu.Unlock()
165+
if !opt.direct {
166+
// Cache the passed data on memory. This enables to serve this data even
167+
// during writing it to the disk. If "direct" option is specified, this
168+
// won't be done. This option is useful for preventing memory cache from being
169+
// polluted by data that won't be accessed immediately.
170+
b := dc.bufPool.Get().(*bytes.Buffer)
171+
b.Reset()
172+
b.Write(p)
173+
if !dc.cache.add(key, b) {
174+
dc.bufPool.Put(b) // Already exists. No need to cache.
175+
}
176+
}
117177

178+
// Cache the passed data to disk.
179+
b2 := dc.bufPool.Get().(*bytes.Buffer)
180+
b2.Reset()
181+
b2.Write(p)
118182
addFunc := func() {
119-
dc.fileMu.Lock()
120-
defer dc.fileMu.Unlock()
183+
defer dc.bufPool.Put(b2)
121184

122-
// Check if cache exists.
123-
c := filepath.Join(dc.directory, blobHash[:2], blobHash)
185+
var (
186+
c = dc.cachePath(key)
187+
wip = dc.wipPath(key)
188+
)
189+
if _, err := os.Stat(wip); err == nil {
190+
return // Write in progress
191+
}
124192
if _, err := os.Stat(c); err == nil {
193+
return // Already exists.
194+
}
195+
196+
// Write the contents to a temporary file
197+
if err := os.MkdirAll(filepath.Dir(wip), os.ModePerm); err != nil {
198+
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
199+
return
200+
}
201+
wipfile, err := os.Create(wip)
202+
if err != nil {
203+
fmt.Printf("Warning: failed to prepare temp file for storing cache %q", key)
204+
return
205+
}
206+
defer func() {
207+
wipfile.Close()
208+
os.Remove(wipfile.Name())
209+
}()
210+
want := b2.Len()
211+
if _, err := io.CopyN(wipfile, b2, int64(want)); err != nil {
212+
fmt.Printf("Warning: failed to write cache: %v\n", err)
125213
return
126214
}
127215

128-
// Create cache file
216+
// Commit the cache contents
129217
if err := os.MkdirAll(filepath.Dir(c), os.ModePerm); err != nil {
130218
fmt.Printf("Warning: Failed to Create blob cache directory %q: %v\n", c, err)
131219
return
132220
}
133-
f, err := os.Create(c)
221+
if err := os.Rename(wipfile.Name(), c); err != nil {
222+
fmt.Printf("Warning: failed to commit cache to %q: %v\n", c, err)
223+
return
224+
}
225+
file, err := os.Open(c)
134226
if err != nil {
135-
fmt.Printf("Warning: could not create a cache file at %q: %v\n", c, err)
227+
fmt.Printf("Warning: failed to open cache on %q: %v\n", c, err)
136228
return
137229
}
138-
defer f.Close()
139-
if n, err := f.Write(p); err != nil || n != len(p) {
140-
fmt.Printf("Warning: failed to write cache: %d(wrote)/%d(expected): %v\n",
141-
n, len(p), err)
230+
231+
// Cache the opened file for future use. If "direct" option is specified, this
232+
// won't be done. This option is useful for preventing file cache from being
233+
// polluted by data that won't be accessed immediately.
234+
if opt.direct || !dc.fileCache.add(key, file) {
235+
file.Close()
142236
}
143237
}
144238

@@ -149,6 +243,81 @@ func (dc *directoryCache) Add(blobHash string, p []byte) {
149243
}
150244
}
151245

246+
// cachePath returns the on-disk location of the committed cache entry for
// key, sharded into a subdirectory named after the key's first two characters.
func (dc *directoryCache) cachePath(key string) string {
	return filepath.Join(dc.directory, key[:2], key)
}
249+
250+
// wipPath returns the temporary "write in progress" location used while the
// cache entry for key is being written, before it is renamed into cachePath.
func (dc *directoryCache) wipPath(key string) string {
	return filepath.Join(dc.directory, key[:2], "w", key)
}
253+
254+
func newObjectCache(maxEntries int) *objectCache {
255+
oc := &objectCache{
256+
cache: lru.New(maxEntries),
257+
}
258+
oc.cache.OnEvicted = func(key lru.Key, value interface{}) {
259+
value.(*object).release() // Decrease ref count incremented in add operation.
260+
}
261+
return oc
262+
}
263+
264+
type objectCache struct {
265+
cache *lru.Cache
266+
cacheMu sync.Mutex
267+
finalize func(interface{})
268+
}
269+
270+
func (oc *objectCache) get(key string) (value interface{}, done func(), ok bool) {
271+
oc.cacheMu.Lock()
272+
defer oc.cacheMu.Unlock()
273+
o, ok := oc.cache.Get(key)
274+
if !ok {
275+
return nil, nil, false
276+
}
277+
o.(*object).use()
278+
return o.(*object).v, func() { o.(*object).release() }, true
279+
}
280+
281+
func (oc *objectCache) add(key string, value interface{}) bool {
282+
oc.cacheMu.Lock()
283+
defer oc.cacheMu.Unlock()
284+
if _, ok := oc.cache.Get(key); ok {
285+
return false // TODO: should we swap the object?
286+
}
287+
o := &object{
288+
v: value,
289+
finalize: oc.finalize,
290+
}
291+
o.use() // Keep this object having at least 1 ref count (will be decreased on eviction)
292+
oc.cache.Add(key, o)
293+
return true
294+
}
295+
296+
// object is a reference-counted wrapper around a cached value. The cache
// holds one reference from add() until eviction; each get() takes another
// reference for the duration of the caller's use.
type object struct {
	v interface{}

	refCounts int64
	finalize  func(interface{})

	mu sync.Mutex
}

// use takes a reference on the object.
func (o *object) use() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.refCounts++
}

// release drops a reference. Once the count reaches zero the finalizer runs
// exactly once. (The previous `<= 0 && finalize != nil` test would re-run
// the finalizer on any unbalanced extra release, double-closing a file or
// handing the same buffer to sync.Pool twice; clearing finalize after the
// first run makes finalization idempotent.)
func (o *object) release() {
	o.mu.Lock()
	defer o.mu.Unlock()
	o.refCounts--
	if o.refCounts <= 0 && o.finalize != nil {
		// nobody will refer to this object anymore
		o.finalize(o.v)
		o.finalize = nil // guard against double-finalization
	}
}
320+
152321
func NewMemoryCache() BlobCache {
153322
return &memoryCache{
154323
membuf: map[string]string{},
@@ -161,19 +330,19 @@ type memoryCache struct {
161330
mu sync.Mutex
162331
}
163332

164-
func (mc *memoryCache) Fetch(blobHash string) ([]byte, error) {
333+
func (mc *memoryCache) FetchAt(key string, offset int64, p []byte, opts ...Option) (n int, err error) {
165334
mc.mu.Lock()
166335
defer mc.mu.Unlock()
167336

168-
cache, ok := mc.membuf[blobHash]
337+
cache, ok := mc.membuf[key]
169338
if !ok {
170-
return nil, fmt.Errorf("Missed cache: %q", blobHash)
339+
return 0, fmt.Errorf("Missed cache: %q", key)
171340
}
172-
return []byte(cache), nil
341+
return copy(p, cache[offset:]), nil
173342
}
174343

175-
func (mc *memoryCache) Add(blobHash string, p []byte) {
344+
func (mc *memoryCache) Add(key string, p []byte, opts ...Option) {
176345
mc.mu.Lock()
177346
defer mc.mu.Unlock()
178-
mc.membuf[blobHash] = string(p)
347+
mc.membuf[key] = string(p)
179348
}

0 commit comments

Comments
 (0)