From f6da436f4b1ebd20143909f025586152760fea90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Rozlach?= Date: Wed, 15 Aug 2018 16:52:20 +0200 Subject: [PATCH] cmd/stdiscosrv: Add replication heartbeats (fixes #5117) (#5120) --- cmd/stdiscosrv/replication.go | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/cmd/stdiscosrv/replication.go b/cmd/stdiscosrv/replication.go index 2969a78e..462033af 100644 --- a/cmd/stdiscosrv/replication.go +++ b/cmd/stdiscosrv/replication.go @@ -18,6 +18,9 @@ import ( "github.com/syncthing/syncthing/lib/protocol" ) +const replicationReadTimeout = time.Minute +const replicationHeartbeatInterval = time.Second * 30 + type replicator interface { send(key string, addrs []DatabaseAddress, seen int64) } @@ -79,10 +82,22 @@ func (s *replicationSender) Serve() { return } + heartBeatTicker := time.NewTicker(replicationHeartbeatInterval) + defer heartBeatTicker.Stop() + // Send records. buf := make([]byte, 1024) for { select { + case <-heartBeatTicker.C: + if len(s.outbox) > 0 { + // No need to send heartbeats if there are events/previous + // heartbeats to send, they will keep the connection alive. + continue + } + // Empty replication message is the heartbeat: + s.outbox <- ReplicationRecord{} + case rec := <-s.outbox: // Buffer must hold record plus four bytes for size size := rec.Size() @@ -106,6 +121,7 @@ func (s *replicationSender) Serve() { if _, err := conn.Write(buf[:4+n]); err != nil { replicationSendsTotal.WithLabelValues("error").Inc() log.Println("Replication write:", err) + // Yes, we are losing the replication event here. 
return } replicationSendsTotal.WithLabelValues("success").Inc() @@ -242,7 +258,7 @@ func (l *replicationListener) handle(conn net.Conn) { default: } - conn.SetReadDeadline(time.Now().Add(time.Minute)) + conn.SetReadDeadline(time.Now().Add(replicationReadTimeout)) // First four bytes are the size if _, err := io.ReadFull(conn, buf[:4]); err != nil { @@ -256,6 +272,12 @@ func (l *replicationListener) handle(conn net.Conn) { if len(buf) < size { buf = make([]byte, size) } + + if size == 0 { + // Heartbeat, ignore + continue + } + if _, err := io.ReadFull(conn, buf[:size]); err != nil { log.Println("Replication read record:", err) replicationRecvsTotal.WithLabelValues("error").Inc()