
@willpatera
Last active August 29, 2015 14:14
Demo code for Pupil Google Group

Eye Image Screen Capture and Apparent Pupil Size

This gist contains modified source code and an example plugin, shared with the Pupil Google Group as a demonstration of the concept. The code runs, but it is not intended for distribution; it is only one potential starting point for other users who want to further develop a plugin that saves eye images in real time along with pupil data.

Changes

screen_cap_eye.py

  • a very rough example of a plugin that could be developed to save eye screen captures along with other pupil data.

eye.py

world.py

recorder.py

  • changes to the eye_tx messages and their format (a sketch of the message flow follows this list)
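The four source files follow in this order: eye.py, recorder.py, screen_cap_eye.py, world.py. Before diving in, here is a minimal, self-contained sketch of the world-to-eye command protocol the changes rely on. It only illustrates the message shapes handled in eye.py below; the file paths are placeholders, and in Pupil itself the two pipe ends arrive pre-made as g_pool.eye_tx (world process) and g_pool.eye_rx (eye process) rather than being created like this.

# Sketch of the command pipe between the world and eye processes.
# Each command is a two-element list: [command_name, payload].
from multiprocessing import Pipe

eye_rx, eye_tx = Pipe(False)  # stand-ins for g_pool.eye_rx / g_pool.eye_tx

# world side: the Recorder and Screen_Cap_Eye plugins send commands like these
eye_tx.send(["start_record_eye", "/tmp/rec_000"])            # payload: recording dir (placeholder path)
eye_tx.send(["screen_cap_eye", "/tmp/rec_000/eye_cap.jpg"])  # payload: image file path (placeholder path)
eye_tx.send(["stop_record_eye", None])                       # payload unused

# eye side: poll once per frame and dispatch on the command name
while eye_rx.poll():
    command, payload = eye_rx.recv()
    print command, payload  # e.g. start_record_eye /tmp/rec_000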
'''
(*)~----------------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2014 Pupil Labs
Distributed under the terms of the CC BY-NC-SA License.
License details are in the file license.txt, distributed as part of this software.
----------------------------------------------------------------------------------~(*)
'''
import os
from time import time, sleep
from file_methods import Persistent_Dict
import logging
from ctypes import c_int,c_bool,c_float
import numpy as np
import cv2  # used below for VideoWriter, imwrite and ellipse2Poly
import atb
from glfw import *
from gl_utils import basic_gl_setup,adjust_gl_view, clear_gl_screen, draw_gl_point_norm,make_coord_system_pixel_based,make_coord_system_norm_based,create_named_texture,draw_named_texture,draw_gl_polyline
from methods import *
from uvc_capture import autoCreateCapture, FileCaptureError, EndofVideoFileError, CameraCaptureError
from calibrate import get_map_from_cloud
from pupil_detectors import Canny_Detector,MSER_Detector,Blob_Detector
def eye(g_pool,cap_src,cap_size):
"""
Creates a window, gl context.
Grabs images from a capture.
Streams Pupil coordinates into g_pool.pupil_queue
"""
# modify the root logger for this process
logger = logging.getLogger()
# remove inherited handlers
logger.handlers = []
# create file handler which logs even debug messages
fh = logging.FileHandler(os.path.join(g_pool.user_dir,'eye.log'),mode='w')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.WARNING)
# create formatter and add it to the handlers
formatter = logging.Formatter('EYE Process: %(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
formatter = logging.Formatter('E Y E Process [%(levelname)s] %(name)s : %(message)s')
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)
# create logger for the context of this function
logger = logging.getLogger(__name__)
# Callback functions
def on_resize(window,w, h):
adjust_gl_view(w,h,window)
norm_size = normalize((w,h),glfwGetWindowSize(window))
fb_size = denormalize(norm_size,glfwGetFramebufferSize(window))
atb.TwWindowSize(*map(int,fb_size))
def on_key(window, key, scancode, action, mods):
if not atb.TwEventKeyboardGLFW(key,int(action == GLFW_PRESS)):
if action == GLFW_PRESS:
if key == GLFW_KEY_ESCAPE:
on_close(window)
def on_char(window,char):
if not atb.TwEventCharGLFW(char,1):
pass
def on_button(window,button, action, mods):
if not atb.TwEventMouseButtonGLFW(button,int(action == GLFW_PRESS)):
if action == GLFW_PRESS:
if bar.display.value ==1:
pos = glfwGetCursorPos(window)
pos = normalize(pos,glfwGetWindowSize(window))
pos = denormalize(pos,(frame.img.shape[1],frame.img.shape[0]) ) # pos in frame.img pixels
u_r.setStart(pos)
bar.draw_roi.value = 1
else:
bar.draw_roi.value = 0
def on_pos(window,x, y):
norm_pos = normalize((x,y),glfwGetWindowSize(window))
fb_x,fb_y = denormalize(norm_pos,glfwGetFramebufferSize(window))
if atb.TwMouseMotion(int(fb_x),int(fb_y)):
pass
if bar.draw_roi.value == 1:
pos = denormalize(norm_pos,(frame.img.shape[1],frame.img.shape[0]) ) # pos in frame.img pixels
u_r.setEnd(pos)
def on_scroll(window,x,y):
if not atb.TwMouseWheel(int(x)):
pass
def on_close(window):
g_pool.quit.value = True
logger.info('Process closing from window')
# Helper functions called by the main atb bar
def start_roi():
bar.display.value = 1
bar.draw_roi.value = 2
def update_fps():
old_time, bar.timestamp = bar.timestamp, time()
dt = bar.timestamp - old_time
if dt:
bar.fps.value += .05 * (1. / dt - bar.fps.value)
bar.dt.value = dt
def get_from_data(data):
"""
helper for atb getter and setter use
"""
return data.value
# load session persistent settings
session_settings = Persistent_Dict(os.path.join(g_pool.user_dir,'user_settings_eye') )
def load(var_name,default):
return session_settings.get(var_name,default)
def save(var_name,var):
session_settings[var_name] = var
# Initialize capture
cap = autoCreateCapture(cap_src, cap_size,timebase=g_pool.timebase)
if cap is None:
logger.error("Did not receive valid Capture")
return
# check if it works
frame = cap.get_frame()
if frame.img is None:
logger.error("Could not retrieve image from capture")
cap.close()
return
height,width = frame.img.shape[:2]
u_r = Roi(frame.img.shape)
u_r.set(load('roi',default=None))
writer = None
pupil_detector = Canny_Detector(g_pool)
atb.init()
# Create main ATB Controls
bar = atb.Bar(name = "Eye", label="Display",
help="Scene controls", color=(50, 50, 50), alpha=100,
text='light', position=(10, 10),refresh=.3, size=(200, 100))
bar.fps = c_float(0.0)
bar.timestamp = time()
bar.dt = c_float(0.0)
bar.sleep = c_float(0.0)
bar.display = c_int(load('bar.display',0))
bar.draw_pupil = c_bool(load('bar.draw_pupil',True))
bar.draw_roi = c_int(0)
display_mode_enum = atb.enum("Mode",{"Camera Image":0,
"Region of Interest":1,
"Algorithm":2,
"CPU Save": 3})
bar.add_var("FPS",bar.fps, step=1.,readonly=True)
bar.add_var("Mode", bar.display,vtype=dispay_mode_enum, help="select the view-mode")
bar.add_var("Show_Pupil_Point", bar.draw_pupil)
bar.add_button("Draw_ROI", start_roi, help="drag on screen to select a region of interest")
bar.add_var("SlowDown",bar.sleep, step=0.01,min=0.0)
bar.add_var("SaveSettings&Exit", g_pool.quit)
cap.create_atb_bar(pos=(220,10))
# create a bar for the detector
pupil_detector.create_atb_bar(pos=(10,120))
glfwInit()
window = glfwCreateWindow(width, height, "Eye", None, None)
glfwMakeContextCurrent(window)
# Register callbacks window
glfwSetWindowSizeCallback(window,on_resize)
glfwSetWindowCloseCallback(window,on_close)
glfwSetKeyCallback(window,on_key)
glfwSetCharCallback(window,on_char)
glfwSetMouseButtonCallback(window,on_button)
glfwSetCursorPosCallback(window,on_pos)
glfwSetScrollCallback(window,on_scroll)
glfwSetWindowPos(window,800,0)
on_resize(window,width,height)
# gl_state settings
basic_gl_setup()
g_pool.image_tex = create_named_texture(frame.img)
# refresh speed settings
glfwSwapInterval(0)
# event loop
while not g_pool.quit.value:
# Get an image from the grabber
try:
frame = cap.get_frame()
except CameraCaptureError:
logger.error("Capture from Camera Failed. Stopping.")
break
except EndofVideoFileError:
logger.warning("Video File is done. Stopping")
break
update_fps()
sleep(bar.sleep.value) # for debugging only
### RECORDING of Eye Video (on demand) ###
# Setup variables and lists for recording
if g_pool.eye_rx.poll():
command = g_pool.eye_rx.recv()
if command[0] == 'start_record_eye':
record_path = command[1] # get the next thing in the pipe
logger.info("Will save eye video to: %s"%record_path)
video_path = os.path.join(record_path, "eye.avi")
timestamps_path = os.path.join(record_path, "eye_timestamps.npy")
writer = cv2.VideoWriter(video_path, cv2.cv.CV_FOURCC(*'DIVX'), bar.fps.value, (frame.img.shape[1], frame.img.shape[0]))
timestamps = []
if command[0] == "stop_record_eye":
logger.info("Done recording eye.")
writer = None
np.save(timestamps_path,np.asarray(timestamps))
del timestamps
if command[0] == "screen_cap_eye":
file_path = command[1]
cv2.imwrite(file_path, frame.img)
if writer:
writer.write(frame.img)
timestamps.append(frame.timestamp)
# pupil ellipse detection
result = pupil_detector.detect(frame,user_roi=u_r,visualize=bar.display.value == 2)
# stream the result
g_pool.pupil_queue.put(result)
# VISUALIZATION direct visualizations on the frame.img data
if bar.display.value == 1:
# and a solid (white) frame around the user defined ROI
r_img = frame.img[u_r.lY:u_r.uY,u_r.lX:u_r.uX]
r_img[:,0] = 255,255,255
r_img[:,-1]= 255,255,255
r_img[0,:] = 255,255,255
r_img[-1,:]= 255,255,255
# GL-drawing
clear_gl_screen()
make_coord_system_norm_based()
if bar.display.value != 3:
draw_named_texture(g_pool.image_tex,frame.img)
else:
draw_named_texture(g_pool.image_tex)
make_coord_system_pixel_based(frame.img.shape)
if result['norm_pupil'] is not None and bar.draw_pupil.value:
if result.has_key('axes'):
pts = cv2.ellipse2Poly( (int(result['center'][0]),int(result['center'][1])),
(int(result["axes"][0]/2),int(result["axes"][1]/2)),
int(result["angle"]),0,360,15)
draw_gl_polyline(pts,(1.,0,0,.5))
draw_gl_point_norm(result['norm_pupil'],color=(1.,0.,0.,0.5))
atb.draw()
glfwSwapBuffers(window)
glfwPollEvents()
# END while running
# in case eye recording was still running: save & close
if writer:
logger.info("Done recording eye.")
writer = None
np.save(timestamps_path,np.asarray(timestamps))
# save session persistent settings
save('roi',u_r.get())
save('bar.display',bar.display.value)
save('bar.draw_pupil',bar.draw_pupil.value)
session_settings.close()
pupil_detector.cleanup()
cap.close()
atb.terminate()
glfwDestroyWindow(window)
glfwTerminate()
#flushing queue in case world process did not exit gracefully
while not g_pool.pupil_queue.empty():
g_pool.pupil_queue.get()
g_pool.pupil_queue.close()
logger.debug("Process done")
def eye_profiled(g_pool,cap_src,cap_size):
import cProfile,subprocess,os
from eye import eye
cProfile.runctx("eye(g_pool,cap_src,cap_size)",{"g_pool":g_pool,'cap_src':cap_src,'cap_size':cap_size},locals(),"eye.pstats")
loc = os.path.abspath(__file__).rsplit('pupil_src', 1)
gprof2dot_loc = os.path.join(loc[0], 'pupil_src', 'shared_modules','gprof2dot.py')
subprocess.call("python "+gprof2dot_loc+" -f pstats eye.pstats | dot -Tpng -o eye_cpu_time.png", shell=True)
print "created cpu time graph for eye process. Please check out the png next to the eye.py file"
'''
(*)~----------------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2014 Pupil Labs
Distributed under the terms of the CC BY-NC-SA License.
License details are in the file license.txt, distributed as part of this software.
----------------------------------------------------------------------------------~(*)
'''
import os, sys
import cv2
import atb
import numpy as np
from plugin import Plugin
from time import strftime,localtime,time,gmtime
from ctypes import create_string_buffer
from shutil import copy2
from glob import glob
from audio import Audio_Capture
#logging
import logging
logger = logging.getLogger(__name__)
class Recorder(Plugin):
"""Capture Recorder"""
def __init__(self,g_pool, session_str, fps, img_shape, record_eye, eye_tx,audio = -1):
Plugin.__init__(self)
self.g_pool = g_pool
self.session_str = session_str
self.record_eye = record_eye
self.frame_count = 0
self.timestamps = []
self.gaze_list = []
self.eye_tx = eye_tx
self.start_time = time()
session = os.path.join(self.g_pool.rec_dir, self.session_str)
try:
os.mkdir(session)
logger.debug("Created new recordings session dir %s"%session)
except:
logger.debug("Recordings session dir %s already exists, using it." %session)
# set up self incrementing folder within session folder
counter = 0
while True:
self.rec_path = os.path.join(session, "%03d/" % counter)
try:
os.mkdir(self.rec_path)
logger.debug("Created new recording dir %s"%self.rec_path)
break
except:
logger.debug("We dont want to overwrite data, incrementing counter & trying to make new data folder")
counter += 1
self.meta_info_path = os.path.join(self.rec_path, "info.csv")
with open(self.meta_info_path, 'w') as f:
f.write("Recording Name\t"+self.session_str+ "\n")
f.write("Start Date\t"+ strftime("%d.%m.%Y", localtime(self.start_time))+ "\n")
f.write("Start Time\t"+ strftime("%H:%M:%S", localtime(self.start_time))+ "\n")
if audio >=0:
audio_src = audio
audio_path = os.path.join(self.rec_path, "world.wav")
self.audio_writer = Audio_Capture(audio_src,audio_path)
else:
self.audio_writer = None
video_path = os.path.join(self.rec_path, "world.avi")
self.writer = cv2.VideoWriter(video_path, cv2.cv.CV_FOURCC(*'DIVX'), fps, (img_shape[1], img_shape[0]))
self.height = img_shape[0]
self.width = img_shape[1]
# positions path to eye process
if self.record_eye:
self.eye_tx.send(["start_record_eye",self.rec_path])
atb_pos = (10, 540)
self._bar = atb.Bar(name = self.__class__.__name__, label='REC: '+session_str,
help="capture recording control", color=(220, 0, 0), alpha=150,
text='light', position=atb_pos,refresh=.3, size=(300, 80))
self._bar.add_var("rec time",create_string_buffer(512), getter=lambda: create_string_buffer(self.get_rec_time_str(),512), readonly=True)
self._bar.add_button("stop",self.on_stop, key="s", help="stop recording")
self._bar.define("contained=true")
def get_rec_time_str(self):
rec_time = gmtime(time()-self.start_time)
return strftime("%H:%M:%S", rec_time)
def update(self,frame,recent_pupil_positions,events):
# cv2.putText(frame.img, "Frame %s"%self.frame_count,(200,200), cv2.FONT_HERSHEY_SIMPLEX,1,(255,100,100))
for p in recent_pupil_positions:
if p['norm_pupil'] is not None:
gaze_pt = p['norm_gaze'][0],p['norm_gaze'][1],p['norm_pupil'][0],p['norm_pupil'][1],p['timestamp'],p['confidence']
self.gaze_list.append(gaze_pt)
self.timestamps.append(frame.timestamp)
self.writer.write(frame.img)
self.frame_count += 1
def stop_and_destruct(self):
#explicit release of VideoWriter
self.writer.release()
self.writer = None
if self.record_eye:
try:
self.eye_tx.send(["stop_record_eye",None])
except:
logger.warning("Could not stop eye-recording. Please report this bug!")
gaze_list_path = os.path.join(self.rec_path, "gaze_positions.npy")
np.save(gaze_list_path,np.asarray(self.gaze_list))
timestamps_path = os.path.join(self.rec_path, "timestamps.npy")
np.save(timestamps_path,np.array(self.timestamps))
try:
copy2(os.path.join(self.g_pool.user_dir,"surface_definitions"),os.path.join(self.rec_path,"surface_definitions"))
except:
logger.info("No surface_definitions data found. You may want this if you do marker tracking.")
try:
copy2(os.path.join(self.g_pool.user_dir,"cal_pt_cloud.npy"),os.path.join(self.rec_path,"cal_pt_cloud.npy"))
except:
logger.warning("No calibration data found. Please calibrate first.")
try:
copy2(os.path.join(self.g_pool.user_dir,"camera_matrix.npy"),os.path.join(self.rec_path,"camera_matrix.npy"))
copy2(os.path.join(self.g_pool.user_dir,"dist_coefs.npy"),os.path.join(self.rec_path,"dist_coefs.npy"))
except:
logger.info("No camera intrinsics found.")
try:
copy2(os.path.join(self.g_pool.user_dir,"screen_cap_eye/"),os.path.join(self.rec_path,"screen_cap_eye/"))
except:
logger.info("No eye screen capture data found....")
try:
with open(self.meta_info_path, 'a') as f:
f.write("Duration Time\t"+ self.get_rec_time_str()+ "\n")
f.write("World Camera Frames\t"+ str(self.frame_count)+ "\n")
f.write("World Camera Resolution\t"+ str(self.width)+"x"+str(self.height)+"\n")
f.write("Capture Software Version\t"+ self.g_pool.version + "\n")
f.write("User\t"+os.getlogin()+"\n")
try:
sysname, nodename, release, version, machine = os.uname()
except:
sysname, nodename, release, version, machine = sys.platform,None,None,None,None
f.write("Platform\t"+sysname+"\n")
f.write("Machine\t"+nodename+"\n")
f.write("Release\t"+release+"\n")
f.write("Version\t"+version+"\n")
except Exception:
logger.exception("Could not save metadata. Please report this bug!")
if self.audio_writer:
self.audio_writer = None
self.alive = False
def on_stop(self):
"""
get called from _bar to init termination.
"""
self.alive= False
def cleanup(self):
"""gets called when the plugin gets terminated,
either voluntarily or forced.
"""
self.stop_and_destruct()
self._bar.destroy()
def get_auto_name():
return strftime("%Y_%m_%d", localtime())
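As a usage sketch (not part of the gist), the arrays the Recorder saves can be read back for offline analysis; the recording directory below is a placeholder for one of the auto-incremented session folders it creates:

import numpy as np

rec_path = "recordings/2015_01_27/000"  # placeholder for a dir created by Recorder
# columns: norm_gaze x, norm_gaze y, norm_pupil x, norm_pupil y, timestamp, confidence
gaze_list = np.load(rec_path + "/gaze_positions.npy")
# one world-camera timestamp per recorded frame
timestamps = np.load(rec_path + "/timestamps.npy")
print gaze_list.shape, timestamps.shape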
'''
(*)~----------------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2014 Pupil Labs
Distributed under the terms of the CC BY-NC-SA License.
License details are in the file license.txt, distributed as part of this software.
----------------------------------------------------------------------------------~(*)
'''
import os, sys
import numpy as np
from plugin import Plugin
#logging
import logging
logger = logging.getLogger(__name__)
class Screen_Cap_Eye(Plugin):
    """Screen Capture Eye"""
    def __init__(self,g_pool,session_str,eye_tx):
        Plugin.__init__(self)
        self.g_pool = g_pool
        self.session_str = session_str
        self.apparent_pupil_size_list = []
        self.eye_tx = eye_tx
        self.frame_count = 0
        # session = os.path.join(self.g_pool.rec_dir, self.session_str)
        self.screen_cap_path = os.path.join(g_pool.user_dir,'screen_cap_eye')
        # set up folder within user dir
        try:
            os.mkdir(self.screen_cap_path)
            logger.debug("Created new screen capture dir %s"%self.screen_cap_path)
        except:
            logger.debug("Screen capture dir %s already exists, using it." %self.screen_cap_path)
        self.eye_cap_path = os.path.join(self.screen_cap_path, "eye_cap.jpg")
        # detector_list = ['confidence', 'ellipse', 'pos_in_roi', 'major', 'apparent_pupil_size', 'minor', 'axes', 'angle', 'norm_pupil', 'center', 'timestamp']
        # self.eye_tx.send(["screen_cap_eye",self.eye_cap_path])

    def update(self,frame,recent_pupil_positions,events):
        # cv2.putText(frame.img, "Frame %s"%self.frame_count,(200,200), cv2.FONT_HERSHEY_SIMPLEX,1,(255,100,100))
        for p in recent_pupil_positions:
            if p['norm_pupil'] is not None:
                apparent_pupil_size = p['apparent_pupil_size'],p['timestamp']
                self.apparent_pupil_size_list.append(apparent_pupil_size)
                print self.apparent_pupil_size_list
        self.frame_count += 1
        # only request the eye screen capture once at least one pupil datum has arrived,
        # then save the collected sizes and terminate this one-shot plugin
        if self.apparent_pupil_size_list:
            self.cleanup()
            self.eye_tx.send(["screen_cap_eye",os.path.join(self.screen_cap_path, "eye_cap_%s.jpg" % self.apparent_pupil_size_list[0][1])])
            self.frame_count = 0

    def stop_and_destruct(self):
        apparent_pupil_sizes_path = os.path.join(self.screen_cap_path, "apparent_pupil_sizes.npy")
        np.save(apparent_pupil_sizes_path,np.asarray(self.apparent_pupil_size_list))
        self.alive = False

    def cleanup(self):
        """gets called when the plugin gets terminated,
        either voluntarily or forced.
        """
        self.stop_and_destruct()
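For orientation, a small sketch (not part of the gist) of what this plugin leaves behind in the user dir: apparent_pupil_sizes.npy with (apparent_pupil_size, timestamp) rows, plus the eye_cap_<timestamp>.jpg images written by the eye process. The user-dir path below is a placeholder:

import os
from glob import glob
import numpy as np

user_dir = os.path.expanduser("~/pupil_capture_settings")  # placeholder for g_pool.user_dir
cap_dir = os.path.join(user_dir, "screen_cap_eye")
sizes = np.load(os.path.join(cap_dir, "apparent_pupil_sizes.npy"))  # rows: (apparent_pupil_size, timestamp)
eye_caps = glob(os.path.join(cap_dir, "eye_cap_*.jpg"))             # screen captures saved by eye.py
print sizes
print eye_caps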
'''
(*)~----------------------------------------------------------------------------------
Pupil - eye tracking platform
Copyright (C) 2012-2014 Pupil Labs
Distributed under the terms of the CC BY-NC-SA License.
License details are in the file license.txt, distributed as part of this software.
----------------------------------------------------------------------------------~(*)
'''
if __name__ == '__main__':
# make shared modules available across pupil_src
from sys import path as syspath
from os import path as ospath
loc = ospath.abspath(__file__).rsplit('pupil_src', 1)
syspath.append(ospath.join(loc[0], 'pupil_src', 'shared_modules'))
del syspath, ospath
import os, sys
from time import time
from file_methods import Persistent_Dict
import logging
from ctypes import c_int,c_bool,c_float,create_string_buffer
import numpy as np
#display
from glfw import *
import atb
# helpers/utils
from methods import normalize, denormalize,Temp
from gl_utils import basic_gl_setup,adjust_gl_view, clear_gl_screen, draw_gl_point_norm,make_coord_system_pixel_based,make_coord_system_norm_based,create_named_texture,draw_named_texture
from uvc_capture import autoCreateCapture, FileCaptureError, EndofVideoFileError, CameraCaptureError, FakeCapture
from audio import Audio_Input_List
import calibrate
# Plug-ins
import calibration_routines
import recorder
from show_calibration import Show_Calibration
from display_recent_gaze import Display_Recent_Gaze
from pupil_server import Pupil_Server
from pupil_remote import Pupil_Remote
from marker_detector import Marker_Detector
from screen_cap_eye import Screen_Cap_Eye
# create logger for the context of this function
logger = logging.getLogger(__name__)
def world(g_pool,cap_src,cap_size):
"""world
Creates a window, gl context.
Grabs images from a capture.
Receives Pupil coordinates from g_pool.pupil_queue
Can run various plug-ins.
"""
# Callback functions
def on_resize(window,w, h):
active_window = glfwGetCurrentContext()
glfwMakeContextCurrent(window)
norm_size = normalize((w,h),glfwGetWindowSize(window))
fb_size = denormalize(norm_size,glfwGetFramebufferSize(window))
atb.TwWindowSize(*map(int,fb_size))
adjust_gl_view(w,h,window)
glfwMakeContextCurrent(active_window)
for p in g_pool.plugins:
p.on_window_resize(window,w,h)
def on_iconify(window,iconified):
if not isinstance(cap,FakeCapture):
g_pool.update_textures.value = not iconified
def on_key(window, key, scancode, action, mods):
if not atb.TwEventKeyboardGLFW(key,action):
if action == GLFW_PRESS:
if key == GLFW_KEY_ESCAPE:
on_close(window)
def on_char(window,char):
if not atb.TwEventCharGLFW(char,1):
pass
def on_button(window,button, action, mods):
if not atb.TwEventMouseButtonGLFW(button,action):
pos = glfwGetCursorPos(window)
pos = normalize(pos,glfwGetWindowSize(world_window))
pos = denormalize(pos,(frame.img.shape[1],frame.img.shape[0]) ) # Position in img pixels
for p in g_pool.plugins:
p.on_click(pos,button,action)
def on_pos(window,x, y):
norm_pos = normalize((x,y),glfwGetWindowSize(window))
fb_x,fb_y = denormalize(norm_pos,glfwGetFramebufferSize(window))
if atb.TwMouseMotion(int(fb_x),int(fb_y)):
pass
def on_scroll(window,x,y):
if not atb.TwMouseWheel(int(x)):
pass
def on_close(window):
g_pool.quit.value = True
logger.info('Process closing from window')
# load session persistent settings
session_settings = Persistent_Dict(os.path.join(g_pool.user_dir,'user_settings_world'))
def load(var_name,default):
return session_settings.get(var_name,default)
def save(var_name,var):
session_settings[var_name] = var
# Initialize capture
cap = autoCreateCapture(cap_src, cap_size, 24, timebase=g_pool.timebase)
# Get an image from the grabber
try:
frame = cap.get_frame()
except CameraCaptureError:
logger.error("Could not retrieve image from capture")
cap.close()
return
height,width = frame.img.shape[:2]
# load last calibration data
try:
pt_cloud = np.load(os.path.join(g_pool.user_dir,'cal_pt_cloud.npy'))
logger.debug("Using calibration found in %s" %g_pool.user_dir)
map_pupil = calibrate.get_map_from_cloud(pt_cloud,(width,height))
except :
logger.debug("No calibration found.")
def map_pupil(vector):
""" 1 to 1 mapping """
return vector
# any object we attach to the g_pool object *from now on* will only be visible to this process!
# vars should be declared here to make them visible to the code reader.
g_pool.plugins = []
g_pool.map_pupil = map_pupil
g_pool.update_textures = c_bool(1)
if isinstance(cap,FakeCapture):
g_pool.update_textures.value = False
g_pool.capture = cap
g_pool.rec_name = recorder.get_auto_name()
# helpers called by the main atb bar
def update_fps():
old_time, bar.timestamp = bar.timestamp, time()
dt = bar.timestamp - old_time
if dt:
bar.fps.value += .05 * (1. / dt - bar.fps.value)
def set_window_size(mode,data):
height,width = frame.img.shape[:2]
ratio = (1,.75,.5,.25)[mode]
w,h = int(width*ratio),int(height*ratio)
glfwSetWindowSize(world_window,w,h)
data.value=mode # update the bar.value
def get_from_data(data):
"""
helper for atb getter and setter use
"""
return data.value
def set_rec_dir(val):
try:
n_path = os.path.expanduser(val.value)
logger.debug("Expanded user path.")
except:
n_path = val.value
if not n_path:
logger.warning("Please specify a path.")
elif not os.path.isdir(n_path):
logger.warning("This is not a valid path.")
else:
g_pool.rec_dir = n_path
def get_rec_dir():
return create_string_buffer(g_pool.rec_dir,512)
def set_rec_name(val):
if not val.value:
g_pool.rec_name = recorder.get_auto_name()
else:
g_pool.rec_name = val.value
def get_rec_name():
return create_string_buffer(g_pool.rec_name,512)
def open_calibration(selection,data):
# prepare destruction of current ref_detector... and remove it
for p in g_pool.plugins:
if isinstance(p,calibration_routines.detector_by_index):
p.alive = False
g_pool.plugins = [p for p in g_pool.plugins if p.alive]
new_ref_detector = calibration_routines.detector_by_index[selection](g_pool,atb_pos=bar.next_atb_pos)
g_pool.plugins.append(new_ref_detector)
g_pool.plugins.sort(key=lambda p: p.order)
# save the value for atb bar
data.value=selection
def toggle_record_video():
for p in g_pool.plugins:
if isinstance(p,recorder.Recorder):
p.alive = False
return
new_plugin = recorder.Recorder(g_pool,g_pool.rec_name, bar.fps.value, frame.img.shape, bar.record_eye.value, g_pool.eye_tx,bar.audio.value)
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
def toggle_show_calib_result():
for p in g_pool.plugins:
if isinstance(p,Show_Calibration):
p.alive = False
return
new_plugin = Show_Calibration(g_pool,frame.img.shape)
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
def toggle_server():
for p in g_pool.plugins:
if isinstance(p,Pupil_Server):
p.alive = False
return
new_plugin = Pupil_Server(g_pool,(10,300))
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
def toggle_remote():
for p in g_pool.plugins:
if isinstance(p,Pupil_Remote):
p.alive = False
return
new_plugin = Pupil_Remote(g_pool,(10,360),on_char)
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
def toggle_ar():
for p in g_pool.plugins:
if isinstance(p,Marker_Detector):
p.alive = False
return
new_plugin = Marker_Detector(g_pool,(10,400))
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
def reset_timebase():
#the last frame from worldcam will be t0
g_pool.timebase.value = g_pool.capture.get_now()
logger.info("New timebase set to %s all timestamps will count from here now."%g_pool.timebase.value)
def screen_cap_eye():
for p in g_pool.plugins:
if isinstance(p,Screen_Cap_Eye):
p.alive = False
return
new_plugin = Screen_Cap_Eye(g_pool, g_pool.rec_name, g_pool.eye_tx)
g_pool.plugins.append(new_plugin)
g_pool.plugins.sort(key=lambda p: p.order)
atb.init()
# add main controls ATB bar
bar = atb.Bar(name = "World", label="Controls",
help="Scene controls", color=(50, 50, 50), alpha=100,valueswidth=150,
text='light', position=(10, 10),refresh=.3, size=(300, 200))
bar.next_atb_pos = (10,220)
bar.fps = c_float(0.0)
bar.timestamp = time()
bar.calibration_type = c_int(load("calibration_type",0))
bar.record_eye = c_bool(load("record_eye",0))
bar.audio = c_int(load("audio",-1))
bar.window_size = c_int(load("window_size",0))
window_size_enum = atb.enum("Display Size",{"Full":0, "Medium":1,"Half":2,"Mini":3})
calibrate_type_enum = atb.enum("Calibration Method",calibration_routines.index_by_name)
audio_enum = atb.enum("Audio Input",dict(Audio_Input_List()))
bar.version = create_string_buffer(g_pool.version,512)
bar.add_var("fps", bar.fps, step=1., readonly=True, help="Refresh speed of this process. Especially during recording it should not drop below the camera set frame rate.")
bar.add_var("display size", vtype=window_size_enum,setter=set_window_size,getter=get_from_data,data=bar.window_size,help="Resize the world window. This has no effect on the actual image.")
bar.add_var("calibration method",setter=open_calibration,getter=get_from_data,data=bar.calibration_type, vtype=calibrate_type_enum,group="Calibration", help="Please choose your desired calibration method.")
bar.add_button("show calibration result",toggle_show_calib_result, group="Calibration", help="Click to show calibration result.")
bar.add_var("rec dir",create_string_buffer(512),getter = get_rec_dir,setter= set_rec_dir, group="Recording", help="Specify the recording path")
bar.add_var("session name",create_string_buffer(512),getter = get_rec_name,setter= set_rec_name, group="Recording", help="Give your recording session a custom name.")
bar.add_button("record", toggle_record_video, key="r", group="Recording", help="Start/Stop Recording")
bar.add_var("record eye", bar.record_eye, group="Recording", help="check to save raw video of eye")
bar.add_var("record audio", bar.audio, vtype=audio_enum, group="Recording", help="Select from audio recording options.")
bar.add_button("start/stop marker tracking",toggle_ar,key="x",help="find markers in scene to map gaze onto referace surfaces")
bar.add_button("start/stop server",toggle_server,key="s",help="the server broadcasts pupil and gaze positions locally or via network")
bar.add_button("start/stop remote",toggle_remote,key="w",help="remote allows seding commad to pupil via network")
bar.add_button("set timebase to now",reset_timebase,help="this button allows the timestamps to count from now on.",key="t")
bar.add_var("update screen", g_pool.update_textures,help="if you dont need to see the camera image updated, you can turn this of to reduce CPU load.")
bar.add_button("screen cap eye", screen_cap_eye, help="Take a screen capture of the eye window and save gaze information.")
bar.add_separator("Sep1")
bar.add_var("version",bar.version, readonly=True)
bar.add_var("exit", g_pool.quit)
# add uvc camera controls ATB bar
cap.create_atb_bar(pos=(320,10))
# Initialize glfw
glfwInit()
world_window = glfwCreateWindow(width, height, "World", None, None)
glfwMakeContextCurrent(world_window)
# Register callbacks world_window
glfwSetWindowSizeCallback(world_window,on_resize)
glfwSetWindowCloseCallback(world_window,on_close)
glfwSetWindowIconifyCallback(world_window,on_iconify)
glfwSetKeyCallback(world_window,on_key)
glfwSetCharCallback(world_window,on_char)
glfwSetMouseButtonCallback(world_window,on_button)
glfwSetCursorPosCallback(world_window,on_pos)
glfwSetScrollCallback(world_window,on_scroll)
#set the last saved window size
set_window_size(bar.window_size.value,bar.window_size)
on_resize(world_window, *glfwGetWindowSize(world_window))
glfwSetWindowPos(world_window,0,0)
# gl_state settings
basic_gl_setup()
g_pool.image_tex = create_named_texture(frame.img)
# refresh speed settings
glfwSwapInterval(0)
#load calibration plugin
open_calibration(bar.calibration_type.value,bar.calibration_type)
#load gaze_display plugin
g_pool.plugins.append(Display_Recent_Gaze(g_pool))
# Event loop
while not g_pool.quit.value:
# Get an image from the grabber
try:
frame = cap.get_frame()
except CameraCaptureError:
logger.error("Capture from Camera Failed. Stopping.")
break
except EndofVideoFileError:
logger.warning("Video File is done. Stopping")
break
update_fps()
#a container that allows plugins to post and read events
events = []
#receive and map pupil positions
recent_pupil_positions = []
while not g_pool.pupil_queue.empty():
p = g_pool.pupil_queue.get()
if p['norm_pupil'] is None:
p['norm_gaze'] = None
else:
p['norm_gaze'] = g_pool.map_pupil(p['norm_pupil'])
recent_pupil_positions.append(p)
# allow each Plugin to do its work.
for p in g_pool.plugins:
p.update(frame,recent_pupil_positions,events)
#check if a plugin need to be destroyed
g_pool.plugins = [p for p in g_pool.plugins if p.alive]
# render camera image
glfwMakeContextCurrent(world_window)
make_coord_system_norm_based()
if g_pool.update_textures.value:
draw_named_texture(g_pool.image_tex,frame.img)
else:
draw_named_texture(g_pool.image_tex)
make_coord_system_pixel_based(frame.img.shape)
# render visual feedback from loaded plugins
for p in g_pool.plugins:
p.gl_display()
atb.draw()
glfwSwapBuffers(world_window)
glfwPollEvents()
# de-init all running plugins
for p in g_pool.plugins:
p.alive = False
#reading p.alive actually runs plug-in cleanup
_ = p.alive
save('window_size',bar.window_size.value)
save('calibration_type',bar.calibration_type.value)
save('record_eye',bar.record_eye.value)
save('audio',bar.audio.value)
session_settings.close()
cap.close()
atb.terminate()
glfwDestroyWindow(world_window)
glfwTerminate()
logger.debug("Process done")
def world_profiled(g_pool,cap_src,cap_size):
import cProfile,subprocess,os
from world import world
cProfile.runctx("world(g_pool,cap_src,cap_size)",{"g_pool":g_pool,'cap_src':cap_src,'cap_size':cap_size},locals(),"world.pstats")
loc = os.path.abspath(__file__).rsplit('pupil_src', 1)
gprof2dot_loc = os.path.join(loc[0], 'pupil_src', 'shared_modules','gprof2dot.py')
subprocess.call("python "+gprof2dot_loc+" -f pstats world.pstats | dot -Tpng -o world_cpu_time.png", shell=True)
print "created cpu time graph for world process. Please check out the png next to the world.py file"