Skip to content
Snippets Groups Projects
Select Git revision
  • a080d8317fb895987ed498267517f92b5ae1ef73
  • master default protected
2 results

README.md

Blame
  • ilp_test.py 7.30 KiB
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # This program and the accompanying materials are made available under the
    # terms of the MIT license (X11 license) which accompanies this distribution.
    
    # Author: R. Schoene
    
    import re, threading, os, shutil, sys, timeit
    try:
    	from fabric.api import task
    except ImportError:
    	from fabric_workaround import task
    from properties import profiling
    import utils
    from utils import local_quiet, assertTrue, assertTrueAssertion, call_racket, call_larceny
    
    # Number of worker threads that do_run starts to process test cases concurrently
    NUM_PROCESSORS = 4
    
    # Rebind the assertTrue attribute of the utils module to assertTrueAssertion.
    # NOTE(review): the name `assertTrue` imported above via `from utils import ...`
    # was bound before this assignment, so calls to `assertTrue` in this module
    # (see run_case) still reach the original function -- confirm this is intended.
    utils.assertTrue = assertTrueAssertion
    
    @task(name = 'racket')
    def run_racket(*given_ranges):
    	"""Run ILP tests using Racket.

    	The positional arguments select the tests to run; they are handed to
    	do_run unchanged together with call_racket as the implementation.
    	"""
    	implementation = call_racket
    	do_run(implementation, given_ranges)
    
    @task(name = 'larceny')
    def run_larceny(*given_ranges):
    	"""Run ILP tests using larceny.

    	The positional arguments select the tests to run; they are handed to
    	do_run unchanged together with call_larceny as the implementation.
    	"""
    	implementation = call_larceny
    	do_run(implementation, given_ranges)
    
    def do_run(call_impl, given_ranges):
    	if not os.path.exists('test'):
    		os.mkdir('test')
    	supported_ranges = get_ranges(call_impl)
    	ranges = parse_ranges(supported_ranges, given_ranges)
    	print "Ranges:", ranges
    	if not ranges:
    		print 'No test matches {0}. Aborting.'.format(list(given_ranges))
    		sys.exit(1)
    
    	# disable profiling as it disturbs ilp output
    	profiling.value = False
    	profiling.write_value()
    
    	test_ids = []
    	for lb,ub in ranges:
    		test_ids.extend(range(lb,ub+1))
    	progress = TestProgress(test_ids)
    	number_tests = len(test_ids)
    	threads = []
    	start = timeit.default_timer()
    	for i in xrange(NUM_PROCESSORS):
    		t = threading.Thread(target = loop, args = (progress,i,call_impl))
    		threads.append(t)
    		t.start()
    	for t in threads:
    		t.join()
    	stop = timeit.default_timer()
    	print '\n'
    	if progress.has_failures():
    		print '=' * 30
    		print 'FAILURE\n{0}'.format(progress.failure_msg)
    	else:
    		print 'OK'
    	print '\nRan {0} tests in {1:.3f}s'.format(number_tests - len(test_ids), stop - start)
    
    def get_ranges(call_impl):
    	"""Ask the scheme implementation for the supported test ranges.

    	call_impl is invoked with ('cli.scm', 'test', 'ranges'); its result is
    	expected to be a whitespace-separated list of integers that is read as
    	consecutive (lower bound, upper bound) pairs.

    	Returns a list of (lb, ub) integer tuples. Exits with status 1 if the
    	call failed or produced no output.
    	"""
    	intervals = call_impl('cli.scm', 'test', 'ranges')
    	if intervals.failed or len(intervals) == 0:
    		print("Could not get intervals")
    		print(intervals.stdout)
    		print(intervals.stderr)
    		sys.exit(1)
    	bounds = [int(token) for token in intervals.split()]
    	# Pair tokens by position instead of using a falsy marker for "no pending
    	# lower bound": a lower bound of 0 is falsy and used to be mistaken for
    	# "no bound yet", which shifted all following pairs.
    	# zip stops at the shorter slice, so a dangling last token is dropped as before.
    	return list(zip(bounds[0::2], bounds[1::2]))
    
    def intersect(a0, a1, b0, b1):
    	"""Intersect the closed intervals [a0, a1] and [b0, b1].

    	Returns the overlap as a (lower, upper) tuple, or False if the
    	intervals do not overlap:

    	  a starts first and reaches into b  ->  (b0, smaller upper bound)
    	  b starts first and reaches into a  ->  (a0, smaller upper bound)
    	  one interval ends before the other starts  ->  False
    	"""
    	# Normalise so that the a-interval is the one starting first
    	if a0 > b0:
    		a0, a1, b0, b1 = b0, b1, a0, a1
    	if a1 < b0:
    		return False
    	return (b0, min(b1, a1))
    
    def _notFalse(item):
    	return item is not False
    
    def _flatten(l):
    	return [item for sublist in l for item in sublist]
    	
    def _merge_ranges(supported_ranges, given_ranges):
    	"""Intersect every supported range with every given range.

    	Both arguments are sequences of (min, max) pairs. Returns the list of
    	non-empty intersections, ordered by supported range first and given
    	range second.
    	"""
    	overlaps = []
    	for s_min, s_max in supported_ranges:
    		for g_min, g_max in given_ranges:
    			overlap = intersect(s_min, s_max, g_min, g_max)
    			# intersect signals "no overlap" with False; drop those
    			if overlap is not False:
    				overlaps.append(overlap)
    	return overlaps
    
    def parse_ranges(supported_ranges, ranges):
    	"""Restrict the supported test ranges to those requested on the command line.

    	supported_ranges -- list of (lb, ub) pairs offered by the implementation
    	ranges           -- sequence of strings given by the user:
    	                      (nothing)       all supported ranges
    	                      "N+"            every test from N upwards
    	                      "N"             only test N
    	                      "A" "B" ...     consecutive pairs form closed intervals;
    	                                      an unpaired last value selects that one test

    	Returns the list of (lb, ub) intersections with supported_ranges.
    	Raises ValueError if a bound is not an integer.
    	"""
    	if not ranges:
    		return supported_ranges
    	if ranges[0].endswith("+"):
    		# sys.maxint exists on Python 2 only; fall back to sys.maxsize elsewhere
    		ranges = (ranges[0][:-1], getattr(sys, 'maxint', sys.maxsize))
    	bounds = [int(bound) for bound in ranges]
    	if len(bounds) % 2:
    		# An unpaired trailing bound means "exactly this test". This covers the
    		# single-argument case and avoids the IndexError the pairwise indexing
    		# used to raise for 3, 5, ... arguments.
    		bounds.append(bounds[-1])
    	return _merge_ranges(supported_ranges, list(zip(bounds[0::2], bounds[1::2])))
    
    def read_solution(fname):
    	"""Parse a solution file written by glpsol.

    	Returns (obj, sol): obj is the float found after 'obj = ' before the
    	column section, sol maps each column name to its activity as float.

    	Raises ValueError if no objective value is found or if a line of the
    	column section matches none of the expected layouts.

    	NOTE(review): the number patterns accept neither a sign nor an exponent,
    	and allow no whitespace between fields, so negative or scientific-notation
    	values and names ending in digits may be mis-parsed -- confirm against
    	the glpsol output actually produced.
    	"""
    	status = 0 # 0=search for column activities, 1=skip line, 2=name, 3=values
    	sol = {}
    	obj = None
    	obj_matcher = re.compile(r'obj = ([^\s]*)')
    	combined_matcher = re.compile(r'(\d+) ([^\s]+)\s*\*?\s*(\d*\.?\d+)\s*(\d*\.?\d+)\s*(\d*\.?\d+)') #1=column,2=name,3=activity,4=lb,5=ub
    	name_matcher  = re.compile(r'(\d+) ([^\s]+)') #1=column,2=name
    	value_matcher = re.compile(r'\*?\s*(\d*\.?\d+)\s*(\d*\.?\d+)\s*(\d*\.?\d+)') #1=activity,2=lb,3=ub
    	with open(fname) as fd:
    		for line in fd:
    			line = line.strip()
    			if status == 0:
    				# header part: remember the objective, wait for the column table
    				m = obj_matcher.search(line)
    				if m:
    					obj = float(m.group(1))
    				if 'Column name' in line:
    					status = 1
    			elif status == 1:
    				# skip the single line that follows the column header
    				status = 2
    			elif status == 2:
    				# an empty line ends the column section
    				if not line:
    					break
    				m = combined_matcher.match(line)
    				if m:
    					# name and values on one line; stay in this status
    					sol[m.group(2)] = float(m.group(3))
    				else:
    					# name only; the values follow on the next line
    					m = name_matcher.match(line)
    					if m is None:
    						raise ValueError('Unexpected column line in {0}: {1!r}'.format(fname, line))
    					name = m.group(2)
    					status = 3
    			elif status == 3:
    				# value line belonging to the name read just before
    				m = value_matcher.match(line)
    				if m is None:
    					raise ValueError('Unexpected value line in {0}: {1!r}'.format(fname, line))
    				sol[name] = float(m.group(1))
    				status = 2
    	if obj is None:
    		# previously surfaced as an UnboundLocalError on the return statement
    		raise ValueError('No objective value found in {0}'.format(fname))
    	return (obj,sol)
    
    def write_solution(sol, fname):
    	"""Write the mapping sol to fname as a parenthesised list of ("name" . value) pairs, one per line."""
    	with open(fname, 'w') as out:
    		out.write('(\n')
    		out.writelines(
    			'("{0}" . {1})\n'.format(column, activity)
    			for column, activity in sol.iteritems())
    		out.write(')\n')
    
    class TestProgress:
    	"""Hands out test ids to worker threads and records a reported failure.

    	All access to the id list and the failure message made through
    	next_nr and failure is guarded by the condition variable C.
    	"""
    	def __init__(self, test_ids):
    		"""test_ids is the list of pending ids; it is consumed in place from the front."""
    		self.failure_msg = None
    		# Never set to False in the visible code, so next_nr does not block as written
    		self.available = True
    		self.C = threading.Condition()
    		self.test_ids = test_ids

    	def next_nr(self):
    		"""Return the next pending test id, or None once a failure was reported or no id is left."""
    		# `with` releases the lock even if an unexpected exception is raised
    		with self.C:
    			while not self.available:
    				self.C.wait()
    			if self.has_failures() or not self.test_ids:
    				return None
    			return self.test_ids.pop(0)

    	def failure(self, test_id, msg):
    		"""Record msg as the failure of test_id and wake up all threads waiting on C."""
    		with self.C:
    			self.failure_msg = 'Test-Case {0} FAILED\n{1}'.format(test_id, msg)
    			self.C.notify_all()

    	def has_failures(self):
    		"""Return True if a failure has been recorded."""
    		return self.failure_msg is not None
    
    def loop(progress, thread_id, call_impl):
    	"""Worker body: fetch test ids from progress and run them until none is left.

    	progress  -- object providing next_nr() and failure(test_id, msg)
    	thread_id -- number of this worker, passed on to run_case
    	call_impl -- callable used to invoke the scheme implementation

    	The worker stops after the first test that fails. Any exception raised
    	by run_case is reported through progress.failure, so a crashing test is
    	not silently counted as passed.
    	"""
    	while True:
    		nr = progress.next_nr()
    		# compare against None explicitly: test id 0 is falsy but still a valid id
    		if nr is None:
    			return
    		try:
    			run_case(nr, thread_id, call_impl)
    		except AssertionError as e:
    			# str(e) replaces the deprecated, Python-2-only e.message
    			progress.failure(nr, str(e))
    			return
    		except Exception:
    			# Without this an unexpected error would only end this thread and
    			# the overall run could still report OK
    			import traceback
    			progress.failure(nr, traceback.format_exc())
    			return
    	
    def _tmp_lp(thread_nr):
    	return "test/tmp_{0}.lp".format(thread_nr)
    
    def _solution_file(test_nr):
    	return "test/{0}.sol".format(test_nr)
    
    def _scheme_solution_file(test_nr):
    	return "test/{0}.scsol".format(test_nr)
    
    def _lp_file(test_nr):
    	return "test/{0}.lp".format(test_nr)
    
    def _out_file(test_nr):
    	return "test/{0}.out".format(test_nr)
    
    def run_case(test_nr, thread_id, call_impl):
    	"""Run one test case: generate the ILP, solve it with glpsol, let the implementation check it.

    	test_nr   -- id of the test case
    	thread_id -- number of the calling worker; selects its private scratch .lp file
    	call_impl -- callable used to invoke the scheme implementation

    	Failed expectations are reported through assertTrue.
    	"""
    	# progress marker on the console
    	sys.stdout.write('{0}.'.format(test_nr))
    	sys.stdout.flush()

    	fname_sol = _solution_file(test_nr)
    	fname_lp_python = _lp_file(test_nr)
    	fname_scheme_sol = _scheme_solution_file(test_nr)
    	fname_lp_scheme = _tmp_lp(thread_id)
    	# Remove leftovers so the existence checks below cannot pass on stale files
    	for stale in (fname_lp_scheme, fname_sol):
    		if os.path.exists(stale):
    			os.remove(stale)

    	## Run scheme impl to generate ILP
    	out = call_impl('cli.scm', 'test', 'run', test_nr, fname_lp_scheme)
    	with open(_out_file(test_nr), "w") as fd:
    		fd.write(out)

    	assertTrue(os.path.exists(fname_lp_scheme), "ILP was not generated\n{0}".format(out))
    	shutil.move(fname_lp_scheme, fname_lp_python)

    	## Solve the ILP with glpsol
    	# The solver's console output is not inspected; the former check for
    	# 'INTEGER OPTIMAL SOLUTION FOUND' had an empty body and was removed.
    	local_quiet('glpsol --lp {0} -o {1}'.format(fname_lp_python, fname_sol))
    	assertTrue(os.path.exists(fname_sol), "No solution file created")
    	obj,sol = read_solution(fname_sol)

    	## Write the solution to fname_scheme_sol
    	write_solution(sol, fname_scheme_sol)

    	## Check solution with the scheme impl
    	call_impl('cli.scm', 'test', 'check', test_nr, obj, fname_scheme_sol)
    
    if __name__ == '__main__':
    	print run_racket(sys.argv[1:])