728x90
반응형
문제 개요
- 배경: 강화학습은 초기에 탐색 비용이 많이 듭니다. 로봇 팔을 무작위로 휘두르면 고장 날 수 있죠. 그래서 **사람이 조종한 데이터(Expert Demo)**를 미리 학습시켜 초기 성능을 확보합니다.
- 목표: CartPole-v1 환경에서, 사전에 수집된 '전문가 데이터(State, Action)'를 사용하여 지도 학습(Supervised Learning) 방식으로 정책 네트워크를 학습시키시오.
- 조건:
- 데이터 생성: 학습된 DQN 모델을 이용해 에피소드 5개 분량의 (state, action) 쌍을 수집하시오. (코드에 포함)
- 학습: 수집된 데이터를 DataLoader로 만들고, CrossEntropyLoss를 사용하여 새 네트워크를 학습시키시오.
- 평가: 모방 학습된 모델로 게임을 실행하여 점수를 출력하시오.
import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
# -----------------------------------
# 0. 준비: 학습된 전문가 모델 (DQN 예시) 가정
# -----------------------------------
# (실제 시험에선 미리 저장된 .pth 파일을 로드하거나, 간단한 규칙 기반 전문가를 사용)
class ExpertModel(nn.Module):
def __init__(self):
super(ExpertModel, self).__init__()
self.fc = nn.Sequential(
nn.Linear(4, 128), nn.ReLU(),
nn.Linear(128, 2)
)
def forward(self, x):
return self.fc(x)
def get_action(self, x):
# 간단한 Heuristic 전문가 (막대가 기울어지는 쪽으로 이동)
# 실제로는 학습된 DQN 모델을 로드해서 사용
if x[2] > 0: return 1
else: return 0
# -----------------------------------
# 1. 데이터 수집 (Data Collection)
# -----------------------------------
def collect_expert_data(env, expert, n_episodes=10):
states = []
actions = []
for _ in range(n_episodes):
s, _ = env.reset()
done = False
while not done:
# 전문가의 행동 채택
a = expert.get_action(s)
states.append(s)
actions.append(a)
s, r, terminated, truncated, _ = env.step(a)
done = terminated or truncated
return np.array(states), np.array(actions)
# -----------------------------------
# 2. 학생 모델 (Student Network)
# -----------------------------------
class StudentNet(nn.Module):
def __init__(self):
super(StudentNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(4, 64),
nn.ReLU(),
nn.Linear(64, 2) # Output: Logits
)
def forward(self, x):
return self.fc(x)
# -----------------------------------
# 3. Main Logic (Training)
# -----------------------------------
def main():
env = gym.make('CartPole-v1')
expert = ExpertModel() # 실제로는 torch.load('dqn.pth')
print("Collecting Expert Data...")
x_train, y_train = collect_expert_data(env, expert, n_episodes=50)
print(f"Collected {len(x_train)} samples.")
# Tensor 변환
x_tensor = torch.FloatTensor(x_train)
y_tensor = torch.LongTensor(y_train)
dataset = TensorDataset(x_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
student = StudentNet()
optimizer = optim.Adam(student.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss() # Classification 문제로 접근
print("Start Behavior Cloning (Supervised Learning)...")
for epoch in range(20): # 짧게 20 에폭
total_loss = 0
for bx, by in dataloader:
optimizer.zero_grad()
pred = student(bx)
loss = criterion(pred, by)
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 5 == 0:
print(f"Epoch {epoch}, Loss: {total_loss/len(dataloader):.4f}")
# -----------------------------------
# 4. 평가 (Evaluation)
# -----------------------------------
print("Evaluating Student Agent...")
s, _ = env.reset()
score = 0
done = False
while not done:
with torch.no_grad():
logits = student(torch.FloatTensor(s))
a = torch.argmax(logits).item()
s, r, terminated, truncated, _ = env.step(a)
done = terminated or truncated
score += r
print(f"Student Score: {score}")
env.close()
if __name__ == '__main__':
main()
- Sim2Real Gap (시뮬레이션과 현실의 차이):
- "Custom Gym 환경을 만들 때 단순히 물리 법칙만 구현하는 것이 아니라, 실제 센서의 **노이즈(Gaussian Noise)**나 통신 **지연(Latency)**을 step 함수 내에 모델링해야 강화학습 모델이 실제 로봇에서도 잘 작동한다."
- Data-Driven Approach:
- "강화학습은 맨땅에 헤딩(Scratch)하는 것보다, Behavior Cloning으로 **초기 정책(Initial Policy)**을 잡아주고 그 이후에 PPO 등으로 Fine-tuning하는 것이 로봇 제어에서 훨씬 효율적이고 안전하다."
- State Design의 중요성:
- "Observation Space를 정의할 때, 로봇의 모든 정보를 다 넣는 것보다 **필수적인 정보(Feature Engineering)**만 추려서 넣어주는 것이 수렴 속도에 큰 영향을 미친다."

[Behavior Cloning] CartPole (전문가 자동 생성 포함)
import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import time
# -----------------------------------
# 1. Rule-Based Expert (데이터 생성용)
# -----------------------------------
class RuleBasedExpert:
# 학습된 모델 대신, 간단한 물리 법칙으로 전문가 흉내를 냄
def get_action(self, state):
# state: [cart_pos, cart_vel, pole_angle, pole_vel]
pole_angle = state[2]
pole_vel = state[3]
# 막대가 오른쪽으로 기울면 오른쪽으로 이동(1), 아니면 왼쪽(0)
# 각속도까지 고려하여 더 안정적으로 버팀
if pole_angle + 0.1 * pole_vel > 0:
return 1
else:
return 0
# -----------------------------------
# 2. Student Network
# -----------------------------------
class StudentNet(nn.Module):
def __init__(self):
super(StudentNet, self).__init__()
self.fc = nn.Sequential(
nn.Linear(4, 64),
nn.ReLU(),
nn.Linear(64, 2)
)
def forward(self, x):
return self.fc(x)
# -----------------------------------
# 3. Main Process
# -----------------------------------
def main():
env = gym.make('CartPole-v1')
expert = RuleBasedExpert()
# [Step 1] 전문가 데이터 수집
print("1. Collecting Expert Data...")
states = []
actions = []
for _ in range(30): # 30 에피소드만큼 데이터 수집
s, _ = env.reset()
done = False
while not done:
a = expert.get_action(s) # 전문가의 행동
states.append(s)
actions.append(a)
s, r, terminated, truncated, _ = env.step(a)
done = terminated or truncated
print(f" -> Collected {len(states)} samples.")
# 데이터셋 생성
x_tensor = torch.FloatTensor(np.array(states))
y_tensor = torch.LongTensor(np.array(actions))
dataset = TensorDataset(x_tensor, y_tensor)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
# [Step 2] 학생 모델 학습 (Supervised Learning)
student = StudentNet()
optimizer = optim.Adam(student.parameters(), lr=0.01)
criterion = nn.CrossEntropyLoss()
print("2. Training Student Network...")
for epoch in range(15):
total_loss = 0
for bx, by in dataloader:
optimizer.zero_grad()
pred = student(bx)
loss = criterion(pred, by)
loss.backward()
optimizer.step()
total_loss += loss.item()
if epoch % 5 == 0:
print(f" -> Epoch {epoch}, Loss: {total_loss/len(dataloader):.4f}")
# [Step 3] 학생 모델 평가
print("3. Testing Student Agent (Visualizing)...")
s, _ = env.reset()
done = False
total_score = 0
# 텍스트로 시각화 (터미널 렌더링)
while not done:
with torch.no_grad():
logits = student(torch.FloatTensor(s))
a = torch.argmax(logits).item()
s, r, terminated, truncated, _ = env.step(a)
done = terminated or truncated
total_score += r
# 간단한 진행상황 출력
# print(f"Step: {total_score:.0f}, Action: {a}")
print(f"Student Final Score: {total_score}")
if total_score >= 195:
print("Success! The student learned well.")
else:
print("Keep training...")
env.close()
if __name__ == '__main__':
main()728x90
반응형
'Study' 카테고리의 다른 글
| [Continuous/SAC] 1D Hovering Drone: 고도를 유지하려는 드론 (PID 제어를 AI로 대체하는 컨셉) (1) | 2025.11.29 |
|---|---|
| [Discrete/PPO] Dynamic Grid World: 움직이는 장애물을 피해 목표로 가는 로봇 (동적 환경 계획) (0) | 2025.11.29 |
| OpenAI Gym 인터페이스를 따르는 Custom Environment 구현 (0) | 2025.11.29 |
| 2D Grid Map에서의 A* 최단 경로 탐색 (0) | 2025.11.29 |
| CartPole-v1 환경에서의 강화학습 에이전트 구현 (0) | 2025.11.29 |