Coverage for crontabs.py: 20%
85 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 00:22 +0200
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 00:22 +0200
1#
2# Copyright 2016, Martin Owens <doctormo@gmail.com>
3#
4# This library is free software; you can redistribute it and/or
5# modify it under the terms of the GNU Lesser General Public
6# License as published by the Free Software Foundation; either
7# version 3.0 of the License, or (at your option) any later version.
8#
9# This library is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library.
16#
17"""
18The crontabs manager will list all available crontabs on the system.
19"""
21import os
22import sys
23import pwd
24import itertools
26from os import stat, access, X_OK
27from pwd import getpwuid
28from crontab import CronTab
class UserSpool(list):
    """List of the user crontabs found in a spool directory.

    A spool entry whose file name equals the name of the file's owner is
    loaded as that user's crontab; every other entry is considered
    abandoned and is loaded directly from its file.
    """

    def __init__(self, loc, tabs=None):
        """Populate the list from the spool directory ``loc``.

        ``tabs`` is accepted but not used by this class. Tabs that evaluate
        as false are left out. If nothing was collected, ``CronTab(user=True)``
        is tried as a fallback (presumably the current user's crontab --
        confirm against the CronTab API).
        """
        candidates = (self.generate(loc, name) for name in self.listdir(loc))
        self.extend(tab for tab in candidates if tab)
        if self:
            return
        fallback = CronTab(user=True)
        if fallback:
            self.append(fallback)

    def listdir(self, loc):
        """Return the entry names inside ``loc``, or ``[]`` on OSError."""
        try:
            entries = os.listdir(loc)
        except OSError:
            entries = []
        return entries

    def get_owner(self, path):
        """Return the login name owning ``path``.

        Returns None when the owning uid has no password database entry.
        """
        uid = stat(path).st_uid
        try:
            return getpwuid(uid).pw_name
        except KeyError:
            return None

    def generate(self, loc, username):
        """Build the CronTab for the spool entry ``username`` inside ``loc``."""
        path = os.path.join(loc, username)
        if self.get_owner(path) == username:
            return CronTab(user=username)
        # File name and owner disagree: an abandoned spool entry, so read
        # it from the file itself.
        return CronTab(tabfile=path)
class SystemTab(list):
    """List of the system crontabs found at a file or directory location."""

    def __init__(self, loc, tabs=None):
        """Load every system tab reachable from ``loc``.

        When ``loc`` is a directory, each entry whose name does not start
        with a dot is loaded; when it is a regular file, that file alone is
        loaded; otherwise the list stays empty. ``tabs`` is accepted but not
        used by this class.
        """
        if os.path.isdir(loc):
            paths = [
                os.path.join(loc, name)
                for name in os.listdir(loc)
                if not name.startswith('.')
            ]
        elif os.path.isfile(loc):
            paths = [loc]
        else:
            paths = []
        for path in paths:
            self.append(CronTab(user=False, tabfile=path))
class AnaCronTab(list):
    """Attempts to digest anacron entries (if possible)."""

    def __init__(self, loc, tabs=None):
        """Expand the job that runs directory ``loc`` into one job per script.

        Nothing happens unless ``tabs`` is non-empty and ``loc`` is a
        directory. Otherwise an empty system CronTab is added to hold the
        new jobs, and ``tabs.all`` is searched for jobs whose command
        mentions ``loc``. The first match is used as the template for every
        entry in the directory and is then deleted.
        """
        if not tabs or not os.path.isdir(loc):
            return
        self.append(CronTab(user=False))
        matches = list(tabs.all.find_command(loc))
        if not matches:
            return
        template = matches[0]
        for entry in os.listdir(loc):
            self.add(loc, entry, template)
        template.delete()

    def add(self, loc, item, anajob):
        """Create a job for script ``item`` in ``loc``, modelled on ``anajob``.

        Returns the new job, or None when the entry is skipped: the
        ``0anacron`` file, names starting with a dot, and files that are not
        executable are all ignored.
        """
        path = os.path.join(loc, item)
        skipped_name = item == '0anacron' or item[0] == '.'
        if skipped_name or not access(path, X_OK):
            return None
        # The comment is tagged with whatever follows the last dot in ``loc``.
        label = loc.split('.')[-1]
        job = self[0].new(command=path, user=anajob.user)
        job.set_comment('Anacron %s' % label)
        job.setall(anajob)
        return job
# Each entry pairs a loader class with the location it reads. A location
# that is a file is read directly; a directory has its entries listed.
KNOWN_LOCATIONS = [
    # Known Linux locations (Debian, RedHat, etc)
    (UserSpool, '/var/spool/cron/crontabs/'),
    (SystemTab, '/etc/crontab'),
    (SystemTab, '/etc/cron.d/'),
    # Anacron digestion (we want to know more)
    (AnaCronTab, '/etc/cron.hourly'),
    (AnaCronTab, '/etc/cron.daily'),
    (AnaCronTab, '/etc/cron.weekly'),
    (AnaCronTab, '/etc/cron.monthly'),
    # Known MacOSX locations: none
    # Other (windows, bsd): none
]
class CronTabs(list):
    """Singleton list of all detectable crontabs.

    Every construction returns the same shared instance. The list is filled
    from KNOWN_LOCATIONS the first time it is constructed, and again on any
    later construction that finds it still empty.
    """
    # Cached merged CronTab built by `all`; None means it must be rebuilt.
    _all = None
    # The single shared instance, created on first use.
    _self = None

    def __new__(cls, *args, **kw):
        """Return the shared instance, creating it on first use."""
        # Compare with None rather than by truthiness: the instance is a
        # list, so it is falsy while empty and a truthiness test would
        # hand out a fresh object on every call until a tab was found.
        if cls._self is None:
            cls._self = super(CronTabs, cls).__new__(cls, *args, **kw)
        return cls._self

    def __init__(self):
        """Scan KNOWN_LOCATIONS, unless the list already holds tabs."""
        if not self:
            for loc in KNOWN_LOCATIONS:
                self.add(*loc)

    def add(self, cls, *args):
        """Append every tab produced by ``cls(*args, tabs=self)``.

        ``cls`` is a loader callable that returns an iterable of tabs and
        accepts this list through the ``tabs`` keyword.
        """
        for tab in cls(*args, tabs=self):
            self.append(tab)
            # The merged view no longer reflects the list; rebuild lazily.
            self._all = None

    @property
    def all(self):
        """Return a CronTab object with all jobs (read-only).

        The object is built once and cached until `add` appends another
        tab. Jobs that have no user are given their tab's user, or
        'unknown' when the tab has none; this is set on the job itself.
        """
        if self._all is None:
            self._all = CronTab(user=False)
            for tab in self:
                for job in tab:
                    if job.user is None:
                        job.user = tab.user or 'unknown'
                    self._all.append(job)
        return self._all