Journal
- class binp.journals.Headline(**data)
Journal header
- description : str – operation description
- duration : Optional[float] – accurate operation duration
- error : Optional[str] – error message if any
- finished_at : Optional[datetime.datetime] – execution end time
- id : int – Journal unique ID
- labels : List[str] – assigned labels
- operation : str – operation name
- started_at : datetime.datetime – journal creation time
- class binp.journals.Journal(**data)
Journal entity with header and linked records
- records : List[binp.journals.Record] – journal records
- class binp.journals.Journals(database=None)
Journal of logged invocations.
Should be used as a decorator for async methods. It creates the journal, tracks it and records changes automatically. If an exception is raised, the error is logged and the exception is re-raised.
- Example
    from binp import BINP
    from asyncio import sleep

    binp = BINP()

    @binp.journal
    async def invoke():
        '''
        Do something
        '''
        await sleep(3)  # emulate some work
        print("done")
By default, the journal is created with a name equal to the fully-qualified function name and a description taken from the doc-string (if one exists).
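The default naming rule can be illustrated with the standard library alone. The sketch below is not binp's code: `default_headline` is a hypothetical helper, and the exact format binp uses for the fully-qualified name is an assumption (module path plus qualified name).

```python
import inspect
from typing import Callable, Optional, Tuple


def default_headline(fn: Callable) -> Tuple[str, Optional[str]]:
    """Hypothetical helper: derive (operation, description) for a function."""
    # Assumed format of the fully-qualified name: module path + qualified name.
    operation = f"{fn.__module__}.{fn.__qualname__}"
    # inspect.getdoc returns the cleaned doc-string, or None when there is none.
    description = inspect.getdoc(fn)
    return operation, description


async def invoke():
    '''
    Do something
    '''


async def silent():
    pass


print(default_headline(invoke))
print(default_headline(silent))
```

A function without a doc-string simply gets no description, which matches the "(if one exists)" wording above.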
The name and description can optionally be defined manually.
    from binp import BINP
    from asyncio import sleep

    binp = BINP()

    @binp.journal(operation='Do Something', description='Emulate some heavy work')
    async def invoke():
        await sleep(3)
        print("done")
It’s possible to add multiple records to a journal. Each record contains a text message and an unlimited number of key-value pairs, where each value should be a JSON-serializable object or an instance of a BaseModel (pydantic) subclass.
    from binp import BINP
    from asyncio import sleep

    binp = BINP()

    @binp.journal
    async def invoke():
        await binp.journal.record("begin work", source="http://example.com", by="reddec")
        await sleep(3)
        await binp.journal.record("work done", status="success")
It’s safe to combine journal annotation with any other decorators.
To get the current journal ID, use current_journal:

    from binp import BINP
    from binp.journals import current_journal

    binp = BINP()

    @binp.journal
    async def invoke():
        print("Journal ID:", current_journal.get())
current_journal can be used only when a function decorated with @journal is in the call chain; otherwise it returns None. The variable is context-dependent and coroutine-safe.
- Events
journal_updated - when a journal is created or updated. Emits journal ID
record_added - when a record is added. Emits journal ID
Important! Never set the current journal manually.
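The context-dependent behaviour of current_journal can be illustrated with Python's `contextvars` module. The sketch below is a stand-in, not binp's implementation: the `journal` decorator, the `_ids` counter and the `helper` function are hypothetical, and it is only an assumption that binp's variable behaves like a plain `ContextVar` with a `None` default.

```python
import asyncio
from contextvars import ContextVar
from functools import wraps
from itertools import count
from typing import Optional

# Stand-in for current_journal; None means "no journal in the call chain".
current_journal: ContextVar[Optional[int]] = ContextVar("current_journal", default=None)
_ids = count(1)  # hypothetical ID source, used here instead of a database


def journal(fn):
    """Hypothetical decorator: bind a fresh journal ID for the duration of a call."""
    @wraps(fn)
    async def wrapper(*args, **kwargs):
        token = current_journal.set(next(_ids))
        try:
            return await fn(*args, **kwargs)
        finally:
            # Restore the previous value so callers never see a stale ID.
            current_journal.reset(token)
    return wrapper


async def helper() -> Optional[int]:
    # Not decorated itself, but sees the ID set further up the call chain.
    await asyncio.sleep(0)
    return current_journal.get()


@journal
async def invoke() -> Optional[int]:
    return await helper()


async def main():
    # Each task runs in its own copy of the context, so the IDs do not mix.
    first, second = await asyncio.gather(invoke(), invoke())
    outside = await helper()  # no decorated function in the call chain
    return first, second, outside


first, second, outside = asyncio.run(main())
print(first, second, outside)
```

Because the decorator sets and resets the variable itself, user code only ever needs to read it, which is consistent with the warning above.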
- property current
Helper to get current journal ID
- Return type
Optional[int]
- async headline(journal_id)
Get single journal headline (without records) by ID
- Return type
Optional[Headline]
- async history(offset=0, limit=20)
Get journal headlines in reverse order (newest first).
- Return type
List[Headline]
- async labels(*labels)
Assign labels to the current journal. Duplicate labels are ignored. Works only when a @journal function is in the call chain.
- async record(message, **events)
Add a record to the journal. Works only if there is a @journal function in the call chain.
- Parameters
message (str) – short message that describes the record
events (Union[BaseModel, str, int, float, bool]) – key->value pairs of events, where the key is the event name and the value is a basic type or a pydantic model. Values are serialized as JSON
- async remove_dead()
Remove all records without a finished_at timestamp. Should be called only once, BEFORE any writes.
- async search(operation=None, failed=None, pending=None, labels=None, offset=0, limit=20)
Search journals. Conditions are joined by the AND operator; conditions set to None are not applied. If no conditions are defined, the call is equivalent to a plain history() call. Results are returned in reverse order (newest first).
- Parameters
operation (Optional[str]) – operation name
failed (Optional[bool]) – with error message
pending (Optional[bool]) – without finished_at attribute
labels (Optional[Collection[str]]) – label names (at least one of the list)
offset (int) – how many records to skip
limit (int) – maximum number of records to return
- Return type
List[Headline]
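The filter semantics of search() can be sketched over an in-memory list. This is only an illustration of the rules described above, not binp's query code: `Row` is a hypothetical stand-in for Headline, and three details are assumptions: operation is compared by exact match, `failed=False` / `pending=False` select the opposite set, and "newest first" means ordering by started_at.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Collection, List, Optional


@dataclass
class Row:
    """Hypothetical stand-in for a stored Headline."""
    id: int
    operation: str
    started_at: datetime
    finished_at: Optional[datetime] = None
    error: Optional[str] = None
    labels: List[str] = field(default_factory=list)


def search(rows: List[Row], operation: Optional[str] = None,
           failed: Optional[bool] = None, pending: Optional[bool] = None,
           labels: Optional[Collection[str]] = None,
           offset: int = 0, limit: int = 20) -> List[Row]:
    def matches(row: Row) -> bool:
        # A condition left as None is skipped; the rest are joined by AND.
        if operation is not None and row.operation != operation:
            return False
        if failed is not None and (row.error is not None) != failed:
            return False
        if pending is not None and (row.finished_at is None) != pending:
            return False
        # Labels match when the journal carries at least one of them.
        if labels is not None and not set(labels) & set(row.labels):
            return False
        return True

    newest_first = sorted(rows, key=lambda r: r.started_at, reverse=True)
    return [r for r in newest_first if matches(r)][offset:offset + limit]


rows = [
    Row(1, "sync", datetime(2021, 1, 1), datetime(2021, 1, 1, 0, 1), labels=["nightly"]),
    Row(2, "sync", datetime(2021, 1, 2), datetime(2021, 1, 2, 0, 1), error="timeout"),
    Row(3, "backup", datetime(2021, 1, 3)),  # still running: no finished_at
]

print([r.id for r in search(rows)])
print([r.id for r in search(rows, operation="sync", failed=True)])
print([r.id for r in search(rows, pending=True)])
```

With no conditions the function degenerates to a paginated newest-first listing, which is the relationship to history() stated above.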