Last active September 19, 2024 12:14
Get video from gstreamer udp with python and visualize with OpenCV
#!/usr/bin/env python

import cv2
import gi
import numpy as np

gi.require_version('Gst', '1.0')
from gi.repository import Gst


class Video():
    """BlueRov video capture class

    Attributes:
        port (int): Video UDP port
        video_codec (string): Source h264 parser
        video_decode (string): Transform YUV (12bits) to BGR (24bits)
        video_pipe (object): GStreamer top-level pipeline
        video_sink (object): Gstreamer sink element
        video_sink_conf (string): Sink configuration
        video_source (string): Udp source ip and port
    """

    def __init__(self, port=5600):
        """Set up the pipeline description and start receiving

        Args:
            port (int, optional): UDP port
        """
        # GStreamer must be initialized before any pipeline is created
        Gst.init(None)

        self.port = port
        self._frame = None

        # [Software component diagram](
        # UDP video stream (:5600)
        self.video_source = 'udpsrc port={}'.format(self.port)
        # [Rasp raw image](
        # Cam -> CSI-2 -> H264 Raw (YUV 4:2:0 (12 bits per pixel) I420)
        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264'
        # Convert the decoded YUV frames to OpenCV's standard BGR layout (8 bits per channel)
        self.video_decode = \
            '! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert'
        # Create a sink to get data
        self.video_sink_conf = \
            '! appsink emit-signals=true sync=false max-buffers=2 drop=true'

        self.video_pipe = None
        self.video_sink = None

        self.run()

    def start_gst(self, config=None):
        """Start gstreamer pipeline and sink

        Pipeline description list e.g:
            [
                'videotestsrc ! decodebin',
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        Args:
            config (list, optional): Gstreamer pipeline description list
        """
        if not config:
            config = [
                'videotestsrc ! decodebin',
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink'
            ]

        command = ' '.join(config)
        self.video_pipe = Gst.parse_launch(command)
        # The pipeline delivers no samples until it is set to PLAYING
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def gst_to_opencv(sample):
        """Transform byte array into np array

        Args:
            sample (Gst.Sample): Sample pulled from the appsink

        Returns:
            np.ndarray: BGR image of shape (height, width, 3)
        """
        buf = sample.get_buffer()
        caps = sample.get_caps()
        # The frame size comes from the negotiated caps; BGR has 3 channels
        array = np.ndarray(
            (
                caps.get_structure(0).get_value('height'),
                caps.get_structure(0).get_value('width'),
                3
            ),
            buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint8)
        return array

    def frame(self):
        """Get frame

        Returns:
            np.ndarray: Latest image frame, or None if none has arrived yet
        """
        return self._frame

    def frame_available(self):
        """Check if frame is available

        Returns:
            bool: True if a frame is available
        """
        return self._frame is not None

    def run(self):
        """Start the pipeline and update _frame on every new sample"""
        self.start_gst(
            [
                self.video_source,
                self.video_codec,
                self.video_decode,
                self.video_sink_conf
            ])

        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        sample = sink.emit('pull-sample')
        new_frame = self.gst_to_opencv(sample)
        self._frame = new_frame

        return Gst.FlowReturn.OK


if __name__ == '__main__':
    # Create the video object
    # Add port= if it is necessary to use a different one
    video = Video()

    while True:
        # Wait for the next frame
        if not video.frame_available():
            continue

        frame = video.frame()
        cv2.imshow('frame', frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

check the rtsp server, it may help you with that:

Thanks for your answer! OK, I already have the source of the UDP video. I need to receive it and stream it through a web page. Is that possible with your script?

sunnyjocker commented Apr 30, 2020

You saved my ass, bro. I can finally devote myself to some vision study on the remote video. Thank you so much.

thanks man :)

Thank you!!!

HanJIk commented Dec 28, 2020

I have one question.
With the code above, I can see the ROV camera control video in a different window instead of qgc.
I want to measure the frame size and FPS of the video coming into the window.
Is there any way??

Check the frame's height and width (read from the sample caps in gst_to_opencv). For the frame rate, measure how often the callback is invoked.
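
One way to measure the callback frequency is to record a timestamp per callback and average over a short window. The following is a minimal sketch, not part of the gist: `FpsMeter`, its `window` parameter, and the `now` argument (used only to make it testable without a clock) are hypothetical names introduced for illustration.

```python
import time
from collections import deque


class FpsMeter:
    """Estimate a frame rate from the timestamps of recent callback calls."""

    def __init__(self, window=30):
        # Keep only the most recent `window` timestamps.
        self._stamps = deque(maxlen=window)

    def tick(self, now=None):
        """Record one frame arrival; meant to be called from the new-sample callback."""
        self._stamps.append(time.monotonic() if now is None else now)

    def fps(self):
        """Return frames per second over the window, or 0.0 if it cannot be computed."""
        if len(self._stamps) < 2:
            return 0.0
        elapsed = self._stamps[-1] - self._stamps[0]
        if elapsed <= 0:
            return 0.0
        # N timestamps span N - 1 frame intervals.
        return (len(self._stamps) - 1) / elapsed
```

Calling `tick()` at the top of `Video.callback` and reading `fps()` from the display loop would report the rate at which samples arrive, independent of how fast the display loop spins. The frame size is available as `frame.shape[:2]` on the array returned by `video.frame()`.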

HanJIk commented Dec 28, 2020

Can I do this?

prevTime = time.time()
while True:
    # Wait for the next frame
    if not video.frame_available():
        continue

    # fps check
    curTime = time.time()
    sec = curTime - prevTime
    prevTime = curTime
    fps = 1 / sec
    # end

    frame = video.frame()
    cv2.imshow('frame', frame)
    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

What is the command used on the Pi to broadcast the video?

HanJIk commented Jan 4, 2021 via email

grant7788 commented Jan 15, 2021

I encountered an error:
buffer=buf.extract_dup(0, buf.getsize()), dtype=np.uint8)
TypeError: buffer is too small for requested array
Thank you so much.

(I checked caps.get_structure(0).get_value('height') and got the correct video height.)
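
NumPy raises "buffer is too small for requested array" when the buffer holds fewer bytes than the requested shape needs. A BGR frame needs height * width * 3 bytes, so one possible cause (an assumption, since the thread does not confirm it) is that the negotiated format is not BGR; I420, for example, carries only width * height * 3 / 2 bytes. The sketch below uses a hypothetical helper, `bgr_frame_from_bytes`, to make the size check explicit before building the array.

```python
import numpy as np


def bgr_frame_from_bytes(data, width, height):
    """Wrap raw bytes as a (height, width, 3) uint8 array, checking the size first."""
    expected = width * height * 3  # BGR: one byte per channel, three channels
    if len(data) < expected:
        raise ValueError(
            'got {} bytes, need {} for a {}x{} BGR frame'.format(
                len(data), expected, width, height))
    return np.ndarray((height, width, 3), buffer=data, dtype=np.uint8)
```

Comparing `buf.get_size()` with `width * height * 3` inside `gst_to_opencv`, and printing `caps.to_string()`, should show whether the appsink is really receiving BGR data.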

Hello @patrickelectric,

This looks like exactly what I'm looking for, except that nothing happens when I run your code.
On my jetson nano I run:

gst-launch-1.0 -v v4l2src device=/dev/video1 ! "image/jpeg,width=1280, height=720, framerate=30/1" ! rtpjpegpay ! udpsink host= port=5600

and I can perfectly receive it on my PC if I run

gst-launch-1.0 -e -v udpsrc port=5600 ! application/x-rtp, encoding-name=JPEG, payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink

but I would like to work on the images with python before displaying them, which led me to find your code.
However, when I run your code, nothing happens.

As everyone else seems to enjoy your work, I'm wondering if you could give me a hint as to what I'm missing by chance?

To summarize, I have a camera plugged into a Jetson Nano, and I'm trying to stream the video feed over the network to another computer (a Jetson Xavier) that should receive the images, apply some image processing, and display them on screen.

Any help would be greatly appreciated.

@lweingart the example runs with h264, you'll need to adapt it to your webcam and colorspace.
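
As a rough illustration of that adaptation, the H.264 depayloader and decoder can be swapped for the JPEG elements from the receiver command quoted above, keeping the BGR conversion and the appsink. This is a sketch under assumptions: `mjpeg_receiver_config` is a hypothetical helper, and the caps string is copied from the poster's working `gst-launch-1.0` command rather than verified against their stream.

```python
def mjpeg_receiver_config(port=5600):
    """Build a pipeline description list for an RTP/JPEG stream (hypothetical helper)."""
    return [
        'udpsrc port={}'.format(port),
        # Depayload and decode JPEG instead of H.264.
        '! application/x-rtp, encoding-name=JPEG, payload=26 ! rtpjpegdepay ! jpegdec',
        # Convert to the BGR layout OpenCV expects.
        '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
        # Same sink settings as the gist, so the new-sample callback still fires.
        '! appsink emit-signals=true sync=false max-buffers=2 drop=true',
    ]


if __name__ == '__main__':
    # Print the joined description so it can be compared with the gst-launch command.
    print(' '.join(mjpeg_receiver_config()))
```

The returned list has the same shape as the one `start_gst(config)` accepts, so it could be passed there in place of the H.264 description.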

Hi, thank you for your quick reply !

So if I understand you correctly, I need to somehow incorporate what works for me in the receiver command:

gst-launch-1.0 -e -v udpsrc port=5600 ! application/x-rtp, encoding-name=JPEG, payload=26 ! rtpjpegdepay ! jpegdec ! autovideosink

into your code, right?

Let me try that.
I have to admit, it has already taken me so much time just to get this UDP stream working. There are so many options to use with GStreamer; it's a real headache.

Thank you again for your availability

Hello again @patrickelectric,

Just out of curiosity and to further my understanding, could you please tell me what command on the computer sending the images would work with your code as it is ?

Thank you.

doman93 commented Apr 12, 2021

@patrickelectric I tried to rewrite your code in C++ but it doesn't work. Can you help me to figure this out

int main(int argc, char** argv)
{
    VideoCapture video("udpsrc port=5600 ! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! avdec_h264"
                       " ! decodebin ! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert! appsink emit-signals=true sync=false max-buffers=2 drop=true", CAP_GSTREAMER);
    while (!video.isOpened()) {
        cerr << "VideoCapture not opened" << endl;
    }
    while (true) {
        Mat frame;

        imshow("Camera", frame);

        if (waitKey(1) == 27) {
            break;
        }
    }
    return 0;
}

Your Python code works perfectly in my case, but somehow the C++ version gets stuck. The build uses the default OpenCV 3.3 from ROS Kinetic.

I have tried to run your code but faced this error,

Traceback (most recent call last):
  File "/home/shirin/Downloads/depth/", line 158, in <module>
    video = Video()
  File "/home/shirin/Downloads/depth/", line 67, in __init__
  File "/home/shirin/Downloads/depth/", line 145, in run
    self.video_sink.connect('new-sample', self.callback)
TypeError: <gi.GstAutoVideoSink object at 0x7f9cc7c2b100 (GstAutoVideoSink at 0x34b6020)>: unknown signal name: new-sample

I was wondering if you could share your idea!
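
The traceback names a GstAutoVideoSink, while `new-sample` is a signal emitted by appsink (when `emit-signals=true`), so the pipeline in that modified script apparently ended in `autovideosink`. The fix the poster later found is not shown in this thread; the sketch below only illustrates a pre-flight check on the description string. Both helpers are hypothetical and use a simple split on `!`, which is a heuristic rather than a real pipeline parser.

```python
def last_element(description):
    """Return the factory name of the final element in a gst-launch style string."""
    tail = description.split('!')[-1].strip()
    return tail.split()[0] if tail else ''


def can_connect_new_sample(description):
    """True when the description ends in appsink, the element that emits new-sample."""
    return last_element(description) == 'appsink'
```

With this check, a description ending in `autovideosink` is rejected before `connect('new-sample', ...)` is attempted, which points at the sink element rather than at the callback.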

You are a savior! Thanks a lot for sharing this.

Can you please comment on the issue I am having here?

Found the fix and posted

swcho84 commented Dec 22, 2022

You are my Hero!

madjxatw commented Jul 26, 2023

Thanks for sharing. What I am currently interested in is how to reduce the CPU usage (12%~17% CPU on my Intel i9-11900K machine). I've used OpenCV's cv2.VideoCapture(stream_url, cv2.CAP_GSTREAMER), which uses GStreamer as the video capture backend, and it only consumed around 3%~5% CPU. I have no idea what magic is behind that in OpenCV.

Hi, this example was designed to work in any scenario. Decodebin tries to be smart and select the best decoder for you, but it usually fails to. Try decodebin3, or choose your decoder manually to use what your hardware provides.
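
Choosing the decoder manually can be sketched by making the decoder element a parameter and dropping decodebin from the description. This is an illustration under assumptions: `h264_receiver_config` is a hypothetical helper, and hardware decoder names such as `vaapih264dec` or `nvh264dec` exist only when the corresponding GStreamer plugin is installed. Whether a given hardware decoder links directly to `videoconvert`, and whether it lowers CPU usage, depends on the platform.

```python
def h264_receiver_config(port=5600, decoder='avdec_h264'):
    """Build the gist's H.264 pipeline list with an explicit decoder and no decodebin."""
    return [
        'udpsrc port={}'.format(port),
        # The decoder is picked by the caller instead of being auto-plugged.
        '! application/x-rtp, payload=96 ! rtph264depay ! h264parse ! {}'.format(decoder),
        '! videoconvert ! video/x-raw,format=(string)BGR',
        '! appsink emit-signals=true sync=false max-buffers=2 drop=true',
    ]
```

Running `gst-inspect-1.0` with a decoder name shows whether that element is available before it is put into the pipeline.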

I am currently not using decodebin in my pipeline but it still works. Any benefit if I insert a decodebin element into the pipeline?

anselmobattisti commented Jul 27, 2023

Plugin documentation:

"GstBin that auto-magically constructs a decoding pipeline using available decoders and demuxers via auto-plugging."

With decodebin, your pipeline will be more resilient to variations in the incoming multimedia stream. If you will only ever have a single type of stream, and it is working, then your pipeline is safe.

patrickelectric commented Jul 27, 2023 via email

@patrickelectric, thanks for sharing!

Hi, patrickelectric, is it possible to have the same functionality using tcpserver on gstreamer?

Yes, take a look at tcpclientsrc and tcpserversrc.