728x90
반응형
1. Dynamic Grid World + PPO (동적 장애물 회피)
단순한 미로 찾기가 아니라, 움직이는 장애물(Patrolling Obstacle)을 피해야 하므로 타이밍을 학습해야 합니다. 로봇 내비게이션 기초와 연결됩니다.
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# -----------------------------------
# 1. Custom Environment: Dynamic Grid
# -----------------------------------
class DynamicGridEnv(gym.Env):
    """6x6 grid world with a vertically patrolling obstacle.

    The agent starts at (0, 0) and must reach the goal at (5, 5) within
    ``max_steps`` steps while avoiding an obstacle that bounces up and
    down in a fixed column. A "stay" action exists so the agent can
    learn to wait for the right timing (basic robot-navigation setup).
    """

    def __init__(self):
        super(DynamicGridEnv, self).__init__()
        self.grid_size = 6
        self.max_steps = 30
        # Actions: 0=up, 1=down, 2=left, 3=right, 4=stay.
        # "stay" is required so the agent can time its passage.
        self.action_space = spaces.Discrete(5)
        # Observation: agent(2) + goal(2) + obstacle(2) = 6 values.
        # [fix] positions range over 0..grid_size-1, so the upper bound
        # is grid_size - 1 (the original used grid_size, off by one).
        self.observation_space = spaces.Box(low=0, high=self.grid_size - 1, shape=(6,), dtype=np.float32)
        # [fix] start the obstacle at (2, 2) so __init__ agrees with
        # reset(); the original set (2, 0) here, but reset() always
        # overwrote it with (2, 2) before use.
        self.obstacles = [{'pos': [2, 2], 'dir': 1}]

    def reset(self):
        """Reset agent/goal/obstacle state and return the initial observation."""
        self.agent_pos = [0, 0]
        self.goal_pos = [5, 5]
        self.obstacles = [{'pos': [2, 2], 'dir': 1}]
        self.step_count = 0
        return self._get_obs()

    def _get_obs(self):
        # Flat vector: agent row/col + goal row/col + obstacle row/col.
        return np.array(self.agent_pos + self.goal_pos + self.obstacles[0]['pos'], dtype=np.float32)

    def step(self, action):
        """Advance one tick: move the agent, move the obstacle, score the result."""
        self.step_count += 1
        # 1. Move the agent (clamped to the grid).
        row, col = self.agent_pos
        if action == 0: row = max(0, row - 1)
        elif action == 1: row = min(self.grid_size - 1, row + 1)
        elif action == 2: col = max(0, col - 1)
        elif action == 3: col = min(self.grid_size - 1, col + 1)
        # action 4 is "stay": position unchanged.
        self.agent_pos = [row, col]
        # 2. Move the obstacle (patrol: bounce between top and bottom rows).
        obs_row, obs_col = self.obstacles[0]['pos']
        obs_dir = self.obstacles[0]['dir']
        if obs_row == 0: obs_dir = 1
        elif obs_row == self.grid_size - 1: obs_dir = -1
        obs_row += obs_dir
        self.obstacles[0]['pos'] = [obs_row, obs_col]
        self.obstacles[0]['dir'] = obs_dir
        # 3. Reward and termination.
        reward = -0.1  # per-step time cost
        done = False
        if self.agent_pos == self.obstacles[0]['pos']:
            # Collision with the obstacle.
            reward = -10.0
            done = True
        elif self.agent_pos == self.goal_pos:
            # Reached the goal.
            reward = 10.0
            done = True
        elif self.step_count >= self.max_steps:
            # Out of time.
            done = True
        return self._get_obs(), reward, done, {}

    # Matplotlib visualization.
    def render(self, ax):
        """Draw the current grid state onto the given matplotlib axes."""
        ax.clear()
        ax.set_xlim(-0.5, self.grid_size - 0.5)
        ax.set_ylim(self.grid_size - 0.5, -0.5)  # invert y-axis (grid coordinates)
        ax.grid(True)
        # Agent (red).
        ax.add_patch(patches.Rectangle((self.agent_pos[1]-0.4, self.agent_pos[0]-0.4), 0.8, 0.8, color='red', label='Agent'))
        # Goal (green).
        ax.add_patch(patches.Rectangle((self.goal_pos[1]-0.4, self.goal_pos[0]-0.4), 0.8, 0.8, color='green', label='Goal'))
        # Obstacle (black).
        obs_r, obs_c = self.obstacles[0]['pos']
        ax.add_patch(patches.Rectangle((obs_c-0.4, obs_r-0.4), 0.8, 0.8, color='black', label='Obstacle'))
        ax.set_title(f"Step: {self.step_count}")
# -----------------------------------
# 2. PPO Agent (Simplified)
# -----------------------------------
class PPO(nn.Module):
    """Minimal actor-critic network for PPO with a shared hidden layer.

    The ``train_net`` update is intentionally left as a stub here; plug
    in a full PPO update (clipped surrogate + value loss) for training.
    """

    def __init__(self):
        super(PPO, self).__init__()
        self.fc1 = nn.Linear(6, 128)    # shared feature layer
        self.fc_pi = nn.Linear(128, 5)  # policy head (5 actions)
        self.fc_v = nn.Linear(128, 1)   # value head
        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        self.data = []                  # rollout transition buffer

    def pi(self, x, softmax_dim=0):
        """Return action probabilities for state ``x``."""
        hidden = F.relu(self.fc1(x))
        return F.softmax(self.fc_pi(hidden), dim=softmax_dim)

    def v(self, x):
        """Return the state-value estimate for state ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.fc_v(hidden)

    def put_data(self, transition):
        """Store one (s, a, r, s', prob_a, done) transition."""
        self.data.append(transition)

    def train_net(self):
        # (Detailed PPO math omitted for brevity — reuse a full
        # train_net implementation when actually training.)
        self.data = []  # dummy flush
# -----------------------------------
# 3. Execution & Visualization
# -----------------------------------
def main():
    """Roll out one episode with the (untrained) policy and animate it."""
    env = DynamicGridEnv()
    model = PPO()
    # Visualization setup.
    fig, ax = plt.subplots(figsize=(5, 5))
    # Pretend the model is trained and just test-drive it
    # (a real run would call model.train_net()).
    print("Simulating...")
    state = env.reset()
    done = False
    while not done:
        # Sample from the policy instead of acting uniformly at random.
        action_probs = model.pi(torch.from_numpy(state).float())
        action = Categorical(action_probs).sample().item()
        state, reward, done, _ = env.step(action)
        env.render(ax)
        plt.pause(0.2)  # short pause for the animation effect
    plt.show()


if __name__ == '__main__':
    main()
PPO Dynamic Grid 코드 (학습 + 시각화 포함)
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
import matplotlib.patches as patches
# -----------------------------------
# Hyperparameters (학습 속도 부스팅)
# -----------------------------------
# PPO hyperparameters (tuned for fast convergence on this small grid).
learning_rate = 0.002  # [tuned] 4x larger than the base setup for faster convergence
gamma = 0.98           # discount factor
lmbda = 0.95           # GAE lambda
eps_clip = 0.1         # PPO clipping epsilon
K_epoch = 3            # optimization epochs per rollout batch
T_horizon = 20         # rollout length before each train_net() call
# -----------------------------------
# 1. Environment (난이도 완화 & 강력한 보상)
# -----------------------------------
class DynamicGridEnv(gym.Env):
    """6x6 grid with a patrolling obstacle and shaped rewards.

    Compared to the base variant: a generous 100-step budget,
    observations normalized by the grid size, and a strong
    Manhattan-distance reward signal to speed up PPO convergence.
    """

    # Row/col deltas per action; action 4 ("stay") maps to no movement.
    _MOVES = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

    def __init__(self):
        super(DynamicGridEnv, self).__init__()
        self.grid_size = 6
        self.max_steps = 100  # extra time budget (was 30)
        self.action_space = spaces.Discrete(5)
        self.observation_space = spaces.Box(low=0, high=1.0, shape=(6,), dtype=np.float32)
        self.obstacles = [{'pos': [2, 0], 'dir': 1}]

    def reset(self):
        """Place agent/goal/obstacle and return the first observation."""
        self.agent_pos = [0, 0]
        self.goal_pos = [5, 5]
        self.obstacles = [{'pos': [2, 2], 'dir': 1}]
        self.step_count = 0
        return self._get_obs()

    def _goal_distance(self):
        # Manhattan distance from the agent to the goal.
        return abs(self.agent_pos[0] - self.goal_pos[0]) + abs(self.agent_pos[1] - self.goal_pos[1])

    def _get_obs(self):
        # Coordinates normalized by the grid size.
        flat = self.agent_pos + self.goal_pos + self.obstacles[0]['pos']
        return np.array(flat, dtype=np.float32) / self.grid_size

    def step(self, action):
        """Move agent and obstacle, then apply the shaped reward."""
        self.step_count += 1
        prev_dist = self._goal_distance()
        # Agent movement, clamped to the board.
        d_row, d_col = self._MOVES.get(action, (0, 0))
        row = min(self.grid_size - 1, max(0, self.agent_pos[0] + d_row))
        col = min(self.grid_size - 1, max(0, self.agent_pos[1] + d_col))
        self.agent_pos = [row, col]
        # Obstacle patrol: reverse direction at the top/bottom rows.
        patrol = self.obstacles[0]
        if patrol['pos'][0] == 0:
            patrol['dir'] = 1
        elif patrol['pos'][0] == self.grid_size - 1:
            patrol['dir'] = -1
        patrol['pos'][0] += patrol['dir']
        curr_dist = self._goal_distance()
        # --- Reward shaping ---
        reward = -0.01  # tiny time penalty (waiting in place is allowed)
        done = False
        if curr_dist < prev_dist:
            reward += 0.5  # strong bonus for closing in on the goal
        elif curr_dist > prev_dist:
            reward -= 0.3  # moderate penalty for drifting away
        if self.agent_pos == patrol['pos']:
            reward = -10.0  # collision: heavily punished, episode ends
            done = True
        elif self.agent_pos == self.goal_pos:
            reward = 20.0   # reinforced success bonus
            done = True
        elif self.step_count >= self.max_steps:
            done = True     # ran out of time
        return self._get_obs(), reward, done, {}

    def render(self, ax, episode=0, score=0):
        """Draw agent (red), goal (green) and obstacle (black) on ``ax``."""
        ax.clear()
        ax.set_xlim(-0.5, self.grid_size - 0.5)
        ax.set_ylim(self.grid_size - 0.5, -0.5)  # inverted y so row 0 is on top
        ax.grid(True)
        agent_r, agent_c = self.agent_pos
        goal_r, goal_c = self.goal_pos
        obs_r, obs_c = self.obstacles[0]['pos']
        ax.add_patch(patches.Rectangle((agent_c-0.4, agent_r-0.4), 0.8, 0.8, color='red', alpha=0.8, label='Agent'))
        ax.add_patch(patches.Rectangle((goal_c-0.4, goal_r-0.4), 0.8, 0.8, color='green', alpha=0.5, label='Goal'))
        ax.add_patch(patches.Rectangle((obs_c-0.4, obs_r-0.4), 0.8, 0.8, color='black', label='Obstacle'))
        ax.set_title(f"Ep: {episode} | Score: {score:.1f}")
# -----------------------------------
# 2. PPO Agent (동일 구조)
# -----------------------------------
class PPO(nn.Module):
    """PPO actor-critic: shared hidden layer, clipped surrogate update.

    Relies on module-level hyperparameters: learning_rate, gamma,
    lmbda, eps_clip, K_epoch.
    """

    def __init__(self):
        super(PPO, self).__init__()
        self.data = []                  # rollout transition buffer
        self.fc1 = nn.Linear(6, 128)    # shared feature layer
        self.fc_pi = nn.Linear(128, 5)  # policy head
        self.fc_v = nn.Linear(128, 1)   # value head
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        """Return action probabilities for state ``x``."""
        hidden = F.relu(self.fc1(x))
        return F.softmax(self.fc_pi(hidden), dim=softmax_dim)

    def v(self, x):
        """Return the state-value estimate for state ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.fc_v(hidden)

    def put_data(self, transition):
        """Store one (s, a, r, s', prob_a, done) transition."""
        self.data.append(transition)

    def make_batch(self):
        """Stack the buffered transitions into training tensors."""
        states, actions, rewards, next_states, action_probs, masks = [], [], [], [], [], []
        for s, a, r, s_prime, prob_a, done in self.data:
            states.append(s)
            actions.append([a])
            rewards.append([r])
            next_states.append(s_prime)
            action_probs.append([prob_a])
            masks.append([0 if done else 1])  # 0 terminates bootstrapping
        s = torch.tensor(np.array(states), dtype=torch.float)
        a = torch.tensor(np.array(actions))
        r = torch.tensor(np.array(rewards), dtype=torch.float)
        s_prime = torch.tensor(np.array(next_states), dtype=torch.float)
        done_mask = torch.tensor(np.array(masks), dtype=torch.float)
        prob_a = torch.tensor(np.array(action_probs))
        return s, a, r, s_prime, done_mask, prob_a

    def train_net(self):
        """Run K_epoch PPO updates over the buffered rollout, then clear it."""
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()
        for _ in range(K_epoch):
            # TD target and per-step TD error (critic bootstrap).
            td_target = r + gamma * self.v(s_prime) * done_mask
            delta = (td_target - self.v(s)).detach().numpy()
            # GAE: backward scan over the TD errors.
            advantages = []
            running = 0.0
            for d in delta[::-1]:
                running = gamma * lmbda * running + d[0]
                advantages.append([running])
            advantages.reverse()
            advantage = torch.tensor(np.array(advantages), dtype=torch.float)
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1, a)
            # Importance ratio pi_new / pi_old computed in log space.
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1 - eps_clip, 1 + eps_clip) * advantage
            # Clipped surrogate policy loss + smooth-L1 value loss.
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
        self.data = []
# -----------------------------------
# 3. Main Execution
# -----------------------------------
def main():
    """Train PPO on the dynamic grid and render later episodes live."""
    env = DynamicGridEnv()
    model = PPO()
    plt.ion()  # interactive mode so the figure updates during training
    fig, ax = plt.subplots(figsize=(5, 5))
    print("Start Training PPO (Success Mode)...")
    score_history = []
    for n_epi in range(500):
        s = env.reset()
        done = False
        score = 0.0
        while not done:
            # Collect up to T_horizon transitions, then update the policy.
            for _ in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                dist = Categorical(prob)
                a = dist.sample().item()
                s_prime, r, done, _ = env.step(a)
                model.put_data((s, a, r, s_prime, prob[a].item(), done))
                s = s_prime
                score += r
                # Visualize every 10th episode once 100 episodes have passed.
                if n_epi > 100 and n_epi % 10 == 0:
                    env.render(ax, n_epi, score)
                    plt.pause(0.01)
                if done:
                    break
            model.train_net()
        score_history.append(score)
        if n_epi % 20 == 0:
            avg_score = np.mean(score_history[-20:])
            print(f"Episode: {n_epi}, Score: {score:.1f}, Avg(last 20): {avg_score:.1f}")
    plt.ioff()
    plt.show()


if __name__ == '__main__':
    main()
728x90
반응형
'Study' 카테고리의 다른 글
| [Continuous/SAC] 1D Hovering Drone: 고도를 유지하려는 드론 (PID 제어를 AI로 대체하는 컨셉) (1) | 2025.11.29 |
|---|---|
| Behavior Cloning (모방 학습) (0) | 2025.11.29 |
| OpenAI Gym 인터페이스를 따르는 Custom Environment 구현 (0) | 2025.11.29 |
| 2D Grid Map에서의 A* 최단 경로 탐색 (0) | 2025.11.29 |
| CartPole-v1 환경에서의 강화학습 에이전트 구현 (0) | 2025.11.29 |