Source code for desmod.simulation

"""Simulation model with batteries included."""

from contextlib import closing
from multiprocessing import Process, Queue, cpu_count
from pprint import pprint
from threading import Thread
from types import TracebackType
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    List,
    Optional,
    Sequence,
    Type,
    Union,
)
import json
import os
import random
import shutil
import timeit

import simpy
import yaml

from desmod.config import ConfigDict, ConfigFactor, factorial_config
from desmod.progress import (
    ProgressTuple,
    consume_multi_progress,
    get_multi_progress_manager,
    standalone_progress_manager,
)
from desmod.timescale import parse_time, scale_time
from desmod.tracer import TraceManager

if TYPE_CHECKING:
    from desmod.component import Component  # noqa: F401

#: Type of the result dictionary returned by :func:`simulate`: string keys
#: (e.g. 'config', 'sim.now', 'sim.time', 'sim.runtime', 'sim.exception')
#: mapped to arbitrary values.
ResultDict = Dict[str, Any]


class SimEnvironment(simpy.Environment):
    """Simulation Environment.

    :class:`SimEnvironment` extends :class:`simpy.Environment` with a few
    conveniences that are available to the whole model:

     - The configuration dictionary (`config`).
     - A seeded pseudo-random number generator (`rand`).
     - The simulation timescale (`timescale`).
     - The simulation duration (`duration`).

    Models that need to share further state among their
    :class:`desmod.component.Component` instances may subclass
    SimEnvironment and add the extra members there.

    :param dict config: A fully-initialized configuration dictionary.

    """

    def __init__(self, config: ConfigDict) -> None:
        super().__init__()

        #: The configuration dictionary.
        self.config = config

        #: Pseudo-random number generator (a :class:`random.Random`),
        #: seeded from the 'sim.seed' configuration item.
        self.rand = random.Random()
        self.rand.seed(config.setdefault('sim.seed', None), version=1)

        #: Simulation timescale as a ``(magnitude, units)`` tuple. The
        #: current simulation time is ``now * timescale``.
        self.timescale = parse_time(config.setdefault('sim.timescale', '1 s'))

        #: Intended simulation duration, expressed in units of
        #: :attr:`timescale`.
        self.duration = scale_time(
            parse_time(config.setdefault('sim.duration', '0 s')),
            self.timescale,
        )

        #: What the simulation runs "until". Defaults to the configured
        #: "sim.duration"; subclasses may override it.
        self.until = self.duration

        #: Value of 'meta.sim.index': this simulation's index among a set of
        #: related simulations, or `None` for a standalone simulation.
        self.sim_index: Optional[int] = config.get('meta.sim.index')

        #: :class:`TraceManager` instance.
        self.tracemgr = TraceManager(self)

    def time(self, t: Optional[float] = None, unit: str = 's') -> Union[int, float]:
        """The current simulation time scaled to specified unit.

        :param float t: Time in simulation units. Default is :attr:`now`.
        :param str unit: Unit of time to scale to. Default is 's' (seconds).
        :returns: Simulation time scaled to `unit`.

        """
        dest_scale = parse_time(unit)
        magnitude, units = self.timescale
        ticks = self.now if t is None else t
        return scale_time((ticks * magnitude, units), dest_scale)

    def get_progress(self) -> ProgressTuple:
        """Report simulation progress.

        :returns:
            Tuple of ``(sim_index, now, t_stop, timescale)``. When
            :attr:`until` is a :class:`SimStopEvent`, `t_stop` is that
            event's `t_stop` attribute; otherwise it is :attr:`until`
            itself.

        """
        until = self.until
        t_stop = until.t_stop if isinstance(until, SimStopEvent) else until
        return self.sim_index, self.now, t_stop, self.timescale
class SimStopEvent(simpy.Event):
    """Event appropriate for stopping the simulation.

    Assigning an instance of this event to `SimEnvironment.until` lets the
    model decide dynamically when the simulation stops. Calling
    :meth:`schedule()` stops the simulation; its optional `delay` argument
    defers the stop by an offset from the current simulation time.

    """

    def __init__(self, env: SimEnvironment) -> None:
        super().__init__(env)
        # Simulation time at which the stop is scheduled; None until
        # schedule() is called.
        self.t_stop: Optional[Union[int, float]] = None

    def schedule(self, delay: Union[int, float] = 0) -> None:
        """Schedule this event `delay` time units from now.

        The event must not already be triggered and `delay` must be
        non-negative; both conditions are checked with assertions.

        """
        assert not self.triggered
        assert delay >= 0
        # Mark the event as succeeded with a None value before queueing it.
        self._ok = True
        self._value = None
        self.env.schedule(self, simpy.events.URGENT, delay)
        self.t_stop = self.env.now + delay


class _Workspace:
    """Context manager for workspace directory management.

    On entry the process changes into the configured workspace directory
    (creating or re-creating it as needed); on exit it changes back to the
    directory that was current when the instance was constructed.

    """

    def __init__(self, config: ConfigDict) -> None:
        # 'meta.sim.workspace' takes precedence; it falls back to
        # 'sim.workspace', which itself defaults to the current directory.
        fallback = config.setdefault('sim.workspace', os.curdir)
        self.workspace: str = config.setdefault('meta.sim.workspace', fallback)
        self.overwrite: bool = config.setdefault('sim.workspace.overwrite', False)
        self.prev_dir: str = os.getcwd()

    def __enter__(self) -> '_Workspace':
        # Nothing to create or enter when the workspace is the current
        # directory.
        if os.path.relpath(self.workspace) == os.curdir:
            return self

        already_there = os.path.isdir(self.workspace)
        if already_there and self.overwrite:
            shutil.rmtree(self.workspace)
        if not already_there or self.overwrite:
            os.makedirs(self.workspace)
        os.chdir(self.workspace)
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Optional[bool]:
        # Restore the original directory; returning None lets any exception
        # propagate.
        os.chdir(self.prev_dir)
        return None
def simulate(
    config: ConfigDict,
    top_type: Type['Component'],
    env_type: Type[SimEnvironment] = SimEnvironment,
    reraise: bool = True,
    progress_manager=standalone_progress_manager,
) -> ResultDict:
    """Initialize, elaborate, and run a simulation.

    Every exception is caught by `simulate()` so that it can be traced and
    recorded in the result. By default an unhandled exception is re-raised
    after being recorded. With `reraise` set to False the exception does not
    propagate; instead the returned result dict reports it via the
    'sim.exception' item.

    If the 'sim.config.file' and/or 'sim.result.file' configuration items
    are set, the config and result dictionaries are dumped to those files
    from within the workspace directory.

    :param dict config: Configuration dictionary for the simulation.
    :param top_type: The model's top-level Component subclass.
    :param env_type: :class:`SimEnvironment` subclass.
    :param bool reraise: Should unhandled exceptions propagate to the caller.
    :param progress_manager:
        Callable invoked as ``progress_manager(env)``; the returned context
        manager wraps elaboration, the simulation run, and result gathering.
    :returns:
        Dictionary containing the model-specific results of the simulation.

    """
    started = timeit.default_timer()

    def elapsed() -> float:
        return timeit.default_timer() - started

    result: ResultDict = {}
    result_path = config.setdefault('sim.result.file')
    config_path = config.setdefault('sim.config.file')

    try:
        with _Workspace(config):
            env = env_type(config)
            with closing(env.tracemgr):
                try:
                    top_type.pre_init(env)
                    env.tracemgr.flush()
                    with progress_manager(env):
                        top = top_type(parent=None, env=env)
                        top.elaborate()
                        env.tracemgr.flush()
                        env.run(until=env.until)
                        env.tracemgr.flush()
                        top.post_simulate()
                        env.tracemgr.flush()
                        top.get_result(result)
                except BaseException as exc:
                    # Record the failure in the trace and the result before
                    # letting it continue outward.
                    env.tracemgr.trace_exception()
                    result['sim.exception'] = repr(exc)
                    raise
                else:
                    result['sim.exception'] = None
                finally:
                    # Runs on success and on failure, so the config and
                    # result files are written either way.
                    env.tracemgr.flush()
                    result['config'] = config
                    result['sim.now'] = env.now
                    result['sim.time'] = env.time()
                    result['sim.runtime'] = elapsed()
                    _dump_dict(config_path, config)
                    _dump_dict(result_path, result)
    except BaseException as exc:
        if reraise:
            raise
        # The failure may have happened before the inner block filled these
        # items in (e.g. while entering the workspace), so only set defaults.
        result.setdefault('config', config)
        result.setdefault('sim.runtime', elapsed())
        if result.get('sim.exception') is None:
            result['sim.exception'] = repr(exc)
    return result
def simulate_factors(
    base_config: ConfigDict,
    factors: List[ConfigFactor],
    top_type: Type['Component'],
    env_type: Type[SimEnvironment] = SimEnvironment,
    jobs: Optional[int] = None,
    config_filter: Optional[Callable[[ConfigDict], bool]] = None,
) -> List[ResultDict]:
    """Run multi-factor simulations in separate processes.

    The `factors` are used to compose specialized config dictionaries for
    the simulations. Each specialized config is given a 'meta.sim.index' and
    a 'meta.sim.workspace' (a per-index subdirectory of 'sim.workspace')
    before the optional `config_filter` is applied, so the configs that
    survive filtering keep their original index.

    The :mod:`python:multiprocessing` module is used to run each simulation
    in a separate Python process. This allows multi-factor simulations to
    run in parallel on all available CPU cores.

    :param dict base_config: Base configuration dictionary to be specialized.
    :param list factors: List of factors.
    :param top_type: The model's top-level Component subclass.
    :param env_type: :class:`SimEnvironment` subclass.
    :param int jobs: User specified number of concurrent processes.
    :param function config_filter:
        A function which will be passed a config and returns a bool to
        filter.
    :returns: Sequence of result dictionaries for each simulation.

    """
    configs = list(factorial_config(base_config, factors, 'meta.sim.special'))
    root_ws = base_config.setdefault('sim.workspace', os.curdir)
    overwrite = base_config.setdefault('sim.workspace.overwrite', False)

    for sim_index, sim_config in enumerate(configs):
        sim_config['meta.sim.index'] = sim_index
        sim_config['meta.sim.workspace'] = os.path.join(root_ws, str(sim_index))

    if config_filter is not None:
        configs = [cfg for cfg in configs if config_filter(cfg)]

    # Clear out a pre-existing top-level workspace, but never the current
    # directory itself.
    if overwrite and os.path.relpath(root_ws) != os.curdir and os.path.isdir(root_ws):
        shutil.rmtree(root_ws)

    return simulate_many(configs, top_type, env_type, jobs)
def simulate_many(
    configs: Sequence[ConfigDict],
    top_type: Type['Component'],
    env_type: Type[SimEnvironment] = SimEnvironment,
    jobs: Optional[int] = None,
) -> List[ResultDict]:
    """Run multiple experiments in separate processes.

    The :mod:`python:multiprocessing` module is used to run each simulation
    in a separate Python process. This allows multi-factor simulations to
    run in parallel on all available CPU cores. The number of worker
    processes is the smallest of the number of configs, the CPU count, and
    `jobs` (when given). Workers run :func:`simulate` with `reraise=False`.

    :param dict configs: list of configuration dictionary for the simulation.
    :param top_type: The model's top-level Component subclass.
    :param env_type: :class:`SimEnvironment` subclass.
    :param int jobs: User specified number of concurrent processes.
    :returns:
        Result dictionaries for each simulation, sorted by the
        'meta.sim.index' of each result's config.
    :raises ValueError:
        If `jobs` is less than 1 or two configs share the same workspace.

    """
    if jobs is not None and jobs < 1:
        raise ValueError(f'Invalid number of jobs: {jobs}')

    # Progress display is all-or-nothing: enabled for every simulation if
    # any config asks for it.
    progress_enable = any(
        config.setdefault('sim.progress.enable', False) for config in configs
    )

    progress_queue: Optional[Queue[ProgressTuple]] = (
        Queue() if progress_enable else None
    )
    result_queue: Queue[ResultDict] = Queue()
    config_queue: Queue[Optional[ConfigDict]] = Queue()

    seen_workspaces = set()
    max_width = 0
    for index, config in enumerate(configs):
        max_width = max(config.setdefault('sim.progress.max_width', 0), max_width)

        fallback_ws = config.setdefault('sim.workspace', os.curdir)
        workspace = os.path.normpath(
            config.setdefault('meta.sim.workspace', fallback_ws)
        )
        if workspace in seen_workspaces:
            raise ValueError(f'Duplicate workspace: {workspace}')
        seen_workspaces.add(workspace)

        config.setdefault('meta.sim.index', index)
        config['sim.progress.enable'] = progress_enable
        config_queue.put(config)

    worker_limits = [len(configs), cpu_count()]
    if jobs is not None:
        worker_limits.append(jobs)
    num_workers = min(worker_limits)

    workers = []
    for worker_id in range(num_workers):
        worker = Process(
            name=f'sim-worker-{worker_id}',
            target=_simulate_worker,
            args=(
                top_type,
                env_type,
                False,
                progress_queue,
                config_queue,
                result_queue,
            ),
            daemon=True,  # Workers die if main process dies.
        )
        worker.start()
        workers.append(worker)
        config_queue.put(None)  # A stop sentinel for each worker.

    if progress_enable:
        progress_thread = Thread(
            target=consume_multi_progress,
            args=(progress_queue, num_workers, len(configs), max_width),
            daemon=True,
        )
        progress_thread.start()

    # Exactly one result is expected per config.
    results = [result_queue.get() for _ in configs]

    if progress_enable:
        # Although this is a daemon thread, we still make a token attempt to
        # join with it. This avoids a race with certain testing frameworks
        # (ahem, py.test) that may monkey-patch and close stderr while
        # progress_thread is still using it.
        progress_thread.join(1)

    for worker in workers:
        worker.join(5)

    def sim_index_of(result):
        return result['config']['meta.sim.index']

    return sorted(results, key=sim_index_of)


def _simulate_worker(
    top_type: Type['Component'],
    env_type: Type[SimEnvironment],
    reraise: bool,
    progress_queue: Optional['Queue[ProgressTuple]'],
    config_queue: 'Queue[Optional[ConfigDict]]',
    result_queue: 'Queue[ResultDict]',
):
    """Worker process body: simulate configs until a stop sentinel arrives.

    Configs are pulled from `config_queue` one at a time and run through
    :func:`simulate`; each result is pushed onto `result_queue`. A `None`
    taken from `config_queue` ends the loop.

    """
    progress_manager = get_multi_progress_manager(progress_queue)
    config = config_queue.get()
    while config is not None:
        result_queue.put(
            simulate(config, top_type, env_type, reraise, progress_manager)
        )
        config = config_queue.get()


def _dump_dict(filename: str, dump_dict: Dict[str, Any]):
    """Write `dump_dict` to `filename` in the format implied by its extension.

    Nothing is written when `filename` is None. Extensions '.yaml' and
    '.yml' produce YAML, '.json' produces JSON, and '.py' produces
    pretty-printed Python.

    :raises ValueError: If the extension is not one of those listed above.

    """
    if filename is None:
        return

    ext = os.path.splitext(filename)[1]
    if ext not in ('.yaml', '.yml', '.json', '.py'):
        # Reject unknown extensions before the file is opened.
        raise ValueError(f'Invalid extension: {ext}')

    with open(filename, 'w') as dump_file:
        if ext == '.json':
            json.dump(dump_dict, dump_file, sort_keys=True, indent=2)
        elif ext == '.py':
            pprint(dump_dict, stream=dump_file)
        else:
            # Only '.yaml' and '.yml' remain after the validation above.
            yaml.safe_dump(dump_dict, stream=dump_file)