From f53e6d1a335bd16099e3875cd5fa838a8dd71cf4 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 14 Jul 2014 16:15:13 -0700 Subject: [PATCH] replace logger with go-logging --- common/logging.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ log.go | 36 ++++++++++++++++++------------------ main.go | 22 +++++++++------------- p2p/addrbook.go | 14 +++++++------- p2p/connection.go | 36 +++++++++++++++++++++--------------- p2p/listener.go | 12 ++++++------ p2p/log.go | 23 ++++------------------- p2p/peer.go | 22 +++++++++++----------- p2p/peer_manager.go | 9 +++++---- p2p/switch.go | 20 ++++++++++---------- p2p/switch_test.go | 2 +- 11 files changed, 136 insertions(+), 104 deletions(-) create mode 100644 common/logging.go diff --git a/common/logging.go b/common/logging.go new file mode 100644 index 00000000..91b08a1c --- /dev/null +++ b/common/logging.go @@ -0,0 +1,44 @@ +package common + +import ( + stdlog "log" + "os" + + "github.com/op/go-logging" +) + +var Log = logging.MustGetLogger("main") + +func init() { + // Customize the output format + logging.SetFormatter(logging.MustStringFormatter("▶ %{level:.1s} 0x%{id:x} %{message}")) + + // Setup one stdout and one syslog backend. + logBackend := logging.NewLogBackend(os.Stderr, "", stdlog.LstdFlags|stdlog.Lshortfile) + logBackend.Color = true + + syslogBackend, err := logging.NewSyslogBackend("") + if err != nil { + panic(err) + } + + // Combine them both into one logging backend. + logging.SetBackend(logBackend, syslogBackend) + + // Test + /* + Log.Debug("debug") + Log.Info("info") + Log.Notice("notice") + Log.Warning("warning") + Log.Error("error") + */ +} + +var Debug = Log.Debug +var Info = Log.Info +var Notice = Log.Notice +var Warning = Log.Warning +var Warn = Log.Warning +var Error = Log.Error +var Critical = Log.Critical diff --git a/log.go b/log.go index fc2ca879..9aad621e 100644 --- a/log.go +++ b/log.go @@ -1,30 +1,30 @@ package main import ( - "github.com/cihub/seelog" + "os" + + "github.com/op/go-logging" "github.com/tendermint/tendermint/p2p" ) -var log seelog.LoggerInterface +var log = logging.MustGetLogger("main") func init() { - // TODO: replace with configuration file in the ~/.tendermint directory. - config := ` - - - - - - - - -` + // Customize the output format + logging.SetFormatter(logging.MustStringFormatter("[%{level:.4s}] %{time:2006-01-02T15:04:05} %{shortfile:-20s} %{message}")) - var err error - log, err = seelog.LoggerFromConfigAsBytes([]byte(config)) - if err != nil { - panic(err) - } + logBackend := logging.NewLogBackend(os.Stderr, "", 0) + logBackend.Color = true + logging.SetBackend(logBackend) + + // Test + /* + Log.Debug("debug") + Log.Info("info") + Log.Notice("notice") + Log.Warning("warning") + Log.Error("error") + */ p2p.SetLogger(log) } diff --git a/main.go b/main.go index b2646511..526a3c40 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,6 @@ import ( "os" "os/signal" - . "github.com/tendermint/tendermint/binary" "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/p2p" ) @@ -54,7 +53,7 @@ func NewNode() *Node { } func (n *Node) Start() { - log.Infof("Starting node") + log.Info("Starting node") for _, l := range n.lz { go n.inboundConnectionHandler(l) } @@ -65,7 +64,7 @@ func (n *Node) Start() { // Add a Listener to accept inbound peer connections. func (n *Node) AddListener(l p2p.Listener) { - log.Infof("Added %v", l) + log.Info("Added %v", l) n.lz = append(n.lz, l) } @@ -78,7 +77,7 @@ func (n *Node) inboundConnectionHandler(l p2p.Listener) { // New inbound connection! peer, err := n.sw.AddPeerWithConnection(inConn, false) if err != nil { - log.Infof("Ignoring error from inbound connection: %v\n%v", + log.Info("Ignoring error from inbound connection: %v\n%v", peer, err) continue } @@ -96,11 +95,8 @@ func (n *Node) SendOurExternalAddrs(peer *p2p.Peer) { for _, l := range n.lz { addrs = append(addrs, l.ExternalAddress()) } - pexAddrsMsg := &p2p.PexAddrsMessage{Addrs: addrs} - peer.Send(p2p.NewPacket( - p2p.PexCh, - BinaryBytes(pexAddrsMsg), - )) + msg := &p2p.PexAddrsMessage{Addrs: addrs} + peer.Send(p2p.NewPacket(p2p.PexCh, msg)) // On the remote end, the pexHandler may choose // to add these to its book. } @@ -117,7 +113,7 @@ func (n *Node) newPeersHandler() { } func (n *Node) Stop() { - log.Infof("Stopping node") + log.Info("Stopping node") // TODO: gracefully disconnect from peers. n.sw.Stop() n.book.Stop() @@ -138,11 +134,11 @@ func main() { if config.Config.Seed != "" { peer, err := n.sw.DialPeerWithAddress(p2p.NewNetAddressString(config.Config.Seed)) if err != nil { - log.Errorf("Error dialing seed: %v", err) + log.Error("Error dialing seed: %v", err) //n.book.MarkAttempt(addr) return } else { - log.Infof("Connected to seed: %v", peer) + log.Info("Connected to seed: %v", peer) n.SendOurExternalAddrs(peer) } } @@ -158,7 +154,7 @@ func trapSignal(cb func()) { signal.Notify(c, os.Interrupt) go func() { for sig := range c { - log.Infof("captured %v, exiting..", sig) + log.Info("captured %v, exiting..", sig) cb() os.Exit(1) } diff --git a/p2p/addrbook.go b/p2p/addrbook.go index 52955f2d..6de81924 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -123,7 +123,7 @@ func (a *AddrBook) init() { func (a *AddrBook) Start() { if atomic.CompareAndSwapUint32(&a.started, 0, 1) { - log.Infof("Starting address manager") + log.Info("Starting address manager") a.loadFromFile(a.filePath) a.wg.Add(1) go a.saveHandler() @@ -132,7 +132,7 @@ func (a *AddrBook) Start() { func (a *AddrBook) Stop() { if atomic.CompareAndSwapUint32(&a.stopped, 0, 1) { - log.Infof("Stopping address manager") + log.Info("Stopping address manager") close(a.quit) a.wg.Wait() } @@ -399,7 +399,7 @@ func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - log.Infof("new bucket is full, expiring old ") + log.Info("new bucket is full, expiring old ") a.expireNew(bucketIdx) } @@ -519,7 +519,7 @@ func (a *AddrBook) addAddress(addr, src *NetAddress) { bucket := a.calcNewBucket(addr, src) a.addToNewBucket(ka, bucket) - log.Infof("Added new address %s for a total of %d addresses", addr, a.size()) + log.Info("Added new address %s for a total of %d addresses", addr, a.size()) } // Make space in the new buckets by expiring the really bad entries. @@ -528,7 +528,7 @@ func (a *AddrBook) expireNew(bucketIdx int) { for key, ka := range a.addrNew[bucketIdx] { // If an entry is bad, throw it away if ka.isBad() { - log.Infof("expiring bad address %v", key) + log.Info("expiring bad address %v", key) a.removeFromBucket(ka, bucketTypeNew, bucketIdx) return } @@ -572,13 +572,13 @@ func (a *AddrBook) moveToOld(ka *knownAddress) { if !added { added := a.addToNewBucket(oldest, freedBucket) if !added { - log.Warnf("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket) + log.Warning("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket) } } // Finally, add to bucket again. added = a.addToOldBucket(ka, oldBucketIdx) if !added { - log.Warnf("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx) + log.Warning("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx) } } } diff --git a/p2p/connection.go b/p2p/connection.go index 845d8a49..ca3a1b17 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -66,7 +66,7 @@ func NewConnection(conn net.Conn) *Connection { // If an error occurs, the recovered reason is passed to "onError". func (c *Connection) Start(channels map[string]*Channel, onError func(interface{})) { if atomic.CompareAndSwapUint32(&c.started, 0, 1) { - log.Debugf("Starting %v", c) + log.Debug("Starting %v", c) c.channels = channels c.onError = onError go c.sendHandler() @@ -76,7 +76,7 @@ func (c *Connection) Start(channels map[string]*Channel, onError func(interface{ func (c *Connection) Stop() { if atomic.CompareAndSwapUint32(&c.stopped, 0, 1) { - log.Debugf("Stopping %v", c) + log.Debug("Stopping %v", c) close(c.quit) c.conn.Close() c.flushThrottler.Stop() @@ -120,7 +120,7 @@ func (c *Connection) flush() { err := c.bufWriter.Flush() if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Warnf("Connection flush failed: %v", err) + log.Warning("Connection flush failed: %v", err) } } } @@ -139,7 +139,7 @@ func (c *Connection) _recover() { // sendHandler pulls from .sendQueue and writes to .bufWriter func (c *Connection) sendHandler() { - log.Tracef("%v sendHandler", c) + log.Debug("%v sendHandler", c) defer c._recover() FOR_LOOP: @@ -147,7 +147,7 @@ FOR_LOOP: var err error select { case sendPkt := <-c.sendQueue: - log.Tracef("Found pkt from sendQueue. Writing pkt to underlying connection") + log.Debug("Found pkt from sendQueue. Writing pkt to underlying connection") _, err = packetTypeMessage.WriteTo(c.bufWriter) if err != nil { break @@ -158,11 +158,11 @@ FOR_LOOP: c.flush() case <-c.pingRepeatTimer.Ch: _, err = packetTypePing.WriteTo(c.bufWriter) - log.Debugf("Send [Ping] -> %v", c) + log.Debug("Send [Ping] -> %v", c) c.flush() case <-c.pong: _, err = packetTypePong.WriteTo(c.bufWriter) - log.Debugf("Send [Pong] -> %v", c) + log.Debug("Send [Pong] -> %v", c) c.flush() case <-c.quit: break FOR_LOOP @@ -172,33 +172,39 @@ FOR_LOOP: break FOR_LOOP } if err != nil { - log.Infof("%v failed @ sendHandler:\n%v", c, err) + log.Info("%v failed @ sendHandler:\n%v", c, err) c.Stop() break FOR_LOOP } } - log.Tracef("%v sendHandler done", c) + log.Debug("%v sendHandler done", c) // cleanup } // recvHandler reads from .bufReader and pushes to the appropriate // channel's recvQueue. func (c *Connection) recvHandler() { - log.Tracef("%v recvHandler", c) + log.Debug("%v recvHandler", c) defer c._recover() FOR_LOOP: for { + if true { + // peeking into bufReader + numBytes := c.bufReader.Buffered() + bytes, err := c.bufReader.Peek(MinInt(numBytes, 100)) + log.Debug("recvHandler peeked: %X\nerr:%v", bytes, err) + } pktType, err := ReadUInt8Safe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("%v failed @ recvHandler", c) + log.Info("%v failed @ recvHandler", c) c.Stop() } break FOR_LOOP } else { - log.Tracef("Found pktType %v", pktType) + log.Debug("Found pktType %v", pktType) } switch pktType { @@ -208,12 +214,12 @@ FOR_LOOP: c.pong <- struct{}{} case packetTypePong: // do nothing - log.Debugf("[%v] Received Pong", c) + log.Debug("[%v] Received Pong", c) case packetTypeMessage: pkt, err := ReadPacketSafe(c.bufReader) if err != nil { if atomic.LoadUint32(&c.stopped) != 1 { - log.Infof("%v failed @ recvHandler", c) + log.Info("%v failed @ recvHandler", c) c.Stop() } break FOR_LOOP @@ -230,7 +236,7 @@ FOR_LOOP: c.pingRepeatTimer.Reset() } - log.Tracef("%v recvHandler done", c) + log.Debug("%v recvHandler done", c) // cleanup close(c.pong) for _ = range c.pong { diff --git a/p2p/listener.go b/p2p/listener.go index 042ff208..6ebe28f8 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -58,7 +58,7 @@ func NewDefaultListener(protocol string, lAddr string) Listener { } // Actual listener local IP & port listenerIP, listenerPort := splitHostPort(listener.Addr().String()) - log.Debugf("Local listener: %v:%v", listenerIP, listenerPort) + log.Debug("Local listener: %v:%v", listenerIP, listenerPort) // Determine external address... var extAddr *NetAddress @@ -134,16 +134,16 @@ func (l *DefaultListener) String() string { // UPNP external address discovery & port mapping func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress { - log.Debugf("Getting UPNP external address") + log.Debug("Getting UPNP external address") nat, err := upnp.Discover() if err != nil { - log.Debugf("Could not get UPNP extrernal address: %v", err) + log.Debug("Could not get UPNP extrernal address: %v", err) return nil } ext, err := nat.GetExternalAddress() if err != nil { - log.Debugf("Could not get UPNP external address: %v", err) + log.Debug("Could not get UPNP external address: %v", err) return nil } @@ -154,11 +154,11 @@ func getUPNPExternalAddress(externalPort, internalPort int) *NetAddress { externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "tendermint", 0) if err != nil { - log.Debugf("Could not get UPNP external address: %v", err) + log.Debug("Could not get UPNP external address: %v", err) return nil } - log.Debugf("Got UPNP external address: %v", ext) + log.Debug("Got UPNP external address: %v", ext) return NewNetAddressIPPort(ext, UInt16(externalPort)) } diff --git a/p2p/log.go b/p2p/log.go index d8698b89..5057e1bf 100644 --- a/p2p/log.go +++ b/p2p/log.go @@ -1,30 +1,15 @@ package p2p import ( - "github.com/cihub/seelog" + "github.com/op/go-logging" ) -var log seelog.LoggerInterface +var log = logging.MustGetLogger("p2p") func init() { - config := ` - - - - - - - - -` - - var err error - log, err = seelog.LoggerFromConfigAsBytes([]byte(config)) - if err != nil { - panic(err) - } + logging.SetFormatter(logging.MustStringFormatter("[%{level:.1s}] %{message}")) } -func SetLogger(l seelog.LoggerInterface) { +func SetLogger(l *logging.Logger) { log = l } diff --git a/p2p/peer.go b/p2p/peer.go index be76c317..bb1126df 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -31,7 +31,7 @@ func newPeer(conn *Connection, channels map[string]*Channel) *Peer { } func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError func(*Peer, interface{})) { - log.Debugf("Starting %v", p) + log.Debug("Starting %v", p) if atomic.CompareAndSwapUint32(&p.started, 0, 1) { // on connection error @@ -50,7 +50,7 @@ func (p *Peer) start(pktRecvQueues map[string]chan *InboundPacket, onPeerError f func (p *Peer) stop() { if atomic.CompareAndSwapUint32(&p.stopped, 0, 1) { - log.Debugf("Stopping %v", p) + log.Debug("Stopping %v", p) close(p.quit) p.conn.Stop() } @@ -75,7 +75,7 @@ func (p *Peer) Channel(chName string) *Channel { // TrySend returns true if the packet was successfully queued. // Returning true does not imply that the packet will be sent. func (p *Peer) TrySend(pkt Packet) bool { - log.Debugf("TrySend [%v] -> %v", pkt, p) + log.Debug("TrySend [%v] -> %v", pkt, p) channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue @@ -92,7 +92,7 @@ func (p *Peer) TrySend(pkt Packet) bool { } func (p *Peer) Send(pkt Packet) bool { - log.Debugf("Send [%v] -> %v", pkt, p) + log.Debug("Send [%v] -> %v", pkt, p) channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue @@ -116,7 +116,7 @@ func (p *Peer) String() string { // Each channel gets its own sendHandler goroutine; // Golang's channel implementation handles the scheduling. func (p *Peer) sendHandler(chName string) { - log.Tracef("%v sendHandler [%v]", p, chName) + log.Debug("%v sendHandler [%v]", p, chName) channel := p.channels[chName] sendQueue := channel.sendQueue FOR_LOOP: @@ -125,14 +125,14 @@ FOR_LOOP: case <-p.quit: break FOR_LOOP case pkt := <-sendQueue: - log.Tracef("Sending packet to peer sendQueue") + log.Debug("Sending packet to peer sendQueue") // blocks until the connection is Stop'd, // which happens when this peer is Stop'd. p.conn.Send(pkt) } } - log.Tracef("%v sendHandler [%v] closed", p, chName) + log.Debug("%v sendHandler [%v] closed", p, chName) // cleanup // (none) } @@ -142,7 +142,7 @@ FOR_LOOP: // Many peers have goroutines that push to the same pktRecvQueue. // Golang's channel implementation handles the scheduling. func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { - log.Tracef("%v recvHandler [%v]", p, chName) + log.Debug("%v recvHandler [%v]", p, chName) channel := p.channels[chName] recvQueue := channel.recvQueue @@ -167,7 +167,7 @@ FOR_LOOP: } } - log.Tracef("%v recvHandler [%v] closed", p, chName) + log.Debug("%v recvHandler [%v] closed", p, chName) // cleanup // (none) } @@ -224,7 +224,7 @@ type Packet struct { func NewPacket(chName String, msg Binary) Packet { msgBytes := BinaryBytes(msg) - log.Tracef("NewPacket msg bytes: %X", msgBytes) + log.Debug("NewPacket msg bytes: %X", msgBytes) return Packet{ Channel: chName, Bytes: msgBytes, @@ -255,7 +255,7 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { if err != nil { return } - log.Tracef("ReadPacket* msg bytes: %X", bytes) + log.Debug("ReadPacket* msg bytes: %X", bytes) return Packet{Channel: chName, Bytes: bytes}, nil } diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 9a60a252..da0bc57f 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -46,7 +46,7 @@ func NewPeerManager(sw *Switch, book *AddrBook) *PeerManager { func (pm *PeerManager) Start() { if atomic.CompareAndSwapUint32(&pm.started, 0, 1) { - log.Infof("Starting peerManager") + log.Info("Starting peerManager") go pm.ensurePeersHandler() go pm.pexHandler() } @@ -54,7 +54,7 @@ func (pm *PeerManager) Start() { func (pm *PeerManager) Stop() { if atomic.CompareAndSwapUint32(&pm.stopped, 0, 1) { - log.Infof("Stopping peerManager") + log.Info("Stopping peerManager") close(pm.newPeers) close(pm.quit) } @@ -135,7 +135,7 @@ func (pm *PeerManager) pexHandler() { // decode message msg := decodeMessage(inPkt.Bytes) - log.Infof("pexHandler received %v", msg) + log.Info("pexHandler received %v", msg) switch msg.(type) { case *PexRequestMessage: @@ -143,7 +143,7 @@ func (pm *PeerManager) pexHandler() { // TODO: prevent abuse. addrs := pm.book.GetSelection() response := &PexAddrsMessage{Addrs: addrs} - pkt := NewPacket(PexCh, BinaryBytes(response)) + pkt := NewPacket(PexCh, response) queued := inPkt.Peer.TrySend(pkt) if !queued { // ignore @@ -178,6 +178,7 @@ const ( // TODO: check for unnecessary extra bytes at the end. func decodeMessage(bz ByteSlice) (msg Message) { + log.Debug("decoding msg bytes: %X", bz) switch Byte(bz[0]) { case pexTypeRequest: return &PexRequestMessage{} diff --git a/p2p/switch.go b/p2p/switch.go index b5fc2fdd..68c29da0 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -59,13 +59,13 @@ func NewSwitch(channels []ChannelDescriptor) *Switch { func (s *Switch) Start() { if atomic.CompareAndSwapUint32(&s.started, 0, 1) { - log.Infof("Starting switch") + log.Info("Starting switch") } } func (s *Switch) Stop() { if atomic.CompareAndSwapUint32(&s.stopped, 0, 1) { - log.Infof("Stopping switch") + log.Info("Stopping switch") close(s.quit) // stop each peer. for _, peer := range s.peers.List() { @@ -81,7 +81,7 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, return nil, ErrSwitchStopped } - log.Infof("Adding peer with connection: %v, outbound: %v", conn, outbound) + log.Info("Adding peer with connection: %v, outbound: %v", conn, outbound) // Create channels for peer channels := map[string]*Channel{} for _, chDesc := range s.channels { @@ -104,7 +104,7 @@ func (s *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) { return nil, ErrSwitchStopped } - log.Infof("Dialing peer @ %v", addr) + log.Info("Dialing peer @ %v", addr) s.dialing.Set(addr.String(), addr) conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second) s.dialing.Delete(addr.String()) @@ -123,10 +123,10 @@ func (s *Switch) Broadcast(pkt Packet) (numSuccess, numFailure int) { return } - log.Tracef("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) + log.Debug("Broadcast on [%v] len: %v", pkt.Channel, len(pkt.Bytes)) for _, peer := range s.peers.List() { success := peer.TrySend(pkt) - log.Tracef("Broadcast for peer %v success: %v", peer, success) + log.Debug("Broadcast for peer %v success: %v", peer, success) if success { numSuccess += 1 } else { @@ -145,7 +145,7 @@ func (s *Switch) Receive(chName string) *InboundPacket { return nil } - log.Tracef("Receive on [%v]", chName) + log.Debug("Receive on [%v]", chName) q := s.pktRecvQueues[chName] if q == nil { Panicf("Expected pktRecvQueues[%f], found none", chName) @@ -176,7 +176,7 @@ func (s *Switch) Peers() IPeerSet { // Disconnect from a peer due to external error. // TODO: make record depending on reason. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { - log.Infof("%v errored: %v", peer, reason) + log.Info("%v errored: %v", peer, reason) s.StopPeer(peer, false) } @@ -193,11 +193,11 @@ func (s *Switch) addPeer(peer *Peer) error { return ErrSwitchStopped } if s.peers.Add(peer) { - log.Tracef("Adding: %v", peer) + log.Debug("Adding: %v", peer) return nil } else { // ignore duplicate peer - log.Infof("Ignoring duplicate: %v", peer) + log.Info("Ignoring duplicate: %v", peer) return ErrSwitchDuplicatePeer } } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index e1e5ac09..36a957fc 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -128,7 +128,7 @@ func BenchmarkSwitches(b *testing.B) { numFailure += nF } - log.Warnf("success: %v, failure: %v", numSuccess, numFailure) + log.Warning("success: %v, failure: %v", numSuccess, numFailure) // Allow everything to flush before stopping switches & closing connections. b.StopTimer()