Coverage for crontab.py: 33%

871 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-05-21 00:22 +0200

1# 

2# Copyright 2021, Martin Owens <doctormo@gmail.com> 

3# 

4# This library is free software; you can redistribute it and/or 

5# modify it under the terms of the GNU Lesser General Public 

6# License as published by the Free Software Foundation; either 

7# version 3.0 of the License, or (at your option) any later version. 

8# 

9# This library is distributed in the hope that it will be useful, 

10# but WITHOUT ANY WARRANTY; without even the implied warranty of 

11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 

12# Lesser General Public License for more details. 

13# 

14# You should have received a copy of the GNU Lesser General Public 

15# License along with this library. 

16# 

17# pylint: disable=logging-format-interpolation,too-many-lines 

18""" 

19from crontab import CronTab 

20import sys 

21 

22# Create a new non-installed crontab 

23cron = CronTab(tab='') 

24job = cron.new(command='/usr/bin/echo') 

25 

26job.minute.during(5,50).every(5) 

27job.hour.every(4) 

28 

29job.dow.on('SUN') 

30job.month.during('APR', 'JUN') 

31job.month.also.during('OCT', 'DEC') 

32 

33job.every(2).days() 

34job.setall(1, 12, None, None, None) 

35 

36job2 = cron.new(command='/foo/bar', comment='SomeID') 

37job2.every_reboot() 

38 

39jobs = list(cron.find_command('bar')) 

40job3 = jobs[0] 

41job3.clear() 

42job3.minute.every(1) 

43 

44sys.stdout.write(str(cron.render())) 

45 

46job3.enable(False) 

47 

48for job4 in cron.find_command('echo'): 

49 sys.stdout.write(job4) 

50 

51for job5 in cron.find_comment('SomeID'): 

52 sys.stdout.write(job5) 

53 

54for job6 in cron: 

55 sys.stdout.write(job6) 

56 

57for job7 in cron: 

58 job7.every(3).hours() 

59 sys.stdout.write(job7) 

60 job7.every().dow() 

61 

62cron.remove_all(command='/foo/bar') 

63cron.remove_all(comment='This command') 

64cron.remove_all(time='* * * * *') 

65cron.remove_all() 

66 

67output = cron.render() 

68 

69cron.write() 

70 

71cron.write(filename='/tmp/output.txt') 

72 

73#cron.write_to_user(user=True) 

74 

75#cron.write_to_user(user='root') 

76 

77# Croniter Extentions allow you to ask for the scheduled job times, make 

78# sure you have croniter installed, it's not a hard dependancy. 

79 

80job3.schedule().get_next() 

81job3.schedule().get_prev() 

82 

83""" 

84 

85import os 

86import re 

87import shlex 

88 

89import types 

90import codecs 

91import logging 

92import tempfile 

93import platform 

94import subprocess as sp 

95 

96from calendar import monthrange 

97from time import sleep 

98from datetime import time, date, datetime, timedelta 

99from collections import OrderedDict 

100 

101__pkgname__ = 'python-crontab' 

102__version__ = '2.8.0' 

103 

104ITEMREX = re.compile(r'^\s*([^@#\s]+)\s+([^@#\s]+)\s+([^@#\s]+)\s+([^@#\s]+)' 

105 r'\s+([^@#\s]+)\s+([^\n]*?)(\s+#\s*([^\n]*)|$)') 

106SPECREX = re.compile(r'^\s*@(\w+)\s([^#\n]*)(\s+#\s*([^\n]*)|$)') 

107DEVNULL = ">/dev/null 2>&1" 

108 

109WEEK_ENUM = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] 

110 

111MONTH_ENUM = [None, 'jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 

112 'sep', 'oct', 'nov', 'dec'] 

113 

114SPECIALS = {"reboot": '@reboot', 

115 "hourly": '0 * * * *', 

116 "daily": '0 0 * * *', 

117 "weekly": '0 0 * * 0', 

118 "monthly": '0 0 1 * *', 

119 "yearly": '0 0 1 1 *', 

120 "annually": '0 0 1 1 *', 

121 "midnight": '0 0 * * *'} 

122 

123SPECIAL_IGNORE = ['midnight', 'annually'] 

124 

125S_INFO = [ 

126 {'max': 59, 'min': 0, 'name': 'Minutes'}, 

127 {'max': 23, 'min': 0, 'name': 'Hours'}, 

128 {'max': 31, 'min': 1, 'name': 'Day of Month'}, 

129 {'max': 12, 'min': 1, 'name': 'Month', 'enum': MONTH_ENUM}, 

130 {'max': 6, 'min': 0, 'name': 'Day of Week', 'enum': WEEK_ENUM}, 

131] 

132 

133# Detect Python3 and which OS for temperments. 

134WINOS = platform.system() == 'Windows' 

135POSIX = os.name == 'posix' 

136SYSTEMV = not WINOS and os.uname()[0] in ["SunOS", "AIX", "HP-UX"] 

137SYSTEMV = not WINOS and ( 

138 os.uname()[0] in ["SunOS", "AIX", "HP-UX"] 

139 or 

140 os.uname()[4] in ["mips"] 

141) 

142 

143# Switch this on if you want your crontabs to have zero padding. 

144ZERO_PAD = False 

145 

146LOG = logging.getLogger('crontab') 

147 

148CRON_COMMAND = "/usr/bin/crontab" 

149SHELL = os.environ.get('SHELL', '/bin/sh') 

150# The shell won't actually work on windows here, but 

151# it should be updated later in the below conditional. 

152 

153# pylint: disable=W0622,invalid-name,too-many-public-methods 

154# pylint: disable=function-redefined,too-many-instance-attributes 

155current_user = lambda: None 155 ↛ exitline 155 didn't run the lambda on line 155

156if not WINOS: 156 ↛ 162line 156 didn't jump to line 162, because the condition on line 156 was never false

157 import pwd 

158 def current_user(): 

159 """Returns the username of the current user""" 

160 return pwd.getpwuid(os.getuid())[0] 

161 

162def open_pipe(cmd, *args, **flags): 

163 """Runs a program and orders the arguments for compatability. 

164 

165 a. keyword args are flags and always appear /before/ arguments for bsd 

166 """ 

167 cmd_args = tuple(shlex.split(cmd, posix=flags.pop('posix', POSIX))) 

168 env = flags.pop('env', None) 

169 for (key, value) in flags.items(): 

170 if len(key) == 1: 

171 cmd_args += (f"-{key}",) 

172 if value is not None: 

173 cmd_args += (str(value),) 

174 else: 

175 cmd_args += (f"--{key}={value}",) 

176 args = tuple(arg for arg in (cmd_args + tuple(args)) if arg) 

177 return sp.Popen(args, stdout=sp.PIPE, stderr=sp.PIPE, env=env) 

178 

179def _str(text): 

180 """Convert to the best string format for this python version""" 

181 if isinstance(text, bytes): 181 ↛ 182line 181 didn't jump to line 182, because the condition on line 181 was never true

182 return text.decode('utf-8') 

183 return text 

184 

185 

186class CronTab: 

187 """ 

188 Crontab object which can access any time based cron using the standard. 

189 

190 user - Set the user of the crontab (default: None) 

191 * 'user' = Load from $username's crontab (instead of tab or tabfile) 

192 * None = Don't load anything from any user crontab. 

193 * True = Load from current $USER's crontab (unix only) 

194 * False = This is a system crontab, each command has a username 

195 

196 tab - Use a string variable as the crontab instead of installed crontab 

197 tabfile - Use a file for the crontab instead of installed crontab 

198 log - Filename for logfile instead of /var/log/syslog 

199 """ 

200 def __init__(self, user=None, tab=None, tabfile=None, log=None): 

201 self.lines = None 

202 self.crons = None 

203 self.filen = None 

204 self.cron_command = CRON_COMMAND 

205 self.env = None 

206 self._parked_env = OrderedDict() 

207 # Protect windows users 

208 self.root = not WINOS and os.getuid() == 0 

209 # Storing user flag / username 

210 self._user = user 

211 # Load string or filename as inital crontab 

212 self.intab = tab 

213 self.tabfile = tabfile 

214 self.read(tabfile) 

215 self._log = log 

216 

217 def __enter__(self): 

218 return self 

219 

220 def __exit__(self, exc_type, exc_val, exc_tb): 

221 self.write() 

222 

223 @property 

224 def log(self): 

225 """Returns the CronLog object for this tab (user or root tab only)""" 

226 from cronlog import CronLog # pylint: disable=import-outside-toplevel 

227 if self._log is None or isinstance(self._log, str): 

228 self._log = CronLog(self._log, user=self.user or 'root') 

229 return self._log 

230 

231 @property 

232 def user(self): 

233 """Return user's username of this crontab if applicable""" 

234 if self._user is True: 234 ↛ 235line 234 didn't jump to line 235, because the condition on line 234 was never true

235 return current_user() 

236 return self._user 

237 

238 @property 

239 def user_opt(self): 

240 """Returns the user option for the crontab commandline""" 

241 # Fedora and Mac require the current user to not specify 

242 # But Ubuntu/Debian doesn't care. Be careful here. 

243 if self._user and self._user is not True: 

244 if self._user != current_user(): 

245 return {'u': self._user} 

246 return {} 

247 

248 def __setattr__(self, name, value): 

249 """Catch setting crons and lines directly""" 

250 if name == 'lines' and value: 

251 for line in value: 

252 self.append(CronItem.from_line(line, cron=self), line, read=True) 

253 elif name == 'crons' and value: 253 ↛ 254line 253 didn't jump to line 254, because the condition on line 253 was never true

254 raise AttributeError("You can NOT set crons attribute directly") 

255 else: 

256 super().__setattr__(name, value) 

257 

258 def read(self, filename=None): 

259 """ 

260 Read in the crontab from the system into the object, called 

261 automatically when listing or using the object. use for refresh. 

262 """ 

263 self.crons = [] 

264 self.lines = [] 

265 self.env = OrderedVariableList() 

266 lines = [] 

267 

268 if self.intab is not None: 268 ↛ 271line 268 didn't jump to line 271, because the condition on line 268 was never false

269 lines = self.intab.split('\n') 

270 

271 elif filename: 

272 self.filen = filename 

273 with codecs.open(filename, 'r', encoding='utf-8') as fhl: 

274 lines = fhl.readlines() 

275 

276 elif self.user: 

277 (out, err) = open_pipe(self.cron_command, l='', **self.user_opt).communicate() 

278 if err and 'no crontab for' in str(err): 

279 pass 

280 elif err: 

281 raise IOError(f"Read crontab {self.user}: {err}") 

282 lines = out.decode('utf-8').split("\n") 

283 

284 self.lines = lines 

285 

286 def append(self, item, line='', read=False, before=None): 

287 """Append a CronItem object to this CronTab 

288 

289 Keyword arguments: 

290 item - The CronItem object to append 

291 line - The textual line which this item is. 

292 read - Internal use only 

293 before - Append before this CronItem, comment regex or generator 

294 """ 

295 cron_id = len(self.crons) 

296 line_id = len(self.lines) 

297 

298 if isinstance(before, (str, type(ITEMREX))): 298 ↛ 299line 298 didn't jump to line 299, because the condition on line 298 was never true

299 before = self.find_comment(before) 

300 

301 try: 

302 if isinstance(before, (list, tuple, types.GeneratorType)): 302 ↛ 303line 302 didn't jump to line 303, because the condition on line 302 was never true

303 *_, before = before 

304 

305 if before is not None: 305 ↛ 306line 305 didn't jump to line 306, because the condition on line 305 was never true

306 cron_id = self.crons.index(before) 

307 line_id = self.lines.index(before) 

308 

309 except ValueError as err: 

310 raise ValueError("Can not find CronItem in crontab to insert before") from err 

311 

312 if item.is_valid(): 312 ↛ 321line 312 didn't jump to line 321, because the condition on line 312 was never false

313 item.env.update(self._parked_env) 

314 self._parked_env = OrderedDict() 

315 if read and not item.comment and self.lines and \ 315 ↛ 317line 315 didn't jump to line 317, because the condition on line 315 was never true

316 self.lines[-1] and self.lines[-1][0] == '#': 

317 item.set_comment(self.lines.pop()[1:].strip(), True) 

318 

319 self.crons.insert(cron_id, item) 

320 self.lines.insert(line_id, item) 

321 elif '=' in line: 

322 if ' ' not in line or line.index('=') < line.index(' '): 

323 (name, value) = line.split('=', 1) 

324 value = value.strip() 

325 for quot in "\"'": 

326 if value[0] == quot and value[-1] == quot: 

327 value = value.strip(quot) 

328 break 

329 self._parked_env[name.strip()] = value 

330 else: 

331 if not self.crons and self._parked_env: 

332 self.env.update(self._parked_env) 

333 self._parked_env = OrderedDict() 

334 self.lines.append(line.replace('\n', '')) 

335 

336 def write(self, filename=None, user=None, errors=False): 

337 """Write the crontab to it's source or a given filename.""" 

338 if filename: 

339 self.filen = filename 

340 elif user is not None: 

341 self.filen = None 

342 self.intab = None 

343 self._user = user 

344 

345 # Add to either the crontab or the internal tab. 

346 if self.intab is not None: 

347 self.intab = self.render() 

348 # And that's it if we never saved to a file 

349 if not self.filen: 

350 return 

351 

352 if self.filen: 

353 fileh = open(self.filen, 'wb') # pylint: disable=consider-using-with 

354 else: 

355 filed, path = tempfile.mkstemp() 

356 fileh = os.fdopen(filed, 'wb') 

357 

358 fileh.write(self.render(errors=errors).encode('utf-8')) 

359 fileh.close() 

360 

361 if not self.filen: 

362 # Add the entire crontab back to the user crontab 

363 if not self.user: 

364 os.unlink(path) 

365 raise IOError("Please specify user or filename to write.") 

366 

367 proc = open_pipe(self.cron_command, path, **self.user_opt) 

368 ret = proc.wait() 

369 if ret != 0: 

370 msg = proc.stderr.read() 

371 raise IOError(f"Program Error: {self.cron_command} returned {ret}: {msg}") 

372 proc.stdout.close() 

373 proc.stderr.close() 

374 os.unlink(path) 

375 

376 def write_to_user(self, user=True): 

377 """Write the crontab to a user (or root) instead of a file.""" 

378 return self.write(user=user) 

379 

380 def run_pending(self, **kwargs): 

381 """Run all commands in this crontab if pending (generator)""" 

382 for job in self: 

383 ret = job.run_pending(**kwargs) 

384 if ret not in [None, -1]: 

385 yield ret 

386 

387 def run_scheduler(self, timeout=-1, **kwargs): 

388 """Run the CronTab as an internal scheduler (generator)""" 

389 count = 0 

390 while count != timeout: 

391 now = datetime.now() 

392 if 'warp' in kwargs: 

393 now += timedelta(seconds=count * 60) 

394 for value in self.run_pending(now=now): 

395 yield value 

396 

397 sleep(kwargs.get('cadence', 60)) 

398 count += 1 

399 

400 def render(self, errors=False, specials=True): 

401 """Render this crontab as it would be in the crontab. 

402 

403 errors - Should we not comment out invalid entries and cause errors? 

404 specials - Turn known times into keywords such as "@daily" 

405 True - (default) force all values to be converted (unless SYSTEMV) 

406 False - force all values back from being a keyword 

407 None - don't change the special keyword use 

408 """ 

409 crons = [] 

410 for line in self.lines: 

411 if isinstance(line, (str, str)): 

412 if line.strip().startswith('#') or not line.strip(): 

413 crons.append(line.strip()) 

414 elif not errors: 

415 crons.append('# DISABLED LINE\n# ' + line) 

416 else: 

417 raise ValueError(f"Invalid line: {line}") 

418 elif isinstance(line, CronItem): 

419 if not line.is_valid() and not errors: 

420 line.enabled = False 

421 crons.append(line.render(specials=specials).strip()) 

422 

423 # Environment variables are attached to cron lines so order will 

424 # always work no matter how you add lines in the middle of the stack. 

425 result = str(self.env) + '\n'.join(crons) 

426 if result and result[-1] not in ('\n', '\r'): 

427 result += '\n' 

428 return result 

429 

430 def new(self, command='', comment='', user=None, pre_comment=False, before=None): # pylint: disable=too-many-arguments 

431 """ 

432 Create a new CronItem and append it to the cron. 

433 

434 Keyword arguments: 

435 command - The command that will be run. 

436 comment - The comment that should be associated with this command. 

437 user - For system cron tabs, the user this command should run as. 

438 pre_comment - If true the comment will apear just before the command line. 

439 before - Append this command before this item instead of at the end. 

440 

441 Returns the new CronItem object. 

442 """ 

443 if not user and self.user is False: 

444 raise ValueError("User is required for system crontabs.") 

445 item = CronItem(command, comment, user=user, pre_comment=pre_comment) 

446 item.cron = self 

447 self.append(item, before=before) 

448 return item 

449 

450 def find_command(self, command): 

451 """Return an iter of jobs matching any part of the command.""" 

452 for job in list(self.crons): 

453 if isinstance(command, type(ITEMREX)): 

454 if command.findall(job.command): 

455 yield job 

456 elif command in job.command: 

457 yield job 

458 

459 def find_comment(self, comment): 

460 """Return an iter of jobs that match the comment field exactly.""" 

461 for job in list(self.crons): 

462 if isinstance(comment, type(ITEMREX)): 

463 if comment.findall(job.comment): 

464 yield job 

465 elif comment == job.comment: 

466 yield job 

467 

468 def find_time(self, *args): 

469 """Return an iter of jobs that match this time pattern""" 

470 for job in list(self.crons): 

471 if job.slices == CronSlices(*args): 

472 yield job 

473 

474 @property 

475 def commands(self): 

476 """Return a generator of all unqiue commands used in this crontab""" 

477 returned = [] 

478 for cron in self.crons: 

479 if cron.command not in returned: 

480 yield cron.command 

481 returned.append(cron.command) 

482 

483 @property 

484 def comments(self): 

485 """Return a generator of all unique comments/Id used in this crontab""" 

486 returned = [] 

487 for cron in self.crons: 

488 if cron.comment and cron.comment not in returned: 

489 yield cron.comment 

490 returned.append(cron.comment) 

491 

492 def remove_all(self, *args, **kwargs): 

493 """Removes all crons using the stated command OR that have the 

494 stated comment OR removes everything if no arguments specified. 

495 

496 command - Remove all with this command 

497 comment - Remove all with this comment or ID 

498 time - Remove all with this time code 

499 """ 

500 if args: 

501 raise AttributeError("Invalid use: remove_all(command='cmd')") 

502 if 'command' in kwargs: 

503 return self.remove(*self.find_command(kwargs['command'])) 

504 if 'comment' in kwargs: 

505 return self.remove(*self.find_comment(kwargs['comment'])) 

506 if 'time' in kwargs: 

507 return self.remove(*self.find_time(kwargs['time'])) 

508 return self.remove(*self.crons[:]) 

509 

510 def remove(self, *items): 

511 """Remove a selected cron from the crontab.""" 

512 result = 0 

513 for item in items: 

514 if isinstance(item, (list, tuple, types.GeneratorType)): 514 ↛ 515line 514 didn't jump to line 515, because the condition on line 514 was never true

515 for subitem in item: 

516 result += self._remove(subitem) 

517 elif isinstance(item, CronItem): 517 ↛ 520line 517 didn't jump to line 520, because the condition on line 517 was never false

518 result += self._remove(item) 

519 else: 

520 raise TypeError("You may only remove CronItem objects, "\ 

521 "please use remove_all() to specify by name, id, etc.") 

522 return result 

523 

524 def _remove(self, item): 

525 """Internal removal of an item""" 

526 # Manage siblings when items are deleted 

527 for sibling in self.lines[self.lines.index(item)+1:]: 527 ↛ 528line 527 didn't jump to line 528, because the loop on line 527 never started

528 if isinstance(sibling, CronItem): 

529 env = sibling.env 

530 sibling.env = item.env 

531 sibling.env.update(env) 

532 sibling.env.job = sibling 

533 break 

534 if sibling != '': 

535 break 

536 self.lines.remove(sibling) 

537 

538 self.crons.remove(item) 

539 self.lines.remove(item) 

540 return 1 

541 

542 def __repr__(self): 

543 kind = 'System ' if self._user is False else '' 

544 if self.filen: 

545 return f"<{kind}CronTab '{self.filen}'>" 

546 if self.user and not self.user_opt: 

547 return "<My CronTab>" 

548 if self.user: 

549 return f"<User CronTab '{self.user}'>" 

550 return f"<Unattached {kind}CronTab>" 

551 

552 def __iter__(self): 

553 """Return generator so we can track jobs after removal""" 

554 for job in list(self.crons.__iter__()): 

555 yield job 

556 

557 def __getitem__(self, i): 

558 return self.crons[i] 

559 

560 def __len__(self): 

561 return len(self.crons) 

562 

563 def __str__(self): 

564 return self.render() 

565 

566 

567class CronItem: 

568 """ 

569 An item which objectifies a single line of a crontab and 

570 May be considered to be a cron job object. 

571 """ 

572 def __init__(self, command='', comment='', user=None, pre_comment=False): 

573 self.cron = None 

574 self.user = user 

575 self.valid = False 

576 self.enabled = True 

577 self.special = False 

578 self.comment = None 

579 self.command = None 

580 self.last_run = None 

581 self.env = OrderedVariableList(job=self) 

582 

583 # Marker labels Ansible jobs etc 

584 self.pre_comment = False 

585 self.marker = None 

586 self.stdin = None 

587 self._log = None 

588 

589 # Initalise five cron slices using static info. 

590 self.slices = CronSlices() 

591 

592 self.set_comment(comment, pre_comment) 

593 

594 if command: 594 ↛ 595line 594 didn't jump to line 595, because the condition on line 594 was never true

595 self.set_command(command) 

596 

597 def __hash__(self): 

598 return hash((self.command, self.comment, self.hour, self.minute, self.dow)) 

599 

600 def __eq__(self, other): 

601 if not isinstance(other, CronItem): 601 ↛ 602line 601 didn't jump to line 602, because the condition on line 601 was never true

602 return False 

603 return self.__hash__() == other.__hash__() 

604 

605 @classmethod 

606 def from_line(cls, line, user=None, cron=None): 

607 """Generate CronItem from a cron-line and parse out command and comment""" 

608 obj = cls(user=user) 

609 obj.cron = cron 

610 obj.parse(line.strip()) 

611 return obj 

612 

613 def delete(self): 

614 """Delete this item and remove it from it's parent""" 

615 if not self.cron: 

616 raise UnboundLocalError("Cron item is not in a crontab!") 

617 self.cron.remove(self) 

618 

619 def set_command(self, cmd, parse_stdin=False): 

620 """Set the command and filter as needed""" 

621 if parse_stdin: 621 ↛ 625line 621 didn't jump to line 625, because the condition on line 621 was never false

622 cmd = cmd.replace('%', '\n').replace('\\\n', '%') 

623 if '\n' in cmd: 623 ↛ 624line 623 didn't jump to line 624, because the condition on line 623 was never true

624 cmd, self.stdin = cmd.split('\n', 1) 

625 self.command = _str(cmd.strip()) 

626 self.valid = True 

627 

628 def set_comment(self, cmt, pre_comment=False): 

629 """Set the comment and don't filter, pre_comment indicates comment appears 

630 before the cron, otherwise it appears ont he same line after the command. 

631 """ 

632 if cmt and cmt[:8] == 'Ansible:': 632 ↛ 633line 632 didn't jump to line 633, because the condition on line 632 was never true

633 self.marker = 'Ansible' 

634 cmt = cmt[8:].lstrip() 

635 pre_comment = True 

636 

637 self.comment = cmt 

638 self.pre_comment = pre_comment 

639 

640 def parse(self, line): 

641 """Parse a cron line string and save the info as the objects.""" 

642 line = _str(line) 

643 if not line or line[0] == '#': 643 ↛ 644line 643 didn't jump to line 644, because the condition on line 643 was never true

644 self.enabled = False 

645 line = line[1:].strip() 

646 # We parse all lines so we can detect disabled entries. 

647 self._set_parse(ITEMREX.findall(line), line) 

648 self._set_parse(SPECREX.findall(line), line) 

649 

650 def _set_parse(self, result, line=""): 

651 """Set all the parsed variables into the item""" 

652 if not result: 

653 return 

654 self.comment = result[0][-1] 

655 if self.cron.user is False: 655 ↛ 657line 655 didn't jump to line 657, because the condition on line 655 was never true

656 # Special flag to look for per-command user 

657 ret = result[0][-3].split(None, 1) 

658 self.set_command(ret[-1], True) 

659 if len(ret) == 2: 

660 self.user = ret[0] 

661 else: 

662 # Disabled jobs might be ordinary comments, so log as DEBUG 

663 level = logging.ERROR if self.enabled else logging.DEBUG 

664 self.valid = False 

665 self.enabled = False 

666 LOG.log(level, 

667 str("Missing user or command in system cron %s: %s"), 

668 '' if self.cron is None else (self.cron.tabfile or ''), 

669 line) 

670 else: 

671 self.set_command(result[0][-3], True) 

672 try: 

673 self.setall(*result[0][:-3]) 

674 except (ValueError, KeyError) as err: 

675 if self.enabled: 

676 LOG.error(str(err)) 

677 self.valid = False 

678 self.enabled = False 

679 

680 def enable(self, enabled=True): 

681 """Set if this cron job is enabled or not""" 

682 if enabled in [True, False]: 

683 self.enabled = enabled 

684 return self.enabled 

685 

686 def is_enabled(self): 

687 """Return true if this job is enabled (not commented out)""" 

688 return self.enabled 

689 

690 def is_valid(self): 

691 """Return true if this job is valid""" 

692 return self.valid 

693 

694 def render(self, specials=True): 

695 """Render this set cron-job to a string""" 

696 if not self.is_valid() and self.enabled: 

697 raise ValueError('Refusing to render invalid crontab.' 

698 ' Disable to continue.') 

699 command = _str(self.command).replace('%', '\\%') 

700 user = '' 

701 if self.cron and self.cron.user is False: 

702 if not self.user: 

703 raise ValueError("Job to system-cron format, no user set!") 

704 user = self.user + ' ' 

705 rend = self.slices.render(specials=specials) 

706 result = f"{rend} {user}{command}" 

707 if self.stdin: 

708 result += ' %' + self.stdin.replace('\n', '%') 

709 if not self.enabled: 

710 result = "# " + result 

711 if self.comment: 

712 comment = self.comment = _str(self.comment) 

713 if self.marker: 

714 comment = f"#{self.marker}: {comment}" 

715 else: 

716 comment = "# " + comment 

717 

718 if SYSTEMV or self.pre_comment or self.stdin: 

719 result = comment + "\n" + result 

720 else: 

721 result += ' ' + comment 

722 

723 return str(self.env) + result 

724 

725 def every_reboot(self): 

726 """Set to every reboot instead of a time pattern: @reboot""" 

727 self.clear() 

728 return self.slices.setall('@reboot') 

729 

730 def every(self, unit=1): 

731 """ 

732 Replace existing time pattern with a single unit, setting all lower 

733 units to first value in valid range. 

734 

735 For instance job.every(3).days() will be `0 0 */3 * *` 

736 while job.day().every(3) would be `* * */3 * *` 

737 

738 Many of these patterns exist as special tokens on Linux, such as 

739 `@midnight` and `@hourly` 

740 """ 

741 return Every(self.slices, unit) 

742 

743 def setall(self, *args): 

744 """Replace existing time pattern with these five values given as args: 

745 

746 job.setall("1 2 * * *") 

747 job.setall(1, 2) == '1 2 * * *' 

748 job.setall(0, 0, None, '>', 'SUN') == '0 0 * 12 SUN' 

749 """ 

750 return self.slices.setall(*args) 

751 

752 def clear(self): 

753 """Clear the special and set values""" 

754 return self.slices.clear() 

755 

756 def frequency(self, year=None): 

757 """Returns the number of times this item will execute in a given year 

758 (defaults to this year) 

759 """ 

760 return self.slices.frequency(year=year) 

761 

762 def frequency_at_hour(self, year=None, month=None, day=None, hour=None): 

763 """Returns the number of times this item will execute in a given hour 

764 (defaults to this hour) 

765 """ 

766 return self.slices.frequency_at_hour(year=year, month=month, day=day, hour=hour) 

767 

768 def frequency_at_day(self, year=None, month=None, day=None): 

769 """Returns the number of times this item will execute in a given day 

770 (defaults to today) 

771 """ 

772 return self.slices.frequency_at_day(year=year, month=month, day=day) 

773 

774 def frequency_at_month(self, year=None, month=None): 

775 """Returns the number of times this item will execute in a given month 

776 (defaults to this month) 

777 """ 

778 return self.slices.frequency_at_month(year=year, month=month) 

779 

780 def frequency_at_year(self, year=None): 

781 """Returns the number of times this item will execute in a given year 

782 (defaults to this year) 

783 """ 

784 return self.slices.frequency_at_year(year=year) 

785 

786 def frequency(self, year=None): 

787 """Return frequence per year times frequency per day""" 

788 return self.frequency_per_year(year=year) * self.frequency_per_day() 

789 

790 def frequency_per_year(self, year=None): 

791 """Returns the number of /days/ this item will execute on in a year 

792 (defaults to this year) 

793 """ 

794 return self.slices.frequency_per_year(year=year) 

795 

796 def frequency_per_day(self): 

797 """Returns the number of time this item will execute in any day""" 

798 return self.slices.frequency_per_day() 

799 

800 def frequency_per_hour(self): 

801 """Returns the number of times this item will execute in any hour""" 

802 return self.slices.frequency_per_hour() 

803 

804 def run_pending(self, now=None): 

805 """Runs the command if scheduled""" 

806 now = now or datetime.now() 

807 if self.is_enabled(): 

808 if self.last_run is None: 

809 self.last_run = now 

810 

811 next_time = self.schedule(self.last_run).get_next() 

812 if next_time < now: 

813 self.last_run = now 

814 return self.run() 

815 return -1 

816 

817 def run(self): 

818 """Runs the given command as a pipe""" 

819 env = os.environ.copy() 

820 env.update(self.env.all()) 

821 shell = self.env.get('SHELL', SHELL) 

822 (out, err) = open_pipe(shell, '-c', self.command, env=env).communicate() 

823 if err: 

824 LOG.error(err.decode("utf-8")) 

825 return out.decode("utf-8").strip() 

826 

827 def schedule(self, date_from=None): 

828 """Return a croniter schedule if available.""" 

829 if not date_from: 

830 date_from = datetime.now() 

831 try: 

832 # Croniter is an optional import 

833 from croniter.croniter import croniter # pylint: disable=import-outside-toplevel 

834 except ImportError as err: 

835 raise ImportError("Croniter not available. Please install croniter" 

836 " python module via pip or your package manager") from err 

837 return croniter(self.slices.clean_render(), date_from, ret_type=datetime) 

838 

839 def description(self, **kw): 

840 """ 

841 Returns a description of the crontab's schedule (if available) 

842 

843 **kw - Keyword arguments to pass to cron_descriptor (see docs) 

844 """ 

845 try: 

846 from cron_descriptor import ExpressionDescriptor # pylint: disable=import-outside-toplevel 

847 except ImportError as err: 

848 raise ImportError("cron_descriptor not available. Please install"\ 

849 "cron_descriptor python module via pip or your package manager") from err 

850 

851 exdesc = ExpressionDescriptor(self.slices.clean_render(), **kw) 

852 return exdesc.get_description() 

853 

854 @property 

855 def log(self): 

856 """Return a cron log specific for this job only""" 

857 if not self._log and self.cron: 

858 self._log = self.cron.log.for_program(self.command) 

859 return self._log 

860 

861 @property 

862 def minute(self): 

863 """Return the minute slice""" 

864 return self.slices[0] 

865 

866 @property 

867 def minutes(self): 

868 """Same as minute""" 

869 return self.minute 

870 

871 @property 

872 def hour(self): 

873 """Return the hour slice""" 

874 return self.slices[1] 

875 

876 @property 

877 def hours(self): 

878 """Same as hour""" 

879 return self.hour 

880 

881 @property 

882 def day(self): 

883 """Return the day slice""" 

884 return self.dom 

885 

886 @property 

887 def dom(self): 

888 """Return the day-of-the month slice""" 

889 return self.slices[2] 

890 

891 @property 

892 def month(self): 

893 """Return the month slice""" 

894 return self.slices[3] 

895 

896 @property 

897 def months(self): 

898 """Same as month""" 

899 return self.month 

900 

901 @property 

902 def dow(self): 

903 """Return the day of the week slice""" 

904 return self.slices[4] 

905 

906 def __repr__(self): 

907 return f"<CronItem '{self}'>" 

908 

909 def __len__(self): 

910 return len(str(self)) 

911 

912 def __getitem__(self, key): 

913 return self.slices[key] 

914 

915 def __lt__(self, value): 

916 return self.frequency() < CronSlices(value).frequency() 

917 

918 def __gt__(self, value): 

919 return self.frequency() > CronSlices(value).frequency() 

920 

921 def __str__(self): 

922 return self.render() 

923 

924 

925class Every: 

926 """Provide an interface to the job.every() method: 

927 Available Calls: 

928 minute, minutes, hour, hours, dom, doms, month, months, dow, dows 

929 

930 Once run all units will be cleared (set to *) then proceeding units 

931 will be set to '0' and the target unit will be set as every x units. 

932 """ 

933 def __init__(self, item, units): 

934 self.slices = item 

935 self.unit = units 

936 for (key, name) in enumerate(['minute', 'hour', 'dom', 'month', 'dow', 

937 'min', 'hour', 'day', 'moon', 'weekday']): 

938 setattr(self, name, self.set_attr(key % 5)) 

939 setattr(self, name+'s', self.set_attr(key % 5)) 

940 

941 def set_attr(self, target): 

942 """Inner set target, returns function""" 

943 def innercall(): 

944 """Returned inner call for setting slice targets""" 

945 self.slices.clear() 

946 # Day-of-week is actually a level 2 set, not level 4. 

947 for key in range(target == 4 and 2 or target): 

948 self.slices[key].on('<') 

949 self.slices[target].every(self.unit) 

950 return innercall 

951 

952 def year(self): 

953 """Special every year target""" 

954 if self.unit > 1: 

955 raise ValueError(f"Invalid value '{self.unit}', outside 1 year") 

956 self.slices.setall('@yearly') 

957 

958 

959class CronSlices(list): 

960 """Controls a list of five time 'slices' which reprisent: 

961 minute frequency, hour frequency, day of month frequency, 

962 month requency and finally day of the week frequency. 

963 """ 

964 def __init__(self, *args): 

965 super().__init__([CronSlice(info) for info in S_INFO]) 

966 self.special = None 

967 self.setall(*args) 

968 self.is_valid = self.is_self_valid 

969 

970 def is_self_valid(self, *args): 

971 """Object version of is_valid""" 

972 return CronSlices.is_valid(*(args or (self,))) 

973 

974 @classmethod 

975 def is_valid(cls, *args): #pylint: disable=method-hidden 

976 """Returns true if the arguments are valid cron pattern""" 

977 try: 

978 return bool(cls(*args)) 

979 except (ValueError, KeyError): 

980 return False 

981 

982 def setall(self, *slices): 

983 """Parses the various ways date/time frequency can be specified""" 

984 self.clear() 

985 if len(slices) == 1: 985 ↛ 986line 985 didn't jump to line 986, because the condition on line 985 was never true

986 (slices, self.special) = self._parse_value(slices[0]) 

987 if slices[0] == '@reboot': 

988 return 

989 if id(slices) == id(self): 989 ↛ 990line 989 didn't jump to line 990, because the condition on line 989 was never true

990 raise AssertionError("Can not set cron to itself!") 

991 for set_a, set_b in zip(self, slices): 

992 set_a.parse(set_b) 

993 

994 @staticmethod 

995 def _parse_value(value): 

996 """Parse a single value into an array of slices""" 

997 if isinstance(value, str) and value: 

998 return CronSlices._parse_str(value) 

999 if isinstance(value, CronItem): 

1000 return value.slices, None 

1001 if isinstance(value, datetime): 

1002 return [value.minute, value.hour, value.day, value.month, '*'], None 

1003 if isinstance(value, time): 

1004 return [value.minute, value.hour, '*', '*', '*'], None 

1005 if isinstance(value, date): 

1006 return [0, 0, value.day, value.month, '*'], None 

1007 # It might be possible to later understand timedelta objects 

1008 # but there's no convincing mathematics to do the conversion yet. 

1009 if not isinstance(value, (list, tuple)): 

1010 typ = type(value).__name__ 

1011 raise ValueError(f"Unknown type: {typ}") 

1012 return value, None 

1013 

1014 @staticmethod 

1015 def _parse_str(value): 

1016 """Parse a string which contains slice information""" 

1017 key = value.lstrip('@').lower() 

1018 if value.count(' ') == 4: 

1019 return value.strip().split(' '), None 

1020 if key in SPECIALS: 

1021 return SPECIALS[key].split(' '), '@' + key 

1022 if value.startswith('@'): 

1023 raise ValueError(f"Unknown special '{value}'") 

1024 return [value], None 

1025 

1026 def clean_render(self): 

1027 """Return just numbered parts of this crontab""" 

1028 return ' '.join([str(s) for s in self]) 

1029 

1030 def render(self, specials=True): 

1031 "Return just the first part of a cron job (the numbers or special)" 

1032 slices = self.clean_render() 

1033 if self.special and specials is not False: 

1034 if self.special == '@reboot' or \ 

1035 SPECIALS[self.special.strip('@')] == slices: 

1036 return self.special 

1037 if not SYSTEMV and specials is True: 

1038 for (name, value) in SPECIALS.items(): 

1039 if value == slices and name not in SPECIAL_IGNORE: 

1040 return f"@{name}" 

1041 return slices 

1042 

1043 def clear(self): 

1044 """Clear the special and set values""" 

1045 self.special = None 

1046 for item in self: 

1047 item.clear() 

1048 

1049 def frequency(self, year=None): 

1050 """Return frequence per year times frequency per day""" 

1051 return self.frequency_per_year(year=year) * self.frequency_per_day() 

1052 

1053 def frequency_per_year(self, year=None): 

1054 """Returns the number of times this item will execute 

1055 in a given year (default is this year)""" 

1056 result = 0 

1057 if not year: 

1058 year = date.today().year 

1059 

1060 weekdays = list(self[4]) 

1061 

1062 for month in self[3]: 

1063 for day in self[2]: 

1064 try: 

1065 if (date(year, month, day).weekday() + 1) % 7 in weekdays: 

1066 result += 1 

1067 except ValueError: 

1068 continue 

1069 return result 

1070 

1071 def frequency_per_day(self): 

1072 """Returns the number of times this item will execute in any day""" 

1073 return len(self[0]) * len(self[1]) 

1074 

1075 def frequency_per_hour(self): 

1076 """Returns the number of times this item will execute in any hour""" 

1077 return len(self[0]) 

1078 

1079 def frequency_at_year(self, year=None): 

1080 """Returns the number of /days/ this item will execute 

1081 in a given year (default is this year)""" 

1082 if not year: 

1083 year = date.today().year 

1084 

1085 total = 0 

1086 for month in range(1, 13): 

1087 total += self.frequency_at_month(year, month) 

1088 return total 

1089 

1090 def frequency_at_month(self, year=None, month=None): 

1091 """Returns the number of times this item will execute in given month 

1092 (default: current month) 

1093 """ 

1094 if year is None and month is None: 

1095 year = date.today().year 

1096 month = date.today().month 

1097 elif year is None or month is None: 

1098 raise ValueError( 

1099 f"One of more arguments undefined: year={year}, month={month}") 

1100 

1101 total = 0 

1102 if month in self[3]: 

1103 # Calculate amount of days of specific month 

1104 days = monthrange(year, month)[1] 

1105 for day in range(1, days + 1): 

1106 total += self.frequency_at_day(year, month, day) 

1107 return total 

1108 

1109 def frequency_at_day(self, year=None, month=None, day=None): 

1110 """Returns the number of times this item will execute in a day 

1111 (default: any executed day) 

1112 """ 

1113 # If arguments provided, all needs to be provided 

1114 test_none = [x is None for x in [year, month, day]] 

1115 

1116 if all(test_none): 

1117 return len(self[0]) * len(self[1]) 

1118 

1119 if any(test_none): 

1120 raise ValueError( 

1121 f"One of more arguments undefined: year={year}, month={month}, day={day}") 

1122 

1123 total = 0 

1124 if day in self[2]: 

1125 for hour in range(24): 

1126 total += self.frequency_at_hour(year, month, day, hour) 

1127 return total 

1128 

1129 def frequency_at_hour(self, year=None, month=None, day=None, hour=None): 

1130 """Returns the number of times this item will execute in a hour 

1131 (default: any executed hour) 

1132 """ 

1133 # If arguments provided, all needs to be provided 

1134 test_none = [x is None for x in [year, month, day, hour]] 

1135 

1136 if all(test_none): 

1137 return len(self[0]) 

1138 

1139 if any(test_none): 

1140 raise ValueError( 

1141 f"One of more arguments undefined: year={year}, month={month}, day={day}, hour={hour}") 

1142 

1143 result = 0 

1144 weekday = date(year, month, day).weekday() 

1145 

1146 # Check if scheduled for execution at defined moment 

1147 if hour in self[1] and \ 

1148 day in self[2] and \ 

1149 month in self[3] and \ 

1150 ((weekday + 1) % 7) in self[4]: 

1151 result = len(self[0]) 

1152 

1153 return result 

1154 

1155 def __str__(self): 

1156 return self.render() 

1157 

1158 def __eq__(self, arg): 

1159 return self.render() == CronSlices(arg).render() 

1160 

1161 

1162class SundayError(KeyError): 

1163 """Sunday was specified as 7 instead of 0""" 

1164 

1165class Also: 

1166 """Link range values together (appending instead of replacing)""" 

1167 def __init__(self, obj): 

1168 self.obj = obj 

1169 

1170 def every(self, *a): 

1171 """Also every one of these""" 

1172 return self.obj.every(*a, also=True) 

1173 

1174 def on(self, *a): 

1175 """Also on these""" 

1176 return self.obj.on(*a, also=True) 

1177 

1178 def during(self, *a): 

1179 """Also during these""" 

1180 return self.obj.during(*a, also=True) 

1181 

1182class CronSlice: 

1183 """Cron slice object which shows a time pattern""" 

1184 def __init__(self, info, value=None): 

1185 if isinstance(info, int): 1185 ↛ 1186line 1185 didn't jump to line 1186, because the condition on line 1185 was never true

1186 info = S_INFO[info] 

1187 self.min = info.get('min', None) 

1188 self.max = info.get('max', None) 

1189 self.name = info.get('name', None) 

1190 self.enum = info.get('enum', None) 

1191 self.parts = [] 

1192 if value: 1192 ↛ 1193line 1192 didn't jump to line 1193, because the condition on line 1192 was never true

1193 self.parse(value) 

1194 

1195 def __hash__(self): 

1196 return hash(str(self)) 

1197 

1198 def parse(self, value): 

1199 """Set values into the slice.""" 

1200 self.clear() 

1201 if value is not None: 1201 ↛ exitline 1201 didn't return from function 'parse', because the condition on line 1201 was never false

1202 for part in str(value).split(','): 

1203 if part.find("/") > 0 or part.find("-") > 0 or part == '*': 

1204 self.parts += self.get_range(part) 

1205 continue 

1206 self.parts.append(self.parse_value(part, sunday=0)) 

1207 

1208 def render(self, resolve=False): 

1209 """Return the slice rendered as a crontab. 

1210 

1211 resolve - return integer values instead of enums (default False) 

1212 

1213 """ 

1214 if not self.parts: 1214 ↛ 1215line 1214 didn't jump to line 1215, because the condition on line 1214 was never true

1215 return '*' 

1216 return _render_values(self.parts, ',', resolve) 

1217 

1218 def __repr__(self): 

1219 return f"<CronSlice '{self}'>" 

1220 

1221 def __eq__(self, value): 

1222 return str(self) == str(value) 

1223 

1224 def __str__(self): 

1225 return self.render() 

1226 

1227 def every(self, n_value, also=False): 

1228 """Set the every X units value""" 

1229 if not also: 

1230 self.clear() 

1231 self.parts += self.get_range(int(n_value)) 

1232 return self.parts[-1] 

1233 

1234 def on(self, *n_value, **opts): 

1235 """Set the time values to the specified placements.""" 

1236 if not opts.get('also', False): 

1237 self.clear() 

1238 for set_a in n_value: 

1239 self.parts += (self.parse_value(set_a, sunday=0),) 

1240 return self.parts 

1241 

1242 def during(self, vfrom, vto, also=False): 

1243 """Set the During value, which sets a range""" 

1244 if not also: 

1245 self.clear() 

1246 self.parts += self.get_range(str(vfrom) + '-' + str(vto)) 

1247 return self.parts[-1] 

1248 

1249 @property 

1250 def also(self): 

1251 """Appends rather than replaces the new values""" 

1252 return Also(self) 

1253 

1254 def clear(self): 

1255 """clear the slice ready for new vaues""" 

1256 self.parts = [] 

1257 

1258 def get_range(self, *vrange): 

1259 """Return a cron range for this slice""" 

1260 ret = CronRange(self, *vrange) 

1261 if ret.dangling is not None: 1261 ↛ 1262line 1261 didn't jump to line 1262, because the condition on line 1261 was never true

1262 return [ret.dangling, ret] 

1263 return [ret] 

1264 

1265 def __iter__(self): 

1266 """Return the entire element as an iterable""" 

1267 ret = {} 

1268 # An empty part means '*' which is every(1) 

1269 if not self.parts: 

1270 self.every(1) 

1271 for part in self.parts: 

1272 if isinstance(part, CronRange): 

1273 for bit in part.range(): 

1274 ret[bit] = 1 

1275 else: 

1276 ret[int(part)] = 1 

1277 for val in ret: 

1278 yield val 

1279 

1280 def __len__(self): 

1281 """Returns the number of times this slice happens in it's range""" 

1282 return len(list(self.__iter__())) 

1283 

1284 def parse_value(self, val, sunday=None): 

1285 """Parse the value of the cron slice and raise any errors needed""" 

1286 if val == '>': 1286 ↛ 1287line 1286 didn't jump to line 1287, because the condition on line 1286 was never true

1287 val = self.max 

1288 elif val == '<': 1288 ↛ 1289line 1288 didn't jump to line 1289, because the condition on line 1288 was never true

1289 val = self.min 

1290 try: 

1291 out = get_cronvalue(val, self.enum) 

1292 except ValueError as err: 

1293 raise ValueError(f"Unrecognised {self.name}: '{val}'") from err 

1294 except KeyError as err: 

1295 raise KeyError(f"No enumeration for {self.name}: '{val}'") from err 

1296 

1297 if self.max == 6 and int(out) == 7: 1297 ↛ 1298line 1297 didn't jump to line 1298, because the condition on line 1297 was never true

1298 if sunday is not None: 

1299 return sunday 

1300 raise SundayError("Detected Sunday as 7 instead of 0!") 

1301 

1302 if int(out) < self.min or int(out) > self.max: 1302 ↛ 1303line 1302 didn't jump to line 1303, because the condition on line 1302 was never true

1303 raise ValueError(f"'{val}', not in {self.min}-{self.max} for {self.name}") 

1304 return out 

1305 

1306 

1307def get_cronvalue(value, enums): 

1308 """Returns a value as int (pass-through) or a special enum value""" 

1309 if isinstance(value, int): 1309 ↛ 1310line 1309 didn't jump to line 1310, because the condition on line 1309 was never true

1310 return value 

1311 if str(value).isdigit(): 1311 ↛ 1313line 1311 didn't jump to line 1313, because the condition on line 1311 was never false

1312 return int(str(value)) 

1313 if not enums: 

1314 raise KeyError("No enumeration allowed") 

1315 return CronValue(str(value), enums) 

1316 

1317 

1318class CronValue: # pylint: disable=too-few-public-methods 

1319 """Represent a special value in the cron line""" 

1320 def __init__(self, value, enums): 

1321 self.text = value 

1322 self.value = enums.index(value.lower()) 

1323 

1324 def __lt__(self, value): 

1325 return self.value < int(value) 

1326 

1327 def __repr__(self): 

1328 return str(self) 

1329 

1330 def __str__(self): 

1331 return self.text 

1332 

1333 def __int__(self): 

1334 return self.value 

1335 

1336 

1337def _render_values(values, sep=',', resolve=False): 

1338 """Returns a rendered list, sorted and optionally resolved""" 

1339 if len(values) > 1: 1339 ↛ 1340line 1339 didn't jump to line 1340, because the condition on line 1339 was never true

1340 values.sort() 

1341 return sep.join([_render(val, resolve) for val in values]) 

1342 

1343 

1344def _render(value, resolve=False): 

1345 """Return a single value rendered""" 

1346 if isinstance(value, CronRange): 

1347 return value.render(resolve) 

1348 if resolve: 1348 ↛ 1349line 1348 didn't jump to line 1349, because the condition on line 1348 was never true

1349 return str(int(value)) 

1350 return str(f'{value:02d}' if ZERO_PAD else value) 

1351 

1352 

1353class CronRange: 

1354 """A range between one value and another for a time range.""" 

1355 def __init__(self, vslice, *vrange): 

1356 # holds an extra dangling entry, for example sundays. 

1357 self.dangling = None 

1358 self.slice = vslice 

1359 self.cron = None 

1360 self.seq = 1 

1361 

1362 if not vrange: 1362 ↛ 1363line 1362 didn't jump to line 1363, because the condition on line 1362 was never true

1363 self.all() 

1364 elif isinstance(vrange[0], str): 1364 ↛ 1366line 1364 didn't jump to line 1366, because the condition on line 1364 was never false

1365 self.parse(vrange[0]) 

1366 elif isinstance(vrange[0], (int, CronValue)): 

1367 if len(vrange) == 2: 

1368 (self.vfrom, self.vto) = vrange 

1369 else: 

1370 self.seq = vrange[0] 

1371 self.all() 

1372 

1373 def parse(self, value): 

1374 """Parse a ranged value in a cronjob""" 

1375 if value.count('/') == 1: 1375 ↛ 1376line 1375 didn't jump to line 1376, because the condition on line 1375 was never true

1376 value, seq = value.split('/') 

1377 try: 

1378 self.seq = self.slice.parse_value(seq) 

1379 except SundayError: 

1380 self.seq = 1 

1381 value = "0-0" 

1382 if self.seq < 1 or self.seq > self.slice.max: 

1383 raise ValueError("Sequence can not be divided by zero or max") 

1384 if value.count('-') == 1: 1384 ↛ 1385line 1384 didn't jump to line 1385, because the condition on line 1384 was never true

1385 vfrom, vto = value.split('-') 

1386 self.vfrom = self.slice.parse_value(vfrom, sunday=0) 

1387 try: 

1388 self.vto = self.slice.parse_value(vto) 

1389 except SundayError: 

1390 if self.vfrom == 1: 

1391 self.vfrom = 0 

1392 else: 

1393 self.dangling = 0 

1394 self.vto = self.slice.parse_value(vto, sunday=6) 

1395 if self.vto < self.vfrom: 

1396 raise ValueError(f"Bad range '{self.vfrom}-{self.vto}'") 

1397 elif value == '*': 1397 ↛ 1400line 1397 didn't jump to line 1400, because the condition on line 1397 was never false

1398 self.all() 

1399 else: 

1400 raise ValueError(f'Unknown cron range value "{value}"') 

1401 

1402 def all(self): 

1403 """Set this slice to all units between the miniumum and maximum""" 

1404 self.vfrom = self.slice.min 

1405 self.vto = self.slice.max 

1406 

1407 def render(self, resolve=False): 

1408 """Render the ranged value for a cronjob""" 

1409 value = '*' 

1410 if int(self.vfrom) > self.slice.min or int(self.vto) < self.slice.max: 1410 ↛ 1411line 1410 didn't jump to line 1411, because the condition on line 1410 was never true

1411 if self.vfrom == self.vto: 

1412 value = str(self.vfrom) 

1413 else: 

1414 value = _render_values([self.vfrom, self.vto], '-', resolve) 

1415 if self.seq != 1: 1415 ↛ 1416line 1415 didn't jump to line 1416, because the condition on line 1415 was never true

1416 value += f"/{self.seq:d}" 

1417 if value != '*' and SYSTEMV: 1417 ↛ 1418line 1417 didn't jump to line 1418, because the condition on line 1417 was never true

1418 value = ','.join([str(val) for val in self.range()]) 

1419 return value 

1420 

1421 def range(self): 

1422 """Returns the range of this cron slice as a iterable list""" 

1423 return range(int(self.vfrom), int(self.vto)+1, self.seq) 

1424 

1425 def every(self, value): 

1426 """Set the sequence value for this range.""" 

1427 self.seq = int(value) 

1428 

1429 def __lt__(self, value): 

1430 return int(self.vfrom) < int(value) 

1431 

1432 def __gt__(self, value): 

1433 return int(self.vto) > int(value) 

1434 

1435 def __int__(self): 

1436 return int(self.vfrom) 

1437 

1438 def __str__(self): 

1439 return self.render() 

1440 

1441 

1442class OrderedVariableList(OrderedDict): 

1443 """An ordered dictionary with a linked list containing 

1444 the previous OrderedVariableList which this list depends. 

1445 

1446 Duplicates in this list are weeded out in favour of the previous 

1447 list in the chain. 

1448 

1449 This is all in aid of the ENV variables list which must exist one 

1450 per job in the chain. 

1451 """ 

1452 def __init__(self, *args, **kw): 

1453 self.job = kw.pop('job', None) 

1454 super().__init__(*args, **kw) 

1455 

1456 @property 

1457 def previous(self): 

1458 """Returns the previous env in the list of jobs in the cron""" 

1459 if self.job is not None and self.job.cron is not None: 

1460 index = self.job.cron.crons.index(self.job) 

1461 if index == 0: 

1462 return self.job.cron.env 

1463 return self.job.cron[index-1].env 

1464 return None 

1465 

1466 def all(self): 

1467 """ 

1468 Returns the full dictionary, everything from this dictionary 

1469 plus all those in the chain above us. 

1470 """ 

1471 if self.job is not None: 

1472 ret = self.previous.all().copy() 

1473 ret.update(self) 

1474 return ret 

1475 return self.copy() 

1476 

1477 def __getitem__(self, key): 

1478 previous = self.previous 

1479 if key in self: 

1480 return super().__getitem__(key) 

1481 if previous is not None: 

1482 return previous.all()[key] 

1483 raise KeyError(f"Environment Variable '{key}' not found.") 

1484 

1485 def __str__(self): 

1486 """Constructs to variable list output used in cron jobs""" 

1487 ret = [] 

1488 for key, value in self.items(): 

1489 if self.previous: 

1490 if self.previous.all().get(key, None) == value: 

1491 continue 

1492 if ' ' in str(value) or value == '': 

1493 value = f'"{value}"' 

1494 ret.append(f"{key}={value}") 

1495 ret.append('') 

1496 return "\n".join(ret)