#!/usr/bin/env python # -*- coding: utf8 -*- ############################## # ____ ___ ____ ______ # # | \ / _] / T| T # # | o )/ [_ Y o || | # # | _/Y _]| |l_j l_j # # | | | [_ | _ | | | # # | | | T| | | | | # # l__j l_____jl__j__j l__j # # # ##### ##### # Repeat commands! # ################## import errno, os, subprocess, sys, time from optparse import OptionParser interval = 1.0 command = 'true' clear = True get_paths = lambda: set() verbose = True dynamic = None last_run = None USAGE = r"""usage: %prog [options] COMMAND COMMAND should be given as a single argument using a shell string. A list of paths to watch should be piped in on standard input. For example: find . | peat './test.sh' find . -name '*.py' | peat 'rm *.pyc' find . -name '*.py' -print0 | peat -0 'rm *.pyc' If --dynamic is used, the given command will be run each time to generate the list of files to check: peat --dynamic 'find .' './test.sh' peat --dynamic 'find . -name '\''*.py'\''' 'rm *.pyc' """ def log(s): if verbose: print(s) def die(s): sys.stderr.write('ERROR: ' + s + '\n') sys.exit(1) def check(paths): for p in paths: try: if os.stat(p).st_mtime >= last_run: return True except OSError as e: # If the file has been deleted since we started watching, don't # worry about it. if e.errno == errno.ENOENT: pass else: raise return False def run(): global last_run last_run = time.time() log("running: " + command) subprocess.call(command, shell=True) def build_option_parser(): p = OptionParser(USAGE) # Main options p.add_option('-i', '--interval', default=None, help='interval between checks in milliseconds', metavar='N') p.add_option('-I', '--smart-interval', dest='interval', action='store_const', const=None, help='determine the interval based on number of files watched (default)') p.add_option('-d', '--dynamic', default=None, help='run COMMAND before each run to generate the list of files to check', metavar='COMMAND') p.add_option('-D', '--no-dynamic', dest='dynamic', action='store_const', const=None, help='take a list of files to watch on standard in (default)') p.add_option('-c', '--clear', default=True, action='store_true', dest='clear', help='clear screen before runs (default)') p.add_option('-C', '--no-clear', action='store_false', dest='clear', help="don't clear screen before runs") p.add_option('-v', '--verbose', default=True, action='store_true', dest='verbose', help='show extra logging output (default)') p.add_option('-q', '--quiet', action='store_false', dest='verbose', help="don't show extra logging output") p.add_option('-w', '--whitespace', default=None, action='store_const', dest='sep', const=None, help="assume paths are separated by whitespace (default)") p.add_option('-n', '--newlines', action='store_const', dest='sep', const='\n', help="assume paths are separated by newlines") p.add_option('-s', '--spaces', action='store_const', dest='sep', const=' ', help="assume paths are separated by spaces") p.add_option('-0', '--zero', action='store_const', dest='sep', const='\0', help="assume paths are separated by null bytes") return p def _main(): if dynamic: log("Running the following command to generate watch list:") log(' ' + dynamic) log('') log("Watching the following paths:") for p in get_paths(): log(' ' + p) log('') log('Checking for changes every %d milliseconds.' % int(interval * 1000)) log('') run() while True: time.sleep(interval) if check(get_paths()): if clear: subprocess.check_call('clear') run() def smart_interval(count): """Return the smart interval to use in milliseconds.""" if count >= 50: return 1000 else: sq = lambda n: n * n return int(1000 * (1 - (sq(50.0 - count) / sq(50)))) def _parse_interval(options): global get_paths if options.interval: i = int(options.interval) elif options.dynamic: i = 1000 else: i = smart_interval(len(get_paths())) return i / 1000.0 def _parse_paths(sep, data): if not sep: paths = data.split() else: paths = data.split(sep) paths = [p.rstrip('\n') for p in paths if p] paths = map(os.path.abspath, paths) paths = set(paths) return paths def main(): global interval, command, clear, get_paths, verbose, dynamic (options, args) = build_option_parser().parse_args() if len(args) != 1: die("exactly one command must be given") command = args[0] clear = options.clear verbose = options.verbose sep = options.sep dynamic = options.dynamic if dynamic: def _get_paths(): data = subprocess.check_output(dynamic, shell=True) return _parse_paths(sep, data) get_paths = _get_paths else: data = sys.stdin.read() paths = _parse_paths(sep, data) if not paths: die("no paths to watch were given on standard input") for path in paths: if not os.path.exists(path): die('path to watch does not exist: ' + repr(path)) get_paths = lambda: paths interval = _parse_interval(options) _main() if __name__ == '__main__': import signal def sigint_handler(signal, frame): sys.stdout.write('\n') sys.exit(130) signal.signal(signal.SIGINT, sigint_handler) main()