Coverage for src/click/_termui_impl.py: 10%
439 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 11:44 +0200
« prev ^ index » next coverage.py v7.2.2, created at 2023-05-21 11:44 +0200
1"""
2This module contains implementations for the termui module. To keep the
3import time of Click down, some infrequently used functionality is
4placed in this module and only imported as needed.
5"""
6import contextlib
7import math
8import os
9import sys
10import time
11import typing as t
12from gettext import gettext as _
14from ._compat import _default_text_stdout
15from ._compat import CYGWIN
16from ._compat import get_best_encoding
17from ._compat import isatty
18from ._compat import open_stream
19from ._compat import strip_ansi
20from ._compat import term_len
21from ._compat import WIN
22from .exceptions import ClickException
23from .utils import echo
V = t.TypeVar("V")

# Control sequences wrapped around a rendered progress bar line.
# ``BEFORE_BAR`` starts with a carriage return so that each redraw
# overwrites the previous one; ``AFTER_BAR`` ends the line. Outside of
# Windows the sequences additionally hide the cursor while the bar is
# drawn ("\033[?25l") and show it again afterwards ("\033[?25h").
if os.name != "nt":
    BEFORE_BAR = "\r\033[?25l"
    AFTER_BAR = "\033[?25h\n"
else:
    BEFORE_BAR = "\r"
    AFTER_BAR = "\n"
class ProgressBar(t.Generic[V]):
    """Text progress bar driven by iteration or by manual ``update`` calls.

    The bar has to be used as a context manager: iterating it outside of
    a ``with`` block raises :exc:`RuntimeError`. When the output file is
    not a TTY the bar is hidden and only the label is echoed.

    :param iterable: Items to iterate over. May be ``None`` if ``length``
        is given, in which case ``range(length)`` is iterated.
    :param length: Total number of steps. If omitted it is taken from
        ``operator.length_hint(iterable)``; if no hint is available the
        length is unknown.
    :param fill_char: Character used for the completed part of the bar.
    :param empty_char: Character used for the remaining part of the bar.
    :param bar_template: ``%``-style template rendered with the keys
        ``label``, ``bar`` and ``info``.
    :param info_sep: Separator placed between the info fields.
    :param show_eta: Show the estimated time remaining once it is known.
    :param show_percent: Show the percentage. ``None`` means "show it
        unless ``show_pos`` is set", and only if the length is known.
    :param show_pos: Show the absolute position (``pos`` or
        ``pos/length``).
    :param item_show_func: Called with the current item; a non-``None``
        return value is appended to the info fields.
    :param label: Label exposed to the template; ``None`` becomes ``""``.
    :param file: Output stream; defaults to the default text stdout.
    :param color: Passed through to ``echo``.
    :param update_min_steps: ``update`` only advances and renders once
        this many steps have accumulated.
    :param width: Width of the bar in characters; ``0`` selects a width
        based on the terminal size on every render.
    :raises TypeError: If neither ``iterable`` nor ``length`` is given.
    """

    def __init__(
        self,
        iterable: t.Optional[t.Iterable[V]],
        length: t.Optional[int] = None,
        fill_char: str = "#",
        empty_char: str = " ",
        bar_template: str = "%(bar)s",
        info_sep: str = " ",
        show_eta: bool = True,
        show_percent: t.Optional[bool] = None,
        show_pos: bool = False,
        item_show_func: t.Optional[t.Callable[[t.Optional[V]], t.Optional[str]]] = None,
        label: t.Optional[str] = None,
        file: t.Optional[t.TextIO] = None,
        color: t.Optional[bool] = None,
        update_min_steps: int = 1,
        width: int = 30,
    ) -> None:
        self.fill_char = fill_char
        self.empty_char = empty_char
        self.bar_template = bar_template
        self.info_sep = info_sep
        self.show_eta = show_eta
        self.show_percent = show_percent
        self.show_pos = show_pos
        self.item_show_func = item_show_func
        self.label = label or ""
        if file is None:
            file = _default_text_stdout()
        self.file = file
        self.color = color
        self.update_min_steps = update_min_steps
        # Steps reported through update() that have not been applied yet.
        self._completed_intervals = 0
        self.width = width
        self.autowidth = width == 0

        if length is None:
            from operator import length_hint

            length = length_hint(iterable, -1)

            # -1 is the "no hint available" marker passed above.
            if length == -1:
                length = None
        if iterable is None:
            if length is None:
                raise TypeError("iterable or length is required")
            iterable = t.cast(t.Iterable[V], range(length))
        self.iter = iter(iterable)
        self.length = length
        self.pos = 0
        # Rolling window of seconds-per-step samples, see make_step().
        self.avg: t.List[float] = []
        self.start = self.last_eta = time.time()
        self.eta_known = False
        self.finished = False
        # Widest line rendered so far; used to blank out leftovers.
        self.max_width: t.Optional[int] = None
        self.entered = False
        self.current_item: t.Optional[V] = None
        self.is_hidden = not isatty(self.file)
        # Last text written, used to skip redundant redraws.
        self._last_line: t.Optional[str] = None

    def __enter__(self) -> "ProgressBar[V]":
        self.entered = True
        self.render_progress()
        return self

    def __exit__(self, *exc_info: t.Any) -> None:
        self.render_finish()

    def __iter__(self) -> t.Iterator[V]:
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")
        self.render_progress()
        return self.generator()

    def __next__(self) -> V:
        # Iteration is defined in terms of a generator function,
        # returned by iter(self); use that to define next(). This works
        # because `self.iter` is an iterable consumed by that generator,
        # so it is re-entry safe. Calling `next(self.generator())`
        # twice works and does "what you want".
        return next(iter(self))

    def render_finish(self) -> None:
        """Write the closing sequence of the bar unless it is hidden."""
        if self.is_hidden:
            return
        self.file.write(AFTER_BAR)
        self.file.flush()

    @property
    def pct(self) -> float:
        """Completed fraction in ``[0, 1]``; ``1.0`` once finished."""
        if self.finished:
            return 1.0
        # A missing or zero length is treated as 1 to avoid dividing by 0.
        return min(self.pos / (float(self.length or 1) or 1), 1.0)

    @property
    def time_per_iteration(self) -> float:
        """Mean of the recorded seconds-per-step samples (0.0 if none)."""
        if not self.avg:
            return 0.0
        return sum(self.avg) / float(len(self.avg))

    @property
    def eta(self) -> float:
        """Estimated seconds remaining; 0.0 if unknown or finished."""
        if self.length is not None and not self.finished:
            return self.time_per_iteration * (self.length - self.pos)
        return 0.0

    def format_eta(self) -> str:
        """Format the ETA as ``[Nd ]HH:MM:SS``, or ``""`` if unknown."""
        if not self.eta_known:
            return ""

        minutes, seconds = divmod(int(self.eta), 60)
        hours, minutes = divmod(minutes, 60)
        days, hours = divmod(hours, 24)

        if days > 0:
            return f"{days}d {hours:02}:{minutes:02}:{seconds:02}"
        return f"{hours:02}:{minutes:02}:{seconds:02}"

    def format_pos(self) -> str:
        """Return ``pos``, or ``pos/length`` when the length is known."""
        pos = str(self.pos)
        if self.length is not None:
            pos += f"/{self.length}"
        return pos

    def format_pct(self) -> str:
        """Return the percentage right-aligned in three columns plus ``%``."""
        return f"{int(self.pct * 100): 4}%"[1:]

    def format_bar(self) -> str:
        """Return the bar itself, without label or info fields.

        With a known length the bar is filled proportionally. With an
        unknown length a single fill character moves back and forth
        until the bar is finished, at which point it is fully filled.
        """
        if self.length is not None:
            bar_length = int(self.pct * self.width)
            return self.fill_char * bar_length + self.empty_char * (
                self.width - bar_length
            )

        if self.finished:
            return self.fill_char * self.width

        chars = list(self.empty_char * (self.width or 1))
        if chars and self.time_per_iteration != 0:
            marker = int(
                (math.cos(self.pos * self.time_per_iteration) / 2.0 + 0.5)
                * self.width
            )
            # The cosine term reaches exactly 1.0 (e.g. at pos == 0), which
            # maps to index == width, one past the end. Clamp it so the
            # marker stays on the last cell instead of raising IndexError.
            chars[min(marker, len(chars) - 1)] = self.fill_char
        return "".join(chars)

    def format_progress_line(self) -> str:
        """Render ``bar_template`` with label, bar and the info fields."""
        show_percent = self.show_percent

        info_bits = []
        if self.length is not None and show_percent is None:
            show_percent = not self.show_pos

        if self.show_pos:
            info_bits.append(self.format_pos())
        if show_percent:
            info_bits.append(self.format_pct())
        if self.show_eta and self.eta_known and not self.finished:
            info_bits.append(self.format_eta())
        if self.item_show_func is not None:
            item_info = self.item_show_func(self.current_item)
            if item_info is not None:
                info_bits.append(item_info)

        return (
            self.bar_template
            % {
                "label": self.label,
                "bar": self.format_bar(),
                "info": self.info_sep.join(info_bits),
            }
        ).rstrip()

    def render_progress(self) -> None:
        """Write the current state of the bar to ``self.file``.

        Nothing is written if the rendered text is identical to the
        previously written text.
        """
        import shutil

        if self.is_hidden:
            # Only output the label as it changes if the output is not a
            # TTY. Use file=stderr if you expect to be piping stdout.
            if self._last_line != self.label:
                self._last_line = self.label
                echo(self.label, file=self.file, color=self.color)

            return

        buf = []
        # Update width in case the terminal has been resized
        if self.autowidth:
            old_width = self.width
            # Measure everything around the bar by rendering a zero-width bar.
            self.width = 0
            clutter_length = term_len(self.format_progress_line())
            new_width = max(0, shutil.get_terminal_size().columns - clutter_length)
            if new_width < old_width:
                # The bar shrank: blank out the previous, wider line first.
                buf.append(BEFORE_BAR)
                buf.append(" " * (self.max_width or 0))
                self.max_width = new_width
            self.width = new_width

        clear_width = self.width
        if self.max_width is not None:
            clear_width = self.max_width

        buf.append(BEFORE_BAR)
        line = self.format_progress_line()
        line_len = term_len(line)
        if self.max_width is None or self.max_width < line_len:
            self.max_width = line_len

        buf.append(line)
        # Pad with spaces to erase leftovers of a longer previous line.
        buf.append(" " * (clear_width - line_len))
        line = "".join(buf)
        # Render the line only if it changed.

        if line != self._last_line:
            self._last_line = line
            echo(line, file=self.file, color=self.color, nl=False)
            self.file.flush()

    def make_step(self, n_steps: int) -> None:
        """Advance the position by ``n_steps`` and refresh the ETA data.

        The timing sample is refreshed at most once per second.
        """
        self.pos += n_steps
        if self.length is not None and self.pos >= self.length:
            self.finished = True

        if (time.time() - self.last_eta) < 1.0:
            return

        self.last_eta = time.time()

        # self.avg is a rolling list of length <= 7 of steps where steps are
        # defined as time elapsed divided by the total progress through
        # self.length.
        if self.pos:
            step = (time.time() - self.start) / self.pos
        else:
            step = time.time() - self.start

        self.avg = self.avg[-6:] + [step]

        self.eta_known = self.length is not None

    def update(self, n_steps: int, current_item: t.Optional[V] = None) -> None:
        """Update the progress bar by advancing a specified number of
        steps, and optionally set the ``current_item`` for this new
        position.

        :param n_steps: Number of steps to advance.
        :param current_item: Optional item to set as ``current_item``
            for the updated position.

        .. versionchanged:: 8.0
            Added the ``current_item`` optional parameter.

        .. versionchanged:: 8.0
            Only render when the number of steps meets the
            ``update_min_steps`` threshold.
        """
        if current_item is not None:
            self.current_item = current_item

        self._completed_intervals += n_steps

        if self._completed_intervals >= self.update_min_steps:
            self.make_step(self._completed_intervals)
            self.render_progress()
            self._completed_intervals = 0

    def finish(self) -> None:
        """Mark the bar as finished and clear the ETA and current item."""
        self.eta_known = False
        self.current_item = None
        self.finished = True

    def generator(self) -> t.Iterator[V]:
        """Return a generator which yields the items added to the bar
        during construction, and updates the progress bar *after* the
        yielded block returns.
        """
        # WARNING: the iterator interface for `ProgressBar` relies on
        # this and only works because this is a simple generator which
        # doesn't create or manage additional state. If this function
        # changes, the impact should be evaluated both against
        # `iter(bar)` and `next(bar)`. `next()` in particular may call
        # `self.generator()` repeatedly, and this must remain safe in
        # order for that interface to work.
        if not self.entered:
            raise RuntimeError("You need to use progress bars in a with block.")

        if self.is_hidden:
            yield from self.iter
        else:
            for rv in self.iter:
                self.current_item = rv

                # This allows show_item_func to be updated before the
                # item is processed. Only trigger at the beginning of
                # the update interval.
                if self._completed_intervals == 0:
                    self.render_progress()

                yield rv
                self.update(1)

            self.finish()
            self.render_progress()
def pager(generator: t.Iterable[str], color: t.Optional[bool] = None) -> None:
    """Decide what method to use for paging through text.

    The text is written straight to stdout when stdin or stdout is not a
    TTY, or when ``TERM`` is ``dumb`` or ``emacs`` and no ``PAGER`` is
    set. Otherwise the command in ``PAGER`` is used if present, then
    ``less``, then ``more``, with plain output as the last resort.

    :param generator: Chunks of text to page.
    :param color: Forwarded to the selected pager implementation.
    """
    out = _default_text_stdout()

    # Without an interactive terminal on both ends there is nothing to page.
    if not (isatty(sys.stdin) and isatty(out)):
        return _nullpager(out, generator, color)

    configured = (os.environ.get("PAGER") or "").strip()
    if configured:
        run_pager = _tempfilepager if WIN else _pipepager
        return run_pager(generator, configured, color)

    if os.environ.get("TERM") in ("dumb", "emacs"):
        return _nullpager(out, generator, color)

    if WIN or sys.platform.startswith("os2"):
        return _tempfilepager(generator, "more <", color)

    can_run_commands = hasattr(os, "system")

    # A zero exit status from this probe is taken to mean less is usable.
    if can_run_commands and os.system("(less) 2>/dev/null") == 0:
        return _pipepager(generator, "less", color)

    import tempfile

    # Probe ``more`` by letting it display an empty temporary file.
    handle, probe_path = tempfile.mkstemp()
    os.close(handle)
    try:
        if can_run_commands and os.system(f'more "{probe_path}"') == 0:
            return _pipepager(generator, "more", color)
        return _nullpager(out, generator, color)
    finally:
        os.unlink(probe_path)
def _pipepager(generator: t.Iterable[str], cmd: str, color: t.Optional[bool]) -> None:
    """Page through text by feeding it to another program. Invoking a
    pager through this might support colors.

    :param generator: Chunks of text to send to the pager.
    :param cmd: Shell command line that starts the pager.
    :param color: Whether ANSI codes are kept. ``None`` enables them only
        for ``less`` when its flags allow raw control characters.
    """
    import subprocess

    env = dict(os.environ)

    # If we're piping to less we might support colors under the
    # condition that its flags (from the LESS environment variable and
    # the command line) contain -r/-R. If no flags are set at all, -R is
    # added through the environment of the child process.
    cmd_detail = cmd.rsplit("/", 1)[-1].split()
    if color is None and cmd_detail[0] == "less":
        less_flags = f"{os.environ.get('LESS', '')}{' '.join(cmd_detail[1:])}"
        if not less_flags:
            env["LESS"] = "-R"
            color = True
        elif "r" in less_flags or "R" in less_flags:
            color = True

    c = subprocess.Popen(cmd, shell=True, stdin=subprocess.PIPE, env=env)
    stdin = t.cast(t.BinaryIO, c.stdin)
    encoding = get_best_encoding(stdin)
    try:
        for text in generator:
            if not color:
                text = strip_ansi(text)

            stdin.write(text.encode(encoding, "replace"))
    except (OSError, KeyboardInterrupt):
        # The pager went away or the user interrupted the output; stop
        # feeding it but still wait for it to exit below.
        pass
    finally:
        # Always close the pipe so the pager sees end-of-file; otherwise
        # a pager that reads until EOF would keep wait() blocked forever.
        # Closing flushes buffered data, which fails with an OSError
        # (broken pipe) if the pager has already exited.
        try:
            stdin.close()
        except OSError:
            pass

    # Less doesn't respect ^C, but catches it for its own UI purposes (aborting
    # search or other commands inside less).
    #
    # That means when the user hits ^C, the parent process (click) terminates,
    # but less is still alive, paging the output and messing up the terminal.
    #
    # If the user wants to make the pager exit on ^C, they should set
    # `LESS='-K'`. It's not our decision to make.
    while True:
        try:
            c.wait()
        except KeyboardInterrupt:
            pass
        else:
            break
def _tempfilepager(
    generator: t.Iterable[str], cmd: str, color: t.Optional[bool]
) -> None:
    """Page through text by invoking a program on a temporary file.

    :param generator: Chunks of text; they are joined and written to the
        temporary file before the program is started.
    :param cmd: Shell command prefix; the quoted file name is appended.
    :param color: ANSI codes are stripped unless this is truthy.
    """
    import tempfile

    fd, filename = tempfile.mkstemp()
    # Everything after mkstemp() runs under the same ``finally`` so that
    # the descriptor and the file are released even if the generator,
    # the encoding step or the write raises.
    try:
        # TODO: This never terminates if the passed generator never terminates.
        text = "".join(generator)
        if not color:
            text = strip_ansi(text)
        encoding = get_best_encoding(sys.stdout)
        with open_stream(filename, "wb")[0] as f:
            f.write(text.encode(encoding))
        os.system(f'{cmd} "{filename}"')
    finally:
        os.close(fd)
        os.unlink(filename)
def _nullpager(
    stream: t.TextIO, generator: t.Iterable[str], color: t.Optional[bool]
) -> None:
    """Simply print unformatted text. This is the ultimate fallback.

    Each chunk is written to ``stream`` as it is produced; ANSI codes
    are stripped unless ``color`` is truthy.
    """
    for chunk in generator:
        stream.write(chunk if color else strip_ansi(chunk))
class Editor:
    """Open an external editor on a file or on a piece of text.

    :param editor: Editor command to use. If ``None`` it is looked up,
        see :meth:`get_editor`.
    :param env: Extra environment variables for the editor process.
    :param require_save: If true, :meth:`edit` returns ``None`` when the
        file's modification time did not change.
    :param extension: Suffix of the temporary file used by :meth:`edit`.
    """

    def __init__(
        self,
        editor: t.Optional[str] = None,
        env: t.Optional[t.Mapping[str, str]] = None,
        require_save: bool = True,
        extension: str = ".txt",
    ) -> None:
        self.editor = editor
        self.env = env
        self.require_save = require_save
        self.extension = extension

    def get_editor(self) -> str:
        """Return the editor command to run.

        The explicit ``editor`` wins, then the ``VISUAL`` and ``EDITOR``
        environment variables. Failing that, ``notepad`` is used on
        Windows; elsewhere the first of ``sensible-editor``, ``vim`` and
        ``nano`` found on the path, falling back to ``vi``.
        """
        if self.editor is not None:
            return self.editor
        for key in "VISUAL", "EDITOR":
            rv = os.environ.get(key)
            if rv:
                return rv
        if WIN:
            return "notepad"

        # Look the candidates up directly instead of spawning a shell
        # running ``which`` for each of them.
        from shutil import which

        for editor in "sensible-editor", "vim", "nano":
            if which(editor) is not None:
                return editor
        return "vi"

    def edit_file(self, filename: str) -> None:
        """Run the editor on ``filename`` and wait for it to exit.

        :raises ClickException: If the editor cannot be started or exits
            with a non-zero status.
        """
        import subprocess

        editor = self.get_editor()
        environ: t.Optional[t.Dict[str, str]] = None

        if self.env:
            environ = os.environ.copy()
            environ.update(self.env)

        try:
            # The editor value may carry arguments, so it is run through
            # the shell rather than as a single executable name.
            c = subprocess.Popen(f'{editor} "{filename}"', env=environ, shell=True)
            exit_code = c.wait()
            if exit_code != 0:
                raise ClickException(
                    _("{editor}: Editing failed").format(editor=editor)
                )
        except OSError as e:
            raise ClickException(
                _("{editor}: Editing failed: {e}").format(editor=editor, e=e)
            ) from e

    def edit(self, text: t.Optional[t.AnyStr]) -> t.Optional[t.AnyStr]:
        """Edit ``text`` in the editor and return the result.

        The text is written to a temporary file which is removed again
        afterwards. Bytes input is passed through unchanged and returned
        as bytes. String input gets a trailing newline if missing, is
        encoded as UTF-8 (with BOM and ``\\r\\n`` line endings on
        Windows), and the result is decoded with ``\\r\\n`` normalized
        to ``\\n``.

        :return: The edited content, or ``None`` if ``require_save`` is
            set and the file's modification time did not change.
        """
        import tempfile

        if not text:
            data = b""
        elif isinstance(text, (bytes, bytearray)):
            data = text
        else:
            if not text.endswith("\n"):
                text += "\n"

            if WIN:
                data = text.replace("\n", "\r\n").encode("utf-8-sig")
            else:
                data = text.encode("utf-8")

        fd, name = tempfile.mkstemp(prefix="editor-", suffix=self.extension)
        f: t.BinaryIO

        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)

            # If the filesystem resolution is 1 second, like Mac OS
            # 10.12 Extended, or 2 seconds, like FAT32, and the editor
            # closes very fast, require_save can fail. Set the modified
            # time to be 2 seconds in the past to work around this.
            os.utime(name, (os.path.getatime(name), os.path.getmtime(name) - 2))
            # Depending on the resolution, the exact value might not be
            # recorded, so get the new recorded value.
            timestamp = os.path.getmtime(name)

            self.edit_file(name)

            if self.require_save and os.path.getmtime(name) == timestamp:
                return None

            with open(name, "rb") as f:
                rv = f.read()

            if isinstance(text, (bytes, bytearray)):
                return rv

            return rv.decode("utf-8-sig").replace("\r\n", "\n")  # type: ignore
        finally:
            os.unlink(name)
def open_url(url: str, wait: bool = False, locate: bool = False) -> int:
    """Open ``url`` with the platform's launcher command.

    :param url: URL or file name to open. A ``file://`` prefix is
        removed and the rest percent-decoded where a path is needed.
    :param wait: Ask the launcher to wait until the opened program exits.
    :param locate: Show the file's location instead of opening it.
    :return: The launcher's exit status where it is waited for, ``0``
        when the launcher was started without waiting or the web browser
        fallback was used, and ``1`` if nothing could be started.
    """
    import subprocess

    def _unquote_file(url: str) -> str:
        from urllib.parse import unquote

        return unquote(url[7:]) if url.startswith("file://") else url

    if sys.platform == "darwin":
        switches = [
            switch for switch, enabled in (("-W", wait), ("-R", locate)) if enabled
        ]
        command = ["open", *switches, _unquote_file(url)]
        # Discard whatever the launcher prints on stderr.
        with open("/dev/null", "w") as null:
            return subprocess.Popen(command, stderr=null).wait()

    if WIN:
        # Double quotes are dropped because the value is embedded in a
        # quoted shell argument.
        if locate:
            target = _unquote_file(url.replace('"', ""))
            return os.system(f'explorer /select,"{target}"')

        target = url.replace('"', "")
        wait_str = "/WAIT" if wait else ""
        return os.system(f'start {wait_str} "" "{target}"')

    if CYGWIN:
        if locate:
            target = os.path.dirname(_unquote_file(url).replace('"', ""))
            return os.system(f'cygstart "{target}"')

        target = url.replace('"', "")
        wait_str = "-w" if wait else ""
        return os.system(f'cygstart {wait_str} "{target}"')

    try:
        if locate:
            url = os.path.dirname(_unquote_file(url)) or "."
        else:
            url = _unquote_file(url)
        launcher = subprocess.Popen(["xdg-open", url])
        return launcher.wait() if wait else 0
    except OSError:
        # xdg-open could not be started; plain web URLs can still be
        # handed to the webbrowser module when no waiting is required.
        is_web_url = url.startswith(("http://", "https://"))
        if locate or wait or not is_web_url:
            return 1

        import webbrowser

        webbrowser.open(url)
        return 0
def _translate_ch_to_exc(ch: str) -> t.Optional[BaseException]:
    """Raise the exception that corresponds to a control character.

    :param ch: The character(s) read from the terminal.
    :raises KeyboardInterrupt: For ``\\x03`` (Ctrl+C).
    :raises EOFError: For ``\\x04`` (Ctrl+D) outside of Windows and for
        ``\\x1a`` (Ctrl+Z) on Windows.
    :return: ``None`` for every other input.
    """
    if ch == "\x03":
        raise KeyboardInterrupt()

    unix_eof = ch == "\x04"  # Unix-like, Ctrl+D
    windows_eof = ch == "\x1a"  # Windows, Ctrl+Z

    if (unix_eof and not WIN) or (windows_eof and WIN):
        raise EOFError()

    return None
if WIN:
    import msvcrt

    @contextlib.contextmanager
    def raw_terminal() -> t.Iterator[int]:
        """Context manager counterpart of the Unix version; yields -1
        and changes nothing.
        """
        yield -1

    def getchar(echo: bool) -> str:
        """Read a single key press and return it as a string.

        :param echo: Use the echoing variant of the console read.
        :raises KeyboardInterrupt: If Ctrl+C was read.
        :raises EOFError: If the end-of-file key was read.
        """
        # The function `getch` will return a bytes object corresponding to
        # the pressed character. Since Windows 10 build 1803, it will also
        # return \x00 when called a second time after pressing a regular key.
        #
        # `getwch` does not share this probably-bugged behavior. Moreover, it
        # returns a Unicode object by default, which is what we want.
        #
        # Either of these functions will return \x00 or \xe0 to indicate
        # a special key, and you need to call the same function again to get
        # the "rest" of the code. The fun part is that \u00e0 is
        # "latin small letter a with grave", so if you type that on a French
        # keyboard, you _also_ get a \xe0.
        # E.g., consider the Up arrow. This returns \xe0 and then \x48. The
        # resulting Unicode string reads as "a with grave" + "capital H".
        # This is indistinguishable from when the user actually types
        # "a with grave" and then "capital H".
        #
        # When \xe0 is returned, we assume it's part of a special-key sequence
        # and call `getwch` again, but that means that when the user types
        # the \u00e0 character, `getchar` doesn't return until a second
        # character is typed.
        # The alternative is returning immediately, but that would mess up
        # cross-platform handling of arrow keys and others that start with
        # \xe0. Another option is using `getch`, but then we can't reliably
        # read non-ASCII characters, because return values of `getch` are
        # limited to the current 8-bit codepage.
        #
        # Anyway, Click doesn't claim to do this Right(tm), and using `getwch`
        # is doing the right thing in more situations than with `getch`.
        func: t.Callable[[], str]

        if echo:
            func = msvcrt.getwche  # type: ignore
        else:
            func = msvcrt.getwch  # type: ignore

        rv = func()

        if rv in ("\x00", "\xe0"):
            # \x00 and \xe0 are control characters that indicate special key,
            # see above.
            rv += func()

        _translate_ch_to_exc(rv)
        return rv

else:
    import tty
    import termios

    @contextlib.contextmanager
    def raw_terminal() -> t.Iterator[int]:
        """Put the terminal into raw mode for the duration of the block.

        Yields the file descriptor that was switched: stdin if it is a
        TTY, otherwise a freshly opened ``/dev/tty``. The previous
        terminal settings are restored on exit.
        """
        f: t.Optional[t.TextIO]
        fd: int

        if not isatty(sys.stdin):
            f = open("/dev/tty")
            fd = f.fileno()
        else:
            fd = sys.stdin.fileno()
            f = None

        try:
            old_settings = termios.tcgetattr(fd)

            try:
                tty.setraw(fd)
                yield fd
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                sys.stdout.flush()
        except termios.error:
            pass
        finally:
            # Close the extra handle on every exit path, including a
            # termios failure or an exception raised inside the block;
            # previously it was only closed after a clean run.
            if f is not None:
                f.close()

    def getchar(echo: bool) -> str:
        """Read a single key press and return it as a string.

        Up to 32 bytes are read in one go so that multi-byte key
        sequences arrive as one string.

        :param echo: Write the result to stdout if stdout is a TTY.
        :raises KeyboardInterrupt: If Ctrl+C was read.
        :raises EOFError: If the end-of-file key was read.
        """
        with raw_terminal() as fd:
            ch = os.read(fd, 32).decode(get_best_encoding(sys.stdin), "replace")

            if echo and isatty(sys.stdout):
                sys.stdout.write(ch)

            _translate_ch_to_exc(ch)
            return ch