from contextvars import ContextVar
from datetime import datetime
from functools import wraps
from json import dumps, loads
from logging import getLogger
from time import monotonic
from typing import List, Optional, Union, Any, Dict, Mapping, Collection, Tuple
from databases import Database
from pydantic.main import BaseModel
from binp.db import ensure
from binp.events import Emitter
"""
Current journal record ID. Can be used only from functions under @journal.log decorators.
Useful to link some other entities to the journal record.
It's not recommended modify the variable outside core module.
"""
current_journal: ContextVar[Optional[int]] = ContextVar('current_journal', default=None)
[docs]class Record(BaseModel):
"""
Single record in journal
"""
#: message provided by invoker
message: str
#: record creation time
created_at: datetime
#: fields defined for record
params: Dict[str, Any]
[docs]class Headline(BaseModel):
"""
Journal header
"""
#: Journal unique ID
id: int
#: operation name
operation: str
#: operation description
description: str
#: journal creation time
started_at: datetime
#: assigned labels
labels: List[str]
#: execution end time
finished_at: Optional[datetime] = None
#: error message if any
error: Optional[str] = None
#: accurate operation duration
duration: Optional[float] = None
[docs] @classmethod
def from_database(cls, info: Mapping, labels: List[str]) -> 'Headline':
return Headline(
id=info['id'],
operation=info['operation'],
description=info['description'],
started_at=info['started_at'],
finished_at=info['finished_at'],
error=info['error'],
duration=info['duration'],
labels=labels,
)
[docs]class Journal(Headline):
"""
Journal entity with header and linked records
"""
#: journal records
records: List[Record]
[docs]class Journals:
"""
Journal of logged invokes.
Should be used as decorator for async methods. Will create, track and record changes automatically.
In case of exception, the error will be logged and an exception re-raised.
:Example:
.. code-block:: python
from binp import BINP
from asyncio import sleep
binp = BINP()
@binp.journal
async def invoke():
'''
Do something
'''
await sleep(3) # emulate some work
print("done")
By default, journal will be created with name equal to fully-qualified
function name and description from doc-string (if exists).
Name and description could by optionally defined manually.
.. code-block:: python
from binp import BINP
from asyncio import sleep
binp = BINP()
@binp.journal(operation='Do Something', description='Emulate some heavy work')
async def invoke():
await sleep(3)
print("done")
It's possible to add multiple record to journal. Each record contains text message and unlimited number
of key-value pairs, where value should JSON serializable objects or be subclass of BaseModel (pydantic).
.. code-block:: python
from binp import BINP
from asyncio import sleep
binp = BINP()
@binp.journal
async def invoke():
await binp.journal.record("begin work", source="http://example.com", by="reddec")
await sleep(3)
await binp.journal.record("work done", status="success")
It's safe to combine journal annotation with any other decorators.
To get current journal ID use ``current_journal``
.. code-block:: python
from binp.journals import current_journal
binp = BINP()
@binp.journal
async def invoke():
print("Journal ID:", current_journal.get())
``current_journal`` can be used only with function decorated by @journal in call chain.
Otherwise it will return None. The variable is context-depended and coroutine-safe.
:Events:
* ``journal_updated`` - when journal created or updated. Emits journal ID
* ``record_added`` - when record added. Emits journal ID
**Important!** Never set current journal manually.
"""
def __init__(self, database: Optional[Database] = None):
self.__db = ensure(database)
self.journal_updated: Emitter[int] = Emitter()
self.record_added: Emitter[int] = Emitter()
def __call__(self, func=None, *, operation: Optional[str] = None, description: Optional[str] = None):
"""
Decorator that tracks operation and put it to journal.
"""
def trace_operation(fn):
nonlocal operation
nonlocal description
if operation is None:
operation = fn.__qualname__
if description is None:
description = "\n".join(line.strip() for line in (fn.__doc__ or '').splitlines()).strip()
@wraps(fn)
async def wrapper(*args, **kwargs):
a = monotonic()
ex: Optional[Exception] = None
rec = await self.__begin(operation, description)
token = current_journal.set(rec)
try:
return await fn(*args, **kwargs)
except Exception as f_ex:
ex = f_ex
raise
finally:
current_journal.reset(token)
await self.__end(rec, monotonic() - a, ex)
return wrapper
if func is None:
return trace_operation
return trace_operation(func)
[docs] async def history(self, offset: int = 0, limit: int = 20) -> List[Headline]:
"""
Get journal headlines in reverse order (newest - first).
"""
db = await self.__db()
rows = await db.fetch_all('''
SELECT * FROM journal ORDER BY id DESC LIMIT :limit OFFSET :offset
''', values={
'offset': offset,
'limit': limit
})
ans = []
for info in rows:
labels = await self.__fetch_labels(info['id'])
ans.append(Headline.from_database(info, labels))
return ans
[docs] async def search(self, operation: Optional[str] = None,
failed: Optional[bool] = None,
pending: Optional[bool] = None,
labels: Optional[Collection[str]] = None,
offset: int = 0,
limit: int = 20) -> List[Headline]:
"""
Search journals. Each condition joined by AND operator. Null conditions will not be applied.
If no conditions defined, it's equal to plain history() operation.
Result ordered in reverse order (newest - first).
:param operation: operation name
:param failed: with error message
:param pending: without finished_at attribute
:param labels: labels names (at least one of list)
:param offset: how many records to skip
:param limit: maximum number of records to return
"""
conditions = []
args = {
'offset': offset,
'limit': limit
}
if operation is not None:
conditions.append('operation = :operation')
args['operation'] = operation
if failed is not None:
if failed:
conditions.append('error IS NOT NULL')
else:
conditions.append('error IS NULL')
if pending is not None:
if pending:
conditions.append('finished_at IS NULL')
else:
conditions.append('finished_at IS NOT NULL')
if labels is not None:
opts = []
for i, label in enumerate(labels):
key = f'label_{i}'
args[key] = label
opts.append(":" + key)
conditions.append(
f'id IN (SELECT distinct(journal_id) FROM journal_label WHERE label IN ({",".join(opts)}))')
if len(conditions) == 0:
return await self.history(offset, limit)
where = ' AND '.join(conditions)
query = f'SELECT journal.* FROM journal WHERE {where} ORDER BY id LIMIT :limit OFFSET :offset'
getLogger(self.__class__.__qualname__).debug('search query: %s', query)
db = await self.__db()
rows = await db.fetch_all(query, values=args)
ans = []
for info in rows:
labels = await self.__fetch_labels(info['id'])
ans.append(Headline.from_database(info, labels))
return ans
[docs] async def get(self, journal_id: int) -> Optional[Journal]:
"""
Get single journal by ID
"""
db = await self.__db()
info = await db.fetch_one('SELECT * FROM journal WHERE id = :journal_id', values={
'journal_id': journal_id
})
if info is None:
return None
headline = await self.headline(journal_id)
if headline is None:
return None
records = await self.__fetch_records(journal_id)
return Journal(
records=records,
**dict(headline),
)
[docs] async def headline(self, journal_id: int) -> Optional[Headline]:
"""
Get single journal headline (without records) by ID
"""
db = await self.__db()
info = await db.fetch_one('SELECT * FROM journal WHERE id = :journal_id', values={
'journal_id': journal_id
})
if info is None:
return None
labels = await self.__fetch_labels(journal_id)
return Headline.from_database(info, labels)
[docs] async def labels(self, *labels: str):
"""
Assign labels to journal. Duplicated labels will be ignored. Only under @journal function.
"""
logger = getLogger(self.__class__.__qualname__)
journal_id = current_journal.get()
if journal_id is None:
logger.warning('function no marked as @journal - label will not be assigned')
return
db = await self.__db()
await db.execute_many('INSERT OR IGNORE INTO journal_label (journal_id, label) VALUES (:journal_id, :label)',
values=[
{'journal_id': journal_id, 'label': label} for label in labels
])
[docs] async def record(self, message: str, **events: Union[BaseModel, str, int, float, bool]):
"""
Add record to journal. Will work only if there is
@journal function in call chain.
:param message: short message, describes record
:param events: key->value of events, where key is event name, and value is basic type or pydantic model.
Value will be serialized as JSON
"""
logger = getLogger(self.__class__.__qualname__)
journal_id = current_journal.get()
if journal_id is None:
logger.warning('function no marked as @journal - event will not be published')
return
db = await self.__db()
async with db.transaction():
await db.execute('''INSERT INTO record (journal_id, message) VALUES (:journal_id, :message)''', values={
'journal_id': journal_id,
'message': message or '',
})
record_id = (await db.fetch_one('SELECT last_insert_rowid()'))[0]
logger.info(message)
await db.execute_many('''INSERT INTO record_field (record_id, name, value)
VALUES (:record_id, :name, :value)
''', values=[{
'name': name,
'value': value.json() if isinstance(value, BaseModel) else dumps(value, ensure_ascii=False),
'record_id': record_id
} for name, value in events.items()])
self.record_added.emit(journal_id)
[docs] async def remove_dead(self):
"""
Remove all records without finish_at timestamp. Should be called only once BEFORE any writes.
"""
db = await self.__db()
await db.execute('''
DELETE FROM journal
WHERE finished_at IS NULL
''')
@property
def current(self) -> Optional[int]:
"""
Helper to get current journal ID
"""
return current_journal.get()
async def __fetch_labels(self, journal_id: int) -> List[str]:
db = await self.__db()
rows = await db.fetch_all('SELECT label FROM journal_label WHERE journal_id = :journal_id', values={
'journal_id': journal_id
})
if rows is None:
return []
return [row['label'] for row in rows]
async def __fetch_records(self, journal_id: int) -> List[Record]:
db = await self.__db()
rows = await db.fetch_all('SELECT * FROM record WHERE journal_id = :journal_id ORDER BY id DESC', values={
'journal_id': journal_id
})
if rows is None or len(rows) == 0:
return []
result = []
for row in rows:
fields = await self.__fetch_fields(row['id'])
result.append(Record(
message=row['message'],
created_at=row['created_at'],
params=fields
))
return result
async def __fetch_fields(self, record_id: int) -> Dict[str, Any]:
db = await self.__db()
rows = await db.fetch_all('SELECT name, value FROM record_field WHERE record_id = :record_id', values={
'record_id': record_id
})
if rows is None or len(rows) == 0:
return {}
return dict((row['name'], loads(row['value'])) for row in rows)
async def __begin(self, name, description) -> int:
db = await self.__db()
async with db.transaction():
await db.execute('''INSERT INTO journal (operation, description)
VALUES (:operation, :description)
''', values={
'operation': name,
'description': description,
})
res = await db.fetch_one('SELECT last_insert_rowid()')
journal_id = res[0]
self.journal_updated.emit(journal_id)
return journal_id
async def __end(self, journal_id: int, delta: float, exc=None):
db = await self.__db()
await db.execute('''
UPDATE journal
SET finished_at = current_timestamp,
duration = :duration,
error = :error
WHERE id = :id
''', values={
'duration': delta,
'id': journal_id,
'error': str(exc) if exc is not None else None
})
self.journal_updated.emit(journal_id)