Skip to content

Lilota module

Lilota

High-level interface for Lilota task scheduling and worker management.

The Lilota class coordinates a scheduler instance and a pool of worker processes that execute user-defined task scripts.

A scheduler manages task persistence, distribution, and node heartbeats, while worker processes execute tasks defined inside the provided script.

Workers are started as separate Python processes that run the provided script file as a module entry point.

Source code in lilota/core.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
class Lilota():
  """High-level interface for Lilota task scheduling and worker management.

  The Lilota class coordinates a scheduler instance and a pool of worker
  processes that execute user-defined task scripts.

  A scheduler manages task persistence, distribution, and node heartbeats,
  while worker processes execute tasks defined inside the provided script.

  Workers are started as separate Python processes that run the provided
  script file as a module entry point.
  """

  def __init__(self, db_url: str, script_path: str, number_of_workers: int = cpu_count(), scheduler_heartbeat_interval: float = 5, scheduler_timeout_sec: int = 20, process_manager_interval: float = 1.0, stop_worker_timeout: int = 60, logging_level = logging.INFO):
    """Create a new Lilota runtime instance.

    Args:
      db_url (str):
        Database connection URL used by the scheduler.

      script_path (str):
        Path to a Python script that defines and registers Lilota tasks.
        Each worker process executes this script as its entry point.

      number_of_workers (int, optional):
        Number of worker processes to spawn. Defaults to the number of CPU cores.

      scheduler_heartbeat_interval (float, optional):
        Interval in seconds between scheduler node heartbeats.
        Defaults to 5 seconds.

      scheduler_timeout_sec (int, optional):
        Time in seconds after which a node is considered inactive
        if no heartbeat is received. Defaults to 20 seconds.

      process_manager_interval (float, optional):
        Interval in seconds between process-manager monitoring cycles
        (error propagation and restarting of exited workers).
        Defaults to 1.0 seconds.

      stop_worker_timeout (int, optional):
        Maximum time in seconds to wait for worker processes to
        exit gracefully before they are forcefully killed.
        Defaults to 60 seconds.

      logging_level (int, optional):
        Logging level used by the scheduler. Defaults to logging.INFO.
    """
    self._db_url = db_url
    self._script_path = script_path
    self._number_of_workers = number_of_workers
    self._process_manager_interval = process_manager_interval
    self._stop_worker_timeout = stop_worker_timeout
    self._error_queue: Queue = None  # created in start()
    self._processes: list[WorkerProcess] = []
    self._process_manager_heartbeat: Heartbeat = None

    self._scheduler = LilotaScheduler(
      db_url=db_url,
      node_heartbeat_interval=scheduler_heartbeat_interval,
      node_timeout_sec=scheduler_timeout_sec,
      logging_level=logging_level
    )

    self._node_store = SqlAlchemyNodeStore(self._db_url, None)
    self._task_store = SqlAlchemyTaskStore(self._db_url, None)
    self._is_started = False


  def start(self):
    """Start the scheduler and spawn worker processes.

    This method:
      1. Starts the Lilota scheduler.
      2. Launches worker processes that execute the configured task script.
      3. Monitors the started processes.

    After startup, the instance is ready to schedule and process tasks.
    """
    self._error_queue = Queue()
    self._start_scheduler()
    self._start_workers()
    self._start_process_manager()
    self._is_started = True


  def _start_scheduler(self):
    self._scheduler.start()


  def _start_workers(self):
    # Spawn the configured number of worker processes
    for _ in range(self._number_of_workers):
      p = self._create_process()
      p.start()
      self._processes.append(p)


  def _create_process(self):
    # Each worker runs the user script in a fresh process; errors are
    # reported back through the shared error queue.
    return WorkerProcess(
      id=uuid4(),
      target=_run_script,
      args=(self._script_path, self._error_queue)
    )


  def _start_process_manager(self):
    # Start the process manager that propagates child errors and
    # replaces exited workers.
    process_manager_task = ProcessManagerTask(
      interval=self._process_manager_interval,
      processes=self._processes,
      error_queue=self._error_queue,
      process_factory=self._create_process
    )
    self._process_manager_heartbeat = Heartbeat("process_manager", process_manager_task, None)
    self._process_manager_heartbeat.start()


  def stop(self):
    """Stop the scheduler and terminate all worker processes.

    Worker processes are first asked to terminate gracefully. If a worker
    does not exit within ``stop_worker_timeout`` seconds, it will be
    forcefully killed.

    After completion, all worker processes are cleaned up and removed
    from the internal process list.
    """
    self._stop_scheduler()
    self._stop_process_manager()
    self._stop_workers()
    self._is_started = False


  def _stop_scheduler(self):
    self._scheduler.stop()


  def _stop_workers(self):
    for p in self._processes:
      if p.is_alive():
        # Ask process to terminate nicely
        p.terminate()

    # Give them a chance to exit
    for p in self._processes:
      p.join(self._stop_worker_timeout)

    # Force kill if still alive (usually not needed, but safe)
    for p in self._processes:
      if p.is_alive():
        p.kill()
        p.join()

    # Remove all processes from the list
    self._processes.clear()


  def _stop_process_manager(self):
    # Stop the process manager before the error queue is torn down
    if self._process_manager_heartbeat:
      self._process_manager_heartbeat.stop_and_join(timeout=self._stop_worker_timeout)
      self._process_manager_heartbeat = None

    # Close the error queue
    if self._error_queue:
      self._error_queue.close()
      self._error_queue.join_thread()
      self._error_queue = None


  def join(self):
    """Block until all worker processes have finished.

    This is typically used in long-running worker setups where the main
    process should wait until workers exit.
    """
    for p in self._processes:
      if p.is_alive():
        p.join()


  def schedule(self, name: str, input: Any = None) -> int:
    """Schedule a task for execution.

    Args:
      name (str): Name of the registered task.
      input (Any, optional): Input payload for the task. Defaults to None.

    Returns:
      int: Identifier of the scheduled task.
    """
    return self._scheduler.schedule(name, input)


  def get_all_nodes(self) -> list[Node]:
    """Retrieve all registered scheduler nodes.

    Returns:
        list[Node]: A list containing all nodes currently stored in the
        node store.
    """
    return self._node_store.get_all_nodes()


  def get_task_by_id(self, id: UUID):
    """Retrieve a task by its unique identifier.

    Args:
      id (UUID): Unique task identifier.

    Returns:
      Any: Task object associated with the given ID.
    """
    return self._task_store.get_task_by_id(id)

__init__(db_url, script_path, number_of_workers=cpu_count(), scheduler_heartbeat_interval=5, scheduler_timeout_sec=20, process_manager_interval=1.0, stop_worker_timeout=60, logging_level=logging.INFO)

Create a new Lilota runtime instance.

Parameters:

Name Type Description Default
db_url str

Database connection URL used by the scheduler.

required
script_path str

Path to a Python script that defines and registers Lilota tasks. Each worker process executes this script as its entry point.

required
number_of_workers int

Number of worker processes to spawn. Defaults to the number of CPU cores.

cpu_count()
scheduler_heartbeat_interval float

Interval in seconds between scheduler node heartbeats. Defaults to 5 seconds.

5
scheduler_timeout_sec int

Time in seconds after which a node is considered inactive if no heartbeat is received. Defaults to 20 seconds.

20
stop_worker_timeout int

Maximum time in seconds to wait for worker processes to exit gracefully before they are forcefully killed. Defaults to 60 seconds.

60
logging_level int

Logging level used by the scheduler. Defaults to logging.INFO.

INFO
Source code in lilota/core.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def __init__(self, db_url: str, script_path: str, number_of_workers: int = cpu_count(), scheduler_heartbeat_interval: float = 5, scheduler_timeout_sec: int = 20, process_manager_interval: float = 1.0, stop_worker_timeout: int = 60, logging_level = logging.INFO):
  """Create a new Lilota runtime instance.

  Args:
    db_url (str):
      Database connection URL used by the scheduler.

    script_path (str):
      Path to a Python script that defines and registers Lilota tasks.
      Each worker process executes this script as its entry point.

    number_of_workers (int, optional):
      Number of worker processes to spawn. Defaults to the number of CPU cores.

    scheduler_heartbeat_interval (float, optional):
      Interval in seconds between scheduler node heartbeats.
      Defaults to 5 seconds.

    scheduler_timeout_sec (int, optional):
      Time in seconds after which a node is considered inactive
      if no heartbeat is received. Defaults to 20 seconds.

    process_manager_interval (float, optional):
      Interval in seconds between process-manager monitoring cycles.
      Defaults to 1.0 seconds.

    stop_worker_timeout (int, optional):
      Maximum time in seconds to wait for worker processes to
      exit gracefully before they are forcefully killed.
      Defaults to 60 seconds.

    logging_level (int, optional):
      Logging level used by the scheduler. Defaults to logging.INFO.
  """
  self._db_url = db_url
  self._script_path = script_path
  self._number_of_workers = number_of_workers
  self._process_manager_interval = process_manager_interval
  self._stop_worker_timeout = stop_worker_timeout
  self._error_queue: Queue = None  # created in start()
  self._processes: list[WorkerProcess] = []
  self._process_manager_heartbeat: Heartbeat = None

  self._scheduler = LilotaScheduler(
    db_url=db_url,
    node_heartbeat_interval=scheduler_heartbeat_interval,
    node_timeout_sec=scheduler_timeout_sec,
    logging_level=logging_level
  )

  self._node_store = SqlAlchemyNodeStore(self._db_url, None)
  self._task_store = SqlAlchemyTaskStore(self._db_url, None)
  self._is_started = False

get_all_nodes()

Retrieve all registered scheduler nodes.

Returns:

Type Description
list[Node]

A list containing all nodes currently stored in the node store.

Source code in lilota/core.py
331
332
333
334
335
336
337
338
def get_all_nodes(self) -> list[Node]:
  """Fetch every node known to the node store.

  Returns:
      list[Node]: All nodes currently persisted in the node store.
  """
  nodes = self._node_store.get_all_nodes()
  return nodes

get_task_by_id(id)

Retrieve a task by its unique identifier.

Parameters:

Name Type Description Default
id UUID

Unique task identifier.

required

Returns:

Name Type Description
Any

Task object associated with the given ID.

Source code in lilota/core.py
341
342
343
344
345
346
347
348
349
350
def get_task_by_id(self, id: UUID):
  """Look up a single task using its unique identifier.

  Args:
    id (UUID): Unique task identifier.

  Returns:
    Any: The task record matching the given ID.
  """
  store = self._task_store
  return store.get_task_by_id(id)

join()

Block until all worker processes have finished.

This is typically used in long-running worker setups where the main process should wait until workers exit.

Source code in lilota/core.py
307
308
309
310
311
312
313
314
315
def join(self):
  """Block until every worker process has exited.

  Useful in long-running worker setups where the main process should
  wait for all workers to finish before continuing.
  """
  still_running = (p for p in self._processes if p.is_alive())
  for proc in still_running:
    proc.join()

schedule(name, input=None)

Schedule a task for execution.

Parameters:

Name Type Description Default
name str

Name of the registered task.

required
input Any

Input payload for the task. Defaults to None.

None

Returns:

Name Type Description
int int

Identifier of the scheduled task.

Source code in lilota/core.py
318
319
320
321
322
323
324
325
326
327
328
def schedule(self, name: str, input: Any = None) -> int:
  """Submit a task to the scheduler for execution.

  Args:
    name (str): Name of the registered task.
    input (Any, optional): Payload handed to the task. Defaults to None.

  Returns:
    int: Identifier assigned to the scheduled task.
  """
  task_id = self._scheduler.schedule(name, input)
  return task_id

start()

Start the scheduler and spawn worker processes.

This method:
  1. Starts the Lilota scheduler.
  2. Launches worker processes that execute the configured task script.
  3. Monitors the started processes.

After startup, the instance is ready to schedule and process tasks.

Source code in lilota/core.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def start(self):
  """Start the scheduler and spawn worker processes.

  Startup sequence:
    1. Start the Lilota scheduler.
    2. Launch worker processes running the configured task script.
    3. Begin monitoring the started processes.

  Once this returns, the instance can schedule and process tasks.
  """
  self._error_queue = Queue()
  startup_steps = (self._start_scheduler, self._start_workers, self._start_process_manager)
  for step in startup_steps:
    step()
  self._is_started = True

stop()

Stop the scheduler and terminate all worker processes.

Worker processes are first asked to terminate gracefully. If a worker does not exit within stop_worker_timeout seconds, it will be forcefully killed.

After completion, all worker processes are cleaned up and removed from the internal process list.

Source code in lilota/core.py
254
255
256
257
258
259
260
261
262
263
264
265
266
267
def stop(self):
  """Stop the scheduler and shut down all worker processes.

  Workers are first asked to exit gracefully; a worker still alive after
  ``stop_worker_timeout`` seconds is forcefully killed.

  When this returns, all worker processes have been cleaned up and
  removed from the internal process list.
  """
  shutdown_steps = (self._stop_scheduler, self._stop_process_manager, self._stop_workers)
  for step in shutdown_steps:
    step()
  self._is_started = False

ProcessManagerTask

Bases: HeartbeatTask

Periodic task responsible for monitoring child processes and propagating exceptions raised inside them to the parent process.

The task checks a shared error queue for exceptions reported by child processes. If an error is found, it reconstructs and raises the exception in the parent process context so it can be handled or terminate execution.

Source code in lilota/core.py
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
class ProcessManagerTask(HeartbeatTask):
  """
  Periodic task responsible for monitoring child processes and propagating
  exceptions raised inside them to the parent process.

  The task checks a shared error queue for exceptions reported by child
  processes. If an error is found, it reconstructs and raises the exception
  in the parent process context so it can be handled or terminate execution.
  Processes that have exited are replaced with freshly started workers.
  """

  def __init__(self, interval: float, processes: list[Process], error_queue: Queue, process_factory: Callable[[], WorkerProcess]):
    """
    Initialize the process manager task.

    Args:
        interval (float): Time interval (in seconds) between task executions.
        processes (list[Process]): List of managed child processes.
        error_queue (Queue): Queue used by child processes to report
            exceptions back to the parent process. Each entry is expected
            to contain a dictionary with exception metadata.
        process_factory (Callable[[], Process]): Function that creates a new worker
            process and returns it.
    """
    super().__init__(interval)
    self._processes: list[WorkerProcess] = processes
    self._error_queue: Queue = error_queue
    self._process_factory = process_factory


  def execute(self):
    """
    Execute one monitoring cycle.

    Attempts to retrieve an error report from the error queue without
    blocking. If an error is present, it will be reconstructed and raised
    in the parent process via `raise_child_exception`.
    The processes are also monitored. If a process has exited, it is
    replaced in the list by a newly started worker process.
    """
    self._retrieve_error()
    self._monitor_processes()


  def _retrieve_error(self):
    # Non-blocking check: an empty queue simply means no child reported
    # an error since the last cycle.
    try:
      error = self._error_queue.get_nowait()
      self.raise_child_exception(error)
    except Empty:
      pass


  def _monitor_processes(self):
    # A non-None exitcode means the process has terminated; replace it
    # in-place so the managed list keeps a constant size.
    for i, p in enumerate(self._processes):
      if p.exitcode is not None:
        new_process = self._process_factory()
        self._processes[i] = new_process
        new_process.start()


  def raise_child_exception(self, error_info):
    """
    Reconstruct and raise an exception originating from a child process.

    The error information is expected to be a dictionary containing:
        - "type": Name of the exception class
        - "message": Original exception message
        - "traceback": Serialized traceback string from the child process

    The method attempts to recreate the exception using the built-in
    exception class matching the provided type name. If the exception
    type cannot be resolved (or does not name an exception class),
    a RuntimeError is used as a fallback.

    Args:
        error_info (dict): Dictionary containing exception metadata from
            a child process.

    Raises:
        Exception: Reconstructed exception with the original message and
        appended child traceback.
    """
    import builtins

    exc_type = error_info["type"]
    exc_message = error_info["message"]
    tb = error_info["traceback"]

    # Resolve the exception class by name from the `builtins` module.
    # NOTE: `__builtins__` is only a module object inside __main__; in an
    # imported module it is a plain dict, so `getattr(__builtins__, ...)`
    # would always fail there and degrade every error to RuntimeError.
    exc_cls = getattr(builtins, exc_type, RuntimeError)
    if not (isinstance(exc_cls, type) and issubclass(exc_cls, BaseException)):
      # The name resolved to a builtin that is not an exception class
      exc_cls = RuntimeError

    # Raise it with original message, attach traceback
    raise exc_cls(f"{exc_message}\nChild traceback:\n{tb}")

__init__(interval, processes, error_queue, process_factory)

Initialize the process manager task.

Parameters:

Name Type Description Default
interval float

Time interval (in seconds) between task executions.

required
processes list[Process]

List of managed child processes.

required
error_queue Queue

Queue used by child processes to report exceptions back to the parent process. Each entry is expected to contain a dictionary with exception metadata.

required
process_factory Callable[[], Process]

Function that creates a new worker process and returns it.

required
Source code in lilota/core.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(self, interval: float, processes: list[Process], error_queue: Queue, process_factory: Callable[[], WorkerProcess]):
  """
  Set up the process manager task.

  Args:
      interval (float): Seconds between consecutive task executions.
      processes (list[Process]): Child processes under management.
      error_queue (Queue): Queue through which child processes report
          exceptions to the parent process. Each entry is expected to be
          a dictionary with exception metadata.
      process_factory (Callable[[], Process]): Callable that builds and
          returns a fresh worker process.
  """
  super().__init__(interval)
  # Keep references to the shared state handed in by the owner
  self._process_factory = process_factory
  self._error_queue: Queue = error_queue
  self._processes: list[WorkerProcess] = processes

execute()

Execute one monitoring cycle.

Attempts to retrieve an error report from the error queue without blocking. If an error is present, it will be reconstructed and raised in the parent process via raise_child_exception. The processes are also monitored. If a process is no longer alive, it is removed from the list and a new worker process is started.

Source code in lilota/core.py
73
74
75
76
77
78
79
80
81
82
83
84
def execute(self):
  """
  Run a single monitoring cycle.

  First drains one pending error report (if any) from the error queue
  without blocking; a reported error is reconstructed and raised in the
  parent process via `raise_child_exception`. The managed processes are
  then inspected, and any process that is no longer alive is replaced
  by a freshly started worker.
  """
  for step in (self._retrieve_error, self._monitor_processes):
    step()

raise_child_exception(error_info)

Reconstruct and raise an exception originating from a child process.

The error information is expected to be a dictionary containing
  • "type": Name of the exception class
  • "message": Original exception message
  • "traceback": Serialized traceback string from the child process

The method attempts to recreate the exception using the built-in exception class matching the provided type name. If the exception type cannot be resolved, a RuntimeError is used as a fallback.

Parameters:

Name Type Description Default
error_info dict

Dictionary containing exception metadata from a child process.

required

Raises:

Type Description
Exception

Reconstructed exception with the original message and appended child traceback.

Source code in lilota/core.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def raise_child_exception(self, error_info):
  """
  Reconstruct and raise an exception originating from a child process.

  The error information is expected to be a dictionary containing:
      - "type": Name of the exception class
      - "message": Original exception message
      - "traceback": Serialized traceback string from the child process

  The method attempts to recreate the exception using the built-in
  exception class matching the provided type name. If the exception
  type cannot be resolved (or does not name an exception class),
  a RuntimeError is used as a fallback.

  Args:
      error_info (dict): Dictionary containing exception metadata from
          a child process.

  Raises:
      Exception: Reconstructed exception with the original message and
      appended child traceback.
  """
  import builtins

  exc_type = error_info["type"]
  exc_message = error_info["message"]
  tb = error_info["traceback"]

  # Resolve the exception class by name from the `builtins` module.
  # NOTE: `__builtins__` is only a module object inside __main__; in an
  # imported module it is a plain dict, so `getattr(__builtins__, ...)`
  # would always fail there and degrade every error to RuntimeError.
  exc_cls = getattr(builtins, exc_type, RuntimeError)
  if not (isinstance(exc_cls, type) and issubclass(exc_cls, BaseException)):
    # The name resolved to a builtin that is not an exception class
    exc_cls = RuntimeError

  # Raise it with original message, attach traceback
  raise exc_cls(f"{exc_message}\nChild traceback:\n{tb}")

LilotaNode

Bases: ABC

Abstract base class for all Lilota nodes.

A node represents a running component in the Lilota system, such as a scheduler or a worker. This class handles common functionality including:

  • database initialization and migrations
  • node lifecycle management
  • node status updates
  • heartbeat management
  • access to node and task stores
Source code in lilota/node.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
class LilotaNode(ABC):
  """Abstract base class for all Lilota nodes.

  A node represents a running component in the Lilota system, such as a
  scheduler or a worker. This class handles common functionality including:

  - database initialization and migrations
  - node lifecycle management
  - node status updates
  - heartbeat management
  - access to node and task stores
  """

  def __init__(
    self,
    *,
    db_url: str,
    node_type: NodeType,
    node_heartbeat_interval: float,
    node_timeout_sec: int,
    logger_name: str,
    logging_level):
    """Initialize a Lilota node.

    Args:
      db_url (str): Database connection URL.
      node_type (NodeType): Type of node (`scheduler` or `worker`).
      node_heartbeat_interval (float): Interval in seconds between node
        heartbeat updates.
      node_timeout_sec (int): Time in seconds after which a node is
        considered inactive if no heartbeat is received.
      logger_name (str): Name of the logger used by the node.
      logging_level (int): Logging level used for the node logger.
    """

    self._db_url = db_url
    self._node_type = node_type
    self._node_heartbeat_interval = node_heartbeat_interval
    self._node_heartbeat_join_timeout_in_sec = 60
    self._node_timeout_sec = node_timeout_sec
    # Runtime state populated during start()
    self._node_id = None
    self._node_store = None
    self._task_store = None
    self._heartbeat: Heartbeat = None
    self._logging_level = logging_level
    self._is_started = False

    # Upgrade the database
    upgrade_db(self._db_url)

    # Setup logging
    self._logger = configure_logging(self._db_url, logger_name, logging_level)


  def start(self):
    """Start the node.

    Initializes the node in the database, sets its status to ``RUNNING``,
    and triggers the node-specific startup logic implemented in
    ``_on_started()``.

    Raises:
      RuntimeError: If the node is already started.
    """

    # Check if the node is already started
    if self._is_started:
      # RuntimeError instead of bare Exception; still caught by callers
      # that catch Exception.
      raise RuntimeError("The node is already started")

    if not self._node_id:
      # Create stores
      self._node_store = SqlAlchemyNodeStore(self._db_url, self._logger)
      self._task_store = SqlAlchemyTaskStore(self._db_url, self._logger, self._should_set_progress_manually())

      # Create node with status STARTING
      self._node_id = self._node_store.create_node(self._node_type, NodeStatus.IDLE)

      # Create a context logger
      self._logger = create_context_logger(self._logger, node_id=self._node_id)

    # Change status to RUNNING
    self._node_store.update_node_status(self._node_id, NodeStatus.RUNNING)

    # Set the node as started
    self._is_started = True

    # On started
    self._on_started()


  def stop(self):
    """Stop the node.

    Updates the node status to ``STOPPING``, executes subclass-specific
    shutdown logic via ``_on_stop()``, and then sets the node status to
    ``IDLE``.

    Raises:
      RuntimeError: If the node was not started.
    """

    # Check if the node was started
    if not self._is_started:
      raise RuntimeError("The node cannot be stopped because it was not started")

    # Change status to STOPPING
    self._node_store.update_node_status(self._node_id, NodeStatus.STOPPING)

    # Stop additional stuff
    self._on_stop()

    # Change status to IDLE
    self._node_store.update_node_status(self._node_id, NodeStatus.IDLE)

    # Log Node stopped message
    self._logger.debug("Node stopped")

    # Set the node as not started
    self._is_started = False


  def get_nodes(self):
    """Return all nodes currently registered in the system.

    Returns:
      list: A list of node records.
    """
    return self._node_store.get_all_nodes()


  def get_node(self):
    """Return the current node record.

    Returns:
      Any | None: The node record if the node has been created,
      otherwise ``None``.
    """
    return self._node_store.get_node_by_id(self._node_id) if self._node_id else None


  def get_task_by_id(self, id: UUID):
    """Retrieve a task by its identifier.

    Args:
      id (UUID): Unique identifier of the task.

    Returns:
      Any: Task record associated with the given ID.
    """
    return self._task_store.get_task_by_id(id)


  def delete_task_by_id(self, id: UUID):
    """Delete a task from the system.

    Args:
      id (UUID): Unique identifier of the task.

    Returns:
      bool: True if the task was deleted, otherwise False.
    """
    # Lazy %-formatting: the message is only built if DEBUG is enabled
    self._logger.debug("Delete task with id %s", id)
    success = self._task_store.delete_task_by_id(id)
    if success:
      self._logger.debug("Task deleted!")
    else:
      self._logger.debug("Task not deleted!")
    return success


  def _stop_node_heartbeat(self):
    # Stop heartbeat thread
    if self._heartbeat:
      self._heartbeat.stop_and_join(timeout=self._node_heartbeat_join_timeout_in_sec)
      self._heartbeat = None


  @abstractmethod
  def _on_started(self):
    """Hook executed after the node has successfully started.

    Subclasses should implement this method to start any required
    background threads, heartbeats, or task loops.
    """
    pass


  @abstractmethod
  def _on_stop(self):
    """Hook executed when the node is stopping.

    Subclasses should implement this method to clean up resources
    such as threads or background workers.
    """
    pass


  @abstractmethod
  def _should_set_progress_manually(self) -> bool:
    """Determine whether task progress should be set manually.

    Returns:
      bool: True if users must manually update the task progress, otherwise False.
    """
    pass

__init__(*, db_url, node_type, node_heartbeat_interval, node_timeout_sec, logger_name, logging_level)

Initialize a Lilota node.

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
node_type NodeType

Type of node (scheduler or worker).

required
node_heartbeat_interval float

Interval in seconds between node heartbeat updates.

required
node_timeout_sec int

Time in seconds after which a node is considered inactive if no heartbeat is received.

required
logger_name str

Name of the logger used by the node.

required
logging_level int

Logging level used for the node logger.

required
Source code in lilota/node.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init__(
  self,
  *,
  db_url: str,
  node_type: NodeType,
  node_heartbeat_interval: float,
  node_timeout_sec: int,
  logger_name: str,
  logging_level):
  """Initialize a Lilota node.

  Args:
    db_url (str): Database connection URL.
    node_type (NodeType): Type of node (`scheduler` or `worker`).
    node_heartbeat_interval (float): Interval in seconds between node
      heartbeat updates.
    node_timeout_sec (int): Time in seconds after which a node is
      considered inactive if no heartbeat is received.
    logger_name (str): Name of the logger used by the node.
    logging_level (int): Logging level used for the node logger.
  """
  # Configuration supplied by the caller.
  self._db_url = db_url
  self._node_type = node_type
  self._node_heartbeat_interval = node_heartbeat_interval
  self._node_timeout_sec = node_timeout_sec
  self._logging_level = logging_level

  # How long stopping waits for the heartbeat thread to join.
  self._node_heartbeat_join_timeout_in_sec = 60

  # Runtime state; the stores and node record are created lazily in start().
  self._node_id = None
  self._node_store = None
  self._task_store = None
  self._heartbeat: Heartbeat = None
  self._is_started = False

  # Bring the database schema up to date before anything uses it.
  upgrade_db(self._db_url)

  # Set up logging; configure_logging also receives the DB URL
  # (presumably for DB-backed log storage — see SqlAlchemyLogStore).
  self._logger = configure_logging(self._db_url, logger_name, logging_level)

delete_task_by_id(id)

Delete a task from the system.

Parameters:

Name Type Description Default
id UUID

Unique identifier of the task.

required

Returns:

Name Type Description
bool

True if the task was deleted, otherwise False.

Source code in lilota/node.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
def delete_task_by_id(self, id: UUID):
  """Delete a task from the system.

  Args:
    id (UUID): Unique identifier of the task.

  Returns:
    bool: True if the task was deleted, otherwise False.
  """
  # Lazy %-style args avoid string formatting when DEBUG is disabled;
  # the remaining messages had f-prefixes with no placeholders (F541).
  self._logger.debug("Delete task with id %s", id)
  success = self._task_store.delete_task_by_id(id)
  if success:
    self._logger.debug("Task deleted!")
  else:
    self._logger.debug("Task not deleted!")
  return success

get_node()

Return the current node record.

Returns:

Type Description

Any | None: The node record if the node has been created,

otherwise None.

Source code in lilota/node.py
177
178
179
180
181
182
183
184
def get_node(self):
  """Return the current node record.

  Returns:
    Any | None: The node record if the node has been created,
    otherwise ``None``.
  """
  if not self._node_id:
    # The node record is only created on start(); nothing to look up yet.
    return None
  return self._node_store.get_node_by_id(self._node_id)

get_nodes()

Return all nodes currently registered in the system.

Returns:

Name Type Description
list

A list of node records.

Source code in lilota/node.py
168
169
170
171
172
173
174
def get_nodes(self):
  """Return all nodes currently registered in the system.

  Returns:
    list: A list of node records.
  """
  # Straight pass-through to the node store; no filtering at this layer.
  nodes = self._node_store.get_all_nodes()
  return nodes

get_task_by_id(id)

Retrieve a task by its identifier.

Parameters:

Name Type Description Default
id UUID

Unique identifier of the task.

required

Returns:

Name Type Description
Any

Task record associated with the given ID.

Source code in lilota/node.py
187
188
189
190
191
192
193
194
195
196
def get_task_by_id(self, id: UUID):
  """Retrieve a task by its identifier.

  Args:
    id (UUID): Unique identifier of the task.

  Returns:
    Any: Task record associated with the given ID.
  """
  # Delegate straight to the task store; no caching at this layer.
  task = self._task_store.get_task_by_id(id)
  return task

start()

Start the node.

Initializes the node in the database, sets its status to RUNNING, and triggers the node-specific startup logic implemented in _on_started().

Raises:

Type Description
Exception

If the node is already started.

Source code in lilota/node.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def start(self):
  """Start the node.

  Initializes the node in the database, sets its status to ``RUNNING``,
  and triggers the node-specific startup logic implemented in
  ``_on_started()``. On the first call the stores and the node record
  are created; on later calls (after ``stop()``) the existing node
  record is reused.

  Raises:
    Exception: If the node is already started.
  """

  # Guard against double-start
  if self._is_started:
    raise Exception("The node is already started")

  if not self._node_id:
    # Create stores (first start only)
    self._node_store = SqlAlchemyNodeStore(self._db_url, self._logger)
    self._task_store = SqlAlchemyTaskStore(self._db_url, self._logger, self._should_set_progress_manually())

    # Create the node record. NOTE(review): the original comment said
    # "status STARTING" but the code registers the node as IDLE —
    # confirm which is intended.
    self._node_id = self._node_store.create_node(self._node_type, NodeStatus.IDLE)

    # Wrap the logger so every record carries this node's id
    self._logger = create_context_logger(self._logger, node_id=self._node_id)

  # Change status to RUNNING
  self._node_store.update_node_status(self._node_id, NodeStatus.RUNNING)

  # Set the node as started
  self._is_started = True

  # Run subclass-specific startup logic (heartbeats, task loops)
  self._on_started()

stop()

Stop the node.

Updates the node status to STOPPING, executes subclass-specific shutdown logic via _on_stop(), and then sets the node status to IDLE.

Raises:

Type Description
Exception

If the node was not started.

Source code in lilota/node.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def stop(self):
  """Stop the node.

  Updates the node status to ``STOPPING``, executes subclass-specific
  shutdown logic via ``_on_stop()``, and then sets the node status to
  ``IDLE``.

  Raises:
    Exception: If the node was not started.
  """
  # Guard clause: stopping a node that never started is an error.
  if not self._is_started:
    raise Exception("The node cannot be stopped because it was not started")

  # Announce the shutdown, run subclass cleanup, then park the node.
  self._node_store.update_node_status(self._node_id, NodeStatus.STOPPING)
  self._on_stop()
  self._node_store.update_node_status(self._node_id, NodeStatus.IDLE)

  self._logger.debug("Node stopped")
  self._is_started = False

NodeHeartbeatTask

Bases: HeartbeatTask

Heartbeat task used by Lilota nodes.

This task periodically updates the last_seen_at timestamp of the associated node in the database to indicate that the node is still alive.

Source code in lilota/node.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class NodeHeartbeatTask(HeartbeatTask):
  """Heartbeat task used by Lilota nodes.

  Periodically refreshes the ``last_seen_at`` timestamp of the
  associated node in the database, signalling that the node is alive.
  """

  def __init__(self, interval: float, node_id: str, node_store: SqlAlchemyNodeStore, logger: logging.Logger):
    """Initialize the heartbeat task.

    Args:
      interval (float): Interval in seconds between heartbeat executions.
      node_id (str): Unique identifier of the node.
      node_store (SqlAlchemyNodeStore): Store used for node operations.
      logger (logging.Logger): Logger instance used for reporting errors.
    """
    # The base class owns the scheduling interval.
    super().__init__(interval)
    self._logger = logger
    self._node_store = node_store
    self._node_id = node_id


  def execute(self):
    """Execute one heartbeat update.

    Updates the ``last_seen_at`` field of the node in the database.
    Any errors are logged without interrupting the heartbeat loop.
    """
    try:
      self._node_store.update_node_last_seen_at(self._node_id)
    except Exception:
      # Deliberate broad catch: a failed update must never kill the loop.
      self._logger.exception(f"Heartbeat update failed for node_id '{self._node_id}'")

__init__(interval, node_id, node_store, logger)

Initialize the heartbeat task.

Parameters:

Name Type Description Default
interval float

Interval in seconds between heartbeat executions.

required
node_id str

Unique identifier of the node.

required
node_store SqlAlchemyNodeStore

Store used for node operations.

required
logger Logger

Logger instance used for reporting errors.

required
Source code in lilota/node.py
19
20
21
22
23
24
25
26
27
28
29
30
31
def __init__(self, interval: float, node_id: str, node_store: SqlAlchemyNodeStore, logger: logging.Logger):
  """Initialize the heartbeat task.

  Args:
    interval (float): Interval in seconds between heartbeat executions.
    node_id (str): Unique identifier of the node.
    node_store (SqlAlchemyNodeStore): Store used for node operations.
    logger (logging.Logger): Logger instance used for reporting errors.
  """
  # The base class owns the scheduling interval.
  super().__init__(interval)
  self._node_id = node_id
  self._node_store = node_store
  self._logger = logger

execute()

Execute the heartbeat update.

Updates the last_seen_at field of the node in the database. Any errors are logged without interrupting the heartbeat loop.

Source code in lilota/node.py
34
35
36
37
38
39
40
41
42
43
def execute(self):
  """Execute the heartbeat update.

  Updates the ``last_seen_at`` field of the node in the database.
  Any errors are logged without interrupting the heartbeat loop.
  """
  try:
    self._node_store.update_node_last_seen_at(self._node_id)
  except Exception:
    # Deliberate broad catch: a failed heartbeat must never kill the loop.
    self._logger.exception(f"Heartbeat update failed for node_id '{self._node_id}'")

LilotaScheduler

Bases: LilotaNode

Scheduler node responsible for creating and managing tasks.

The scheduler registers itself as a scheduler node in the system and periodically sends heartbeats to indicate it is alive. Its main role is to create tasks and store them in the task store for workers to execute.

Source code in lilota/scheduler.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class LilotaScheduler(LilotaNode):
  """Scheduler node responsible for creating and managing tasks.

  The scheduler registers itself as a scheduler node in the system and
  periodically sends heartbeats to indicate it is alive. Its main role
  is to create tasks and store them in the task store for workers to
  execute.
  """

  LOGGER_NAME = "lilota.scheduler"

  def __init__(self, db_url: str, node_heartbeat_interval: float = 5.0, node_timeout_sec: int = 20, logging_level=logging.INFO, **kwargs):
    """Initialize the scheduler node.

    Args:
      db_url (str): Database connection URL.
      node_heartbeat_interval (float, optional): Interval in seconds between
        node heartbeats. Defaults to 5.0.
      node_timeout_sec (int, optional): Time in seconds before a node is
        considered inactive. Defaults to 20.
      logging_level (int, optional): Logging level used by the scheduler.
        Defaults to logging.INFO.
      **kwargs: Additional keyword arguments passed to the parent
        ``LilotaNode`` initializer.
    """
    super().__init__(
      db_url=db_url,
      node_type=NodeType.SCHEDULER,
      node_heartbeat_interval=node_heartbeat_interval,
      node_timeout_sec=node_timeout_sec,
      logger_name=self.LOGGER_NAME,
      logging_level=logging_level,
      **kwargs,
    )


  def _on_started(self):
    """Spin up the scheduler heartbeat once the node is running."""
    task = NodeHeartbeatTask(
      self._node_heartbeat_interval,
      self._node_id,
      self._node_store,
      self._logger,
    )
    self._heartbeat = Heartbeat(f"scheduler_heartbeat_{self._node_id}", task, self._logger)
    self._heartbeat.start()
    self._logger.debug("Node started")


  def _on_stop(self):
    """Tear down the scheduler heartbeat."""
    self._stop_node_heartbeat()


  def _should_set_progress_manually(self):
    # The scheduler only creates tasks (workers execute them), so it
    # never drives task progress itself.
    return False


  def schedule(self, name: str, input: Any = None) -> int:
    """Create and store a new task.

    Args:
      name (str): Name of the registered task.
      input (Any, optional): Input payload for the task. Defaults to None.

    Returns:
      int: Identifier of the created task.
    """
    # NOTE(review): annotated -> int, but task ids elsewhere in the API
    # are UUIDs (see get_task_by_id) — confirm the actual id type.
    self._logger.debug(f"Create task (name: '{name}', input: {input})")
    return self._task_store.create_task(name, input)


  def has_unfinished_tasks(self):
    """Check whether there are unfinished tasks in the system.

    Returns:
      bool: True if unfinished tasks exist, otherwise False.
    """
    return self._task_store.has_unfinished_tasks()

__init__(db_url, node_heartbeat_interval=5.0, node_timeout_sec=20, logging_level=logging.INFO, **kwargs)

Initialize the scheduler node.

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
node_heartbeat_interval float

Interval in seconds between node heartbeats. Defaults to 5.0.

5.0
node_timeout_sec int

Time in seconds before a node is considered inactive. Defaults to 20.

20
logging_level int

Logging level used by the scheduler. Defaults to logging.INFO.

INFO
**kwargs

Additional keyword arguments passed to the parent LilotaNode initializer.

{}
Source code in lilota/scheduler.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, db_url: str, node_heartbeat_interval: float = 5.0, node_timeout_sec: int = 20, logging_level=logging.INFO, **kwargs):
  """Initialize the scheduler node.

  Args:
    db_url (str): Database connection URL.
    node_heartbeat_interval (float, optional): Interval in seconds between
      node heartbeats. Defaults to 5.0.
    node_timeout_sec (int, optional): Time in seconds before a node is
      considered inactive. Defaults to 20.
    logging_level (int, optional): Logging level used by the scheduler.
      Defaults to logging.INFO.
    **kwargs: Additional keyword arguments passed to the parent
      ``LilotaNode`` initializer.
  """
  # Register as a SCHEDULER node; the LilotaNode base also upgrades the
  # database schema and configures logging.
  super().__init__(
    db_url=db_url,
    node_type=NodeType.SCHEDULER,
    node_heartbeat_interval=node_heartbeat_interval,
    node_timeout_sec=node_timeout_sec,
    logger_name=self.LOGGER_NAME,
    logging_level=logging_level,
    **kwargs,
  )

has_unfinished_tasks()

Check whether there are unfinished tasks in the system.

Returns:

Name Type Description
bool

True if unfinished tasks exist, otherwise False.

Source code in lilota/scheduler.py
83
84
85
86
87
88
89
def has_unfinished_tasks(self):
  """Check whether there are unfinished tasks in the system.

  Returns:
    bool: True if unfinished tasks exist, otherwise False.
  """
  # Purely a pass-through to the task store.
  pending = self._task_store.has_unfinished_tasks()
  return pending

schedule(name, input=None)

Create and store a new task.

Parameters:

Name Type Description Default
name str

Name of the registered task.

required
input Any

Input payload for the task. Defaults to None.

None

Returns:

Name Type Description
int int

Identifier of the created task.

Source code in lilota/scheduler.py
69
70
71
72
73
74
75
76
77
78
79
80
def schedule(self, name: str, input: Any = None) -> int:
  """Create and store a new task.

  Args:
    name (str): Name of the registered task.
    input (Any, optional): Input payload for the task. Defaults to None.

  Returns:
    int: Identifier of the created task.
  """
  self._logger.debug(f"Create task (name: '{name}', input: {input})")
  # Persist the task and hand back its identifier.
  task_id = self._task_store.create_task(name, input)
  return task_id

LilotaWorker

Bases: LilotaNode

Worker node responsible for executing scheduled tasks.

Workers poll the task store for pending tasks, execute registered functions, and update task status and progress. Each worker also sends periodic heartbeats and participates in leader election for cluster maintenance.

Source code in lilota/worker.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
class LilotaWorker(LilotaNode):
  """Worker node responsible for executing scheduled tasks.

  Workers poll the task store for pending tasks, execute registered
  functions, and update task status and progress. Each worker also
  sends periodic heartbeats and participates in leader election
  for cluster maintenance.
  """

  LOGGER_NAME = "lilota.worker"

  def __init__(
    self,
    db_url: str,
    node_heartbeat_interval: float = 5.0,
    node_timeout_sec: int = 20,
    task_heartbeat_interval: float = 0.1,
    max_task_heartbeat_interval: float = 5.0,
    set_progress_manually: bool = False,
    logging_level=logging.INFO,
    **kwargs):
    """Initialize a worker node.

    Args:
      db_url (str): Database connection URL.
      node_heartbeat_interval (float, optional): Interval in seconds between
        node heartbeats. Defaults to 5.0.
      node_timeout_sec (int, optional): Time in seconds before a node is
        considered inactive. Defaults to 20.
      task_heartbeat_interval (float, optional): Initial interval in seconds
        between polling attempts for tasks. Defaults to 0.1.
      max_task_heartbeat_interval (float, optional): Maximum polling interval
        when no tasks are available. Defaults to 5.0.
      set_progress_manually (bool, optional): User is responsible for setting
        the task progress. Defaults to False.
      logging_level (int, optional): Logging level used by the worker.
      **kwargs: Additional keyword arguments passed to ``LilotaNode``.
    """

    super().__init__(
      db_url=db_url,
      node_type=NodeType.WORKER,
      node_heartbeat_interval=node_heartbeat_interval,
      node_timeout_sec=node_timeout_sec,
      logger_name=self.LOGGER_NAME,
      logging_level=logging_level,
      **kwargs,
    )

    self._task_heartbeat_interval: float = task_heartbeat_interval
    self._max_task_heartbeat_interval: float = max_task_heartbeat_interval
    self._set_progress_manually: bool = set_progress_manually
    # Task name -> RegisteredTask; filled by register() before start().
    self._registry: dict[str, RegisteredTask] = {}


  def _register(
    self,
    name: str,
    func: Callable,
    *,
    input_model: Optional[Type[Any]] = None,
    output_model: Optional[Type[Any]] = None,
    task_progress: Optional[TaskProgress] = None,
    timeout: Optional[timedelta] = None
  ):
    """Validate and record a task registration (internal helper).

    Raises:
      Exception: If the worker has already been started.
      RuntimeError: If ``name`` is already registered.
      TypeError: If ``task_progress`` is not a TaskProgress instance.
    """
    if self._is_started:
      raise Exception("It is not allowed to register functions after startup. Stop by using the stop() method.")

    if name in self._registry:
      raise RuntimeError(f"Task {name!r} is already registered")

    # BUGFIX: was `isinstance(task_progress, type(TaskProgress))`, i.e. an
    # isinstance test against the metaclass, which accepts ANY class and
    # rejects every TaskProgress instance. Check against TaskProgress
    # itself, matching the error message.
    if task_progress is not None and not isinstance(task_progress, TaskProgress):
      raise TypeError("task_progress must be of type TaskProgress")

    # Register the task
    task = RegisteredTask(
      func=func,
      input_model=input_model,
      output_model=output_model,
      task_progress=task_progress,
      timeout=timeout
    )

    self._registry[name] = task


  def register(
    self,
    name: str,
    *,
    input_model=None,
    output_model=None,
    task_progress=None,
    timeout=timedelta(minutes=5)
  ):
    """Decorator for registering a task function.

    This method allows task registration using decorator syntax.

    Example:
      @lilota.register("my_task")
      def my_task(data):
        return data

    Args:
      name (str): Unique name of the task.
      input_model (Optional[Type[Any]]): Optional input validation model.
      output_model (Optional[Type[Any]]): Optional output validation model.
      task_progress (Optional[TaskProgress]): Task progress tracking strategy.
      timeout (Optional[timedelta]): Optional timeout that can be set for a task.

    Returns:
      Callable: A decorator that registers the function.
    """
    def decorator(func):
      self._register(
        name=name,
        func=func,
        input_model=input_model,
        output_model=output_model,
        task_progress=task_progress,
        timeout=timeout
      )
      return func
    return decorator


  def _on_started(self):
    """Start the worker heartbeat and enter the task-execution loop."""
    # Create node leader store
    node_leader_store = SqlAlchemyNodeLeaderStore(self._db_url, self._logger, self._node_timeout_sec)

    # Start worker heartbeat thread
    heartbeat_task = WorkerHeartbeatTask(
      self._node_heartbeat_interval,
      self._node_id,
      self._node_timeout_sec,
      self._node_store,
      node_leader_store,
      self._task_store,
      self._logger
    )
    # BUGFIX: the heartbeat was named "scheduler_heartbeat_..." here —
    # a copy-paste from the scheduler; this is the worker's heartbeat.
    self._heartbeat = Heartbeat(f"worker_heartbeat_{self._node_id}", heartbeat_task, self._logger)
    self._heartbeat.start()

    # Log Node started message
    self._logger.debug("Node started")

    # Execute tasks (this loop blocks the calling thread)
    self._execute_tasks()


  def _execute_tasks(self):
    """Poll the task store and execute tasks until the process exits.

    Uses exponential backoff up to ``max_task_heartbeat_interval`` while
    idle, and resets to the configured base interval when a task is found.
    """
    interval: float = self._task_heartbeat_interval

    while True:
      # Get the next available task
      task = self._task_store.get_next_task(self._node_id)
      if task:
        task_id = task.id
        # BUGFIX: reset the backoff to the configured base interval
        # (was hard-coded to 0.1, ignoring task_heartbeat_interval).
        interval = self._task_heartbeat_interval

        # Attach node/task context to every log record for this task
        logger: logging.Logger = create_context_logger(self._logger, node_id=self._node_id, task_id=task_id)

        # Get the registered task
        registered_task = self._registry.get(task.name)
        if registered_task is None:
          error_message = f"Task {task.name!r} not registered"
          error = error_to_dict(error_message)
          self._task_store.end_task_failure(task_id, error)
          logger.error(error_message)
        else:
          # Execute the task
          try:
            # Set status to running
            timeout = registered_task.timeout
            started_task = self._task_store.start_task(task_id, timeout)

            # Set task_progress object if available
            task_progress: Optional[TaskProgress] = None
            if registered_task.task_progress is not None:
              task_progress = TaskProgress(task_id, self._task_store.set_progress)

            # Run task
            result = self._execute_task_with_watchdog(started_task, registered_task, task_progress, logger)

            # Set status to completed
            self._task_store.end_task_success(task_id, result)
          except Exception as ex:
            # Set status to failed
            self._task_store.end_task_failure(task_id, exception_to_dict(ex))
            logger.exception(f"Task execution failed (id: {task_id})")
      else:
        # No work: back off exponentially, capped at the maximum interval
        interval = min(interval * 2, self._max_task_heartbeat_interval)

      # Sleep
      time.sleep(interval)


  def _execute_task_with_watchdog(self, task: Task, registered_task: RegisteredTask, task_progress: Optional[TaskProgress], logger: logging.Logger):
    """Run a task, arming a kill-switch timer when the task can expire."""
    if task.expires_at is None:
      return registered_task(task.input, task_progress)

    timer = self._calculate_timer_and_set_watchdog(task, logger)
    try:
      return registered_task(task.input, task_progress)
    finally:
      # Always disarm the watchdog, even when the task raises.
      timer.cancel()


  def _calculate_timer_and_set_watchdog(self, task: Task, logger: logging.Logger) -> threading.Timer:
    """Arm a timer that kills this process when the task's deadline passes.

    Returns:
      threading.Timer: The started watchdog timer; cancel it when done.
    """
    timeout: float = max(0, (task.expires_at - datetime.now(timezone.utc)).total_seconds())

    def watchdog():
      # Log that process will be killed
      logger.error(f"The process will be stopped because the task '{task.name}' ({task.id}) has expired")

      # Kill the worker process; SIGKILL cannot be caught, so the task
      # cannot block its own termination.
      os.kill(os.getpid(), signal.SIGKILL)

    timer = threading.Timer(timeout, watchdog)
    timer.start()
    return timer


  def _on_stop(self):
    # Stop worker heartbeat thread
    self._stop_node_heartbeat()


  def _should_set_progress_manually(self):
    return self._set_progress_manually

__init__(db_url, node_heartbeat_interval=5.0, node_timeout_sec=20, task_heartbeat_interval=0.1, max_task_heartbeat_interval=5.0, set_progress_manually=False, logging_level=logging.INFO, **kwargs)

Initialize a worker node.

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
node_heartbeat_interval float

Interval in seconds between node heartbeats. Defaults to 5.0.

5.0
node_timeout_sec int

Time in seconds before a node is considered inactive. Defaults to 20.

20
task_heartbeat_interval float

Initial interval in seconds between polling attempts for tasks. Defaults to 0.1.

0.1
max_task_heartbeat_interval float

Maximum polling interval when no tasks are available. Defaults to 5.0.

5.0
set_progress_manually bool

User is responsible for setting the task progress. Defaults to False.

False
logging_level int

Logging level used by the worker.

INFO
**kwargs

Additional keyword arguments passed to LilotaNode.

{}
Source code in lilota/worker.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def __init__(
  self,
  db_url: str,
  node_heartbeat_interval: float = 5.0,
  node_timeout_sec: int = 20,
  task_heartbeat_interval: float = 0.1,
  max_task_heartbeat_interval: float = 5.0,
  set_progress_manually: bool = False,
  logging_level=logging.INFO,
  **kwargs):
  """Initialize a worker node.

  Args:
    db_url (str): Database connection URL.
    node_heartbeat_interval (float, optional): Interval in seconds between
      node heartbeats. Defaults to 5.0.
    node_timeout_sec (int, optional): Time in seconds before a node is
      considered inactive. Defaults to 20.
    task_heartbeat_interval (float, optional): Initial interval in seconds
      between polling attempts for tasks. Defaults to 0.1.
    max_task_heartbeat_interval (float, optional): Maximum polling interval
      when no tasks are available. Defaults to 5.0.
    set_progress_manually (bool, optional): User is responsible for setting
      the task progress. Defaults to False.
    logging_level (int, optional): Logging level used by the worker.
    **kwargs: Additional keyword arguments passed to ``LilotaNode``.
  """

  super().__init__(
    db_url=db_url,
    node_type=NodeType.WORKER,
    node_heartbeat_interval=node_heartbeat_interval,
    node_timeout_sec=node_timeout_sec,
    logger_name=self.LOGGER_NAME,
    logging_level=logging_level,
    **kwargs,
  )

  self._task_heartbeat_interval: float = task_heartbeat_interval
  self._max_task_heartbeat_interval: float = max_task_heartbeat_interval
  self._set_progress_manually: bool = set_progress_manually
  # Task name -> RegisteredTask; filled by register() before start().
  self._registry: dict[str, RegisteredTask] = {}

register(name, *, input_model=None, output_model=None, task_progress=None, timeout=timedelta(minutes=5))

Decorator for registering a task function.

This method allows task registration using decorator syntax.

Example

@lilota.register("my_task")
def my_task(data):
    return data

Parameters:

Name Type Description Default
name str

Unique name of the task.

required
input_model Optional[Type[Any]]

Optional input validation model.

None
output_model Optional[Type[Any]]

Optional output validation model.

None
task_progress Optional[TaskProgress]

Task progress tracking strategy.

None
timeout Optional[timedelta]

Optional timeout that can be set for a task.

timedelta(minutes=5)

Returns:

Name Type Description
Callable

A decorator that registers the function.

Source code in lilota/worker.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def register(
  self,
  name: str,
  *,
  input_model=None,
  output_model=None,
  task_progress=None,
  timeout=timedelta(minutes=5)
):
  """Decorator for registering a task function.

  This method allows task registration using decorator syntax.

  Example:
    @lilota.register("my_task")
    def my_task(data):
      return data

  Args:
    name (str): Unique name of the task.
    input_model (Optional[Type[Any]]): Optional input validation model.
    output_model (Optional[Type[Any]]): Optional output validation model.
    task_progress (Optional[TaskProgress]): Task progress tracking strategy.
    timeout (Optional[timedelta]): Optional timeout that can be set for a task.

  Returns:
    Callable: A decorator that registers the function.
  """
  # Capture the registration options once; the decorator only forwards them.
  options = dict(
    input_model=input_model,
    output_model=output_model,
    task_progress=task_progress,
    timeout=timeout,
  )

  def decorator(func):
    self._register(name=name, func=func, **options)
    return func

  return decorator

WorkerHeartbeatTask

Bases: NodeHeartbeatTask

Heartbeat task used by worker nodes.

In addition to updating the node heartbeat, this task also performs leader election among workers. The elected leader periodically performs maintenance tasks such as cleaning up stale nodes.

Source code in lilota/worker.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
class WorkerHeartbeatTask(NodeHeartbeatTask):
  """Heartbeat task used by worker nodes.

  In addition to updating the node heartbeat, this task also performs
  leader election among workers. The elected leader periodically performs
  maintenance tasks such as cleaning up stale nodes and expiring
  overdue tasks.
  """

  def __init__(self, interval: float, node_id: str, node_timeout_sec: int, node_store: SqlAlchemyNodeStore, node_leader_store: SqlAlchemyNodeLeaderStore, task_store: SqlAlchemyTaskStore, logger: logging.Logger):
    """Initialize the worker heartbeat task.

    Args:
      interval (float): Interval in seconds between heartbeats.
      node_id (str): Unique identifier of the worker node.
      node_timeout_sec (int): Timeout in seconds after which nodes are
        considered dead.
      node_store (SqlAlchemyNodeStore): Store used for node operations.
      node_leader_store (SqlAlchemyNodeLeaderStore): Store used for leader
        election and leadership renewal.
      task_store (SqlAlchemyTaskStore): Store used to expire overdue tasks
        during leader-only maintenance.
      logger (logging.Logger): Logger instance.
    """
    super().__init__(interval, node_id, node_store, logger)
    self._node_timeout_sec = node_timeout_sec
    self._node_leader_store = node_leader_store
    self._task_store = task_store
    # Cached leadership flag; re-validated on every heartbeat tick.
    self._is_leader = False


  def execute(self):
    """Execute the heartbeat logic.

    Updates the node's last-seen timestamp and attempts to perform leader
    election. If the node becomes leader, it will also trigger cleanup tasks.
    """

    # Update last_seen_at
    super().execute()

    # Try to set leader and the leader should trigger a cleanup
    try:
      self._try_set_leader_and_run_maintenance()
    except Exception:
      # Never let election errors kill the heartbeat thread
      self._logger.exception("Leader election failed")


  def _try_set_leader_and_run_maintenance(self):
    # Try to renew leadership first; renewal can fail (e.g. lease expired)
    if self._is_leader:
      self._is_leader = self._node_leader_store.renew_leadership(self._node_id)

      if not self._is_leader:
        self._logger.debug(f"Leadership lost (node id: {self._node_id})")

    # Try to acquire leadership if not leader
    if not self._is_leader:
      self._is_leader = self._node_leader_store.try_acquire_leadership(self._node_id)

    # Leader-only work
    if self._is_leader:
      self._run_maintenance()


  def _run_maintenance(self) -> None:
    # Mark dead nodes and expire overdue tasks; leader-only.
    try:
      self._update_status_on_dead_nodes()
      self._update_status_on_expired_tasks()
    except Exception:
      # Never let maintenance kill the heartbeat thread
      self._logger.exception("Node maintenance failed")


  def _update_status_on_dead_nodes(self):
    # Nodes not seen since the cutoff are marked DEAD (excluding this node).
    cutoff = datetime.now(timezone.utc) - timedelta(seconds=self._node_timeout_sec)
    cleaned = self._node_store.update_status_on_dead_nodes(cutoff, self._node_id)
    if cleaned > 0:
      self._logger.debug(f"Marked {cleaned} stale node(s) as DEAD")


  def _update_status_on_expired_tasks(self):
    # Delegate to the task store's expire_overdue_tasks().
    self._task_store.expire_overdue_tasks()

__init__(interval, node_id, node_timeout_sec, node_store, node_leader_store, task_store, logger)

Initialize the worker heartbeat task.

Parameters:

Name Type Description Default
interval float

Interval in seconds between heartbeats.

required
node_id str

Unique identifier of the worker node.

required
node_timeout_sec int

Timeout in seconds after which nodes are considered dead.

required
node_store SqlAlchemyNodeStore

Store used for node operations.

required
node_leader_store SqlAlchemyNodeLeaderStore

Store used for leader election and leadership renewal.

required
task_store SqlAlchemyTaskStore

Store used to expire overdue tasks during leader-only maintenance.

required
logger Logger

Logger instance.

required
Source code in lilota/worker.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, interval: float, node_id: str, node_timeout_sec: int, node_store: SqlAlchemyNodeStore, node_leader_store: SqlAlchemyNodeLeaderStore, task_store: SqlAlchemyTaskStore, logger: logging.Logger):
  """Initialize the worker heartbeat task.

  Args:
    interval (float): Interval in seconds between heartbeats.
    node_id (str): Unique identifier of the worker node.
    node_timeout_sec (int): Timeout in seconds after which nodes are
      considered dead.
    node_store (SqlAlchemyNodeStore): Store used for node operations.
    node_leader_store (SqlAlchemyNodeLeaderStore): Store used for leader
      election and leadership renewal.
    task_store (SqlAlchemyTaskStore): Store used for leader-only task
      maintenance (expiring overdue tasks).
    logger (logging.Logger): Logger instance.
  """
  super().__init__(interval, node_id, node_store, logger)
  self._node_timeout_sec = node_timeout_sec
  self._node_leader_store = node_leader_store
  self._task_store = task_store
  # Leadership is only gained later via leader election during execute().
  self._is_leader = False

execute()

Execute the heartbeat logic.

Updates the node's last-seen timestamp and attempts to perform leader election. If the node becomes leader, it will also trigger cleanup tasks.

Source code in lilota/worker.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def execute(self):
  """Execute the heartbeat logic.

  Updates the node's last-seen timestamp and attempts to perform leader
  election. If the node becomes leader, it will also trigger cleanup tasks.
  """

  # Update last_seen_at via the base-class heartbeat.
  super().execute()

  # Try to set leader and the leader should trigger a cleanup.
  # Failures are logged and swallowed so the heartbeat keeps running.
  try:
    self._try_set_leader_and_run_maintenance()
  except Exception:
    self._logger.exception("Leader election failed")

SqlAlchemyLogStore

Database store for logging entries into Lilota.

Source code in lilota/stores.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
class SqlAlchemyLogStore():
  """Database store for logging entries into Lilota."""

  def __init__(self, db_url: str):
    """
    Args:
        db_url (str): Database connection URL.
    """
    self._db_url = db_url
    # Engine and session factory are created lazily on first use.
    self._engine = None
    self._Session = None


  def _ensure_engine(self):
    # Lazily build the engine and session factory exactly once.
    if self._engine is not None:
      return
    self._engine = create_engine(self._db_url, pool_pre_ping=True)
    self._Session = sessionmaker(bind=self._engine, expire_on_commit=False)


  def get_session(self):
    """Return a new SQLAlchemy session."""
    self._ensure_engine()
    return self._Session()


  def get_log_entries_by_node_id(self, node_id: UUID) -> list[LogEntry]:
    """Return all log entries associated with a given node, oldest first."""
    with self.get_session() as session:
      query = (
        session.query(LogEntry)
          .filter(LogEntry.node_id == node_id)
          .order_by(LogEntry.created_at)
      )
      return query.all()

__init__(db_url)

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
Source code in lilota/stores.py
400
401
402
403
404
405
406
407
def __init__(self, db_url: str):
  """
  Args:
      db_url (str): Database connection URL.
  """
  self._db_url = db_url
  # Engine and session factory are created lazily (see _ensure_engine),
  # so constructing the store never touches the database.
  self._engine = None
  self._Session = None

get_log_entries_by_node_id(node_id)

Return all log entries associated with a given node.

Source code in lilota/stores.py
425
426
427
428
429
430
431
432
433
def get_log_entries_by_node_id(self, node_id: UUID) -> list[LogEntry]:
  """Return all log entries associated with a given node, oldest first."""
  with self.get_session() as session:
    query = session.query(LogEntry).filter(LogEntry.node_id == node_id)
    return query.order_by(LogEntry.created_at).all()

get_session()

Return a new SQLAlchemy session.

Source code in lilota/stores.py
419
420
421
422
def get_session(self):
  """Return a new SQLAlchemy session."""
  # Creates the engine/session factory on first call.
  self._ensure_engine()
  return self._Session()

SqlAlchemyNodeLeaderStore

Bases: StoreBase

Store managing leader election for worker nodes.

Source code in lilota/stores.py
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
class SqlAlchemyNodeLeaderStore(StoreBase):
  """Store managing leader election for worker nodes.

  A single NodeLeader row (id == 1) acts as the leadership lease; the node
  holding a non-expired lease is the leader.
  """

  def __init__(self, db_url: str, logger: logging.Logger, node_timeout_sec: int):
    """
    Args:
        db_url (str): Database connection URL.
        logger (logging.Logger): Logger for store operations.
        node_timeout_sec (int): Leader lease timeout in seconds.
    """
    super().__init__(db_url, logger)
    self._node_timeout_sec: int = node_timeout_sec


  def try_acquire_leadership(self, node_id) -> bool:
    """Attempt to acquire leadership for the given node.

    Returns True if leadership is acquired, False otherwise.
    """
    now = datetime.now(timezone.utc)
    new_expiry = now + timedelta(seconds=self._node_timeout_sec)
    session = self._get_session()

    try:
      # Try to take over an expired lease.
      result = session.execute(
        update(NodeLeader)
        .where(
          NodeLeader.id == 1,
          NodeLeader.lease_expires_at < now,
        )
        .values(
          node_id=node_id,
          lease_expires_at=new_expiry,
        )
      )

      if result.rowcount == 1:
        session.commit()
        if self._logger:
          self._logger.debug(f"Leadership acquired (new node id: {node_id})")
        return True

      # Check if the lease row exists at all.
      exists = session.execute(
        select(NodeLeader.id).where(NodeLeader.id == 1)
      ).first()

      if exists:
        # Row exists with a non-expired lease -> another node is leader.
        session.rollback()
        return False

      # No row → try to create it
      session.add(
        NodeLeader(
          id=1,
          node_id=node_id,
          lease_expires_at=new_expiry,
        )
      )
      session.commit()
      if self._logger:
        self._logger.debug(f"Leadership acquired first time (node id: {node_id})")
      return True
    except IntegrityError:
      # Someone else won the race to insert
      session.rollback()
      return False
    except Exception:
      session.rollback()
      if self._logger:
        self._logger.exception("Leader election failed")
      return False
    finally:
      session.close()


  def renew_leadership(self, node_id) -> bool:
    """Renew leadership lease for the given node.

    Renewal only succeeds while this node still holds a non-expired lease.

    Returns True if renewed successfully.
    """
    now = datetime.now(timezone.utc)
    new_expiry = now + timedelta(seconds=self._node_timeout_sec)

    with self._get_session() as session:
      result = session.execute(
        update(NodeLeader)
        .where(
          NodeLeader.id == 1,
          NodeLeader.node_id == node_id,
          NodeLeader.lease_expires_at >= now
        )
        .values(lease_expires_at=new_expiry)
      )

      session.commit()
      # rowcount == 1 means the lease matched all conditions and was extended.
      renewed = result.rowcount == 1
      if renewed and self._logger:
        self._logger.debug(f"Leadership renewed (node id: {node_id})")
      return renewed


  def delete_leader_by_id(self, id: int) -> bool:
    """Delete the leader record by ID.

    Returns True if deleted successfully, False otherwise.
    """
    with self._get_session() as session:
      with session.begin():
        # BUG FIX: previously looked up the Task model instead of NodeLeader,
        # so the leader record could never be found and deleted.
        leader = session.get(NodeLeader, id)
        if leader is None:
          return False
        session.delete(leader)
    return True

__init__(db_url, logger, node_timeout_sec)

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
logger Logger

Logger for store operations.

required
node_timeout_sec int

Leader lease timeout in seconds.

required
Source code in lilota/stores.py
440
441
442
443
444
445
446
447
448
def __init__(self, db_url: str, logger: logging.Logger, node_timeout_sec: int):
  """
  Args:
      db_url (str): Database connection URL.
      logger (logging.Logger): Logger for store operations.
      node_timeout_sec (int): Leader lease timeout in seconds.
  """
  super().__init__(db_url, logger)
  # Lease duration granted on every acquisition/renewal.
  self._node_timeout_sec: int = node_timeout_sec

delete_leader_by_id(id)

Delete the leader record by ID.

Returns True if deleted successfully, False otherwise.

Source code in lilota/stores.py
540
541
542
543
544
545
546
547
548
549
550
551
def delete_leader_by_id(self, id: int) -> bool:
  """Delete the leader record by ID.

  Returns:
      bool: True if deleted successfully, False if no leader record with
      the given ID exists.
  """
  with self._get_session() as session:
    with session.begin():
      # BUG FIX: look up the NodeLeader model (previously queried Task,
      # so the leader record could never be found and deleted).
      leader = session.get(NodeLeader, id)
      if leader is None:
        return False
      session.delete(leader)
  return True

renew_leadership(node_id)

Renew leadership lease for the given node.

Returns True if renewed successfully.

Source code in lilota/stores.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
def renew_leadership(self, node_id) -> bool:
  """Renew leadership lease for the given node.

  Returns True if renewed successfully.
  """
  now = datetime.now(timezone.utc)
  new_expiry = now + timedelta(seconds=self._node_timeout_sec)

  with self._get_session() as session:
    # Conditional UPDATE: only extends the lease if this node still holds
    # it and the lease has not yet expired.
    result = session.execute(
      update(NodeLeader)
      .where(
        NodeLeader.id == 1,
        NodeLeader.node_id == node_id,
        NodeLeader.lease_expires_at >= now
      )
      .values(lease_expires_at=new_expiry)
    )

    session.commit()
    # rowcount == 1 means the lease matched all conditions and was extended;
    # 0 means leadership was lost or has expired.
    renewed = result.rowcount == 1
    if renewed and self._logger:
      self._logger.debug(f"Leadership renewed (node id: {node_id})")
    return renewed

try_acquire_leadership(node_id)

Attempt to acquire leadership for the given node.

Returns True if leadership is acquired, False otherwise.

Source code in lilota/stores.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
def try_acquire_leadership(self, node_id) -> bool:
  """Attempt to acquire leadership for the given node.

  Returns True if leadership is acquired, False otherwise.
  """
  now = datetime.now(timezone.utc)
  new_expiry = now + timedelta(seconds=self._node_timeout_sec)
  session = self._get_session()

  try:
    # Try to take over expired lease (the single leader row has id == 1).
    result = session.execute(
      update(NodeLeader)
      .where(
        NodeLeader.id == 1,
        NodeLeader.lease_expires_at < now,
      )
      .values(
        node_id=node_id,
        lease_expires_at=new_expiry,
      )
    )

    if result.rowcount == 1:
      session.commit()
      if self._logger:
        self._logger.debug(f"Leadership acquired (new node id: {node_id})")
      return True

    # Check if row exists
    exists = session.execute(
      select(NodeLeader.id).where(NodeLeader.id == 1)
    ).first()

    if exists:
      # Row exists with a non-expired lease -> another node is leader.
      session.rollback()
      return False

    # No row → try to create it
    session.add(
      NodeLeader(
        id=1,
        node_id=node_id,
        lease_expires_at=new_expiry,
      )
    )
    session.commit()
    if self._logger:
      self._logger.debug(f"Leadership acquired first time (node id: {node_id})")
    return True
  except IntegrityError:
      # Someone else won the race to insert
      session.rollback()
      return False
  except Exception:
      session.rollback()
      if self._logger:
        self._logger.exception("Leader election failed")
      return False
  finally:
      session.close()

SqlAlchemyNodeStore

Bases: StoreBase

Database store for managing Lilota nodes.

Source code in lilota/stores.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
class SqlAlchemyNodeStore(StoreBase):
  """Database store for managing Lilota nodes."""

  def __init__(self, db_url: str, logger: logging.Logger):
    """
    Args:
        db_url (str): Database connection URL.
        logger (logging.Logger): Logger for store operations.
    """
    super().__init__(db_url, logger)


  def create_node(self, type: NodeType, status: NodeStatus = NodeStatus.IDLE) -> UUID:
    """Create a new node record in the database.

    Args:
        type (NodeType): Type of the node (scheduler or worker).
        status (NodeStatus): Initial lifecycle status.

    Returns:
        UUID: The unique identifier of the created node.
    """
    node = Node(type=type, status=status)

    with self._get_session() as session:
      with session.begin():
        session.add(node)

    # The primary key is populated when the transaction commits.
    return node.id


  def get_all_nodes(self):
    """Return all nodes in the database."""
    with self._get_session() as session:
      return session.query(Node).all()


  def get_node_by_id(self, id: UUID):
    """Return a node by its UUID.

    Args:
        id (UUID): Node identifier.

    Returns:
        Node | None: Node object if found, else None.
    """
    with self._get_session() as session:
      # session.get already yields None for a missing primary key.
      return session.get(Node, id)


  def update_node_status(self, id: UUID, status: NodeStatus):
    """Update the status of a node.

    Args:
        id (UUID): Node identifier.
        status (NodeStatus): New status.
    """
    with self._get_session() as session:
      with session.begin():
        session.execute(
          update(Node).where(Node.id == id).values(status = status)
        )


  def update_status_on_dead_nodes(self, cutoff: datetime, exclude_node_id: UUID):
    """Mark nodes as DEAD if their last_seen_at is older than cutoff.

    Args:
        cutoff (datetime): Time threshold to consider a node dead.
        exclude_node_id (UUID): Node to exclude from the update.

    Returns:
        int: Number of nodes marked as DEAD.
    """
    mark_dead = (
      update(Node)
      .where(Node.last_seen_at < cutoff)
      .where(Node.id != exclude_node_id)
      .where(Node.status != NodeStatus.DEAD)
      .values(status=NodeStatus.DEAD)
    )
    with self._get_session() as session:
      with session.begin():
        return session.execute(mark_dead).rowcount


  def update_node_last_seen_at(self, id: UUID):
    """Update the heartbeat timestamp of a node.

    Args:
        id (UUID): Node identifier.
    """
    touch = (
      update(Node)
      .where(Node.id == id)
      .values(last_seen_at = datetime.now(timezone.utc))
    )
    with self._get_session() as session:
      with session.begin():
        session.execute(touch)
__init__(db_url, logger)

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
logger Logger

Logger for store operations.

required
Source code in lilota/stores.py
50
51
52
53
54
55
56
def __init__(self, db_url: str, logger: logging.Logger):
  """Initialize the node store.

  Args:
      db_url (str): Database connection URL.
      logger (logging.Logger): Logger for store operations.
  """
  super().__init__(db_url, logger)

create_node(type, status=NodeStatus.IDLE)

Create a new node record in the database.

Parameters:

Name Type Description Default
type NodeType

Type of the node (scheduler or worker).

required
status NodeStatus

Initial lifecycle status.

IDLE

Returns:

Name Type Description
UUID UUID

The unique identifier of the created node.

Source code in lilota/stores.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def create_node(self, type: NodeType, status: NodeStatus = NodeStatus.IDLE) -> UUID:
  """Create a new node record in the database.

  Args:
      type (NodeType): Type of the node (scheduler or worker).
      status (NodeStatus): Initial lifecycle status.

  Returns:
      UUID: The unique identifier of the created node.
  """
  node = Node(type=type, status=status)

  with self._get_session() as session:
    with session.begin():
      session.add(node)

  # The primary key is populated when the transaction commits.
  return node.id

get_all_nodes()

Return all nodes in the database.

Source code in lilota/stores.py
81
82
83
84
def get_all_nodes(self):
  """Return all nodes in the database.

  Returns:
      list[Node]: All node records.
  """
  with self._get_session() as session:
    return session.query(Node).all()

get_node_by_id(id)

Return a node by its UUID.

Parameters:

Name Type Description Default
id UUID

Node identifier.

required

Returns:

Type Description

Node | None: Node object if found, else None.

Source code in lilota/stores.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def get_node_by_id(self, id: UUID):
  """Return a node by its UUID.

  Args:
      id (UUID): Node identifier.

  Returns:
      Node | None: Node object if found, else None.
  """
  with self._get_session() as session:
    # session.get already yields None for a missing primary key.
    return session.get(Node, id)

update_node_last_seen_at(id)

Update the heartbeat timestamp of a node.

Parameters:

Name Type Description Default
id UUID

Node identifier.

required
Source code in lilota/stores.py
140
141
142
143
144
145
146
147
148
149
150
151
152
def update_node_last_seen_at(self, id: UUID):
  """Update the heartbeat timestamp of a node.

  Args:
      id (UUID): Node identifier.
  """
  touch = (
    update(Node)
    .where(Node.id == id)
    .values(last_seen_at = datetime.now(timezone.utc))
  )
  with self._get_session() as session:
    with session.begin():
      session.execute(touch)

update_node_status(id, status)

Update the status of a node.

Parameters:

Name Type Description Default
id UUID

Node identifier.

required
status NodeStatus

New status.

required
Source code in lilota/stores.py
103
104
105
106
107
108
109
110
111
112
113
114
115
def update_node_status(self, id: UUID, status: NodeStatus):
  """Update the status of a node.

  Args:
      id (UUID): Node identifier.
      status (NodeStatus): New status.
  """
  with self._get_session() as session:
    with session.begin():
      session.execute(
        update(Node).where(Node.id == id).values(status = status)
      )

update_status_on_dead_nodes(cutoff, exclude_node_id)

Mark nodes as DEAD if their last_seen_at is older than cutoff.

Parameters:

Name Type Description Default
cutoff datetime

Time threshold to consider a node dead.

required
exclude_node_id UUID

Node to exclude from the update.

required

Returns:

Name Type Description
int

Number of nodes marked as DEAD.

Source code in lilota/stores.py
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
def update_status_on_dead_nodes(self, cutoff: datetime, exclude_node_id: UUID):
  """Mark nodes as DEAD if their last_seen_at is older than cutoff.

  Args:
      cutoff (datetime): Time threshold to consider a node dead.
      exclude_node_id (UUID): Node to exclude from the update.

  Returns:
      int: Number of nodes marked as DEAD.
  """
  mark_dead = (
    update(Node)
    .where(Node.last_seen_at < cutoff)
    .where(Node.id != exclude_node_id)
    .where(Node.status != NodeStatus.DEAD)
    .values(status=NodeStatus.DEAD)
  )
  with self._get_session() as session:
    with session.begin():
      return session.execute(mark_dead).rowcount

SqlAlchemyTaskStore

Bases: StoreBase

Database store for managing Lilota tasks.

Source code in lilota/stores.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
class SqlAlchemyTaskStore(StoreBase):
  """Database store for managing Lilota tasks."""

  def __init__(self, db_url: str, logger: logging.Logger, set_progress_manually: bool = False):
    """
    Args:
        db_url (str): Database connection URL.
        logger (logging.Logger): Logger for store operations.
        set_progress_manually (bool): Whether task progress must be updated manually.
    """
    super().__init__(db_url, logger)
    self._set_progress_manually = set_progress_manually


  def create_task(self, name: str, input: Any = None):
    """Create a new task record in the database.

    Args:
        name (str): Registered task name.
        input (Any, optional): Input data for the task.

    Returns:
        UUID: The unique identifier of the created task.
    """
    # PEP 8 (E714): "is not None" instead of "not ... is None".
    if input is not None:
      input = normalize_data(input)

    task = Task(
      name=name,
      input=input,
      status = TaskStatus.CREATED
    )

    with self._get_session() as session:
      with session.begin():
        session.add(task)

    return task.id


  def get_all_tasks(self):
    """Return all tasks ordered by ID."""
    with self._get_session() as session:
      return session.query(Task).order_by(Task.id).all()


  def get_unfinished_tasks(self) -> list[Task]:
    """Return all tasks that are not yet completed or failed."""
    with self._get_session() as session:
      return (
        session.query(Task)
          .filter(Task.status.in_([TaskStatus.CREATED, TaskStatus.SCHEDULED, TaskStatus.RUNNING]))
          .order_by(Task.id)
          .all()
      )


  def expire_overdue_tasks(self) -> None:
    """Set status to "expired" for running tasks whose expiration time has passed."""
    with self._get_session() as session:
      with session.begin():
        session.execute(
          update(Task)
          .where(
            Task.status == TaskStatus.RUNNING,
            Task.expires_at.is_not(None),
            Task.expires_at < datetime.now(timezone.utc)
          )
          .values(status=TaskStatus.EXPIRED)
        )


  def has_unfinished_tasks(self) -> bool:
    """Check if there are any unfinished tasks in the database."""
    with self._get_session() as session:
      # EXISTS subquery: cheaper than counting or loading rows.
      return session.query(
        session.query(Task)
        .filter(
          Task.status.in_([
            TaskStatus.CREATED,
            TaskStatus.SCHEDULED,
            TaskStatus.RUNNING
          ])
        )
        .exists()
      ).scalar()


  def get_task_by_id(self, id: UUID) -> Task:
    """Return a task by its UUID, or None if it does not exist."""
    with self._get_session() as session:
      task = session.get(Task, id)
      if task is None:
        return None
      return task


  def get_next_task(self, worker_id: UUID) -> Task:
    """Return the next available task for a worker and lock it.

    Args:
        worker_id (UUID): Worker node locking the task.

    Returns:
        Task | None: The next scheduled task, or None if no task is available.
    """
    with self._get_session() as session:
      with session.begin():
        # Pick the oldest runnable task.
        task_id = session.execute(
          select(Task.id)
          .where(Task.status == TaskStatus.CREATED)
          .where(Task.run_at <= datetime.now(timezone.utc))
          .order_by(Task.run_at)
          .limit(1)
        ).scalar()

        if not task_id:
          return None

        # Conditional claim: the status guard makes the lock race-safe —
        # if a concurrent worker claimed the task first, its status changed
        # and this UPDATE matches zero rows.
        result = session.execute(
          update(Task)
          .where(Task.id == task_id)
          .where(Task.status == TaskStatus.CREATED)
          .values(
            status=TaskStatus.SCHEDULED,
            locked_at=datetime.now(timezone.utc),
            locked_by=worker_id,
          )
        )

      # The claim transaction committed when session.begin() exited; a lost
      # race is detected via rowcount and yields None.
      if result.rowcount != 1:
        return None

      return session.get(Task, task_id)


  def start_task(self, id: UUID, timeout: timedelta | None = None) -> Task:
    """Mark a task as RUNNING and initialize metadata.

    Args:
        id (UUID): Id of the task.
        timeout (timedelta | None): Optional timeout that can be set for a task.

    Returns:
        Task | None: The started task, or None if no task is available.
    """
    expires_at = None
    start_date_time = datetime.now(timezone.utc)
    timeout_sec = int(timeout.total_seconds()) if timeout is not None else None

    with self._get_session() as session:
      with session.begin():
        task = self._load_task(session, id)
        task.timeout = timeout_sec

        if timeout is not None:
          expires_at = start_date_time + timeout

        task.pid = os.getpid()
        task.status = TaskStatus.RUNNING
        task.progress_percentage = 0
        task.start_date_time = start_date_time
        task.expires_at = expires_at
        task.end_date_time = None
        return task


  def set_progress(self, id: UUID, progress: int):
    """Update the progress percentage of a task.

    Args:
        id (UUID): Id of the task.
        progress (int): The progress of the task (0-100)
    """
    with self._get_session() as session:
      with session.begin():
        task = self._load_task(session, id)
        # Clamp to the valid 0-100 range.
        task.progress_percentage = max(0, min(progress, 100))


  def end_task_success(self, id: UUID, output: Any):
    """Mark a task as successfully completed.

    Args:
        id (UUID): Id of the task.
        output (Any): Task result data.
    """
    # PEP 8 (E714): "is not None" instead of "not ... is None".
    if output is not None:
      output = normalize_data(output)

    with self._get_session() as session:
      with session.begin():
        task = self._load_task(session, id)
        task.output = output
        self._complete_progress(task, TaskStatus.COMPLETED)


  def end_task_failure(self, id: UUID, error: dict):
    """Mark a task as failed.

    Args:
        id (UUID): Id of the task.
        error (dict): Error information to store.
    """
    with self._get_session() as session:
      with session.begin():
        task = self._load_task(session, id)
        task.error = error
        self._complete_progress(task, TaskStatus.FAILED)


  def delete_task_by_id(self, id: UUID):
    """Delete a task by its UUID.

    Returns:
        bool: True if deleted, False if task not found.
    """
    with self._get_session() as session:
      with session.begin():
        task = session.get(Task, id)
        if task is None:
          return False
        session.delete(task)
    return True


  def _complete_progress(self, task: Task, task_status: TaskStatus):
    # Unless progress is managed manually, a finished task always shows 100%.
    if not self._set_progress_manually:
      task.progress_percentage = 100
    task.status = task_status
    task.end_date_time = datetime.now(timezone.utc)


  def _load_task(self, session, id: UUID) -> Task:
    # Fetch a task or fail loudly — callers rely on the task existing.
    task: Task = session.get(Task, id)
    if task is None:
      raise ValueError(f"Task {id} not found")
    return task

__init__(db_url, logger, set_progress_manually=False)

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
logger Logger

Logger for store operations.

required
set_progress_manually bool

Whether task progress must be updated manually.

False
Source code in lilota/stores.py
159
160
161
162
163
164
165
166
167
def __init__(self, db_url: str, logger: logging.Logger, set_progress_manually: bool = False):
  """Initialize the task store.

  Args:
      db_url (str): Database connection URL.
      logger (logging.Logger): Logger for store operations.
      set_progress_manually (bool): Whether task progress must be updated manually.
  """
  super().__init__(db_url, logger)
  # When False, finishing a task forces progress_percentage to 100.
  self._set_progress_manually = set_progress_manually

create_task(name, input=None)

Create a new task record in the database.

Parameters:

Name Type Description Default
name str

Registered task name.

required
input Any

Input data for the task.

None

Returns:

Name Type Description
UUID

The unique identifier of the created task.

Source code in lilota/stores.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def create_task(self, name: str, input: Any = None):
  """Create a new task record in the database.

  Args:
      name (str): Registered task name.
      input (Any, optional): Input data for the task.

  Returns:
      UUID: The unique identifier of the created task.
  """
  # PEP 8 (E714): "is not None" instead of "not ... is None".
  if input is not None:
    input = normalize_data(input)

  task = Task(
    name=name,
    input=input,
    status = TaskStatus.CREATED
  )

  with self._get_session() as session:
    with session.begin():
      session.add(task)

  return task.id

delete_task_by_id(id)

Delete a task by its UUID.

Returns:

Name Type Description
bool

True if deleted, False if task not found.

Source code in lilota/stores.py
367
368
369
370
371
372
373
374
375
376
377
378
379
def delete_task_by_id(self, id: UUID):
  """Delete a task by its UUID.

  Returns:
      bool: True if deleted, False if task not found.
  """
  deleted = False
  with self._get_session() as session:
    with session.begin():
      task = session.get(Task, id)
      if task is not None:
        session.delete(task)
        deleted = True
  return deleted

end_task_failure(id, error)

Mark a task as failed.

Parameters:

Name Type Description Default
id UUID

Id of the task.

required
error dict

Error information to store.

required
Source code in lilota/stores.py
353
354
355
356
357
358
359
360
361
362
363
364
def end_task_failure(self, id: UUID, error: dict):
  """Mark a task as failed.

  Args:
      id (UUID): Id of the task.
      error (dict): Error information to store.
  """
  with self._get_session() as session:
    with session.begin():
      failed_task = self._load_task(session, id)
      failed_task.error = error
      self._complete_progress(failed_task, TaskStatus.FAILED)

end_task_success(id, output)

Mark a task as successfully completed.

Parameters:

Name Type Description Default
id UUID

Id of the task.

required
output Any

Task result data.

required
Source code in lilota/stores.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
def end_task_success(self, id: UUID, output: Any):
  """Mark a task as successfully completed.

  Args:
      id (UUID): Id of the task.
      output (Any): Task result data.
  """
  # PEP 8 (E714): "is not None" instead of "not ... is None".
  if output is not None:
    output = normalize_data(output)

  with self._get_session() as session:
    with session.begin():
      task = self._load_task(session, id)
      task.output = output
      self._complete_progress(task, TaskStatus.COMPLETED)

expire_overdue_tasks()

Set status to "expired" for running tasks whose expiration time has passed.

Source code in lilota/stores.py
213
214
215
216
217
218
219
220
221
222
223
224
225
def expire_overdue_tasks(self) -> None:
  """Set status to "expired" for running tasks whose expiration time has passed."""
  expire = (
    update(Task)
    .where(
      Task.status == TaskStatus.RUNNING,
      Task.expires_at.is_not(None),
      Task.expires_at < datetime.now(timezone.utc)
    )
    .values(status=TaskStatus.EXPIRED)
  )
  with self._get_session() as session:
    with session.begin():
      session.execute(expire)

get_all_tasks()

Return all tasks ordered by ID.

Source code in lilota/stores.py
196
197
198
199
def get_all_tasks(self):
  """Return every task in the store, ordered by task ID."""
  with self._get_session() as session:
    query = session.query(Task).order_by(Task.id)
    return query.all()

get_next_task(worker_id)

Return the next available task for a worker and lock it.

Parameters:

Name Type Description Default
worker_id UUID

Worker node locking the task.

required

Returns:

Type Description
Task

Task | None: The next scheduled task, or None if no task is available.

Source code in lilota/stores.py
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def get_next_task(self, worker_id: UUID) -> Task:
  """Return the next available task for a worker and lock it.

  Args:
      worker_id (UUID): Worker node locking the task.

  Returns:
      Task | None: The next scheduled task, or None if no task is available.
  """
  with self._get_session() as session:
    with session.begin():
      # Oldest runnable task: still CREATED and its run_at is due.
      task_id = session.execute(
        select(Task.id)
        .where(Task.status == TaskStatus.CREATED)
        .where(Task.run_at <= datetime.now(timezone.utc))
        .order_by(Task.run_at)
        .limit(1)
      ).scalar()

      if not task_id:
        return None

      # Claim the task with a conditional UPDATE. Re-checking the status
      # in the WHERE clause makes the claim race-safe: if another worker
      # already moved the row out of CREATED, rowcount will be 0.
      result = session.execute(
        update(Task)
        .where(Task.id == task_id)
        .where(Task.status == TaskStatus.CREATED)
        .values(
          status=TaskStatus.SCHEDULED,
          locked_at=datetime.now(timezone.utc),
          locked_by=worker_id,
        )
      )

    # The begin() block has committed at this point. rowcount != 1 means
    # a concurrent worker won the race for this id.
    if result.rowcount != 1:
      return None

    # Reload the claimed row in its post-commit state.
    return session.get(Task, task_id)

get_task_by_id(id)

Return a task by its UUID.

Source code in lilota/stores.py
244
245
246
247
248
249
250
def get_task_by_id(self, id: UUID) -> Task:
  """Return a task by its UUID.

  Args:
      id (UUID): Id of the task.

  Returns:
      Task | None: The matching task, or None when no such task exists.
  """
  with self._get_session() as session:
    # session.get already returns None for a missing row, so the previous
    # explicit "if task is None: return None" branch was redundant.
    return session.get(Task, id)

get_unfinished_tasks()

Return all tasks that are not yet completed or failed.

Source code in lilota/stores.py
202
203
204
205
206
207
208
209
210
def get_unfinished_tasks(self) -> list[Task]:
  """Return all tasks that are not yet completed or failed."""
  # States that count as "still open".
  open_states = [TaskStatus.CREATED, TaskStatus.SCHEDULED, TaskStatus.RUNNING]
  with self._get_session() as session:
    query = (
      session.query(Task)
        .filter(Task.status.in_(open_states))
        .order_by(Task.id)
    )
    return query.all()

has_unfinished_tasks()

Check if there are any unfinished tasks in the database.

Source code in lilota/stores.py
228
229
230
231
232
233
234
235
236
237
238
239
240
241
def has_unfinished_tasks(self) -> bool:
  """Check if there are any unfinished tasks in the database."""
  open_states = [
    TaskStatus.CREATED,
    TaskStatus.SCHEDULED,
    TaskStatus.RUNNING,
  ]
  with self._get_session() as session:
    # EXISTS subquery: the database answers yes/no without loading rows.
    exists_clause = (
      session.query(Task)
      .filter(Task.status.in_(open_states))
      .exists()
    )
    return session.query(exists_clause).scalar()

set_progress(id, progress)

Update the progress percentage of a task.

Parameters:

Name Type Description Default
id UUID

Id of the task.

required
progress int

The progress of the task (0-100)

required
Source code in lilota/stores.py
323
324
325
326
327
328
329
330
331
332
333
def set_progress(self, id: UUID, progress: int):
  """Update the progress percentage of a task.

  Args:
      id (UUID): Id of the task.
      progress (int): The progress of the task (0-100)
  """
  # Clamp into [0, 100] before touching the database.
  clamped = min(100, max(0, progress))
  with self._get_session() as session, session.begin():
    task = self._load_task(session, id)
    task.progress_percentage = clamped

start_task(id, timeout=None)

Mark a task as RUNNING and initialize metadata.

Parameters:

Name Type Description Default
id UUID

Id of the task.

required
timeout timedelta | None

Optional timeout that can be set for a task.

None

Returns:

Type Description
Task

Task | None: The started task, or None if no task is available.

Source code in lilota/stores.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
def start_task(self, id: UUID, timeout: timedelta | None = None) -> Task:
  """Mark a task as RUNNING and initialize metadata.

  Args:
      id (UUID): Id of the task.
      timeout (timedelta | None): Optional timeout that can be set for a task.

  Returns:
      Task | None: The started task, or None if no task is available.
  """
  expires_at = None
  start_date_time = datetime.now(timezone.utc)
  # Timeout is persisted as whole seconds; None means "no timeout".
  timeout_sec = int(timeout.total_seconds()) if timeout is not None else None

  with self._get_session() as session:
    with session.begin():
      task = self._load_task(session, id)
      task.timeout = timeout_sec

      if timeout is not None:
        expires_at = start_date_time + timeout

      # Record the executing process id so a stuck task can be traced.
      task.pid = os.getpid()
      task.status = TaskStatus.RUNNING
      task.progress_percentage = 0
      task.start_date_time = start_date_time
      task.expires_at = expires_at
      # Clear any previous end time in case the task is being re-run.
      task.end_date_time = None
      return task

StoreBase

Bases: ABC

Abstract base class for all database stores.

Provides common initialization and session management.

Source code in lilota/stores.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class StoreBase(ABC):
  """Abstract base class for all database stores.

  Provides common initialization plus a lazily-created SQLAlchemy
  engine and session factory shared by all subclasses.
  """

  def __init__(self, db_url: str, logger: logging.Logger):
    """
    Args:
        db_url (str): Database connection URL.
        logger (logging.Logger): Logger for store operations.
    """
    self._db_url = db_url
    self._logger = logger
    # Engine and session factory are created on first use, not here.
    self._engine = None
    self._Session = None


  def _ensure_engine(self):
    # Guard clause: already initialized, nothing to do.
    if self._engine is not None:
      return
    self._engine = create_engine(self._db_url, future=True)
    self._Session = sessionmaker(
      bind=self._engine,
      expire_on_commit=False,
    )


  def _get_session(self):
    self._ensure_engine()
    return self._Session()

__init__(db_url, logger)

Parameters:

Name Type Description Default
db_url str

Database connection URL.

required
logger Logger

Logger for store operations.

required
Source code in lilota/stores.py
20
21
22
23
24
25
26
27
28
29
def __init__(self, db_url: str, logger: logging.Logger):
  """
  Args:
      db_url (str): Database connection URL.
      logger (logging.Logger): Logger for store operations.
  """
  self._db_url = db_url
  self._logger = logger
  self._engine = None
  self._Session = None

LogEntry

Bases: Base

Database model storing log messages generated by Lilota.

Source code in lilota/models.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
class LogEntry(Base):
  """Database model storing log messages generated by Lilota."""

  __tablename__ = "lilota_log"
  # Auto-incrementing surrogate key.
  id: Mapped[int] = mapped_column(primary_key=True)
  # Record creation time; the column is timezone-aware.
  created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True),
    nullable=False,
  )
  # Level name (e.g. "INFO", "ERROR") and emitting logger name.
  level: Mapped[str] = mapped_column(String(20), nullable=False)
  logger: Mapped[str] = mapped_column(String(255), nullable=False)
  # Fully formatted log message text.
  message: Mapped[str] = mapped_column(Text, nullable=False)
  # Process/thread names captured from the log record, when available.
  process: Mapped[str | None] = mapped_column(String(64), default=None)
  thread: Mapped[str | None] = mapped_column(String(64), default=None)
  # Optional association with the node/task that produced the message.
  node_id: Mapped[UUID | None] = mapped_column(default=None)
  task_id: Mapped[UUID | None] = mapped_column(default=None)

ModelProtocol

Bases: Protocol

Protocol for models that can be serialized to dictionaries.

Source code in lilota/models.py
73
74
75
76
77
78
79
# runtime_checkable enables isinstance() checks against this protocol;
# note such checks only verify attribute presence, not signatures.
@runtime_checkable
class ModelProtocol(Protocol):
  """Protocol for models that can be serialized to dictionaries."""

  def as_dict(self) -> dict[str, Any]:
    """Return a dictionary representation of the model."""
    pass

as_dict()

Return a dictionary representation of the model.

Source code in lilota/models.py
77
78
79
# Protocol stub: concrete models supply the real implementation.
def as_dict(self) -> dict[str, Any]:
  """Return a dictionary representation of the model."""
  pass

Node

Bases: Base

Database model representing a Lilota node.

A node represents a running component of the system such as a scheduler or a worker.

Source code in lilota/models.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class Node(Base):
  """Database model representing a Lilota node.

  A node represents a running component of the system such as a
  scheduler or a worker.
  """

  __tablename__ = "lilota_node"
  # Client-side generated UUID primary key.
  id: Mapped[UUID] = mapped_column(
    primary_key=True,
    default=uuid4
  )
  # Optional human-readable node name.
  name: Mapped[str | None] = mapped_column(String(255), nullable=True)
  # Constrained to 'scheduler' or 'worker' (see check constraint below).
  type: Mapped[str] = mapped_column(String(32), nullable=False)
  # Lifecycle state; constrained to the NodeStatus values below.
  status: Mapped[str] = mapped_column(String(32), nullable=False)
  created_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True), 
    default=lambda: datetime.now(timezone.utc),
    nullable=False
  )
  # Heartbeat timestamp; stale values identify dead nodes.
  last_seen_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True), 
    default=lambda: datetime.now(timezone.utc),
    nullable=False
  )

  # NOTE(review): the constraint names read "lilota_note_*" — likely a typo
  # for "lilota_node_*". Renaming them requires a schema migration, so the
  # existing names are intentionally left unchanged here.
  __table_args__ = (
    CheckConstraint(
        "type IN ('scheduler', 'worker')",
        name="lilota_note_type_check"
    ),
    CheckConstraint(
        "status IN ('idle', 'starting', 'running', 'stopping', 'stopped', 'crashed', 'dead')",
        name="lilota_note_status_check"
    )
  )

NodeLeader

Bases: Base

Database model representing the worker node currently acting as leader.

The leader is responsible for cluster-level maintenance tasks such as cleaning up stale nodes.

Source code in lilota/models.py
282
283
284
285
286
287
288
289
290
291
292
class NodeLeader(Base):
  """Database model representing the worker node currently acting as leader.

  The leader is responsible for cluster-level maintenance tasks such as
  cleaning up stale nodes.
  """

  __tablename__ = "lilota_node_leader"
  # Fixed default of 1: the table is intended to hold a single row.
  id: Mapped[int] = mapped_column(Integer, primary_key=True, default=1)
  # The node currently holding the leader lease.
  node_id: Mapped[UUID] = mapped_column(nullable=False)
  # When the lease lapses, another node may take over leadership.
  lease_expires_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), nullable=False)

NodeStatus

Bases: StrEnum

Enumeration describing the lifecycle state of a node.

Source code in lilota/models.py
25
26
27
28
29
30
31
32
33
class NodeStatus(StrEnum):
  """Enumeration describing the lifecycle state of a node."""
  IDLE = "idle"
  STARTING = "starting"
  RUNNING = "running"
  STOPPING = "stopping"
  STOPPED = "stopped"
  CRASHED = "crashed"
  DEAD ="dead"

NodeType

Bases: StrEnum

Enumeration of supported node types.

Source code in lilota/models.py
37
38
39
40
class NodeType(StrEnum):
  """Enumeration of supported node types."""
  SCHEDULER = "scheduler"
  WORKER = "worker"

RegisteredTask

Wrapper representing a registered task function.

This class stores metadata about the task function such as input and output models and optionally a progress tracker.

It is responsible for: - deserializing the task input - executing the task function - serializing the output

Parameters:

Name Type Description Default
func Callable

Task function to execute.

required
input_model Optional[Type]

Optional input model used to deserialize input payloads.

required
output_model Optional[Type]

Optional output model used to serialize the task result.

required
task_progress Optional[TaskProgress]

Optional progress helper.

required
timeout Optional[timedelta]

Optional timeout that can be set for a task.

required
Source code in lilota/models.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class RegisteredTask:
  """Wrapper representing a registered task function.

  This class stores metadata about the task function such as input
  and output models and optionally a progress tracker.

  It is responsible for:
  - deserializing the task input
  - executing the task function
  - serializing the output

  Args:
    func (Callable): Task function to execute.
    input_model (Optional[Type]): Optional input model used to
      deserialize input payloads.
    output_model (Optional[Type]): Optional output model used to
      serialize the task result.
    task_progress (Optional[TaskProgress]): Optional progress helper.
    timeout (Optional[timedelta]): Optional timeout that can be set for a task.
  """

  def __init__(self, func: Callable, input_model: Optional[Type], output_model: Optional[Type], task_progress: Optional[TaskProgress], timeout: Optional[timedelta]):
    self.func = func
    self.input_model = input_model
    self.output_model = output_model
    self.task_progress = task_progress
    self.timeout = timeout


  def __call__(self, raw_input: Any, task_progress: TaskProgress):
    """Execute the registered task.

    Args:
      raw_input (Any): Raw input payload stored in the database.
      task_progress (TaskProgress): Progress tracking object.

    Returns:
      Any: Serialized task result.
    """

    # Deserialize input
    input_value = self._deserialize_input(raw_input)

    # Execute the function. Arguments are passed positionally, omitting
    # whichever of (input, progress) is None; a progress-aware task with
    # no input receives only the progress object.
    if task_progress is None:
      if input_value is None:
        result = self.func()
      else:
        result = self.func(input_value)
    else:
      if input_value is None:
        result = self.func(task_progress)
      else:
        result = self.func(input_value, task_progress)

    # Serialize output to JSON-safe dict
    return self._serialize_output(result)


  def _deserialize_input(self, raw_input: Any):
    # No model configured: pass the raw payload through untouched.
    if not self.input_model:
      return raw_input
    # NOTE(review): isinstance on the model *class* — this passes when the
    # class object itself exposes an as_dict attribute; confirm this is
    # intended rather than issubclass(self.input_model, ModelProtocol).
    if isinstance(self.input_model, ModelProtocol):
      return self.input_model(**raw_input)
    if is_dataclass(self.input_model):
      return self.input_model(**raw_input)
    # Fallbacks: already a dict, or already an instance of the model.
    if isinstance(raw_input, dict):
      return raw_input
    if isinstance(raw_input, self.input_model):
      return raw_input
    raise TypeError(f"Unsupported input_model type: {self.input_model}")


  def _serialize_output(self, output: Any):
    if not self.output_model:
      return output
    if isinstance(output, dict):
      return output
    if isinstance(output, ModelProtocol):
      return output.as_dict()
    if isinstance(output, self.output_model):
      if is_dataclass(output):
        return asdict(output)
    # NOTE(review): this branch *constructs* an output_model instance from
    # the value instead of producing a JSON-safe dict — verify against the
    # "serialize output" contract stated in __call__.
    if is_dataclass(self.output_model):
      return self.output_model(**output)
    return output

__call__(raw_input, task_progress)

Execute the registered task.

Parameters:

Name Type Description Default
raw_input Any

Raw input payload stored in the database.

required
task_progress TaskProgress

Progress tracking object.

required

Returns:

Name Type Description
Any

Serialized task result.

Source code in lilota/models.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __call__(self, raw_input: Any, task_progress: TaskProgress):
  """Execute the registered task.

  Args:
    raw_input (Any): Raw input payload stored in the database.
    task_progress (TaskProgress): Progress tracking object.

  Returns:
    Any: Serialized task result.
  """
  # Convert the stored payload into the task's input representation.
  input_value = self._deserialize_input(raw_input)

  # Build the positional argument list: input first, then the progress
  # tracker, each included only when present. This reproduces all four
  # combinations of the original nested-if dispatch.
  call_args = []
  if input_value is not None:
    call_args.append(input_value)
  if task_progress is not None:
    call_args.append(task_progress)

  result = self.func(*call_args)

  # Serialize output to a JSON-safe dict.
  return self._serialize_output(result)

Task

Bases: Base

Database model representing a scheduled task.

Tasks are created by the scheduler and executed by worker nodes. The model stores the execution state, input/output data, and runtime metadata.

Source code in lilota/models.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
class Task(Base):
  """Database model representing a scheduled task.

  Tasks are created by the scheduler and executed by worker nodes.
  The model stores the execution state, input/output data, and
  runtime metadata.
  """

  __tablename__ = "lilota_task"
  # Client-side generated UUID primary key.
  id: Mapped[UUID] = mapped_column(
    primary_key=True,
    default=uuid4
  )
  # Name used to look up the registered task function.
  name: Mapped[str] = mapped_column(String, nullable=False)
  # OS process id of the executing worker; 0 while not running.
  pid: Mapped[int] = mapped_column(nullable=False, default=0)
  # Lifecycle state; constrained to the TaskStatus values (see check below).
  status: Mapped[str] = mapped_column(
    String(32),
    nullable=False,
    index=False
  )
  # Earliest time the task may be picked up.
  run_at: Mapped[datetime] = mapped_column(
    DateTime(timezone=True), 
    nullable=False,
    default=lambda: datetime.now(timezone.utc)
  )
  # Retry accounting.
  attempts: Mapped[int] = mapped_column(nullable=False, default=0)
  max_attempts: Mapped[int] = mapped_column(nullable=False, default=1)
  # Link to a prior attempt of the same logical task, when retried.
  previous_task_id: Mapped[UUID] = mapped_column(nullable=True, default=None)
  # Timeout in whole seconds and the resulting absolute expiry time.
  timeout: Mapped[int | None] = mapped_column(Integer, nullable=True, default=None)
  expires_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, default=None)
  # 0-100, reported by the running task.
  progress_percentage: Mapped[int] = mapped_column(default=0)
  start_date_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, default=None)
  end_date_time: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, default=None)
  # JSON payloads: task input, result, and error details.
  input: Mapped[Any | None] = mapped_column(JSON)
  output: Mapped[Any | None] = mapped_column(JSON)
  error: Mapped[Any | None] = mapped_column(JSON)
  # Worker claim bookkeeping set by get_next_task.
  locked_by: Mapped[UUID | None] = mapped_column(default=None)
  locked_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True, default=None)

  __table_args__ = (
    CheckConstraint(
      "status IN ('created', 'scheduled', 'running', 'completed', 'failed', 'expired', 'cancelled')",
      name="lilota_task_status_check"
    ),
    # Composite index backing the get_next_task polling query.
    Index("idx_get_next_task", "status", "run_at"),
  )

  def __repr__(self):
    return f"<TaskInfo(id={self.id}, name={self.name}, progress={self.progress_percentage}%)>"

TaskProgress

Helper object used to update task progress.

This object is passed to task functions when progress tracking is enabled. It allows the task to report its current progress to the task store.

Parameters:

Name Type Description Default
task_id int

Identifier of the task.

required
set_progress Callable[[int, int], None]

Function used to persist progress updates.

required
Source code in lilota/models.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class TaskProgress:
  """Helper handed to task functions for reporting progress.

  Bundles a task identifier with a callback that persists progress
  updates to the task store.

  Args:
    task_id (int): Identifier of the task.
    set_progress (Callable[[int, int], None]): Function used to persist
      progress updates.
  """

  def __init__(self, task_id: int, set_progress: Callable[[int, int], None]):
    # NOTE(review): documented as int, but Task ids elsewhere are UUIDs —
    # confirm the intended identifier type.
    self.task_id = task_id
    self.set_progress = set_progress

  def set(self, progress: int):
    """Report the task's current progress.

    Args:
      progress (int): Progress value (typically 0-100).
    """
    self.set_progress(self.task_id, progress)

set(progress)

Update the progress of the task.

Parameters:

Name Type Description Default
progress int

Progress value (typically 0–100).

required
Source code in lilota/models.py
61
62
63
64
65
66
67
def set(self, progress: int):
  """Update the progress of the task.

  Args:
    progress (int): Progress value (typically 0–100).
  """
  self.set_progress(self.task_id, progress)

TaskStatus

Bases: StrEnum

Enumeration of possible task states.

Source code in lilota/models.py
13
14
15
16
17
18
19
20
21
class TaskStatus(StrEnum):
  """Enumeration of possible task states."""
  CREATED = "created"
  SCHEDULED = "scheduled"
  RUNNING = "running"
  COMPLETED = "completed"
  FAILED = "failed"
  EXPIRED = "expired"
  CANCELLED = "cancelled"

Heartbeat

Bases: Thread

Background thread that executes a heartbeat task periodically.

The heartbeat repeatedly calls the provided `HeartbeatTask` at the configured interval until the thread is stopped.

Source code in lilota/heartbeat.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
class Heartbeat(threading.Thread):
  """Background thread that executes a heartbeat task periodically.

  The heartbeat repeatedly calls the provided :class:`HeartbeatTask`
  at the configured interval until the thread is stopped.
  """

  def __init__(self, name: str, task: HeartbeatTask, logger: logging.Logger):
    """Initialize the heartbeat thread.

    Args:
      name (str): Name of the thread.
      task (HeartbeatTask): Task executed periodically by the heartbeat.
      logger (logging.Logger): Logger used for reporting errors.
    """
    # daemon=True so a forgotten heartbeat never blocks interpreter exit.
    super().__init__(name=name, daemon=True)
    self._task = task
    self._logger = logger
    self._stop_event = threading.Event()


  def run(self) -> None:
    """Run the heartbeat loop.

    The loop repeatedly executes the configured heartbeat task and
    waits for the defined interval before executing it again. The
    loop stops when the stop event is triggered.
    """
    while not self._stop_event.is_set():
      # Execute; fixed: the exception was bound to an unused name "ex".
      try:
        self._task.execute()
      except Exception:
        if self._logger is not None:
          self._logger.exception("Heartbeat task failed")
        else:
          raise

      # Wait; Event.wait returns early when stop() is called, and a
      # negative interval is clamped to zero.
      interval = max(0.0, self._task.interval)
      self._stop_event.wait(interval)


  def stop(self) -> None:
    """Signal the heartbeat thread to stop.

    The thread will stop after the current execution cycle completes.
    """
    self._stop_event.set()


  def stop_and_join(self, timeout=None):
    """Stop the heartbeat thread and wait for it to finish.

    Args:
      timeout (float | None, optional): Maximum number of seconds to
        wait for the thread to terminate. If ``None``, wait indefinitely.
    """
    self.stop()
    self.join(timeout)

__init__(name, task, logger)

Initialize the heartbeat thread.

Parameters:

Name Type Description Default
name str

Name of the thread.

required
task HeartbeatTask

Task executed periodically by the heartbeat.

required
logger Logger

Logger used for reporting errors.

required
Source code in lilota/heartbeat.py
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(self, name: str, task: HeartbeatTask, logger: logging.Logger):
  """Initialize the heartbeat thread.

  Args:
    name (str): Name of the thread.
    task (HeartbeatTask): Task executed periodically by the heartbeat.
    logger (logging.Logger): Logger used for reporting errors.
  """
  # Daemon thread: never keeps the interpreter alive on shutdown.
  super().__init__(name=name, daemon=True)
  self._stop_event = threading.Event()
  self._logger = logger
  self._task = task

run()

Run the heartbeat loop.

The loop repeatedly executes the configured heartbeat task and waits for the defined interval before executing it again. The loop stops when the stop event is triggered.

Source code in lilota/heartbeat.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def run(self) -> None:
  """Run the heartbeat loop.

  The loop repeatedly executes the configured heartbeat task and
  waits for the defined interval before executing it again. The
  loop stops when the stop event is triggered.
  """
  while not self._stop_event.is_set():
    # Execute; fixed: the exception was bound to an unused name "ex".
    try:
      self._task.execute()
    except Exception:
      if self._logger is not None:
        self._logger.exception("Heartbeat task failed")
      else:
        raise

    # Wait; Event.wait returns early on stop(), negative intervals clamp to 0.
    interval = max(0.0, self._task.interval)
    self._stop_event.wait(interval)

stop()

Signal the heartbeat thread to stop.

The thread will stop after the current execution cycle completes.

Source code in lilota/heartbeat.py
76
77
78
79
80
81
def stop(self) -> None:
  """Signal the heartbeat thread to stop.

  The thread will stop after the current execution cycle completes.
  """
  # Setting the event both ends the while-loop and wakes a pending wait().
  self._stop_event.set()

stop_and_join(timeout=None)

Stop the heartbeat thread and wait for it to finish.

Parameters:

Name Type Description Default
timeout float | None

Maximum number of seconds to wait for the thread to terminate. If None, wait indefinitely.

None
Source code in lilota/heartbeat.py
84
85
86
87
88
89
90
91
92
def stop_and_join(self, timeout=None):
  """Stop the heartbeat thread and block until it exits.

  Args:
    timeout (float | None, optional): Maximum number of seconds to
      wait for the thread to terminate. If ``None``, wait indefinitely.
  """
  # Signal first, then wait for the run loop to drain.
  self.stop()
  self.join(timeout)

HeartbeatTask

Bases: ABC

Abstract base class for heartbeat tasks.

A heartbeat task defines logic that should be executed periodically by a `Heartbeat` thread.

Source code in lilota/heartbeat.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class HeartbeatTask(ABC):
  """Abstract base class for heartbeat tasks.

  Subclasses define the work a :class:`Heartbeat` thread performs on
  each cycle, plus the interval between cycles.
  """

  def __init__(self, interval: float):
    """Initialize the heartbeat task.

    Args:
      interval (float): Interval in seconds between task executions.
    """
    # Public attribute: Heartbeat.run reads it every cycle.
    self.interval = interval


  @abstractmethod
  def execute(self):
    """Execute one heartbeat cycle.

    Subclasses must implement this method to define the work performed
    during each heartbeat cycle.
    """
    pass

__init__(interval)

Initialize the heartbeat task.

Parameters:

Name Type Description Default
interval float

Interval in seconds between task executions.

required
Source code in lilota/heartbeat.py
13
14
15
16
17
18
19
def __init__(self, interval: float):
  """Initialize the heartbeat task.

  Args:
    interval (float): Interval in seconds between task executions.
  """
  # Public attribute: read by the Heartbeat thread on every cycle.
  self.interval = interval

execute() abstractmethod

Execute the heartbeat logic.

Subclasses must implement this method to define the work performed during each heartbeat cycle.

Source code in lilota/heartbeat.py
22
23
24
25
26
27
28
29
# Abstract stub: no base behavior; each subclass supplies the cycle logic.
@abstractmethod
def execute(self):
  """Execute the heartbeat logic.

  Subclasses must implement this method to define the work performed
  during each heartbeat cycle.
  """
  pass

ContextLogger

Bases: LoggerAdapter

Logger adapter that automatically injects contextual metadata.

This adapter attaches additional context such as node_id and task_id to log records so that log entries can be associated with specific nodes or tasks.

Source code in lilota/logging.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class ContextLogger(logging.LoggerAdapter):
  """Logger adapter that automatically injects contextual metadata.

  Attaches ``node_id`` and ``task_id`` from the adapter's ``extra``
  mapping to every record so log entries can be tied back to the node
  or task that produced them.
  """

  def process(self, msg, kwargs):
    """Inject contextual metadata into the log record.

    Args:
      msg (str): Log message.
      kwargs (dict): Keyword arguments passed to the logger.

    Returns:
      tuple: Processed ``(msg, kwargs)`` pair with injected metadata.
    """
    # Only fill in values the caller did not supply explicitly.
    extra = kwargs.setdefault("extra", {})
    for key in ("node_id", "task_id"):
      extra.setdefault(key, self.extra.get(key))
    return msg, kwargs

process(msg, kwargs)

Inject contextual metadata into the log record.

Parameters:

Name Type Description Default
msg str

Log message.

required
kwargs dict

Keyword arguments passed to the logger.

required

Returns:

Name Type Description
tuple

Processed (msg, kwargs) pair with injected metadata.

Source code in lilota/logging.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def process(self, msg, kwargs):
  """Inject contextual metadata into the log record.

  Args:
    msg (str): Log message.
    kwargs (dict): Keyword arguments passed to the logger.

  Returns:
    tuple: Processed ``(msg, kwargs)`` pair with injected metadata.
  """
  kwargs.setdefault("extra", {})
  kwargs["extra"].setdefault("node_id", self.extra.get("node_id"))
  kwargs["extra"].setdefault("task_id", self.extra.get("task_id"))
  return msg, kwargs

LilotaLoggingFilter

Bases: Filter

Logging filter used to suppress noisy third-party logs.

Currently filters Alembic logs so that only warnings and errors are recorded.

Source code in lilota/logging.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
class LilotaLoggingFilter(logging.Filter):
  """Logging filter used to suppress noisy third-party logs.

  Alembic records below WARNING are dropped; everything else passes
  through unchanged.
  """

  def filter(self, record: logging.LogRecord) -> bool:
    """Determine whether a log record should be processed.

    Args:
      record (logging.LogRecord): Log record being evaluated.

    Returns:
      bool: ``True`` if the record should be logged, otherwise ``False``.
    """
    # Guard clause: non-alembic loggers always pass.
    if not record.name.startswith("alembic."):
      return True
    # Alembic is chatty: keep only warnings and above.
    return record.levelno >= logging.WARNING

filter(record)

Determine whether a log record should be processed.

Parameters:

Name Type Description Default
record LogRecord

Log record being evaluated.

required

Returns:

Name Type Description
bool bool

True if the record should be logged, otherwise False.

Source code in lilota/logging.py
79
80
81
82
83
84
85
86
87
88
89
90
def filter(self, record: logging.LogRecord) -> bool:
  """Determine whether a log record should be processed.

  Args:
    record (logging.LogRecord): Log record being evaluated.

  Returns:
    bool: ``True`` if the record should be logged, otherwise ``False``.
  """
  if record.name.startswith("alembic."):
    return record.levelno >= logging.WARNING
  return True

SqlAlchemyHandler

Bases: Handler

Logging handler that stores log records in the database.

This handler writes log messages to the `lilota_log` table using the provided `SqlAlchemyLogStore`. Each log record is converted into a `LogEntry` model instance.

Source code in lilota/logging.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class SqlAlchemyHandler(logging.Handler):
  """Logging handler that stores log records in the database.

  This handler writes log messages to the ``lilota_log`` table using
  the provided :class:`SqlAlchemyLogStore`. Each log record is converted
  into a :class:`LogEntry` model instance.
  """

  def __init__(self, log_store: SqlAlchemyLogStore):
    """Initialize the logging handler.

    Args:
      log_store (SqlAlchemyLogStore): Store used to persist log entries.
    """
    super().__init__()
    self.log_store: SqlAlchemyLogStore = log_store


  def emit(self, record: logging.LogRecord) -> None:
    """Persist a log record in the database.

    Args:
      record (logging.LogRecord): Log record to store.
    """
    with self.log_store.get_session() as session:
      entry = LogEntry(
        # NOTE(review): fromtimestamp without tz yields a *naive* local
        # datetime, while LogEntry.created_at is a timezone-aware column —
        # confirm whether an explicit UTC conversion was intended.
        created_at=datetime.fromtimestamp(record.created),
        level=record.levelname,
        logger=record.name,
        message=self.format(record),
        process=record.processName,
        thread=record.threadName,
        # Context fields injected by ContextLogger, when present.
        node_id=getattr(record, "node_id", None),
        task_id=getattr(record, "task_id", None),
      )
      session.add(entry)
      session.commit()

__init__(log_store)

Initialize the logging handler.

Parameters:

Name Type Description Default
log_store SqlAlchemyLogStore

Store used to persist log entries.

required
Source code in lilota/logging.py
15
16
17
18
19
20
21
22
def __init__(self, log_store: SqlAlchemyLogStore):
  """Set up the handler and remember the store used for persistence.

  Args:
    log_store (SqlAlchemyLogStore): Store used to persist log entries.
  """
  # Let logging.Handler initialize its level, formatter, and lock first.
  super().__init__()
  self.log_store: SqlAlchemyLogStore = log_store

emit(record)

Persist a log record in the database.

Parameters:

Name Type Description Default
record LogRecord

Log record to store.

required
Source code in lilota/logging.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def emit(self, record: logging.LogRecord) -> None:
  """Persist a log record in the database.

  Any failure while writing to the database is routed through
  ``Handler.handleError`` instead of propagating, per the standard
  library contract for ``logging.Handler.emit`` — a broken database
  connection must never crash the code that emitted the log record.

  Args:
    record (logging.LogRecord): Log record to store.
  """
  try:
    with self.log_store.get_session() as session:
      entry = LogEntry(
        created_at=datetime.fromtimestamp(record.created),
        level=record.levelname,
        logger=record.name,
        message=self.format(record),
        process=record.processName,
        thread=record.threadName,
        node_id=getattr(record, "node_id", None),
        task_id=getattr(record, "task_id", None),
      )
      session.add(entry)
      session.commit()
  except Exception:
    # Logging must be best-effort: report through the handler error
    # machinery (stderr by default) and keep the application running.
    self.handleError(record)

configure_logging(db_url, logger_name, logging_level)

Configure a Lilota logger that writes log messages to the database.

This function creates and configures a logger with the given name. The logger uses SqlAlchemyHandler to persist log records in the database through SqlAlchemyLogStore. Any existing handlers attached to the logger are removed before configuration.

Parameters:

Name Type Description Default
db_url str

Database connection URL used by the log store.

required
logger_name str

Name of the logger to configure.

required
logging_level int

Logging level to apply to both the logger and its handler.

required

Returns:

Type Description
Logger

logging.Logger: Configured logger instance that writes log messages to the database.

Source code in lilota/logging.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def configure_logging(db_url: str, logger_name: str, logging_level: int) -> logging.Logger:
  """Configure a Lilota logger that writes log messages to the database.

  This function creates and configures a logger with the given name. The
  logger uses :class:`SqlAlchemyHandler` to persist log records in the
  database through :class:`SqlAlchemyLogStore`. Any existing handlers
  attached to the logger are removed before configuration.

  Args:
    db_url (str):
      Database connection URL used by the log store.

    logger_name (str):
      Name of the logger to configure.

    logging_level (int):
      Logging level to apply to both the logger and its handler.

  Returns:
    logging.Logger:
      Configured logger instance that writes log messages to the database.
  """
  # Build the database-backed handler first, fully configured.
  handler = SqlAlchemyHandler(SqlAlchemyLogStore(db_url))
  handler.setLevel(logging_level)
  handler.setFormatter(logging.Formatter("%(message)s"))
  handler.addFilter(LilotaLoggingFilter())

  # Then attach it to a clean logger: drop any handlers left over from
  # an earlier configuration so records are not persisted twice.
  logger = logging.getLogger(logger_name)
  logger.setLevel(logging_level)
  logger.handlers.clear()
  logger.addHandler(handler)
  return logger

create_context_logger(base_logger, **kwargs)

Create a context-aware logger.

The returned logger automatically attaches contextual metadata (such as node_id and task_id) to all emitted log records.

Parameters:

Name Type Description Default
base_logger Logger

Base logger instance.

required
**kwargs

Optional context values (e.g., node_id, task_id).

{}

Returns:

Name Type Description
ContextLogger

Logger adapter with contextual metadata.

Source code in lilota/logging.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def create_context_logger(base_logger: logging.Logger, **kwargs):
  """Create a context-aware logger.

  The returned logger automatically attaches contextual metadata
  (such as ``node_id`` and ``task_id``) to all emitted log records.

  Args:
    base_logger (logging.Logger): Base logger instance.
    **kwargs: Optional context values (e.g., ``node_id``, ``task_id``).

  Returns:
    ContextLogger: Logger adapter with contextual metadata.
  """
  # Forward only the recognized context keys; anything else passed in
  # kwargs is silently ignored.
  extra = {key: kwargs[key] for key in ("node_id", "task_id") if key in kwargs}
  return ContextLogger(base_logger, extra)

error_to_dict(error_message)

Wrap an error message string into a dictionary.

Parameters:

Name Type Description Default
error_message str

The error message to wrap.

required

Returns:

Name Type Description
dict dict

A dictionary with key "message" containing the error message.

Source code in lilota/utils.py
26
27
28
29
30
31
32
33
34
35
36
37
def error_to_dict(error_message: str) -> dict:
  """Wrap an error message string into a dictionary.

  Args:
    error_message (str): The error message to wrap.

  Returns:
    dict: A dictionary with key "message" containing the error message.
  """
  return {"message": error_message}

exception_to_dict(ex)

Convert an exception into a dictionary containing type, message, and traceback.

Parameters:

Name Type Description Default
ex Exception

The exception to convert.

required

Returns:

Name Type Description
dict dict

A dictionary with keys: - "type": Exception class name. - "message": Exception message string. - "traceback": Formatted traceback string.

Source code in lilota/utils.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def exception_to_dict(ex: Exception) -> dict:
  """Convert an exception into a dictionary containing type, message, and traceback.

  Args:
    ex (Exception): The exception to convert.

  Returns:
    dict: A dictionary with keys:
      - "type": Exception class name.
      - "message": Exception message string.
      - "traceback": Formatted traceback string.
  """
  # format_exception() renders the traceback attached to ``ex`` itself.
  # The previous format_exc() only works while the exception is actively
  # being handled; called outside an ``except`` block it silently
  # produced "NoneType: None" instead of the real traceback.
  return {
    "type": ex.__class__.__name__,
    "message": str(ex),
    "traceback": "".join(
      traceback.format_exception(type(ex), ex, ex.__traceback__)
    ),
  }

normalize_data(data)

Normalize input data to a dictionary for storage or serialization.

Supports dict, ModelProtocol objects, and dataclasses.

Parameters:

Name Type Description Default
data Any

The input data to normalize.

required

Returns:

Name Type Description
dict dict

A dictionary representation of the data.

Raises:

Type Description
TypeError

If data is not a dict, ModelProtocol, or dataclass.

Source code in lilota/utils.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def normalize_data(data: Any) -> dict:
  """Normalize input data to a dictionary for storage or serialization.

  Supports ``dict``, ``ModelProtocol`` objects, and dataclass instances.

  Args:
    data (Any): The input data to normalize.

  Returns:
    dict: A dictionary representation of the data.

  Raises:
    TypeError: If ``data`` is not a ``dict``, ``ModelProtocol``, or
      dataclass instance.
  """
  # Dict: already in the target representation.
  if isinstance(data, dict):
    return data

  # ModelProtocol: project models provide their own dict conversion.
  if isinstance(data, ModelProtocol):
    return data.as_dict()

  # Dataclass instances only: is_dataclass() is also True for dataclass
  # *classes*, on which asdict() would raise a confusing error, so
  # exclude types and let them fall through to the explicit TypeError.
  if is_dataclass(data) and not isinstance(data, type):
    return asdict(data)

  raise TypeError(
    f"Unsupported type: {type(data).__name__}. Expected ModelProtocol, dataclass, or dict."
  )