mdp

This module contains the Markov decision process classes (MDP, GridMDP) as well as the value iteration, Q-learning, and policy gradient algorithms.
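
The examples on this page assume that the module is importable under the package path indicated by the source references below (a standard src layout is assumed):

from behavior_generation_lecture_python.mdp.mdp import (
    MDP,
    GridMDP,
    value_iteration,
    derive_policy,
    q_learning,
    policy_gradient,
)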

GridMDP

Bases: MDP

A Markov decision process on a grid.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
class GridMDP(MDP):
    """A Markov decision process on a grid."""

    def __init__(
        self,
        grid: List[List[Union[float, None]]],
        initial_state: GridState,
        terminal_states: Set[GridState],
        transition_probabilities_per_action: Dict[
            GridState, List[Tuple[float, GridState]]
        ],
        restrict_actions_to_available_states: Optional[bool] = False,
    ) -> None:
        """A Markov decision process on a grid.

        Args:
            grid: List of lists, containing the rewards of the grid
                states or None.
            initial_state: Initial state in the grid.
            terminal_states: Set of terminal states in the grid.
            transition_probabilities_per_action: Dictionary of
                transition probabilities per action, mapping from action
                to list of tuples (probability, next state).
            restrict_actions_to_available_states: Whether to restrict
                actions to those that result in valid next states.
        """
        states = set()
        reward = {}
        grid = grid.copy()
        grid.reverse()  # y-axis pointing upwards
        rows = len(grid)
        cols = len(grid[0])
        self.grid = grid
        for x in range(cols):
            for y in range(rows):
                if grid[y][x] is not None:
                    states.add((x, y))
                    reward_xy = grid[y][x]
                    assert reward_xy is not None
                    reward[(x, y)] = reward_xy

        transition_probabilities = {}
        for state in states:
            for action in transition_probabilities_per_action.keys():
                transition_probability_list = self._generate_transition_probability_list(
                    state=state,
                    action=action,
                    restrict_actions_to_available_states=restrict_actions_to_available_states,
                    states=states,
                    transition_probabilities_per_action=transition_probabilities_per_action,
                    next_state_fn=self._next_state_deterministic,
                )
                if transition_probability_list:
                    transition_probabilities[(state, action)] = (
                        transition_probability_list
                    )

        super().__init__(
            states=states,
            actions=set(transition_probabilities_per_action.keys()),
            initial_state=initial_state,
            terminal_states=terminal_states,
            transition_probabilities=transition_probabilities,
            reward=reward,
        )

    @staticmethod
    def _generate_transition_probability_list(
        state,
        action,
        restrict_actions_to_available_states,
        states,
        transition_probabilities_per_action,
        next_state_fn,
    ):
        """Generate the transition probability list of the grid."""
        transition_probability_list = []
        none_in_next_states = False
        for (
            probability,
            deterministic_action,
        ) in transition_probabilities_per_action[action]:
            next_state = next_state_fn(
                state,
                deterministic_action,
                states,
                output_none_if_non_existing_state=restrict_actions_to_available_states,
            )
            if next_state is None:
                none_in_next_states = True
                break
            transition_probability_list.append((probability, next_state))

        if not none_in_next_states:
            return transition_probability_list

        return []

    @staticmethod
    def _next_state_deterministic(
        state, action, states, output_none_if_non_existing_state=False
    ):
        """Output the next state given the action in a deterministic setting.
        Output None if next state not existing in case output_none_if_non_existing_state is True.
        """
        next_state_candidate = tuple(np.array(state) + np.array(action))
        if next_state_candidate in states:
            return next_state_candidate
        if output_none_if_non_existing_state:
            return None
        return state

__init__(grid, initial_state, terminal_states, transition_probabilities_per_action, restrict_actions_to_available_states=False)

A Markov decision process on a grid.

Parameters:

grid (List[List[Union[float, None]]], required): List of lists, containing the rewards of the grid states or None.
initial_state (GridState, required): Initial state in the grid.
terminal_states (Set[GridState], required): Set of terminal states in the grid.
transition_probabilities_per_action (Dict[GridState, List[Tuple[float, GridState]]], required): Dictionary of transition probabilities per action, mapping from action to list of tuples (probability, next state).
restrict_actions_to_available_states (Optional[bool], default: False): Whether to restrict actions to those that result in valid next states.
Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def __init__(
    self,
    grid: List[List[Union[float, None]]],
    initial_state: GridState,
    terminal_states: Set[GridState],
    transition_probabilities_per_action: Dict[
        GridState, List[Tuple[float, GridState]]
    ],
    restrict_actions_to_available_states: Optional[bool] = False,
) -> None:
    """A Markov decision process on a grid.

    Args:
        grid: List of lists, containing the rewards of the grid
            states or None.
        initial_state: Initial state in the grid.
        terminal_states: Set of terminal states in the grid.
        transition_probabilities_per_action: Dictionary of
            transition probabilities per action, mapping from action
            to list of tuples (probability, next state).
        restrict_actions_to_available_states: Whether to restrict
            actions to those that result in valid next states.
    """
    states = set()
    reward = {}
    grid = grid.copy()
    grid.reverse()  # y-axis pointing upwards
    rows = len(grid)
    cols = len(grid[0])
    self.grid = grid
    for x in range(cols):
        for y in range(rows):
            if grid[y][x] is not None:
                states.add((x, y))
                reward_xy = grid[y][x]
                assert reward_xy is not None
                reward[(x, y)] = reward_xy

    transition_probabilities = {}
    for state in states:
        for action in transition_probabilities_per_action.keys():
            transition_probability_list = self._generate_transition_probability_list(
                state=state,
                action=action,
                restrict_actions_to_available_states=restrict_actions_to_available_states,
                states=states,
                transition_probabilities_per_action=transition_probabilities_per_action,
                next_state_fn=self._next_state_deterministic,
            )
            if transition_probability_list:
                transition_probabilities[(state, action)] = (
                    transition_probability_list
                )

    super().__init__(
        states=states,
        actions=set(transition_probabilities_per_action.keys()),
        initial_state=initial_state,
        terminal_states=terminal_states,
        transition_probabilities=transition_probabilities,
        reward=reward,
    )
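
A minimal usage sketch: the grid is given row by row from top to bottom, None marks an inaccessible cell, and each action maps to a list of (probability, deterministic move) tuples, where moves are (dx, dy) offsets. The 4x3 layout and the 0.8/0.1/0.1 transition noise below are illustrative assumptions, not values prescribed by this module.

from behavior_generation_lecture_python.mdp.mdp import GridMDP  # import path assumed from the source reference above

# Actions are (dx, dy) offsets; each intended move succeeds with probability
# 0.8 and slips to one of the two perpendicular directions with 0.1 each.
UP, DOWN, LEFT, RIGHT = (0, 1), (0, -1), (-1, 0), (1, 0)

grid_mdp = GridMDP(
    grid=[
        [-0.04, -0.04, -0.04, +1.0],
        [-0.04, None, -0.04, -1.0],
        [-0.04, -0.04, -0.04, -0.04],
    ],
    initial_state=(0, 0),
    terminal_states={(3, 2), (3, 1)},
    transition_probabilities_per_action={
        UP: [(0.8, UP), (0.1, LEFT), (0.1, RIGHT)],
        DOWN: [(0.8, DOWN), (0.1, LEFT), (0.1, RIGHT)],
        LEFT: [(0.8, LEFT), (0.1, UP), (0.1, DOWN)],
        RIGHT: [(0.8, RIGHT), (0.1, UP), (0.1, DOWN)],
    },
)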

MDP

A Markov decision process.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
class MDP:
    """A Markov decision process."""

    def __init__(
        self,
        states: Set[Any],
        actions: Set[Any],
        initial_state: Any,
        terminal_states: Set[Any],
        transition_probabilities: Dict[Tuple[Any, Any], List[Tuple[float, Any]]],
        reward: Dict[Any, float],
    ) -> None:
        """A Markov decision process.

        Args:
            states: Set of states.
            actions: Set of actions.
            initial_state: Initial state.
            terminal_states: Set of terminal states.
            transition_probabilities: Dictionary of transition
                probabilities, mapping from tuple (state, action) to
                list of tuples (probability, next state).
            reward: Dictionary of rewards per state, mapping from state
                to reward.
        """
        self.states = states

        self.actions = actions

        assert initial_state in self.states
        self.initial_state = initial_state

        for terminal_state in terminal_states:
            assert (
                terminal_state in self.states
            ), f"The terminal state {terminal_state} is not in states {states}"
        self.terminal_states = terminal_states

        for state in self.states:
            for action in self.actions:
                if (state, action) not in transition_probabilities:
                    continue
                total_prob = 0.0
                for prob, next_state in transition_probabilities[(state, action)]:
                    assert (
                        next_state in self.states
                    ), f"next_state={next_state} is not in states={states}"
                    total_prob += prob
                assert math.isclose(total_prob, 1), "Probabilities must add to one"
        self.transition_probabilities = transition_probabilities

        assert set(reward.keys()) == set(
            self.states
        ), "Rewards must be defined for every state in the set of states"
        for state in self.states:
            assert reward[state] is not None
        self.reward = reward

    def get_states(self) -> Set[Any]:
        """Get the set of states."""
        return self.states

    def get_actions(self, state) -> Set[Any]:
        """Get the set of actions available in a certain state, returns [None] for terminal states."""
        if self.is_terminal(state):
            return {None}
        return {a for a in self.actions if (state, a) in self.transition_probabilities}

    def get_reward(self, state) -> float:
        """Get the reward for a specific state."""
        return self.reward[state]

    def is_terminal(self, state) -> bool:
        """Return whether a state is a terminal state."""
        return state in self.terminal_states

    def get_transitions_with_probabilities(
        self, state, action
    ) -> List[Tuple[float, Any]]:
        """Get the list of transitions with their probability, returns [(0.0, state)] for terminal states."""
        if action is None or self.is_terminal(state):
            return [(0.0, state)]
        return self.transition_probabilities[(state, action)]

    def sample_next_state(self, state, action) -> Any:
        """Randomly sample the next state given the current state and taken action."""
        if self.is_terminal(state):
            raise ValueError("No next state for terminal states.")
        if action is None:
            raise ValueError("Action must not be None.")
        prob_per_transition = self.get_transitions_with_probabilities(state, action)
        num_actions = len(prob_per_transition)
        choice = np.random.choice(
            num_actions, p=[ppa[0] for ppa in prob_per_transition]
        )
        return prob_per_transition[choice][1]

    def execute_action(self, state, action) -> Tuple[Any, float, bool]:
        """Executes the action in the current state and returns the new state, obtained reward and terminal flag."""
        new_state = self.sample_next_state(state=state, action=action)
        reward = self.get_reward(state=new_state)
        terminal = self.is_terminal(state=new_state)
        return new_state, reward, terminal
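
A minimal usage sketch of the class interface, using a hand-written two-state chain (the states, rewards, and probabilities below are illustrative assumptions):

from behavior_generation_lecture_python.mdp.mdp import MDP  # import path assumed from the source reference above

# From "s", the action "go" reaches the terminal state "t" with probability 0.9
# and stays in "s" otherwise; the reward is collected on entering a state.
mdp = MDP(
    states={"s", "t"},
    actions={"go"},
    initial_state="s",
    terminal_states={"t"},
    transition_probabilities={("s", "go"): [(0.9, "t"), (0.1, "s")]},
    reward={"s": -1.0, "t": 10.0},
)

next_state, reward, done = mdp.execute_action(state="s", action="go")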

__init__(states, actions, initial_state, terminal_states, transition_probabilities, reward)

A Markov decision process.

Parameters:

states (Set[Any], required): Set of states.
actions (Set[Any], required): Set of actions.
initial_state (Any, required): Initial state.
terminal_states (Set[Any], required): Set of terminal states.
transition_probabilities (Dict[Tuple[Any, Any], List[Tuple[float, Any]]], required): Dictionary of transition probabilities, mapping from tuple (state, action) to list of tuples (probability, next state).
reward (Dict[Any, float], required): Dictionary of rewards per state, mapping from state to reward.
Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def __init__(
    self,
    states: Set[Any],
    actions: Set[Any],
    initial_state: Any,
    terminal_states: Set[Any],
    transition_probabilities: Dict[Tuple[Any, Any], List[Tuple[float, Any]]],
    reward: Dict[Any, float],
) -> None:
    """A Markov decision process.

    Args:
        states: Set of states.
        actions: Set of actions.
        initial_state: Initial state.
        terminal_states: Set of terminal states.
        transition_probabilities: Dictionary of transition
            probabilities, mapping from tuple (state, action) to
            list of tuples (probability, next state).
        reward: Dictionary of rewards per state, mapping from state
            to reward.
    """
    self.states = states

    self.actions = actions

    assert initial_state in self.states
    self.initial_state = initial_state

    for terminal_state in terminal_states:
        assert (
            terminal_state in self.states
        ), f"The terminal state {terminal_state} is not in states {states}"
    self.terminal_states = terminal_states

    for state in self.states:
        for action in self.actions:
            if (state, action) not in transition_probabilities:
                continue
            total_prob = 0.0
            for prob, next_state in transition_probabilities[(state, action)]:
                assert (
                    next_state in self.states
                ), f"next_state={next_state} is not in states={states}"
                total_prob += prob
            assert math.isclose(total_prob, 1), "Probabilities must add to one"
    self.transition_probabilities = transition_probabilities

    assert set(reward.keys()) == set(
        self.states
    ), "Rewards must be defined for every state in the set of states"
    for state in self.states:
        assert reward[state] is not None
    self.reward = reward

execute_action(state, action)

Executes the action in the current state and returns the new state, obtained reward and terminal flag.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def execute_action(self, state, action) -> Tuple[Any, float, bool]:
    """Executes the action in the current state and returns the new state, obtained reward and terminal flag."""
    new_state = self.sample_next_state(state=state, action=action)
    reward = self.get_reward(state=new_state)
    terminal = self.is_terminal(state=new_state)
    return new_state, reward, terminal

get_actions(state)

Get the set of actions available in a certain state, returns [None] for terminal states.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def get_actions(self, state) -> Set[Any]:
    """Get the set of actions available in a certain state, returns [None] for terminal states."""
    if self.is_terminal(state):
        return {None}
    return {a for a in self.actions if (state, a) in self.transition_probabilities}

get_reward(state)

Get the reward for a specific state.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def get_reward(self, state) -> float:
    """Get the reward for a specific state."""
    return self.reward[state]

get_states()

Get the set of states.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def get_states(self) -> Set[Any]:
    """Get the set of states."""
    return self.states

get_transitions_with_probabilities(state, action)

Get the list of transitions with their probability, returns [(0.0, state)] for terminal states.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def get_transitions_with_probabilities(
    self, state, action
) -> List[Tuple[float, Any]]:
    """Get the list of transitions with their probability, returns [(0.0, state)] for terminal states."""
    if action is None or self.is_terminal(state):
        return [(0.0, state)]
    return self.transition_probabilities[(state, action)]

is_terminal(state)

Return whether a state is a terminal state.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def is_terminal(self, state) -> bool:
    """Return whether a state is a terminal state."""
    return state in self.terminal_states

sample_next_state(state, action)

Randomly sample the next state given the current state and taken action.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def sample_next_state(self, state, action) -> Any:
    """Randomly sample the next state given the current state and taken action."""
    if self.is_terminal(state):
        raise ValueError("No next state for terminal states.")
    if action is None:
        raise ValueError("Action must not be None.")
    prob_per_transition = self.get_transitions_with_probabilities(state, action)
    num_actions = len(prob_per_transition)
    choice = np.random.choice(
        num_actions, p=[ppa[0] for ppa in prob_per_transition]
    )
    return prob_per_transition[choice][1]

PolicyGradientBuffer dataclass

Buffer for the policy gradient method.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
@dataclass
class PolicyGradientBuffer:
    """Buffer for the policy gradient method."""

    states: List[Any] = field(default_factory=list)
    actions: List[Any] = field(default_factory=list)
    weights: List[float] = field(default_factory=list)
    episode_returns: List[float] = field(default_factory=list)
    episode_lengths: List[int] = field(default_factory=list)

    def mean_episode_return(self) -> float:
        """Mean episode return."""
        return float(np.mean(self.episode_returns))

    def mean_episode_length(self) -> float:
        """Mean episode length."""
        return float(np.mean(self.episode_lengths))
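
A minimal sketch of how the buffer aggregates per-episode statistics (import path assumed from the source reference above):

from behavior_generation_lecture_python.mdp.mdp import PolicyGradientBuffer

# Two recorded episodes with returns 3.0 and 5.0 and lengths 4 and 6.
buffer = PolicyGradientBuffer()
buffer.episode_returns += [3.0, 5.0]
buffer.episode_lengths += [4, 6]
assert buffer.mean_episode_return() == 4.0
assert buffer.mean_episode_length() == 5.0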

mean_episode_length()

Mean episode length.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def mean_episode_length(self) -> float:
    """Mean episode length."""
    return float(np.mean(self.episode_lengths))

mean_episode_return()

Mean episode return.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def mean_episode_return(self) -> float:
    """Mean episode return."""
    return float(np.mean(self.episode_returns))

best_action_from_q_table(*, state, available_actions, q_table)

Derive the best action from a Q table.

Parameters:

state (Any, required): The state in which to take an action.
available_actions (Set[Any], required): Set of available actions.
q_table (QTable, required): The Q table, mapping from state-action pair to value estimate.

Returns:

Any: The best action according to the Q table.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def best_action_from_q_table(
    *, state: Any, available_actions: Set[Any], q_table: QTable
) -> Any:
    """Derive the best action from a Q table.

    Args:
        state: The state in which to take an action.
        available_actions: Set of available actions.
        q_table: The Q table, mapping from state-action pair to value estimate.

    Returns:
        The best action according to the Q table.
    """
    available_actions_list = list(available_actions)
    values = np.array([q_table[(state, action)] for action in available_actions_list])
    action = available_actions_list[np.argmax(values)]
    return action
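
A minimal sketch with a hand-written Q table for a single state (the values are illustrative):

from behavior_generation_lecture_python.mdp.mdp import best_action_from_q_table

# The Q table maps (state, action) pairs to value estimates.
q_table = {("s", "left"): 0.2, ("s", "right"): 0.7}
best = best_action_from_q_table(
    state="s", available_actions={"left", "right"}, q_table=q_table
)
assert best == "right"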

derive_deterministic_policy(mdp, policy)

Compute the best policy for an MDP given the stochastic policy.

Parameters:

mdp (MDP, required): The underlying MDP.
policy (CategoricalPolicy, required): The stochastic policy.

Returns:

Dict[Any, Any]: Deterministic policy, i.e. mapping from state to action.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def derive_deterministic_policy(mdp: MDP, policy: CategoricalPolicy) -> Dict[Any, Any]:
    """Compute the best policy for an MDP given the stochastic policy.

    Args:
        mdp: The underlying MDP.
        policy: The stochastic policy.

    Returns:
        Deterministic policy, i.e. mapping from state to action.
    """
    pi = {}
    for state in mdp.get_states():
        if mdp.is_terminal(state):
            continue
        pi[state] = policy.get_action(
            state=torch.as_tensor(state, dtype=torch.float32), deterministic=True
        )
    return pi

derive_policy(mdp, utility_of_states)

Compute the best policy for an MDP given the utility of the states.

Parameters:

mdp (MDP, required): The underlying MDP.
utility_of_states (StateValueTable, required): The dictionary containing the utility (estimate) of all states.

Returns:

Dict[Any, Any]: Policy, i.e. mapping from state to action.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def derive_policy(mdp: MDP, utility_of_states: StateValueTable) -> Dict[Any, Any]:
    """Compute the best policy for an MDP given the utility of the states.

    Args:
        mdp: The underlying MDP.
        utility_of_states: The dictionary containing the utility
            (estimate) of all states.

    Returns:
        Policy, i.e. mapping from state to action.
    """
    pi = {}
    for state in mdp.get_states():
        pi[state] = max(
            mdp.get_actions(state),
            key=lambda action: expected_utility_of_action(
                mdp=mdp, state=state, action=action, utility_of_states=utility_of_states
            ),
        )
    return pi
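
A minimal sketch combining value_iteration (documented below) with derive_policy on a hand-written two-state MDP (all values are illustrative):

from behavior_generation_lecture_python.mdp.mdp import MDP, derive_policy, value_iteration

mdp = MDP(
    states={"s", "t"},
    actions={"go"},
    initial_state="s",
    terminal_states={"t"},
    transition_probabilities={("s", "go"): [(1.0, "t")]},
    reward={"s": -1.0, "t": 10.0},
)
utility = value_iteration(mdp, epsilon=1e-6, max_iterations=100)
policy = derive_policy(mdp, utility)  # expected: {"s": "go", "t": None}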

expected_utility_of_action(mdp, state, action, utility_of_states)

Compute the expected utility of taking an action in a state.

Parameters:

mdp (MDP, required): The underlying MDP.
state (Any, required): The start state.
action (Any, required): The action to be taken.
utility_of_states (StateValueTable, required): The dictionary containing the utility (estimate) of all states.

Returns:

float: The expected utility.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def expected_utility_of_action(
    mdp: MDP, state: Any, action: Any, utility_of_states: StateValueTable
) -> float:
    """Compute the expected utility of taking an action in a state.

    Args:
        mdp: The underlying MDP.
        state: The start state.
        action: The action to be taken.
        utility_of_states: The dictionary containing the utility
            (estimate) of all states.

    Returns:
        Expected utility
    """
    return sum(
        p * (mdp.get_reward(next_state) + utility_of_states[next_state])
        for (p, next_state) in mdp.get_transitions_with_probabilities(
            state=state, action=action
        )
    )
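
As a concrete reading of the sum: if taking an action in state s leads to s1 with probability 0.8 and to s2 with probability 0.2, the expected utility is 0.8 * (R(s1) + U(s1)) + 0.2 * (R(s2) + U(s2)), where R is the reward of the next state and U its current utility estimate (note that no discount factor is applied here).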

greedy_value_estimate_for_state(*, q_table, state)

Compute the greedy (best possible) value estimate for a state from the Q table.

Parameters:

state (Any, required): The state for which to estimate the value, when being greedy.
q_table (QTable, required): The Q table, mapping from state-action pair to value estimate.

Returns:

float: The value based on the greedy estimate.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def greedy_value_estimate_for_state(*, q_table: QTable, state: Any) -> float:
    """Compute the greedy (best possible) value estimate for a state from the Q table.

    Args:
        state: The state for which to estimate the value, when being greedy.
        q_table: The Q table, mapping from state-action pair to value estimate.

    Returns:
        The value based on the greedy estimate.
    """
    available_actions = [
        state_action[1] for state_action in q_table.keys() if state_action[0] == state
    ]
    return max(q_table[state, action] for action in available_actions)
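
A minimal sketch (values illustrative): the greedy value of a state is the maximum Q value over all actions recorded for that state in the Q table.

from behavior_generation_lecture_python.mdp.mdp import greedy_value_estimate_for_state

q_table = {("s", "left"): 0.2, ("s", "right"): 0.7}
assert greedy_value_estimate_for_state(q_table=q_table, state="s") == 0.7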

policy_gradient(*, mdp, policy, lr=0.01, iterations=50, batch_size=5000, seed=None, return_history=False, use_random_init_state=False, verbose=True)

Train a parameterized policy using vanilla policy gradient.

Adapted from: https://github.com/openai/spinningup/blob/master/spinup/examples/pytorch/pg_math/1_simple_pg.py

The MIT License (MIT)

Copyright (c) 2018 OpenAI (http://openai.com)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

Parameters:

mdp (MDP, required): The underlying MDP.
policy (CategoricalPolicy, required): The stochastic policy to be trained.
lr (float, default: 0.01): Learning rate.
iterations (int, default: 50): Number of iterations.
batch_size (int, default: 5000): Number of samples generated for each policy update.
seed (Optional[int], default: None): Random seed for reproducibility.
return_history (bool, default: False): Whether to return the whole history of policies instead of just the final policy.
use_random_init_state (bool, default: False): Whether to initialize each episode in a random non-terminal state instead of the MDP's initial state.
verbose (bool, default: True): Whether to print training progress.

Returns:

Union[List[CategoricalPolicy], CategoricalPolicy]: The final policy, if return_history is false; the history of policies as a list, if return_history is true.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def policy_gradient(
    *,
    mdp: MDP,
    policy: CategoricalPolicy,
    lr: float = 1e-2,
    iterations: int = 50,
    batch_size: int = 5000,
    seed: Optional[int] = None,
    return_history: bool = False,
    use_random_init_state: bool = False,
    verbose: bool = True,
) -> Union[List[CategoricalPolicy], CategoricalPolicy]:
    """Train a paramterized policy using vanilla policy gradient.

    Adapted from: https://github.com/openai/spinningup/blob/master/spinup/examples/pytorch/pg_math/1_simple_pg.py

    The MIT License (MIT)

    Copyright (c) 2018 OpenAI (http://openai.com)

    Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

    Args:
        mdp: The underlying MDP.
        policy: The stochastic policy to be trained.
        lr: Learning rate.
        iterations: Number of iterations.
        batch_size: Number of samples generated for each policy update.
        seed: Random seed for reproducibility (default: None).
        return_history: Whether to return the whole history of policies
            instead of just the final policy.
        use_random_init_state: Whether to initialize each episode in a random
            non-terminal state instead of the MDP's initial state.
        verbose: Whether to print training progress.

    Returns:
        The final policy, if return_history is false. The
        history of policies as list, if return_history is true.
    """
    if seed is not None:
        np.random.seed(seed)
        torch.manual_seed(seed)

    # add untrained model to model_checkpoints
    model_checkpoints = [deepcopy(policy)]

    # make optimizer
    optimizer = torch.optim.Adam(policy.net.parameters(), lr=lr)

    # get non-terminal states
    non_terminal_states = [state for state in mdp.states if not mdp.is_terminal(state)]

    # training loop
    for i in range(1, iterations + 1):
        # a buffer for storing intermediate values
        buffer = PolicyGradientBuffer()

        # reset episode-specific variables
        if use_random_init_state:
            state = non_terminal_states[np.random.choice(len(non_terminal_states))]
        else:
            state = mdp.initial_state
        episode_rewards = []

        # collect experience by acting in the mdp
        while True:
            # save visited state
            buffer.states.append(deepcopy(state))

            # call model to get next action
            action = policy.get_action(state=torch.tensor(state, dtype=torch.float32))

            # execute action in the environment
            state, reward, done = mdp.execute_action(state=state, action=action)

            # save action, reward
            buffer.actions.append(action)
            episode_rewards.append(reward)

            if done:
                # if episode is over, record info about episode
                episode_return = sum(episode_rewards)
                episode_length = len(episode_rewards)
                buffer.episode_returns.append(episode_return)
                buffer.episode_lengths.append(episode_length)
                # the weight for each logprob(a|s) is R(tau)
                buffer.weights += [episode_return] * episode_length

                # reset episode-specific variables
                if use_random_init_state:
                    state = non_terminal_states[
                        np.random.choice(len(non_terminal_states))
                    ]
                else:
                    state = mdp.initial_state
                episode_rewards = []

                # end experience loop if we have enough of it
                if len(buffer.states) > batch_size:
                    break

        # compute the loss
        logp = policy.get_log_prob(
            states=torch.tensor(buffer.states, dtype=torch.float),
            actions=torch.tensor(buffer.actions, dtype=torch.long),
        )
        batch_loss = -(logp * torch.tensor(buffer.weights, dtype=torch.float)).mean()

        # take a single policy gradient update step
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # logging
        if verbose:
            print(
                f"iteration: {i:3d};  return: {buffer.mean_episode_return():.3f};  episode_length: {buffer.mean_episode_length():.3f}"
            )
        if return_history:
            model_checkpoints.append(deepcopy(policy))
    if return_history:
        return model_checkpoints
    return policy
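
A minimal call sketch. Constructing the CategoricalPolicy is not covered on this page, so my_policy below is a placeholder for an already-constructed policy and my_grid_mdp for an MDP such as the GridMDP sketched above; the hyperparameters simply restate the documented defaults.

from behavior_generation_lecture_python.mdp.mdp import policy_gradient

# my_grid_mdp and my_policy are placeholders, see the note above.
trained_policy = policy_gradient(
    mdp=my_grid_mdp,
    policy=my_policy,
    lr=1e-2,
    iterations=50,
    batch_size=5000,
    seed=0,
    return_history=False,
)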

q_learning(*, mdp, alpha, epsilon, iterations, seed=None, return_history=False)

Derive a value estimate for state-action pairs by means of Q learning.

Parameters:

mdp (MDP, required): The underlying MDP.
alpha (float, required): Learning rate.
epsilon (float, required): Exploration-exploitation threshold. A random action is taken with probability epsilon, the best action otherwise.
iterations (int, required): Number of iterations.
seed (Optional[int], default: None): Random seed for reproducibility.
return_history (Optional[bool], default: False): Whether to return the whole history of value estimates instead of just the final estimate.

Returns:

Union[QTable, List[QTable]]: The final value estimate, if return_history is false; the history of value estimates as a list, if return_history is true.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def q_learning(
    *,
    mdp: MDP,
    alpha: float,
    epsilon: float,
    iterations: int,
    seed: Optional[int] = None,
    return_history: Optional[bool] = False,
) -> Union[QTable, List[QTable]]:
    """Derive a value estimate for state-action pairs by means of Q learning.

    Args:
        mdp: The underlying MDP.
        alpha: Learning rate.
        epsilon: Exploration-exploitation threshold. A random action is taken with
            probability epsilon, the best action otherwise.
        iterations: Number of iterations.
        seed: Random seed for reproducibility (default: None).
        return_history: Whether to return the whole history of value estimates
            instead of just the final estimate.

    Returns:
        The final value estimate, if return_history is false. The
        history of value estimates as list, if return_history is true.
    """
    if seed is not None:
        np.random.seed(seed)

    q_table = {}
    for state in mdp.get_states():
        for action in mdp.get_actions(state):
            q_table[(state, action)] = 0.0
    q_table_history = [q_table.copy()]
    state = mdp.initial_state

    for _ in range(iterations):
        # available actions:
        avail_actions = mdp.get_actions(state)

        # choose action (exploration-exploitation trade-off)
        rand = np.random.random()
        if rand < (1 - epsilon):
            chosen_action = best_action_from_q_table(
                state=state, available_actions=avail_actions, q_table=q_table
            )
        else:
            chosen_action = random_action(avail_actions)

        # interact with environment
        next_state = mdp.sample_next_state(state, chosen_action)

        # update Q table
        greedy_value_estimate_next_state = greedy_value_estimate_for_state(
            q_table=q_table, state=next_state
        )
        q_table[(state, chosen_action)] = (1 - alpha) * q_table[
            (state, chosen_action)
        ] + alpha * (mdp.get_reward(next_state) + greedy_value_estimate_next_state)

        if return_history:
            q_table_history.append(q_table.copy())

        if mdp.is_terminal(next_state):
            state = mdp.initial_state  # restart
        else:
            state = next_state  # continue

    if return_history:
        utility_history = []
        for q_tab in q_table_history:
            utility_history.append(
                {
                    state: greedy_value_estimate_for_state(q_table=q_tab, state=state)
                    for state in mdp.get_states()
                }
            )
        return utility_history

    return {
        state: greedy_value_estimate_for_state(q_table=q_table, state=state)
        for state in mdp.get_states()
    }
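
A minimal call sketch, reusing the grid_mdp constructed in the GridMDP example above; the learning rate, exploration rate, and iteration count are illustrative, not tuned values.

from behavior_generation_lecture_python.mdp.mdp import q_learning

# Returns the greedy value estimate per state derived from the learned Q table
# (or its history, if return_history is set).
values = q_learning(
    mdp=grid_mdp,
    alpha=0.1,
    epsilon=0.1,
    iterations=10000,
    seed=0,
)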

random_action(available_actions)

Derive a random action from the set of available actions.

Parameters:

available_actions (Set[Any], required): Set of available actions.

Returns:

Any: A random action.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def random_action(available_actions: Set[Any]) -> Any:
    """Derive a random action from the set of available actions.

    Args:
        available_actions: Set of available actions.

    Returns:
        A random action.
    """
    available_actions_list = list(available_actions)
    num_actions = len(available_actions_list)
    choice = np.random.choice(num_actions)
    return available_actions_list[choice]

value_iteration(mdp, epsilon, max_iterations, return_history=False)

Derive a utility estimate by means of value iteration.

Parameters:

mdp (MDP, required): The underlying MDP.
epsilon (float, required): Termination criterion: if the maximum difference in the utility update falls below epsilon, the iteration is terminated.
max_iterations (int, required): Maximum number of iterations; if exceeded, a RuntimeError is raised.
return_history (Optional[bool], default: False): Whether to return the whole history of utilities instead of just the final estimate.

Returns:

Union[StateValueTable, List[StateValueTable]]: The final utility estimate, if return_history is false; the history of utility estimates as a list, if return_history is true.

Source code in src/behavior_generation_lecture_python/mdp/mdp.py
def value_iteration(
    mdp: MDP,
    epsilon: float,
    max_iterations: int,
    return_history: Optional[bool] = False,
) -> Union[StateValueTable, List[StateValueTable]]:
    """Derive a utility estimate by means of value iteration.

    Args:
        mdp: The underlying MDP.
        epsilon: Termination criterion: if maximum difference in utility
            update is below epsilon, the iteration is terminated.
        max_iterations: Maximum number of iterations, if exceeded,
            RuntimeError is raised.
        return_history: Whether to return the whole history of utilities
            instead of just the final estimate.

    Returns:
        The final utility estimate, if return_history is false. The
        history of utility estimates as list, if return_history is true.
    """
    utility = {state: 0.0 for state in mdp.get_states()}
    utility_history = [utility.copy()]
    for _ in range(max_iterations):
        utility_old = utility.copy()
        max_delta = 0.0
        for state in mdp.get_states():
            utility[state] = max(
                expected_utility_of_action(
                    mdp, state=state, action=action, utility_of_states=utility_old
                )
                for action in mdp.get_actions(state)
            )
            max_delta = max(max_delta, abs(utility[state] - utility_old[state]))
        if return_history:
            utility_history.append(utility.copy())
        if max_delta < epsilon:
            if return_history:
                return utility_history
            return utility
    raise RuntimeError(f"Did not converge in {max_iterations} iterations")
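
A minimal call sketch, reusing the grid_mdp constructed in the GridMDP example above; epsilon and max_iterations are illustrative values.

from behavior_generation_lecture_python.mdp.mdp import value_iteration

# With return_history=True the whole sequence of utility estimates is returned,
# which is useful for inspecting convergence.
utility_history = value_iteration(
    grid_mdp, epsilon=1e-4, max_iterations=1000, return_history=True
)
final_utility = utility_history[-1]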