Refactor state tracking (...)
Move state tracking into the puller/scanner objects. This is a first step towards resolving #1391. Rename Puller and Scanner to roFolder and rwFolder as they have more duties than just pulling and scanning, and don't need to be exported.
This commit is contained in:
89
internal/model/folderstate.go
Normal file
89
internal/model/folderstate.go
Normal file
@@ -0,0 +1,89 @@
|
||||
// Copyright (C) 2015 The Syncthing Authors.
|
||||
//
|
||||
// This program is free software: you can redistribute it and/or modify it
|
||||
// under the terms of the GNU General Public License as published by the Free
|
||||
// Software Foundation, either version 3 of the License, or (at your option)
|
||||
// any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
|
||||
// more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License along
|
||||
// with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
package model
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/internal/events"
|
||||
)
|
||||
|
||||
type folderState int
|
||||
|
||||
const (
|
||||
FolderIdle folderState = iota
|
||||
FolderScanning
|
||||
FolderSyncing
|
||||
FolderCleaning
|
||||
)
|
||||
|
||||
func (s folderState) String() string {
|
||||
switch s {
|
||||
case FolderIdle:
|
||||
return "idle"
|
||||
case FolderScanning:
|
||||
return "scanning"
|
||||
case FolderCleaning:
|
||||
return "cleaning"
|
||||
case FolderSyncing:
|
||||
return "syncing"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
type stateTracker struct {
|
||||
folder string
|
||||
|
||||
mut sync.Mutex
|
||||
current folderState
|
||||
changed time.Time
|
||||
}
|
||||
|
||||
func (s *stateTracker) setState(newState folderState) {
|
||||
s.mut.Lock()
|
||||
if newState != s.current {
|
||||
/* This should hold later...
|
||||
if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) {
|
||||
panic("illegal state transition " + s.current.String() + " -> " + newState.String())
|
||||
}
|
||||
*/
|
||||
|
||||
eventData := map[string]interface{}{
|
||||
"folder": s.folder,
|
||||
"to": newState.String(),
|
||||
"from": s.current.String(),
|
||||
}
|
||||
|
||||
if !s.changed.IsZero() {
|
||||
eventData["duration"] = time.Since(s.changed).Seconds()
|
||||
}
|
||||
|
||||
s.current = newState
|
||||
s.changed = time.Now()
|
||||
|
||||
events.Default.Log(events.StateChanged, eventData)
|
||||
}
|
||||
s.mut.Unlock()
|
||||
}
|
||||
|
||||
func (s *stateTracker) getState() (current folderState, changed time.Time) {
|
||||
s.mut.Lock()
|
||||
current, changed = s.current, s.changed
|
||||
s.mut.Unlock()
|
||||
return
|
||||
}
|
||||
@@ -36,30 +36,6 @@ import (
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
)
|
||||
|
||||
type folderState int
|
||||
|
||||
const (
|
||||
FolderIdle folderState = iota
|
||||
FolderScanning
|
||||
FolderSyncing
|
||||
FolderCleaning
|
||||
)
|
||||
|
||||
func (s folderState) String() string {
|
||||
switch s {
|
||||
case FolderIdle:
|
||||
return "idle"
|
||||
case FolderScanning:
|
||||
return "scanning"
|
||||
case FolderCleaning:
|
||||
return "cleaning"
|
||||
case FolderSyncing:
|
||||
return "syncing"
|
||||
default:
|
||||
return "unknown"
|
||||
}
|
||||
}
|
||||
|
||||
// How many files to send in each Index/IndexUpdate message.
|
||||
const (
|
||||
indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
|
||||
@@ -73,6 +49,9 @@ type service interface {
|
||||
Stop()
|
||||
Jobs() ([]string, []string) // In progress, Queued
|
||||
BringToFront(string)
|
||||
|
||||
setState(folderState)
|
||||
getState() (folderState, time.Time)
|
||||
}
|
||||
|
||||
type Model struct {
|
||||
@@ -95,10 +74,6 @@ type Model struct {
|
||||
folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef
|
||||
fmut sync.RWMutex // protects the above
|
||||
|
||||
folderState map[string]folderState // folder -> state
|
||||
folderStateChanged map[string]time.Time // folder -> time when state changed
|
||||
smut sync.RWMutex
|
||||
|
||||
protoConn map[protocol.DeviceID]protocol.Connection
|
||||
rawConn map[protocol.DeviceID]io.Closer
|
||||
deviceVer map[protocol.DeviceID]string
|
||||
@@ -120,26 +95,24 @@ var (
|
||||
// for file data without altering the local folder in any way.
|
||||
func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model {
|
||||
m := &Model{
|
||||
cfg: cfg,
|
||||
db: ldb,
|
||||
deviceName: deviceName,
|
||||
clientName: clientName,
|
||||
clientVersion: clientVersion,
|
||||
folderCfgs: make(map[string]config.FolderConfiguration),
|
||||
folderFiles: make(map[string]*db.FileSet),
|
||||
folderDevices: make(map[string][]protocol.DeviceID),
|
||||
deviceFolders: make(map[protocol.DeviceID][]string),
|
||||
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
|
||||
folderIgnores: make(map[string]*ignore.Matcher),
|
||||
folderRunners: make(map[string]service),
|
||||
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
|
||||
folderState: make(map[string]folderState),
|
||||
folderStateChanged: make(map[string]time.Time),
|
||||
protoConn: make(map[protocol.DeviceID]protocol.Connection),
|
||||
rawConn: make(map[protocol.DeviceID]io.Closer),
|
||||
deviceVer: make(map[protocol.DeviceID]string),
|
||||
finder: db.NewBlockFinder(ldb, cfg),
|
||||
progressEmitter: NewProgressEmitter(cfg),
|
||||
cfg: cfg,
|
||||
db: ldb,
|
||||
deviceName: deviceName,
|
||||
clientName: clientName,
|
||||
clientVersion: clientVersion,
|
||||
folderCfgs: make(map[string]config.FolderConfiguration),
|
||||
folderFiles: make(map[string]*db.FileSet),
|
||||
folderDevices: make(map[string][]protocol.DeviceID),
|
||||
deviceFolders: make(map[protocol.DeviceID][]string),
|
||||
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
|
||||
folderIgnores: make(map[string]*ignore.Matcher),
|
||||
folderRunners: make(map[string]service),
|
||||
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
|
||||
protoConn: make(map[protocol.DeviceID]protocol.Connection),
|
||||
rawConn: make(map[protocol.DeviceID]io.Closer),
|
||||
deviceVer: make(map[protocol.DeviceID]string),
|
||||
finder: db.NewBlockFinder(ldb, cfg),
|
||||
progressEmitter: NewProgressEmitter(cfg),
|
||||
}
|
||||
if cfg.Options().ProgressUpdateIntervalS > -1 {
|
||||
go m.progressEmitter.Serve()
|
||||
@@ -153,7 +126,6 @@ func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string,
|
||||
}
|
||||
}
|
||||
deadlockDetect(&m.fmut, time.Duration(timeout)*time.Second)
|
||||
deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
|
||||
deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
|
||||
return m
|
||||
}
|
||||
@@ -172,18 +144,7 @@ func (m *Model) StartFolderRW(folder string) {
|
||||
if ok {
|
||||
panic("cannot start already running folder " + folder)
|
||||
}
|
||||
p := &Puller{
|
||||
folder: folder,
|
||||
dir: cfg.Path,
|
||||
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
|
||||
model: m,
|
||||
ignorePerms: cfg.IgnorePerms,
|
||||
lenientMtimes: cfg.LenientMtimes,
|
||||
progressEmitter: m.progressEmitter,
|
||||
copiers: cfg.Copiers,
|
||||
pullers: cfg.Pullers,
|
||||
queue: newJobQueue(),
|
||||
}
|
||||
p := newRWFolder(m, cfg)
|
||||
m.folderRunners[folder] = p
|
||||
m.fmut.Unlock()
|
||||
|
||||
@@ -216,11 +177,7 @@ func (m *Model) StartFolderRO(folder string) {
|
||||
if ok {
|
||||
panic("cannot start already running folder " + folder)
|
||||
}
|
||||
s := &Scanner{
|
||||
folder: folder,
|
||||
intv: time.Duration(cfg.RescanIntervalS) * time.Second,
|
||||
model: m,
|
||||
}
|
||||
s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second)
|
||||
m.folderRunners[folder] = s
|
||||
m.fmut.Unlock()
|
||||
|
||||
@@ -1154,11 +1111,15 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
|
||||
}
|
||||
|
||||
m.fmut.Lock()
|
||||
fs, ok := m.folderFiles[folder]
|
||||
fs := m.folderFiles[folder]
|
||||
folderCfg := m.folderCfgs[folder]
|
||||
ignores := m.folderIgnores[folder]
|
||||
runner, ok := m.folderRunners[folder]
|
||||
m.fmut.Unlock()
|
||||
|
||||
// Folders are added to folderRunners only when they are started. We can't
|
||||
// scan them before they have started, so that's what we need to check for
|
||||
// here.
|
||||
if !ok {
|
||||
return errors.New("no such folder")
|
||||
}
|
||||
@@ -1189,7 +1150,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
|
||||
Hashers: folderCfg.Hashers,
|
||||
}
|
||||
|
||||
m.setState(folder, FolderScanning)
|
||||
runner.setState(FolderScanning)
|
||||
fchan, err := w.Walk()
|
||||
|
||||
if err != nil {
|
||||
@@ -1289,7 +1250,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
|
||||
fs.Update(protocol.LocalDeviceID, batch)
|
||||
}
|
||||
|
||||
m.setState(folder, FolderIdle)
|
||||
runner.setState(FolderIdle)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1332,40 +1293,24 @@ func (m *Model) clusterConfig(device protocol.DeviceID) protocol.ClusterConfigMe
|
||||
return cm
|
||||
}
|
||||
|
||||
func (m *Model) setState(folder string, state folderState) {
|
||||
m.smut.Lock()
|
||||
oldState := m.folderState[folder]
|
||||
changed, ok := m.folderStateChanged[folder]
|
||||
if state != oldState {
|
||||
m.folderState[folder] = state
|
||||
m.folderStateChanged[folder] = time.Now()
|
||||
eventData := map[string]interface{}{
|
||||
"folder": folder,
|
||||
"to": state.String(),
|
||||
}
|
||||
if ok {
|
||||
eventData["duration"] = time.Since(changed).Seconds()
|
||||
eventData["from"] = oldState.String()
|
||||
}
|
||||
events.Default.Log(events.StateChanged, eventData)
|
||||
}
|
||||
m.smut.Unlock()
|
||||
}
|
||||
|
||||
func (m *Model) State(folder string) (string, time.Time) {
|
||||
m.smut.RLock()
|
||||
state := m.folderState[folder]
|
||||
changed := m.folderStateChanged[folder]
|
||||
m.smut.RUnlock()
|
||||
m.fmut.RLock()
|
||||
runner, ok := m.folderRunners[folder]
|
||||
m.fmut.RUnlock()
|
||||
if !ok {
|
||||
return "", time.Time{}
|
||||
}
|
||||
state, changed := runner.getState()
|
||||
return state.String(), changed
|
||||
}
|
||||
|
||||
func (m *Model) Override(folder string) {
|
||||
m.fmut.RLock()
|
||||
fs := m.folderFiles[folder]
|
||||
runner := m.folderRunners[folder]
|
||||
m.fmut.RUnlock()
|
||||
|
||||
m.setState(folder, FolderScanning)
|
||||
runner.setState(FolderScanning)
|
||||
batch := make([]protocol.FileInfo, 0, indexBatchSize)
|
||||
fs.WithNeed(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
|
||||
need := fi.(protocol.FileInfo)
|
||||
@@ -1391,7 +1336,7 @@ func (m *Model) Override(folder string) {
|
||||
if len(batch) > 0 {
|
||||
fs.Update(protocol.LocalDeviceID, batch)
|
||||
}
|
||||
m.setState(folder, FolderIdle)
|
||||
runner.setState(FolderIdle)
|
||||
}
|
||||
|
||||
// CurrentLocalVersion returns the change version for the given folder.
|
||||
|
||||
@@ -90,6 +90,7 @@ func TestRequest(t *testing.T) {
|
||||
|
||||
// device1 shares default, but device2 doesn't
|
||||
m.AddFolder(defaultFolderConfig)
|
||||
m.StartFolderRO("default")
|
||||
m.ScanFolder("default")
|
||||
|
||||
// Existing, shared file
|
||||
@@ -470,6 +471,7 @@ func TestIgnores(t *testing.T) {
|
||||
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
|
||||
m := NewModel(defaultConfig, "device", "syncthing", "dev", db)
|
||||
m.AddFolder(defaultFolderConfig)
|
||||
m.StartFolderRO("default")
|
||||
|
||||
expected := []string{
|
||||
".*",
|
||||
|
||||
@@ -54,31 +54,53 @@ var (
|
||||
errNoDevice = errors.New("no available source device")
|
||||
)
|
||||
|
||||
type Puller struct {
|
||||
folder string
|
||||
dir string
|
||||
scanIntv time.Duration
|
||||
type rwFolder struct {
|
||||
stateTracker
|
||||
|
||||
model *Model
|
||||
stop chan struct{}
|
||||
versioner versioner.Versioner
|
||||
ignorePerms bool
|
||||
lenientMtimes bool
|
||||
progressEmitter *ProgressEmitter
|
||||
copiers int
|
||||
pullers int
|
||||
queue *jobQueue
|
||||
|
||||
folder string
|
||||
dir string
|
||||
scanIntv time.Duration
|
||||
versioner versioner.Versioner
|
||||
ignorePerms bool
|
||||
lenientMtimes bool
|
||||
copiers int
|
||||
pullers int
|
||||
|
||||
stop chan struct{}
|
||||
queue *jobQueue
|
||||
}
|
||||
|
||||
func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder {
|
||||
return &rwFolder{
|
||||
stateTracker: stateTracker{folder: cfg.ID},
|
||||
|
||||
model: m,
|
||||
progressEmitter: m.progressEmitter,
|
||||
|
||||
folder: cfg.ID,
|
||||
dir: cfg.Path,
|
||||
scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
|
||||
ignorePerms: cfg.IgnorePerms,
|
||||
lenientMtimes: cfg.LenientMtimes,
|
||||
copiers: cfg.Copiers,
|
||||
pullers: cfg.Pullers,
|
||||
|
||||
stop: make(chan struct{}),
|
||||
queue: newJobQueue(),
|
||||
}
|
||||
}
|
||||
|
||||
// Serve will run scans and pulls. It will return when Stop()ed or on a
|
||||
// critical error.
|
||||
func (p *Puller) Serve() {
|
||||
func (p *rwFolder) Serve() {
|
||||
if debug {
|
||||
l.Debugln(p, "starting")
|
||||
defer l.Debugln(p, "exiting")
|
||||
}
|
||||
|
||||
p.stop = make(chan struct{})
|
||||
|
||||
pullTimer := time.NewTimer(checkPullIntv)
|
||||
scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
|
||||
|
||||
@@ -86,7 +108,7 @@ func (p *Puller) Serve() {
|
||||
pullTimer.Stop()
|
||||
scanTimer.Stop()
|
||||
// TODO: Should there be an actual FolderStopped state?
|
||||
p.model.setState(p.folder, FolderIdle)
|
||||
p.setState(FolderIdle)
|
||||
}()
|
||||
|
||||
var prevVer int64
|
||||
@@ -143,7 +165,7 @@ loop:
|
||||
if debug {
|
||||
l.Debugln(p, "pulling", prevVer, curVer)
|
||||
}
|
||||
p.model.setState(p.folder, FolderSyncing)
|
||||
p.setState(FolderSyncing)
|
||||
tries := 0
|
||||
for {
|
||||
tries++
|
||||
@@ -191,7 +213,7 @@ loop:
|
||||
break
|
||||
}
|
||||
}
|
||||
p.model.setState(p.folder, FolderIdle)
|
||||
p.setState(FolderIdle)
|
||||
|
||||
// The reason for running the scanner from within the puller is that
|
||||
// this is the easiest way to make sure we are not doing both at the
|
||||
@@ -200,12 +222,12 @@ loop:
|
||||
if debug {
|
||||
l.Debugln(p, "rescan")
|
||||
}
|
||||
p.model.setState(p.folder, FolderScanning)
|
||||
p.setState(FolderScanning)
|
||||
if err := p.model.ScanFolder(p.folder); err != nil {
|
||||
p.model.cfg.InvalidateFolder(p.folder, err.Error())
|
||||
break loop
|
||||
}
|
||||
p.model.setState(p.folder, FolderIdle)
|
||||
p.setState(FolderIdle)
|
||||
if p.scanIntv > 0 {
|
||||
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
||||
sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
|
||||
@@ -224,19 +246,19 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Puller) Stop() {
|
||||
func (p *rwFolder) Stop() {
|
||||
close(p.stop)
|
||||
}
|
||||
|
||||
func (p *Puller) String() string {
|
||||
return fmt.Sprintf("puller/%s@%p", p.folder, p)
|
||||
func (p *rwFolder) String() string {
|
||||
return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
|
||||
}
|
||||
|
||||
// pullerIteration runs a single puller iteration for the given folder and
|
||||
// returns the number items that should have been synced (even those that
|
||||
// might have failed). One puller iteration handles all files currently
|
||||
// flagged as needed in the folder.
|
||||
func (p *Puller) pullerIteration(ignores *ignore.Matcher) int {
|
||||
func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
pullChan := make(chan pullBlockState)
|
||||
copyChan := make(chan copyBlocksState)
|
||||
finisherChan := make(chan *sharedPullerState)
|
||||
@@ -422,7 +444,7 @@ nextFile:
|
||||
}
|
||||
|
||||
// handleDir creates or updates the given directory
|
||||
func (p *Puller) handleDir(file protocol.FileInfo) {
|
||||
func (p *rwFolder) handleDir(file protocol.FileInfo) {
|
||||
var err error
|
||||
events.Default.Log(events.ItemStarted, map[string]interface{}{
|
||||
"folder": p.folder,
|
||||
@@ -497,7 +519,7 @@ func (p *Puller) handleDir(file protocol.FileInfo) {
|
||||
}
|
||||
|
||||
// deleteDir attempts to delete the given directory
|
||||
func (p *Puller) deleteDir(file protocol.FileInfo) {
|
||||
func (p *rwFolder) deleteDir(file protocol.FileInfo) {
|
||||
var err error
|
||||
events.Default.Log(events.ItemStarted, map[string]interface{}{
|
||||
"folder": p.folder,
|
||||
@@ -532,7 +554,7 @@ func (p *Puller) deleteDir(file protocol.FileInfo) {
|
||||
}
|
||||
|
||||
// deleteFile attempts to delete the given file
|
||||
func (p *Puller) deleteFile(file protocol.FileInfo) {
|
||||
func (p *rwFolder) deleteFile(file protocol.FileInfo) {
|
||||
var err error
|
||||