Source code for pynsot.dotfile

# -*- coding: utf-8 -*-

"""
Handle the read, write, and generation of the .pynsotrc config file.
"""

from __future__ import unicode_literals
from ConfigParser import RawConfigParser
import copy
import logging
import os

from .vendor import click
from .vendor import rcfile
from . import constants


__author__ = 'Jathan McCollum'
__maintainer__ = 'Jathan McCollum'
__email__ = 'jathan@dropbox.com'
__copyright__ = 'Copyright (c) 2015-2016 Dropbox, Inc.'


# Logging object
log = logging.getLogger(__name__)


__all__ = (
    'DotfileError', 'Dotfile',
)


class DotfileError(Exception):
    """Raised when something with the dotfile fails."""


[docs]class Dotfile(object): """Create, read, and write a dotfile.""" def __init__(self, filepath=constants.DOTFILE_PATH, **kwargs): self.filepath = filepath def read(self, **kwargs): """ Read ``~/.pynsotrc`` and return it as a dict. """ config = rcfile.rcfile( constants.SECTION_NAME, args={'config': self.filepath} ) config.pop('config', None) # We don't need this field in here. # If there is *no* config data so far and... if not config and not os.path.exists(self.filepath): p = '%s not found; would you like to create it?' % (self.filepath,) if click.confirm(p, default=True, abort=True): config_data = self.get_config_data(**kwargs) self.write(config_data) # Write config to disk config = config_data # Return the contents self.config = config # If we have configuration values, validate the permissions and # presence of fields in the dotfile. auth_method = config.get('auth_method') if auth_method == 'auth_token': self.validate_perms() required_fields = self.get_required_fields(auth_method) self.validate_fields(config, required_fields) return config def validate_perms(self): """Make sure dotfile ownership and permissions are correct.""" if not os.path.exists(self.filepath): return None # Ownership s = os.stat(self.filepath) if s.st_uid != os.getuid(): raise DotfileError( '%s: %s must be owned by you' % ( constants.DOTFILE_NAME, self.filepath ) ) # Permissions self.enforce_perms() def validate_fields(self, field_names, required_fields): """ Make sure all the fields are set. :param field_names: List of field names to validate :param required_fields: List of required field names to check against """ for rname in sorted(required_fields): if rname not in field_names: msg = '%s: Missing required field: %s' % (self.filepath, rname) raise DotfileError(msg) def enforce_perms(self, perms=constants.DOTFILE_PERMS): """ Enforce permissions on the dotfile. :param perms: Octal number representing permissions to enforce """ log.debug('Enforcing permissions %o on %s' % (perms, self.filepath)) os.chmod(self.filepath, perms) def write(self, config_data, filepath=None): """ Create a dotfile from keyword arguments. :param config_data: Dict of config settings :param filepath: (Optional) Path to write """ if filepath is None: filepath = self.filepath config = RawConfigParser() section = constants.SECTION_NAME config.add_section(section) # Set the config settings for key, val in config_data.iteritems(): config.set(section, key, val) with open(filepath, 'wb') as dotfile: config.write(dotfile) self.enforce_perms() log.debug('wrote %s' % filepath) @classmethod def get_required_fields(cls, auth_method, required_fields=None): """ Return union of all required fields for ``auth_method``. :param auth_method: Authentication method :param required_fields: Mapping of required field names to default values """ if required_fields is None: required_fields = copy.deepcopy(constants.REQUIRED_FIELDS) from .client import AUTH_CLIENTS # To avoid circular import # Fields that do not have default values, which are specific to an # auth_method. auth_fields = AUTH_CLIENTS[auth_method].required_arguments non_specific_fields = dict.fromkeys(auth_fields) required_fields.update(non_specific_fields) # Fields that MAY have default values, which are specific to an # auth_method, if any. specific_fields = constants.SPECIFIC_FIELDS.get(auth_method, {}) required_fields.update(specific_fields) return required_fields @classmethod def get_config_data(cls, required_fields=None, optional_fields=None, **kwargs): """ Enumerate required fields and prompt for ones that weren't provided if they don't have default values. :param required_fields: Mapping of required field names to default values :param optional_fields: Mapping of optional field names to default values :param kwargs: Dict of prepared config settings :returns: Dict of config data """ if required_fields is None: required_fields = constants.REQUIRED_FIELDS if optional_fields is None: optional_fields = constants.OPTIONAL_FIELDS config_data = {} # Base required fields cls.process_fields(config_data, required_fields, **kwargs) # Auth-related fields auth_method = config_data['auth_method'] auth_fields = cls.get_required_fields(auth_method) cls.process_fields(config_data, auth_fields, **kwargs) # Optional fields cls.process_fields( config_data, optional_fields, optional=True, **kwargs ) return config_data @staticmethod def process_fields(config_data, field_items, optional=False, **kwargs): """ Enumerate fields to update ``config_data``. Any fields not already in ``config_data`` will be prompted for a value from ``field_items``. :param config_data: Dict of config data :param field_items: Dict of desired fields to be populated :param optional: Whether to consider values for ``field_items`` as optional :param kwargs: Keyword arguments of prepared values """ for field, default_value in field_items.iteritems(): prompt = 'Please enter %s' % (field,) # If it's already in the config data, move on. if field in config_data: continue # If the field was not provided if field not in kwargs: # If it does not have a default value prompt for it if default_value is None: if not optional: value = click.prompt(prompt, type=str) else: prompt += ' (optional)' value = click.prompt( prompt, default='', type=str, show_default=False ) # If the default_value is a string, prompt for it, but present # it as a default. elif isinstance(default_value, basestring): value = click.prompt( prompt, type=str, default=default_value ) # If it's a list of options, present the choices. elif isinstance(default_value, list): choices = ', '.join(default_value) prompt = 'Please choose %s [%s]' % (field, choices) while True: value = click.prompt(prompt, type=str) if value not in default_value: click.echo('Invalid choice: %s' % value) continue break else: raise RuntimeError( 'Unexpected error condition when processing config ' 'fields.' ) # Or, use the value provided in kwargs elif field in kwargs: value = kwargs[field] # Otherwise, use the default value else: value = default_value # If a value wasn't set, don't save it. if value == '': continue config_data[field] = value return config_data