dataservice package

Submodules

dataservice.clients module

Clients.

class dataservice.clients.BaseClient[source]

Bases: ABC

Base client class.

async make_request(request)[source]
Return type:

Union[Response, Sequence[Response], NoReturn]

class dataservice.clients.HttpXClient[source]

Bases: BaseClient

Client that uses HTTPX library to make requests.

async make_request(request)[source]

Make a request using HTTPX.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Union[Response, NoReturn]

Returns:

A Response object containing the response data.

class dataservice.clients.PlaywrightClient(*, actions=None, intercept_url=None, config=PlaywrightConfig(browser='chromium', headless=True, slow_mo=0, device=None))[source]

Bases: BaseClient

Client that uses Playwright library to make requests.

Initialize the PlaywrightClient.

Parameters:
  • actions (Optional[Callable[[Page], Awaitable[None]]]) – Optional coroutine with actions to perform on the page before returning the response.

  • intercept_url (Optional[str]) – Optional URL to intercept and get data from.

  • config (PlaywrightConfig) – PlaywrightConfig object.

async make_request(request)[source]

Make a request using Playwright without assigning instance variables.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Response
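The actions parameter takes a coroutine that receives the Playwright page before the response is returned. A minimal sketch of such a coroutine follows. The selector "#load-more" and the 500 ms pause are hypothetical, and the page is typed as Any so the snippet does not need Playwright installed.

```python
from typing import Any


async def click_load_more(page: Any) -> None:
    """Hypothetical action: click a 'load more' button, then pause briefly."""
    # click() and wait_for_timeout() are methods of Playwright's async Page API.
    await page.click("#load-more")  # hypothetical selector
    await page.wait_for_timeout(500)  # pause in milliseconds
```

Given the signature above, it would presumably be passed as PlaywrightClient(actions=click_load_more).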

class dataservice.clients.PlaywrightInterceptClient(*, intercept_url, callback, return_html=True, actions=None, config=PlaywrightConfig(browser='chromium', headless=True, slow_mo=0, device=None))[source]

Bases: PlaywrightClient

Client that uses Playwright library to make requests and intercept responses.

Initialize the PlaywrightInterceptClient.

Parameters:
  • intercept_url (str) – The URL to intercept and get data from.

  • callback (Callable[[Response], Union[Iterator[Union[Request, dict[Any, Any], BaseModel]], Request, dict[Any, Any], BaseModel]]) – The callback function to process the intercepted response.

  • return_html (bool) – Whether to return the HTML content of the page.

  • actions (Optional[Callable[[Page], Awaitable[None]]]) – Optional coroutine with actions to perform on the page before returning the response.

  • config (PlaywrightConfig) – PlaywrightConfig object.

async make_request(request)[source]

Make a request and intercept Fetch/XHR responses.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Sequence[Response]

Returns:

A list of Response objects.

Raises:
  • RequestException – If a non-retryable HTTP error occurs.

  • RetryableRequestException – If a retryable HTTP error occurs.

dataservice.config module

Config.

pydantic model dataservice.config.CacheConfig[source]

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • validate » all fields

field cache_type: Literal['json', 'pickle', 'remote'] = 'json'

The type of cache to use.

Validated by:
  • validate

field load_state: Optional[Callable[[], Awaitable[Any]]] = None

A function to load the cache state. Only used for remote cache.

Validated by:
  • validate

field path: FilePath | NewPath = 'cache.json'

The path of the file to use for the cache. Defaults to 'cache.json'. Unused for remote cache.

Validated by:
  • validate

field save_state: Optional[Callable[[dict], Awaitable[None]]] = None

A function to save the cache state. Only used for remote cache.

Validated by:
  • validate

field use: bool = False

Whether to cache requests.

Validated by:
  • validate

field write_interval: PositiveInt = 1200

The interval, in seconds, at which the cache is written. Defaults to 1200 (20 minutes).

Constraints:
  • ge = 0

Validated by:
  • validate

field write_periodically: bool = True

Whether to write the cache to disk periodically. Defaults to True.

Validated by:
  • validate

validator validate » all fields[source]
Return type:

CacheConfig
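
For the remote cache type, save_state and load_state are async callables with the signatures listed above. A minimal sketch of a matching pair follows. The in-memory _REMOTE dict is a hypothetical stand-in for a real remote store, and how the library calls these functions is not covered by this reference.

```python
import copy
from typing import Any

# Hypothetical stand-in for a remote store such as a bucket or key-value service.
_REMOTE: dict = {}


async def save_state(state: dict) -> None:
    """Matches Callable[[dict], Awaitable[None]]: persist a copy of the cache state."""
    _REMOTE["cache"] = copy.deepcopy(state)


async def load_state() -> Any:
    """Matches Callable[[], Awaitable[Any]]: return the saved state, or an empty dict."""
    return copy.deepcopy(_REMOTE.get("cache", {}))
```

Going by the field names above, these would presumably be supplied as CacheConfig(use=True, cache_type='remote', save_state=save_state, load_state=load_state).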

pydantic model dataservice.config.DelayConfig[source]

Bases: BaseModel

Delay configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field amount: Milliseconds = 0

The total amount of delay in milliseconds.

field type: Literal['constant', 'random'] = 'random'

The type of delay. Either constant or random. Defaults to random.

get()[source]
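
The two delay types can be illustrated with a small stand-alone function. This is a sketch rather than the library's get() implementation: it assumes that a constant delay returns the full amount and that a random delay is drawn uniformly between 0 and the amount. The helper name delay_ms is hypothetical.

```python
import random


def delay_ms(amount: int, kind: str = "random") -> float:
    """Return a delay in milliseconds for the given delay type (assumed semantics)."""
    if kind == "constant":
        return float(amount)
    if kind == "random":
        # Assumption: uniform between 0 and `amount`.
        return random.uniform(0, amount)
    raise ValueError(f"unknown delay type: {kind!r}")
```
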
pydantic model dataservice.config.PlaywrightConfig[source]

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field browser: Literal['chromium', 'firefox', 'webkit'] = 'chromium'

The browser to use.

field device: Optional[dict[str, Any]] = None

The devices to use.

field headless: bool = True

Whether to run in headless mode.

field slow_mo: PositiveInt = 0

The slow motion delay in milliseconds.

Constraints:
  • ge = 0

pydantic model dataservice.config.ProxyConfig[source]

Bases: BaseModel

Proxy configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field host: str [Required]

The proxy host.

field password: Optional[str] = None

The proxy password.

field port: int [Required]

The proxy port.

field username: Optional[str] = None

The proxy username.

classmethod from_url(url)[source]
Return type:

ProxyConfig

property url: str
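
The from_url classmethod and the url property suggest a round trip between a proxy URL and the four fields above. The sketch below shows one way that could work using only the standard library. ProxySketch is a hypothetical stand-in, and the http:// scheme used when rebuilding the URL is an assumption.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import urlsplit


@dataclass
class ProxySketch:
    """Hypothetical stand-in with the same four fields as ProxyConfig."""

    host: str
    port: int
    username: Optional[str] = None
    password: Optional[str] = None

    @classmethod
    def from_url(cls, url: str) -> "ProxySketch":
        """Split a proxy URL into host, port, and optional credentials."""
        parts = urlsplit(url)
        if parts.hostname is None or parts.port is None:
            raise ValueError("proxy URL must include a host and a port")
        return cls(parts.hostname, parts.port, parts.username, parts.password)

    @property
    def url(self) -> str:
        # Assumption: http scheme; credentials are included only when both are set.
        if self.username and self.password:
            return f"http://{self.username}:{self.password}@{self.host}:{self.port}"
        return f"http://{self.host}:{self.port}"
```
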
pydantic model dataservice.config.RateLimiterConfig[source]

Bases: BaseModel

Rate limiter configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field max_rate: PositiveInt = 10
Constraints:
  • ge = 0

field time_period: Seconds = 60
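
The two fields read as "at most max_rate requests per time_period seconds". The sketch below illustrates that idea with a sliding-window counter. It is a swapped-in technique for illustration, not the limiter the library uses, and the class name and injectable clock are hypothetical.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Allow at most max_rate acquisitions in any time_period seconds (sketch)."""

    def __init__(
        self,
        max_rate: int = 10,
        time_period: float = 60,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_rate = max_rate
        self.time_period = time_period
        self._clock = clock
        self._stamps: Deque[float] = deque()

    def try_acquire(self) -> bool:
        """Return True and record the call if it fits in the current window."""
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._stamps and now - self._stamps[0] >= self.time_period:
            self._stamps.popleft()
        if len(self._stamps) < self.max_rate:
            self._stamps.append(now)
            return True
        return False
```
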
pydantic model dataservice.config.RetryConfig[source]

Bases: BaseModel

Retry configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field max_attempts: PositiveInt = 3
Constraints:
  • ge = 0

field wait_exp_max: PositiveInt = 10
Constraints:
  • ge = 0

field wait_exp_min: PositiveInt = 4
Constraints:
  • ge = 0

field wait_exp_mul: PositiveInt = 1
Constraints:
  • ge = 0
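
The field names resemble the parameters of an exponential backoff: a multiplier, a minimum wait, and a maximum wait. Assuming that reading, which this reference does not confirm, the wait before each retry could be computed as in the sketch below. The function name and the base of 2 are assumptions.

```python
def exponential_wait(
    attempt: int,
    wait_exp_mul: float = 1,
    wait_exp_min: float = 4,
    wait_exp_max: float = 10,
) -> float:
    """Wait before retry number `attempt` (1-based), clamped to [min, max]."""
    raw = wait_exp_mul * 2 ** (attempt - 1)
    return max(wait_exp_min, min(raw, wait_exp_max))
```

With the defaults listed above, the first three attempts are held at the minimum of 4 and later attempts grow until they reach the maximum of 10.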

pydantic model dataservice.config.ServiceConfig[source]

Bases: BaseModel

Global configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field cache: CacheConfig [Optional]

The cache configuration.

field deduplication: bool = True

Whether to deduplicate requests.

field delay: DelayConfig [Optional]

The delay configuration.

field limiter: RateLimiterConfig | None = None

The rate limiter configuration.

field max_concurrency: PositiveInt = 10

The maximum number of concurrent requests.

Constraints:
  • ge = 0

field retry: RetryConfig [Optional]

The retry configuration.
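
A cap such as max_concurrency is commonly enforced with a semaphore. The sketch below shows that pattern with asyncio. It is an illustration under that assumption, not the library's worker code, and run_limited is a hypothetical helper.

```python
import asyncio


async def run_limited(n_tasks: int, max_concurrency: int = 10) -> int:
    """Run n_tasks dummy jobs and return the peak number that ran at once."""
    semaphore = asyncio.Semaphore(max_concurrency)
    running = 0
    peak = 0

    async def job() -> None:
        nonlocal running, peak
        async with semaphore:
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0)  # yield control, standing in for a request
            running -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak
```
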

dataservice.data module

Data Module.

pydantic model dataservice.data.BaseDataItem[source]

Bases: BaseModel

Base class for all data items.

Implements a model validator that wraps the data in a DataWrapper and returns the wrapped data with errors.

Example:

class MyDataItem(BaseDataItem):
    data: int
    data_callable: int

item = MyDataItem({"data": 1, "data_callable": lambda: 1 / 0})
print(item)
# MyDataItem data=1 data_callable=None
print(item.errors)
# {'data_callable': {'type': 'ZeroDivisionError', 'message': 'division by zero'}}

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • _run_callables » all fields

field errors: dict[Any, DataError] = {}
Validated by:
  • _run_callables

class dataservice.data.DataError[source]

Bases: TypedDict

Data error type.

message: str
type: str
class dataservice.data.DataSink[source]

Bases: ABC

Data sink protocol.

Base class used to define the interface for data sinks.

write(data)[source]

Write data to the sink.

Return type:

None
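
A concrete sink only needs to implement write(data). The sketch below mirrors that interface with a local abstract class so it runs without the library. SinkSketch and JsonLinesMemorySink are hypothetical names, and treating data as an iterable of JSON-serializable items is an assumption.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Iterable, List


class SinkSketch(ABC):
    """Hypothetical stand-in mirroring the documented interface: one write(data) method."""

    @abstractmethod
    def write(self, data: Iterable[Any]) -> None:
        ...


class JsonLinesMemorySink(SinkSketch):
    """Collects each item as one JSON line in memory (illustrative only)."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def write(self, data: Iterable[Any]) -> None:
        for item in data:
            self.lines.append(json.dumps(item, sort_keys=True))
```
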

class dataservice.data.DataWrapper(mapping=None, /, **kwargs)[source]

Bases: dict

Special type of dictionary that runs callables and stores exceptions. Values can be callables or any other type. Callables are evaluated when accessed. If an exception occurs, the exception is stored in the errors dictionary.

static maybe(value)[source]

When value is a callable, return (value(), None), or (None, exception) if an exception occurs. Return (value, None) if value is not a callable.

Example:

>>> DataWrapper.maybe(lambda: 1)
(1, None)
>>> DataWrapper.maybe(lambda: 1 / 0)
(None, ZeroDivisionError('division by zero'))

Parameters:

value (Any) – The value to be evaluated. It can be a callable or any other type.

Return type:

tuple[Any | None, None | Exception]

Returns:

A tuple containing the evaluated value or None, and an exception or None.
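
The behavior described for maybe, together with the error shape shown for DataError, can be sketched in plain Python as follows. This is a stand-alone illustration, not the library's code, and the evaluate helper is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def maybe(value: Any) -> Tuple[Optional[Any], Optional[Exception]]:
    """Evaluate value if it is callable; return (result, None) or (None, exception)."""
    if not callable(value):
        return value, None
    try:
        return value(), None
    except Exception as exc:
        return None, exc


def evaluate(mapping: Dict[Any, Any]) -> Tuple[dict, dict]:
    """Hypothetical helper: resolve every value and collect errors by key."""
    values: dict = {}
    errors: dict = {}
    for key, raw in mapping.items():
        result, exc = maybe(raw)
        values[key] = result
        if exc is not None:
            # Same two keys as the DataError TypedDict: type and message.
            errors[key] = {"type": type(exc).__name__, "message": str(exc)}
    return values, errors
```
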

dataservice.exceptions module

Exceptions module.

exception dataservice.exceptions.DataServiceException(message, status_code=None)[source]

Bases: Exception

Base class for all DataService exceptions.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

exception dataservice.exceptions.NonRetryableException(message, status_code=None)[source]

Bases: DataServiceException

Exception for non-retryable errors.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

exception dataservice.exceptions.ParsingException(message, status_code=None)[source]

Bases: DataServiceException

Exception raised when parsing fails.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

exception dataservice.exceptions.RetryableException(message, status_code=None)[source]

Bases: DataServiceException

Base class for all retryable exceptions.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

exception dataservice.exceptions.TimeoutException(message, status_code=None)[source]

Bases: DataServiceException

Exception raised when a request times out.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.
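
Splitting the hierarchy into retryable and non-retryable branches lets calling code decide whether to try again. The sketch below shows that pattern with local stand-in classes. ServiceError, Retryable, NonRetryable, and call_with_retries are all hypothetical names, and the library's own retry logic is not shown in this reference.

```python
from typing import Any, Callable, Optional


class ServiceError(Exception):
    """Hypothetical stand-in for DataServiceException."""

    def __init__(self, message: str, status_code: Optional[int] = None) -> None:
        super().__init__(message)
        self.message = message
        self.status_code = status_code


class Retryable(ServiceError):
    """Hypothetical stand-in for RetryableException."""


class NonRetryable(ServiceError):
    """Hypothetical stand-in for NonRetryableException."""


def call_with_retries(func: Callable[[], Any], max_attempts: int = 3) -> Any:
    """Call func, retrying only on Retryable; other errors propagate at once."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except Retryable:
            if attempt == max_attempts:
                raise  # attempts exhausted, surface the last error
```
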

dataservice.models module

Models for the data service.

class dataservice.models.FailedRequest[source]

Bases: TypedDict

Failed request model.

exception: str
message: str
request: Request
pydantic model dataservice.models.InterceptRequest[source]

Bases: Request

Intercept request model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • validate » all fields

field callback: CallbackType [Required]

The callback function to process the intercepted response.

Validated by:
  • validate

field client: ClientCallable = None

Overrides the base class field. Defaults to None.

Validated by:
  • validate

field parent: Request [Required]

The parent request object.

Validated by:
  • validate

pydantic model dataservice.models.InterceptResponse[source]

Bases: Response

Intercept response model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

model_post_init(context, /)

We need to both initialize private attributes and call the user-defined model_post_init method.

Return type:

None

pydantic model dataservice.models.Request[source]

Bases: BaseModel

Request model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • validate » all fields

field callback: CallbackType [Required]

The callback function to process the response.

Validated by:
  • validate

field client: ClientCallable [Required]

The client callable to use for the request.

Validated by:
  • validate

field content_type: Literal['text', 'json'] = 'text'

The content type of the request.

Validated by:
  • validate

field cookies: Optional[list[dict]] = None

The cookies of the request.

Validated by:
  • validate

field form_data: Optional[dict] = None

The form data of the request.

Validated by:
  • validate

field headers: Optional[dict] = None

The headers of the request.

Validated by:
  • validate

field json_data: Optional[dict] = None

The JSON data of the request.

Validated by:
  • validate

field method: Literal['GET', 'POST'] = 'GET'

The method of the request.

Validated by:
  • validate

field params: Optional[dict] = None

The parameters of the request.

Validated by:
  • validate

field proxy: Optional[ProxyConfig] = None

The proxy configuration for the request.

Validated by:
  • validate

field timeout: int = 30

The timeout of the request.

Constraints:
  • ge = 1

  • le = 300

Validated by:
  • validate

field url: Annotated[HttpUrl, AfterValidator(str), Field(description='The URL of the request.')] [Required]

The URL of the request.

Constraints:
  • max_length = 2083

  • allowed_schemes = ['http', 'https']

  • func = <class 'str'>

Validated by:
  • validate

ser_model()[source]
Return type:

dict[str, Any]

validator validate  »  all fields[source]
Return type:

Request

property callback_name: str
property client_name: str
property unique_key: str

Return a unique key for the request.

property url_encoded: Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'], host_required=None, default_host=None, default_port=None, default_path=None)]

Return the URL encoded.
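
A unique key per request is what makes deduplication possible: two requests with the same key are treated as the same request. The sketch below illustrates the idea with a hash over the method, URL, and parameters. The key recipe and both function names are hypothetical, and the library's actual unique_key may be built differently.

```python
import hashlib
import json
from typing import Any, Dict, Iterable, Iterator, Optional, Set, Tuple


def unique_key(method: str, url: str, params: Optional[Dict[str, Any]] = None) -> str:
    """Hypothetical key: a SHA-256 digest over method, URL, and sorted params."""
    payload = json.dumps([method.upper(), url, params or {}], sort_keys=True)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def deduplicate(requests: Iterable[Tuple]) -> Iterator[Tuple]:
    """Yield each (method, url, params) tuple only the first time its key is seen."""
    seen: Set[str] = set()
    for request in requests:
        key = unique_key(*request)
        if key not in seen:
            seen.add(key)
            yield request
```
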

pydantic model dataservice.models.Response[source]

Bases: BaseModel

Response model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field cookies: Optional[list[dict]] = None

The cookies of the response.

field data: dict | list[dict] | None = None

The data of the response.

field headers: Optional[dict] = None

The headers of the response.

field request: Request [Required]

The request that generated the response.

field status_code: int = 200

The status code of the response.

Constraints:
  • ge = 100

  • le = 599

field text: str = ''

The text of the response.

field url: Annotated[HttpUrl, AfterValidator(str), Field(description='The URL of the response.')] [Required]

The URL of the response.

Constraints:
  • max_length = 2083

  • allowed_schemes = ['http', 'https']

  • func = <class 'str'>

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self (BaseModel) – The BaseModel instance.

  • context (Any) – The context.

Return type:

None

property client: Callable[[Request], Awaitable[Response]]
property html: BeautifulSoup

Return the BeautifulSoup object of the response, if the initial request asked for text data.

dataservice.service module

Manages the overall data processing service, including initialization, iteration, and running the data worker.

class dataservice.service.AsyncDataService(requests, config=ServiceConfig(retry=RetryConfig(max_attempts=3, wait_exp_max=10, wait_exp_min=4, wait_exp_mul=1), deduplication=True, max_concurrency=10, limiter=None, cache=CacheConfig(use=False, cache_type='json', path='cache.json', write_interval=1200, write_periodically=True, save_state=None, load_state=None), delay=DelayConfig(amount=0, type='random')))[source]

Bases: BaseDataService

An asynchronous version of the data service. This class is an asynchronous iterator that can be used to fetch data items asynchronously.

Example:
import asyncio

from dataservice import AsyncDataService, HttpXClient, Request, Response

def parse_books_page(response: Response):
    # Count the book entries on the page.
    articles = response.html.find_all("article", {"class": "product_pod"})
    return {
        "url": response.request.url,
        "title": response.html.title.get_text(strip=True),
        "articles": len(articles)
    }

async def main():
    start_requests = [Request(url="https://books.toscrape.com/index.html", callback=parse_books_page, client=HttpXClient())]
    service = AsyncDataService(start_requests)
    # The service is an asynchronous iterator over the fetched data items.
    async for data_item in service:
        print(data_item)

asyncio.run(main())

class dataservice.service.BaseDataService(requests, config=ServiceConfig(retry=RetryConfig(max_attempts=3, wait_exp_max=10, wait_exp_min=4, wait_exp_mul=1), deduplication=True, max_concurrency=10, limiter=None, cache=CacheConfig(use=False, cache_type='json', path='cache.json', write_interval=1200, write_periodically=True, save_state=None, load_state=None), delay=DelayConfig(amount=0, type='random')))[source]

Bases: ABC

A base class for the data service.

cleanup_signal_handlers()[source]

Remove signal handlers.

property data_worker: DataWorker

Lazily initialize the data worker.

get_failures()[source]

Returns a dict of failed requests.

Return type:

dict[str, FailedRequest]

register_signal_handlers()[source]

Register signal handlers for SIGINT and SIGTERM.

write(filepath, results)[source]

Writes the results to a file.

Parameters:
  • results (Iterable[dict | BaseModel]) – An iterable of data items to write.

  • filepath (Path) – The path to the output file.

Return type:

None

class dataservice.service.DataService(requests, config=ServiceConfig(retry=RetryConfig(max_attempts=3, wait_exp_max=10, wait_exp_min=4, wait_exp_mul=1), deduplication=True, max_concurrency=10, limiter=None, cache=CacheConfig(use=False, cache_type='json', path='cache.json', write_interval=1200, write_periodically=True, save_state=None, load_state=None), delay=DelayConfig(amount=0, type='random')))[source]

Bases: BaseDataService

A service class to handle data requests and processing. This is the synchronous version of the data service. It will run the data worker in the main thread and block until all data items are fetched.

Example:
from dataservice import DataService, HttpXClient, Request, Response

def parse_books_page(response: Response):
    articles = response.html.find_all("article", {"class": "product_pod"})
    return {
        "url": response.request.url,
        "title": response.html.title.get_text(strip=True),
        "articles": len(articles)
    }

start_requests = [Request(url="https://books.toscrape.com/index.html", callback=parse_books_page, client=HttpXClient())]
service = DataService(start_requests)
for data_item in service:
    print(data_item)

dataservice.worker module

Handles the actual data processing tasks, including managing queues, handling requests, and processing data items.

class dataservice.worker.DataWorker(requests, *, config, cache=<contextlib.nullcontext object>)[source]

Bases: object

A worker class to handle asynchronous data processing.

Initializes the DataWorker with the given parameters.

Parameters:
  • requests (Iterable[Request]) – An iterable of requests to process.

  • config (ServiceConfig) – The configuration for the service.

async fetch()[source]

Fetches data items by processing the work queue.

Return type:

None

get_data_item()[source]

Retrieve a data item from the data queue.

Return type:

dict[Any, Any] | BaseModel

Returns:

The data item.

get_failures()[source]

Return a dictionary of failed requests.

Return type:

dict[str, FailedRequest]

Returns:

A dictionary of failed requests.

has_jobs()[source]

Check if there are jobs in the work queue.

Return type:

bool

Returns:

True if there are jobs in the work queue, False otherwise.

has_no_more_data()[source]

Check if there are no more data items in the data queue.

Return type:

bool

Returns:

True if there are no more data items, False otherwise.

property has_started: bool

Check if the worker has started.

Returns:

True if the worker has started, False otherwise.

Module contents

class dataservice.AsyncDataService(requests, config=ServiceConfig(retry=RetryConfig(max_attempts=3, wait_exp_max=10, wait_exp_min=4, wait_exp_mul=1), deduplication=True, max_concurrency=10, limiter=None, cache=CacheConfig(use=False, cache_type='json', path='cache.json', write_interval=1200, write_periodically=True, save_state=None, load_state=None), delay=DelayConfig(amount=0, type='random')))[source]

Bases: BaseDataService

An asynchronous version of the data service. This class is an asynchronous iterator that can be used to fetch data items asynchronously.

Example:
import asyncio

from dataservice import AsyncDataService, HttpXClient, Request, Response

def parse_books_page(response: Response):
    # Count the book entries on the page.
    articles = response.html.find_all("article", {"class": "product_pod"})
    return {
        "url": response.request.url,
        "title": response.html.title.get_text(strip=True),
        "articles": len(articles)
    }

async def main():
    start_requests = [Request(url="https://books.toscrape.com/index.html", callback=parse_books_page, client=HttpXClient())]
    service = AsyncDataService(start_requests)
    # The service is an asynchronous iterator over the fetched data items.
    async for data_item in service:
        print(data_item)

asyncio.run(main())

pydantic model dataservice.BaseDataItem[source]

Bases: BaseModel

Base class for all data items.

Implements a model validator that wraps the data in a DataWrapper and returns the wrapped data with errors.

Example:

class MyDataItem(BaseDataItem):
    data: int
    data_callable: int

item = MyDataItem({"data": 1, "data_callable": lambda: 1 / 0})
print(item)
# MyDataItem data=1 data_callable=None
print(item.errors)
# {'data_callable': {'type': 'ZeroDivisionError', 'message': 'division by zero'}}

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • _run_callables » all fields

field errors: dict[Any, DataError] = {}
Validated by:
  • _run_callables

pydantic model dataservice.CacheConfig[source]

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • validate » all fields

field cache_type: Literal['json', 'pickle', 'remote'] = 'json'

The type of cache to use.

Validated by:
  • validate

field load_state: Optional[Callable[[], Awaitable[Any]]] = None

A function to load the cache state. Only used for remote cache.

Validated by:
  • validate

field path: FilePath | NewPath = 'cache.json'

The path of the file to use for the cache. Defaults to 'cache.json'. Unused for remote cache.

Validated by:
  • validate

field save_state: Optional[Callable[[dict], Awaitable[None]]] = None

A function to save the cache state. Only used for remote cache.

Validated by:
  • validate

field use: bool = False

Whether to cache requests.

Validated by:
  • validate

field write_interval: PositiveInt = 1200

The interval, in seconds, at which the cache is written. Defaults to 1200 (20 minutes).

Constraints:
  • ge = 0

Validated by:
  • validate

field write_periodically: bool = True

Whether to write the cache to disk periodically. Defaults to True.

Validated by:
  • validate

validator validate » all fields[source]
Return type:

CacheConfig

class dataservice.DataService(requests, config=ServiceConfig(retry=RetryConfig(max_attempts=3, wait_exp_max=10, wait_exp_min=4, wait_exp_mul=1), deduplication=True, max_concurrency=10, limiter=None, cache=CacheConfig(use=False, cache_type='json', path='cache.json', write_interval=1200, write_periodically=True, save_state=None, load_state=None), delay=DelayConfig(amount=0, type='random')))[source]

Bases: BaseDataService

A service class to handle data requests and processing. This is the synchronous version of the data service. It will run the data worker in the main thread and block until all data items are fetched.

Example:
from dataservice import DataService, HttpXClient, Request, Response

def parse_books_page(response: Response):
    articles = response.html.find_all("article", {"class": "product_pod"})
    return {
        "url": response.request.url,
        "title": response.html.title.get_text(strip=True),
        "articles": len(articles)
    }

start_requests = [Request(url="https://books.toscrape.com/index.html", callback=parse_books_page, client=HttpXClient())]
service = DataService(start_requests)
for data_item in service:
    print(data_item)
exception dataservice.DataServiceException(message, status_code=None)[source]

Bases: Exception

Base class for all DataService exceptions.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

class dataservice.DataWrapper(mapping=None, /, **kwargs)[source]

Bases: dict

Special type of dictionary that runs callables and stores exceptions. Values can be callables or any other type. Callables are evaluated when accessed. If an exception occurs, the exception is stored in the errors dictionary.

static maybe(value)[source]

When value is a callable, return (value(), None), or (None, exception) if an exception occurs. Return (value, None) if value is not a callable.

Example:

>>> DataWrapper.maybe(lambda: 1)
(1, None)
>>> DataWrapper.maybe(lambda: 1 / 0)
(None, ZeroDivisionError('division by zero'))

Parameters:

value (Any) – The value to be evaluated. It can be a callable or any other type.

Return type:

tuple[Any | None, None | Exception]

Returns:

A tuple containing the evaluated value or None, and an exception or None.

pydantic model dataservice.DelayConfig[source]

Bases: BaseModel

Delay configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field amount: Milliseconds = 0

The total amount of delay in milliseconds.

field type: Literal['constant', 'random'] = 'random'

The type of delay. Either constant or random. Defaults to random.

get()[source]
class dataservice.FailedRequest[source]

Bases: TypedDict

Failed request model.

exception: str
message: str
request: Request
class dataservice.HttpXClient[source]

Bases: BaseClient

Client that uses HTTPX library to make requests.

async make_request(request)[source]

Make a request using HTTPX.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Union[Response, NoReturn]

Returns:

A Response object containing the response data.

class dataservice.PlaywrightClient(*, actions=None, intercept_url=None, config=PlaywrightConfig(browser='chromium', headless=True, slow_mo=0, device=None))[source]

Bases: BaseClient

Client that uses Playwright library to make requests.

Initialize the PlaywrightClient.

Parameters:
  • actions (Optional[Callable[[Page], Awaitable[None]]]) – Optional coroutine with actions to perform on the page before returning the response.

  • intercept_url (Optional[str]) – Optional URL to intercept and get data from.

  • config (PlaywrightConfig) – PlaywrightConfig object.

async make_request(request)[source]

Make a request using Playwright without assigning instance variables.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Response

pydantic model dataservice.PlaywrightConfig[source]

Bases: BaseModel

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field browser: Literal['chromium', 'firefox', 'webkit'] = 'chromium'

The browser to use.

field device: Optional[dict[str, Any]] = None

The devices to use.

field headless: bool = True

Whether to run in headless mode.

field slow_mo: PositiveInt = 0

The slow motion delay in milliseconds.

Constraints:
  • ge = 0

class dataservice.PlaywrightInterceptClient(*, intercept_url, callback, return_html=True, actions=None, config=PlaywrightConfig(browser='chromium', headless=True, slow_mo=0, device=None))[source]

Bases: PlaywrightClient

Client that uses Playwright library to make requests and intercept responses.

Initialize the PlaywrightInterceptClient.

Parameters:
  • intercept_url (str) – The URL to intercept and get data from.

  • callback (Callable[[Response], Union[Iterator[Union[Request, dict[Any, Any], BaseModel]], Request, dict[Any, Any], BaseModel]]) – The callback function to process the intercepted response.

  • return_html (bool) – Whether to return the HTML content of the page.

  • actions (Optional[Callable[[Page], Awaitable[None]]]) – Optional coroutine with actions to perform on the page before returning the response.

  • config (PlaywrightConfig) – PlaywrightConfig object.

async make_request(request)[source]

Make a request and intercept Fetch/XHR responses.

Parameters:

request (Request) – The request object containing the details of the HTTP request.

Return type:

Sequence[Response]

Returns:

A list of Response objects.

Raises:
  • RequestException – If a non-retryable HTTP error occurs.

  • RetryableRequestException – If a retryable HTTP error occurs.

dataservice.PlaywrightPage

alias of Page

pydantic model dataservice.ProxyConfig[source]

Bases: BaseModel

Proxy configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field host: str [Required]

The proxy host.

field password: Optional[str] = None

The proxy password.

field port: int [Required]

The proxy port.

field username: Optional[str] = None

The proxy username.

classmethod from_url(url)[source]
Return type:

ProxyConfig

property url: str
pydantic model dataservice.RateLimiterConfig[source]

Bases: BaseModel

Rate limiter configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field max_rate: PositiveInt = 10
Constraints:
  • ge = 0

field time_period: Seconds = 60
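
To make the two fields concrete, the following is a minimal sketch that assumes max_rate is the number of requests allowed within any window of time_period seconds. SlidingWindowLimiter is a hypothetical helper written for illustration; the limiter the service actually uses is not shown in this reference and may follow a different algorithm.

```python
from collections import deque


class SlidingWindowLimiter:
    """Hypothetical limiter: at most max_rate acquisitions per time_period seconds."""

    def __init__(self, max_rate: int = 10, time_period: float = 60.0) -> None:
        self.max_rate = max_rate
        self.time_period = time_period
        self._stamps: deque = deque()

    def try_acquire(self, now: float) -> bool:
        # Forget acquisitions that have left the current window.
        while self._stamps and now - self._stamps[0] >= self.time_period:
            self._stamps.popleft()
        if len(self._stamps) < self.max_rate:
            self._stamps.append(now)
            return True
        return False


# With the documented defaults (10 per 60 seconds), the 11th and 12th
# attempts inside the first minute are rejected.
limiter = SlidingWindowLimiter(max_rate=10, time_period=60)
results = [limiter.try_acquire(now=float(t)) for t in range(12)]
allowed_later = limiter.try_acquire(now=60.0)
print(results, allowed_later)
```

The clock is passed in explicitly so the behaviour is easy to check without waiting; a real limiter would read a monotonic clock and make callers wait rather than reject them.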

pydantic model dataservice.Request[source]

Bases: BaseModel

Request model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Validators:
  • validate » all fields

field callback: CallbackType [Required]

The callback function to process the response.

Validated by:
  • validate

field client: ClientCallable [Required]

The client callable to use for the request.

Validated by:
  • validate

field content_type: Literal['text', 'json'] = 'text'

The content type of the request.

Validated by:
  • validate

field cookies: Optional[list[dict]] = None

The cookies of the request.

Validated by:
  • validate

field form_data: Optional[dict] = None

The form data of the request.

Validated by:
  • validate

field headers: Optional[dict] = None

The headers of the request.

Validated by:
  • validate

field json_data: Optional[dict] = None

The JSON data of the request.

Validated by:
  • validate

field method: Literal['GET', 'POST'] = 'GET'

The method of the request.

Validated by:
  • validate

field params: Optional[dict] = None

The parameters of the request.

Validated by:
  • validate

field proxy: Optional[ProxyConfig] = None

The proxy configuration for the request.

Validated by:
  • validate

field timeout: int = 30

The timeout of the request.

Constraints:
  • ge = 1

  • le = 300

Validated by:
  • validate

field url: Annotated[HttpUrl, AfterValidator(str), Field(description='The URL of the request.')] [Required]

The URL of the request.

Constraints:
  • max_length = 2083

  • allowed_schemes = ['http', 'https']

  • func = <class 'str'>

Validated by:
  • validate

ser_model()[source]
Return type:

dict[str, Any]

validator validate  »  all fields[source]
Return type:

Request

property callback_name: str

property client_name: str

property unique_key: str

Return a unique key for the request.

property url_encoded: Annotated[Url, UrlConstraints(max_length=2083, allowed_schemes=['http', 'https'], host_required=None, default_host=None, default_port=None, default_path=None)]

Return the URL encoded.

pydantic model dataservice.Response[source]

Bases: BaseModel

Response model.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field cookies: Optional[list[dict]] = None

The cookies of the response.

field data: dict | list[dict] | None = None

The data of the response.

field headers: Optional[dict] = None

The headers of the response.

field request: Request [Required]

The request that generated the response.

field status_code: int = 200

The status code of the response.

Constraints:
  • ge = 100

  • le = 599

field text: str = ''

The text of the response.

field url: Annotated[HttpUrl, AfterValidator(str), Field(description='The URL of the response.')] [Required]

The URL of the response.

Constraints:
  • max_length = 2083

  • allowed_schemes = ['http', 'https']

  • func = <class 'str'>

model_post_init(context, /)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that’s what pydantic-core passes when calling it.

Parameters:
  • self (BaseModel) – The BaseModel instance.

  • context (Any) – The context.

Return type:

None

property client: Callable[[Request], Awaitable[Response]]

property html: BeautifulSoup

Return the BeautifulSoup object of the response, if the initial request asked for text data.

pydantic model dataservice.RetryConfig[source]

Bases: BaseModel

Retry configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field max_attempts: PositiveInt = 3
Constraints:
  • ge = 0

field wait_exp_max: PositiveInt = 10
Constraints:
  • ge = 0

field wait_exp_min: PositiveInt = 4
Constraints:
  • ge = 0

field wait_exp_mul: PositiveInt = 1
Constraints:
  • ge = 0
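
The three wait_exp_* fields resemble the parameters of an exponential backoff with a floor and a ceiling. The following is a minimal sketch under that assumption: backoff_wait is a hypothetical helper, and the formula (multiplier times a power of two, clamped between the minimum and the maximum) is assumed rather than taken from the library, which does not document it here.

```python
def backoff_wait(
    attempt: int,
    wait_exp_min: int = 4,
    wait_exp_max: int = 10,
    wait_exp_mul: int = 1,
) -> int:
    """Assumed wait in seconds before retrying after the given 1-based attempt."""
    # Exponential growth: mul * 2^(attempt - 1).
    raw = wait_exp_mul * 2 ** (attempt - 1)
    # Clamp the raw value into [wait_exp_min, wait_exp_max].
    return max(wait_exp_min, min(raw, wait_exp_max))


# Waits after attempts 1 through 5 with the documented defaults.
waits = [backoff_wait(n) for n in range(1, 6)]
print(waits)
```

Under this assumed formula the defaults hold the wait at the 4-second floor for the first three attempts, then let it grow to 8 and cap it at 10.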

exception dataservice.RetryableException(message, status_code=None)[source]

Bases: DataServiceException

Base class for all retryable exceptions.

Initialize the exception.

Parameters:
  • message (str) – The message to display.

  • status_code (int | None) – The status code of the response, if there is one.

pydantic model dataservice.ServiceConfig[source]

Bases: BaseModel

Global configuration for the service.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

field cache: CacheConfig [Optional]

The cache configuration.

field deduplication: bool = True

Whether to deduplicate requests.

field delay: DelayConfig [Optional]

The delay configuration.

field limiter: RateLimiterConfig | None = None

The rate limiter configuration.

field max_concurrency: PositiveInt = 10

The maximum number of concurrent requests.

Constraints:
  • ge = 0

field retry: RetryConfig [Optional]

The retry configuration.
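
As an illustration of what a max_concurrency cap means in an asyncio program, here is a minimal sketch that bounds the number of in-flight jobs with asyncio.Semaphore. run_bounded and its inner job are hypothetical names, the sleep stands in for an HTTP request, and the use of a semaphore is an assumption about one common way to enforce such a cap, not a description of the service's internals.

```python
import asyncio


async def run_bounded(n_jobs: int, max_concurrency: int = 10) -> int:
    """Run n_jobs fake requests and return the peak number running at once."""
    semaphore = asyncio.Semaphore(max_concurrency)
    active = 0
    peak = 0

    async def job() -> None:
        nonlocal active, peak
        async with semaphore:  # blocks once max_concurrency jobs are running
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for an HTTP request
            active -= 1

    await asyncio.gather(*(job() for _ in range(n_jobs)))
    return peak


peak = asyncio.run(run_bounded(n_jobs=25, max_concurrency=10))
print(peak)
```

All 25 jobs are scheduled immediately, but no more than 10 hold the semaphore at any moment, so the observed peak stays at the configured limit.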

dataservice.setup_logging(logger_name=None, level='DEBUG')[source]

Set up logging configuration.

Parameters:
  • logger_name (str | None) – The logger name.

  • level (Literal['DEBUG', 'INFO', 'WARNING', 'ERROR']) – The logging level.