Source code for rosdep2.installers

# Copyright (c) 2009, Willow Garage, Inc.
# All rights reserved.
# 
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# 
#     * Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above copyright
#       notice, this list of conditions and the following disclaimer in the
#       documentation and/or other materials provided with the distribution.
#     * Neither the name of the Willow Garage, Inc. nor the names of its
#       contributors may be used to endorse or promote products derived from
#       this software without specific prior written permission.
# 
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

# Author Tully Foote/tfoote@willowgarage.com, Ken Conley/kwc@willowgarage.com

from __future__ import print_function

import subprocess
import traceback

from rospkg.os_detect import OsDetect

from .core import rd_debug, RosdepInternalError, InstallFailed, print_bold, InvalidData

# kwc: InstallerContext is basically just a bunch of dictionaries with
# defined lookup methods.  It really encompasses two facets of a
# rosdep configuration: the pluggable nature of installers and
# platforms, as well as the resolution of the operating system for a
# specific machine.  It is possible to decouple those two notions,
# though there are some touch points over how this interfaces with the
# rospkg.os_detect library, i.e. how platforms can tweak these
# detectors and how the higher-level APIs can override them.
[docs]class InstallerContext(object): """ :class:`InstallerContext` manages the context of execution for rosdep as it relates to the installers, OS detectors, and other extensible APIs. """ def __init__(self, os_detect=None): """ :param os_detect: (optional) :class:`rospkg.os_detect.OsDetect` instance to use for detecting platforms. If `None`, default instance will be used. """ # platform configuration self.installers = {} self.os_installers = {} self.default_os_installer = {} # stores configuration of which value to use for the OS version key (version number or codename) self.os_version_type = {} # OS detection and override if os_detect is None: os_detect = OsDetect() self.os_detect = os_detect self.os_override = None self.verbose = False def set_verbose(self, verbose): self.verbose = verbose
[docs] def set_os_override(self, os_name, os_version): """ Override the OS detector with *os_name* and *os_version*. See :meth:`InstallerContext.detect_os`. :param os_name: OS name value to use, ``str`` :param os_version: OS version value to use, ``str`` """ if self.verbose: print("overriding OS to [%s:%s]"%(os_name, os_version)) self.os_override = os_name, os_version
def get_os_version_type(self, os_name): return self.os_version_type.get(os_name, OsDetect.get_version) def set_os_version_type(self, os_name, version_type): if not hasattr(version_type, '__call__'): raise ValueError("version type should be a method") self.os_version_type[os_name] = version_type
[docs] def get_os_name_and_version(self): """ Get the OS name and version key to use for resolution and installation. This will be the detected OS name/version unless :meth:`InstallerContext.set_os_override()` has been called. :returns: (os_name, os_version), ``(str, str)`` """ if self.os_override: return self.os_override else: os_name = self.os_detect.get_name() os_key = self.get_os_version_type(os_name) os_version = os_key(self.os_detect) return os_name, os_version
[docs] def get_os_detect(self): """ :returns os_detect: :class:`OsDetect` instance used for detecting platforms. """ return self.os_detect
[docs] def set_installer(self, installer_key, installer): """ Set the installer to use for *installer_key*. This will replace any existing installer associated with the key. *installer_key* should be the same key used for the ``rosdep.yaml`` package manager key. If *installer* is ``None``, this will delete any existing associated installer from this context. :param installer_key: key/name to associate with installer, ``str`` :param installer: :class:`Installer` implementation, ``class``. :raises: :exc:`TypeError` if *installer* is not a subclass of :class:`Installer` """ if installer is None: del self.installers[installer_key] return if not isinstance(installer, Installer): raise TypeError("installer must be a instance of Installer") if self.verbose: print("registering installer [%s]"%(installer_key)) self.installers[installer_key] = installer
[docs] def get_installer(self, installer_key): """ :returns: :class:`Installer` class associated with *installer_key*. :raises: :exc:`KeyError` If not associated installer :raises: :exc:`InstallFailed` If installer cannot produce an install command (e.g. if installer is not installed) """ return self.installers[installer_key]
[docs] def get_installer_keys(self): """ :returns: list of registered installer keys """ return self.installers.keys()
[docs] def get_os_keys(self): """ :returns: list of OS keys that have registered with this context, ``[str]`` """ return self.os_installers.keys()
[docs] def add_os_installer_key(self, os_key, installer_key): """ Register an installer for the specified OS. This will fail with a :exc:`KeyError` if no :class:`Installer` can be found with the associated *installer_key*. :param os_key: Key for OS :param installer_key: Key for installer to add to OS :raises: :exc:`KeyError`: if installer for *installer_key* is not set. """ # validate, will throw KeyError self.get_installer(installer_key) if self.verbose: print("add installer [%s] to OS [%s]"%(installer_key, os_key)) if os_key in self.os_installers: self.os_installers[os_key].append(installer_key) else: self.os_installers[os_key] = [installer_key]
[docs] def get_os_installer_keys(self, os_key): """ Get list of installer keys registered for the specified OS. These keys can be resolved by calling :meth:`InstallerContext.get_installer`. :param os_key: Key for OS :raises: :exc:`KeyError`: if no information for OS *os_key* is registered. """ if os_key in self.os_installers: return self.os_installers[os_key][:] else: raise KeyError(os_key)
[docs] def set_default_os_installer_key(self, os_key, installer_key): """ Set the default OS installer to use for OS. :meth:`InstallerContext.add_os_installer` must have previously been called with the same arguments. :param os_key: Key for OS :param installer_key: Key for installer to add to OS :raises: :exc:`KeyError`: if installer for *installer_key* is not set or if OS for *os_key* has no associated installers. """ if not os_key in self.os_installers: raise KeyError("unknown OS: %s"%(os_key)) if not hasattr(installer_key, '__call__'): raise ValueError("version type should be a method") if not installer_key(self.os_detect) in self.os_installers[os_key]: raise KeyError("installer [%s] is not associated with OS [%s]. call add_os_installer_key() first"%(installer_key(self.os_detect), os_key)) if self.verbose: print("set default installer for OS [%s]"%(os_key,)) self.default_os_installer[os_key] = installer_key
[docs] def get_default_os_installer_key(self, os_key): """ Get the default OS installer key to use for OS, or ``None`` if there is no default. :param os_key: Key for OS :returns: :class:`Installer` :raises: :exc:`KeyError`: if no information for OS *os_key* is registered. """ if not os_key in self.os_installers: raise KeyError("unknown OS: %s"%(os_key)) try: installer_key = self.default_os_installer[os_key](self.os_detect) if not installer_key in self.os_installers[os_key]: raise KeyError("installer [%s] is not associated with OS [%s]. call add_os_installer_key() first"%(installer_key, os_key)) # validate, will throw KeyError self.get_installer(installer_key) return installer_key except KeyError: return None
[docs]class Installer(object): """ The :class:`Installer` API is designed around opaque *resolved* parameters. These parameters can be any type of sequence object, but they must obey set arithmetic. They should also implement ``__str__()`` methods so they can be pretty printed. """
[docs] def is_installed(self, resolved_item): """ :param resolved: resolved installation item. NOTE: this is a single item, not a list of items like the other APIs, ``opaque``. :returns: ``True`` if all of the *resolved* items are installed on the local system """ raise NotImplementedError("is_installed", resolved_item)
[docs] def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False): """ :param resolved: list of resolved installation items, ``[opaque]`` :param interactive: If `False`, disable interactive prompts, e.g. Pass through ``-y`` or equivalant to package manager. :param reinstall: If `True`, install everything even if already installed """ raise NotImplementedError("get_package_install_command", resolved, interactive, reinstall, quiet)
[docs] def get_depends(self, rosdep_args): """ :returns: list of dependencies on other rosdep keys. Only necessary if the package manager doesn't handle dependencies. """ return [] # Default return empty list
[docs] def resolve(self, rosdep_args_dict): """ :param rosdep_args_dict: argument dictionary to the rosdep rule for this package manager :returns: [resolutions]. resolved objects should be printable to a user, but are otherwise opaque. """ raise NotImplementedError("Base class resolve", rosdep_args_dict)
[docs] def unique(self, *resolved_rules): """ Combine the resolved rules into a unique list. This is meant to combine the results of multiple calls to :meth:`PackageManagerInstaller.resolve`. Example:: resolved1 = installer.resolve(args1) resolved2 = installer.resolve(args2) resolved = installer.unique(resolved1, resolved2) :param *resolved_rules: resolved arguments. Resolved arguments must all be from this :class:`Installer` instance. """ raise NotImplementedError("Base class unique", resolved_rules)
[docs]class PackageManagerInstaller(Installer): """ General form of a package manager :class:`Installer` implementation that assumes: - installer rosdep args spec is a list of package names stored with the key "packages" - a detect function exists that can return a list of packages that are installed Also, if *supports_depends* is set to ``True``: - installer rosdep args spec can also include dependency specification with the key "depends" """ def __init__(self, detect_fn, supports_depends=False): """ :param supports_depends: package manager supports dependency key """ self.detect_fn = detect_fn self.supports_depends = supports_depends self.as_root = True self.sudo_command = 'sudo -H'
[docs] def elevate_priv(self, cmd): """ Prepend *self.sudo_command* to the command if *self.as_root* is ``True``. :param list cmd: list of strings comprising the command :returns: a list of commands """ return (self.sudo_command.split() if self.as_root else []) + cmd
[docs] def resolve(self, rosdep_args): """ See :meth:`Installer.resolve()` """ packages = None if type(rosdep_args) == dict: packages = rosdep_args.get("packages", []) if type(packages) == type("string"): packages = packages.split() elif type(rosdep_args) == type('str'): packages = rosdep_args.split(' ') elif type(rosdep_args) == list: packages = rosdep_args else: raise InvalidData("Invalid rosdep args: %s"%(rosdep_args)) return packages
[docs] def unique(self, *resolved_rules): """ See :meth:`Installer.unique()` """ s = set() for resolved in resolved_rules: s.update(resolved) return sorted(list(s))
def get_packages_to_install(self, resolved, reinstall=False): if reinstall: return resolved if not resolved: return [] else: return list(set(resolved) - set(self.detect_fn(resolved))) def is_installed(self, resolved_item): return not self.get_packages_to_install([resolved_item]) def get_install_command(self, resolved, interactive=True, reinstall=False, quiet=False): raise NotImplementedError('subclasses must implement', resolved, interactive, reinstall, quiet)
[docs] def get_depends(self, rosdep_args): """ :returns: list of dependencies on other rosdep keys. Only necessary if the package manager doesn't handle dependencies. """ if self.supports_depends and type(rosdep_args) == dict: return rosdep_args.get('depends', []) return [] # Default return empty list
class RosdepInstaller(object): def __init__(self, installer_context, lookup): self.installer_context = installer_context self.lookup = lookup def get_uninstalled(self, resources, implicit=False, verbose=False): """ Get list of system dependencies that have not been installed as well as a list of errors from performing the resolution. This is a bulk API in order to provide performance optimizations in checking install state. :param resources: List of resource names (e.g. ROS package names), ``[str]]`` :param implicit: Install implicit (recursive) dependencies of resources. Default ``False``. :returns: (uninstalled, errors), ``({str: [opaque]}, {str: ResolutionError})``. Uninstalled is a dictionary with the installer_key as the key. :raises: :exc:`RosdepInternalError` """ installer_context = self.installer_context # resolutions have been unique()d if verbose: print("resolving for resources [%s]"%(', '.join(resources))) resolutions, errors = self.lookup.resolve_all(resources, installer_context, implicit=implicit) # for each installer, figure out what is left to install uninstalled = [] if resolutions == []: return uninstalled, errors for installer_key, resolved in resolutions: #py3k if verbose: print("resolution: %s [%s]" % (installer_key, ', '.join([str(r) for r in resolved]))) try: installer = installer_context.get_installer(installer_key) except KeyError as e: # lookup has to be buggy to cause this raise RosdepInternalError(e) try: packages_to_install = installer.get_packages_to_install(resolved) except Exception as e: rd_debug(traceback.format_exc()) raise RosdepInternalError(e, message="Bad installer [%s]: %s"%(installer_key, e)) # only create key if there is something to do if packages_to_install: uninstalled.append((installer_key, packages_to_install)) if verbose: print("uninstalled: [%s]"%(', '.join([str(p) for p in packages_to_install]))) return uninstalled, errors def install(self, uninstalled, interactive=True, simulate=False, continue_on_error=False, reinstall=False, verbose=False, quiet=False): """ Install the uninstalled rosdeps. This API is for the bulk workflow of rosdep (see example below). For a more targeted install API, see :meth:`RosdepInstaller.install_resolved`. :param uninstalled: uninstalled value from :meth:`RosdepInstaller.get_uninstalled`. Value is a dictionary mapping installer key to a dictionary with resolution data, ``{str: {str: vals}}`` :param interactive: If ``False``, suppress interactive prompts (e.g. by passing '-y' to ``apt``). :param simulate: If ``False`` simulate installation without actually executing. :param continue_on_error: If ``True``, continue installation even if an install fails. Otherwise, stop after first installation failure. :param reinstall: If ``True``, install dependencies if even already installed (default ``False``). :raises: :exc:`InstallFailed` if any rosdeps fail to install and *continue_on_error* is ``False``. :raises: :exc:`KeyError` If *uninstalled* value has invalid installer keys Example:: uninstalled, errors = installer.get_uninstalled(packages) installer.install(uninstalled) """ if verbose: print("install options: reinstall[%s] simulate[%s] interactive[%s]"%(reinstall, simulate, interactive)) print("install: uninstalled keys are %s"%(', '.join([', '.join(pkg) for pkg in [v for k,v in uninstalled]]))) # Squash uninstalled again, in case some dependencies were already installed squashed_uninstalled = [] previous_installer_key = None for installer_key, resolved in uninstalled: if previous_installer_key != installer_key: squashed_uninstalled.append((installer_key, [])) previous_installer_key = installer_key squashed_uninstalled[-1][1].extend(resolved) failures = [] for installer_key, resolved in squashed_uninstalled: try: self.install_resolved(installer_key, resolved, simulate=simulate, interactive=interactive, reinstall=reinstall, continue_on_error=continue_on_error, verbose=verbose, quiet=quiet) except InstallFailed as e: if not continue_on_error: raise else: #accumulate errors failures.extend(e.failures) if failures: raise InstallFailed(failures=failures) def install_resolved(self, installer_key, resolved, simulate=False, interactive=True, reinstall=False, continue_on_error=False, verbose=False, quiet=False): """ Lower-level API for installing a rosdep dependency. The rosdep keys have already been resolved to *installer_key* and *resolved* via :exc:`RosdepLookup` or other means. :param installer_key: Key for installer to apply to *resolved*, ``str`` :param resolved: Opaque resolution list from :class:`RosdepLookup`. :param interactive: If ``True``, allow interactive prompts (default ``True``) :param simulate: If ``True``, don't execute installation commands, just print to screen. :param reinstall: If ``True``, install dependencies if even already installed (default ``False``). :param verbose: If ``True``, print verbose output to screen (default ``False``) :param quiet: If ``True``, supress output except for errors (default ``False``) :raises: :exc:`InstallFailed` if any of *resolved* fail to install. """ installer_context = self.installer_context installer = installer_context.get_installer(installer_key) command = installer.get_install_command(resolved, interactive=interactive, reinstall=reinstall, quiet=quiet) if not command: if verbose: print("#No packages to install") return if simulate: print("#[%s] Installation commands:"%(installer_key)) for sub_command in command: print(' '+' '.join(sub_command)) # nothing left to do for simulation if simulate: return # run each install command set and collect errors failures = [] for sub_command in command: # always echo commands to screen print_bold("executing command [%s]"%' '.join(sub_command)) result = subprocess.call(sub_command) if verbose: print("command return code [%s]: %s"%(' '.join(sub_command), result)) if result != 0: failures.append((installer_key, 'command [%s] failed'%(' '.join(sub_command))) ) if not continue_on_error: raise InstallFailed(failures=failures) # test installation of each for r in resolved: if not installer.is_installed(r): failures.append((installer_key, "Failed to detect successful installation of [%s]"%(r))) # finalize result if failures: raise InstallFailed(failures=failures) elif verbose: print("#successfully installed")