1D Hovering Drone + SAC (Altitude-Hold Control)
This scenario replaces PID control, the workhorse of robot control, with reinforcement learning. Since the action space (thrust) is continuous, SAC is a good fit.
- Physics model: a 1D point mass under gravity, driven by a single thrust input (a short dynamics sketch follows this list)
- State: current height (h), vertical velocity (v)
- Goal: hold a height of 1.0
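For reference, the environment's step() below integrates exactly these dynamics with a simple semi-implicit Euler update. Here is a minimal standalone sketch of that update (the helper name euler_step is illustrative only and not part of the code that follows; the constants match the ones used below):

def euler_step(h, v, thrust, dt=0.05, mass=1.0, gravity=9.8):
    # Net force = thrust minus weight; acceleration = force / mass
    acc = (thrust - mass * gravity) / mass
    v_next = v + acc * dt        # update velocity first
    h_next = h + v_next * dt     # then update height with the new velocity
    if h_next < 0:               # the environment clamps at the ground and zeroes the velocity
        h_next, v_next = 0.0, 0.0
    return h_next, v_next

# Hovering needs thrust ≈ m * g = 9.8 N; with the 0-20 N action mapping below,
# that corresponds to an action of roughly -0.02.

The full environment, a matplotlib renderer, and a (still untrained) SAC-style Gaussian policy follow: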
import gym
from gym import spaces
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.distributions import Normal
import matplotlib.pyplot as plt
# -----------------------------------
# 1. Custom Environment: Hover Drone
# -----------------------------------
class HoverDroneEnv(gym.Env):
def __init__(self):
super(HoverDroneEnv, self).__init__()
        # Physical constants
self.dt = 0.05
self.gravity = 9.8
self.mass = 1.0
self.target_height = 1.0
        # Action: thrust command in [-1.0, 1.0] (mapped to an actual force of 0-20 N)
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
# State: [height, velocity]
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32)
def reset(self):
        self.state = np.array([0.0, 0.0], dtype=np.float32)  # start on the ground
self.step_count = 0
return self.state
def step(self, action):
self.step_count += 1
h, v = self.state
# Action Mapping (-1~1 -> 0~20N)
# Hovering force approx = m * g = 9.8N
thrust = (action[0] + 1.0) * 10.0
# Physics (Euler Integration)
force = thrust - (self.mass * self.gravity)
acc = force / self.mass
v_next = v + acc * self.dt
h_next = h + v_next * self.dt
        # Ground collision handling
if h_next < 0:
h_next = 0
v_next = 0
self.state = np.array([h_next, v_next], dtype=np.float32)
        # Reward: distance to the target height plus a small energy penalty
dist_penalty = abs(self.target_height - h_next)
reward = -dist_penalty - 0.01 * abs(action[0])
done = self.step_count >= 100
return self.state, reward, done, {}
    # ★ Matplotlib visualization
def render(self, ax):
ax.clear()
ax.set_xlim(-1, 1)
ax.set_ylim(0, 2.5)
ax.axhline(y=self.target_height, color='g', linestyle='--', label='Target')
        # Drone (drawn as a point)
h = self.state[0]
ax.plot(0, h, 'bo', markersize=20, label='Drone')
        # Thrust indicator (arrow)
ax.arrow(0, h, 0, -0.2, head_width=0.05, head_length=0.1, fc='r', ec='r')
ax.legend()
ax.set_title(f"Height: {h:.2f}m")
# -----------------------------------
# 2. SAC Agent (Policy Only for Test)
# -----------------------------------
class PolicyNet(nn.Module):
def __init__(self):
super(PolicyNet, self).__init__()
self.fc1 = nn.Linear(2, 64)
self.fc_mu = nn.Linear(64, 1)
self.fc_std = nn.Linear(64, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
mu = self.fc_mu(x)
std = F.softplus(self.fc_std(x)) + 1e-3
return mu, std
def sample(self, x):
mu, std = self.forward(x)
dist = Normal(mu, std)
action = torch.tanh(dist.rsample()) # Tanh Squashing
return action
# -----------------------------------
# 3. Execution
# -----------------------------------
def main():
env = HoverDroneEnv()
policy = PolicyNet()
    # Figure setup
fig, ax = plt.subplots(figsize=(4, 6))
print("Simulation Start... (Press Ctrl+C to stop)")
    # Loop over episodes
    for epi in range(1000):  # run 1000 episodes
s = env.reset()
done = False
score = 0.0
while not done:
            # ----------------------------------------
            # [Hook for training]
            # A real training loop would call something like agent.train() here.
            # For now the drone moves with an untrained, effectively random policy.
            # ----------------------------------------
with torch.no_grad():
                # Moves randomly (it will wobble, since nothing has been learned yet)
a = policy.sample(torch.from_numpy(s).float()).numpy()
s_prime, r, done, _ = env.step(a)
s = s_prime
score += r
            # Visualization (if it runs too fast, increase 0.05 to 0.1)
env.render(ax)
plt.pause(0.05)
print(f"Episode {epi} finished. Score: {score:.1f}")
plt.show()
if __name__ == '__main__':
main()
Upgraded version
The script below keeps the same environment idea but adds the full SAC training loop: a replay buffer, twin Q-networks with soft target updates, a reshaped reward, and safer initialization near the target height.
import gym
from gym import spaces
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
# -----------------------------------
# [changed] Hyperparameters
# -----------------------------------
lr_pi = 0.0003  # [changed] policy learning rate tuned
lr_q = 0.001
gamma = 0.99
batch_size = 128
buffer_limit = 50000
tau = 0.005
init_alpha = 0.01  # [changed] fixed entropy temperature (smaller -> less exploration)
hidden_dim = 256   # [changed] larger networks
# -----------------------------------
# 1. Environment definition
# -----------------------------------
class HoverDroneEnv(gym.Env):
def __init__(self):
super(HoverDroneEnv, self).__init__()
self.dt = 0.05
self.gravity = 9.8
self.mass = 1.0
self.target_height = 1.0
self.max_height = 3.0
# Action: -1.0 ~ 1.0
self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)
# State: [height, velocity]
self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32)
def reset(self):
        # [changed] safer initialization (start near the target height)
self.state = np.array([
            1.0 + random.uniform(-0.1, 0.1),   # target height ± 0.1
            random.uniform(-0.05, 0.05)        # small initial velocity
], dtype=np.float32)
self.step_count = 0
return self.state
def step(self, action):
self.step_count += 1
h, v = self.state
        # [key change] Action mapping adjusted so gravity is easier to overcome
        # action -1 -> thrust = 8, action 0 -> thrust = 12, action 1 -> thrust = 16
        # the 8 N floor keeps the drone from free-falling, while 16 N comfortably exceeds gravity (9.8 N)
thrust = 8.0 + (action[0] + 1.0) * 4.0 # 범위: 8.0 ~ 16.0
# Physics
force = thrust - (self.mass * self.gravity)
acc = force / self.mass
v_next = v + acc * self.dt
h_next = h + v_next * self.dt
        # [changed] collision handling at the floor and ceiling
if h_next <= 0.0:
h_next = 0.0
            v_next = max(0.0, v_next)  # only upward velocity allowed at the floor
elif h_next >= self.max_height:
h_next = self.max_height
            v_next = min(0.0, v_next)  # only downward velocity allowed at the ceiling
self.state = np.array([h_next, v_next], dtype=np.float32)
        # [key change] improved reward function
dist_to_target = abs(self.target_height - h_next)
        # 1. Base distance-based reward (0.0 ~ 1.0)
height_reward = max(0.0, 1.0 - dist_to_target)
        # 2. Ground penalty (large penalty while sitting on the floor)
ground_penalty = -2.0 if h_next <= 0.1 else 0.0
        # 3. Velocity penalty (penalize excessive speed)
velocity_penalty = -0.1 * abs(v_next) if abs(v_next) > 1.0 else 0.0
        # 4. Bonus for staying close to the target
target_bonus = 0.5 if dist_to_target < 0.1 else 0.0
reward = height_reward + ground_penalty + velocity_penalty + target_bonus
        # Episode termination condition
done = False
if self.step_count >= 200:
done = True
return self.state, reward, done, {}
def render(self, ax, episode, score):
ax.clear()
ax.set_xlim(-1, 1)
ax.set_ylim(0, self.max_height)
        # Target height line
ax.axhline(y=self.target_height, color='g', linestyle='--', linewidth=2, label='Target Height')
        # Drone position
h, v = self.state
        drone_color = 'blue' if h > 0.1 else 'red'  # red when on the ground
ax.plot(0, h, 'o', color=drone_color, markersize=15)
        # Thrust indicator
thrust_action = self.last_action if hasattr(self, 'last_action') else 0.0
thrust_len = 0.3 * (thrust_action + 1.0) # 0 ~ 0.6
ax.arrow(0, h, 0, -thrust_len, head_width=0.05, head_length=0.05, fc='orange', ec='orange')
        # Velocity vector
if abs(v) > 0.01:
ax.arrow(0, h, 0, v * 0.1, head_width=0.03, head_length=0.03, fc='purple', ec='purple')
        # Info text
ax.text(-0.9, self.max_height-0.2, f"Episode: {episode}")
ax.text(-0.9, self.max_height-0.4, f"Score: {score:.1f}")
ax.text(-0.9, self.max_height-0.6, f"Height: {h:.2f}")
ax.text(-0.9, self.max_height-0.8, f"Velocity: {v:.2f}")
ax.set_ylabel('Height')
ax.set_title('Hover Drone Training')
ax.grid(True, alpha=0.3)
# -----------------------------------
# 2. SAC algorithm - improved version
# -----------------------------------
class ReplayBuffer:
def __init__(self):
self.buffer = deque(maxlen=buffer_limit)
def put(self, transition):
self.buffer.append(transition)
def sample(self, n):
mini_batch = random.sample(self.buffer, n)
s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
for transition in mini_batch:
s, a, r, s_prime, done_mask = transition
s_lst.append(s)
a_lst.append(a)
r_lst.append([r])
s_prime_lst.append(s_prime)
done_mask_lst.append([done_mask])
return (torch.tensor(np.array(s_lst), dtype=torch.float),
torch.tensor(np.array(a_lst), dtype=torch.float),
torch.tensor(np.array(r_lst), dtype=torch.float),
torch.tensor(np.array(s_prime_lst), dtype=torch.float),
torch.tensor(np.array(done_mask_lst), dtype=torch.float))
def size(self):
return len(self.buffer)
class PolicyNet(nn.Module):
def __init__(self):
super(PolicyNet, self).__init__()
self.fc1 = nn.Linear(2, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # [changed] extra hidden layer
self.fc_mu = nn.Linear(hidden_dim, 1)
self.fc_std = nn.Linear(hidden_dim, 1)
def forward(self, x):
x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))  # [changed] extra hidden layer
mu = self.fc_mu(x)
        std = F.softplus(self.fc_std(x)) + 1e-5  # [changed] std floor adjusted
return mu, std
def sample(self, x):
mu, std = self.forward(x)
dist = Normal(mu, std)
z = dist.rsample() # reparameterization trick
action = torch.tanh(z)
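        # Change-of-variables correction for the tanh squashing: subtract log(1 - tanh(z)^2)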
log_prob = dist.log_prob(z) - torch.log(1 - action.pow(2) + 1e-7)
return action, log_prob
class QNet(nn.Module):
def __init__(self):
super(QNet, self).__init__()
self.fc_s = nn.Linear(2, hidden_dim//2)
self.fc_a = nn.Linear(1, hidden_dim//2)
self.fc_cat = nn.Linear(hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim//2)  # [changed] extra hidden layer
self.fc_out = nn.Linear(hidden_dim//2, 1)
def forward(self, x, a):
h1 = F.relu(self.fc_s(x))
h2 = F.relu(self.fc_a(a))
cat = torch.cat([h1, h2], dim=1)
q = F.relu(self.fc_cat(cat))
        q = F.relu(self.fc2(q))  # [changed] extra hidden layer
q = self.fc_out(q)
return q
class SAC_Agent:
def __init__(self):
self.policy = PolicyNet()
self.q1 = QNet()
self.q2 = QNet()
self.q1_target = QNet()
self.q2_target = QNet()
        # Initialize the target networks with the online weights
self.q1_target.load_state_dict(self.q1.state_dict())
self.q2_target.load_state_dict(self.q2.state_dict())
# Optimizer
self.pi_optimizer = optim.Adam(self.policy.parameters(), lr=lr_pi)
self.q1_optimizer = optim.Adam(self.q1.parameters(), lr=lr_q)
self.q2_optimizer = optim.Adam(self.q2.parameters(), lr=lr_q)
def get_action(self, state, deterministic=False):
"""액션 선택 (학습용과 테스트용)"""
with torch.no_grad():
if deterministic:
mu, _ = self.policy(torch.from_numpy(state).float())
return torch.tanh(mu).numpy()
else:
action, _ = self.policy.sample(torch.from_numpy(state).float())
return action.numpy()
def train(self, memory):
s, a, r, s_prime, done_mask = memory.sample(batch_size)
        # Q-function update
with torch.no_grad():
next_action, next_log_prob = self.policy.sample(s_prime)
q1_next = self.q1_target(s_prime, next_action)
q2_next = self.q2_target(s_prime, next_action)
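            # Soft Bellman target: min of the twin target Qs plus the entropy term (-alpha * log_prob);
            # done_mask is 0 at episode end, which switches off bootstrapping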
q_target = r + gamma * done_mask * (torch.min(q1_next, q2_next) - init_alpha * next_log_prob)
q1_loss = F.smooth_l1_loss(self.q1(s, a), q_target)
q2_loss = F.smooth_l1_loss(self.q2(s, a), q_target)
self.q1_optimizer.zero_grad()
q1_loss.backward()
self.q1_optimizer.step()
self.q2_optimizer.zero_grad()
q2_loss.backward()
self.q2_optimizer.step()
        # Policy update
action, log_prob = self.policy.sample(s)
q1_val = self.q1(s, action)
q2_val = self.q2(s, action)
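        # Policy loss: maximize min(Q1, Q2) - alpha * log_prob, i.e. minimize its negative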
pi_loss = -(torch.min(q1_val, q2_val) - init_alpha * log_prob).mean()
self.pi_optimizer.zero_grad()
pi_loss.backward()
self.pi_optimizer.step()
        # Soft (Polyak) update of the target networks
for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
# -----------------------------------
# 3. Main loop
# -----------------------------------
def main():
env = HoverDroneEnv()
agent = SAC_Agent()
memory = ReplayBuffer()
    # Visualization setup
plt.ion()
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 6))
scores = []
heights = []
print("=== Hover Drone SAC Training ===")
print("Goal: Keep the drone at height 1.0 (green line)")
print("Blue drone = flying, Red drone = crashed")
    for n_epi in range(300):  # [changed] more episodes
s = env.reset()
done = False
score = 0.0
episode_heights = []
while not done:
            # Select an action
action = agent.get_action(s, deterministic=False)
            env.last_action = action[0]  # for visualization
            # Interact with the environment
s_prime, r, done, _ = env.step(action)
done_mask = 0.0 if done else 1.0
            # Store the transition
memory.put((s, action, r, s_prime, done_mask))
s = s_prime
score += r
            episode_heights.append(s[0])  # record the height
            # Train (once enough experience has accumulated)
            if memory.size() > 2000:  # [changed] start training after more experience is collected
agent.train(memory)
            # Visualization (every few steps of selected episodes)
if n_epi % 10 == 0 and env.step_count % 5 == 0:
env.render(ax1, n_epi, score)
plt.pause(0.01)
        # Record episode results
scores.append(score)
avg_height = np.mean(episode_heights)
heights.append(avg_height)
        # Logging
if n_epi % 20 == 0:
recent_score = np.mean(scores[-20:]) if len(scores) >= 20 else np.mean(scores)
recent_height = np.mean(heights[-20:]) if len(heights) >= 20 else np.mean(heights)
print(f"Episode {n_epi:3d} | Score: {score:6.1f} | Avg Score: {recent_score:6.1f} | "
f"Avg Height: {recent_height:.2f} | Buffer: {memory.size()}")
        # Update the training-progress plot
if n_epi % 10 == 0 and len(scores) > 1:
ax2.clear()
ax2.plot(scores, 'b-', alpha=0.7, label='Episode Score')
if len(scores) >= 20:
smooth_scores = []
for i in range(19, len(scores)):
smooth_scores.append(np.mean(scores[i-19:i+1]))
ax2.plot(range(19, len(scores)), smooth_scores, 'r-', linewidth=2, label='Moving Average')
ax2.set_xlabel('Episode')
ax2.set_ylabel('Score')
ax2.set_title('Training Progress')
ax2.legend()
ax2.grid(True, alpha=0.3)
plt.ioff()
    # Final test
print("\n=== Final Test ===")
test_scores = []
for i in range(10):
s = env.reset()
done = False
score = 0.0
while not done:
            action = agent.get_action(s, deterministic=True)  # deterministic policy
s, r, done, _ = env.step(action)
score += r
test_scores.append(score)
print(f"Average test score: {np.mean(test_scores):.2f} ± {np.std(test_scores):.2f}")
plt.show()
if __name__ == '__main__':
    main()