TaskManager

class emmet.cli.task_manager.TaskManager(state_manager, running_status_update_interval=30, daemon_log='~/.emmet/daemon.log')

Bases: object

Manages background tasks and their states.

Parameters:
  • state_manager (StateManager)

  • running_status_update_interval (int)

  • daemon_log (str)

start_task(func, *args, **kwargs)

Start a new task in a separate, fully detached process.

Return type:

str

Parameters:
  • func (Callable[[...], Any])

  • args (Any)

  • kwargs (Any)
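The following is a minimal sketch of what launching work in a detached process can look like, not the actual implementation of start_task. The helper name start_detached, the use of a UUID string as the returned identifier, and the choice to pass a code string instead of a callable are all assumptions made to keep the example self-contained.

```python
import subprocess
import sys
import uuid


def start_detached(code: str) -> tuple[str, subprocess.Popen]:
    """Run `code` in a new Python interpreter, detached from this session.

    Hypothetical helper: the real start_task accepts a callable plus
    arguments, and how it detaches the child is not documented here.
    """
    # ASSUMPTION: the returned string identifier is modelled as a UUID.
    task_id = str(uuid.uuid4())
    proc = subprocess.Popen(
        [sys.executable, "-c", code],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        # On POSIX this calls setsid() in the child, so it no longer
        # shares the parent's session or controlling terminal.
        start_new_session=True,
    )
    return task_id, proc


task_id, proc = start_detached("import time; time.sleep(0.1)")
# Waiting here is only for the demonstration; a detached task would
# normally be left to run on its own.
exit_code = proc.wait(timeout=10)
```

Returning an identifier instead of a process handle lets callers track the task through a separate state store, which is presumably the role of the state_manager argument.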

get_task_status(task_id)

Get the current status of a task.

Return type:

dict[str, Any]

Parameters:

task_id (str) – The ID of the task to check.

Returns:

Dict containing the task status and, if the task has completed, its result or error.

wait_for_task_completion(task_id, timeout=None, check_interval=1.0)

Wait for a task to complete.

Return type:

dict[str, Any]

Parameters:
  • task_id (str)

  • timeout (float | None)

  • check_interval (float)
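The timeout and check_interval parameters suggest a polling loop. The sketch below shows one way such a loop could be written; it is an illustration under stated assumptions, not the method's actual code. The function name wait_for_completion, the "status" key, the "running" and "completed" values, and raising TimeoutError on expiry are all hypothetical.

```python
import time
from typing import Any, Callable, Optional


def wait_for_completion(
    get_status: Callable[[str], dict[str, Any]],
    task_id: str,
    timeout: Optional[float] = None,
    check_interval: float = 1.0,
) -> dict[str, Any]:
    """Poll get_status(task_id) until the task stops reporting "running"."""
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        status = get_status(task_id)
        # ASSUMPTION: the status dict has a "status" key that reads
        # "running" while the task is still in progress.
        if status.get("status") != "running":
            return status
        if deadline is not None and time.monotonic() >= deadline:
            # ASSUMPTION: the real method may signal a timeout differently.
            raise TimeoutError(f"task {task_id} still running after {timeout}s")
        time.sleep(check_interval)


# Stand-in status source: reports "running" twice, then "completed".
_responses = iter([
    {"status": "running"},
    {"status": "running"},
    {"status": "completed", "result": 42},
])


def fake_get_status(task_id: str) -> dict[str, Any]:
    return next(_responses)


final = wait_for_completion(
    fake_get_status, "task-1", timeout=5.0, check_interval=0.01
)
print(final)
```

With timeout=None the loop in this sketch polls indefinitely, so passing a finite timeout is the safer choice when the task may hang.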

is_task_running(task_id)

Check if a task is still running.

Return type:

bool

Parameters:

task_id (str)

cleanup_finished_tasks()

Remove finished tasks from the state manager.

Return type:

None

terminate_task(task_id)

Terminate a running task.

Return type:

dict[str, Any]

Parameters:

task_id (str)