Coverage for crontab.py: 33%
871 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 00:22 +0200
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 00:22 +0200
1#
2# Copyright 2021, Martin Owens <doctormo@gmail.com>
3#
4# This library is free software; you can redistribute it and/or
5# modify it under the terms of the GNU Lesser General Public
6# License as published by the Free Software Foundation; either
7# version 3.0 of the License, or (at your option) any later version.
8#
9# This library is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12# Lesser General Public License for more details.
13#
14# You should have received a copy of the GNU Lesser General Public
15# License along with this library.
16#
17# pylint: disable=logging-format-interpolation,too-many-lines
18"""
19from crontab import CronTab
20import sys
22# Create a new non-installed crontab
23cron = CronTab(tab='')
24job = cron.new(command='/usr/bin/echo')
26job.minute.during(5,50).every(5)
27job.hour.every(4)
29job.dow.on('SUN')
30job.month.during('APR', 'JUN')
31job.month.also.during('OCT', 'DEC')
33job.every(2).days()
34job.setall(1, 12, None, None, None)
36job2 = cron.new(command='/foo/bar', comment='SomeID')
37job2.every_reboot()
39jobs = list(cron.find_command('bar'))
40job3 = jobs[0]
41job3.clear()
42job3.minute.every(1)
44sys.stdout.write(str(cron.render()))
46job3.enable(False)
48for job4 in cron.find_command('echo'):
49 sys.stdout.write(job4)
51for job5 in cron.find_comment('SomeID'):
52 sys.stdout.write(job5)
54for job6 in cron:
55 sys.stdout.write(job6)
57for job7 in cron:
58 job7.every(3).hours()
59 sys.stdout.write(job7)
60 job7.every().dow()
62cron.remove_all(command='/foo/bar')
63cron.remove_all(comment='This command')
64cron.remove_all(time='* * * * *')
65cron.remove_all()
67output = cron.render()
69cron.write()
71cron.write(filename='/tmp/output.txt')
73#cron.write_to_user(user=True)
75#cron.write_to_user(user='root')
77# Croniter Extentions allow you to ask for the scheduled job times, make
78# sure you have croniter installed, it's not a hard dependancy.
80job3.schedule().get_next()
81job3.schedule().get_prev()
83"""
85import os
86import re
87import shlex
89import types
90import codecs
91import logging
92import tempfile
93import platform
94import subprocess as sp
96from calendar import monthrange
97from time import sleep
98from datetime import time, date, datetime, timedelta
99from collections import OrderedDict
101__pkgname__ = 'python-crontab'
102__version__ = '2.8.0'
104ITEMREX = re.compile(r'^\s*([^@#\s]+)\s+([^@#\s]+)\s+([^@#\s]+)\s+([^@#\s]+)'
105 r'\s+([^@#\s]+)\s+([^\n]*?)(\s+#\s*([^\n]*)|$)')
106SPECREX = re.compile(r'^\s*@(\w+)\s([^#\n]*)(\s+#\s*([^\n]*)|$)')
107DEVNULL = ">/dev/null 2>&1"
109WEEK_ENUM = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
111MONTH_ENUM = [None, 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug',
112 'sep', 'oct', 'nov', 'dec']
114SPECIALS = {"reboot": '@reboot',
115 "hourly": '0 * * * *',
116 "daily": '0 0 * * *',
117 "weekly": '0 0 * * 0',
118 "monthly": '0 0 1 * *',
119 "yearly": '0 0 1 1 *',
120 "annually": '0 0 1 1 *',
121 "midnight": '0 0 * * *'}
123SPECIAL_IGNORE = ['midnight', 'annually']
125S_INFO = [
126 {'max': 59, 'min': 0, 'name': 'Minutes'},
127 {'max': 23, 'min': 0, 'name': 'Hours'},
128 {'max': 31, 'min': 1, 'name': 'Day of Month'},
129 {'max': 12, 'min': 1, 'name': 'Month', 'enum': MONTH_ENUM},
130 {'max': 6, 'min': 0, 'name': 'Day of Week', 'enum': WEEK_ENUM},
131]
133# Detect Python3 and which OS for temperments.
134WINOS = platform.system() == 'Windows'
135POSIX = os.name == 'posix'
136SYSTEMV = not WINOS and os.uname()[0] in ["SunOS", "AIX", "HP-UX"]
137SYSTEMV = not WINOS and (
138 os.uname()[0] in ["SunOS", "AIX", "HP-UX"]
139 or
140 os.uname()[4] in ["mips"]
141)
143# Switch this on if you want your crontabs to have zero padding.
144ZERO_PAD = False
146LOG = logging.getLogger('crontab')
148CRON_COMMAND = "/usr/bin/crontab"
149SHELL = os.environ.get('SHELL', '/bin/sh')
150# The shell won't actually work on windows here, but
151# it should be updated later in the below conditional.
153# pylint: disable=W0622,invalid-name,too-many-public-methods
154# pylint: disable=function-redefined,too-many-instance-attributes
155current_user = lambda: None 155 ↛ exitline 155 didn't run the lambda on line 155
156if not WINOS: 156 ↛ 162line 156 didn't jump to line 162, because the condition on line 156 was never false
157 import pwd
158 def current_user():
159 """Returns the username of the current user"""
160 return pwd.getpwuid(os.getuid())[0]
162def open_pipe(cmd, *args, **flags):
163 """Runs a program and orders the arguments for compatability.
165 a. keyword args are flags and always appear /before/ arguments for bsd
166 """
167 cmd_args = tuple(shlex.split(cmd, posix=flags.pop('posix', POSIX)))
168 env = flags.pop('env', None)
169 for (key, value) in flags.items():
170 if len(key) == 1:
171 cmd_args += (f"-{key}",)
172 if value is not None:
173 cmd_args += (str(value),)
174 else:
175 cmd_args += (f"--{key}={value}",)
176 args = tuple(arg for arg in (cmd_args + tuple(args)) if arg)
177 return sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, env=env)
179def _str(text):
180 """Convert to the best string format for this python version"""
181 if isinstance(text, bytes): 181 ↛ 182line 181 didn't jump to line 182, because the condition on line 181 was never true
182 return text.decode('utf-8')
183 return text
186class CronTab:
187 """
188 Crontab object which can access any time based cron using the standard.
190 user - Set the user of the crontab (default: None)
191 * 'user' = Load from $username's crontab (instead of tab or tabfile)
192 * None = Don't load anything from any user crontab.
193 * True = Load from current $USER's crontab (unix only)
194 * False = This is a system crontab, each command has a username
196 tab - Use a string variable as the crontab instead of installed crontab
197 tabfile - Use a file for the crontab instead of installed crontab
198 log - Filename for logfile instead of /var/log/syslog
199 """
200 def __init__(self, user=None, tab=None, tabfile=None, log=None):
201 self.lines = None
202 self.crons = None
203 self.filen = None
204 self.cron_command = CRON_COMMAND
205 self.env = None
206 self._parked_env = OrderedDict()
207 # Protect windows users
208 self.root = not WINOS and os.getuid() == 0
209 # Storing user flag / username
210 self._user = user
211 # Load string or filename as inital crontab
212 self.intab = tab
213 self.tabfile = tabfile
214 self.read(tabfile)
215 self._log = log
217 def __enter__(self):
218 return self
220 def __exit__(self, exc_type, exc_val, exc_tb):
221 self.write()
223 @property
224 def log(self):
225 """Returns the CronLog object for this tab (user or root tab only)"""
226 from cronlog import CronLog # pylint: disable=import-outside-toplevel
227 if self._log is None or isinstance(self._log, str):
228 self._log = CronLog(self._log, user=self.user or 'root')
229 return self._log
231 @property
232 def user(self):
233 """Return user's username of this crontab if applicable"""
234 if self._user is True: 234 ↛ 235line 234 didn't jump to line 235, because the condition on line 234 was never true
235 return current_user()
236 return self._user
238 @property
239 def user_opt(self):
240 """Returns the user option for the crontab commandline"""
241 # Fedora and Mac require the current user to not specify
242 # But Ubuntu/Debian doesn't care. Be careful here.
243 if self._user and self._user is not True:
244 if self._user != current_user():
245 return {'u': self._user}
246 return {}
248 def __setattr__(self, name, value):
249 """Catch setting crons and lines directly"""
250 if name == 'lines' and value:
251 for line in value:
252 self.append(CronItem.from_line(line, cron=self), line, read=True)
253 elif name == 'crons' and value: 253 ↛ 254line 253 didn't jump to line 254, because the condition on line 253 was never true
254 raise AttributeError("You can NOT set crons attribute directly")
255 else:
256 super().__setattr__(name, value)
258 def read(self, filename=None):
259 """
260 Read in the crontab from the system into the object, called
261 automatically when listing or using the object. use for refresh.
262 """
263 self.crons = []
264 self.lines = []
265 self.env = OrderedVariableList()
266 lines = []
268 if self.intab is not None: 268 ↛ 271line 268 didn't jump to line 271, because the condition on line 268 was never false
269 lines = self.intab.split('\n')
271 elif filename:
272 self.filen = filename
273 with codecs.open(filename, 'r', encoding='utf-8') as fhl:
274 lines = fhl.readlines()
276 elif self.user:
277 (out, err) = open_pipe(self.cron_command, l='', **self.user_opt).communicate()
278 if err and 'no crontab for' in str(err):
279 pass
280 elif err:
281 raise IOError(f"Read crontab {self.user}: {err}")
282 lines = out.decode('utf-8').split("\n")
284 self.lines = lines
286 def append(self, item, line='', read=False, before=None):
287 """Append a CronItem object to this CronTab
289 Keyword arguments:
290 item - The CronItem object to append
291 line - The textual line which this item is.
292 read - Internal use only
293 before - Append before this CronItem, comment regex or generator
294 """
295 cron_id = len(self.crons)
296 line_id = len(self.lines)
298 if isinstance(before, (str, type(ITEMREX))): 298 ↛ 299line 298 didn't jump to line 299, because the condition on line 298 was never true
299 before = self.find_comment(before)
301 try:
302 if isinstance(before, (list, tuple, types.GeneratorType)): 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true
303 *_, before = before
305 if before is not None: 305 ↛ 306line 305 didn't jump to line 306, because the condition on line 305 was never true
306 cron_id = self.crons.index(before)
307 line_id = self.lines.index(before)
309 except ValueError as err:
310 raise ValueError("Can not find CronItem in crontab to insert before") from err
312 if item.is_valid(): 312 ↛ 321line 312 didn't jump to line 321, because the condition on line 312 was never false
313 item.env.update(self._parked_env)
314 self._parked_env = OrderedDict()
315 if read and not item.comment and self.lines and \ 315 ↛ 317line 315 didn't jump to line 317, because the condition on line 315 was never true
316 self.lines[-1] and self.lines[-1][0] == '#':
317 item.set_comment(self.lines.pop()[1:].strip(), True)
319 self.crons.insert(cron_id, item)
320 self.lines.insert(line_id, item)
321 elif '=' in line:
322 if ' ' not in line or line.index('=') < line.index(' '):
323 (name, value) = line.split('=', 1)
324 value = value.strip()
325 for quot in "\"'":
326 if value[0] == quot and value[-1] == quot:
327 value = value.strip(quot)
328 break
329 self._parked_env[name.strip()] = value
330 else:
331 if not self.crons and self._parked_env:
332 self.env.update(self._parked_env)
333 self._parked_env = OrderedDict()
334 self.lines.append(line.replace('\n', ''))
336 def write(self, filename=None, user=None, errors=False):
337 """Write the crontab to it's source or a given filename."""
338 if filename:
339 self.filen = filename
340 elif user is not None:
341 self.filen = None
342 self.intab = None
343 self._user = user
345 # Add to either the crontab or the internal tab.
346 if self.intab is not None:
347 self.intab = self.render()
348 # And that's it if we never saved to a file
349 if not self.filen:
350 return
352 if self.filen:
353 fileh = open(self.filen, 'wb') # pylint: disable=consider-using-with
354 else:
355 filed, path = tempfile.mkstemp()
356 fileh = os.fdopen(filed, 'wb')
358 fileh.write(self.render(errors=errors).encode('utf-8'))
359 fileh.close()
361 if not self.filen:
362 # Add the entire crontab back to the user crontab
363 if not self.user:
364 os.unlink(path)
365 raise IOError("Please specify user or filename to write.")
367 proc = open_pipe(self.cron_command, path, **self.user_opt)
368 ret = proc.wait()
369 if ret != 0:
370 msg = proc.stderr.read()
371 raise IOError(f"Program Error: {self.cron_command} returned {ret}: {msg}")
372 proc.stdout.close()
373 proc.stderr.close()
374 os.unlink(path)
376 def write_to_user(self, user=True):
377 """Write the crontab to a user (or root) instead of a file."""
378 return self.write(user=user)
380 def run_pending(self, **kwargs):
381 """Run all commands in this crontab if pending (generator)"""
382 for job in self:
383 ret = job.run_pending(**kwargs)
384 if ret not in [None, -1]:
385 yield ret
387 def run_scheduler(self, timeout=-1, **kwargs):
388 """Run the CronTab as an internal scheduler (generator)"""
389 count = 0
390 while count != timeout:
391 now = datetime.now()
392 if 'warp' in kwargs:
393 now += timedelta(seconds=count * 60)
394 for value in self.run_pending(now=now):
395 yield value
397 sleep(kwargs.get('cadence', 60))
398 count += 1
400 def render(self, errors=False, specials=True):
401 """Render this crontab as it would be in the crontab.
403 errors - Should we not comment out invalid entries and cause errors?
404 specials - Turn known times into keywords such as "@daily"
405 True - (default) force all values to be converted (unless SYSTEMV)
406 False - force all values back from being a keyword
407 None - don't change the special keyword use
408 """
409 crons = []
410 for line in self.lines:
411 if isinstance(line, (str, str)):
412 if line.strip().startswith('#') or not line.strip():
413 crons.append(line.strip())
414 elif not errors:
415 crons.append('# DISABLED LINE\n# ' + line)
416 else:
417 raise ValueError(f"Invalid line: {line}")
418 elif isinstance(line, CronItem):
419 if not line.is_valid() and not errors:
420 line.enabled = False
421 crons.append(line.render(specials=specials).strip())
423 # Environment variables are attached to cron lines so order will
424 # always work no matter how you add lines in the middle of the stack.
425 result = str(self.env) + '\n'.join(crons)
426 if result and result[-1] not in ('\n', '\r'):
427 result += '\n'
428 return result
430 def new(self, command='', comment='', user=None, pre_comment=False, before=None): # pylint: disable=too-many-arguments
431 """
432 Create a new CronItem and append it to the cron.
434 Keyword arguments:
435 command - The command that will be run.
436 comment - The comment that should be associated with this command.
437 user - For system cron tabs, the user this command should run as.
438 pre_comment - If true the comment will apear just before the command line.
439 before - Append this command before this item instead of at the end.
441 Returns the new CronItem object.
442 """
443 if not user and self.user is False:
444 raise ValueError("User is required for system crontabs.")
445 item = CronItem(command, comment, user=user, pre_comment=pre_comment)
446 item.cron = self
447 self.append(item, before=before)
448 return item
450 def find_command(self, command):
451 """Return an iter of jobs matching any part of the command."""
452 for job in list(self.crons):
453 if isinstance(command, type(ITEMREX)):
454 if command.findall(job.command):
455 yield job
456 elif command in job.command:
457 yield job
459 def find_comment(self, comment):
460 """Return an iter of jobs that match the comment field exactly."""
461 for job in list(self.crons):
462 if isinstance(comment, type(ITEMREX)):
463 if comment.findall(job.comment):
464 yield job
465 elif comment == job.comment:
466 yield job
468 def find_time(self, *args):
469 """Return an iter of jobs that match this time pattern"""
470 for job in list(self.crons):
471 if job.slices == CronSlices(*args):
472 yield job
474 @property
475 def commands(self):
476 """Return a generator of all unqiue commands used in this crontab"""
477 returned = []
478 for cron in self.crons:
479 if cron.command not in returned:
480 yield cron.command
481 returned.append(cron.command)
483 @property
484 def comments(self):
485 """Return a generator of all unique comments/Id used in this crontab"""
486 returned = []
487 for cron in self.crons:
488 if cron.comment and cron.comment not in returned:
489 yield cron.comment
490 returned.append(cron.comment)
492 def remove_all(self, *args, **kwargs):
493 """Removes all crons using the stated command OR that have the
494 stated comment OR removes everything if no arguments specified.
496 command - Remove all with this command
497 comment - Remove all with this comment or ID
498 time - Remove all with this time code
499 """
500 if args:
501 raise AttributeError("Invalid use: remove_all(command='cmd')")
502 if 'command' in kwargs:
503 return self.remove(*self.find_command(kwargs['command']))
504 if 'comment' in kwargs:
505 return self.remove(*self.find_comment(kwargs['comment']))
506 if 'time' in kwargs:
507 return self.remove(*self.find_time(kwargs['time']))
508 return self.remove(*self.crons[:])
510 def remove(self, *items):
511 """Remove a selected cron from the crontab."""
512 result = 0
513 for item in items:
514 if isinstance(item, (list, tuple, types.GeneratorType)): 514 ↛ 515line 514 didn't jump to line 515, because the condition on line 514 was never true
515 for subitem in item:
516 result += self._remove(subitem)
517 elif isinstance(item, CronItem): 517 ↛ 520line 517 didn't jump to line 520, because the condition on line 517 was never false
518 result += self._remove(item)
519 else:
520 raise TypeError("You may only remove CronItem objects, "\
521 "please use remove_all() to specify by name, id, etc.")
522 return result
524 def _remove(self, item):
525 """Internal removal of an item"""
526 # Manage siblings when items are deleted
527 for sibling in self.lines[self.lines.index(item)+1:]: 527 ↛ 528line 527 didn't jump to line 528, because the loop on line 527 never started
528 if isinstance(sibling, CronItem):
529 env = sibling.env
530 sibling.env = item.env
531 sibling.env.update(env)
532 sibling.env.job = sibling
533 break
534 if sibling != '':
535 break
536 self.lines.remove(sibling)
538 self.crons.remove(item)
539 self.lines.remove(item)
540 return 1
542 def __repr__(self):
543 kind = 'System ' if self._user is False else ''
544 if self.filen:
545 return f"<{kind}CronTab '{self.filen}'>"
546 if self.user and not self.user_opt:
547 return "<My CronTab>"
548 if self.user:
549 return f"<User CronTab '{self.user}'>"
550 return f"<Unattached {kind}CronTab>"
552 def __iter__(self):
553 """Return generator so we can track jobs after removal"""
554 for job in list(self.crons.__iter__()):
555 yield job
557 def __getitem__(self, i):
558 return self.crons[i]
560 def __len__(self):
561 return len(self.crons)
563 def __str__(self):
564 return self.render()
567class CronItem:
568 """
569 An item which objectifies a single line of a crontab and
570 May be considered to be a cron job object.
571 """
572 def __init__(self, command='', comment='', user=None, pre_comment=False):
573 self.cron = None
574 self.user = user
575 self.valid = False
576 self.enabled = True
577 self.special = False
578 self.comment = None
579 self.command = None
580 self.last_run = None
581 self.env = OrderedVariableList(job=self)
583 # Marker labels Ansible jobs etc
584 self.pre_comment = False
585 self.marker = None
586 self.stdin = None
587 self._log = None
589 # Initalise five cron slices using static info.
590 self.slices = CronSlices()
592 self.set_comment(comment, pre_comment)
594 if command: 594 ↛ 595line 594 didn't jump to line 595, because the condition on line 594 was never true
595 self.set_command(command)
597 def __hash__(self):
598 return hash((self.command, self.comment, self.hour, self.minute, self.dow))
600 def __eq__(self, other):
601 if not isinstance(other, CronItem): 601 ↛ 602line 601 didn't jump to line 602, because the condition on line 601 was never true
602 return False
603 return self.__hash__() == other.__hash__()
605 @classmethod
606 def from_line(cls, line, user=None, cron=None):
607 """Generate CronItem from a cron-line and parse out command and comment"""
608 obj = cls(user=user)
609 obj.cron = cron
610 obj.parse(line.strip())
611 return obj
613 def delete(self):
614 """Delete this item and remove it from it's parent"""
615 if not self.cron:
616 raise UnboundLocalError("Cron item is not in a crontab!")
617 self.cron.remove(self)
619 def set_command(self, cmd, parse_stdin=False):
620 """Set the command and filter as needed"""
621 if parse_stdin: 621 ↛ 625line 621 didn't jump to line 625, because the condition on line 621 was never false
622 cmd = cmd.replace('%', '\n').replace('\\\n', '%')
623 if '\n' in cmd: 623 ↛ 624line 623 didn't jump to line 624, because the condition on line 623 was never true
624 cmd, self.stdin = cmd.split('\n', 1)
625 self.command = _str(cmd.strip())
626 self.valid = True
628 def set_comment(self, cmt, pre_comment=False):
629 """Set the comment and don't filter, pre_comment indicates comment appears
630 before the cron, otherwise it appears ont he same line after the command.
631 """
632 if cmt and cmt[:8] == 'Ansible:': 632 ↛ 633line 632 didn't jump to line 633, because the condition on line 632 was never true
633 self.marker = 'Ansible'
634 cmt = cmt[8:].lstrip()
635 pre_comment = True
637 self.comment = cmt
638 self.pre_comment = pre_comment
640 def parse(self, line):
641 """Parse a cron line string and save the info as the objects."""
642 line = _str(line)
643 if not line or line[0] == '#': 643 ↛ 644line 643 didn't jump to line 644, because the condition on line 643 was never true
644 self.enabled = False
645 line = line[1:].strip()
646 # We parse all lines so we can detect disabled entries.
647 self._set_parse(ITEMREX.findall(line), line)
648 self._set_parse(SPECREX.findall(line), line)
650 def _set_parse(self, result, line=""):
651 """Set all the parsed variables into the item"""
652 if not result:
653 return
654 self.comment = result[0][-1]
655 if self.cron.user is False: 655 ↛ 657line 655 didn't jump to line 657, because the condition on line 655 was never true
656 # Special flag to look for per-command user
657 ret = result[0][-3].split(None, 1)
658 self.set_command(ret[-1], True)
659 if len(ret) == 2:
660 self.user = ret[0]
661 else:
662 # Disabled jobs might be ordinary comments, so log as DEBUG
663 level = logging.ERROR if self.enabled else logging.DEBUG
664 self.valid = False
665 self.enabled = False
666 LOG.log(level,
667 str("Missing user or command in system cron %s: %s"),
668 '' if self.cron is None else (self.cron.tabfile or ''),
669 line)
670 else:
671 self.set_command(result[0][-3], True)
672 try:
673 self.setall(*result[0][:-3])
674 except (ValueError, KeyError) as err:
675 if self.enabled:
676 LOG.error(str(err))
677 self.valid = False
678 self.enabled = False
680 def enable(self, enabled=True):
681 """Set if this cron job is enabled or not"""
682 if enabled in [True, False]:
683 self.enabled = enabled
684 return self.enabled
686 def is_enabled(self):
687 """Return true if this job is enabled (not commented out)"""
688 return self.enabled
690 def is_valid(self):
691 """Return true if this job is valid"""
692 return self.valid
694 def render(self, specials=True):
695 """Render this set cron-job to a string"""
696 if not self.is_valid() and self.enabled:
697 raise ValueError('Refusing to render invalid crontab.'
698 ' Disable to continue.')
699 command = _str(self.command).replace('%', '\\%')
700 user = ''
701 if self.cron and self.cron.user is False:
702 if not self.user:
703 raise ValueError("Job to system-cron format, no user set!")
704 user = self.user + ' '
705 rend = self.slices.render(specials=specials)
706 result = f"{rend} {user}{command}"
707 if self.stdin:
708 result += ' %' + self.stdin.replace('\n', '%')
709 if not self.enabled:
710 result = "# " + result
711 if self.comment:
712 comment = self.comment = _str(self.comment)
713 if self.marker:
714 comment = f"#{self.marker}: {comment}"
715 else:
716 comment = "# " + comment
718 if SYSTEMV or self.pre_comment or self.stdin:
719 result = comment + "\n" + result
720 else:
721 result += ' ' + comment
723 return str(self.env) + result
725 def every_reboot(self):
726 """Set to every reboot instead of a time pattern: @reboot"""
727 self.clear()
728 return self.slices.setall('@reboot')
730 def every(self, unit=1):
731 """
732 Replace existing time pattern with a single unit, setting all lower
733 units to first value in valid range.
735 For instance job.every(3).days() will be `0 0 */3 * *`
736 while job.day().every(3) would be `* * */3 * *`
738 Many of these patterns exist as special tokens on Linux, such as
739 `@midnight` and `@hourly`
740 """
741 return Every(self.slices, unit)
743 def setall(self, *args):
744 """Replace existing time pattern with these five values given as args:
746 job.setall("1 2 * * *")
747 job.setall(1, 2) == '1 2 * * *'
748 job.setall(0, 0, None, '>', 'SUN') == '0 0 * 12 SUN'
749 """
750 return self.slices.setall(*args)
752 def clear(self):
753 """Clear the special and set values"""
754 return self.slices.clear()
756 def frequency(self, year=None):
757 """Returns the number of times this item will execute in a given year
758 (defaults to this year)
759 """
760 return self.slices.frequency(year=year)
762 def frequency_at_hour(self, year=None, month=None, day=None, hour=None):
763 """Returns the number of times this item will execute in a given hour
764 (defaults to this hour)
765 """
766 return self.slices.frequency_at_hour(year=year, month=month, day=day, hour=hour)
768 def frequency_at_day(self, year=None, month=None, day=None):
769 """Returns the number of times this item will execute in a given day
770 (defaults to today)
771 """
772 return self.slices.frequency_at_day(year=year, month=month, day=day)
774 def frequency_at_month(self, year=None, month=None):
775 """Returns the number of times this item will execute in a given month
776 (defaults to this month)
777 """
778 return self.slices.frequency_at_month(year=year, month=month)
780 def frequency_at_year(self, year=None):
781 """Returns the number of times this item will execute in a given year
782 (defaults to this year)
783 """
784 return self.slices.frequency_at_year(year=year)
786 def frequency(self, year=None):
787 """Return frequence per year times frequency per day"""
788 return self.frequency_per_year(year=year) * self.frequency_per_day()
790 def frequency_per_year(self, year=None):
791 """Returns the number of /days/ this item will execute on in a year
792 (defaults to this year)
793 """
794 return self.slices.frequency_per_year(year=year)
796 def frequency_per_day(self):
797 """Returns the number of time this item will execute in any day"""
798 return self.slices.frequency_per_day()
800 def frequency_per_hour(self):
801 """Returns the number of times this item will execute in any hour"""
802 return self.slices.frequency_per_hour()
804 def run_pending(self, now=None):
805 """Runs the command if scheduled"""
806 now = now or datetime.now()
807 if self.is_enabled():
808 if self.last_run is None:
809 self.last_run = now
811 next_time = self.schedule(self.last_run).get_next()
812 if next_time < now:
813 self.last_run = now
814 return self.run()
815 return -1
817 def run(self):
818 """Runs the given command as a pipe"""
819 env = os.environ.copy()
820 env.update(self.env.all())
821 shell = self.env.get('SHELL', SHELL)
822 (out, err) = open_pipe(shell, '-c', self.command, env=env).communicate()
823 if err:
824 LOG.error(err.decode("utf-8"))
825 return out.decode("utf-8").strip()
827 def schedule(self, date_from=None):
828 """Return a croniter schedule if available."""
829 if not date_from:
830 date_from = datetime.now()
831 try:
832 # Croniter is an optional import
833 from croniter.croniter import croniter # pylint: disable=import-outside-toplevel
834 except ImportError as err:
835 raise ImportError("Croniter not available. Please install croniter"
836 " python module via pip or your package manager") from err
837 return croniter(self.slices.clean_render(), date_from, ret_type=datetime)
839 def description(self, **kw):
840 """
841 Returns a description of the crontab's schedule (if available)
843 **kw - Keyword arguments to pass to cron_descriptor (see docs)
844 """
845 try:
846 from cron_descriptor import ExpressionDescriptor # pylint: disable=import-outside-toplevel
847 except ImportError as err:
848 raise ImportError("cron_descriptor not available. Please install"\
849 "cron_descriptor python module via pip or your package manager") from err
851 exdesc = ExpressionDescriptor(self.slices.clean_render(), **kw)
852 return exdesc.get_description()
854 @property
855 def log(self):
856 """Return a cron log specific for this job only"""
857 if not self._log and self.cron:
858 self._log = self.cron.log.for_program(self.command)
859 return self._log
861 @property
862 def minute(self):
863 """Return the minute slice"""
864 return self.slices[0]
866 @property
867 def minutes(self):
868 """Same as minute"""
869 return self.minute
871 @property
872 def hour(self):
873 """Return the hour slice"""
874 return self.slices[1]
876 @property
877 def hours(self):
878 """Same as hour"""
879 return self.hour
881 @property
882 def day(self):
883 """Return the day slice"""
884 return self.dom
886 @property
887 def dom(self):
888 """Return the day-of-the month slice"""
889 return self.slices[2]
891 @property
892 def month(self):
893 """Return the month slice"""
894 return self.slices[3]
896 @property
897 def months(self):
898 """Same as month"""
899 return self.month
901 @property
902 def dow(self):
903 """Return the day of the week slice"""
904 return self.slices[4]
906 def __repr__(self):
907 return f"<CronItem '{self}'>"
909 def __len__(self):
910 return len(str(self))
912 def __getitem__(self, key):
913 return self.slices[key]
915 def __lt__(self, value):
916 return self.frequency() < CronSlices(value).frequency()
918 def __gt__(self, value):
919 return self.frequency() > CronSlices(value).frequency()
921 def __str__(self):
922 return self.render()
925class Every:
926 """Provide an interface to the job.every() method:
927 Available Calls:
928 minute, minutes, hour, hours, dom, doms, month, months, dow, dows
930 Once run all units will be cleared (set to *) then proceeding units
931 will be set to '0' and the target unit will be set as every x units.
932 """
933 def __init__(self, item, units):
934 self.slices = item
935 self.unit = units
936 for (key, name) in enumerate(['minute', 'hour', 'dom', 'month', 'dow',
937 'min', 'hour', 'day', 'moon', 'weekday']):
938 setattr(self, name, self.set_attr(key % 5))
939 setattr(self, name+'s', self.set_attr(key % 5))
941 def set_attr(self, target):
942 """Inner set target, returns function"""
943 def innercall():
944 """Returned inner call for setting slice targets"""
945 self.slices.clear()
946 # Day-of-week is actually a level 2 set, not level 4.
947 for key in range(target == 4 and 2 or target):
948 self.slices[key].on('<')
949 self.slices[target].every(self.unit)
950 return innercall
952 def year(self):
953 """Special every year target"""
954 if self.unit > 1:
955 raise ValueError(f"Invalid value '{self.unit}', outside 1 year")
956 self.slices.setall('@yearly')
959class CronSlices(list):
960 """Controls a list of five time 'slices' which reprisent:
961 minute frequency, hour frequency, day of month frequency,
962 month requency and finally day of the week frequency.
963 """
964 def __init__(self, *args):
965 super().__init__([CronSlice(info) for info in S_INFO])
966 self.special = None
967 self.setall(*args)
968 self.is_valid = self.is_self_valid
970 def is_self_valid(self, *args):
971 """Object version of is_valid"""
972 return CronSlices.is_valid(*(args or (self,)))
974 @classmethod
975 def is_valid(cls, *args): #pylint: disable=method-hidden
976 """Returns true if the arguments are valid cron pattern"""
977 try:
978 return bool(cls(*args))
979 except (ValueError, KeyError):
980 return False
982 def setall(self, *slices):
983 """Parses the various ways date/time frequency can be specified"""
984 self.clear()
985 if len(slices) == 1: 985 ↛ 986line 985 didn't jump to line 986, because the condition on line 985 was never true
986 (slices, self.special) = self._parse_value(slices[0])
987 if slices[0] == '@reboot':
988 return
989 if id(slices) == id(self): 989 ↛ 990line 989 didn't jump to line 990, because the condition on line 989 was never true
990 raise AssertionError("Can not set cron to itself!")
991 for set_a, set_b in zip(self, slices):
992 set_a.parse(set_b)
994 @staticmethod
995 def _parse_value(value):
996 """Parse a single value into an array of slices"""
997 if isinstance(value, str) and value:
998 return CronSlices._parse_str(value)
999 if isinstance(value, CronItem):
1000 return value.slices, None
1001 if isinstance(value, datetime):
1002 return [value.minute, value.hour, value.day, value.month, '*'], None
1003 if isinstance(value, time):
1004 return [value.minute, value.hour, '*', '*', '*'], None
1005 if isinstance(value, date):
1006 return [0, 0, value.day, value.month, '*'], None
1007 # It might be possible to later understand timedelta objects
1008 # but there's no convincing mathematics to do the conversion yet.
1009 if not isinstance(value, (list, tuple)):
1010 typ = type(value).__name__
1011 raise ValueError(f"Unknown type: {typ}")
1012 return value, None
1014 @staticmethod
1015 def _parse_str(value):
1016 """Parse a string which contains slice information"""
1017 key = value.lstrip('@').lower()
1018 if value.count(' ') == 4:
1019 return value.strip().split(' '), None
1020 if key in SPECIALS:
1021 return SPECIALS[key].split(' '), '@' + key
1022 if value.startswith('@'):
1023 raise ValueError(f"Unknown special '{value}'")
1024 return [value], None
1026 def clean_render(self):
1027 """Return just numbered parts of this crontab"""
1028 return ' '.join([str(s) for s in self])
1030 def render(self, specials=True):
1031 "Return just the first part of a cron job (the numbers or special)"
1032 slices = self.clean_render()
1033 if self.special and specials is not False:
1034 if self.special == '@reboot' or \
1035 SPECIALS[self.special.strip('@')] == slices:
1036 return self.special
1037 if not SYSTEMV and specials is True:
1038 for (name, value) in SPECIALS.items():
1039 if value == slices and name not in SPECIAL_IGNORE:
1040 return f"@{name}"
1041 return slices
1043 def clear(self):
1044 """Clear the special and set values"""
1045 self.special = None
1046 for item in self:
1047 item.clear()
1049 def frequency(self, year=None):
1050 """Return frequence per year times frequency per day"""
1051 return self.frequency_per_year(year=year) * self.frequency_per_day()
1053 def frequency_per_year(self, year=None):
1054 """Returns the number of times this item will execute
1055 in a given year (default is this year)"""
1056 result = 0
1057 if not year:
1058 year = date.today().year
1060 weekdays = list(self[4])
1062 for month in self[3]:
1063 for day in self[2]:
1064 try:
1065 if (date(year, month, day).weekday() + 1) % 7 in weekdays:
1066 result += 1
1067 except ValueError:
1068 continue
1069 return result
1071 def frequency_per_day(self):
1072 """Returns the number of times this item will execute in any day"""
1073 return len(self[0]) * len(self[1])
1075 def frequency_per_hour(self):
1076 """Returns the number of times this item will execute in any hour"""
1077 return len(self[0])
1079 def frequency_at_year(self, year=None):
1080 """Returns the number of /days/ this item will execute
1081 in a given year (default is this year)"""
1082 if not year:
1083 year = date.today().year
1085 total = 0
1086 for month in range(1, 13):
1087 total += self.frequency_at_month(year, month)
1088 return total
1090 def frequency_at_month(self, year=None, month=None):
1091 """Returns the number of times this item will execute in given month
1092 (default: current month)
1093 """
1094 if year is None and month is None:
1095 year = date.today().year
1096 month = date.today().month
1097 elif year is None or month is None:
1098 raise ValueError(
1099 f"One of more arguments undefined: year={year}, month={month}")
1101 total = 0
1102 if month in self[3]:
1103 # Calculate amount of days of specific month
1104 days = monthrange(year, month)[1]
1105 for day in range(1, days + 1):
1106 total += self.frequency_at_day(year, month, day)
1107 return total
1109 def frequency_at_day(self, year=None, month=None, day=None):
1110 """Returns the number of times this item will execute in a day
1111 (default: any executed day)
1112 """
1113 # If arguments provided, all needs to be provided
1114 test_none = [x is None for x in [year, month, day]]
1116 if all(test_none):
1117 return len(self[0]) * len(self[1])
1119 if any(test_none):
1120 raise ValueError(
1121 f"One of more arguments undefined: year={year}, month={month}, day={day}")
1123 total = 0
1124 if day in self[2]:
1125 for hour in range(24):
1126 total += self.frequency_at_hour(year, month, day, hour)
1127 return total
1129 def frequency_at_hour(self, year=None, month=None, day=None, hour=None):
1130 """Returns the number of times this item will execute in a hour
1131 (default: any executed hour)
1132 """
1133 # If arguments provided, all needs to be provided
1134 test_none = [x is None for x in [year, month, day, hour]]
1136 if all(test_none):
1137 return len(self[0])
1139 if any(test_none):
1140 raise ValueError(
1141 f"One of more arguments undefined: year={year}, month={month}, day={day}, hour={hour}")
1143 result = 0
1144 weekday = date(year, month, day).weekday()
1146 # Check if scheduled for execution at defined moment
1147 if hour in self[1] and \
1148 day in self[2] and \
1149 month in self[3] and \
1150 ((weekday + 1) % 7) in self[4]:
1151 result = len(self[0])
1153 return result
1155 def __str__(self):
1156 return self.render()
1158 def __eq__(self, arg):
1159 return self.render() == CronSlices(arg).render()
1162class SundayError(KeyError):
1163 """Sunday was specified as 7 instead of 0"""
1165class Also:
1166 """Link range values together (appending instead of replacing)"""
1167 def __init__(self, obj):
1168 self.obj = obj
1170 def every(self, *a):
1171 """Also every one of these"""
1172 return self.obj.every(*a, also=True)
1174 def on(self, *a):
1175 """Also on these"""
1176 return self.obj.on(*a, also=True)
1178 def during(self, *a):
1179 """Also during these"""
1180 return self.obj.during(*a, also=True)
1182class CronSlice:
1183 """Cron slice object which shows a time pattern"""
1184 def __init__(self, info, value=None):
1185 if isinstance(info, int): 1185 ↛ 1186line 1185 didn't jump to line 1186, because the condition on line 1185 was never true
1186 info = S_INFO[info]
1187 self.min = info.get('min', None)
1188 self.max = info.get('max', None)
1189 self.name = info.get('name', None)
1190 self.enum = info.get('enum', None)
1191 self.parts = []
1192 if value: 1192 ↛ 1193line 1192 didn't jump to line 1193, because the condition on line 1192 was never true
1193 self.parse(value)
1195 def __hash__(self):
1196 return hash(str(self))
1198 def parse(self, value):
1199 """Set values into the slice."""
1200 self.clear()
1201 if value is not None: 1201 ↛ exitline 1201 didn't return from function 'parse', because the condition on line 1201 was never false
1202 for part in str(value).split(','):
1203 if part.find("/") > 0 or part.find("-") > 0 or part == '*':
1204 self.parts += self.get_range(part)
1205 continue
1206 self.parts.append(self.parse_value(part, sunday=0))
1208 def render(self, resolve=False):
1209 """Return the slice rendered as a crontab.
1211 resolve - return integer values instead of enums (default False)
1213 """
1214 if not self.parts: 1214 ↛ 1215line 1214 didn't jump to line 1215, because the condition on line 1214 was never true
1215 return '*'
1216 return _render_values(self.parts, ',', resolve)
1218 def __repr__(self):
1219 return f"<CronSlice '{self}'>"
1221 def __eq__(self, value):
1222 return str(self) == str(value)
1224 def __str__(self):
1225 return self.render()
1227 def every(self, n_value, also=False):
1228 """Set the every X units value"""
1229 if not also:
1230 self.clear()
1231 self.parts += self.get_range(int(n_value))
1232 return self.parts[-1]
1234 def on(self, *n_value, **opts):
1235 """Set the time values to the specified placements."""
1236 if not opts.get('also', False):
1237 self.clear()
1238 for set_a in n_value:
1239 self.parts += (self.parse_value(set_a, sunday=0),)
1240 return self.parts
1242 def during(self, vfrom, vto, also=False):
1243 """Set the During value, which sets a range"""
1244 if not also:
1245 self.clear()
1246 self.parts += self.get_range(str(vfrom) + '-' + str(vto))
1247 return self.parts[-1]
1249 @property
1250 def also(self):
1251 """Appends rather than replaces the new values"""
1252 return Also(self)
1254 def clear(self):
1255 """clear the slice ready for new vaues"""
1256 self.parts = []
1258 def get_range(self, *vrange):
1259 """Return a cron range for this slice"""
1260 ret = CronRange(self, *vrange)
1261 if ret.dangling is not None: 1261 ↛ 1262line 1261 didn't jump to line 1262, because the condition on line 1261 was never true
1262 return [ret.dangling, ret]
1263 return [ret]
1265 def __iter__(self):
1266 """Return the entire element as an iterable"""
1267 ret = {}
1268 # An empty part means '*' which is every(1)
1269 if not self.parts:
1270 self.every(1)
1271 for part in self.parts:
1272 if isinstance(part, CronRange):
1273 for bit in part.range():
1274 ret[bit] = 1
1275 else:
1276 ret[int(part)] = 1
1277 for val in ret:
1278 yield val
1280 def __len__(self):
1281 """Returns the number of times this slice happens in it's range"""
1282 return len(list(self.__iter__()))
1284 def parse_value(self, val, sunday=None):
1285 """Parse the value of the cron slice and raise any errors needed"""
1286 if val == '>': 1286 ↛ 1287line 1286 didn't jump to line 1287, because the condition on line 1286 was never true
1287 val = self.max
1288 elif val == '<': 1288 ↛ 1289line 1288 didn't jump to line 1289, because the condition on line 1288 was never true
1289 val = self.min
1290 try:
1291 out = get_cronvalue(val, self.enum)
1292 except ValueError as err:
1293 raise ValueError(f"Unrecognised {self.name}: '{val}'") from err
1294 except KeyError as err:
1295 raise KeyError(f"No enumeration for {self.name}: '{val}'") from err
1297 if self.max == 6 and int(out) == 7: 1297 ↛ 1298line 1297 didn't jump to line 1298, because the condition on line 1297 was never true
1298 if sunday is not None:
1299 return sunday
1300 raise SundayError("Detected Sunday as 7 instead of 0!")
1302 if int(out) < self.min or int(out) > self.max: 1302 ↛ 1303line 1302 didn't jump to line 1303, because the condition on line 1302 was never true
1303 raise ValueError(f"'{val}', not in {self.min}-{self.max} for {self.name}")
1304 return out
1307def get_cronvalue(value, enums):
1308 """Returns a value as int (pass-through) or a special enum value"""
1309 if isinstance(value, int): 1309 ↛ 1310line 1309 didn't jump to line 1310, because the condition on line 1309 was never true
1310 return value
1311 if str(value).isdigit(): 1311 ↛ 1313line 1311 didn't jump to line 1313, because the condition on line 1311 was never false
1312 return int(str(value))
1313 if not enums:
1314 raise KeyError("No enumeration allowed")
1315 return CronValue(str(value), enums)
1318class CronValue: # pylint: disable=too-few-public-methods
1319 """Represent a special value in the cron line"""
1320 def __init__(self, value, enums):
1321 self.text = value
1322 self.value = enums.index(value.lower())
1324 def __lt__(self, value):
1325 return self.value < int(value)
1327 def __repr__(self):
1328 return str(self)
1330 def __str__(self):
1331 return self.text
1333 def __int__(self):
1334 return self.value
1337def _render_values(values, sep=',', resolve=False):
1338 """Returns a rendered list, sorted and optionally resolved"""
1339 if len(values) > 1: 1339 ↛ 1340line 1339 didn't jump to line 1340, because the condition on line 1339 was never true
1340 values.sort()
1341 return sep.join([_render(val, resolve) for val in values])
1344def _render(value, resolve=False):
1345 """Return a single value rendered"""
1346 if isinstance(value, CronRange):
1347 return value.render(resolve)
1348 if resolve: 1348 ↛ 1349line 1348 didn't jump to line 1349, because the condition on line 1348 was never true
1349 return str(int(value))
1350 return str(f'{value:02d}' if ZERO_PAD else value)
1353class CronRange:
1354 """A range between one value and another for a time range."""
1355 def __init__(self, vslice, *vrange):
1356 # holds an extra dangling entry, for example sundays.
1357 self.dangling = None
1358 self.slice = vslice
1359 self.cron = None
1360 self.seq = 1
1362 if not vrange: 1362 ↛ 1363line 1362 didn't jump to line 1363, because the condition on line 1362 was never true
1363 self.all()
1364 elif isinstance(vrange[0], str): 1364 ↛ 1366line 1364 didn't jump to line 1366, because the condition on line 1364 was never false
1365 self.parse(vrange[0])
1366 elif isinstance(vrange[0], (int, CronValue)):
1367 if len(vrange) == 2:
1368 (self.vfrom, self.vto) = vrange
1369 else:
1370 self.seq = vrange[0]
1371 self.all()
1373 def parse(self, value):
1374 """Parse a ranged value in a cronjob"""
1375 if value.count('/') == 1: 1375 ↛ 1376line 1375 didn't jump to line 1376, because the condition on line 1375 was never true
1376 value, seq = value.split('/')
1377 try:
1378 self.seq = self.slice.parse_value(seq)
1379 except SundayError:
1380 self.seq = 1
1381 value = "0-0"
1382 if self.seq < 1 or self.seq > self.slice.max:
1383 raise ValueError("Sequence can not be divided by zero or max")
1384 if value.count('-') == 1: 1384 ↛ 1385line 1384 didn't jump to line 1385, because the condition on line 1384 was never true
1385 vfrom, vto = value.split('-')
1386 self.vfrom = self.slice.parse_value(vfrom, sunday=0)
1387 try:
1388 self.vto = self.slice.parse_value(vto)
1389 except SundayError:
1390 if self.vfrom == 1:
1391 self.vfrom = 0
1392 else:
1393 self.dangling = 0
1394 self.vto = self.slice.parse_value(vto, sunday=6)
1395 if self.vto < self.vfrom:
1396 raise ValueError(f"Bad range '{self.vfrom}-{self.vto}'")
1397 elif value == '*': 1397 ↛ 1400line 1397 didn't jump to line 1400, because the condition on line 1397 was never false
1398 self.all()
1399 else:
1400 raise ValueError(f'Unknown cron range value "{value}"')
1402 def all(self):
1403 """Set this slice to all units between the miniumum and maximum"""
1404 self.vfrom = self.slice.min
1405 self.vto = self.slice.max
1407 def render(self, resolve=False):
1408 """Render the ranged value for a cronjob"""
1409 value = '*'
1410 if int(self.vfrom) > self.slice.min or int(self.vto) < self.slice.max: 1410 ↛ 1411line 1410 didn't jump to line 1411, because the condition on line 1410 was never true
1411 if self.vfrom == self.vto:
1412 value = str(self.vfrom)
1413 else:
1414 value = _render_values([self.vfrom, self.vto], '-', resolve)
1415 if self.seq != 1: 1415 ↛ 1416line 1415 didn't jump to line 1416, because the condition on line 1415 was never true
1416 value += f"/{self.seq:d}"
1417 if value != '*' and SYSTEMV: 1417 ↛ 1418line 1417 didn't jump to line 1418, because the condition on line 1417 was never true
1418 value = ','.join([str(val) for val in self.range()])
1419 return value
1421 def range(self):
1422 """Returns the range of this cron slice as a iterable list"""
1423 return range(int(self.vfrom), int(self.vto)+1, self.seq)
1425 def every(self, value):
1426 """Set the sequence value for this range."""
1427 self.seq = int(value)
1429 def __lt__(self, value):
1430 return int(self.vfrom) < int(value)
1432 def __gt__(self, value):
1433 return int(self.vto) > int(value)
1435 def __int__(self):
1436 return int(self.vfrom)
1438 def __str__(self):
1439 return self.render()
1442class OrderedVariableList(OrderedDict):
1443 """An ordered dictionary with a linked list containing
1444 the previous OrderedVariableList which this list depends.
1446 Duplicates in this list are weeded out in favour of the previous
1447 list in the chain.
1449 This is all in aid of the ENV variables list which must exist one
1450 per job in the chain.
1451 """
1452 def __init__(self, *args, **kw):
1453 self.job = kw.pop('job', None)
1454 super().__init__(*args, **kw)
1456 @property
1457 def previous(self):
1458 """Returns the previous env in the list of jobs in the cron"""
1459 if self.job is not None and self.job.cron is not None:
1460 index = self.job.cron.crons.index(self.job)
1461 if index == 0:
1462 return self.job.cron.env
1463 return self.job.cron[index-1].env
1464 return None
1466 def all(self):
1467 """
1468 Returns the full dictionary, everything from this dictionary
1469 plus all those in the chain above us.
1470 """
1471 if self.job is not None:
1472 ret = self.previous.all().copy()
1473 ret.update(self)
1474 return ret
1475 return self.copy()
1477 def __getitem__(self, key):
1478 previous = self.previous
1479 if key in self:
1480 return super().__getitem__(key)
1481 if previous is not None:
1482 return previous.all()[key]
1483 raise KeyError(f"Environment Variable '{key}' not found.")
1485 def __str__(self):
1486 """Constructs to variable list output used in cron jobs"""
1487 ret = []
1488 for key, value in self.items():
1489 if self.previous:
1490 if self.previous.all().get(key, None) == value:
1491 continue
1492 if ' ' in str(value) or value == '':
1493 value = f'"{value}"'
1494 ret.append(f"{key}={value}")
1495 ret.append('')
1496 return "\n".join(ret)