danoan.llm_assistant.runner.cli.commands.session.task_runner module
- class danoan.llm_assistant.runner.cli.commands.session.task_runner.EventNotifier(event_name: str, register_fn: Callable[[str, Callable[[...], Any]], Any], unregister_fn: Callable[[str, Callable[[...], Any]], Any])
- Bases: object
- Helper class that notifies the caller when an event is complete.
- The EventNotifier is usually returned by an external class (call it Manager) that lets callers register listeners for its events. A common pattern is to open a context and poll the notifier until the event is set:

      with Manager.get_notifier(event_name) as notifier:
          while True:
              if notifier.is_set():
                  break
              do_something()
 
 - Parameters:
- event_name (str) 
- register_fn (Callable[[str, Callable[[...], Any]], Any]) 
- unregister_fn (Callable[[str, Callable[[...], Any]], Any]) 
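To make the register/unregister contract concrete, here is a minimal sketch of how a notifier of this shape could work. It is not the library's implementation: `SketchNotifier`, `SketchManager`, and the use of `threading.Event` as the internal flag are assumptions made for illustration. Only the constructor arguments, the context-manager usage, and `is_set()` come from the description above.

```python
import threading
from typing import Any, Callable, Dict, List

Listener = Callable[..., Any]


class SketchNotifier:
    """Hypothetical stand-in for EventNotifier (illustration only)."""

    def __init__(
        self,
        event_name: str,
        register_fn: Callable[[str, Listener], Any],
        unregister_fn: Callable[[str, Listener], Any],
    ) -> None:
        self._event_name = event_name
        self._register_fn = register_fn
        self._unregister_fn = unregister_fn
        self._flag = threading.Event()  # assumption: a thread-safe flag

    def _on_event(self, *args: Any, **kwargs: Any) -> None:
        # Listener handed to the manager; it only raises the flag.
        self._flag.set()

    def is_set(self) -> bool:
        return self._flag.is_set()

    def __enter__(self) -> "SketchNotifier":
        # Start listening when the context is entered.
        self._register_fn(self._event_name, self._on_event)
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        # Stop listening when the context is left, even on error.
        self._unregister_fn(self._event_name, self._on_event)


class SketchManager:
    """Hypothetical event source playing the role of Manager."""

    def __init__(self) -> None:
        self._listeners: Dict[str, List[Listener]] = {}

    def register(self, event_name: str, callback: Listener) -> None:
        self._listeners.setdefault(event_name, []).append(callback)

    def unregister(self, event_name: str, callback: Listener) -> None:
        self._listeners[event_name].remove(callback)

    def trigger(self, event_name: str) -> None:
        # Iterate over a copy so listeners may unregister while being called.
        for callback in list(self._listeners.get(event_name, [])):
            callback()

    def get_notifier(self, event_name: str) -> SketchNotifier:
        return SketchNotifier(event_name, self.register, self.unregister)


manager = SketchManager()
with manager.get_notifier("complete") as notifier:
    before = notifier.is_set()
    manager.trigger("complete")
    after = notifier.is_set()
remaining = list(manager._listeners["complete"])
```

In this sketch the notifier owns its listener, so leaving the `with` block always unregisters it and the manager does not accumulate stale callbacks.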
 
 
- class danoan.llm_assistant.runner.cli.commands.session.task_runner.Task
- Bases: ABC
- Represents a task of TaskRunner.
- get_event_notifier(event: Event) -> EventNotifier
- Create an EventNotifier for an event.
- The EventNotifier lets the caller check whether events such as Task.Event.Complete or Task.Event.Stop were triggered.
- Parameters:
- event (Event)
- Return type: EventNotifier
 
 - register_listener(event: Event, callback)
- Register a callback function to be called whenever the event is triggered. - Parameters:
- event (Event) 
 
 - start(**kwargs)
- Start executing the task logic.
- It triggers all listeners of Task.Event.Complete when the task finishes.
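The listener flow described for `register_listener` and `start` can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `SketchTask`, the abstract `run` hook, and passing the result to each callback are all hypothetical. Only the `Event.Complete` and `Event.Stop` names and the "start triggers Complete listeners" behaviour come from the text above.

```python
from abc import ABC, abstractmethod
from enum import Enum
from typing import Any, Callable, Dict, List


class SketchTask(ABC):
    """Hypothetical stand-in for Task (illustration only)."""

    class Event(Enum):
        Complete = "complete"
        Stop = "stop"

    def __init__(self) -> None:
        # One listener list per event.
        self._listeners: Dict[Any, List[Callable[..., Any]]] = {
            event: [] for event in SketchTask.Event
        }

    def register_listener(self, event: "SketchTask.Event", callback) -> None:
        self._listeners[event].append(callback)

    @abstractmethod
    def run(self, **kwargs) -> Any:
        """Hypothetical hook holding the task logic."""

    def start(self, **kwargs) -> Any:
        result = self.run(**kwargs)
        # Assumption: listeners receive the task result as their argument.
        for callback in self._listeners[SketchTask.Event.Complete]:
            callback(result)
        return result


class EchoTask(SketchTask):
    def run(self, **kwargs) -> str:
        return kwargs.get("message", "")


received: List[str] = []
task = EchoTask()
task.register_listener(SketchTask.Event.Complete, received.append)
task.start(message="hello")
```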
 
- class danoan.llm_assistant.runner.cli.commands.session.task_runner.TaskInstruction(task_name: str, task_input: Dict[str, Any] | None)
- Bases: object
- Parameters:
- task_name (str) 
- task_input (Dict[str, Any] | None) 
 
 - task_input: Dict[str, Any] | None
 - task_name: str
 
- exception danoan.llm_assistant.runner.cli.commands.session.task_runner.TaskListenerNotRegisteredError
- Bases: Exception
- exception danoan.llm_assistant.runner.cli.commands.session.task_runner.TaskNotRegisteredError
- Bases: Exception
- class danoan.llm_assistant.runner.cli.commands.session.task_runner.TaskRunner
- Bases: object
- Executes the TaskInstruction objects added to its queue until the queue is empty.
- TaskRunner executes tasks in the order in which their TaskInstruction objects were added to its internal queue. A TaskInstruction contains the task name and its input variables. A task must be registered before it can be executed.
- A task's output is an optional TaskInstruction, which is added to the TaskRunner queue as soon as the task completes. The added TaskInstruction is then executed in the next turn.
- The queue object is suitable for concurrent thread execution.
- QuitInstruction = TaskInstruction(task_name='quit', task_input=None)
 - add(td: TaskInstruction)
- Add a new TaskInstruction to the queue. - Parameters:
- td (TaskInstruction)
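The queue-driven loop described for TaskRunner can be sketched roughly as below. This is an illustration, not the actual implementation: `SketchRunner`, `Instruction`, `register`, `run`, and the use of `queue.Queue` and `LookupError` are assumptions. The FIFO ordering, the follow-up instruction returned by a task, and the `quit` instruction with `task_input=None` come from the description above.

```python
import queue
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Instruction:
    """Hypothetical stand-in for TaskInstruction."""
    task_name: str
    task_input: Optional[Dict[str, Any]]


QUIT = Instruction(task_name="quit", task_input=None)


class SketchRunner:
    """Hypothetical stand-in for TaskRunner (illustration only)."""

    def __init__(self) -> None:
        # queue.Queue is a thread-safe FIFO from the standard library.
        self._queue: "queue.Queue[Instruction]" = queue.Queue()
        self._tasks: Dict[str, Callable[..., Optional[Instruction]]] = {}

    def register(self, name: str, fn: Callable[..., Optional[Instruction]]) -> None:
        self._tasks[name] = fn

    def add(self, instruction: Instruction) -> None:
        self._queue.put(instruction)

    def run(self) -> List[str]:
        executed: List[str] = []
        while not self._queue.empty():
            instruction = self._queue.get()
            if instruction == QUIT:
                break
            if instruction.task_name not in self._tasks:
                # The real module defines TaskNotRegisteredError for this case.
                raise LookupError(f"task not registered: {instruction.task_name}")
            executed.append(instruction.task_name)
            task_fn = self._tasks[instruction.task_name]
            follow_up = task_fn(**(instruction.task_input or {}))
            if follow_up is not None:
                # The task's output is queued and runs on a later turn.
                self.add(follow_up)
        return executed


log: List[str] = []


def greet(name: str) -> Instruction:
    return Instruction("log", {"text": f"hi {name}"})


def log_task(text: str) -> Instruction:
    log.append(text)
    return QUIT


runner = SketchRunner()
runner.register("greet", greet)
runner.register("log", log_task)
runner.add(Instruction("greet", {"name": "Ada"}))
order = runner.run()
```

Here `greet` chains into `log`, and `log` ends the session by returning the quit instruction, which mirrors how a task's output drives the next turn.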