初步的虚拟机vnc连接

This commit is contained in:
gfdgd xi 2024-07-30 16:36:45 +08:00
parent 83fd8fffd4
commit 24131b0050
9 changed files with 1751 additions and 3 deletions

View File

@ -62,7 +62,7 @@ if [[ $? == 0 ]] && [[ -f "$HOME/Qemu/Windows/Windows.qcow2" ]]; then
fi
fi
else
qemuUEFI="-vga virtio -device nec-usb-xhci,id=xhci,addr=0x1b -device usb-tablet,id=tablet,bus=xhci.0,port=1 "
qemuUEFI="-vga virtio -machine usb=on -device usb-tablet "
fi
echo $qemuUEFI
./VM/kvm-ok
@ -70,9 +70,10 @@ if [[ $? == 0 ]] && [[ -f "$HOME/Qemu/Windows/Windows.qcow2" ]]; then
echo X86 架构,使用 kvm 加速
$qemuMore qemu-system-x86_64 --enable-kvm -cpu host --hda "$HOME/Qemu/Windows/Windows.qcow2" \
-smp $CpuCount,sockets=$CpuSocketNum,cores=$(($CpuCoreNum / $CpuSocketNum)),threads=$(($CpuCount / $CpuCoreNum / $CpuSocketNum)) \
-m ${use}G -display vnc=:5 -display gtk -usb -nic model=rtl8139 $qemuUEFI \
-m ${use}G -display vnc=:5 -display gtk -nic model=rtl8139 $qemuUEFI \
-device AC97 -device ES1370 -device intel-hda -device hda-duplex \
--boot 'splash=VM/boot.jpg,menu=on,splash-time=2000' \
-usb \
> $TMPDIR/tmp/windows-virtual-machine-installer-for-wine-runner-run.log 2>&1 # 最新的 qemu 已经移除参数 -soundhw all
exit
fi
@ -99,7 +100,7 @@ if [[ $? == 0 ]] && [[ -f "$HOME/Qemu/Windows/Windows.qcow2" ]]; then
echo 不使用 kvm 加速
$qemuPath --hda "$HOME/Qemu/Windows/Windows.qcow2" \
-smp $CpuCount,sockets=$CpuSocketNum,cores=$(($CpuCoreNum / $CpuSocketNum)),threads=$(($CpuCount / $CpuCoreNum / $CpuSocketNum)) \
-m ${use}G -display vnc=:5 -display gtk -usb -nic model=rtl8139 $qemuUEFI \
-m ${use}G -display vnc=:5 -display gtk -nic model=rtl8139 $qemuUEFI \
-device AC97 -device ES1370 -device intel-hda -device hda-duplex \
--boot 'splash=VM/boot.jpg,menu=on,splash-time=2000' \
> $TMPDIR/tmp/windows-virtual-machine-installer-for-wine-runner-run.log 2>&1 # 最新的 qemu 已经移除参数 -soundhw all

44
novnc-client/main.py Normal file
View File

@ -0,0 +1,44 @@
import sys
import logging
from PyQt5.QtWidgets import QApplication, QMainWindow
from qvncwidget import QVNCWidget
#logging.basicConfig(level=logging.DEBUG) # DEBUG及以上的日志信息都会显示
class Window(QMainWindow):
def __init__(self):
super(Window, self).__init__()
self.setWindowTitle("QVNCWidget")
self.vnc = QVNCWidget(
parent=self,
host="127.0.0.1", port=5905,
readOnly=False
)
self.setCentralWidget(self.vnc)
# you can disable mouse tracking if desired
self.vnc.setMouseTracking(True)
self.setAutoFillBackground(True)
self.vnc.start()
def keyPressEvent(self, ev):
self.vnc.keyPressEvent(ev)
return super().keyPressEvent(ev) # in case you need the signal somewhere else in the window
def keyReleaseEvent(self, ev):
self.vnc.keyReleaseEvent(ev)
return super().keyReleaseEvent(ev) # in case you need the signal somewhere else in the window
def closeEvent(self, ev):
self.vnc.stop()
return super().closeEvent(ev)
app = QApplication(sys.argv)
window = Window()
window.resize(800, 600)
window.show()
sys.exit(app.exec_())

View File

@ -0,0 +1 @@
from .qvncwidget import QVNCWidget

View File

@ -0,0 +1,237 @@
#! /usr/bin/env python3
import struct
##
# reading
##
def read_float_buff(buffer, big_endian=False) -> float:
if big_endian:
return struct.unpack(">f", buffer.read(4))[0]
else:
return struct.unpack("<f", buffer.read(4))[0]
def read_double_buff(buffer, big_endian=False) -> float:
if big_endian:
return struct.unpack(">d", buffer.read(8))[0]
else:
return struct.unpack("<d", buffer.read(8))[0]
def read_uint8_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">B", buffer.read(1))[0]
else:
return struct.unpack("<B", buffer.read(1))[0]
def read_uint16_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">H", buffer.read(2))[0]
else:
return struct.unpack("<H", buffer.read(2))[0]
def read_uint32_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">I", buffer.read(4))[0]
else:
return struct.unpack("<I", buffer.read(4))[0]
def read_uint64_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">Q", buffer.read(8))[0]
else:
return struct.unpack("<Q", buffer.read(8))[0]
def read_sint8_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">b", buffer.read(1))[0]
else:
return struct.unpack("<b", buffer.read(1))[0]
def read_sint16_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">h", buffer.read(2))[0]
else:
return struct.unpack("<h", buffer.read(2))[0]
def read_sint32_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">i", buffer.read(4))[0]
else:
return struct.unpack("<i", buffer.read(4))[0]
def read_sint64_buff(buffer, big_endian=False) -> int:
if big_endian:
return struct.unpack(">q", buffer.read(8))[0]
else:
return struct.unpack("<q", buffer.read(8))[0]
##
# writing
##
def write_float_buff(buffer, value: float, big_endian=False) -> None:
buffer.write(return_float_bytes(value, big_endian))
def write_double_buff(buffer, value: float, big_endian=False) -> None:
buffer.write(return_double_bytes(value, big_endian))
def write_uint8_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_uint8_bytes(value, big_endian))
def write_uint16_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_uint16_bytes(value, big_endian))
def write_uint32_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_uint32_bytes(value, big_endian))
def write_uint64_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_uint64_bytes(value, big_endian))
def write_sint8_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_sint8_bytes(value, big_endian))
def write_sint16_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_sint16_bytes(value, big_endian))
def write_sint32_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_sint32_bytes(value, big_endian))
def write_sint64_buff(buffer, value: int, big_endian=False) -> None:
buffer.write(return_sint64_bytes(value, big_endian))
##
# return bytes
##
def return_float_bytes(value: float, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">f", value)
else:
return struct.pack("<f", value)
def return_double_bytes(value: float, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">d", value)
else:
return struct.pack("<d", value)
def return_uint8_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">B", value)
else:
return struct.pack("<B", value)
def return_uint16_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">H", value)
else:
return struct.pack("<H", value)
def return_uint32_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">I", value)
else:
return struct.pack("<I", value)
def return_uint64_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">Q", value)
else:
return struct.pack("<Q", value)
def return_sint8_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">b", value)
else:
return struct.pack("<b", value)
def return_sint16_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">h", value)
else:
return struct.pack("<h", value)
def return_sint32_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">i", value)
else:
return struct.pack("<i", value)
def return_sint64_bytes(value: int, big_endian=False) -> bytes:
if big_endian:
return struct.pack(">q", value)
else:
return struct.pack("<q", value)
##
# return val
##
def return_float_val(data: bytes, big_endian=False) -> float:
if big_endian:
return struct.unpack(">f", data)[0]
else:
return struct.unpack("<f", data)[0]
def return_double_val(data: bytes, big_endian=False) -> float:
if big_endian:
return struct.unpack(">d", data)[0]
else:
return struct.unpack("<d", data)[0]
def return_uint8_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">B", data)[0]
else:
return struct.unpack("<B", data)[0]
def return_uint16_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">H", data)[0]
else:
return struct.unpack("<H", data)[0]
def return_uint32_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">I", data)[0]
else:
return struct.unpack("<I", data)[0]
def return_uint64_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">Q", data)[0]
else:
return struct.unpack("<Q", data)[0]
def return_sint8_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">b", data)[0]
else:
return struct.unpack("<b", data)[0]
def return_sint16_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">h", data)[0]
else:
return struct.unpack("<h", data)[0]
def return_sint32_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">i", data)[0]
else:
return struct.unpack("<i", data)[0]
def return_sint64_val(data: bytes, big_endian=False) -> int:
if big_endian:
return struct.unpack(">q", data)[0]
else:
return struct.unpack("<q", data)[0]

View File

@ -0,0 +1,690 @@
"""
Qt Widget for displaying VNC framebuffer using RFB protocol
(c) zocker-160 2024
licensed under GPLv3
"""
import logging
import time
from PyQt5.QtCore import (
QSize,
Qt,
pyqtSignal,
QSemaphore
)
from PyQt5.QtGui import (
QImage,
QPaintEvent,
QPainter,
QColor,
QBrush,
QPixmap,
QResizeEvent,
QKeyEvent,
QMouseEvent
)
from PyQt5.QtWidgets import (
QWidget,
QLabel,
QWidget,
QOpenGLWidget
)
from qvncwidget.rfb import RFBClient
from qvncwidget.rfbhelpers import RFBPixelformat, RFBInput
log = logging.getLogger("QVNCWidget")
class QVNCWidget(QWidget, RFBClient):
onInitialResize = pyqtSignal(QSize)
def __init__(self, parent: QWidget,
host: str, port = 5900, password: str = None,
readOnly = False):
super().__init__(
parent=parent,
host=host, port=port, password=password
)
self.readOnly = readOnly
self.backbuffer: QImage = None
self.frontbuffer: QImage = None
self.setMouseTracking(not self.readOnly)
self.setMinimumSize(1, 1) # make window scalable
self.mouseButtonMask = 0
def start(self):
self.startConnection()
def stop(self):
self.closeConnection()
def onConnectionMade(self):
log.info("VNC handshake done")
self.setPixelFormat(RFBPixelformat.getRGB32())
self.PIX_FORMAT = QImage.Format.Format_RGB32
self.backbuffer = QImage(self.vncWidth, self.vncHeight, self.PIX_FORMAT)
self.onInitialResize.emit(QSize(self.vncWidth, self.vncHeight))
def onRectangleUpdate(self,
x: int, y: int, width: int, height: int, data: bytes):
if self.backbuffer is None:
log.warning("backbuffer is None")
return
else:
log.debug("drawing backbuffer")
#with open(f"{width}x{height}.data", "wb") as f:
# f.write(data)
t1 = time.time()
painter = QPainter(self.backbuffer)
painter.drawImage(x, y, QImage(data, width, height, self.PIX_FORMAT))
painter.end()
log.debug(f"painting took: {(time.time() - t1)*1e3} ms")
del painter
del data
def onFramebufferUpdateFinished(self):
log.debug("FB Update finished")
self.update()
def paintEvent(self, a0: QPaintEvent):
#log.debug("Paint event")
painter = QPainter(self)
if self.backbuffer is None:
log.debug("backbuffer is None")
painter.fillRect(0, 0, self.width(), self.height(), Qt.GlobalColor.black)
else:
self.frontbuffer = self.backbuffer.scaled(
self.width(), self.height(),
Qt.AspectRatioMode.KeepAspectRatio,
Qt.TransformationMode.SmoothTransformation
)
painter.drawImage(0, 0, self.frontbuffer)
painter.end()
# Mouse events
def mousePressEvent(self, ev: QMouseEvent):
if self.readOnly or not self.frontbuffer: return
self.mouseButtonMask = RFBInput.fromQMouseEvent(ev, True, self.mouseButtonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.mouseButtonMask)
def mouseReleaseEvent(self, ev: QMouseEvent):
if self.readOnly or not self.frontbuffer: return
self.mouseButtonMask = RFBInput.fromQMouseEvent(ev, False, self.mouseButtonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.mouseButtonMask)
def mouseMoveEvent(self, ev: QMouseEvent):
if self.readOnly or not self.frontbuffer: return
try:
# 忽略拖动导致的问题
self.pointerEvent(*self._getRemoteRel(ev), self.mouseButtonMask)
except:
pass
def _getRemoteRel(self, ev: QMouseEvent) -> tuple:
xPos = (ev.localPos().x() / self.frontbuffer.width()) * self.vncWidth
yPos = (ev.localPos().y() / self.frontbuffer.height()) * self.vncHeight
return int(xPos), int(yPos)
# Key events
def keyPressEvent(self, ev: QKeyEvent):
if self.readOnly: return
self.keyEvent(RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=1)
def keyReleaseEvent(self, ev: QKeyEvent):
if self.readOnly: return
self.keyEvent(RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=0)
# other experimental implementations
class QVNCWidgetGL(QOpenGLWidget, RFBClient):
IMG_FORMAT = QImage.Format_RGB32
onInitialResize = pyqtSignal(QSize)
#onUpdatePixmap = pyqtSignal(int, int, int, int, bytes)
onUpdatePixmap = pyqtSignal()
onSetPixmap = pyqtSignal()
onKeyPress = pyqtSignal(QKeyEvent)
onKeyRelease = pyqtSignal(QKeyEvent)
def __init__(self, parent,
host, port=5900, password: str=None,
mouseTracking=False):
#super(QOpenGLWidget, self).__init__()
#super(RFBClient, self).__init__(
super().__init__(
parent=parent,
host=host,
port=port,
password=password,
daemonThread=True
)
#self.setAlignment(Qt.AlignCenter)
#self.onUpdatePixmap.connect(self._updateImage)
#self.onSetPixmap.connect(self._setImage)
self.onSetPixmap.connect(self._updateImage)
self.acceptMouseEvents = False # mouse events are not accepted at first
self.setMouseTracking(mouseTracking)
# Allow Resizing
self.setMinimumSize(1, 1)
self.data = list(tuple())
self.dataMonitor = QSemaphore(0)
def start(self):
self.startConnection()
def stop(self):
self.closeConnection()
def onConnectionMade(self):
log.info("VNC handshake done")
self.setPixelFormat(RFBPixelformat.getRGB32())
self.onInitialResize.emit(QSize(self.vncWidth, self.vncHeight))
self._initKeypress()
self._initMouse()
def onRectangleUpdate(self,
x: int, y: int, width: int, height: int, data: bytes):
#img = QImage(data, width, height, self.IMG_FORMAT)
#self.onUpdatePixmap.emit(x, y, width, height, data)
#self.dataMonitor.acquire(1)
self.data.append((x, y, width, height, data))
#self.data = (x, y, width, height, data)
#self.dataMonitor.release(1)
#self.onUpdatePixmap.emit()
#else:
# print("AAAAAAAAAAAAAA", "MONITOR AQUIRE FAILED")
def onFramebufferUpdateFinished(self):
self.onSetPixmap.emit()
return
if self.pixmap:
#self.setPixmap(QPixmap.fromImage(self.image))
self.resizeEvent(None)
def onFatalError(self, error: Exception):
log.error(str(error))
#logging.exception(str(error))
#self.reconnect()
#def _updateImage(self, x: int, y: int, width: int, height: int, data: bytes):
def _updateImage(self):
print("update image")
self.update()
#if not self.screen:
# self.screen = QImage(width, height, self.IMG_FORMAT)
# self.screen.fill(Qt.red)
# self.screenPainter = QPainter(self.screen)
#self.painter.beginNativePainting()
#self.painter.drawPixmapFragments()
#with open("/tmp/images/test.raw", "wb") as f:
# f.write(data)
#p = QPainter(self.screen)
#self.screenPainter.drawImage(
# x, y, QImage(data, width, height, self.IMG_FORMAT))
#p.end()
#self.repaint()
#self.update()
def _setPixmap(self):
if self.pixmap:
self.setPixmap(
self.pixmap.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
)
)
def _setImage(self):
if self.screen:
self.setPixmap(QPixmap.fromImage(
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
)
))
self.acceptMouseEvents = True # mouse events are getting accepted
# Passed events
def _keyPress(self, ev: QKeyEvent):
self.keyEvent(
RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=1)
def _keyRelease(self, ev: QKeyEvent):
self.keyEvent(
RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=0)
# Window events
def paintEvent(self, e: QPaintEvent):
print("paint event")
#self.dataMonitor.acquire(1)
#while self.dataMonitor.tryAcquire(1):
while len(self.data) > 0:
x, y, w, h, data = self.data.pop(0)
p = QPainter(self)
#p.setPen(QColor(255, 0, 0))
#p.drawText(e.rect(), Qt.AlignCenter, str(self.dataMonitor.available()))
p.drawImage(x, y, QImage(data, w, h, self.IMG_FORMAT))
p.end()
#self.dataMonitor.release(1)
return
p = QPainter(self)
p.fillRect(e.rect(), QBrush(QColor(255, 255, 255)))
p.end()
return
if self.dataMonitor.tryAcquire(1):
x, y, w, h, data = self.data
p = QPainter(self)
p.drawImage(x, y, QImage(data, w, h, self.IMG_FORMAT))
p.end()
self.dataMonitor.release(1)
print("CCCCC", "Image painted diggah")
else:
print("BBBBBBBBB", "aquire monitor failed")
#return super().paintEvent(a0)
return
if not self.screen:
self.screen = QImage(self.size(), self.IMG_FORMAT)
self.screen.fill(Qt.red)
self.screenPainter = QPainter(self.screen)
p = QPainter()
p.begin(self)
p.drawImage(0, 0,
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
))
p.end()
def resizeEvent(self, e: QResizeEvent):
return super().resizeEvent(e)
def resizeGL(self, w: int, h: int):
print("RESIZE THAT BITCH!!!", w, h)
#return super().resizeGL(w, h)
def resizeEvent_(self, a0: QResizeEvent):
#print("RESIZE!", self.width(), self.height())
#return super().resizeEvent(a0)
if self.screen:
self.setPixmap(QPixmap.fromImage(
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
))
)
return super().resizeEvent(a0)
def mousePressEvent(self, ev: QMouseEvent):
#print(ev.localPos(), ev.button())
#print(self.height() - self.pixmap().height())
if self.acceptMouseEvents: # need pixmap instance
self.buttonMask = RFBInput.fromQMouseEvent(ev, True, self.buttonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
return super().mousePressEvent(ev)
def mouseReleaseEvent(self, ev: QMouseEvent):
if self.acceptMouseEvents: # need pixmap instance
self.buttonMask = RFBInput.fromQMouseEvent(ev, False, self.buttonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
return super().mouseReleaseEvent(ev)
def mouseMoveEvent(self, ev: QMouseEvent):
if self.acceptMouseEvents: # need pixmap instance
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
# FIXME: The pixmap is assumed to be aligned center.
def _getRemoteRel(self, ev: QMouseEvent) -> tuple:
# FIXME: this code is ugly as fk
# y coord is kinda fucked up
yDiff = (self.height() - self.pixmap().height()) / 2
yPos = ev.localPos().y() - yDiff
if yPos < 0: yPos = 0
if yPos > self.pixmap().height(): yPos = self.pixmap().height()
yPos = self._calcRemoteRel(
yPos, self.pixmap().height(), self.vncHeight)
# x coord is kinda fucked up, too
xDiff = (self.width() - self.pixmap().width()) / 2
xPos = ev.localPos().x() - xDiff
if xPos < 0: xPos = 0
if xPos > self.pixmap().width(): xPos = self.pixmap().width()
xPos = self._calcRemoteRel(
xPos, self.pixmap().width(), self.vncWidth)
return xPos, yPos
def _calcRemoteRel(self, locRel, locMax, remoteMax) -> int:
return int( (locRel / locMax) * remoteMax )
def _initMouse(self):
self.buttonMask = 0 # pressed buttons (bit fields)
def _initKeypress(self):
self.onKeyPress.connect(self._keyPress)
self.onKeyRelease.connect(self._keyRelease)
def __del__(self):
self.stop()
def __exit__(self, *args):
self.stop()
self.deleteLater()
class QVNCWidget_old(QLabel, RFBClient):
IMG_FORMAT = QImage.Format_RGB32
onInitialResize = pyqtSignal(QSize)
onUpdatePixmap = pyqtSignal(int, int, int, int, bytes)
onSetPixmap = pyqtSignal()
onKeyPress = pyqtSignal(QKeyEvent)
onKeyRelease = pyqtSignal(QKeyEvent)
def __init__(self, parent,
host, port=5900, password: str=None,
mouseTracking=False):
super().__init__(
parent=parent,
host=host,
port=port,
password=password,
daemonThread=True
)
#import faulthandler
#faulthandler.enable()
self.screen: QImage = None
# FIXME: The pixmap is assumed to be aligned center.
self.setAlignment(Qt.AlignCenter)
self.onUpdatePixmap.connect(self._updateImage)
self.onSetPixmap.connect(self._setImage)
self.acceptMouseEvents = False # mouse events are not accepted at first
self.setMouseTracking(mouseTracking)
# Allow Resizing
self.setMinimumSize(1,1)
def _initMouse(self):
self.buttonMask = 0 # pressed buttons (bit fields)
def _initKeypress(self):
self.onKeyPress.connect(self._keyPress)
self.onKeyRelease.connect(self._keyRelease)
def start(self):
self.startConnection()
def stop(self):
self.closeConnection()
if self.screenPainter: self.screenPainter.end()
def onConnectionMade(self):
self.onInitialResize.emit(QSize(self.vncWidth, self.vncHeight))
self.setPixelFormat(RFBPixelformat.getRGB32())
self._initKeypress()
self._initMouse()
def onRectangleUpdate(self,
x: int, y: int, width: int, height: int, data: bytes):
#img = QImage(data, width, height, self.IMG_FORMAT)
self.onUpdatePixmap.emit(x, y, width, height, data)
def onFramebufferUpdateFinished(self):
self.onSetPixmap.emit()
return
if self.pixmap:
#self.setPixmap(QPixmap.fromImage(self.image))
self.resizeEvent(None)
def onFatalError(self, error: Exception):
log.error(str(error))
#logging.exception(str(error))
#self.reconnect()
def _updateImage(self, x: int, y: int, width: int, height: int, data: bytes):
if not self.screen:
self.screen = QImage(width, height, self.IMG_FORMAT)
self.screen.fill(Qt.red)
self.screenPainter = QPainter(self.screen)
#self.painter.beginNativePainting()
#self.painter.drawPixmapFragments()
#with open("/tmp/images/test.raw", "wb") as f:
# f.write(data)
#p = QPainter(self.screen)
self.screenPainter.drawImage(
x, y, QImage(data, width, height, self.IMG_FORMAT))
#p.end()
#self.repaint()
#self.update()
def _drawPixmap(self, x: int, y: int, pix: QPixmap):
#self.paintLock.acquire()
self.pixmap = pix
if not self.painter:
self.painter = QPainter(self.pixmap)
else:
print("DRAW PIXMAP:", x, y, self.pixmap, self.painter, pix, pix.isNull())
self.painter.drawPixmap(x, y, self.pixmap)
#self.paintLock.release()
def _drawPixmap2(self, x: int, y: int, pix: QPixmap, data: bytes):
if not self.pixmap or (
x == 0 and y == 0 and
pix.width() == self.pixmap.width() and pix.height() == self.pixmap.height()):
self.pixmap = pix.copy()
self._setPixmap()
return
import time
print("DRAW PIXMAP:", x, y, self.pixmap.width(), self.pixmap.height(), pix.width(), pix.height())
_t = time.time()
#self.pixmap.save(f"/tmp/images/imgP_{_t}", "jpg")
#with open(f"/tmp/images/img_{_t}.raw", "wb") as f:
# f.write(data)
#pix.save(f"/tmp/images/img_{_t}", "jpg")
painter = QPainter(self.pixmap)
painter.drawPixmap(x, y, pix)
painter.end()
#self._setPixmap()
def _setPixmap(self):
if self.pixmap:
self.setPixmap(
self.pixmap.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
)
)
def _setImage(self):
if self.screen:
self.setPixmap(QPixmap.fromImage(
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
)
))
self.acceptMouseEvents = True # mouse events are getting accepted
# Passed events
def _keyPress(self, ev: QKeyEvent):
self.keyEvent(
RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=1)
def _keyRelease(self, ev: QKeyEvent):
self.keyEvent(
RFBInput.fromQKeyEvent(ev.key(), ev.text()), down=0)
# Window events
def paintEvent(self, a0: QPaintEvent):
return super().paintEvent(a0)
if not self.screen:
self.screen = QImage(self.size(), self.IMG_FORMAT)
self.screen.fill(Qt.red)
self.screenPainter = QPainter(self.screen)
p = QPainter()
p.begin(self)
p.drawImage(0, 0,
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
))
p.end()
def resizeEvent(self, a0: QResizeEvent):
#print("RESIZE!", self.width(), self.height())
#return super().resizeEvent(a0)
if self.screen:
self.setPixmap(QPixmap.fromImage(
self.screen.scaled(
self.width(), self.height(),
Qt.KeepAspectRatio,
Qt.SmoothTransformation
))
)
return super().resizeEvent(a0)
def mousePressEvent(self, ev: QMouseEvent):
#print(ev.localPos(), ev.button())
#print(self.height() - self.pixmap().height())
if self.acceptMouseEvents: # need pixmap instance
self.buttonMask = RFBInput.fromQMouseEvent(ev, True, self.buttonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
return super().mousePressEvent(ev)
def mouseReleaseEvent(self, ev: QMouseEvent):
if self.acceptMouseEvents: # need pixmap instance
self.buttonMask = RFBInput.fromQMouseEvent(ev, False, self.buttonMask)
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
return super().mouseReleaseEvent(ev)
def mouseMoveEvent(self, ev: QMouseEvent):
if self.acceptMouseEvents: # need pixmap instance
self.pointerEvent(*self._getRemoteRel(ev), self.buttonMask)
# FIXME: The pixmap is assumed to be aligned center.
def _getRemoteRel(self, ev: QMouseEvent) -> tuple:
# FIXME: this code is ugly as fk
# y coord is kinda fucked up
yDiff = (self.height() - self.pixmap().height()) / 2
yPos = ev.localPos().y() - yDiff
if yPos < 0: yPos = 0
if yPos > self.pixmap().height(): yPos = self.pixmap().height()
yPos = self._calcRemoteRel(
yPos, self.pixmap().height(), self.vncHeight)
# x coord is kinda fucked up, too
xDiff = (self.width() - self.pixmap().width()) / 2
xPos = ev.localPos().x() - xDiff
if xPos < 0: xPos = 0
if xPos > self.pixmap().width(): xPos = self.pixmap().width()
xPos = self._calcRemoteRel(
xPos, self.pixmap().width(), self.vncWidth)
return xPos, yPos
def _calcRemoteRel(self, locRel, locMax, remoteMax) -> int:
return int( (locRel / locMax) * remoteMax )
def __del__(self):
self.stop()
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
self.deleteLater()

View File

@ -0,0 +1,485 @@
"""
RFB protocol implementation, client side
(c) zocker-160 2024
licensed under GPLv3
References:
- http://www.realvnc.com/docs/rfbproto.pdf
- https://github.com/rfbproto/rfbproto/blob/master/rfbproto.rst
"""
from qvncwidget.rfbhelpers import RFBPixelformat, RFBRectangle
from qvncwidget.rfbdes import RFBDes
import qvncwidget.rfbconstants as c
import qvncwidget.easystruct as es
from threading import Thread
import logging
import socket
from socket import SHUT_RDWR
import struct as s
import time
import threading
class RFBUnexpectedResponse(Exception):
pass
class RFBNoResponse(Exception):
pass
class RFBUnknownVersion(Exception):
pass
class RFBHandshakeFailed(Exception):
pass
class VNCAuthentificationFailed(Exception):
pass
SUPPORTED_VERSIONS = [
(3,3)
]
KNOWN_VERSIONS = [
(3,3), (3,6), (3,7), (3,8),
(4,0), (4,1),
(5,0)
]
"""
3.3: official minimum version
3.6: UltraVNC
3.7: official
3.8: official
4.0: Intel AMT KVM
4.1: RealVNC 4.6
5.0: RealVNC 5.3
"""
SUPPORTED_ENCODINGS = [
c.ENC_RAW
]
MAX_BUFF_SIZE: int = 10*1024*1024 # 10MB
class RFBClient:
log = logging.getLogger("RFB Client")
logc = logging.getLogger("RFB -> Server")
logs = logging.getLogger("RFB Client <-")
pixformat: RFBPixelformat
numRectangles = 0
#rectanglePositions = list() # list[RFBRectangle]
_stop = False
_connected = False
_requestFrameBufferUpdate = False
_incrementalFrameBufferUpdate = True
def __init__(self, host, port = 5900,
password: str = None,
sharedConnection = True,
keepRequesting = True,
requestIncremental = True):
self.host = host
self.port = port
self.password = password
self.sharedConn = sharedConnection
self._requestFrameBufferUpdate = keepRequesting
self._incrementalFrameBufferUpdate = requestIncremental
self._mainLoop: Thread = None
def __recv(self, expectedSize: int = None, maxSize=MAX_BUFF_SIZE) -> bytes:
if not expectedSize:
buffer = self.connection.recv(4096)
else:
buffer = self.connection.recv(expectedSize, socket.MSG_WAITALL)
if len(buffer) <= 50:
self.logs.debug(f"len: {len(buffer)} | {buffer}")
else:
self.logs.debug(f"{len(buffer)} Bytes | {len(buffer)//1024} KB")
return buffer
def __send(self, data: bytes):
self.connection.send(data)
self.logc.debug(data)
def __start(self):
self.connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.connection.connect( (self.host, self.port) )
self._handleInitial()
def __close(self):
self.log.debug("Closing connection")
if self.connection:
try:
self.connection.shutdown(SHUT_RDWR)
self.connection.close()
except OSError:
self.log.debug("TCP Connection already closed")
def _handleInitial(self):
buffer = self.__recv(12)
if b'\n' in buffer and buffer.startswith(b'RFB'):
maj, min = [int(x) for x in buffer[3:-1].split(b'.')]
self.log.info(f"RFB from server: {maj}.{min}")
if (maj, min) not in KNOWN_VERSIONS:
raise RFBUnknownVersion(f"Unknown RFB version by server: {maj}.{min}")
if (maj, min) not in SUPPORTED_VERSIONS:
# request highest supported version
# TODO: requested version must not be higher than
# the one offered by the server
maj, min = SUPPORTED_VERSIONS[-1]
else:
self.__close()
raise RFBUnknownVersion(buffer)
self.version_maj, self.version_min = maj, min
# request supported RFB version
self.__send(f"RFB 00{maj}.00{min}\n".encode())
self.log.info("VNC connected")
if (maj, min) == (3,3):
self._handleAuth33(self.__recv(4))
else:
self.log.error(f"Missing AUTH implementation for {maj}.{min}")
def _handleAuth33(self, data: bytes):
"""
Handle security handshake for protocol version 3.3.
In this version, the server decides the protocol (failed, none or VNCAuth)
"""
auth = es.return_uint32_val(data, True)
if auth == c.AUTH_FAIL:
self._handleConnFailed(self.__recv(4))
elif auth == c.AUTH_NONE:
self._doClientInit()
elif auth == c.AUTH_VNCAUTH:
self._handleVNCAuth(self.__recv(16))
else:
self.__close()
raise RFBUnexpectedResponse(f"Unknown auth response {auth}")
def _doClientInit(self):
shared = 1 if self.sharedConn else 0
self.__send(es.return_uint8_bytes(shared, True))
self._handleServerInit(self.__recv(24))
def _handleServerInit(self, data: bytes):
try:
self.vncWidth, self.vncHeight, pixformat, namelen = s.unpack("!HH16sI", data)
except s.error as e:
self.log.error("Handshake failed")
self.__close()
raise RFBHandshakeFailed(e)
threading.Thread(target=a).start()
self.desktopname = self.__recv(namelen).decode()
self.log.debug(f"Connecting to \"{self.desktopname}\"")
pixformatData = s.unpack("!BBBBHHHBBBxxx", pixformat)
self.pixformat = RFBPixelformat(*pixformatData)
self.log.debug(f"Server Pixelformat: {self.pixformat}")
self.log.debug(f"Resolution: {self.vncWidth}x{self.vncHeight}")
# this should not be required, but some VNC servers (like QT QPA VNC)
# require this to send FramebufferUpdate
self.setEncodings(SUPPORTED_ENCODINGS)
self.onConnectionMade()
self._connected = True
# enter main request loop
self._mainRequestLoop()
def _handleVNCAuth(self, data: bytes):
self._VNCAuthChallenge = data
self.log.info("Requesting password")
self.vncRequestPassword()
self._handleVNCAuthResult(self.__recv(4))
def _handleVNCAuthResult(self, data: bytes):
try:
result = es.return_uint32_val(data)
except s.error as e:
raise VNCAuthentificationFailed(f"Authentication failed ({str(e)})")
self.log.debug(f"Auth result {result}")
if result == c.SMSG_AUTH_OK:
self._doClientInit()
elif result == c.SMSG_AUTH_FAIL:
if self.version_min > 7:
self._handleVNCAuthError(self.__recv(4))
else:
raise VNCAuthentificationFailed("Authentication failed")
elif result == c.SMSG_AUTH_TOOMANY:
raise VNCAuthentificationFailed("Too many login attempts")
else:
self.log.error(f"Unknown Auth response ({result})")
def _handleVNCAuthError(self, data: bytes):
waitfor = es.return_uint32_val(data)
raise VNCAuthentificationFailed(
f"Authentication failed ({self.__recv(waitfor)})")
def _handleConnFailed(self, data: bytes):
waitfor = es.return_uint32_val(data)
resp = self.__recv(waitfor)
self.__close()
raise RFBHandshakeFailed(resp)
# ------------------------------------------------------------------
## Main request loop
# ------------------------------------------------------------------
def _mainRequestLoop(self):
time.sleep(0.2)
# first request is non incremental
self.framebufferUpdateRequest(incremental=False)
while not self._stop and self.connection:
try:
dType = self.__recv(1)
# when self.connection.close() is being called
# dType will be empty with length of 0
if len(dType) == 0:
continue
start = time.time()
self._handleConnection(dType)
self.log.debug(f"processing update took: {(time.time() - start)*1e3} ms")
except socket.timeout:
self.log.debug("timeout triggered")
continue
except s.error as e:
self.log.exception(str(e))
continue
except Exception as e:
self.onFatalError(e)
#print("AAA")
if self._requestFrameBufferUpdate:
self.framebufferUpdateRequest(
incremental=self._incrementalFrameBufferUpdate)
#print("BBB")
self.log.debug("loop exit")
# ------------------------------------------------------------------
## Server -> Client messages
# ------------------------------------------------------------------
def _handleConnection(self, data: bytes):
msgid = es.return_uint8_val(data)
if msgid == c.SMSG_FBUPDATE:
# Framebuffer Update
self._handleFramebufferUpdate(self.__recv(3))
elif msgid == c.SMSG_BELL:
# bell
self.onBell()
elif msgid == c.SMSG_SERVERCUTTEXT:
# server cut text
self._handleServerCutText(self.__recv(7))
elif msgid == c.SMSG_SETCOLORMAP:
# set color map entries
pass
else:
self.log.warning(f"Unknown message type recieved (id {msgid})")
raise RFBUnexpectedResponse
def _handleServerCutText(self, data: bytes):
datalength = s.unpack("!xxxI", data)[0]
data = self.__recv(datalength)
self.log.debug(f"Server clipboard: {data}")
# TODO: create callback
def _handleFramebufferUpdate(self, data: bytes):
numRectangles = s.unpack("!xH", data)[0]
self.log.debug(f"numRectangles: {numRectangles}")
self.onBeginUpdate()
for _ in range(numRectangles):
self._handleRectangle(self.__recv(12))
self.onFramebufferUpdateFinished()
def _handleRectangle(self, data: bytes):
xPos, yPos, width, height, encoding = s.unpack("!HHHHI", data)
rect = RFBRectangle(xPos, yPos, width, height)
self.log.debug(f"RECT: {rect}")
if encoding == c.ENC_RAW:
size = (width*height*self.pixformat.bitspp) // 8
self.log.debug(f"expected size: {size}")
start = time.time()
data = self.__recv(expectedSize=size)
self.log.debug(f"fetching data took: {(time.time() - start)*1e3} ms")
self._decodeRAW(data, rect)
del data
else:
raise TypeError(f"Unsupported encoding received ({encoding})")
# ------------------------------------------------------------------
## Image decoding stuff
# ------------------------------------------------------------------
def _decodeRAW(self, data: bytes, rectangle: RFBRectangle):
self.onRectangleUpdate(*rectangle.asTuple(), data)
# ------------------------------------------------------------------
## Client -> Server messages
# ------------------------------------------------------------------
def setPixelFormat(self, pixelformat: RFBPixelformat):
self.pixformat = pixelformat
pformat = s.pack("!BBBBHHHBBBxxx", *pixelformat.asTuple())
self.__send(s.pack("!Bxxx16s", c.CMSG_SETPIXELFORMAT, pformat))
def setEncodings(self, encodings: list):
self.__send(s.pack("!BxH", c.CMSG_SETENCODINGS, len(encodings)))
for encoding in encodings:
self.__send(es.return_sint32_bytes(encoding, True))
def framebufferUpdateRequest(self,
xPos=0, yPos=0, width=None, height=None,
incremental=False):
if not width: width = self.vncWidth - xPos
if not height: height = self.vncHeight - yPos
inc = 1 if incremental else 0
self.__send(s.pack(
"!BBHHHH",
c.CMSG_FBUPDATEREQ, inc,
xPos, yPos, width, height))
def keyEvent(self, key, down=1):
"""
For most ordinary keys, the "keysym" is the same as the corresponding ASCII value.
Other common keys are shown in the KEY_ constants
"""
self.log.debug(f'keyEvent: {key}, {"down" if down else "up"}')
self.__send(s.pack(
"!BBxxI",
c.CMSG_KEYEVENT, down, key))
def pointerEvent(self, x: int, y: int, buttommask=0):
"""
Indicates either pointer movement or a pointer button press or release. The pointer is
now at (x-position, y-position), and the current state of buttons 1 to 8 are represented
by bits 0 to 7 of button-mask respectively, 0 meaning up, 1 meaning down (pressed)
"""
if not self._connected: return
self.log.debug(f"pointerEvent: {x}, {y}, {buttommask}")
self.__send(s.pack(
"!BBHH",
c.CMSG_POINTEREVENT, buttommask, x, y))
# ------------------------------------------------------------------
## Direct Calls
# ------------------------------------------------------------------
def startConnection(self):
self._mainLoop = Thread(target=self.__start)
self._mainLoop.start()
def sendPassword(self, password):
if type(password) is str:
password = password.encode("ascii")
password = (password + bytes(8))[:8]
des = RFBDes(password)
self.__send(des.encrypt(self._VNCAuthChallenge))
def reconnect(self):
self.closeConnection()
self.startConnection()
def closeConnection(self):
self._stop = True
self._connected = False
self.__close()
if self._mainLoop and self._mainLoop.is_alive():
self.log.debug("waiting for main loop to exit")
self._mainLoop.join()
# ------------------------------------------------------------------
## Callbacks
# ------------------------------------------------------------------
def onConnectionMade(self):
"""
connection is initialized and ready
the pixel format and encodings can be set here using
setPixelFormat() and setEncodings()
the RFB main update loop will start after this function is done
"""
def onBeginUpdate(self):
"""
called before a series of updateRectangle(),
copyRectangle() or fillRectangle().
"""
def onRectangleUpdate(self,
x: int, y: int, width: int, height: int, data: bytes):
"""
new bitmap data. data are bytes in the pixel format set
up earlier.
"""
def onFramebufferUpdateFinished(self):
"""
called after a series of updateRectangle(), copyRectangle()
or fillRectangle() are finished.
"""
def onBell(self):
"""
a bell, yes that's right a BELL
"""
def vncRequestPassword(self):
"""
a password is needed to log on, use sendPassword() to
send one.
"""
if not self.password:
raise VNCAuthentificationFailed("No password specified")
else:
self.sendPassword(self.password)
def onFatalError(self, error: Exception):
"""
called when fatal error occurs
which caused the main loop to crash
you can try to reconnect here with reconnect()
"""
raise error

View File

@ -0,0 +1,165 @@
from PyQt5.QtCore import Qt
## Encoding Type for SetEncodings()
# publicly documented
ENC_RAW = 0 # Raw
ENC_COPYRECT = 1 # CopyRect
ENC_RRE = 2 # RRE
ENC_HEXTILE = 5 # Hextile
ENC_TRLE = 15 # TRLE
ENC_ZRLE = 16 # ZRLE
# pseudo-encodings
ENC_CURSOR = -239 # Cursor position pseudo-encoding
ENC_DESKTOPSIZE = -223 # DesktopSize pseudo-encoding
# additional
ENC_CORRE = 4
ENC_ZLIB = 6
ENC_TIGHT = 7
ENC_ZLIBHEX = 8
## Keycodes for KeyEvent()
KEY_BackSpace = 0xff08
KEY_Tab = 0xff09
KEY_Return = 0xff0d
KEY_Escape = 0xff1b
KEY_Insert = 0xff63
KEY_Delete = 0xffff
KEY_Home = 0xff50
KEY_End = 0xff57
KEY_PageUp = 0xff55
KEY_PageDown = 0xff56
KEY_Left = 0xff51
KEY_Up = 0xff52
KEY_Right = 0xff53
KEY_Down = 0xff54
KEY_F1 = 0xffbe
KEY_F2 = 0xffbf
KEY_F3 = 0xffc0
KEY_F4 = 0xffc1
KEY_F5 = 0xffc2
KEY_F6 = 0xffc3
KEY_F7 = 0xffc4
KEY_F8 = 0xffc5
KEY_F9 = 0xffc6
KEY_F10 = 0xffc7
KEY_F11 = 0xffc8
KEY_F12 = 0xffc9
KEY_F13 = 0xFFCA
KEY_F14 = 0xFFCB
KEY_F15 = 0xFFCC
KEY_F16 = 0xFFCD
KEY_F17 = 0xFFCE
KEY_F18 = 0xFFCF
KEY_F19 = 0xFFD0
KEY_F20 = 0xFFD1
KEY_ShiftLeft = 0xffe1
KEY_ShiftRight = 0xffe2
KEY_ControlLeft = 0xffe3
KEY_ControlRight = 0xffe4
KEY_MetaLeft = 0xffe7
KEY_MetaRight = 0xffe8
KEY_AltLeft = 0xffe9
KEY_AltRight = 0xffea
KEY_Scroll_Lock = 0xFF14
KEY_Sys_Req = 0xFF15
KEY_Num_Lock = 0xFF7F
KEY_Caps_Lock = 0xFFE5
KEY_Pause = 0xFF13
KEY_Super_L = 0xFFEB
KEY_Super_R = 0xFFEC
KEY_Hyper_L = 0xFFED
KEY_Hyper_R = 0xFFEE
KEY_KP_0 = 0xFFB0
KEY_KP_1 = 0xFFB1
KEY_KP_2 = 0xFFB2
KEY_KP_3 = 0xFFB3
KEY_KP_4 = 0xFFB4
KEY_KP_5 = 0xFFB5
KEY_KP_6 = 0xFFB6
KEY_KP_7 = 0xFFB7
KEY_KP_8 = 0xFFB8
KEY_KP_9 = 0xFFB9
KEY_KP_Enter = 0xFF8D
# thanks to ken3 (https://github.com/ken3) for this
KEY_TRANSLATION_SPECIAL = {
Qt.Key.Key_Backspace: KEY_BackSpace,
Qt.Key.Key_Tab: KEY_Tab,
Qt.Key.Key_Return: KEY_Return,
Qt.Key.Key_Escape: KEY_Escape,
Qt.Key.Key_Insert: KEY_Insert,
Qt.Key.Key_Delete: KEY_Delete,
Qt.Key.Key_Home: KEY_Home,
Qt.Key.Key_End: KEY_End,
Qt.Key.Key_PageUp: KEY_PageUp,
Qt.Key.Key_PageDown: KEY_PageDown,
Qt.Key.Key_Left: KEY_Left,
Qt.Key.Key_Up: KEY_Up,
Qt.Key.Key_Right: KEY_Right,
Qt.Key.Key_Down: KEY_Down,
Qt.Key.Key_F1: KEY_F1,
Qt.Key.Key_F2: KEY_F2,
Qt.Key.Key_F3: KEY_F3,
Qt.Key.Key_F4: KEY_F4,
Qt.Key.Key_F5: KEY_F5,
Qt.Key.Key_F6: KEY_F6,
Qt.Key.Key_F7: KEY_F7,
Qt.Key.Key_F8: KEY_F8,
Qt.Key.Key_F9: KEY_F9,
Qt.Key.Key_F10: KEY_F10,
Qt.Key.Key_F11: KEY_F11,
Qt.Key.Key_F12: KEY_F12,
Qt.Key.Key_F13: KEY_F13,
Qt.Key.Key_F14: KEY_F14,
Qt.Key.Key_F15: KEY_F15,
Qt.Key.Key_F16: KEY_F16,
Qt.Key.Key_F17: KEY_F17,
Qt.Key.Key_F18: KEY_F18,
Qt.Key.Key_F19: KEY_F19,
Qt.Key.Key_F20: KEY_F20,
Qt.Key.Key_Shift: KEY_ShiftLeft,
Qt.Key.Key_Control: KEY_ControlLeft,
Qt.Key.Key_Meta: KEY_MetaLeft,
Qt.Key.Key_Alt: KEY_AltLeft,
Qt.Key.Key_ScrollLock: KEY_Scroll_Lock,
Qt.Key.Key_SysReq: KEY_Sys_Req,
Qt.Key.Key_NumLock: KEY_Num_Lock,
Qt.Key.Key_CapsLock: KEY_Caps_Lock,
Qt.Key.Key_Pause: KEY_Pause,
Qt.Key.Key_Super_L: KEY_Super_L,
Qt.Key.Key_Super_R: KEY_Super_R,
Qt.Key.Key_Hyper_L: KEY_Hyper_L,
Qt.Key.Key_Hyper_R: KEY_Hyper_R,
Qt.Key.Key_Enter: KEY_KP_Enter,
}
# Authentication protocol types
AUTH_FAIL = 0
AUTH_NONE = 1
AUTH_VNCAUTH = 2
# Authentication result types
SMSG_AUTH_OK = 0
SMSG_AUTH_FAIL = 1
SMSG_AUTH_TOOMANY = 2
# Server message types
SMSG_FBUPDATE = 0
SMSG_SETCOLORMAP = 1
SMSG_BELL = 2
SMSG_SERVERCUTTEXT = 3
# Client message types
CMSG_SETPIXELFORMAT = 0
CMSG_SETENCODINGS = 2
CMSG_FBUPDATEREQ = 3
CMSG_KEYEVENT = 4
CMSG_POINTEREVENT = 5
CMSG_CLIENTCUTTEXT = 6

View File

@ -0,0 +1,19 @@
import pyDes
class RFBDes(pyDes.des):
def setKey(self, key):
"""
RFB protocol for authentication requires client to encrypt
challenge sent by server with password using DES method. However,
bits in each byte of the password are put in reverse order before
using it as encryption key.
"""
newkey = list()
for bsrc in key:
btgt = 0
for i in range(8):
if bsrc & (1 << i):
btgt = btgt | (1 << 7-i)
newkey.append(btgt)
super(RFBDes, self).setKey(newkey)

View File

@ -0,0 +1,106 @@
import logging
import qvncwidget.rfbconstants as c
from PyQt5.QtGui import QMouseEvent
from PyQt5.QtCore import Qt
class RFBPixelformat:
def __init__(self,
bpp=32, depth=24, bigendian=False, truecolor=True,
redmax=255, greenmax=255, bluemax=255,
redshift=0, greenshift=0, blueshift=16):
self.bitspp = bpp
self.depth = depth
self.bigendian = 1 if bigendian else 0
self.truecolor = 1 if truecolor else 0
self.redmax = redmax
self.greenmax = greenmax
self.bluemax = bluemax
self.redshift = redshift
self.greenshift = greenshift
self.blueshift = blueshift
@staticmethod
def getRGB32():
return RFBPixelformat(
bpp=32, depth=32,
redshift=16, greenshift=8, blueshift=0
)
@staticmethod
def getRGB16():
return RFBPixelformat(
bpp=16, depth=16,
redmax=31, greenmax=63, bluemax=31,
redshift=11, greenshift=5, blueshift=0
)
@staticmethod
def getRGB555():
return RFBPixelformat(
bpp=16, depth=15,
redmax=31, greenmax=31, bluemax=31,
redshift=10, greenshift=5, blueshift=0
)
def asTuple(self) -> tuple:
return (
self.bitspp, self.depth, self.bigendian, self.truecolor,
self.redmax, self.greenmax, self.bluemax,
self.redshift, self.greenshift, self.blueshift
)
def __str__(self) -> str:
return ";".join(str(x) for x in self.asTuple())
class RFBRectangle:
def __init__(self, xPos: int, yPos: int, width: int, height: int):
self.xPos = xPos
self.yPos = yPos
self.width = width
self.height = height
def asTuple(self) -> tuple:
return (self.xPos, self.yPos, self.width, self.height)
def __str__(self) -> str:
return f"x: {self.xPos} y: {self.yPos} width: {self.width} height: {self.height}"
class RFBInput:
# thanks to ken3 (https://github.com/ken3) for this
MOUSE_MAPPING = {
Qt.LeftButton: 1 << 0,
Qt.MidButton: 1 << 1,
Qt.RightButton: 1 << 2,
}
@staticmethod
def fromQKeyEvent(eventID: int, eventStr: str) -> int:
rfbKey = c.KEY_TRANSLATION_SPECIAL.get(eventID)
if not rfbKey:
try:
rfbKey = ord(eventStr)
except TypeError:
logging.warning(f"Unknown keytype: {eventID} | {eventStr}")
return 0
return rfbKey
@staticmethod
def fromQMouseEvent(eventID: QMouseEvent, pressEvent: bool, mask) -> int:
_mask = RFBInput.MOUSE_MAPPING.get(eventID.button())
# FIXME: return previous bitmask in case unknown key is pressed
# TODO: implement all RFB supported buttons
if not _mask: return mask
if pressEvent:
return mask | _mask
else:
return mask & ~_mask