* update

* Update MC_113_GridWorld_6_6.py

* Update MC_113_GridWorld_6_6.py

* Update MC_113_GridWorld_6_6.py

* Update MC_113_GridWorld_6_6.py

* up

* hu

* update

* update

* update

* update

* update

* update

* update

* update

* update
This commit is contained in:
xiaowuhu 2022-08-16 08:00:57 +08:00 коммит произвёл GitHub
Родитель bd757f1a48
Коммит e1f7183228
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
38 изменённых файлов: 1109 добавлений и 551 удалений

Просмотреть файл

@ -197,8 +197,9 @@ if __name__=="__main__":
假设有一个均值为 3.1 的不知名分布,我们从中采样四次,得到序列 $x=[1,2,4,5]$,则:
- 均值为:$\mu=(1+2+4+5)/4=3$
- 方差为:$var(x)=[(1-3)^2+(2-3)^2+(4-3)^2+(5-3)^2]/4=2.5$
- 偏差为:$bias(x)=\sqrt{[(1+2+4+5)/4-3.1]^2}=0.1$
- 偏差为:$bias(x)=\sqrt{(3-3.1)^2}=0.1$
方差很大,偏差很小。

Просмотреть файл

Просмотреть файл

@ -5,7 +5,7 @@
<center>
<img src="./img/BlackJack.png">
图 11.1.2 二十一点游戏
图 11.1.1 二十一点游戏
</center>
该游戏由 2 到 6 人进行,一名庄家,其余为玩家。
@ -39,9 +39,9 @@
#### 状态空间
表 11.1.1 玩家的状态
表 11.1.1 玩家的状态空间
|庄家手中的明牌 $\to$<br>玩家手中的牌$\downarrow$|1|2|...|10|
|庄家明牌点数 $\to$<br>玩家点数总和$\downarrow$|1|2|...|10|
|-|-|-|-|-|
|12||||
|13||||
@ -54,11 +54,9 @@
1. 庄家手中的牌(一张明牌);
2. 玩家自己手中的所有牌。
因为当玩家手中的牌不到 12 点时,再来任何一张牌都后,总数都不会超过 21 点,所以我们只考虑 12 点到 21 点(包含天和的情况)之间的情况。
因为当玩家手中的牌不到 12 点时,再来任何一张牌都后,总数都不会超过 21 点,所以肯定是要牌。我们只考虑 12 点到 21 点(包含天和的情况)之间的情况。
所以,所有的状态在表 11.1.1 中所示,一共有 10x10=100 种状态。
另外,玩家有 A 的话,那将会有很大的变数,所以有 A 无 A 算是第三个因素,那么整个状态空间是 2x10x10=200。
所有的状态在表 11.1.1 中所示,一共有 10x10=100 种状态。另外,玩家有 A 的话,那将会有很大的变数,所以有 A 无 A 算是第三个因素,那么整个状态空间是 2x10x10=200。
#### 分幕与奖励
@ -76,10 +74,12 @@
在本问题中,是站在某个玩家的角度来考虑问题,不管其它玩家的情况如何。因为庄家的规则是固定的,玩家的策略相对灵活,而不同的玩家有不同的策略,因人而异。我们的目的是找到最优的玩家策略。
所以,就假设只有庄家和一个玩家。
所以,就假设只有庄家和一个玩家。那么强化学习的目标就是在表 11.1.1 中的每个格子内决定采取要牌hit还是停牌stick
#### 环境测试
【代码位置】MC_111_BlackJack.py
```python
import gym
import numpy as np

Просмотреть файл

@ -1,24 +1,30 @@
## 11.2 蒙特卡洛控制
### 11.2.1 价值迭代与策略迭代
要想解决二十一点问题,就需要得到每个状态下的最优策略,比如:当庄家明牌是 9 点,玩家自己手中是 16 点,此时是要牌还是停牌?
在第 9 章中,我们已经学习了如何得到最优策略,就是为了解决这一类问题的。但是在很多问题中,比如二十一点,没有环境信息,不能使用贝尔曼最优方程及动态规划方法,所以就要研究蒙特卡洛控制问题。
什么是蒙特卡洛控制?
在第 10 章中,我们学习了蒙特卡洛预测,就是通过采样的方法来评估一个策略的状态价值函数和动作价值函数,也称作策略评估。基于这个评估结果,可以在每个状态上都取动作价值函数最大的方向,形成新的策略,然后再进行一轮或多轮策略评估,得到每个状态上的新的动作价值函数,再取最大值形成新策略。这就是蒙特卡洛控制,意为控制智能体采取最优策略。
策略迭代
#### 价值迭代
上一节中,通过对三种策略的评估比较得知:相对正确的策略,其动作价值函数值也会相对较大。那么如何找到最优策略呢?
我们先复习一下第九章中学习的价值迭代的工作方法,如图 11.1.1 所示。
我们先复习一下第 9 章中学习的价值迭代的工作方法,如图 11.2.1 所示。
<center>
<img src='./img/ValueIteration.png'>
图 11.1.1 价值迭代方法示意图
图 11.2.1 价值迭代方法示意图
</center>
价值迭代,是以计算动作价值函数 $Q_{k+1}(s,a)=P[R+\gamma V_k(s')]$ 为手段,在每次迭代中都取 $V_{k+1}(s)=\max_a Q_{k+1}(s,a)$,这样一步步地逼近最优价值函数。
#### 策略迭代
在本章中,由于没有环境信息(缺乏 $P,R$ 的直接定义),所以无法使用价值迭代法,只能尝试其它方法。而在第十章中曾经提到过:预测 $Q$ 函数比预测 $V$ 函数更有用,因为从 $Q$ 函数的表格型结果中可以抽取出策略 $\pi$ 来。由此,我们看到了解决问题的思路:
1. 首先,给定任意策略 $\pi$,使用蒙特卡洛方法来预测 $Q$,这在第十章中已经学习过了;
@ -33,10 +39,7 @@
这种方法被称为**策略迭代**。
【算法 11.1】策略迭代
【算法 11.2】策略迭代
----
@ -46,33 +49,36 @@
  初始化数组:$Q(S,A) \leftarrow 0$
  初始化策略:$\pi(S) \leftarrow Random$ 随机策略
2. 策略评估(多幕 Episodes 循环):
  根据当前策略与环境进行多幕交互
2. 策略评估(单幕或多幕 Episodes 循环):
  根据当前策略与环境进行交互
  得到 $Q(S,A)$
3. 策略改进
  $old.policy \leftarrow \pi$
  对每个状态更新策略:$\pi(s) \leftarrow \argmax_a Q(s)$
  如果 $\pi \ne old.policy$,跳转到第 2 步
  否则结束
  或者是达到了指定的多幕循环次数而结束
输出:$Q(S,A), \pi$
----
### 11.2.2 策略迭代算法研究基础代码框架
在上述算法中,策略改进是我们本章要研究的重点,其它部分利用以前的知识都可以抽象成通用模块,所以我们来做一下模块划分,以便可以统一接口,专心研究算法部分。
模块可以做如图 11.1.1 的划分。
模块可以做如图 11.2.2 的划分。
<center>
<img src="./img/PolicyIterationFlow.png">
图 11.1.1 策略迭代示意图
(左图:初始化在迭代中;右图:初始化在迭代外)
图 11.2.2 策略迭代示意图
</center>
有左右两个子图,我们稍后再解释,先看看各个模块的具体功能。
下面看看各个模块的具体功能。
【代码位置】Algorithm/Base_MC_Policy_Iteration.py
#### 初始化模块
@ -80,21 +86,19 @@
# 策略迭代
class Policy_Iteration(object):
# 初始化
def __init__(self, env, policy, gamma, episodes, final, check_policy_method=0):
self.policy = policy # 初始策略
self.rough_episodes = episodes # 粗预测的分幕循环次数
self.final_episodes = final # 细预测的分幕循环次数
self.env = env # 环境
self.gamma = gamma # 折扣
self.check_method = check_policy_method # 迭代终止条件
self.nA = self.env.action_space.n # 动作空间
self.nS = self.env.observation_space.n # 状态空间
self.n_iteration = 0 # 分幕循环次数总和
self.Value = np.zeros((self.nS, self.nA)) # G 的总和
self.Count = np.zeros((self.nS, self.nA)) # G 的数量
def __init__(self, env, init_policy, gamma):
self.env = env # 环境
self.gamma = gamma # 折扣
self.nA = self.env.action_space.n # 动作空间
self.nS = self.env.observation_space.n # 状态空间
self.n_episode = 0 # 分幕循环次数计数器
self.Value = np.zeros((self.nS, self.nA)) # 回报值 G 累计
self.Count = np.zeros((self.nS, self.nA)) # 访问次数累计
self.Q = np.zeros((self.nS, self.nA)) # Q=Value/Count实时计算
self.policy = init_policy # 初始策略
```
初始化工作很简单,主要是接收初始化参数,其中最主要的工作是最后两行代码,把计算 G 值的数据清零
初始化工作很简单,主要是接收初始化参数,准备好数据存储与计算的空间。其中的初始策略一般选用随机策略
#### 策略 $\pi$ 评估(粗预测)模块
@ -108,33 +112,35 @@ class Policy_Iteration(object):
<center>
<img src="./img/GridWorld44.png">
图 11.1.2 简单的方格世界
图 11.2.2 简单的方格世界
</center>
在图 11.1.2 所示的 4x4 方格世界中,可以从任意点出发,终点为 $s_{15}$。在到达 $s_{15}$ 之前,每一步移动都得到 -1 的奖励,到 $s_{15}$ 的奖励为 0。智能体可以选择四个方向移动出界后会返回原地。无折扣。
在图 11.2.2 所示的 4x4 方格世界中,可以从任意点出发,终点为 $s_{15}$。在到达 $s_{15}$ 之前,每一步移动都得到 -1 的奖励,到 $s_{15}$ 的奖励为 0。智能体可以选择四个方向移动出界后会返回原地。无折扣。
由于存在 $s_{15}$ 终止状态,所以是一个分幕任务,可以用蒙特卡洛法来解决。当然,这个问题也可以用动态规划来解决,因为知道环境信息。
我们先看看用动态规划法得到的解:
我们先看看用动态规划法得到的解,如图 11.2.3 所示。
【代码位置】MC_112_GridWorld.py
<center>
<img src="./img/GridWorld44_DP.png">
图 11.1.2 简单的方格世界
图 11.2.3 简单的方格世界
Q 表格函数值;右:动作方向)
</center>
可以看到这种方法的结果是沿对角线成镜像对称的,很漂亮。
我们再看看用蒙特卡洛法循环 100幕、200幕、300幕、400幕、500幕和1000幕得到的解
我们再看看用蒙特卡洛法采样 100幕、200幕、300幕、400幕、500幕和1000幕得到的解如图 11.2.4。
<center>
<img src="./img/GridWorld44_MC.png">
图 11.1.2 简单的方格世界
图 11.2.4 简单的方格世界
</center>
- 100幕的结果只有左上角的状态位不靠谱
@ -142,30 +148,46 @@ class Policy_Iteration(object):
- 300幕时在细节上与后面的图还有差距但是在每个状态上的动作选择都是合理的尽管不是最优的
- 后续三图,动作选择已经没有什么可挑剔的了。
在比较图 11.1.2 和 图 11.1.3 后,读者可能会有疑问:在图 11.1.2 中,$s_0,s_5,s_{10}$ 三个状态上的动作是可以向下和向右,两个 q 函数值相等,而整体上看,也是以对角线为中心成镜像的。那为什么在图 11.1.3 中,即使是经过了 1000 幕,也没有出现这种完全对称的情况?
在比较图 11.2.3 和 图 11.2.4 后,读者可能会有疑问:在图 11.2.3 中,$s_0,s_5,s_{10}$ 三个状态上的动作是可以向下和向右,两个 q 函数值相等,而整体上看,也是以对角线为中心成镜像的。那为什么在图 11.2.4 中,即使是经过了 1000 幕,也没有出现这种完全对称的情况?
因为图 11.1.2 是用动态规划算法得到的结果,比较精准,如果一个状态中有两个动作等价,那这种方法确实可以得出两个相等的 q 值来。而图 11.1.3 是用蒙特卡洛法,就不那么精准了,误差在小数点以后两位都是正常的,所以极少能够得到两个完全相等的 q 值。
因为图 11.2.4 是用动态规划算法得到的结果,比较精准,如果一个状态中有两个动作等价,那这种方法确实可以得出两个相等的 q 值来。而图 11.2.4 是用蒙特卡洛法,就不那么精准了,误差在小数点以后两位都是正常的,所以极少能够得到两个完全相等的 q 值。
比如 $q_1$ = 0.0506$q_2$ = 0.0507,在我们看来 $q_1,q_2$ 之间的误差是蒙特卡洛方法造成的,可以忽略,但是算法就是认为它们不等,除非强制截断到小数点后两位。但是,不同的问题,或者相同的问题但折扣不同,小数点后截断多少位不是固定的,需要具体问题具体分析。
比如 $q_1$ = 0.0506$q_2$ = 0.0507,在我们看来 $q_1,q_2$ 之间的误差是蒙特卡洛方法造成的,可以忽略,但是算法就是认为它们不等,除非强制截断到小数点后两位。对于不同的问题,或者相同的问题但折扣不同,小数点后的精度需要多少位不是固定的,需要具体问题具体分析。
回到我们要解决的问题上来,事实证明,经过 300 幕的循环,就能够得到正确的策略了,在此之上继续做策略迭代没有问题,不必非得要循环到 Q 函数值收敛为止。而在实际的应用中,我们甚至可以让粗预测模块循环更少的次数,以加快整体迭代速度
下面是具体实现,基本就是采样,然后计算 G 值
在图 11.1.1 所示的价值迭代的算法中(贝尔曼最优方程),每计算一次 Q 值,立刻从中取最大者来计算一次 V 值,相当于评估与改进的比例为 1:1这是一种极端的例子。
```python
# 策略迭代
def policy_iteration(self, episodes):
for _ in tqdm.trange(episodes): # 多幕循环
self.n_episode += 1
# 重置环境,开始新的一幕采样
Episode = self.sampling()
# 从后向前遍历计算 G 值
G = 0
for t in range(len(Episode)-1, -1, -1):
G, s = self.calculate(Episode, t, G) # 计算Gt值
self.policy_improvement(s) # 评估后立刻做策略改进
```
回到我们要解决的问题上来,事实证明,经过 300 幕的循环,方格世界问题就能够得到正确的策略了,在此之上继续做策略迭代没有问题,不必非得要循环到 $Q$ 函数值收敛为止,也就是说最终的 $Q$ 函数值可以不等于理论上的最优策略的最大值,但是策略与最优策略没有差别。而在实际的应用中,我们甚至可以让粗预测模块循环更少的次数,以加快整体迭代速度。
在图 11.2.1 所示的价值迭代的算法中(贝尔曼最优方程),每计算一次 $Q$ 值,立刻从中取最大者来计算一次 $V$ 值,相当于评估与改进的速度比例为 1:1这是一种极端的情况。
#### 策略改进模块
我们可以借助图 11.1.2 来理解策略改进机制。
我们可以借助图 11.2.5 来理解策略改进机制。
<center>
<img src="./img/PolicyIteration.png">
图 11.1.2 策略迭代示意图
图 11.2.5 策略迭代示意图
</center>
初始化策略可以命名为 $\pi_0$,而 $Q_0$ 在代码中虽然不存在,但是 $Q=Value/Count$,所以 self.Value=np.zeros(...) 和 self.Count=np.zeros(...) 代表了 $Q_0 \leftarrow 0$。
过程描述:
1. 初始化 $Q, \pi$ 为 0不妨命名为 $Q_0, \pi_0$
1. 初始化 $Q, \pi$ 为 0不妨命名为 $Q_0, \pi_0$$Q_0$ 为全 0 的表格
2. 使用蒙特卡洛每次访问法根据 $\pi_0$ 估算 $Q_1$
3. 使用贪心算法从 $Q_1$ 中抽取出新的策略 $\pi_1$
4. 比较 $\pi_0$ 和 $\pi_1$,如果一致则认为收敛;
@ -174,38 +196,36 @@ class Policy_Iteration(object):
7. 比较 $\pi_1$ 和 $\pi_2$,如果一致则认为收敛;
......
以此类推,直到收敛为止,就认为得到了最优策略 $\pi_*$,如果需要的话可以再估算一次 $Q$ 值来得到 $Q_*$
所以,所谓的策略改进模块(算法),就是 $\pi=greedy(Q)$ 贪心算法,也是我们本章研究的对象。
以此类推,直到收敛为止,就认为得到了最优策略 $\pi_*$,如果需要的话可以再估算一次 $Q$ 值来得到 $Q_*$。所以,所谓的策略改进模块(算法),就是 $\pi=greedy(Q)$ 贪心算法,也是我们本章研究的对象。
这里的代码只是定义了一个 policy_improvement() 函数,并没有具体实现,要靠后面将要研究的各种算法来重写此函数。
```python
# 策略改进(算法重写此函数)
def policy_improvement(self, Q):
def policy_improvement(self, s):
pass
```
该函数输入为 $Q$ 表格值,实现时需要在内部修改 self.policy 的值,来改进策略。
该函数输入为状态值 $s$,实现时需要在内部计算 self.Q[s],然后修改 self.policy[s] 的值,来改进策略。
注意,这个框架缺省设置为在做一次策略评估后,立刻做一次策略改进,有些过于激进。如果需要做多次策略评估后才进行一次策略改进,需要在实现 policy_improvement() 函数时,判断 self.n_episode 的值是否可以被 $n$ 整除($n$ 是需要策略评估的次数),然后再做策略改进。由于随机过程的存在,只进行一次策略评估就立刻做策略改进的话,确实有些不够稳妥。
至于算法何时可以结束的问题,检测是否收敛有一定的难度,因为蒙特卡洛采样会造成一定的波动,整体看会有一个收敛的趋势,如图 11.2.6 所示。
<center>
<img src="./img/error_episode.png">
图 11.2.6 蒙特卡洛方法误差收敛过程示意图
</center>
虚线是整体趋势,呈下降/收敛形态,但是局部可能会有较大波动,以至于很难用算法来判断是否收敛。在中间的某些时段内,误差已经呈收敛态势,但是忽然就会有一个上升的现象发生。所以,一般会用绘制图表的方法来辅助判断。
#### 策略 $\pi_*$ 评估(细预测)模块
这一部分是可选的,用来验证我们认为的最优策略 $\pi_*$ 是不是名副其实。简单的验证方法是从 $\pi_*$ 得到 $Q_*$ 后,看看 $Q_*$ 的值是不是大于以往每个策略上得到的相应位置的 $Q$ 值,或者看循环多次以后,$Q_*$ 表格中的每个值是不是都可以收敛到某个具体的数值。
####
左图
初始化放在迭代内部,理由是:使用 $\pi$ 策略评估模块进行预测,生成 Q 表格后,通过策略改进模块抽取出新的策略 $\pi'$。此时应该再次初始化,以保证
右图
这里需要注意一个问题,在做最终的评估时,需要把 $Q$ 表格置为全 0用于统计蒙特卡洛法采样时累计原始数据的 Value 和 Count 也要置为全 0相当于重新开始。因为在得到最优策略之前已经累计了一些回报值会拉低整体平均值不能计算在最优策略之内。

Просмотреть файл

@ -1,26 +1,236 @@
## 贪心法与探索性出发
## 11.3 贪心法与探索性出发
### 11.3.1 贪心法Greedy
仍然用上一小节中的简单的方格世界举例,来理解**策略改进**是如何工作的。但是为了尽快收敛和增加复杂性,在 11.2 的方格世界中做了三个改动:
1. 每次移动的奖励为 0出界时有 -1 的奖励,到达终点时奖励 1。这样可以减少出界的情况发生。
2. 每次移动有 0.8 的概率到达目标状态,各有 0.1 的概率偏向左右两个状态,如果出界则回到原地。
3. 设置折扣为 $\gamma$ = 0.9,这样如果在界内多绕路的话,会得到较小的奖励,鼓励智能体走最短路径。
根据图 11.2.5,算法可以这样写:
【算法 11.3.1】
---
输入:起始状态 $s$,策略 $\pi$,折扣 $\gamma$, 幕数 Episodes
初始化数组:$V(S,A) \leftarrow 0, N(S,A) \leftarrow 0$$S$ 为状态空间,$A$ 为动作空间
多幕 Episodes 循环:
  列表置空 $Episode = [\ ] $ 用于存储序列数据 $(s,a,r)$
  幕内循环直到终止状态:
    $Episode \leftarrow$ 采样
  $G_t \leftarrow 0$
  对 $Episode$ 从后向前遍历, $t=\tau-1,\tau-2,...,0$
    计算 $G_t$
    $Q(s_t,A) \leftarrow V(s_t,A) / N(s_t,A)$
    $\pi(s_t) \leftarrow \argmax_a Q(s_t,A)$ # 取出最大值对应的动作序号
输出:$Q(S,A)$
---
#### 贪心法
对比算法 10.5,前面的部分都是相同的,因为同样是计算 $Q$ 值。不同之处在于新增加了两行:
- 倒数第三行,$Q(s_t,A) \leftarrow V(s_t,A) / N(s_t,A)$,大写 $A$ 表示数组操作,同时计算某个状态下的所有动作的 $Q$ 值,便于后面对于该状态下的所有动作做比较;
- 倒数第二行,修改策略 $\pi$。
以下代码为初始化环境部分:
【代码位置】MC_113_GridWorld_ES.py
```python
env = model.GridWorld(
GridWidth, GridHeight, StartStates, EndStates, # 关于状态的参数
Actions, Policy, Transition, # 关于动作的参数
StepReward, SpecialReward, # 关于奖励的参数
SpecialMove, Blocks) # 关于移动的限制
gamma = 0.9
max_iteration = 500
```
可以看到初始的策略是随机的,每个动作的选择概率都是 0.25。折扣为 0.9。最大采样幕数为 500当然可以再多些
其中,我们用概率值来表示策略。
下面的代码是策略改进:
```python
class MC_Greedy(base.Policy_Iteration):
def policy_improvement(self, s):
# 做策略改进,贪心算法
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = 0 # 先设置该状态所有策略为 0后面再把有效的动作设置为非 0
argmax = np.argwhere(self.Q[s]==np.max(self.Q[s])) # 取最大值所在的位置(可能不止一个)
p = 1 / argmax.shape[0] # 概率平均分配给几个最大值
for arg in argmax: # 每个最大值位置都设置为相同的概率
self.policy[s][arg[0]] = p
```
其中,“概率平均分配”的意思是:
- 如果最大值只有一个,比如 [0.23, 0., 0., 0. ],那么策略会是 [1, 0, 0, 0]
- 如果最大值只有两个,比如 [0.54, 0., 0.54., 0. ],那么策略会是 [0.5, 0, 0.5, 0]
- 以此类推,同时有三个最大值时概率是 0.333,四个时概率是 0.25。
当我们信心满满地用代码实现了这个算法后实际运行时会发现一个问题运行了几幕后程序就停在某处不动了通过审查算法可以知道能造成死循环的只有分幕采样部分。Debug 后发现,有两种情况导致程序在幕内循环时,始终不能到达终止状态:
1. 在某个边角位置的状态。智能体出界后回到原地,下一步又出界,还是回到原地,始终不能离开这个状态;
2. 在某几个状态内来回移动,形成闭环。比如从 $s_4$ 向右移动到 $s_5$;在 $s_5$ 向左移动,又回到 $s_4$。
为什么会造成这种情况呢?
还是要看算法:在每一次分幕采样结束后,立刻就开始进行策略更新。因为是从后向前遍历,所以在某个状态中,即使采取了错误方向的动作,由于后续的动作是正确的,那么开始的错误动作也会被认为正确的动作。
<center>
<img src="./img/greedy.png">
图 11.3.1
</center>
举例来说,在图 11.3.1 中,$s_{12}$ 状态时(我们暂时忽略从 $s_0$ 到 $s_{12}$ 的过程):
1. 随机选择了动作 1 向左走,回到原地;
2. 随机选择了动作 2 向右走到 $s_{13}$
3. 随机选择动作 3 向下走回到原地;
4. 随机选择动作 4 向右走到 $s_{14}$
5. 随机选择动作 5 向右走到 $s_{15}$,分幕结束。
当然,由于随机性较强,真实的采样过程可能比上面的例子更复杂。
#### 探索性出发Exploring Starts
由于最终是到达了终点,所以前面的一系列动作都被认为是有效的,包括错误的动作 1 和动作 3。在倒序遍历时
1. 看动作 5能够确定状态 $s_{14}$ 时向右走;
2. 看动作 4能够确定状态 $s_{13}$ 时向右走;
3. 看动作 3认为状态 $s_{13}$ 时应该向下走,这就把动作 4 的策略给覆盖了;
4. 看动作 2能够确定状态 $s_{12}$ 时向右走;
5. 看动作 1认为状态 $s_{12}$ 时应该向左走,这就把动作 2 的策略给覆盖了。
在 8.4 节中,我们学习过 Q 函数的定义。如果应用到冰面行走问题上Q 函数表格会如表 10.4.1 所示。
所以,最终确定的策略是在状态 $s_{12}$ 时向左走!这样就永远不能跳出分幕循环。原因很简单,就是分幕采样的样本太少了,不能做出正确的判断
表 10.4.1 二十一点问题的 Q 函数表格
### 11.3.2 探索性出发Exploring Starts
|状态 $\downarrow$ 动作$\to$|HIT|Stick|
|:-:|:-:|:-:|
|12|$q_\pi(12,a_0)$|$q_\pi(12,a_1)$|$q_\pi(s_0,a_2)$|$q_\pi(s_0,a_3)$|
|13|$q_\pi(13,a_0)$|$q_\pi(13,a_1)$|$q_\pi(s_1,a_2)$|$q_\pi(s_1,a_3)$|
除了上面说的错误动作外,在图 11.3.1 中还展示了另外一个问题:右上方的 9 个状态1,2,3,5,6,7,9,10,11没有被遍历到或者是被遍历的次数极少不能正确体现出其实际的动作价值。在 8.4 节中,我们学习过 $Q$ 函数的定义。如果应用到简单的方格世界问题上,$Q$ 函数表格会如表 11.3.1 所示。
表 11.3.1 简单方格问题的 $Q$ 函数表格
|状态 $\downarrow$ 动作$\to$|左|下|右|上|
|:-:|:-:|:-:|:-:|:-:|
|0|$q_\pi(s_0,a_0)$|$q_\pi(s_0,a_1)$|$q_\pi(s_0,a_2)$|$q_\pi(s_0,a_3)$|
|1|$q_\pi(s_1,a_0)$|$q_\pi(s_1,a_1)$|$q_\pi(s_1,a_2)$|$q_\pi(s_1,a_3)$|
|...|...|...|...|...|
|20|$q_\pi(20,a_0)$|$q_\pi(20,a_1)$|
|21|$q_\pi(21,a_0)$|$q_\pi(21,a_1)$|
|14|$q_\pi(s_{14},a_0)$|$q_\pi(s_{14},a_1)$|$q_\pi(s_{14},a_2)$|$q_\pi(s_{14},a_3)$|
|15|$q_\pi(s_{15},a_0)$|$q_\pi(s_{15},a_1)$|$q_\pi(s_{15},a_2)$|$q_\pi(s_{15},a_3)$|
状态-动作的组合构成 Q 表格,一共有 10x2=20 个组合。如果想评价在某个状态上哪个动作最好,那么最起码要在该状态上尝试完所有动作后才会有评价的基础。
状态-动作的组合构成 $Q$ 表格,一共有 16x4=64 个组合。如果想评价在某个状态上哪个动作最好,那么最起码要在该状态上尝试完所有动作后才会有评价的基础。
但是还有一个问题,如果总是从 $s_0$ 出发的,状态 $s_5$ 有可能会被绕过去,永远没有机会被访问到,虽然概率很小,但不可避免。所以我们需要以 $s_5$ 为起点,和 $s_0$ 一样做很多次分幕采样。
我们把这种方法叫做探索性出发,它包括三个含义:
1. 从所有可能的状态出发;
2. 尝试所有可能的动作;
3. 多次采样。
首先改一下方格世界的起始状态定义:
```python
# 起点,可以多个
StartStates = list(range(15))
```
上述代码可以产生一个列表,从 0 到 14不包括 15因为它是终止状态。
然后在方格世界的基础代码中,修改 reset() 函数,让它随机从 0-14 中选择起始状态。
```python
def reset(self):
self.curr_state = np.random.choice(self.StartStates)
return self.curr_state
```
关于策略,我们仍然在开始时使用随机策略,可以保证每个动作都被选择到。
```python
policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
```
还需要指定一个我们期望的探索次数,即每个状态上的每个动作被选择的次数。
```python
exploration = 20
```
算法改进:
【算法 11.3.2】
---
输入:起始状态 $s$,策略 $\pi$,折扣 $\gamma$,幕数 Episodes最小搜索次数 exploration
初始化数组:$V(S,A) \leftarrow 0, N(S,A) \leftarrow 0$$S$ 为状态空间,$A$ 为动作空间
多幕 Episodes 循环:
  随机选择所有状态和所有动作作为起始
  列表置空 $Episode = [\ ] $ 用于存储序列数据 $(s,a,r)$
  幕内循环直到终止状态:
    $Episode \leftarrow$ 采样
  $G_t \leftarrow 0$
  对 $Episode$ 从后向前遍历, $t=\tau-1,\tau-2,...,0$
    计算 $G_t$
    $if \min_a N(s_t,A) > $ exploration则开始执行优化
      $Q(s_t,A) \leftarrow G(s_t,A) / N(s_t,A)$
      $\pi(s_t) \leftarrow \argmax_a Q(s_t,A)$ # 取出最大值对应的动作序号
输出:$Q(S,A)$
---
与算法 11.3.1 相比,有两点不同:
1. “随机选择所有状态和所有动作作为起始”是新增加的,用 env.reset() 来保证起始状态是随机的,用 policy=[0.25, 0.25, 0.25, 0.25] 来保证所有动作都被选择到。
2. $if \ \min_a N(s_t,A) > $ exploration是新增加的用来保证每个状态下的所有动作的被选择次数都大于探索次数 exploration才开始进行策略改进。
算法实现如下:
```python
class MC_ES(base.Policy_Iteration):
def __init__(self, env, init_policy, gamma, exploration):
super().__init__(env, init_policy, gamma)
self.exploration = exploration # 设置探索次数
def policy_improvement(self, s):
if np.min(self.Count[s]) <= exploration:
return # 如果次数不够,则不做策略改进
# 做策略改进,贪心算法
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = 0 # 先设置该状态所有策略为 0后面再把有效的动作设置为非 0
argmax = np.argwhere(self.Q[s]==np.max(self.Q[s])) # 取最大值所在的位置(可能不止一个)
p = 1 / argmax.shape[0] # 概率平均分配给几个最大值
for arg in argmax: # 每个最大值位置都设置为相同的概率
self.policy[s][arg[0]] = p
```
主要是在做策略改进之前,先判断是不是探索次数足够多。有可能某些状态被访问了很多次,那么可以在这些状态上先做策略改进,而那些次数不够的状态,依然保持随机策略。
首先我们尝试 exploration = 10即每个动作的最小访问次数为 10 次时,会造成分幕采样时的死循环,所以最终令 exploration = 20可以避免这个问题。如果设置成更大的值效果更佳要针对具体情况来分析。
图 11.3.2 为不同的探索次数与最终的策略结果的对应关系。
<center>
<img src="./img/GridWorld44_Exploration.png">
图 11.3.2
</center>
- 真实值:
在对角线上是可以向下和向右两个方向移动,在对角线右上方都是向下移动,在对角线左下方都是向右移动;
- 探索 20 幕:$s_3$ 有问题;
- 探索 40 幕:$s_3$ 有问题;
- 探索 60 幕:没有问题;
- 探索 80 幕:
没有问题。由于是蒙特卡洛方法,所以不能要求非常精确地在对角线上是两个方向,只要看对角线上方和下方的箭头方向正确即可。
在探索 20 幕时,它不是最优策略,但是已经是可以到达终点的策略了,虽然在 $s_3$ 处还有问题。增加 max_iteration 的次数,并不会改善上面的问题,因为使用的是贪心策略,一旦定型,很难更改。此时需要增加 exploration 的数值,才能得到更好的策略。
在二十一点问题中,由于 GYM 环境的限制,我们无法指定初始状态,只能由环境随机分配,有可能是按正态分布或均匀分布,这会造成一些不确定性。但是在算法实现中的下面的这一行代码很“聪明”:
```python
if np.min(self.Count[s]) <= exploration:
return # 如果次数不够,则不做策略改进
```
可以通过指定 exlploration 值来变向使用探索性出发的算法,比如,令其为 500基本就可以满足二十一点的需求了。

Просмотреть файл

@ -1,11 +1,245 @@
### 10.4.2 软性策略
## 10.4 软性策略
$\varepsilon$-Soft
### 10.4.1 $\varepsilon$-贪心策略
我们已经不是第一次接触软性策略了,在第二章的躲避强盗问题中,曾经学习过梯度上升法,里面使用了 Softmax 函数,根据各个动作的价值计算出备选概率,而不是使用非黑即白的硬性策略 argmax() 来选择后续动作。这样做的好处是:一方面在以最大概率选择(利用)了历史表现最好的动作的同时,给其它表现不好的动作一定的机会来进行探索。
在 11.3 节中,我们使用贪心算法配合探索性出发,来评估动作价值。因为探索性出发可以提供足够的机会让备选动作来表现出价值,然后才能贪心,这与第二章的多臂强盗问题是相同的。但是它的缺点是显而易见的:
1. 不知道要探索多少次才算合理;
2. 需要每个状态都可以作为起点。
在本章中我们将学习软性策略来避开探索性出发的缺点。比如在二十一点问题中Gym 的交互环境不可能让我们指定每个状态作为起点,虽然它具有一定的随机性,但仍然不能满足均等的机会。
我们已经不是第一次接触软性策略了,在第二章的多臂强盗问题中,曾经学习过梯度上升法,里面使用了 Softmax 函数,根据各个动作的价值计算出备选概率,而不是使用非黑即白的硬性策略 argmax() 来选择后续动作。这样做的好处是:一方面在以最大概率选择(利用)了历史表现最好的动作的同时,给其它表现不好的动作一定的机会来进行探索。
具体到策略问题上来,与 4.4.2 节的 $\varepsilon$-贪心策略差不多使用一个概率值来贪心地执行当前已知的最佳动作而用另外一个概率值来进行探索。其策略表示为式11.4.1
$$
\pi'(a \mid s)=
\begin{cases}
1-\varepsilon+\frac{\varepsilon}{|A|}, & a=\argmax_a Q(s,a)
\\
\frac{\epsilon}{|A|}, & a \ne \argmax_a Q(s,a)
\end{cases}
\tag{11.4.1}
$$
用 $\pi'$ 表示更新策略,在实际的操作中是在原策略 $\pi$ 上直接更新,初始原策略可以是随机策略。
假设 $\varepsilon=0.4,|A|=4$,则:
- 当 $a$ 是四个动作的最大值时,$\pi(a|s)=1-0.4+0.4/4=0.7$
- 当 $a$ 不是四个动作的最大值时,$\pi(a|s)=0.4/4=0.1$
其中,最大值动作独占 0.7,其它三个动作分享剩下的 0.3。很容易看到,$0.7+0.1 \times 3=1.0$ 是一个全概率值,则后续的策略是在那个目前最佳的动作上,以 0.7 的概率继续贪婪执行,以 0.1 的概率选择执行其它三个动作。
【算法 11.3.2】
### 10.4.3 GLIE 方法
---
输入:起始状态 $s$,策略 $\pi$,折扣 $\gamma$,幕数 Episodes贪心系数 $\varepsilon$
初始化数组:$G(S,A) \leftarrow 0, N(S,A) \leftarrow 0$$S$ 为状态空间,$A$ 为动作空间
多幕 Episodes 循环:
  列表置空 $Episode = [\ ] $ 用于存储序列数据 $(s,a,r)$
  幕内循环直到终止状态:
    从 $s$ 根据策略 $\pi$ 得到动作 $a$
    执行 $a$ 从环境得到 $s',r$ 以及是否终止的标志
    $Episode \Leftarrow (s,a,r)$,相当于是 $(s_t,a_t,r_{t+1})$
    $s \leftarrow s'$
  $G_t \leftarrow 0$
  对 $Episode$ 从后向前遍历, $t=\tau-1,\tau-2,...,0$
    取出 $(s_t,a_t,r_{t+1})$
    $G_t \leftarrow \gamma G_t+r_{t+1}$
    $G(s_t,a_t) \leftarrow G(s_t,a_t)+G_t$
    $N(s_t,a_t) \leftarrow N(s_t,a_t)+1$
    $Q(s_t,A) \leftarrow G(s_t,A)/N(s_t,A)$
    $A^* \leftarrow \argmax_a Q(s_t,A)$
    $\pi(a \mid s)=
\begin{cases}
1-\varepsilon+\frac{\varepsilon}{|A|}, & if \ a=A^*
\\
\frac{\varepsilon}{|A|}, & if \ a\ne A^*
\end{cases}
$
输出:$Q(S,A)$
---
算法实现如下:
```python
class MC_SoftGreedy(base.Policy_Iteration):
def __init__(self, env, init_policy, gamma, epsilon):
super().__init__(env, init_policy, gamma)
self.epsilon = epsilon
self.best_p = 1 - epsilon + epsilon / self.nA # 最佳动作备选概率
self.other_p = epsilon / self.nA # 其它动作备选概率
def policy_improvement(self, s):
# 做策略改进soft-greedy 算法
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = self.other_p # 先设置该状态所有策略为普通动作策略
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = self.best_p # 设置最佳动作策略
```
选择 $\varepsilon=[0.1, 0.3, 0.5, 0.7]$ 做试验,最大采样次数为 episodes = 1000得到图 11.3.2 所示结果:
<center>
<img src="./img/GridWorld44_SoftGreedy.png">
图 11.3.2
</center>
请读者注意,$\varepsilon$ 的值如果越大对探索越有利,越小越偏向贪心利用。
$\varepsilon=0.1$ 时,
$$
\pi(a \mid s)=
\begin{cases}
0.925, & a 是最佳动作,贪心利用的概率
\\
0.025, & a 不是最佳动作,探索的概率
\end{cases}
\tag{11.4.2}
$$
在图 11.3.2 第二张子图中,右上角的两个状态和左下角的两个状态实际上是不对的,这说明探索的概率不够。
$\varepsilon=0.7$ 时,
$$
\pi(a \mid s)=
\begin{cases}
0.475, & a 是最佳动作,贪心利用的概率
\\
0.175, & a 不是最佳动作,探索的概率
\end{cases}
\tag{11.4.3}
$$
在图 11.3.2 第五张子图中,所有状态上的策略都很完美。
当 $\varepsilon=0.3,0.5$ 时,第三张子图所示的策略非常不错,第四张子图反而不是很完美,$s_4$ 应该向右而不是向下,这与随机过程有关,与算法无关。
从式11.4.211.4.3可以看到贪心利用的概率值的差异一个是0.925一个是0.475,最终的结果是该值越大效果越好,越可以找到最终的最佳策略。
$\varepsilon$-贪心策略为什么可以做到策略改进呢?假设 $\pi$ 是上一轮的策略,$\pi'$ 是改进后的策略,则对任意的状态 $s$ 有:
$$
\begin{aligned}
q_\pi(s,\pi'(s))&=\sum \pi'(a|s)q_\pi(s,a)
\\
&=\frac{\varepsilon}{|A(s)|} \sum q_\pi(s,a) + (1-\varepsilon) \max_a q_\pi(s,a)
\\
&\ge \frac{\varepsilon}{|A(s)|} \sum q_\pi(s,a) + (1-\varepsilon) \sum_a \frac{\pi(a|s)-\frac{\varepsilon}{|A(s)|}}{1-\varepsilon} q_\pi(s,a)
\\
&= \frac{\varepsilon}{|A(s)|} \sum q_\pi(s,a) - \frac{\varepsilon}{|A(s)|} \sum q_\pi(s,a) + \sum_a \pi(a|s)q_\pi(s,a)
\\
&=v_\pi(s)
\end{aligned}
\tag{11.4.4}
$$
11.4.4)中第一个等号根据回溯图很容易得到,大于等于号中第二项将贪心改为了非贪心形式(一种加权平均的形式)。根据之前的证明,有 $v'_\pi \ge v_\pi$ 。
该策略改进方法,当改进后的策略与原策略相同时,是满足-软性策略下的贝尔曼最优方程的,因此一定能达到最优价值函数。
所以,我们现在把动态规划中的固定策略推广到了随机策略上。知道了如何让策略迭代适用于 -软性策略上。将贪心策略的原生概念扩展到了-软性策略上,在迭代时每一步都能保证策略有改进。注意:这里的分析假设了动作价值函数可以被精确的计算,但是在策略改进的时候,我们并不在意动作价值函数是如何被计算的。因此,我们目前可以获得 -软性策略集合中的最优策略了,不再需要试探性出发假设。
### 11.4.2 GLIE 方法
在上面的 $\varepsilon$-贪心策略中,给定一个 $\varepsilon$ 后,在整个过程中始终使用这个值来分配探索与利用的概率分配,在开始阶段,如果有错误的话,并且如果探索概率较小的话,在后期很难纠正。那么是不是可以这样做:在开始阶段使用比较大的概率来探索,在后期基本确定方向后,用较大的概率来利用?
这就是 GLIE 方法可以解决的问题。
GLIEGreedy in the Limit with Infinite Exploration有限贪心与无限探索这让笔者想起了吾辈之楷模雷锋叔叔的一句话把有限的生命投入到无限的为人民服务之中。转义成一个人在把个人生活搞好的前提下可以拿出更多的时间来帮助他人或者创造更多的社会价值这个世界会变得更美好。
具体这样实现:假设一共采样 1000 次的话,$k$ 为当前的采样次数,取值从 1 到 1000。然后取 $\varepsilon=1/k$,观察两个极限状态:
- $k=1$ 时,$\varepsilon=1$,贪心的概率为 $1-\varepsilon+\varepsilon/4=0.25$,探索的总概率为 $0.75$
- $k=1000$ 时,$\varepsilon=0.001$,贪心的概率为 $1-\varepsilon+\varepsilon/4=0.99925$,探索的总概率为 $0.00075$。
从图 11.4.2 左图可以看到,蓝色的贪心概率在 1000 次采样中形成一条变化的曲线,橙色的探索概率也形成一条曲线,二者在任一横坐标上相加为 1。但是两条曲线都过于陡峭了也就是说只探索了不到 75 次,贪心的概率已经大于 0.99 了。
<center>
<img src="./img/GLIE1.png">
图 11.4.2
左图传统的GLIE右图改进的GLIE
</center>
算法流程与算法 11.3.2 基本一致,只是修改策略 $\pi$ 的方法不一样。实现如下:
【代码位置】MC_11_GLIE_Standard.py
```python
class MC_GLIE_Standard(base.Policy_Iteration):
def policy_improvement(self, s):
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
epsilon = 1 / self.n_episode
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = other_p # 先设置该状态所有策略为 epsilong/nA
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = best_p
return self.policy
```
那么这种方法的探索和利用的比例是否合适呢?我们仍然用简单的方格世界来做试验,一共做四次独立的试验,每次都是采样 2000 幕,得到结果如图 11.4.3 所示。
<center>
<img src="./img/MC_GLIE_Standard.png">
图 11.4.3 传统的 GLIE 方法的结果
</center>
每次试验都有些状态上的策略不令人满意,比如:
- 第一次试验中的 $s_{12}$
- 第二次中的 $s_6$
- 第三次中的 $s_1,s_3,s_8$
- 第四次中的 $s_2,s_3,s_6,s_{13}$。
这说明在这个例子中探索的次数不够,所以我们需要调整图 11.4.2 左图中的曲线的形状,让它们更平缓一些。最简单的办法就是先取 $k$ 的对数,再求倒数,即令 $\varepsilon=1/(\lg(k)+1)$,则陡峭的曲线将会变得平缓,如图 11.4.2 右图所示。分母加 1 的原因是避免被除数为 0。当 $k=1000$ 时,$\varepsilon=0.25$,贪心的概率为 $1-\varepsilon+\varepsilon/4=0.8125$,探索的总概率为 $0.1875$。
算法流程与算法 11.3.2 基本一致,只是修改策略 $\pi$ 的方法不一样。实现如下:
【代码位置】MC_11_GLIE_Improved.py
```python
class MC_GLIE_Improved(base.Policy_Iteration):
def policy_improvement(self, s):
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
epsilon = 1 / (math.log(self.n_episode, 10)+1)
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = other_p # 先设置该状态所有策略为 epsilong/nA
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = best_p
return
```
注意要取以 10 为底的对数,否则就是取自然对数了,那么改进效率要低很多。
这种简单的改进带来的效果是明显的,仍然做 4 次独立试验,采样 2000 次,得到结果如图 11.4.4。
<center>
<img src="./img/MC_GLIE_Improved.png">
图 11.4.4 改进的 GLIE 方法的结果
</center>
有些错误的策略如:
- 第一次试验中的 $s_3$
- 第二次试验中的 $s_2$
但总体上比传统的 GLIE 方法要好很多。

Просмотреть файл

@ -0,0 +1,6 @@
### 思考与练习
1. 使用 11.3 节最后描述的探索性出发来解决二十一点问题。
### 参考资料

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 28 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 9.5 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 8.8 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 7.3 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 10 KiB

Двоичный файл не отображается.

До

Ширина:  |  Высота:  |  Размер: 31 KiB

После

Ширина:  |  Высота:  |  Размер: 16 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 12 KiB

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 14 KiB

Просмотреть файл

@ -7,7 +7,7 @@ def q_star(p_s_r_d, gamma, V):
# 遍历每个转移概率,以计算 q
for p, s_next, reward, done in p_s_r_d:
# math: \sum_{s'} p_{ss'}^a [ r_{ss'}^a + \gamma * v_{\pi}(s')]
q += p * (reward + gamma * V[s_next] * (1-done))
q += p * (reward + gamma * V[s_next])
return q
# 式 (9.6.3) 计算 v*
@ -29,6 +29,8 @@ def calculate_VQ_star(env, gamma, max_iteration):
V_old = V.copy()
# 遍历所有状态 s
for s in range(env.observation_space.n):
if env.is_end(s): # 终止状态v=0
continue
actions = env.P[s]
V[s] = v_star(s, actions, gamma, V, Q)
# 检查收敛性

Просмотреть файл

@ -4,75 +4,94 @@ import tqdm
# 策略迭代
class Policy_Iteration(object):
# 初始化
def __init__(self, env, policy_args, gamma, rough, final, check_policy_method=0):
self.rough_episodes = rough # 粗预测的分幕循环次数
self.final_episodes = final # 细预测的分幕循环次数
def __init__(self, env, init_policy, gamma):
self.env = env # 环境
self.gamma = gamma # 折扣
self.check_method = check_policy_method # 迭代终止条件
self.nA = self.env.action_space.n # 动作空间
self.n_iteration = 0 # 分幕循环次数总和
if env.spec.id == 'Blackjack-v1':
# 0-21=22, 0-10=11, A/noA=2, Hit/Stick=2
self.Value = np.zeros((22, 11, 2, self.nA)) # G 的总和
self.Count = np.zeros((22, 11, 2, self.nA)) # G 的数量
else:
self.n_episode = 0 # 分幕循环次数计数器
if hasattr(env, "spec") is False:
self.nS = self.env.observation_space.n # 状态空间
self.Value = np.zeros((self.nS, self.nA))
self.Count = np.zeros((self.nS, self.nA))
self.policy = self.init_policy(policy_args)
self.Q = np.zeros((self.nS, self.nA))
elif env.spec.id == 'Blackjack-v1':
# 0-21=22, 0-10=11, A/noA=2, Hit/Stick=2
self.Value = np.zeros((22, 11, 2, self.nA)) # G 的总和
self.Count = np.zeros((22, 11, 2, self.nA)) # G 的数量
self.policy = init_policy
def init_policy(self, policy_args):
if self.env.spec.id == 'Blackjack-v1':
policy = np.zeros_like(self.Value)
policy[:,:,:] = policy_args
return policy
def sampling_blackjack(self):
s = self.env.reset()
Episode = [] # 一幕内的(状态,奖励)序列
done = False
while (done is False): # 幕内循环
int_s = (s[0], s[1], int(s[2]))
action = np.random.choice(self.nA, p=self.policy[int_s])
next_s, reward, done, _ = self.env.step(action)
Episode.append((int_s, action, reward))
s = next_s
return Episode
def sampling(self):
s = self.env.reset()
Episode = [] # 一幕内的(状态,动作,奖励)序列
done = False
while (done is False): # 幕内循环
action = np.random.choice(self.nA, p=self.policy[s])
next_s, reward, done, _ = self.env.step(action)
Episode.append((s, action, reward))
s = next_s # 迭代
return Episode
def calculate(self, Episode, t, G):
s, a, r = Episode[t]
G = self.gamma * G + r
self.Value[s][a] += G # 值累加
self.Count[s][a] += 1 # 数量加 1
return G, s
# 策略评估
def policy_evaluation(self, episodes):
for _ in tqdm.trange(episodes): # 多幕循环
self.n_iteration += 1
self.n_episode += 1
# 重置环境,开始新的一幕采样
s = self.env.reset()
Episode = [] # 一幕内的(状态,奖励)序列
done = False
while (done is False): # 幕内循环
int_s = (s[0], s[1], int(s[2]))
action = np.random.choice(self.nA, p=self.policy[int_s])
next_s, reward, done, _ = self.env.step(action)
Episode.append((int_s, action, reward))
s = next_s
Episode = self.sampling()
# 从后向前遍历计算 G 值
G = 0
for t in range(len(Episode)-1, -1, -1):
s, a, r = Episode[t]
G = self.gamma * G + r
self.Value[s][a] += G # 值累加
self.Count[s][a] += 1 # 数量加 1
self.Count[s][a] += 1 # 数量加 1
# 多幕循环结束,计算 Q 值
self.Count[self.Count==0] = 1 # 把分母为0的填成1主要是针对终止状态Count为0
Q = self.Value / self.Count # 求均值
return Q
# 策略迭代
def policy_iteration(self, episodes):
for _ in tqdm.trange(episodes): # 多幕循环
self.n_episode += 1
# 重置环境,开始新的一幕采样
Episode = self.sampling()
# 从后向前遍历计算 G 值
G = 0
for t in range(len(Episode)-1, -1, -1):
G, s = self.calculate(Episode, t, G)
self.policy_improvement(s)
# 多幕循环结束,计算 Q 值
self.Count[self.Count==0] = 1 # 把分母为0的填成1主要是针对终止状态Count为0
Q = self.Value / self.Count # 求均值
return Q
# 策略改进(算法重写此函数)
def policy_improvement(self, Q):
def policy_improvement(self, s):
# change your policy here
pass
# 策略迭代
def policy_iteration(self):
while True:
print("策略评估")
Q = self.policy_evaluation(self.rough_episodes)
print("策略改进")
old_policy = self.policy.copy()
self.policy_improvement(Q)
if self.check_policy_diff(old_policy, self.policy):
print("新旧策略相等")
break
print("精准的策略评估")
Q = self.policy_evaluation(self.final_episodes)
return Q, self.policy
def check_policy_diff(self, old_policy, new_policy):
if (self.check_method == 0):
return (old_policy == new_policy).all()

Просмотреть файл

@ -1,53 +0,0 @@
import numpy as np
import gym
import Algorithm.Algo_MC_Policy_Iteration as algoMC
import Algorithm.Algo_OptimalValueFunction as algoDP
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import tqdm
import matplotlib.pyplot as plt
import matplotlib as mpl
import math
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_Greedy(algoMC.Policy_Iteration):
def policy_improvement(self, Q):
for s in range(self.nS):
arg = np.argmax(Q[s])
self.policy[s] = 0
self.policy[s, arg] = 1
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 0.9
episodes = 1000
final = 2000
np.set_printoptions(suppress=True)
env = gym.make("FrozenLake-v1", desc=None, map_name = "8x8", is_slippery=False)
Q_real = get_groud_truth(env, gamma)
print(np.round(Q_real,3))
drawQ.draw(Q_real,(8,8))
policy = helper.create_policy(env, (0.25,0.25,0.25,0.25))
env.reset(seed=5)
algo = MC_Greedy(env, policy, gamma, episodes, final)
Q, policy = algo.policy_iteration()
env.close()
print("------ 最优动作价值函数 -----")
print(np.round(Q,3))
drawQ.draw(policy,(8,8))
print(policy)

Просмотреть файл

@ -1,70 +0,0 @@
from cProfile import label
from email import policy
import numpy as np
import gym
import Algorithm.Base_MC_Policy_Iteration as algoMC
import Algorithm.Algo_OptimalValueFunction as algoDP
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import tqdm
import matplotlib.pyplot as plt
import matplotlib as mpl
import math
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_E_Greedy(algoMC.Policy_Iteration):
def __init__(self, env, policy, gamma:float, episodes:int, final:int, epsilon:float):
super().__init__(env, policy, gamma, episodes, final)
self.epsilon = epsilon
self.other_p = self.epsilon / self.nA
self.best_p = 1 - self.epsilon + self.epsilon / self.nA
def policy_improvement(self, Q):
print(np.sum(Q))
for s in range(self.nS):
if s in end_states:
self.policy[s] = 0
else:
max_A = np.max(Q[s])
argmax_A = np.where(Q[s] == max_A)[0]
A = np.random.choice(argmax_A)
self.policy[s] = self.other_p
self.policy[s,A] = self.best_p
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 0.9
episodes = 10
final = 2000
epsilon = 0.05
np.set_printoptions(suppress=True)
env = gym.make("FrozenLake-v1", desc=None, map_name = "4x4", is_slippery=False)
end_states = [5, 7, 11, 12, 15]
Q_real = get_groud_truth(env, gamma)
print(np.round(Q_real,3))
drawQ.draw(Q_real,(4,4))
policy = helper.create_policy(env.observation_space.n, env.action_space.n, (0.25,0.25,0.25,0.25))
env.reset(seed=5)
algo = MC_E_Greedy(env, policy, gamma, episodes, final, epsilon)
Q, policy = algo.policy_iteration()
env.close()
print("------ 最优动作价值函数 -----")
print(np.round(Q,3))
drawQ.draw(Q,(4,4))
print(policy)

Просмотреть файл

@ -1,67 +0,0 @@
from cProfile import label
from email import policy
import numpy as np
import gym
import Algorithm.Algo_MC_Policy_Iteration as algoMC
import Algorithm.Algo_OptimalValueFunction as algoDP
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import tqdm
import matplotlib.pyplot as plt
import matplotlib as mpl
import math
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_GLIE(algoMC.Policy_Iteration):
def policy_improvement(self, Q):
print(np.sum(Q))
epsilon = 1 / (math.log(self.n_iteration+1)+1)
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
for s in range(self.nS):
if s in end_states:
self.policy[s] = 0
else:
max_A = np.max(Q[s])
argmax_A = np.where(Q[s] == max_A)[0]
A = np.random.choice(argmax_A)
self.policy[s] = other_p
self.policy[s,A] = best_p
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 0.9
episodes = 10
final = 2000
np.set_printoptions(suppress=True)
env = gym.make("FrozenLake-v1", desc=None, map_name = "4x4", is_slippery=False)
end_states = [5, 7, 11, 12, 15]
Q_real = get_groud_truth(env, gamma)
print(np.round(Q_real,3))
drawQ.draw(Q_real,(4,4))
policy = helper.create_policy(env, (0.25,0.25,0.25,0.25))
env.reset(seed=5)
algo = MC_GLIE(env, policy, gamma, episodes, final, check_policy_method=1)
Q, policy = algo.policy_iteration()
env.close()
print("------ 最优动作价值函数 -----")
print(np.round(Q,3))
drawQ.draw(Q,(4,4))
print(policy)

Просмотреть файл

@ -1,109 +0,0 @@
from cProfile import label
from email import policy
import numpy as np
import gym
import Algorithm.Algo_MC_Policy_Iteration as algoMC
import Algorithm.Algo_OptimalValueFunction as algoDP
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import tqdm
import matplotlib.pyplot as plt
import matplotlib as mpl
import math
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_Greedy(algoMC.Policy_Iteration):
def policy_improvement(self, Q):
for s in range(self.nS):
arg = np.argmax(Q[s])
self.policy[s] = 0
self.policy[s, arg] = 1
return self.policy
class MC_E_Greedy(algoMC.Policy_Iteration):
def __init__(self, env, policy, episodes, gamma, epsilon):
super().__init__(env, policy, episodes, gamma)
self.epsilon = epsilon
def initialize(self):
super().initialize()
self.other_p = self.epsilon / self.nA
self.best_p = 1 - self.epsilon + self.epsilon / self.nA
def policy_improvement(self, Q):
for s in range(self.nS):
max_A = np.max(Q[s])
if max_A == 0:
self.policy[s] = 0
else:
argmax_A = np.where(Q[s] == max_A)[0]
A = np.random.choice(argmax_A)
self.policy[s] = self.other_p
self.policy[s,A] = self.best_p
return self.policy
class MC_GLIE(algoMC.Policy_Iteration):
def __init__(self, env, policy, episodes, gamma, epsilon):
super().__init__(env, policy, episodes, gamma)
self.epsilon = epsilon
def initialize(self):
super().initialize()
epsilon = 1
self.other_p = epsilon / self.nA
self.best_p = 1 - epsilon + epsilon/self.nA
def policy_improvement(self, Q):
epsilon = 1 / (math.log(k+1)+1)
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
# 更新策略
for s in range(self.nS):
max_A = np.max(Q[s])
if max_A == 0:
self.policy[s] = 0
else:
argmax_A = np.where(Q[s] == max_A)[0]
A = np.random.choice(argmax_A)
self.policy[s] = self.other_p
self.policy[s,A] = self.best_p
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 0.9
episodes = 1000
np.set_printoptions(suppress=True)
env = gym.make("FrozenLake-v1", desc=None, map_name = "4x4", is_slippery=False)
Q_real = get_groud_truth(env, gamma)
print(np.round(Q_real,3))
drawQ.draw(Q_real,(4,4))
policy = helper.create_policy(env, (0.25,0.25,0.25,0.25))
env.reset(seed=5)
algo = MC_Greedy(env, policy, episodes, gamma)
#algo = MC_E_Greedy(env, policy, episodes, gamma, 0.1)
Q, policy = algo.policy_iteration()
env.close()
print("------ 最优动作价值函数 -----")
print(np.round(Q,3))
drawQ.draw(Q,(4,4))
print(policy)

Просмотреть файл

@ -4,6 +4,7 @@ import numpy as np
env = gym.make('Blackjack-v1', sab=True)
print(env.action_space)
print(env.observation_space)
print(env.spec)
for i in range(100):
s = env.reset()
Episode = []

Просмотреть файл

@ -1,71 +0,0 @@
import numpy as np
import gym
import Algorithm.Algo_OptimalValueFunction as algoDP
import Algorithm.Base_MC_Policy_Iteration as base
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_E_Greedy(base.Policy_Iteration):
def __init__(self, env, policy, gamma:float, episodes:int, final:int, epsilon:float):
super().__init__(env, policy, gamma, episodes, final)
self.epsilon = epsilon
self.other_p = self.epsilon / self.nA
self.best_p = 1 - self.epsilon + self.epsilon / self.nA
def policy_improvement(self, Q):
print(np.sum(Q))
for s in range(self.nS):
if s in end_states:
self.policy[s] = 0
else:
max_A = np.max(Q[s])
argmax_A = np.where(Q[s] == max_A)[0]
A = np.random.choice(argmax_A)
self.policy[s] = self.other_p
self.policy[s,A] = self.best_p
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 0.9
rough = 300
final = 10000
epsilon = 0.05
np.set_printoptions(suppress=True)
#desc = ["SFHF","FFFF","HFHF","FFFF","FFFG"]
env = gym.make("FrozenLake-v1", desc=None, map_name = "4x4", is_slippery=True)
end_states = [5, 7, 11, 12, 15]
helper.print_seperator_line(helper.SeperatorLines.long, "动态规划法")
Q_real = get_groud_truth(env, gamma)
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.round(Q_real,3))
drawQ.draw(Q_real,(4,4))
policy = helper.create_policy(env.observation_space.n, env.action_space.n, (0.25,0.25,0.25,0.25))
env.reset(seed=5)
algo = MC_E_Greedy(env, policy, gamma, rough, final, epsilon)
Q, policy = algo.policy_iteration()
env.close()
helper.print_seperator_line(helper.SeperatorLines.long, "蒙特卡洛法 - E-Greedy")
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.round(Q,3))
drawQ.draw(Q,(4,4))
print(policy)

Просмотреть файл

@ -1,45 +0,0 @@
from email import policy
import numpy as np
import gym
import Algorithm.Algo_OptimalValueFunction as algoDP
import Algorithm.Base_MC_Policy_Iteration as base
import common.DrawQpi as drawQ
import common.CommonHelper as helper
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
class MC_Greedy(base.Policy_Iteration):
def policy_improvement(self, Q):
for i in range(Q.shape[0]):
for j in range(Q.shape[1]):
for k in range(Q.shape[2]):
arg = np.argmax(Q[i,j,k])
self.policy[i,j,k] = 0
self.policy[i,j,k,arg] = 1
return self.policy
def get_groud_truth(env, gamma):
iteration = 100
_, Q = algoDP.calculate_VQ_star(env, gamma, iteration)
return Q
if __name__=="__main__":
gamma = 1
rough = 1000
final = 20000
np.set_printoptions(suppress=True)
env = gym.make('Blackjack-v1', sab=True)
env.reset(seed=5)
algo = MC_Greedy(env, (0.5,0.5), gamma, rough, final)
Q, policy = algo.policy_iteration()
env.close()
print(policy)

Просмотреть файл

@ -0,0 +1,163 @@
import numpy as np
import common.GridWorld_Model as model
import Algorithm.Base_MC_Policy_Iteration as base
import Algorithm.Algo_OptimalValueFunction as algo
import common.CommonHelper as helper
import common.DrawQpi as drawQ
# 状态空间 = 空间宽度 x 空间高度
GridWidth, GridHeight = 4, 4
# 起点,可以多个
StartStates = list(range(15))
# 终点,可以多个
EndStates = [15]
# 动作空间
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
Actions = [LEFT, DOWN, RIGHT, UP]
# 初始策略
Policy = [0.25, 0.25, 0.25, 0.25]
# 转移概率: [SlipLeft, MoveFront, SlipRight, SlipBack]
Transition = [0.1, 0.8, 0.1, 0.0]
# 每走一步的奖励值可以是0或者-1
StepReward = 0
# 特殊奖励 from s->s' then get r, 其中 s,s' 为状态序号,不是坐标位置
SpecialReward = {
(0,0):-1, # s0 -> s0 得到-1奖励
(1,1):-1,
(2,2):-1,
(3,3):-1,
(4,4):-1,
(7,7):-1,
(8,8):-1,
(11,11):-1,
(12,12):-1,
(13,13):-1,
(14,14):-1,
(11,15):1,
(14,15):1
}
# 特殊移动,用于处理类似虫洞场景
SpecialMove = {
}
# 墙
Blocks = []
class MC_Greedy(base.Policy_Iteration):
def policy_improvement(self, s):
# 做策略改进,贪心算法
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = 0 # 先设置该状态所有策略为 0后面再把有效的动作设置为非 0
argmax = np.argwhere(self.Q[s]==np.max(self.Q[s])) # 取最大值所在的位置(可能不止一个)
p = 1 / argmax.shape[0] # 概率平均分配给几个最大值
for arg in argmax: # 每个最大值位置都设置为相同的概率
self.policy[s][arg[0]] = p
class MC_ES(base.Policy_Iteration):
def __init__(self, env, init_policy, gamma, exploration):
super().__init__(env, init_policy, gamma)
self.exploration = exploration
def policy_improvement(self, s):
if np.min(self.Count[s]) <= exploration:
return # 如果次数不够,则不做策略改进
# 做策略改进,贪心算法
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = 0 # 先设置该状态所有策略为 0后面再把有效的动作设置为非 0
argmax = np.argwhere(self.Q[s]==np.max(self.Q[s])) # 取最大值所在的位置(可能不止一个)
p = 1 / argmax.shape[0] # 概率平均分配给几个最大值
for arg in argmax: # 每个最大值位置都设置为相同的概率
self.policy[s][arg[0]] = p
if __name__=="__main__":
np.random.seed(5)
env = model.GridWorld(
GridWidth, GridHeight, StartStates, EndStates, # 关于状态的参数
Actions, Policy, Transition, # 关于动作的参数
StepReward, SpecialReward, # 关于奖励的参数
SpecialMove, Blocks) # 关于移动的限制
#model.print_P(env.P_S_R)
gamma = 0.9
max_iteration = 500
helper.print_seperator_line(helper.SeperatorLines.long, "精确解")
V_real,Q_real = algo.calculate_VQ_star(env, gamma, max_iteration)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数值")
print(np.round(V_real,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数值")
print(np.around(Q_real, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q_real, (4,4))
####################################
helper.print_seperator_line(helper.SeperatorLines.long, "试验探索次数")
explorations = [20,40,60,80]
for exploration in explorations:
helper.print_seperator_line(helper.SeperatorLines.middle, "探索次数="+str(exploration))
init_policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
es = MC_ES(env, init_policy, gamma, exploration)
Q = es.policy_iteration(max_iteration)
V = helper.calculat_V_from_Q(Q, es.policy)
#helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
#print(np.round(V,1).reshape(4,4))
#helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
#print(np.around(Q, 1))
#helper.print_seperator_line(helper.SeperatorLines.short, "策略")
#drawQ.draw(Q, (4,4))
#helper.print_seperator_line(helper.SeperatorLines.short, "策略概率值")
#print(policy)
Qs = []
for i in range(10):
es2 = MC_ES(env, es.policy, gamma, exploration)
q = es2.policy_evaluation(max_iteration)
Qs.append(q)
Q = np.array(Qs).mean(axis=0)
V = helper.calculat_V_from_Q(Q, init_policy)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
print(np.round(V,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.around(Q, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q, (4,4))
#helper.print_seperator_line(helper.SeperatorLines.short, "RMSE误差")
#print("状态价值函数误差=", helper.RMSE(V_real, V))
#print("动作价值函数误差=", helper.RMSE(Q_real, Q))
####################################################
"""
helper.print_seperator_line(helper.SeperatorLines.long, "试验采样次数与误差的关系")
exploration = 100
iterations = [200,500,1000,2000]
for max_iteration in iterations:
policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
helper.print_seperator_line(helper.SeperatorLines.middle, "采样次数="+str(max_iteration))
Q = MC_EveryVisit_Q_Policy_test(env, max_iteration, gamma, policy, exploration)
V = helper.calculat_V_from_Q(Q, policy)
Qs = []
for i in range(10):
q = MC_EveryVisit_Q_Policy_test(env, max_iteration, gamma, policy, max_iteration)
Qs.append(q)
Q = np.array(Qs).mean(axis=0)
V = helper.calculat_V_from_Q(Q, policy)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
print(np.round(V,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.around(Q, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q, (4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "RMSE误差")
print("状态价值函数误差=", helper.RMSE(V_real, V))
print("动作价值函数误差=", helper.RMSE(Q_real, Q))
"""

Просмотреть файл

@ -0,0 +1,46 @@
from cProfile import label
import matplotlib.pyplot as plt
import matplotlib as mpl
import math
mpl.rcParams['font.sans-serif'] = ['SimHei']
mpl.rcParams['axes.unicode_minus'] = False
def standard_GLIE(ax):
A = []
B = []
for i in range(1000):
epsilon = 1/(i+1)
greedy = 1 - epsilon + epsilon / 4
explor = 1 - greedy
A.append(greedy)
B.append(explor)
ax.plot(A, label="贪心")
ax.plot(B, label="探索")
ax.grid()
ax.legend()
ax.set_title(u"传统的GLIE")
def improved_GLIE(ax):
A = []
B = []
for i in range(1000):
epsilon = 1 / (math.log(i+1,10)+1)
greedy = 1 - epsilon + epsilon / 4
explor = 1 - greedy
A.append(greedy)
B.append(explor)
ax.plot(A, label="贪心")
ax.plot(B, label="探索")
ax.grid()
ax.legend()
ax.set_title(u"改进的GLIE")
if __name__=="__main__":
fig = plt.figure(figsize=(8,4))
ax1 = fig.add_subplot(121)
standard_GLIE(ax1)
ax2 = fig.add_subplot(122)
improved_GLIE(ax2)
plt.show()

Просмотреть файл

@ -0,0 +1,85 @@
import numpy as np
import common.GridWorld_Model as model
import Algorithm.Algo_OptimalValueFunction as algo
import Algorithm.Base_MC_Policy_Iteration as base
import common.CommonHelper as helper
import common.DrawQpi as drawQ
import tqdm, math
# 状态空间 = 空间宽度 x 空间高度
GridWidth, GridHeight = 4, 4
# 起点,可以多个
StartStates = list(range(15))
# 终点,可以多个
EndStates = [15]
# 动作空间
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
Actions = [LEFT, DOWN, RIGHT, UP]
# 初始策略
Policy = [0.25, 0.25, 0.25, 0.25]
# 转移概率: [SlipLeft, MoveFront, SlipRight, SlipBack]
Transition = [0.0, 1.0, 0.0, 0.0]
# 每走一步的奖励值可以是0或者-1
StepReward = 0
# 特殊奖励 from s->s' then get r, 其中 s,s' 为状态序号,不是坐标位置
SpecialReward = {
(0,0):-1, # s0 -> s0 得到-1奖励
(1,1):-1,
(2,2):-1,
(3,3):-1,
(4,4):-1,
(7,7):-1,
(8,8):-1,
(11,11):-1,
(12,12):-1,
(13,13):-1,
(14,14):-1,
(11,15):1,
(14,15):1
}
# 特殊移动,用于处理类似虫洞场景
SpecialMove = {
}
# 墙
Blocks = []
class MC_GLIE_Improved(base.Policy_Iteration):
def policy_improvement(self, s):
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
epsilon = 1 / (math.log(self.n_episode, 10)+1)
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = other_p # 先设置该状态所有策略为 epsilong/nA
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = best_p
return
if __name__=="__main__":
env = model.GridWorld(
GridWidth, GridHeight, StartStates, EndStates, # 关于状态的参数
Actions, Policy, Transition, # 关于动作的参数
StepReward, SpecialReward, # 关于奖励的参数
SpecialMove, Blocks) # 关于移动的限制
gamma = 0.9
episodes = 2000
Qs = []
for i in range(4):
policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
mc = MC_GLIE_Improved(env, policy, gamma)
Q = mc.policy_iteration(episodes)
V = helper.calculat_V_from_Q(Q, policy)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
print(np.round(V,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.around(Q, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q, (4,4))
print(policy)

Просмотреть файл

@ -0,0 +1,84 @@
import numpy as np
import common.GridWorld_Model as model
import Algorithm.Algo_OptimalValueFunction as algo
import Algorithm.Base_MC_Policy_Iteration as base
import common.CommonHelper as helper
import common.DrawQpi as drawQ
import tqdm, math
# 状态空间 = 空间宽度 x 空间高度
GridWidth, GridHeight = 4, 4
# 起点,可以多个
StartStates = list(range(15))
# 终点,可以多个
EndStates = [15]
# 动作空间
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
Actions = [LEFT, DOWN, RIGHT, UP]
# 初始策略
Policy = [0.25, 0.25, 0.25, 0.25]
# 转移概率: [SlipLeft, MoveFront, SlipRight, SlipBack]
Transition = [0.0, 1.0, 0.0, 0.0]
# 每走一步的奖励值可以是0或者-1
StepReward = 0
# 特殊奖励 from s->s' then get r, 其中 s,s' 为状态序号,不是坐标位置
SpecialReward = {
(0,0):-1, # s0 -> s0 得到-1奖励
(1,1):-1,
(2,2):-1,
(3,3):-1,
(4,4):-1,
(7,7):-1,
(8,8):-1,
(11,11):-1,
(12,12):-1,
(13,13):-1,
(14,14):-1,
(11,15):1,
(14,15):1
}
# 特殊移动,用于处理类似虫洞场景
SpecialMove = {
}
# 墙
Blocks = []
class MC_GLIE_Standard(base.Policy_Iteration):
def policy_improvement(self, s):
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
epsilon = 1 / self.n_episode
other_p = epsilon / self.nA
best_p = 1 - epsilon + epsilon/self.nA
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = other_p # 先设置该状态所有策略为 epsilong/nA
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = best_p
return self.policy
if __name__=="__main__":
env = model.GridWorld(
GridWidth, GridHeight, StartStates, EndStates, # 关于状态的参数
Actions, Policy, Transition, # 关于动作的参数
StepReward, SpecialReward, # 关于奖励的参数
SpecialMove, Blocks) # 关于移动的限制
gamma = 0.9
episodes = 2000
Qs = []
for i in range(4):
policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
mc = MC_GLIE_Standard(env, policy, gamma)
Q = mc.policy_iteration(episodes)
V = helper.calculat_V_from_Q(Q, policy)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
print(np.round(V,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.around(Q, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q, (4,4))
print(policy)

Просмотреть файл

@ -0,0 +1,95 @@
import numpy as np
import common.GridWorld_Model as model
import Algorithm.Algo_OptimalValueFunction as algo
import Algorithm.Base_MC_Policy_Iteration as base
import common.CommonHelper as helper
import common.DrawQpi as drawQ
import tqdm
# 状态空间 = 空间宽度 x 空间高度
GridWidth, GridHeight = 4, 4
# 起点,可以多个
StartStates = list(range(15))
# 终点,可以多个
EndStates = [15]
# 动作空间
LEFT, DOWN, RIGHT, UP = 0, 1, 2, 3
Actions = [LEFT, DOWN, RIGHT, UP]
# 初始策略
Policy = [0.25, 0.25, 0.25, 0.25]
# 转移概率: [SlipLeft, MoveFront, SlipRight, SlipBack]
Transition = [0.0, 1.0, 0.0, 0.0]
# 每走一步的奖励值可以是0或者-1
StepReward = 0
# 特殊奖励 from s->s' then get r, 其中 s,s' 为状态序号,不是坐标位置
SpecialReward = {
(0,0):-1, # s0 -> s0 得到-1奖励
(1,1):-1,
(2,2):-1,
(3,3):-1,
(4,4):-1,
(7,7):-1,
(8,8):-1,
(11,11):-1,
(12,12):-1,
(13,13):-1,
(14,14):-1,
(11,15):1,
(14,15):1
}
# 特殊移动,用于处理类似虫洞场景
SpecialMove = {
}
# 墙
Blocks = []
class MC_SoftGreedy(base.Policy_Iteration):
def __init__(self, env, init_policy, gamma, epsilon):
super().__init__(env, init_policy, gamma)
self.epsilon = epsilon
self.best_p = 1 - epsilon + epsilon / self.nA
self.other_p = epsilon / self.nA
def policy_improvement(self, s):
# 做策略改进,贪心算法
if np.min(self.Count[s]) == 0: # 避免被除数为 0
return
self.Q[s] = self.Value[s] / self.Count[s] # 得到该状态下所有动作的 q 值
self.policy[s] = self.other_p # 先设置该状态所有策略为 epsilong/nA
argmax = np.argmax(self.Q[s])
self.policy[s, argmax] = self.best_p
if __name__=="__main__":
np.random.seed(15)
env = model.GridWorld(
GridWidth, GridHeight, StartStates, EndStates, # 关于状态的参数
Actions, Policy, Transition, # 关于动作的参数
StepReward, SpecialReward, # 关于奖励的参数
SpecialMove, Blocks) # 关于移动的限制
#model.print_P(env.P_S_R)
gamma = 0.9
episodes = 1000
helper.print_seperator_line(helper.SeperatorLines.long, "试验探索次数")
epsilons = [0.1, 0.3, 0.5, 0.7]
for epsilon in epsilons:
helper.print_seperator_line(helper.SeperatorLines.middle, "epsilon="+str(epsilon))
policy = helper.create_policy(env.nS, env.nA, (0.25, 0.25, 0.25, 0.25))
mc = MC_SoftGreedy(env, policy, gamma, epsilon)
Q = mc.policy_iteration(episodes)
V = helper.calculat_V_from_Q(Q, policy)
helper.print_seperator_line(helper.SeperatorLines.short, "V 函数")
print(np.round(V,1).reshape(4,4))
helper.print_seperator_line(helper.SeperatorLines.short, "Q 函数")
print(np.around(Q, 1))
helper.print_seperator_line(helper.SeperatorLines.short, "策略")
drawQ.draw(Q, (4,4))
print(policy)

Просмотреть файл

@ -63,7 +63,7 @@ def create_policy(nS, nA, args):
sum = 0
for i in range(nA):
sum += args[i]
policy[:][i] = args[i]
policy[:,i] = args[i]
assert(sum == 1)
return policy

Просмотреть файл

@ -104,9 +104,10 @@ class GridWorld(object):
return transitions[0][1], transitions[0][2], transitions[0][3], transitions[0][0]
else:
self.p = [transitions[i][0] for i in range(num)]
item = np.random.choice(self.nA, p=self.p)
self.curr_state = item[0][1]
return item[1], item[2], item[3], item[0]
idx = np.random.choice(num, p=self.p)
item = transitions[idx]
self.curr_state = item[1] # s
return item[1], item[2], item[3], item[0] # s, r, done, p
action_names = ['LEFT', 'UP', 'RIGHT', 'DOWN']

Просмотреть файл

@ -2,3 +2,7 @@
### 思考与练习
1. 使用本章中学习到的贝尔曼方程来解决第 6 章中的安全驾驶问题的状态价值函数计算。
### 参考资料

Просмотреть файл

@ -0,0 +1,2 @@
### 思考与练习