from collections import namedtuple
from requests import Session
from .exceptions import HTTP_EXCEPTIONS, TransportError
HttpResponse = namedtuple('HttpResponse', ('status_code', 'headers', 'data'))
[docs]class Connection:
"""A Connection object to make HTTP requests."""
[docs] def __init__(self, *, node_url):
"""Initializes a :class:`~bigchaindb_driver.connection.Connection`
instance.
Args:
node_url (str): Url of the node to connect to.
"""
self.node_url = node_url
self.session = Session()
[docs] def request(self, method, *, path=None, json=None, **kwargs):
"""Performs an HTTP requests for the specified arguments.
Args:
method (str): HTTP method (e.g.: `'GET`'.
path (str): API endpoint path (e.g.: `'/transactions'`.
json (dict): JSON data to send along with the request.
kwargs: Optional keyword arguments.
"""
url = self.node_url + path if path else self.node_url
response = self.session.request(
method=method, url=url, json=json, **kwargs)
text = response.text
try:
json = response.json()
except ValueError:
json = None
if not (200 <= response.status_code < 300):
exc_cls = HTTP_EXCEPTIONS.get(response.status_code, TransportError)
raise exc_cls(response.status_code, text, json)
data = json if json else text
return HttpResponse(response.status_code, response.headers, data)