Created
June 20, 2018 19:34
-
-
Save yoelk/9d57fac416fea8017e0c233dffaf67a5 to your computer and use it in GitHub Desktop.
modified tis-camera python example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy | |
import gi | |
from collections import namedtuple | |
import cv2 | |
gi.require_version("Gst", "1.0") | |
gi.require_version("Tcam", "0.1") | |
from gi.repository import Tcam, Gst, GLib, GObject | |
class TIS(object):
    """The Imaging Source camera, driven through a GStreamer pipeline.

    The pipeline built in ``__init__`` is
    ``tcambin ! <caps filter> ! videoconvert ! appsink``. The appsink's
    ``new-sample`` signal is connected to ``on_new_buffer``, which stores the
    most recent sample; ``Get_image`` converts that sample to a numpy array.

    Attributes set by this class:
        height, width: the frame size requested in the caps filter.
        sample: the last sample stored by ``on_new_buffer`` (or None).
        newsample: True when a sample arrived since the last ``Get_image``.
        samplelocked: True while ``Get_image`` is reading ``sample``; while it
            is set, ``on_new_buffer`` does not replace ``sample``.
        pipeline, source, appsink: the parsed pipeline and its named elements.
        img_mat: the array produced by the last successful ``Get_image``.
    """

    def __init__(self, serial, width, height, numerator, denumerator, color):
        """Create the pipeline and bring it to the READY state.

        Args:
            serial: camera serial number, inserted into the ``tcambin``
                ``serial`` property.
            width: requested frame width in pixels.
            height: requested frame height in pixels.
            numerator: frame-rate numerator.
            denumerator: frame-rate denominator.
            color: when it compares equal to False the pipeline requests
                "GRAY8", otherwise "BGRx".

        Raises:
            GLib.Error: re-raised after being printed if the pipeline
                description cannot be parsed.
        """
        Gst.init([])
        self.height = height
        self.width = width
        self.sample = None
        self.samplelocked = False
        self.newsample = False

        # `== False` (not `not color`) is kept on purpose so that values such
        # as None still select the colour format, exactly as before.
        pixel_format = "GRAY8" if color == False else "BGRx"  # noqa: E712

        description = (
            'tcambin serial="%s" name=source'
            ' ! video/x-raw,format=%s,width=%d,height=%d,framerate=%d/%d'
            % (serial, pixel_format, width, height, numerator, denumerator)
        )
        description += ' ! videoconvert ! appsink name=sink'
        print(description)

        try:
            self.pipeline = Gst.parse_launch(description)
        except GLib.Error as error:
            # The handler used to format an undefined name here, which hid
            # the real error behind a NameError.
            print("Error creating pipeline: {0}".format(error))
            raise

        self.pipeline.set_state(Gst.State.READY)
        # Query the state with no timeout before touching the elements.
        self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

        # Query a pointer to our source, so we can set properties.
        self.source = self.pipeline.get_by_name("source")

        # Query a pointer to the appsink, so we can assign the callback
        # function.
        self.appsink = self.pipeline.get_by_name("sink")
        self.appsink.set_property("max-buffers", 5)
        self.appsink.set_property("drop", 1)
        self.appsink.set_property("emit-signals", 1)
        self.appsink.connect('new-sample', self.on_new_buffer)

    def on_new_buffer(self, appsink):
        """Signal handler for the appsink's ``new-sample`` signal.

        Marks that a new sample is available and, unless ``Get_image`` is
        currently reading the stored sample, replaces ``self.sample`` with
        the appsink's ``last-sample`` property.

        Args:
            appsink: the element that emitted the signal.

        Returns:
            False, as the original implementation did.
            NOTE(review): appsink documents a Gst.FlowReturn for this
            handler; confirm False is marshalled as OK before changing it.

        Raises:
            GLib.Error: re-raised after being printed if reading the
                property fails.
        """
        self.newsample = True
        if not self.samplelocked:
            try:
                self.sample = appsink.get_property('last-sample')
            except GLib.Error as error:
                print("Error on_new_buffer pipeline: {0}".format(error))
                raise
        return False

    def Start_pipeline(self):
        """Set the pipeline to PLAYING and query its state with no timeout.

        Raises:
            GLib.Error: re-raised after being printed.
        """
        try:
            self.pipeline.set_state(Gst.State.PLAYING)
            self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
        except GLib.Error as error:
            print("Error starting pipeline: {0}".format(error))
            raise

    def Get_image(self):
        """Return the newest frame as a numpy array, or None.

        Returns:
            A ``numpy.uint8`` array of shape ``(height, width, bpp)`` built
            from a copy of the sample's buffer, where height and width come
            from the sample's caps and ``bpp`` is 1 for "GRAY8" and 4
            otherwise. Returns None when no sample has been stored yet or no
            new sample arrived since the previous successful call.
        """
        # Sample code from https://gist.github.com/cbenhagen/76b24573fa63e7492fb6#file-gst-appsink-opencv-py-L34
        if self.sample is None or not self.newsample:
            return None

        self.samplelocked = True
        try:
            buf = self.sample.get_buffer()
            structure = self.sample.get_caps().get_structure(0)

            # Bytes per pixel: GRAY8 is single channel; BGRx (and anything
            # unrecognised, as before) is treated as four bytes.
            bpp = 1 if structure.get_value('format') == "GRAY8" else 4

            self.img_mat = numpy.ndarray(
                (structure.get_value('height'),
                 structure.get_value('width'),
                 bpp),
                buffer=buf.extract_dup(0, buf.get_size()),
                dtype=numpy.uint8)
            self.newsample = False
        finally:
            # Always release the flag: if it stayed set after an exception,
            # on_new_buffer would never store another sample.
            self.samplelocked = False
        return self.img_mat

    def Stop_pipeline(self):
        """Step the pipeline down through PAUSED and READY to NULL."""
        for state in (Gst.State.PAUSED, Gst.State.READY, Gst.State.NULL):
            self.pipeline.set_state(state)
# Script body: open one camera, show a live preview in an OpenCV window and
# stop when Esc is pressed (or on Ctrl+C).
Gst.init([])
serial = "20810358"

################################################
# ADDING THESE TWO LINES MAKES THE CODE NOT WORK
# NOTE(review): the author reports that creating this extra, otherwise unused
# tcamsrc element for the same serial breaks the capture below. It is left in
# place because it appears to be the point of this reproduction - confirm
# before removing.
tcam1 = Gst.ElementFactory.make('tcamsrc')
tcam1.set_property("serial", "20810358")
################################################

# 640x480 at 15/1 fps, grayscale (color=False).
LeftCam = TIS(serial, 640, 480, 15, 1, False)

cv2.namedWindow('Left Camera', 1)

# Start the pipeline
LeftCam.Start_pipeline()
cv2.waitKey(1000)

# Count of consecutive iterations without a frame; reset on every frame.
error = 0
print('Press Esc to stop')
lastkey = 0

try:
    while lastkey != 27:  # 27 is the key code compared against for Esc
        # Grab an image
        LeftImage = LeftCam.Get_image()
        if LeftImage is not None:
            error = 0
            # show the image
            cv2.imshow('Left Camera', LeftImage)
        else:
            print("No left image reveived ")
            error = error + 1

        # wait. The wait period should be somewhat longer than 1000 / fps.
        lastkey = cv2.waitKey(67)  # 67 = 1000 / 15 fps
except KeyboardInterrupt:
    # Ctrl+C is an accepted way to leave the preview loop.
    pass
finally:
    # The handler used to destroy a window called 'Window', which this script
    # never creates. Close whatever is open and always stop the pipeline,
    # even if the loop failed with an unexpected exception.
    cv2.destroyAllWindows()
    # Stop the pipeline and clean up
    LeftCam.Stop_pipeline()

print('Program ended')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment