update and fix the merchant script (fixes issue #254)

This commit is contained in:
ThomasV 2013-10-06 21:16:09 +02:00
parent 33db7a0c43
commit ea0f270fd9
4 changed files with 116 additions and 120 deletions

View File

@ -1426,7 +1426,7 @@ class Wallet:
self.verifier = TxVerifier(self.network, self.storage)
self.verifier.start()
self.set_verifier(self.verifier)
self.synchronizer = WalletSynchronizer(self)
self.synchronizer = WalletSynchronizer(self, network)
self.synchronizer.start()
def stop_threads(self):
@ -1476,13 +1476,11 @@ class Wallet:
class WalletSynchronizer(threading.Thread):
def __init__(self, wallet):
def __init__(self, wallet, network):
threading.Thread.__init__(self)
self.daemon = True
self.wallet = wallet
wallet.synchronizer = self
self.network = self.wallet.network
#self.wallet.network.register_callback('connected', lambda: self.wallet.set_up_to_date(False))
self.network = network
self.was_updated = True
self.running = False
self.lock = threading.Lock()

View File

@ -3,15 +3,13 @@ host = hostname of the machine where you run this program
port = choose a port number
password = choose a password
[db]
instance = the hostname of your sql server
name = the database name
user = your database username
password = your database password
[sqlite3]
database = database filename
[electrum]
server = the electrum server you will use
mpk = the master public key of your wallet (in hexadecimal)
chain = second part of the mastrer public key (hexadecimal)
wallet_path = path where the script will save the wallet
[callback]
received = URL where we POST json data when payment has been received

View File

@ -19,75 +19,64 @@
import time, thread, sys, socket, os
import urllib2,json
import MySQLdb as mdb
import Queue
from electrum import Wallet, Interface, WalletVerifier, SimpleConfig, WalletSynchronizer
import sqlite3
from electrum import Wallet, WalletStorage, SimpleConfig, Network, set_verbosity
set_verbosity(False)
import ConfigParser
config = ConfigParser.ConfigParser()
config.read("merchant.conf")
db_instance = config.get('db','instance')
db_user = config.get('db','user')
db_password = config.get('db','password')
db_name = config.get('db','name')
electrum_server = config.get('electrum','server')
my_password = config.get('main','password')
my_host = config.get('main','host')
my_port = config.getint('main','port')
cb_received = config.get('callback','received')
cb_expired = config.get('callback','expired')
database = config.get('sqlite3','database')
received_url = config.get('callback','received')
expired_url = config.get('callback','expired')
cb_password = config.get('callback','password')
wallet_config = SimpleConfig()
wallet_path = config.get('electrum','wallet_path')
master_public_key = config.get('electrum','mpk')
wallet_config.set_key('master_public_key',master_public_key)
wallet = Wallet(wallet_config)
wallet.synchronize = lambda: None # prevent address creation by the wallet
master_chain = config.get('electrum','chain')
omg_addresses = {}
pending_requests = {}
def input_reader_thread(request_queue):
while True:
addr, amount, confirmations = request_queue.get(True,1000000000)
if addr in omg_addresses:
continue
else:
print "subscribing to ", addr
omg_addresses[addr] = {'requested':float(amount), 'confirmations':int(confirmations)}
num = 0
if addr not in wallet.addresses:
with wallet.lock:
print "adding %s to wallet"%addr
wallet.addresses.append(addr)
wallet.history[addr] = []
synchronizer.subscribe_to_addresses([addr])
wallet.up_to_date = False
def check_create_table(conn):
global num
c = conn.cursor()
c.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='electrum_payments';")
data = c.fetchall()
if not data:
c.execute("""CREATE TABLE electrum_payments (address VARCHAR(40), amount FLOAT, confirmations INT(8), received_at TIMESTAMP, expires_at TIMESTAMP, paid INT(1), processed INT(1));""")
conn.commit()
c.execute("SELECT Count(address) FROM 'electrum_payments'")
num = c.fetchone()[0]
print "num rows", num
# this process detects when addresses have received payments
def on_wallet_update():
print "updated_callback"
for addr in omg_addresses:
h = wallet.history.get(addr)
requested_amount = omg_addresses[addr].get('requested')
requested_confs = omg_addresses[addr].get('confirmations')
for addr, v in pending_requests.items():
h = wallet.history.get(addr, [])
requested_amount = v.get('requested')
requested_confs = v.get('confirmations')
value = 0
for tx_hash, tx_height in h:
tx = wallet.transactions.get(tx_hash)
if not tx: continue
if verifier.get_confirmations(tx_hash) < requested_confs: continue
for o in tx.get('outputs'):
if o.get('address') == addr:
value += o.get('value')
if wallet.verifier.get_confirmations(tx_hash) < requested_confs: continue
for o in tx.outputs:
o_address, o_value = o
if o_address == addr:
value += o_value
s = (value)/1.e8
print "balance for %s:"%addr, s, requested_amount
@ -98,50 +87,54 @@ def on_wallet_update():
stopping = False
def do_stop():
def do_stop(password):
global stopping
if password != my_password:
return "wrong password"
stopping = True
return "ok"
def do_create(conn):
# creation
cur = conn.cursor()
cur.execute("CREATE TABLE electrum_payments (id INT PRIMARY KEY, address VARCHAR(40), amount FLOAT, confirmations INT(8), received_at TIMESTAMP, expires_at TIMESTAMP, paid INT(1), processed INT(1));")
conn.commit()
def process_request(amount, confirmations, expires_in, password):
global num
def process_request(i, amount, confirmations, expires_in, password):
print "process_request", i, amount, confirmations, expires_in
if password!=my_password:
print "wrong password ", password
return
addr = wallet.get_new_address(0, i, 0)
out_queue.put( ('request', (i, addr, amount, confirmations, expires_in) ))
if password != my_password:
return "wrong password"
try:
amount = float(amount)
confirmations = int(confirmations)
expires_in = float(expires_in)
except:
return "incorrect parameters"
account = wallet.accounts["m/0'/0"]
addr = account.get_address(0, num)
num += 1
out_queue.put( ('request', (addr, amount, confirmations, expires_in) ))
return addr
def get_mpk():
return wallet.master_public_key
def server_thread(conn):
from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
server = SimpleJSONRPCServer(( my_host, my_port))
server.register_function(process_request, 'request')
server.register_function(get_mpk, 'mpk')
server.register_function(do_stop, 'stop')
server.serve_forever()
def handle_command(cmd):
def send_command(cmd, params):
import jsonrpclib
server = jsonrpclib.Server('http://%s:%d'%(my_host, my_port))
try:
if cmd == 'mpk':
out = server.mpk()
if cmd == 'request':
out = server.request(*params)
elif cmd == 'stop':
out = server.stop()
elif cmd == 'create':
conn = mdb.connect(db_instance, db_user, db_password, db_name);
do_create(conn)
out = "ok"
out = server.stop(*params)
else:
out = "unknown command"
except socket.error:
@ -155,32 +148,32 @@ def handle_command(cmd):
if __name__ == '__main__':
if len(sys.argv) > 1:
ret = handle_command(sys.argv[1])
cmd = sys.argv[1]
params = sys.argv[2:] + [my_password]
ret = send_command(cmd, params)
sys.exit(ret)
print "using database", db_name
conn = mdb.connect(db_instance, db_user, db_password, db_name);
conn = sqlite3.connect(database);
# create table if needed
check_create_table(conn)
interface = Interface({'server':"%s:%d:t"%(electrum_server, 50001)})
interface.start()
interface.send([('blockchain.numblocks.subscribe',[])])
# init network
config = SimpleConfig({'wallet_path':wallet_path})
network = Network(config)
network.start(wait=True)
wallet.interface = interface
interface.register_callback('updated', on_wallet_update)
# create watching_only wallet
storage = WalletStorage(config)
wallet = Wallet(storage)
if not storage.file_exists:
wallet.seed = ''
wallet.create_watching_only_wallet(master_chain,master_public_key)
verifier = WalletVerifier(interface, wallet_config)
wallet.set_verifier(verifier)
synchronizer = WalletSynchronizer(wallet, wallet_config)
synchronizer.start()
verifier.start()
wallet.synchronize = lambda: None # prevent address creation by the wallet
wallet.start_threads(network)
network.register_callback('updated', on_wallet_update)
# this process detects when addresses have paid
request_queue = Queue.Queue()
out_queue = Queue.Queue()
thread.start_new_thread(input_reader_thread, (request_queue,))
thread.start_new_thread(server_thread, (conn,))
while not stopping:
@ -189,8 +182,18 @@ if __name__ == '__main__':
# read pending requests from table
cur.execute("SELECT address, amount, confirmations FROM electrum_payments WHERE paid IS NULL;")
data = cur.fetchall()
# add pending requests to the wallet
for item in data:
request_queue.put(item)
addr, amount, confirmations = item
if addr in pending_requests:
continue
else:
with wallet.lock:
print "subscribing to %s"%addr
pending_requests[addr] = {'requested':float(amount), 'confirmations':int(confirmations)}
wallet.synchronizer.subscribe_to_addresses([addr])
wallet.up_to_date = False
try:
cmd, params = out_queue.get(True, 10)
@ -201,45 +204,41 @@ if __name__ == '__main__':
addr = params
# set paid=1 for received payments
print "received payment from", addr
cur.execute("select id from electrum_payments where address='%s';"%addr)
id = cur.fetchone()[0]
cur.execute("update electrum_payments set paid=1 where id=%d;"%(id))
cur.execute("update electrum_payments set paid=1 where address='%s'"%addr)
elif cmd == 'request':
# add a new request to the table.
i, addr, amount, confs, hours = params
sql = "INSERT INTO electrum_payments (id, address, amount, confirmations, received_at, expires_at, paid, processed)"\
+ " VALUES (%d, '%s', %f, %d, CURRENT_TIMESTAMP, ADDTIME(CURRENT_TIMESTAMP, '0 %d:0:0'), NULL, NULL);"%(i, addr, amount, confs, hours)
addr, amount, confs, minutes = params
sql = "INSERT INTO electrum_payments (address, amount, confirmations, received_at, expires_at, paid, processed)"\
+ " VALUES ('%s', %f, %d, datetime('now'), datetime('now', '+%d Minutes'), NULL, NULL);"%(addr, amount, confs, minutes)
print sql
cur.execute(sql)
# set paid=0 for expired requests
cur.execute("""UPDATE electrum_payments set paid=0 WHERE expires_at < CURRENT_TIMESTAMP and paid is NULL;""")
# do callback for addresses that received payment
cur.execute("""SELECT id, address, paid from electrum_payments WHERE paid is not NULL and processed is NULL;""")
# do callback for addresses that received payment or expired
cur.execute("""SELECT address, paid from electrum_payments WHERE paid is not NULL and processed is NULL;""")
data = cur.fetchall()
for item in data:
print "callback:", item
id = int(item[0])
address = item[1]
paid = int(item[2])
address, paid = item
paid = bool(paid)
headers = {'content-type':'application/json'}
data_json = { 'id':id, 'address':address, 'btc_auth':cb_password }
data_json = { 'address':address, 'password':cb_password, 'paid':paid }
data_json = json.dumps(data_json)
url = cb_received if paid else cb_expired
url = received_url if paid else expired_url
req = urllib2.Request(url, data_json, headers)
try:
response_stream = urllib2.urlopen(req)
cur.execute("UPDATE electrum_payments SET processed=1 WHERE id=%d;"%(id))
except urllib2.HTTPError:
print "cannot do callback", data_json
except ValueError, e:
print e
print "cannot do callback", data_json
conn.commit()
conn.close()
print "terminated"
print "Done"

View File

@ -6,13 +6,14 @@ received and notifies your web application.
The workflow goes like this:
- the server sends a request to the daemon via POST. the request
contains an ID, an amount to be paid, an expiration period.
contains an amount to be paid, a number of confirmations, and an
expiration period in hours.
- the daemon answers with a Bitcoin address, where the customer needs
to send the coins.
- later, the daemon will send a POST to the webserver, to notify that
payment has been received OR that the request has expired
the payment has been received OR that the request has expired
Since addresses are generated using an Electrum master public key, it