from .adapter import UrllibAdapter, RequestsAdapter
from .response import Response
from .request import Request
# Registry mapping an adapter name to the adapter class that implements it.
# Only urllib and requests are supported for now; further backends
# (httpx, aiohttp, ...) can be registered here in the future.
adapters = {
    "urllib": UrllibAdapter,
    "requests": RequestsAdapter,
}
class HttpClient:
    """HTTP client for making requests with pluggable adapters and middleware.

    Provides a unified interface for HTTP requests on top of an adapter
    selected by name from the module-level ``adapters`` registry, with an
    optional chain of middleware callables wrapped around the adapter call
    (e.g. retry, logging, timeout, rate limiting).
    """

    def __init__(
        self, adapter: str = "urllib", middleware: list = None, headers: dict = None, **config
    ) -> None:
        """Initialize HttpClient with adapter and middleware.

        Args:
            adapter (str, optional): Name of the adapter to use ('urllib' or 'requests').
                Defaults to 'urllib'.
            middleware (list, optional): Middleware callables to apply. Each one
                receives the next handler and returns a new handler. The first
                entry becomes the outermost wrapper. Defaults to None.
            headers (dict, optional): Default headers sent with every request.
                Per-request headers are merged on top of them. Defaults to None.
            **config: Additional configuration options passed to the adapter.

        Raises:
            ValueError: If the specified adapter is not found.
        """
        adapter_cls = adapters.get(adapter)
        if not adapter_cls:
            raise ValueError(f"Adapter {adapter} not found")
        self.adapter = adapter_cls(**config)

        # Snapshot the middleware into a list. The value is iterated twice
        # below (timeout detection, then chain construction), so it must be a
        # real, reversible sequence. The empty fallback is a list, not a dict,
        # to match the documented type.
        self.middleware = list(middleware) if middleware else []

        # Detect whether a timeout middleware is part of the chain.
        # When it is present, request.timeout must not be forwarded to the
        # adapter, to prevent a double-timer conflict which can cause
        # unexpected behavior. When it is absent, the timeout is forwarded to
        # the adapter and still works without requiring the middleware.
        self.has_timeout_middleware = any(
            getattr(mdl, "_is_timeout_middleware", False) for mdl in self.middleware
        )

        # Build the chain of responsibility once; every request reuses it.
        self._handler = self._build_chain()

        # Avoids None checks on the default headers when merging per request.
        self.default_headers = headers or {}

    def _build_chain(self):
        """Compose the adapter call and all middleware into a single handler.

        Returns:
            A callable that takes a request object and returns whatever the
            outermost middleware (or the adapter, if there is none) returns.
        """

        def core(request):
            # Innermost handler: hand the request fields to the adapter.
            return self.adapter.request(
                request.method,
                request.url,
                request.headers,
                request.data,
                # Suppress request.timeout if a timeout middleware is present,
                # to prevent a double-timer conflict: the middleware owns the
                # deadline in that case. In the future this could also matter
                # for adapters such as `httpx`, which has its own timeout system.
                None if self.has_timeout_middleware else request.timeout,
            )

        handler = core
        # Wrap in reverse order so that the first middleware in the list ends
        # up outermost and therefore runs first, e.g. [logging, retry] gives
        # logging(retry(core)), with the retries happening inside the logging.
        for middleware in reversed(self.middleware):
            handler = middleware(handler)
        return handler

    def request(self, method: str, url: str, **kwargs) -> "Response":
        """Execute an HTTP request with the specified method.

        Args:
            method (str): HTTP method (GET, POST, PUT, DELETE, etc.)
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)
                forwarded to the ``Request`` constructor.

        Returns:
            Response: Result produced by the middleware chain / adapter.
        """
        # Per-request headers override individual default headers; they do not
        # replace the defaults entirely. A missing or None value means "no extras".
        merged_headers = {**self.default_headers, **(kwargs.pop("headers", None) or {})}
        _request = Request(method=method, url=url, headers=merged_headers, **kwargs)
        return self._handler(_request)

    def get(self, url: str, **kwargs) -> "Response":
        """Send a GET request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs) -> "Response":
        """Send a POST request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs) -> "Response":
        """Send a PUT request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="PUT", url=url, **kwargs)

    def delete(self, url: str, **kwargs) -> "Response":
        """Send a DELETE request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs) -> "Response":
        """Send an OPTIONS request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="OPTIONS", url=url, **kwargs)

    def head(self, url: str, **kwargs) -> "Response":
        """Send a HEAD request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object with no response body, headers preserved
        """
        response = self.request(method="HEAD", url=url, **kwargs)
        # HEAD responses must not carry a body per the HTTP spec, so the body
        # is forced to be empty here. The headers (such as content-length) are
        # preserved so clients can learn the size of the resource without
        # actually fetching it.
        return Response(
            status_code=response.status_code,
            headers=response.headers,
            body=b"",
            elapsed=response.elapsed,
        )

    def patch(self, url: str, **kwargs) -> "Response":
        """Send a PATCH request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="PATCH", url=url, **kwargs)

    def close(self) -> None:
        """Close the client and clean up adapter resources.

        Calls the adapter's ``close`` method when the adapter defines one;
        adapters without a ``close`` attribute are left untouched.
        """
        # Close the adapter if it supports it, announcing it with a print statement.
        if hasattr(self.adapter, "close"):
            print("Closing adapter and all connections")
            self.adapter.close()