"""HTTP utility methods."""
import asyncio
from aiohttp import BaseConnector, ClientError, ClientResponse, ClientSession
from aiohttp.web import HTTPConflict
from ..core.error import BaseError
from .repeat import RepeatSequence
class FetchError(BaseError):
    """Signal that an HTTP fetch could not be completed."""
class PutError(BaseError):
    """Signal that an HTTP put could not be completed."""
async def fetch_stream(
    url: str,
    *,
    headers: dict = None,
    retry: bool = True,
    max_attempts: int = 5,
    interval: float = 1.0,
    backoff: float = 0.25,
    request_timeout: float = 10.0,
    connector: BaseConnector = None,
    session: ClientSession = None,
):
    """Fetch a response body stream from an HTTP server with retries and timeouts.

    Args:
        url: the address to fetch
        headers: an optional dict of headers to send
        retry: flag to retry the fetch; when false a single attempt is made
        max_attempts: the maximum number of attempts to make
        interval: the interval between retries, in seconds
        backoff: the backoff interval, in seconds
        request_timeout: the HTTP request timeout, in seconds
        connector: an optional existing BaseConnector, used only when a new
            session has to be created here
        session: a shared ClientSession; it is left open for the caller

    Returns:
        The `content` attribute of the first response with a 2xx status

    Raises:
        FetchError: if the attempt flagged as final fails with a ClientError
            (including a non-2xx status) or a timeout

    """
    limit = max_attempts if retry else 1

    # Only a session created here is ours to close. A caller-supplied session
    # is shared and must still be usable once this call returns.
    owns_session = session is None
    if owns_session:
        session = ClientSession(
            connector=connector, connector_owner=(not connector), trust_env=True
        )

    try:
        async for attempt in RepeatSequence(limit, interval, backoff):
            try:
                async with attempt.timeout(request_timeout):
                    response: ClientResponse = await session.get(url, headers=headers)
                    if not 200 <= response.status < 300:
                        raise ClientError(
                            f"Bad response from server: {response.status} - "
                            f"{response.reason}"
                        )
                    return response.content
            except (ClientError, asyncio.TimeoutError) as e:
                # Earlier failures are swallowed so the sequence can retry;
                # only the final one is surfaced to the caller.
                if attempt.final:
                    raise FetchError("Exceeded maximum fetch attempts") from e
    finally:
        if owns_session:
            # NOTE(review): the session is closed before the caller reads the
            # returned stream - confirm the stream remains readable when no
            # shared session or connector is supplied.
            await session.close()
async def fetch(
    url: str,
    *,
    headers: dict = None,
    retry: bool = True,
    max_attempts: int = 5,
    interval: float = 1.0,
    backoff: float = 0.25,
    request_timeout: float = 10.0,
    connector: BaseConnector = None,
    session: ClientSession = None,
    json: bool = False,
):
    """Fetch from an HTTP server with automatic retries and timeouts.

    Args:
        url: the address to fetch
        headers: an optional dict of headers to send
        retry: flag to retry the fetch; when false a single attempt is made
        max_attempts: the maximum number of attempts to make
        interval: the interval between retries, in seconds
        backoff: the backoff interval, in seconds
        request_timeout: the HTTP request timeout, in seconds
        connector: an optional existing BaseConnector, used only when a new
            session has to be created here
        session: a shared ClientSession; it is left open for the caller
        json: flag to parse the result as JSON

    Returns:
        The parsed JSON body when `json` is set, otherwise the body as text

    Raises:
        FetchError: if the attempt flagged as final fails with a ClientError
            (including a non-2xx status) or a timeout

    """
    limit = max_attempts if retry else 1

    # Only a session created here is ours to close. A caller-supplied session
    # is shared and must still be usable once this call returns.
    owns_session = session is None
    if owns_session:
        session = ClientSession(
            connector=connector, connector_owner=(not connector), trust_env=True
        )

    try:
        async for attempt in RepeatSequence(limit, interval, backoff):
            try:
                # The timeout covers both the request and reading the body.
                async with attempt.timeout(request_timeout):
                    response: ClientResponse = await session.get(url, headers=headers)
                    if not 200 <= response.status < 300:
                        raise ClientError(
                            f"Bad response from server: {response.status} - "
                            f"{response.reason}"
                        )
                    return await (response.json() if json else response.text())
            except (ClientError, asyncio.TimeoutError) as e:
                # Earlier failures are swallowed so the sequence can retry;
                # only the final one is surfaced to the caller.
                if attempt.final:
                    raise FetchError("Exceeded maximum fetch attempts") from e
    finally:
        if owns_session:
            await session.close()
async def put_file(
    url: str,
    file_data: dict,
    extra_data: dict,
    *,
    retry: bool = True,
    max_attempts: int = 5,
    interval: float = 1.0,
    backoff: float = 0.25,
    request_timeout: float = 10.0,
    connector: BaseConnector = None,
    session: ClientSession = None,
    json: bool = False,
):
    """Put a file to an HTTP server with automatic retries and timeouts.

    Args:
        url: the address to use
        file_data: dict with data key and path of file to upload; only the
            first item is used
        extra_data: further content to include in data to put
        retry: flag to retry the put; when false a single attempt is made
        max_attempts: the maximum number of attempts to make
        interval: the interval between retries, in seconds
        backoff: the backoff interval, in seconds
        request_timeout: the HTTP request timeout, in seconds
        connector: an optional existing BaseConnector, used only when a new
            session has to be created here
        session: a shared ClientSession; it is left open for the caller
        json: flag to parse the result as JSON

    Returns:
        The parsed JSON body when `json` is set, otherwise the body as text

    Raises:
        PutError: if the attempt flagged as final fails with a ClientError
            (including a rejected status) or a timeout

    """
    (data_key, file_path) = list(file_data.items())[0]
    # Shallow copy so the caller's dict never receives the file handle.
    data = {**extra_data}
    limit = max_attempts if retry else 1

    # Only a session created here is ours to close. A caller-supplied session
    # is shared and must still be usable once this call returns.
    owns_session = session is None
    if owns_session:
        session = ClientSession(
            connector=connector, connector_owner=(not connector), trust_env=True
        )

    try:
        async for attempt in RepeatSequence(limit, interval, backoff):
            try:
                async with attempt.timeout(request_timeout):
                    # Reopen the file on every attempt so each retry starts
                    # from a fresh handle.
                    with open(file_path, "rb") as f:
                        data[data_key] = f
                        response: ClientResponse = await session.put(url, data=data)
                        # A Conflict status is accepted alongside 2xx.
                        if (
                            not 200 <= response.status < 300
                            and response.status != HTTPConflict.status_code
                        ):
                            raise ClientError(
                                f"Bad response from server: {response.status}, "
                                f"{response.reason}"
                            )
                        return await (response.json() if json else response.text())
            except (ClientError, asyncio.TimeoutError) as e:
                # Earlier failures are swallowed so the sequence can retry;
                # only the final one is surfaced to the caller.
                if attempt.final:
                    raise PutError("Exceeded maximum put attempts") from e
    finally:
        if owns_session:
            await session.close()