diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 674aab17..a6f13671 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -928,6 +928,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { code := exit.waitForExit() mainService.Stop() + ldb.Close() l.Infoln("Exiting") diff --git a/lib/db/instance.go b/lib/db/instance.go index d7c7bbbe..b07c8aef 100644 --- a/lib/db/instance.go +++ b/lib/db/instance.go @@ -505,7 +505,7 @@ func (db *instance) getIndexID(device, folder []byte) protocol.IndexID { func (db *instance) setIndexID(device, folder []byte, id protocol.IndexID) { bs, _ := id.Marshal() // marshalling can't fail - if err := db.Put(db.keyer.GenerateIndexIDKey(nil, device, folder), bs, nil); err != nil { + if err := db.Put(db.keyer.GenerateIndexIDKey(nil, device, folder), bs, nil); err != nil && err != leveldb.ErrClosed { panic("storing index ID: " + err.Error()) } } diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index 69f1758f..4c77ed74 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -9,12 +9,15 @@ package db import ( "os" "strings" + "sync" "sync/atomic" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/errors" + "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" + "github.com/syndtr/goleveldb/leveldb/util" ) const ( @@ -34,6 +37,8 @@ type Lowlevel struct { location string folderIdx *smallIndex deviceIdx *smallIndex + closed bool + closeMut *sync.RWMutex } // Open attempts to open the database at the given location, and runs @@ -103,6 +108,36 @@ func (db *Lowlevel) Delete(key []byte, wo *opt.WriteOptions) error { return db.DB.Delete(key, wo) } +func (db *Lowlevel) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + db.closeMut.RLock() + defer db.closeMut.RUnlock() + if db.closed { + return &closedIter{} + } + return db.DB.NewIterator(slice, ro) +} + +func (db *Lowlevel) GetSnapshot() snapshot { + snap, err := db.DB.GetSnapshot() + if err != nil { + if err == leveldb.ErrClosed { + return &closedSnap{} + } + panic(err) + } + return snap +} + +func (db *Lowlevel) Close() { + db.closeMut.Lock() + defer db.closeMut.Unlock() + if db.closed { + return + } + db.closed = true + db.DB.Close() +} + // NewLowlevel wraps the given *leveldb.DB into a *lowlevel func NewLowlevel(db *leveldb.DB, location string) *Lowlevel { return &Lowlevel{ @@ -110,6 +145,7 @@ func NewLowlevel(db *leveldb.DB, location string) *Lowlevel { location: location, folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}), deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}), + closeMut: &sync.RWMutex{}, } } @@ -150,7 +186,37 @@ func (b *batch) checkFlush() { } func (b *batch) flush() { - if err := b.db.Write(b.Batch, nil); err != nil { + if err := b.db.Write(b.Batch, nil); err != nil && err != leveldb.ErrClosed { panic(err) } } + +type closedIter struct{} + +func (it *closedIter) Release() {} +func (it *closedIter) Key() []byte { return nil } +func (it *closedIter) Value() []byte { return nil } +func (it *closedIter) Next() bool { return false } +func (it *closedIter) Prev() bool { return false } +func (it *closedIter) First() bool { return false } +func (it *closedIter) Last() bool { return false } +func (it *closedIter) Seek(key []byte) bool { return false } +func (it *closedIter) Valid() bool { return false } +func (it *closedIter) Error() error { return leveldb.ErrClosed } +func (it *closedIter) SetReleaser(releaser util.Releaser) {} + +type snapshot interface { + Get([]byte, *opt.ReadOptions) ([]byte, error) + Has([]byte, *opt.ReadOptions) (bool, error) + NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator + Release() +} + +type closedSnap struct{} + +func (s *closedSnap) Get([]byte, *opt.ReadOptions) ([]byte, error) { return nil, leveldb.ErrClosed } +func (s *closedSnap) Has([]byte, *opt.ReadOptions) (bool, error) { return false, leveldb.ErrClosed } +func (s *closedSnap) NewIterator(*util.Range, *opt.ReadOptions) iterator.Iterator { + return &closedIter{} +} +func (s *closedSnap) Release() {} diff --git a/lib/db/transactions.go b/lib/db/transactions.go index ebbe6363..b1945305 100644 --- a/lib/db/transactions.go +++ b/lib/db/transactions.go @@ -14,17 +14,13 @@ import ( // A readOnlyTransaction represents a database snapshot. type readOnlyTransaction struct { - *leveldb.Snapshot + snapshot keyer keyer } func (db *instance) newReadOnlyTransaction() readOnlyTransaction { - snap, err := db.GetSnapshot() - if err != nil { - panic(err) - } return readOnlyTransaction{ - Snapshot: snap, + snapshot: db.GetSnapshot(), keyer: db.keyer, } } @@ -129,7 +125,10 @@ func (t readWriteTransaction) updateGlobal(gk, keyBuf, folder, device []byte, fi if new, ok := t.getFileByKey(keyBuf); ok { global = new } else { - panic("This file must exist in the db") + // This file must exist in the db, so this must be caused + // by the db being closed - bail out. + l.Debugln("File should exist:", name) + return keyBuf, false } } @@ -246,7 +245,10 @@ func (t readWriteTransaction) removeFromGlobal(gk, keyBuf, folder, device []byte keyBuf = t.keyer.GenerateDeviceFileKey(keyBuf, folder, fl.Versions[0].Device, file) global, ok := t.getFileByKey(keyBuf) if !ok { - panic("This file must exist in the db") + // This file must exist in the db, so this must be caused + // by the db being closed - bail out. + l.Debugln("File should exist:", file) + return keyBuf } keyBuf = t.updateLocalNeed(keyBuf, folder, file, fl, global) meta.addFile(protocol.GlobalDeviceID, global) diff --git a/lib/model/folder_recvonly_test.go b/lib/model/folder_recvonly_test.go index cccd3ebb..bdacbe5b 100644 --- a/lib/model/folder_recvonly_test.go +++ b/lib/model/folder_recvonly_test.go @@ -32,6 +32,7 @@ func TestRecvOnlyRevertDeletes(t *testing.T) { ffs := f.Filesystem() defer os.Remove(m.cfg.ConfigPath()) defer os.Remove(ffs.URI()) + defer m.db.Close() defer m.Stop() // Create some test data @@ -115,6 +116,7 @@ func TestRecvOnlyRevertNeeds(t *testing.T) { ffs := f.Filesystem() defer os.Remove(m.cfg.ConfigPath()) defer os.Remove(ffs.URI()) + defer m.db.Close() defer m.Stop() // Create some test data @@ -208,6 +210,7 @@ func TestRecvOnlyUndoChanges(t *testing.T) { ffs := f.Filesystem() defer os.Remove(m.cfg.ConfigPath()) defer os.Remove(ffs.URI()) + defer m.db.Close() defer m.Stop() // Create some test data diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 902d8f65..dbdc076d 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -195,7 +195,10 @@ func setupModel(w config.Wrapper) *model { func TestRequest(t *testing.T) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Existing, shared file res, err := m.Request(device1, "default", "foo", 6, 0, nil, 0, false) @@ -264,7 +267,10 @@ func BenchmarkIndex_100(b *testing.B) { func benchmarkIndex(b *testing.B, nfiles int) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() files := genFiles(nfiles) m.Index(device1, "default", files) @@ -290,7 +296,10 @@ func BenchmarkIndexUpdate_10000_1(b *testing.B) { func benchmarkIndexUpdate(b *testing.B, nfiles, nufiles int) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() files := genFiles(nfiles) ufiles := genFiles(nufiles) @@ -498,7 +507,10 @@ func (f *fakeConnection) sendIndexUpdate() { func BenchmarkRequestOut(b *testing.B) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() const n = 1000 files := genFiles(n) @@ -574,7 +586,10 @@ func TestDeviceRename(t *testing.T) { m.AddConnection(conn, hello) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() if cfg.Devices()[device1].Name != "" { t.Errorf("Device already has a name") @@ -666,7 +681,10 @@ func TestClusterConfig(t *testing.T) { m.AddFolder(cfg.Folders[0]) m.AddFolder(cfg.Folders[1]) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() cm := m.generateClusterConfig(device2) @@ -1150,7 +1168,10 @@ func TestIssue5063(t *testing.T) { os.RemoveAll(id) } }() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() finished := make(chan struct{}) go func() { @@ -1328,6 +1349,7 @@ func TestAutoAcceptMultipleFolders(t *testing.T) { id2 := srand.String(8) defer os.RemoveAll(id2) defer m.Stop() + defer m.db.Close() m.ClusterConfig(device1, protocol.ClusterConfig{ Folders: []protocol.Folder{ { @@ -1365,6 +1387,7 @@ func TestAutoAcceptExistingFolder(t *testing.T) { wcfg, m := newState(tcfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() if fcfg, ok := wcfg.Folder(id); !ok || fcfg.SharedWith(device1) { t.Error("missing folder, or shared", id) } @@ -1399,6 +1422,7 @@ func TestAutoAcceptNewAndExistingFolder(t *testing.T) { wcfg, m := newState(tcfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() if fcfg, ok := wcfg.Folder(id1); !ok || fcfg.SharedWith(device1) { t.Error("missing folder, or shared", id1) } @@ -1441,6 +1465,7 @@ func TestAutoAcceptAlreadyShared(t *testing.T) { wcfg, m := newState(tcfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() if fcfg, ok := wcfg.Folder(id); !ok || !fcfg.SharedWith(device1) { t.Error("missing folder, or not shared", id) } @@ -1470,6 +1495,7 @@ func TestAutoAcceptNameConflict(t *testing.T) { wcfg, m := newState(defaultAutoAcceptCfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() m.ClusterConfig(device1, protocol.ClusterConfig{ Folders: []protocol.Folder{ { @@ -1492,6 +1518,7 @@ func TestAutoAcceptPrefersLabel(t *testing.T) { defer os.RemoveAll(id) defer os.RemoveAll(label) defer m.Stop() + defer m.db.Close() m.ClusterConfig(device1, protocol.ClusterConfig{ Folders: []protocol.Folder{ { @@ -1518,6 +1545,7 @@ func TestAutoAcceptFallsBackToID(t *testing.T) { defer os.RemoveAll(label) defer os.RemoveAll(id) defer m.Stop() + defer m.db.Close() m.ClusterConfig(device1, protocol.ClusterConfig{ Folders: []protocol.Folder{ { @@ -1551,6 +1579,7 @@ func TestAutoAcceptPausedWhenFolderConfigChanged(t *testing.T) { wcfg, m := newState(tcfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() if fcfg, ok := wcfg.Folder(id); !ok || !fcfg.SharedWith(device1) { t.Error("missing folder, or not shared", id) } @@ -1609,6 +1638,7 @@ func TestAutoAcceptPausedWhenFolderConfigNotChanged(t *testing.T) { wcfg, m := newState(tcfg) defer os.Remove(wcfg.ConfigPath()) defer m.Stop() + defer m.db.Close() if fcfg, ok := wcfg.Folder(id); !ok || !fcfg.SharedWith(device1) { t.Error("missing folder, or not shared", id) } @@ -1713,7 +1743,10 @@ func TestIgnores(t *testing.T) { ioutil.WriteFile("testdata/.stignore", []byte(".*\nquux\n"), 0644) m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Reach in and update the ignore matcher to one that always does // reloads when asked to, instead of checking file mtimes. This is @@ -1817,7 +1850,10 @@ func TestROScanRecovery(t *testing.T) { m.AddFolder(fcfg) m.StartFolder("default") m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() waitForState(t, m, "default", "folder path missing") @@ -1871,7 +1907,10 @@ func TestRWScanRecovery(t *testing.T) { m.AddFolder(fcfg) m.StartFolder("default") m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() waitForState(t, m, "default", "folder path missing") @@ -1898,7 +1937,10 @@ func TestGlobalDirectoryTree(t *testing.T) { m := newModel(defaultCfgWrapper, myID, "syncthing", "dev", db, nil) m.AddFolder(defaultFolderConfig) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() b := func(isfile bool, path ...string) protocol.FileInfo { typ := protocol.FileInfoTypeDirectory @@ -2390,7 +2432,10 @@ func TestIssue4357(t *testing.T) { defer os.Remove(wrapper.ConfigPath()) m := newModel(wrapper, myID, "syncthing", "dev", db, nil) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Force the model to wire itself and add the folders p, err := wrapper.Replace(cfg) @@ -2539,7 +2584,10 @@ func TestSharedWithClearedOnDisconnect(t *testing.T) { defer os.Remove(wcfg.ConfigPath()) m := setupModel(wcfg) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() conn1 := &fakeConnection{id: device1} m.AddConnection(conn1, protocol.HelloResult{}) @@ -2647,7 +2695,10 @@ func TestIssue3496(t *testing.T) { // checks on the completion calculation stuff. m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() m.ScanFolder("default") @@ -2716,7 +2767,10 @@ func TestIssue3496(t *testing.T) { func TestIssue3804(t *testing.T) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Subdirs ending in slash should be accepted @@ -2727,7 +2781,10 @@ func TestIssue3804(t *testing.T) { func TestIssue3829(t *testing.T) { m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Empty subdirs should be accepted @@ -2747,7 +2804,10 @@ func TestNoRequestsFromPausedDevices(t *testing.T) { defer os.Remove(wcfg.ConfigPath()) m := setupModel(wcfg) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() file := testDataExpected["foo"] files := m.folderFiles["default"] @@ -2982,7 +3042,10 @@ func TestCustomMarkerName(t *testing.T) { m.AddFolder(fcfg) m.StartFolder("default") m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() waitForState(t, m, "default", "folder path missing") @@ -3005,7 +3068,10 @@ func TestRemoveDirWithContent(t *testing.T) { fd.Close() m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() dir, ok := m.CurrentFolderFile("default", "dirwith") if !ok { @@ -3315,7 +3381,10 @@ func TestPausedFolders(t *testing.T) { defer os.Remove(wrapper.ConfigPath()) m := setupModel(wrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() if err := m.ScanFolder("default"); err != nil { t.Error(err) @@ -3345,7 +3414,10 @@ func TestIssue4094(t *testing.T) { defer os.Remove(wrapper.ConfigPath()) m := newModel(wrapper, myID, "syncthing", "dev", db, nil) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Force the model to wire itself and add the folders folderPath := "nonexistent" @@ -3382,7 +3454,10 @@ func TestIssue4903(t *testing.T) { defer os.Remove(wrapper.ConfigPath()) m := newModel(wrapper, myID, "syncthing", "dev", db, nil) m.ServeBackground() - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Force the model to wire itself and add the folders folderPath := "nonexistent" @@ -3414,7 +3489,10 @@ func TestIssue5002(t *testing.T) { // recheckFile should not panic when given an index equal to the number of blocks m := setupModel(defaultCfgWrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() if err := m.ScanFolder("default"); err != nil { t.Error(err) @@ -3477,7 +3555,10 @@ func TestFolderRestartZombies(t *testing.T) { wrapper.SetFolder(folderCfg) m := setupModel(wrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() // Make sure the folder is up and running, because we want to count it. m.ScanFolder("default") @@ -3567,7 +3648,10 @@ func TestRequestLimit(t *testing.T) { dev.MaxRequestKiB = 1 wrapper.SetDevice(dev) m, _ := setupModelWithConnectionFromWrapper(wrapper) - defer m.Stop() + defer func() { + m.Stop() + m.db.Close() + }() file := "tmpfile" befReq := time.Now() diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index 661135d6..91f4707a 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -32,6 +32,7 @@ func TestRequestSimple(t *testing.T) { tfs := fcfg.Filesystem() defer func() { m.Stop() + m.db.Close() os.RemoveAll(tfs.URI()) os.Remove(w.ConfigPath()) }() @@ -78,6 +79,7 @@ func TestSymlinkTraversalRead(t *testing.T) { m, fc, fcfg, w := setupModelWithConnection() defer func() { m.Stop() + m.db.Close() os.RemoveAll(fcfg.Filesystem().URI()) os.Remove(w.ConfigPath()) }() @@ -125,6 +127,7 @@ func TestSymlinkTraversalWrite(t *testing.T) { m, fc, fcfg, w := setupModelWithConnection() defer func() { m.Stop() + m.db.Close() os.RemoveAll(fcfg.Filesystem().URI()) os.Remove(w.ConfigPath()) }() @@ -188,6 +191,7 @@ func TestRequestCreateTmpSymlink(t *testing.T) { m, fc, fcfg, w := setupModelWithConnection() defer func() { m.Stop() + m.db.Close() os.RemoveAll(fcfg.Filesystem().URI()) os.Remove(w.ConfigPath()) }() @@ -238,8 +242,8 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) { fcfg.Versioning = config.VersioningConfiguration{Type: "trashcan"} w.SetFolder(fcfg) - m, fc := setupModelWithConnectionFromWrapper(w) + defer m.db.Close() defer m.Stop() // Create a temporary directory that we will use as target to see if @@ -312,6 +316,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) { m, fc := setupModelWithConnectionFromWrapper(w) defer func() { m.Stop() + m.db.Close() os.RemoveAll(fss.URI()) os.Remove(w.ConfigPath()) }() @@ -430,6 +435,7 @@ func TestIssue4841(t *testing.T) { m, fc, fcfg, w := setupModelWithConnection() defer func() { m.Stop() + m.db.Close() os.RemoveAll(fcfg.Filesystem().URI()) os.Remove(w.ConfigPath()) }() @@ -473,6 +479,7 @@ func TestRescanIfHaveInvalidContent(t *testing.T) { tmpDir := fcfg.Filesystem().URI() defer func() { m.Stop() + m.db.Close() os.RemoveAll(tmpDir) os.Remove(w.ConfigPath()) }() @@ -538,6 +545,7 @@ func TestParentDeletion(t *testing.T) { testFs := fcfg.Filesystem() defer func() { m.Stop() + m.db.Close() os.RemoveAll(testFs.URI()) os.Remove(w.ConfigPath()) }() @@ -620,6 +628,7 @@ func TestRequestSymlinkWindows(t *testing.T) { m, fc, fcfg, w := setupModelWithConnection() defer func() { m.Stop() + m.db.Close() os.RemoveAll(fcfg.Filesystem().URI()) os.Remove(w.ConfigPath()) }() @@ -737,6 +746,7 @@ func TestRequestRemoteRenameChanged(t *testing.T) { tmpDir := tfs.URI() defer func() { m.Stop() + m.db.Close() os.RemoveAll(tmpDir) os.Remove(w.ConfigPath()) }() @@ -869,6 +879,7 @@ func TestRequestRemoteRenameConflict(t *testing.T) { tmpDir := tfs.URI() defer func() { m.Stop() + m.db.Close() os.RemoveAll(tmpDir) os.Remove(w.ConfigPath()) }() @@ -963,6 +974,7 @@ func TestRequestDeleteChanged(t *testing.T) { tfs := fcfg.Filesystem() defer func() { m.Stop() + m.db.Close() os.RemoveAll(tfs.URI()) os.Remove(w.ConfigPath()) }()