Utils

class binp.events.Emitter[source]

Typed event emitter based on async queues.

Event emitting is non-blocking operation. After subscription, listener will not miss any event regardless of processing time (in exchange of memory). Events order are strictly the same as emitting order.

Can be used as decorator.

Example

on_something : Emitter[str] = Emitter()

@on_something
async def subscriber(payload: str):
    print("payload:", payload)

def emitter():
    on_something.emit('hello world')

Also can be used without decorator

Example

on_something : Emitter[str] = Emitter()

async def subscriber():
    with on_something.subscribe() as queue:
         while True:
             payload: str = await queue.get()
             print("payload:", payload)

def emitter():
    on_something.emit('hello world')
emit(payload)[source]

Emit event. Non-blocking operation.

subscribe(own_queue=None)[source]

Create queue that will listen for the event. Queue will be automatically unsubscribed. A new queue will be created if no own queue will be provided.