From d91b8f4eeddfacdeabfeb8182e94c27bb30e921a Mon Sep 17 00:00:00 2001 From: KORuL Date: Wed, 22 Aug 2018 07:43:48 -0700 Subject: [PATCH] initial commit --- .gitignore | 27 ++ .vscode/launch.json | 14 + Cargo.toml | 75 ++++ LICENSE | 674 ++++++++++++++++++++++++++++++ README.md | 63 +++ compile | 9 + compile_debug | 9 + src/blockchain.rs | 200 +++++++++ src/hydrabadger/handler.rs | 741 +++++++++++++++++++++++++++++++++ src/hydrabadger/hydrabadger.rs | 416 ++++++++++++++++++ src/hydrabadger/mod.rs | 73 ++++ src/hydrabadger/state.rs | 509 ++++++++++++++++++++++ src/lib.rs | 518 +++++++++++++++++++++++ src/peer.rs | 531 +++++++++++++++++++++++ 14 files changed, 3859 insertions(+) create mode 100644 .gitignore create mode 100644 .vscode/launch.json create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 README.md create mode 100755 compile create mode 100755 compile_debug create mode 100644 src/blockchain.rs create mode 100644 src/hydrabadger/handler.rs create mode 100644 src/hydrabadger/hydrabadger.rs create mode 100644 src/hydrabadger/mod.rs create mode 100644 src/hydrabadger/state.rs create mode 100644 src/lib.rs create mode 100644 src/peer.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5bd6240 --- /dev/null +++ b/.gitignore @@ -0,0 +1,27 @@ +# Compiled files +*.o +*.so +*.rlib +*.dll + +# Executables +*.exe + +# Other +/*.png + +# Generated by Cargo +Cargo.lock +**/Cargo.lock +/target/ +**/target/ + +# My junk +/data +**/tmp + +/src/junk +/bak + +*.gz +massif* \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..a053d23 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "Launch Program", + "program": "${file}" + } + ] +} \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..0546399 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,75 @@ +[package] +name = "hydrabadger" +version = "0.1.0" +authors = ["c0gent "] +autobins = false + +# [[bin]] +# name = "simulation" +# path = "src/bin/simulation.rs" + +#[[bin]] +#name = "peer_node" +#path = "src/bin/peer_node.rs" + +# +[target.'cfg(target_os="android")'.dependencies] +jni = { version = "0.5", default-features = false } +# +[lib] +crate-type = ["dylib"] +# + + +[features] +# Used for debugging memory usage. +exit_upon_epoch_1000 = [] + + +[dependencies] +log = "*" +# env_logger = "*" +env_logger = "0.5" +clap = "*" +failure = "*" +crossbeam = "~0.4.1" +crossbeam-channel = "*" +chrono = "*" +rust-crypto = "*" +num-traits = "*" +num-bigint = "*" +colored = "*" +itertools = "*" +pairing = "*" +rand = "0.4.2" +serde = "1" +serde_bytes = "*" +serde_derive = "1" +signifix = "*" +futures = "0.1" +tokio = "0.1.7" +tokio-codec = "*" +tokio-io = "*" +bincode = "0.8" +tokio-serde = "*" +tokio-serde-bincode = "*" +bytes = "*" +uuid = { version = "0.6", features = ["v4", "serde"] } +byteorder = "*" +parking_lot = "*" +clear_on_drop = "*" + +[dependencies.hbbft] +version = "*" +# git = "https://github.com/c0gent/hbbft" +git = "https://github.com/poanetwork/hbbft" +# branch = "c0gent-supertraits" +# branch = "master" +branch = "add-mlock-error-handling" +# branch = "afck-agreement" +# path = "../hbbft" +# features = ["serialization-protobuf"] + +[profile.release] +debug = true +debug-assertions = true diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..94a9ed0 --- /dev/null +++ b/LICENSE @@ -0,0 +1,674 @@ + GNU GENERAL PUBLIC LICENSE + Version 3, 29 June 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU General Public License is a free, copyleft license for +software and other kinds of works. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +the GNU General Public License is intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. We, the Free Software Foundation, use the +GNU General Public License for most of our software; it applies also to +any other work released this way by its authors. You can apply it to +your programs, too. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + To protect your rights, we need to prevent others from denying you +these rights or asking you to surrender the rights. Therefore, you have +certain responsibilities if you distribute copies of the software, or if +you modify it: responsibilities to respect the freedom of others. + + For example, if you distribute copies of such a program, whether +gratis or for a fee, you must pass on to the recipients the same +freedoms that you received. You must make sure that they, too, receive +or can get the source code. And you must show them these terms so they +know their rights. + + Developers that use the GNU GPL protect your rights with two steps: +(1) assert copyright on the software, and (2) offer you this License +giving you legal permission to copy, distribute and/or modify it. + + For the developers' and authors' protection, the GPL clearly explains +that there is no warranty for this free software. For both users' and +authors' sake, the GPL requires that modified versions be marked as +changed, so that their problems will not be attributed erroneously to +authors of previous versions. + + Some devices are designed to deny users access to install or run +modified versions of the software inside them, although the manufacturer +can do so. This is fundamentally incompatible with the aim of +protecting users' freedom to change the software. The systematic +pattern of such abuse occurs in the area of products for individuals to +use, which is precisely where it is most unacceptable. Therefore, we +have designed this version of the GPL to prohibit the practice for those +products. If such problems arise substantially in other domains, we +stand ready to extend this provision to those domains in future versions +of the GPL, as needed to protect the freedom of users. + + Finally, every program is threatened constantly by software patents. +States should not allow patents to restrict development and use of +software on general-purpose computers, but in those that do, we wish to +avoid the special danger that patents applied to a free program could +make it effectively proprietary. To prevent this, the GPL assures that +patents cannot be used to render the program non-free. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Use with the GNU Affero General Public License. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU Affero General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the special requirements of the GNU Affero General Public License, +section 13, concerning interaction through a network will apply to the +combination as such. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU General Public License from time to time. Such new versions will +be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + +Also add information on how to contact you by electronic and paper mail. + + If the program does terminal interaction, make it output a short +notice like this when it starts in an interactive mode: + + Copyright (C) + This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'. + This is free software, and you are welcome to redistribute it + under certain conditions; type `show c' for details. + +The hypothetical commands `show w' and `show c' should show the appropriate +parts of the General Public License. Of course, your program's commands +might be different; for a GUI interface, you would use an "about box". + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU GPL, see +. + + The GNU General Public License does not permit incorporating your program +into proprietary programs. If your program is a subroutine library, you +may consider it more useful to permit linking proprietary applications with +the library. If this is what you want to do, use the GNU Lesser General +Public License instead of this License. But first, please read +. diff --git a/README.md b/README.md new file mode 100644 index 0000000..9bcfb24 --- /dev/null +++ b/README.md @@ -0,0 +1,63 @@ +# Hydrabadger + +An experimental peer-to-peer client using the [Honey Badger Byzantine Fault +Tolerant consensus algorithm](https://github.com/poanetwork/hbbft). + +## Usage + +### Compile + +1. `git clone -b android git@github.com:poanetwork/hydrabadger.git` +2. `cd hydrabadger` +3. set needs environments +`export ANDROID_HOME=/Users/$USER/Library/Android/sdk` +`export NDK_HOME=$ANDROID_HOME/ndk-bundle` +and etc +4. make standalone NDK +`${NDK_HOME}/build/tools/make_standalone_toolchain.py --api 26 --arch arm64 --install-dir NDK/arm64` +`${NDK_HOME}/build/tools/make_standalone_toolchain.py --api 26 --arch arm --install-dir NDK/arm` +`${NDK_HOME}/build/tools/make_standalone_toolchain.py --api 26 --arch x86 --install-dir NDK/x86` +5. set environment to NDK compilers and linkers +`export PATH=$PATH:/NDK/arm64/bin/` +`export PATH=$PATH:/NDK/arm/bin/` +`export PATH=$PATH:/NDK/x86/bin/` +6. make cargo-config.toml +`[target.aarch64-linux-android]` +`ar = "/NDK/arm64/bin/aarch64-linux-android-ar"` +`linker = "/NDK/arm64/bin/aarch64-linux-android-clang"` + +`[target.armv7-linux-androideabi]` +`ar = "/NDK/arm/bin/arm-linux-androideabi-ar"` +`linker = "/NDK/arm/bin/arm-linux-androideabi-clang"` + +`[target.i686-linux-android]` +`ar = "/NDK/x86/bin/i686-linux-android-ar"` +`linker = "/NDK/x86/bin/i686-linux-android-clang"'` +7. need copy this config file to our .cargo directory like this: +`cp cargo-config.toml ~/.cargo/config` +8. `./compile` + +### Current State + +Network initialization node addition, transaction generation, consensus, +and batch outputs are all generally working. Batch outputs for each epoch are +printed to the log. + +Overall the client is fragile and doesn't handle deviation from simple usage +very well yet. + +### Unimplemented + +* **Many edge cases and exceptions:** disconnects, reconnects, etc. + * Connecting to a network which is in the process of key generation causes + the entire network to fail. For now, wait until the network starts + outputting batches before connecting additional peer nodes. +* **Error handling** is atrocious, most errors are simply printed to the log. +* **Usage as a library** is still a work in progress as the API settles. +* **Much, much more...** + +### License + +[![License: LGPL v3.0](https://img.shields.io/badge/License-LGPL%20v3-blue.svg)](https://www.gnu.org/licenses/lgpl-3.0) + +This project is licensed under the GNU Lesser General Public License v3.0. See the [LICENSE](LICENSE) file for details. diff --git a/compile b/compile new file mode 100755 index 0000000..c64e2f0 --- /dev/null +++ b/compile @@ -0,0 +1,9 @@ +#/bin/bash + +# Starts compile library Hydrabadger for android arm64 arm x86 +# ========================= + + +cargo build --target aarch64-linux-android --release +cargo build --target armv7-linux-androideabi --release +cargo build --target i686-linux-android --release diff --git a/compile_debug b/compile_debug new file mode 100755 index 0000000..55ac64c --- /dev/null +++ b/compile_debug @@ -0,0 +1,9 @@ +#/bin/bash + +# Starts compile library Hydrabadger for android arm64 arm x86 +# ========================= + + +cargo build --target aarch64-linux-android +cargo build --target armv7-linux-androideabi +cargo build --target i686-linux-android diff --git a/src/blockchain.rs b/src/blockchain.rs new file mode 100644 index 0000000..5999a09 --- /dev/null +++ b/src/blockchain.rs @@ -0,0 +1,200 @@ +//! An incredibly simple blockchain implementation. +//! + +#![allow(unused_imports, dead_code, unused_variables)] + +use chrono::prelude::*; +use crypto::digest::Digest; +use crypto::sha2::Sha256; +use num_bigint::BigUint; +use num_traits::One; + + +const HASH_BYTE_SIZE: usize = 32; +const DIFFICULTY: usize = 4; +const MAX_NONCE: u64 = 1_000_000; + +pub type Sha256Hash = [u8; HASH_BYTE_SIZE]; + + +/// Transforms a u64 into a little endian array of u8. +pub fn convert_u64_to_u8_array(val: u64) -> [u8; 8] { + return [ + val as u8, + (val >> 8) as u8, + (val >> 16) as u8, + (val >> 24) as u8, + (val >> 32) as u8, + (val >> 40) as u8, + (val >> 48) as u8, + (val >> 56) as u8, + ] +} + + +/// A mining error +#[derive(Debug, Fail)] +pub enum MiningError { + #[fail(display = "Could not mine block, hit iteration limit")] + Iteration, + #[fail(display = "Block has no parent")] + NoParent, +} + + +/// Calculates the hash for the provided block and nonce. +pub fn calculate_hash(block: &Block, nonce: u64) -> Sha256Hash { + let mut headers = block.headers(); + headers.extend_from_slice(&convert_u64_to_u8_array(nonce)); + + let mut hasher = Sha256::new(); + hasher.input(&headers); + let mut hash = Sha256Hash::default(); + + hasher.result(&mut hash); + + hash +} + +/// Attemts to find a satisfactory nonce. +fn try_hash(block: &Block) -> Option<(u64, Sha256Hash)> { + // The target is a number we compare the hash to. It is a 256bit + // binary with `DIFFICULTY` leading zeroes. + let target = BigUint::one() << (256 - 4 * DIFFICULTY); + + for nonce in 0..MAX_NONCE { + let hash = calculate_hash(block, nonce); + let hash_int = BigUint::from_bytes_be(&hash); + + if hash_int < target { + return Some((nonce, hash)); + } + } + None +} + + +/// A block header. +#[derive(Debug)] +pub struct Header { + timestamp: i64, + prev_block_hash: Sha256Hash, + nonce: u64, +} + + +/// A block. +#[derive(Debug)] +pub struct Block { + header: Header, + // Body: Instead of transactions, blocks contain bytes: + data: Vec, + // Hash of the block: + hash: Option, +} + +impl Block { + // Creates a genesis block, which is a block with no parent. + // + // The `prev_block_hash` field is set to all zeroes. + pub fn genesis() -> Result { + Self::new("Genesis block", Sha256Hash::default()) + } + + /// Creates a new block. + pub fn new(data: &str, prev_hash: Sha256Hash) -> Result { + let mut b = Self { + header: Header { + timestamp: Utc::now().timestamp(), + prev_block_hash: prev_hash, + nonce: 0, + }, + data: data.to_owned().into(), + hash: None, + }; + + try_hash(&b) + .ok_or(MiningError::Iteration) + .and_then(|(nonce, hash)| { + b.header.nonce = nonce; + b.hash = Some(hash); + Ok(b) + }) + } + + /// Returns the block headers. + pub fn headers(&self) -> Vec { + let mut vec = Vec::new(); + + vec.extend(&convert_u64_to_u8_array(self.header.timestamp as u64)); + vec.extend_from_slice(&self.header.prev_block_hash); + + vec + } + + /// Returns this block's nonce. + pub fn nonce(&self) -> u64 { + self.header.nonce + } + + /// Returns this block's hash. + pub fn hash(&self) -> Option { + self.hash.clone() + } + + /// Returns this block's hash. + pub fn prev_block_hash(&self) -> Sha256Hash { + self.header.prev_block_hash + } + + pub fn data(&self) -> &[u8] { + &self.data + } +} + + +/// A sequence of blocks. +pub struct Blockchain { + blocks: Vec, +} + +impl Blockchain { + // Initializes a new blockchain with a genesis block. + pub fn new() -> Result { + let blocks = Block::genesis()?; + + Ok(Self { blocks: vec![blocks] }) + } + + // Adds a newly-mined block to the chain. + pub fn add_block(&mut self, data: &str) -> Result<(), MiningError> { + let block: Block; + { + match self.blocks.last() { + Some(prev) => { + block = Block::new(data, prev.hash().unwrap())?; + } + // Adding a block to an empty blockchain is an error, a genesis block needs to be + // created first. + None => { + return Err(MiningError::NoParent) + } + } + } + + self.blocks.push(block); + + Ok(()) + } + + // A method that iterates over the blockchain's blocks and prints out information for each. + pub fn traverse(&self) { + for (i, block) in self.blocks.iter().enumerate() { + println!("block: {}", i); + println!("hash: {:?}", block.hash()); + println!("parent: {:?}", block.prev_block_hash()); + println!("data: {:?}", block.data()); + println!() + } + } +} \ No newline at end of file diff --git a/src/hydrabadger/handler.rs b/src/hydrabadger/handler.rs new file mode 100644 index 0000000..5a12c29 --- /dev/null +++ b/src/hydrabadger/handler.rs @@ -0,0 +1,741 @@ +//! Hydrabadger event handler. +//! +//! FIXME: Reorganize `Handler` and `State` to more clearly separate concerns. +//! * Do not make state changes directly in this module (use closures, etc.). +//! + +#![allow(unused_imports, dead_code, unused_variables, unused_mut, unused_assignments, + unreachable_code)] + +use std::collections::BTreeMap; +use crossbeam::queue::SegQueue; +use tokio::{ + self, + prelude::*, +}; +use hbbft::{ + crypto::{PublicKey, PublicKeySet}, + sync_key_gen::{Part, PartOutcome, Ack, SyncKeyGen}, + messaging::{DistAlgorithm, Target, }, + dynamic_honey_badger::{Message as DhbMessage, JoinPlan, Change, ChangeState}, + queueing_honey_badger::{Input as QhbInput, Change as QhbChange}, +}; +use peer::Peers; +use ::{InternalMessage, InternalMessageKind, WireMessage, WireMessageKind, + OutAddr, InAddr, Uid, NetworkState, InternalRx, Step, Input, Message, NetworkNodeInfo}; +use super::{Hydrabadger, Error, State, StateDsct, InputOrMessage}; +use super::{WIRE_MESSAGE_RETRY_MAX}; + + + + +/// Hydrabadger event (internal message) handler. +pub struct Handler { + hdb: Hydrabadger, + // TODO: Use a bounded tx/rx (find a sensible upper bound): + peer_internal_rx: InternalRx, + // Outgoing wire message queue: + wire_queue: SegQueue<(Uid, WireMessage, usize)>, + // Output from HoneyBadger: + step_queue: SegQueue, +} + +impl Handler { + pub(super) fn new(hdb: Hydrabadger, peer_internal_rx: InternalRx) -> Handler { + Handler { + hdb, + peer_internal_rx, + wire_queue: SegQueue::new(), + step_queue: SegQueue::new(), + } + } + + fn wire_to_all(&self, msg: WireMessage, peers: &Peers) { + for (_p_addr, peer) in peers.iter() + .filter(|(&p_addr, _)| p_addr != OutAddr(self.hdb.addr().0)) { + peer.tx().unbounded_send(msg.clone()).unwrap(); + } + } + + fn wire_to_validators(&self, msg: WireMessage, peers: &Peers) { + // for peer in peers.validators() + // .filter(|p| p.out_addr() != &OutAddr(self.hdb.addr().0)) { + // peer.tx().unbounded_send(msg.clone()).unwrap(); + // } + + // FIXME(DEBUG): TEMPORARILY WIRE TO ALL FOR NOW: + self.wire_to_all(msg, peers) + } + + // `tar_uid` of `None` sends to all peers. + fn wire_to(&self, tar_uid: Uid, msg: WireMessage, retry_count: usize, peers: &Peers) { + match peers.get_by_uid(&tar_uid) { + Some(p) => p.tx().unbounded_send(msg).unwrap(), + None => { + info!("Node '{}' is not yet established. Queueing message for now (retry_count: {}).", + tar_uid, retry_count); + self.wire_queue.push((tar_uid, msg, retry_count + 1)) + }, + } + } + + fn handle_new_established_peer(&self, src_uid: Uid, _src_addr: OutAddr, src_pk: PublicKey, + request_change_add: bool, state: &mut State, peers: &Peers) -> Result<(), Error> { + match state.discriminant() { + StateDsct::Disconnected | StateDsct::DeterminingNetworkState => { + // panic!("hydrabadger::Handler::handle_new_established_peer: \ + // Received `WireMessageKind::WelcomeRequestChangeAdd` or \ + // `InternalMessageKind::NewIncomingConnection` while \ + // `StateDsct::Disconnected` or `DeterminingNetworkState`."); + state.update_peer_connection_added(&peers); + self.hdb.set_state_discriminant(state.discriminant()); + }, + StateDsct::AwaitingMorePeersForKeyGeneration => { + if peers.count_validators() >= self.hdb.config().keygen_peer_count { + info!("== BEGINNING KEY GENERATION =="); + + let local_uid = *self.hdb.uid(); + let local_in_addr = *self.hdb.addr(); + let local_sk = self.hdb.secret_key().public_key(); + + let (part, ack) = state.set_generating_keys(&local_uid, + self.hdb.secret_key().clone(), peers, self.hdb.config())?; + self.hdb.set_state_discriminant(state.discriminant()); + + info!("KEY GENERATION: Sending initial parts and our own ack."); + self.wire_to_validators( + WireMessage::hello_from_validator( + local_uid, local_in_addr, local_sk, state.network_state(&peers)), + peers); + self.wire_to_validators(WireMessage::key_gen_part(part), peers); + + // FIXME: QUEUE ACKS UNTIL PARTS ARE ALL RECEIVED: + self.wire_to_validators(WireMessage::key_gen_part_ack(ack), peers); + } + }, + StateDsct::GeneratingKeys { .. } => { + // This *could* be called multiple times when initially + // establishing outgoing connections. Do nothing for now. + warn!("hydrabadger::Handler::handle_new_established_peer: Ignoring new established \ + peer signal while `StateDsct::GeneratingKeys`."); + }, + StateDsct::Observer | StateDsct::Validator => { + // If the new peer sends a request-change-add (to be a + // validator), input the change into HB and broadcast, etc. + if request_change_add { + let qhb = state.qhb_mut().unwrap(); + info!("Change-Adding ('{}') to honey badger.", src_uid); + let step = qhb.input(QhbInput::Change(QhbChange::Add(src_uid, src_pk))) + .expect("Error adding new peer to HB"); + self.step_queue.push(step); + } + }, + } + Ok(()) + } + + fn handle_input(&self, input: Input, state: &mut State) -> Result<(), Error> { + // match &input { + // QhbInput::User(_contrib) => {}, + // QhbInput::Change(ref qhb_change) => match qhb_change { + // QhbChange::Add(uid, pk) => { + // if uid == self.hdb.uid() { + // debug_assert!(*pk == self.hdb.secret_key().public_key()); + // } + // } + // QhbChange::Remove(_uid) => {}, + // }, + // } + + trace!("hydrabadger::Handler: About to input...."); + if let Some(step_res) = state.input(input) { + let step = step_res.map_err(|err| { + error!("Honey Badger input error: {:?}", err); + Error::HbStepError + })?; + trace!("hydrabadger::Handler: Input step result added to queue...."); + self.step_queue.push(step); + } + Ok(()) + } + + fn handle_message(&self, msg: Message, src_uid: &Uid, state: &mut State) -> Result <(), Error> { + trace!("hydrabadger::Handler: HB_MESSAGE: {:?}", msg); + // match &msg { + // // A message belonging to the `HoneyBadger` algorithm started in + // // the given epoch. + // DhbMessage::HoneyBadger(start_epoch, ref msg) => {}, + // // A transaction to be committed, signed by a node. + // DhbMessage::KeyGen(epoch, _key_gen_msg, _sig) => {}, + // // A vote to be committed, signed by a validator. + // DhbMessage::SignedVote(signed_vote) => {}, + // } + trace!("hydrabadger::Handler: About to handle_message...."); + if let Some(step_res) = state.handle_message(src_uid, msg) { + let step = step_res.map_err(|err| { + error!("Honey Badger handle_message error: {:?}", err); + Error::HbStepError + })?; + trace!("hydrabadger::Handler: Message step result added to queue...."); + self.step_queue.push(step); + } + Ok(()) + } + + fn handle_ack(&self, uid: &Uid, ack: Ack, sync_key_gen: &mut SyncKeyGen, + ack_count: &mut usize) { + info!("KEY GENERATION: Handling ack from '{}'...", uid); + let fault_log = sync_key_gen.handle_ack(uid, ack.clone()); + if !fault_log.is_empty() { + error!("Errors handling ack: '{:?}':\n{:?}", ack, fault_log); + // panic!("Errors handling ack: '{:?}':\n{:?}", ack, fault_log); + } + *ack_count += 1; + } + + fn handle_queued_acks(&self, ack_queue: &SegQueue<(Uid, Ack)>, + sync_key_gen: &mut SyncKeyGen, part_count: usize, ack_count: &mut usize) { + if part_count == self.hdb.config().keygen_peer_count + 1 { + info!("KEY GENERATION: Handling queued acks..."); + + debug!(" Peers complete: {}", sync_key_gen.count_complete()); + debug!(" Part count: {}", part_count); + debug!(" Ack count: {}", ack_count); + + while let Some((uid, ack)) = ack_queue.try_pop() { + self.handle_ack(&uid, ack, sync_key_gen, ack_count); + } + } + } + + fn handle_key_gen_part(&self, src_uid: &Uid, part: Part, state: &mut State) { + match state { + State::GeneratingKeys { ref mut sync_key_gen, ref ack_queue, ref mut part_count, + ref mut ack_count, .. } => { + // TODO: Move this match block into a function somewhere for re-use: + info!("KEY GENERATION: Handling part from '{}'...", src_uid); + let mut skg = sync_key_gen.as_mut().unwrap(); + let ack = match skg.handle_part(src_uid, part) { + Some(PartOutcome::Valid(ack)) => ack, + Some(PartOutcome::Invalid(faults)) => panic!("Invalid part \ + (FIXME: handle): {:?}", faults), + None => { + error!("`QueueingHoneyBadger::handle_part` returned `None`."); + // panic!("`QueueingHoneyBadger::handle_part` returned `None`."); + return; + } + }; + + *part_count += 1; + + info!("KEY GENERATION: Queueing `Ack`."); + ack_queue.as_ref().unwrap().push((*src_uid, ack.clone())); + + self.handle_queued_acks(ack_queue.as_ref().unwrap(), skg, *part_count, ack_count); + + let peers = self.hdb.peers(); + info!("KEY GENERATION: Part from '{}' acknowledged. Broadcasting ack...", src_uid); + self.wire_to_validators(WireMessage::key_gen_part_ack(ack), &peers); + + debug!(" Peers complete: {}", skg.count_complete()); + debug!(" Part count: {}", part_count); + debug!(" Ack count: {}", ack_count); + }, + State::DeterminingNetworkState { network_state, .. } => { + match network_state.is_some() { + true => unimplemented!(), + false => unimplemented!(), + } + }, + s @ _ => panic!("::handle_key_gen_part: State must be `GeneratingKeys`. \ + State: \n{:?} \n\n[FIXME: Enqueue these parts!]\n\n", s.discriminant()), + } + } + + fn handle_key_gen_ack(&self, src_uid: &Uid, ack: Ack, state: &mut State, peers: &Peers) + -> Result<(), Error> { + let mut keygen_is_complete = false; + match state { + State::GeneratingKeys { ref mut sync_key_gen, ref ack_queue, ref part_count, + ref mut ack_count, .. } => { + let mut skg = sync_key_gen.as_mut().unwrap(); + + info!("KEY GENERATION: Queueing `Ack`."); + ack_queue.as_ref().unwrap().push((*src_uid, ack.clone())); + + self.handle_queued_acks(ack_queue.as_ref().unwrap(), skg, *part_count, ack_count); + + let node_n = self.hdb.config().keygen_peer_count + 1; + + if skg.count_complete() == node_n + && *ack_count >= node_n * node_n { + info!("KEY GENERATION: All acks received and handled."); + debug!(" Peers complete: {}", skg.count_complete()); + debug!(" Part count: {}", part_count); + debug!(" Ack count: {}", ack_count); + + assert!(skg.is_ready()); + keygen_is_complete = true; + } + }, + State::Validator { .. } | State::Observer { .. } => { + error!("Additional unhandled `Ack` received from '{}': \n{:?}", src_uid, ack); + // panic!("Additional unhandled `Ack` received from '{}': \n{:?}", src_uid, ack); + } + _ => panic!("::handle_key_gen_ack: State must be `GeneratingKeys`."), + } + if keygen_is_complete { + self.instantiate_hb(None, state, peers)?; + } + Ok(()) + } + + // This may be called spuriously and only need be handled by + // 'unestablished' nodes. + fn handle_join_plan(&self, jp: JoinPlan, state: &mut State, peers: &Peers) + -> Result<(), Error> { + debug!("Join plan: \n{:?}", jp); + + match state.discriminant() { + StateDsct::Disconnected => unimplemented!("hydrabadger::Handler::handle_join_plan: `Disconnected`"), + StateDsct::DeterminingNetworkState => { + info!("Received join plan."); + self.instantiate_hb(Some(jp), state, peers)?; + }, + StateDsct::AwaitingMorePeersForKeyGeneration | StateDsct::GeneratingKeys => { + panic!("hydrabadger::Handler::handle_join_plan: Received join plan while \ + `{}`", state.discriminant()); + }, + StateDsct::Observer | StateDsct::Validator => {}, // Ignore + // sd @ _ => unimplemented!("hydrabadger::Handler::handle_join_plan: {:?}", sd), + } + + Ok(()) + } + + // TODO: Create a type for `net_info`. + fn instantiate_hb(&self, + // net_info: Option<(Vec, PublicKeySet, BTreeMap)>, + jp_opt: Option>, + state: &mut State, peers: &Peers) -> Result<(), Error> { + let mut iom_queue_opt = None; + + match state.discriminant() { + StateDsct::Disconnected => { unimplemented!() }, + StateDsct::DeterminingNetworkState | StateDsct::GeneratingKeys => { + info!("== INSTANTIATING HONEY BADGER =="); + match jp_opt { + // Some((nni, pk_set, pk_map)) => { + // iom_queue_opt = Some(state.set_observer(*self.hdb.uid(), + // self.hdb.secret_key().clone(), nni, pk_set, pk_map)); + // }, + Some(jp) => { + iom_queue_opt = Some(state.set_observer(*self.hdb.uid(), + self.hdb.secret_key().clone(), jp, self.hdb.config(), &self.step_queue)?); + }, + None => { + iom_queue_opt = Some(state.set_validator(*self.hdb.uid(), + self.hdb.secret_key().clone(), peers, self.hdb.config(), &self.step_queue)?); + } + } + }, + StateDsct::AwaitingMorePeersForKeyGeneration => { unimplemented!() }, + StateDsct::Observer => { + // TODO: Add checks to ensure that `net_info` is consistent + // with HB's netinfo. + warn!("hydrabadger::Handler::instantiate_hb: Called when `State::Observer`"); + }, + StateDsct::Validator => { + // TODO: Add checks to ensure that `net_info` is consistent + // with HB's netinfo. + warn!("hydrabadger::Handler::instantiate_hb: Called when `State::Validator`") + }, + } + + self.hdb.set_state_discriminant(state.discriminant()); + + // Handle previously queued input and messages: + if let Some(iom_queue) = iom_queue_opt { + while let Some(iom) = iom_queue.try_pop() { + match iom { + InputOrMessage::Input(input) => { + self.handle_input(input, state)?; + }, + InputOrMessage::Message(uid, msg) => { + self.handle_message(msg, &uid, state)?; + } + } + } + } + Ok(()) + } + + fn handle_net_state(&self, net_state: NetworkState, state: &mut State, peers: &Peers) + -> Result<(), Error> { + let peer_infos; + match net_state { + NetworkState::Unknown(p_infos) => { + peer_infos = p_infos; + state.update_peer_connection_added(peers); + self.hdb.set_state_discriminant(state.discriminant()); + } + NetworkState::AwaitingMorePeersForKeyGeneration(p_infos) => { + peer_infos = p_infos; + state.set_awaiting_more_peers(); + self.hdb.set_state_discriminant(state.discriminant()); + }, + NetworkState::GeneratingKeys(p_infos, public_keys) => { + peer_infos = p_infos; + // state.set_observer(); + }, + NetworkState::Active(net_info) => { + peer_infos = net_info.0.clone(); + match state { + State::DeterminingNetworkState { ref mut network_state, .. } => { + *network_state = Some(NetworkState::Active(net_info)); + }, + | State::Disconnected { .. } + | State::AwaitingMorePeersForKeyGeneration { .. } + | State::GeneratingKeys { .. } => { + panic!("Handler::net_state: Received `NetworkState::Active` while `{}`.", + state.discriminant()); + }, + _ => {}, + } + + // self.instantiate_hb(Some(net_info), peers, state)?; + }, + NetworkState::None => panic!("`NetworkState::None` received."), + } + + // Connect to all newly discovered peers. + for peer_info in peer_infos.iter() { + // Only connect with peers which are not already + // connected (and are not us). + if peer_info.in_addr != *self.hdb.addr() + && !peers.contains_in_addr(&peer_info.in_addr) + && peers.get(&OutAddr(peer_info.in_addr.0)).is_none() { + let local_pk = self.hdb.secret_key().public_key(); + tokio::spawn(self.hdb.clone().connect_outgoing( + peer_info.in_addr.0, + local_pk, + Some((peer_info.uid, peer_info.in_addr, peer_info.pk)), + false, + )); + } + } + Ok(()) + } + + fn handle_peer_disconnect(&self, src_uid: Uid, state: &mut State, peers: &Peers) + -> Result<(), Error> { + // self.hdb.qhb.write().input(Input::Change(Change::Remove(self.uid))) + // .expect("Error adding new peer to HB"); + + // Input::Change(Change::Remove(NodeUid(0))) + // self.hdb.peer_internal_tx.unbounded_send(InternalMessage::input( + // uid, self.out_addr, Input::Change(Change::Remove(uid)))).unwrap(); + + state.update_peer_connection_dropped(peers); + self.hdb.set_state_discriminant(state.discriminant()); + + // TODO: Send a node removal (Change-Remove) vote? + + match state { + State::Disconnected { .. } => { + panic!("Received `WireMessageKind::PeerDisconnect` while disconnected."); + }, + State::DeterminingNetworkState { .. } => { + // unimplemented!(); + }, + State::AwaitingMorePeersForKeyGeneration { .. } => { + // info!("Removing peer ({}: '{}') from await list.", + // src_out_addr, src_uid.clone().unwrap()); + // state.peer_connection_dropped(&*self.hdb.peers()); + }, + State::GeneratingKeys { .. } => { + // Do something here (possibly panic). + }, + State::Observer { ref mut qhb } => { + // Do nothing instead? + let step = qhb.as_mut().unwrap().input(QhbInput::Change(QhbChange::Remove(src_uid)))?; + self.step_queue.push(step); + } + State::Validator { ref mut qhb } => { + let step = qhb.as_mut().unwrap().input(QhbInput::Change(QhbChange::Remove(src_uid)))?; + self.step_queue.push(step); + }, + } + Ok(()) + } + + fn handle_internal_message(&self, i_msg: InternalMessage, state: &mut State) + -> Result<(), Error> { + let (src_uid, src_out_addr, w_msg) = i_msg.into_parts(); + + match w_msg { + // New incoming connection: + InternalMessageKind::NewIncomingConnection(_src_in_addr, src_pk, request_change_add) => { + let peers = self.hdb.peers(); + + // if let StateDsct::Disconnected = state.discriminant() { + // state.set_awaiting_more_peers(); + // } + + // match state.discriminant() { + // StateDsct::Disconnected | StateDsct::DeterminingNetworkState => { + // state.set_awaiting_more_peers(); + // self.hdb.set_state_discriminant(state.discriminant()); + // }, + // _ => {}, + // } + + let net_state; + + match state { + State::Disconnected { } => { + state.set_awaiting_more_peers(); + self.hdb.set_state_discriminant(state.discriminant()); + net_state = state.network_state(&peers); + }, + // | State::GeneratingKeys { .. } + // | State::AwaitingMorePeersForKeyGeneration { .. } => { + // net_state = state.network_state(&peers); + // }, + State::DeterminingNetworkState { ref network_state, .. } => { + match network_state { + Some(ns) => net_state = ns.clone(), + None => net_state = state.network_state(&peers) + } + }, + _ => net_state = state.network_state(&peers), + } + + // // Get the current `NetworkState`: + // let net_state = state.network_state(&peers); + + // Send response to remote peer: + peers.get(&src_out_addr).unwrap().tx().unbounded_send( + WireMessage::welcome_received_change_add( + self.hdb.uid().clone(), self.hdb.secret_key().public_key(), + net_state) + ).unwrap(); + + // Modify state accordingly: + self.handle_new_established_peer(src_uid.unwrap(), src_out_addr, src_pk, + request_change_add, state, &peers)?; + }, + + // New outgoing connection (initial): + InternalMessageKind::NewOutgoingConnection => { + // This message must be immediately followed by either a + // `WireMessage::HelloFromValidator` or + // `WireMessage::WelcomeReceivedChangeAdd`. + debug_assert!(src_uid.is_none()); + + let peers = self.hdb.peers(); + state.update_peer_connection_added(&peers); + self.hdb.set_state_discriminant(state.discriminant()); + }, + + InternalMessageKind::HbInput(input) => { + self.handle_input(input, state)?; + }, + + InternalMessageKind::HbMessage(msg) => { + self.handle_message(msg, src_uid.as_ref().unwrap(), state)?; + }, + + InternalMessageKind::PeerDisconnect => { + let dropped_src_uid = src_uid.clone().unwrap(); + info!("Peer disconnected: ({}: '{}').", src_out_addr, dropped_src_uid); + let peers = self.hdb.peers(); + self.handle_peer_disconnect(dropped_src_uid, state, &peers)?; + }, + + InternalMessageKind::Wire(w_msg) => match w_msg.into_kind() { + // This is sent on the wire to ensure that we have all of the + // relevant details for a peer (generally preceeding other + // messages which may arrive before `Welcome...`. + WireMessageKind::HelloFromValidator(src_uid_new, src_in_addr, src_pk, net_state) => { + debug!("Received hello from {}", src_uid_new); + let mut peers = self.hdb.peers_mut(); + match peers.establish_validator(src_out_addr, (src_uid_new, src_in_addr, src_pk)) { + true => debug_assert!(src_uid_new == src_uid.clone().unwrap()), + false => debug_assert!(src_uid.is_none()), + } + + // Modify state accordingly: + self.handle_net_state(net_state, state, &peers)?; + } + + // New outgoing connection: + WireMessageKind::WelcomeReceivedChangeAdd(src_uid_new, src_pk, net_state) => { + debug!("Received NetworkState: \n{:?}", net_state); + assert!(src_uid_new == src_uid.clone().unwrap()); + let mut peers = self.hdb.peers_mut(); + + // Set new (outgoing-connection) peer's public info: + peers.establish_validator(src_out_addr, + (src_uid_new, InAddr(src_out_addr.0), src_pk)); + + // Modify state accordingly: + self.handle_net_state(net_state, state, &peers)?; + + // Modify state accordingly: + self.handle_new_established_peer(src_uid_new, src_out_addr, src_pk, + false, state, &peers)?; + }, + + // Key gen proposal: + WireMessageKind::KeyGenPart(part) => { + self.handle_key_gen_part(&src_uid.unwrap(), part, state); + }, + + // Key gen proposal acknowledgement: + // + // FIXME: Queue until all parts have been sent. + WireMessageKind::KeyGenAck(ack) => { + let peers = self.hdb.peers(); + self.handle_key_gen_ack(&src_uid.unwrap(), ack, state, &peers)?; + }, + + // Output by validators when a batch with a `ChangeState` + // other than `None` is output. Idempotent. + WireMessageKind::JoinPlan(jp) => { + let peers = self.hdb.peers(); + self.handle_join_plan(jp, state, &peers)?; + }, + + wm @ _ => warn!("hydrabadger::Handler::handle_internal_message: Unhandled wire message: \ + \n{:?}", wm,), + }, + } + Ok(()) + } +} + +impl Future for Handler { + type Item = (); + type Error = Error; + + /// Polls the internal message receiver until all txs are dropped. + fn poll(&mut self) -> Poll<(), Error> { + // Ensure the loop can't hog the thread for too long: + const MESSAGES_PER_TICK: usize = 50; + + trace!("hydrabadger::Handler::poll: Locking 'state' for writing..."); + let mut state = self.hdb.state_mut(); + trace!("hydrabadger::Handler::poll: 'state' locked for writing."); + + // Handle incoming internal messages: + for i in 0..MESSAGES_PER_TICK { + match self.peer_internal_rx.poll() { + Ok(Async::Ready(Some(i_msg))) => { + self.handle_internal_message(i_msg, &mut state)?; + + // Exceeded max messages per tick, schedule notification: + if i + 1 == MESSAGES_PER_TICK { + task::current().notify(); + } + }, + Ok(Async::Ready(None)) => { + // The sending ends have all dropped. + info!("Shutting down Handler..."); + return Ok(Async::Ready(())); + }, + Ok(Async::NotReady) => {}, + Err(()) => return Err(Error::HydrabadgerHandlerPoll), + }; + } + + let peers = self.hdb.peers(); + + // Process outgoing wire queue: + while let Some((tar_uid, msg, retry_count)) = self.wire_queue.try_pop() { + if retry_count < WIRE_MESSAGE_RETRY_MAX { + info!("Sending queued message from retry queue (retry_count: {})", retry_count); + self.wire_to(tar_uid, msg, retry_count, &peers); + } else { + info!("Discarding queued message for '{}': {:?}", tar_uid, msg); + } + } + + trace!("hydrabadger::Handler: Processing step queue...."); + + // Process all honey badger output batches: + while let Some(mut step) = self.step_queue.try_pop() { + if step.output.len() > 0 { info!("NEW STEP OUTPUT:"); } + + for batch in step.output.drain(..) { + info!(" BATCH: \n{:?}", batch); + + if cfg!(exit_upon_epoch_1000) && batch.epoch() >= 1000 { + return Ok(Async::Ready(())) + } + + if let Some(jp) = batch.join_plan() { + // FIXME: Only sent to unconnected nodes: + debug!("Outputting join plan: {:?}", jp); + self.wire_to_all(WireMessage::join_plan(jp), &peers); + } + + match batch.change() { + ChangeState::None => {}, + ChangeState::InProgress(_change) => {}, + ChangeState::Complete(change) => match change { + Change::Add(uid, pk) => { + if uid == self.hdb.uid() { + assert_eq!(*pk, self.hdb.secret_key().public_key()); + assert!(state.qhb().unwrap().dyn_hb().netinfo().is_validator()); + state.promote_to_validator()?; + self.hdb.set_state_discriminant(state.discriminant()); + } + }, + Change::Remove(uid) => { + + }, + }, + } + + let extra_delay = self.hdb.config().output_extra_delay_ms; + + if extra_delay > 0 { + info!("Delaying batch processing thread for {}ms", extra_delay); + ::std::thread::sleep(::std::time::Duration::from_millis(extra_delay)); + } + + // TODO: Something useful! + } + + for hb_msg in step.messages.drain(..) { + trace!("hydrabadger::Handler: Forwarding message: {:?}", hb_msg); + match hb_msg.target { + Target::Node(p_uid) => { + self.wire_to(p_uid, WireMessage::message(*self.hdb.uid(), hb_msg.message), 0, &peers); + }, + Target::All => { + self.wire_to_all(WireMessage::message(*self.hdb.uid(), hb_msg.message), &peers); + }, + } + } + + if !step.fault_log.is_empty() { + error!(" FAULT LOG: \n{:?}", step.fault_log); + } + } + + // TODO: Iterate through `state.qhb().unwrap().dyn_hb().netinfo()` and + // `peers` to ensure that the lists match. Make adjustments where + // necessary. + + trace!("hydrabadger::Handler: Step queue processing complete."); + + drop(peers); + drop(state); + trace!("hydrabadger::Handler::poll: 'state' unlocked for writing."); + + Ok(Async::NotReady) + } +} + diff --git a/src/hydrabadger/hydrabadger.rs b/src/hydrabadger/hydrabadger.rs new file mode 100644 index 0000000..046c3a2 --- /dev/null +++ b/src/hydrabadger/hydrabadger.rs @@ -0,0 +1,416 @@ +//! A hydrabadger consensus node. +//! + +#![allow(unused_imports, dead_code, unused_variables, unused_mut, unused_assignments, + unreachable_code)] + +use std::{ + time::{Duration, Instant}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + collections::HashSet, + net::{SocketAddr, ToSocketAddrs}, +}; +use futures::{ + sync::mpsc, + future::{self, Either}, +}; +use tokio::{ + self, + net::{TcpListener, TcpStream}, + timer::Interval, + prelude::*, +}; +use rand::{self, Rand}; +use parking_lot::{RwLock, Mutex, RwLockReadGuard, RwLockWriteGuard}; +use hbbft::{ + crypto::{PublicKey, SecretKey}, + queueing_honey_badger::{Input as QhbInput}, +}; +use peer::{PeerHandler, Peers}; +use ::{InternalMessage, WireMessage, WireMessageKind, WireMessages, + OutAddr, InAddr, Uid, InternalTx, Transaction}; +use super::{Error, State, StateDsct, Handler}; + + +// The HoneyBadger batch size. +const DEFAULT_BATCH_SIZE: usize = 200; +// The number of random transactions to generate per interval. +const DEFAULT_TXN_GEN_COUNT: usize = 5; +// The interval between randomly generated transactions. +const DEFAULT_TXN_GEN_INTERVAL: u64 = 5000; +// The number of bytes per randomly generated transaction. +const DEFAULT_TXN_GEN_BYTES: usize = 2; +// The minimum number of peers needed to spawn a HB instance. +const DEFAULT_KEYGEN_PEER_COUNT: usize = 2; +// Causes the primary hydrabadger thread to sleep after every batch. Used for +// debugging. +const DEFAULT_OUTPUT_EXTRA_DELAY_MS: u64 = 0; + + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Config { + pub batch_size: usize, + pub txn_gen_count: usize, + pub txn_gen_interval: u64, + // TODO: Make this a range: + pub txn_gen_bytes: usize, + pub keygen_peer_count: usize, + pub output_extra_delay_ms: u64, +} + +impl Config { + pub fn with_defaults() -> Config { + Config { + batch_size: DEFAULT_BATCH_SIZE, + txn_gen_count: DEFAULT_TXN_GEN_COUNT, + txn_gen_interval: DEFAULT_TXN_GEN_INTERVAL, + txn_gen_bytes: DEFAULT_TXN_GEN_BYTES, + keygen_peer_count: DEFAULT_KEYGEN_PEER_COUNT, + output_extra_delay_ms: DEFAULT_OUTPUT_EXTRA_DELAY_MS, + } + } +} + +impl Default for Config { + fn default() -> Config { + Config::with_defaults() + } +} + + +/// The `Arc` wrapped portion of `Hydrabadger`. +/// +/// Shared all over the place. +struct Inner { + /// Node uid: + uid: Uid, + /// Incoming connection socket. + addr: InAddr, + + /// This node's secret key. + secret_key: SecretKey, + + peers: RwLock, + + /// The current state containing HB when connected. + state: RwLock, + + // TODO: Move this into a new state struct. + state_dsct: AtomicUsize, + + // TODO: Use a bounded tx/rx (find a sensible upper bound): + peer_internal_tx: InternalTx, + + config: Config, +} + + +/// A `HoneyBadger` network node. +#[derive(Clone)] +pub struct Hydrabadger { + inner: Arc, + handler: Arc>>, +} + +impl Hydrabadger { + /// Returns a new Hydrabadger node. + pub fn new(addr: SocketAddr, cfg: Config) -> Self { + use std::env; + use env_logger; + use chrono::Local; + + env_logger::Builder::new() + .format(|buf, record| { + write!(buf, + "{} [{}] - HYDRABADGER: {}\n", + Local::now().format("%Y-%m-%dT%H:%M:%S"), + record.level(), + record.args() + ) + }) + .parse(&env::var("HYDRABADGER_LOG").unwrap_or_default()) + .try_init().ok(); + + let uid = Uid::new(); + let secret_key = SecretKey::rand(&mut rand::thread_rng()); + + let (peer_internal_tx, peer_internal_rx) = mpsc::unbounded(); + + info!(""); + info!("Local Hydrabadger Node: "); + info!(" UID: {}", uid); + info!(" Socket Address: {}", addr); + info!(" Public Key: {:?}", secret_key.public_key()); + + warn!(""); + warn!("****** This is an alpha build. Do not use in production! ******"); + warn!(""); + + println!(""); + println!("Local Hydrabadger Node: "); + println!(" UID: {}", uid); + println!(" Socket Address: {}", addr); + println!(" Public Key: {:?}", secret_key.public_key()); + + + + let inner = Arc::new(Inner { + uid, + addr: InAddr(addr), + secret_key, + peers: RwLock::new(Peers::new()), + state: RwLock::new(State::disconnected()), + state_dsct: AtomicUsize::new(0), + peer_internal_tx, + config: cfg, + }); + + let hdb = Hydrabadger { + inner, + handler: Arc::new(Mutex::new(None)), + }; + + *hdb.handler.lock() = Some(Handler::new(hdb.clone(), peer_internal_rx)); + + hdb + } + + /// Returns a new Hydrabadger node. + pub fn with_defaults(addr: SocketAddr) -> Self { + Hydrabadger::new(addr, Config::default()) + } + + /// Returns the pre-created handler. + pub fn handler(&self) -> Option { + self.handler.lock().take() + } + + /// Returns a reference to the inner state. + pub(crate) fn state(&self) -> RwLockReadGuard { + let state = self.inner.state.read(); + state + } + + /// Returns a mutable reference to the inner state. + pub(crate) fn state_mut(&self) -> RwLockWriteGuard { + let state = self.inner.state.write(); + state + } + + /// Returns a recent state discriminant. + /// + /// The returned value may not be up to date and is to be considered + /// immediately stale. + pub fn state_info_stale(&self) -> (StateDsct, usize, usize) { + let sd = self.inner.state_dsct.load(Ordering::Relaxed).into(); + (sd, 0, 0) + } + + /// Sets the publicly visible state discriminant and returns the previous value. + pub(super) fn set_state_discriminant(&self, dsct: StateDsct) -> StateDsct { + let sd = StateDsct::from(self.inner.state_dsct.swap(dsct.into(), Ordering::Release)); + info!("State has been set from '{}' to '{}'.", sd, dsct); + sd + } + + /// Returns a reference to the peers list. + pub(crate) fn peers(&self) -> RwLockReadGuard { + self.inner.peers.read() + } + + /// Returns a mutable reference to the peers list. + pub(crate) fn peers_mut(&self) -> RwLockWriteGuard { + self.inner.peers.write() + } + + /// Returns a mutable reference to the peers list. + pub(crate) fn config(&self) -> &Config { + &self.inner.config + } + + /// Sends a message on the internal tx. + pub(crate) fn send_internal(&self, msg: InternalMessage) { + if let Err(err) = self.inner.peer_internal_tx.unbounded_send(msg) { + error!("Unable to send on internal tx. Internal rx has dropped: {}", err); + ::std::process::exit(-1) + } + } + + /// Returns a future that handles incoming connections on `socket`. + fn handle_incoming(self, socket: TcpStream) + -> impl Future { + info!("Incoming connection from '{}'", socket.peer_addr().unwrap()); + let wire_msgs = WireMessages::new(socket); + + wire_msgs.into_future() + .map_err(|(e, _)| e) + .and_then(move |(msg_opt, w_messages)| { + // let _hdb = self.clone(); + + match msg_opt { + Some(msg) => match msg.into_kind() { + // The only correct entry point: + WireMessageKind::HelloRequestChangeAdd(peer_uid, peer_in_addr, peer_pk) => { + // Also adds a `Peer` to `self.peers`. + let peer_h = PeerHandler::new(Some((peer_uid, peer_in_addr, peer_pk)), + self.clone(), w_messages); + + // Relay incoming `HelloRequestChangeAdd` message internally. + peer_h.hdb().send_internal( + InternalMessage::new_incoming_connection(peer_uid, + *peer_h.out_addr(), peer_in_addr, peer_pk, true) + ); + Either::B(peer_h) + }, + _ => { + // TODO: Return this as a future-error (handled below): + error!("Peer connected without sending \ + `WireMessageKind::HelloRequestChangeAdd`."); + Either::A(future::ok(())) + }, + }, + None => { + // The remote client closed the connection without sending + // a welcome_request_change_add message. + Either::A(future::ok(())) + }, + } + }) + .map_err(|err| error!("Connection error = {:?}", err)) + } + + /// Returns a future that connects to new peer. + pub(super) fn connect_outgoing(self, remote_addr: SocketAddr, local_pk: PublicKey, + pub_info: Option<(Uid, InAddr, PublicKey)>, is_optimistic: bool) + -> impl Future { + let uid = self.inner.uid.clone(); + let in_addr = self.inner.addr; + info!("Initiating outgoing connection to: {}", remote_addr); + + TcpStream::connect(&remote_addr) + .map_err(Error::from) + .and_then(move |socket| { + // Wrap the socket with the frame delimiter and codec: + let mut wire_msgs = WireMessages::new(socket); + let wire_hello_result = wire_msgs.send_msg( + WireMessage::hello_request_change_add(uid, in_addr, local_pk)); + match wire_hello_result { + Ok(_) => { + let peer = PeerHandler::new(pub_info, self.clone(), wire_msgs); + + self.send_internal(InternalMessage::new_outgoing_connection(*peer.out_addr())); + + Either::A(peer) + }, + Err(err) => Either::B(future::err(err)), + } + }) + .map_err(move |err| { + if is_optimistic { + warn!("Unable to connect to: {}", remote_addr); + } else { + error!("Error connecting to: {} \n{:?}", remote_addr, err); + } + }) + } + + /// Returns a future that generates random transactions and logs status + /// messages. + fn generate_txns_status(self) -> impl Future { + Interval::new(Instant::now(), Duration::from_millis(self.inner.config.txn_gen_interval)) + .for_each(move |_| { + let hdb = self.clone(); + let peers = hdb.peers(); + + // Log state: + let (dsct, p_ttl, p_est) = hdb.state_info_stale(); + let peer_count = peers.count_total(); + info!("State: {:?}({})", dsct, peer_count); + + // Log peer list: + let peer_list = peers.peers().map(|p| { + p.in_addr().map(|ia| ia.0.to_string()) + .unwrap_or(format!("No in address")) + }).collect::>(); + info!(" Peers: {:?}", peer_list); + + // Log (trace) full peerhandler details: + trace!("PeerHandler list:"); + for (peer_addr, _peer) in peers.iter() { + trace!(" peer_addr: {}", peer_addr); } + + drop(peers); + + match dsct { + StateDsct::Validator => { + info!("Generating and inputting {} random transactions...", self.inner.config.txn_gen_count); + // Send some random transactions to our internal HB instance. + let txns: Vec<_> = (0..self.inner.config.txn_gen_count).map(|_| { + Transaction::random(self.inner.config.txn_gen_bytes) + }).collect(); + + hdb.send_internal( + InternalMessage::hb_input(hdb.inner.uid, OutAddr(*hdb.inner.addr), QhbInput::User(txns)) + ); + }, + _ => {}, + } + + Ok(()) + }) + .map_err(|err| error!("List connection inverval error: {:?}", err)) + } + + /// Binds to a host address and returns a future which starts the node. + pub fn node(self, remotes: Option>, reactor_remote: Option<()>) + -> impl Future { + let socket = TcpListener::bind(&self.inner.addr).unwrap(); + info!("Listening on: {}", self.inner.addr); + + let remotes = remotes.unwrap_or(HashSet::new()); + + let hdb = self.clone(); + let listen = socket.incoming() + .map_err(|err| error!("Error accepting socket: {:?}", err)) + .for_each(move |socket| { + tokio::spawn(hdb.clone().handle_incoming(socket)); + Ok(()) + }); + + let hdb = self.clone(); + let local_pk = hdb.inner.secret_key.public_key(); + let connect = future::lazy(move || { + for &remote_addr in remotes.iter() { + tokio::spawn(hdb.clone().connect_outgoing(remote_addr, local_pk, None, true)); + } + Ok(()) + }); + + let generate_txns_status = self.clone().generate_txns_status(); + + let hdb_handler = self.handler() + .map_err(|err| error!("Handler internal error: {:?}", err)); + + listen.join4(connect, generate_txns_status, hdb_handler).map(|(_, _, _, _)| ()) + } + + /// Starts a node. + pub fn run_node(self, remotes: Option>) { + tokio::run(self.node(remotes, None)); + } + + pub fn addr(&self) -> &InAddr { + &self.inner.addr + } + + pub fn uid(&self) -> &Uid { + &self.inner.uid + } + + pub(super) fn secret_key(&self) -> &SecretKey { + &self.inner.secret_key + } +} diff --git a/src/hydrabadger/mod.rs b/src/hydrabadger/mod.rs new file mode 100644 index 0000000..67a75b9 --- /dev/null +++ b/src/hydrabadger/mod.rs @@ -0,0 +1,73 @@ +mod state; +mod handler; +mod hydrabadger; + +use std; +use bincode; +use hbbft::{ + dynamic_honey_badger::{Error as DhbError}, + queueing_honey_badger::{Error as QhbError}, + sync_key_gen::{Error as SyncKeyGenError}, +}; +use ::{Message, Input, Uid}; +use self::state::{State, StateDsct}; +use self::handler::{Handler}; + +pub use self::hydrabadger::{Hydrabadger, Config}; + +// Number of times to attempt wire message re-send. +pub const WIRE_MESSAGE_RETRY_MAX: usize = 10; + + +/// A HoneyBadger input or message. +#[derive(Clone, Debug)] +pub(crate) enum InputOrMessage { + Input(Input), + Message(Uid, Message), +} + + +#[derive(Debug, Fail)] +pub enum Error { + #[fail(display = "Io error: {}", _0)] + Io(std::io::Error), + #[fail(display = "Serde error: {}", _0)] + Serde(bincode::Error), + #[fail(display = "Error polling hydrabadger internal receiver")] + HydrabadgerHandlerPoll, + // FIXME: Make honeybadger error thread safe. + #[fail(display = "QueuingHoneyBadger propose error")] + QhbPart, + /// TEMPORARY UNTIL WE FIX HB ERROR TYPES: + #[fail(display = "DynamicHoneyBadger error")] + Dhb(()), + /// TEMPORARY UNTIL WE FIX HB ERROR TYPES: + #[fail(display = "QueuingHoneyBadger error [FIXME]")] + Qhb(()), + /// TEMPORARY UNTIL WE FIX HB ERROR TYPES: + #[fail(display = "QueuingHoneyBadger step error")] + HbStepError, + #[fail(display = "Error creating SyncKeyGen: {}", _0)] + SyncKeyGenNew(SyncKeyGenError), + #[fail(display = "Error generating keys: {}", _0)] + SyncKeyGenGenerate(SyncKeyGenError), +} + +impl From for Error { + fn from(err: std::io::Error) -> Error { + Error::Io(err) + } +} + +impl From for Error { + fn from(_err: QhbError) -> Error { + Error::Qhb(()) + } +} + +impl From for Error { + fn from(_err: DhbError) -> Error { + Error::Dhb(()) + } +} + diff --git a/src/hydrabadger/state.rs b/src/hydrabadger/state.rs new file mode 100644 index 0000000..1fb8c85 --- /dev/null +++ b/src/hydrabadger/state.rs @@ -0,0 +1,509 @@ +//! Hydrabadger state. +//! +//! FIXME: Reorganize `Handler` and `State` to more clearly separate concerns. +//! + +#![allow(dead_code)] + +use std::{ + fmt, + collections::BTreeMap, +}; +use crossbeam::queue::SegQueue; +use hbbft::{ + crypto::{PublicKey, SecretKey}, + sync_key_gen::{SyncKeyGen, Part, PartOutcome, Ack}, + messaging::{DistAlgorithm, NetworkInfo}, + queueing_honey_badger::{Error as QhbError, QueueingHoneyBadger}, + dynamic_honey_badger::{DynamicHoneyBadger, JoinPlan}, + +}; +use peer::Peers; +use ::{Uid, NetworkState, NetworkNodeInfo, Message, Transaction, Step, Input}; +use super::{InputOrMessage, Error, Config}; +// use super::{BATCH_SIZE, config.keygen_peer_count}; + + +/// A `State` discriminant. +#[derive(Copy, Clone, Debug)] +pub enum StateDsct { + Disconnected, + DeterminingNetworkState, + AwaitingMorePeersForKeyGeneration, + GeneratingKeys, + Observer, + Validator, +} + +impl fmt::Display for StateDsct { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for usize { + fn from(dsct: StateDsct) -> usize { + match dsct { + StateDsct::Disconnected => 0, + StateDsct::DeterminingNetworkState => 1, + StateDsct::AwaitingMorePeersForKeyGeneration => 2, + StateDsct::GeneratingKeys => 3, + StateDsct::Observer => 4, + StateDsct::Validator => 5, + } + } +} + +impl From for StateDsct { + fn from(val: usize) -> StateDsct { + match val { + 0 => StateDsct::Disconnected, + 1 => StateDsct::DeterminingNetworkState, + 2 => StateDsct::AwaitingMorePeersForKeyGeneration, + 3 => StateDsct::GeneratingKeys, + 4 => StateDsct::Observer, + 5 => StateDsct::Validator, + _ => panic!("Invalid state discriminant."), + } + } +} + + +/// The current hydrabadger state. +// +// TODO: Make this into a struct and move the `state_dsct: AtomicUsize` field +// into it. +// +pub(crate) enum State { + Disconnected { }, + DeterminingNetworkState { + ack_queue: Option>, + iom_queue: Option>, + network_state: Option, + }, + AwaitingMorePeersForKeyGeneration { + // Queued input to HoneyBadger: + // FIXME: ACTUALLY USE THIS QUEUED INPUT! + ack_queue: Option>, + iom_queue: Option>, + }, + GeneratingKeys { + sync_key_gen: Option>, + public_key: Option, + public_keys: BTreeMap, + + ack_queue: Option>, + part_count: usize, + ack_count: usize, + + // Queued input to HoneyBadger: + iom_queue: Option>, + }, + Observer { + qhb: Option, Uid>>, + }, + Validator { + qhb: Option, Uid>>, + }, +} + +impl State { + /// Returns the state discriminant. + pub(super) fn discriminant(&self) -> StateDsct { + match self { + State::Disconnected { .. } => StateDsct::Disconnected, + State::DeterminingNetworkState { .. } => StateDsct::DeterminingNetworkState, + State::AwaitingMorePeersForKeyGeneration { .. } => + StateDsct::AwaitingMorePeersForKeyGeneration, + State::GeneratingKeys{ .. } => StateDsct::GeneratingKeys, + State::Observer { .. } => StateDsct::Observer, + State::Validator { .. } => StateDsct::Validator, + } + } + + /// Returns a new `State::Disconnected`. + pub(super) fn disconnected(/*local_uid: Uid, local_addr: InAddr,*/ /*secret_key: SecretKey*/) + -> State { + State::Disconnected { /*secret_key: secret_key*/ } + } + + // /// Sets the state to `DeterminingNetworkState`. + // // + // // TODO: Add proper error handling: + // fn set_determining_network_state(&mut self) { + // *self = match self { + // State::Disconnected { } => { + // info!("Setting state: `DeterminingNetworkState`."); + // State::DeterminingNetworkState { } + // }, + // _ => panic!("Must be disconnected before calling `::peer_connection_added`."), + // }; + // } + + /// Sets the state to `AwaitingMorePeersForKeyGeneration`. + pub(super) fn set_awaiting_more_peers(&mut self) { + *self = match self { + State::Disconnected { } => { + info!("Setting state: `AwaitingMorePeersForKeyGeneration`."); + State::AwaitingMorePeersForKeyGeneration { + ack_queue: Some(SegQueue::new()), + iom_queue: Some(SegQueue::new()), + } + }, + State::DeterminingNetworkState { ref mut iom_queue, ref mut ack_queue, + ref network_state } => { + assert!(!network_state.is_some(), + "State::set_awaiting_more_peers: Network is active!"); + info!("Setting state: `AwaitingMorePeersForKeyGeneration`."); + State::AwaitingMorePeersForKeyGeneration { + ack_queue: ack_queue.take(), + iom_queue: iom_queue.take(), + } + }, + s @ _ => { + debug!("State::set_awaiting_more_peers: Attempted to set \ + `State::AwaitingMorePeersForKeyGeneration` while {}.", s.discriminant()); + return + } + }; + } + + /// Sets the state to `AwaitingMorePeersForKeyGeneration`. + pub(super) fn set_generating_keys(&mut self, local_uid: &Uid, local_sk: SecretKey, peers: &Peers, + config: &Config) -> Result<(Part, Ack), Error> { + let (part, ack); + *self = match self { + State::AwaitingMorePeersForKeyGeneration { ref mut iom_queue, ref mut ack_queue } => { + // let secret_key = secret_key.clone(); + let threshold = config.keygen_peer_count / 3; + + let mut public_keys: BTreeMap = peers.validators().map(|p| { + p.pub_info().map(|(uid, _, pk)| (*uid, *pk)).unwrap() + }).collect(); + + let pk = local_sk.public_key(); + public_keys.insert(*local_uid, pk); + + let (mut sync_key_gen, opt_part) = SyncKeyGen::new(*local_uid, local_sk, + public_keys.clone(), threshold).map_err(Error::SyncKeyGenNew)?; + part = opt_part.expect("This node is not a validator (somehow)!"); + + info!("KEY GENERATION: Handling our own `Part`..."); + ack = match sync_key_gen.handle_part(&local_uid, part.clone()) { + Some(PartOutcome::Valid(ack)) => ack, + Some(PartOutcome::Invalid(faults)) => panic!("Invalid part \ + (FIXME: handle): {:?}", faults), + None => unimplemented!(), + }; + + // info!("KEY GENERATION: Handling our own `Ack`..."); + // let fault_log = sync_key_gen.handle_ack(local_uid, ack.clone()); + // if !fault_log.is_empty() { + // error!("Errors acknowledging part (from self):\n {:?}", fault_log); + // } + + info!("KEY GENERATION: Queueing our own `Ack`..."); + ack_queue.as_ref().unwrap().push((*local_uid, ack.clone())); + + State::GeneratingKeys { + sync_key_gen: Some(sync_key_gen), + public_key: Some(pk), + public_keys, + ack_queue: ack_queue.take(), + part_count: 1, + ack_count: 0, + iom_queue: iom_queue.take(), + } + }, + _ => panic!("State::set_generating_keys: \ + Must be State::AwaitingMorePeersForKeyGeneration"), + }; + + Ok((part, ack)) + } + + /// Changes the variant (in-place) of this `State` to `Observer`. + // + // TODO: Add proper error handling: + #[must_use] + pub(super) fn set_observer(&mut self, local_uid: Uid, local_sk: SecretKey, + jp: JoinPlan, cfg: &Config, step_queue: &SegQueue) + -> Result, Error> { + let iom_queue_ret; + *self = match self { + State::DeterminingNetworkState { ref mut iom_queue, .. } => { + let (dhb, dhb_step) = DynamicHoneyBadger::builder() + .build_joining(local_uid, local_sk, jp)?; + step_queue.push(dhb_step.convert()); + + let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb) + .batch_size(cfg.batch_size) + .build(); + step_queue.push(qhb_step); + + iom_queue_ret = iom_queue.take().unwrap(); + + info!(""); + info!("== HONEY BADGER INITIALIZED =="); + info!(""); + + { // TODO: Consolidate or remove: + let pk_set = qhb.dyn_hb().netinfo().public_key_set(); + let pk_map = qhb.dyn_hb().netinfo().public_key_map(); + info!(""); + info!(""); + info!("PUBLIC KEY: {:?}", pk_set.public_key()); + info!("PUBLIC KEY SET: \n{:?}", pk_set); + info!("PUBLIC KEY MAP: \n{:?}", pk_map); + info!(""); + info!(""); + } + + State::Observer { qhb: Some(qhb) } + }, + s @ _ => panic!("State::set_observer: State must be `GeneratingKeys`. \ + State: {}", s.discriminant()), + }; + Ok(iom_queue_ret) + } + + /// Changes the variant (in-place) of this `State` to `Observer`. + // + // TODO: Add proper error handling: + #[must_use] + pub(super) fn set_validator(&mut self, local_uid: Uid, local_sk: SecretKey, peers: &Peers, + cfg: &Config, step_queue: &SegQueue) + -> Result, Error> { + let iom_queue_ret; + *self = match self { + State::GeneratingKeys { ref mut sync_key_gen, mut public_key, + ref mut iom_queue, .. } => { + let mut sync_key_gen = sync_key_gen.take().unwrap(); + assert_eq!(public_key.take().unwrap(), local_sk.public_key()); + + let (pk_set, sk_share_opt) = sync_key_gen.generate() + .map_err(Error::SyncKeyGenGenerate)?; + let sk_share = sk_share_opt.unwrap(); + + assert!(peers.count_validators() >= cfg.keygen_peer_count); + + let mut node_ids: BTreeMap = peers.validators().map(|p| { + (p.uid().cloned().unwrap(), p.public_key().cloned().unwrap()) + }).collect(); + node_ids.insert(local_uid, local_sk.public_key()); + + let netinfo = NetworkInfo::new( + local_uid, + sk_share, + pk_set, + local_sk, + node_ids, + ); + + let dhb = DynamicHoneyBadger::builder() + .build(netinfo); + + let (qhb, qhb_step) = QueueingHoneyBadger::builder(dhb) + .batch_size(cfg.batch_size) + .build(); + step_queue.push(qhb_step); + + info!(""); + info!("== HONEY BADGER INITIALIZED =="); + info!(""); + + { // TODO: Consolidate or remove: + let pk_set = qhb.dyn_hb().netinfo().public_key_set(); + let pk_map = qhb.dyn_hb().netinfo().public_key_map(); + info!(""); + info!(""); + info!("PUBLIC KEY: {:?}", pk_set.public_key()); + info!("PUBLIC KEY SET: \n{:?}", pk_set); + info!("PUBLIC KEY MAP: \n{:?}", pk_map); + info!(""); + info!(""); + } + + + iom_queue_ret = iom_queue.take().unwrap(); + State::Validator { qhb: Some(qhb) } + }, + s @ _ => panic!("State::set_validator: State must be `GeneratingKeys`. State: {}", + s.discriminant()), + }; + Ok(iom_queue_ret) + } + + #[must_use] + pub(super) fn promote_to_validator(&mut self) -> Result<(), Error> { + *self = match self { + State::Observer { ref mut qhb } => { + info!("=== PROMOTING NODE TO VALIDATOR ==="); + State::Validator { qhb: qhb.take() } + }, + s @ _ => panic!("State::promote_to_validator: State must be `Observer`. State: {}", + s.discriminant()), + }; + Ok(()) + } + + /// Sets state to `DeterminingNetworkState` if `Disconnected`, otherwise does + /// nothing. + pub(super) fn update_peer_connection_added(&mut self, _peers: &Peers) { + let _dsct = self.discriminant(); + *self = match self { + State::Disconnected { } => { + info!("Setting state: `DeterminingNetworkState`."); + State::DeterminingNetworkState { + ack_queue: Some(SegQueue::new()), + iom_queue: Some(SegQueue::new()), + network_state: None, + } + }, + _ => return, + }; + } + + /// Sets state to `Disconnected` if peer count is zero, otherwise does nothing. + pub(super) fn update_peer_connection_dropped(&mut self, peers: &Peers) { + *self = match self { + State::DeterminingNetworkState { .. } => { + if peers.count_total() == 0 { + State::Disconnected { } + } else { + return; + } + }, + State::Disconnected { .. } => { + error!("Received peer disconnection when `State::Disconnected`."); + assert_eq!(peers.count_total(), 0); + return; + }, + State::AwaitingMorePeersForKeyGeneration { .. } => { + debug!("Ignoring peer disconnection when \ + `State::AwaitingMorePeersForKeyGeneration`."); + return; + }, + State::GeneratingKeys { .. } => { + panic!("FIXME: RESTART KEY GENERATION PROCESS AFTER PEER DISCONNECTS."); + } + State::Observer { qhb: _, .. } => { + debug!("Ignoring peer disconnection when `State::Observer`."); + return; + }, + State::Validator { qhb: _, .. } => { + debug!("Ignoring peer disconnection when `State::Validator`."); + return; + }, + } + } + + /// Returns the network state, if possible. + pub(super) fn network_state(&self, peers: &Peers) -> NetworkState { + let peer_infos = peers.peers().filter_map(|peer| { + peer.pub_info().map(|(&uid, &in_addr, &pk)| { + NetworkNodeInfo { uid, in_addr, pk } + }) + }).collect::>(); + match self { + State::AwaitingMorePeersForKeyGeneration { .. } => { + NetworkState::AwaitingMorePeersForKeyGeneration(peer_infos) + }, + State::GeneratingKeys{ ref public_keys, .. } => { + NetworkState::GeneratingKeys(peer_infos, public_keys.clone()) + }, + State::Observer { ref qhb } | State::Validator { ref qhb } => { + // FIXME: Ensure that `peer_info` matches `NetworkInfo` from HB. + let pk_set = qhb.as_ref().unwrap().dyn_hb().netinfo().public_key_set().clone(); + let pk_map = qhb.as_ref().unwrap().dyn_hb().netinfo().public_key_map().clone(); + NetworkState::Active((peer_infos, pk_set, pk_map)) + }, + _ => NetworkState::Unknown(peer_infos), + } + } + + /// Returns a reference to the internal HB instance. + pub(super) fn qhb(&self) -> Option<&QueueingHoneyBadger, Uid>> { + match self { + State::Observer { ref qhb, .. } => qhb.as_ref(), + State::Validator { ref qhb, .. } => qhb.as_ref(), + _ => None, + } + } + + /// Returns a reference to the internal HB instance. + pub(super) fn qhb_mut(&mut self) -> Option<&mut QueueingHoneyBadger, Uid>> { + match self { + State::Observer { ref mut qhb, .. } => qhb.as_mut(), + State::Validator { ref mut qhb, .. } => qhb.as_mut(), + _ => None, + } + } + + /// Presents input to HoneyBadger or queues it for later. + /// + /// Cannot be called while disconnected or connection-pending. + pub(super) fn input(&mut self, input: Input) -> Option> { + match self { + State::Observer { ref mut qhb, .. } | State::Validator { ref mut qhb, .. } => { + trace!("State::input: Inputting: {:?}", input); + let step_opt = Some(qhb.as_mut().unwrap().input(input)); + + match step_opt { + Some(ref step) => match step { + Ok(s) => trace!("State::input: QHB output: {:?}", s.output), + Err(err) => error!("State::input: QHB output error: {:?}", err), + }, + None => trace!("State::input: QHB Output is `None`"), + } + + return step_opt; + }, + | State::AwaitingMorePeersForKeyGeneration { ref iom_queue, .. } + | State::GeneratingKeys { ref iom_queue, .. } + | State::DeterminingNetworkState { ref iom_queue, .. } => { + trace!("State::input: Queueing input: {:?}", input); + iom_queue.as_ref().unwrap().push(InputOrMessage::Input(input)); + }, + s @ _ => panic!("State::handle_message: Must be connected in order to input to \ + honey badger. State: {}", s.discriminant()), + } + None + } + + /// Presents a message to HoneyBadger or queues it for later. + /// + /// Cannot be called while disconnected or connection-pending. + pub(super) fn handle_message(&mut self, src_uid: &Uid, msg: Message) + -> Option> { + match self { + | State::Observer { ref mut qhb, .. } + | State::Validator { ref mut qhb, .. } => { + trace!("State::handle_message: Handling message: {:?}", msg); + let step_opt = Some(qhb.as_mut().unwrap().handle_message(src_uid, msg)); + + match step_opt { + Some(ref step) => match step { + Ok(s) => trace!("State::handle_message: QHB output: {:?}", s.output), + Err(err) => error!("State::handle_message: QHB output error: {:?}", err), + }, + None => trace!("State::handle_message: QHB Output is `None`"), + } + + return step_opt; + }, + | State::AwaitingMorePeersForKeyGeneration { ref iom_queue, .. } + | State::GeneratingKeys { ref iom_queue, .. } + | State::DeterminingNetworkState { ref iom_queue, .. } => { + trace!("State::handle_message: Queueing message: {:?}", msg); + iom_queue.as_ref().unwrap().push(InputOrMessage::Message(*src_uid, msg)); + }, + // State::GeneratingKeys { ref iom_queue, .. } => { + // iom_queue.as_ref().unwrap().push(InputOrMessage::Message(msg)); + // }, + s @ _ => panic!("State::handle_message: Must be connected in order to input to \ + honey badger. State: {}", s.discriminant()), + } + None + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..20d457d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,518 @@ +#![cfg_attr(feature = "nightly", feature(alloc_system))] + +#[cfg(feature = "nightly")] +extern crate alloc_system; +extern crate clap; +extern crate env_logger; +#[macro_use] +extern crate log; +#[macro_use] +extern crate failure; +extern crate crossbeam; +// #[macro_use] extern crate crossbeam_channel; +extern crate crypto; +extern crate chrono; +extern crate num_traits; +extern crate num_bigint; +#[macro_use] +extern crate futures; +extern crate tokio; +extern crate tokio_codec; +extern crate tokio_io; +extern crate rand; +extern crate bytes; +extern crate uuid; +extern crate byteorder; +#[macro_use] +extern crate serde_derive; +extern crate serde; +extern crate serde_bytes; +extern crate bincode; +extern crate tokio_serde_bincode; +extern crate parking_lot; +extern crate clear_on_drop; +extern crate hbbft; + + +#[cfg(feature = "nightly")] +use alloc_system::System; + +#[cfg(feature = "nightly")] +#[global_allocator] +static A: System = System; + +// pub mod network; +pub mod hydrabadger; +pub mod blockchain; +pub mod peer; + +use std::{ + collections::BTreeMap, + fmt::{self}, + net::{SocketAddr}, + ops::Deref, +}; +use futures::{ + StartSend, AsyncSink, + sync::mpsc, +}; +use tokio::{ + io, + net::{TcpStream}, + prelude::*, +}; +use tokio_io::codec::length_delimited::Framed; +use bytes::{BytesMut, Bytes}; +use rand::{Rng, Rand}; +use uuid::Uuid; +// use bincode::{serialize, deserialize}; +use hbbft::{ + crypto::{PublicKey, PublicKeySet}, + sync_key_gen::{Part, Ack}, + messaging::Step as MessagingStep, + dynamic_honey_badger::{Message as DhbMessage, JoinPlan}, + queueing_honey_badger::{QueueingHoneyBadger, Input as QhbInput}, +}; + +pub use hydrabadger::{Hydrabadger, Config}; +pub use blockchain::{Blockchain, MiningError}; + +// FIME: TEMPORARY -- Create another error type. +pub use hydrabadger::{Error}; + + +/// Transmit half of the wire message channel. +// TODO: Use a bounded tx/rx (find a sensible upper bound): +type WireTx = mpsc::UnboundedSender; + +/// Receive half of the wire message channel. +// TODO: Use a bounded tx/rx (find a sensible upper bound): +type WireRx = mpsc::UnboundedReceiver; + +/// Transmit half of the internal message channel. +// TODO: Use a bounded tx/rx (find a sensible upper bound): +type InternalTx = mpsc::UnboundedSender; + +/// Receive half of the internal message channel. +// TODO: Use a bounded tx/rx (find a sensible upper bound): +type InternalRx = mpsc::UnboundedReceiver; + + +/// A transaction. +#[derive(Serialize, Deserialize, Eq, PartialEq, Hash, Ord, PartialOrd, Debug, Clone)] +pub struct Transaction(pub Vec); + +impl Transaction { + fn random(len: usize) -> Transaction { + Transaction(rand::thread_rng().gen_iter().take(len).collect()) + } +} + + +/// A unique identifier. +#[derive(Clone, Copy, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize, Deserialize)] +pub struct Uid(pub(crate) Uuid); + +impl Uid { + /// Returns a new, random `Uid`. + pub fn new() -> Uid { + Uid(Uuid::new_v4()) + } +} + +impl Rand for Uid { + fn rand(_rng: &mut R) -> Uid { + Uid::new() + } +} + +impl fmt::Display for Uid { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::LowerHex::fmt(&self.0, f) + } +} + +impl fmt::Debug for Uid { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::LowerHex::fmt(&self.0, f) + } +} + +type Message = DhbMessage; +type Step = MessagingStep, Uid>>; +type Input = QhbInput, Uid>; + +/// A peer's incoming (listening) address. +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct InAddr(pub SocketAddr); + +impl Deref for InAddr { + type Target = SocketAddr; + fn deref(&self) -> &SocketAddr { + &self.0 + } +} + +impl fmt::Display for InAddr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "InAddr({})", self.0) + } +} + + +/// An internal address used to respond to a connected peer. +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub struct OutAddr(pub SocketAddr); + +impl Deref for OutAddr { + type Target = SocketAddr; + fn deref(&self) -> &SocketAddr { + &self.0 + } +} + +impl fmt::Display for OutAddr { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "OutAddr({})", self.0) + } +} + + +/// Nodes of the network. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkNodeInfo { + pub(crate) uid: Uid, + pub(crate) in_addr: InAddr, + pub(crate) pk: PublicKey, +} + + +/// The current state of the network. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum NetworkState { + None, + Unknown(Vec), + AwaitingMorePeersForKeyGeneration(Vec), + GeneratingKeys(Vec, BTreeMap), + Active((Vec, PublicKeySet, BTreeMap)), +} + + +/// Messages sent over the network between nodes. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum WireMessageKind { + HelloFromValidator(Uid, InAddr, PublicKey, NetworkState), + HelloRequestChangeAdd(Uid, InAddr, PublicKey), + WelcomeReceivedChangeAdd(Uid, PublicKey, NetworkState), + RequestNetworkState, + NetworkState(NetworkState), + Goodbye, + #[serde(with = "serde_bytes")] + Bytes(Bytes), + Message(Uid, Message), + Transactions(Uid, Vec), + KeyGenPart(Part), + KeyGenAck(Ack), + JoinPlan(JoinPlan) + // TargetedMessage(TargetedMessage), +} + + +/// Messages sent over the network between nodes. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct WireMessage { + kind: WireMessageKind, +} + +impl WireMessage { + pub fn hello_from_validator(src_uid: Uid, in_addr: InAddr, pk: PublicKey, + net_state: NetworkState) -> WireMessage { + WireMessageKind::HelloFromValidator(src_uid, in_addr, pk, net_state).into() + } + + /// Returns a `HelloRequestChangeAdd` variant. + pub fn hello_request_change_add(src_uid: Uid, in_addr: InAddr, pk: PublicKey) -> WireMessage { + WireMessage { kind: WireMessageKind::HelloRequestChangeAdd(src_uid, in_addr, pk), } + } + + /// Returns a `WelcomeReceivedChangeAdd` variant. + pub fn welcome_received_change_add(src_uid: Uid, pk: PublicKey, net_state: NetworkState) + -> WireMessage { + WireMessage { kind: WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state) } + } + + /// Returns an `Input` variant. + pub fn transaction(src_uid: Uid, txns: Vec) -> WireMessage { + WireMessage { kind: WireMessageKind::Transactions(src_uid, txns), } + } + + /// Returns a `Message` variant. + pub fn message(src_uid: Uid, msg: Message) -> WireMessage { + WireMessage { kind: WireMessageKind::Message(src_uid, msg), } + } + + pub fn key_gen_part(part: Part) -> WireMessage { + WireMessage { kind: WireMessageKind::KeyGenPart(part) } + } + + pub fn key_gen_part_ack(outcome: Ack) -> WireMessage { + WireMessageKind::KeyGenAck(outcome).into() + } + + pub fn join_plan(jp: JoinPlan) -> WireMessage { + WireMessageKind::JoinPlan(jp).into() + } + + /// Returns the wire message kind. + pub fn kind(&self) -> &WireMessageKind { + &self.kind + } + + /// Consumes this `WireMessage` into its kind. + pub fn into_kind(self) -> WireMessageKind { + self.kind + } +} + +impl From for WireMessage { + fn from(kind: WireMessageKind) -> WireMessage { + WireMessage { kind } + } +} + + + +/// A stream/sink of `WireMessage`s connected to a socket. +#[derive(Debug)] +pub struct WireMessages { + framed: Framed, +} + +impl WireMessages { + pub fn new(socket: TcpStream) -> WireMessages { + WireMessages { + framed: Framed::new(socket), + } + } + + pub fn socket(&self) -> &TcpStream { + self.framed.get_ref() + } + + pub fn send_msg(&mut self, msg: WireMessage) -> Result<(), Error> { + self.start_send(msg)?; + let _ = self.poll_complete()?; + Ok(()) + } +} + +impl Stream for WireMessages { + type Item = WireMessage; + type Error = Error; + + fn poll(&mut self) -> Poll, Self::Error> { + match try_ready!(self.framed.poll()) { + Some(frame) => { + Ok(Async::Ready(Some( + // deserialize_from(frame.reader()).map_err(Error::Serde)? + bincode::deserialize(&frame.freeze()).map_err(Error::Serde)? + ))) + } + None => Ok(Async::Ready(None)) + } + } +} + +impl Sink for WireMessages { + type SinkItem = WireMessage; + type SinkError = Error; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + // TODO: Reuse buffer: + let mut serialized = BytesMut::new(); + + // Downgraded from bincode 1.0: + // + // Original: `bincode::serialize(&item)` + // + match bincode::serialize(&item, bincode::Bounded(1 << 20)) { + Ok(s) => serialized.extend_from_slice(&s), + Err(err) => return Err(Error::Io(io::Error::new(io::ErrorKind::Other, err))), + } + match self.framed.start_send(serialized) { + Ok(async_sink) => match async_sink { + AsyncSink::Ready => Ok(AsyncSink::Ready), + AsyncSink::NotReady(_) => Ok(AsyncSink::NotReady(item)), + }, + Err(err) => Err(Error::Io(err)) + } + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + self.framed.poll_complete().map_err(Error::from) + } + + fn close(&mut self) -> Poll<(), Self::SinkError> { + self.framed.close().map_err(Error::from) + } +} + + + +/// A message between internal threads/tasks. +#[derive(Clone, Debug)] +pub enum InternalMessageKind { + Wire(WireMessage), + HbMessage(Message), + HbInput(Input), + PeerDisconnect, + NewIncomingConnection(InAddr, PublicKey, bool), + NewOutgoingConnection, +} + + +/// A message between internal threads/tasks. +#[derive(Clone, Debug)] +pub struct InternalMessage { + src_uid: Option, + src_addr: OutAddr, + kind: InternalMessageKind, +} + +impl InternalMessage { + pub fn new(src_uid: Option, src_addr: OutAddr, kind: InternalMessageKind) -> InternalMessage { + InternalMessage { src_uid: src_uid, src_addr, kind } + } + + /// Returns a new `InternalMessage` without a uid. + pub fn new_without_uid(src_addr: OutAddr, kind: InternalMessageKind) -> InternalMessage { + InternalMessage::new(None, src_addr, kind) + } + + pub fn wire(src_uid: Option, src_addr: OutAddr, wire_message: WireMessage) -> InternalMessage { + InternalMessage::new(src_uid, src_addr, InternalMessageKind::Wire(wire_message)) + } + + pub fn hb_message(src_uid: Uid, src_addr: OutAddr, msg: Message) -> InternalMessage { + InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbMessage(msg)) + } + + pub fn hb_input(src_uid: Uid, src_addr: OutAddr, input: Input) -> InternalMessage { + InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::HbInput(input)) + } + + pub fn peer_disconnect(src_uid: Uid, src_addr: OutAddr) -> InternalMessage { + InternalMessage::new(Some(src_uid), src_addr, InternalMessageKind::PeerDisconnect) + } + + pub fn new_incoming_connection(src_uid: Uid, src_addr: OutAddr, src_in_addr: InAddr, + src_pk: PublicKey, request_change_add: bool) -> InternalMessage { + InternalMessage::new(Some(src_uid), src_addr, + InternalMessageKind::NewIncomingConnection(src_in_addr, src_pk, request_change_add)) + } + + pub fn new_outgoing_connection(src_addr: OutAddr) -> InternalMessage { + InternalMessage::new_without_uid(src_addr, InternalMessageKind::NewOutgoingConnection) + } + + /// Returns the source unique identifier this message was received in. + pub fn src_uid(&self) -> Option<&Uid> { + self.src_uid.as_ref() + } + + /// Returns the source socket this message was received on. + pub fn src_addr(&self) -> &OutAddr { + &self.src_addr + } + + /// Returns the internal message kind. + pub fn kind(&self) -> &InternalMessageKind { + &self.kind + } + + /// Consumes this `InternalMessage` into its parts. + pub fn into_parts(self) -> (Option, OutAddr, InternalMessageKind) { + (self.src_uid, self.src_addr, self.kind) + } +} + +use std::collections::HashSet; +use std::net::Ipv4Addr; +use std::net::IpAddr; + +#[no_mangle] +pub extern fn rust_main1() { + let bind_address: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3000); + + let mut remote_addresses: HashSet = HashSet::new(); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3001)); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3002)); + + let cfg = Config::default(); + + let hb = Hydrabadger::new(bind_address, cfg); + hb.run_node(Some(remote_addresses)); +} + +#[no_mangle] +pub extern fn rust_main2() { + let bind_address: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3001); + + let mut remote_addresses: HashSet = HashSet::new(); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3000)); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3002)); + + let cfg = Config::default(); + + let hb = Hydrabadger::new(bind_address, cfg); + hb.run_node(Some(remote_addresses)); +} + +#[no_mangle] +pub extern fn rust_main3() { + let bind_address: SocketAddr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3002); + + let mut remote_addresses: HashSet = HashSet::new(); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3001)); + remote_addresses.insert(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 3000)); + + let cfg = Config::default(); + + let hb = Hydrabadger::new(bind_address, cfg); + hb.run_node(Some(remote_addresses)); +} + +/// Expose the JNI interface for android below +#[cfg(target_os="android")] +#[allow(non_snake_case)] +pub mod android { + extern crate jni; + + use super::*; + use self::jni::JNIEnv; + use self::jni::objects::{JClass}; + use self::jni::sys::{jboolean}; + + #[no_mangle] + pub unsafe extern fn Java_ru_hintsolution_hbbft_hbbft_MainActivity_startNode1(_env: JNIEnv, _: JClass) -> jboolean { + // Our Java companion code might pass-in "world" as a string, hence the name. + rust_main1(); + 1 + } + + #[no_mangle] + pub unsafe extern fn Java_ru_hintsolution_hbbft_hbbft_MainActivity_startNode2(_env: JNIEnv, _: JClass) -> jboolean { + // Our Java companion code might pass-in "world" as a string, hence the name. + rust_main2(); + 1 + } + + #[no_mangle] + pub unsafe extern fn Java_ru_hintsolution_hbbft_hbbft_MainActivity_startNode3(_env: JNIEnv, _: JClass) -> jboolean { + // Our Java companion code might pass-in "world" as a string, hence the name. + rust_main3(); + 1 + } +} + + diff --git a/src/peer.rs b/src/peer.rs new file mode 100644 index 0000000..3c1aa04 --- /dev/null +++ b/src/peer.rs @@ -0,0 +1,531 @@ +//! A peer network node. + +#![allow(unused_imports, dead_code, unused_variables, unused_mut)] + +use std::{ + collections::{ + hash_map::{Iter as HashMapIter, Values as HashMapValues}, + HashMap, + }, + borrow::Borrow, +}; +use futures::sync::mpsc; +use tokio::prelude::*; +use hbbft::crypto::PublicKey; +use hbbft::queueing_honey_badger::{Input as HbInput}; +use ::{InternalMessage, WireMessage, WireMessageKind, WireMessages, WireTx, WireRx, + OutAddr, InAddr, Uid}; +use hydrabadger::{Hydrabadger, Error,}; + + +/// The state for each connected client. +pub struct PeerHandler { + // Peer uid. + uid: Option, + + // The incoming stream of messages: + wire_msgs: WireMessages, + + /// Handle to the shared message state. + hdb: Hydrabadger, + + // TODO: Consider adding back a separate clone of `peer_internal_tx`. Is + // there any difference if capacity isn't an issue? -- doubtful + + /// Receive half of the message channel. + rx: WireRx, + + /// Peer socket address. + out_addr: OutAddr, +} + +impl PeerHandler { + /// Create a new instance of `Peer`. + pub fn new(pub_info: Option<(Uid, InAddr, PublicKey)>, + hdb: Hydrabadger, wire_msgs: WireMessages) -> PeerHandler { + // Get the client socket address + let out_addr = OutAddr(wire_msgs.socket().peer_addr().unwrap()); + + // Create a channel for this peer + let (tx, rx) = mpsc::unbounded(); + + let uid = pub_info.as_ref().map(|(uid, _, _)| uid.clone()); + + // Add an entry for this `Peer` in the shared state map. + hdb.peers_mut().add(out_addr, tx, pub_info); + + PeerHandler { + uid, + wire_msgs, + hdb, + rx, + out_addr, + } + } + + pub(crate) fn hdb(&self) -> &Hydrabadger { + &self.hdb + } + + pub(crate) fn out_addr(&self) -> &OutAddr { + &self.out_addr + } +} + +/// A future representing the client connection. +impl Future for PeerHandler { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll<(), Error> { + const MESSAGES_PER_TICK: usize = 10; + + // Receive all messages from peers. + for i in 0..MESSAGES_PER_TICK { + // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is + // safe. + match self.rx.poll().unwrap() { + Async::Ready(Some(v)) => { + // Buffer the message. Once all messages are buffered, they will + // be flushed to the socket (right below). + self.wire_msgs.start_send(v)?; + + // Exceeded max messages per tick, schedule notification: + if i + 1 == MESSAGES_PER_TICK { + task::current().notify(); + } + } + _ => break, + } + } + + // Flush the write buffer to the socket + let _ = self.wire_msgs.poll_complete()?; + + // Read new messages from the socket + while let Async::Ready(message) = self.wire_msgs.poll()? { + trace!("Received message: {:?}", message); + + if let Some(msg) = message { + match msg.into_kind() { + WireMessageKind::HelloRequestChangeAdd(src_uid, _in_addr, _pub_key) => { + error!("Duplicate `WireMessage::HelloRequestChangeAdd` \ + received from '{}'", src_uid); + }, + WireMessageKind::WelcomeReceivedChangeAdd(src_uid, pk, net_state) => { + self.uid = Some(src_uid); + self.hdb.send_internal( + InternalMessage::wire(Some(src_uid), self.out_addr, + WireMessage::welcome_received_change_add(src_uid, pk, net_state) + ) + ); + }, + WireMessageKind::Message(src_uid, msg) => { + // let uid = self.uid.clone() + // .expect("`WireMessageKind::Message` received before \ + // establishing peer"); + + if let Some(peer_uid) = self.uid.as_ref() { + debug_assert_eq!(src_uid, *peer_uid); + } + + self.hdb.send_internal( + InternalMessage::hb_message(src_uid, self.out_addr, msg) + ) + }, + WireMessageKind::Transactions(src_uid, txns) => { + if let Some(peer_uid) = self.uid.as_ref() { + debug_assert_eq!(src_uid, *peer_uid); + } + + self.hdb.send_internal( + InternalMessage::hb_input(src_uid, self.out_addr, HbInput::User(txns)) + ) + }, + kind @ _ => { + self.hdb.send_internal(InternalMessage::wire(self.uid.clone(), + self.out_addr, kind.into())) + } + } + } else { + // EOF was reached. The remote client has disconnected. There is + // nothing more to do. + info!("Peer ({}: '{}') disconnected.", self.out_addr, self.uid.clone().unwrap()); + return Ok(Async::Ready(())); + } + } + + // As always, it is important to not just return `NotReady` without + // ensuring an inner future also returned `NotReady`. + // + // We know we got a `NotReady` from either `self.rx` or `self.wire_msgs`, so + // the contract is respected. + Ok(Async::NotReady) + } +} + +impl Drop for PeerHandler { + fn drop(&mut self) { + debug!("Removing peer ({}: '{}') from the list of peers.", + self.out_addr, self.uid.clone().unwrap()); + // Remove peer transmitter from the lists: + self.hdb.peers_mut().remove(&self.out_addr); + + if let Some(uid) = self.uid.clone() { + debug!("Sending peer ({}: '{}') disconnect internal message.", + self.out_addr, self.uid.clone().unwrap()); + + self.hdb.send_internal(InternalMessage::peer_disconnect( + uid, self.out_addr)); + } + } +} + + +#[derive(Clone, Debug)] +#[allow(dead_code)] +enum State { + Handshaking, + PendingJoinInfo { + uid: Uid, + in_addr: InAddr, + pk: PublicKey, + }, + EstablishedObserver { + uid: Uid, + in_addr: InAddr, + pk: PublicKey, + }, + EstablishedValidator { + uid: Uid, + in_addr: InAddr, + pk: PublicKey, + }, +} + + +/// Nodes of the network. +#[derive(Clone, Debug)] +pub struct Peer { + out_addr: OutAddr, + tx: WireTx, + state: State, +} + +impl Peer { + /// Returns a new `Peer` + fn new(out_addr: OutAddr, tx: WireTx, + // uid: Option, in_addr: Option, pk: Option + pub_info: Option<(Uid, InAddr, PublicKey)>, + ) -> Peer { + // assert!(uid.is_some() == in_addr.is_some() && uid.is_some() == pk.is_some()); + let state = match pub_info { + None => State::Handshaking, + Some((uid, in_addr, pk)) => State::EstablishedValidator { uid, in_addr, pk }, + }; + + Peer { + out_addr, + tx, + state, + } + } + + /// Sets a peer state to `State::PendingJoinInfo` and stores public info. + fn set_pending(&mut self, pub_info: (Uid, InAddr, PublicKey)) { + self.state = match self.state { + State::Handshaking => { + State::PendingJoinInfo { + uid: pub_info.0, + in_addr: pub_info.1, + pk: pub_info.2 + } + }, + _ => panic!("Peer::set_pending: Can only set pending when \ + peer state is `Handshaking`."), + }; + } + + /// Sets a peer state to `State::EstablishedObserver` and stores public info. + fn establish_observer(&mut self) { + self.state = match self.state { + State::PendingJoinInfo { uid, in_addr, pk } => { + State::EstablishedObserver { + uid, + in_addr, + pk, + } + }, + _ => panic!("Peer::establish_observer: Can only establish observer when \ + peer state is`PendingJoinInfo`."), + }; + } + + /// Sets a peer state to `State::EstablishedValidator` and stores public info. + fn establish_validator(&mut self, pub_info: Option<(Uid, InAddr, PublicKey)>) { + self.state = match self.state { + State::Handshaking => match pub_info { + Some(pi) => { + State::EstablishedValidator { + uid: pi.0, + in_addr: pi.1, + pk: pi.2 + } + }, + None => { + panic!("Peer::establish_validator: `pub_info` must be supplied \ + when establishing a validator from `Handshaking`."); + }, + }, + State::EstablishedObserver { uid, in_addr, pk } => { + if let Some(_) = pub_info { + panic!("Peer::establish_validator: `pub_info` must be `None` \ + when upgrading an observer node."); + } + State::EstablishedValidator { + uid, + in_addr, + pk, + } + }, + _ => panic!("Peer::establish_validator: Can only establish validator when \ + peer state is`Handshaking` or `EstablishedObserver`."), + }; + } + + /// Returns the peer's unique identifier. + pub fn uid(&self) -> Option<&Uid> { + match self.state { + State::Handshaking => None, + State::PendingJoinInfo { ref uid, .. } => Some(uid), + State::EstablishedObserver { ref uid, .. } => Some(uid), + State::EstablishedValidator { ref uid, .. } => Some(uid), + } + } + + /// Returns the peer's unique identifier. + pub fn out_addr(&self) -> &OutAddr { + &self.out_addr + } + + /// Returns the peer's public key. + pub fn public_key(&self) -> Option<&PublicKey> { + match self.state { + State::Handshaking => None, + State::PendingJoinInfo { ref pk, .. } => Some(pk), + State::EstablishedObserver { ref pk, .. } => Some(pk), + State::EstablishedValidator { ref pk, .. } => Some(pk), + } + } + + /// Returns the peer's incoming (listening) socket address. + pub fn in_addr(&self) -> Option<&InAddr> { + match self.state { + State::Handshaking => None, + State::PendingJoinInfo { ref in_addr, .. } => Some(in_addr), + State::EstablishedObserver { ref in_addr, .. } => Some(in_addr), + State::EstablishedValidator { ref in_addr, .. } => Some(in_addr), + } + } + + /// Returns the peer's public info if established. + pub fn pub_info(&self) -> Option<(&Uid, &InAddr, &PublicKey)> { + match self.state { + State::Handshaking => None, + State::EstablishedObserver { ref uid, ref in_addr, ref pk } => Some((uid, in_addr, pk)), + State::PendingJoinInfo { ref uid, ref in_addr, ref pk } => Some((uid, in_addr, pk)), + State::EstablishedValidator { ref uid, ref in_addr, ref pk } => Some((uid, in_addr, pk)), + } + } + + /// Returns true if this peer is pending. + pub fn is_pending(&self) -> bool { + match self.state { + State::PendingJoinInfo { .. } => true, + _ => false, + } + } + + /// Returns true if this peer is an established observer. + pub fn is_observer(&self) -> bool { + match self.state { + State::EstablishedObserver { .. } => true, + _ => false, + } + } + + /// Returns true if this peer is an established validator. + pub fn is_validator(&self) -> bool { + match self.state { + State::EstablishedValidator { .. } => true, + _ => false, + } + } + + /// Returns the peer's wire transmitter. + pub fn tx(&self) -> &WireTx { + &self.tx + } +} + + +/// Peer nodes of the network. +// +// TODO: Keep a separate `HashSet` of validator `OutAddrs` to avoid having to +// iterate through entire list. +#[derive(Debug)] +pub(crate) struct Peers { + peers: HashMap, + out_addrs: HashMap, +} + +impl Peers { + /// Returns a new empty list of peers. + pub(crate) fn new() -> Peers { + Peers { + peers: HashMap::with_capacity(64), + out_addrs: HashMap::with_capacity(64), + } + } + + /// Adds a peer to the list. + pub(crate) fn add(&mut self, out_addr: OutAddr, tx: WireTx, + // uid: Option, in_addr: Option, pk: Option + pub_info: Option<(Uid, InAddr, PublicKey)>, + ) { + let peer = Peer::new(out_addr, tx, pub_info); + if let State::EstablishedValidator { uid, .. } = peer.state { + self.out_addrs.insert(uid, peer.out_addr); + } + self.peers.insert(peer.out_addr, peer); + } + + /// Attempts to set peer as pending-join-info, storing `pub_info`. + /// + /// Returns `true` if the peer was already pending. + /// + /// ### Panics + /// + /// Peer state must be `Handshaking`. + /// + /// TODO: Error handling... + pub(crate) fn set_pending>(&mut self, out_addr: O, + pub_info: (Uid, InAddr, PublicKey)) -> bool { + let peer = self.peers.get_mut(out_addr.borrow()) + .expect(&format!("Peers::set_pending: \ + No peer found with outgoing address: {}", out_addr.borrow())); + match self.out_addrs.insert(pub_info.0, *out_addr.borrow()) { + Some(_out_addr_pub) => { + let pi_pub = peer.pub_info() + .expect("Peers::set_pending: internal consistency error"); + assert!(pub_info.0 == *pi_pub.0 && pub_info.1 == *pi_pub.1 && pub_info.2 == *pi_pub.2); + assert!(peer.is_validator()); + return true; + }, + None => peer.set_pending(pub_info), + } + + // false + panic!("Peer::set_pending: Do not use yet."); + } + + /// Attempts to establish a peer as an observer. + /// + /// ### Panics + /// + /// Peer state must be `Handshaking`. + /// + /// TODO: Error handling... + pub(crate) fn establish_observer>(&mut self, out_addr: O) { + let peer = self.peers.get_mut(out_addr.borrow()) + .expect(&format!("Peers::establish_observer: \ + No peer found with outgoing address: {}", out_addr.borrow())); + + // peer.establish_observer() + panic!("Peer::set_pending: Do not use yet."); + } + + /// Attempts to establish a peer as a validator, storing `pub_info`. + /// + /// Returns `true` if the peer was already an established validator. + /// + /// ### Panics + /// + /// Peer state must be `Handshaking` or `EstablishedObserver`. + /// + /// TODO: Error handling... + pub(crate) fn establish_validator>(&mut self, out_addr: O, + pub_info: (Uid, InAddr, PublicKey)) -> bool { + let peer = self.peers.get_mut(out_addr.borrow()) + .expect(&format!("Peers::establish_validator: \ + No peer found with outgoing address: {}", out_addr.borrow())); + match self.out_addrs.insert(pub_info.0, *out_addr.borrow()) { + Some(_out_addr_pub) => { + let pi_pub = peer.pub_info() + .expect("Peers::establish_validator: internal consistency error"); + assert!(pub_info.0 == *pi_pub.0 && pub_info.1 == *pi_pub.1 && pub_info.2 == *pi_pub.2); + assert!(peer.is_validator()); + return true; + }, + None => peer.establish_validator(Some(pub_info)), + } + false + } + + /// Removes a peer the list if it exists. + pub(crate) fn remove>(&mut self, out_addr: O) { + let peer = self.peers.remove(out_addr.borrow()); + if let Some(p) = peer { + if let Some(uid) = p.uid() { + self.out_addrs.remove(&uid); + } + } + } + + pub(crate) fn get>(&self, out_addr: O) -> Option<&Peer> { + self.peers.get(out_addr.borrow()) + } + + pub(crate) fn get_by_uid>(&self, uid: U) -> Option<&Peer> { + // self.peers.get() + self.out_addrs.get(uid.borrow()).and_then(|addr| self.get(addr)) + } + + /// Returns an Iterator over the list of peers. + pub(crate) fn iter(&self) -> HashMapIter { + self.peers.iter() + } + + /// Returns an Iterator over the list of peers. + pub(crate) fn peers(&self) -> HashMapValues { + self.peers.values() + } + + /// Returns an iterator over the list of validators. + pub(crate) fn validators(&self) -> impl Iterator { + self.peers.values().filter(|p| p.is_validator()) + } + + /// Returns the current number of connected peers. + pub(crate) fn count_total(&self) -> usize { + self.peers.len() + } + + /// Returns the current number of connected and established validators. + /// + /// This is semi-expensive (O(n)). + pub(crate) fn count_validators(&self) -> usize { + self.validators().count() + } + + pub(crate) fn contains_in_addr>(&self, in_addr: I) -> bool { + for peer in self.peers.values() { + if let Some(peer_in_addr) = peer.in_addr() { + if peer_in_addr == in_addr.borrow() { + return true; + } + } + } + false + } +}