Coverage for crontabs.py: 20%

85 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-05-21 00:22 +0200

1# 

2# Copyright 2016, Martin Owens <doctormo@gmail.com> 

3# 

4# This library is free software; you can redistribute it and/or 

5# modify it under the terms of the GNU Lesser General Public 

6# License as published by the Free Software Foundation; either 

7# version 3.0 of the License, or (at your option) any later version. 

8# 

9# This library is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

12# Lesser General Public License for more details. 

13# 

14# You should have received a copy of the GNU Lesser General Public 

15# License along with this library. 

16# 

17""" 

18The crontabs manager will list all available crontabs on the system. 

19""" 

20 

21import os 

22import sys 

23import pwd 

24import itertools 

25 

26from os import stat, access, X_OK 

27from pwd import getpwuid 

28from crontab import CronTab 

29 

class UserSpool(list):
    """List of the per-user crontabs found in a cron spool directory.

    Each regular file in the spool directory becomes one CronTab. A file
    whose name equals the login name of its owner is loaded as that user's
    crontab; any other file is treated as an abandoned tab and loaded
    directly from the file.
    """
    def __init__(self, loc, tabs=None):
        """Fill the list from the spool directory `loc`.

        `tabs` is accepted because CronTabs.add passes it to every location
        class; it is not used here.
        """
        for username in self.listdir(loc):
            tab = self.generate(loc, username)
            # generate() may return None and falsy tabs are skipped too.
            if tab:
                self.append(tab)
        if not self:
            # Nothing usable came out of the spool: fall back to
            # CronTab(user=True), which python-crontab documents as the
            # crontab of the current user.
            tab = CronTab(user=True)
            if tab:
                self.append(tab)

    def listdir(self, loc):
        """Return the entry names in `loc`, or [] when it cannot be listed."""
        try:
            return os.listdir(loc)
        except OSError:
            return []

    def get_owner(self, path):
        """Return the login name of the user owning `path`.

        Returns None when the owning uid has no passwd entry (KeyError) or
        when `path` cannot be stat'ed at all (OSError), e.g. because the
        file disappeared after the directory was listed.
        """
        try:
            return getpwuid(stat(path).st_uid).pw_name
        except (KeyError, OSError):
            return None

    def generate(self, loc, username):
        """Build the CronTab for spool entry `username` inside `loc`.

        Returns None for entries that are not (or are no longer) regular
        files, so that one bad entry does not abort the whole scan.
        """
        path = os.path.join(loc, username)
        if not os.path.isfile(path):
            return None
        if username != self.get_owner(path):
            # Abandoned crontab pool entry!
            return CronTab(tabfile=path)
        return CronTab(user=username)

61 

62 

class SystemTab(list):
    """List of system crontabs read from one file or one directory of files"""
    def __init__(self, loc, tabs=None):
        """Load `loc` itself when it is a file, or its visible entries when
        it is a directory. `tabs` is accepted but not used here.
        """
        if os.path.isdir(loc):
            # Entries whose name starts with a dot are ignored.
            tabfiles = [
                os.path.join(loc, name)
                for name in os.listdir(loc)
                if not name.startswith('.')
            ]
        elif os.path.isfile(loc):
            tabfiles = [loc]
        else:
            tabfiles = []
        for tabfile in tabfiles:
            self.append(CronTab(user=False, tabfile=tabfile))

74 

75 

class AnaCronTab(list):
    """Turns the scripts of an anacron directory into individual cron jobs"""
    def __init__(self, loc, tabs=None):
        """Expand directory `loc` using the job in `tabs` that runs it.

        Nothing happens unless `tabs` is non-empty and `loc` is a directory.
        """
        if not tabs or not os.path.isdir(loc):
            return
        # Holder tab that receives the generated per-script jobs.
        self.append(CronTab(user=False))
        matches = list(tabs.all.find_command(loc))
        if not matches:
            return
        # Only the first job whose command mentions loc is used as template.
        template = matches[0]
        for name in os.listdir(loc):
            self.add(loc, name, template)
        # The template job is deleted once its scripts have been expanded.
        template.delete()

    def add(self, loc, item, anajob):
        """Create a job for script `item` in `loc`, scheduled like `anajob`.

        Returns the new job, or None when the entry is '0anacron', starts
        with a dot, or is not executable by this process.
        """
        path = os.path.join(loc, item)
        skipped = item == '0anacron' or item[0] == '.' or not access(path, X_OK)
        if skipped:
            return None
        # Text after the last dot of loc, e.g. 'daily' for '/etc/cron.daily'.
        label = loc.split('.')[-1]
        job = self[0].new(command=path, user=anajob.user)
        job.set_comment('Anacron %s' % label)
        job.setall(anajob)
        return job

95 

96 

# Each entry pairs a loader class with the path handed to it. Plain files
# are read directly; for directories the loader lists the entries inside.
KNOWN_LOCATIONS = [
    # Linux (Debian, RedHat, etc): user spool and system-wide tabs
    (UserSpool, '/var/spool/cron/crontabs/'),
    (SystemTab, '/etc/crontab'),
    (SystemTab, '/etc/cron.d/'),
    # Anacron directories, expanded into one job per script
    (AnaCronTab, '/etc/cron.hourly'),
    (AnaCronTab, '/etc/cron.daily'),
    (AnaCronTab, '/etc/cron.weekly'),
    (AnaCronTab, '/etc/cron.monthly'),
    # MacOSX: no known locations
    # Other (windows, bsd): no known locations
]

113 

class CronTabs(list):
    """Singleton list of all detectable crontabs"""
    # Cached CronTab holding every job; rebuilt lazily by the `all` property.
    _all = None
    # The one shared instance, created on first construction.
    _self = None

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on first use."""
        # Compare with None rather than by truthiness: the instance is a
        # list, so an existing but empty singleton would be falsy and would
        # be replaced by a new object on every construction.
        if cls._self is None:
            cls._self = super(CronTabs, cls).__new__(cls, *args, **kw)
        return cls._self

    def __init__(self):
        """Scan KNOWN_LOCATIONS, but only while the list is still empty."""
        if not self:
            for loc in KNOWN_LOCATIONS:
                self.add(*loc)

    def add(self, cls, *args):
        """Append every tab produced by `cls(*args, tabs=self)`.

        `cls` is a location class (or any callable) returning an iterable
        of tabs; it receives this list as the `tabs` keyword argument.
        """
        for tab in cls(*args, tabs=self):
            self.append(tab)
            # The combined view is stale once a tab has been added.
            self._all = None

    @property
    def all(self):
        """Return a CronTab object with all jobs (intended as read-only)"""
        if self._all is None:
            self._all = CronTab(user=False)
            for tab in self:
                for job in tab:
                    # Jobs without a user inherit the user of their tab.
                    if job.user is None:
                        job.user = tab.user or 'unknown'
                    self._all.append(job)
        return self._all

145