Coverage for src/click/_termui_impl.py: 10%

439 statements  

« prev     ^ index     » next       coverage.py v7.2.2, created at 2023-05-21 11:44 +0200

1""" 

2This module contains implementations for the termui module. To keep the 

3import time of Click down, some infrequently used functionality is 

4placed in this module and only imported as needed. 

5""" 

6import contextlib 

7import math 

8import os 

9import sys 

10import time 

11import typing as t 

12from gettext import gettext as _ 

13 

14from ._compat import _default_text_stdout 

15from ._compat import CYGWIN 

16from ._compat import get_best_encoding 

17from ._compat import isatty 

18from ._compat import open_stream 

19from ._compat import strip_ansi 

20from ._compat import term_len 

21from ._compat import WIN 

22from .exceptions import ClickException 

23from .utils import echo 

24 

V = t.TypeVar("V")

# Control sequences written before and after the bar line. Outside of
# Windows they additionally hide the cursor while the bar is drawn and show
# it again afterwards.
if os.name == "nt":
    BEFORE_BAR = "\r"
    AFTER_BAR = "\n"
else:
    BEFORE_BAR = "\r\033[?25l"
    AFTER_BAR = "\033[?25h\n"


class ProgressBar(t.Generic[V]):
    """Render a textual progress bar while consuming an iterable.

    The bar must be used as a context manager: entering renders the initial
    state, iterating yields the wrapped items and redraws the bar, and
    exiting writes the closing sequence. When ``file`` is not a TTY the bar
    is hidden and only the label is echoed.
    """

    def __init__(
        self,
        iterable: t.Optional[t.Iterable[V]],
        length: t.Optional[int] = None,
        fill_char: str = "#",
        empty_char: str = " ",
        bar_template: str = "%(bar)s",
        info_sep: str = "  ",
        show_eta: bool = True,
        show_percent: t.Optional[bool] = None,
        show_pos: bool = False,
        item_show_func: t.Optional[t.Callable[[t.Optional[V]], t.Optional[str]]] = None,
        label: t.Optional[str] = None,
        file: t.Optional[t.TextIO] = None,
        color: t.Optional[bool] = None,
        update_min_steps: int = 1,
        width: int = 30,
    ) -> None:
        """Store the display options and prepare the iteration state.

        :param iterable: Items to iterate over. May be ``None`` when
            ``length`` is given, in which case ``range(length)`` is used.
        :param length: Number of items. When omitted it is taken from
            ``operator.length_hint``; it stays ``None`` if no hint exists.
        :param width: Width of the bar in characters. ``0`` enables
            automatic sizing from the terminal width on every render.
        :param update_min_steps: Accumulated steps required before
            :meth:`update` advances and redraws the bar.
        :raises TypeError: If neither ``iterable`` nor ``length`` is given.
        """
        self.fill_char = fill_char
        self.empty_char = empty_char
        self.bar_template = bar_template
        self.info_sep = info_sep
        self.show_eta = show_eta
        self.show_percent = show_percent
        self.show_pos = show_pos
        self.item_show_func = item_show_func
        self.label = label or ""
        if file is None:
            file = _default_text_stdout()
        self.file = file
        self.color = color
        self.update_min_steps = update_min_steps
        # Steps reported through update() that have not been applied yet.
        self._completed_intervals = 0
        self.width = width
        self.autowidth = width == 0

        if length is None:
            from operator import length_hint

            length = length_hint(iterable, -1)

            if length == -1:
                length = None
        if iterable is None:
            if length is None:
                raise TypeError("iterable or length is required")
            iterable = t.cast(t.Iterable[V], range(length))
        self.iter = iter(iterable)
        self.length = length
        self.pos = 0
        self.avg: t.List[float] = []
        self.start = self.last_eta = time.time()
        self.eta_known = False
        self.finished = False
        # Widest line rendered so far; used to blank out leftovers.
        self.max_width: t.Optional[int] = None
        self.entered = False
        self.current_item: t.Optional[V] = None
        self.is_hidden = not isatty(self.file)
        self._last_line: t.Optional[str] = None

    def __enter__(self) -> "ProgressBar[V]":
        self.entered = True
        self.render_progress()
        return self

    def __exit__(self, *_: t.Any) -> None:
        self.render_finish()

    def __iter__(self) -> t.Iterator[V]:
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")
        self.render_progress()
        return self.generator()

    def __next__(self) -> V:
        # Iteration is defined in terms of a generator function,
        # returned by iter(self); use that to define next(). This works
        # because `self.iter` is an iterable consumed by that generator,
        # so it is re-entry safe. Calling `next(self.generator())`
        # twice works and does "what you want".
        return next(iter(self))

    def render_finish(self) -> None:
        """Write the closing sequence unless the bar is hidden."""
        if self.is_hidden:
            return
        self.file.write(AFTER_BAR)
        self.file.flush()

    @property
    def pct(self) -> float:
        """Completed fraction in the range ``0.0`` to ``1.0``."""
        if self.finished:
            return 1.0
        return min(self.pos / (float(self.length or 1) or 1), 1.0)

    @property
    def time_per_iteration(self) -> float:
        """Mean of the recorded per-step durations, ``0.0`` if none yet."""
        if not self.avg:
            return 0.0
        return sum(self.avg) / float(len(self.avg))

    @property
    def eta(self) -> float:
        """Estimated seconds remaining, ``0.0`` if unknown or finished."""
        if self.length is not None and not self.finished:
            return self.time_per_iteration * (self.length - self.pos)
        return 0.0

    def format_eta(self) -> str:
        """Format the ETA as ``[Nd ]HH:MM:SS``, or ``""`` if it is unknown."""
        if not self.eta_known:
            return ""

        # A dedicated name avoids shadowing the module-level ``t`` alias.
        remaining = int(self.eta)
        remaining, seconds = divmod(remaining, 60)
        remaining, minutes = divmod(remaining, 60)
        days, hours = divmod(remaining, 24)

        if days > 0:
            return f"{days}d {hours:02}:{minutes:02}:{seconds:02}"
        return f"{hours:02}:{minutes:02}:{seconds:02}"

    def format_pos(self) -> str:
        """Return ``pos`` or ``pos/length`` when the length is known."""
        pos = str(self.pos)
        if self.length is not None:
            pos += f"/{self.length}"
        return pos

    def format_pct(self) -> str:
        """Return the percentage right-aligned in three columns plus ``%``."""
        return f"{int(self.pct * 100): 4}%"[1:]

    def format_bar(self) -> str:
        """Build the bar itself.

        With a known length the bar is filled proportionally. Without one
        it is fully filled once finished, and otherwise shows a single fill
        character whose position follows a cosine of the elapsed work.
        """
        if self.length is not None:
            bar_length = int(self.pct * self.width)
            bar = self.fill_char * bar_length
            bar += self.empty_char * (self.width - bar_length)
        elif self.finished:
            bar = self.fill_char * self.width
        else:
            chars = list(self.empty_char * (self.width or 1))
            if chars and self.time_per_iteration != 0:
                marker = int(
                    (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
                    * self.width
                )
                # When the cosine is exactly 1 the computed index equals
                # ``width``, one past the last cell; clamp it into range.
                chars[min(marker, len(chars) - 1)] = self.fill_char
            bar = "".join(chars)
        return bar

    def format_progress_line(self) -> str:
        """Fill ``bar_template`` with the label, the bar and the info bits."""
        show_percent = self.show_percent

        info_bits = []
        # By default show the percentage only when the position is not shown.
        if self.length is not None and show_percent is None:
            show_percent = not self.show_pos

        if self.show_pos:
            info_bits.append(self.format_pos())
        if show_percent:
            info_bits.append(self.format_pct())
        if self.show_eta and self.eta_known and not self.finished:
            info_bits.append(self.format_eta())
        if self.item_show_func is not None:
            item_info = self.item_show_func(self.current_item)
            if item_info is not None:
                info_bits.append(item_info)

        return (
            self.bar_template
            % {
                "label": self.label,
                "bar": self.format_bar(),
                "info": self.info_sep.join(info_bits),
            }
        ).rstrip()

    def render_progress(self) -> None:
        """Draw the current state, writing only when the line changed."""
        import shutil

        if self.is_hidden:
            # Only output the label as it changes if the output is not a
            # TTY. Use file=stderr if you expect to be piping stdout.
            if self._last_line != self.label:
                self._last_line = self.label
                echo(self.label, file=self.file, color=self.color)

            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            # Measure everything except the bar by rendering with width 0.
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, shutil.get_terminal_size().columns - clutter_length)
            # Nothing has been drawn yet while max_width is None, so there
            # is nothing to blank out in that case.
            if new_width < old_width and self.max_width is not None:
                buf.append(BEFORE_BAR)
                buf.append(" " * self.max_width)
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        # Pad with spaces to overwrite a longer, previously rendered line.
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.

        if line != self._last_line:
            self._last_line = line
            echo(line, file=self.file, color=self.color, nl=False)
            self.file.flush()

    def make_step(self, n_steps: int) -> None:
        """Advance the position and refresh the ETA data at most once a second."""
        self.pos += n_steps
        if self.length is not None and self.pos >= self.length:
            self.finished = True

        if (time.time() - self.last_eta) < 1.0:
            return

        self.last_eta = time.time()

        # self.avg is a rolling list of length <= 7 of steps where steps are
        # defined as time elapsed divided by the total progress through
        # self.length.
        if self.pos:
            step = (time.time() - self.start) / self.pos
        else:
            step = time.time() - self.start

        self.avg = self.avg[-6:] + [step]

        self.eta_known = self.length is not None

    def update(self, n_steps: int, current_item: t.Optional[V] = None) -> None:
        """Update the progress bar by advancing a specified number of
        steps, and optionally set the ``current_item`` for this new
        position.

        :param n_steps: Number of steps to advance.
        :param current_item: Optional item to set as ``current_item``
            for the updated position.

        .. versionchanged:: 8.0
            Added the ``current_item`` optional parameter.

        .. versionchanged:: 8.0
            Only render when the number of steps meets the
            ``update_min_steps`` threshold.
        """
        if current_item is not None:
            self.current_item = current_item

        self._completed_intervals += n_steps

        if self._completed_intervals >= self.update_min_steps:
            self.make_step(self._completed_intervals)
            self.render_progress()
            self._completed_intervals = 0

    def finish(self) -> None:
        """Mark the bar as finished and clear the ETA and current item."""
        self.eta_known = False
        self.current_item = None
        self.finished = True

    def generator(self) -> t.Iterator[V]:
        """Return a generator which yields the items added to the bar
        during construction, and updates the progress bar *after* the
        yielded block returns.
        """
        # WARNING: the iterator interface for `ProgressBar` relies on
        # this and only works because this is a simple generator which
        # doesn't create or manage additional state. If this function
        # changes, the impact should be evaluated both against
        # `iter(bar)` and `next(bar)`. `next()` in particular may call
        # `self.generator()` repeatedly, and this must remain safe in
        # order for that interface to work.
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")

        if self.is_hidden:
            yield from self.iter
        else:
            for rv in self.iter:
                self.current_item = rv

                # This allows show_item_func to be updated before the
                # item is processed. Only trigger at the beginning of
                # the update interval.
                if self._completed_intervals == 0:
                    self.render_progress()

                yield rv
                self.update(1)

            self.finish()
            self.render_progress()

342 

343 

def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None:
    """Pick a paging strategy for ``generator`` and run it.

    The text is written straight through when stdin or stdout is not a
    terminal or ``TERM`` is ``dumb``/``emacs``. Otherwise ``$PAGER`` is
    preferred, then ``less``, then ``more``, with plain output as the last
    resort.
    """
    out = _default_text_stdout()

    # Paging needs an interactive terminal on both ends.
    if not (isatty(sys.stdin) and isatty(out)):
        return _nullpager(out, generator, color)

    user_pager = (os.environ.get("PAGER") or "").strip()
    if user_pager:
        if WIN:
            return _tempfilepager(generator, user_pager, color)
        return _pipepager(generator, user_pager, color)

    if os.environ.get("TERM") in ("dumb", "emacs"):
        return _nullpager(out, generator, color)

    if WIN or sys.platform.startswith("os2"):
        return _tempfilepager(generator, "more <", color)

    can_shell_out = hasattr(os, "system")

    # Probe for `less` by running it in a subshell with stderr discarded.
    if can_shell_out and os.system("(less) 2>/dev/null") == 0:
        return _pipepager(generator, "less", color)

    import tempfile

    # Probe for `more` by letting it page an empty temporary file.
    handle, probe_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_shell_out and os.system(f'more "{probe_path}"') == 0:
            return _pipepager(generator, "more", color)
        return _nullpager(out, generator, color)
    finally:
        os.unlink(probe_path)

371 

372 

def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) -> None:
    """Feed the text to ``cmd`` through its standard input.

    ``cmd`` is run through the shell. ANSI codes are stripped from the text
    unless ``color`` ends up true.
    """
    import subprocess

    child_env = dict(os.environ)

    # When piping to `less` with no explicit color choice, colors are
    # enabled if the flags (from $LESS plus the command line) contain
    # "r" or "R"; if there are no flags at all, LESS=-R is set for the child.
    program_and_args = cmd.rsplit("/", 1)[-1].split()
    if color is None and program_and_args[0] == "less":
        env_flags = os.environ.get("LESS", "")
        cli_flags = " ".join(program_and_args[1:])
        less_flags = f"{env_flags}{cli_flags}"
        if not less_flags:
            child_env["LESS"] = "-R"
            color = True
        elif "r" in less_flags or "R" in less_flags:
            color = True

    proc = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=child_env)
    pipe = t.cast(t.BinaryIO, proc.stdin)
    encoding = get_best_encoding(pipe)
    try:
        for text in generator:
            payload = text if color else strip_ansi(text)
            pipe.write(payload.encode(encoding, "replace"))
    except (OSError, KeyboardInterrupt):
        # The pipe is deliberately left open in this case.
        pass
    else:
        pipe.close()

    # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
    # search or other commands inside less).
    #
    # That means when the user hits ^C, the parent process (click) terminates,
    # but less is still alive, paging the output and messing up the terminal.
    #
    # If the user wants to make the pager exit on ^C, they should set
    # `LESS='-K'`. It's not our decision to make.
    while True:
        try:
            proc.wait()
        except KeyboardInterrupt:
            continue
        else:
            break

421 

422 

def _tempfilepager(
    generator: t.Iterable[str], cmd: str, color: t.Optional[bool]
) -> None:
    """Page through text by invoking a program on a temporary file.

    The whole text is collected, written to a temporary file and ``cmd`` is
    run on it through ``os.system``. The descriptor and the file are
    released even if collecting, encoding or writing the text fails.
    """
    import tempfile

    fd, filename = tempfile.mkstemp()
    try:
        # TODO: This never terminates if the passed generator never terminates.
        text = "".join(generator)
        if not color:
            text = strip_ansi(text)
        encoding = get_best_encoding(sys.stdout)
        with open_stream(filename, "wb")[0] as f:
            f.write(text.encode(encoding))
        os.system(f'{cmd} "{filename}"')
    finally:
        # Covers the preparation steps too, so a failing generator no
        # longer leaves the descriptor open and the file behind.
        os.close(fd)
        os.unlink(filename)

442 

443 

def _nullpager(
    stream: t.TextIO, generator: t.Iterable[str], color: t.Optional[bool]
) -> None:
    """Write the text to ``stream`` without paging; the last-resort fallback.

    ANSI codes are stripped from every chunk unless ``color`` is true.
    """
    for chunk in generator:
        stream.write(chunk if color else strip_ansi(chunk))

452 

453 

class Editor:
    """Open an external editor on a file or on a piece of text."""

    def __init__(
        self,
        editor: t.Optional[str] = None,
        env: t.Optional[t.Mapping[str, str]] = None,
        require_save: bool = True,
        extension: str = ".txt",
    ) -> None:
        """Store the editor configuration.

        :param editor: Editor command; detected by :meth:`get_editor` when
            ``None``.
        :param env: Extra environment variables for the editor process.
        :param require_save: Make :meth:`edit` return ``None`` when the
            file's modification time did not change.
        :param extension: Suffix of the temporary file used by :meth:`edit`.
        """
        self.editor = editor
        self.env = env
        self.require_save = require_save
        self.extension = extension

    def get_editor(self) -> str:
        """Return the editor command to run.

        Order of preference: the configured editor, ``$VISUAL``,
        ``$EDITOR``, ``notepad`` on Windows, the first of
        ``sensible-editor``/``vim``/``nano`` found on ``PATH``, then ``vi``.
        """
        if self.editor is not None:
            return self.editor
        for key in "VISUAL", "EDITOR":
            rv = os.environ.get(key)
            if rv:
                return rv
        if WIN:
            return "notepad"

        # Look the candidates up directly instead of spawning a shell that
        # runs the external `which` program for each of them.
        from shutil import which

        for editor in "sensible-editor", "vim", "nano":
            if which(editor) is not None:
                return editor
        return "vi"

    def edit_file(self, filename: str) -> None:
        """Run the editor on ``filename`` and wait for it to exit.

        :raises ClickException: If the editor cannot be started or exits
            with a non-zero status.
        """
        import subprocess

        editor = self.get_editor()
        environ: t.Optional[t.Dict[str, str]] = None

        if self.env:
            environ = os.environ.copy()
            environ.update(self.env)

        try:
            c = subprocess.Popen(f'{editor} "{filename}"', env=environ, shell=True)
            exit_code = c.wait()
            if exit_code != 0:
                raise ClickException(
                    _("{editor}: Editing failed").format(editor=editor)
                )
        except OSError as e:
            raise ClickException(
                _("{editor}: Editing failed: {e}").format(editor=editor, e=e)
            ) from e

    def edit(self, text: t.Optional[t.AnyStr]) -> t.Optional[t.AnyStr]:
        """Edit ``text`` in a temporary file and return the result.

        Bytes input is written and returned unchanged in type. String input
        gets a trailing newline if missing and is encoded as UTF-8 (with a
        BOM and CRLF line endings on Windows); the result is decoded again
        with line endings normalized to ``\\n``.

        :return: The edited content, or ``None`` when ``require_save`` is
            set and the file's modification time did not change.
        """
        import tempfile

        if not text:
            data = b""
        elif isinstance(text, (bytes, bytearray)):
            data = text
        else:
            # ``text`` is a non-empty string here.
            if not text.endswith("\n"):
                text += "\n"

            if WIN:
                data = text.replace("\n", "\r\n").encode("utf-8-sig")
            else:
                data = text.encode("utf-8")

        fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
        f: t.BinaryIO

        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)

            # If the filesystem resolution is 1 second, like Mac OS
            # 10.12 Extended, or 2 seconds, like FAT32, and the editor
            # closes very fast, require_save can fail. Set the modified
            # time to be 2 seconds in the past to work around this.
            os.utime(name, (os.path.getatime(name), os.path.getmtime(name) - 2))
            # Depending on the resolution, the exact value might not be
            # recorded, so get the new recorded value.
            timestamp = os.path.getmtime(name)

            self.edit_file(name)

            if self.require_save and os.path.getmtime(name) == timestamp:
                return None

            with open(name, "rb") as f:
                rv = f.read()

            if isinstance(text, (bytes, bytearray)):
                return rv

            return rv.decode("utf-8-sig").replace("\r\n", "\n")  # type: ignore
        finally:
            os.unlink(name)

549 

550 

def open_url(url: str, wait: bool = False, locate: bool = False) -> int:
    """Open ``url`` with the platform's default handler.

    :param url: URL or ``file://`` path to open.
    :param wait: Wait for the launched program to exit where supported.
    :param locate: Show the target's location in a file manager instead of
        opening it.
    :return: The launcher's exit status; on the ``xdg-open`` path ``0`` on
        success without waiting and ``1`` if nothing could be launched.
    """
    import subprocess

    def _local_path(target: str) -> str:
        # Turn a file:// URL into a plain path; leave anything else alone.
        from urllib.parse import unquote

        if target.startswith("file://"):
            return unquote(target[7:])

        return target

    if sys.platform == "darwin":
        command = ["open"]
        if wait:
            command.append("-W")
        if locate:
            command.append("-R")
        command.append(_local_path(url))
        # Discard whatever `open` prints on stderr.
        with open("/dev/null", "w") as devnull:
            return subprocess.Popen(command, stderr=devnull).wait()

    if WIN:
        if locate:
            url = _local_path(url.replace('"', ""))
            shell_cmd = f'explorer /select,"{url}"'
        else:
            url = url.replace('"', "")
            wait_flag = "/WAIT" if wait else ""
            shell_cmd = f'start {wait_flag} "" "{url}"'
        return os.system(shell_cmd)

    if CYGWIN:
        if locate:
            url = os.path.dirname(_local_path(url).replace('"', ""))
            shell_cmd = f'cygstart "{url}"'
        else:
            url = url.replace('"', "")
            wait_flag = "-w" if wait else ""
            shell_cmd = f'cygstart {wait_flag} "{url}"'
        return os.system(shell_cmd)

    try:
        url = _local_path(url)
        if locate:
            url = os.path.dirname(url) or "."
        launcher = subprocess.Popen(["xdg-open", url])
        return launcher.wait() if wait else 0
    except OSError:
        # xdg-open could not be started; fall back to the webbrowser module
        # for plain web URLs when neither waiting nor locating was asked for.
        is_web = url.startswith(("http://", "https://"))
        if is_web and not locate and not wait:
            import webbrowser

            webbrowser.open(url)
            return 0
        return 1

609 

610 

def _translate_ch_to_exc(ch: str) -> t.Optional[BaseException]:
    """Raise the exception a control character stands for, if any.

    ``\\x03`` raises :exc:`KeyboardInterrupt`. The end-of-file character
    (``\\x04`` when not on Windows, ``\\x1a`` on Windows) raises
    :exc:`EOFError`. Anything else returns ``None``.
    """
    if ch == "\x03":
        raise KeyboardInterrupt()

    # Unix-like: Ctrl+D; Windows: Ctrl+Z.
    if (ch == "\x04" and not WIN) or (ch == "\x1a" and WIN):
        raise EOFError()

    return None

622 

623 

if WIN:
    import msvcrt

    @contextlib.contextmanager
    def raw_terminal() -> t.Iterator[int]:
        """Do nothing on Windows; yield ``-1`` in place of a descriptor."""
        yield -1

    def getchar(echo: bool) -> str:
        """Read one key press from the console.

        :param echo: Echo the character back while reading.
        :raises KeyboardInterrupt: Raised via ``_translate_ch_to_exc``.
        :raises EOFError: Raised via ``_translate_ch_to_exc``.
        """
        # The function `getch` will return a bytes object corresponding to
        # the pressed character. Since Windows 10 build 1803, it will also
        # return \x00 when called a second time after pressing a regular key.
        #
        # `getwch` does not share this probably-bugged behavior. Moreover, it
        # returns a Unicode object by default, which is what we want.
        #
        # Either of these functions will return \x00 or \xe0 to indicate
        # a special key, and you need to call the same function again to get
        # the "rest" of the code. The fun part is that \u00e0 is
        # "latin small letter a with grave", so if you type that on a French
        # keyboard, you _also_ get a \xe0.
        # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
        # resulting Unicode string reads as "a with grave" + "capital H".
        # This is indistinguishable from when the user actually types
        # "a with grave" and then "capital H".
        #
        # When \xe0 is returned, we assume it's part of a special-key sequence
        # and call `getwch` again, but that means that when the user types
        # the \u00e0 character, `getchar` doesn't return until a second
        # character is typed.
        # The alternative is returning immediately, but that would mess up
        # cross-platform handling of arrow keys and others that start with
        # \xe0. Another option is using `getch`, but then we can't reliably
        # read non-ASCII characters, because return values of `getch` are
        # limited to the current 8-bit codepage.
        #
        # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
        # is doing the right thing in more situations than with `getch`.
        func: t.Callable[[], str]

        if echo:
            func = msvcrt.getwche  # type: ignore
        else:
            func = msvcrt.getwch  # type: ignore

        rv = func()

        if rv in ("\x00", "\xe0"):
            # \x00 and \xe0 are control characters that indicate special key,
            # see above.
            rv += func()

        _translate_ch_to_exc(rv)
        return rv

else:
    import tty
    import termios

    @contextlib.contextmanager
    def raw_terminal() -> t.Iterator[int]:
        """Put the terminal into raw mode and yield its file descriptor.

        Uses stdin when it is a TTY and opens ``/dev/tty`` otherwise. The
        previous terminal settings are restored on exit. ``termios.error``
        is swallowed, as before.
        """
        f: t.Optional[t.TextIO]
        fd: int

        if not isatty(sys.stdin):
            f = open("/dev/tty")
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None

        try:
            old_settings = termios.tcgetattr(fd)

            try:
                tty.setraw(fd)
                yield fd
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
        finally:
            # Close the /dev/tty handle on every path, including when
            # reading or restoring the terminal settings fails.
            if f is not None:
                f.close()

    def getchar(echo: bool) -> str:
        """Read one key press (up to 32 bytes) from the raw terminal.

        :param echo: Write the character to stdout when stdout is a TTY.
        :raises KeyboardInterrupt: Raised via ``_translate_ch_to_exc``.
        :raises EOFError: Raised via ``_translate_ch_to_exc``.
        """
        with raw_terminal() as fd:
            ch = os.read(fd, 32).decode(get_best_encoding(sys.stdin), "replace")

            if echo and isatty(sys.stdout):
                sys.stdout.write(ch)

            _translate_ch_to_exc(ch)
            return ch