Last active
July 28, 2020 02:29
-
-
Save guoxiaolu/e5097c7e7828cdbcb7f116d4f748db14 to your computer and use it in GitHub Desktop.
高空抛物 drop judge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import cv2 | |
import numpy as np | |
from skimage.feature import blob_dog, blob_log, blob_doh | |
from math import sqrt | |
from time import time | |
from multiprocessing import Pool, Queue | |
def GetDiff(img0, img1, thresh_diff_pix):
    """Return a binary mask of pixels that differ noticeably between two images.

    The absolute per-channel difference is summed over the channel axis
    (axis 2), divided by 3 and by 255, and the result is thresholded with
    ``cv2.inRange`` against the interval ``[thresh_diff_pix, 1]``.

    Args:
        img0: First image, indexed as (row, column, channel).
        img1: Second image, same shape as ``img0``.
        thresh_diff_pix: Lower bound of the accepted normalised difference.

    Returns:
        The single-channel mask returned by ``cv2.inRange``.
    """
    img0 = np.asarray(img0)
    img1 = np.asarray(img1)
    # Unsigned integer frames wrap around on subtraction (10 - 20 -> 246 for
    # uint8), so promote anything that is not already floating point.
    # Floating-point inputs are left untouched.
    if not np.issubdtype(img0.dtype, np.floating):
        img0 = img0.astype(np.float32)
    if not np.issubdtype(img1.dtype, np.floating):
        img1 = img1.astype(np.float32)

    abs_diff = np.fabs(img0 - img1)
    mean_diff = np.sum(abs_diff, axis=2) / 3 / 255
    return cv2.inRange(mean_diff, thresh_diff_pix, 1)
def find_lines(params):
    """Extend each seed track through earlier frames along a straight line.

    For every seed, frames ``fid - 2, fid - 3, ...`` are visited in turn. In
    each frame the first blob that lies above the track's last point, within
    a tenth of the image size in both directions, and within about 15 degrees
    of the track's current direction is appended to the seed.

    Args:
        params: Tuple ``(fid, fblobs, seeds, ishape)``:
            fid: Id of the current frame.
            fblobs: Dict mapping a frame id to a list of blobs laid out as
                ``[y, x, ...]``.
            seeds: List of tracks; each track is a list of ``(x, y)`` points
                and must already hold at least two points.
            ishape: Image shape as ``(height, width, ...)``.

    Returns:
        The same ``seeds`` list; the tracks are extended in place.
    """
    fid, fblobs, seeds, ishape = params
    max_dy = ishape[0] // 10
    max_dx = ishape[1] // 10
    cnt = 0  # number of (seed, blob) candidate pairs examined
    for seed in seeds:
        for i in range(fid - 2, 0, -1):
            # Stop at the first frame without blobs, or once the look-back
            # distance exceeds 10 frames.
            if i not in fblobs or fid - 2 - i > 10:
                break
            for ifblob in fblobs[i]:
                cnt += 1
                bx, by = int(ifblob[1]), int(ifblob[0])
                last = seed[-1]
                dy = last[1] - by
                # The candidate must be strictly above the last point and
                # close to it both horizontally and vertically.
                if dy <= 0 or abs(last[0] - bx) >= max_dx or dy >= max_dy:
                    continue
                prev = seed[-2]
                dv = np.array([prev[0] - last[0], prev[1] - last[1]])
                idv = np.array([last[0] - bx, dy])
                denom = np.linalg.norm(dv) * np.linalg.norm(idv)
                if denom == 0:
                    # Degenerate track (two identical points): no direction.
                    continue
                # Rounding can push the ratio marginally outside [-1, 1], which
                # would turn arccos into NaN and reject a collinear point.
                cosangle = np.clip(dv.dot(idv) / denom, -1.0, 1.0)
                angle = np.arccos(cosangle)
                # 0.26 rad is about 15 degrees
                if angle < 0.26:
                    seed.append((bx, by))
                    # At most one blob per frame is added to a track.
                    break
    print(len(seeds), cnt)
    return seeds
class DropDetect:
    """Detect falling objects via background subtraction and blob tracking."""

    def __init__(self):
        # Running float32 sum of the frames given to FeedBackGround, or None.
        self.back_sum = None
        # Background estimate that GetDiff compares incoming frames against.
        self.back_avg = None
        # Last frame seen by GetDiff (float32); None before the first frame.
        self.last_img = None
        # Number of frames accumulated into back_sum.
        self.img_counter = 0
        # Threshold applied to the normalised per-pixel difference.
        self.thresh_diff_pix = 0.1
        # Kept from the original: a list holding the ``object`` type itself.
        self.list_mask_area = []
        self.list_mask_area.append(object)
        # Maps a frame id to the list of [y, x, r] blobs found in that frame.
        self.fblobs = {}

    def Pre(self, img):
        """Feed ``img`` into the running background estimate."""
        self.FeedBackGround(img)

    def FeedBackGround(self, img):
        """Add ``img`` to the running sum and refresh ``back_avg``.

        Args:
            img: Frame to accumulate; converted to float32 first.
        """
        frame = img.astype(np.float32)
        # ``is None`` rather than ``== None``: comparing an ndarray with ``==``
        # is element-wise and cannot be used as a truth value. The first frame
        # initialises the sum and must not be added a second time.
        if self.back_sum is None:
            self.back_sum = frame
        else:
            self.back_sum += frame
        self.img_counter += 1
        self.back_avg = self.back_sum / self.img_counter

    def SubBack(self, img):
        """Return the foreground mask of ``img`` against the fed background.

        The original left this method unfinished: it called ``np.zeros()``
        with no shape and returned nothing after the warm-up. The post-warm-up
        behaviour below follows what the GetDiff method does.

        Args:
            img: Frame indexed as (row, column, channel).

        Returns:
            An all-zero uint8 mask of shape (rows, columns) while fewer than
            100 frames have been accumulated; afterwards the thresholded
            difference between ``img`` and ``back_avg``.
        """
        h, w, _ = img.shape
        if self.img_counter < 100:
            return np.zeros((h, w), np.uint8)
        return GetDiff(img.astype(np.float32), self.back_avg, self.thresh_diff_pix)

    def GetBackGound(self, source, num_frames):
        """Average the leading frames of a video, each downscaled by 4.

        Args:
            source: Anything accepted by ``cv2.VideoCapture``.
            num_frames: Maximum number of frames to average; a negative value
                means every frame. At least one frame is always used.

        Returns:
            float32 array holding the mean of the frames that were read.

        Raises:
            IOError: If not even one frame can be read from ``source``.
        """
        vc = cv2.VideoCapture(source)
        try:
            res, img = vc.read()
            if not res:
                raise IOError("cannot read any frame from %r" % (source,))
            # temp: quarter-size frames; callers must scale frames the same way
            img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
            img_sum = img.astype(np.float32)
            counter = 1
            # ``counter < num_frames`` stops after exactly num_frames frames.
            while num_frames < 0 or counter < num_frames:
                res, img = vc.read()
                if not res:
                    break
                img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
                img_sum += img
                counter += 1
        finally:
            # Always give the capture handle back, even on failure.
            vc.release()
        return img_sum / counter

    def GetDiff(self, img, fid):
        """Find blobs in frame ``fid`` and look for a falling track.

        Args:
            img: Current frame, indexed as (row, column, channel).
            fid: Id of the current frame; consecutive frames are expected to
                use consecutive ids.

        Returns:
            Tuple ``(diff, blobs, is_drop, drop_seed)``:
                diff: Mask of pixels that differ from ``back_avg``.
                blobs: List of ``[y, x, r]`` for each accepted component,
                    where ``r`` is the distance from the centroid to the
                    top-left corner of the component's bounding box.
                is_drop: True when a track of more than 10 points was found.
                drop_seed: That track as a list of ``(x, y)`` points, or
                    ``[]`` when ``is_drop`` is False.
        """
        img_float = img.astype(np.float32)
        diff = GetDiff(img_float, self.back_avg, self.thresh_diff_pix)
        # The frame-to-frame difference was computed but never used, and its
        # guard compared an ndarray with ``[]``; only the bookkeeping is kept.
        self.last_img = img_float

        # Only the middle three fifths of the rows are searched for blobs.
        rows = diff.shape[0]
        top, bottom = rows // 5, rows // 5 * 4
        mask = np.zeros(diff.shape, np.uint8)
        mask[top:bottom, :] = diff[top:bottom, :]
        _, _, stats, centroids = cv2.connectedComponentsWithStats(mask)

        max_area = img.shape[0] * img.shape[1] / 10
        blobs = []
        for stat, centroid in zip(stats, centroids):
            area = stat[-1]
            # Drop specks and anything covering more than a tenth of the image.
            if area <= 10 or area > max_area:
                continue
            cx, cy = centroid.tolist()
            dx = cx - stat[0]
            dy = cy - stat[1]
            blobs.append([cy, cx, sqrt(dx * dx + dy * dy)])
        self.fblobs[fid] = blobs

        is_drop = False
        start = time()
        if blobs and fid - 1 in self.fblobs:
            max_dx = img.shape[1] // 10
            max_dy = img.shape[0] // 10
            # Seed a track from every (current, previous) blob pair in which
            # the current blob is lower in the image and close to the old one.
            seeds = []
            for blob in blobs:
                cur = (int(blob[1]), int(blob[0]))
                for lblob in self.fblobs[fid - 1]:
                    prev = (int(lblob[1]), int(lblob[0]))
                    fall = cur[1] - prev[1]
                    if 0 < fall < max_dy and abs(cur[0] - prev[0]) < max_dx:
                        seeds.append([cur, prev])
            # find_lines extends the seed tracks in place.
            find_lines((fid, self.fblobs, seeds, img.shape))
            for seed in seeds:
                if len(seed) > 10:
                    # to be add
                    return diff, blobs, True, seed
        end = time()
        print(end - start)
        return diff, blobs, is_drop, []
def testdrop(fn_v="/Users/guoxiaolu/work/Drop/2020_07_03_13_56_IMG_2005.avi",
             max_frames=100):
    """Run the drop detector over a video, preview it and save the result.

    The background is estimated from the video with ``GetBackGound``; each
    quarter-size frame then goes through ``DropDetect.GetDiff``. The annotated
    difference mask is shown in a window and written to
    ``./<video name>_predict.mp4``.

    Args:
        fn_v: Path of the input video.
        max_frames: Stop after this many frames; ``None`` processes the whole
            video.
    """
    dd = DropDetect()
    image_back = dd.GetBackGound(fn_v, 500)
    dd.back_avg = image_back

    stem = os.path.splitext(os.path.basename(fn_v))[0]
    vname = os.path.join('./', stem + '_predict.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'mpeg')

    vc = cv2.VideoCapture(fn_v)
    out = cv2.VideoWriter(vname, fourcc, 25, (image_back.shape[1], image_back.shape[0]))
    fid = 0
    start = time()
    try:
        while True:
            res, img = vc.read()
            if not res:
                break
            # temp: same quarter-size scaling as used for the background
            img = cv2.resize(img, (img.shape[1] // 4, img.shape[0] // 4))
            fid += 1
            if max_frames is not None and fid > max_frames:
                break

            diff, blobs, is_drop, drop_seed = dd.GetDiff(img, fid)
            vis = cv2.cvtColor(diff, cv2.COLOR_GRAY2BGR)
            for y, x, r in blobs:
                cv2.circle(vis, (int(x), int(y)), int(r), (255, 0, 0), 2)
            if is_drop:
                cv2.putText(vis, 'Drop!', (50, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 5)
                # Join every consecutive pair of track points; the original
                # index test left the final segment undrawn.
                for p0, p1 in zip(drop_seed, drop_seed[1:]):
                    cv2.line(vis, p0, p1, (0, 0, 255), 2)
            print(fid)
            cv2.imshow("img", img)
            cv2.imshow("diff", vis)
            cv2.waitKey(1)
            out.write(vis)
        print(time() - start)
    finally:
        # Release the reader, the writer and the preview windows on error too.
        vc.release()
        out.release()
        cv2.destroyAllWindows()
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    testdrop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment