URI: 
       common/para: Add parallel task executor helper - hugo - [fork] hugo port for 9front
  HTML git clone git@git.drkhsh.at/hugo.git
   DIR Log
   DIR Files
   DIR Refs
   DIR Submodules
   DIR README
   DIR LICENSE
       ---
   DIR commit 628efd6e293d27984a3f5ba33522f8edd19d69d6
   DIR parent 2dcc1318d1d9ed849d040115aa5ba6191a1c102a
  HTML Author: Bjørn Erik Pedersen <bjorn.erik.pedersen@gmail.com>
       Date:   Thu, 21 Nov 2019 18:38:14 +0100
       
       common/para: Add parallel task executor helper
       
       Usage of this will come later.
       
       Diffstat:
         A common/para/para.go                 |      73 +++++++++++++++++++++++++++++++
         A common/para/para_test.go            |      85 +++++++++++++++++++++++++++++++
       
       2 files changed, 158 insertions(+), 0 deletions(-)
       ---
   DIR diff --git a/common/para/para.go b/common/para/para.go
       @@ -0,0 +1,73 @@
       +// Copyright 2019 The Hugo Authors. All rights reserved.
       +//
       +// Licensed under the Apache License, Version 2.0 (the "License");
       +// you may not use this file except in compliance with the License.
       +// You may obtain a copy of the License at
       +// http://www.apache.org/licenses/LICENSE-2.0
       +//
       +// Unless required by applicable law or agreed to in writing, software
       +// distributed under the License is distributed on an "AS IS" BASIS,
       +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       +// See the License for the specific language governing permissions and
       +// limitations under the License.
       +
       +// Package para implements parallel execution helpers.
       +package para
       +
       +import (
       +        "context"
       +
       +        "golang.org/x/sync/errgroup"
       +)
       +
       +// Workers configures a task executor with the most number of tasks to be executed in parallel.
       +type Workers struct {
       +        sem chan struct{}
       +}
       +
       +// Runner wraps the lifecycle methods of a new task set.
       +//
       +// Run wil block until a worker is available or the context is cancelled,
       +// and then run the given func in a new goroutine.
       +// Wait will wait for all the running goroutines to finish.
       +type Runner interface {
       +        Run(func() error)
       +        Wait() error
       +}
       +
       +type errGroupRunner struct {
       +        *errgroup.Group
       +        w *Workers
       +        ctx  context.Context
       +}
       +
       +func (g *errGroupRunner) Run(fn func() error) {
       +        select {
       +        case g.w.sem <- struct{}{}:
       +        case <-g.ctx.Done():
       +                return
       +        }
       +
       +        g.Go(func() error {
       +                err := fn()
       +                <-g.w.sem
       +                return err
       +        })
       +}
       +
       +// New creates a new Workers with the given number of workers.
       +func New(numWorkers int) *Workers {
       +        return &Workers{
       +                sem: make(chan struct{}, numWorkers),
       +        }
       +}
       +
       +// Start starts a new Runner.
       +func (w *Workers) Start(ctx context.Context) (Runner, context.Context) {
       +        g, ctx := errgroup.WithContext(ctx)
       +        return &errGroupRunner{
       +                Group: g,
       +                ctx:   ctx,
       +                w:  w,
       +        }, ctx
       +}
   DIR diff --git a/common/para/para_test.go b/common/para/para_test.go
       @@ -0,0 +1,85 @@
       +// Copyright 2019 The Hugo Authors. All rights reserved.
       +//
       +// Licensed under the Apache License, Version 2.0 (the "License");
       +// you may not use this file except in compliance with the License.
       +// You may obtain a copy of the License at
       +// http://www.apache.org/licenses/LICENSE-2.0
       +//
       +// Unless required by applicable law or agreed to in writing, software
       +// distributed under the License is distributed on an "AS IS" BASIS,
       +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       +// See the License for the specific language governing permissions and
       +// limitations under the License.
       +
       +package para
       +
       +import (
       +        "context"
       +        "sort"
       +        "sync"
       +        "sync/atomic"
       +        "testing"
       +        "time"
       +
       +        qt "github.com/frankban/quicktest"
       +)
       +
       +func TestPara(t *testing.T) {
       +
       +        c := qt.New(t)
       +
       +        c.Run("Order", func(c *qt.C) {
       +                n := 500
       +                ints := make([]int, n)
       +                for i := 0; i < n; i++ {
       +                        ints[i] = i
       +                }
       +
       +                p := New(4)
       +                r, _ := p.Start(context.Background())
       +
       +                var result []int
       +                var mu sync.Mutex
       +                for i := 0; i < n; i++ {
       +                        i := i
       +                        r.Run(func() error {
       +                                mu.Lock()
       +                                defer mu.Unlock()
       +                                result = append(result, i)
       +                                return nil
       +                        })
       +                }
       +
       +                c.Assert(r.Wait(), qt.IsNil)
       +                c.Assert(result, qt.HasLen, len(ints))
       +                c.Assert(sort.IntsAreSorted(result), qt.Equals, false, qt.Commentf("Para does not seem to be parallel"))
       +                sort.Ints(result)
       +                c.Assert(result, qt.DeepEquals, ints)
       +
       +        })
       +
       +        c.Run("Time", func(c *qt.C) {
       +                const n = 100
       +
       +                p := New(5)
       +                r, _ := p.Start(context.Background())
       +
       +                start := time.Now()
       +
       +                var counter int64
       +
       +                for i := 0; i < n; i++ {
       +                        r.Run(func() error {
       +                                atomic.AddInt64(&counter, 1)
       +                                time.Sleep(1 * time.Millisecond)
       +                                return nil
       +                        })
       +                }
       +
       +                c.Assert(r.Wait(), qt.IsNil)
       +                c.Assert(counter, qt.Equals, int64(n))
       +                c.Assert(time.Since(start) < n/2*time.Millisecond, qt.Equals, true)
       +
       +        })
       +
       +}