Commit 130301e3 authored by Matt Holt's avatar Matt Holt

Merge pull request #92 from abiosoft/master

Git: code refactor. replace Sleep with Ticker
parents ee059c09 2013838b
...@@ -57,14 +57,14 @@ func TestIntervals(t *testing.T) { ...@@ -57,14 +57,14 @@ func TestIntervals(t *testing.T) {
check(t, err) check(t, err)
// wait for first background pull // wait for first background pull
time.Sleep(time.Millisecond * 100) gittest.Sleep(time.Millisecond * 100)
// switch logger to test file // switch logger to test file
logFile := gittest.Open("file") logFile := gittest.Open("file")
git.Logger = log.New(logFile, "", 0) git.Logger = log.New(logFile, "", 0)
// sleep for the interval // sleep for the interval
time.Sleep(repo.Interval) gittest.Sleep(repo.Interval)
// get log output // get log output
out, err := ioutil.ReadAll(logFile) out, err := ioutil.ReadAll(logFile)
...@@ -87,7 +87,7 @@ No new changes.` ...@@ -87,7 +87,7 @@ No new changes.`
} }
// stop background thread monitor // stop background thread monitor
git.Monitor.StopAndWait(repo.URL, 1) git.Services.Stop(repo.URL, 1)
} }
......
...@@ -33,8 +33,9 @@ var initMutex = sync.Mutex{} ...@@ -33,8 +33,9 @@ var initMutex = sync.Mutex{}
// Logger is used to log errors; if nil, the default log.Logger is used. // Logger is used to log errors; if nil, the default log.Logger is used.
var Logger *log.Logger var Logger *log.Logger
// Monitor listens for halt signal to stop repositories from auto pulling. // Services holds all git pulling services and provides the function to
var Monitor = &monitor{} // stop them.
var Services = &services{}
// logger is an helper function to retrieve the available logger // logger is an helper function to retrieve the available logger
func logger() *log.Logger { func logger() *log.Logger {
...@@ -67,7 +68,7 @@ func (r *Repo) Pull() error { ...@@ -67,7 +68,7 @@ func (r *Repo) Pull() error {
defer r.Unlock() defer r.Unlock()
// prevent a pull if the last one was less than 5 seconds ago // prevent a pull if the last one was less than 5 seconds ago
if time.Since(r.lastPull) < 5*time.Second { if gos.TimeSince(r.lastPull) < 5*time.Second {
return nil return nil
} }
......
...@@ -160,7 +160,7 @@ Command echo Hello successful. ...@@ -160,7 +160,7 @@ Command echo Hello successful.
before := r.repo.lastPull before := r.repo.lastPull
time.Sleep(r.repo.Interval) gittest.Sleep(r.repo.Interval)
err = r.repo.Pull() err = r.repo.Pull()
after := r.repo.lastPull after := r.repo.lastPull
......
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec" "os/exec"
"time"
) )
// File is an abstraction for file (os.File). // File is an abstraction for file (os.File).
...@@ -114,6 +115,33 @@ type OS interface { ...@@ -114,6 +115,33 @@ type OS interface {
// beginning with prefix, opens the file for reading and writing, and // beginning with prefix, opens the file for reading and writing, and
// returns the resulting File. // returns the resulting File.
TempFile(string, string) (File, error) TempFile(string, string) (File, error)
// Sleep pauses the current goroutine for at least the duration d. A
// negative or zero duration causes Sleep to return immediately.
Sleep(time.Duration)
// NewTicker returns a new Ticker containing a channel that will send the
// time with a period specified by the argument.
NewTicker(time.Duration) Ticker
// TimeSince returns the time elapsed since the argument.
TimeSince(time.Time) time.Duration
}
// Ticker is an abstraction for Ticker (time.Ticker)
type Ticker interface {
C() <-chan time.Time
Stop()
}
// GitTicker is the implementation of Ticker for git.
type GitTicker struct {
*time.Ticker
}
// C returns the channel on which the ticks are delivered.s
func (g *GitTicker) C() <-chan time.Time {
return g.Ticker.C
} }
// GitOS is the implementation of OS for git. // GitOS is the implementation of OS for git.
...@@ -158,3 +186,18 @@ func (g GitOS) ReadDir(dirname string) ([]os.FileInfo, error) { ...@@ -158,3 +186,18 @@ func (g GitOS) ReadDir(dirname string) ([]os.FileInfo, error) {
func (g GitOS) Command(name string, args ...string) Cmd { func (g GitOS) Command(name string, args ...string) Cmd {
return &gitCmd{exec.Command(name, args...)} return &gitCmd{exec.Command(name, args...)}
} }
// Sleep calls time.Sleep.
func (g GitOS) Sleep(d time.Duration) {
time.Sleep(d)
}
// New Ticker calls time.NewTicker.
func (g GitOS) NewTicker(d time.Duration) Ticker {
return &GitTicker{time.NewTicker(d)}
}
// TimeSince calls time.Since
func (g GitOS) TimeSince(t time.Time) time.Duration {
return time.Since(t)
}
...@@ -19,6 +19,9 @@ var CmdOutput = "success" ...@@ -19,6 +19,9 @@ var CmdOutput = "success"
// TempFileName is the name of any file returned by mocked gitos.OS's TempFile(). // TempFileName is the name of any file returned by mocked gitos.OS's TempFile().
var TempFileName = "tempfile" var TempFileName = "tempfile"
// TimeSpeed is how faster the mocked gitos.Ticker and gitos.Sleep should run.
var TimeSpeed = 5
// dirs mocks a fake git dir if filename is "gitdir". // dirs mocks a fake git dir if filename is "gitdir".
var dirs = map[string][]os.FileInfo{ var dirs = map[string][]os.FileInfo{
"gitdir": { "gitdir": {
...@@ -31,6 +34,11 @@ func Open(name string) gitos.File { ...@@ -31,6 +34,11 @@ func Open(name string) gitos.File {
return &fakeFile{name: name} return &fakeFile{name: name}
} }
// Sleep calls fake time.Sleep
func Sleep(d time.Duration) {
FakeOS.Sleep(d)
}
// fakeFile is a mock gitos.File. // fakeFile is a mock gitos.File.
type fakeFile struct { type fakeFile struct {
name string name string
...@@ -70,7 +78,7 @@ func (f *fakeFile) Write(b []byte) (int, error) { ...@@ -70,7 +78,7 @@ func (f *fakeFile) Write(b []byte) (int, error) {
return len(b), nil return len(b), nil
} }
// fakeCmd is a mock git.Cmd. // fakeCmd is a mock gitos.Cmd.
type fakeCmd struct{} type fakeCmd struct{}
func (f fakeCmd) Run() error { func (f fakeCmd) Run() error {
...@@ -128,7 +136,16 @@ func (f fakeInfo) Sys() interface{} { ...@@ -128,7 +136,16 @@ func (f fakeInfo) Sys() interface{} {
return nil return nil
} }
// fakeOS is a mock git.OS. // fakeTicker is a mock gitos.Ticker
type fakeTicker struct {
*time.Ticker
}
func (f fakeTicker) C() <-chan time.Time {
return f.Ticker.C
}
// fakeOS is a mock gitos.OS.
type fakeOS struct{} type fakeOS struct{}
func (f fakeOS) Mkdir(name string, perm os.FileMode) error { func (f fakeOS) Mkdir(name string, perm os.FileMode) error {
...@@ -165,3 +182,15 @@ func (f fakeOS) ReadDir(dirname string) ([]os.FileInfo, error) { ...@@ -165,3 +182,15 @@ func (f fakeOS) ReadDir(dirname string) ([]os.FileInfo, error) {
func (f fakeOS) Command(name string, args ...string) gitos.Cmd { func (f fakeOS) Command(name string, args ...string) gitos.Cmd {
return fakeCmd{} return fakeCmd{}
} }
func (f fakeOS) Sleep(d time.Duration) {
time.Sleep(d / time.Duration(TimeSpeed))
}
func (f fakeOS) NewTicker(d time.Duration) gitos.Ticker {
return &fakeTicker{time.NewTicker(d / time.Duration(TimeSpeed))}
}
func (f fakeOS) TimeSince(t time.Time) time.Duration {
return time.Since(t) * time.Duration(TimeSpeed)
}
...@@ -2,109 +2,83 @@ package git ...@@ -2,109 +2,83 @@ package git
import ( import (
"sync" "sync"
"time"
"github.com/mholt/caddy/middleware/git/gitos"
) )
// RepoService is the repository service that runs in background and // repoService is the service that runs in background and periodically
// periodic pull from the repository. // pull from the repository.
type RepoService struct { type repoService struct {
repo *Repo repo *Repo
running bool // whether service is running. ticker gitos.Ticker // ticker to tick at intervals
halt chan struct{} // channel to notify service to halt and stop pulling. halt chan struct{} // channel to notify service to halt and stop pulling.
exit chan struct{} // channel to notify on exit.
} }
// Start starts a new RepoService in background and adds it to monitor. // Start starts a new background service to pull periodically.
func Start(repo *Repo) { func Start(repo *Repo) {
service := &RepoService{ service := &repoService{
repo, repo,
true, gos.NewTicker(repo.Interval),
make(chan struct{}),
make(chan struct{}), make(chan struct{}),
} }
go func(s *repoService) {
// start service
go func(s *RepoService) {
for { for {
// if service is halted select {
if !s.running { case <-s.ticker.C():
// notify exit channel err := repo.Pull()
service.exit <- struct{}{} if err != nil {
break logger().Println(err)
} }
time.Sleep(repo.Interval) case <-s.halt:
s.ticker.Stop()
err := repo.Pull() return
if err != nil {
logger().Println(err)
} }
} }
}(service) }(service)
// add to monitor to enable halting // add to services to make it stoppable
Monitor.add(service) Services.add(service)
} }
// monitor monitors running services (RepoService) // services stores all repoServices
// and can halt them. type services struct {
type monitor struct { services []*repoService
services []*RepoService
sync.Mutex sync.Mutex
} }
// add adds a new service to the monitor. // add adds a new service to list of services.
func (m *monitor) add(service *RepoService) { func (s *services) add(r *repoService) {
m.Lock() s.Lock()
defer m.Unlock() defer s.Unlock()
m.services = append(m.services, service)
// start a goroutine to listen for halt signal s.services = append(s.services, r)
service.running = true
go func(r *RepoService) {
<-r.halt
r.running = false
}(service)
} }
// Stop stops at most `limit` currently running services that is pulling from git repo at // Stop stops at most `limit` running services pulling from git repo at
// repoURL. It returns list of exit channels for the services. A wait for message on the // repoURL. It waits until the service is terminated before returning.
// channels guarantees exit. If limit is less than zero, it is ignored. // If limit is less than zero, it is ignored.
// TODO find better ways to identify repos // TODO find better ways to identify repos
func (m *monitor) Stop(repoURL string, limit int) []chan struct{} { func (s *services) Stop(repoURL string, limit int) {
m.Lock() s.Lock()
defer m.Unlock() defer s.Unlock()
var chans []chan struct{}
// locate services // locate repos
for i, j := 0, 0; i < len(m.services) && ((limit >= 0 && j < limit) || limit < 0); i++ { for i, j := 0, 0; i < len(s.services) && ((limit >= 0 && j < limit) || limit < 0); i++ {
s := m.services[i] service := s.services[i]
if s.repo.URL == repoURL { if service.repo.URL == repoURL {
// send halt signal // send halt signal
s.halt <- struct{}{} service.halt <- struct{}{}
chans = append(chans, s.exit) s.services[i] = nil
j++ j++
m.services[i] = nil
} }
} }
// remove them from services list // remove them from repos list
services := m.services[:0] services := s.services[:0]
for _, s := range m.services { for _, s := range s.services {
if s != nil { if s != nil {
services = append(services, s) services = append(services, s)
} }
} }
m.services = services s.services = services
return chans
}
// StopAndWait is similar to stop but it waits for the services to terminate before
// returning.
func (m *monitor) StopAndWait(repoUrl string, limit int) {
chans := m.Stop(repoUrl, limit)
for _, c := range chans {
<-c
}
} }
...@@ -16,45 +16,45 @@ func Test(t *testing.T) { ...@@ -16,45 +16,45 @@ func Test(t *testing.T) {
repo := &Repo{URL: "git@github.com", Interval: time.Second} repo := &Repo{URL: "git@github.com", Interval: time.Second}
Start(repo) Start(repo)
if len(Monitor.services) != 1 { if len(Services.services) != 1 {
t.Errorf("Expected 1 service, found %v", len(Monitor.services)) t.Errorf("Expected 1 service, found %v", len(Services.services))
} }
Monitor.StopAndWait(repo.URL, 1) Services.Stop(repo.URL, 1)
if len(Monitor.services) != 0 { if len(Services.services) != 0 {
t.Errorf("Expected 1 service, found %v", len(Monitor.services)) t.Errorf("Expected 1 service, found %v", len(Services.services))
} }
repos := make([]*Repo, 5) repos := make([]*Repo, 5)
for i := 0; i < 5; i++ { for i := 0; i < 5; i++ {
repos[i] = &Repo{URL: fmt.Sprintf("test%v", i), Interval: time.Second * 2} repos[i] = &Repo{URL: fmt.Sprintf("test%v", i), Interval: time.Second * 2}
Start(repos[i]) Start(repos[i])
if len(Monitor.services) != i+1 { if len(Services.services) != i+1 {
t.Errorf("Expected %v service(s), found %v", i+1, len(Monitor.services)) t.Errorf("Expected %v service(s), found %v", i+1, len(Services.services))
} }
} }
time.Sleep(time.Second * 5) gos.Sleep(time.Second * 5)
Monitor.StopAndWait(repos[0].URL, 1) Services.Stop(repos[0].URL, 1)
if len(Monitor.services) != 4 { if len(Services.services) != 4 {
t.Errorf("Expected %v service(s), found %v", 4, len(Monitor.services)) t.Errorf("Expected %v service(s), found %v", 4, len(Services.services))
} }
repo = &Repo{URL: "git@github.com", Interval: time.Second} repo = &Repo{URL: "git@github.com", Interval: time.Second}
Start(repo) Start(repo)
if len(Monitor.services) != 5 { if len(Services.services) != 5 {
t.Errorf("Expected %v service(s), found %v", 5, len(Monitor.services)) t.Errorf("Expected %v service(s), found %v", 5, len(Services.services))
} }
repo = &Repo{URL: "git@github.com", Interval: time.Second * 2} repo = &Repo{URL: "git@github.com", Interval: time.Second * 2}
Start(repo) Start(repo)
if len(Monitor.services) != 6 { if len(Services.services) != 6 {
t.Errorf("Expected %v service(s), found %v", 6, len(Monitor.services)) t.Errorf("Expected %v service(s), found %v", 6, len(Services.services))
} }
time.Sleep(time.Second * 5) gos.Sleep(time.Second * 5)
Monitor.StopAndWait(repo.URL, -1) Services.Stop(repo.URL, -1)
if len(Monitor.services) != 4 { if len(Services.services) != 4 {
t.Errorf("Expected %v service(s), found %v", 4, len(Monitor.services)) t.Errorf("Expected %v service(s), found %v", 4, len(Services.services))
} }
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment