"""
the pubsub module contains a number of base classes for the publish/subscribe
pattern as used in pydsol. The package is based on the Java
implementation of DSOL (Distributed Simulation Object Library),
first documented in 2002. Information about the Java and Python DSOL
libraries can be found at https://simulation.tudelft.nl. Specific
information about the publish/subscribe implementation in Java can be
found in the djutils project (Delft Java Utilities) at https://djutils.org.
Typically, implemented listener classes inherit from the abstract base class
EventListener (possibly through mixin), and classes that produce events
inherit from the abstract base class EventProducer (possibly using multiple
inheritance).
The whole idea of publish/subscribe is that you decouple the producer of
information from its possible receiver(s). Without it, the link between
them would have to be hardcoded, making information exchange between
objects very inflexible.
With pub/sub, the producer will notify only subscribed objects of the
content in the event. The receivers can change dynamically over time, by
subscribing or unsubscribing for the receipt of certain EventTypes. We call
the process of producing an event by the publisher "firing" of an event
with the fire() method; receipt by the receiver is done in the notify()
method.
Note that Events are completely different from SimEvents. Events are used
in publish/subscribe; SimEvents contain information about code to be
executed at a later point in time.
"""
from abc import ABC, abstractmethod
import inspect
from typing import Type, Optional, Any, Union, Set, Dict
from pydsol.core.utils import get_module_logger
# Public API of this module: the names exported by a star-import.
__all__ = [
    "EventError",
    "EventType",
    "Event",
    "TimedEvent",
    "EventListener",
    "EventProducer",
]

# Module-level logger, obtained from the pydsol logging helper under the
# name 'pubsub'; the EventProducer uses it for debug output when firing.
logger = get_module_logger('pubsub')
class EventError(Exception):
    """
    Exception raised for any problem with Events or publish/subscribe.

    It is a plain subclass of ``Exception`` without extra state; the
    message passed to the constructor describes the problem.
    """
class EventType:
    """
    EventType is a strongly typed identifier for an event, which can contain
    additional information about the event itself such as metadata. The
    EventType is typically instantiated as a static variable in a class.

    The implementation of the `EventType` takes care that there will not be
    a problem with duplicate names for EventTypes. As part of the `EventType`,
    the name of the code block that called the constructor is stored as
    well (this is the class name when the `EventType` is instantiated in a
    class body). So, in the example below, the name "Producer.PRODUCTION"
    is stored internally as the unique name for the `EventType`.

    Example
    -------
    .. code-block:: python

       class Producer:
           PRODUCTION_EVENT = EventType("PRODUCTION")

    The event type can then be used anywhere in the code as
    `Producer.PRODUCTION_EVENT`. Note that an `Event` and a `SimEvent`
    are two completely different concepts: Events are for the
    publish/subscribe pattern, SimEvents are to store deferred method
    information.
    """

    # set of the defined types to check for name clashes
    # each item in the set has the form class_name.event_type_name
    __defined_types: Set[str] = set()

    def __init__(self, name: str,
                 metadata: Optional[Dict[str, Type]] = None):
        """
        Instantiate a new EventType, usually in a static manner.

        Example
        -------
        .. code-block:: python

           class Producer:
               PRODUCTION_EVENT = EventType("PRODUCTION")

        The event type can then be used anywhere in the code as
        `Producer.PRODUCTION_EVENT`.

        Parameters
        ----------
        name : str
            the human readable name of the event
        metadata : dict[str, Type], optional
            a dict with the metadata, defining the structure of the payload
            in the event, as pairs of the name to be used in the dict and
            the expected type of the data field. When metadata is None
            or undefined, the payload of the event can be anything. When
            metadata IS defined, the payload has to be a dict.

        Raises
        ------
        EventError
            when name is not a str
        EventError
            when the metadata is not a dict of [str, Type] pairs
        EventError
            when there was already an event defined with this name
            by the same defining class
        """
        if not isinstance(name, str):
            raise EventError(f"name {name} is not a str")
        # Validate the metadata BEFORE registering the name, so that a
        # rejected construction does not leave the name marked as taken.
        if metadata is not None:
            if not isinstance(metadata, dict):
                raise EventError(f"metadata {metadata} is not a dict")
            for field_name, field_type in metadata.items():
                if not isinstance(field_name, str):
                    raise EventError(f"metadata {metadata} key not a str")
                if not isinstance(field_type, type):
                    raise EventError(
                        f"metadata {metadata} value not a type")
        # Name of the code object that called this constructor; for an
        # EventType created in a class body this is the class name.
        self._defining_class: str = inspect.stack()[1][0].f_code.co_name
        self._name = name
        unique_key = f"{self._defining_class}.{self._name}"
        if unique_key in EventType.__defined_types:
            raise EventError(f"EventType {name} already defined")
        EventType.__defined_types.add(unique_key)
        self._metadata = metadata

    @property
    def name(self):
        """Return the human readable name of the event type."""
        return self._name

    @property
    def defining_class(self):
        """Return the name of the defining class.

        The defining class is the class in which the event type has been
        defined."""
        return self._defining_class

    @property
    def metadata(self) -> Optional[Dict[str, Type]]:
        """Return the metadata dict or None.

        The metadata dict contains the expected structure of the payload
        dict of the event in case it is defined. metadata can be None,
        in which case the payload does not have to be a dict, and can have
        any structure."""
        return self._metadata

    def __str__(self):
        return f"EventType[{self._defining_class}.{self._name}]"

    def __repr__(self):
        return (f"EventType[{self._defining_class}.{self._name} "
                f"metadata={self._metadata}]")
class Event:
    """
    An event is an object with a payload that is to be sent between
    a producer and zero or more listeners. In a sense, the Event is the
    "envelope" of the content.
    """

    def __init__(self, event_type: EventType, content, check: bool=True):
        """
        Instantiate an event with content. Events are strongly typed
        using a (usually static) instance of the EventType class, to
        distinguish different types of events from each other.

        Parameters
        ----------
        event_type : EventType
            a reference to a (usually static) event type for identification
        content
            the payload of the event, which can be of any type; common types
            are list, dict, or simple types
        check : bool, optional
            whether to check the fields in the content in case the
            event_type has metadata; the check whether the content is a
            dict is always carried out when there is metadata

        Raises
        ------
        EventError
            if event_type is not an EventType
        EventError
            if the event_type specified metadata and the content is not
            a dict
        EventError
            if the dict content is not consistent with the EventType metadata
        """
        if not isinstance(event_type, EventType):
            raise EventError("event_type is not an instance of EventType")
        self._event_type = event_type
        self._content = content
        metadata = event_type.metadata
        if metadata is None:
            # no structure prescribed: any payload is acceptable
            return
        if not isinstance(content, dict):
            raise EventError("event_type defined metadata but content "
                             "is not specified as a dict")
        if not check:
            return
        if len(metadata) != len(content):
            raise EventError("metadata length not consistent with "
                             "content length")
        for key, expected_type in metadata.items():
            # Test for presence of the key itself, so that a field whose
            # value is None is judged on its type and not reported missing.
            if key not in content:
                raise EventError(f"metadata required key '{key}' not "
                                 "found in content dict")
            value = content[key]
            if not isinstance(value, expected_type):
                raise EventError(
                    f"metadata required key '{key}' to "
                    f"be of type {expected_type.__name__} "
                    f"but instead got {value}")

    @property
    def event_type(self) -> EventType:
        """Returns the (usually static) event type for identification."""
        return self._event_type

    @property
    def content(self) -> Any:
        """Returns the payload of the event; can be of any type."""
        return self._content

    def _content_str(self) -> str:
        """Return str(content), or a placeholder if the content's own
        string conversion raises an exception."""
        try:
            return str(self._content)
        except Exception:
            return f"[cannot print {type(self._content).__name__}]"

    def __str__(self):
        return f"Event[{self._event_type.name}: {self._content_str()}]"

    def __repr__(self):
        return f"Event[{self._event_type.name}: {self._content_str()}]"
class TimedEvent(Event):
    """
    A TimedEvent is an event with a time identifier (int, float).

    The time identifier is not specified as an extra element in the payload,
    but rather as an extra attribute of the Event. The constructor explicitly
    demands the specification of a timestamp, typically the simulator time.
    Like an event, it is an object with a payload that is to be sent between
    a producer and zero or more listeners. In a sense, the TimedEvent is the
    timestamped "envelope" of the content.
    """

    def __init__(self, timestamp: Union[float, int], event_type: EventType,
                 content, check: bool=True):
        """
        Instantiate a timed event with content.

        TimedEvents are strongly typed using a (usually static) instance of
        the EventType class, to distinguish different types of events from
        each other. Furthermore, the timestamp indicates when the event was
        fired. Typically the value of the timestamp is the simulator time.

        Parameters
        ----------
        timestamp : int or float
            the timestamp of the event, typically the simulator time
        event_type : EventType
            a reference to a (usually static) event type for identification
        content
            the payload of the event, which can be of any type; common types
            are list, dict, or simple types
        check : bool, optional
            whether to check the fields in the content in case the
            event_type has metadata; the check whether the content is a
            dict is always carried out when there is metadata

        Raises
        ------
        EventError
            if timestamp is not of type int or float
        EventError
            if event_type is not an EventType
        EventError
            if the EventType specified metadata and the content is not a dict
        EventError
            if the dict content is not consistent with the EventType metadata
        """
        if not isinstance(timestamp, (int, float)):
            raise EventError("timestamp is not an int or a float")
        self._timestamp = timestamp
        # the remaining validation is done by the Event constructor
        super().__init__(event_type, content, check)

    @property
    def timestamp(self) -> Union[int, float]:
        """Returns the timestamp of the event; typically the simulator time."""
        return self._timestamp

    def _describe(self) -> str:
        """Build the shared text for __str__ and __repr__; a placeholder is
        used if the content's own string conversion raises an exception."""
        try:
            payload = str(self._content)
        except Exception:
            payload = f"[cannot print {type(self._content).__name__}]"
        return (f"TimedEvent[t={self.timestamp}, "
                f"{self._event_type.name}: {payload}]")

    def __str__(self):
        return self._describe()

    def __repr__(self):
        return self._describe()
class EventListener(ABC):
    """
    Abstract base class for objects that subscribe to events.

    A class that wants to receive events from one or more EventProducers
    inherits from EventListener and implements the notify() method.
    Inside notify(), the EventType of the received event can be tested
    with if-elif statements, after which the content of the event can be
    acted upon. The EventProducer calls notify(event) for each subscribed
    listener when it fires an event.
    """

    @abstractmethod
    def notify(self, event: Event):
        """
        Handle an event received from an EventProducer.

        Parameters
        ----------
        event : Event
            the event that was fired by the producer
        """
class EventProducer:
    """
    EventProducer is a base class defining the producer behavior.

    The EventProducer class acts as a superclass for classes that need
    to fire events to an unknown and possibly varying number of subscribers
    (also called listeners). The main methods that can be called on the
    EventProducer are: add_listener and remove_listener. In addition, the
    logic of the class that extends the base EventProducer class calls the
    fire(event_type, content) method to notify the listener(s) (if any).

    The most important private attribute of the EventProducer is
    ``_listeners: dict[EventType, list[EventListener]]``. This structure
    maps the EventType to a list of listeners for that EventType.
    Note that this is a list to make behavior reproducible: we want
    events to subscribers to be fired in the same order when replicating
    the model run. The dictionary is ordered (unsorted) in Python 3.7+,
    and the list is reproducible. A ``dict[EventType, set[EventListener]]``
    would not be reproducible, since the set is unordered.
    """

    def __init__(self):
        """Instantiate the EventProducer, and initialize the empty
        listener data structure."""
        self._listeners: dict[EventType, list[EventListener]] = {}

    def add_listener(self, event_type: EventType, listener: EventListener):
        """
        Add an EventListener to this EventProducer for a given EventType.
        If the listener already is registered for this EventType, this will
        be ignored.

        Parameters
        ----------
        event_type : EventType
            the EventType for which this listener subscribes
        listener : EventListener
            the subscriber to register for the provided EventType

        Raises
        ------
        EventError
            if any of the arguments is of the wrong type
        """
        if not isinstance(event_type, EventType):
            raise EventError("event_type should be an EventType")
        if not isinstance(listener, EventListener):
            raise EventError("listener should be an EventListener")
        subscribers = self._listeners.setdefault(event_type, [])
        if listener not in subscribers:
            subscribers.append(listener)

    def remove_listener(self, event_type: EventType, listener: EventListener):
        """
        Remove an EventListener of this EventProducer for a given EventType.
        If the listener is not registered for this EventType, this will
        be ignored.

        Parameters
        ----------
        event_type : EventType
            the EventType for which this listener unsubscribes
        listener : EventListener
            the subscriber to remove for the provided EventType

        Raises
        ------
        EventError
            if any of the arguments is of the wrong type
        """
        if not isinstance(event_type, EventType):
            raise EventError("event_type should be an EventType")
        if not isinstance(listener, EventListener):
            raise EventError("listener should be an EventListener")
        subscribers = self._listeners.get(event_type)
        if subscribers is None:
            return
        if listener in subscribers:
            subscribers.remove(listener)
        # drop the entry once nobody listens to this event type anymore,
        # so has_listeners() keeps reflecting actual subscriptions
        if not subscribers:
            del self._listeners[event_type]

    def remove_all_listeners(self, event_type: EventType=None,
                             listener: EventListener=None):
        """
        Remove an EventListener (if given) for a provided EventType (if given)
        for this EventProducer. It is no problem if there are no matches.
        There are four situations:

        event_type is None and listener is None
            all listeners for all event types are removed
        event_type is None and listener is specified
            the listener is removed for any event for which it was registered
        event_type is specified and listener is None
            all listeners are removed for the given event_type
        event_type and listener are both specified
            the listener for the given event type is removed, if it was
            registered; in essence this is the same as remove_listener

        Parameters
        ----------
        event_type : EventType, optional
            the EventType for which this listener unsubscribes
        listener : EventListener, optional
            the subscriber to remove for the provided EventType

        Raises
        ------
        EventError
            if any of the arguments is of the wrong type
        """
        if not (event_type is None or isinstance(event_type, EventType)):
            raise EventError("event_type should be an EventType or None")
        if not (listener is None or isinstance(listener, EventListener)):
            raise EventError("listener should be an EventListener or None")
        if event_type is None:
            if listener is None:
                self._listeners.clear()
            else:
                # iterate over a snapshot of the keys: remove_listener may
                # delete entries from the dict while we loop
                for registered_type in list(self._listeners):
                    self.remove_listener(registered_type, listener)
        elif listener is None:
            self._listeners.pop(event_type, None)
        else:
            self.remove_listener(event_type, listener)

    def has_listeners(self) -> bool:
        """Indicate whether this producer has any listeners or not."""
        return len(self._listeners) > 0

    def _dispatch(self, event: Event):
        """Log the event and pass it to every listener that is subscribed
        to the EventType of the event."""
        subscribers = self._listeners.get(event.event_type)
        logger.debug("fire %s to %s", event, subscribers)
        if not subscribers:
            return
        # a copy is used to avoid concurrent modification error in case
        # the notification would unsubscribe a listener to this event (!)
        for listener in list(subscribers):
            listener.notify(event)

    def fire_event(self, event: Event):
        """
        Fire this event to the subscribed listeners for the EventType of
        the event.

        Parameters
        ----------
        event : Event
            the event to fire to the subscribed listeners

        Raises
        ------
        EventError
            if the event is not of the right type
        """
        if not isinstance(event, Event):
            raise EventError(f"event {event} not of type Event")
        self._dispatch(event)

    def fire(self, event_type: EventType, content, check: bool=True):
        """
        Construct an event based on the arguments and fire this event
        to the subscribed listeners for the event_type.

        Parameters
        ----------
        event_type : EventType
            a reference to a (usually static) event type for identification
        content
            the payload of the event, which can be of any type; common types
            are list, dict, or simple types
        check : bool, optional
            whether to check the fields in the content in case the
            event_type has metadata; the check whether the content is a
            dict is always carried out when there is metadata

        Raises
        ------
        EventError
            if event_type is not an EventType
        EventError
            if the EventType specified metadata and the content is not a dict
        EventError
            if the dict content is not consistent with the EventType metadata
        """
        self.fire_event(Event(event_type, content, check))

    def fire_timed_event(self, timed_event: TimedEvent):
        """
        Fire this timed_event to the subscribed listeners for the EventType
        of the event.

        Parameters
        ----------
        timed_event : TimedEvent
            the timed_event to fire to the subscribed listeners

        Raises
        ------
        EventError
            if the timed_event is not of the right type
        """
        if not isinstance(timed_event, TimedEvent):
            raise EventError(f"event {timed_event} not of type TimedEvent")
        self._dispatch(timed_event)

    def fire_timed(self, time: Union[int, float], event_type: EventType,
                   content, check: bool=True):
        """
        Construct a timed event based on the arguments and fire this event
        to the subscribed listeners for the event_type.

        Parameters
        ----------
        time : int or float
            the timestamp of the event, typically the simulator time
        event_type : EventType
            a reference to a (usually static) event type for identification
        content
            the payload of the event, which can be of any type; common types
            are list, dict, or simple types
        check : bool, optional
            whether to check the fields in the content in case the
            event_type has metadata; the check whether the content is a
            dict is always carried out when there is metadata

        Raises
        ------
        EventError
            if time is not of type int or float
        EventError
            if event_type is not an EventType
        EventError
            if the EventType specified metadata and the content is not a dict
        EventError
            if the dict content is not consistent with the EventType metadata
        """
        self.fire_timed_event(TimedEvent(time, event_type, content, check))