Source code for src.client

from .adapter import UrllibAdapter, RequestsAdapter
from .response import Response
from .request import Request

# Registry of the available adapters, keyed by adapter name.
# Only urllib and requests are supported for now; further adapters
# (httpx, aiohttp, etc.) can be registered here in the future.
adapters = {
    "urllib": UrllibAdapter,
    "requests": RequestsAdapter,
}


class HttpClient:
    """HTTP client for making requests with pluggable adapters and middleware.

    Provides a unified interface for HTTP requests with support for different
    adapters (urllib, requests) and middleware processing (retry, logging,
    timeout, and also rate limit).

    The client can be used as a context manager; :meth:`close` is called when
    the ``with`` block exits.
    """

    def __init__(
        self, adapter: str = "urllib", middleware: list = None, headers: dict = None, **config
    ) -> None:
        """Initialize HttpClient with adapter and middleware.

        Args:
            adapter (str, optional): Name of the adapter to use ('urllib' or
                'requests'). Defaults to 'urllib'.
            middleware (list, optional): Middleware callables to apply. Each one
                is called with the next handler and must return a handler.
                Defaults to None (no middleware).
            headers (dict, optional): Default headers sent with every request;
                per-request headers are merged on top of them. Defaults to None.
            **config: Additional configuration options passed to the adapter.

        Raises:
            ValueError: If the specified adapter is not found.
        """
        adapter_cls = adapters.get(adapter)
        if adapter_cls is None:
            raise ValueError(f"Adapter {adapter} not found")
        self.adapter = adapter_cls(**config)

        # Normalise to a list: the chain is iterated twice (timeout detection
        # below and reversed() in _build_chain), so a one-shot iterable such as
        # a generator must be materialised first. The empty fallback is a list,
        # matching the documented parameter type.
        self.middleware = list(middleware) if middleware else []

        # Detect whether a timeout middleware is present in the chain.
        # When it is present, request.timeout must not be forwarded to the
        # adapter, to prevent a double-timer conflict which can cause unexpected
        # behavior. When it is not present, request.timeout is forwarded to the
        # adapter and still works without requiring the middleware to be added.
        self.has_timeout_middleware = any(
            getattr(mdl, "_is_timeout_middleware", False) for mdl in self.middleware
        )

        # Avoids any None checks across per-request headers on the adapters or
        # middleware.
        self.default_headers = headers or {}

        # Build the chain of responsibility once; every request goes through it.
        self._handler = self._build_chain()

    def __enter__(self) -> "HttpClient":
        """Return the client itself so it can be bound by ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on leaving the ``with`` block.

        Returns None, so an exception raised inside the block is not suppressed.
        """
        self.close()

    def _build_chain(self):
        """Wrap the adapter call with every configured middleware.

        Returns:
            A callable that takes a request object and returns the result of
            running it through the middleware chain and the adapter.
        """

        def core(request):
            return self.adapter.request(
                request.method,
                request.url,
                request.headers,
                request.data,
                # Suppress request.timeout if a timeout middleware is present to
                # prevent a double-timer conflict: the middleware owns the
                # deadline in that case. In the future it could also break
                # `httpx`, which has its own timeout system.
                None if self.has_timeout_middleware else request.timeout,
            )

        handler = core
        # Wrap in reverse order so that the first middleware in the list ends up
        # outermost: it is called first and the last middleware is called last.
        # E.g. with [logging, retry], logging wraps retry, so the retries happen
        # inside the logging middleware.
        for wrap in reversed(self.middleware):
            handler = wrap(handler)
        return handler

    def request(self, method: str, url: str, **kwargs) -> Response:
        """Execute an HTTP request with the specified method.

        Args:
            method (str): HTTP method (GET, POST, PUT, DELETE, etc.)
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)
                forwarded to the ``Request`` constructor.

        Returns:
            Response: Response object containing status, headers, and body
        """
        # Per-request headers override default headers key by key; they do not
        # replace the defaults entirely.
        request_headers = kwargs.pop("headers", None) or {}
        merged_headers = {**self.default_headers, **request_headers}
        _request = Request(method=method, url=url, headers=merged_headers, **kwargs)
        return self._handler(_request)

    def get(self, url: str, **kwargs) -> Response:
        """Send a GET request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="GET", url=url, **kwargs)

    def post(self, url: str, **kwargs) -> Response:
        """Send a POST request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="POST", url=url, **kwargs)

    def put(self, url: str, **kwargs) -> Response:
        """Send a PUT request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="PUT", url=url, **kwargs)

    def delete(self, url: str, **kwargs) -> Response:
        """Send a DELETE request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="DELETE", url=url, **kwargs)

    def options(self, url: str, **kwargs) -> Response:
        """Send an OPTIONS request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="OPTIONS", url=url, **kwargs)

    def head(self, url: str, **kwargs) -> Response:
        """Send a HEAD request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, timeout, etc.)

        Returns:
            Response: Response object with no response body, headers preserved
        """
        response = self.request(method="HEAD", url=url, **kwargs)
        # HEAD requests should not return a response body per the HTTP/1.1 spec.
        # However, the headers (such as content-length) are preserved to allow
        # clients to determine the size of the response body without actually
        # fetching it.
        return Response(
            status_code=response.status_code,
            headers=response.headers,
            body=b"",
            elapsed=response.elapsed,
        )

    def patch(self, url: str, **kwargs) -> Response:
        """Send a PATCH request.

        Args:
            url (str): URL to request
            **kwargs: Additional request options (headers, data, timeout, etc.)

        Returns:
            Response: Response object
        """
        return self.request(method="PATCH", url=url, **kwargs)

    def close(self) -> None:
        """Close the client and clean up adapter resources.

        Closes the underlying adapter if the adapter supports closing; adapters
        without a ``close`` attribute are left untouched.
        """
        # Not every adapter exposes close(), so only call it when it exists.
        # The message is printed to stdout before the adapter is closed.
        if hasattr(self.adapter, "close"):
            print("Closing adapter and all connections")
            self.adapter.close()