diff --git a/client/README.md b/client/README.md index 83b9fe9..0eb1269 100644 --- a/client/README.md +++ b/client/README.md @@ -1,2 +1,3 @@ `# apt-get install libzmq3-dev` + `$ go get github.com/pebbe/zmq4` diff --git a/client/zmq_client.go b/client/zmq_client.go index 93a88fc..499e3e9 100644 --- a/client/zmq_client.go +++ b/client/zmq_client.go @@ -1,22 +1,98 @@ package main import ( + "database/sql" "encoding/binary" - "log" + "encoding/hex" + "flag" + "fmt" + "os" + + "github.com/golang/protobuf/proto" + _ "github.com/mattn/go-sqlite3" + zmq "github.com/pebbe/zmq4" + "github.com/sirupsen/logrus" "github.com/gtank/ctxd/parser" - zmq "github.com/pebbe/zmq4" - "github.com/pkg/errors" + "github.com/gtank/ctxd/storage" ) -const ( - PORT = 28332 -) +var log *logrus.Entry +var logger = logrus.New() +var db *sql.DB + +type Options struct { + zmqAddr string + dbPath string + logLevel uint64 + logPath string +} func main() { + opts := &Options{} + flag.StringVar(&opts.zmqAddr, "zmq-addr", "127.0.0.1:28332", "the address of the 0MQ publisher") + flag.StringVar(&opts.dbPath, "db-path", "", "the path to a sqlite database file") + flag.Uint64Var(&opts.logLevel, "log-level", uint64(logrus.InfoLevel), "log level (logrus 1-7)") + flag.StringVar(&opts.logPath, "log-file", "", "log file to write to") + // TODO prod metrics + // TODO support config from file and env vars + flag.Parse() + + if opts.dbPath == "" { + flag.Usage() + os.Exit(1) + } + + // Initialize logging + logger.SetFormatter(&logrus.TextFormatter{ + //DisableColors: true, + FullTimestamp: true, + DisableLevelTruncation: true, + }) + + if opts.logPath != "" { + // instead write parsable logs for logstash/splunk/etc + output, err := os.Open(opts.logPath) + if err != nil { + log.WithFields(logrus.Fields{ + "error": err, + "path": opts.logPath, + }).Fatal("couldn't open log file") + } + defer output.Close() + logger.SetOutput(output) + logger.SetFormatter(&logrus.JSONFormatter{}) + } + + logger.SetLevel(logrus.Level(opts.logLevel)) + + log = logger.WithFields(logrus.Fields{ + "app": "zmqclient", + }) + + // Initialize database + db, err := sql.Open("sqlite3", opts.dbPath) + if err != nil { + log.WithFields(logrus.Fields{ + "db_path": opts.dbPath, + "error": err, + }).Fatal("couldn't open SQL db") + } + + // Creates our tables if they don't already exist. + err = storage.CreateTables(db) + if err != nil { + log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("couldn't create SQL tables") + } + + // Initialize ZMQ ctx, err := zmq.NewContext() if err != nil { - log.Fatal(err) + log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("couldn't create zmq context") } defer ctx.Term() @@ -25,27 +101,43 @@ func main() { // like a mutex. sock, err := ctx.NewSocket(zmq.SUB) if err != nil { - log.Fatal(errors.Wrap(err, "creating socket")) + log.WithFields(logrus.Fields{ + "error": err, + }).Fatal("couldn't create zmq context socket") } + err = sock.SetSubscribe("rawblock") if err != nil { - log.Fatal(errors.Wrap(err, "subscribing")) + log.WithFields(logrus.Fields{ + "error": err, + "stream": "rawblock", + }).Fatal("couldn't subscribe to stream") } - err = sock.Connect("tcp://127.0.0.1:28332") + + connString := fmt.Sprintf("tcp://%s", opts.zmqAddr) + err = sock.Connect(connString) if err != nil { - log.Fatal(errors.Wrap(err, "connecting")) + log.WithFields(logrus.Fields{ + "error": err, + "connection": connString, + }).Fatal("couldn't connect to socket") } defer sock.Close() + // Start listening for new blocks for { msg, err := sock.RecvMessageBytes(0) if err != nil { - log.Println(errors.Wrap(err, "on message receipt")) + log.WithFields(logrus.Fields{ + "error": err, + }).Error("error on msg recv") continue } if len(msg) < 3 { - log.Printf("got unknown msg: %v", msg) + log.WithFields(logrus.Fields{ + "msg": msg, + }).Warn("got unknown message type") continue } @@ -57,26 +149,67 @@ func main() { } switch string(topic) { + case "rawblock": - log.Printf("got block (%d): %x\n", sequence, body[:80]) - go handleBlock(sequence, body) + log.WithFields(logrus.Fields{ + "seqnum": sequence, + "header": fmt.Sprintf("%x", body[:80]), + }).Debug("got block") + + // there's an implicit mutex here + go handleBlock(db, sequence, body) + default: - log.Printf("unexpected topic: %s (%d)", topic, sequence) + log.WithFields(logrus.Fields{ + "seqnum": sequence, + "topic": topic, + }).Warn("got message with unknown topic") } } } -func handleBlock(sequence int, blockData []byte) { +func handleBlock(db *sql.DB, sequence int, blockData []byte) { block := parser.NewBlock() rest, err := block.ParseFromSlice(blockData) if err != nil { - log.Println("Error parsing block (%d): %v", err) + log.WithFields(logrus.Fields{ + "seqnum": sequence, + "error": err, + }).Error("error parsing block") return } if len(rest) != 0 { - log.Println("Received overlong message:\n%x", rest) + log.WithFields(logrus.Fields{ + "seqnum": sequence, + "length": len(rest), + }).Warn("received overlong message") return } - log.Printf("Received a version %d block with %d transactions.", block.GetVersion(), block.GetTxCount()) + displayHash := hex.EncodeToString(block.GetEncodableHash()) + marshaledBlock, _ := proto.Marshal(block.ToCompact()) + + err = storage.StoreBlock( + db, + block.GetHeight(), + displayHash, + block.HasSaplingTransactions(), + marshaledBlock, + ) + + entry := log.WithFields(logrus.Fields{ + "seqnum": sequence, + "block_height": block.GetHeight(), + "block_hash": displayHash, + "block_version": block.GetVersion(), + "tx_count": block.GetTxCount(), + "has_sapling_tx": block.HasSaplingTransactions(), + "error": err, + }) + + if err != nil { + entry.Error("error storing block") + } else { + entry.Info("received new block") + } }