pts.py 16.9 KB
Newer Older
1
#!   /usr/bin/env   python
2 3
#    coding: utf8

4 5 6 7
# Copyright CERN, 2011
# Author: Juan David Gonzalez Cobas <dcobas@cern.ch>
# Licence: GPL v2 or later.
# Website: http://www.ohwr.org
8

9 10
import sys
import cmd
11
import glob
12
import re
13
import os, os.path
14
import stat
15
import datetime
16
import random
17
import warnings
18
import zipfile
19
import string
20

21
from ConfigParser import ConfigParser, NoOptionError
22
from optparse import OptionParser
23
from sha import sha as sha160
24
from ptsexcept import *
25 26
sys.path.append('sdbfs')
from gen_flash_image import check_mac
27

28 29 30
# Configuration file used when no -c/--config option is given.
default_config_file = 'ptsdefault.cfg'

# str.format() templates for the files a run leaves behind: one log per
# test, one log for the whole run, and the zip archive bundling them.
default_log_pattern = 'pts_tst_{runid}_{timestamp}_{board}_{serial}_{number}.txt'
default_log_name = 'pts_run_{runid}_{timestamp}_{board}_{serial}.txt'
default_zip_name = 'zip_run_{runid}_{timestamp}_{board}_{serial}.zip'

# Regular expressions describing test names: the name of a test module,
# and what is accepted on the command line ("test07" or just "07").
default_test_pattern = r'test[0-9][0-9]'
default_test_syntax = r'(test)?(\d\d)'
34

35 36
# Keep a reference to the real builtin: run_test() may replace raw_input
# with pts_raw_input, which still needs to reach the original.
original_raw_input = raw_input


def pts_raw_input(msg, default='y'):
    """Prompt with msg like raw_input, falling back to default on EOF.

    Returns whatever the original raw_input returns; when the input
    stream is exhausted (EOFError) the value of default is returned
    instead of propagating the error.
    """
    try:
        return original_raw_input(msg)
    except EOFError:
        return default

44 45 46 47 48
def make_zip(zipname, ziplist):
    """Create (or overwrite) the archive zipname holding every file in ziplist."""
    archive = zipfile.ZipFile(zipname, 'w')
    try:
        for path in ziplist:
            archive.write(path)
    finally:
        # Closing writes the central directory, exactly as leaving a
        # `with zipfile.ZipFile(...)` block would.
        archive.close()

49
def run_test(testname, logname, test_path, yes=False):
    """Run test module testname with its output redirected to logname.

    The module is imported by name and its main() is called with
    default_directory=test_path.  While it runs, sys.stdout is the log
    file.

    If yes is true, assume affirmative answers from the user: stdin is
    replaced by the null device and the raw_input builtin by
    pts_raw_input, which answers 'y' on end of input.

    stdout, stdin and raw_input are restored on the way out, whether the
    test succeeds or raises; exceptions raised by the test propagate to
    the caller.
    """
    # `__builtins__` is a module only inside __main__ (it is a dict in an
    # imported module), so patch the builtins module explicitly.
    import __builtin__

    # Open the log before touching sys.stdout: if this fails there is
    # nothing to undo, and the real stdout is never closed by mistake.
    logfile = open(logname, 'w')
    saved_stdout = sys.stdout
    saved_stdin = sys.stdin
    saved_raw_input = __builtin__.raw_input
    devnull = None
    try:
        sys.stdout = logfile
        if yes:
            devnull = open(os.devnull)
            sys.stdin = devnull
            __builtin__.raw_input = pts_raw_input
        mod = __import__(testname, globals(), locals(), [])
        mod.main(default_directory=test_path)
    finally:
        sys.stdout = saved_stdout
        logfile.close()
        if yes:
            sys.stdin = saved_stdin
            # Restore the builtin itself; a plain `raw_input = ...` here
            # would only bind a local name and leave the patch in place.
            __builtin__.raw_input = saved_raw_input
        if devnull is not None:
            devnull.close()
69

70 71
class Suite(object):
    """Parameters and execution logic of one test run.

    A suite holds the identity of the board under test (name, serial
    numbers, MAC address), the directories holding the test scripts and
    the logs, and the sequence of tests to run.  run() executes the
    sequence and leaves in log_path a run log, one log per test and a
    zip archive bundling them.
    """

    def __init__(self, cfgfilename=default_config_file):
        """Create a suite with every required field still unset.

        cfgfilename is remembered as the file used by read_config() and
        save(); it is not read here.
        """
        self.required = ['board', 'serial', 'extra_serial', 'mac_addr',
                         'test_path', 'log_path', 'sequence']
        for fieldname in self.required:
            setattr(self, fieldname, None)
        # Optional settings get defaults so that a freshly built suite
        # can be validated and run without AttributeError.
        self.repeat = None
        self.randomize = False
        self.yes = False
        self.comment = ''
        self.config = cfgfilename
        self.log_pattern = default_log_pattern
        self.log_name = default_log_name
        self.zip_name = default_zip_name

    def missing(self):
        """Return the names of the required fields that are still None."""
        return [fieldname for fieldname in self.required
                if getattr(self, fieldname) is None]

    def read_config(self, name=None):
        """Load settings from the [global] section of the config file.

        name, when given, replaces the remembered configuration file
        name.  Options absent from the file leave the corresponding
        attribute untouched.

        Raises PtsCritical if the file cannot be read and PtsInvalid if
        the 'randomize' option is not a valid boolean.
        """
        if name:
            self.config = name
        config = ConfigParser()
        # read() returns the list of files it managed to parse; an empty
        # list means the file is missing or unreadable.
        if not config.read(self.config):
            errmsg = 'could not read configuration file {0}'
            raise PtsCritical(errmsg.format(self.config))

        # has_option() is also False when the section itself is missing,
        # so every option is looked up independently of the others.
        for option in ('board', 'serial', 'extra_serial', 'mac_addr',
                       'test_path', 'log_path', 'repeat'):
            if config.has_option('global', option):
                setattr(self, option, config.get('global', option))
        if config.has_option('global', 'sequence'):
            # The rest of the class iterates over a list of test numbers;
            # pick the numbers out of whatever separators the file uses.
            self.sequence = re.findall(r'\d+', config.get('global', 'sequence'))
        if config.has_option('global', 'randomize'):
            try:
                self.randomize = config.getboolean('global', 'randomize')
            except ValueError:
                msg = 'invalid randomize flag [{0}]'
                raise PtsInvalid(msg.format(config.get('global', 'randomize')))

    def save(self):
        """Write the current settings to the [global] section of self.config.

        Fields that are still None are left out, so that reading the
        file back does not produce the string 'None'.  The test sequence
        is stored as space-separated test numbers.
        """
        config = ConfigParser()
        config.add_section('global')
        for option in self.required + ['repeat', 'randomize']:
            value = getattr(self, option, None)
            if value is None:
                continue
            if option == 'sequence' and not isinstance(value, str):
                value = ' '.join(value)
            config.set('global', option, str(value))

        # Writing our configuration file
        with open(self.config, 'wb') as configfile:
            config.write(configfile)

    @staticmethod
    def _is_writable_dir(path):
        """Tell whether path is a directory in which a file can be created."""
        import tempfile

        if not path or not os.path.isdir(path):
            return False
        try:
            handle, probe = tempfile.mkstemp(dir=path)
        except (IOError, OSError):
            return False
        os.close(handle)
        os.unlink(probe)
        return True

    def validate_and_compute_run(self):
        """Validate the run parameters and compute the list of test files.

        Side effects: extra_serial defaults to '0000' (or is stripped of
        commas), repeat is converted to an integer (default 1), and the
        resulting list is stored in self.run_.

        Returns the list of test files, repeated `repeat` times and
        shuffled once beforehand if randomize is set.

        Raises PtsInvalid for a bad board, serial, path or repeat
        factor, PtsNoBatch for an empty sequence and PtsBadTestNo when
        a test number has no matching file in test_path.
        """
        if not self.board:
            msg = 'invalid board name [{0}]'.format(self.board)
            raise PtsInvalid(msg)
        if not self.serial:
            msg = 'invalid serial number [{0}]'.format(self.serial)
            raise PtsInvalid(msg)
        if not self.extra_serial:
            self.extra_serial = '0000'
        else:
            self.extra_serial = self.extra_serial.strip(',')

        # Both directories are probed by really creating a file in them.
        if not self._is_writable_dir(self.test_path):
            msg = 'invalid test path [{0}]'.format(self.test_path)
            raise PtsInvalid(msg)
        if not self._is_writable_dir(self.log_path):
            msg = 'invalid log path [{0}]'.format(self.log_path)
            raise PtsInvalid(msg)

        if not self.repeat:
            self.repeat = 1
        else:
            try:
                repeat = int(self.repeat)
            except (TypeError, ValueError):
                msg = 'invalid repeat factor [{0}]'.format(self.repeat)
                raise PtsInvalid(msg)
            if repeat < 1:
                msg = 'invalid repeat factor [{0}]'.format(self.repeat)
                raise PtsInvalid(msg)
            self.repeat = repeat

        if not self.sequence:
            raise PtsNoBatch('null test sequence')
        run = []
        for testno in self.sequence:
            test_glob = os.path.join(self.test_path, 'test' + testno + '.py')
            files = glob.glob(test_glob)
            if not files:
                raise PtsBadTestNo('no test number [%s], aborting' % testno)
            run.append(files[0])

        if self.randomize:
            random.shuffle(run)

        self.run_ = self.repeat * run
        return self.run_

    def search_prev_logs(self):
        """Ask the operator why a board that already has run logs is retested.

        Looks in log_path for file names containing 'run' whose last
        underscore-separated field (before '.txt') equals self.serial.
        On the first such file the operator is prompted and the answer
        is stored in self.comment.  Nothing happens if log_path cannot
        be listed; validate_and_compute_run() reports a bad log path.
        """
        try:
            filenames = os.listdir(self.log_path)
        except (OSError, TypeError):
            return

        for filename in filenames:
            if 'run' not in filename:
                continue
            match = re.match(r'^.*_([^_.]+)\.txt$', filename)
            if match is None or match.group(1) != self.serial:
                # Keep looking: only a log of *this* board ends the search.
                continue
            self.comment = raw_input('Previous logs for this board have been recorded.\nWhy do you want to repeat the test? (press ENTER to finish) : \n')
            break

    def _ask_abort_or_continue(self, log):
        """After a PtsUser error decide whether to go on; return True to continue."""
        if self.yes:
            # Answer once and leave: there is no operator to ask.
            log.write('    user intervention: continue (assuming --yes)\n')
            return True
        ans = ''
        while ans not in ('a', 'c'):
            ans = raw_input('Abort or Continue? (A/C) ').lower()
        if ans == 'a':
            log.write('    user intervention: abort\n')
            return False
        log.write('    user intervention: continue\n')
        return True

    def _run_one(self, test, runid, log, ziplist, failures):
        """Run one test file and record its outcome; return False to stop the suite."""
        testname = os.path.splitext(os.path.basename(test))[0]
        match = re.match(r'test(\d\d)', testname)
        # Always have a name to report, even for an unexpected file name.
        shortname = match.group(1) if match else testname
        try:
            logname = self.log_pattern.format(board=self.board,
                                              serial=self.serial,
                                              timestamp=timestamp(),
                                              runid=runid,
                                              number=shortname)
            logname = os.path.join(self.log_path, logname)
            ziplist.append(logname)
            log.write('------------------------\n')
            log.write('running test {0} = {1}\n'.format(shortname, test))
            print('running test ' + shortname)
            run_test(testname, logname, test_path=self.test_path, yes=self.yes)
        except PtsCritical as e:
            print('test [%s]: critical error, aborting: [%s]' % (shortname, e))
            log.write('    critical error in test {0}, exception [{1}]\n'.format(shortname, e))
            log.write('    cannot continue, aborting test suite')
            failures.append((shortname, e, ))
            return False
        except PtsError as e:
            print('test [%s]: error, continuing: [%s]' % (shortname, e))
            log.write('    error in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        except PtsUser as e:
            print('test [%s]: user error, user intervention required: [%s]' % (shortname, e))
            log.write('    error in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
            return self._ask_abort_or_continue(log)
        except PtsWarning as e:
            print('test [%s]: warning: [%s]' % (shortname, e))
            log.write('    warning in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        except Exception as e:
            print('test [%s]: unknown exception [%s]' % (shortname, e))
            log.write('    unknown exception in test {0}, exception [{1}]\n'.format(shortname, e))
            failures.append((shortname, e, ))
        else:
            log.write('    OK\n')
            print('test ' + shortname + '    OK\n')
        return True

    def run(self):
        """Run the whole test sequence and archive the logs.

        Prompts for a comment if earlier logs exist for this serial,
        validates the parameters, runs every test (stopping early on a
        critical error or on an operator abort), writes the run log and
        finally zips the logs that were produced.

        Raises whatever validate_and_compute_run() raises.
        """
        self.comment = ""
        self.search_prev_logs()

        sequence = self.validate_and_compute_run()
        ts = timestamp()
        runid = sha(self.board + ':' + self.serial + ':' + ts)
        logfilename = self.log_name.format(board=self.board,
                                           serial=self.serial,
                                           timestamp=ts,
                                           runid=runid)
        logfilename = os.path.join(self.log_path, logfilename)
        zipfilename = self.zip_name.format(board=self.board,
                                           serial=self.serial,
                                           timestamp=ts,
                                           runid=runid)
        zipfilename = os.path.join(self.log_path, zipfilename)
        ziplist = [logfilename]

        # Test modules are imported by name, so their directory must be
        # on the module search path.
        if self.test_path not in sys.path:
            sys.path.append(self.test_path)

        failures = []
        log = open(logfilename, 'wb')
        try:
            log.write('test run\n'
                '    board           = {0}\n'
                '    serial          = {1}\n'
                '    optional serial = {2}\n'
                '    MAC address     = {3}\n'
                '    comment         = {4}\n'
                '    timestamp       = {5}\n'
                '    runid           = {6}\n'.format(
                    self.board, self.serial, self.extra_serial,
                    self.mac_addr, self.comment, ts, runid))
            for test in sequence:
                if not self._run_one(test, runid, log, ziplist, failures):
                    break

            log.write('\n')
            log.write('------------------------\n')
            log.write('Test suite finished.\n')
            if not failures:
                msg = 'All tests OK\n'
            else:
                msg = ' '.join(['FAILED:'] + [fail[0] for fail in failures])
            print(msg)
            log.write(msg)
        finally:
            log.close()

        # A test whose log could not even be created has no file to add.
        make_zip(zipfilename, [name for name in ziplist if os.path.exists(name)])

316 317 318
def get_serial():
    """Ask the operator for the serial number of the board to test.

    Returns the answer with leading and trailing whitespace removed.
    """
    answer = raw_input('board serial number? ')
    return answer.strip()
320

321 322 323 324 325
def get_extra_serial():
    """Ask the operator for the extra (optional) serial number of the board.

    Returns the answer with leading and trailing whitespace removed.
    """
    # The prompt names the extra serial so that the operator can tell
    # this question apart from the one asked by get_serial().
    return raw_input('extra board serial number? ').strip()

326 327 328
def timestamp():
    """Return the current local time as 'YYYYMMDD.HHMMSS.microseconds'."""
    layout = '%Y%m%d.%H%M%S.%f'
    now = datetime.datetime.now()
    return now.strftime(layout)
330 331 332 333 334 335 336 337 338 339 340 341

def sha(blob, len=7):
    """create a sha-160 hash of a binary object

    len is the number of hex digits to take from the hex digest,
    defaulting to 7 just as in git.  A false value (None or 0) returns
    the complete hex digest instead of None.
    """
    digest = sha160(blob).hexdigest()
    if not len:
        return digest
    return digest[:len]
342

343 344
class Cli(cmd.Cmd, Suite):
    """Interactive command interpreter for inspecting and editing a Suite.

    Every do_<name> method is a command of the interpreter.  The setting
    commands take the new value as argument and print the current value
    when called without one.
    """

    def __init__(self, cfgfilename=default_config_file):
        cmd.Cmd.__init__(self)
        Suite.__init__(self, cfgfilename)
        # No ruler line under the headers printed by `help`.
        self.ruler = ''

    def _show_or_set(self, name, arg):
        """Store arg in attribute name, or print the attribute if arg is empty."""
        if arg:
            setattr(self, name, arg)
        else:
            print(getattr(self, name))

    def do_board(self, arg):
        "set or show the board name"
        self._show_or_set('board', arg)

    def do_serial(self, arg):
        "set or show the board serial number"
        self._show_or_set('serial', arg)

    def do_extra_serial(self, arg):
        "set or show the extra board serial number"
        self._show_or_set('extra_serial', arg)

    def do_test_path(self, arg):
        "set or show the path to the test files"
        self._show_or_set('test_path', arg)

    def do_log_path(self, arg):
        "set or show the path to the log files"
        self._show_or_set('log_path', arg)

    def do_save(self, arg):
        "write the current configuration to the config file"
        # Suite provides save(); there is no write_config() method.
        self.save()

    def do_run(self, arg):
        "run the configured test sequence"
        try:
            self.run()
        except (PtsInvalid, PtsNoBatch, PtsBadTestNo) as e:
            # Stay in the interpreter so the parameters can be corrected.
            print('cannot run: %s' % e)

    def do_repeat(self, arg):
        "set or show the number of times the batch of tests is repeated"
        if arg:
            try:
                self.repeat = int(arg)
            except ValueError:
                print('%s is not an integer' % arg)
        else:
            print(self.repeat)

    def do_EOF(self, arg):
        "exit cli on end of input"
        print('')
        return True

    def do_quit(self, arg):
        "exit cli"
        return True

    def do_show(self, arg):
        "show current configuration of suite"

        params_to_list = (
            'board',
            'serial',
            'extra_serial',
            'mac_addr',
            'test_path',
            'log_path',
            'repeat',
            'randomize', )
        for param in params_to_list:
            if param in self.__dict__:
                print('%-12s %s' % (param + ':', getattr(self, param)))

    do_q = do_quit
    do_h = cmd.Cmd.do_help

423 424 425 426 427 428 429 430 431 432 433 434 435 436
def normalize_testname(name):
    """Return name without a leading 'test' prefix, if it has one."""
    prefix = 'test'
    return name[len(prefix):] if name.startswith(prefix) else name

def validate_args(args):
    """Split command-line test names into accepted and rejected ones.

    A name is accepted when default_test_syntax matches at its start.
    Returns a pair (valid, invalid): the accepted names with any 'test'
    prefix removed, and the rejected names unchanged, both in their
    original order.
    """
    valid_args = []
    invalid_args = []
    for arg in args:
        if re.match(default_test_syntax, arg):
            valid_args.append(normalize_testname(arg))
        else:
            invalid_args.append(arg)
    return valid_args, invalid_args

437 438 439 440 441 442
def store_mac(path, mac):
    """Write the MAC address mac to the file 'mac.tmp' in directory path.

    Any existing file is overwritten.  The file is closed even if the
    write fails.
    """
    filepath = os.path.join(path, 'mac.tmp')
    with open(filepath, 'w') as mac_file:
        mac_file.write(mac)

443
def main():
    """Command-line entry point.

    Parses the options and the test names, builds a Cli/Suite from
    them, validates it and then does exactly one of: write the
    configuration file (-w), enter the command interpreter (-C) or run
    the tests.  Problems with the arguments are reported on stdout and
    end the program without running anything.
    """
    usage = ('%prog: [options] test ...\n'
             'run %prog with option -h or --help for more help')
    parser = OptionParser(usage)
    parser.add_option("-c", "--config", dest="config",
                      default=default_config_file,
                      help="config file name")
    parser.add_option("-C", "--cli", dest="cli", action="store_true",
                      help="enter command-line interpreter")
    parser.add_option("-b", "--board", dest="board",
                      help="board name (e.g. -b SPEC)", metavar="NAME")
    parser.add_option("-s", "--serial", dest="serial",
                      help="board serial number", metavar="SERIAL")
    parser.add_option("-e", "--extra_serial", dest="extra_serial",
                      help="another board serial number [Optional]", metavar="SERIAL")
    parser.add_option("-m", "--mac", dest="mac_addr",
                      help="MAC address of the SFP port")
    parser.add_option("-t", "--test-path", dest="test_path",
                      help="path to test files", metavar="PATH")
    parser.add_option("-l", "--log-path", dest="log_path",
                      help="path to log files", metavar="PATH")
    parser.add_option("-n", "--ntimes", dest="repeat",
                      help="number of times to repeat the batch of tests",
                      metavar="NUMBER")
    parser.add_option("-r", "--randomize", action="store_true",
                      default=False,
                      help="run the batch in random order", )
    parser.add_option("-w", "--write-config", action="store_true",
                      help="write configuration data to config file", )
    parser.add_option("-y", "--yes", action="store_true",
                      help="assume all user interventions are affirmative", )

    (options, args) = parser.parse_args()

    # validate arguments and set up Suite object
    if not args:
        parser.print_usage()
        return
    valid, invalid = validate_args(args)
    if invalid:
        print('invalid test names, aborting: ' + ' '.join(invalid))
        return

    s = Cli(options.config)
    # Every parsed option becomes an attribute of the suite.
    s.__dict__.update(options.__dict__)
    s.sequence = valid
    try:
        s.validate_and_compute_run()
    except (PtsInvalid, PtsNoBatch, PtsBadTestNo) as e:
        # A test number without a file is a parameter problem as well;
        # report it instead of dying with a traceback.
        print('bad parameters: %s' % e)
        return

    # store MAC address if needed
    if options.mac_addr:
        if not check_mac(options.mac_addr):
            # NOTE(review): check_mac may already print its own
            # diagnostic -- confirm against gen_flash_image.
            print('invalid MAC address [%s], aborting' % options.mac_addr)
            return
        store_mac(s.test_path, options.mac_addr)

    # decide what to do
    if options.write_config:
        s.save()
    elif options.cli:
        s.cmdloop()
    else:
        s.run()
511 512

# Run the command-line interface only when executed as a script, not
# when the module is imported.
if __name__ == '__main__':
    main()