Source code for desmod.queue

"""Queue classes useful for modeling.

A queue may be used for inter-process message passing, resource pools,
event sequences, and many other modeling applications. The :class:`~Queue`
class implements a simulation-aware, general-purpose queue useful for these
modeling applications.

The :class:`~PriorityQueue` class is an alternative to :class:`~Queue` that
dequeues items in priority-order instead of :class:`Queue`'s FIFO discipline.

"""
from heapq import heapify, heappop, heappush
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Generic,
    Iterable,
    List,
    NamedTuple,
    Optional,
    Type,
    TypeVar,
    Union,
)

from simpy.core import BoundClass, Environment
from simpy.events import Event

EventCallback = Callable[[Event], None]


class QueuePutEvent(Event):
    callbacks: List[EventCallback]

    def __init__(self, queue: 'Queue[ItemType]', item: Any) -> None:
        super().__init__(queue.env)
        self.queue = queue
        self.item = item
        queue._put_waiters.append(self)
        self.callbacks.extend([queue._trigger_when_at_least, queue._trigger_get])
        queue._trigger_put()

    def __enter__(self) -> 'QueuePutEvent':
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        self.cancel()
        return None

    def cancel(self) -> None:
        if not self.triggered:
            self.queue._put_waiters.remove(self)
            self.callbacks = None  # type: ignore[assignment] # noqa: F821


class QueueGetEvent(Event):
    callbacks: List[EventCallback]

    def __init__(self, queue: 'Queue[ItemType]') -> None:
        super().__init__(queue.env)
        self.queue = queue
        queue._get_waiters.append(self)
        self.callbacks.extend([queue._trigger_when_at_most, queue._trigger_put])
        queue._trigger_get()

    def __enter__(self) -> 'QueueGetEvent':
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        self.cancel()
        return None

    def cancel(self) -> None:
        if not self.triggered:
            self.queue._get_waiters.remove(self)
            self.callbacks = None  # type: ignore[assignment] # noqa: F821


class QueueWhenAtMostEvent(Event):
    def __init__(self, queue: 'Queue[ItemType]', num_items: Union[int, float]) -> None:
        super().__init__(queue.env)
        self.queue = queue
        self.num_items = num_items
        heappush(queue._at_most_waiters, self)
        queue._trigger_when_at_most()

    def __lt__(self, other: 'QueueWhenAtMostEvent') -> bool:
        return self.num_items > other.num_items

    def __enter__(self) -> 'QueueWhenAtMostEvent':
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        self.cancel()
        return None

    def cancel(self) -> None:
        if not self.triggered:
            self.queue._at_most_waiters.remove(self)
            heapify(self.queue._at_most_waiters)
            self.callbacks = None  # type: ignore[assignment] # noqa: F821


class QueueWhenAtLeastEvent(Event):
    def __init__(self, queue: 'Queue[ItemType]', num_items: Union[int, float]) -> None:
        super().__init__(queue.env)
        self.queue = queue
        self.num_items = num_items
        heappush(queue._at_least_waiters, self)
        queue._trigger_when_at_least()

    def __lt__(self, other: 'QueueWhenAtLeastEvent') -> bool:
        return self.num_items < other.num_items

    def __enter__(self) -> 'QueueWhenAtLeastEvent':
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        self.cancel()
        return None

    def cancel(self) -> None:
        if not self.triggered:
            self.queue._at_least_waiters.remove(self)
            heapify(self.queue._at_least_waiters)
            self.callbacks = None  # type: ignore[assignment] # noqa: F821


class QueueWhenAnyEvent(QueueWhenAtLeastEvent):
    def __init__(self, queue: 'Queue[ItemType]') -> None:
        super().__init__(queue, num_items=1)


class QueueWhenFullEvent(QueueWhenAtLeastEvent):
    def __init__(self, queue: 'Queue[ItemType]') -> None:
        super().__init__(queue, num_items=queue.capacity)


class QueueWhenNotFullEvent(QueueWhenAtMostEvent):
    def __init__(self, queue: 'Queue[ItemType]') -> None:
        super().__init__(queue, num_items=queue.capacity - 1)


class QueueWhenEmptyEvent(QueueWhenAtMostEvent):
    def __init__(self, queue: 'Queue[ItemType]') -> None:
        super().__init__(queue, num_items=0)


ItemType = TypeVar('ItemType')


[docs]class Queue(Generic[ItemType]): """Simulation queue of arbitrary items. `Queue` is similar to :class:`simpy.Store`. It provides a simulation-aware first-in first-out (FIFO) queue useful for passing messages between simulation processes or managing a pool of objects needed by multiple processes. Items are enqueued and dequeued using :meth:`put()` and :meth:`get()`. :param env: Simulation environment. :param capacity: Capacity of the queue; infinite by default. :param hard_cap: If specified, the queue overflows when the `capacity` is reached. :param items: Optional sequence of items to pre-populate the queue. :param name: Optional name to associate with the queue. """ def __init__( self, env: Environment, capacity: Union[int, float] = float('inf'), hard_cap: bool = False, items: Iterable[ItemType] = (), name: Optional[str] = None, ) -> None: self.env = env #: Capacity of the queue (maximum number of items). self.capacity = capacity self._hard_cap = hard_cap self.items: List[ItemType] = list(items) self.name = name self._put_waiters: List[QueuePutEvent] = [] self._get_waiters: List[QueueGetEvent] = [] self._at_most_waiters: List[QueueWhenAtMostEvent] = [] self._at_least_waiters: List[QueueWhenAtLeastEvent] = [] self._put_hook: Optional[Callable[[], Any]] = None self._get_hook: Optional[Callable[[], Any]] = None BoundClass.bind_early(self) @property def size(self) -> int: """Number of items in queue.""" return len(self.items) @property def remaining(self) -> Union[int, float]: """Remaining queue capacity.""" return self.capacity - len(self.items) @property def is_empty(self) -> bool: """Indicates whether the queue is empty.""" return not self.items @property def is_full(self) -> bool: """Indicates whether the queue is full.""" return len(self.items) >= self.capacity
[docs] def peek(self) -> ItemType: """Peek at the next item in the queue.""" return self.items[0]
if TYPE_CHECKING:
[docs] def put(self, item: ItemType) -> QueuePutEvent: """Enqueue an item on the queue.""" ...
[docs] def get(self) -> QueueGetEvent: """Dequeue an item from the queue.""" ...
[docs] def when_at_least(self, num_items: int) -> QueueWhenAtLeastEvent: """Return an event triggered when the queue has at least n items.""" ...
[docs] def when_at_most(self, num_items: int) -> QueueWhenAtMostEvent: """Return an event triggered when the queue has at most n items.""" ...
[docs] def when_any(self) -> QueueWhenAnyEvent: """Return an event triggered when the queue is non-empty.""" ...
[docs] def when_full(self) -> QueueWhenFullEvent: """Return an event triggered when the queue becomes full.""" ...
[docs] def when_not_full(self) -> QueueWhenNotFullEvent: """Return an event triggered when the queue becomes not full.""" ...
[docs] def when_empty(self) -> QueueWhenEmptyEvent: """Return an event triggered when the queue becomes empty.""" ...
else: put = BoundClass(QueuePutEvent) get = BoundClass(QueueGetEvent) when_at_least = BoundClass(QueueWhenAtLeastEvent) when_at_most = BoundClass(QueueWhenAtMostEvent) when_any = BoundClass(QueueWhenAnyEvent) when_full = BoundClass(QueueWhenFullEvent) when_not_full = BoundClass(QueueWhenNotFullEvent) when_empty = BoundClass(QueueWhenEmptyEvent) def _enqueue_item(self, item: ItemType) -> None: self.items.append(item) def _dequeue_item(self) -> ItemType: return self.items.pop(0) def _trigger_put(self, _: Optional[Event] = None) -> None: while self._put_waiters: if len(self.items) < self.capacity: put_ev = self._put_waiters.pop(0) self._enqueue_item(put_ev.item) put_ev.succeed() if self._put_hook: self._put_hook() elif self._hard_cap: raise OverflowError() else: break def _trigger_get(self, _: Optional[Event] = None) -> None: while self._get_waiters and self.items: get_ev = self._get_waiters.pop(0) item = self._dequeue_item() get_ev.succeed(item) if self._get_hook: self._get_hook() def _trigger_when_at_least(self, _: Optional[Event] = None) -> None: while ( self._at_least_waiters and self.size >= self._at_least_waiters[0].num_items ): when_at_least_ev = heappop(self._at_least_waiters) when_at_least_ev.succeed() def _trigger_when_at_most(self, _: Optional[Event] = None) -> None: while self._at_most_waiters and self.size <= self._at_most_waiters[0].num_items: at_most_ev = heappop(self._at_most_waiters) at_most_ev.succeed() def __repr__(self) -> str: return ( f'{self.__class__.__name__}(' f'name={self.name!r} size={self.size} capacity={self.capacity})' )
[docs]class PriorityItem(NamedTuple): """Wrap items with explicit priority for use with :class:`~PriorityQueue`. :param priority: Orderable priority value. Smaller values are dequeued first. :param item: Arbitrary item. Only the `priority` is determines dequeue order, so the `item` itself does not have to be orderable. """ priority: Any item: Any def __lt__( # type: ignore[override] # noqa: F821 self, other: 'PriorityItem' ) -> bool: return self.priority < other.priority
[docs]class PriorityQueue(Queue[ItemType]): """Specialized queue where items are dequeued in priority order. Items in `PriorityQueue` must be orderable (implement :meth:`~object.__lt__`). Unorderable items may be used with `PriorityQueue` by wrapping with :class:`~PriorityItem`. Items that evaluate less-than other items will be dequeued first. """ def __init__( self, env: Environment, capacity: Union[int, float] = float('inf'), hard_cap: bool = False, items: Iterable[ItemType] = (), name: Optional[str] = None, ) -> None: super().__init__(env, capacity, hard_cap, items, name) heapify(self.items) def _enqueue_item(self, item: ItemType) -> None: heappush(self.items, item) def _dequeue_item(self) -> ItemType: return heappop(self.items)