GTSRB\Final_Training\Images 디렉토리 안에 디렉토리 별로 이미지가 ppm 확장자로 분류되어 있다.
- ppm: 무손실 압축 이미지 파일 형식
- image 크기: 32 x 32
- 신경망 구조를 만들 때 선택 사항
2차원 컨볼루션에서 필터 개수 와 커널 크기
Max Pool 에서의 커널 크기
Fully Connected 계층(Dense)에서의 유닛 개수
계층 별 배치 크기, 최적화 알고리즘, 학습률, 할성화 함수, epoch 수 등
- 이미지 전처리
이미지 크기를 32*32로 수정
43개의 레이블을 생성: 디렉토리 이름이 레이블
표지판의 경우 구분해야 할 것은 색상이 아니라 모양: 이미지를 흑백으로 변경
데이터를 다룰 때 목적을 먼저 생각하고 작업을 시작해야 한다. (도메인)
- 필요한 패키지 와 상수 선언
클래스 개수와 이미지의 크기를 변수에 저장
이 두개의 절대로 변하지 않을 값
변하지 않는 값을 장할 변수의 이름은 모두 대문자로 작성하는 것이 좋다.
SNAKE 표기법이라고 한다.
N_CLASSES = 43
RESIZED_IMAGE = (32, 32)
# 모듈
import matplotlib.pyplot as plt
#디렉토리를 핸들링 할 때 사용하는 모듈
import glob
from skimage.color import rgb2lab
from skimage.transform import resize
import numpy as np
from collections import namedtuple
from sklearn.model_selection import train_test_split
- 이미지 전처리를 위한 함수
이름이 있는 튜플 생성 - 다른 언어에서는 튜플을 이런 형태로 만든다.
튜플의 원래 목적은 변경할 수 없는 하나의 행(record, row)을 표현하기 위한 것
인덱스가 아니라 이름으로 구별하는 것이 타당
Dataset = namedtuple('Dataset', ['X', 'y'])
# 포맷 변경해주는 함수
def to_tf_format(imgs):
return np.stack([img[:, :, np.newaxis] for img in imgs], axis=0).astype(np.float32)
- 이미지 디렉토리에 있는 모든 이미지들에 라벨링을 하기 위한 작업 - 이미지의 크기 변경도 여기서 수행
def read_dataset_ppm(rootpath, n_labels, resize_to):
#이미지 데이터 와 라벨을 저장할 list
images = []
labels = []
for c in range(n_labels):
#루트경로/00001/ 형태로 이미지 경로를 생성
full_path = rootpath + "/" + format(c, '05d') + "/"
#각 이미지 디렉토리를 순회하면서 확장자가 ppm 인 파일의 경로를 가지고
for img_name in glob.glob(full_path + "*.ppm"):
#이미지 읽어오기
img = plt.imread(img_name).astype(np.float32)
#이미지를 정규화
img = rgb2lab(img / 255.0)[:, :,0]
#이미지 크기를 조정
if resize_to:
img = resize(img, resize_to, mode='reflect')
#라벨 생성
#43개 짜리 배열을 만들어서 자신의 인덱스에 해당하는 값에만 1을 대입
#원핫 인코딩 된 라벨
label = np.zeros((n_labels, ), dtype=np.float32)
label[c] = 1.0
images.append(img.astype(np.float32))
labels.append(label)
return Dataset(X = to_tf_format(images).astype(np.float32), y = np.matrix(labels).astype(np.float32))
#모델 만들기
model = keras.Sequential()
#입력 층을 합성곱 층을 사용
model.add(keras.layers.Conv2D(32, kernel_size=3, activation="relu",
padding='same', input_shape=(28, 28, 1)))
model.add(keras.layers.MaxPooling2D(2))
model.add(keras.layers.Conv2D(64, kernel_size=3, activation="relu",
padding='same'))
model.add(keras.layers.MaxPooling2D(2))
#데이터를 1차원으로 만들어주는 층
model.add(keras.layers.Flatten())
#Dense 는 1차원의 데이터만 사용
model.add(keras.layers.Dense(100, activation="relu"))
#드랍 아웃 적용
model.add(keras.layers.Dropout(0.4))
model.add(keras.layers.Dense(10, activation="softmax"))
# 모델 확인
model.summary()
4. 모델 컴파일 (훈련)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy",
metrics=["accuracy"])
#체크 포인트
checkpoint_cb = keras.callbacks.ModelCheckpoint("best-cnn-model.keras")
#2번의 epoch 동안 점수가 좋아지지 않으면 조기 종료
early_stopping_cb = keras.callbacks.EarlyStopping(patience=2,
restore_best_weights=True)
history = model.fit(train_scaled, train_target, epochs=20,
validation_data=(val_scaled, val_target),
callbacks=[checkpoint_cb, early_stopping_cb])
4. 모델 평가
model.evaluate(val_scaled, val_target)
아래 합성곱 층으로 내려갈 때 뉴런의 개수를 늘렸는데 중간에 MaxPooling2D(Dropout을 적용해도 동일) 을 적용했기 때문에 실제 파라미터의 개수가 줄어들어 사용되므로 늘린 티가 나지 않음
tmp_coloumns = [pop.columns.get_level_values(0)[n] + \
pop.columns.get_level_values(1)[n]
for n in range(0,len(pop.columns.get_level_values(0)))]
pop.columns = tmp_coloumns
pop.head()
plt.figure(figsize=(8, 11))
# 지역 이름 표시
for idx, row in draw_korea_raw_stacked.iterrows():
# 광역시는 구 이름이 겹치는 경우가 많아서 시단위 이름도 같이 표시
# (중구, 서구)
if len(row['ID'].split())==2:
dispname = '{}\n{}'.format(row['ID'].split()[0], row['ID'].split()[1])
elif row['ID'][:2]=='고성':
dispname = '고성'
else:
dispname = row['ID']
# 시도 경계 그리기
for path in BORDER_LINES:
ys, xs = zip(*path)
plt.plot(xs, ys, c='black', lw=1.5)
#y축의 위아래 변경
plt.gca().invert_yaxis()
#plt.gca().set_aspect(1)
#축과 라벨 제거
plt.axis('off')
#자동 레이아웃 설정
plt.tight_layout()
plt.show()
# draw_korea_raw_stacked와 pop의 도시이름 일치시키기
#draw_korea_raw_stacked 와 pop 의 도시이름 비교
print(set(draw_korea_raw_stacked['ID'].unique()) - set(pop['ID'].unique()))
print(set(pop['ID'].unique()) - set(draw_korea_raw_stacked['ID'].unique()))
#일치하지 않는 데이터 삭제
tmp_list = list(set(pop['ID'].unique()) - set(draw_korea_raw_stacked['ID'].unique()))
for tmp in tmp_list:
pop = pop.drop(pop[pop['ID']==tmp].index)
print(set(pop['ID'].unique()) - set(draw_korea_raw_stacked['ID'].unique()))
def drawKorea(targetData, blockedMap, cmapname):
gamma = 0.75
#인구수 데이터의 크고 낮음을 분류하기 위한 값 만들기
whitelabelmin = (max(blockedMap[targetData]) -
min(blockedMap[targetData]))*0.25 + \
min(blockedMap[targetData])
#컬럼이름을 대입하기
datalabel = targetData
#최대값과 최소값 구하기
vmin = min(blockedMap[targetData])
vmax = max(blockedMap[targetData])
#x 와 y를 가지고 피봇 테이블 만들기
mapdata = blockedMap.pivot_table(index='y', columns='x', values=targetData)
#데이터가 존재하는 것 골라내기
masked_mapdata = np.ma.masked_where(np.isnan(mapdata), mapdata)
#그래프 영역 크기 만들기
plt.figure(figsize=(9, 11))
#색상 설정
plt.pcolor(masked_mapdata, vmin=vmin, vmax=vmax, cmap=cmapname,
edgecolor='#aaaaaa', linewidth=0.5)
# 지역 이름 표시
for idx, row in blockedMap.iterrows():
# 광역시는 구 이름이 겹치는 경우가 많아서 시단위 이름도 같이 표시
#(중구, 서구)
if len(row['ID'].split())==2:
dispname = '{}\n{}'.format(row['ID'].split()[0], row['ID'].split()[1])
elif row['ID'][:2]=='고성':
dispname = '고성'
else:
dispname = row['ID']
# 서대문구, 서귀포시 같이 이름이 3자 이상인 경우에 작은 글자로 표시
if len(dispname.splitlines()[-1]) >= 3:
fontsize, linespacing = 10.0, 1.1
else:
fontsize, linespacing = 11, 1.
#글자색상 만들기
annocolor = 'white' if row[targetData] > whitelabelmin else 'black'
#텍스트 출력하기
plt.annotate(dispname, (row['x']+0.5, row['y']+0.5), weight='bold',
fontsize=fontsize, ha='center', va='center', color=annocolor,
linespacing=linespacing)
# 시도 경계 그리기
for path in BORDER_LINES:
ys, xs = zip(*path)
plt.plot(xs, ys, c='black', lw=2)
plt.gca().invert_yaxis()
plt.axis('off')
cb = plt.colorbar(shrink=.1, aspect=10)
cb.set_label(datalabel)
plt.tight_layout()
plt.show()
# 인구수 합계로 Catrogram 그리기
drawKorea('인구수합계', pop, 'Blues')
#소멸위기지역으로 Cartogram 그리기
pop['소멸위기지역'] = [1 if con else 0 for con in pop['소멸위기지역']]
drawKorea('소멸위기지역', pop, 'Reds')
cctv.rename(columns={cctv.columns[0] : '구별'}, inplace=True)
print(cctv.head())
print()
gu = []
for x in cctv['구별']:
gu.append(x.replace(' ', ''))
cctv['구별'] = gu
pop.rename(columns={pop.columns[1] : '구별'}, inplace=True)
print(pop.head())
# 필터링
#pop에서 컬럼 추출
pop = pop[['기간', '구별', '계', '남자', '여자']]
#pop의 첫번째 행은 합계
#첫번째 행 제거
pop.drop([0], inplace=True)
#여성인구 비율을 알아보기 위해서 새로운 열 생성
pop['여성비율'] = pop['여자']/pop['계']*100
pop
# 병합
#구별 컬럼을 이용해서 2개의 frame을 합치기
df = pd.merge(cctv, pop, on='구별')
# 불필요한 컬럼 제거
del df['2011년 이전']
del df['2012년']
del df['2013년']
del df['2014년']
del df['2015년']
del df['2016년']
del df['2017년']
del df['기간']
Sequential API를 이용해서 회귀용 MLP을 구축, 훈련, 평가, 예측하는 방법은 분류에서 했던 것 과 매우 비슷하지만
차이점은 출력 층이 활성화 함수가 없는 하나의 뉴런을 가져야 한다는 것, 손실 함수가 평균 제곱 오차나 평균 절대값 오차로 변경을 해야 함
import tensorflow as tf
from tensorflow import keras
#회귀 모델 만들기
#input_shape 설정할 때 데이터의 개수는 생략
model = keras.models.Sequential([
keras.layers.Dense(30, activation="relu", input_shape=X_train.shape[1:]),
keras.layers.Dense(15, activation="relu"),
keras.layers.Dense(units=1)
])
model.summary()
#여러 경로의 input 사용
input_A = keras.layers.Input(shape=[5])
input_B = keras.layers.Input(shape=[6])
hidden1 = keras.layers.Dense(30, activation="relu")(input_B)
hidden2 = keras.layers.Dense(15, activation="relu")(hidden1)
#input_A는 하나의 hidden 층도 통과하지 않은 데이터
# hidden2는 2개의 hidden 층을
#통과한 데이터
concat = keras.layers.concatenate([input_A, hidden2])
output = keras.layers.Dense(1)(concat)
#입력을 2가지를 사용 - 데이터의 다양성을 추가해서 학습
#모든 입력이 hidden layer를 통과하게 되면 깊이가 깊어질 때 데이터의 왜곡 발생 가능
model = keras.models.Model(inputs=[input_A, input_B], outputs=[output])
#출력이 2개가 된 경우 손실을 적용할 때 비율을 설정하는 것이 가능
#이 경우는 첫번째 출력의 손실을 90% 반영하고 두번째 출력의 손실을 10% 반영
model.compile(loss="mse", optimizer=keras.optimizers.SGD(learning_rate=0.001),
loss_weights=[0.9, 0.1])
history = model.fit((X_train_A, X_train_B), y_train,
epochs=20, validation_data=((X_valid_A, X_valid_B), y_valid))
X_new_A, X_new_B = X_test_A[:3], X_test_B[:3]
#출력이 2개 이므로 각 데이터마다 2개의 값을 리턴
y_pred_main, y_pred_aux = model.predict([X_new_A, X_new_B])
print(y_pred_main)
print(y_pred_aux)
epochs 을 크게 지정하면 훈련을 많이 수행하기 때문에 성능이 좋아질 가능성이 높은데 늘리게 되면 훈련 시간이 오래 걸린다.
일정 에포크 동안 검증 세트에 대한 점수가 향상되지 않으면 훈련을 멈추도록 할 수 있다.
keras.callbacks.EarlyStopping 클래스의 인스턴스를 만들 때 patience 매개변수에 원하는 에포크 지정해서 만들고 모델이 fit 메서드를 호출할 때 callbacks 파라미터에 list 형태로 대입하면 됨
#5번의 epoch 동안 점수가 좋아지지 않으면 조기 종료
early_stopping_cb = keras.callbacks.EarlyStopping(patience=5)
history = model.fit((X_train_A, X_train_B), y_train,
epochs=100, validation_data=((X_valid_A, X_valid_B), y_valid),
callbacks=[early_stopping_cb])
LearningRateScheduler 라는 클래스를 이용하는데 이 때는 에포크 와 학습율을 매개변수로 갖는 함수를 만들어서 인스턴스를 생성할 때 대입
#5번의 epoch 동안 점수가 좋아지지 않으면 조기 종료
early_stopping_cb = keras.callbacks.EarlyStopping(patience=5)
#5번의 epoch 동안은 기존 학습률을 유지하고 그 이후에는 학습률을 감소시키는 함수
def scheduler(epoch, lr):
if epoch < 5:
return lr
else:
lr = lr - 0.0001
return lr
#학습률을 동적으로 변화시키는 체크포인트
lr_scheduler = tf.keras.callbacks.LearningRateScheduler(scheduler)
history = model.fit((X_train_A, X_train_B), y_train,
epochs=100, validation_data=((X_valid_A, X_valid_B), y_valid),
callbacks=[early_stopping_cb, lr_scheduler])
- 모델 저장과 복원
딥러닝은 fit 함수를 호출한 후 다음에 다시 fit을 호출하면 이전에 훈련한 이후 부터 다시 훈련하는 것이 가능
- on_train_begin, on_train_end, on_epoch_begin, on_epoch_end, on_batch_bigin, on_batch_end 이 메서드들은 훈련전후 그리고 한 번의 에포크 전후 그리고 배치 전후에 작업을 수행시키고자 하는 경우에 사용
on_test 로 시작하는 메서드를 오버라이딩 하면 검증 단계에서 작업을 수행
3-6) 신경망의 하이퍼 파라미터 튜닝
- 신경망의 유연성은 단점이 되기도 하는데 조정할 하이퍼 파라미터가 많음
- 복잡한 네트워크 구조에서 뿐 만 아니라 간단한 다층 퍼셉트론에서도 층의 개수, 층마다 존재하는 뉴런의 개수, 각 층에서 사용하는 활성화 함수, 가중치 초기화 전략 등 많은 것을 바꿀 수 있는데 어떤 하이퍼 파라미터 조합이 주어진 문제에 대해서 최적인지 확인
- 이전 머신러닝 모델들은 GridSearchCV 나 RandomizedSearchCV를 이용해서 하이퍼 파라미터 공간을 탐색할 수 있었는데 딥러닝에서는 이 작업을 할려면 Keras 모델을 sklearn 추정기 처럼 보이도록 바꾸기
def build_model(n_hidden=1, n_neurons=30, learning_rate=0.003, input_shape=[8]):
model = keras.models.Sequential()
#입력 레이어 추가
model.add(keras.layers.InputLayer(input_shape=input_shape))
#n_hidden 만큼 히든 층 추가
for layer in range(n_hidden):
model.add(keras.layers.Dense(n_neurons, activation="relu"))
#출력 층 추가
model.add(keras.layers.Dense(1))
#최적화 함수 생성
optimizer = keras.optimizers.SGD(learning_rate = learning_rate)
model.compile(loss="mean_squared_error", optimizer=optimizer, metrics=['mse'])
return model
배치 정규화를 이용하면 일부 뉴런이 죽는 현상을 거의 해소하기 때문에 성능이 좋아질 가능성이 높다.
- 그라디언트 클리핑
역전파 될 때 일정 임계값을 넘어서는 못하게 그라디언트를 잘라 내는 것
RNN 에서는 배치 정규화를 사용하기가 어려움
이 경우에는 Optimizer를 만들 때 clipvalue 나 clipnorm 매개변수를 이용해서 Gradient 값을 제한하는 방법으로 유사한 효과를 나타낸다.
배치 정규화가 -1 ~ 1 사이의 값으로 정규화하므로 임계값은 1.0을 사용한다.
그라디언트의 값이 -1 보다 작거나 1보다 크면 -1 이나 1로 수정해서 기울기가 소실되는 것을 방지
- 사전 훈련된 층 재사용
전이학습 (Transfer Learning) : 큰 규모의 NN을 처음부터 훈련시키는 것은 많은 자원을 소모하게 되는데 이런 경우 해결하려는 것과 비슷한 유형의 문제를 처리한 신경망이 있는지 확인해보고 그 신경망의 하위층을 재사용하는 것이 효율적
전이 학습을 이용하게 되면 훈련 속도를 개선할 수 있고 훈련 데이터의 양도 줄어들게 된다.
동일한 이미지 분류 모델이라면 하위 층에서 유사하게 점이나 선 들의 모형을 추출하는 작업을 할 가능성이 높으므로 하위 층을 공유해도 유사한 성능을 발휘하게 된다.
출력 층은 하고자 하는 마지막 작업이 서로 다를 가능성이 높고(ImageNet 의 이미지를 분류하는 모델은 20,000 개의 카테고리를 가지고 있고 우리는 개와 고양이를 분류하는 이진 분류의 경우) 상위 층은 거의 출력 과 유사한 형태가 만들어 진 것이므로 역시 재사용할 가능성은 낮다.
샌달 과 셔츠를 제외한 모든 이미지를 가지고 분류 모델을 만들고 이를 이용해서 샌달과 셔츠 이미지 중 200개 만 가진 작은 훈련 세트를 훈련해서 정확도를 확인
#데이터 셋 분할
def split_dataset(X, y):
y_5_or_6 = (y == 5) | (y == 6)
#5 나 6이 아닌 데이터
y_A = y[~y_5_or_6]
y_A[y_A > 6] -= 2 #6보다 큰 레이블은 2를 빼서 연속된 레이블로 만들기
y_B = (y[y_5_or_6] == 6).astype(np.float32)
return ((X[~y_5_or_6], y_A), (X[y_5_or_6], y_B))
tf.random.set_seed(42)
np.random.seed(42)
#모델 생성
model_A = keras.models.Sequential()
#입력 층
model_A.add(keras.layers.Flatten(input_shape=[28, 28]))
#히든 층
for n_hidden in (300, 100, 50, 50, 50):
model_A.add(keras.layers.Dense(n_hidden, activation="selu"))
#출력 층
model_A.add(keras.layers.Dense(8, activation="softmax"))
model_A.compile(loss = "sparse_categorical_crossentropy",
optimizer=keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
history = model_A.fit(X_train_A, y_train_A, epochs=20,
validation_data=(X_valid_A, y_valid_A))
# 모델 저장
model_A.save("my_model_A.keras")
#실제 해결을 하고자 하는 모델
model_B = keras.models.Sequential()
#입력 층
model_B.add(keras.layers.Flatten(input_shape=[28, 28]))
#히든 층
for n_hidden in (300, 100, 50, 50, 50):
model_B.add(keras.layers.Dense(n_hidden, activation="selu"))
#출력 층: 2개를 분류하는 것은 2가지 방법이 있음
#1일 확률을 구하는 것 과 0 과 1일 확률을 구하는 것
model_B.add(keras.layers.Dense(1, activation="sigmoid"))
model_B.compile(loss = "binary_crossentropy",
optimizer=keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
history = model_B.fit(X_train_B, y_train_B, epochs=20,
validation_data=(X_valid_B, y_valid_B))
- 기존 모델인 model_A를 이용해서 해결
#기존 모델 가져오기
model_A = keras.models.load_model("my_model_A.keras")
#기존 모델에서 출력 층을 제외한 레이어를 가져오기
model_B_on_A = keras.models.Sequential(model_A.layers[:-1])
#출력 층 추가
model_B_on_A.add(keras.layers.Dense(1, activation='sigmoid'))
#모든 레이어가 다시 훈련하지 않도록 설정
for layer in model_B_on_A.layers[:-1]:
layer.trainable = False
model_B_on_A.compile(loss = "binary_crossentropy",
optimizer=keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
history = model_B_on_A.fit(X_train_B, y_train_B, epochs=20,
validation_data=(X_valid_B, y_valid_B))
# 모든 레이어가 다시 훈련하도록 설정
#기존 모델에서 출력 층을 제외한 레이어를 가져오기
model_B_on_A = keras.models.Sequential(model_A.layers[:-1])
#출력 층 추가
model_B_on_A.add(keras.layers.Dense(1, activation='sigmoid'))
#모든 레이어가 다시 훈련하도록 설정
for layer in model_B_on_A.layers[:-1]:
layer.trainable = True
model_B_on_A.compile(loss = "binary_crossentropy",
optimizer=keras.optimizers.SGD(learning_rate=0.001),
metrics=['accuracy'])
history = model_B_on_A.fit(X_train_B, y_train_B, epochs=20,
validation_data=(X_valid_B, y_valid_B))
손실을 낮추기 위해서 신경망의 가중치 와 학습률 등의 신경망의 속성을 변경하는데 사용되는 최적화 방식
종류
SGD(확률적 경사하강법)
RMSprop
Adam: 가장 많이 사용되는 알고리즘으로 좋은 성능을 내는 것으로 알려져 있음
Adadelta
Adagrad
Nadam
Ftrl
- 평가 지표
분류: auc, precision, recall, accuracy
회귀: mse, mae, rmse
6. 모델 complie
model.compile(optimizer='adam',
loss='sparse_categorical_crossentropy',
metrics=['accuracy'])
# 모델 훈련
history = model.fit(X_train, y_train, epochs=30,
validation_data = (X_valid, y_valid))