"""This module takes care of all the logic related to block creation.
The logic is encapsulated in the ``BlockPipeline`` class, while the sequence
of actions to do on transactions is specified in the ``create_pipeline``
function.
"""
import logging
import rethinkdb as r
from multipipes import Pipeline, Node
from bigchaindb.models import Transaction
from bigchaindb.pipelines.utils import ChangeFeed
from bigchaindb import Bigchain
logger = logging.getLogger(__name__)
[docs]class BlockPipeline:
"""This class encapsulates the logic to create blocks.
Note:
Methods of this class will be executed in different processes.
"""
def __init__(self):
"""Initialize the BlockPipeline creator"""
self.bigchain = Bigchain()
self.txs = []
[docs] def filter_tx(self, tx):
"""Filter a transaction.
Args:
tx (dict): the transaction to process.
Returns:
dict: The transaction if assigned to the current node,
``None`` otherwise.
"""
if tx['assignee'] == self.bigchain.me:
tx.pop('assignee')
tx.pop('assignment_timestamp')
return tx
[docs] def validate_tx(self, tx):
"""Validate a transaction.
Also checks if the transaction already exists in the blockchain. If it
does, or it's invalid, it's deleted from the backlog immediately.
Args:
tx (dict): the transaction to validate.
Returns:
:class:`~bigchaindb.models.Transaction`: The transaction if valid,
``None`` otherwise.
"""
tx = Transaction.from_dict(tx)
if self.bigchain.transaction_exists(tx.id):
# if the transaction already exists, we must check whether
# it's in a valid or undecided block
tx, status = self.bigchain.get_transaction(tx.id,
include_status=True)
if status == self.bigchain.TX_VALID \
or status == self.bigchain.TX_UNDECIDED:
# if the tx is already in a valid or undecided block,
# then it no longer should be in the backlog, or added
# to a new block. We can delete and drop it.
self.bigchain.delete_transaction(tx.id)
return None
tx_validated = self.bigchain.is_valid_transaction(tx)
if tx_validated:
return tx
else:
# if the transaction is not valid, remove it from the
# backlog
self.bigchain.delete_transaction(tx.id)
return None
[docs] def create(self, tx, timeout=False):
"""Create a block.
This method accumulates transactions to put in a block and outputs
a block when one of the following conditions is true:
- the size limit of the block has been reached, or
- a timeout happened.
Args:
tx (:class:`~bigchaindb.models.Transaction`): the transaction
to validate, might be None if a timeout happens.
timeout (bool): ``True`` if a timeout happened
(Default: ``False``).
Returns:
:class:`~bigchaindb.models.Block`: The block,
if a block is ready, or ``None``.
"""
if tx:
self.txs.append(tx)
if len(self.txs) == 1000 or (timeout and self.txs):
block = self.bigchain.create_block(self.txs)
self.txs = []
return block
[docs] def write(self, block):
"""Write the block to the Database.
Args:
block (:class:`~bigchaindb.models.Block`): the block of
transactions to write to the database.
Returns:
:class:`~bigchaindb.models.Block`: The Block.
"""
logger.info('Write new block %s with %s transactions', block.id, len(block.transactions))
self.bigchain.write_block(block)
return block
[docs] def delete_tx(self, block):
"""Delete transactions.
Args:
block (:class:`~bigchaindb.models.Block`): the block
containg the transactions to delete.
Returns:
:class:`~bigchaindb.models.Block`: The block.
"""
self.bigchain.delete_transaction(*[tx.id for tx in block.transactions])
return block
[docs]def initial():
"""Return old transactions from the backlog."""
bigchain = Bigchain()
return bigchain.connection.run(
r.table('backlog')
.between([bigchain.me, r.minval],
[bigchain.me, r.maxval],
index='assignee__transaction_timestamp')
.order_by(index=r.asc('assignee__transaction_timestamp')))
[docs]def get_changefeed():
"""Create and return the changefeed for the backlog."""
return ChangeFeed('backlog', ChangeFeed.INSERT | ChangeFeed.UPDATE,
prefeed=initial())
[docs]def create_pipeline():
"""Create and return the pipeline of operations to be distributed
on different processes."""
block_pipeline = BlockPipeline()
pipeline = Pipeline([
Node(block_pipeline.filter_tx),
Node(block_pipeline.validate_tx, fraction_of_cores=1),
Node(block_pipeline.create, timeout=1),
Node(block_pipeline.write),
Node(block_pipeline.delete_tx),
])
return pipeline
[docs]def start():
"""Create, start, and return the block pipeline."""
pipeline = create_pipeline()
pipeline.setup(indata=get_changefeed())
pipeline.start()
return pipeline