Created
January 12, 2019 06:15
-
-
Save AshMartian/eb46959adfa6899976e7d5d3ce455213 to your computer and use it in GitHub Desktop.
Vector Sees
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import argparse | |
import matplotlib.pyplot as plt | |
from mxnet import nd, image, img | |
from mxnet.gluon.data.vision import transforms | |
from gluoncv.model_zoo import get_model | |
from gluoncv.data.transforms.presets.imagenet import transform_eval | |
# Load Model | |
class Prediction(object):
    """One classification result: a human-readable label plus its confidence.

    The attribute names (`description`, `score`) deliberately mirror the
    google.cloud.vision label objects this class stands in for.
    """

    def __init__(self, human, score):
        self.description = human
        self.score = score
def detect_image(image, model="resnet152_v2"):
    """Classify raw encoded image bytes with a pretrained gluoncv model.

    Args:
        image: raw encoded image bytes (e.g. JPEG), such as
            ``io.BytesIO.getvalue()`` output.
        model: gluoncv model-zoo model name.

    Returns:
        list[Prediction]: the top-5 predictions, best first.
    """
    # Bug fix: the module-level `from mxnet import ... img` is broken
    # (mxnet has no `img` submodule), and the `image` parameter shadows
    # the `mxnet.image` module — use a local alias instead.
    from mxnet import image as mx_image

    net = get_model(model, pretrained=True)
    decoded_img = mx_image.imdecode(image)
    decoded_img = transform_eval(decoded_img)
    pred = net(decoded_img)

    top_k = 5
    # Hoisted: softmax was recomputed twice per loop iteration before.
    probs = nd.softmax(pred)[0]
    indices = nd.topk(pred, k=top_k)[0].astype('int')

    results = []
    for i in range(top_k):
        idx = indices[i].asscalar()
        human_string = net.classes[idx]
        score = probs[idx].asscalar()
        print('\t[%s], with probability %.3f.' % (human_string, score))
        results.append(Prediction(human_string, score))
    return results
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import anki_vector | |
from anki_vector.events import Events | |
#from google.cloud import vision | |
#from google.cloud.vision import types | |
import time | |
import datetime | |
import io | |
import random | |
import functools | |
import threading | |
import asyncio | |
import mx_classify | |
import sys | |
# Cross-thread event object (currently unused in the visible code).
evt = threading.Event()
#client = vision.ImageAnnotatorClient()
# True once Vector has recently seen a recognized face ("talkative" mode).
saw_face = False
# When the face was first seen; the mode expires max_runtime seconds later.
saw_face_time = time.time()
# Unused in the visible code -- presumably superseded by max_runtime.
runtime = 0
# Seconds Vector keeps narrating after seeing a face.
max_runtime = 360
# Frames classified in the current session (session resets past 60).
images_processed = 0
# time.time() of the last classified frame; used to throttle captures.
last_image = 0
async def main(loop):
    """Run Vector's "say what I see" loop.

    After Vector sees a recognized face while off the charger, he starts
    classifying camera frames via ``mx_classify`` and speaking the top
    labels, until ``max_runtime`` seconds pass or 60 frames are spoken.

    NOTE(review): the handlers use blocking ``time.sleep`` inside an async
    program; acceptable here because this script runs a single task.
    """

    async def auto_reconnect(conn: anki_vector.connection.Connection):
        # Block until behavior control is lost (another client took over).
        await conn.control_lost_event.wait()

    def on_robot_status_update(robot, event_type, event):
        battery_state = robot.get_battery_state()
        # NOTE(review): battery_level == 3 appears to mean "fully charged"
        # in the Vector SDK -- confirm against anki_vector docs.
        if battery_state.battery_level == 3 and battery_state.is_on_charger_platform:
            print("{0}: Requesting control to explore".format(datetime.datetime.now()))
            robot.conn.request_control()
            robot.behavior.drive_off_charger()
            robot.conn.release_control()
            time.sleep(5)
        else:
            # Throttle: every 6s while talkative, else every 45s with a
            # stricter confidence threshold.
            if saw_face and last_image < time.time() - 6:
                image_recognition(robot)
            elif last_image < time.time() - 45:
                image_recognition(robot, probability=0.4)

    def on_robot_observed_object(robot, event_type, event):
        global saw_face, last_image
        if saw_face and last_image < time.time() - 2:
            image_recognition(robot)

    def on_robot_observed_face(robot, event_type, event):
        for face in robot.world.visible_faces:
            print(f"Face ID: {face.face_id}")
            if not face.face_id == -1:  # -1: face not yet identified
                battery_state = robot.get_battery_state()
                if not battery_state.is_on_charger_platform:
                    global saw_face, saw_face_time
                    if not saw_face:
                        # Enter talkative mode and announce it once.
                        saw_face = True
                        saw_face_time = time.time()
                        robot.conn.request_control(timeout=5.0)
                        robot.say_text("I am now going to start saying what I see for a few minutes.")
                        robot.conn.release_control()
                        time.sleep(5)
            break

    def on_robot_wake_word(*_args):
        # Bug fix: subscribed without functools.partial, so the SDK calls
        # this with (event_type, event); the original declared no
        # parameters and would raise TypeError on dispatch.
        global saw_face
        saw_face = False

    async def setup_vector():
        global saw_face, images_processed, saw_face_time
        try:
            # Bug fix: the connection is established by Robot.__enter__,
            # so VectorNotFoundException must be caught OUTSIDE the
            # `with` block; the original try started inside it.
            with anki_vector.Robot(requires_behavior_control=False,
                                   enable_camera_feed=True,
                                   enable_face_detection=True) as robot:
                observed_face_handler = functools.partial(on_robot_observed_face, robot)
                observed_object_handler = functools.partial(on_robot_observed_object, robot)
                status_update_handler = functools.partial(on_robot_status_update, robot)
                robot.events.subscribe(observed_face_handler, Events.robot_observed_face)
                robot.events.subscribe(status_update_handler, Events.robot_state)
                robot.events.subscribe(observed_object_handler, Events.robot_observed_object)
                robot.events.subscribe(observed_object_handler, Events.object_appeared)
                robot.events.subscribe(observed_object_handler, Events.object_stopped_moving)
                robot.events.subscribe(on_robot_wake_word, Events.wake_word)
                try:
                    while True:
                        time.sleep(5)
                        # Bug fix: parenthesized -- either the talk window
                        # expired or enough images were spoken, but only
                        # while saw_face is set. The original ungrouped
                        # `and`/`or` let images_processed > 60 fire alone.
                        if saw_face and (saw_face_time < time.time() - max_runtime
                                         or images_processed > 60):
                            saw_face = False
                            images_processed = 0
                            print("Not saying things anymore")
                except KeyboardInterrupt:
                    pass
                finally:
                    # Bug fix: was Events.robot_observec_object (typo ->
                    # AttributeError); also unsubscribe everything that
                    # was subscribed above.
                    robot.events.unsubscribe(observed_face_handler, Events.robot_observed_face)
                    robot.events.unsubscribe(observed_object_handler, Events.robot_observed_object)
                    robot.events.unsubscribe(observed_object_handler, Events.object_appeared)
                    robot.events.unsubscribe(observed_object_handler, Events.object_stopped_moving)
                    robot.events.unsubscribe(status_update_handler, Events.robot_state)
                    robot.events.unsubscribe(on_robot_wake_word, Events.wake_word)
        except anki_vector.exceptions.VectorNotFoundException:
            print("Unable to connect to Vector, re-trying in 20 seconds")
            time.sleep(20)
            await setup_vector()

    def image_recognition(robot, probability=0.15):
        """Grab the latest camera frame and speak a label for it."""
        global images_processed, last_image
        try:
            battery_state = robot.get_battery_state()
            if battery_state:
                # Only classify while driving around idle: not docked,
                # carrying, charging, animating, or path-following.
                status = robot.status
                if (not battery_state.is_on_charger_platform
                        and not status.is_carrying_block
                        and not status.is_charging
                        and not status.is_animating
                        and not status.is_pathing):
                    print("Getting image from vector")
                    robot.camera.init_camera_feed()
                    image = robot.camera.latest_image
                    output = io.BytesIO()
                    image.save(output, format='JPEG')
                    get_label(robot, output, probability=probability)
                    last_image = time.time()
                    images_processed = images_processed + 1
        except AttributeError as e:
            # latest_image can be None before the feed warms up.
            print(e)
        except Exception as e:
            print("Unexpected error:", sys.exc_info()[0])
            print(e)
            print("Could not get image")

    # Model-zoo fallbacks tried in order by get_label.
    # NOTE(review): attempt starts at 1, so models[0] is never used here --
    # confirm whether that is intentional.
    models = ["resnet152_v2", "vgg19_bn", "alexnet", "densenet201", "squeezenet1.1"]

    def get_label(robot, image_data, attempt=1, probability=0.15):
        """Classify the frame, falling through the model list until a label passes."""
        # Bug fix: was `attempt > 7`, which could never stop the recursion
        # before models[attempt] raised IndexError (len(models) == 5), and
        # it did not return, so the out-of-range lookup still executed.
        if attempt >= len(models):
            image_recognition(robot)
            return
        print("Getting labels with " + models[attempt])
        labels = mx_classify.detect_image(image_data.getvalue(), models[attempt])
        possibilities = [
            label for label in labels
            if label.score > probability
            and "auto" not in label.description
            and "product" not in label.description
            and "technology" not in label.description
        ]
        if len(possibilities) > 0:
            take_control_and_say(robot, possibilities)
        else:
            get_label(robot, image_data, attempt + 1, probability)

    def get_text(robot, image_data):
        """(Dead code) OCR the frame via Google Cloud Vision and speak English text.

        NOTE(review): `client` is commented out at module level, so calling
        this raises NameError; its only caller is also commented out. Kept
        for reference.
        """
        if random.randint(0, 3) == 2:
            get_label(robot, image_data)
        print("Getting text")
        response = client.text_detection(image=image_data)
        texts = response.text_annotations
        possibilities = []
        for text in texts:
            if text.description and text.locale == "en":
                possibilities.append(text)
                take_control_and_say(robot, [text])
                break
        if len(possibilities) == 0:
            get_label(robot, image_data)

    def take_control_and_say(robot, text):
        """Briefly take behavior control and speak the chosen label(s)."""
        print("------ Vector saying %s --------" % text)
        if len(text) == 2:
            text_say = str(text[0].description) + " and " + str(text[1].description)
        else:
            text_say = random.choice(text).description
        robot.conn.request_control(timeout=5.0)
        robot.say_text(text_say)
        robot.conn.release_control()
        time.sleep(5)

    await setup_vector()
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Bug fix: ensure the event loop is closed even if main() raises
        # (the original leaked the loop on any exception).
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment