Skip to content
This repository was archived by the owner on Sep 30, 2024. It is now read-only.
Merged

Use conc #52805

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions internal/goroutine/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions internal/uploadstore/BUILD.bazel

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/uploadstore/gcs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *gcsStore) create(ctx context.Context, bucket gcsBucketHandle) error {
}

func (s *gcsStore) deleteSources(ctx context.Context, bucket gcsBucketHandle, sources []string) error {
return RunWorkersOverStrings(sources, func(index int, source string) error {
return ForEachString(sources, func(index int, source string) error {
if err := bucket.Object(source).Delete(ctx); err != nil {
return errors.Wrap(err, "failed to delete source object")
}
Expand Down
83 changes: 13 additions & 70 deletions internal/uploadstore/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,81 +2,24 @@ package uploadstore

import (
"runtime"
"sync"

"github.com/sourcegraph/sourcegraph/lib/errors"
"github.com/sourcegraph/conc/pool"
)

// poolWorker is a function invoked by RunWorkers that sends
// any errors that occur during execution down a shared channel.
type poolWorker func(errs chan<- error)

// runWorkersN invokes the given worker n times and collects the
// errors from each invocation.
func runWorkersN(n int, worker poolWorker) (err error) {
errs := make(chan error, n)

var wg sync.WaitGroup
for i := 0; i < n; i++ {
wg.Add(1)
go func() { worker(errs); wg.Done() }()
}

go func() {
wg.Wait()
close(errs)
}()

for e := range errs {
if err == nil {
err = e
} else {
err = errors.Append(err, e)
}
}

return err
}

// RunWorkersOverStrings invokes the given worker once for each of the
// given string values. The worker function will receive the index as well
// as the string value as parameters. Workers will be invoked in a number
// ForEachString invokes the given callback once for each of the
// given string values. The callback function will receive the index as well
// as the string value as parameters. Callbacks will be invoked in a number
// of concurrent routines proportional to the maximum number of CPUs that
// can be executing simultaneously.
func RunWorkersOverStrings(values []string, worker func(index int, value string) error) error {
return runWorkersOverStringsN(runtime.GOMAXPROCS(0), values, worker)
}

// RunWorkersOverStrings invokes the given worker once for each of the
// given string values. The worker function will receive the index as well
// as the string value as parameters. Workers will be invoked in n concurrent
// routines.
func runWorkersOverStringsN(n int, values []string, worker func(index int, value string) error) error {
return runWorkersN(n, indexedStringWorker(loadIndexedStringChannel(values), worker))
}

type indexedString struct {
index int
value string
}

func loadIndexedStringChannel(values []string) <-chan indexedString {
ch := make(chan indexedString, len(values))
defer close(ch)

func ForEachString(values []string, f func(index int, value string) error) error {
p := pool.New().
WithErrors().
WithMaxGoroutines(runtime.GOMAXPROCS(0))
for i, value := range values {
ch <- indexedString{index: i, value: value}
}

return ch
}

func indexedStringWorker(ch <-chan indexedString, worker func(index int, value string) error) poolWorker {
return func(errs chan<- error) {
for value := range ch {
if err := worker(value.index, value.value); err != nil {
errs <- err
}
}
i, value := i, value
p.Go(func() error {
return f(i, value)
})
}
return p.Wait()
}
4 changes: 2 additions & 2 deletions internal/uploadstore/s3_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (s *s3Store) Compose(ctx context.Context, destination string, sources ...st
var m sync.Mutex
etags := map[int]*string{}

if err := RunWorkersOverStrings(sources, func(index int, source string) error {
if err := ForEachString(sources, func(index int, source string) error {
partNumber := index + 1

copyResult, err := s.client.UploadPartCopy(ctx, &s3.UploadPartCopyInput{
Expand Down Expand Up @@ -331,7 +331,7 @@ func (s *s3Store) create(ctx context.Context) error {
}

func (s *s3Store) deleteSources(ctx context.Context, bucket string, sources []string) error {
return RunWorkersOverStrings(sources, func(index int, source string) error {
return ForEachString(sources, func(index int, source string) error {
if _, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
Bucket: aws.String(bucket),
Key: aws.String(source),
Expand Down