"""This module monitors for stale transactions.
It reassigns transactions which have been assigned a node but
remain in the backlog past a certain amount of time.
"""
import logging
from multipipes import Pipeline, Node
from bigchaindb import Bigchain
from time import sleep
logger = logging.getLogger(__name__)
[docs]class StaleTransactionMonitor:
"""This class encapsulates the logic for re-assigning stale transactions.
Note:
Methods of this class will be executed in different processes.
"""
def __init__(self, timeout=5, backlog_reassign_delay=None):
"""Initialize StaleTransaction monitor
Args:
timeout: how often to check for stale tx (in sec)
backlog_reassign_delay: How stale a transaction should
be before reassignment (in sec). If supplied, overrides
the Bigchain default value.
"""
self.bigchain = Bigchain(backlog_reassign_delay=backlog_reassign_delay)
self.timeout = timeout
[docs] def check_transactions(self):
"""Poll backlog for stale transactions
Returns:
txs (list): txs to be re assigned
"""
sleep(self.timeout)
for tx in self.bigchain.get_stale_transactions():
yield tx
[docs] def reassign_transactions(self, tx):
"""Put tx back in backlog with new assignee
Returns:
transaction
"""
# NOTE: Maybe this is to verbose?
logger.info('Reassigning transaction with id %s', tx['id'])
self.bigchain.reassign_transaction(tx)
return tx
[docs]def create_pipeline(timeout=5, backlog_reassign_delay=5):
"""Create and return the pipeline of operations to be distributed
on different processes."""
stm = StaleTransactionMonitor(timeout=timeout,
backlog_reassign_delay=backlog_reassign_delay)
monitor_pipeline = Pipeline([
Node(stm.check_transactions),
Node(stm.reassign_transactions)
])
return monitor_pipeline
[docs]def start(timeout=5, backlog_reassign_delay=None):
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline(timeout=timeout,
backlog_reassign_delay=backlog_reassign_delay)
pipeline.start()
return pipeline