From d48e46a29c386b2e857f73b13d3273b41871d01b Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 13 Apr 2017 17:14:34 +0000 Subject: [PATCH] cmd/syncthing: Allow custom event subscriptions (fixes #1879) This adds a parameter "events" to the /rest/events endpoint. It should be a comma separated list of the events the consumer is interested in. When not given it defaults to the current set of events, so it's backwards compatible. The API service then manages subscriptions, creating them as required for each requested event mask. Old subscriptions are not "garbage collected" - it's assumed that in normal usage the set of event subscriptions will be small enough. Possibly lower than before, as we will not set up the disk event subscription unless it's actually used. GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4092 --- cmd/syncthing/gui.go | 65 ++++++++++++++++++++++++++++++--------- cmd/syncthing/gui_test.go | 32 +++++++++++++++++++ cmd/syncthing/main.go | 10 +++--- lib/events/events.go | 61 ++++++++++++++++++++++++++++++++++++ 4 files changed, 149 insertions(+), 19 deletions(-) diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index ca431e6b..05a2b19f 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -45,6 +45,12 @@ var ( startTime = time.Now() ) +const ( + defaultEventMask = events.AllEvents &^ events.LocalChangeDetected &^ events.RemoteChangeDetected + diskEventMask = events.LocalChangeDetected | events.RemoteChangeDetected + eventSubBufferSize = 1000 +) + type apiService struct { id protocol.DeviceID cfg configIntf @@ -52,8 +58,8 @@ type apiService struct { httpsKeyFile string statics *staticsServer model modelIntf - eventSub events.BufferedSubscription - diskEventSub events.BufferedSubscription + eventSubs map[events.EventType]events.BufferedSubscription + eventSubsMut sync.Mutex discoverer discover.CachingMux connectionsService connectionsIntf fss *folderSummaryService @@ -114,16 +120,19 @@ type connectionsIntf interface { Status() map[string]interface{} } -func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, eventSub events.BufferedSubscription, diskEventSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService { +func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService { service := &apiService{ - id: id, - cfg: cfg, - httpsCertFile: httpsCertFile, - httpsKeyFile: httpsKeyFile, - statics: newStaticsServer(cfg.GUI().Theme, assetDir), - model: m, - eventSub: eventSub, - diskEventSub: diskEventSub, + id: id, + cfg: cfg, + httpsCertFile: httpsCertFile, + httpsKeyFile: httpsKeyFile, + statics: newStaticsServer(cfg.GUI().Theme, assetDir), + model: m, + eventSubs: map[events.EventType]events.BufferedSubscription{ + defaultEventMask: defaultSub, + diskEventMask: diskSub, + }, + eventSubsMut: sync.NewMutex(), discoverer: discoverer, connectionsService: connectionsService, systemConfigMut: sync.NewMutex(), @@ -234,7 +243,7 @@ func (s *apiService) Serve() { getRestMux.HandleFunc("/rest/db/need", s.getDBNeed) // folder [perpage] [page] getRestMux.HandleFunc("/rest/db/status", s.getDBStatus) // folder getRestMux.HandleFunc("/rest/db/browse", s.getDBBrowse) // folder [prefix] [dirsonly] [levels] - getRestMux.HandleFunc("/rest/events", s.getIndexEvents) // [since] [limit] [timeout] + getRestMux.HandleFunc("/rest/events", s.getIndexEvents) // [since] [limit] [timeout] [events] getRestMux.HandleFunc("/rest/events/disk", s.getDiskEvents) // [since] [limit] [timeout] getRestMux.HandleFunc("/rest/stats/device", s.getDeviceStats) // - getRestMux.HandleFunc("/rest/stats/folder", s.getFolderStats) // - @@ -1011,11 +1020,14 @@ func (s *apiService) postDBIgnores(w http.ResponseWriter, r *http.Request) { func (s *apiService) getIndexEvents(w http.ResponseWriter, r *http.Request) { s.fss.gotEventRequest() - s.getEvents(w, r, s.eventSub) + mask := s.getEventMask(r.URL.Query().Get("events")) + sub := s.getEventSub(mask) + s.getEvents(w, r, sub) } func (s *apiService) getDiskEvents(w http.ResponseWriter, r *http.Request) { - s.getEvents(w, r, s.diskEventSub) + sub := s.getEventSub(diskEventMask) + s.getEvents(w, r, sub) } func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub events.BufferedSubscription) { @@ -1047,6 +1059,31 @@ func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub sendJSON(w, evs) } +func (s *apiService) getEventMask(evs string) events.EventType { + eventMask := defaultEventMask + if evs != "" { + eventList := strings.Split(evs, ",") + eventMask = 0 + for _, ev := range eventList { + eventMask |= events.UnmarshalEventType(strings.TrimSpace(ev)) + } + } + return eventMask +} + +func (s *apiService) getEventSub(mask events.EventType) events.BufferedSubscription { + s.eventSubsMut.Lock() + bufsub, ok := s.eventSubs[mask] + if !ok { + evsub := events.Default.Subscribe(mask) + bufsub = events.NewBufferedSubscription(evsub, eventSubBufferSize) + s.eventSubs[mask] = bufsub + } + s.eventSubsMut.Unlock() + + return bufsub +} + func (s *apiService) getSystemUpgrade(w http.ResponseWriter, r *http.Request) { if noUpgradeFromEnv { http.Error(w, upgrade.ErrUpgradeUnsupported.Error(), 500) diff --git a/cmd/syncthing/gui_test.go b/cmd/syncthing/gui_test.go index bd2919f8..92e2df7f 100644 --- a/cmd/syncthing/gui_test.go +++ b/cmd/syncthing/gui_test.go @@ -23,6 +23,7 @@ import ( "github.com/d4l3k/messagediff" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" "github.com/thejerf/suture" @@ -924,3 +925,34 @@ func TestOptionsRequest(t *testing.T) { t.Fatal("OPTIONS on /rest/system/status should return a 'Access-Control-Allow-Headers: Content-Type, X-API-KEY' header") } } + +func TestEventMasks(t *testing.T) { + cfg := new(mockedConfig) + defSub := new(mockedEventSub) + diskSub := new(mockedEventSub) + svc := newAPIService(protocol.LocalDeviceID, cfg, "", "", "", nil, defSub, diskSub, nil, nil, nil, nil) + + if mask := svc.getEventMask(""); mask != defaultEventMask { + t.Errorf("incorrect default mask %x != %x", int64(mask), int64(defaultEventMask)) + } + + expected := events.FolderSummary | events.LocalChangeDetected + if mask := svc.getEventMask("FolderSummary,LocalChangeDetected"); mask != expected { + t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected)) + } + + expected = 0 + if mask := svc.getEventMask("WeirdEvent,something else that doens't exist"); mask != expected { + t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected)) + } + + if res := svc.getEventSub(defaultEventMask); res != defSub { + t.Errorf("should have returned the given default event sub") + } + if res := svc.getEventSub(diskEventMask); res != diskSub { + t.Errorf("should have returned the given disk event sub") + } + if res := svc.getEventSub(events.LocalIndexUpdated); res == nil || res == defSub || res == diskSub { + t.Errorf("should have returned a valid, non-default event sub") + } +} diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 26faaaef..13ba2a55 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -637,8 +637,8 @@ func syncthingMain(runtimeOptions RuntimeOptions) { // Event subscription for the API; must start early to catch the early // events. The LocalChangeDetected event might overwhelm the event // receiver in some situations so we will not subscribe to it here. - apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected&^events.RemoteChangeDetected), 1000) - diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected|events.RemoteChangeDetected), 1000) + defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(defaultEventMask), eventSubBufferSize) + diskSub := events.NewBufferedSubscription(events.Default.Subscribe(diskEventMask), eventSubBufferSize) if len(os.Getenv("GOMAXPROCS")) == 0 { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -868,7 +868,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { // GUI - setupGUI(mainService, cfg, m, apiSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions) + setupGUI(mainService, cfg, m, defaultSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions) if runtimeOptions.cpuProfile { f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid())) @@ -1086,7 +1086,7 @@ func startAuditing(mainService *suture.Supervisor, auditFile string) { l.Infoln("Audit log in", auditDest) } -func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub events.BufferedSubscription, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) { +func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) { guiCfg := cfg.GUI() if !guiCfg.Enabled { @@ -1097,7 +1097,7 @@ func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Mode l.Warnln("Insecure admin access is enabled.") } - api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, apiSub, diskSub, discoverer, connectionsService, errors, systemLog) + api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, defaultSub, diskSub, discoverer, connectionsService, errors, systemLog) cfg.Subscribe(api) mainService.Add(api) diff --git a/lib/events/events.go b/lib/events/events.go index 3eca53c1..6d4033e8 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -118,6 +118,67 @@ func (t EventType) MarshalText() ([]byte, error) { return []byte(t.String()), nil } +func UnmarshalEventType(s string) EventType { + switch s { + case "Starting": + return Starting + case "StartupComplete": + return StartupComplete + case "DeviceDiscovered": + return DeviceDiscovered + case "DeviceConnected": + return DeviceConnected + case "DeviceDisconnected": + return DeviceDisconnected + case "DeviceRejected": + return DeviceRejected + case "LocalChangeDetected": + return LocalChangeDetected + case "RemoteChangeDetected": + return RemoteChangeDetected + case "LocalIndexUpdated": + return LocalIndexUpdated + case "RemoteIndexUpdated": + return RemoteIndexUpdated + case "ItemStarted": + return ItemStarted + case "ItemFinished": + return ItemFinished + case "StateChanged": + return StateChanged + case "FolderRejected": + return FolderRejected + case "ConfigSaved": + return ConfigSaved + case "DownloadProgress": + return DownloadProgress + case "RemoteDownloadProgress": + return RemoteDownloadProgress + case "FolderSummary": + return FolderSummary + case "FolderCompletion": + return FolderCompletion + case "FolderErrors": + return FolderErrors + case "DevicePaused": + return DevicePaused + case "DeviceResumed": + return DeviceResumed + case "FolderScanProgress": + return FolderScanProgress + case "FolderPaused": + return FolderPaused + case "FolderResumed": + return FolderResumed + case "ListenAddressesChanged": + return ListenAddressesChanged + case "LoginAttempt": + return LoginAttempt + default: + return 0 + } +} + const BufferSize = 64 type Logger struct {