"""
The simulator module defines different simulators that can be used to
advance time in a simulation and change the state of the model over time.
"""
from abc import abstractmethod
import enum
import logging
import sys
from threading import Thread
import threading
from time import sleep
import time
import traceback
from typing import TypeVar, Generic
from pydsol.core.eventlist import EventListInterface, EventListHeap
from pydsol.core.interfaces import ModelInterface, SimulatorInterface
from pydsol.core.interfaces import ReplicationInterface
from pydsol.core.pubsub import EventProducer
from pydsol.core.simevent import SimEventInterface, SimEvent
from pydsol.core.units import Duration
from pydsol.core.utils import DSOLError, get_module_logger
# Public names of this module, i.e. what ``from <module> import *`` exports.
__all__ = [
"Simulator",
"DEVSSimulator",
"DEVSSimulatorFloat",
"DEVSSimulatorInt",
"DEVSSimulatorDuration",
"RunState",
"ReplicationState",
"SimulatorWorkerThread",
"ErrorStrategy",
]
# Module-level logger, requested from get_module_logger under the
# name 'simulator'.
logger = get_module_logger('simulator')
# Type variable for the simulator time. It is constrained to float and int
# and is only used for type hinting of the generic simulator classes.
TIME = TypeVar("TIME", float, int)
class RunState(enum.Enum):
    """
    Enumeration of the execution states a Simulator can be in.

    The members are listed in the order in which a simulator normally
    passes through them, from instantiation to the end of the replication.
    """

    #: The simulator has been instantiated, but it has not yet been
    #: initialized with a Replication.
    NOT_INITIALIZED = 1

    #: The replication has started and the simulator has been initialized,
    #: but the simulator itself has not been started yet.
    INITIALIZED = 2

    #: The simulator has been started, but its run() thread did not
    #: start yet.
    STARTING = 3

    #: The run() thread of the simulator has started; the simulation
    #: is running.
    STARTED = 4

    #: Stopping of the simulator has been initiated, but the run() thread
    #: is still running.
    STOPPING = 5

    #: The run() thread of the simulator has been stopped; the simulator
    #: is not running.
    STOPPED = 6

    #: The replication has ended, and the simulator cannot be restarted.
    ENDED = 7
class ReplicationState(enum.Enum):
    """
    Enumeration of the states of the replication that is being executed
    by the simulator.
    """

    #: The simulator has been instantiated, but it has not yet been
    #: initialized with a Replication.
    NOT_INITIALIZED = 1

    #: The simulator has been initialized with the replication, but the
    #: replication has not been started yet, and the
    #: START_REPLICATION_EVENT has not yet been fired.
    INITIALIZED = 2

    #: The execution of the replication has started, and the
    #: START_REPLICATION_EVENT has been fired.
    STARTED = 3

    #: The replication has ended, but the run() thread is still running;
    #: the END_REPLICATION_EVENT has not yet been fired.
    ENDING = 4

    #: The replication has ended, and the simulator cannot be restarted;
    #: the END_REPLICATION_EVENT has been fired.
    ENDED = 5
class ErrorStrategy:
    """
    Constants that tell the simulator what to do when an error occurs
    during the execution of the simulation.

    The strategy is stored on the Simulator and can be changed with
    Simulator.set_error_strategy(), also while the simulation is running.
    Each strategy has a default log level (see LOG_LEVELS) that can be
    overridden when the strategy is set. The strategies are plain integers
    of increasing severity, so they can be compared with ``<`` and ``>``.
    """

    #: Send the error to the logger (by default as WARNING) and let
    #: Simulator.run() continue as if the error did not occur. The run
    #: state and the replication state are not changed.
    LOG_AND_CONTINUE = 1

    #: Send the error to the logger (by default as ERROR), print the
    #: exception, and let Simulator.run() continue as if the error did not
    #: occur. The run state and the replication state are not changed.
    WARN_AND_CONTINUE = 2

    #: Send the error to the logger (by default as ERROR) and print the
    #: exception. The RunState goes to STOPPING, which ends the loop in
    #: Simulator.run(); the SimulatorWorkerThread then waits for a new
    #: start (or for cleanup).
    WARN_AND_PAUSE = 3

    #: Send the error to the logger (by default as CRITICAL) and print the
    #: exception. Simulator.cleanup() is called so the run() method of the
    #: SimulatorWorkerThread ends completely. If there is a UI thread, it
    #: will keep running.
    WARN_AND_END = 4

    #: Send the error to the logger (by default as FATAL) and print the
    #: exception; sys.exit() is called afterwards.
    WARN_AND_EXIT = 5

    #: Default log level for each of the error handling strategies.
    LOG_LEVELS = {
        LOG_AND_CONTINUE: logging.WARNING,
        WARN_AND_CONTINUE: logging.ERROR,
        WARN_AND_PAUSE: logging.ERROR,
        WARN_AND_END: logging.CRITICAL,
        WARN_AND_EXIT: logging.FATAL,
    }
class SimulatorWorkerThread(Thread):
    """
    Worker thread on which the Simulator executes the simulation.

    A separate thread is used so user interface activities are not blocked
    while the simulation is running. In an interactive setting, a user can
    stop() the simulator, inspect the state of the simulation model, and
    continue the simulation by calling start().

    The thread starts itself on construction and then sleeps until
    wakeup() is called. Every wakeup executes one run of the simulator
    (``job._run()``), unless the thread has been finalized or the
    replication is ending.
    """

    def __init__(self, name: str, job: 'Simulator'):
        """
        Create the worker thread and start it right away.

        :param name: the name of the thread
        :param job: the simulator whose ``_run()`` method is executed
        """
        super().__init__(name=name)
        self._job: 'Simulator' = job
        self.daemon = False
        self._running: bool = False
        self._finalized: bool = False
        # True while run() is parked on the wakeup flag; kept as an own
        # attribute so is_waiting() does not need private threading internals
        self._waiting: bool = False
        self.__wakeup_flag = threading.Event()
        self.start()

    def cleanup(self):
        """Mark the thread as finalized and wake it up, so that the loop
        in run() can end."""
        self._running = False
        self._finalized = True
        self.wakeup()  # just to be sure...

    def is_running(self) -> bool:
        """Return whether the thread is busy processing a wakeup."""
        return self._running

    def wakeup(self):
        """Wake up the run() loop that is waiting for work."""
        self.__wakeup_flag.set()

    def is_waiting(self):
        """Return whether run() is waiting for the next wakeup() call."""
        return self._waiting

    def is_finalized(self):
        """Return whether the thread has been finalized, either by
        cleanup() or because the replication has ended."""
        return self._finalized

    def run(self):
        """
        Main loop of the thread: wait for a wakeup, execute the simulator
        run, and go back to waiting until the thread is finalized.

        START_EVENT and STOP_EVENT are fired around the run. When the
        replication is in the ENDING state after the run, the replication
        is closed (END_REPLICATION_EVENT) and the thread finalizes itself.
        """
        while not self._finalized:
            # wait till wakeup, e.g., to start the simulation
            self._waiting = True
            self.__wakeup_flag.wait()
            self._waiting = False
            self._running = True
            if not self._finalized:
                if self._job._replication_state != ReplicationState.ENDING:
                    try:
                        self._job.fire_timed(self._job.simulator_time,
                                Simulator.START_EVENT, None)
                        self._job._run_state = RunState.STARTED
                        self._job._run()
                        self._job.fire_timed(self._job.simulator_time,
                                Simulator.STOP_EVENT, None)
                        self._job._run_state = RunState.STOPPED
                    except Exception as e:
                        print("Simulator run interrupted by exception:")
                        print(str(e))
                        traceback.print_exc()
                        # The run is over; leaving the state at STARTING or
                        # STARTED would make the simulator refuse a new
                        # start() or initialize().
                        self._job._run_state = RunState.STOPPED
            if self._job._replication_state == ReplicationState.ENDING:
                self._job._replication_state = ReplicationState.ENDED
                self._job._run_state = RunState.ENDED
                self._job.fire_timed(self._job.simulator_time,
                        ReplicationInterface.END_REPLICATION_EVENT, None)
                self._finalized = True
            # re-arm the flag so the next iteration blocks again
            self.__wakeup_flag.clear()
            self._running = False
        # end while
    # end run()
class Simulator(EventProducer, SimulatorInterface, Generic[TIME]):
    """
    Base class for simulators that execute a replication of a model.

    The class keeps track of the simulator time, the run state and the
    replication state, and runs the simulation on a SimulatorWorkerThread.
    The actual time advance mechanism has to be provided by a subclass
    through the abstract methods _run() and _step_impl().
    """

    def __init__(self, name: str, time_type: type, initial_time: TIME):
        """
        Create a simulator that is not yet initialized with a replication.

        :param name: the name (id) of the simulator
        :param time_type: the type of the simulator time; a subclass of
            int or float
        :param initial_time: the first possible time; an instance of
            time_type
        :raises DSOLError: when name is not a str, when time_type is not a
            subclass of int or float, or when initial_time is not an
            instance of time_type
        """
        EventProducer.__init__(self)
        if not isinstance(name, str):
            raise DSOLError("simulator id should be a str")
        if not issubclass(time_type, (int, float)):
            raise DSOLError("simulator time_type should be float or int")
        if not isinstance(initial_time, time_type):
            raise DSOLError(f"simulator initial_time not of type {time_type}")
        self._name = name
        self._time_type: type = time_type
        self._simulator_time: TIME = initial_time
        self._run_until_time: TIME = None
        self._run_until_including: bool = True
        self._replication: ReplicationInterface = None
        self._model: ModelInterface = None
        self._run_state: RunState = RunState.NOT_INITIALIZED
        self._replication_state = ReplicationState.NOT_INITIALIZED
        self.__worker: Thread = None
        self._initial_time = initial_time
        self._initial_methods: list[SimEventInterface] = []
        self._error_strategy = ErrorStrategy.WARN_AND_PAUSE
        self._error_log_level = logging.ERROR
        # set to True by _run() to signal _start_impl() that the run started
        self._runflag: bool = False

    @property
    def name(self) -> str:
        """return the name of the simulator"""
        return self._name

    @property
    def time_type(self) -> type:
        """return the time type of the simulator"""
        return self._time_type

    @property
    def simulator_time(self) -> TIME:
        """return the current absolute time of the simulator"""
        return self._simulator_time

    @property
    def initial_time(self) -> TIME:
        """return the first possible time of the used time type"""
        return self._initial_time

    @property
    def replication(self) -> ReplicationInterface:
        """return the replication with which the simulator has been
        initialized, or None when initialize has not yet been called"""
        return self._replication

    @property
    def model(self) -> ModelInterface:
        """return the model that is being simulated, or None when
        initialize for a model has not yet been called"""
        return self._model

    def initialize(self, model: ModelInterface,
                   replication: ReplicationInterface):
        """
        Initialize the simulator with a replication for a model.

        A worker thread of a previous replication is cleaned up first
        (note that cleanup() also removes all listeners). Then a new worker
        thread is created, the simulator time is set to the start time of
        the replication, the model is constructed, and the methods that
        were registered with add_initial_method() are executed. The method
        returns when the worker thread is waiting for a start, or after
        1000 polls of 1 ms.

        :param model: the model to simulate
        :param replication: the replication to execute
        :raises DSOLError: when the model or replication is not valid, or
            when the simulator is starting or running
        """
        if not isinstance(model, ModelInterface):
            raise DSOLError(f"model {model} not valid")
        if not hasattr(model, '_simulator'):
            raise DSOLError(f"model {model} does not have a simulator. " +
                "Did you call super.__init__(...) in the model constructor?")
        if not isinstance(replication, ReplicationInterface):
            raise DSOLError(f"replication {replication} not valid")
        if self.is_starting_or_running():
            raise DSOLError("cannot initialize a running simulation")
        if self.__worker is not None:
            self.cleanup()
        self.__worker = SimulatorWorkerThread(self.name, self)
        self._replication = replication
        self._model = model
        self._simulator_time = replication.start_sim_time
        model.construct_model()
        self._run_state = RunState.INITIALIZED
        self._replication_state = ReplicationState.INITIALIZED
        for initial_event in self._initial_methods:
            initial_event.execute()
        # wait till the worker thread is waiting or ready (end replication)
        count: int = 0
        while (not self.__worker.is_waiting()
               and not self.__worker.is_finalized()
               and count < 1000):
            sleep(0.001)
            count += 1

    def add_initial_method(self, target, method: str, **kwargs):
        """
        Add a method call that has to be performed at the end of
        initialize, and before the model starts.

        This can, for instance, be used to schedule the execution of
        simulation events before initialize has been called. It solves the
        problem that, for discrete event simulators, the schedule_event
        methods cannot be called before initialize().

        :param target: the object on which the method is called
        :param method: the name of the method to call
        :param kwargs: the arguments to pass to the method
        """
        self._initial_methods.append(SimEvent(self.initial_time,
                target, method, **kwargs))

    def cleanup(self):
        """Clean up after a replication has finished, and prepare for the
        next replication to run. All listeners are removed, the worker
        thread (if any) is stopped and finalized, and both states go back
        to NOT_INITIALIZED."""
        if self.has_listeners():
            self.remove_all_listeners()
        if self.__worker is not None:
            self._stop_impl()
            self.__worker.cleanup()
            self.__worker = None
        self._run_state = RunState.NOT_INITIALIZED
        self._replication_state = ReplicationState.NOT_INITIALIZED

    def _start_impl(self):
        """
        Implementation of the start method. Checks the preconditions for
        running, fires the right events and wakes up the worker thread.

        The method waits for at most one second until _run() has signalled
        (through the run flag) that it started.

        :raises DSOLError: when the simulator is already starting or
            running, has no replication, is not initialized, is not in
            replication state INITIALIZED or STARTED, or has reached the
            end time of the replication
        """
        if self.is_starting_or_running():
            raise DSOLError("cannot start a running simulator")
        if self._replication is None:
            raise DSOLError("no replication details")
        if not self.is_initialized():
            raise DSOLError("cannot start an uninitialized simulator")
        if self._replication_state not in (ReplicationState.INITIALIZED,
                                           ReplicationState.STARTED):
            raise DSOLError("replication state not INITIALIZED or STARTED")
        if self._simulator_time >= self._replication.end_sim_time:
            raise DSOLError("cannot start: simulator_time > run length")
        self._run_state = RunState.STARTING
        if self._replication_state == ReplicationState.INITIALIZED:
            self.fire_timed(self._simulator_time,
                    ReplicationInterface.START_REPLICATION_EVENT, None)
            self._replication_state = ReplicationState.STARTED
        self.fire(Simulator.STARTING_EVENT, None)
        # wake up the run() method of the worker thread to start Simulator.run
        self.__worker.wakeup()
        # wait maximally one second
        msec: int = int(time.time() * 1000)
        while not self._runflag and int(time.time() * 1000) - msec < 1000:
            sleep(0.001)
        self._runflag = False

    def start(self):
        """
        Start the simulator and let it run until (and including) the end
        time of the replication.

        When the simulator was already started, an exception is raised and
        no event is fired.

        :raises DSOLError: when there is no replication, or when one of
            the preconditions checked by _start_impl() is not met
        """
        if self._replication is None:
            raise DSOLError("no replication details")
        self._run_until_time = self._replication.end_sim_time
        self._run_until_including = True
        self._start_impl()

    @abstractmethod
    def _step_impl(self):
        """The implementation body of the step() method. The _step_impl()
        method should fire the TIME_CHANGED_EVENT before the execution of
        the simulation event, or before executing the integration of the
        differential equation for the next timestep. So the time is changed
        first to match the logic carried out for that time, and then the
        action for that time is carried out. This is INDEPENDENT of the
        fact whether the time changes or not. The TIME_CHANGED_EVENT is
        always fired."""

    def step(self):
        """
        Make one step with the simulator on the calling thread.

        A START_EVENT is fired before the step, and a STOP_EVENT after it.
        An exception raised during the step is printed and not propagated;
        the simulator always ends in the STOPPED run state.

        :raises DSOLError: when the simulator is starting or running, is
            not initialized, is not in replication state INITIALIZED or
            STARTED, or has reached the end time of the replication
        """
        if self.is_starting_or_running():
            raise DSOLError("cannot start a running simulator")
        if not self.is_initialized():
            raise DSOLError("cannot start an uninitialized simulator")
        if self._replication_state not in (ReplicationState.INITIALIZED,
                                           ReplicationState.STARTED):
            raise DSOLError("replication state not INITIALIZED or STARTED")
        if self._simulator_time >= self._replication.end_sim_time:
            raise DSOLError("cannot start: simulator_time > run length")
        try:
            if self._replication_state == ReplicationState.INITIALIZED:
                self.fire_timed(self._simulator_time,
                        ReplicationInterface.START_REPLICATION_EVENT, None)
                self._replication_state = ReplicationState.STARTED
            self._run_state = RunState.STARTED
            self.fire_timed(self._simulator_time,
                    Simulator.START_EVENT, None)
            self._step_impl()
        except Exception as e:
            # str(e) is required: concatenating the exception object itself
            # raises a TypeError that would hide the original error
            print("Simulator step got exception: " + str(e))
        finally:
            self.fire_timed(self._simulator_time,
                    Simulator.STOP_EVENT, None)
            self._run_state = RunState.STOPPED

    def _stop_impl(self):
        """Implementation of the stop behavior: set the run state to
        STOPPING and wait for at most one second until the worker thread
        is waiting again or has been finalized."""
        self._run_state = RunState.STOPPING
        # wait till the worker thread is waiting or ready (end replication)
        msec: int = int(time.time() * 1000)
        while (not self.__worker.is_waiting()
               and not self.__worker.is_finalized()
               and int(time.time() * 1000) - msec < 1000):
            sleep(0.001)

    def stop(self):
        """
        Stop the simulator after firing a STOPPING_EVENT.

        :raises DSOLError: when the simulator is not starting or running;
            no event is fired in that case
        """
        if self.is_stopping_or_stopped():
            raise DSOLError("cannot stop an already stopped simulator")
        self.fire(Simulator.STOPPING_EVENT, None)
        self._stop_impl()

    def run_up_to(self, stop_time: TIME):
        """
        Run the simulator up to a certain time; any events at that time,
        or the solving of the differential equation at that timestep,
        will not yet be executed.

        :param stop_time: the time up to which the simulator has to run
        :raises DSOLError: when one of the preconditions checked by
            _start_impl() is not met
        """
        self._run_until_time = stop_time
        self._run_until_including = False
        self._start_impl()

    def run_up_to_including(self, stop_time: TIME):
        """
        Run the simulator up to a certain time; all events at that time,
        or the solving of the differential equation at that timestep,
        will be executed.

        :param stop_time: the time up to which the simulator has to run
        :raises DSOLError: when one of the preconditions checked by
            _start_impl() is not met
        """
        self._run_until_time = stop_time
        self._run_until_including = True
        self._start_impl()

    def warmup(self):
        """Fire the WARMUP_EVENT at the current simulator time."""
        self.fire_timed(self.simulator_time,
                ReplicationInterface.WARMUP_EVENT, None)

    @property
    def run_state(self) -> RunState:
        """return the run state of the simulator"""
        return self._run_state

    def is_initialized(self) -> bool:
        """Return whether the simulator has been initialized with a
        replication for a model."""
        return self.run_state != RunState.NOT_INITIALIZED

    def is_starting_or_running(self) -> bool:
        """Return whether the simulator is starting or has started."""
        return self.run_state in (RunState.STARTING, RunState.STARTED)

    def is_stopping_or_stopped(self) -> bool:
        """Return whether the simulator is stopping or has been stopped.
        This method also returns True when the simulator has not yet been
        initialized, or when the model has not yet started, or when the
        model run has ended."""
        return not self.is_starting_or_running()

    @property
    def replication_state(self) -> ReplicationState:
        """return the replication state"""
        return self._replication_state

    def end_replication(self):
        """End the replication: set the replication state to ENDING and
        wake up the worker thread. When the simulator time is still before
        the end time of the replication, a warning is printed and the
        simulator time is set to the end time."""
        self._replication_state = ReplicationState.ENDING
        self.__worker.wakeup()  # just to be sure
        if self._simulator_time < self._replication.end_sim_time:
            print("warning: end_replication called with simtime < runlength")
            self._simulator_time = self._replication.end_sim_time

    def set_error_strategy(self, error_strategy: ErrorStrategy,
                           log_level: int=-1):
        """
        Set a new error handling strategy for the simulator, and possibly
        override the default log level belonging to the error strategy.

        :param error_strategy: one of the constants of ErrorStrategy
        :param log_level: the log level to use for errors; with a negative
            value (the default), the default log level of the strategy
            from ErrorStrategy.LOG_LEVELS is used
        :raises ValueError: when error_strategy is not a known strategy
        """
        if error_strategy not in ErrorStrategy.LOG_LEVELS:
            raise ValueError("None-existent error strategy for simulator")
        self._error_strategy = error_strategy
        if log_level >= 0:
            self._error_log_level = log_level
        else:
            self._error_log_level = ErrorStrategy.LOG_LEVELS[error_strategy]

    @abstractmethod
    def _run(self):
        """
        The run method defines the actual time step mechanism of the
        simulator. The implementation of this method depends on the
        formalism. Where discrete event formalisms loop over an event list,
        continuous simulators take predefined time steps.
        Make sure that:
        - self._runflag is set to True at the start of the run() method.
        - SimulatorInterface.TIME_CHANGED_EVENT is fired when the time
          of the simulator changes.
        - the warmup() method is called when the warmup period has expired
          (through an event or based on simulation time).
        - the end_replication() method is called when the replication
          has ended.
        - the simulator runs until the run-until time, which is also set
          by the start() method.
        """
class DEVSSimulator(Simulator[TIME], Generic[TIME]):
    """
    Discrete event simulator that advances the simulator time by
    executing the events on its event list in order.
    """

    def __init__(self, name: str, time_type: type, initial_time: TIME):
        """
        Create a discrete event simulator with an empty event list.

        :param name: the name (id) of the simulator
        :param time_type: the type of the simulator time
        :param initial_time: the first possible time
        :raises DSOLError: when the arguments are rejected by the
            Simulator constructor
        """
        super().__init__(name, time_type, initial_time)
        self._eventlist: EventListInterface = EventListHeap()

    def initialize(self, model: ModelInterface,
                   replication: ReplicationInterface):
        """
        Initialize the simulator with a replication for a model. The event
        list is cleared, and the warmup event is scheduled at the warmup
        time of the replication.

        :param model: the model to simulate
        :param replication: the replication to execute
        :raises DSOLError: when the simulator is starting or running, or
            when the model or replication is not valid
        """
        # this check HAS to be done before clearing the eventlist
        if self.is_starting_or_running():
            raise DSOLError("cannot initialize a running simulation")
        self._eventlist.clear()
        super().initialize(model, replication)
        # schedule warmup BEFORE events at warmup time
        self.schedule_event_abs(self.replication.warmup_sim_time,
                self, "warmup", priority=SimEventInterface.MAX_PRIORITY)

    def eventlist(self) -> EventListInterface:
        """Return the event list used by this simulator."""
        return self._eventlist

    def schedule_event(self, event: SimEventInterface) -> SimEventInterface:
        """
        Schedule the provided event on the event list.

        :param event: the event to schedule
        :return: the scheduled event
        :raises DSOLError: when the event time is before the simulator time
        """
        if event.time < self._simulator_time:
            raise DSOLError("cannot schedule event in the past")
        self._eventlist.add(event)
        return event

    def schedule_event_now(self, target, method: str,
            priority: int=SimEventInterface.NORMAL_PRIORITY,
            **kwargs) -> SimEventInterface:
        """
        Schedule a method call at the current simulator time.

        :param target: the object on which the method is called
        :param method: the name of the method to call
        :param priority: the priority of the event
        :param kwargs: the arguments to pass to the method
        :return: the scheduled event
        """
        return self.schedule_event(SimEvent(self._simulator_time, target,
                method, priority, **kwargs))

    def schedule_event_rel(self, delay, target, method: str,
            priority: int=SimEventInterface.NORMAL_PRIORITY,
            **kwargs) -> SimEventInterface:
        """
        Schedule a method call at a relative duration. The execution
        time is thus simulator.simulator_time + delay.

        :param delay: the non-negative delay relative to the current time
        :param target: the object on which the method is called
        :param method: the name of the method to call
        :param priority: the priority of the event
        :param kwargs: the arguments to pass to the method
        :return: the scheduled event
        :raises DSOLError: when the delay is negative
        """
        if delay < 0:
            raise DSOLError("cannot schedule event in the past")
        return self.schedule_event(SimEvent(self._simulator_time + delay,
                target, method, priority, **kwargs))

    def schedule_event_abs(self, time, target, method: str,
            priority: int=SimEventInterface.NORMAL_PRIORITY,
            **kwargs) -> SimEventInterface:
        """
        Schedule a method call at an absolute simulator time.

        :param time: the absolute time at which the method is called
        :param target: the object on which the method is called
        :param method: the name of the method to call
        :param priority: the priority of the event
        :param kwargs: the arguments to pass to the method
        :return: the scheduled event
        :raises DSOLError: when the time is before the simulator time
        """
        if time < self._simulator_time:
            raise DSOLError("cannot schedule event in the past")
        return self.schedule_event(SimEvent(time,
                target, method, priority, **kwargs))

    def cancel_event(self, event: SimEventInterface):
        """Remove the provided event from the event list."""
        self._eventlist.remove(event)

    def _step_impl(self):
        """The implementation body of the step() method. The _step_impl()
        method fires the TIME_CHANGED_EVENT before the execution of the
        simulation event. So the time is changed first to match the logic
        carried out for that time, and then the action for that time is
        carried out. This is INDEPENDENT of the fact whether the time changes
        or not. The TIME_CHANGED_EVENT is always fired. Nothing happens
        when the event list is empty."""
        if not self._eventlist.is_empty():
            event: SimEventInterface = self._eventlist.pop_first()
            self.fire_timed(event.time, Simulator.TIME_CHANGED_EVENT,
                    event.time)
            self._simulator_time = event.time
            event.execute()

    def end_replication(self):
        """End the replication and discard all events that are still on
        the event list."""
        super().end_replication()
        self.eventlist().clear()

    def _run(self):
        """
        Execute events from the event list until the simulator is stopped
        or the run-until time has been reached.

        The run ends when the event list is empty, when the next event is
        after the run-until time, or when it is at the run-until time and
        that time is not included. The TIME_CHANGED_EVENT is fired only
        when the time of the next event differs from the simulator time.
        An exception raised by an event is handled according to the error
        strategy of the simulator.

        :raises DSOLError: when the event list returns an object that is
            not a SimEventInterface
        """
        self._runflag = True
        while not self.is_stopping_or_stopped():
            # check if we are done
            if self.eventlist().is_empty():
                done = True
            else:
                t = self.eventlist().peek_first().time
                done = (t > self._run_until_time
                        or (t == self._run_until_time
                            and not self._run_until_including))
            if done:
                # NOTE(review): the replication is also set to ENDING when
                # the run-until time of run_up_to() is before the end time
                # of the replication -- confirm that this is intended.
                self._simulator_time = self._run_until_time
                self._replication_state = ReplicationState.ENDING
                self._run_state = RunState.STOPPING
                return
            # get the first event
            event: SimEventInterface = self.eventlist().pop_first()
            if not isinstance(event, SimEventInterface):
                raise DSOLError(f"Invalid SimEvent {event} from eventlist")
            if event.time != self.simulator_time:
                self.fire_timed(event.time, Simulator.TIME_CHANGED_EVENT,
                        event.time)
            self._simulator_time = event.time
            try:
                event.execute()
            except Exception as e:
                s = f"Exception during simulation at t={self.simulator_time}: "
                logger.log(self._error_log_level, s + str(e))
                if self._error_strategy > ErrorStrategy.LOG_AND_CONTINUE:
                    print(s + str(e))
                    traceback.print_exc()
                if self._error_strategy == ErrorStrategy.WARN_AND_PAUSE:
                    self._run_state = RunState.STOPPING
                elif self._error_strategy == ErrorStrategy.WARN_AND_END:
                    self.cleanup()
                elif self._error_strategy == ErrorStrategy.WARN_AND_EXIT:
                    # NOTE(review): _run() is called by the worker thread,
                    # where sys.exit() only raises SystemExit in that
                    # thread -- confirm this ends the program as intended.
                    sys.exit()
            # end except
        # end while
    # end run()
class DEVSSimulatorFloat(DEVSSimulator[float]):
    """Discrete event simulator with float as the time type, and 0.0 as
    the initial time."""

    def __init__(self, name: str):
        """
        Create the simulator.

        :param name: the name (id) of the simulator
        """
        super().__init__(name=name, time_type=float, initial_time=0.0)
class DEVSSimulatorInt(DEVSSimulator[int]):
    """Discrete event simulator with int as the time type, and 0 as the
    initial time."""

    def __init__(self, name: str):
        """
        Create the simulator.

        :param name: the name (id) of the simulator
        """
        super().__init__(name=name, time_type=int, initial_time=0)
class DEVSSimulatorDuration(DEVSSimulator[Duration]):
    """Discrete event simulator with Duration as the time type, and a
    zero Duration in the display unit as the initial time."""

    def __init__(self, name: str, display_unit: str='s'):
        """
        Create the simulator.

        :param name: the name (id) of the simulator
        :param display_unit: the unit used to create the initial
            Duration; it is also stored on the simulator
        """
        start_time = Duration(0.0, display_unit)
        super().__init__(name=name, time_type=Duration,
                         initial_time=start_time)
        self._display_unit = display_unit