#Writer: Md Shamimul Islam
import cv2
import numpy as np
import os
import os.path
import glob
import time
from matplotlib import pyplot as plt
import mediapipe as mp

print(os.path.abspath("."))
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities
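# Note: the Holistic solution runs MediaPipe's pose, face and both-hand landmark models together on
# every frame; drawing_utils is only used here to overlay those detected landmarks for visualization.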
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # COLOR CONVERSION BGR 2 RGB
    image.flags.writeable = False                   # Image is no longer writeable
    results = model.process(image)                  # Make prediction
    image.flags.writeable = True                    # Image is now writeable
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # COLOR CONVERSION RGB 2 BGR
    return image, results
def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS)       # Draw face connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)        # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)   # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)  # Draw right hand connections
def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                              mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                              mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                              )
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                              )
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                              )
    # Draw right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
                              mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                              )
"""
cap = cv2.VideoCapture(0)
# Set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        # Read feed
        ret, frame = cap.read()
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        # Draw landmarks
        draw_styled_landmarks(image, results)
        # Show to screen
        cv2.imshow('OpenCV Feed', image)
        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

#draw_landmarks(frame, results)
#plt.imshow(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

#### Extract Keypoint Values
#print(len(results.left_hand_landmarks.landmark))
pose = []
for res in results.pose_landmarks.landmark:
    test = np.array([res.x, res.y, res.z, res.visibility])
    pose.append(test)

pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(132)
face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(1404)
lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
"""
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])
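# Per frame this concatenation gives 33*4 + 468*3 + 21*3 + 21*3 = 1662 values,
# which is exactly the feature size the LSTM below expects (input_shape=(30, 1662)).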
#result_test = extract_keypoints(results)
#print(result_test)
#np.save('0', result_test)
def get_video_parts(video_path):
    """Given a full path to a video, return its parts."""
    parts = video_path.split(os.path.sep)
    filename = parts[2]
    filename_no_ext = filename.split('.')[0]
    classname = parts[1]
    train_or_test = parts[0]
    return train_or_test, classname, filename_no_ext, filename
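# Example (hypothetical clip path): 'data/33_onno/vid01.mp4' splits into
# train_or_test='data', classname='33_onno', filename_no_ext='vid01', filename='vid01.mp4'.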
#np.load('0.npy')
DATA_PATH = os.path.join('data')
data_file = []
classes = np.array(['33_onno', '34_sobai','35_sabdhane'])
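# Each class name is expected to be a sub-folder of DATA_PATH holding that sign's .mp4 clips
# (and, after keypoint extraction, one sub-folder of .npy files per clip).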
sequence_length = 50
sequence = 0
s = 0
i = 0
j = 0
"""
for folder in classes:
    print(folder)
    class_classes = glob.glob(os.path.join(DATA_PATH, folder, '*'))
    print(class_classes)
"""
"""
# Set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    for vid_class in classes:
        print(vid_class)
        class_files = glob.glob(os.path.join(DATA_PATH, vid_class, '*.mp4'))
        print(class_files)
        i += 1
        for video_path in class_files:
            print(video_path)
            j += 1
            cap = cv2.VideoCapture(video_path)
            property_id = int(cv2.CAP_PROP_FRAME_COUNT)
            length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            print("The total frame number is {0}".format(str(length)))
            print("The property_id number is {0}".format(str(property_id)))

            # Get the parts of the file.
            video_parts = get_video_parts(video_path)
            #print(video_parts)
            train_or_test, classname, filename_no_ext, filename = video_parts
            #print(classname)
            #print(filename_no_ext)
            #print(filename)
            sequence += 1

            #while cap.isOpened():
            for frame_num in range(length):
                # Read feed
                ret, frame = cap.read()
                print(frame)
                s += 1
                print(i)
                print(j)
                print(s)
                print("################# Done ########################################")
                #print(frame)
                image, results = mediapipe_detection(frame, holistic)

                # NEW Apply wait logic
                if frame_num == 0:
                    cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(filename, sequence), (15,12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(500)
                else:
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(filename, sequence), (15,12),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)

                # NEW Export keypoints
                keypoints = extract_keypoints(results)
                print(keypoints)
                npy_path = os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext, str(frame_num))
                if not os.path.exists(os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext)):
                    os.mkdir(os.path.join(os.path.abspath("."), DATA_PATH, classname, filename_no_ext))
                np.save(npy_path, keypoints)

            cap.release()
            cv2.destroyAllWindows()
"""
####################### Pre-processing #############################################
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

sequence_length = 30
lebels = ['33_onno', '34_sobai', '35_sabdhane']
label_map = {label: num for num, label in enumerate(lebels)}
print(label_map)
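# Expected mapping: {'33_onno': 0, '34_sobai': 1, '35_sabdhane': 2}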
step_size = 5
sequences, labels = [], []
for lebel in lebels:
    class_files = glob.glob(os.path.join(DATA_PATH, lebel, '*.mp4'))
    for class_file in class_files:
        video_parts = get_video_parts(class_file)
        train_or_test, classname, filename_no_ext, filename = video_parts
        generated_files = glob.glob(os.path.join(DATA_PATH, lebel, filename_no_ext, '*.npy'))
        #print(generated_files)
        #print(generated_files[5])
        total_npy = len(generated_files)
        print("The total_npy number is {0}".format(str(total_npy)))
        j = 0
        i = 0
        step_size = int(((total_npy - 30) + 5) / 5)
        print(step_size)
        #window = []
        print("The Folder name is " + filename_no_ext)
        while i < step_size:
            window = []
            for frame_num in range(j, j+30):
                #print(frame_num)
                if j+30 <= total_npy:
                    print("The frame number is {0}".format(str(frame_num)))
                    res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num)))
                    window.append(res)
            i += 1
            j += 5
            print(j)
            #generated_files = generated_files[i * 5]
            sequences.append(window)
            labels.append(label_map[lebel])
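# Sliding-window note: each video yields step_size = ((total_npy - 30) + 5) // 5 overlapping
# 30-frame windows taken with a stride of 5 frames, so with this step_size every window stays
# fully inside the saved frames and the j+30 <= total_npy guard never has to drop anything.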
"""
for frame_num in range(j, j+30):
    #print(frame_num)
    if j+30 < total_npy:
        print("The frame number is {0}".format(str(frame_num)))
        res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num)))
        window.append(res)
#i += 1
#j += 5
print(j)
#generated_files = generated_files[i * 5]
sequences.append(window)
labels.append(label_map[lebel])
j += 5
i += 1

if i < step_size:
    for frame_num in range(j, j+30):
        #print(frame_num)
        if j+30 < total_npy:
            print("The frame number is {0}".format(str(frame_num)))
            res = np.load(os.path.join(DATA_PATH, lebel, filename_no_ext, "{}.npy".format(frame_num)))
            window.append(res)
        else:
            break
    #i += 1
    #j += 5
    print(j)
    #generated_files = generated_files[i * 5]
    sequences.append(window)
    labels.append(label_map[lebel])
    j += 5
    i += 1
"""
print(np.array(sequences).shape)
print(np.array(labels).shape)
X = np.array(sequences)
print(X)
print(X.shape)
y = to_categorical(labels).astype(int)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05)
print(y_test.shape)
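# X stacks the windows into shape (num_windows, 30, 1662); y is one-hot encoded with shape
# (num_windows, 3), and test_size=0.05 holds out roughly 5% of the windows for evaluation.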
############# Build and Train LSTM Neural Network ###################################
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import TensorBoard
log_dir = os.path.join('Logs')
tb_callback = TensorBoard(log_dir=log_dir)
model = Sequential()
model.add(LSTM(64, return_sequences=True, activation='relu', input_shape=(30,1662)))
model.add(LSTM(128, return_sequences=True, activation='relu'))
model.add(LSTM(64, return_sequences=False, activation='relu'))
model.add(Dense(64, activation='relu'))
model.add(Dense(32, activation='relu'))
model.add(Dense(classes.shape[0], activation='softmax'))
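# Output layer: one softmax unit per entry in `classes` (3 signs here), matching the one-hot targets.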
model.compile(optimizer='Adam', loss='categorical_crossentropy', metrics=['categorical_accuracy'])
model.fit(X_train, y_train, epochs=2000, callbacks=[tb_callback])
print(model.summary())
#### Make Predictions ################
res = model.predict(X_test)
print(classes[np.argmax(res[4])])
print(classes[np.argmax(y_test[4])])
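# Each row of res is a probability distribution over the 3 classes; np.argmax picks the most likely
# one, so these two prints compare the predicted and true sign for test sample index 4.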
#### Save Weights ##########
model.save('action.h5')
#del model
model.load_weights('action.h5')
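# model.save('action.h5') stores the full model (architecture + weights); load_weights here only
# restores the weights into the model built above. A minimal sketch for reloading the whole model
# from the same file instead (not run here):
#   from tensorflow.keras.models import load_model
#   model = load_model('action.h5')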
################### Evaluation using Confusion Matrix and Accuracy ################
from sklearn.metrics import multilabel_confusion_matrix, accuracy_score
yhat = model.predict(X_test)
ytrue = np.argmax(y_test, axis=1).tolist()
yhat = np.argmax(yhat, axis=1).tolist()
print(multilabel_confusion_matrix(ytrue, yhat))
print(accuracy_score(ytrue, yhat))
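# multilabel_confusion_matrix returns one 2x2 (one-vs-rest) matrix per class, and accuracy_score
# gives the fraction of held-out windows whose predicted class matches the true class.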
"""
############### Test in Real Time ###########################################
from scipy import stats

colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
    return output_frame

plt.figure(figsize=(18,18))
plt.imshow(prob_viz(res, lebels, image, colors))

# 1. New detection variables
sequence = []
sentence = []
predictions = []
threshold = 0.5

cap = cv2.VideoCapture(0)
# Set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        # Read feed
        ret, frame = cap.read()
        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        # Draw landmarks
        draw_styled_landmarks(image, results)

        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        if len(sequence) == 30:
            res = model.predict(np.expand_dims(sequence, axis=0))[0]
            print(lebels[np.argmax(res)])
            predictions.append(np.argmax(res))

            #3. Viz logic
            if np.unique(predictions[-10:])[0] == np.argmax(res):
                if res[np.argmax(res)] > threshold:
                    if len(sentence) > 0:
                        if lebels[np.argmax(res)] != sentence[-1]:
                            sentence.append(lebels[np.argmax(res)])
                    else:
                        sentence.append(lebels[np.argmax(res)])
            if len(sentence) > 5:
                sentence = sentence[-5:]

            # Viz probabilities
            image = prob_viz(res, lebels, image, colors)

        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (3,30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()
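# Summary of the real-time loop above: the most recent 30 keypoint vectors form the model input;
# a word is appended to `sentence` only when the current prediction agrees with the recent
# predictions (the np.unique check) and its probability exceeds the 0.5 threshold, and only the
# last 5 words are kept for on-screen display.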
"""