#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Follow a file (like ``tail -f``) using Linux inotify through ctypes.

Usage: SCRIPT [-v] FILE

The script seeks to the end of FILE, registers an ``IN_MODIFY`` inotify
watch on it and, every time the kernel reports a modification, copies the
newly appended data to standard output.  With ``-v`` every decoded inotify
event is also logged to standard error.
"""

import ctypes
import os
import posix
import struct
import sys
import time

SEEK_END = 2
IN_MODIFY = 0x00000002  # value of IN_MODIFY in <sys/inotify.h>

# struct inotify_event header: int wd; uint32_t mask, cookie, len.
# Standard sizes ("=") give 4 bytes per field on both LP64 and ILP32.
inotify_event = "=iIII"
inotify_event_size = struct.calcsize(inotify_event)

# Size of the buffer handed to read(); far larger than a single event.
_READ_BUFFER_SIZE = 32768


def iter_events(buf):
    """Yield ``(wd, mask, cookie, name)`` for each event packed in *buf*.

    *buf* is the raw byte string returned by reading an inotify descriptor.
    *name* is the raw name field exactly as stored in the buffer (it may be
    empty and may carry trailing NUL padding).

    Raises ValueError if *buf* ends in the middle of an event.
    """
    offset = 0
    end = len(buf)
    while offset < end:
        if end - offset < inotify_event_size:
            raise ValueError(
                "truncated inotify event header at offset %d" % offset)
        wd, mask, cookie, name_len = struct.unpack_from(
            inotify_event, buf, offset)
        offset += inotify_event_size
        if name_len > end - offset:
            raise ValueError(
                "truncated inotify event name at offset %d" % offset)
        yield wd, mask, cookie, buf[offset:offset + name_len]
        offset += name_len


def _fs_bytes(path):
    """Return *path* as bytes suitable for passing to a C ``char *``."""
    if isinstance(path, bytes):
        return path
    encode = getattr(os, "fsencode", None)  # not available on Python 2
    if encode is not None:
        return encode(path)
    return path.encode(sys.getfilesystemencoding())


def _errno_error(filename=None):
    """Build an OSError from the errno saved by the last ctypes call."""
    err = ctypes.get_errno()
    if filename is None:
        return OSError(err, os.strerror(err))
    return OSError(err, os.strerror(err), filename)


def _copy_new_data(fh):
    """Write whatever can currently be read from *fh* to stdout."""
    sys.stdout.write(fh.read())
    sys.stdout.flush()


def follow(path, verbose=False):
    """Copy data appended to *path* to stdout until interrupted.

    When *verbose* is true, each inotify event is described on stderr.

    Raises OSError if the inotify instance or the watch cannot be created,
    and ValueError if the kernel hands back a malformed event buffer.
    """
    with open(path) as fh:
        fh.seek(0, SEEK_END)

        libc = ctypes.CDLL('libc.so.6', use_errno=True)
        # Declare the prototype so the path is marshalled as a char *
        # (a Python 3 str would otherwise be passed as wchar_t *).
        libc.inotify_add_watch.argtypes = [
            ctypes.c_int, ctypes.c_char_p, ctypes.c_uint32]
        libc.inotify_add_watch.restype = ctypes.c_int

        inotify_fd = libc.inotify_init()
        if inotify_fd < 0:
            raise _errno_error()
        try:
            watch_id = libc.inotify_add_watch(
                inotify_fd, _fs_bytes(path), IN_MODIFY)
            if watch_id < 0:
                raise _errno_error(path)

            # Pick up anything written between the seek and the watch.
            _copy_new_data(fh)

            while True:
                events = posix.read(inotify_fd, _READ_BUFFER_SIZE)
                # Always walk the buffer so malformed data is detected
                # even when nothing is being logged.
                for wd, mask, cookie, name in iter_events(events):
                    if verbose:
                        sys.stderr.write(
                            "wd=%d, mask=0x%x, cookie=%d, len=%d, name=%r\n"
                            % (wd, mask, cookie, len(name), name))

                time.sleep(.005)  # In case lots of data is being written
                _copy_new_data(fh)
        finally:
            os.close(inotify_fd)


def main(argv=None):
    """Parse *argv* (default ``sys.argv[1:]``) and follow the named file.

    Returns 2 after printing a usage message when no file name is given.
    """
    args = list(sys.argv[1:] if argv is None else argv)
    verbose = bool(args) and args[0] == '-v'
    if verbose:
        args.pop(0)
    if not args:
        sys.stderr.write("usage: %s [-v] FILE\n" % sys.argv[0])
        return 2
    follow(args[0], verbose)
    return 0


if __name__ == '__main__':
    sys.exit(main())