Study

Behavior Cloning (모방 학습)

SigmoidFunction 2025. 11. 29. 11:05
728x90
반응형

문제 개요

  • 배경: 강화학습은 초기에 탐색 비용이 많이 듭니다. 로봇 팔을 무작위로 휘두르면 고장 날 수 있죠. 그래서 **사람이 조종한 데이터(Expert Demo)**를 미리 학습시켜 초기 성능을 확보합니다.
  • 목표: CartPole-v1 환경에서, 사전에 수집된 '전문가 데이터(State, Action)'를 사용하여 지도 학습(Supervised Learning) 방식으로 정책 네트워크를 학습시키시오.
  • 조건:
    1. 데이터 생성: 학습된 DQN 모델을 이용해 에피소드 5개 분량의 (state, action) 쌍을 수집하시오. (코드에 포함)
    2. 학습: 수집된 데이터를 DataLoader로 만들고, CrossEntropyLoss를 사용하여 새 네트워크를 학습시키시오.
    3. 평가: 모방 학습된 모델로 게임을 실행하여 점수를 출력하시오.
import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np

# -----------------------------------
# 0. 준비: 학습된 전문가 모델 (DQN 예시) 가정
# -----------------------------------
# (실제 시험에선 미리 저장된 .pth 파일을 로드하거나, 간단한 규칙 기반 전문가를 사용)
class ExpertModel(nn.Module):
    def __init__(self):
        super(ExpertModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 128), nn.ReLU(),
            nn.Linear(128, 2)
        )
    def forward(self, x):
        return self.fc(x)
    
    def get_action(self, x):
        # 간단한 Heuristic 전문가 (막대가 기울어지는 쪽으로 이동)
        # 실제로는 학습된 DQN 모델을 로드해서 사용
        if x[2] > 0: return 1
        else: return 0

# -----------------------------------
# 1. 데이터 수집 (Data Collection)
# -----------------------------------
def collect_expert_data(env, expert, n_episodes=10):
    states = []
    actions = []
    
    for _ in range(n_episodes):
        s, _ = env.reset()
        done = False
        while not done:
            # 전문가의 행동 채택
            a = expert.get_action(s)
            
            states.append(s)
            actions.append(a)
            
            s, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            
    return np.array(states), np.array(actions)

# -----------------------------------
# 2. 학생 모델 (Student Network)
# -----------------------------------
class StudentNet(nn.Module):
    def __init__(self):
        super(StudentNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 64),
            nn.ReLU(),
            nn.Linear(64, 2) # Output: Logits
        )
        
    def forward(self, x):
        return self.fc(x)

# -----------------------------------
# 3. Main Logic (Training)
# -----------------------------------
def main():
    env = gym.make('CartPole-v1')
    expert = ExpertModel() # 실제로는 torch.load('dqn.pth')
    
    print("Collecting Expert Data...")
    x_train, y_train = collect_expert_data(env, expert, n_episodes=50)
    print(f"Collected {len(x_train)} samples.")
    
    # Tensor 변환
    x_tensor = torch.FloatTensor(x_train)
    y_tensor = torch.LongTensor(y_train)
    
    dataset = TensorDataset(x_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=64, shuffle=True)
    
    student = StudentNet()
    optimizer = optim.Adam(student.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss() # Classification 문제로 접근
    
    print("Start Behavior Cloning (Supervised Learning)...")
    for epoch in range(20): # 짧게 20 에폭
        total_loss = 0
        for bx, by in dataloader:
            optimizer.zero_grad()
            pred = student(bx)
            loss = criterion(pred, by)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            
        if epoch % 5 == 0:
            print(f"Epoch {epoch}, Loss: {total_loss/len(dataloader):.4f}")
            
    # -----------------------------------
    # 4. 평가 (Evaluation)
    # -----------------------------------
    print("Evaluating Student Agent...")
    s, _ = env.reset()
    score = 0
    done = False
    while not done:
        with torch.no_grad():
            logits = student(torch.FloatTensor(s))
            a = torch.argmax(logits).item()
        
        s, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated
        score += r
        
    print(f"Student Score: {score}")
    env.close()

if __name__ == '__main__':
    main()

 

 

 

  • Sim2Real Gap (시뮬레이션과 현실의 차이):
    • "Custom Gym 환경을 만들 때 단순히 물리 법칙만 구현하는 것이 아니라, 실제 센서의 **노이즈(Gaussian Noise)**나 통신 **지연(Latency)**을 step 함수 내에 모델링해야 강화학습 모델이 실제 로봇에서도 잘 작동한다."
  • Data-Driven Approach:
    • "강화학습은 맨땅에 헤딩(Scratch)하는 것보다, Behavior Cloning으로 **초기 정책(Initial Policy)**을 잡아주고 그 이후에 PPO 등으로 Fine-tuning하는 것이 로봇 제어에서 훨씬 효율적이고 안전하다."
  • State Design의 중요성:
    • "Observation Space를 정의할 때, 로봇의 모든 정보를 다 넣는 것보다 **필수적인 정보(Feature Engineering)**만 추려서 넣어주는 것이 수렴 속도에 큰 영향을 미친다."

 

[Behavior Cloning] CartPole (전문가 자동 생성 포함)

import gym
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
import numpy as np
import time

# -----------------------------------
# 1. Rule-Based Expert (데이터 생성용)
# -----------------------------------
class RuleBasedExpert:
    # 학습된 모델 대신, 간단한 물리 법칙으로 전문가 흉내를 냄
    def get_action(self, state):
        # state: [cart_pos, cart_vel, pole_angle, pole_vel]
        pole_angle = state[2]
        pole_vel = state[3]
        
        # 막대가 오른쪽으로 기울면 오른쪽으로 이동(1), 아니면 왼쪽(0)
        # 각속도까지 고려하여 더 안정적으로 버팀
        if pole_angle + 0.1 * pole_vel > 0:
            return 1
        else:
            return 0

# -----------------------------------
# 2. Student Network
# -----------------------------------
class StudentNet(nn.Module):
    def __init__(self):
        super(StudentNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(4, 64),
            nn.ReLU(),
            nn.Linear(64, 2)
        )
    def forward(self, x):
        return self.fc(x)

# -----------------------------------
# 3. Main Process
# -----------------------------------
def main():
    env = gym.make('CartPole-v1')
    expert = RuleBasedExpert()
    
    # [Step 1] 전문가 데이터 수집
    print("1. Collecting Expert Data...")
    states = []
    actions = []
    
    for _ in range(30): # 30 에피소드만큼 데이터 수집
        s, _ = env.reset()
        done = False
        while not done:
            a = expert.get_action(s) # 전문가의 행동
            states.append(s)
            actions.append(a)
            s, r, terminated, truncated, _ = env.step(a)
            done = terminated or truncated
            
    print(f" -> Collected {len(states)} samples.")

    # 데이터셋 생성
    x_tensor = torch.FloatTensor(np.array(states))
    y_tensor = torch.LongTensor(np.array(actions))
    dataset = TensorDataset(x_tensor, y_tensor)
    dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

    # [Step 2] 학생 모델 학습 (Supervised Learning)
    student = StudentNet()
    optimizer = optim.Adam(student.parameters(), lr=0.01)
    criterion = nn.CrossEntropyLoss()
    
    print("2. Training Student Network...")
    for epoch in range(15):
        total_loss = 0
        for bx, by in dataloader:
            optimizer.zero_grad()
            pred = student(bx)
            loss = criterion(pred, by)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if epoch % 5 == 0:
            print(f" -> Epoch {epoch}, Loss: {total_loss/len(dataloader):.4f}")

    # [Step 3] 학생 모델 평가
    print("3. Testing Student Agent (Visualizing)...")
    s, _ = env.reset()
    done = False
    total_score = 0
    
    # 텍스트로 시각화 (터미널 렌더링)
    while not done:
        with torch.no_grad():
            logits = student(torch.FloatTensor(s))
            a = torch.argmax(logits).item()
        
        s, r, terminated, truncated, _ = env.step(a)
        done = terminated or truncated
        total_score += r
        
        # 간단한 진행상황 출력
        # print(f"Step: {total_score:.0f}, Action: {a}") 
        
    print(f"Student Final Score: {total_score}")
    if total_score >= 195:
        print("Success! The student learned well.")
    else:
        print("Keep training...")
        
    env.close()

if __name__ == '__main__':
    main()
728x90
반응형