-
-
Save eyllanesc/6486dc26eebb1f1b71469959d086a649 to your computer and use it in GitHub Desktop.
import asyncio | |
from functools import cached_property, partial | |
from PyQt6.QtCore import ( | |
pyqtSlot as Slot, | |
) | |
from utils import FrameProvider | |
import cv2 | |
def request_frame(video_capture): | |
if not video_capture.isOpened(): | |
return | |
ret, frame = video_capture.read() | |
if ret: | |
return cv2.cvtColor(frame, cv2.COLOR_RGB2RGBA) | |
class OpenCVAsyncProvider(FrameProvider): | |
@cached_property | |
def video_capture(self): | |
return cv2.VideoCapture(0) | |
@Slot() | |
def start(self): | |
asyncio.ensure_future(self.start_request()) | |
async def start_request(self): | |
loop = asyncio.get_running_loop() | |
while True: | |
image_frame = await loop.run_in_executor( | |
None, partial(request_frame, self.video_capture) | |
) | |
self.write_frame(image_frame) |
import QtQuick | |
import QtQuick.Controls | |
import QtMultimedia | |
ApplicationWindow { | |
id: root | |
width: 640 | |
height: 480 | |
visible: true | |
VideoOutput { | |
id: videoOutput | |
anchors.fill: parent | |
Component.onCompleted: function() { | |
frame_provider.video_sink = videoOutput.videoSink; | |
frame_provider.start(); | |
} | |
} | |
} |
import asyncio | |
from functools import partial | |
import sys | |
from PyQt6.QtCore import Qt | |
from PyQt6.QtWidgets import QApplication, QGraphicsScene, QGraphicsView | |
from PyQt6.QtMultimediaWidgets import QGraphicsVideoItem | |
import qasync | |
from asyncprovider import OpenCVAsyncProvider | |
def main(): | |
app = QApplication(sys.argv) | |
loop = qasync.QEventLoop(app) | |
asyncio.set_event_loop(loop) | |
scene = QGraphicsScene() | |
view = QGraphicsView(scene) | |
view.resize(640, 480) | |
view.show() | |
video_item = QGraphicsVideoItem() | |
scene.addItem(video_item) | |
frame_provider = OpenCVAsyncProvider() | |
frame_provider.video_sink = video_item.videoSink() | |
frame_provider.start() | |
video_item.nativeSizeChanged.connect( | |
partial(view.fitInView, video_item, Qt.AspectRatioMode.KeepAspectRatio) | |
) | |
with loop: | |
try: | |
loop.run_forever() | |
except asyncio.exceptions.CancelledError: | |
print("OK") | |
if __name__ == "__main__": | |
main() |
import asyncio | |
from pathlib import Path | |
import os | |
import sys | |
from PyQt6.QtCore import QCoreApplication, Qt, QUrl | |
from PyQt6.QtGui import QGuiApplication | |
from PyQt6.QtQml import QQmlApplicationEngine | |
import qasync | |
from asyncprovider import OpenCVAsyncProvider | |
CURRENT_DIRECTORY = Path(__file__).resolve().parent | |
def main(): | |
app = QGuiApplication(sys.argv) | |
loop = qasync.QEventLoop(app) | |
asyncio.set_event_loop(loop) | |
frame_provider = OpenCVAsyncProvider() | |
engine = QQmlApplicationEngine() | |
engine.rootContext().setContextProperty("frame_provider", frame_provider) | |
filename = os.fspath(CURRENT_DIRECTORY / "main.qml") | |
url = QUrl.fromLocalFile(filename) | |
def handle_object_created(obj, obj_url): | |
if obj is None and url == obj_url: | |
QCoreApplication.exit(-1) | |
engine.objectCreated.connect( | |
handle_object_created, Qt.ConnectionType.QueuedConnection | |
) | |
engine.load(url) | |
with loop: | |
try: | |
loop.run_forever() | |
except asyncio.exceptions.CancelledError: | |
print("OK") | |
if __name__ == "__main__": | |
main() |
import asyncio | |
import sys | |
from PyQt6.QtWidgets import QApplication | |
from PyQt6.QtMultimediaWidgets import QVideoWidget | |
import qasync | |
from asyncprovider import OpenCVAsyncProvider | |
def main(): | |
app = QApplication(sys.argv) | |
loop = qasync.QEventLoop(app) | |
asyncio.set_event_loop(loop) | |
w = QVideoWidget() | |
w.resize(640, 480) | |
w.show() | |
frame_provider = OpenCVAsyncProvider() | |
frame_provider.video_sink = w.videoSink() | |
frame_provider.start() | |
with loop: | |
try: | |
loop.run_forever() | |
except asyncio.exceptions.CancelledError: | |
print("OK") | |
if __name__ == "__main__": | |
main() |
import ctypes | |
from dataclasses import dataclass | |
from functools import cached_property | |
from PyQt6.QtCore import ( | |
QObject, | |
QSize, | |
pyqtSignal as Signal, | |
pyqtProperty as Property, | |
) | |
from PyQt6.QtGui import QImage | |
from PyQt6.QtMultimedia import QVideoFrame, QVideoFrameFormat | |
import numpy as np | |
def write_qvideoframe_from_ndarray(video_frame, np_image, with_ctypes=True): | |
plane = 0 | |
data = video_frame.bits(plane) | |
assert np_image.size == video_frame.mappedBytes(plane), "Must be same size!!!" | |
if with_ctypes: | |
ctypes.memmove( | |
(ctypes.c_ubyte * len(data)).from_buffer(data) | |
if isinstance(data, memoryview) | |
else int(data), | |
np_image.ctypes.data, | |
video_frame.mappedBytes(plane), | |
) | |
else: | |
if not isinstance(data, memoryview): | |
data.setsize(video_frame.mappedBytes(plane)) | |
data[:] = bytearray(np_image) | |
def build_video_frame(size): | |
video_frame_format = QVideoFrameFormat.PixelFormat.Format_BGRA8888 | |
video_frame = QVideoFrame(QVideoFrameFormat(size, video_frame_format)) | |
if not video_frame.isValid() or not video_frame.map(QVideoFrame.MapMode.WriteOnly): | |
raise RuntimeError(f"QVideoFrame is invalid or not writable") | |
return video_frame | |
def convert_qvideoframe_to_qimage(video_frame): | |
image_format = QVideoFrameFormat.imageFormatFromPixelFormat( | |
video_frame.pixelFormat() | |
) | |
if image_format == QImage.Format.Format_Invalid: | |
print("Invalid format") | |
return | |
plane = 0 | |
ptr = video_frame.bits(plane) | |
image = QImage( | |
ptr if isinstance(ptr, memoryview) else int(ptr), | |
video_frame.width(), | |
video_frame.height(), | |
image_format, | |
) | |
return image | |
@dataclass(frozen=True) | |
class _QVideoFrameInterface: | |
video_frame: QVideoFrame | |
plane: int = 0 | |
@cached_property | |
def __array_interface__(self): | |
data = self.video_frame.bits(self.plane) | |
if not isinstance(data, memoryview): | |
data.setsize(self.video_frame.mappedBytes(self.plane)) | |
return dict( | |
shape=(self.video_frame.height(), self.video_frame.width(), 4), | |
typestr="|u1", | |
data=data, | |
strides=(self.video_frame.bytesPerLine(self.plane), 4, 1), | |
version=3, | |
) | |
def convert_qvideoframe_to_numpy(video_frame): | |
return np.asarray(_QVideoFrameInterface(video_frame)) | |
@dataclass(frozen=True) | |
class _QImageInterface: | |
image: QImage | |
@cached_property | |
def __array_interface__(self): | |
data = self.image.bits() | |
if not isinstance(data, memoryview): | |
data.setsize(self.image.sizeInBytes()) | |
return dict( | |
shape=(self.image.height(), self.image.width(), 4), | |
typestr="|u1", | |
data=data, | |
strides=(self.image.bytesPerLine(), 4, 1), | |
version=3, | |
) | |
def convert_qimage_to_numpy(video_frame): | |
return np.asarray(_QImageInterface(video_frame)) | |
class FrameProvider(QObject): | |
video_sink_changed = Signal() | |
def __init__(self, parent=None): | |
super().__init__(parent) | |
self._video_sink = None | |
@Property(QObject, notify=video_sink_changed) | |
def video_sink(self): | |
return self._video_sink | |
@video_sink.setter | |
def video_sink(self, video_sink): | |
if self.video_sink is not None: | |
self.video_sink.destroyed.disconnect(self._handle_destroyed) | |
if self.video_sink.parent() is self: | |
self.video_sink.setParent(None) | |
self.video_sink.deleteLater() | |
self._video_sink = video_sink | |
video_sink.destroyed.connect(self._handle_destroyed) | |
self.video_sink_changed.emit() | |
def write_frame(self, image_frame): | |
if ( | |
self.video_sink is None | |
or image_frame is None | |
or len(image_frame.shape) != 3 | |
): | |
print("video_sink or image_format is None") | |
return | |
height, width, _ = image_frame.shape | |
try: | |
video_frame = build_video_frame(QSize(width, height)) | |
except RuntimeError: | |
pass | |
else: | |
write_qvideoframe_from_ndarray(video_frame, image_frame, with_ctypes=True) | |
video_frame.unmap() | |
self.video_sink.setVideoFrame(video_frame) | |
def _handle_destroyed(self): | |
self._video_sink = None |
Has your example been run recently using Python 3.10 and PySide6 6.2.2? I've opened a test project using your code and it works but it is very laggy and slow.
@donbfry If you comment the logic of the barcode is it still slow? If it isn't then you should optimize that part, for example processing every N frames instead of every frame. Also if your image is very large you could scale it.
The barcode logic is the culprit.
I have an implementation of using pyzbar in my PySide6 project along side OpenCV that works. The issue is OpenCV cannot fetch the "friendly name" of the available input devices like Qt can, only the ID which I believe is due to OpenCV using FFMPEG (which is technically fine, works great, just doesn't look as pretty). I was just trying out your approach to see if I could implement it successfully and came across that issue. I'm actually not sure why the barcode logic affects performance so heavily. My barcode decoding logic is essentially the same although I use a QThread, QGraphicsPixMapItem on a QGraphicsView, and a custom Signal.
import sys
import os
import cv2
import warnings
from PySide6 import QtCore, QtMultimedia
from PySide6.QtCore import QEvent, QPoint, QThread, Qt, Signal, Slot
from PySide6.QtGui import QIcon, QImage, QPixmap
from PySide6.QtWidgets import (
QApplication,
QGraphicsPixmapItem,
QGraphicsScene,
QMainWindow,
QStackedLayout,
)
import numpy as np
from pyzbar import pyzbar
from pyzbar.wrapper import ZBarSymbol
from modules import *
os.environ["QT_FONT_DPI"] = "96"
warnings.filterwarnings("ignore")
class MainWindow(QMainWindow):
def __init__(self):
QMainWindow.__init__(self)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
UIFunctions.uiDefinitions(self)
self.scene = QGraphicsScene(self)
self.pix = QGraphicsPixmapItem()
self.ui.appFrameLayout.setStackingMode(QStackedLayout.StackAll)
self.ui.graphicsView.setScene(self.scene)
self.scene.addItem(self.pix)
self.cap_thread = CaptureThread()
self.cap_thread.change_pixmap_signal.connect(self.update_image)
self.cap_thread.start()
self.setWindowFlag(Qt.WindowStaysOnTopHint)
self.setWindowFlag(Qt.FramelessWindowHint)
self.setWindowFlag(Qt.MSWindowsFixedSizeDialogHint)
self.show()
self.ui.graphicsView.viewport().installEventFilter(self)
def eventFilter(self, watched, event):
if watched == self.ui.graphicsView.viewport() and event.type() == QEvent.Wheel:
return True
else:
return False
def closeEvent(self, event):
self.cap_thread.stop()
event.accept()
@Slot(np.ndarray)
def update_image(self, cv_img):
qt_img = self.convert_cv_qt(cv_img)
self.pix.setPixmap(qt_img)
def convert_cv_qt(self, cv_img):
h, w, ch = cv_img.shape
bytes_per_line = ch * w
convert_to_Qt_format = QImage(
cv_img.data, w, h, bytes_per_line, QImage.Format_RGB888
)
self.ui.graphicsView.fitInView(self.pix, Qt.KeepAspectRatioByExpanding)
return QPixmap.fromImage(convert_to_Qt_format)
class CaptureThread(QThread):
change_pixmap_signal = Signal(np.ndarray)
formats = [
ZBarSymbol.CODE128,
ZBarSymbol.QRCODE,
]
def __init__(self):
super().__init__()
self._run_flag = True
def run(self):
cap = cv2.VideoCapture(1, cv2.CAP_DSHOW)
while self._run_flag:
ret, cv_img = cap.read()
if ret:
frame = self.read_barcodes(cv_img)
self.change_pixmap_signal.emit(frame)
cap.release()
def stop(self):
self._run_flag = False
self.wait()
def read_barcodes(self, frame):
grayscale = cv2.cvtColor(frame.copy(), cv2.COLOR_BGR2GRAY)
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
barcodes = pyzbar.decode(
grayscale, symbols=[ZBarSymbol.CODE128, ZBarSymbol.QRCODE]
)
if len(barcodes) > 0:
for barcode in barcodes:
x, y, w, h = barcode.rect
frame = cv2.rectangle(frame, (x, y), (x + w, y + h), (255, 0, 0), 2)
barcode_info = barcode.data.decode("utf-8")
return frame
if __name__ == "__main__":
app = QApplication(sys.argv)
icon = QIcon("icon.ico")
system_tray = SystemTray(app)
window = MainWindow()
window.setWindowIcon(icon)
with open("style.css", "r") as f:
_style = f.read()
app.setStyleSheet(_style)
sys.exit(app.exec())
Each frame I am processing the image but performance is virtually unaffected. Could it be from FrameProvider's write_frame() method? If I could get the performance to match what I have now, your method would be my preferred. The closest I came to being able to fetch friendly input device names is when I came across this Stack Overflow post using Windows Media Foundation, CV-camera-finder, which unfortunately does not work for Python 3.10 (but worked for 3.7) and I lack the knowledge of C++ and binding Python functions to C++ to get it to work.
Upon further tinkering, I believe there is a memory leak somewhere. It might have to do with the _QImageInterface dataclass.
Example: processing camera image using pyzbar