Source code for bigchaindb_driver.connection

# Copyright BigchainDB GmbH and BigchainDB contributors
# SPDX-License-Identifier: (Apache-2.0 AND CC-BY-4.0)
# Code is Apache-2.0 and docs are CC-BY-4.0

import time

from collections import namedtuple
from datetime import datetime, timedelta

from requests import Session
from requests.exceptions import ConnectionError

from .exceptions import HTTP_EXCEPTIONS, TransportError


BACKOFF_DELAY = 0.5  # seconds

HttpResponse = namedtuple('HttpResponse', ('status_code', 'headers', 'data'))


[docs]class Connection: """A Connection object to make HTTP requests to a particular node."""
[docs] def __init__(self, *, node_url, headers=None): """Initializes a :class:`~bigchaindb_driver.connection.Connection` instance. Args: node_url (str): Url of the node to connect to. headers (dict): Optional headers to send with each request. """ self.node_url = node_url self.session = Session() if headers: self.session.headers.update(headers) self._retries = 0 self.backoff_time = None
[docs] def request(self, method, *, path=None, json=None, params=None, headers=None, timeout=None, backoff_cap=None, **kwargs): """Performs an HTTP request with the given parameters. Implements exponential backoff. If `ConnectionError` occurs, a timestamp equal to now + the default delay (`BACKOFF_DELAY`) is assigned to the object. The timestamp is in UTC. Next time the function is called, it either waits till the timestamp is passed or raises `TimeoutError`. If `ConnectionError` occurs two or more times in a row, the retry count is incremented and the new timestamp is calculated as now + the default delay multiplied by two to the power of the number of retries. If a request is successful, the backoff timestamp is removed, the retry count is back to zero. Args: method (str): HTTP method (e.g.: ``'GET'``). path (str): API endpoint path (e.g.: ``'/transactions'``). json (dict): JSON data to send along with the request. params (dict): Dictionary of URL (query) parameters. headers (dict): Optional headers to pass to the request. timeout (int): Optional timeout in seconds. backoff_cap (int): The maximal allowed backoff delay in seconds to be assigned to a node. kwargs: Optional keyword arguments. """ backoff_timedelta = self.get_backoff_timedelta() if timeout is not None and timeout < backoff_timedelta: raise TimeoutError if backoff_timedelta > 0: time.sleep(backoff_timedelta) connExc = None timeout = timeout if timeout is None else timeout - backoff_timedelta try: response = self._request( method=method, timeout=timeout, url=self.node_url + path if path else self.node_url, json=json, params=params, headers=headers, **kwargs, ) except ConnectionError as err: connExc = err raise err finally: self.update_backoff_time(success=connExc is None, backoff_cap=backoff_cap) return response
def get_backoff_timedelta(self): if self.backoff_time is None: return 0 return (self.backoff_time - datetime.utcnow()).total_seconds() def update_backoff_time(self, success, backoff_cap=None): if success: self._retries = 0 self.backoff_time = None else: utcnow = datetime.utcnow() backoff_delta = BACKOFF_DELAY * 2 ** self._retries if backoff_cap is not None: backoff_delta = min(backoff_delta, backoff_cap) self.backoff_time = utcnow + timedelta(seconds=backoff_delta) self._retries += 1 def _request(self, **kwargs): response = self.session.request(**kwargs) text = response.text try: json = response.json() except ValueError: json = None if not (200 <= response.status_code < 300): exc_cls = HTTP_EXCEPTIONS.get(response.status_code, TransportError) raise exc_cls(response.status_code, text, json, kwargs['url']) data = json if json is not None else text return HttpResponse(response.status_code, response.headers, data)