In [1]:
import torch

import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from tqdm import tqdm

In [None]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(f"Using device: {device}")

In [3]:
# Read the CSV and convert the resulting frame to a plain NumPy array
data = np.array(pd.read_csv('data/bel_data_test.csv'))

# Column 0 is used as the integer label; every remaining column is a feature
y = data[:, 0].astype(int)
X = data[:, 1:]

# Hold out 20% of the rows for testing (fixed seed so the split is repeatable)
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [4]:
# Training split: float32 features, int64 labels, reshuffled every epoch
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train, dtype=torch.long)
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)

# Test split: same dtypes, iterated in a fixed order
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test, dtype=torch.long)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [5]:
class SemanticsMLP(nn.Module):
    """MLP with a shared encoder feeding a classifier head and a decoder.

    The encoder is a stack of ``Linear`` + ReLU layers. Its output is passed
    both to a single ``Linear`` classifier and to a decoder that mirrors the
    encoder widths in reverse and maps back to ``input_size``.

    :param input_size: Number of features in each input vector.
    :param hidden_sizes: Widths of the encoder layers, in order. Must contain
        at least one entry; the last entry is the width of the encoded
        representation.
    :param num_classes: Number of outputs of the classifier head.
    :raises ValueError: If ``hidden_sizes`` is empty.
    """

    def __init__(self, input_size=1024, hidden_sizes=(512, 256, 128), num_classes=62):
        super().__init__()

        # Take a private copy: the default must never be a shared mutable
        # object, and later changes to a caller's list must not leak in here.
        hidden_sizes = list(hidden_sizes)
        if not hidden_sizes:
            raise ValueError("hidden_sizes must contain at least one layer width")

        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.num_classes = num_classes

        # Encoder (feature extractor): input_size -> hidden_sizes[0] -> ... -> hidden_sizes[-1]
        encoder_dims = [input_size] + hidden_sizes
        self.encoder_layers = nn.ModuleList(
            [nn.Linear(n_in, n_out) for n_in, n_out in zip(encoder_dims[:-1], encoder_dims[1:])]
        )

        # Classifier head on top of the encoded representation
        self.classifier = nn.Linear(hidden_sizes[-1], num_classes)

        # Decoder: the encoder widths walked backwards, ending at input_size
        decoder_dims = hidden_sizes[::-1] + [input_size]
        self.decoder_layers = nn.ModuleList(
            [nn.Linear(n_in, n_out) for n_in, n_out in zip(decoder_dims[:-1], decoder_dims[1:])]
        )

    def encode(self, x):
        """Apply every encoder layer followed by ReLU and return the result."""
        for layer in self.encoder_layers:
            x = F.relu(layer(x))
        return x

    def decode(self, x):
        """Map an encoded representation back to ``input_size`` features.

        ReLU follows every decoder layer except the last, whose output is
        returned without an activation.
        """
        for layer in self.decoder_layers[:-1]:
            x = F.relu(layer(x))
        return self.decoder_layers[-1](x)  # No activation on the final layer

    def forward(self, x):
        """Return ``(logits, reconstructed)`` computed from one shared encoding.

        :param x: Input whose last dimension has ``input_size`` features.
        :return: Tuple of classifier logits (last dimension ``num_classes``)
            and the decoder output (last dimension ``input_size``).
        """
        encoded = self.encode(x)
        logits = self.classifier(encoded)
        reconstructed = self.decode(encoded)
        return logits, reconstructed

In [6]:
def show_image_comparison(original, reconstructed, label, prediction, image_shape=(32, 32)):
    """
    Display the original and reconstructed images side by side.

    :param original: Original image (flat tensor whose element count matches image_shape)
    :param reconstructed: Reconstructed image (flat tensor of the same size)
    :param label: True label of the image, shown in the left title
    :param prediction: Predicted label of the image, shown in the right title
    :param image_shape: (height, width) each tensor is reshaped to; defaults to 32x32
    """
    # detach() first: numpy() raises on tensors that still require grad, which
    # is the case for model outputs produced outside torch.no_grad().
    original_img = original.detach().cpu().numpy().reshape(image_shape)
    reconstructed_img = reconstructed.detach().cpu().numpy().reshape(image_shape)

    # One row, two panels: original on the left, reconstruction on the right
    _, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))
    panels = (
        (ax1, original_img, f'Original (Label: {label})'),
        (ax2, reconstructed_img, f'Reconstructed (Predicted: {prediction})'),
    )
    for ax, img, title in panels:
        ax.imshow(img, cmap='gray')
        ax.set_title(title)
        ax.axis('off')

    plt.tight_layout()
    plt.show()

In [7]:
# Network with a single 10-unit hidden layer, placed on the selected device
model = SemanticsMLP(input_size=1024, hidden_sizes=[10], num_classes=62)
model = model.to(device)

# Cross-entropy for the class logits, mean squared error for the reconstruction
criterion = nn.CrossEntropyLoss()
reconstruction_criterion = nn.MSELoss()

# Adam over every model parameter
optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
num_epochs = 250
# Mean training loss of each epoch, kept so the curve can be inspected or plotted later
train_loss_history = []

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    # The description is constant for the whole epoch, so it is set once on the bar
    with tqdm(train_loader, unit="batch", desc=f"Epoch {epoch+1}") as tepoch:
        for images, labels in tepoch:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()

            # One forward pass yields both the class logits and the reconstruction
            logits, reconstructed = model(images)

            # Joint objective: classification loss plus reconstruction loss, unweighted sum
            classification_loss = criterion(logits, labels)
            reconstruction_loss = reconstruction_criterion(reconstructed, images)
            total_loss = classification_loss + reconstruction_loss

            total_loss.backward()
            optimizer.step()

            # Read the scalar back once and reuse it for both the sum and the progress bar
            batch_loss = total_loss.item()
            running_loss += batch_loss
            tepoch.set_postfix(loss=batch_loss)

    # Average of the per-batch losses for this epoch
    epoch_loss = running_loss / len(train_loader)
    train_loss_history.append(epoch_loss)

In [None]:
model.eval()

# Accuracy is accumulated over every test batch so the reported figure
# describes the whole test set, not just the first batch.
correct = 0
total = 0

with torch.no_grad():
    for batch_idx, (images, labels) in enumerate(test_loader):
        images, labels = images.to(device), labels.to(device)

        logits, reconstructed = model(images)

        # Predicted class = index of the largest logit in each row
        predicted = logits.argmax(dim=1)

        # Visualise a handful of examples, taken from the first batch only
        if batch_idx == 0:
            num_images_to_show = min(5, len(images))
            for i in range(num_images_to_show):
                show_image_comparison(
                    images[i],
                    reconstructed[i],
                    labels[i].item(),
                    predicted[i].item()
                )

        correct += (predicted == labels).sum().item()
        total += labels.size(0)

# An empty test loader yields NaN rather than a division-by-zero error
accuracy = 100 * correct / total if total else float('nan')
print(f'Test Accuracy: {accuracy:.2f}%')