"""Manages the overall data processing service, including initialization, iteration, and running the data worker."""from__future__importannotationsimportasyncioimportpathlibimportsignalfromabcimportABCfromloggingimportgetLoggerfromtypingimportAsyncIterator,Iterable,IteratorfrompydanticimportBaseModel,validate_callfromdataservice.cacheimportCacheFactoryfromdataservice.configimportServiceConfigfromdataservice.filesimportwritersfromdataservice.modelsimportFailedRequest,GenericDataItem,Requestfromdataservice.workerimportDataWorkerlogger=getLogger(__name__)
class BaseDataService(ABC):
    """Base class holding the state and lifecycle shared by the data services.

    It stores the start requests and the configuration, creates the
    :class:`DataWorker` on demand and runs it with SIGINT/SIGTERM handlers
    installed so that a running fetch can be interrupted.
    """

    def __init__(
        self,
        requests: Iterable[Request],
        config: ServiceConfig | None = None,
    ):
        """Store the start requests and the configuration.

        :param requests: The initial requests handed to the data worker.
        :param config: The service configuration. When omitted, a fresh
            ``ServiceConfig()`` is created for this instance.
        """
        # Build the default here rather than in the signature: a default
        # written as ``ServiceConfig()`` is evaluated once, when the function
        # is defined, and would be shared by every service instance.
        if config is None:
            config = ServiceConfig()
        self._requests: Iterable[Request] = requests
        self.config: ServiceConfig = config
        self.cache_factory = CacheFactory(config.cache)
        # Created later by ``_init_data_worker``.
        self._data_worker: DataWorker | None = None

    @property
    def data_worker(self) -> DataWorker:
        """Return the data worker.

        This property does not create the worker; ``_init_data_worker`` does.

        :raises ValueError: If the data worker has not been initialized yet.
        """
        if self._data_worker is None:
            raise ValueError("Data worker not initialized.")
        return self._data_worker

    async def _init_data_worker(self) -> None:
        """Create the cache and the data worker unless the worker already exists."""
        # Same ``is None`` test as the ``data_worker`` property, so both agree
        # on what "initialized" means.
        if self._data_worker is None:
            cache = await self.cache_factory.init_cache()
            self._data_worker = DataWorker(
                requests=self._requests,
                config=self.config,
                cache=cache,
            )

    def register_signal_handlers(self):
        """Register signal handlers for SIGINT and SIGTERM on the running loop.

        Must be called while an event loop is running. When the loop cannot
        install signal handlers (``add_signal_handler`` is only available on
        Unix and only works in the main thread) a warning is logged and the
        service continues without them instead of aborting the fetch.
        """
        loop = asyncio.get_running_loop()
        try:
            loop.add_signal_handler(signal.SIGINT, self._handle_stop_signal)
            loop.add_signal_handler(signal.SIGTERM, self._handle_stop_signal)
        except (NotImplementedError, RuntimeError) as exc:
            # NotImplementedError: loop without signal support (e.g. Windows).
            # RuntimeError: the handler could not be set up (e.g. the loop is
            # not running in the main thread).
            logger.warning("Signal handlers could not be registered: %r", exc)
            return
        logger.info("Signal handlers registered for SIGINT and SIGTERM.")

    def _handle_stop_signal(self):
        """Cancel every task of the running loop when a stop signal arrives.

        ``asyncio.all_tasks()`` returns all unfinished tasks of the running
        loop, so tasks that were not created by this service are cancelled too.
        """
        logger.info("Received stop signal. Cancelling remaining tasks.")
        for task in asyncio.all_tasks():
            task.cancel()

    def cleanup_signal_handlers(self):
        """Remove the SIGINT and SIGTERM handlers from the running loop.

        Does nothing on event loops that do not support signal handlers.
        """
        loop = asyncio.get_running_loop()
        try:
            loop.remove_signal_handler(signal.SIGINT)
            loop.remove_signal_handler(signal.SIGTERM)
        except NotImplementedError:
            # Nothing could have been registered on such a loop.
            return
        logger.info("Signal handlers cleaned up.")

    async def _run_data_worker(self) -> None:
        """Run the data worker's fetch with the stop-signal handlers installed.

        A cancellation of the fetch is logged and swallowed; the signal
        handlers are removed in every case.

        :raises ValueError: If the data worker has not been initialized yet.
        """
        # The property raises when the worker is missing, so no separate
        # truthiness test is needed before using it.
        worker = self.data_worker
        self.register_signal_handlers()
        try:
            await worker.fetch()
        except asyncio.CancelledError:
            logger.info("DataService cancelled.")
        finally:
            self.cleanup_signal_handlers()

    def get_failures(self) -> dict[str, FailedRequest]:
        """Return the failed requests reported by the data worker.

        :raises ValueError: If the data worker has not been initialized yet.
        """
        return self.data_worker.get_failures()

    @validate_call
    def write(
        self,
        filepath: pathlib.Path,
        results: Iterable[dict | BaseModel],
    ) -> None:
        """Write the results to a file.

        The writer is looked up in ``writers`` by the file extension without
        its leading dot, so the extension selects the output format.

        :param filepath: The path to the output file.
        :param results: An iterable of data items to write.
        """
        ext = filepath.suffix
        writer = writers[ext[1:]]
        writer(filepath).write(results)
class DataService(BaseDataService):
    """A service class to handle data requests and processing.

    This is the synchronous version of the data service. The first call to
    ``next()`` runs the data worker with ``asyncio.run`` and blocks the
    calling thread until the fetch has finished; the data items are then
    handed out one per iteration.

    :Example:

    .. code-block:: python

        from dataservice import DataService, HttpXClient, Request, Response

        def parse_books_page(response: Response):
            articles = response.html.find_all("article", {"class": "product_pod"})
            return {
                "url": response.request.url,
                "title": response.html.title.get_text(strip=True),
                "articles": len(articles),
            }

        start_requests = [
            Request(
                url="https://books.toscrape.com/index.html",
                callback=parse_books_page,
                client=HttpXClient(),
            )
        ]
        service = DataService(start_requests)
        for data_item in service:
            print(data_item)
    """

    def __iter__(self) -> Iterator[GenericDataItem]:
        """Return the iterator object itself."""
        return self

    def __next__(self) -> GenericDataItem:
        """Fetch the next data item from the data worker.

        On the first call the worker is created and run to completion.

        :raises StopIteration: When the data worker has no more data.
        """
        self._init_data_worker_sync()
        if not self.data_worker.has_started:
            logger.info("Start fetching.")
            self._run_data_worker_sync()
            logger.info("Retrieving data.")
        if self.data_worker.has_no_more_data():
            raise StopIteration
        return self.data_worker.get_data_item()

    def _init_data_worker_sync(self) -> None:
        """Synchronous wrapper around the async ``_init_data_worker`` method.

        Does nothing once the worker exists. ``__next__`` calls this for every
        item, and an unconditional ``asyncio.run`` would create and tear down
        an event loop each time just to run a coroutine that has nothing left
        to do.
        """
        if self._data_worker is None:
            asyncio.run(self._init_data_worker())

    def _run_data_worker_sync(self) -> None:
        """Run the data worker to fetch data items, blocking until it is done."""
        asyncio.run(self._run_data_worker())
class AsyncDataService(BaseDataService):
    """The asynchronous counterpart of the data service.

    Instances are asynchronous iterators: use ``async for`` to receive the
    data items produced by the data worker.

    :Example:

    .. code-block:: python

        import asyncio

        from dataservice import AsyncDataService, HttpXClient, Request, Response

        def parse_books_page(response: Response):
            articles = response.html.find_all("article", {"class": "product_pod"})
            return {
                "url": response.request.url,
                "title": response.html.title.get_text(strip=True),
                "articles": len(articles),
            }

        async def main():
            start_requests = [
                Request(
                    url="https://books.toscrape.com/index.html",
                    callback=parse_books_page,
                    client=HttpXClient(),
                )
            ]
            service = AsyncDataService(start_requests)
            async for data_item in service:
                print(data_item)

        asyncio.run(main())
    """

    def __aiter__(self) -> AsyncIterator[GenericDataItem]:
        """Return this service, which is its own asynchronous iterator."""
        return self

    async def __anext__(self) -> GenericDataItem:
        """Return the next data item held by the data worker.

        The worker is set up on demand and, if it has not started yet, its
        fetch is awaited before any item is returned.

        :raises StopAsyncIteration: When the data worker has no more data.
        """
        await self._init_data_worker()
        worker = self.data_worker

        # First call only: run the fetch before handing out items.
        if not worker.has_started:
            logger.info("Start fetching.")
            await self._run_data_worker()
            logger.info("Retrieving data.")

        if not worker.has_no_more_data():
            return worker.get_data_item()
        raise StopAsyncIteration