Skip to content

Instantly share code, notes, and snippets.

@jonbinney
Created November 8, 2015 09:25
Show Gist options
  • Save jonbinney/0a09e57db3676e0bd29f to your computer and use it in GitHub Desktop.
Save jonbinney/0a09e57db3676e0bd29f to your computer and use it in GitHub Desktop.
Test for concurrency problems in catkin tools
#!/usr/bin/env python
import os
import os.path
import sys
import subprocess
import random
def make_cmakelists(pkg_name):
    ''' Build the text of a minimal catkin CMakeLists.txt for one package

    Besides the usual catkin boilerplate, the generated file runs a small
    Python snippet at configure time that sleeps until the wall clock reaches
    the next whole second.
    NOTE(review): presumably this lines up the configure steps of packages
    that are being built in parallel, to make a race more likely -- confirm.

    pkg_name -- name written into the project() call

    Returns the file contents as a single string: lines joined by newlines,
    with no trailing newline.
    '''
    return '\n'.join([
        'cmake_minimum_required(VERSION 2.8.3)',
        'project(%s)' % (pkg_name,),
        # Sleep for the fractional remainder of the current second.
        'execute_process(COMMAND python -c "import time; import math; t=time.time(); time.sleep(math.ceil(t) - t)")',
        'find_package(catkin REQUIRED)',
        'catkin_package()',
    ])
def make_pkg_xml(pkg_name):
    ''' Build the text of a minimal package.xml manifest for one package

    The manifest declares version 0.0.0, a placeholder description,
    maintainer and license, and a single buildtool dependency on catkin.

    pkg_name -- package name written into the <name> element; XML special
                characters (&, <, >) are escaped so the manifest stays
                well-formed for any name

    Returns the file contents as a single string: lines joined by newlines,
    with no trailing newline.
    '''
    # Imported here so the function does not rely on the file's import header.
    from xml.sax.saxutils import escape

    return '\n'.join([
        '<?xml version="1.0"?>',
        '<package>',
        '<name>%s</name>' % (escape(pkg_name),),
        '<version>0.0.0</version>',
        '<description>The package</description>',
        '<maintainer email="jbinney@todo.todo">jbinney</maintainer>',
        '<license>TODO</license>',
        '<buildtool_depend>catkin</buildtool_depend>',
        '</package>',
    ])
def make_pkg(pkg_name, parent_dir):
    ''' Create a package directory holding package.xml and CMakeLists.txt

    pkg_name   -- package name; also used as the name of the new directory
    parent_dir -- existing directory in which the package directory is made

    Raises OSError if the package directory cannot be created (for example
    because it already exists).
    '''
    pkg_path = os.path.join(parent_dir, pkg_name)
    os.mkdir(pkg_path)

    # 'with' guarantees each file is flushed and closed even if write() fails.
    with open(os.path.join(pkg_path, 'package.xml'), 'w') as pkg_xml_file:
        pkg_xml_file.write(make_pkg_xml(pkg_name))

    with open(os.path.join(pkg_path, 'CMakeLists.txt'), 'w') as cmakelists_file:
        cmakelists_file.write(make_cmakelists(pkg_name))
def get_packages_in_path(devel_dir):
    ''' Read the ROS package path for a devel directory

    Sources <devel_dir>/setup.sh in a bash subprocess and reads back the
    ROS_PACKAGE_PATH that results.

    devel_dir -- directory containing setup.sh. The path is interpolated
                 into the shell command unquoted, so it must not contain
                 spaces or shell metacharacters.

    Returns a tuple (rpp, pkgs):
      rpp  -- the colon-separated ROS_PACKAGE_PATH value, as text, with
              surrounding whitespace stripped
      pkgs -- the last path component of every entry whose parent directory
              does not start with '/opt'

    Raises subprocess.CalledProcessError if bash exits with a non-zero status.
    '''
    setup_path = os.path.join(devel_dir, 'setup.sh')
    # universal_newlines=True makes check_output return text rather than
    # bytes on Python 3, so the str operations below work on both 2 and 3.
    rpp = subprocess.check_output(
        ['bash', '-c', '. %s; echo $ROS_PACKAGE_PATH' % setup_path],
        universal_newlines=True).strip()

    pkgs = []
    for p in rpp.split(':'):
        # An unset/empty variable or a stray ':' yields empty entries.
        if not p:
            continue
        # normpath drops a trailing slash, which would otherwise make
        # os.path.split return an empty directory name.
        parent_dir, dirname = os.path.split(os.path.normpath(p))
        # Ignore stuff in the base ROS install
        if not parent_dir.startswith('/opt'):
            pkgs.append(dirname)
    return rpp, pkgs
# Workspace layout (relative to the current directory) and test size.
src_dir = 'src'
devel_dir = 'devel'
build_dir = 'build'
num_pkgs = 20
num_trials = 100
expected_pkgs = ['pkg%d' % x for x in range(num_pkgs)]


def main():
    ''' Repeatedly build a throwaway workspace and check ROS_PACKAGE_PATH

    Creates num_pkgs trivial packages under src_dir, then num_trials times
    runs "catkin clean -a" followed by "catkin build" and compares the
    packages listed in the devel space's ROS_PACKAGE_PATH with the expected
    set. Each trial's outcome is printed, followed by a final summary.

    Exits with status 1 if src_dir already exists.
    '''
    # Create a new workspace:
    try:
        os.mkdir(src_dir)
    except OSError:
        # Only an already-existing src matches the message below; any other
        # failure (permissions, read-only filesystem, ...) is re-raised.
        if not os.path.exists(src_dir):
            raise
        print('Remove src directory and run again')
        sys.exit(1)

    for pkg in expected_pkgs:
        make_pkg(pkg, src_dir)

    num_failures = 0
    # Open the null device once instead of leaking two handles per trial.
    with open(os.devnull, 'w') as devnull:
        for trial_i in range(num_trials):
            # Get rid of build and devel spaces
            subprocess.call(['catkin', 'clean', '-a'], stdout=devnull)

            # Build the workspace using catkin tools
            subprocess.call(['catkin', 'build'], stdout=devnull)

            # Check the ROS_PACKAGE_PATH for the devel space
            rpp, pkgs = get_packages_in_path(devel_dir)

            # Check whether all packages were found in the path
            missing_pkgs = set(expected_pkgs) - set(pkgs)
            extra_pkgs = set(pkgs) - set(expected_pkgs)

            if missing_pkgs or extra_pkgs:
                print('Trial %d was bad:' % trial_i)
                print(' Missing packages: %s' % (sorted(missing_pkgs),))
                print(' Extra packages: %s' % (sorted(extra_pkgs),))
                print(' ROS_PACKAGE_PATH: %s' % (rpp,))
                # The marker file may be absent if the build itself failed;
                # report that instead of aborting the remaining trials.
                try:
                    with open(os.path.join(devel_dir, '.catkin')) as catkin_file:
                        catkin_contents = catkin_file.read().strip()
                except (IOError, OSError):
                    catkin_contents = '<unreadable>'
                print(' devel/.catkin contents: %s' % (catkin_contents,))
                print('')
                num_failures += 1
            else:
                print('Trial %d/%d was good' % (trial_i, num_trials))

    print('Found bad ROS_PACKAGE_PATH %d/%d times' % (num_failures, num_trials))


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment