728x90
반응형
# 기본 강화학습 환경 및 시각화 도구 설치
pip install gym==0.26.2 gym[classic_control] torch numpy matplotlib
문제 개요
- 환경: OpenAI Gym의 CartPole-v1
- 목표: 카트 위에 세워진 막대가 넘어지지 않도록 중심을 잡는 에이전트 학습.
- 성공 기준: 최근 100 에피소드의 평균 점수가 195점 이상 도달 시 조기 종료(Early Stopping) 및 모델 저장.
DQN 필수 구현 요구사항
- Replay Buffer 구현: deque 등을 사용하여 경험(Experience)을 저장하고 랜덤 배치로 샘플링하는 클래스를 직접 구현하시오.
- Q-Network 모델링: PyTorch를 사용하여 입력(State)에 대한 행동(Action) 가치를 출력하는 신경망을 설계하시오.
- Epsilon-Greedy 정책: 탐험(Exploration)과 활용(Exploitation)의 균형을 위한 엡실론 감쇠(Decay) 로직을 포함하시오.
- Target Network 분리: 학습의 안정성을 위해 Target Network를 두고, 주기적으로 Main Network의 가중치를 복사(Soft update 또는 Hard update)하는 로직을 구현하시오.
import gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
# -----------------------------------
# 1. Hyperparameters
# -----------------------------------
learning_rate = 0.0005
gamma = 0.98                 # discount factor
buffer_limit = 50000         # replay buffer capacity
batch_size = 64
target_update_interval = 10  # target-network sync period (in episodes)
# -----------------------------------
# 2. Replay Buffer (경험 저장소)
# -----------------------------------
class ReplayBuffer:
    """Fixed-capacity experience replay buffer for off-policy DQN training.

    Stores (s, a, r, s_prime, done_mask) transitions and serves uniformly
    random mini-batches as ready-to-use torch tensors.
    """

    def __init__(self, limit=None):
        # `limit` generalizes the previously hard-coded capacity; the default
        # falls back to the module-level `buffer_limit` (looked up lazily) so
        # existing callers are unaffected.
        self.buffer = deque(maxlen=buffer_limit if limit is None else limit)

    def put(self, transition):
        """Append one (s, a, r, s_prime, done_mask) tuple; oldest is evicted when full."""
        self.buffer.append(transition)

    def sample(self, n):
        """Sample `n` random transitions and return them as batched tensors.

        Returns:
            states [n,4] float, actions [n,1] int64, rewards [n,1] float,
            next_states [n,4] float, done_masks [n,1] float.
        """
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        for s, a, r, s_prime, done_mask in mini_batch:
            s_lst.append(s)
            a_lst.append([a])           # wrapped so gather() gets shape [n, 1]
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        # np.array first: building a tensor from a list of ndarrays is slow.
        return torch.tensor(np.array(s_lst), dtype=torch.float), \
               torch.tensor(np.array(a_lst), dtype=torch.int64), \
               torch.tensor(np.array(r_lst), dtype=torch.float), \
               torch.tensor(np.array(s_prime_lst), dtype=torch.float), \
               torch.tensor(np.array(done_mask_lst), dtype=torch.float)

    def size(self):
        """Current number of stored transitions."""
        return len(self.buffer)
# -----------------------------------
# 3. Q-Network (신경망 모델)
# -----------------------------------
class QNet(nn.Module):
    """3-layer MLP mapping a CartPole state (4 dims) to Q-values for 2 actions."""

    def __init__(self):
        super(QNet, self).__init__()
        self.fc1 = nn.Linear(4, 128)   # CartPole observation: 4 values
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)   # actions: push left / push right

    def forward(self, x):
        hidden = F.relu(self.fc1(x))
        hidden = F.relu(self.fc2(hidden))
        return self.fc3(hidden)

    def sample_action(self, obs, epsilon):
        """Epsilon-greedy selection: random with probability epsilon, else argmax-Q."""
        q_values = self.forward(obs)
        if random.random() < epsilon:
            return random.randint(0, 1)    # explore
        return q_values.argmax().item()    # exploit
# -----------------------------------
# 4. Train Loop (학습 함수)
# -----------------------------------
def train(q, q_target, memory, optimizer):
    """Run 10 mini-batch gradient steps of the DQN Bellman update.

    The frozen `q_target` network supplies the bootstrap term; only the main
    network `q` is stepped by `optimizer`.
    """
    for _ in range(10):  # several updates per call for sample efficiency
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        # Q(s, a) for the actions actually taken
        q_a = q(s).gather(1, a)
        # Bellman target: r + gamma * max_a' Q_target(s', a');
        # done_mask zeroes the bootstrap at terminal states. (vanilla DQN, not Double DQN)
        max_q_prime = q_target(s_prime).max(dim=1, keepdim=True)[0]
        target = r + gamma * max_q_prime * done_mask
        loss = F.mse_loss(q_a, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
# -----------------------------------
# 5. Main Execution
# -----------------------------------
def main():
    """Train a DQN agent on CartPole-v1.

    Early-stops and saves the model once the average score over the most
    recent 100 episodes reaches 195.0 — the stated success criterion; the
    previous check used a single episode's score, which fires prematurely.
    """
    env = gym.make('CartPole-v1')
    q = QNet()
    q_target = QNet()
    q_target.load_state_dict(q.state_dict())  # sync initial weights
    memory = ReplayBuffer()
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)
    # Real training usually decays epsilon from 1.0; kept low here so
    # convergence can be observed quickly.
    epsilon = 0.05
    # The success criterion is a 100-episode moving average, so keep a
    # bounded window of recent scores.
    recent_scores = deque(maxlen=100)
    print(f"Start Training... (Goal: 195.0 pts)")
    for n_epi in range(2000):
        # optional epsilon decay:
        # epsilon = max(0.01, 0.08 - 0.01*(n_epi/200))
        s, _ = env.reset()
        done = False
        score = 0.0
        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon)
            s_prime, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            done_mask = 0.0 if done else 1.0  # 0 at terminal states, else 1
            memory.put((s, a, r/100.0, s_prime, done_mask))  # reward scaling
            s = s_prime
            score += r
            if done:
                break
        recent_scores.append(score)
        # start learning once enough transitions have been collected
        if memory.size() > 2000:
            train(q, q_target, memory, optimizer)
        # periodic hard update of the target network
        if n_epi % target_update_interval == 0 and n_epi != 0:
            q_target.load_state_dict(q.state_dict())
        if n_epi % 20 == 0 and n_epi != 0:
            print(f"Episode: {n_epi}, Score: {score:.1f}, Buffer: {memory.size()}")
        # early stopping: mean of the last 100 episodes >= 195
        if len(recent_scores) == 100 and sum(recent_scores) / len(recent_scores) >= 195.0:
            print(f"Solved at episode {n_epi}! Score: {score}")
            torch.save(q.state_dict(), "cartpole_dqn.pth")
            break
    env.close()


if __name__ == '__main__':
    main()
[DQN] CartPole-v1 (학습 곡선 그래프 포함)
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
# Hyperparameters
learning_rate = 0.0005
gamma = 0.99          # discount factor
buffer_limit = 50000  # replay buffer capacity
batch_size = 64
class ReplayBuffer:
    """Uniform-sampling experience replay for DQN (capacity = buffer_limit)."""

    def __init__(self):
        self.buffer = deque(maxlen=buffer_limit)

    def put(self, transition):
        """Store one (s, a, r, s_prime, done_mask) tuple."""
        self.buffer.append(transition)

    def sample(self, n):
        """Return `n` random transitions as batched tensors.

        Shapes: states/next_states [n,4]; actions [n,1] (int64);
        rewards/done_masks [n,1].
        """
        def as_float(lst):
            return torch.tensor(np.array(lst), dtype=torch.float)

        batch = random.sample(self.buffer, n)
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for s, a, r, s_prime, done_mask in batch:
            states.append(s)
            actions.append([a])        # [n, 1] so gather() works downstream
            rewards.append([r])
            next_states.append(s_prime)
            dones.append([done_mask])
        return (as_float(states),
                torch.tensor(np.array(actions), dtype=torch.int64),
                as_float(rewards),
                as_float(next_states),
                as_float(dones))

    def size(self):
        """Number of stored transitions."""
        return len(self.buffer)
class QNet(nn.Module):
    """Q-network for CartPole: state (4) -> Q-values for the 2 actions."""

    def __init__(self):
        super(QNet, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        for layer in (self.fc1, self.fc2):
            x = F.relu(layer(x))
        return self.fc3(x)

    def sample_action(self, obs, epsilon):
        """Epsilon-greedy: explore with probability epsilon, otherwise greedy."""
        q_values = self.forward(obs)
        explore = random.random() < epsilon
        return random.randint(0, 1) if explore else q_values.argmax().item()
def train(q, q_target, memory, optimizer):
    """Ten DQN gradient steps on random mini-batches from the replay buffer."""
    for _ in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        q_a = q(s).gather(1, a)                               # Q(s, a)
        best_next = q_target(s_prime).max(1)[0].unsqueeze(1)  # max_a' Q_target(s', a')
        # done_mask zeroes the bootstrap term at terminal states
        td_target = r + gamma * best_next * done_mask
        loss = F.mse_loss(q_a, td_target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
def main():
    """Train DQN on CartPole-v1 for up to 300 episodes and plot the scores."""
    env = gym.make('CartPole-v1')
    q = QNet()
    q_target = QNet()
    q_target.load_state_dict(q.state_dict())  # start from identical weights
    memory = ReplayBuffer()
    optimizer = optim.Adam(q.parameters(), lr=learning_rate)
    score_history = []
    print("DQN Training Started... (Wait for ~30 seconds)")
    for n_epi in range(300):  # 300 episodes
        epsilon = max(0.01, 0.08 - 0.01*(n_epi/200))  # linear decay, floor 0.01
        s, _ = env.reset()  # gymnasium-compatible reset
        done = False
        score = 0.0
        while not done:
            a = q.sample_action(torch.from_numpy(s).float(), epsilon)
            s_prime, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            # reward scaled by 1/100; done_mask is 0 at terminal states
            memory.put((s, a, r/100.0, s_prime, 0.0 if done else 1.0))
            s = s_prime
            score += r
        if memory.size() > 2000:  # warm-up before learning begins
            train(q, q_target, memory, optimizer)
        if n_epi % 20 == 0 and n_epi != 0:
            q_target.load_state_dict(q.state_dict())  # hard target update
            print(f"Episode: {n_epi}, Score: {score:.1f}, Epsilon: {epsilon:.2f}")
        score_history.append(score)
        if score > 200:  # early stop
            print("Solved!")
            break
    env.close()
    # learning-curve plot
    plt.plot(score_history)
    plt.title('DQN CartPole Score')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    plt.show()


if __name__ == '__main__':
    main()
2. PPO (Proximal Policy Optimization)
- 특징: On-policy, 구현 용이, 하이퍼파라미터에 덜 민감함.
- 핵심 구현 포인트:
- Actor-Critic 구조: 행동을 결정하는 Actor와 가치를 평가하는 Critic이 파라미터를 공유하거나 분리됨.
- GAE (Generalized Advantage Estimation): 반환값(Return)과 가치(Value)를 이용해 Advantage 계산.
- Surrogate Loss & Clipping: 기존 정책과 너무 달라지지 않도록 ratio를 클리핑(Clipping)하여 학습 안정성 확보.
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
# Hyperparameters
learning_rate = 0.0005
gamma = 0.98     # discount factor
lmbda = 0.95     # GAE lambda
eps_clip = 0.1   # PPO ratio clipping range
K_epoch = 3      # optimization epochs per rollout
T_horizon = 20   # rollout length between updates
class PPO(nn.Module):
    """PPO agent with a shared-trunk actor-critic and clipped surrogate loss."""

    def __init__(self):
        super(PPO, self).__init__()
        self.data = []  # rollout storage, cleared after every update
        # Actor and Critic share the first layer (separating them also works)
        self.fc1 = nn.Linear(4, 256)
        self.fc_pi = nn.Linear(256, 2)  # actor head: action probabilities
        self.fc_v = nn.Linear(256, 1)   # critic head: state value
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        """Actor: action distribution pi(a|s).

        softmax_dim=0 for a single state vector, 1 for a batch.
        """
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        """Critic: state-value estimate V(s)."""
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        # transition = (s, a, r, s_prime, prob_a, done)
        self.data.append(transition)

    def make_batch(self):
        """Tensorize the stored rollout and clear the storage."""
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append([prob_a])
            done_mask = 0 if done else 1  # 0 at terminal states
            done_lst.append([done_mask])
        s,a,r,s_prime,done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                          torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                          torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        return s, a, r, s_prime, done_mask, prob_a

    def train_net(self):
        """One PPO update: K_epoch passes of GAE + clipped surrogate loss."""
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()
        for i in range(K_epoch):
            td_target = r + gamma * self.v(s_prime) * done_mask
            delta = td_target - self.v(s)  # TD error
            delta = delta.detach().numpy()
            # GAE: advantage_t = delta_t + gamma*lambda*advantage_{t+1},
            # computed by scanning the rollout backwards
            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1, a)
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))  # a/b == exp(log(a)-log(b))
            # clipped surrogate objective plus critic (value) loss
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
def main():
    """Run PPO on CartPole-v1, updating after every T_horizon-step rollout."""
    env = gym.make('CartPole-v1')
    model = PPO()
    score = 0.0
    print_interval = 20
    for n_epi in range(1000):
        s, _ = env.reset()
        done = False
        while not done:
            # collect up to T_horizon steps, then update on that rollout
            for _ in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                dist = Categorical(prob)
                a = dist.sample().item()
                s_prime, r, terminated, truncated, _ = env.step(a)
                done = terminated or truncated
                # pi(a|s) of the behavior policy is stored for the PPO ratio
                model.put_data((s, a, r/100.0, s_prime, prob[a].item(), done))
                s = s_prime
                score += r
                if done:
                    break
            model.train_net()
        if n_epi % print_interval == 0 and n_epi != 0:
            print(f"# of episode :{n_epi}, avg score : {score/print_interval:.1f}")
            score = 0.0
    env.close()


if __name__ == '__main__':
    main()
[PPO] CartPole-v1 (완전체 코드)
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import matplotlib.pyplot as plt
# Hyperparameters
learning_rate = 0.0005
gamma = 0.98     # discount factor
lmbda = 0.95     # GAE lambda
eps_clip = 0.1   # PPO ratio clipping range
K_epoch = 3      # optimization epochs per rollout
T_horizon = 20   # rollout length between updates
class PPO(nn.Module):
    """PPO actor-critic with a shared first layer and clipped surrogate loss."""

    def __init__(self):
        super(PPO, self).__init__()
        self.data = []  # rollout buffer, emptied after each update
        self.fc1 = nn.Linear(4, 256)    # shared trunk
        self.fc_pi = nn.Linear(256, 2)  # actor head
        self.fc_v = nn.Linear(256, 1)   # critic head
        self.optimizer = optim.Adam(self.parameters(), lr=learning_rate)

    def pi(self, x, softmax_dim=0):
        """Action distribution pi(a|s); softmax_dim=0 for a vector, 1 for a batch."""
        x = F.relu(self.fc1(x))
        prob = F.softmax(self.fc_pi(x), dim=softmax_dim)
        return prob

    def v(self, x):
        """State-value estimate V(s)."""
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v

    def put_data(self, transition):
        # transition = (s, a, r, s_prime, prob_a, done)
        self.data.append(transition)

    def make_batch(self):
        """Tensorize the stored rollout and clear it."""
        s_lst, a_lst, r_lst, s_prime_lst, prob_a_lst, done_lst = [], [], [], [], [], []
        for transition in self.data:
            s, a, r, s_prime, prob_a, done = transition
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            prob_a_lst.append([prob_a])
            done_mask = 0 if done else 1  # 0 at terminal states
            done_lst.append([done_mask])
        s,a,r,s_prime,done_mask, prob_a = torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
                                          torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
                                          torch.tensor(done_lst, dtype=torch.float), torch.tensor(prob_a_lst)
        self.data = []
        return s, a, r, s_prime, done_mask, prob_a

    def train_net(self):
        """One PPO update: K_epoch epochs of GAE + clipped surrogate loss."""
        s, a, r, s_prime, done_mask, prob_a = self.make_batch()
        for i in range(K_epoch):
            td_target = r + gamma * self.v(s_prime) * done_mask
            delta = td_target - self.v(s)  # TD error
            delta = delta.detach().numpy()
            # GAE computed by scanning the rollout backwards
            advantage_lst = []
            advantage = 0.0
            for delta_t in delta[::-1]:
                advantage = gamma * lmbda * advantage + delta_t[0]
                advantage_lst.append([advantage])
            advantage_lst.reverse()
            advantage = torch.tensor(advantage_lst, dtype=torch.float)
            pi = self.pi(s, softmax_dim=1)
            pi_a = pi.gather(1, a)
            # probability ratio pi_new/pi_old via log-space subtraction
            ratio = torch.exp(torch.log(pi_a) - torch.log(prob_a))
            surr1 = ratio * advantage
            surr2 = torch.clamp(ratio, 1-eps_clip, 1+eps_clip) * advantage
            loss = -torch.min(surr1, surr2) + F.smooth_l1_loss(self.v(s), td_target.detach())
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
def main():
    """Train PPO on CartPole-v1 for 300 episodes and plot the learning curve."""
    env = gym.make('CartPole-v1')
    model = PPO()
    score_history = []
    print("PPO Training Started...")
    for n_epi in range(300):
        s, _ = env.reset()
        done = False
        score = 0.0
        while not done:
            for t in range(T_horizon):
                prob = model.pi(torch.from_numpy(s).float())
                m = Categorical(prob)
                a = m.sample().item()
                s_prime, r, terminated, truncated, _ = env.step(a)
                done = terminated or truncated
                model.put_data((s, a, r/100.0, s_prime, prob[a].item(), done))
                s = s_prime
                score += r
                if done:
                    break
            model.train_net()
        score_history.append(score)
        if n_epi % 20 == 0 and n_epi != 0:
            # Fix: the log said "Avg Score" but printed a single episode's
            # score; report the true mean over the last 20 episodes instead.
            avg = sum(score_history[-20:]) / len(score_history[-20:])
            print(f"Episode: {n_epi}, Avg Score: {avg:.1f}")
    env.close()
    plt.plot(score_history, color='orange')
    plt.title('PPO CartPole Score')
    plt.show()


if __name__ == '__main__':
    main()
3. SAC (Soft Actor-Critic)
- 환경: Pendulum-v1 (연속 행동 공간, 로봇 관절 제어와 유사)
- 특징: Off-policy, Maximum Entropy RL (탐험과 활용의 균형을 엔트로피로 자동 조절).
- 핵심 구현 포인트:
- Reparameterization Trick: 연속 공간에서 행동을 샘플링하면서도 그래디언트를 전파하기 위해 rsample() 사용.
- Twin Q-Network: DQN의 Overestimation 문제를 해결하기 위해 2개의 Q함수 중 작은 값을 사용.
- Soft Update (Polyak Averaging): 타겟 네트워크를 서서히 업데이트 (tau).
- Alpha Tuning: 엔트로피 계수($\alpha$) 자동 조정 (코드 단축을 위해 이 예제에서는 고정값 사용).
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import random
from collections import deque
# Hyperparameters
lr_pi = 0.0005        # policy (actor) learning rate
lr_q = 0.001          # Q-network (critic) learning rate
gamma = 0.99
batch_size = 32
buffer_limit = 50000
tau = 0.01            # Soft update parameter
init_alpha = 0.01     # Entropy coefficient (fixed; no auto-tuning in this example)
class ReplayBuffer:
    """Replay buffer for SAC on Pendulum (continuous 1-D actions)."""

    def __init__(self, limit=None):
        # `limit` generalizes the hard-coded capacity; defaults to the
        # module-level `buffer_limit` (looked up lazily) for compatibility.
        self.buffer = deque(maxlen=buffer_limit if limit is None else limit)

    def put(self, transition):
        """Store one (s, a, r, s_prime, done_mask) tuple."""
        self.buffer.append(transition)

    def sample(self, n):
        """Return `n` random transitions as batched float tensors.

        Shapes: states [n,3], actions [n,1], rewards [n,1],
        next_states [n,3], done_masks [n,1].
        """
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        for s, a, r, s_prime, done_mask in mini_batch:
            s_lst.append(s)
            # BUG FIX: the scalar action must be wrapped so the action batch
            # has shape [n, 1]; a flat [n] tensor crashes the critic's
            # fc_a = nn.Linear(1, 64) during training.
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        return torch.tensor(np.array(s_lst), dtype=torch.float), torch.tensor(np.array(a_lst), dtype=torch.float), \
               torch.tensor(np.array(r_lst), dtype=torch.float), torch.tensor(np.array(s_prime_lst), dtype=torch.float), \
               torch.tensor(np.array(done_mask_lst), dtype=torch.float)

    def size(self):
        """Number of stored transitions."""
        return len(self.buffer)
class PolicyNet(nn.Module):
    """Gaussian policy with tanh squashing for Pendulum (state 3 -> action 1)."""

    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc_mu = nn.Linear(128, 1)
        self.fc_std = nn.Linear(128, 1)

    def forward(self, x):
        h = F.relu(self.fc2(F.relu(self.fc1(x))))
        mu = self.fc_mu(h)
        std = F.softplus(self.fc_std(h))  # softplus keeps the std positive
        return mu, std

    def sample(self, x):
        """Draw a squashed action and its log-probability.

        rsample() (reparameterization trick) lets gradients flow through the
        sampling step; tanh bounds the action to [-1, 1] and the log-prob is
        corrected by the tanh Jacobian. Pendulum's action range is [-2, 2],
        hence the final scaling by 2.
        """
        mu, std = self.forward(x)
        dist = Normal(mu, std)
        raw = dist.rsample()  # reparameterized sample
        squashed = torch.tanh(raw)
        log_prob = dist.log_prob(raw) - torch.log(1 - torch.tanh(raw).pow(2) + 1e-7)
        return squashed * 2.0, log_prob
class QNet(nn.Module):
    """Critic Q(s, a): separate encoders for state (3) and action (1), then a joint head."""

    def __init__(self):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(3, 64)      # state branch
        self.fc_a = nn.Linear(1, 64)      # action branch
        self.fc_cat = nn.Linear(128, 128)  # joint layer over concatenated features
        self.fc_out = nn.Linear(128, 1)

    def forward(self, x, a):
        state_feat = F.relu(self.fc_s(x))
        action_feat = F.relu(self.fc_a(a))
        joint = F.relu(self.fc_cat(torch.cat([state_feat, action_feat], dim=1)))
        return self.fc_out(joint)
class SAC:
    """Soft Actor-Critic agent: twin Q-critics, squashed-Gaussian policy,
    fixed entropy coefficient (init_alpha), Polyak-averaged target networks."""

    def __init__(self):
        self.policy = PolicyNet()
        self.q1 = QNet()
        self.q2 = QNet()  # twin Q to curb overestimation
        self.q1_target = QNet()
        self.q2_target = QNet()
        self.q1_target.load_state_dict(self.q1.state_dict())
        self.q2_target.load_state_dict(self.q2.state_dict())
        self.pi_optimizer = optim.Adam(self.policy.parameters(), lr=lr_pi)
        self.q1_optimizer = optim.Adam(self.q1.parameters(), lr=lr_q)
        self.q2_optimizer = optim.Adam(self.q2.parameters(), lr=lr_q)

    def train(self, memory, target_entropy):
        # NOTE(review): `target_entropy` is unused here — alpha stays fixed
        # at init_alpha in this simplified version (no automatic tuning).
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        # 1. Q-function update: soft Bellman target uses the min of the two
        #    target critics minus the entropy term.
        with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(s_prime)
            q1_next = self.q1_target(s_prime, next_action)
            q2_next = self.q2_target(s_prime, next_action)
            q_target = r + gamma * done_mask * (torch.min(q1_next, q2_next) - init_alpha * next_log_prob)
        q1_loss = F.smooth_l1_loss(self.q1(s, a), q_target)
        q2_loss = F.smooth_l1_loss(self.q2(s, a), q_target)
        self.q1_optimizer.zero_grad()
        q1_loss.backward()
        self.q1_optimizer.step()
        self.q2_optimizer.zero_grad()
        q2_loss.backward()
        self.q2_optimizer.step()
        # 2. Policy update: maximize (min Q - alpha * log_prob)
        action, log_prob = self.policy.sample(s)
        q1_val = self.q1(s, action)
        q2_val = self.q2(s, action)
        pi_loss = -(torch.min(q1_val, q2_val) - init_alpha * log_prob).mean()
        self.pi_optimizer.zero_grad()
        pi_loss.backward()
        self.pi_optimizer.step()
        # 3. Soft (Polyak) update of the target networks
        for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
def main():
    """Train SAC on Pendulum-v1 for 200 episodes."""
    env = gym.make('Pendulum-v1')
    agent = SAC()
    memory = ReplayBuffer()
    print("Start Training SAC on Pendulum-v1...")
    for n_epi in range(200):
        state, _ = env.reset()
        score = 0.0
        done = False
        while not done:
            action_t, _ = agent.policy.sample(torch.from_numpy(state).float())
            action = action_t.item()  # tensor -> python float
            next_state, reward, terminated, truncated, _ = env.step([action])
            done = terminated or truncated
            # done_mask: 0 at terminal states, 1 otherwise
            memory.put((state, action, reward, next_state, 0.0 if done else 1.0))
            state = next_state
            score += reward
            # train once per step after a 1000-transition warm-up
            if memory.size() > 1000:
                agent.train(memory, -1.0)  # target_entropy = -dim(A)
        if n_epi % 10 == 0:
            print(f"Episode: {n_epi}, Score: {score:.1f}")
    env.close()


if __name__ == '__main__':
    main()
SAC Pendulum 코드 (그래프 포함)
import gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Normal
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
# Hyperparameters
lr_pi = 0.0005      # policy (actor) learning rate
lr_q = 0.001        # Q-network (critic) learning rate
gamma = 0.99
batch_size = 32
buffer_limit = 50000
tau = 0.01          # soft-update (Polyak) coefficient
init_alpha = 0.01   # fixed entropy coefficient
class ReplayBuffer:
    """Uniform experience replay for SAC (capacity = buffer_limit)."""

    def __init__(self):
        self.buffer = deque(maxlen=buffer_limit)

    def put(self, transition):
        """Append a (s, a, r, s_prime, done_mask) tuple."""
        self.buffer.append(transition)

    def sample(self, n):
        """Draw `n` random transitions, batched as float tensors.

        Actions are stored as scalars and come out with shape [n, 1] so they
        can feed the critic's Linear(1, 64) action branch.
        """
        picked = random.sample(self.buffer, n)
        states, actions, rewards, next_states, dones = [], [], [], [], []
        for s, a, r, s_prime, done_mask in picked:
            states.append(s)
            actions.append([a])  # wrap the scalar: keeps the batch [n, 1]
            rewards.append([r])
            next_states.append(s_prime)
            dones.append([done_mask])
        return tuple(torch.tensor(np.array(col), dtype=torch.float)
                     for col in (states, actions, rewards, next_states, dones))

    def size(self):
        """Number of stored transitions."""
        return len(self.buffer)
class PolicyNet(nn.Module):
    """Squashed-Gaussian actor for Pendulum: state (3) -> action in [-2, 2]."""

    def __init__(self):
        super(PolicyNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc_mu = nn.Linear(128, 1)
        self.fc_std = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        # mean is unconstrained; softplus guarantees a positive std
        return self.fc_mu(x), F.softplus(self.fc_std(x))

    def sample(self, x):
        """Reparameterized sample plus tanh-corrected log-probability."""
        mean, std = self.forward(x)
        gaussian = Normal(mean, std)
        u = gaussian.rsample()  # rsample keeps gradients flowing (reparameterization)
        # tanh squashes to [-1, 1]; subtract log|d tanh/du| for the true log-prob
        corrected_log_prob = gaussian.log_prob(u) - torch.log(1 - torch.tanh(u).pow(2) + 1e-7)
        return torch.tanh(u) * 2.0, corrected_log_prob
class QNet(nn.Module):
    """Twin-critic building block: Q(s, a) with split state/action encoders."""

    def __init__(self):
        super(QNet, self).__init__()
        # state (3) and action (1) embedded separately, then concatenated
        self.fc_s = nn.Linear(3, 64)
        self.fc_a = nn.Linear(1, 64)
        self.fc_cat = nn.Linear(128, 128)
        self.fc_out = nn.Linear(128, 1)

    def forward(self, x, a):
        merged = torch.cat([F.relu(self.fc_s(x)), F.relu(self.fc_a(a))], dim=1)
        hidden = F.relu(self.fc_cat(merged))
        return self.fc_out(hidden)
class SAC:
    """Soft Actor-Critic: twin Q-critics, squashed-Gaussian policy, fixed
    entropy coefficient (init_alpha), Polyak-averaged target networks."""

    def __init__(self):
        self.policy = PolicyNet()
        self.q1 = QNet()
        self.q2 = QNet()  # twin Q to curb value overestimation
        self.q1_target = QNet()
        self.q2_target = QNet()
        self.q1_target.load_state_dict(self.q1.state_dict())
        self.q2_target.load_state_dict(self.q2.state_dict())
        self.pi_optimizer = optim.Adam(self.policy.parameters(), lr=lr_pi)
        self.q1_optimizer = optim.Adam(self.q1.parameters(), lr=lr_q)
        self.q2_optimizer = optim.Adam(self.q2.parameters(), lr=lr_q)

    def train(self, memory, target_entropy):
        # NOTE(review): `target_entropy` is unused — alpha is fixed at
        # init_alpha (no automatic entropy tuning in this version).
        s, a, r, s_prime, done_mask = memory.sample(batch_size)
        # 1. Critic update: soft Bellman target = r + gamma * mask *
        #    (min of target critics - alpha * next log-prob)
        with torch.no_grad():
            next_action, next_log_prob = self.policy.sample(s_prime)
            q1_next = self.q1_target(s_prime, next_action)
            q2_next = self.q2_target(s_prime, next_action)
            q_target = r + gamma * done_mask * (torch.min(q1_next, q2_next) - init_alpha * next_log_prob)
        q1_loss = F.smooth_l1_loss(self.q1(s, a), q_target)
        q2_loss = F.smooth_l1_loss(self.q2(s, a), q_target)
        self.q1_optimizer.zero_grad()
        q1_loss.backward()
        self.q1_optimizer.step()
        self.q2_optimizer.zero_grad()
        q2_loss.backward()
        self.q2_optimizer.step()
        # 2. Actor update: maximize (min Q - alpha * log_prob)
        action, log_prob = self.policy.sample(s)
        q1_val = self.q1(s, action)
        q2_val = self.q2(s, action)
        pi_loss = -(torch.min(q1_val, q2_val) - init_alpha * log_prob).mean()
        self.pi_optimizer.zero_grad()
        pi_loss.backward()
        self.pi_optimizer.step()
        # 3. Soft (Polyak) update of both target networks
        for param, target_param in zip(self.q1.parameters(), self.q1_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
        for param, target_param in zip(self.q2.parameters(), self.q2_target.parameters()):
            target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)
def main():
    """Train SAC on Pendulum-v1 (150 episodes) and plot the score curve."""
    # rendering stays off for speed (no render_mode='human')
    env = gym.make('Pendulum-v1')
    agent = SAC()
    memory = ReplayBuffer()
    print("Start Training SAC on Pendulum-v1...")
    score_history = []
    for n_epi in range(150):  # 150 episodes is enough to converge
        obs, _ = env.reset()
        done = False
        episode_return = 0.0
        while not done:
            act_tensor, _ = agent.policy.sample(torch.from_numpy(obs).float())
            act = act_tensor.item()
            nxt_obs, rew, terminated, truncated, _ = env.step([act])
            done = terminated or truncated
            # the action is stored as a plain float; ReplayBuffer.sample
            # wraps it back into shape [n, 1]
            memory.put((obs, act, rew, nxt_obs, 0.0 if done else 1.0))
            obs = nxt_obs
            episode_return += rew
            if memory.size() > 1000:
                agent.train(memory, -1.0)
        score_history.append(episode_return)
        if n_epi % 10 == 0:
            print(f"Episode: {n_epi}, Score: {episode_return:.1f}")
    env.close()
    # learning-curve plot
    plt.plot(score_history)
    plt.title('SAC Pendulum Score')
    plt.xlabel('Episode')
    plt.ylabel('Score')
    plt.grid(True)
    plt.show()


if __name__ == '__main__':
    main()
반응형
'Study' 카테고리의 다른 글
| 제목 | 날짜 |
|---|---|
| Behavior Cloning (모방 학습) (0) | 2025.11.29 |
| OpenAI Gym 인터페이스를 따르는 Custom Environment 구현 (0) | 2025.11.29 |
| 2D Grid Map에서의 A* 최단 경로 탐색 (0) | 2025.11.29 |
| 자료구조 B-tree 기본 개념 파악 (2) (0) | 2021.12.27 |
| 자료구조 B-tree 기본 개념 파악 (1) (0) | 2021.12.21 |