AI强化学习-策略迭代实战


以下代码演示强化学习中的策略迭代算法。

前提:

  1. python语言 
  2. OpenAI gym库

主要演示AI自动寻路的算法。如图:

图中格子从左到右,从上到下依次编号为1~8。暗黄色的圆球初始时随机出现在1~5号格子中的某一个,并在格子间移动:移动到黑色圆点即失败,移动到黄色圆点即胜利。

首先,写一个gym环境:grid_map.py,代码如下

import logging
import numpy
import random
from gym import spaces
import gym

logger = logging.getLogger(__name__)

class GridEnv(gym.Env):
	"""Small deterministic grid world used to demonstrate policy iteration.

	There are eight states numbered 1-8; states 6, 7 and 8 are terminal.
	Actions are the compass letters 'n', 'e', 's' and 'w'.  Moving south from
	state 3 (into state 7) is rewarded with +1, moving south from state 1 or 5
	(into state 6 or 8) is rewarded with -1, and every other move yields 0.
	"""

	metadata = {
		'render.modes': ['human', 'rgb_array'],
		'video.frames_per_second': 2
	}

	def __init__(self):
		"""Build the state space, reward table and transition table."""
		self.states = [1, 2, 3, 4, 5, 6, 7, 8]  # state space
		# Screen coordinates of each state's cell centre; entry i belongs to
		# state i + 1 (see render()).
		self.x = [140, 220, 300, 380, 460, 140, 300, 460]
		self.y = [250, 250, 250, 250, 250, 150, 150, 150]
		# Terminal states are stored as dictionary keys; only membership is tested.
		self.terminate_states = {6: 1, 7: 1, 8: 1}

		self.actions = ['n', 'e', 's', 'w']

		# Rewards keyed by "<state>_<action>"; pairs that are absent yield 0.
		self.rewards = {
			'1_s': -1.0,
			'3_s': 1.0,
			'5_s': -1.0,
		}

		# Transitions keyed by "<state>_<action>"; pairs that are absent are
		# moves that are not possible from that state.
		self.t = {
			'1_s': 6,
			'1_e': 2,
			'2_w': 1,
			'2_e': 3,
			'3_s': 7,
			'3_w': 2,
			'3_e': 4,
			'4_w': 3,
			'4_e': 5,
			'5_s': 8,
			'5_w': 4,
		}

		self.gamma = 0.8  # discount factor
		self.viewer = None
		self.state = None

	def getTerminal(self):
		"""Return the dictionary whose keys are the terminal states."""
		return self.terminate_states

	def transform(self, state, action):
		"""Look up the model for one (state, action) pair without moving the agent.

		Returns a tuple ``(t, s, r)`` where ``t`` is the whole transition
		table, ``s`` is the successor state (``-1`` when the move is not in the
		table) and ``r`` is the reward (``0`` when none is defined).
		"""
		key = "%d_%s" % (state, action)  # "<state>_<action>" dictionary key
		r = self.rewards.get(key, 0)
		s = self.t.get(key, -1)
		return self.t, s, r

	def getGamma(self):
		"""Return the discount factor."""
		return self.gamma

	def getStates(self):
		"""Return the list of all states."""
		return self.states

	def getAction(self):
		"""Return the list of all actions."""
		return self.actions

	def getTerminate_states(self):
		"""Return the dictionary whose keys are the terminal states."""
		return self.terminate_states

	def setAction(self, s):
		"""Set the current state to ``s`` (despite the name, no action is stored)."""
		self.state = s

	def seed(self, seed=None):
		"""Seed ``self.np_random`` and return the seed actually used, in a list."""
		# Imported here because the module header does not import ``seeding``;
		# without it this method raised NameError.
		from gym.utils import seeding
		self.np_random, seed = seeding.np_random(seed)
		return [seed]

	def step(self, action):
		"""Apply ``action`` to the current state.

		Returns ``(next_state, reward, is_terminal, info)``.  If the current
		state is already terminal nothing changes and the reward is 0.  An
		action with no entry in the transition table leaves the state
		unchanged.
		"""
		# Current state of the system.
		state = self.state
		if state in self.terminate_states:
			return state, 0, True, {}
		key = "%d_%s" % (state, action)  # "<state>_<action>" dictionary key

		# State transition: stay in place when the move is not in the table.
		next_state = self.t.get(key, state)
		self.state = next_state

		is_terminal = next_state in self.terminate_states
		r = self.rewards.get(key, 0.0)

		return next_state, r, is_terminal, {}

	def reset(self):
		"""Place the agent uniformly at random on one of states 1-5 and return it."""
		self.state = self.states[int(random.random() * 5)]
		return self.state

	def render(self, mode='human', close=False):
		"""Draw the grid, the three terminal markers and the agent.

		With ``close=True`` the viewer (if any) is closed and nothing is drawn.
		Returns ``None`` while no state has been set; otherwise returns the
		result of the viewer's ``render`` call.
		"""
		if close:
			if self.viewer is not None:
				self.viewer.close()
				self.viewer = None
			return
		screen_width = 600
		screen_height = 400

		if self.viewer is None:
			# NOTE(review): this rendering module is believed to exist only in
			# older gym releases - confirm against the installed version.
			from gym.envs.classic_control import rendering
			self.viewer = rendering.Viewer(screen_width, screen_height)

			# Grid lines, in drawing order; each is also kept as self.line<N>.
			segments = [
				((100, 300), (500, 300)),
				((100, 200), (500, 200)),
				((100, 300), (100, 100)),
				((180, 300), (180, 100)),
				((260, 300), (260, 100)),
				((340, 300), (340, 100)),
				((420, 300), (420, 100)),
				((500, 300), (500, 100)),
				((100, 100), (180, 100)),
				((260, 100), (340, 100)),
				((420, 100), (500, 100)),
			]
			for index, (start, end) in enumerate(segments, start=1):
				line = rendering.Line(start, end)
				line.set_color(0, 0, 0)
				setattr(self, "line%d" % index, line)
				self.viewer.add_geom(line)

			# First skull (black disc) at the centre of state 6.
			self.kulo1 = rendering.make_circle(40)
			self.circletrans = rendering.Transform(translation=(140, 150))
			self.kulo1.add_attr(self.circletrans)
			self.kulo1.set_color(0, 0, 0)
			# Second skull (black disc) at the centre of state 8.
			self.kulo2 = rendering.make_circle(40)
			self.circletrans = rendering.Transform(translation=(460, 150))
			self.kulo2.add_attr(self.circletrans)
			self.kulo2.set_color(0, 0, 0)
			# Gold (yellow disc) at the centre of state 7.
			self.gold = rendering.make_circle(40)
			self.circletrans = rendering.Transform(translation=(300, 150))
			self.gold.add_attr(self.circletrans)
			self.gold.set_color(1, 0.9, 0)
			# Robot (the agent); its transform is updated on every render.
			self.robot = rendering.make_circle(30)
			self.robotrans = rendering.Transform()
			self.robot.add_attr(self.robotrans)
			self.robot.set_color(0.8, 0.6, 0.4)

			for geom in (self.kulo1, self.kulo2, self.gold, self.robot):
				self.viewer.add_geom(geom)

		if self.state is None:
			return None
		# States are 1-based while the coordinate lists are 0-based.
		self.robotrans.set_translation(self.x[self.state - 1], self.y[self.state - 1])

		return self.viewer.render(return_rgb_array=mode == 'rgb_array')

安装到gym环境下:

  1. 拷贝grid_map.py至gym目录的envs/classic_control目录下。
  2. 修改此目录下的文件__init__.py ,添加一行代码:
from gym.envs.classic_control.grid_map import GridEnv

  3. 到上层目录(gym/envs),修改__init__.py,添加代码:

# Register the grid world under the id that gym.make('GridWorld-v0') looks up.
register(
	id='GridWorld-v0',
	entry_point='gym.envs.classic_control:GridEnv',
	max_episode_steps=500,
	reward_threshold=100.0,
)

    4.完毕

现在可以开始策略迭代的算法了。代码如下

import gym
import random
import time

# Upper bound on the number of steps taken in the demo rollout.
STEP = 100

env = gym.make('GridWorld-v0')

# The planner reads the model (states, actions, transitions) from env.env.
# NOTE(review): presumably this is the GridEnv instance behind the wrapper
# returned by gym.make - confirm for the installed gym version.
gm = env.env
print(gm)
print(gm.states)


class Learn:
	"""Policy-iteration planner for the grid MDP.

	``v`` maps each state to its current value estimate and ``pi`` maps each
	of the states 1-5 to the action the policy currently takes there.
	"""

	def __init__(self, grid_mdp):
		"""Start from all-zero values and a randomly drawn policy."""
		self.v = {cell: 0 for cell in grid_mdp.states}

		# For every state that gets a policy entry, the actions the initial
		# policy may be drawn from.
		initial_options = (
			(1, ['e', 's']),
			(2, ['e', 'w']),
			(3, ['e', 's', 'w']),
			(4, ['e', 'w']),
			(5, ['w', 's']),
		)
		self.pi = dict()
		for cell, options in initial_options:
			self.pi[cell] = random.choice(options)

	def policy_iterate(self, grid_mdp):
		"""Run 100 rounds of policy evaluation followed by policy improvement."""
		for _round in range(100):
			self.policy_evaluate(grid_mdp)
			self.policy_improve(grid_mdp)

	def policy_evaluate(self, grid_mdp):
		"""Update ``v`` in place for the current policy.

		Performs at most 1000 sweeps over the states and stops early once the
		summed absolute change of a sweep falls below 1e-6.
		"""
		for _sweep in range(1000):
			total_change = 0.0
			for state in grid_mdp.states:
				if state in grid_mdp.terminate_states:
					continue
				chosen = self.pi[state]
				_, successor, reward = grid_mdp.transform(state, chosen)
				if successor == -1:
					# No successor reported for this move; leave the value alone.
					continue
				updated = reward + grid_mdp.gamma * self.v[successor]
				total_change += abs(self.v[state] - updated)
				self.v[state] = updated
			if total_change < 1e-6:
				break

	def policy_improve(self, grid_mdp):
		"""Make the policy greedy with respect to the current values.

		The current action is replaced only by an action whose one-step
		lookahead value is strictly larger.  A state whose current action has
		no successor (``-1``) is left untouched.
		"""
		for state in grid_mdp.states:
			if state in grid_mdp.terminate_states:
				continue
			best_action = self.pi[state]
			_, successor, reward = grid_mdp.transform(state, best_action)
			if successor == -1:
				continue
			best_value = reward + grid_mdp.gamma * self.v[successor]
			for candidate in grid_mdp.actions:
				_, successor, reward = grid_mdp.transform(state, candidate)
				if successor == -1:
					continue
				candidate_value = reward + grid_mdp.gamma * self.v[successor]
				if best_value < candidate_value:
					best_action = candidate
					best_value = candidate_value
			self.pi[state] = best_action

	def action(self, state):
		"""Return the action the current policy takes in ``state``."""
		return self.pi[state]


# Draw the start state first, then plan: both steps consume the global
# ``random`` stream, so their order is kept.
state = env.reset()

# Solve the MDP with policy iteration before acting in the environment.
learn = Learn(gm)
learn.policy_iterate(gm)

total_reward = 0

# Follow the learned policy for at most STEP steps, drawing every frame.
for _step in range(STEP):
	env.render()
	action = learn.action(state)  # direct action for test
	state, reward, done, _info = env.step(action)
	total_reward += reward
	time.sleep(1)  # pause so each move is visible
	if not done:
		continue
	# Episode finished: draw the final position once more and stop.
	env.render()
	break