Source code for job_shop_lib.dispatching._dispatcher

"""Home of the `Dispatcher` class."""

from __future__ import annotations

import abc
from typing import Any, TypeVar
from collections.abc import Callable
from functools import wraps

from job_shop_lib import (
    JobShopInstance,
    Schedule,
    ScheduledOperation,
    Operation,
)
from job_shop_lib.exceptions import ValidationError


# Type alias for start time calculator callable
StartTimeCalculator = Callable[["Dispatcher", Operation, int], int]


[docs] def no_setup_time_calculator( dispatcher: Dispatcher, operation: Operation, machine_id: int ) -> int: """Default start time calculator that implements the standard behavior. The start time is the maximum of the next available time for the machine and the next available time for the job to which the operation belongs. Args: dispatcher: The dispatcher instance. operation: The operation to be scheduled. machine_id: The id of the machine on which the operation is to be scheduled. Returns: The start time for the operation on the given machine. """ return max( dispatcher.machine_next_available_time[machine_id], dispatcher.job_next_available_time[operation.job_id], )
# Added here to avoid circular imports
[docs] class DispatcherObserver(abc.ABC): """Abstract class that allows objects to observe and respond to changes within the :class:`Dispatcher`. It follows the Observer design pattern, where observers subscribe to the dispatcher and receive updates when certain events occur, such as when an operation is scheduled or when the dispatcher is reset. Attributes: dispatcher: The :class:`Dispatcher` instance to observe. Args: dispatcher: The :class:`Dispatcher` instance to observe. subscribe: If ``True``, automatically subscribes the observer to the dispatcher when it is initialized. Defaults to ``True``. Raises: ValidationError: If ``is_singleton`` is ``True`` and an observer of the same type already exists in the dispatcher's list of subscribers. Example: .. code-block:: python from job_shop_lib.dispatching import DispatcherObserver, Dispatcher from job_shop_lib import ScheduledOperation class HistoryObserver(DispatcherObserver): def __init__(self, dispatcher: Dispatcher): super().__init__(dispatcher) self.history: list[ScheduledOperation] = [] def update(self, scheduled_operation: ScheduledOperation): self.history.append(scheduled_operation) def reset(self): self.history = [] """ # Made read-only following Google Style Guide recommendation _is_singleton = True """If True, ensures only one instance of this observer type is subscribed to the dispatcher.""" def __init__( self, dispatcher: Dispatcher, *, subscribe: bool = True, ): if self._is_singleton and any( isinstance(observer, self.__class__) for observer in dispatcher.subscribers ): raise ValidationError( f"An observer of type {self.__class__.__name__} already " "exists in the dispatcher's list of subscribers. If you want " "to create multiple instances of this observer, set " "`is_singleton` to False." ) self.dispatcher = dispatcher if subscribe: self.dispatcher.subscribe(self) @property def is_singleton(self) -> bool: """Returns whether this observer is a singleton. This is a class attribute that determines whether only one instance of this observer type can be subscribed to the dispatcher. """ return self._is_singleton
[docs] @abc.abstractmethod def update(self, scheduled_operation: ScheduledOperation): """Called when an operation is scheduled on a machine."""
[docs] @abc.abstractmethod def reset(self): """Called when the dispatcher is reset."""
def __str__(self) -> str: return self.__class__.__name__ def __repr__(self) -> str: return self.__class__.__name__
# Disable pylint's false positive # pylint: disable=invalid-name ObserverType = TypeVar("ObserverType", bound=DispatcherObserver) def _dispatcher_cache(method): """Decorator to cache results of a method based on its name. This decorator assumes that the class has an attribute called `_cache` that is a dictionary. It caches the result of the method based on its name. If the result is already cached, it returns the cached result instead of recomputing it. The decorator is useful since the dispatcher class can clear the cache when the state of the dispatcher changes. """ @wraps(method) def wrapper(self: Dispatcher, *args, **kwargs): # pylint: disable=protected-access cache_key = method.__name__ cached_result = self._cache.get(cache_key) if cached_result is not None: return cached_result result = method(self, *args, **kwargs) self._cache[cache_key] = result return result return wrapper # pylint: disable=too-many-instance-attributes
[docs] class Dispatcher: """Handles the logic of scheduling operations on machines. This class allow us to just define the order in which operations are sequenced and the machines in which they are processed. It is then responsible for scheduling the operations on the machines and keeping track of the next available time for each machine and job. The core method of the class are: .. autosummary:: dispatch reset It also provides methods to query the state of the schedule and the operations: .. autosummary:: current_time available_operations available_machines available_jobs unscheduled_operations scheduled_operations ongoing_operations completed_operations uncompleted_operations is_scheduled is_ongoing next_operation earliest_start_time remaining_duration The above methods which do not take any arguments are cached to improve performance. After each scheduling operation, the cache is cleared. Args: instance: The instance of the job shop problem to be solved. ready_operations_filter: A function that filters out operations that are not ready to be scheduled. The function should take the dispatcher and a list of operations as input and return a list of operations that are ready to be scheduled. If ``None``, no filtering is done. start_time_calculator: A function that calculates the start time for a given operation on a given machine. The function should take a dispatcher, an operation and a machine id as input and return the start time for the operation. Defaults to the standard calculation which is the maximum of the machine's next available time and the job's next available time. This allows for implementing context-dependent setup times, downtime, and machine breakdowns. """ __slots__ = { "instance": "The instance of the job shop problem to be scheduled.", "schedule": "The schedule of operations on machines.", "_machine_next_available_time": "", "_job_next_operation_index": "", "_job_next_available_time": "", "ready_operations_filter": ( "A function that filters out operations that are not ready to be " "scheduled." ), "start_time_calculator": ( "A function that calculates the start time for a given operation " "on a given machine." ), "subscribers": "A list of observers subscribed to the dispatcher.", "_cache": "A dictionary to cache the results of the cached methods.", } def __init__( self, instance: JobShopInstance, ready_operations_filter: ( Callable[[Dispatcher, list[Operation]], list[Operation]] | None ) = None, start_time_calculator: StartTimeCalculator = ( no_setup_time_calculator ), ) -> None: self.instance = instance self.schedule = Schedule(self.instance) self.ready_operations_filter = ready_operations_filter self.start_time_calculator = start_time_calculator self.subscribers: list[DispatcherObserver] = [] self._machine_next_available_time = [0] * self.instance.num_machines self._job_next_operation_index = [0] * self.instance.num_jobs self._job_next_available_time = [0] * self.instance.num_jobs self._cache: dict[str, Any] = {} def __str__(self) -> str: return f"{self.__class__.__name__}({self.instance})" def __repr__(self) -> str: return str(self) @property def machine_next_available_time(self) -> list[int]: """Returns the next available time for each machine.""" return self._machine_next_available_time @property def job_next_operation_index(self) -> list[int]: """Returns the index of the next operation to be scheduled for each job.""" return self._job_next_operation_index @property def job_next_available_time(self) -> list[int]: """Returns the next available time for each job.""" return self._job_next_available_time
[docs] def subscribe(self, observer: DispatcherObserver): """Subscribes an observer to the dispatcher.""" self.subscribers.append(observer)
[docs] def unsubscribe(self, observer: DispatcherObserver): """Unsubscribes an observer from the dispatcher.""" self.subscribers.remove(observer)
[docs] def reset(self) -> None: """Resets the dispatcher to its initial state.""" self.schedule.reset() self._machine_next_available_time = [0] * self.instance.num_machines self._job_next_operation_index = [0] * self.instance.num_jobs self._job_next_available_time = [0] * self.instance.num_jobs self._cache = {} for subscriber in self.subscribers: subscriber.reset()
[docs] def dispatch( self, operation: Operation, machine_id: int | None = None ) -> None: """Schedules the given operation on the given machine. The start time of the operation is computed based on the next available time for the machine and the next available time for the job to which the operation belongs. The operation is then scheduled on the machine and the tracking attributes are updated. Args: operation: The operation to be scheduled. machine_id: The id of the machine on which the operation is to be scheduled. If ``None``, the :class:`~job_shop_lib.Operation`'s :attr:`~job_shop_lib.Operation.machine_id` attribute is used. Raises: ValidationError: If the operation is not ready to be scheduled. UninitializedAttributeError: If the operation has multiple machines in its list and no ``machine_id`` is provided. """ if not self.is_operation_ready(operation): raise ValidationError("Operation is not ready to be scheduled.") if machine_id is None: machine_id = operation.machine_id start_time = self.start_time(operation, machine_id) scheduled_operation = ScheduledOperation( operation, start_time, machine_id ) self.schedule.add(scheduled_operation) self._update_tracking_attributes(scheduled_operation)
[docs] def is_operation_ready(self, operation: Operation) -> bool: """Returns True if the given operation is ready to be scheduled. An operation is ready to be scheduled if it is the next operation to be scheduled for its job. Args: operation: The operation to be checked. """ return ( self._job_next_operation_index[operation.job_id] == operation.position_in_job )
[docs] def start_time( self, operation: Operation, machine_id: int, ) -> int: """Computes the start time for the given operation on the given machine. Uses the configured start time calculator to compute the start time. By default, this is the maximum of the next available time for the machine and the next available time for the job to which the operation belongs. Args: operation: The operation to be scheduled. machine_id: The id of the machine on which the operation is to be scheduled. """ return self.start_time_calculator(self, operation, machine_id)
def _update_tracking_attributes( self, scheduled_operation: ScheduledOperation ) -> None: # Variables defined here to make the lines shorter job_id = scheduled_operation.job_id machine_id = scheduled_operation.machine_id end_time = scheduled_operation.end_time self._machine_next_available_time[machine_id] = end_time self._job_next_operation_index[job_id] += 1 self._job_next_available_time[job_id] = end_time self._cache = {} # Notify subscribers for subscriber in self.subscribers: subscriber.update(scheduled_operation)
[docs] def create_or_get_observer( self, observer: type[ObserverType], condition: Callable[[DispatcherObserver], bool] = lambda _: True, **kwargs, ) -> ObserverType: """Creates a new observer of the specified type or returns an existing observer of the same type if it already exists in the dispatcher's list of observers. Args: observer: The type of observer to be created or retrieved. condition: A function that takes an observer and returns True if it is the observer to be retrieved. By default, it returns True for all observers. **kwargs: Additional keyword arguments to be passed to the observer's constructor. """ for existing_observer in self.subscribers: if isinstance(existing_observer, observer) and condition( existing_observer ): return existing_observer new_observer = observer(self, **kwargs) return new_observer
[docs] @_dispatcher_cache def current_time(self) -> int: """Returns the current time of the schedule. The current time is the minimum start time of the available operations. """ available_operations = self.available_operations() current_time = self.min_start_time(available_operations) return current_time
[docs] def min_start_time(self, operations: list[Operation]) -> int: """Returns the minimum start time of the available operations.""" if not operations: return self.schedule.makespan() min_start_time = float("inf") for op in operations: for machine_id in op.machines: start_time = self.start_time(op, machine_id) min_start_time = min(min_start_time, start_time) return int(min_start_time)
[docs] @_dispatcher_cache def available_operations(self) -> list[Operation]: """Returns a list of available operations for processing, optionally filtering out operations using the filter function. This method first gathers all possible next operations from the jobs being processed. It then optionally filters these operations using the filter function. Returns: A list of :class:`Operation` objects that are available for scheduling. """ available_operations = self.raw_ready_operations() if self.ready_operations_filter is not None: available_operations = self.ready_operations_filter( self, available_operations ) return available_operations
[docs] @_dispatcher_cache def raw_ready_operations(self) -> list[Operation]: """Returns a list of available operations for processing without applying the filter function. Returns: A list of :class:`Operation` objects that are available for scheduling based on precedence and machine constraints only. """ available_operations = [ self.instance.jobs[job_id][position] for job_id, position in enumerate(self._job_next_operation_index) if position < len(self.instance.jobs[job_id]) ] return available_operations
[docs] @_dispatcher_cache def unscheduled_operations(self) -> list[Operation]: """Returns the list of operations that have not been scheduled.""" unscheduled_operations = [] for job_id, next_position in enumerate(self._job_next_operation_index): operations = self.instance.jobs[job_id][next_position:] unscheduled_operations.extend(operations) return unscheduled_operations
[docs] @_dispatcher_cache def scheduled_operations(self) -> list[ScheduledOperation]: """Returns the list of operations that have been scheduled.""" scheduled_operations = [] for machine_schedule in self.schedule.schedule: scheduled_operations.extend(machine_schedule) return scheduled_operations
[docs] @_dispatcher_cache def available_machines(self) -> list[int]: """Returns the list of ready machines.""" available_operations = self.available_operations() available_machines = set() for operation in available_operations: available_machines.update(operation.machines) return list(available_machines)
[docs] @_dispatcher_cache def available_jobs(self) -> list[int]: """Returns the list of ready jobs.""" available_operations = self.available_operations() available_jobs = set( operation.job_id for operation in available_operations ) return list(available_jobs)
[docs] def earliest_start_time(self, operation: Operation) -> int: """Calculates the earliest start time for a given operation based on machine and job constraints. This method is different from the ``start_time`` method in that it takes into account every machine that can process the operation, not just the one that will process it. However, it also assumes that the operation is ready to be scheduled in the job in favor of performance. Args: operation: The operation for which to calculate the earliest start time. Returns: The earliest start time for the operation. """ machine_earliest_start_time = min( self._machine_next_available_time[machine_id] for machine_id in operation.machines ) job_start_time = self._job_next_available_time[operation.job_id] return max(machine_earliest_start_time, job_start_time)
[docs] def remaining_duration( self, scheduled_operation: ScheduledOperation ) -> int: """Calculates the remaining duration of a scheduled operation. The method computes the remaining time for an operation to finish, based on the maximum of the operation's start time or the current time. This helps in determining how much time is left from 'now' until the operation is completed. Args: scheduled_operation: The operation for which to calculate the remaining time. Returns: The remaining duration. """ adjusted_start_time = max( scheduled_operation.start_time, self.current_time() ) return scheduled_operation.end_time - adjusted_start_time
[docs] @_dispatcher_cache def completed_operations(self) -> set[ScheduledOperation]: """Returns the set of operations that have been completed. This method returns the operations that have been scheduled and the current time is greater than or equal to the end time of the operation. """ scheduled_operations = set(self.scheduled_operations()) ongoing_operations = set(self.ongoing_operations()) completed_operations = scheduled_operations - ongoing_operations return completed_operations
[docs] @_dispatcher_cache def uncompleted_operations(self) -> list[Operation]: """Returns the list of operations that have not been completed yet. This method checks for operations that either haven't been scheduled or have been scheduled but haven't reached their completion time. Note: The behavior of this method changed in version 0.5.0. Previously, it only returned unscheduled operations. For the old behavior, use the `unscheduled_operations` method. """ uncompleted_operations = self.unscheduled_operations() uncompleted_operations.extend( scheduled_operation.operation for scheduled_operation in self.ongoing_operations() ) return uncompleted_operations
[docs] @_dispatcher_cache def ongoing_operations(self) -> list[ScheduledOperation]: """Returns the list of operations that are currently being processed. This method returns the operations that have been scheduled and are currently being processed by the machines. """ current_time = self.current_time() ongoing_operations = [] for machine_schedule in self.schedule.schedule: for scheduled_operation in reversed(machine_schedule): is_completed = scheduled_operation.end_time <= current_time if is_completed: break ongoing_operations.append(scheduled_operation) return ongoing_operations
[docs] def is_scheduled(self, operation: Operation) -> bool: """Checks if the given operation has been scheduled.""" job_next_op_idx = self._job_next_operation_index[operation.job_id] return operation.position_in_job < job_next_op_idx
[docs] def is_ongoing(self, scheduled_operation: ScheduledOperation) -> bool: """Checks if the given operation is currently being processed.""" current_time = self.current_time() return scheduled_operation.start_time <= current_time
[docs] def next_operation(self, job_id: int) -> Operation: """Returns the next operation to be scheduled for the given job. Args: job_id: The id of the job for which to return the next operation. Raises: ValidationError: If there are no more operations left for the job. """ if ( len(self.instance.jobs[job_id]) <= self._job_next_operation_index[job_id] ): raise ValidationError( f"No more operations left for job {job_id} to schedule." ) return self.instance.jobs[job_id][ self._job_next_operation_index[job_id] ]