"""Home of the `Schedule` class."""
from __future__ import annotations
from typing import Any, TYPE_CHECKING
from collections import deque
from job_shop_lib import ScheduledOperation, JobShopInstance, Operation
from job_shop_lib.exceptions import ValidationError
if TYPE_CHECKING:
from job_shop_lib.dispatching import Dispatcher
[docs]
class Schedule:
    r"""Data structure to store a complete or partial solution for a particular
    :class:`JobShopInstance`.

    A schedule is a list of lists of :class:`ScheduledOperation` objects. Each
    list represents the order of operations on a machine.

    The main methods of this class are:

    .. autosummary::
        :nosignatures:

        makespan
        is_complete
        add
        reset

    Args:
        instance:
            The :class:`JobShopInstance` object that the schedule is for.
        schedule:
            A list of lists of :class:`ScheduledOperation` objects. Each
            list represents the order of operations on a machine. If
            not provided, the schedule is initialized as an empty schedule.
        \**metadata:
            Additional information about the schedule.

    Raises:
        ValidationError:
            If the given ``schedule`` does not pass
            :meth:`check_schedule`.
    """

    __slots__ = {
        "instance": (
            "The :class:`JobShopInstance` object that the schedule is for."
        ),
        "_schedule": (
            "A list of lists of :class:`ScheduledOperation` objects. "
            "Each list represents the order of operations on a machine."
        ),
        "metadata": (
            "A dictionary with additional information about the "
            "schedule. It can be used to store information about the "
            "algorithm that generated the schedule, for example."
        ),
        "operation_to_scheduled_operation": (
            "A dictionary that maps an :class:`Operation` to its "
            ":class:`ScheduledOperation` in the schedule. This is used to "
            "quickly find the scheduled operation associated with a given "
            "operation."
        ),
        "num_scheduled_operations": (
            "The number of operations that have been scheduled so far."
        ),
        "operation_with_latest_end_time": (
            "The :class:`ScheduledOperation` with the latest end time. "
            "This is used to quickly find the last operation in the schedule."
        ),
    }

    def __init__(
        self,
        instance: JobShopInstance,
        schedule: list[list[ScheduledOperation]] | None = None,
        **metadata: Any,
    ):
        if schedule is None:
            schedule = [[] for _ in range(instance.num_machines)]

        self.instance: JobShopInstance = instance
        self.metadata: dict[str, Any] = metadata
        # The property setter validates the schedule and derives the cached
        # attributes (operation map, counter and latest operation) from it.
        self.schedule = schedule

    def __repr__(self) -> str:
        return str(self.schedule)

    @property
    def schedule(self) -> list[list[ScheduledOperation]]:
        """A list of lists of :class:`ScheduledOperation` objects. Each list
        represents the order of operations on a machine.

        Assigning a new value validates it with :meth:`check_schedule` and
        recomputes the attributes that are derived from the schedule."""
        return self._schedule

    @schedule.setter
    def schedule(self, new_schedule: list[list[ScheduledOperation]]):
        Schedule.check_schedule(new_schedule)
        self._schedule = new_schedule
        # Keep the cached attributes in sync; otherwise ``makespan`` and
        # ``is_complete`` would keep describing the previous schedule.
        self._refresh_cached_attributes()

    def _refresh_cached_attributes(self) -> None:
        """Recomputes every attribute derived from ``self._schedule``."""
        scheduled_ops = [
            scheduled_op
            for machine_schedule in self._schedule
            for scheduled_op in machine_schedule
        ]
        self.operation_to_scheduled_operation: dict[
            Operation, ScheduledOperation
        ] = {
            scheduled_op.operation: scheduled_op
            for scheduled_op in scheduled_ops
        }
        self.num_scheduled_operations = len(scheduled_ops)
        # ``max`` keeps the first maximal element, i.e. ties are resolved in
        # machine order and then in position within the machine.
        self.operation_with_latest_end_time: ScheduledOperation | None = max(
            scheduled_ops,
            key=lambda op: op.end_time,
            default=None,
        )

    def to_dict(self) -> dict:
        """Returns a dictionary representation of the schedule.

        This representation is useful for saving the instance to a JSON file.

        Returns:
            A dictionary representation of the schedule with the following
            keys:

            - **"instance"**: A dictionary representation of the instance.
            - **"job_sequences"**: A list of lists of job ids. Each list
              of job ids represents the order of operations on the machine.
              The machine that the list corresponds to is determined by the
              index of the list.
            - **"metadata"**: A dictionary with additional information
              about the schedule.
        """
        return {
            "instance": self.instance.to_dict(),
            "job_sequences": self.job_sequences(),
            "metadata": self.metadata,
        }

    @staticmethod
    def from_dict(
        instance: dict[str, Any] | JobShopInstance,
        job_sequences: list[list[int]],
        metadata: dict[str, Any] | None = None,
    ) -> Schedule:
        """Creates a schedule from a dictionary representation.

        Args:
            instance:
                The instance to create the schedule for. Can be a dictionary
                representation of a :class:`JobShopInstance` or a
                :class:`JobShopInstance` object.
            job_sequences:
                A list of lists of job ids. Each list of job ids represents the
                order of operations on the machine. The machine that the list
                corresponds to is determined by the index of the list.
            metadata:
                A dictionary with additional information about the schedule.

        Returns:
            A :class:`Schedule` object with the given job sequences.

        Raises:
            ValidationError:
                If the job sequences cannot be turned into a schedule (see
                :meth:`from_job_sequences`).
        """
        if isinstance(instance, dict):
            instance = JobShopInstance.from_matrices(**instance)
        schedule = Schedule.from_job_sequences(instance, job_sequences)
        schedule.metadata = metadata if metadata is not None else {}
        return schedule

    @staticmethod
    def from_job_sequences(
        instance: JobShopInstance,
        job_sequences: list[list[int]],
        dispatcher: Dispatcher | None = None,
    ) -> Schedule:
        """Creates an active schedule from a list of job sequences.

        An active schedule is the optimal schedule for the given job sequences.
        In other words, it is not possible to construct another schedule,
        through changes in the order of processing on the machines, with at
        least one operation finishing earlier and no operation finishing later.

        Args:
            instance:
                The :class:`JobShopInstance` object that the schedule is for.
            job_sequences:
                A list of lists of job ids. Each list of job ids represents the
                order of operations on the machine. The machine that the list
                corresponds to is determined by the index of the list.
            dispatcher:
                A :class:`~job_shop_lib.dispatching.Dispatcher` to use for
                scheduling. If not provided, a new dispatcher will be
                created. The dispatcher is reset before it is used.

                .. note::
                    You will need to provide a dispatcher if you want to
                    take into account start time calculators different from
                    the default one.

        Returns:
            A :class:`Schedule` object with the given job sequences.

        Raises:
            ValidationError:
                If a full pass over the machines cannot schedule any of the
                pending operations, which means the job sequences are
                inconsistent with the instance.

        .. seealso::
            See :mod:`job_shop_lib.dispatching` for more information
            about dispatchers and the start time calculators available.
        """
        # Imported locally (the module-level import is guarded by
        # ``TYPE_CHECKING``), presumably to avoid a circular import.
        from job_shop_lib.dispatching import Dispatcher

        if dispatcher is None:
            dispatcher = Dispatcher(instance)
        dispatcher.reset()

        pending_job_ids = [deque(job_ids) for job_ids in job_sequences]
        while any(pending_job_ids):
            at_least_one_operation_scheduled = False
            for machine_id, job_ids in enumerate(pending_job_ids):
                if not job_ids:
                    continue
                job_id = job_ids[0]
                operation_index = dispatcher.job_next_operation_index[job_id]
                job = instance.jobs[job_id]
                if operation_index >= len(job):
                    # The job id appears more times than the job has
                    # operations: nothing can be scheduled for this machine,
                    # so let the stall check below report the problem
                    # instead of failing with an ``IndexError``.
                    continue
                operation = job[operation_index]
                is_ready = dispatcher.is_operation_ready(operation)
                if is_ready and machine_id in operation.machines:
                    dispatcher.dispatch(operation, machine_id)
                    job_ids.popleft()
                    at_least_one_operation_scheduled = True

            if not at_least_one_operation_scheduled:
                raise ValidationError(
                    "Invalid job sequences. No valid operation to schedule."
                )
        return dispatcher.schedule

    def job_sequences(self) -> list[list[int]]:
        """Returns the sequence of jobs for each machine in the schedule.

        This method returns a list of lists, where each sublist contains the
        job ids of the operations scheduled on that machine.
        """
        return [
            [scheduled_op.job_id for scheduled_op in machine_schedule]
            for machine_schedule in self.schedule
        ]

    def reset(self):
        """Resets the schedule to an empty state."""
        # The setter also clears the operation map, the counter and the
        # operation with the latest end time.
        self.schedule = [[] for _ in range(self.instance.num_machines)]

    def makespan(self) -> int:
        """Returns the makespan of the schedule.

        The makespan is the time at which all operations are completed. It is
        ``0`` when no operation has been scheduled.
        """
        last_operation = self.operation_with_latest_end_time
        if last_operation is None:
            return 0
        return last_operation.end_time

    def is_complete(self) -> bool:
        """Returns ``True`` if all operations have been scheduled."""
        return self.num_scheduled_operations == self.instance.num_operations

    def add(self, scheduled_operation: ScheduledOperation):
        """Adds a new :class:`ScheduledOperation` to the schedule.

        Args:
            scheduled_operation:
                The :class:`ScheduledOperation` to add to the schedule.

        Raises:
            ValidationError:
                If the start time of the new operation is before the
                end time of the last operation on the same machine. In favor of
                performance, this method does not check precedence
                constraints.
        """
        self._check_start_time_of_new_operation(scheduled_operation)

        # Update attributes:
        self.schedule[scheduled_operation.machine_id].append(
            scheduled_operation
        )
        self.operation_to_scheduled_operation[
            scheduled_operation.operation
        ] = scheduled_operation
        self.num_scheduled_operations += 1
        if (
            self.operation_with_latest_end_time is None
            or scheduled_operation.end_time
            > self.operation_with_latest_end_time.end_time
        ):
            self.operation_with_latest_end_time = scheduled_operation

    def _check_start_time_of_new_operation(
        self,
        new_operation: ScheduledOperation,
    ):
        """Raises a ``ValidationError`` if ``new_operation`` starts before the
        last operation on its machine has ended."""
        machine_schedule = self.schedule[new_operation.machine_id]
        if not machine_schedule:
            # First operation on this machine: nothing to compare against.
            return

        last_operation = machine_schedule[-1]
        if not self._is_valid_start_time(new_operation, last_operation):
            raise ValidationError(
                "Operation cannot be scheduled before the last operation on "
                "the same machine: end time of last operation "
                f"({last_operation.end_time}) > start time of new operation "
                f"({new_operation.start_time})."
            )

    @staticmethod
    def _is_valid_start_time(
        scheduled_operation: ScheduledOperation,
        previous_operation: ScheduledOperation,
    ):
        """Returns ``True`` if ``scheduled_operation`` does not start before
        ``previous_operation`` ends."""
        return previous_operation.end_time <= scheduled_operation.start_time

    @staticmethod
    def check_schedule(schedule: list[list[ScheduledOperation]]):
        """Checks if a schedule is valid and raises a
        :class:`~exceptions.ValidationError` if it is not.

        A schedule is considered invalid if:

        - A :class:`ScheduledOperation` has a machine id that does not
          match the machine id of the machine schedule (the list of
          :class:`ScheduledOperation` objects) that it belongs to.
        - The start time of a :class:`ScheduledOperation` is before the
          end time of the last operation on the same machine.

        Args:
            schedule:
                The schedule (a list of lists of :class:`ScheduledOperation`
                objects) to check.

        Raises:
            ValidationError: If the schedule is invalid.
        """
        for machine_id, scheduled_operations in enumerate(schedule):
            for i, scheduled_operation in enumerate(scheduled_operations):
                if scheduled_operation.machine_id != machine_id:
                    # Report the machine id of the offending instance, not
                    # the class-level attribute.
                    raise ValidationError(
                        "The machine id of the scheduled operation "
                        f"({scheduled_operation.machine_id}) does not match "
                        f"the machine id of the machine schedule ({machine_id}"
                        f"). Index of the operation: [{machine_id}][{i}]."
                    )

                if i == 0:
                    continue

                previous_operation = scheduled_operations[i - 1]
                if not Schedule._is_valid_start_time(
                    scheduled_operation, previous_operation
                ):
                    raise ValidationError(
                        "Invalid schedule. The start time of the new "
                        "operation is before the end time of the last "
                        "operation on the same machine. "
                        "End time of last operation: "
                        f"{previous_operation.end_time}. "
                        "Start time of new operation: "
                        f"{scheduled_operation.start_time}. At index "
                        f"[{machine_id}][{i}]."
                    )

    def __eq__(self, value: object) -> bool:
        if not isinstance(value, Schedule):
            return False
        return self.schedule == value.schedule

    def copy(self) -> Schedule:
        """Returns a copy of the schedule.

        The per-machine lists and the metadata dictionary are new objects,
        but the :class:`ScheduledOperation` objects, the metadata values and
        the instance are shared with the original.
        """
        new_schedule = Schedule(
            self.instance,
            [machine_schedule.copy() for machine_schedule in self.schedule],
        )
        # Assigned after construction instead of being passed as keyword
        # arguments so that metadata keys such as "instance" or "schedule"
        # (or non-string keys) cannot clash with the constructor signature.
        new_schedule.metadata = dict(self.metadata)
        return new_schedule

    def critical_path(self) -> list[ScheduledOperation]:
        """Returns the critical path of the schedule.

        The critical path is the longest path of dependent operations through
        the schedule, which determines the makespan. This implementation
        correctly identifies the path even in non-compact schedules where
        idle time may exist.

        It works by starting from an operation that determines the makespan
        and tracing backwards, at each step choosing the predecessor (either
        from the same job or the same machine) that finished latest.

        Returns:
            The operations on the critical path, ordered from the first one
            to the one that determines the makespan. The list is empty if no
            operation has been scheduled.
        """
        # 1. Start from the operation that determines the makespan
        last_scheduled_op = self.operation_with_latest_end_time
        if last_scheduled_op is None:
            return []

        critical_path = deque([last_scheduled_op])
        current_scheduled_op = last_scheduled_op

        # Position of every scheduled operation inside its machine schedule,
        # so the machine predecessor can be found without scanning the list.
        machine_op_index = {
            machine_id: {op: idx for idx, op in enumerate(machine_schedule)}
            for machine_id, machine_schedule in enumerate(self.schedule)
        }

        # 2. Trace backwards from the last operation
        while True:
            job_pred: ScheduledOperation | None = None
            machine_pred: ScheduledOperation | None = None

            # Find job predecessor (the previous operation in the same job)
            op_idx_in_job = current_scheduled_op.operation.position_in_job
            if op_idx_in_job > 0:
                prev_op_in_job = self.instance.jobs[
                    current_scheduled_op.job_id
                ][op_idx_in_job - 1]
                job_pred = self.operation_to_scheduled_operation[
                    prev_op_in_job
                ]

            # Find machine predecessor (the previous operation on the same
            # machine)
            machine_id = current_scheduled_op.machine_id
            op_idx_on_machine = machine_op_index[machine_id][
                current_scheduled_op
            ]
            if op_idx_on_machine > 0:
                machine_pred = self.schedule[machine_id][
                    op_idx_on_machine - 1
                ]

            # 3. Determine the critical predecessor
            # The critical predecessor is the one that finished latest, as it
            # determined the start time of the current operation.
            if job_pred is None and machine_pred is None:
                # Reached the beginning of the schedule, no more predecessors
                break

            # A missing predecessor gets -1 so that the existing one wins the
            # comparison below; on ties the job predecessor is preferred.
            job_pred_end_time = (
                job_pred.end_time if job_pred is not None else -1
            )
            machine_pred_end_time = (
                machine_pred.end_time if machine_pred is not None else -1
            )
            critical_pred = (
                job_pred
                if job_pred_end_time >= machine_pred_end_time
                else machine_pred
            )
            assert critical_pred is not None

            # Prepend the critical predecessor to the path and continue tracing
            critical_path.appendleft(critical_pred)
            current_scheduled_op = critical_pred

        return list(critical_path)