"""Pool class for modeling a container of resources.
A pool models a container of homogeneous resources, similar to
:class:`simpy.resources.Container`, but with additional events when the
container is empty or full. Resources are :func:`Pool.put` or :func:`Pool.get`
to/from the pool in specified amounts. The pool's resources may be modeled as
either discrete or continuous depending on whether the put/get amounts are
`int` or `float`.
"""
from heapq import heapify, heappop, heappush
from sys import float_info
from types import TracebackType
from typing import TYPE_CHECKING, Any, Callable, List, Optional, Type, Union
from simpy.core import BoundClass, Environment
from simpy.events import Event
EventCallback = Callable[[Event], None]
PoolAmount = Union[int, float]
class PoolPutEvent(Event):
callbacks: List[EventCallback]
def __init__(self, pool: 'Pool', amount: PoolAmount = 1) -> None:
if not (0 < amount <= pool.capacity):
raise ValueError('amount must be in (0, capacity]')
super().__init__(pool.env)
self.pool = pool
self.amount = amount
self.callbacks.extend([pool._trigger_when_at_least, pool._trigger_get])
pool._put_waiters.append(self)
pool._trigger_put()
def __enter__(self) -> 'PoolPutEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._put_waiters.remove(self)
self.callbacks = None # type: ignore[assignment] # noqa: F821
class PoolGetEvent(Event):
callbacks: List[EventCallback]
def __init__(self, pool: 'Pool', amount: PoolAmount = 1) -> None:
if not (0 < amount <= pool.capacity):
raise ValueError('amount must be in (0, capacity]')
super().__init__(pool.env)
self.pool = pool
self.amount = amount
self.callbacks.extend([pool._trigger_when_at_most, pool._trigger_put])
pool._get_waiters.append(self)
pool._trigger_get()
def __enter__(self) -> 'PoolGetEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._get_waiters.remove(self)
self.callbacks = None # type: ignore[assignment] # noqa: F821
class PoolWhenAtMostEvent(Event):
def __init__(self, pool: 'Pool', amount: PoolAmount) -> None:
super().__init__(pool.env)
self.pool = pool
self.amount = amount
heappush(pool._at_most_waiters, self)
pool._trigger_when_at_most()
def __lt__(self, other: 'PoolWhenAtMostEvent') -> bool:
return self.amount > other.amount
def __enter__(self) -> 'PoolWhenAtMostEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._at_most_waiters.remove(self)
heapify(self.pool._at_most_waiters)
self.callbacks = None # type: ignore[assignment] # noqa: F821
class PoolWhenAtLeastEvent(Event):
def __init__(self, pool: 'Pool', amount: PoolAmount) -> None:
super().__init__(pool.env)
self.pool = pool
self.amount = amount
heappush(pool._at_least_waiters, self)
pool._trigger_when_at_least()
def __lt__(self, other: 'PoolWhenAtLeastEvent') -> bool:
return self.amount < other.amount
def __enter__(self) -> 'PoolWhenAtLeastEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._at_least_waiters.remove(self)
heapify(self.pool._at_least_waiters)
self.callbacks = None # type: ignore[assignment] # noqa: F821
class PoolWhenAnyEvent(PoolWhenAtLeastEvent):
def __init__(self, pool: 'Pool', epsilon: float = float_info.min):
super().__init__(pool, amount=epsilon)
class PoolWhenFullEvent(PoolWhenAtLeastEvent):
def __init__(self, pool: 'Pool'):
super().__init__(pool, amount=pool.capacity)
class PoolWhenNotFullEvent(PoolWhenAtMostEvent):
def __init__(self, pool: 'Pool', epsilon: Optional[float] = None):
if epsilon is None and isinstance(pool.capacity, int):
epsilon = 0.5
assert epsilon is not None, "when_not_any(epsilon) is required for float Pool."
super().__init__(pool, amount=pool.capacity - epsilon)
class PoolWhenEmptyEvent(PoolWhenAtMostEvent):
def __init__(self, pool: 'Pool'):
super().__init__(pool, amount=0)
[docs]class Pool:
"""Simulation pool of discrete or continuous resources.
`Pool` is similar to :class:`simpy.resources.Container`.
It provides a simulation-aware container for managing a shared pool of
resources. The resources can be either discrete objects (like apples) or
continuous (like water).
Resources are added and removed using :meth:`put()` and :meth:`get()`.
:param env: Simulation environment.
:param capacity: Capacity of the pool; infinite by default.
:param hard_cap:
If specified, the pool overflows when the `capacity` is reached.
:param init_level: Initial level of the pool.
:param name: Optional name to associate with the queue.
"""
def __init__(
self,
env: Environment,
capacity: PoolAmount = float('inf'),
init: PoolAmount = 0,
hard_cap: bool = False,
name: Optional[str] = None,
):
self.env = env
#: Capacity of the pool (maximum level).
self.capacity = capacity
#: Current fill level of the pool.
self.level = init
self._hard_cap = hard_cap
self.name = name
self._put_waiters: List[PoolPutEvent] = []
self._get_waiters: List[PoolGetEvent] = []
self._at_most_waiters: List[PoolWhenAtMostEvent] = []
self._at_least_waiters: List[PoolWhenAtLeastEvent] = []
self._put_hook: Optional[Callable[[], Any]] = None
self._get_hook: Optional[Callable[[], Any]] = None
BoundClass.bind_early(self)
@property
def remaining(self) -> PoolAmount:
"""Remaining pool capacity."""
return self.capacity - self.level
@property
def is_empty(self) -> bool:
"""Indicates whether the pool is empty."""
return self.level == 0
@property
def is_full(self) -> bool:
"""Indicates whether the pool is full."""
return self.level >= self.capacity
if TYPE_CHECKING:
[docs] def put(self, amount: PoolAmount = 1) -> PoolPutEvent:
"""Put amount in the pool."""
...
[docs] def get(self, amount: PoolAmount = 1) -> PoolGetEvent:
"""Get amount from the pool."""
...
[docs] def when_at_least(self, amount: PoolAmount) -> PoolWhenAtLeastEvent:
"""Return an event triggered when the pool has at least `amount` items."""
...
[docs] def when_at_most(self, amount: PoolAmount) -> PoolWhenAtMostEvent:
"""Return an event triggered when the pool has at most `amount` items."""
...
[docs] def when_any(self, epsilon: float = ...) -> PoolWhenAnyEvent:
"""Return an event triggered when the pool is non-empty."""
...
[docs] def when_full(self) -> PoolWhenFullEvent:
"""Return an event triggered when the pool becomes full."""
...
[docs] def when_not_full(self, epsilon: float = ...) -> PoolWhenNotFullEvent:
"""Return an event triggered when the pool becomes not full."""
...
[docs] def when_empty(self) -> PoolWhenEmptyEvent:
"""Return an event triggered when the pool becomes empty."""
...
else:
put = BoundClass(PoolPutEvent)
get = BoundClass(PoolGetEvent)
when_at_least = BoundClass(PoolWhenAtLeastEvent)
when_at_most = BoundClass(PoolWhenAtMostEvent)
when_any = BoundClass(PoolWhenAnyEvent)
when_full = BoundClass(PoolWhenFullEvent)
when_not_full = BoundClass(PoolWhenNotFullEvent)
when_empty = BoundClass(PoolWhenEmptyEvent)
def _trigger_put(self, _: Optional[Event] = None) -> None:
idx = 0
while self._put_waiters and idx < len(self._put_waiters):
put_ev = self._put_waiters[idx]
if self.capacity - self.level >= put_ev.amount:
self._put_waiters.pop(idx)
self.level += put_ev.amount
put_ev.succeed()
if self._put_hook:
self._put_hook()
elif self._hard_cap:
raise OverflowError()
else:
idx += 1
def _trigger_get(self, _: Optional[Event] = None) -> None:
idx = 0
while self._get_waiters and idx < len(self._get_waiters):
get_ev = self._get_waiters[idx]
if get_ev.amount <= self.level:
self._get_waiters.pop(idx)
self.level -= get_ev.amount
get_ev.succeed(get_ev.amount)
if self._get_hook:
self._get_hook()
else:
idx += 1
def _trigger_when_at_least(self, _: Optional[Event] = None) -> None:
while self._at_least_waiters and self.level >= self._at_least_waiters[0].amount:
when_at_least_ev = heappop(self._at_least_waiters)
when_at_least_ev.succeed()
def _trigger_when_at_most(self, _: Optional[Event] = None) -> None:
while self._at_most_waiters and self.level <= self._at_most_waiters[0].amount:
at_most_ev = heappop(self._at_most_waiters)
at_most_ev.succeed()
def __repr__(self) -> str:
return (
f'{self.__class__.__name__}(name={self.name!r} level={self.level}'
f' capacity={self.capacity})'
)
class PriorityPoolPutEvent(Event):
callbacks: List[EventCallback]
def __init__(
self, pool: 'PriorityPool', amount: PoolAmount = 1, priority: int = 0
) -> None:
if not (0 < amount <= pool.capacity):
raise ValueError('amount must be in (0, capacity]')
super().__init__(pool.env)
self.pool = pool
self.amount = amount
self.key = priority, pool._event_count
pool._event_count += 1
self.callbacks.extend([pool._trigger_when_at_least, pool._trigger_get])
heappush(pool._put_waiters, self)
pool._trigger_put()
def __lt__(self, other: 'PriorityPoolPutEvent') -> bool:
return self.key < other.key
def __enter__(self) -> 'PriorityPoolPutEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._put_waiters.remove(self)
heapify(self.pool._put_waiters)
self.callbacks = None # type: ignore[assignment] # noqa: F821
class PriorityPoolGetEvent(Event):
callbacks: List[EventCallback]
def __init__(self, pool: 'PriorityPool', amount: PoolAmount = 1, priority: int = 0):
if not (0 < amount <= pool.capacity):
raise ValueError('amount must be in (0, capacity]')
super().__init__(pool.env)
self.pool = pool
self.amount = amount
self.key = priority, pool._event_count
pool._event_count += 1
self.callbacks.extend([pool._trigger_when_at_most, pool._trigger_put])
heappush(pool._get_waiters, self)
pool._trigger_get()
def __lt__(self, other: 'PriorityPoolGetEvent') -> bool:
return self.key < other.key
def __enter__(self) -> 'PriorityPoolGetEvent':
return self
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]:
self.cancel()
return None
def cancel(self) -> None:
if not self.triggered:
self.pool._get_waiters.remove(self)
heapify(self.pool._get_waiters)
self.callbacks = None # type: ignore[assignment] # noqa: F821
[docs]class PriorityPool(Pool):
"""Pool with prioritizied put() and get() requests.
A priority is provided with `put()` and `get()` requests. This priority
determines the strict order in which requests are fulfilled. Requests of
the same priority are serviced in strict FIFO order.
"""
_put_waiters: List[PriorityPoolPutEvent] # type: ignore[assignment] # noqa: F821
_get_waiters: List[PriorityPoolGetEvent] # type: ignore[assignment] # noqa: F821
def __init__(
self,
env: Environment,
capacity: PoolAmount = float('inf'),
init: PoolAmount = 0,
hard_cap: bool = False,
name: Optional[str] = None,
):
super().__init__(env, capacity, init, hard_cap, name)
self._event_count = 0
if TYPE_CHECKING:
[docs] def put( # type: ignore[override] # noqa: F821
self, amount: PoolAmount = 1, priority: int = 0
) -> PriorityPoolPutEvent:
"""Put amount in the pool."""
...
[docs] def get( # type: ignore[override] # noqa: F821
self, amount: PoolAmount = 1, priority: int = 0
) -> PriorityPoolGetEvent:
"""Get amount from the pool."""
...
else:
put = BoundClass(PriorityPoolPutEvent)
get = BoundClass(PriorityPoolGetEvent)
def _trigger_put(self, _: Optional[Event] = None) -> None:
while self._put_waiters:
put_ev = self._put_waiters[0]
if self.capacity - self.level >= put_ev.amount:
heappop(self._put_waiters)
self.level += put_ev.amount
put_ev.succeed()
if self._put_hook:
self._put_hook()
elif self._hard_cap:
raise OverflowError()
else:
break
def _trigger_get(self, _: Optional[Event] = None) -> None:
while self._get_waiters:
get_ev = self._get_waiters[0]
if get_ev.amount <= self.level:
heappop(self._get_waiters)
self.level -= get_ev.amount
get_ev.succeed(get_ev.amount)
if self._get_hook:
self._get_hook()
else:
break