"""Clients."""
from __future__ import annotations
import warnings
from abc import ABC
from logging import getLogger
from typing import Annotated, Any, Awaitable, Callable, NoReturn, Optional, Sequence
import httpx
from annotated_types import Ge, Le
from pydantic import HttpUrl
from dataservice.config import PlaywrightConfig
from dataservice.exceptions import (
DataServiceException,
NonRetryableException,
RetryableException,
TimeoutException,
)
from dataservice.models import (
CallbackType,
InterceptRequest,
InterceptResponse,
Request,
Response,
)
try:
from playwright.async_api import (
Browser,
BrowserContext,
Playwright,
async_playwright,
)
from playwright.async_api import Page as PlaywrightPage
from playwright.async_api import Request as PlaywrightRequest
from playwright.async_api import Response as PlaywrightResponse
from playwright.async_api import TimeoutError as PlaywrightTimeoutError
PLAYWRIGHT_AVAILABLE = True
except ImportError:
Browser = None
BrowserContext = None
Playwright = None
PlaywrightPage = None
PlaywrightRequest = None
PlaywrightResponse = None
async_playwright = None
PLAYWRIGHT_AVAILABLE = False
logger = getLogger(__name__)
[docs]
class BaseClient(ABC):
"""Base client class."""
async def __call__(
self, *args, **kwargs
) -> Response | Sequence[Response] | NoReturn:
"""Make a request using the client."""
return await self.make_request(*args, **kwargs)
[docs]
async def make_request(
self, request: Request
) -> Response | Sequence[Response] | NoReturn:
raise NotImplementedError
@staticmethod
def _raise_for_status(status_code: int, status_text: str):
"""Raise an exception if the response status code is not 2xx.
:param status_code: The status code of the response.
:param status_text: The status text of the response.
:raises RetryableException: If the status code is 5xx.
:raises DataServiceException: If the status code is not 2xx or 5xx.
"""
if status_code == 200:
return
elif 500 <= status_code < 600 or status_code in [429, 403]:
raise RetryableException(status_text, status_code=status_code)
else:
raise NonRetryableException(status_text, status_code=status_code)
[docs]
class HttpXClient(BaseClient):
"""Client that uses HTTPX library to make requests."""
def __init__(self):
self.async_client = httpx.AsyncClient
[docs]
async def make_request(self, request: Request) -> Response | NoReturn:
"""Make a request using HTTPX.
:param request: The request object containing the details of the HTTP request.
:return: A Response object containing the response data.
"""
try:
logger.info(f"Requesting {request.url}")
return await self._get_response(request)
except httpx.HTTPStatusError as e:
logger.debug(f"HTTP Status Error making request: {e}")
status_code: Annotated[int, Ge(400), Le(600)] = e.response.status_code
self._raise_for_status(status_code, e.response.reason_phrase)
except httpx.TimeoutException as e:
msg = f"Timeout making request: {e}, {e.__class__.__name__}"
logger.debug(msg)
raise TimeoutException(msg)
except httpx.HTTPError as e:
msg = f"HTTP Error making request: {e}, {e.__class__.__name__}"
logger.debug(msg)
raise DataServiceException(msg)
assert False, "Should not reach this point"
async def _get_response(self, request) -> Response:
"""Get the response from the request.
:param request: The request object containing the details of the HTTP request.
:return: A Response object containing the response data.
"""
async with self.async_client(
headers=request.headers,
proxy=request.proxy.url if request.proxy else None,
timeout=request.timeout,
follow_redirects=True,
) as client:
match request.method:
case "GET":
response = await client.get(request.url, params=request.params)
case "POST":
response = await client.post(
request.url,
params=request.params,
data=request.form_data,
json=request.json_data,
)
response.raise_for_status()
match request.content_type:
case "text":
data = None
case "json":
data = response.json()
msg = f"Received response for {request.url}"
if request.params:
msg += f" - params {request.params}"
if request.form_data:
msg += f" - form data {request.form_data}"
if request.json_data:
msg += f" - json data {request.json_data}"
logger.debug(msg)
return Response(
request=request,
text=response.text,
data=data,
url=HttpUrl(str(response.url)),
headers=dict(response.headers),
)
[docs]
class PlaywrightClient(BaseClient):
"""Client that uses Playwright library to make requests."""
def __init__(
self,
*,
actions: Optional[Callable[[PlaywrightPage], Awaitable[None]]] = None,
intercept_url: Optional[str] = None,
config: PlaywrightConfig = PlaywrightConfig(),
):
"""Initialize the PlaywrightClient.
:param actions: Optional coroutine with actions to perform on the page before returning the response.
:param intercept_url: Optional URL to intercept and get data from.
:param config: PlaywrightConfig object.
"""
if not PLAYWRIGHT_AVAILABLE:
raise ImportError(
"Playwright optional dependency is not installed. Please install it with `pip install python-dataservice[playwright]`."
)
self.actions = actions
self.intercept_url = intercept_url
self.config = config
self._intercepted_requests: list[PlaywrightRequest] = []
if self.intercept_url:
warnings.warn(
"Please consider using PlaywrightInterceptClient to intercept requests. "
"In future releases, PlaywrightClient will not support intercepting requests."
)
def _get_context_kwargs(
self, request: Request, config: PlaywrightConfig
) -> dict[str, Any]:
"""Get the context kwargs for the Playwright client.
:param request: The request object containing the details of the HTTP request.
:param config: The Playwright configuration object.
:return: A dictionary containing the context kwargs.
"""
context_kwargs = {}
if request.proxy:
context_kwargs["proxy"] = {"server": request.proxy.url}
if request.headers:
context_kwargs["extra_http_headers"] = request.headers
if config is not None and config.device:
context_kwargs.update(config.device)
return context_kwargs
async def _set_up(
self, request: Request, config: PlaywrightConfig
) -> tuple[Browser, BrowserContext, PlaywrightPage, Playwright]:
"""Set up the Playwright client.
:param request: The request object containing the details of the HTTP request.
:param config: The Playwright configuration object.
"""
playwright = await async_playwright().start()
browser = await getattr(playwright, config.browser).launch(
headless=config.headless
)
context_kwargs = self._get_context_kwargs(request, config)
context = await browser.new_context(**context_kwargs)
page = await context.new_page()
return browser, context, page, playwright
async def _clean_up(self, browser, context, page, playwright):
"""Close the Playwright resources.
:param browser: The Playwright browser object.
:param context: The Playwright context object.
:param page: The Playwright page object.
:param playwright: The Playwright object.
"""
await page.close()
await context.close()
await browser.close()
await playwright.stop()
def _intercept_requests(self, request: PlaywrightRequest):
"""Intercept requests and store the data.
:param request: The request object to intercept.
"""
seen = set()
if self.intercept_url in request.url and request.url not in seen:
logger.debug(f"Intercepted request: {request.url}")
seen.add(request.url)
if self._intercepted_requests:
self._intercepted_requests.append(request)
else:
self._intercepted_requests = [request]
async def _get_intercepted_requests(self) -> dict[str, dict[str, Any]]:
"""Get the responses from the intercepted requests.
:return: A dictionary containing the responses from the intercepted requests.
"""
responses = {}
if self._intercepted_requests:
for request in self._intercepted_requests:
if request.url not in responses:
response = await request.response()
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
responses[request.url] = await response.json()
else:
responses[request.url] = await response.text()
return responses
[docs]
async def make_request(self, request: Request) -> Response:
"""Make a request using Playwright without assigning instance variables.
:param request: The request object containing the details of the HTTP request.
"""
browser, context, page, playwright = await self._set_up(request, self.config)
if self.intercept_url is not None:
page.on("request", lambda pw_request: self._intercept_requests(pw_request))
try:
logger.debug(f"Requesting {request.url_encoded}")
# Playwright page.goto() timeout is in milliseconds
pw_response = await page.goto(request.url, timeout=request.timeout * 1000)
logger.debug(f"Received response for {request.url_encoded}")
self._raise_for_status(pw_response.status, pw_response.status_text)
if self.actions is not None:
await self.actions(page)
text = await page.content()
data = (
await self._get_intercepted_requests()
if self._intercepted_requests
else None
)
cookies = await context.cookies()
return Response(
request=request,
text=text,
data=data,
url=HttpUrl(pw_response.url),
status_code=pw_response.status,
cookies=cookies,
headers=pw_response.headers,
)
except PlaywrightTimeoutError as e:
logger.debug(f"Timeout making request: {e}")
raise TimeoutException(
f"Timeout making request: {e}, {e.__class__.__name__}"
)
except (RetryableException, NonRetryableException) as e:
raise e
except Exception as e:
raise DataServiceException(
f"Error making request: {e}, {e.__class__.__name__}"
)
finally:
# Close resources
await self._clean_up(browser, context, page, playwright)
[docs]
class PlaywrightInterceptClient(PlaywrightClient):
"""Client that uses Playwright library to make requests and intercept responses."""
def __init__(
self,
*,
intercept_url: str,
callback: CallbackType,
return_html: bool = True,
actions: Optional[Callable[[PlaywrightPage], Awaitable[None]]] = None,
config: PlaywrightConfig = PlaywrightConfig(),
):
"""Initialize the PlaywrightInterceptClient.
:param intercept_url: The URL to intercept and get data from.
:param callback: The callback function to process the intercepted response.
:param return_html: Whether to return the HTML content of the page.
:param actions: Optional coroutine with actions to perform on the page before returning the response.
:param config: PlaywrightConfig object.
"""
if not PLAYWRIGHT_AVAILABLE:
raise ImportError(
"Playwright optional dependency is not installed. Please install it with `pip install python-dataservice[playwright]`."
)
super().__init__(actions=actions, config=config)
self.intercept_url = intercept_url
self.callback = callback
self.return_html = return_html
self._intercepted_requests: list[PlaywrightRequest] = []
[docs]
async def make_request(self, request: Request) -> Sequence[Response]: # type: ignore
"""Make a request and intercept Fetch/XHR responses.
:param request: The request object containing the details of the HTTP request.
:return: A list of ResponseObjects.
:raises RequestException: If a non-retryable HTTP error occurs.
:raises RetryableRequestException: If a retryable HTTP error occurs.
"""
browser, context, page, playwright = await self._set_up(request, self.config)
page.on("request", lambda pw_request: self._intercept_requests(pw_request))
responses = []
try:
logger.debug(f"Requesting {request.url_encoded}")
# Playwright page.goto() timeout is in milliseconds
pw_response = await page.goto(request.url, timeout=request.timeout * 1000)
logger.debug(f"Received response for {request.url_encoded}")
self._raise_for_status(pw_response.status, pw_response.status_text)
if self.actions is not None:
await self.actions(page)
if self.return_html:
text = await page.content()
cookies = await context.cookies()
html_response = Response(
request=request,
text=text,
data=None,
url=HttpUrl(pw_response.url),
status_code=pw_response.status,
cookies=cookies,
headers=pw_response.headers,
)
responses.append(html_response)
for pw_request in self._intercepted_requests:
pw_response = await pw_request.response()
data, text = None, ""
pw_response_headers = pw_response.headers
content_type = pw_response_headers.get("content-type", "")
if "application/json" in content_type:
data = await pw_response.json()
else:
text = await pw_response.text()
request = InterceptRequest(
parent=request,
url=pw_request.url,
headers=pw_request.headers,
method=pw_request.method,
json_data=pw_request.post_data_json,
callback=self.callback,
)
responses.append(
InterceptResponse(
request=request,
text=text,
data=data,
url=HttpUrl(pw_response.url),
status_code=pw_response.status,
headers=pw_response.headers,
)
)
return responses
except PlaywrightTimeoutError as e:
logger.debug(f"Timeout making request: {e}")
raise TimeoutException(
f"Timeout making request: {e}, {e.__class__.__name__}"
)
except (RetryableException, NonRetryableException) as e:
raise e
except Exception as e:
raise DataServiceException(
f"Error making request: {e}, {e.__class__.__name__}"
)
finally:
# Close resources
await self._clean_up(browser, context, page, playwright)