Created
July 5, 2024 07:14
-
-
Save auscompgeek/5bac481a7bfece2fb522d0b3aa588ee0 to your computer and use it in GitHub Desktop.
Read PhotonVision multi-tag result reprojection errors from a WPILib data log.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections.abc | |
import datetime | |
import sys | |
import photonlibpy.packet | |
import photonlibpy.photonPipelineResult | |
import wpiutil.log | |
def main() -> None:
    """Print multi-tag reprojection errors found in a data log.

    The path of the log file is taken from the first command-line argument.
    For every non-empty record logged under one of the known camera
    ``rawBytes`` entries, the payload is decoded as a PhotonVision pipeline
    result. When the multi-tag estimated pose is present, its best
    reprojection error and the camera label are written to stdout separated
    by a tab.

    Raises:
        SystemExit: if no log file path was given on the command line.
    """
    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError traceback.
        sys.exit(f"usage: {sys.argv[0]} <log file>")

    log = wpiutil.log.DataLogReader(sys.argv[1])

    # Log entry name -> short label printed next to each value.
    cams = {
        "NT:/photonvision/ardu_cam_port/rawBytes": "port",
        "NT:/photonvision/ardu_cam_starboard/rawBytes": "starboard",
    }

    for _timestamp, entry, value in read_log(log):
        if entry.name not in cams:
            continue

        # Narrows the ``object`` yielded by read_log for the decoder below.
        assert isinstance(value, bytes)
        if not value:
            # An empty payload carries no pipeline result to decode.
            continue

        packet = photonlibpy.packet.Packet(value)
        result = photonlibpy.photonPipelineResult.PhotonPipelineResult()
        result.populateFromPacket(packet)

        p = result.multiTagResult.estimatedPose
        if p.isPresent:
            print(p.bestReprojError, cams[entry.name], sep="\t")
def read_log(
    log: wpiutil.log.DataLogReader,
) -> collections.abc.Iterable[tuple[float, wpiutil.log.DataLogEntry, object]]:
    """Iterate over the decoded data records of a data log.

    Control records (start, finish, set-metadata) are consumed internally to
    maintain the table of active entries. Every data record that belongs to a
    known entry with a supported type is decoded and yielded.

    All diagnostics are written to stderr so that stdout stays free for the
    caller's own output.

    Args:
        log: The log reader to iterate over.

    Yields:
        ``(timestamp, entry, value)`` tuples. ``timestamp`` is the record
        timestamp divided by 1,000,000. ``entry`` is the start-record data
        registered for the record's entry ID. ``value`` is the decoded
        payload; for an ``int64`` entry named ``systemTime`` it is a
        ``datetime.datetime`` instead of the raw integer.
    """
    # Entry ID -> start-record data for every entry that is currently open.
    # NOTE(review): the objects stored (and yielded) here are whatever
    # getStartData() returns - confirm that matches the annotated
    # DataLogEntry type.
    entries = {}

    for record in log:
        timestamp = record.getTimestamp() / 1000000

        if record.isStart():
            try:
                data = record.getStartData()
                if data.entry in entries:
                    print("...DUPLICATE entry ID, overriding", file=sys.stderr)
                entries[data.entry] = data
            except TypeError as e:
                print("Start(INVALID)", e, file=sys.stderr)
        elif record.isFinish():
            try:
                entry = record.getFinishEntry()
                print(f"Finish({entry}) [{timestamp}]", file=sys.stderr)
                if entry not in entries:
                    print("...ID not found", file=sys.stderr)
                else:
                    del entries[entry]
            except TypeError as e:
                print("Finish(INVALID)", e, file=sys.stderr)
        elif record.isSetMetadata():
            try:
                data = record.getSetMetadataData()
                # Diagnostics go to stderr like every other message here, so
                # they never mix with data the caller prints on stdout.
                print(f"{data} [{timestamp}]", file=sys.stderr)
                if data.entry not in entries:
                    print("...ID not found", file=sys.stderr)
            except TypeError as e:
                print("SetMetadata(INVALID)", e, file=sys.stderr)
        elif record.isControl():
            print("Unrecognized control record", file=sys.stderr)
        else:
            entry = entries.get(record.getEntry(), None)
            if entry is None:
                print("<ID not found>", file=sys.stderr)
                continue

            try:
                # handle systemTime specially
                if entry.name == "systemTime" and entry.type == "int64":
                    dt = datetime.datetime.fromtimestamp(record.getInteger() / 1000000)
                    yield timestamp, entry, dt
                    continue

                value: object
                if entry.type == "double":
                    value = record.getDouble()
                elif entry.type == "float":
                    value = record.getFloat()
                elif entry.type == "int64":
                    value = record.getInteger()
                elif entry.type in ("string", "json"):
                    # JSON payloads are yielded as their undecoded text.
                    value = record.getString()
                elif entry.type == "boolean":
                    value = record.getBoolean()
                elif entry.type == "boolean[]":
                    value = record.getBooleanArray()
                elif entry.type == "double[]":
                    value = record.getDoubleArray()
                elif entry.type == "float[]":
                    value = record.getFloatArray()
                elif entry.type == "int64[]":
                    value = record.getIntegerArray()
                elif entry.type == "string[]":
                    value = record.getStringArray()
                elif entry.type == "rawBytes":
                    value = record.getRaw()
                else:
                    print("unhandled", entry.type, file=sys.stderr)
                    continue

                yield timestamp, entry, value
            except TypeError as e:
                # A getter rejected the payload; report it and move on.
                print(" invalid", e, file=sys.stderr)
# Run the report only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment