lib/db: Refactor: use a Lowlevel type underneath Instance (ref #5198) (#5212)

This adds a thin type that holds the state associated with the
leveldb.DB, leaving the huge Instance type more or less stateless. Also
moves some keying stuff into the DB package so that other packages need
not know the keying specifics.

(This does not, yet, fix the cmd/stindex program, in order to keep the
diff size down. Hence the keying constants are still exported.)
This commit is contained in:
Jakob Borg
2018-10-10 11:34:24 +02:00
committed by GitHub
parent 8e645ab782
commit b50d57b7fd
22 changed files with 382 additions and 374 deletions

View File

@@ -10,87 +10,28 @@ import (
"bytes"
"encoding/binary"
"fmt"
"os"
"sort"
"strings"
"sync/atomic"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
)
type deletionHandler func(t readWriteTransaction, folder, device, name []byte, dbi iterator.Iterator)
type Instance struct {
committed int64 // this must be the first attribute in the struct to ensure 64 bit alignment on 32 bit plaforms
*leveldb.DB
location string
folderIdx *smallIndex
deviceIdx *smallIndex
keyer keyer
type instance struct {
*Lowlevel
keyer keyer
}
func Open(file string) (*Instance, error) {
opts := &opt.Options{
OpenFilesCacheCapacity: 100,
WriteBuffer: 4 << 20,
func newInstance(ll *Lowlevel) *instance {
return &instance{
Lowlevel: ll,
keyer: newDefaultKeyer(ll.folderIdx, ll.deviceIdx),
}
db, err := leveldb.OpenFile(file, opts)
if leveldbIsCorrupted(err) {
db, err = leveldb.RecoverFile(file, opts)
}
if leveldbIsCorrupted(err) {
// The database is corrupted, and we've tried to recover it but it
// didn't work. At this point there isn't much to do beyond dropping
// the database and reindexing...
l.Infoln("Database corruption detected, unable to recover. Reinitializing...")
if err := os.RemoveAll(file); err != nil {
return nil, errorSuggestion{err, "failed to delete corrupted database"}
}
db, err = leveldb.OpenFile(file, opts)
}
if err != nil {
return nil, errorSuggestion{err, "is another instance of Syncthing running?"}
}
return newDBInstance(db, file)
}
func OpenMemory() *Instance {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
ldb, _ := newDBInstance(db, "<memory>")
return ldb
}
func newDBInstance(db *leveldb.DB, location string) (*Instance, error) {
i := &Instance{
DB: db,
location: location,
folderIdx: newSmallIndex(db, []byte{KeyTypeFolderIdx}),
deviceIdx: newSmallIndex(db, []byte{KeyTypeDeviceIdx}),
}
i.keyer = newDefaultKeyer(i.folderIdx, i.deviceIdx)
err := i.updateSchema()
return i, err
}
// Committed returns the number of items committed to the database since startup
func (db *Instance) Committed() int64 {
return atomic.LoadInt64(&db.committed)
}
// Location returns the filesystem path where the database is stored
func (db *Instance) Location() string {
return db.location
}
func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
func (db *instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -131,7 +72,7 @@ func (db *Instance) updateFiles(folder, device []byte, fs []protocol.FileInfo, m
}
}
func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
func (db *instance) addSequences(folder []byte, fs []protocol.FileInfo) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -146,7 +87,7 @@ func (db *Instance) addSequences(folder []byte, fs []protocol.FileInfo) {
}
}
func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
func (db *instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -158,7 +99,7 @@ func (db *Instance) removeSequences(folder []byte, fs []protocol.FileInfo) {
}
}
func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
if len(prefix) > 0 {
unslashedPrefix := prefix
if bytes.HasSuffix(prefix, []byte{'/'}) {
@@ -199,7 +140,7 @@ func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn It
}
}
func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
func (db *instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -226,7 +167,7 @@ func (db *Instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator)
}
}
func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
func (db *instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -271,14 +212,14 @@ func (db *Instance) withAllFolderTruncated(folder []byte, fn func(device []byte,
}
}
func (db *Instance) getFile(key []byte) (protocol.FileInfo, bool) {
func (db *instance) getFile(key []byte) (protocol.FileInfo, bool) {
if f, ok := db.getFileTrunc(key, false); ok {
return f.(protocol.FileInfo), true
}
return protocol.FileInfo{}, false
}
func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
func (db *instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
bs, err := db.Get(key, nil)
if err == leveldb.ErrNotFound {
return nil, false
@@ -296,7 +237,7 @@ func (db *Instance) getFileTrunc(key []byte, trunc bool) (FileIntf, bool) {
return f, true
}
func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
func (db *instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, bool) {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -304,7 +245,7 @@ func (db *Instance) getGlobal(folder, file []byte, truncate bool) (FileIntf, boo
return f, ok
}
func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
func (db *instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []byte, truncate bool) ([]byte, []byte, FileIntf, bool) {
gk = db.keyer.GenerateGlobalVersionKey(gk, folder, file)
bs, err := t.Get(gk, nil)
@@ -325,7 +266,7 @@ func (db *Instance) getGlobalInto(t readOnlyTransaction, gk, dk, folder, file []
return gk, dk, nil, false
}
func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
if len(prefix) > 0 {
unslashedPrefix := prefix
if bytes.HasSuffix(prefix, []byte{'/'}) {
@@ -370,7 +311,7 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
}
}
func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
func (db *instance) availability(folder, file []byte) []protocol.DeviceID {
k := db.keyer.GenerateGlobalVersionKey(nil, folder, file)
bs, err := db.Get(k, nil)
if err == leveldb.ErrNotFound {
@@ -401,7 +342,7 @@ func (db *Instance) availability(folder, file []byte) []protocol.DeviceID {
return devices
}
func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
func (db *instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
if bytes.Equal(device, protocol.LocalDeviceID[:]) {
db.withNeedLocal(folder, truncate, fn)
return
@@ -473,7 +414,7 @@ func (db *Instance) withNeed(folder, device []byte, truncate bool, fn Iterator)
}
}
func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
func (db *instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
t := db.newReadOnlyTransaction()
defer t.close()
@@ -495,31 +436,7 @@ func (db *Instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
}
}
func (db *Instance) ListFolders() []string {
t := db.newReadOnlyTransaction()
defer t.close()
dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeGlobal}), nil)
defer dbi.Release()
folderExists := make(map[string]bool)
for dbi.Next() {
folder, ok := db.keyer.FolderFromGlobalVersionKey(dbi.Key())
if ok && !folderExists[string(folder)] {
folderExists[string(folder)] = true
}
}
folders := make([]string, 0, len(folderExists))
for k := range folderExists {
folders = append(folders, k)
}
sort.Strings(folders)
return folders
}
func (db *Instance) dropFolder(folder []byte) {
func (db *instance) dropFolder(folder []byte) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -537,7 +454,7 @@ func (db *Instance) dropFolder(folder []byte) {
}
}
func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
func (db *instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -556,7 +473,7 @@ func (db *Instance) dropDeviceFolder(device, folder []byte, meta *metadataTracke
}
}
func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -604,7 +521,7 @@ func (db *Instance) checkGlobals(folder []byte, meta *metadataTracker) {
l.Debugf("db check completed for %q", folder)
}
func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
func (db *instance) getIndexID(device, folder []byte) protocol.IndexID {
key := db.keyer.GenerateIndexIDKey(nil, device, folder)
cur, err := db.Get(key, nil)
if err != nil {
@@ -619,7 +536,7 @@ func (db *Instance) getIndexID(device, folder []byte) protocol.IndexID {
return id
}
func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
func (db *instance) setIndexID(device, folder []byte, id protocol.IndexID) {
key := db.keyer.GenerateIndexIDKey(nil, device, folder)
bs, _ := id.Marshal() // marshalling can't fail
if err := db.Put(key, bs, nil); err != nil {
@@ -627,44 +544,15 @@ func (db *Instance) setIndexID(device, folder []byte, id protocol.IndexID) {
}
}
// DropLocalDeltaIndexIDs removes all index IDs for the local device ID from
// the database. This will cause a full index transmission on the next
// connection.
func (db *Instance) DropLocalDeltaIndexIDs() {
db.dropDeltaIndexIDs(true)
}
// DropRemoteDeltaIndexIDs removes all index IDs for the other devices than
// the local one from the database. This will cause them to send us a full
// index on the next connection.
func (db *Instance) DropRemoteDeltaIndexIDs() {
db.dropDeltaIndexIDs(false)
}
func (db *Instance) dropDeltaIndexIDs(local bool) {
t := db.newReadWriteTransaction()
defer t.close()
dbi := t.NewIterator(util.BytesPrefix([]byte{KeyTypeIndexID}), nil)
defer dbi.Release()
for dbi.Next() {
device, _ := db.keyer.DeviceFromIndexIDKey(dbi.Key())
if bytes.Equal(device, protocol.LocalDeviceID[:]) == local {
t.Delete(dbi.Key())
}
}
}
func (db *Instance) dropMtimes(folder []byte) {
func (db *instance) dropMtimes(folder []byte) {
db.dropPrefix(db.keyer.GenerateMtimesKey(nil, folder))
}
func (db *Instance) dropFolderMeta(folder []byte) {
func (db *instance) dropFolderMeta(folder []byte) {
db.dropPrefix(db.keyer.GenerateFolderMetaKey(nil, folder))
}
func (db *Instance) dropPrefix(prefix []byte) {
func (db *instance) dropPrefix(prefix []byte) {
t := db.newReadWriteTransaction()
defer t.close()
@@ -701,22 +589,6 @@ func unmarshalVersionList(data []byte) (VersionList, bool) {
return vl, true
}
// A "better" version of leveldb's errors.IsCorrupted.
func leveldbIsCorrupted(err error) bool {
switch {
case err == nil:
return false
case errors.IsCorrupted(err):
return true
case strings.Contains(err.Error(), "corrupted"):
return true
}
return false
}
type errorSuggestion struct {
inner error
suggestion string