Skip to content

Instantly share code, notes, and snippets.

@claudiofahey
Last active March 10, 2020 15:54
Show Gist options
  • Save claudiofahey/46303ab950f1246c5ab63c64b988dbee to your computer and use it in GitHub Desktop.
Save claudiofahey/46303ab950f1246c5ab63c64b988dbee to your computer and use it in GitHub Desktop.
Sample Python app for ingesting a video camera feed into Pravega
#!/usr/bin/env python3
"""
This script reads video from a camera and writes the frames to Pravega.
It also detects motion and can send alerts (via an HTTP call)
when motion is above a threshold.
It uses the Pravega GRPC Gateway (https://github.com/pravega/pravega-grpc-gateway).
"""
import cv2
import logging
import time
import grpc
import json
import base64
import argparse
import multiprocessing
import queue
import traceback
import numpy as np
import requests
import pravega.grpc_gateway as pravega
def video_capture_generator(vidcap, args):
while True:
pos_frames = vidcap.get(cv2.CAP_PROP_POS_FRAMES)
success, image = vidcap.read()
if success:
video_frame = dict(
image=image,
frameNumber=int(pos_frames),
timestamp=int(time.time() * 1000),
)
yield video_frame
def crop_image(img, top=0, bottom=100, left=90, right=235):
cropped = img[top:bottom,left:right,:]
return cropped
def image_pixel_distance(img1, img2):
"""outputs pythagorean distance between two frames"""
img1_32 = np.float32(img1) / 255.0
img2_32 = np.float32(img2) / 255.0
diff = img1_32 - img2_32
dist = np.sqrt(diff[:,:,0]**2 + diff[:,:,1]**2 + diff[:,:,2]**2) / np.sqrt(3.0)
return dist
def image_distance(img1, img2, blur=15, pixel_threshold=0.06, max_valid_distance=0.12):
img1 = crop_image(img1)
img2 = crop_image(img2)
pixel_dist = image_pixel_distance(img1, img2)
blurred_dist = cv2.GaussianBlur(pixel_dist, (blur,blur), 0)
_, thresh = cv2.threshold(blurred_dist, pixel_threshold, 1.0, 0)
distance = thresh.mean()
# If too many pixels changed, it is likely that the lighting changed.
# This should not be detected as motion.
if distance > max_valid_distance:
distance = 0.0 - distance
return distance
def goaway(args):
logging.error('#################### MOTION DETECTED ####################')
response = requests.post(args.goaway_url)
logging.info('goaway: response=%s' % str(response))
def inference_generator(video_frames, args):
prev_video_frame = None
for video_frame in video_frames:
if prev_video_frame is None:
video_frame['distance'] = 0.0
video_frame['has_motion'] = False
video_frame['alert'] = False
else:
distance = float(image_distance(prev_video_frame['image'], video_frame['image']))
has_motion = distance > args.motion_threshold
alert = (has_motion or distance < 0.0)
logging.info('run_inference_process: alert=%d, has_motion=%d, distance=%f',
alert, has_motion, distance)
video_frame['distance'] = distance
video_frame['has_motion'] = has_motion
video_frame['alert'] = alert
if has_motion:
goaway(args)
prev_video_frame = video_frame
yield video_frame
def events_to_write_generator(video_frame_iter, scope, stream, args):
for video_frame in video_frame_iter:
event_dict = video_frame.copy()
event_dict['camera'] = args.camera
event_dict['ssrc'] = 0
success, png_array = cv2.imencode('.png', video_frame['image'])
event_dict['data'] = base64.b64encode(png_array.tobytes()).decode(encoding='UTF-8')
del event_dict['image']
to_log_dict = event_dict.copy()
to_log_dict['data'] = '(%d bytes)' % len(event_dict['data'])
logging.info('events_to_write_generator: ' + json.dumps(to_log_dict))
event_json = json.dumps(event_dict)
event_bytes = event_json.encode(encoding='UTF-8')
event_to_write = pravega.pb.WriteEventsRequest(
scope=scope,
stream=stream,
event=event_bytes,
routing_key=str(args.camera),
)
yield event_to_write
def run_write_to_pravega_process(queue, scope, stream, args):
"""Read events from a queue and write them to Pravega."""
logging.info('run_write_to_pravega_process: BEGIN')
logging.info('run_write_to_pravega_process: args=%s' % str(args))
while True:
try:
with grpc.insecure_channel(args.gateway) as pravega_channel:
pravega_client = pravega.grpc.PravegaGatewayStub(pravega_channel)
if args.create_scope:
request = pravega.pb.CreateScopeRequest(scope=args.scope)
logging.info('run_write_to_pravega_process: CreateScope request=%s' % request)
response = pravega_client.CreateScope(request)
logging.info('run_write_to_pravega_process: CreateScope response=%s' % response)
if args.create_stream:
request = pravega.pb.CreateStreamRequest(
scope=scope,
stream=stream,
scaling_policy=pravega.pb.ScalingPolicy(min_num_segments=1),
)
logging.info('run_write_to_pravega_process: CreateStream request=%s' % request)
response = pravega_client.CreateStream(request)
logging.info('run_write_to_pravega_process: CreateStream response=%s' % response)
queue_iter = iter(queue.get, None)
write_response = pravega_client.WriteEvents(
events_to_write_generator(queue_iter, scope, stream, args))
logging.info('run_write_to_pravega_process: WriteEvents response=%s' % write_response)
except Exception as e:
logging.error('run_write_to_pravega_process: ' + traceback.format_exc())
time.sleep(5)
def main():
logging.basicConfig(level=logging.DEBUG)
logging.info('BEGIN')
parser = argparse.ArgumentParser()
parser.add_argument('--camera', type=int, default=0)
parser.add_argument('--no_create_scope', dest='create_scope', action='store_false')
parser.add_argument('--no_create_stream', dest='create_stream', action='store_false')
parser.add_argument('--gateway', default='docker1:54672')
parser.add_argument('--goaway_url', default='http://jetson:5001/cataway/goaway')
parser.add_argument('--scope', default='examples')
parser.add_argument('--stream', default='jetsoncamera1')
parser.add_argument('--alert_stream', default='alert1')
parser.add_argument('--motion_threshold', type=float, default=0.0)
args = parser.parse_args()
logging.info('args=%s' % str(args))
# We use multiprocessing queues to allow inference to occur even if the writes to Pravega fails.
write_to_pravega_queue = None
if args.stream:
write_to_pravega_queue = multiprocessing.Queue(1)
write_to_pravega_process = multiprocessing.Process(
target=run_write_to_pravega_process,
args=(write_to_pravega_queue, args.scope, args.stream, args))
write_to_pravega_process.start()
write_alerts_to_pravega_queue = None
if args.alert_stream:
write_alerts_to_pravega_queue = multiprocessing.Queue(5)
write_alerts_to_pravega_process = multiprocessing.Process(
target=run_write_to_pravega_process,
args=(write_alerts_to_pravega_queue, args.scope, args.alert_stream, args))
write_alerts_to_pravega_process.start()
gscmd = """
nvarguscamerasrc
! video/x-raw(memory:NVMM), width=1920, height=1080, framerate=2/1, format=NV12
! nvvidconv flip-method=0
! video/x-raw, width=320, height=200, format=BGRx
! videoconvert
! video/x-raw, format=BGR
! appsink
"""
vidcap = cv2.VideoCapture(gscmd, cv2.CAP_GSTREAMER)
logging.info(vidcap)
video_frames = video_capture_generator(vidcap, args)
video_frames_with_inference = inference_generator(video_frames, args)
for video_frame in video_frames_with_inference:
try:
if write_alerts_to_pravega_queue and video_frame['alert']:
write_alerts_to_pravega_queue.put(video_frame, False)
if write_to_pravega_queue:
write_to_pravega_queue.put(video_frame, False)
except queue.Full:
logging.warning('Pravega queue is full')
logging.info('END')
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment