{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import torch\n",
"\n",
"import torch.nn as nn\n",
"import torch.optim as optim\n",
"import torch.nn.functional as F\n",
"import numpy as np\n",
"import pandas as pd\n",
"import matplotlib.pyplot as plt\n",
"\n",
"from torch.utils.data import DataLoader, TensorDataset\n",
"from sklearn.model_selection import train_test_split\n",
"from tqdm import tqdm"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Using device: cuda\n"
]
}
],
"source": [
"# Check if CUDA is available\n",
"device = torch.device(\"cuda\" if torch.cuda.is_available() else \"cpu\")\n",
"print(f\"Using device: {device}\")"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Train set shape: (2860, 1024)\n",
"Validation set shape: (715, 1024)\n",
"Test set shape: (1000, 1024)\n"
]
}
],
"source": [
"data = pd.read_csv('data/bel_data_test.csv')\n",
"data = np.array(data)\n",
"\n",
"# Split features and labels\n",
"X = data[:, 1:] # All columns except the first one\n",
"y = data[:, 0].astype(int) # First column as labels\n",
"\n",
"# Create test set from the first thousand rows\n",
"X_test = X[:1000]\n",
"y_test = y[:1000]\n",
"\n",
"# Use the remaining data for train and validation\n",
"X_remaining = X[1000:]\n",
"y_remaining = y[1000:]\n",
"\n",
"# Split the remaining data into training and validation sets\n",
"X_train, X_val, y_train, y_val = train_test_split(X_remaining, y_remaining, test_size=0.2, random_state=42)\n",
"\n",
"# Print the shapes of the resulting sets\n",
"print(f\"Train set shape: {X_train.shape}\")\n",
"print(f\"Validation set shape: {X_val.shape}\")\n",
"print(f\"Test set shape: {X_test.shape}\")"
]
},
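{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sanity check (illustrative): confirm the train split covers every class,\n",
"# since num_classes is derived from the full label set further below\n",
"print(f\"Classes overall: {len(np.unique(y))}, in train split: {len(np.unique(y_train))}\")\n",
"print(f\"Label range: {y.min()}..{y.max()}\")"
]
},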
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# # Convert to PyTorch tensors\n",
"# X_train_tensor = torch.FloatTensor(X_train)\n",
"# y_train_tensor = torch.LongTensor(y_train)\n",
"# X_test_tensor = torch.FloatTensor(X_test)\n",
"# y_test_tensor = torch.LongTensor(y_test)\n",
"\n",
"# # Create DataLoader objects\n",
"# train_dataset = TensorDataset(X_train_tensor, y_train_tensor)\n",
"# test_dataset = TensorDataset(X_test_tensor, y_test_tensor)\n",
"\n",
"# train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)\n",
"# test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"class SemanticsMLP:\n",
" def __init__(self, input_size=1024, hidden_sizes=[512, 256, 128], num_classes=62):\n",
" self.input_size = input_size\n",
" self.hidden_sizes = hidden_sizes\n",
" self.num_classes = num_classes\n",
"\n",
" # Initialize weights and biases\n",
" self.encoder_weights = []\n",
" self.encoder_biases = []\n",
" prev_size = input_size\n",
" for hidden_size in hidden_sizes:\n",
" self.encoder_weights.append(np.random.randn(prev_size, hidden_size) * np.sqrt(2. / prev_size))\n",
" self.encoder_biases.append(np.zeros(hidden_size))\n",
" prev_size = hidden_size\n",
"\n",
" self.classifier_weight = np.random.randn(hidden_sizes[-1], num_classes) * np.sqrt(2. / hidden_sizes[-1])\n",
" self.classifier_bias = np.zeros(num_classes)\n",
"\n",
" self.decoder_weights = []\n",
" self.decoder_biases = []\n",
" reversed_hidden_sizes = list(reversed(hidden_sizes))\n",
" prev_size = hidden_sizes[-1]\n",
" for hidden_size in reversed_hidden_sizes[1:] + [input_size]:\n",
" self.decoder_weights.append(np.random.randn(prev_size, hidden_size) * np.sqrt(2. / prev_size))\n",
" self.decoder_biases.append(np.zeros(hidden_size))\n",
" prev_size = hidden_size\n",
"\n",
" def relu(self, x):\n",
" return np.maximum(0, x)\n",
"\n",
" def encode(self, x):\n",
" for weight, bias in zip(self.encoder_weights, self.encoder_biases):\n",
" x = self.relu(np.dot(x, weight) + bias)\n",
" return x\n",
"\n",
" def decode(self, x):\n",
" for weight, bias in zip(self.decoder_weights[:-1], self.decoder_biases[:-1]):\n",
" x = self.relu(np.dot(x, weight) + bias)\n",
" x = np.dot(x, self.decoder_weights[-1]) + self.decoder_biases[-1] # No activation on the final layer\n",
" return x\n",
"\n",
" def forward(self, x):\n",
" encoded = self.encode(x)\n",
" logits = np.dot(encoded, self.classifier_weight) + self.classifier_bias\n",
" reconstructed = self.decode(encoded)\n",
" return logits, reconstructed"
]
},
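{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Shape check (a minimal sketch on random data, separate from training):\n",
"# encode maps (batch, 1024) -> (batch, 128); forward returns logits of shape\n",
"# (batch, num_classes) plus a reconstruction with the input's shape\n",
"_probe = SemanticsMLP(input_size=1024, hidden_sizes=[512, 256, 128], num_classes=62)\n",
"_x = np.random.randn(4, 1024)\n",
"_logits, _recon = _probe.forward(_x)\n",
"print(_probe.encode(_x).shape, _logits.shape, _recon.shape)"
]
},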
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def softmax(x):\n",
" exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))\n",
" return exp_x / np.sum(exp_x, axis=1, keepdims=True)\n",
"\n",
"def cross_entropy_loss(y_pred, y_true):\n",
" m = y_true.shape[0]\n",
" p = softmax(y_pred)\n",
" log_likelihood = -np.log(p[range(m), y_true])\n",
" loss = np.sum(log_likelihood) / m\n",
" return loss\n",
"\n",
"def cross_entropy_gradient(y_pred, y_true):\n",
" m = y_true.shape[0]\n",
" grad = softmax(y_pred)\n",
" grad[range(m), y_true] -= 1\n",
" grad = grad / m\n",
" return grad\n",
"\n",
"def mse_loss(y_pred, y_true):\n",
" return np.mean((y_pred - y_true) ** 2)\n",
"\n",
"def mse_gradient(y_pred, y_true):\n",
" return 2 * (y_pred - y_true) / y_true.shape[0]\n",
"\n",
"def train_step(model, X, y, learning_rate):\n",
" # Forward pass\n",
" logits, reconstructed = model.forward(X)\n",
" \n",
" # Compute gradients\n",
" ce_grad = cross_entropy_gradient(logits, y)\n",
" mse_grad = mse_gradient(reconstructed, X)\n",
" \n",
" # Backpropagation (simplified, not computing full gradients for all layers)\n",
" encoded = model.encode(X)\n",
" \n",
" # Update classifier\n",
" model.classifier_weight -= learning_rate * np.dot(encoded.T, ce_grad)\n",
" model.classifier_bias -= learning_rate * np.sum(ce_grad, axis=0)\n",
" \n",
" # Update decoder (last layer only for simplicity)\n",
" decoder_grad = np.dot(encoded.T, mse_grad)\n",
" if decoder_grad.shape != model.decoder_weights[-1].shape:\n",
" raise ValueError(f\"Shape mismatch: decoder_grad {decoder_grad.shape}, decoder_weights[-1] {model.decoder_weights[-1].shape}\")\n",
" model.decoder_weights[-1] -= learning_rate * decoder_grad\n",
" model.decoder_biases[-1] -= learning_rate * np.sum(mse_grad, axis=0)\n",
" \n",
" # Compute loss\n",
" ce_loss = cross_entropy_loss(logits, y)\n",
" mse_loss_val = mse_loss(reconstructed, X)\n",
" \n",
" return ce_loss, mse_loss_val\n",
"\n",
"def evaluate(model, X, y):\n",
" logits, reconstructed = model.forward(X)\n",
" ce_loss = cross_entropy_loss(logits, y)\n",
" mse_loss_val = mse_loss(reconstructed, X)\n",
" accuracy = np.mean(np.argmax(logits, axis=1) == y)\n",
" return ce_loss, mse_loss_val, accuracy"
]
},
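{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Gradient check (illustrative): compare cross_entropy_gradient against a\n",
"# central finite difference of cross_entropy_loss on a tiny random batch;\n",
"# the max absolute difference should be on the order of 1e-8 or smaller\n",
"rng = np.random.default_rng(0)\n",
"logits_chk = rng.normal(size=(3, 5))\n",
"labels_chk = np.array([0, 2, 4])\n",
"eps = 1e-6\n",
"num_grad = np.zeros_like(logits_chk)\n",
"for i in range(logits_chk.shape[0]):\n",
"    for j in range(logits_chk.shape[1]):\n",
"        plus, minus = logits_chk.copy(), logits_chk.copy()\n",
"        plus[i, j] += eps\n",
"        minus[i, j] -= eps\n",
"        num_grad[i, j] = (cross_entropy_loss(plus, labels_chk) - cross_entropy_loss(minus, labels_chk)) / (2 * eps)\n",
"print(np.max(np.abs(num_grad - cross_entropy_gradient(logits_chk, labels_chk))))"
]
},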
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"def show_image_comparison(original, reconstructed, label, prediction):\n",
" \"\"\"\n",
" Display the original and reconstructed images side by side.\n",
" \n",
" :param original: Original image (1D tensor of 1024 elements)\n",
" :param reconstructed: Reconstructed image (1D tensor of 1024 elements)\n",
" :param label: True label of the image\n",
" :param prediction: Predicted label of the image\n",
" \"\"\"\n",
" # Convert to numpy arrays and move to CPU if they're on GPU\n",
" original = original.cpu().numpy()\n",
" reconstructed = reconstructed.cpu().numpy()\n",
" \n",
" # Reshape the 1D arrays to 32x32 images\n",
" original_img = original.reshape(32, 32)\n",
" reconstructed_img = reconstructed.reshape(32, 32)\n",
" \n",
" # Create a figure with two subplots side by side\n",
" fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(10, 5))\n",
" \n",
" # Show original image\n",
" ax1.imshow(original_img, cmap='gray')\n",
" ax1.set_title(f'Original (Label: {label})')\n",
" ax1.axis('off')\n",
" \n",
" # Show reconstructed image\n",
" ax2.imshow(reconstructed_img, cmap='gray')\n",
" ax2.set_title(f'Reconstructed (Predicted: {prediction})')\n",
" ax2.axis('off')\n",
" \n",
" plt.tight_layout()\n",
" plt.show()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"input_size = X_train.shape[1]\n",
"num_classes = len(np.unique(y))\n",
"model = SemanticsMLP(input_size=input_size, hidden_sizes=[512, 256, 128], num_classes=num_classes)"
]
},
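{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parameter count (illustrative): total size of all weight and bias arrays\n",
"n_params = sum(w.size + b.size for w, b in zip(model.encoder_weights, model.encoder_biases))\n",
"n_params += model.classifier_weight.size + model.classifier_bias.size\n",
"n_params += sum(w.size + b.size for w, b in zip(model.decoder_weights, model.decoder_biases))\n",
"print(f\"Total parameters: {n_params:,}\")"
]
},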
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"ename": "ValueError",
"evalue": "operands could not be broadcast together with shapes (512,1024) (128,1024) (512,1024) ",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mValueError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[9], line 17\u001b[0m\n\u001b[1;32m 14\u001b[0m X_batch \u001b[38;5;241m=\u001b[39m X_train[i:i\u001b[38;5;241m+\u001b[39mbatch_size]\n\u001b[1;32m 15\u001b[0m y_batch \u001b[38;5;241m=\u001b[39m y_train[i:i\u001b[38;5;241m+\u001b[39mbatch_size]\n\u001b[0;32m---> 17\u001b[0m ce_loss, mse_loss_val \u001b[38;5;241m=\u001b[39m \u001b[43mtrain_step\u001b[49m\u001b[43m(\u001b[49m\u001b[43mmodel\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mX_batch\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43my_batch\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mlearning_rate\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 19\u001b[0m \u001b[38;5;66;03m# Evaluate on validation set (using test set as validation for simplicity)\u001b[39;00m\n\u001b[1;32m 20\u001b[0m val_ce_loss, val_mse_loss, val_accuracy \u001b[38;5;241m=\u001b[39m evaluate(model, X_test, y_test)\n",
"Cell \u001b[0;32mIn[6], line 41\u001b[0m, in \u001b[0;36mtrain_step\u001b[0;34m(model, X, y, learning_rate)\u001b[0m\n\u001b[1;32m 38\u001b[0m model\u001b[38;5;241m.\u001b[39mclassifier_bias \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m=\u001b[39m learning_rate \u001b[38;5;241m*\u001b[39m np\u001b[38;5;241m.\u001b[39msum(ce_grad, axis\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m0\u001b[39m)\n\u001b[1;32m 40\u001b[0m \u001b[38;5;66;03m# Update decoder (last layer only for simplicity)\u001b[39;00m\n\u001b[0;32m---> 41\u001b[0m \u001b[43mmodel\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdecoder_weights\u001b[49m\u001b[43m[\u001b[49m\u001b[38;5;241;43m-\u001b[39;49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m]\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m-\u001b[39;49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43m \u001b[49m\u001b[43mlearning_rate\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43m \u001b[49m\u001b[43mnp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdot\u001b[49m\u001b[43m(\u001b[49m\u001b[43mencoded\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mT\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mmse_grad\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 42\u001b[0m model\u001b[38;5;241m.\u001b[39mdecoder_biases[\u001b[38;5;241m-\u001b[39m\u001b[38;5;241m1\u001b[39m] \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m=\u001b[39m learning_rate \u001b[38;5;241m*\u001b[39m np\u001b[38;5;241m.\u001b[39msum(mse_grad, axis\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m0\u001b[39m)\n\u001b[1;32m 44\u001b[0m \u001b[38;5;66;03m# Compute loss\u001b[39;00m\n",
"\u001b[0;31mValueError\u001b[0m: operands could not be broadcast together with shapes (512,1024) (128,1024) (512,1024) "
]
}
],
"source": [
"epochs = 100\n",
"batch_size = 32\n",
"learning_rate = 0.001\n",
"\n",
"for epoch in range(epochs):\n",
" # Shuffle the training data\n",
" indices = np.arange(X_train.shape[0])\n",
" np.random.shuffle(indices)\n",
" X_train = X_train[indices]\n",
" y_train = y_train[indices]\n",
" \n",
" # Mini-batch training\n",
" for i in range(0, X_train.shape[0], batch_size):\n",
" X_batch = X_train[i:i+batch_size]\n",
" y_batch = y_train[i:i+batch_size]\n",
" \n",
" try:\n",
" ce_loss, mse_loss_val = train_step(model, X_batch, y_batch, learning_rate)\n",
" except ValueError as e:\n",
" print(f\"Error in batch {i // batch_size}: {e}\")\n",
" print(f\"X_batch shape: {X_batch.shape}\")\n",
" print(f\"y_batch shape: {y_batch.shape}\")\n",
" raise\n",
" \n",
" # Evaluate on validation set\n",
" val_ce_loss, val_mse_loss, val_accuracy = evaluate(model, X_val, y_val)\n",
" \n",
" if epoch % 10 == 0:\n",
" print(f\"Epoch {epoch}, Val CE Loss: {val_ce_loss:.4f}, Val MSE Loss: {val_mse_loss:.4f}, Val Accuracy: {val_accuracy:.4f}\")\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"test_ce_loss, test_mse_loss, test_accuracy = evaluate(model, X_test, y_test)\n",
"print(f\"Final Test CE Loss: {test_ce_loss:.4f}, Test MSE Loss: {test_mse_loss:.4f}, Test Accuracy: {test_accuracy:.4f}\")"
]
},
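{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Illustrative use of show_image_comparison, assuming each 1024-dim row is a\n",
"# flattened 32x32 grayscale image (as the helper's reshape implies)\n",
"idx = 0\n",
"logits, recon = model.forward(X_test[idx:idx + 1])\n",
"pred = int(np.argmax(logits, axis=1)[0])\n",
"show_image_comparison(X_test[idx], recon[0], y_test[idx], pred)"
]
}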
],
"metadata": {
"kernelspec": {
"display_name": "semantics",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}