pts.py 16.3 KB
Newer Older
1
#!   /usr/bin/env   python
2 3
#    coding: utf8

4 5 6 7
# Copyright CERN, 2011
# Author: Juan David Gonzalez Cobas <dcobas@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
8

9 10
import sys
import cmd
11
import glob
12
import re
13
import os, os.path
14
import stat
15
import datetime
16
import random
17
import warnings
18
import zipfile
19
import string
20

21
from ConfigParser import ConfigParser, NoOptionError
22
from optparse import OptionParser
23
from sha import sha as sha160
24
from ptsexcept import *
25

26 27 28
# Configuration file used when none is given on the command line.
default_config_file  = 'ptsdefault.cfg'
# str.format() templates for the files produced by a run:
#   - one log per individual test (needs runid, timestamp, board, serial, number)
#   - one log for the whole run   (needs runid, timestamp, board, serial)
#   - one zip archive per run     (needs runid, timestamp, board, serial)
default_log_pattern  = 'pts_tst_{runid}_{timestamp}_{board}_{serial}_{number}.txt'
default_log_name     = 'pts_run_{runid}_{timestamp}_{board}_{serial}.txt'
default_zip_name     = 'zip_run_{runid}_{timestamp}_{board}_{serial}.zip'
# Regular expressions describing test names: 'test' followed by two digits,
# and the looser command-line form in which the 'test' prefix is optional.
default_test_pattern = r'test[0-9][0-9]'
default_test_syntax  = r'(test)?(\d\d)'
32

33 34
# Reference to the genuine built-in raw_input, taken at import time.
# pts_raw_input() delegates to it, and run_test() refers to it when it
# is done substituting raw_input for a test run.
original_raw_input = raw_input

35
def pts_raw_input(msg, default='y'):
    """Prompt like raw_input, but answer `default` at end of input.

    msg is passed unchanged to the original raw_input.  When that call
    raises EOFError (no more input available), `default` is returned
    instead of propagating the error; otherwise the text that was read
    is returned.
    """
    try:
        return original_raw_input(msg)
    except EOFError:
        return default

42 43 44 45 46
def make_zip(zipname, ziplist):
    """Create the archive `zipname` holding every file named in `ziplist`.

    Files are added in the order given, under the same path they are
    listed with.  The archive is closed even if adding a file fails.
    """
    archive = zipfile.ZipFile(zipname, 'w')
    try:
        for path in ziplist:
            archive.write(path)
    finally:
        archive.close()

47
def run_test(testname, logname, test_path, yes=False):
    """Run test module `testname` with its output redirected to `logname`.

    The module is imported by name and its main() is called with
    default_directory=test_path.  While the test runs, sys.stdout is
    replaced by a file opened on `logname`.

    If `yes` is true, assume affirmative answers from the user: stdin is
    replaced by the null device and the built-in raw_input by
    pts_raw_input, which answers with its default at end of input.

    stdout, stdin and raw_input are always restored on exit, whether the
    test finishes normally or raises.  Exceptions raised by the test
    propagate to the caller.
    """
    # Patch the real builtins module: the name `__builtins__` is a module
    # only inside __main__ and a plain dict everywhere else.
    import __builtin__

    saved_stdout = sys.stdout
    saved_stdin = sys.stdin
    # Opened before the try block so that a failure to create the log
    # cannot lead to closing the real stdout in the cleanup below.
    logfile = open(logname, 'w')
    devnull = None
    try:
        sys.stdout = logfile
        if yes:
            devnull = open(os.devnull)
            sys.stdin = devnull
            __builtin__.raw_input = pts_raw_input
        mod = __import__(testname, globals(), locals(), [])
        mod.main(default_directory=test_path)
    finally:
        sys.stdout = saved_stdout
        logfile.close()
        if yes:
            sys.stdin = saved_stdin
            # Restore the built-in itself; a plain `raw_input = ...` here
            # would only bind a local name and leave the patch in place.
            __builtin__.raw_input = original_raw_input
            if devnull is not None:
                devnull.close()
67

68 69
class Suite(object):
    """Configuration and execution of one batch of board tests.

    A suite holds the identification of the board under test (board name
    and serial numbers), the directory where test modules live, the
    directory where logs are written, and the sequence of test numbers to
    run.  run() executes the batch and produces one log per test, one
    log for the whole run and a zip archive of all of them.
    """

    # Options stored as plain strings in the [global] section of the
    # configuration file.
    _plain_options = ('board', 'serial', 'extra_serial', 'test_path',
                      'log_path', 'repeat')

    def __init__(self, cfgfilename=default_config_file):
        """Create a suite whose required fields are all still unset.

        cfgfilename is stored in self.config, the file used by
        read_config() and save(); it is not read here.
        """
        self.required = ['board', 'serial', 'extra_serial', 'test_path',
                         'log_path', 'sequence']
        for fieldname in self.required:
            setattr(self, fieldname, None)
        # Optional settings get defaults so that the other methods can
        # rely on the attributes existing.
        self.repeat = None
        self.randomize = False
        self.yes = False
        self.comment = ''
        self.config = cfgfilename
        self.log_pattern = default_log_pattern
        self.log_name = default_log_name
        self.zip_name = default_zip_name

    def missing(self):
        """Return the names of the required fields that are still None."""
        return [fieldname for fieldname in self.required
                if getattr(self, fieldname) is None]

    def read_config(self, name=None):
        """Load settings from the [global] section of a configuration file.

        If `name` is given it becomes the suite's configuration file.
        Options absent from the file leave the corresponding attribute
        untouched.  The `sequence` option is split into a list of test
        numbers (separated by blanks or commas; brackets and quotes are
        ignored so a list written with str() is accepted as well).

        Raises PtsCritical if the file cannot be read and PtsInvalid if
        the `randomize` option is not a recognizable boolean.
        """
        if name:
            self.config = name
        config = ConfigParser()
        try:
            with open(self.config) as cfgfile:
                config.readfp(cfgfile)
        except IOError:
            errmsg = 'could not read configuration file {0}'
            raise PtsCritical(errmsg.format(self.config))

        if not config.has_section('global'):
            return
        # Each option is looked up on its own so that one missing entry
        # does not prevent the remaining ones from being read.
        for option in self._plain_options:
            if config.has_option('global', option):
                setattr(self, option, config.get('global', option))
        if config.has_option('global', 'sequence'):
            raw = config.get('global', 'sequence')
            self.sequence = re.findall(r'[^\s,\[\]\'"]+', raw)
        if config.has_option('global', 'randomize'):
            try:
                self.randomize = config.getboolean('global', 'randomize')
            except ValueError:
                raw = config.get('global', 'randomize')
                raise PtsInvalid('invalid randomize flag [{0}]'.format(raw))

    def save(self):
        """Write the current settings to the configuration file self.config.

        Everything goes into a [global] section.  Settings that are None
        are left out, and the test sequence is written as a
        blank-separated list, the form read_config() understands.
        """
        config = ConfigParser()
        config.add_section('global')
        for option in self._plain_options + ('sequence', 'randomize'):
            value = getattr(self, option, None)
            if value is None:
                continue
            if isinstance(value, (list, tuple)):
                value = ' '.join(value)
            config.set('global', option, str(value))

        with open(self.config, 'wb') as configfile:
            config.write(configfile)

    @staticmethod
    def _is_writable_dir(path):
        """Tell whether `path` is a directory in which files can be created."""
        import tempfile

        if not path or not os.path.isdir(path):
            return False
        try:
            probe = tempfile.TemporaryFile(dir=path)
        except (IOError, OSError):
            return False
        probe.close()
        return True

    def validate_and_compute_run(self):
        """Validate the run parameters and compute the list of tests to run.

        Side effects: an empty extra_serial becomes '0000', otherwise
        commas are stripped from its ends; repeat is converted to an
        integer (1 when unset); the result is stored in self.run_.

        Returns the list of test file paths, in execution order, repeated
        `repeat` times.

        Raises PtsInvalid for a bad board name, serial number, test path,
        log path or repeat factor, PtsNoBatch when the sequence is empty
        and PtsBadTestNo when a test number has no matching file.
        """
        if not self.board:
            raise PtsInvalid('invalid board name [{0}]'.format(self.board))
        if not self.serial:
            raise PtsInvalid('invalid serial number [{0}]'.format(self.serial))
        if not self.extra_serial:
            self.extra_serial = '0000'
        else:
            self.extra_serial = self.extra_serial.strip(',')

        # The test directory only has to exist; the log directory must
        # also accept new files.  (The earlier os.tempnam() probe never
        # ran: its RuntimeWarning was promoted to an exception by a global
        # warnings filter and then swallowed.)
        if not self.test_path or not os.path.isdir(self.test_path):
            raise PtsInvalid('invalid test path [{0}]'.format(self.test_path))
        if not self._is_writable_dir(self.log_path):
            raise PtsInvalid('invalid log path [{0}]'.format(self.log_path))

        if not self.repeat:
            self.repeat = 1
        else:
            try:
                repeat = int(self.repeat)
            except (TypeError, ValueError):
                repeat = 0
            if repeat < 1:
                msg = 'invalid repeat factor [{0}]'.format(self.repeat)
                raise PtsInvalid(msg)
            self.repeat = repeat

        if not self.sequence:
            raise PtsNoBatch('null test sequence')
        run = []
        for testno in self.sequence:
            test_glob = os.path.join(self.test_path, 'test' + testno + '.py')
            files = glob.glob(test_glob)
            if not files:
                print('%s %s' % (files, test_glob))
                raise PtsBadTestNo('no test number [%s], aborting' % testno)
            run.append(files[0])

        if self.randomize:
            random.shuffle(run)

        self.run_ = self.repeat * run
        return self.run_

    def search_prev_logs(self):
        """Ask the operator for a comment if this board was tested before.

        Scans self.log_path for file names containing 'run' whose last
        underscore-separated field (before '.txt') equals self.serial.
        At the first such file the operator is asked why the test is
        being repeated and the answer is stored in self.comment.
        """
        for filename in os.listdir(self.log_path):
            if 'run' not in filename:
                continue
            found = re.match(r'^.*_([^_.]+)\.txt$', filename)
            if found is None:
                continue
            # Keep scanning past logs of other boards; stop only once a
            # log for this serial number has been found.
            if found.group(1) == self.serial:
                self.comment = raw_input('Previous logs for this board have been recorded.\nWhy do you want to repeat the test? (press ENTER to finish) : \n')
                break

    def _ask_abort_or_continue(self, log):
        """Decide whether the suite goes on after a PtsUser error.

        With self.yes set the answer is always "continue"; otherwise the
        operator is asked until 'a' or 'c' is typed.  The decision is
        written to `log`.  Returns True to continue, False to abort.
        """
        if self.yes:
            log.write('    user intervention: continue (assuming --yes)\n')
            return True
        ans = ''
        while ans not in ('a', 'c'):
            ans = raw_input('Abort or Continue? (A/C) ').lower()
        if ans == 'a':
            log.write('    user intervention: abort\n')
            return False
        log.write('    user intervention: continue\n')
        return True

    def _run_one(self, test, runid, log, ziplist, failures):
        """Run the test file `test` and record its outcome.

        The per-test log name is appended to `ziplist`, progress is
        written to `log` and printed, and a (shortname, exception) pair
        is appended to `failures` for every test that raises.

        Returns True if the suite should go on with the next test and
        False if it must stop (critical error, or the operator chose to
        abort).
        """
        testname = os.path.splitext(os.path.basename(test))[0]
        found = re.match(r'test(\d\d)', testname)
        # Computed outside the try block so every handler below can use
        # it; fall back to the module name if it has no two-digit number.
        shortname = found.group(1) if found else testname
        try:
            logname = self.log_pattern.format(board=self.board,
                                              serial=self.serial,
                                              timestamp=timestamp(),
                                              runid=runid,
                                              number=shortname)
            logname = os.path.join(self.log_path, logname)
            ziplist.append(logname)
            log.write('------------------------\n')
            log.write('running test {0} = {1}\n'.format(shortname, test))
            print('running test ' + shortname)
            run_test(testname, logname, test_path=self.test_path, yes=self.yes)
        except PtsCritical as e:
            print('test [%s]: critical error, aborting: [%s]' % (shortname, e))
            log.write('    critical error in test {0}, exception [{1}]\n'.format(shortname, e))
            log.write('    cannot continue, aborting test suite')
            failures.append((shortname, e, ))
            return False
        except PtsError as e:
            print('test [%s]: error, continuing: [%s]' % (shortname, e))
            log.write('    error in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        except PtsUser as e:
            print('test [%s]: user error, user intervention required: [%s]' % (shortname, e))
            log.write('    error in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
            return self._ask_abort_or_continue(log)
        except PtsWarning as e:
            print('test [%s]: warning: [%s]' % (shortname, e))
            log.write('    warning in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        except Exception as e:
            print('test [%s]: unknown exception [%s]' % (shortname, e))
            log.write('    unknown exception in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        else:
            log.write('    OK\n')
            print('test '+ shortname + '    OK\n')
        return True

    def run(self):
        """Run the whole suite and return the list of failures.

        The parameters are validated first, then the operator is asked
        for a comment if earlier logs exist for this board.  A run log is
        written in self.log_path, each test gets its own log there, and
        all logs that exist at the end are packed into a zip archive.

        Returns a list of (shortname, exception) pairs, one per test that
        raised; the list is empty when every test passed.  Exceptions
        from validate_and_compute_run() propagate to the caller.
        """
        # Validate before touching log_path: search_prev_logs() lists it.
        sequence = self.validate_and_compute_run()
        self.comment = ""
        self.search_prev_logs()

        ts = timestamp()
        runid = sha(self.board + ':' + self.serial + ':' + ts)
        names = dict(board=self.board, serial=self.serial,
                     timestamp=ts, runid=runid)
        logfilename = os.path.join(self.log_path, self.log_name.format(**names))
        zipfilename = os.path.join(self.log_path, self.zip_name.format(**names))
        ziplist = [logfilename]

        # Test modules are imported by name, so their directory has to be
        # on the module search path.
        if self.test_path not in sys.path:
            sys.path.append(self.test_path)

        failures = []
        log = open(logfilename, 'wb')
        try:
            log.write('test run\n'
                '    board           = {0}\n'
                '    serial          = {1}\n'
                '    optional serial = {2}\n'
                '    comment         = {3}\n'
                '    timestamp       = {4}\n'
                '    runid           = {5}\n'.format(
                    self.board, self.serial, self.extra_serial,
                    self.comment, ts, runid))
            for test in sequence:
                if not self._run_one(test, runid, log, ziplist, failures):
                    break

            log.write('\n')
            log.write('------------------------\n')
            log.write('Test suite finished.\n')
            if not failures:
                msg = 'All tests OK\n'
            else:
                msg = ' '.join(['FAILED:'] + [fail[0] for fail in failures])
            print(msg)
            log.write(msg)
        finally:
            log.close()

        # A test that failed before creating its log leaves no file
        # behind; only archive what actually exists.
        make_zip(zipfilename, [name for name in ziplist
                               if os.path.exists(name)])
        return failures
311

312 313 314
def get_serial():
    """Ask the operator for the serial number of the board under test.

    Returns the typed answer with surrounding whitespace removed.
    """
    return raw_input('board serial number? ').strip()
316

317 318 319 320 321
def get_extra_serial():
    """Ask the operator for a serial number and return it stripped.

    NOTE(review): the prompt text is the same one get_serial() uses;
    presumably this is meant to ask for the additional (extra) serial
    number -- confirm before changing the wording.
    """
    answer = raw_input('board serial number? ')
    return answer.strip()

322 323 324
def timestamp():
    """Return the current local time as a 'YYYYMMDD.HHMMSS.microseconds' string."""
    return datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f')
326 327 328 329 330 331 332 333 334 335 336 337

def sha(blob, len=7):
    """Return the SHA-1 (160-bit) hash of a binary object as hex digits.

    len is the number of hex digits to take from the start of the hex
    digest, defaulting to 7 just as in git.  A false value (0 or None)
    returns the complete 40-digit digest.
    """
    # hashlib supersedes the deprecated `sha` module and yields the same
    # digest; imported here so the block does not depend on file-level
    # imports.
    import hashlib

    digest = hashlib.sha1(blob).hexdigest()
    if not len:
        return digest
    return digest[:len]
338

339 340
class Cli(cmd.Cmd, Suite):
    """Interactive command interpreter to inspect and edit a Suite.

    Each do_<name> method is a command of the interpreter.  The commands
    named after a setting print the setting when called without an
    argument and assign it when called with one.
    """

    def __init__(self, cfgfilename=default_config_file):
        """Initialise both the interpreter and the suite it operates on."""
        cmd.Cmd.__init__(self)
        Suite.__init__(self, cfgfilename)
        # No ruler line under the headers printed by the help command.
        self.ruler = ''

    def _show_or_set(self, name, arg):
        """Assign `arg` to attribute `name`, or print it when arg is empty."""
        if arg:
            setattr(self, name, arg)
        else:
            print(getattr(self, name, None))

    def do_board(self, arg):
        "show or set the board name"
        self._show_or_set('board', arg)

    def do_serial(self, arg):
        "show or set the board serial number"
        self._show_or_set('serial', arg)

    def do_extra_serial(self, arg):
        "show or set the additional serial number"
        self._show_or_set('extra_serial', arg)

    def do_test_path(self, arg):
        "show or set the path to the test files"
        self._show_or_set('test_path', arg)

    def do_log_path(self, arg):
        "show or set the path to the log files"
        self._show_or_set('log_path', arg)

    def do_save(self, arg):
        "write the current configuration to the config file"
        # Suite provides save(); there is no write_config method.
        self.save()

    def do_run(self, arg):
        # NOTE(review): the command is accepted but does nothing;
        # presumably it is meant to start the suite -- confirm.
        pass

    def do_repeat(self, arg):
        "show or set the number of times the batch is repeated"
        if not arg:
            print(getattr(self, 'repeat', None))
            return
        try:
            self.repeat = int(arg)
        except ValueError:
            print('%s is not an integer' % arg)

    def do_EOF(self, arg):
        "exit cli at end of input"
        # Finish the prompt line before leaving.
        print('')
        return True

    def do_quit(self, arg):
        "exit cli"
        return True

    def do_show(self, arg):
        "show current configuration of suite"

        params_to_list = (
            'board',
            'serial',
            'extra_serial',
            'test_path',
            'log_path',
            'repeat',
            'randomize', )
        for param in params_to_list:
            if hasattr(self, param):
                print('%-12s %s' % (param + ':', getattr(self, param)))

    # Short aliases.
    do_q = do_quit
    do_h = cmd.Cmd.do_help

418 419 420 421 422 423 424 425 426 427 428 429 430 431
def normalize_testname(name):
    """Return `name` without a leading 'test', if it has one.

    'test07' becomes '07'; a name that does not start with 'test' is
    returned unchanged.
    """
    return name[4:] if name.startswith('test') else name

def validate_args(args):
    """Sort command-line test names into accepted and rejected ones.

    A name is accepted when default_test_syntax matches at its start;
    accepted names are returned normalized (without the 'test' prefix),
    rejected names are returned as given.

    Returns the pair (valid_args, invalid_args), each in input order.
    """
    accepted = []
    rejected = []
    for candidate in args:
        if re.match(default_test_syntax, candidate):
            accepted.append(normalize_testname(candidate))
        else:
            rejected.append(candidate)
    return accepted, rejected

432
def main():
    """Parse the command line and run, save or interactively edit a suite.

    The positional arguments are the tests to run.  After validation the
    suite is either written to the configuration file (--write-config),
    handed to the command interpreter (--cli) or executed.

    Returns the value to be used as process exit status: the number of
    failed tests after a run, 0 after saving or leaving the interpreter,
    and 2 when the command line or the run parameters are invalid.
    """
    # Same status optparse itself exits with on a malformed command line.
    usage_error = 2

    usage = ( '%prog: [options] test ...\n'
            'run %prog with option -h or --help for more help' )
    parser = OptionParser(usage)
    parser.add_option("-c", "--config", dest="config",
                        default=default_config_file,
                        help="config file name")
    parser.add_option("-C", "--cli", dest="cli", action="store_true",
                        help="enter command-line interpreter")
    parser.add_option("-b", "--board", dest="board",
                        help="board name (e.g. -b SPEC)", metavar="NAME")
    parser.add_option("-s", "--serial", dest="serial",
                        help="board serial number", metavar="SERIAL")
    parser.add_option("-e", "--extra_serial", dest="extra_serial",
                        help="another board serial number [Optional]", metavar="SERIAL")
    parser.add_option("-t", "--test-path", dest="test_path",
                        help="path to test files", metavar="PATH")
    parser.add_option("-l", "--log-path", dest="log_path",
                        help="path to log files", metavar="PATH")
    parser.add_option("-n", "--ntimes", dest="repeat",
                        help="number of times to repeat the batch of tests",
                        metavar="NUMBER")
    parser.add_option("-r", "--randomize", action="store_true",
                        default=False,
                        help="run the batch in random order", )
    parser.add_option("-w", "--write-config", action="store_true",
                        help="write configuration data to config file", )
    parser.add_option("-y", "--yes", action="store_true",
                        help="assume all user interventions are affirmative", )

    (options, args) = parser.parse_args()

    # validate arguments and set up Suite object
    if not args:
        parser.print_usage()
        return usage_error
    valid, invalid = validate_args(args)
    if invalid:
        print('invalid test names, aborting: ' + ' '.join(invalid))
        return usage_error

    s = Cli(options.config)
    # Every parsed option becomes an attribute of the suite.
    s.__dict__.update(options.__dict__)
    s.sequence = valid
    try:
        s.validate_and_compute_run()
    except PtsInvalid as e:
        print('bad parameters: %s' % e)
        return usage_error

    # decide what to do
    if options.write_config:
        s.save()
    elif options.cli:
        s.cmdloop()
    else:
        return len(s.run())
    return 0
493 494

if __name__ == '__main__':
    # The value returned by main() becomes the process exit status.
    sys.exit(main())