first commit

This commit is contained in:
Yura 2024-09-15 15:12:16 +03:00
commit 417e54da96
5696 changed files with 900003 additions and 0 deletions

View file

@ -0,0 +1,164 @@
from __future__ import annotations
import logging
import os
from functools import partial
from virtualenv.app_data import make_app_data
from virtualenv.config.cli.parser import VirtualEnvConfigParser
from virtualenv.report import LEVELS, setup_report
from virtualenv.run.session import Session
from virtualenv.seed.wheels.periodic_update import manual_upgrade
from virtualenv.version import __version__
from .plugin.activators import ActivationSelector
from .plugin.creators import CreatorSelector
from .plugin.discovery import get_discover
from .plugin.seeders import SeederSelector
def cli_run(args, options=None, setup_logging=True, env=None): # noqa: FBT002
"""
Create a virtual environment given some command line interface arguments.
:param args: the command line arguments
:param options: passing in a ``VirtualEnvOptions`` object allows return of the parsed options
:param setup_logging: ``True`` if setup logging handlers, ``False`` to use handlers already registered
:param env: environment variables to use
:return: the session object of the creation (its structure for now is experimental and might change on short notice)
"""
env = os.environ if env is None else env
of_session = session_via_cli(args, options, setup_logging, env)
with of_session:
of_session.run()
return of_session
def session_via_cli(args, options=None, setup_logging=True, env=None): # noqa: FBT002
"""
Create a virtualenv session (same as cli_run, but this does not perform the creation). Use this if you just want to
query what the virtual environment would look like, but not actually create it.
:param args: the command line arguments
:param options: passing in a ``VirtualEnvOptions`` object allows return of the parsed options
:param setup_logging: ``True`` if setup logging handlers, ``False`` to use handlers already registered
:param env: environment variables to use
:return: the session object of the creation (its structure for now is experimental and might change on short notice)
""" # noqa: D205
env = os.environ if env is None else env
parser, elements = build_parser(args, options, setup_logging, env)
options = parser.parse_args(args)
creator, seeder, activators = tuple(e.create(options) for e in elements) # create types
return Session(
options.verbosity,
options.app_data,
parser._interpreter, # noqa: SLF001
creator,
seeder,
activators,
)
def build_parser(args=None, options=None, setup_logging=True, env=None): # noqa: FBT002
parser = VirtualEnvConfigParser(options, os.environ if env is None else env)
add_version_flag(parser)
parser.add_argument(
"--with-traceback",
dest="with_traceback",
action="store_true",
default=False,
help="on failure also display the stacktrace internals of virtualenv",
)
_do_report_setup(parser, args, setup_logging)
options = load_app_data(args, parser, options)
handle_extra_commands(options)
discover = get_discover(parser, args)
parser._interpreter = interpreter = discover.interpreter # noqa: SLF001
if interpreter is None:
msg = f"failed to find interpreter for {discover}"
raise RuntimeError(msg)
elements = [
CreatorSelector(interpreter, parser),
SeederSelector(interpreter, parser),
ActivationSelector(interpreter, parser),
]
options, _ = parser.parse_known_args(args)
for element in elements:
element.handle_selected_arg_parse(options)
parser.enable_help()
return parser, elements
def build_parser_only(args=None):
"""Used to provide a parser for the doc generation."""
return build_parser(args)[0]
def handle_extra_commands(options):
if options.upgrade_embed_wheels:
result = manual_upgrade(options.app_data, options.env)
raise SystemExit(result)
def load_app_data(args, parser, options):
parser.add_argument(
"--read-only-app-data",
action="store_true",
help="use app data folder in read-only mode (write operations will fail with error)",
)
options, _ = parser.parse_known_args(args, namespace=options)
# here we need a write-able application data (e.g. the zipapp might need this for discovery cache)
parser.add_argument(
"--app-data",
help="a data folder used as cache by the virtualenv",
type=partial(make_app_data, read_only=options.read_only_app_data, env=options.env),
default=make_app_data(None, read_only=options.read_only_app_data, env=options.env),
)
parser.add_argument(
"--reset-app-data",
action="store_true",
help="start with empty app data folder",
)
parser.add_argument(
"--upgrade-embed-wheels",
action="store_true",
help="trigger a manual update of the embedded wheels",
)
options, _ = parser.parse_known_args(args, namespace=options)
if options.reset_app_data:
options.app_data.reset()
return options
def add_version_flag(parser):
import virtualenv # noqa: PLC0415
parser.add_argument(
"--version",
action="version",
version=f"%(prog)s {__version__} from {virtualenv.__file__}",
help="display the version of the virtualenv package and its location, then exit",
)
def _do_report_setup(parser, args, setup_logging):
level_map = ", ".join(f"{logging.getLevelName(line)}={c}" for c, line in sorted(LEVELS.items()))
msg = "verbosity = verbose - quiet, default {}, mapping => {}"
verbosity_group = parser.add_argument_group(
title="verbosity",
description=msg.format(logging.getLevelName(LEVELS[3]), level_map),
)
verbosity = verbosity_group.add_mutually_exclusive_group()
verbosity.add_argument("-v", "--verbose", action="count", dest="verbose", help="increase verbosity", default=2)
verbosity.add_argument("-q", "--quiet", action="count", dest="quiet", help="decrease verbosity", default=0)
option, _ = parser.parse_known_args(args)
if setup_logging:
setup_report(option.verbosity)
__all__ = [
"cli_run",
"session_via_cli",
]

View file

@ -0,0 +1,62 @@
from __future__ import annotations
from argparse import ArgumentTypeError
from collections import OrderedDict
from .base import ComponentBuilder
class ActivationSelector(ComponentBuilder):
def __init__(self, interpreter, parser) -> None:
self.default = None
possible = OrderedDict(
(k, v) for k, v in self.options("virtualenv.activate").items() if v.supports(interpreter)
)
super().__init__(interpreter, parser, "activators", possible)
self.parser.description = "options for activation scripts"
self.active = None
def add_selector_arg_parse(self, name, choices):
self.default = ",".join(choices)
self.parser.add_argument(
f"--{name}",
default=self.default,
metavar="comma_sep_list",
required=False,
help="activators to generate - default is all supported",
type=self._extract_activators,
)
def _extract_activators(self, entered_str):
elements = [e.strip() for e in entered_str.split(",") if e.strip()]
missing = [e for e in elements if e not in self.possible]
if missing:
msg = f"the following activators are not available {','.join(missing)}"
raise ArgumentTypeError(msg)
return elements
def handle_selected_arg_parse(self, options):
selected_activators = (
self._extract_activators(self.default) if options.activators is self.default else options.activators
)
self.active = {k: v for k, v in self.possible.items() if k in selected_activators}
self.parser.add_argument(
"--prompt",
dest="prompt",
metavar="prompt",
help=(
"provides an alternative prompt prefix for this environment "
"(value of . means name of the current working directory)"
),
default=None,
)
for activator in self.active.values():
activator.add_parser_arguments(self.parser, self.interpreter)
def create(self, options):
return [activator_class(options) for activator_class in self.active.values()]
__all__ = [
"ActivationSelector",
]

View file

@ -0,0 +1,71 @@
from __future__ import annotations
import sys
from collections import OrderedDict
if sys.version_info >= (3, 8):
from importlib.metadata import entry_points
importlib_metadata_version = ()
else:
from importlib_metadata import entry_points, version
importlib_metadata_version = tuple(int(i) for i in version("importlib_metadata").split(".")[:2])
class PluginLoader:
_OPTIONS = None
_ENTRY_POINTS = None
@classmethod
def entry_points_for(cls, key):
if sys.version_info >= (3, 10) or importlib_metadata_version >= (3, 6):
return OrderedDict((e.name, e.load()) for e in cls.entry_points().select(group=key))
return OrderedDict((e.name, e.load()) for e in cls.entry_points().get(key, {}))
@staticmethod
def entry_points():
if PluginLoader._ENTRY_POINTS is None:
PluginLoader._ENTRY_POINTS = entry_points()
return PluginLoader._ENTRY_POINTS
class ComponentBuilder(PluginLoader):
def __init__(self, interpreter, parser, name, possible) -> None:
self.interpreter = interpreter
self.name = name
self._impl_class = None
self.possible = possible
self.parser = parser.add_argument_group(title=name)
self.add_selector_arg_parse(name, list(self.possible))
@classmethod
def options(cls, key):
if cls._OPTIONS is None:
cls._OPTIONS = cls.entry_points_for(key)
return cls._OPTIONS
def add_selector_arg_parse(self, name, choices):
raise NotImplementedError
def handle_selected_arg_parse(self, options):
selected = getattr(options, self.name)
if selected not in self.possible:
msg = f"No implementation for {self.interpreter}"
raise RuntimeError(msg)
self._impl_class = self.possible[selected]
self.populate_selected_argparse(selected, options.app_data)
return selected
def populate_selected_argparse(self, selected, app_data):
self.parser.description = f"options for {self.name} {selected}"
self._impl_class.add_parser_arguments(self.parser, self.interpreter, app_data)
def create(self, options):
return self._impl_class(options, self.interpreter)
__all__ = [
"ComponentBuilder",
"PluginLoader",
]

View file

@ -0,0 +1,91 @@
from __future__ import annotations
from collections import OrderedDict, defaultdict
from typing import TYPE_CHECKING, NamedTuple
from virtualenv.create.describe import Describe
from virtualenv.create.via_global_ref.builtin.builtin_way import VirtualenvBuiltin
from .base import ComponentBuilder
if TYPE_CHECKING:
from virtualenv.create.creator import Creator, CreatorMeta
class CreatorInfo(NamedTuple):
key_to_class: dict[str, type[Creator]]
key_to_meta: dict[str, CreatorMeta]
describe: type[Describe] | None
builtin_key: str
class CreatorSelector(ComponentBuilder):
def __init__(self, interpreter, parser) -> None:
creators, self.key_to_meta, self.describe, self.builtin_key = self.for_interpreter(interpreter)
super().__init__(interpreter, parser, "creator", creators)
@classmethod
def for_interpreter(cls, interpreter):
key_to_class, key_to_meta, builtin_key, describe = OrderedDict(), {}, None, None
errors = defaultdict(list)
for key, creator_class in cls.options("virtualenv.create").items():
if key == "builtin":
msg = "builtin creator is a reserved name"
raise RuntimeError(msg)
meta = creator_class.can_create(interpreter)
if meta:
if meta.error:
errors[meta.error].append(creator_class)
else:
if "builtin" not in key_to_class and issubclass(creator_class, VirtualenvBuiltin):
builtin_key = key
key_to_class["builtin"] = creator_class
key_to_meta["builtin"] = meta
key_to_class[key] = creator_class
key_to_meta[key] = meta
if describe is None and issubclass(creator_class, Describe) and creator_class.can_describe(interpreter):
describe = creator_class
if not key_to_meta:
if errors:
rows = [f"{k} for creators {', '.join(i.__name__ for i in v)}" for k, v in errors.items()]
raise RuntimeError("\n".join(rows))
msg = f"No virtualenv implementation for {interpreter}"
raise RuntimeError(msg)
return CreatorInfo(
key_to_class=key_to_class,
key_to_meta=key_to_meta,
describe=describe,
builtin_key=builtin_key,
)
def add_selector_arg_parse(self, name, choices):
# prefer the built-in venv if present, otherwise fallback to first defined type
choices = sorted(choices, key=lambda a: 0 if a == "builtin" else 1)
default_value = self._get_default(choices)
self.parser.add_argument(
f"--{name}",
choices=choices,
default=default_value,
required=False,
help=f"create environment via{'' if self.builtin_key is None else f' (builtin = {self.builtin_key})'}",
)
@staticmethod
def _get_default(choices):
return next(iter(choices))
def populate_selected_argparse(self, selected, app_data):
self.parser.description = f"options for {self.name} {selected}"
self._impl_class.add_parser_arguments(self.parser, self.interpreter, self.key_to_meta[selected], app_data)
def create(self, options):
options.meta = self.key_to_meta[getattr(options, self.name)]
if not issubclass(self._impl_class, Describe):
options.describe = self.describe(options, self.interpreter)
return super().create(options)
__all__ = [
"CreatorInfo",
"CreatorSelector",
]

View file

@ -0,0 +1,40 @@
from __future__ import annotations
from .base import PluginLoader
class Discovery(PluginLoader):
"""Discovery plugins."""
def get_discover(parser, args):
discover_types = Discovery.entry_points_for("virtualenv.discovery")
discovery_parser = parser.add_argument_group(
title="discovery",
description="discover and provide a target interpreter",
)
choices = _get_default_discovery(discover_types)
# prefer the builtin if present, otherwise fallback to first defined type
choices = sorted(choices, key=lambda a: 0 if a == "builtin" else 1)
discovery_parser.add_argument(
"--discovery",
choices=choices,
default=next(iter(choices)),
required=False,
help="interpreter discovery method",
)
options, _ = parser.parse_known_args(args)
discover_class = discover_types[options.discovery]
discover_class.add_parser_arguments(discovery_parser)
options, _ = parser.parse_known_args(args, namespace=options)
return discover_class(options)
def _get_default_discovery(discover_types):
return list(discover_types.keys())
__all__ = [
"Discovery",
"get_discover",
]

View file

@ -0,0 +1,40 @@
from __future__ import annotations
from .base import ComponentBuilder
class SeederSelector(ComponentBuilder):
def __init__(self, interpreter, parser) -> None:
possible = self.options("virtualenv.seed")
super().__init__(interpreter, parser, "seeder", possible)
def add_selector_arg_parse(self, name, choices):
self.parser.add_argument(
f"--{name}",
choices=choices,
default=self._get_default(),
required=False,
help="seed packages install method",
)
self.parser.add_argument(
"--no-seed",
"--without-pip",
help="do not install seed packages",
action="store_true",
dest="no_seed",
)
@staticmethod
def _get_default():
return "app-data"
def handle_selected_arg_parse(self, options):
return super().handle_selected_arg_parse(options)
def create(self, options):
return self._impl_class(options)
__all__ = [
"SeederSelector",
]

View file

@ -0,0 +1,89 @@
from __future__ import annotations
import json
import logging
class Session:
"""Represents a virtual environment creation session."""
def __init__(self, verbosity, app_data, interpreter, creator, seeder, activators) -> None: # noqa: PLR0913
self._verbosity = verbosity
self._app_data = app_data
self._interpreter = interpreter
self._creator = creator
self._seeder = seeder
self._activators = activators
@property
def verbosity(self):
"""The verbosity of the run."""
return self._verbosity
@property
def interpreter(self):
"""Create a virtual environment based on this reference interpreter."""
return self._interpreter
@property
def creator(self):
"""The creator used to build the virtual environment (must be compatible with the interpreter)."""
return self._creator
@property
def seeder(self):
"""The mechanism used to provide the seed packages (pip, setuptools, wheel)."""
return self._seeder
@property
def activators(self):
"""Activators used to generate activations scripts."""
return self._activators
def run(self):
self._create()
self._seed()
self._activate()
self.creator.pyenv_cfg.write()
def _create(self):
logging.info("create virtual environment via %s", self.creator)
self.creator.run()
logging.debug(_DEBUG_MARKER)
logging.debug("%s", _Debug(self.creator))
def _seed(self):
if self.seeder is not None and self.seeder.enabled:
logging.info("add seed packages via %s", self.seeder)
self.seeder.run(self.creator)
def _activate(self):
if self.activators:
active = ", ".join(type(i).__name__.replace("Activator", "") for i in self.activators)
logging.info("add activators for %s", active)
for activator in self.activators:
activator.generate(self.creator)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._app_data.close()
_DEBUG_MARKER = "=" * 30 + " target debug " + "=" * 30
class _Debug:
"""lazily populate debug."""
def __init__(self, creator) -> None:
self.creator = creator
def __repr__(self) -> str:
return json.dumps(self.creator.debug, indent=2)
__all__ = [
"Session",
]