본문 바로가기

자연어처리

LSTM, GRU (2)

1. nn.LSTM( ), nn.GRU( )


1.1 (일반적인) LSTM/GRU

■ 파이토치에서 LSTM/GRU 셀을 사용하는 방법은 RNN 셀을 사용하려고 했을 때와 유사하다.

torch.nn.LSTM(input_size, hidden_size, num_layers=1, bias=True, batch_first=False, dropout=0.0, 
    bidirectional=False, proj_size=0, device=None, dtype=None)
    
torch.nn.GRU(input_size, hidden_size, num_layers=1, bias=True, batch_first=False, dropout=0.0, 
    bidirectional=False, device=None, dtype=None)

■ 예를 들어, LSTM과 input 데이터를 다음과 같이 정의했을 때, LSTM의 결과로 outputs, hidden_state, cell_state가 반환된다.

■ nn.LSTM( )이 nn.RNN( ) 및 nn.GRU( )와 다른 점은 리턴값으로 cell_state를 반환한다는 점이다.

lstm = nn.LSTM(input_size=5, hidden_size=10, num_layers=1, 
               bidirectional=False, batch_first=True)
lstm
```#결과#```
LSTM(5, 10, batch_first=True)
````````````
               
               
# (batch_size, sequence_length = time steps, input_size = # of features)
inputs = torch.Tensor(1, 15, 5) 

outputs, (hidden_state, cell_state) = lstm(inputs)
print(outputs.shape, hidden_state.shape, cell_state.shape)
```#결과#```
torch.Size([1, 15, 10]) torch.Size([1, 1, 10]) torch.Size([1, 1, 10])
````````````

- 첫 번째 리턴값인 outputs의 크기는 [batch_size, seq_len = time_steps, hidden_size x bidirectional(1)]이며, 모든 시점의 은닉 벡터들을 담고 있다.

- 두 번째 리턴값인 hidden_state는 마지막 시점의 은닉 상태이며 [bidirectional(1) x num_layers, batch_size, hidden_size]의 크기를 갖는다.

- 세 번째 리턴값인 cell_state는 기억 셀(memory cell) 또는 셀 상태 벡터 \( c_t \)이며 [bidirectional(1) x num_layers, batch_size, hidden_size]의 크기를 갖는다.

■ 이번에는 nn.GRU를 정의했을 때, inputs을 입력으로 넣는다면 

gru = nn.GRU(input_size=5, hidden_size=10, num_layers=1, 
               bidirectional=False, batch_first=True)
gru
```#결과#```
GRU(5, 10, batch_first=True)
````````````

outputs, hidden_state = gru(inputs)
print(outputs.shape, hidden_state.shape)
```#결과#```
torch.Size([1, 15, 10]) torch.Size([1, 1, 10])
````````````

- 첫 번째 리턴값인 outputs의 크기는 [batch_size, seq_len = time_steps, hidden_size x bidirectional(1)]이며, 모든 시점의 은닉 벡터들을 담고 있다.

- 두 번째 리턴값인 hidden_state는 마지막 시점의 은닉 상태이며 [bidirectional(1) x num_layers, batch_size, hidden_size]의 크기를 갖는다.


1.2 Stacking LSTM/GRU

■ LSTM/GRU를 겹층으로 쌓아올리는 것을 Stacking LSTM/GRU라고 한다. 겹겹이 LSTM/GRU 셀을 입력-출력 방향으로 쌓기 위해서는 다음과 같이 num_layers 파라미터의 값에 쌓고자 하는 층의 개수를 지정하면 된다.

lstm = nn.LSTM(input_size=5, hidden_size=10, num_layers=2, 
               bidirectional=False, batch_first=True)
lstm
```#결과#```
LSTM(5, 10, num_layers=2, batch_first=True)
````````````

outputs, (hidden_state, cell_state) = lstm(inputs)
print(outputs.shape, hidden_state.shape, cell_state.shape)
```#결과#```
torch.Size([1, 15, 10]) torch.Size([2, 1, 10]) torch.Size([2, 1, 10])
````````````

gru = nn.GRU(input_size=5, hidden_size=10, num_layers=2, 
               bidirectional=False, batch_first=True)             
gru
```#결과#```
GRU(5, 10, num_layers=2, batch_first=True)
````````````

outputs, hidden_state = gru(inputs)
print(outputs.shape, hidden_state.shape)
```#결과#```
torch.Size([1, 15, 10]) torch.Size([2, 1, 10])
````````````

1.3 Bidirectional LSTM/GRU

■ 양방향 LSTM/GRU를 사용하기 위해서는 bidirectional = True로 설정하면 된다.

lstm = nn.LSTM(input_size=5, hidden_size=10, num_layers=2, 
               bidirectional=True, batch_first=True)
lstm
```#결과#```
torch.Size([1, 15, 20]) torch.Size([4, 1, 10]) torch.Size([4, 1, 10])
````````````
               
outputs, (hidden_state, cell_state) = lstm(inputs)
print(outputs.shape, hidden_state.shape, cell_state.shape)
```#결과#```
torch.Size([1, 15, 20]) torch.Size([4, 1, 10]) torch.Size([4, 1, 10])
````````````  
               
gru = nn.GRU(input_size=5, hidden_size=10, num_layers=2, 
               bidirectional=True, batch_first=True)
gru
```#결과#```
torch.Size([1, 15, 20]) torch.Size([4, 1, 10])
````````````        

outputs, hidden_state = gru(inputs)
print(outputs.shape, hidden_state.shape)
```#결과#```
torch.Size([1, 15, 20]) torch.Size([4, 1, 10])
````````````


2. 예제 - IMDB 분류

■ 예제로 사용할 데이터는 torchtext의 IMDB 데이터이다. IMDB 데이터를 이용해서 리뷰에 대한 감성 분류를 수행하는 모델을 만들어본다. 

from torchtext import data
from torchtext import datasets

text = data.Field(batch_first=True, fix_length=500, tokenize=str.split,
                 pad_first=True, pad_token='<pad>', unk_token='<unk>')
label = data.LabelField(dtype=torch.float)

train_data, test_data = datasets.IMDB.splits(text_field=text, label_field=label)

■ data.examples를 통해 데이터의 개수를 확인할 수 있다.

print(f'train data length: {len(train_data.examples)}')
print(f'test data length: {len(test_data.examples)}')
```#결과#```
train data length: 25000
test data length: 25000
````````````

■ vars( ) 함수를 이용하여 데이터 값을 직접 확인해볼 수 있다.

print('input', ' '.join(vars(train_data.examples[1])['text']));print()
print('label', vars(train_data.examples[1])['label']) 
```#결과#```
input Homelessness (or Houselessness as George Carlin stated) has been an issue for years but never a plan to help those on the street that were once considered human who did everything from going to school, work, or vote for the matter. Most people think of
...,
.<br /><br />Or maybe this film will inspire you to help others.

label pos
````````````

- 첫 번째 데이터의 정답은 긍정(pos)임을 확인할 수 있다.

■ 이러한 텍스트 데이터를 토큰화하기에 앞서, 먼저 데이터 정제(cleansing) 작업이 필요하다.

■ 현재 첫 번째 데이터를 출력해보면 대문자, <br /> 태그, 그리고 다양한 특수 문자가 포함되어 있는 것을 확인할 수 있다. 이때, 특수 문자 중 작은따옴표(')는 예외적으로 제거하지 않는다, 이는 he's처럼 단어 내부에서 의미를 가지는 경우가 있기 때문이다.

import re

def preprocess_sentence(sent):
    sent = sent.lower() # 소문자화
    sent = re.sub('<[^>]*>', repl= ' ', string=sent) # <br /> 처리
    sent = re.sub('[!"#$%&\()*+,-./:;<=>?@[\\]^_`{|}~]', repl= ' ', string=sent) # 특수 문자 처리('는 제외)
    sent = re.sub('\\s+', repl=' ', string=sent) # 연속된 띄어쓰기 처리
    if sent:
        return sent

마지막에 if sent: return sent를 한 이유는 전처리 후 결과 문자열이 비어있지 않은지 확인하기 위함이다.

즉, 입력 문자열을 소문자로 만들고 <br /> 태그나 특수문자, 불필요한 공백을 제거한 후에 결과 문자열이 존재할 때만 반환하도록 하는 것이다. 만약 전처리 결과가 빈 문자열이 된다면, 마지막 if 문에 걸리지 않으므로 빈 문자열을 반환하지 않게 된다.

위와 같이 정의한 전처리 함수를 train_data와 test_data에 적용한다.

for example in train_data.examples:
    vars(example)['text'] = preprocess_sentence(' '.join(vars(example)['text'])).split()
    
for example in test_data.examples:
    vars(example)['text'] = preprocess_sentence(' '.join(vars(example)['text'])).split()

- vars(example)['text']는 각 example의 속성 중 'text'값을 가져온다.

■ 이제 주어진 data를 이용해 단어 집합(vocabulary)를 만들면 된다. torchtext에서 vocab을 만드는 방법은 앞서 정의한 text와 label Field에 build_vocab을 적용하면 된다. 그러면 텍스트 데이터와 레이블 데이터의 단어 집합(vocab)을 만들 수 있다.

text.build_vocab(train_data, min_freq=2, max_size=None, vectors='glove.6B.200d')
label.build_vocab(train_data)

- min_freq는 단어 집합(사전)에 등록하기 위한 단어(토큰)의 최소 등장 횟수에 제한을 둘 수 있다. 
- max_size는 단어 집합의 최대 크기를 제한을 둘 수 있다. 즉, 단어 집합의 최대 크기를 지정한다. 
- vectors는 사용할 사전 학습된 임베딩 벡터 룩업 테이블을 string 형태로 지정하는 옵션.
- glove 외에 fasttext도 사용할 수 있다. 사용할 수 있는 임베딩의 종류는 다음과 같다.

charngram.100d, fasttext.en.300d, fasttext.simple.300d, 
glove.42B.300d, glove.840B.300d, 
glove.twitter.27B.25d, glove.twitter.27B.50d, glove.twitter.27B.100d, glove.twitter.27B.200d,
glove.6B.50d, glove.6B.100d, glove.6B.200d, glove.6B.300d
print('vocab size', len(text.vocab))
print('label vocab size', len(label.vocab))
```#결과#```
vocab size 51956
label vocab size 2
````````````

■ 생성한 단어 집합 내의 단어(토큰)는 .stoi를 통해서 확인할 수 있다.

text.vocab.stoi
```#결과#```
defaultdict(None,
            {'<UNK>': 0,
             '<PAD>': 1,
             'the': 2,
             'and': 3,
             'portrayed': 997,
             'secret': 998,
             'co': 999,
             ...})
````````````

label.vocab.stoi
```#결과#```
defaultdict(None, {'neg': 0, 'pos': 1})
````````````

- 딕셔너리로 단어 집합이 생성된 것을 확인할 수 있다.

- 레이블 단어 집합의 경우 부정 리뷰는 정수 인덱스 0, 긍정 리뷰는 정수 인덱스 1에 매칭된 것을 확인할 수 있다.

차원이 200인 사전 학습된 글로브의 임베딩 벡터를 사용하였는데, 이는 다음과 같이 확인할 수 있다.

text.vocab.vectors.shape
```#결과#```
torch.Size([51956, 200])
````````````

- 총 51956개의 단어(토큰)마다 200차원의 임베딩 벡터가 할당되어 있는 것을 볼 수 있다.

■ 이제 모델 학습을 위해 다음과 같이 train, valid, test data를 구분하고 이터레이터(iterator)를 이용해 배치 데이터를 만들면 된다.

import random

train_data, valid_data = train_data.split(random_state = random.seed(42), split_ratio = 0.8) 
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

train_loader, valid_loader, test_loader = data.BucketIterator.splits(
    datasets=(train_data, valid_data, test_data), batch_size = 32, device = device)
    
len(train_loader), len(valid_loader)
```#결과#```
(625, 157)
````````````

- 25000개 중에서 0.8은 train으로 나누었으니 train의 개수는 20000개

- 그리고 배치 크기 32씩 묶어주었으므로 훈련 데이터의 미니 배치 수는 20000/32 = 625개

■ iterator로 정의하였으니 next나 for 문을 이용해 다음과 같이 배치를 하나씩 꺼낼 수 있다.

data = next(iter(train_loader))
print(data.text.shape,'\n',data.text)
```#결과#```
torch.Size([32, 500]) 
 tensor([[    1,     1,     1,  ...,   267,     9,   969],
        [    1,     1,     1,  ...,   764, 11524,   302],
        [    1,     1,     1,  ...,   136,     3,   564],
        ...,
        [    1,     1,     1,  ...,  9795,  6067, 20068],
        [    1,     1,     1,  ...,     4,    75,  2219],
        [    1,     1,     1,  ...,   140,    11,    28]], device='cuda:0')
````````````

print(data.label.shape,'\n',data.label)
```#결과#```
torch.Size([32]) 
 tensor([0., 0., 1., 0., 1., 0., 1., 1., 0., 0., 0., 0., 1., 1., 1., 0., 1., 0.,
        0., 0., 0., 1., 1., 1., 0., 1., 1., 0., 1., 0., 0., 0.],
       device='cuda:0')
````````````

■ 배치 데이터의 크기는 32 x 500이다. 여기서 32는 배치 크기이고 500은 각 샘플의 길이이다. 500은 fix_length에서 지정한 값이다. 즉, 미니 배치의 크기는 (batch size x fix_length)이다.

■ 그리고 샘플 길이가 500보다 작은 샘플들은 앞에 <pad> 토큰의 번호인 숫자 1로 패딩된 것을 볼 수 있다. 

■ 이제 감성 분류를 수행할 모델을 정의하면 된다.

model_config = {
    'embedding_type': 'glove', 
    'embedding_dim':200,
    'vocab_size':len(text.vocab),
    'batch_size':32
}
class textclassifier(nn.Module):
    def __init__(self, **model_config):
        super().__init__()

        if model_config['embedding_type'] in ['glove', 'fasttext']:
            self.embedding = nn.Embedding(model_config['vocab_size'], model_config['embedding_dim'],
                                          _weight = text.vocab.vectors, padding_idx=1)
        else:
            self.embedding = nn.Embedding(model_config['vocab_size'], model_config['embedding_dim'], padding_idx=1)

        self.bidirectional = model_config['bidirectional']
        self.num_direction = 2 if model_config['bidirectional'] else 1
        self.model_type = model_config['model_type'] 

        self.lstm = nn.LSTM(
            input_size = model_config['embedding_dim'], 
            hidden_size = model_config['hidden_dim'],
            num_layers = model_config['num_layers'],
            dropout = model_config['dropout'], 
            bidirectional = model_config['bidirectional'], 
            batch_first = model_config['batch_first']
        )

        self.gru = nn.GRU(
            input_size = model_config['embedding_dim'], 
            hidden_size = model_config['hidden_dim'],
            num_layers = model_config['num_layers'],
            dropout = model_config['dropout'], 
            bidirectional = model_config['bidirectional'], 
            batch_first = model_config['batch_first']
        )
        
        self.fc = nn.Linear(model_config['hidden_dim']*self.num_direction, model_config['output_dim'])
        self.drop = nn.Dropout(model_config['dropout'])

    def forward(self, x):
        embedded = self.embedding(x)
        # batch_first = True이면, 현재 embedded.shape: [batch_first, seq_len = time_steps, embedding_dim]

        if self.model_type == 'lstm':
            outputs, (hidden, cell) = self.lstm(embedded)
        else: # model_type == 'gru'
            outputs, hidden = self.gru(embedded)
            
        # 현재 outputs.shape: [batch_size, seq_len, hidden_dim*self.num_direction(bidirectional)]
        # 현재 hidden.shape: [self.num_direction(bidirectional)*num_layers, batch_size, hidden_dim]

        batch_size = hidden.size(1) 
        
        if self.num_direction == 1: # bidirectional=False # 단방향
            last_hidden = outputs[:,-1,:] # 단방향이면 마지막 시점의 결과만 있어도 된다.
        elif self.num_direction == 2: # bidirectional=True # 양방향
            final_state = hidden.view(model_config['num_layers'], 
                                      self.num_direction, 
                                      batch_size, 
                                      model_config['hidden_dim'])[-1]
            h_1, h_2 = final_state[0], final_state[1] # 정방향 h, 역방향 h
            last_hidden = torch.cat((h_1, h_2), 1)
            
        # 현재 last_hidden.shape: [batch_size, hidden_dim * self.num_direction(bidirectional)]
        output = self.fc(self.drop(last_hidden))
        # 현재 output.shape: [batch_size, output_dim]
        return output

■ 토큰의 정수 인덱스를 임베딩 벡터로 변환하는 임베딩 레이어는 torch.nn.Embedding( )으로 정의할 수 있다. 

torch.nn.Embedding(num_embeddings, embedding_dim, padding_idx, ...)

- num_embeddings에는 단어 집합(또는 사전)의 크기를 지정한다.

- embedding_dim에는 원하는 임베딩 벡터의 차원을 설정한다. 

만약, 사전 학습된 임베딩 벡터를 사용할 경우, embedding_dim은 사전 학습된 벡터의 차원과 일치해야 한다. 

■ 그 이유는 torch.nn.Embedding( )은 파이토치에서 토큰의 정수 인덱스를 고정된 크기의 실수 임베딩 벡터로 매핑하는 룩업 테이블(lookup tabel)역할을 수행하기 때문이다.

■ torch.nn.Embedding()은 내부적으로 (num_embeddings, embedding_dim) 크기의 학습 가능한 가중치(learnable weight) 행렬인 weight 속성을 가지고 있다. 이 가중치 행렬의 값은 일반적으로 정규분포 \( \mathcal{N}(0, 1) \)에서 생성된 값으로 초기화된다. 

■ 만약, 다음과 같은 사전 학습된 임베딩인 text.vocab.vectors로 torch.nn.Embedding()의 weight 행렬을 설정하는 경우, 해당 임베딩으로 가중치 행렬이 초기화된다.

text.vocab.vectors
```#결과#```
tensor([[ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [ 0.0000,  0.0000,  0.0000,  ...,  0.0000,  0.0000,  0.0000],
        [-0.0715,  0.0935,  0.0237,  ...,  0.3362,  0.0306,  0.2558],
        ...,
        [ 0.5876,  0.3555,  0.5973,  ..., -0.0439, -0.3948,  0.1179],
        ...,
````````````

text.vocab.vectors.shape
```#결과#```
torch.Size([51956, 200])
````````````

 

- 이 예에서 단어 집합의 크기는 51956이고, 사용한 사전 학습된 임베딩 벡터의 차원은 200이므로 torch.nn.Embedding()의 weight은 (51956, 200)의 크기를 가지며, text.vocab.vectors의 값으로 초기화된다.

- 즉, torch.nn.Embedding( )은 '단어 집합의 크기(vocab_size) x 임베딩 차원(embedding_dim)'행렬을 만들어 학습시킨다고 볼 수 있다.

■ 만약, 해당 embedding layer를 학습시키지 않는 'freeze 상태'로 만들고 싶다면, torch.nn.Embedding.from_pretrained(text.vocab.vectors)를 사용하면 된다.

■ 순전파(forward) 연산에서는 입력 텐서 x(토큰의 정수 인덱스 배치 데이터)가 있을 때, 입력 텐서의 한 원소 값이 \( k \)라면, 임베딩(가중치) 행렬 weight에서  51956개의 행(= 단어 집합에 있는 토큰의 개수) 중 \( k \) 번째 행이 선택되어 출력된다. 이것이 룩업 테이블 연산이다.  

- 이 과정으로 인해 embedding layer를 통과한 입력 텐서의 shape은 (batch_size, seq_len, embedding_dim)이 되며,

- 배치 크기가 32라면, 이 예시에서는 (32, 500, 200)의 크기를 가지게 된다.

■ 역전파(backward) 과정에서는 선택된 임베딩 벡터에 대한 그래디언트가 계산되어, 정수 인덱스 \( k \)에 해당하는 단어의 임베딩 벡터값이 학습된다.

■ 위 코드에서는 GloVe나 FastText에서 사전 학습된 임베딩 벡터값을 사용할 경우, 다음과 같이 embedding layer의 가중치 행렬 weight를 해당 벡터값으로 초기화하고, 그렇지 않은 경우에는 정규분포 \( \mathcal{N}(0, 1) \)에서 생성된 값으로 초기화되도록 설정하였다. 또한, <pad> 토큰의 정수 인덱스를 1로 명시하였다.

        if model_config['embedding_type'] in ['glove', 'fasttext']:
            self.embedding = nn.Embedding(model_config['vocab_size'], model_config['embedding_dim'],
                                          _weight = text.vocab.vectors, padding_idx=1)
        else:
            self.embedding = nn.Embedding(model_config['vocab_size'], model_config['embedding_dim'], padding_idx=1)

■ 이러한 임베딩 계층을 통과한 텐서가 lstm 셀 또는 gru 셀의 입력으로 들어가서 lstm 셀이면 outputs, (hidden state, cell state), gru 셀이면 outputs, hidden state으로 반환된다.

        self.lstm = nn.LSTM(
            input_size = model_config['embedding_dim'], 
            hidden_size = model_config['hidden_dim'],
            num_layers = model_config['num_layers'],
            dropout = model_config['dropout'], 
            bidirectional = model_config['bidirectional'], 
            batch_first = model_config['batch_first']
        )

        self.gru = nn.GRU(
            input_size = model_config['embedding_dim'], 
            hidden_size = model_config['hidden_dim'],
            num_layers = model_config['num_layers'],
            dropout = model_config['dropout'], 
            bidirectional = model_config['bidirectional'], 
            batch_first = model_config['batch_first']
        )

- 이때 모든 시점의 결과를 담고 있는 outputs의 첫번째 차원은 배치, 두 번째 차원은 time step = seq_len이다.

- 위와 같이 두 번째 차원에 -1을 하면 마지막 시점의 은닉 상태를 반환한다. 이는 예측값 \( \hat{y} \)로 실제값 \( y \)와 손실을 계산하기 위한 출력값이다.

■ 그리고 현재 문제는 분류 문제이므로 클래스(class)에 대한 점수(score)를 생성하기 위해 fc layer를 1개 만들어 통과시켜준다. 시그모이드 함수가 없는 이유는 손실 함수로 torch.nn.BCEWithLogitsLoss( )을 사용할 것이기 때문이다.

self.fc = nn.Linear(model_config['hidden_dim']*self.num_direction, model_config['output_dim'])

- BCELoss( ) 함수는 Binary Cross Entropy Loss 함수로 함수 내에 시그모이드 함수가 존재하지 않는다. 그러므로 이 함수를 사용할 경우 모델을 정의할 때, 시그모이드 함수 계층을 별도로 만들어야 한다.

- BCEWithLogitsLoss( ) 함수는 함수 내에 시그모이드 함수가 포함되어 있다. 그러므로, 별도로 시그모이드 함수 계층을 정의할 필요가 없다.

■ 그다음, textclassifier 클래스의 인스턴스를 다음과 같이 생성했을 때

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = textclassifier(**model_config).to(device)

■ model에 입력값을 넣고 textclassifier 클래스의 forward 메서드가 작동한다.

배치(batch)를 사용할 경우 크기가 (batch_size, seq_len)인 입력값(입력 텐서) x가 들어오면, 먼저 입력값 x는 embedding layer를 통과한다. 그러면 룩업 테이블 연산 결과로 입력 텐서의 크기는 (batch_size, seq_len, embedding_dim)이 된다.

embedded = self.embedding(x)

■ 그다음 임베딩을 적용한 입력 텐서를 lstm layer나 gru layer에 입력으로 넣는다.

        if self.model_type == 'lstm':
            outputs, (hidden, cell) = self.lstm(embedded)
        else: # model_type == 'gru'
            outputs, hidden = self.gru(embedded)

- forward 메서드가 호출될 때, model_config에서 넘겨준 model_type 값에 따라 어느 쪽을 사용할지 결정한다.

- 만약 lstm 셀이라면 리턴값으로 모든 시점의 hidden states와 마지막 시점의 hidden state 그리고 cell state를 얻게 된다.

- gru 셀이라면 리턴값으로 모든 시점의 hidden states와 마지막 시점의 hidden state를 얻게 된다.

■ 손실을 계산하기 위해선 lstm/gru 셀의 결과인 은닉 상태값이 필요하다. 왜냐하면 은닉 상태값을 이용해서 실제값과 비교하여 손실을 계산해야 하기 때문이다.

■ 현재 풀고자 하는 문제는 감성 분류이므로 구현해야 할 모델의 형태는 다음 그림 중 many-to-one이라고 할 수 있다. 

[출처] https://dotnettutorials.net/lesson/recurrent-neural-network/

■ 즉, many-to-many처럼 일부 또는 모든 시점의 은닉 상태가 필요한 것이 아니라, 마지막 시점의 은닉 상태가 필요하다. 이때, 모델이 단방향인지 역방향인지에 따라 마지막 시점의 은닉 상태를 추출하는 방법이 달라질 수 있다.

- 정확히는 task에 따라 output을 어떻게 쓸지 커스텀할 수 있다. rnn/lstm/gru의 output(=hidden states)을 가져와 연결(concatenate)하는 방법도 있고 합산하거나 평균내는 방법도 있다.

모델이 단방향이라면, 위의 many-to-one의 그림처럼 단방향(정방향)의 마지막 시점의 은닉 상태만 필요하므로 다음과 같이 seq_len = time step 차원에서 -1을 지정해 마지막 시점의 은닉 상태만 가져오면 된다.

        if self.model_type == 'lstm':
            outputs, (hidden, cell) = self.lstm(embedded)
        else: # model_type == 'gru'
            outputs, hidden = self.gru(embedded)
           
       batch_size = hidden.size(1) 
       
       if self.num_direction == 1: # bidirectional=False # 단방향
           last_hidden = outputs[:,-1,:] # 단방향이면 마지막 시점의 결과만 있어도 된다.

■ 양방향 모델일 경우, 다음 그림과 같이 정방향의 은닉 상태와 역방향의 은닉 상태가 존재하게 된다.

■ 양방향의 경우, 위의 그림처럼 각 방향에 대해 hidden_size를 가진 2개의 텐서가 존재한다. 이 2개의 텐서는 정방향과 역방향의 은닉 상태(hidden state)이다. 

■ 이때, 주의할 점은 '양방향'이기 때문에 정방향과 역방향의 마지막 시점(time step)이 서로 다르다는 점이다.

■ 위의 그림에서는 정방향의 마지막 은닉 상태는 h5이지만, 역방향의 경우 첫 번째 시점이 time step 5이기 때문에 h1이 마지막 시점의 은닉 상태가 된다. 

■ 이렇게 모델이 양방향일 경우, 보통 rnn/lstm/gru 계층의 결과를 다음 계층으로 넘기기 전에 두 텐서를 연결한다. 혹은 두 텐서를 요소별로 평균 내거나 합산하는 경우도 있다.

■ 예를 들어, 다음과 같은 lstm 셀이 있을 경우, 마지막 시점의 정방향 은닉 상태 텐서와 역방향 은닉 상태 텐서를 연결하는 방법은 다음과 같다.

lstm = nn.LSTM(input_size=2, hidden_size=3, num_layers=2, 
               bidirectional=True, batch_first=True)
               
lstm               
```#결과#```
LSTM(2, 3, num_layers=2, batch_first=True, bidirectional=True)
````````````

# (batch_size, sequence_length = time steps, input_size = # of features)
inputs = torch.Tensor(3, 4, 2)

■ 예시로 사용할 형태는 양방향 lstm이며 입력-출력 방향으로 2개의 lstm 계층이 쌓여 있는 형태이다. 이때의 outputs과 hidden state의 크기는 다음과 같다.

outputs, (hidden_state, cell_state) = lstm(inputs)
print(outputs.shape, hidden_state.shape, cell_state.shape)
```#결과#```
torch.Size([3, 4, 6]) torch.Size([4, 3, 3]) torch.Size([4, 3, 3])
````````````

■ 반환되는 hidden의 크기는 (num_layers*num_directions, batch_size, hidden_size)이다.

- 여기서 num_directions은 bidirectional 여부로 bidirectional=True면 2, bidirectional=False면 1로 생각하면 된다.

■ 이때 hidden_state에 있는 값을 출력해보면 다음과 같다. 

여기서 첫 번째 레이어의 마지막 시점의 정방향 은닉 상태와 역방향 은닉 상태는 첫 번째와 두 번째 3x3 텐서이며, 두 번째 레이어의 마지막 시점의 정방향 은닉 상태와 역방향 은닉 상태는 세 번째와 네 번째 3x3 텐서이다.

이렇게 되는 이유는 모든 시점의 은닉 상태를 반환하는 outputs을 통해 알 수 있다.

■ 첫 번째 리턴값인 outputs의 크기는 [3, 4, 6]으로

[batch_size,  seq_len = time_steps, hidden_size x num_directions(2)]이다. outputs에 대해 각 배치별 시점(time step)별 정방향 은닉 상태와 역방향 은닉 상태는 다음과 같다.

■ 현재 감성 분류 작업을 위해 만들 many-to-one 모델은 시퀀스 전체 정보를 압축한 마지막 시점의 은닉 상태값이 필요하다.

■ 양방향 LSTM을 사용하는 경우, 두 방향의 마지막 은닉 상태를 모두 활용해야 하는데, 필요한 은닉 상태 벡터들은 위의 그림에서 노란색으로 밑줄친 것들이다.

- 정방향은 시퀀스의 처음부터 순차적으로 진행하므로 각 배치의 마지막 time step(time step 4)의 은닉 상태가 전체 정보를 압축하고 있다. 

- 이는 다음 그림에 있는 마지막 hidden state의 파란색으로 표시된 값들이 이에 해당한다.

- 반면, 역방향은 시퀀스의 끝에서부터 시작하여 역순으로 진행하므로, 각 배치의 첫 번째 time step(time step 1)의
은닉 상태가 역방향의 전체정 정보를 압축하고 있다.

- 이는 다음 그림에 있는 마지막 hidden state의 빨간색으로 표시된 값들이다. 

■ 이 예시의 hidden state의 크기는 (num_layers * num_directions, batch_size, hidden_size)이다. 여기서 필요한 것은 두 개의 lstm 레이어 중 파란색과 빨간색으로 표시한 마지막 레이어의 정방향 및 역방향 은닉 상태이다. 

■ 해당 값들을 구하기 위해서는 (num_layers * num_directions, batch_size, hidden_size) 크기의 hidden state를 view 함수를 이용해서 (num_layers, num_directions, batch_size, hidden_size)로 분리하면 된다. 

■ 여기서 필요한 것은 세 번째와 네 번째의 3x3 텐서이므로  (num_layers, num_directions, batch_size, hidden_size)에 [-1] 인덱싱을 적용하여 마지막 lstm 레이어의 모든 방향 은닉 상태를 추출할 수 있다.

- (num_layers, num_directions, batch_size, hidden_size)[-1] == (num_layers, num_directions, batch_size, hidden_size)[-1, :, :, :]

final_state = hidden_state.view(2, 2, 3, 3)[-1]

final_state.shape, final_state
```#결과#```
(torch.Size([2, 3, 3]),
 tensor([[[-0.2322, -0.0649, -0.2900],
          [-0.2461, -0.0759, -0.3113],
          [-0.2478, -0.0842, -0.3124]],
 
         [[ 0.0659, -0.2434, -0.1086],
          [ 0.0922, -0.1804, -0.1308],
          [ 0.0969, -0.1757, -0.1298]]], grad_fn=<SelectBackward0>))
````````````

 

■ 이렇게 추출한 final_state의 크기는 (num_directions(bidirectional=True=2), batch_size, hidden_size)가 된다. 첫 번째 3x3 텐서가 정방향 lstm의 마지막 시점에서의 은닉 상태, 두 번째 3x3 텐서가 역방향 lstm의 마지막 시점에서의 은닉 상태이다. 그러므로 다음과 같이 분리할 수 있다.

h_1, h_2 = final_state[0], final_state[1]

h_1.shape, h_2.shape
```#결과#```
(torch.Size([3, 3]), torch.Size([3, 3]))
````````````

print(h_1) # forward pass last time hidden state
print()
print(h_2) # backward pass last time hidden state
```#결과#```
tensor([[-0.2322, -0.0649, -0.2900],
        [-0.2461, -0.0759, -0.3113],
        [-0.2478, -0.0842, -0.3124]], grad_fn=<SelectBackward0>)

tensor([[ 0.0659, -0.2434, -0.1086],
        [ 0.0922, -0.1804, -0.1308],
        [ 0.0969, -0.1757, -0.1298]], grad_fn=<SelectBackward0>)
````````````

■ 두 텐서는 다음과 같이 파이토치의 cat( )을 이용하여 연결할 수 있다. 이때 다음 계층인 fc layer에 넘겨주기 위해 다음과 같이 (batch_size, hidden_size * bidirectional(True=2, False=1))로 크기를 맞춰주면 된다.

x = torch.cat((h_1, h_2), 1)
x.shape
```#결과#```
torch.Size([3, 6])
````````````

■ 이렇게 양방향 모델에서 위와 같이 정방향과 역방향 lstm의 마지막 시점에서의 은닉 상태를 분리하고 다시 연결한 것과 다음과 같이 단순히 마지막 시점만 추출하는 경우 추출되는 은닉 상태 벡터가 달라지므로 주의할 필요가 있다.

outputs[:,-1,:]
```#결과#```
tensor([[-0.2322, -0.0649, -0.2900,  0.0442, -0.0905, -0.0733],
        [-0.2461, -0.0759, -0.3113,  0.0454, -0.0899, -0.0744],
        [-0.2478, -0.0842, -0.3124,  0.0480, -0.0861, -0.0757]],
       grad_fn=<SliceBackward0>)
````````````

■ 이에 대한 내용은 textclassifier 클래스의 다음 부분에 해당한다.

        batch_size = hidden.size(1) 
        
        if self.num_direction == 1: # bidirectional=False # 단방향
            last_hidden = outputs[:,-1,:] # 단방향이면 마지막 시점의 결과만 있어도 된다.
        elif self.num_direction == 2: # bidirectional=True # 양방향
            final_state = hidden.view(model_config['num_layers'], 
                                      self.num_direction, 
                                      batch_size, 
                                      model_config['hidden_dim'])[-1]
            h_1, h_2 = final_state[0], final_state[1] # 정방향 h, 역방향 h
            last_hidden = torch.cat((h_1, h_2), 1)

■ 여기서 model_config['batch_size']값을 사용하지 않고 batch_size = hidden.size(1)로 실제 입력 텐서 hidden의 크기를 가져오는 이유는 마지막 배치의 크기가 일정하지 않더라도, 모델이 이에 맞춰 학습 및 추론을 수행할 수 있도록 배치 크기를 동적으로 가져오기 위함이다.

- 예를 들어 총 데이터가 100개라고 했을 때, 배치 크기를 32로 설정했다면, model_config['batch_size']의 value에 해당되는 값은 32가 된다.

- 첫 3개 배치는 각각 32개 데이터(총 96)로 처리되므로 4개의 데이터가 남게 된다. 이렇게 되면 모델은 계속해서 32개의 데이터를 처리할 수 없으므로 오류가 발생한다.

■ 이제, 이렇게 하여 얻은 last_hidden을 fc layer에 통과시켜 클래스에 대한 점수를 계산하면 된다.

        output = self.fc(self.drop(last_hidden))
        # 현재 output.shape: [batch_size, output_dim]
        return output

■ 모델을 만들었으므로, 해당 순환 신경망 모델이 실제 데이터를 feed-forward하는지, 즉 입력값이 출력까지 한 방향으로 전달되는지 1개의 배치 데이터만 가져와서 확인해보자.

model_config.update(dict(batch_first=True, 
                         model_type='lstm',
                         bidirectional=True,
                         hidden_dim=128,
                         num_layers=1,
                         output_dim=1,
                         dropout=0
                        ))
                        
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = textclassifier(**model_config).to(device)
print(model)
```#결과#```
textclassifier(
  (embedding): Embedding(51956, 200, padding_idx=1)
  (lstm): LSTM(200, 128, batch_first=True, bidirectional=True)
  (gru): GRU(200, 128, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=256, out_features=1, bias=True)
  (drop): Dropout(p=0, inplace=False)
)
````````````
data = next(iter(train_loader))

predictions = model.forward(data.text).squeeze() # squeeze() (32, 1) -> (32)
predictions.shape, predictions
```#결과#```
(torch.Size([32]),
 tensor([0.1236, 0.0615, 0.1481, 0.0734, 0.1864, 0.0824, 0.1406, 0.1211, 0.0899,
         0.0800, 0.0738, 0.0453, 0.0649, 0.0781, 0.0491, 0.0717, 0.1030, 0.1191,
         0.0237, 0.0404, 0.0618, 0.1026, 0.0637, 0.1808, 0.1120, 0.0877, 0.0792,
         0.1450, 0.0770, 0.0657, 0.0771, 0.0484], device='cuda:0',
        grad_fn=<SqueezeBackward0>))
````````````

loss_fn = nn.BCEWithLogitsLoss().to(device)
loss = loss_fn(predictions, data.label)
loss
```#결과#```
tensor(0.6963, device='cuda:0',
       grad_fn=<BinaryCrossEntropyWithLogitsBackward0>)
````````````

■ 이진 '분류' 문제이므로 정확도를 계산할 수 있다. 정확도를 계산하기 위해서는 다음과 같은 3개의 단계가 필요하다.

■ 먼저, 모델의 순전파 출력값인 predictions은 이 예에서는 로짓(logits)이라고 볼 수 있다.

로짓은 모델의 각 클래스(class)에 대해 예측한 값이다. 이 로짓값을 시그모이드 함수에 넣었을 때, 0에서 1사이의 (확률)값이 반환된다. 

■ 여기에 다음과 같이 round를 적용하면, 확률값이 0.5 이상이면 긍정(1), 0.5 미만이면 부정(0)의 값을 나타내게 된다.

torch.round(torch.sigmoid(predictions))

■ 그 값을 실제 레이블과 비교하면 다음과 같이 올바르게 예측한 것에 대해서는 True, 틀린 예측에 대해서는 False가 반환된다.

torch.round(torch.sigmoid(predictions))==data.label
```#결과#```
tensor([False,  True,  True,  True, False,  True,  True, False, False,  True,
        False, False, False, False, False,  True,  True, False,  True,  True,
         True, False, False, False,  True, False,  True,  True,  True, False,
         True, False], device='cuda:0')
````````````

그다음, 위의 불리언 값을 실수형(float)으로 변환한다면, 올바른 예측 개수를 더하거나 평균을 내어 정확도를 계산할 수 있다.

(torch.round(torch.sigmoid(predictions))==data.label).float()
```#결과#```
tensor([0., 1., 1., 1., 0., 1., 1., 0., 0., 1., 0., 0., 0., 0., 0., 1., 1., 0.,
        1., 1., 1., 0., 0., 0., 1., 0., 1., 1., 1., 0., 1., 0.],
       device='cuda:0')
````````````

이 단계들을 통합하여 다음과 같은 정확도 계산 함수를 정의할 수 있다.

def binary_accuracy(preds, y):
    rounded_preds = torch.round(torch.sigmoid(preds))
    correct = (rounded_preds == y).float() # float()을 적용해 True/False를 1./0.으로
    acc = correct.sum()/len(correct) # True = 1인 개수 / 전체 개수
    return acc
    
binary_accuracy(predictions, data.label)    
```#결과#```
tensor(0.5000, device='cuda:0')
````````````

■ 이제 bi-LSTM과 bi-GRU 모델을 직접 실행하기 위해 다음과 같이 train 함수와 evaluate 함수를 정의한다.

def train(model, dataloader, optimizer, loss_function, device):
    model.train()
    total_loss, total_acc = 0.0, 0.0

    for batch in dataloader:
        batch.text = batch.text.to(device)
        batch.label = batch.label.to(device)

        optimizer.zero_grad() # initialize
    
        ## forward pass
        pred = model(batch.text).squeeze()
        loss = loss_function(pred, batch.label)
        acc = binary_accuracy(pred, batch.label)

        ## backward pass
        loss.backward()
        optimizer.step() # update

        total_loss += loss.item()
        total_acc += acc.item()

    return total_loss/len(dataloader), total_acc/len(dataloader)
def evaluate(model, dataloader, optimizer, loss_function, device):
    model.eval()
    total_loss, total_acc = 0.0, 0.0

    with torch.no_grad():
        for batch in dataloader:
            batch.text = batch.text.to(device)
            batch.label = batch.label.to(device)
            
            pred = model(batch.text).squeeze(1)
            loss = loss_function(pred, batch.label)
            acc = binary_accuracy(pred, batch.label)

            total_loss += loss.item()
            total_acc += acc.item()
    return total_loss/len(dataloader), total_acc/len(dataloader)

- 훈련 과정에서 iterator에서 나온 배치 데이터 (batch.text, batch.label)을 device에 올려서 모델에 입력하고, 모델로부터 예측값(logits)을 계산한다.

- 이후, 예측값(pred)과 실제 정답(batch.label) 사이의 정확도와 손실을 계산하고, 계산된 손실을 바탕으로 역전파를 수행하여 모델의 가중치를 업데이트한다.

■ 먼저 bi-LSTM 모델과, bi-GRU 모델을 학습한 결과는 다음과 같다.

model = textclassifier(**model_config).to(device)
optimizer = torch.optim.Adam(model.parameters())
loss_function = nn.BCEWithLogitsLoss()

## bi-lstm
num_epochs = 3
best_val_loss = float('inf')

for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, optimizer, loss_function, device)
    valid_loss, valid_acc = evaluate(model, valid_loader, optimizer, loss_function, device)
    
    if valid_loss < best_val_loss: 
        print(f'valid loss improved from {best_val_loss:.4f} to{valid_loss:.4f}.체크포인트 저장.')
        best_val_loss = valid_loss # valid_loss 업데이트
        torch.save(model.state_dict(), 'best_model_checkpoint.pth')

    print(f'Epoch:{epoch+1}/{num_epochs} | Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f} | Valid Loss: {valid_loss:.4f} | Valid Acc: {valid_acc:.4f}')

```#결과#```
valid loss improved from inf to0.5991.체크포인트 저장.
Epoch:1/3 | Train Loss: 0.5938 | Train Acc: 0.6813 | Valid Loss: 0.5991 | Valid Acc: 0.6869
valid loss improved from 0.5991 to0.2763.체크포인트 저장.
Epoch:2/3 | Train Loss: 0.3197 | Train Acc: 0.8662 | Valid Loss: 0.2763 | Valid Acc: 0.8848
Epoch:3/3 | Train Loss: 0.1299 | Train Acc: 0.9562 | Valid Loss: 0.3005 | Valid Acc: 0.8850
````````````

model_config['model_type'] = 'gru'

model = textclassifier(**model_config).to(device)
optimizer = torch.optim.Adam(model.parameters())
loss_function = nn.BCEWithLogitsLoss()

## bi-gru
위와 동일한 코드
```#결과#```
valid loss improved from inf to0.2633.체크포인트 저장.
Epoch:1/3 | Train Loss: 0.4017 | Train Acc: 0.8055 | Valid Loss: 0.2633 | Valid Acc: 0.8941
Epoch:2/3 | Train Loss: 0.1559 | Train Acc: 0.9436 | Valid Loss: 0.3317 | Valid Acc: 0.8784
Epoch:3/3 | Train Loss: 0.0482 | Train Acc: 0.9853 | Valid Loss: 0.3675 | Valid Acc: 0.8859
````````````

- 검증 과정에서 검증 손실과 검증 정확도를 계산해서, 검증 손실이 이전 최저 검증 손실보다 낮다면, 새로운 최저 검증 손실로 업데이트하고 해당 상태의 모델 가중치를 저장한다. 이 과정은 전체 에포크 동안 반복되며, 최종적으로 성능이 가장 좋은 모델의 가중치가 저장된다.

■ 저장된 bi-GRU 모델을 로드하여 device에 올린 다음, 테스트 데이터에 대한 성능을 측정한 결과는 다음과 같다.

model.load_state_dict(torch.load('best_model_checkpoint.pth')) # 모델 로드
model.to(device)
```#결과#```
textclassifier(
  (embedding): Embedding(51956, 200, padding_idx=1)
  (lstm): LSTM(200, 128, batch_first=True, bidirectional=True)
  (gru): GRU(200, 128, batch_first=True, bidirectional=True)
  (fc): Linear(in_features=256, out_features=1, bias=True)
  (drop): Dropout(p=0, inplace=False)
)
````````````

print(model.model_type)
```#결과#```
gru
````````````

test_loss, test_acc = evaluate(model, test_loader, optimizer, loss_function, device)
print(test_loss, test_acc)
```#결과#```
0.25693247883635406 0.8957001278772379
````````````

- 모델 객체의 model_type 속성을 출력하여 lstm 셀을 사용하는지 gru 셀을 사용하는지 확인할 수 있다.

■ 이제 임의의 입력 문장에 대해 모델을 사용하여 감성 분석을 수행할 수 있다. 이를 위해 아래와 같이 입력 문장이 주어졌을 때, 긍정 또는 부정을 예측하는 predict_sentiment( ) 함수를 정의한다.

def predict_sentiment(model, sentence):
    model.eval()
    token_indices = text.numericalize(text.pad([text.tokenize(preprocess_sentence(sentence))]))
    input_tensor = torch.LongTensor(token_indices).to(device)

    with torch.no_grad():
        logits = model(input_tensor)
    prediction = torch.sigmoid(logits)
    
    if prediction.item() > 0.5:
        print('긍정', prediction.item())
    else:
        print('부정', prediction.item())

- 먼저 model.eval( )을 통해 모델을 평가 모드로 설정한다. 평가 모드로 설정되었기 때문에 드롭아웃 같은 기능은 비활성화된다.

- 그다음, 입력으로 받은 문장을 전처리한 뒤 토큰화를 수행한다. 

- 이후 토큰화 결과(토큰 리스트)를 pad 함수로 처리하여 길이를 맞춰주고, numericalize를 통해 사전에 구축된 단어 집합(vocabulary)을 바탕으로 각 토큰을 정수 인덱스로 변환한다.

- 이렇게 얻은 입력 문장에 대한 토큰의 정수 인덱스를 모델이 입력으로 받을 수 있도록 LongTensor로 변환한다. 그리고 모델의 예측값을 계산한다.

- 예측값이 0.5를 초과하면 긍정, 0.5 미만이면 부정으로 분류한다.

예를 들어, "스토리가 몰입감이 있고 연기도 정말 휼륭했다"는 매우 긍정적인 문장과 "줄거리가 뻔하고 연기도 지루하고 감동이 없다"는 부정적인 문장을 입력으로 넣으면 결과는 다음과 같다.

test_sentence = 'The storyline was captivating and the performances were absolutely brilliant.'
predict_sentiment(model, test_sentence)
```#결과#```
긍정 0.987245500087738
````````````

test_sentence = 'The plot was predictable and the acting felt dull and uninspired.'
predict_sentiment(model, test_sentence)
```#결과#```
부정 0.007339664734899998
````````````

'자연어처리' 카테고리의 다른 글

시퀀스투시퀀스(Sequence‑to‑Sequence, seq2seq) (3)  (0) 2025.04.01
시퀀스투시퀀스(Sequence‑to‑Sequence, seq2seq) (2)  (0) 2025.03.30
LSTM, GRU (1)  (0) 2025.03.28
RNN (2)  (0) 2025.03.26
RNN (1)  (1) 2025.03.23