Source code for binp.service
from asyncio import Task, get_event_loop, sleep, CancelledError
from dataclasses import dataclass
from enum import Enum
from logging import getLogger
from typing import Callable, Awaitable, Optional, Dict, List
from pydantic import BaseModel
from binp.events import Emitter
class Status(str, Enum):
    """
    Life-cycle state of a service.

    Members are plain strings as well as enum members, so a status compares
    equal to its textual value.
    """

    #: the service is not running and no restart is pending
    stopped = 'stopped'

    #: the service has been scheduled to run but has not begun yet
    starting = 'starting'

    #: the service is up and running
    running = 'running'

    #: the service has ended and is waiting out the delay before its next run
    restarting = 'restarting'
class Info(BaseModel):
    """
    Public description and current state of a single service.
    """

    #: name of the service
    name: str

    #: human-readable description of the service
    description: str

    #: current status of the service
    status: Status

    #: whether the service is marked to be started automatically
    autostart: bool

    #: whether the service has to be restarted automatically
    restart: bool

    #: interval between restarts
    restart_delay: float
@dataclass
class Handler:
    """
    Runtime record of one registered service.

    Bundles the service description, the emitter used to announce status
    changes, the coroutine function that implements the service, and the task
    currently assigned to run it. Awaiting an instance call runs the service loop.
    """

    #: service description; its ``status`` field is updated in place by the loop
    info: Info
    #: emitter that receives ``info`` on every status change made by the loop
    events: Emitter[Info]
    #: zero-argument callable returning an awaitable; one call is one service run
    handler: Callable[[], Awaitable]
    #: task assigned to this handler (``None`` until one is attached)
    task: Optional[Task] = None

    async def __call__(self):
        """
        Run the service until it is cancelled or ends with restarts disabled.

        Each iteration marks the service as running, emits the info and awaits
        the handler. Cancellation or ``KeyboardInterrupt`` raised by the
        handler ends the loop quietly. Any other exception is logged as a
        warning. When ``info.restart`` is set, the service is marked as
        restarting and re-run after ``info.restart_delay`` seconds; otherwise
        the loop ends. On every exit path the service is marked as stopped
        and the info is emitted once more.
        """
        logger = getLogger('service:' + self.info.name)
        try:
            while True:
                self.info.status = Status.running
                self.events.emit(self.info)
                try:
                    await self.handler()
                except (CancelledError, KeyboardInterrupt):
                    break
                except Exception as ex:
                    # '%s' (not the Go-style '%v') is the valid placeholder here:
                    # an unsupported conversion makes logging fail to format the
                    # record, so the message would never be rendered.
                    logger.warning("service stopped: %s", ex, exc_info=ex)
                # Reached after a normal return as well as after a logged failure.
                if not self.info.restart:
                    break
                self.info.status = Status.restarting
                self.events.emit(self.info)
                # A cancellation during the delay is not caught above; it
                # propagates through ``finally``.
                await sleep(self.info.restart_delay)
        finally:
            self.info.status = Status.stopped
            self.events.emit(self.info)
class Service:
    """
    Annotate async function as service (background task). Supports automatic (default) and manual start,
    restarts and restart delays.

    Useful for interacting with the environment on an unpredictable schedule
    (ex: listening for low-level network requests).

    .. code-block:: python

        from binp import BINP
        from asyncio import sleep

        binp = BINP()

        @binp.service
        async def check_messages():
            while True:
                await sleep(3)
                print("checks")

    For scheduling by time it is better to use `aiocron` instead:

    .. code-block::

        pip install aiocron

    .. highlight:: python
    .. code-block:: python

        from binp import BINP
        from aiocron import crontab

        binp = BINP()

        @crontab("*/5 * * * *")
        @binp.journal
        async def poll_something():
            print("do something every 5 minutes....")

    :Conflicts:

    Services are indexed by name. If multiple services are defined with the same name, the old one will be
    stopped and the latest one will be used.

    :Events:

    * ``service_changed`` - when service status changed. Emits Info
    """

    def __init__(self):
        self.service_changed: Emitter[Info] = Emitter()
        # registered services, indexed by service name
        self.__services: Dict[str, Handler] = {}

    def __call__(self, func: Optional[Callable[[], Awaitable]] = None, *,
                 name: Optional[str] = None,
                 description: Optional[str] = None,
                 restart: bool = True,
                 autostart: bool = True,
                 restart_delay: float = 3):
        """
        Mark async function as service.

        Usable both as a bare decorator (``@service``) and as a decorator
        factory (``@service(name=...)``).

        :param func: function to register; when omitted a decorator is returned
        :param name: service name; defaults to the ``__qualname__`` of the function
        :param description: service description; defaults to the function docstring
                            with every line stripped
        :param restart: value stored in the ``restart`` field of the service info
        :param autostart: start the service right after registration
        :param restart_delay: value stored in the ``restart_delay`` field of the service info
        :return: the registered function itself, or a decorator when ``func`` is omitted
        """

        def register_function(fn: Callable[[], Awaitable]):
            # Resolve defaults into locals instead of rebinding the enclosing
            # ``name``/``description``: the decorator returned by one call may be
            # applied to several functions, and rebinding would leak the first
            # function's name and description into every later registration.
            service_name = name if name is not None else fn.__qualname__
            if description is not None:
                service_description = description
            else:
                service_description = "\n".join(
                    line.strip() for line in (fn.__doc__ or '').splitlines()
                ).strip()

            old = self.__services.get(service_name)
            if old is not None:
                getLogger(self.__class__.__qualname__).warning("redefining service %r: %s => %s", service_name,
                                                               old.handler.__qualname__, fn.__qualname__)
                if old.task is not None:
                    old.task.cancel()
                    old.task = None

            handler = Handler(
                info=Info(
                    name=service_name,
                    description=service_description,
                    status=Status.stopped,
                    autostart=autostart,
                    restart=restart,
                    restart_delay=restart_delay
                ),
                events=self.service_changed,
                handler=fn
            )
            self.__services[service_name] = handler
            if autostart:
                self.start(service_name)
            else:
                # no start means no status change, so announce the new service explicitly
                self.service_changed.emit(handler.info)
            return fn

        if func is None:
            return register_function
        return register_function(func)

    def start(self, name: str):
        """
        Starts single service by name. Does nothing if there is no such service or the service
        is not in the stopped state.
        """
        service = self.__services.get(name)
        if service is None:
            return
        if service.info.status != Status.stopped:
            return
        service.info.status = Status.starting
        service.task = get_event_loop().create_task(service())
        self.service_changed.emit(service.info)

    def stop(self, name: str):
        """
        Stops service by name. Does nothing if there is no such service, the service is already
        stopped, or it has no task.
        """
        service = self.__services.get(name)
        if service is None or service.info.status == Status.stopped or service.task is None:
            return
        # only requests cancellation; this method does not change the status itself
        service.task.cancel()

    @property
    def services(self) -> List[Info]:
        """
        List of all services
        """
        return [v.info for v in self.__services.values()]