From 6679c84cfb2e1ed609ffd34952f8ceada5cba160 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 5 Jan 2014 16:16:37 +0100 Subject: [PATCH] Refactor statistics printing --- main.go | 34 ++++++++++++++++++ model.go | 84 +++++++++++++++++++------------------------- protocol/protocol.go | 37 ++++++++----------- util.go | 31 +++++++++++++++- 4 files changed, 115 insertions(+), 71 deletions(-) diff --git a/main.go b/main.go index 5ff6ab69..cd48c04e 100644 --- a/main.go +++ b/main.go @@ -174,9 +174,43 @@ func main() { } }() + // Periodically print statistics + go printStatsLoop(m) + select {} } +func printStatsLoop(m *Model) { + var lastUpdated int64 + var lastStats = make(map[string]protocol.Statistics) + + for { + time.Sleep(60 * time.Second) + + for node, stats := range m.ConnectionStats() { + secs := time.Since(lastStats[node].At).Seconds() + inbps := 8 * int(float64(stats.InBytesTotal-lastStats[node].InBytesTotal)/secs) + outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs) + + if inbps+outbps > 0 { + infof("%s: %sb/s in, %sb/s out", node, MetricPrefix(inbps), MetricPrefix(outbps)) + } + + lastStats[node] = stats + } + + if lu := m.Generation(); lu > lastUpdated { + lastUpdated = lu + files, bytes := m.GlobalSize() + infof("%6d files, %9sB in cluster", files, BinaryPrefix(bytes)) + files, bytes = m.LocalSize() + infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes)) + files, bytes = m.NeedSize() + infof("%6d files, %9sB in to synchronize", files, BinaryPrefix(bytes)) + } + } +} + func listen(myID string, addr string, m *Model, cfg *tls.Config) { l, err := tls.Listen("tcp", addr, cfg) fatalErr(err) diff --git a/model.go b/model.go index e7567095..5059733f 100644 --- a/model.go +++ b/model.go @@ -60,7 +60,6 @@ func NewModel(dir string) *Model { lastIdxBcast: time.Now(), } - go m.printStatsLoop() go m.broadcastIndexLoop() return m } @@ -69,62 +68,55 @@ func (m *Model) Start() { go m.puller() } -func (m *Model) printStatsLoop() { - var lastUpdated int64 - for { - time.Sleep(60 * time.Second) - m.RLock() - m.printConnectionStats() - if m.updatedLocal+m.updateGlobal > lastUpdated { - m.printModelStats() - lastUpdated = m.updatedLocal + m.updateGlobal - } - m.RUnlock() - } +func (m *Model) Generation() int64 { + m.RLock() + defer m.RUnlock() + + return m.updatedLocal + m.updateGlobal } -func (m *Model) printConnectionStats() { +func (m *Model) ConnectionStats() map[string]protocol.Statistics { + m.RLock() + defer m.RUnlock() + + var res = make(map[string]protocol.Statistics) for node, conn := range m.nodes { - stats := conn.Statistics() - if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { - infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec)) - } + res[node] = conn.Statistics() } + return res } -func (m *Model) printModelStats() { - var tot int +func (m *Model) GlobalSize() (files, bytes int) { + m.RLock() + defer m.RUnlock() + + files = len(m.global) for _, f := range m.global { - tot += f.Size() - } - infof("%6d files, %8sB in cluster", len(m.global), toSI(tot)) - - if len(m.need) > 0 { - tot = 0 - for _, f := range m.local { - tot += f.Size() - } - infof("%6d files, %8sB in local repo", len(m.local), toSI(tot)) - - tot = 0 - for n := range m.need { - tot += m.global[n].Size() - } - infof("%6d files, %8sB to synchronize", len(m.need), toSI(tot)) + bytes += f.Size() } + return } -func toSI(n int) string { - if n > 1<<30 { - return fmt.Sprintf("%.02f G", float64(n)/(1<<30)) +func (m *Model) LocalSize() (files, bytes int) { + m.RLock() + defer m.RUnlock() + + files = len(m.local) + for _, f := range m.local { + bytes += f.Size() } - if n > 1<<20 { - return fmt.Sprintf("%.02f M", float64(n)/(1<<20)) + return +} + +func (m *Model) NeedSize() (files, bytes int) { + m.RLock() + defer m.RUnlock() + + files = len(m.need) + for n := range m.need { + bytes += m.global[n].Size() } - if n > 1<<10 { - return fmt.Sprintf("%.01f K", float64(n)/(1<<10)) - } - return fmt.Sprintf("%d ", n) + return } // Index is called when a new node is connected and we receive their full index. @@ -147,7 +139,6 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.recomputeGlobal() m.recomputeNeed() - m.printModelStats() } // IndexUpdate is called for incremental updates to connected nodes' indexes. @@ -188,7 +179,6 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) { m.recomputeGlobal() m.recomputeNeed() - m.printModelStats() } func (m *Model) Close(node string, err error) { diff --git a/protocol/protocol.go b/protocol/protocol.go index 9fc2bb46..0dc4e3ce 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -60,7 +60,6 @@ type Connection struct { hasSentIndex bool hasRecvdIndex bool - lastStatistics Statistics statisticsLock sync.Mutex } @@ -84,14 +83,13 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M } c := Connection{ - receiver: receiver, - reader: flrd, - mreader: &marshalReader{r: flrd}, - writer: flwr, - mwriter: &marshalWriter{w: flwr}, - awaiting: make(map[int]chan asyncResult), - ID: nodeID, - lastStatistics: Statistics{At: time.Now()}, + receiver: receiver, + reader: flrd, + mreader: &marshalReader{r: flrd}, + writer: flwr, + mwriter: &marshalWriter{w: flwr}, + awaiting: make(map[int]chan asyncResult), + ID: nodeID, } go c.readerLoop() @@ -373,27 +371,20 @@ func (c *Connection) pingerLoop() { } type Statistics struct { - At time.Time - InBytesTotal int - InBytesPerSec int - OutBytesTotal int - OutBytesPerSec int + At time.Time + InBytesTotal int + OutBytesTotal int } func (c *Connection) Statistics() Statistics { c.statisticsLock.Lock() defer c.statisticsLock.Unlock() - secs := time.Since(c.lastStatistics.At).Seconds() - rt := int(c.mreader.getTot()) - wt := int(c.mwriter.getTot()) stats := Statistics{ - At: time.Now(), - InBytesTotal: rt, - InBytesPerSec: int(float64(rt-c.lastStatistics.InBytesTotal) / secs), - OutBytesTotal: wt, - OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs), + At: time.Now(), + InBytesTotal: int(c.mreader.getTot()), + OutBytesTotal: int(c.mwriter.getTot()), } - c.lastStatistics = stats + return stats } diff --git a/util.go b/util.go index f8346f5d..9096b9f6 100644 --- a/util.go +++ b/util.go @@ -1,7 +1,36 @@ package main -import "time" +import ( + "fmt" + "time" +) func timing(name string, t0 time.Time) { debugf("%s: %.02f ms", name, time.Since(t0).Seconds()*1000) } + +func MetricPrefix(n int) string { + if n > 1e9 { + return fmt.Sprintf("%.02f G", float64(n)/1e9) + } + if n > 1e6 { + return fmt.Sprintf("%.02f M", float64(n)/1e6) + } + if n > 1e3 { + return fmt.Sprintf("%.01f k", float64(n)/1e3) + } + return fmt.Sprintf("%d ", n) +} + +func BinaryPrefix(n int) string { + if n > 1<<30 { + return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30)) + } + if n > 1<<20 { + return fmt.Sprintf("%.02f Mi", float64(n)/(1<<20)) + } + if n > 1<<10 { + return fmt.Sprintf("%.01f Ki", float64(n)/(1<<10)) + } + return fmt.Sprintf("%d ", n) +}