
[Continuous/SAC] 1D Hovering Drone: a drone that holds its altitude (the concept of replacing PID control with AI)

SigmoidFunction 2025. 11. 29. 11:10

1D Hovering Drone + SAC (altitude-hold control)

This scenario replaces PID control, the workhorse of robot control, with reinforcement learning. Because the action (thrust) is continuous, SAC is a natural fit. For comparison, a classical PID baseline for the same model is sketched right after the list below.

  • Physics model: a point mass under gravity, acc = thrust/m - g, integrated with Euler steps v ← v + acc·dt, h ← h + v·dt (dt = 0.05 s, m = 1 kg, g = 9.8 m/s²)
  • State: current height (h), vertical velocity (v)
  • Goal: hold a height of 1.0
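
As a point of reference for what is being replaced, a hand-tuned PID altitude controller on the same point-mass model might look like the sketch below. The gains Kp, Ki, Kd are illustrative assumptions, not values from this post.

import numpy as np

# Minimal PID altitude hold on the same physics (dt = 0.05 s, m = 1 kg, g = 9.8 m/s^2).
dt, g, m, target = 0.05, 9.8, 1.0, 1.0
Kp, Ki, Kd = 6.0, 1.0, 5.0                # assumed gains, for illustration only
h, v, integral = 0.0, 0.0, 0.0

for _ in range(400):                      # 20 simulated seconds
    error = target - h
    integral += error * dt
    derivative = -v                       # d(error)/dt = -dh/dt for a constant target
    thrust = m * g + Kp * error + Ki * integral + Kd * derivative
    thrust = np.clip(thrust, 0.0, 20.0)   # same 0-20 N actuator range as the environment below
    acc = thrust / m - g
    v += acc * dt
    h = max(0.0, h + v * dt)              # floor at the ground

print(f"height after 20 s: {h:.3f} m")    # settles close to the 1.0 target

The SAC agent has to discover an equivalent control law purely from the reward signal. First, the environment and a quick run with an untrained policy: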
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt

# -----------------------------------
# 1. Custom Environment: Hover Drone
# -----------------------------------
class HoverDroneEnv(gym.Env):
    def __init__(self):
        super(HoverDroneEnv, self).__init__()
        # Physical constants
        self.dt = 0.05
        self.gravity = 9.8
        self.mass = 1.0
        self.target_height = 1.0
        
        # Action: thrust command in [-1.0, 1.0] (mapped to an actual force of 0 ~ 20 N)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
        # State: [height, velocity]
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32)

    def reset(self):
        self.state = np.array([0.0, 0.0], dtype=np.float32) # start on the ground
        self.step_count = 0
        return self.state

    def step(self, action):
        self.step_count += 1
        h, v = self.state
        
        # Action Mapping (-1~1 -> 0~20N)
        # Hovering force approx = m * g = 9.8N
        thrust = (action[0] + 1.0) * 10.0 
        
        # Physics (Euler Integration)
        force = thrust - (self.mass * self.gravity)
        acc = force / self.mass
        
        v_next = v + acc * self.dt
        h_next = h + v_next * self.dt
        
        # Ground collision handling
        if h_next < 0:
            h_next = 0
            v_next = 0
            
        self.state = np.array([h_next, v_next], dtype=np.float32)
        
        # Reward: distance to the target height + energy penalty
        dist_penalty = abs(self.target_height - h_next)
        reward = -dist_penalty - 0.01 * abs(action[0])
        
        done = self.step_count >= 100
        
        return self.state, reward, done, {}

    # ★ Matplotlib visualization
    def render(self, ax):
        ax.clear()
        ax.set_xlim(-1, 1)
        ax.set_ylim(0, 2.5)
        ax.axhline(y=self.target_height, color='g', linestyle='--', label='Target')
        
        # Drone (drawn as a dot)
        h = self.state[0]
        ax.plot(0, h, 'bo', markersize=20, label='Drone')
        # Thrust indicator (arrow)
        ax.arrow(0, h, 0, -0.2, head_width=0.05, head_length=0.1, fc='r', ec='r')
        
        ax.legend()
        ax.set_title(f"Height: {h:.2f}m")

# -----------------------------------
# 2. SAC Agent (Policy Only for Test)
# -----------------------------------
class PolicyNet(nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(2, 64)
        self.fc_mu = nn.Linear(64, 1)
        self.fc_std = nn.Linear(64, 1)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = self.fc_mu(x)
        std = F.softplus(self.fc_std(x)) + 1e-3
        return mu, std
        
    def sample(self, x):
        mu, std = self.forward(x)
        dist = Normal(mu, std)
        action = torch.tanh(dist.rsample()) # Tanh Squashing
        return action

# -----------------------------------
# 3. Execution
# -----------------------------------
def main():
    env = HoverDroneEnv()
    policy = PolicyNet()
    
    # Figure setup
    fig, ax = plt.subplots(figsize=(4, 6))
    
    print("Simulation Start... (Press Ctrl+C to stop)")
    
    # Loop over episodes
    for epi in range(1000):  # run 1000 episodes
        s = env.reset()
        done = False
        score = 0.0
        
        while not done:
            # ----------------------------------------
            # [Training hook]
            # In a real training script, agent.train() etc. would be called around here.
            # For now the drone is driven by an untrained (essentially random) policy.
            # ----------------------------------------
            with torch.no_grad():
                # Random-ish actions - expect the drone to wobble, nothing has been learned yet
                a = policy.sample(torch.from_numpy(s).float()).numpy()
            
            s_prime, r, done, _ = env.step(a)
            s = s_prime
            score += r
            
            # Visualization (if it runs too fast, increase 0.05 to 0.1)
            env.render(ax)
            plt.pause(0.05)
        
        print(f"Episode {epi} finished. Score: {score:.1f}")

    plt.show()

if __name__ == '__main__':
    main()
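
Before training anything, it is worth sanity-checking the physics. A quick open-loop test (this snippet assumes HoverDroneEnv from the listing above is in scope; the expected number follows directly from the action mapping):

import numpy as np

# Constant action 0.0 maps to (0 + 1) * 10 = 10 N of thrust, slightly above gravity (9.8 N),
# so the drone should climb with a constant acceleration of about 0.2 m/s^2.
env = HoverDroneEnv()
s = env.reset()
done = False
while not done:                                   # the episode ends after 100 steps = 5 s
    s, r, done, _ = env.step(np.array([0.0], dtype=np.float32))
print(f"height after 5 s at 10 N: {s[0]:.2f} m")  # roughly 0.5 * 0.2 * 5^2 = 2.5 m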

 

 

 

Upgraded version
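
This version fills in the parts that were stubbed out above: a replay buffer, twin Q-networks with soft-updated targets, the entropy-regularized policy update, plus reward shaping and safer initialization in the environment. For reference, the two standard SAC ingredients the code implements (standard formulas, not specific to this post) are the maximum-entropy objective

$$J(\pi) = \sum_t \mathbb{E}_{(s_t,a_t)\sim\pi}\big[\, r(s_t,a_t) + \alpha\,\mathcal{H}(\pi(\cdot\mid s_t)) \,\big]$$

and the tanh-squashed Gaussian policy with its log-density correction

$$a=\tanh(z),\quad z\sim\mathcal{N}(\mu_\theta(s),\sigma_\theta(s)),\qquad \log\pi(a\mid s)=\log\mathcal{N}(z;\mu_\theta,\sigma_\theta)-\log\!\big(1-\tanh(z)^2+\epsilon\big).$$

The second formula is exactly what PolicyNet.sample computes below, and the fixed init_alpha plays the role of α in both the Q-target and the policy loss.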

import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt

# -----------------------------------
# [Modified] Hyperparameters
# -----------------------------------
lr_pi = 0.0003        # [Modified] adjusted policy learning rate
lr_q = 0.001
gamma = 0.99
batch_size = 128
buffer_limit = 50000
tau = 0.005 
init_alpha = 0.01     # [Modified] entropy weight, kept small to limit exploration
hidden_dim = 256      # [Modified] larger networks

# -----------------------------------
# 1. Environment definition
# -----------------------------------
class HoverDroneEnv(gym.Env):
    def __init__(self):
        super(HoverDroneEnv, self).__init__()
        self.dt = 0.05
        self.gravity = 9.8
        self.mass = 1.0
        self.target_height = 1.0
        self.max_height = 3.0
        
        # Action: -1.0 ~ 1.0
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
        # State: [height, velocity] 
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32)

    def reset(self):
        # [Modified] safer initialization (start near the target height)
        self.state = np.array([
            1.0 + random.uniform(-0.1, 0.1),  # target height ± 0.1
            random.uniform(-0.05, 0.05)       # small initial velocity
        ], dtype=np.float32)
        self.step_count = 0
        return self.state

    def step(self, action):
        self.step_count += 1
        h, v = self.state
        
        # [Key modification] Action mapping - tuned so gravity is easy to overcome
        # action -1 -> thrust = 8, action 0 -> thrust = 12, action 1 -> thrust = 16
        # The 8 N floor keeps thrust close to the 9.8 N hover force, so the drone can always recover
        thrust = 8.0 + (action[0] + 1.0) * 4.0  # range: 8.0 ~ 16.0
        
        # Physics
        force = thrust - (self.mass * self.gravity)
        acc = force / self.mass
        v_next = v + acc * self.dt
        h_next = h + v_next * self.dt
        
        # [Modified] collision handling at the floor and ceiling
        if h_next <= 0.0:
            h_next = 0.0
            v_next = max(0.0, v_next)  # at the floor, only upward velocity is allowed
        elif h_next >= self.max_height:
            h_next = self.max_height
            v_next = min(0.0, v_next)  # at the ceiling, only downward velocity is allowed
            
        self.state = np.array([h_next, v_next], dtype=np.float32)
        
        # [Key modification] Improved reward function
        dist_to_target = abs(self.target_height - h_next)
        
        # 1. Base distance reward (0.0 ~ 1.0)
        height_reward = max(0.0, 1.0 - dist_to_target)
        
        # 2. Ground penalty (large penalty near the floor)
        ground_penalty = -2.0 if h_next <= 0.1 else 0.0
        
        # 3. Velocity penalty (penalize excessive speed)
        velocity_penalty = -0.1 * abs(v_next) if abs(v_next) > 1.0 else 0.0
        
        # 4. Bonus for staying close to the target
        target_bonus = 0.5 if dist_to_target < 0.1 else 0.0
        
        reward = height_reward + ground_penalty + velocity_penalty + target_bonus
        
        # Episode termination condition
        done = False
        if self.step_count >= 200:
            done = True
            
        return self.state, reward, done, {}

    def render(self, ax, episode, score):
        ax.clear()
        ax.set_xlim(-1, 1)
        ax.set_ylim(0, self.max_height)
        
        # Target height line
        ax.axhline(y=self.target_height, color='g', linestyle='--', linewidth=2, label='Target Height')
        
        # Drone position
        h, v = self.state
        drone_color = 'blue' if h > 0.1 else 'red'  # red when (nearly) on the ground
        ax.plot(0, h, 'o', color=drone_color, markersize=15)
        
        # Thrust indicator
        thrust_action = self.last_action if hasattr(self, 'last_action') else 0.0
        thrust_len = 0.3 * (thrust_action + 1.0)  # 0 ~ 0.6
        ax.arrow(0, h, 0, -thrust_len, head_width=0.05, head_length=0.05, fc='orange', ec='orange')
        
        # Velocity vector
        if abs(v) > 0.01:
            ax.arrow(0, h, 0, v * 0.1, head_width=0.03, head_length=0.03, fc='purple', ec='purple')
        
        # Info text
        ax.text(-0.9, self.max_height-0.2, f"Episode: {episode}")
        ax.text(-0.9, self.max_height-0.4, f"Score: {score:.1f}")
        ax.text(-0.9, self.max_height-0.6, f"Height: {h:.2f}")
        ax.text(-0.9, self.max_height-0.8, f"Velocity: {v:.2f}")
        
        ax.set_ylabel('Height')
        ax.set_title('Hover Drone Training')
        ax.grid(True, alpha=0.3)
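
# Reward sanity check (worked numbers for the shaped reward in step() above):
#   hovering near the target with |v| <= 1:  ~1.0 + 0.0 + 0.0 + 0.5  = ~1.5 per step
#   sitting on the ground (h ~ 0):            0.0 - 2.0 + 0.0 + 0.0  = -2.0 per step
# So a 200-step episode ranges roughly from -400 (always on the ground) to +300 (perfect hover).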

# -----------------------------------
# 2. SAC algorithm - improved version
# -----------------------------------
class ReplayBuffer:
    def __init__(self):
        self.buffer = deque(maxlen=buffer_limit)
    
    def put(self, transition):
        self.buffer.append(transition)
    
    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        
        for transition in mini_batch:
            s, a, r, s_prime, done_mask = transition
            s_lst.append(s)
            a_lst.append(a)
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        
        return (torch.tensor(np.array(s_lst), dtype=torch.float), 
                torch.tensor(np.array(a_lst), dtype=torch.float), 
                torch.tensor(np.array(r_lst), dtype=torch.float), 
                torch.tensor(np.array(s_prime_lst), dtype=torch.float), 
                torch.tensor(np.array(done_mask_lst), dtype=torch.float))
    
    def size(self):
        return len(self.buffer)

class PolicyNet(nn.Module):
    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # [Modified] extra layer
        self.fc_mu = nn.Linear(hidden_dim, 1)
        self.fc_std = nn.Linear(hidden_dim, 1)
    
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))  # [Modified] extra layer
        mu = self.fc_mu(x)
        std = F.softplus(self.fc_std(x)) + 1e-5  # [Modified] lower bound on std
        return mu, std
    
    def sample(self, x):
        mu, std = self.forward(x)
        dist = Normal(mu, std)
        z = dist.rsample()  # reparameterization trick
        action = torch.tanh(z)
        log_prob = dist.log_prob(z) - torch.log(1 - action.pow(2) + 1e-7)
        return action, log_prob

class QNet(nn.Module):
    def __init__(self):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(2, hidden_dim//2)
        self.fc_a = nn.Linear(1, hidden_dim//2)
        self.fc_cat = nn.Linear(hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim//2)  # [Modified] extra layer
        self.fc_out = nn.Linear(hidden_dim//2, 1)
    
    def forward(self, x, a):
        h1 = F.relu(self.fc_s(x))
        h2 = F.relu(self.fc_a(a))
        cat = torch.cat([h1, h2], dim=1)
        q = F.relu(self.fc_cat(cat))
        q = F.relu(self.fc2(q))  # [Modified] extra layer
        q = self.fc_out(q)
        return q

class SAC_Agent:
    def __init__(self):
        self.policy = PolicyNet()
        self.q1 = QNet()
        self.q2 = QNet()
        self.q1_target = QNet()
        self.q2_target = QNet()
        
        # Initialize the target networks with the online weights
        self.q1_target.load_state_dict(self.q1.state_dict())
        self.q2_target.load_state_dict(self.q2.state_dict())
        
        # Optimizer
        self.pi_optimizer = optim.Adam(self.policy.parameters(), lr=lr_pi)
        self.q1_optimizer = optim.Adam(self.q1.parameters(), lr=lr_q)
        self.q2_optimizer = optim.Adam(self.q2.parameters(), lr=lr_q)

    def get_action(self, state, deterministic=False):
        """액션 선택 (학습용과 테스트용)"""
        with torch.no_grad():
            if deterministic:
                mu, _ = self.policy(torch.from_numpy(state).float())
                return torch.tanh(mu).numpy()
            else:
                action, _ = self.policy.sample(torch.from_numpy(state).float())
                return action.numpy()

    def train(self, memory):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        
        # Q-function update
        with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(s_prime)
            q1_next = self.q1_target(s_prime, next_action)
            q2_next = self.q2_target(s_prime, next_action)
            q_target = r + gamma * done_mask * (torch.min(q1_next, q2_next) - init_alpha * next_log_prob)

        q1_loss = F.smooth_l1_loss(self.q1(s, a), q_target)
        q2_loss = F.smooth_l1_loss(self.q2(s, a), q_target)

        self.q1_optimizer.zero_grad()
        q1_loss.backward()
        self.q1_optimizer.step()

        self.q2_optimizer.zero_grad()
        q2_loss.backward()
        self.q2_optimizer.step()

        # Policy update
        action, log_prob = self.policy.sample(s)
        q1_val = self.q1(s, action)
        q2_val = self.q2(s, action)
        pi_loss = -(torch.min(q1_val, q2_val) - init_alpha * log_prob).mean()

        self.pi_optimizer.zero_grad()
        pi_loss.backward()
        self.pi_optimizer.step()

        # Soft update of the target networks
        for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

# -----------------------------------
# 3. Main loop
# -----------------------------------
def main():
    env = HoverDroneEnv()
    agent = SAC_Agent()
    memory = ReplayBuffer()
    
    # Visualization setup
    plt.ion()
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
    
    scores = []
    heights = []
    
    print("=== Hover Drone SAC Training ===")
    print("Goal: Keep the drone at height 1.0 (green line)")
    print("Blue drone = flying, Red drone = crashed")
    
    for n_epi in range(300):  # [Modified] more episodes
        s = env.reset()
        done = False
        score = 0.0
        episode_heights = []
        
        while not done:
            # Select an action
            action = agent.get_action(s, deterministic=False)
            env.last_action = action[0]  # for visualization
            
            # Interact with the environment
            s_prime, r, done, _ = env.step(action)
            done_mask = 0.0 if done else 1.0
            
            # Store the transition
            memory.put((s, action, r, s_prime, done_mask))
            
            s = s_prime
            score += r
            episode_heights.append(s[0])  # record the height
            
            # Train (once enough experience has been collected)
            if memory.size() > 2000:  # [Modified] start training with a fuller buffer
                agent.train(memory)

            # Visualization (only at intervals)
            if n_epi % 10 == 0 and env.step_count % 5 == 0:
                env.render(ax1, n_epi, score)
                plt.pause(0.01)

        # Record the episode results
        scores.append(score)
        avg_height = np.mean(episode_heights)
        heights.append(avg_height)
        
        # Log output
        if n_epi % 20 == 0:
            recent_score = np.mean(scores[-20:]) if len(scores) >= 20 else np.mean(scores)
            recent_height = np.mean(heights[-20:]) if len(heights) >= 20 else np.mean(heights)
            print(f"Episode {n_epi:3d} | Score: {score:6.1f} | Avg Score: {recent_score:6.1f} | "
                  f"Avg Height: {recent_height:.2f} | Buffer: {memory.size()}")
        
        # Update the performance plot
        if n_epi % 10 == 0 and len(scores) > 1:
            ax2.clear()
            ax2.plot(scores, 'b-', alpha=0.7, label='Episode Score')
            if len(scores) >= 20:
                smooth_scores = []
                for i in range(19, len(scores)):
                    smooth_scores.append(np.mean(scores[i-19:i+1]))
                ax2.plot(range(19, len(scores)), smooth_scores, 'r-', linewidth=2, label='Moving Average')
            ax2.set_xlabel('Episode')
            ax2.set_ylabel('Score')
            ax2.set_title('Training Progress')
            ax2.legend()
            ax2.grid(True, alpha=0.3)
    
    plt.ioff()
    
    # Final test
    print("\n=== Final Test ===")
    test_scores = []
    for i in range(10):
        s = env.reset()
        done = False
        score = 0.0
        while not done:
            action = agent.get_action(s, deterministic=True)  # deterministic policy
            s, r, done, _ = env.step(action)
            score += r
        test_scores.append(score)
    
    print(f"Average test score: {np.mean(test_scores):.2f} ± {np.std(test_scores):.2f}")
    plt.show()

if __name__ == '__main__':
    main()
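
If the learned controller is meant to actually replace the PID loop, the trained policy can be persisted and reloaded with the usual PyTorch calls. A minimal sketch, not part of the training script above (the file name is an arbitrary choice):

# e.g., at the end of main(), save the trained policy weights
torch.save(agent.policy.state_dict(), "hover_sac_policy.pt")

# later: reload and run it deterministically, mirroring get_action(..., deterministic=True)
policy = PolicyNet()
policy.load_state_dict(torch.load("hover_sac_policy.pt"))
policy.eval()
with torch.no_grad():
    mu, _ = policy(torch.tensor([1.0, 0.0]))   # state = [height, velocity]
    action = torch.tanh(mu).numpy()            # thrust command in [-1, 1]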