In production MLOps systems, ML pipelines consist of multiple interdependent tasks (data loading, feature engineering, training, evaluation, etc.) organized as a Directed Acyclic Graph (DAG). Understanding task dependencies, execution order, and the critical path is essential for pipeline optimization and resource allocation.
Given a list of pipeline tasks where each task has an ID, duration, and list of dependencies (task IDs that must complete before this task can start), implement a function analyze_ml_pipeline(tasks) that performs complete pipeline analysis.
The function should compute:
-
Execution Order: A valid topological ordering of tasks (use alphabetical ordering when multiple tasks are available)
-
Earliest Start/Finish Times: For each task, the earliest possible start and finish times assuming tasks start as soon as dependencies complete
-
Latest Start/Finish Times: For each task, the latest times the task can start/finish without delaying the overall pipeline
-
Slack Time: For each task, the amount of time the task can be delayed without affecting the makespan
-
Critical Path: The sequence of tasks with zero slack that determines the minimum pipeline duration
-
Makespan: The total time required to complete all tasks
If the input is empty, return a result with empty collections and makespan of 0. Assume the input DAG has no cycles.
Examples
tasks = [{'id': 'start', 'duration': 2, 'dependencies': []}, {'id': 'left', 'duration': 5, 'dependencies': ['start']}, {'id': 'right', 'duration': 10, 'dependencies': ['start']}, {'id': 'end', 'duration': 3, 'dependencies': ['left', 'right']}]{'execution_order': ['start', 'left', 'right', 'end'], 'earliest_start': {'start': 0, 'left': 2, 'right': 2, 'end': 12}, 'earliest_finish': {'start': 2, 'left': 7, 'right': 12, 'end': 15}, 'latest_start': {'start': 0, 'left': 7, 'right': 2, 'end': 12}, 'latest_finish': {'start': 2, 'left': 12, 'right': 12, 'end': 15}, 'slack': {'start': 0, 'left': 5, 'right': 0, 'end': 0}, 'critical_path': ['start', 'right', 'end'], 'makespan': 15}Starter Code
def analyze_ml_pipeline(tasks: list) -> dict:
"""
Analyze an ML pipeline DAG for scheduling and critical path.
Args:
tasks: list of task dicts with:
- 'id': task identifier (str)
- 'duration': task duration in minutes (int)
- 'dependencies': list of task IDs this task depends on
Returns:
dict with:
- 'execution_order': topologically sorted list of task IDs
- 'earliest_start': dict mapping task ID to earliest start time
- 'earliest_finish': dict mapping task ID to earliest finish time
- 'latest_start': dict mapping task ID to latest start time
- 'latest_finish': dict mapping task ID to latest finish time
- 'slack': dict mapping task ID to slack time
- 'critical_path': list of task IDs on critical path (in execution order)
- 'makespan': total time to complete pipeline
"""
pass