Linear Temporal Logic (LTL) Safety Constraint¶
Overview¶
LTL-style safety constraints via DFA monitoring and product constructions.
This module supports safety constraints expressed using a deterministic finite automaton (DFA) derived from an LTL (or LTL-like) specification.
High-level idea¶
Given a labelled MDP and a DFA over the same atomic propositions, we can build a product MDP whose states track both the base state and the current automaton state. If the DFA has an “unsafe” accepting set \(F\), then safety can be monitored by checking whether the automaton enters \(F\).
Let:
- the base MDP have \(n\) states and \(m\) actions,
- the DFA have \(k\) automaton states.
Then the product state space has size \(n \cdot k\).
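The size claim can be checked with a small sketch of the index encoding \(q_{\text{idx}} \cdot n + s\) described later in this page. The helper names `encode` and `decode` are hypothetical and are not part of the module.

```python
from typing import Tuple


def encode(q_idx: int, s: int, n_states: int) -> int:
    """Map (automaton index, base state) to a single product index."""
    return q_idx * n_states + s


def decode(prod: int, n_states: int) -> Tuple[int, int]:
    """Inverse of encode: recover (q_idx, s) from a product index."""
    return divmod(prod, n_states)


n_states, n_aut = 4, 3  # n = 4 base states, k = 3 automaton states
indices = [encode(q, s, n_states) for q in range(n_aut) for s in range(n_states)]

# Every (q, s) pair gets a distinct index in range(n * k).
assert sorted(indices) == list(range(n_states * n_aut))
```

Because the encoding is a bijection onto `range(n * k)`, tabular algorithms can treat the product as an ordinary finite MDP.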
Product transitions¶
If the base transition kernel is \(P(s'\mid s,a)\) and the DFA transition function is \(\delta(q, L(s))\), then the product transition is:
\[P_\otimes((s', q') \mid (s,q), a) = P(s'\mid s,a) \cdot \mathbf{1}\{ q' = \delta(q, L(s)) \}.\]
The helper functions in this module create either a dense transition tensor or a sparse successor representation for the product.
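As a rough illustration of the dense case, the sketch below builds a product tensor with NumPy for a toy two-state MDP. It assumes the `(n_states, n_states, n_actions)` layout and the \(q_{\text{idx}} \cdot n + s\) encoding documented on this page. The function `product_tensor`, the callable `delta`, and the `"hazard"` label are hypothetical stand-ins for the module's DFA type, not its actual API.

```python
import numpy as np


def product_tensor(P, n_aut, delta, label_fn):
    """Dense product of a base tensor P[s, s_next, a] with a DFA.

    delta(q, labels) -> q_next is a hypothetical DFA step function.
    """
    n_states, _, n_actions = P.shape
    size = n_states * n_aut
    P_prod = np.zeros((size, size, n_actions))
    for q in range(n_aut):
        for s in range(n_states):
            # The automaton reads the labels of the *current* base state.
            q_next = delta(q, label_fn(s))
            row = q * n_states + s
            cols = slice(q_next * n_states, (q_next + 1) * n_states)
            P_prod[row, cols, :] = P[s, :, :]
    return P_prod


# Toy base MDP: 2 states, 2 actions; state 1 carries the label "hazard".
P = np.zeros((2, 2, 2))
P[0, :, 0] = [0.9, 0.1]
P[0, :, 1] = [0.5, 0.5]
P[1, :, 0] = [0.0, 1.0]
P[1, :, 1] = [1.0, 0.0]


def label_fn(s):
    return {"hazard"} if s == 1 else set()


def delta(q, labels):
    # Automaton state 1 is an absorbing "unsafe" state (an assumption of this toy).
    return 1 if q == 1 or "hazard" in labels else 0


P_prod = product_tensor(P, n_aut=2, delta=delta, label_fn=label_fn)
```

Each row of the result remains a probability distribution, since the automaton move is deterministic and only relocates the base probabilities into the block for `q_next`.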
Safety cost¶
The monitor uses a cost function derived from the DFA. A common convention is:
\[c_t \in \{0, 1\}, \quad c_t = 1 \iff \text{the automaton enters the unsafe set } F,\]
and the episode is safe iff the total number of unsafe visits is zero.
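The convention can be sketched as a tiny stateful monitor. The class `ToyMonitor`, the step function `delta`, and the `"hazard"` label are hypothetical; the module's own cost function is built by masa.common.ltl.dfa_to_costfn(), whose internals are not shown on this page.

```python
from typing import Callable, Iterable, Set


class ToyMonitor:
    """Hypothetical stand-in for a DFA-based safety monitor."""

    def __init__(self, delta: Callable[[int, Set[str]], int], q0: int, unsafe: Set[int]):
        self.delta, self.q0, self.unsafe = delta, q0, unsafe
        self.reset()

    def reset(self) -> None:
        self.q = self.q0
        self.total_unsafe = 0.0

    def update(self, labels: Iterable[str]) -> float:
        """Advance the automaton and return the binary step cost."""
        self.q = self.delta(self.q, set(labels))
        cost = 1.0 if self.q in self.unsafe else 0.0
        self.total_unsafe += cost
        return cost


def delta(q, labels):
    # Assumed toy DFA: state 1 is unsafe and absorbing.
    return 1 if q == 1 or "hazard" in labels else 0


monitor = ToyMonitor(delta, q0=0, unsafe={1})
costs = [monitor.update(labels) for labels in [set(), {"hazard"}, set()]]
satisfied = monitor.total_unsafe == 0  # episode is safe iff no unsafe visit
```

In this toy trace the hazard at the second step drives the automaton into its absorbing unsafe state, so the third step is also counted as unsafe.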
Notes¶
The concrete cost construction is delegated to masa.common.ltl.dfa_to_costfn().
LTLSafetyEnv also augments observations to include the automaton state, enabling product-state learning in model-free settings.
API Reference¶
- class masa.common.constraints.ltl_safety.LTLSafety(dfa: DFA)[source]¶
Bases: Constraint
DFA-based safety monitor.
This monitor uses masa.common.ltl.dfa_to_costfn() to obtain a stateful cost function that:
- tracks the current DFA state,
- returns a scalar cost indicating safety violation.
A common convention is binary step cost:
\[c_t \in \{0, 1\}, \quad c_t = 1 \iff \text{DFA enters/indicates an unsafe accepting condition}.\]
The episode is considered satisfied iff no unsafe event occurs:
\[\text{satisfied} \iff \sum_t \mathbf{1}[c_t \ge 0.5] = 0.\]
- Parameters:
dfa – DFA describing the safety property.
- Variables:
cost_fn – Stateful cost object derived from the DFA (exposes DFA state).
safe – Boolean flag tracking whether any violation has occurred.
step_cost – Most recent step cost.
total_unsafe – Count of unsafe steps (as floats, per current code).
- update(labels: Iterable[str])[source]¶
Update the DFA-cost state and safety flags.
- Parameters:
labels – Iterable of atomic propositions true at the current step.
- episode_metric() -> Dict[str, float][source]¶
End-of-episode metrics.
- Returns:
Dict containing "cum_unsafe": count of unsafe steps, and "satisfied": 1.0 if safe else 0.0.
- Return type:
Dict[str, float]
- step_metric() -> Dict[str, float][source]¶
Per-step metrics.
- Returns:
Dict containing "cost": current step cost, and "violation": 1.0 if cost >= 0.5 else 0.0.
- Return type:
Dict[str, float]
- property constraint_type: str¶
Stable identifier string: "ltl_safety".
- class masa.common.constraints.ltl_safety.LTLSafetyEnv(env: Env, dfa: DFA = make_dummy_dfa(), **kw)[source]¶
Bases: BaseConstraintEnv
Gymnasium wrapper that monitors LTL safety and augments observations.
This wrapper attaches LTLSafety to the environment and augments the observation space to include the current DFA state, enabling model-free learning over the product.
Augmentation behavior depends on the original observation space:
- gymnasium.spaces.Discrete: the observation becomes a single discrete index encoding both base state and automaton state.
\[\text{obs}_\otimes = q_{\text{idx}} \cdot n + s.\]
- gymnasium.spaces.Box (1-D only): appends a one-hot encoding of the automaton state to the vector.
- gymnasium.spaces.Dict: adds a new key "automaton" containing a one-hot vector.
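The Box and Dict cases can be pictured with plain NumPy. This is only a sketch of the behavior described above; `one_hot`, `augment_box`, and `augment_dict` are hypothetical helpers, and the wrapper's actual dtype handling and space bounds may differ.

```python
import numpy as np


def one_hot(q_idx: int, n_aut: int) -> np.ndarray:
    """One-hot vector for the current automaton state."""
    v = np.zeros(n_aut, dtype=np.float32)
    v[q_idx] = 1.0
    return v


def augment_box(obs, q_idx: int, n_aut: int) -> np.ndarray:
    """Append the automaton one-hot to a 1-D Box-style observation."""
    obs = np.asarray(obs, dtype=np.float32)
    if obs.ndim != 1:
        raise TypeError("only 1-D Box observations are supported")
    return np.concatenate([obs, one_hot(q_idx, n_aut)])


def augment_dict(obs: dict, q_idx: int, n_aut: int) -> dict:
    """Add an "automaton" key to a Dict-style observation (copy, not in place)."""
    return {**obs, "automaton": one_hot(q_idx, n_aut)}


box_obs = augment_box([0.5, -1.0], q_idx=2, n_aut=3)
dict_obs = augment_dict({"pos": np.array([0.5])}, q_idx=0, n_aut=3)
```

A one-hot encoding keeps automaton states equidistant from each other, which is usually preferable to feeding a raw integer index to a function approximator.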
The wrapper also writes info["automaton_state"] on every step and reset.
- Parameters:
env – Base environment (must be a LabelledEnv).
dfa – DFA for safety monitoring. Defaults to a dummy DFA.
**kw – Extra keyword arguments forwarded to BaseConstraintEnv.
- Raises:
ValueError – If the DFA reports a non-positive number of automaton states.
TypeError – If augmenting an unsupported observation space type.
TypeError – If augmenting a Box observation that is not 1-D.
Initialize the wrapper.
- Parameters:
env – Base environment. Must already be wrapped as a LabelledEnv so that step/reset provide label sets in info["labels"].
constraint – A constraint monitor implementing Constraint.
**kw – Unused extra keyword arguments (kept for wrapper compatibility).
- Raises:
TypeError – If env is not a LabelledEnv.
- reset(*, seed: int | None = None, options: Dict[str, Any] | None = None)[source]¶
Reset environment and constraint state.
This calls env.reset(...) and then resets and updates the constraint using the initial label set in info["labels"].
- Parameters:
seed – Optional RNG seed forwarded to the base environment.
options – Optional reset options forwarded to the base environment.
- Returns:
A tuple (obs, info) following the Gymnasium API.
- Raises:
ValueError – If info["labels"] is present but not a set/frozenset.
- step(action)[source]¶
Step environment and update constraint from labels.
- Parameters:
action – Action to pass to the underlying environment.
- Returns:
A 5-tuple (obs, reward, terminated, truncated, info) following the Gymnasium API.
- Raises:
ValueError – If info["labels"] is present but not a set/frozenset.
Helpers¶
- ltl_safety.create_product_transition_matrix(n_actions: int, transition_matrix: ndarray, dfa: DFA, label_fn: Callable[[Any], Iterable[str]]) -> ndarray¶
Create the dense product transition tensor for (MDP × DFA).
Given a dense base transition tensor of shape (n_states, n_states, n_actions), this constructs the corresponding dense product transition tensor of shape (n_states * n_aut, n_states * n_aut, n_actions).
The DFA transition is computed from the labels of the current base state s (i.e., it applies \(q' = \delta(q, L(s))\)), which matches the product formulation used in the code:
\[P_\otimes((s', q') \mid (s,q), a) = P(s'\mid s,a) \cdot \mathbf{1}\{ q' = \delta(q, L(s)) \}.\]
- Parameters:
n_states – Number of base MDP states.
n_actions – Number of actions.
transition_matrix – Dense base transition tensor with shape (n_states, n_states, n_actions), where transition_matrix[s, s_next, a] = P(s_next | s, a).
dfa – Deterministic finite automaton.
label_fn – Labelling function L(s) -> set[str].
- Returns:
Dense product transition tensor with shape (n_states * n_aut, n_states * n_aut, n_actions).
- Raises:
AssertionError – If the provided transition matrix does not have the expected shape.
- ltl_safety.create_product_successor_states_and_probabilities(n_actions: int, successor_states: Dict[State, List[State]], probabilities: Dict[Tuple[State, Action], np.ndarray], dfa: DFA, label_fn: LabelFn) -> Tuple[Dict[ProdState, List[ProdState]], Dict[Tuple[ProdState, Action], np.ndarray]]¶
Create a sparse product successor representation (MDP × DFA).
This constructs:
- prod_successor_states: mapping prod_state -> list[prod_state_next]
- prod_probabilities: mapping (prod_state, action) -> probs
where probability vectors are copied from the base representation and the automaton transition is determined by the current base state labels.
Product state indexing¶
The code uses the encoding:
\[(q_{\text{idx}}, s) \mapsto \text{prod} = q_{\text{idx}} \cdot n_{\text{states}} + s.\]
- Parameters:
n_states – Number of base MDP states.
n_actions – Number of actions.
successor_states – Mapping s -> [s_1, s_2, ...] listing successors of s.
probabilities – Mapping (s, a) -> p where p is a 1-D array aligned with successor_states[s] and sums to 1.
dfa – Deterministic finite automaton.
label_fn – Labelling function L(s) -> set[str].
- Returns:
A tuple (prod_successor_states, prod_probabilities) representing the product dynamics.
- Raises:
AssertionError – If n_states is inconsistent with the keys in successor_states.
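A minimal sketch of the sparse construction, assuming the index encoding above. Plain lists stand in for the 1-D probability arrays, and `product_successors`, `delta`, and the `"hazard"` label are hypothetical; the real function takes a DFA object rather than a step callable.

```python
def product_successors(n_states, n_actions, successor_states, probabilities,
                       n_aut, delta, label_fn):
    """Sparse product: successors are re-indexed, probabilities are copied."""
    prod_succ, prod_probs = {}, {}
    for q in range(n_aut):
        for s in range(n_states):
            # Automaton move is fixed by the labels of the current base state.
            q_next = delta(q, label_fn(s))
            prod = q * n_states + s
            prod_succ[prod] = [q_next * n_states + s_next
                               for s_next in successor_states[s]]
            for a in range(n_actions):
                prod_probs[(prod, a)] = list(probabilities[(s, a)])
    return prod_succ, prod_probs


# Toy base MDP: 2 states, 1 action; state 1 carries the label "hazard".
successor_states = {0: [0, 1], 1: [1]}
probabilities = {(0, 0): [0.9, 0.1], (1, 0): [1.0]}


def label_fn(s):
    return {"hazard"} if s == 1 else set()


def delta(q, labels):
    # Assumed toy DFA: state 1 is accepting (unsafe) and absorbing.
    return 1 if q == 1 or "hazard" in labels else 0


prod_succ, prod_probs = product_successors(
    2, 1, successor_states, probabilities, n_aut=2, delta=delta, label_fn=label_fn
)
```

Since the DFA is deterministic, each product state has exactly as many successors as its base state, so the probability vectors carry over unchanged.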
- ltl_safety.create_product_safe_end_component(n_actions: int, sec: List[int], dfa: DFA, label_fn: Callable[[Any], Iterable[str]]) -> List[int]¶
Lift a base safe end component (SEC) into the product while avoiding accepting DFA states.
Given a base SEC (a subset of base states), this function returns the set of product states that correspond to those base states and do not transition into an accepting DFA state when reading the base labels.
- Parameters:
n_states – Number of base MDP states.
n_actions – Number of actions (unused here but kept for signature consistency).
sec – List of base states in the safe end component.
dfa – DFA whose accepting set corresponds to unsafe/terminal property violation.
label_fn – Labelling function L(s) -> set[str].
- Returns:
List of product-state indices (ints) that form the lifted SEC in the product.
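One way to read the lifting rule is sketched below: keep a product state \((q, s)\) with \(s\) in the SEC only if \(\delta(q, L(s))\) is not accepting. The function `lift_sec`, the callable `delta`, and the explicit `accepting` set are hypothetical, and iterating over every automaton state is an assumption about the real implementation.

```python
def lift_sec(n_states, sec, n_aut, delta, accepting, label_fn):
    """Return product indices (q * n_states + s) that stay out of `accepting`."""
    lifted = []
    for q in range(n_aut):
        for s in sec:
            if delta(q, label_fn(s)) not in accepting:
                lifted.append(q * n_states + s)
    return lifted


def label_fn(s):
    # Toy labelling: base state 1 is the only hazardous state.
    return {"hazard"} if s == 1 else set()


def delta(q, labels):
    # Assumed toy DFA: state 1 is accepting (unsafe) and absorbing.
    return 1 if q == 1 or "hazard" in labels else 0


# Base SEC {0, 2} in a 3-state MDP, lifted against a 2-state DFA.
lifted = lift_sec(3, sec=[0, 2], n_aut=2, delta=delta, accepting={1}, label_fn=label_fn)
```

In this toy case every product state whose automaton component is already the absorbing accepting state is dropped, leaving only the copies of the SEC paired with the initial automaton state.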
- ltl_safety.create_product_label_fn(dfa: DFA) -> Callable[[int], Set[str]]¶
Create a label function on product states indicating DFA acceptance.
The returned labelling function maps a product-state index to {"accepting"} if the embedded DFA state is accepting, and to the empty set otherwise.
- Parameters:
n_states – Number of base states used in product encoding.
dfa – DFA defining which automaton indices are accepting.
- Returns:
A callable L_prod(prod_state) -> set[str] suitable for cost functions such as:
cost = 1.0 if "accepting" in labels else 0.0
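The behavior can be sketched as follows, assuming the \(q_{\text{idx}} \cdot n_{\text{states}} + s\) encoding so that the automaton index is recovered by integer division. `make_product_label_fn` and the explicit `accepting` set are hypothetical stand-ins for the DFA object.

```python
from typing import Callable, Set


def make_product_label_fn(n_states: int, accepting: Set[int]) -> Callable[[int], Set[str]]:
    """Build L_prod for a product encoded as q_idx * n_states + s."""

    def label_fn(prod_state: int) -> Set[str]:
        q_idx = prod_state // n_states  # recover the embedded automaton index
        return {"accepting"} if q_idx in accepting else set()

    return label_fn


L_prod = make_product_label_fn(n_states=4, accepting={2})

# Product state 9 decodes to (q_idx=2, s=1); product state 3 to (q_idx=0, s=3).
labels = L_prod(9)
cost = 1.0 if "accepting" in labels else 0.0
```

The resulting cost depends only on the automaton component, so it matches the binary step cost described for the monitor.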