Study

[Discrete/PPO] Dynamic Grid World: 움직이는 장애물을 피해 목표로 가는 로봇 (동적 환경 계획)

SigmoidFunction 2025. 11. 29. 11:09
728x90
반응형

1. Dynamic Grid World + PPO (동적 장애물 회피)

단순한 미로 찾기가 아니라, 움직이는 장애물(Patrolling Obstacle)을 피해야 하므로 타이밍을 학습해야 합니다. 로봇 내비게이션 기초와 연결됩니다.

 

import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# -----------------------------------
# 1. Custom Environment: Dynamic Grid
# -----------------------------------
class DynamicGridEnv(gym.Env):
    """6x6 grid world with a vertically patrolling obstacle.

    The agent starts at (0, 0) and must reach the goal at (5, 5) while
    avoiding an obstacle that bounces up and down inside one column.
    A fifth "stay" action lets the agent wait for a safe opening, so
    timing is part of the optimal policy (basic robot navigation).
    """

    def __init__(self):
        super(DynamicGridEnv, self).__init__()
        self.grid_size = 6
        self.max_steps = 30

        # Actions: 0=up, 1=down, 2=left, 3=right, 4=stay.
        # "stay" is required so the agent can time its crossing.
        self.action_space = spaces.Discrete(5)
        # Observation: agent(row, col) + goal(row, col) + obstacle(row, col).
        # Coordinates are integers clamped to [0, grid_size - 1], so the
        # box high is grid_size - 1 (the original grid_size overstated it).
        self.observation_space = spaces.Box(
            low=0, high=self.grid_size - 1, shape=(6,), dtype=np.float32
        )

        self.obstacles = [{'pos': [2, 0], 'dir': 1}]  # placeholder; reset() re-initializes

    def reset(self):
        """Reset agent, goal and obstacle; return the initial observation."""
        self.agent_pos = [0, 0]
        self.goal_pos = [5, 5]
        self.obstacles = [{'pos': [2, 2], 'dir': 1}]
        self.step_count = 0
        return self._get_obs()

    def _get_obs(self):
        """Flat float32 vector: agent + goal + obstacle coordinates."""
        return np.array(self.agent_pos + self.goal_pos + self.obstacles[0]['pos'],
                        dtype=np.float32)

    def step(self, action):
        """Advance one time step.

        Returns ``(obs, reward, done, info)`` in the classic gym API.
        """
        self.step_count += 1

        # 1. Move the agent (clamped to the grid edges).
        prev_agent_pos = list(self.agent_pos)
        row, col = self.agent_pos
        if action == 0: row = max(0, row - 1)
        elif action == 1: row = min(self.grid_size - 1, row + 1)
        elif action == 2: col = max(0, col - 1)
        elif action == 3: col = min(self.grid_size - 1, col + 1)
        # action 4 is Stay
        self.agent_pos = [row, col]

        # 2. Move the obstacle (patrols vertically, bouncing at the walls).
        prev_obs_pos = list(self.obstacles[0]['pos'])
        obs_row, obs_col = self.obstacles[0]['pos']
        obs_dir = self.obstacles[0]['dir']

        if obs_row == 0: obs_dir = 1
        elif obs_row == self.grid_size - 1: obs_dir = -1

        obs_row += obs_dir
        self.obstacles[0]['pos'] = [obs_row, obs_col]
        self.obstacles[0]['dir'] = obs_dir

        # 3. Reward and termination.
        reward = -0.1  # per-step time cost
        done = False

        # Collision: same cell after both moved, OR the agent and obstacle
        # swapped cells in this step (pass-through) — the original
        # same-cell test alone missed the swap case.
        swapped = (self.agent_pos == prev_obs_pos
                   and self.obstacles[0]['pos'] == prev_agent_pos)
        if self.agent_pos == self.obstacles[0]['pos'] or swapped:
            reward = -10.0
            done = True
        # Reached the goal.
        elif self.agent_pos == self.goal_pos:
            reward = 10.0
            done = True
        # Out of time.
        elif self.step_count >= self.max_steps:
            done = True

        return self._get_obs(), reward, done, {}

    # Matplotlib visualization
    def render(self, ax):
        """Draw agent (red), goal (green) and obstacle (black) onto ``ax``."""
        ax.clear()
        ax.set_xlim(-0.5, self.grid_size - 0.5)
        ax.set_ylim(self.grid_size - 0.5, -0.5)  # invert y so row 0 is on top (grid coords)
        ax.grid(True)

        # Agent (Red)
        ax.add_patch(patches.Rectangle((self.agent_pos[1]-0.4, self.agent_pos[0]-0.4), 0.8, 0.8, color='red', label='Agent'))
        # Goal (Green)
        ax.add_patch(patches.Rectangle((self.goal_pos[1]-0.4, self.goal_pos[0]-0.4), 0.8, 0.8, color='green', label='Goal'))
        # Obstacle (Black)
        obs_r, obs_c = self.obstacles[0]['pos']
        ax.add_patch(patches.Rectangle((obs_c-0.4, obs_r-0.4), 0.8, 0.8, color='black', label='Obstacle'))

        ax.set_title(f"Step: {self.step_count}")

# -----------------------------------
# 2. PPO Agent (Simplified)
# -----------------------------------
class PPO(nn.Module):
    """Minimal actor-critic network with one shared hidden layer.

    ``pi`` is the policy head (5 discrete actions), ``v`` the value head.
    ``train_net`` is intentionally a stub here — plug in a full PPO update
    (GAE + clipped surrogate) from the complete listing for real training.
    """

    def __init__(self):
        super(PPO, self).__init__()
        self.fc1 = nn.Linear(6, 128)   # shared feature layer (obs dim = 6)
        self.fc_pi = nn.Linear(128, 5) # policy head: one logit per action
        self.fc_v = nn.Linear(128, 1)  # value head
        self.optimizer = optim.Adam(self.parameters(), lr=0.001)
        self.data = []                 # rollout buffer

    def pi(self, x, softmax_dim=0):
        """Return action probabilities for observation ``x``."""
        hidden = F.relu(self.fc1(x))
        logits = self.fc_pi(hidden)
        return F.softmax(logits, dim=softmax_dim)

    def v(self, x):
        """Return the state-value estimate for observation ``x``."""
        hidden = F.relu(self.fc1(x))
        return self.fc_v(hidden)

    def put_data(self, transition):
        """Append one transition tuple to the rollout buffer."""
        self.data.append(transition)

    def train_net(self):
        # Placeholder update: a real PPO step belongs here
        # (copy the full train_net from the complete listing).
        self.data = []  # dummy flush of the buffer

# -----------------------------------
# 3. Execution & Visualization
# -----------------------------------
def main():
    """Roll out one episode with an (untrained) policy and animate it."""
    env = DynamicGridEnv()
    model = PPO()

    # One figure, reused for every animation frame.
    fig, ax = plt.subplots(figsize=(5, 5))

    # Pretend the model is trained and just test it
    # (a real run would call model.train_net() first).
    print("Simulating...")
    state = env.reset()
    done = False
    while not done:
        # Sample an action from the policy instead of acting randomly.
        action_probs = model.pi(torch.from_numpy(state).float())
        action = Categorical(action_probs).sample().item()

        state, reward, done, _ = env.step(action)

        env.render(ax)
        plt.pause(0.2)  # short delay per frame for an animation effect

    plt.show()

if __name__ == '__main__':
    main()

 

 

 

PPO Dynamic Grid 코드 (학습 + 시각화 포함)

import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# -----------------------------------
# Hyperparameters (학습 속도 부스팅)
# -----------------------------------
learning_rate = 0.002  # [changed] 4x larger for faster convergence
gamma = 0.98           # discount factor
lmbda = 0.95           # GAE lambda
eps_clip = 0.1         # PPO clipping range epsilon
K_epoch = 3            # optimization epochs per rollout batch
T_horizon = 20         # rollout length before each PPO update

# -----------------------------------
# 1. Environment (난이도 완화 & 강력한 보상)
# -----------------------------------
class DynamicGridEnv(gym.Env):
    """6x6 grid world with a patrolling obstacle and shaped rewards.

    Tuned for fast PPO convergence versus the basic version: a longer
    episode budget (100 steps), observations normalized to [0, 1), and a
    dense Manhattan-distance reward that pulls the agent toward the goal.
    """

    def __init__(self):
        super(DynamicGridEnv, self).__init__()
        self.grid_size = 6
        self.max_steps = 100  # [changed] 30 -> 100 to give the agent time to wait
        self.action_space = spaces.Discrete(5)  # up, down, left, right, stay
        # Coordinates are divided by grid_size in _get_obs, hence high=1.0.
        self.observation_space = spaces.Box(low=0, high=1.0, shape=(6,), dtype=np.float32)
        self.obstacles = [{'pos': [2, 0], 'dir': 1}]  # placeholder; reset() re-initializes

    def reset(self):
        """Reset positions and the step counter; return the initial observation."""
        self.agent_pos = [0, 0]
        self.goal_pos = [5, 5]
        self.obstacles = [{'pos': [2, 2], 'dir': 1}]
        self.step_count = 0
        return self._get_obs()

    def _get_obs(self):
        # Return normalized (agent, goal, obstacle) coordinates.
        raw_obs = self.agent_pos + self.goal_pos + self.obstacles[0]['pos']
        return np.array(raw_obs, dtype=np.float32) / self.grid_size

    def step(self, action):
        """Advance one step; returns ``(obs, reward, done, info)``."""
        self.step_count += 1

        # Manhattan distance to the goal before moving (for reward shaping).
        prev_dist = abs(self.agent_pos[0] - self.goal_pos[0]) + abs(self.agent_pos[1] - self.goal_pos[1])

        # Move the agent (clamped to the grid edges); action 4 is "stay".
        prev_agent_pos = list(self.agent_pos)
        row, col = self.agent_pos
        if action == 0: row = max(0, row - 1)
        elif action == 1: row = min(self.grid_size - 1, row + 1)
        elif action == 2: col = max(0, col - 1)
        elif action == 3: col = min(self.grid_size - 1, col + 1)
        self.agent_pos = [row, col]

        # Move the obstacle (patrols vertically, bouncing at the walls).
        prev_obs_pos = list(self.obstacles[0]['pos'])
        obs_row, obs_col = self.obstacles[0]['pos']
        obs_dir = self.obstacles[0]['dir']
        if obs_row == 0: obs_dir = 1
        elif obs_row == self.grid_size - 1: obs_dir = -1
        obs_row += obs_dir
        self.obstacles[0]['pos'] = [obs_row, obs_col]
        self.obstacles[0]['dir'] = obs_dir

        # Distance after moving.
        curr_dist = abs(self.agent_pos[0] - self.goal_pos[0]) + abs(self.agent_pos[1] - self.goal_pos[1])

        # --- Reward design ---
        reward = -0.01  # [changed] much smaller time penalty so waiting is viable
        done = False

        # Dense shaping: reward progress toward the goal, mildly punish retreat.
        if curr_dist < prev_dist:
            reward += 0.5
        elif curr_dist > prev_dist:
            reward -= 0.3

        # Collision: same cell after both moved, OR the agent and obstacle
        # swapped cells this step (pass-through) — a same-cell check alone
        # would miss the swap case.
        swapped = (self.agent_pos == prev_obs_pos
                   and self.obstacles[0]['pos'] == prev_agent_pos)
        if self.agent_pos == self.obstacles[0]['pos'] or swapped:
            reward = -10.0  # [changed] collisions hurt a lot
            done = True
        elif self.agent_pos == self.goal_pos:  # reached the goal
            reward = 20.0  # [changed] bigger success bonus
            done = True
        elif self.step_count >= self.max_steps:  # out of time
            done = True

        return self._get_obs(), reward, done, {}

    def render(self, ax, episode=0, score=0):
        """Draw agent (red), goal (green) and obstacle (black) onto ``ax``."""
        ax.clear()
        ax.set_xlim(-0.5, self.grid_size - 0.5)
        ax.set_ylim(self.grid_size - 0.5, -0.5)  # invert y so row 0 is on top
        ax.grid(True)
        ax.add_patch(patches.Rectangle((self.agent_pos[1]-0.4, self.agent_pos[0]-0.4), 0.8, 0.8, color='red', alpha=0.8, label='Agent'))
        ax.add_patch(patches.Rectangle((self.goal_pos[1]-0.4, self.goal_pos[0]-0.4), 0.8, 0.8, color='green', alpha=0.5, label='Goal'))
        obs_r, obs_c = self.obstacles[0]['pos']
        ax.add_patch(patches.Rectangle((obs_c-0.4, obs_r-0.4), 0.8, 0.8, color='black', label='Obstacle'))
        ax.set_title(f"Ep: {episode} | Score: {score:.1f}")

# -----------------------------------
# 2. PPO Agent (동일 구조)
# -----------------------------------
class PPO(nn.Module):
    """Actor-critic PPO agent with a shared hidden layer.

    Transitions are buffered via ``put_data`` and consumed by
    ``train_net``, which runs K_epoch passes of the clipped-surrogate
    policy update plus a smooth-L1 value regression.
    """

    def __init__(self):
        super(PPO, self).__init__()
        self.data = []  # rollout buffer of (s, a, r, s', prob_a, done) tuples
        self.fc1 = nn.Linear(6, 128)    # shared feature layer (obs dim = 6)
        self.fc_pi = nn.Linear(128, 5)  # policy head: one logit per action
        self.fc_v = nn.Linear(128, 1)   # value head
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        """Return action probabilities for observation(s) ``x``.

        ``softmax_dim`` must be 1 when ``x`` is a batch (as in train_net).
        """
        x = F.relu(self.fc1(x))
        prob = F.softmax(self.fc_pi(x), dim=softmax_dim)
        return prob

    def v(self, x):
        """Return the state-value estimate for observation(s) ``x``."""
        x = F.relu(self.fc1(x))
        return self.fc_v(x)

    def put_data(self, transition):
        """Append one (s, a, r, s', prob_a, done) tuple to the buffer."""
        self.data.append(transition)

    def make_batch(self):
        """Convert the buffered transitions into stacked training tensors.

        Returns (s, a, r, s_prime, done_mask, prob_a); done_mask is 0 at
        terminal transitions so the TD target does not bootstrap there.
        """
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            s_lst.append(s); a_lst.append([a]); r_lst.append([r])
            s_prime_lst.append(s_prime); prob_a_lst.append([prob_a])
            done_mask = 0 if done else 1
            done_lst.append([done_mask])
            
        return torch.tensor(np.array(s_lst), dtype=torch.float), torch.tensor(np.array(a_lst)), \
               torch.tensor(np.array(r_lst), dtype=torch.float), torch.tensor(np.array(s_prime_lst), dtype=torch.float), \
               torch.tensor(np.array(done_lst), dtype=torch.float), torch.tensor(np.array(prob_a_lst))

    def train_net(self):
        """Run one PPO update (K_epoch passes) over the buffered rollout."""
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()
        for i in range(K_epoch):
            # 1-step TD target; done_mask zeroes bootstrapping at episode ends.
            td_target = r + gamma * self.v(s_prime) * done_mask
            delta = td_target - self.v(s)
            delta = delta.detach().numpy()
            # GAE: backward sweep accumulating gamma*lambda-discounted deltas.
            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(np.array(advantage_lst), dtype=torch.float)
            
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1, a)  # probability of the taken action under the current policy
            # Importance ratio pi_new / pi_old, computed in log space.
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
            # Clipped surrogate (policy loss) + smooth-L1 value loss.
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
        self.data = []  # flush the buffer after the update

# -----------------------------------
# 3. Main Execution
# -----------------------------------
def main():
    """Train PPO on the dynamic grid and periodically visualize rollouts."""
    env = DynamicGridEnv()
    model = PPO()

    plt.ion()  # interactive mode so the figure refreshes during training
    fig, ax = plt.subplots(figsize=(5,5))

    print("Start Training PPO (Success Mode)...")

    score_history = []

    for episode in range(500):
        state = env.reset()
        done = False
        score = 0.0

        while not done:
            # Collect up to T_horizon transitions, then run one PPO update.
            for _ in range(T_horizon):
                probs = model.pi(torch.from_numpy(state).float())
                dist = Categorical(probs)
                action = dist.sample().item()

                next_state, reward, done, _ = env.step(action)
                model.put_data((state, action, reward, next_state,
                                probs[action].item(), done))

                state = next_state
                score += reward

                # Visualize every 10th episode once past episode 100.
                if episode > 100 and episode % 10 == 0:
                    env.render(ax, episode, score)
                    plt.pause(0.01)

                if done:
                    break

            model.train_net()

        score_history.append(score)
        if episode % 20 == 0:
            avg_score = np.mean(score_history[-20:])
            print(f"Episode: {episode}, Score: {score:.1f}, Avg(last 20): {avg_score:.1f}")

    plt.ioff()
    plt.show()

if __name__ == '__main__':
    main()

 

728x90
반응형