Class          Precision  Recall  F1-Score  Support
Bhujangasana        1.00    1.00      1.00     2108
Padmasana           1.00    1.00      1.00     2074
Shavasana           1.00    1.00      1.00     1846
Tadasana            0.99    0.99      0.99     2000
Trikonasana         1.00    1.00      1.00     1866
Vrikshasana         0.99    0.98      0.99     2102

Accuracy                              0.99    11996
Macro Avg           0.99    0.99      0.99    11996
Weighted Avg        0.99    0.99      0.99    11996
Epoch  Train Loss  Train Accuracy  Validation Loss  Validation Accuracy  Validation F1
   36      0.0004          0.9999           0.0198               0.9941         0.9941
   37      0.0003          0.9999           0.0178               0.9945         0.9945
   38      0.0003          0.9998           0.0211               0.9939         0.9939
   39      0.0035          0.9988           0.0307               0.9914         0.9914
   40      0.0024          0.9992           0.0231               0.9951         0.9951
   41      0.0008          0.9997           0.0213               0.9951         0.9951
   42      0.0003          0.9999           0.0253               0.9947         0.9947
   43      0.0014          0.9995           0.0463               0.9882         0.9882
   44      0.0003          1.0000           0.0228               0.9942         0.9942
# -*- coding: utf-8 -*-
# Uninstalling all relevant libraries to ensure a clean state
!pip uninstall -y numpy mediapipe opencv-python tensorflow pandas
# Installing all necessary libraries together
# This helps pip resolve compatible versions
!pip install numpy mediapipe opencv-python tensorflow pandas
import kagglehub
# Download latest version
path = kagglehub.dataset_download("nandwalritik/yoga-pose-videos-dataset")
print("Path to dataset files:", path)
import os
import shutil
import re
def group_videos_by_pose(source_folder, destination_folder):
if not os.path.exists(source_folder):
print(f"Error: Source folder '{source_folder}' not found.")
return
# Create destination folder if it doesn't exist
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
print(f"Destination folder created: {destination_folder}")
print(f"Grouping videos in '{source_folder}' by pose...")
# List all files in the source folder
for item_name in os.listdir(source_folder):
item_path = os.path.join(source_folder, item_name)
# Process only video files
if os.path.isfile(item_path) and item_name.lower().endswith(".mp4"):
# Extract the pose name from the file name
# Assumes format: personname_posename.mp4 (separated by last underscore)
match = re.search(r'_([^_]+)\.mp4$', item_name, re.IGNORECASE)
if match:
pose_name = match.group(1)
# Create destination subfolder for the pose
destination_pose_folder = os.path.join(destination_folder, pose_name)
if not os.path.exists(destination_pose_folder):
os.makedirs(destination_pose_folder)
print(f"Created pose folder: {pose_name}/")
# Copy video file to the corresponding pose folder
destination_file_path = os.path.join(destination_pose_folder, item_name)
try:
shutil.copy2(item_path, destination_file_path) # Preserve metadata
print(f" '{item_name}' → '{pose_name}/'")
except Exception as e:
print(f" Error copying '{item_name}': {e}")
else:
print(f" Could not extract pose name from '{item_name}', skipped.")
elif os.path.isdir(item_path):
print(f" '{item_name}/' is a folder, skipped.")
else:
print(f" '{item_name}' is not a valid video file, skipped.")
print("
# Example usage:
# The source folder should be the extracted 'Yoga_Vid_Collected' folder from the ZIP file.
# Adjust the path depending on where you extracted the dataset.
# Your notebook variable 'extracted_video_dataset' may point to this structure.
# Example path: /content/extracted_dataset/Yoga_Vid_Collected
source_folder_path = os.path.join(path, "Yoga_Vid_Collected")
destination_folder_path = "/content/grouped_yoga_videos"
group_videos_by_pose(source_folder_path, destination_folder_path)
# You can print the folder structure afterward to verify:
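# A minimal verification sketch (assumes the grouped layout created above):
for root, dirs, files in os.walk(destination_folder_path):
    mp4_files = [f for f in files if f.lower().endswith(".mp4")]
    if mp4_files:
        print(f"{os.path.basename(root)}/: {len(mp4_files)} video(s)")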
# === 0. Setup and Imports ===
import os
import shutil
import zipfile
import csv
import re
import cv2
import numpy as np
import mediapipe as mp  # Required below for mp.solutions.pose
from google.colab import drive
pose_name = "Yoga_Vid_Collected"
video_folder_path = f"/content/drive/MyDrive/dataset2/{pose_name}.zip"
extracted_video_dataset = "/content/grouped_yoga_videos"
extracted_csv_dataset = "/content/extracted_csv_dataset"
drive_hedef_yolu = "MyDrive/Csv_Dataset_Mediapipe"  # Turkish identifier kept as-is; means "drive target path"
zip_adı = pose_name  # Turkish identifier kept as-is; means "zip name"
# === 3. Keypoint Extraction with MediaPipe ===
mp_pose = mp.solutions.pose
pose_model = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5)
def extract_mediapipe_keypoints(frame):
image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = pose_model.process(image_rgb)
if not results.pose_landmarks:
return None
keypoints = []
for lm in results.pose_landmarks.landmark:
keypoints.extend([lm.x, lm.y, lm.visibility])
return keypoints # 33 keypoints * 3 values (x, y, visibility) = 99 values
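# Illustrative usage sketch (the video path below is a placeholder, not from
# the original notebook):
# cap = cv2.VideoCapture("/content/grouped_yoga_videos/Tadasana/sample.mp4")
# ok, frame = cap.read()
# if ok:
#     kp = extract_mediapipe_keypoints(frame)
#     print(len(kp) if kp else "No pose detected")  # expect 99 values when a pose is found
# cap.release()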
# === 4. Convert a Single Video to CSV ===
def video_to_csv_mediapipe(video_path, csv_path):
cap = cv2.VideoCapture(video_path)
frame_index = 0
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # fall back to 30 FPS if metadata is missing, to avoid division by zero
with open(csv_path, mode="w", newline="", encoding="utf-8") as csv_file:
fieldnames = ["timestamp_sec"] + [f"{coord}{i}" for i in range(1, 34) for coord in ['x', 'y', 'v']]
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
while True:
ret, frame = cap.read()
if not ret:
break
timestamp = frame_index / fps
keypoints = extract_mediapipe_keypoints(frame)
if keypoints is None:
frame_index += 1
continue
row = {"timestamp_sec": round(timestamp, 2)}
for i, val in enumerate(keypoints):
row[fieldnames[i + 1]] = val
writer.writerow(row)
frame_index += 1
cap.release()
print(f"
# === 5. Process All Videos ===
def is_video_file(filename):
return filename.lower().endswith(".mp4")
def process_all_videos_mediapipe(root_path):
for folder in os.listdir(root_path):
folder_path = os.path.join(root_path, folder)
if not os.path.isdir(folder_path):
continue
for video_file in os.listdir(folder_path):
if is_video_file(video_file):
video_path = os.path.join(folder_path, video_file)
video_id = os.path.splitext(video_file)[0]
csv_path = os.path.join(folder_path, f"{video_id}.csv")
print(f"
video_to_csv_mediapipe(video_path, csv_path)
process_all_videos_mediapipe(extracted_video_dataset)
# === 6. Move CSV Files into csv/ Subfolder ===
def move_csv_files_to_subfolder(root_folder):
for yoga_folder in os.listdir(root_folder):
folder_path = os.path.join(root_folder, yoga_folder)
if not os.path.isdir(folder_path):
continue
csv_subfolder_path = os.path.join(folder_path, "csv")
os.makedirs(csv_subfolder_path, exist_ok=True)
for file in os.listdir(folder_path):
if file.endswith(".csv"):
shutil.move(os.path.join(folder_path, file), os.path.join(csv_subfolder_path, file))
move_csv_files_to_subfolder(extracted_video_dataset)
# === 7. Copy CSV Files to Upper-Level Folder ===
def copy_csv_files_to_upper_folder(source_root_folder, target_root_folder):
if not os.path.exists(target_root_folder):
os.makedirs(target_root_folder)
for pose_folder_name in os.listdir(source_root_folder):
pose_folder_path = os.path.join(source_root_folder, pose_folder_name)
csv_subfolder = os.path.join(pose_folder_path, "csv")
if os.path.isdir(csv_subfolder):
target_pose_folder = os.path.join(target_root_folder, pose_folder_name)
os.makedirs(target_pose_folder, exist_ok=True)
for file in os.listdir(csv_subfolder):
if file.endswith(".csv"):
shutil.copy2(os.path.join(csv_subfolder, file), os.path.join(target_pose_folder, file))
copy_csv_files_to_upper_folder(extracted_video_dataset, extracted_csv_dataset)
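# Sanity-check sketch (not part of the original pipeline): count the CSV files
# that ended up in each pose folder.
for pose in sorted(os.listdir(extracted_csv_dataset)):
    pose_dir = os.path.join(extracted_csv_dataset, pose)
    if os.path.isdir(pose_dir):
        csv_count = len([f for f in os.listdir(pose_dir) if f.endswith(".csv")])
        print(f"{pose}: {csv_count} CSV file(s)")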
# -*- coding: utf-8 -*-
"""TezWoA02_TrainModel_MediaPipe.ipynb"""
# === 1. Libraries and Helper Functions ===
import os
import zipfile
from shutil import rmtree
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
# Path to the folder containing extracted CSV files
extract_csv_dataset = "/content/dataset"
# === 3. Dataset Preparation ===
def load_sequences_mediapipe(base_path, test_size=0.40, window_size=50, step=10):
person_data = {}
label_map = {}
current_label = 0
for pose_folder in sorted(os.listdir(base_path)):
pose_path = os.path.join(base_path, pose_folder)
if not os.path.isdir(pose_path):
continue
if pose_folder not in label_map:
label_map[pose_folder] = current_label
current_label += 1
for file in os.listdir(pose_path):
            if not file.endswith(".csv") or file.endswith("_genelEvre.csv"):  # skip non-CSV and "_genelEvre" ("general phase") files
continue
person_name = file.split("_")[0]
csv_path = os.path.join(pose_path, file)
person_data.setdefault(person_name, []).append((pose_folder, csv_path))
train_people, test_people = train_test_split(sorted(person_data), test_size=test_size, random_state=42)
def augment_sequence(window, noise_std=0.005, time_shift_max=3):
# Add Gaussian noise
noise = np.random.normal(0, noise_std, window.shape)
window_noisy = window + noise
# Apply time shifting
shift = np.random.randint(-time_shift_max, time_shift_max + 1)
if shift > 0:
window_shifted = np.pad(window_noisy, ((shift, 0), (0, 0)), mode='edge')[:-shift]
elif shift < 0:
window_shifted = np.pad(window_noisy, ((0, -shift), (0, 0)), mode='edge')[-shift:]
else:
window_shifted = window_noisy
return window_shifted.astype(np.float32)
def spatial_scaling(window, scale_range=(0.9, 1.1)):
scale = np.random.uniform(*scale_range)
return (window * scale).astype(np.float32)
def mirror_pose(window):
        # Mirror the pose horizontally: MediaPipe x-coordinates are normalized to [0, 1], so new_x = 1 - x
flipped = window.copy()
flipped[:, ::2] = 1.0 - flipped[:, ::2]
return flipped.astype(np.float32)
def create_dataset(people, augment=False):
        X, y = [], []
coord_columns = [f"{axis}{i}" for i in range(1, 34) for axis in ['x', 'y']]
for person in people:
for pose_label, csv_file in person_data[person]:
df = pd.read_csv(csv_file)
if not set(coord_columns).issubset(df.columns):
continue
features = df[coord_columns].to_numpy(dtype=np.float32)
if np.isnan(features).any() or np.isinf(features).any():
continue
for start in range(0, features.shape[0] - window_size + 1, step):
window = features[start:start + window_size]
if window.shape[0] == window_size:
X.append(window)
y.append(label_map[pose_label])
# Optional: add scaled version of the window
# X.append(spatial_scaling(window))
# y.append(label_map[pose_label])
# Add mirrored version of the window
X.append(mirror_pose(window))
y.append(label_map[pose_label])
# Add augmented version if enabled
if augment:
aug_window = augment_sequence(window)
X.append(aug_window)
y.append(label_map[pose_label])
return np.array(X), np.array(y)
X_train, y_train = create_dataset(train_people, augment=True)
X_test, y_test = create_dataset(test_people, augment=False)
return X_train, X_test, y_train, y_test, label_map
# Call the dataset loading function
X_train, X_test, y_train, y_test, label_map = load_sequences_mediapipe(extract_csv_dataset)
print("Training data:", X_train.shape)
print("Test data:", X_test.shape)
# === 4. PyTorch Dataset and DataLoader ===
import torch
from torch.utils.data import Dataset, DataLoader
class PoseSequenceDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.long)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# Create PyTorch datasets and loaders
train_dataset = PoseSequenceDataset(X_train, y_train)
test_dataset = PoseSequenceDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)
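# Quick shape check (illustrative): each batch should be
# (batch_size, window_size, 66), i.e. (32, 50, 66) with the settings above.
xb, yb = next(iter(train_loader))
print("Batch shapes:", xb.shape, yb.shape)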
# === 5. LSTM Model ===
import torch
import torch.nn as nn
class CNNPerFrame(nn.Module):
"""
A CNN block that processes a single frame of keypoints (33 keypoints × 2D coordinates).
"""
    def __init__(self, cnn_out_channels=32, kernel_size=3, dropout=0.2):
super(CNNPerFrame, self).__init__()
self.conv1 = nn.Conv1d(in_channels=2, out_channels=cnn_out_channels,
kernel_size=kernel_size, padding=kernel_size // 2)
self.relu = nn.ReLU()
self.bn = nn.BatchNorm1d(cnn_out_channels)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
# Input shape: (B*T, 66) → reshape to (B*T, 2, 33)
x = x.view(-1, 33, 2).permute(0, 2, 1) # Shape: (B*T, 2, 33)
x = self.conv1(x) # Shape: (B*T, cnn_out_channels, 33)
x = self.relu(x)
x = self.bn(x)
x = self.dropout(x)
return x.view(x.size(0), -1) # Flatten to shape: (B*T, cnn_out_channels * 33)
class CNNLSTM_YogaModel(nn.Module):
"""
A hybrid model that applies CNN to each frame and LSTM across time steps for yoga pose classification.
"""
def __init__(self, frame_cnn_out=32*33, lstm_hidden_size=128, num_classes=6):
super().__init__()
self.cnn_per_frame = CNNPerFrame()
self.lstm = nn.LSTM(input_size=frame_cnn_out,
hidden_size=lstm_hidden_size,
num_layers=1,
batch_first=True,
bidirectional=True)
self.fc = nn.Linear(lstm_hidden_size * 2, num_classes) # Bidirectional → 2×hidden_size
def forward(self, x):
B, T, C = x.size() # Input shape: (batch_size, time_steps, features)
x = x.view(B * T, C) # Flatten to (B*T, features) for CNN
cnn_features = self.cnn_per_frame(x) # Apply CNN → (B*T, cnn_feature_size)
cnn_features = cnn_features.view(B, T, -1) # Reshape back to (B, T, cnn_feature_size)
lstm_out, _ = self.lstm(cnn_features) # LSTM output shape: (B, T, 2*hidden_size)
out = self.fc(lstm_out[:, -1, :]) # Use the last time step: (B, num_classes)
return out
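# Smoke-test sketch (illustrative, not part of the training pipeline): a dummy
# batch of 4 sequences, 50 frames each, 66 features per frame should produce
# logits of shape (4, 6).
_demo_model = CNNLSTM_YogaModel()
print(_demo_model(torch.randn(4, 50, 66)).shape)  # torch.Size([4, 6])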
# === 6. Training Preparation ===
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import accuracy_score, f1_score
import torch.nn.functional as F
# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Compute class weights to handle class imbalance
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(y_train), y=y_train)
class_weights_tensor = torch.tensor(class_weights, dtype=torch.float32).to(device)
# Initialize the model
model = CNNLSTM_YogaModel(
frame_cnn_out=32 * 33, # Output size of CNN per frame (cnn_out_channels × number of keypoints)
lstm_hidden_size=128, # Hidden size of the LSTM
num_classes=6 # Number of yoga pose classes
).to(device)
# Define loss function with class weights and optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
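# Optional idea (not used for the results above): a plateau-based LR scheduler
# is one common way to damp late-epoch oscillation in the validation loss.
# A minimal sketch; it would need scheduler.step(val_loss) after each
# validation pass inside the epoch loop:
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)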
# === 7. Training Loop ===
num_epochs = 75
patience = 10
best_val_loss = float('inf')
epochs_no_improve = 0
early_stop = False
train_losses, val_losses = [], []
train_accuracies, val_accuracies, val_f1_scores = [], [], []
for epoch in range(num_epochs):
if early_stop:
break
model.train()
train_loss = 0
train_preds, train_labels = [], []
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
preds = torch.argmax(outputs, dim=1)
train_preds.extend(preds.cpu().numpy())
train_labels.extend(labels.cpu().numpy())
train_acc = accuracy_score(train_labels, train_preds)
# Validation
model.eval()
val_loss = 0
val_preds, val_labels = [], []
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
loss = criterion(outputs, labels)
val_loss += loss.item()
preds = torch.argmax(outputs, dim=1)
val_preds.extend(preds.cpu().numpy())
val_labels.extend(labels.cpu().numpy())
val_acc = accuracy_score(val_labels, val_preds)
val_f1 = f1_score(val_labels, val_preds, average='weighted')
# Logging
train_losses.append(train_loss / len(train_loader))
val_losses.append(val_loss / len(test_loader))
train_accuracies.append(train_acc)
val_accuracies.append(val_acc)
val_f1_scores.append(val_f1)
if val_loss < best_val_loss:
best_val_loss = val_loss
epochs_no_improve = 0
torch.save(model.state_dict(), "best_model.pt")
else:
epochs_no_improve += 1
if epochs_no_improve >= patience:
early_stop = True
print(f"Epoch {epoch+1}/{num_epochs} - loss: {train_losses[-1]:.4f} - acc: {train_acc:.4f} - "
f"val_loss: {val_losses[-1]:.4f} - val_acc: {val_acc:.4f} - val_f1: {val_f1:.4f}")
# === 8. Evaluation and Model Saving ===
from sklearn.metrics import classification_report
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
for inputs, labels in test_loader:
inputs = inputs.to(device)
outputs = model(inputs)
preds = torch.argmax(outputs, dim=1).cpu().numpy()
all_preds.extend(preds)
all_labels.extend(labels.numpy())
print("Test Accuracy:", accuracy_score(all_labels, all_preds))
print(classification_report(all_labels, all_preds, target_names=label_map.keys()))
# Save the trained model
torch.save(model.state_dict(), "pose_lstm_model.pt")
print("
# === Loss and Accuracy Plots ===
import matplotlib.pyplot as plt
epochs = range(1, len(train_losses) + 1)
plt.figure(figsize=(14, 5))
# --- LOSS PLOT ---
plt.subplot(1, 2, 1)
plt.plot(epochs, train_losses, label="Train Loss")
plt.plot(epochs, val_losses, label="Validation Loss")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Loss Curve")
plt.legend()
plt.grid(True)
# --- ACCURACY PLOT ---
plt.subplot(1, 2, 2)
plt.plot(epochs, train_accuracies, label="Train Accuracy")
plt.plot(epochs, val_accuracies, label="Validation Accuracy")
plt.xlabel("Epoch")
plt.ylabel("Accuracy")
plt.title("Accuracy Curve")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# === Confusion Matrix ===
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Compute confusion matrix
cm = confusion_matrix(all_labels, all_preds)
# Class label names
class_names = list(label_map.keys())
# Display
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_names)
fig, ax = plt.subplots(figsize=(10, 10))
disp.plot(ax=ax, cmap=plt.cm.Blues, xticks_rotation=45)
plt.title("Confusion Matrix")
plt.grid(False)
plt.show()
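# Extra diagnostic sketch (not in the original notebook): a moving average of
# the validation loss makes it easier to judge whether the epoch-to-epoch
# fluctuation is noise around an otherwise flat trend. The 5-epoch window is
# an arbitrary choice.
smooth_window = 5
smoothed = np.convolve(val_losses, np.ones(smooth_window) / smooth_window, mode='valid')
plt.figure(figsize=(7, 4))
plt.plot(range(1, len(val_losses) + 1), val_losses, alpha=0.4, label="Validation Loss (raw)")
plt.plot(range(smooth_window, len(val_losses) + 1), smoothed, label=f"{smooth_window}-epoch moving average")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.grid(True)
plt.show()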
I am training a CNN+BiLSTM model in PyTorch to classify yoga poses from the 2D (x, y) coordinates of 33 keypoints extracted with MediaPipe. Each frame is passed through a 1D CNN to extract spatial features, which are then fed into a BiLSTM layer for temporal modeling. The model reaches an excellent test F1-score of 0.99, but the training and validation loss/accuracy curves fluctuate noticeably from epoch to epoch. I use CrossEntropyLoss with class weights, the Adam optimizer (lr = 0.001), a batch size of 32, and early stopping (patience = 10). The dataset is split by subject (60% train / 40% test), and data augmentation (Gaussian noise, mirroring, time shifting) is applied only to the training set. Despite the close train/validation loss values and no clear signs of overfitting, the instability in the loss/accuracy plots makes me wonder whether this behavior is normal. Could it be caused by the model architecture, the augmentation strategy, or something else in my training loop?
More details here: https://stackoverflow.com/questions/796 ... h-f1-score