A Brief Look at How Fabric Executes Commands

The most basic and most common way to use Fabric is fab (-f fabfile.py) cmd.

Today we will look at how such a command actually gets executed.

First, the fab command exists because setup.py declares the following entry point:

entry_points={
    'console_scripts': [
        'fab = fabric.main:main',
    ]
},

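As a result, the important code, including the logic that locates the fabfile, lives in fabric/main.py. The official documentation describes it as follows: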

main is executed as the command line fab program and takes care of parsing options and commands, loading the user settings file, loading a fabfile, and executing the commands given.
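To make the 'fab = fabric.main:main' mapping concrete, here is a minimal sketch of how a "module:attribute" string can be resolved to a callable. The helper name resolve_entry_point is hypothetical, and this is not the wrapper that setuptools actually generates; it only illustrates the idea. A standard-library target is used so the example runs without Fabric installed.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve a 'module:attr' string to the object it names.

    Hypothetical helper for illustration only; the real console script
    generated at install time does the equivalent lookup and then calls
    the resulting function.
    """
    module_name, _, attr_path = spec.partition(":")
    obj = importlib.import_module(module_name)
    # Walk dotted attribute paths such as "Class.method".
    for part in attr_path.split("."):
        obj = getattr(obj, part)
    return obj


if __name__ == "__main__":
    # With Fabric 1.x installed, "fabric.main:main" would resolve the
    # same way; json:dumps stands in for it here.
    dumps = resolve_entry_point("json:dumps")
    print(dumps({"ok": True}))
```

Running fab is therefore roughly the same as importing fabric.main and calling its main function.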

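The code below is the main function (only the parts that matter for the logic are listed). It shows what the program does when you type a fab command: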

def main(fabfile_locations=None):
    """
    Main command-line execution loop.
    """
    try:
        # Parse command line options
        parser, options, arguments = parse_options()

        # Handle regular args vs -- args
        arguments = parser.largs
        remainder_arguments = parser.rargs

        for option in env_options:
            state.env[option.dest] = getattr(options, option.dest)

        # Handle --hosts, --roles, --exclude-hosts (comma separated string =>
        # list)
        for key in ['hosts', 'roles', 'exclude_hosts']:
            if key in state.env and isinstance(state.env[key], basestring):
                state.env[key] = state.env[key].split(',')

        # Feed the env.tasks : tasks that are asked to be executed.
        state.env['tasks'] = arguments

        # Handle output control level show/hide
        update_output_levels(show=options.show, hide=options.hide)

        # Handle version number option
        if options.show_version:
            print("Fabric %s" % state.env.version)
            print("Paramiko %s" % ssh.__version__)
            sys.exit(0)

        # Load settings from user settings file, into shared env dict.
        state.env.update(load_settings(state.env.rcfile))

        # Find local fabfile path or abort
        fabfile = find_fabfile(fabfile_locations)

        state.env.real_fabfile = fabfile

        # Load fabfile (which calls its module-level code, including
        # tweaks to env values) and put its commands in the shared commands
        # dict
        default = None
        if fabfile:
            docstring, callables, default = load_fabfile(fabfile)
            state.commands.update(callables)

        # Handle case where we were called bare, i.e. just "fab", and print
        # a help message.
        actions = (options.list_commands, options.shortlist, options.display,
                   arguments, remainder_arguments, default)
        if not any(actions):
            parser.print_help()
            sys.exit(1)

        # Abort if no commands found
        if not state.commands and not remainder_arguments:
            abort("Fabfile didn't contain any commands!")

        # Now that we're settled on a fabfile, inform user.
        if state.output.debug:
            if fabfile:
                print("Using fabfile '%s'" % fabfile)
            else:
                print("No fabfile loaded -- remainder command only")

        # Shortlist is now just an alias for the "short" list format;
        # it overrides use of --list-format if somebody were to specify both
        if options.shortlist:
            options.list_format = 'short'
            options.list_commands = True

        # List available commands
        if options.list_commands:
            show_commands(docstring, options.list_format)

        # Handle show (command-specific help) option
        if options.display:
            display_command(options.display)

        # If user didn't specify any commands to run, show help
        if not (arguments or remainder_arguments or default):
            parser.print_help()
            sys.exit(0)  # Or should it exit with error (1)?

        # Parse arguments into commands to run (plus args/kwargs/hosts)
        commands_to_run = parse_arguments(arguments)

        # Parse remainders into a faux "command" to execute
        remainder_command = parse_remainder(remainder_arguments)

        # Figure out if any specified task names are invalid
        unknown_commands = []
        for tup in commands_to_run:
            if crawl(tup[0], state.commands) is None:
                unknown_commands.append(tup[0])

        # Abort if any unknown commands were specified
        if unknown_commands and not state.env.get('skip_unknown_tasks', False):
            warn("Command(s) not found:\n%s" \
                % indent(unknown_commands))
            show_commands(None, options.list_format, 1)

        # Generate remainder command and insert into commands, commands_to_run
        if remainder_command:
            r = '<remainder>'
            state.commands[r] = lambda: api.run(remainder_command)
            commands_to_run.append((r, [], {}, [], [], []))

        # Ditto for a default, if found
        if not commands_to_run and default:
            commands_to_run.append((default.name, [], {}, [], [], []))

        # Initial password prompt, if requested
        if options.initial_password_prompt:
            prompt = "Initial value for env.password: "
            state.env.password = getpass.getpass(prompt)

        if state.output.debug:
            names = ", ".join(x[0] for x in commands_to_run)
            print("Commands to run: %s" % names)

        # At this point all commands must exist, so execute them in order.
        for name, args, kwargs, arg_hosts, arg_roles, arg_exclude_hosts in commands_to_run:
            execute(
                name,
                hosts=arg_hosts,
                roles=arg_roles,
                exclude_hosts=arg_exclude_hosts,
                *args, **kwargs
            )
        # If we got here, no errors occurred, so print a final note.
        if state.output.status:
            print("\nDone.")
    except SystemExit:
        # a number of internal functions might raise this one.
        raise
    except KeyboardInterrupt:
        if state.output.status:
            sys.stderr.write("\nStopped.\n")
        sys.exit(1)
    except:
        sys.excepthook(*sys.exc_info())
        # we might leave stale threads if we don't explicitly exit()
        sys.exit(1)
    finally:
        disconnect_all()
    sys.exit(0)

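In short, the program parses the command line, locates the fabfile (either the default one or the one given with -f), loads the tasks defined in it, executes the requested tasks, and finally closes all connections.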
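The reason connections are always closed is the try/except/finally shape of main: the finally clause runs whether the body finishes normally, calls sys.exit, or fails. The sketch below reproduces only that control flow. The names run_with_cleanup and exit_code_of are hypothetical, the cleanup callback stands in for disconnect_all, and except Exception is used here in place of the bare except in the original.

```python
import sys


def run_with_cleanup(action, cleanup):
    """Mimic the exit handling of main() around an arbitrary action."""
    try:
        action()
    except SystemExit:
        # Internal helpers may call sys.exit(); let it propagate unchanged.
        raise
    except KeyboardInterrupt:
        sys.exit(1)
    except Exception:
        # Report the error through the normal hook, then exit non-zero.
        sys.excepthook(*sys.exc_info())
        sys.exit(1)
    finally:
        # Runs on every path above, like disconnect_all() in main().
        cleanup()
    sys.exit(0)


def exit_code_of(action, log):
    """Run the action and return the exit code main() would have used."""
    try:
        run_with_cleanup(action, lambda: log.append("disconnect"))
    except SystemExit as exc:
        return exc.code


if __name__ == "__main__":
    log = []
    print(exit_code_of(lambda: None, log), log)
    print(exit_code_of(lambda: sys.exit(3), log), log)
```

In both calls the cleanup callback is recorded in log, even though the second action exits early.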

Now for how commands are executed. The key code for running tasks is:

for name, args, kwargs, arg_hosts, arg_roles, arg_exclude_hosts in commands_to_run:
    execute(
        name,
        hosts=arg_hosts,
        roles=arg_roles,
        exclude_hosts=arg_exclude_hosts,
        *args, **kwargs
    )
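The six-element tuples unpacked in this loop are produced by parse_arguments from command-line strings such as deploy:master,force=yes. The following is a simplified sketch of that idea, not Fabric's actual implementation: the function name parse_task_spec is hypothetical, and escaped commas and other edge cases are ignored. It assumes the documented per-task keywords host, hosts, role, roles and exclude_hosts, with semicolons separating multiple values.

```python
def parse_task_spec(spec):
    """Split 'name:arg,key=value,...' into the 6-tuple used by main().

    Simplified illustration only; the real parser also handles escaping.
    """
    name, _, argstr = spec.partition(":")
    args, kwargs = [], {}
    hosts, roles, exclude_hosts = [], [], []
    for piece in filter(None, argstr.split(",")):
        key, sep, value = piece.partition("=")
        if not sep:
            # No '=' means a positional argument.
            args.append(piece)
        elif key == "host":
            hosts = [value]
        elif key == "hosts":
            hosts = value.split(";")
        elif key == "role":
            roles = [value]
        elif key == "roles":
            roles = value.split(";")
        elif key == "exclude_hosts":
            exclude_hosts = value.split(";")
        else:
            kwargs[key] = value
    return name, args, kwargs, hosts, roles, exclude_hosts


if __name__ == "__main__":
    print(parse_task_spec("deploy:master,force=yes,hosts=web1;web2"))
```

Host and role keywords are pulled out of the keyword arguments so that they can be passed to execute separately from the arguments meant for the task itself.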

The execute function is defined in the tasks module (fabric/tasks.py):

def execute(task, *args, **kwargs):

    my_env = {'clean_revert': True}
    results = {}
    # Obtain task
    is_callable = callable(task)
    new_kwargs, hosts, roles, exclude_hosts = parse_kwargs(kwargs)
    # Set up host list
    my_env['all_hosts'], my_env['effective_roles'] = \
        task.get_hosts_and_effective_roles(hosts, roles, exclude_hosts, state.env)

    # Call on host list
    if my_env['all_hosts']:
        # Attempt to cycle on hosts, skipping if needed
        for host in my_env['all_hosts']:
            try:
                results[host] = _execute(
                    task, host, my_env, args, new_kwargs, jobs, queue,
                    multiprocessing
                )
            except NetworkError, e:
                results[host] = e
                # Backwards compat test re: whether to use an exception or
                # abort
                if not state.env.use_exceptions_for['network']:
                    func = warn if state.env.skip_bad_hosts else abort
                    error(e.message, func=func, exception=e.wrapped)
                else:
                    raise

            # If requested, clear out connections here and not just at the end.
            if state.env.eagerly_disconnect:
                disconnect_all()

        # If running in parallel, block until job queue is emptied
        if jobs:
            err = "One or more hosts failed while executing task '%s'" % (
                my_env['command']
            )
            jobs.close()
            # Abort if any children did not exit cleanly (fail-fast).
            # This prevents Fabric from continuing on to any other tasks.
            # Otherwise, pull in results from the child run.
            ran_jobs = jobs.run()
            for name, d in ran_jobs.iteritems():
                if d['exit_code'] != 0:
                    if isinstance(d['results'], NetworkError) and \
                            _is_network_error_ignored():
                        error(d['results'].message, func=warn, exception=d['results'].wrapped)
                    elif isinstance(d['results'], BaseException):
                        error(err, exception=d['results'])
                    else:
                        error(err)
                results[name] = d['results']

    # Or just run once for local-only
    else:
        with settings(**my_env):
            results['<local-only>'] = task.run(*args, **new_kwargs)
    # Return what we can from the inner task executions

    return results
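Stripped of error handling and parallel execution, the excerpt above boils down to a simple dispatch: run the task once per host and collect the results by host name, or run it a single time under the '<local-only>' key when there are no hosts. Here is a minimal sketch of that pattern. The name execute_sketch is hypothetical, and passing the host as the first argument is an assumption made for illustration; Fabric itself exposes the current host through its env settings rather than as a parameter.

```python
def execute_sketch(task, hosts, *args, **kwargs):
    """Run task once per host, or once locally if no hosts are given."""
    results = {}
    if hosts:
        for host in hosts:
            # One result entry per host, keyed by host name.
            results[host] = task(host, *args, **kwargs)
    else:
        # No hosts: a single local-only run.
        results["<local-only>"] = task(None, *args, **kwargs)
    return results


def uptime(host, unit="s"):
    """Toy task that only reports where it would run."""
    return "uptime on %s in %s" % (host or "localhost", unit)


if __name__ == "__main__":
    print(execute_sketch(uptime, ["web1", "web2"], unit="min"))
    print(execute_sketch(uptime, []))
```

The returned dictionary has the same shape as the results value in the excerpt: one key per host, or the single '<local-only>' key.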

execute resolves values such as hosts and roles automatically, through this call:

my_env['all_hosts'], my_env['effective_roles'] = \
    task.get_hosts_and_effective_roles(hosts, roles, exclude_hosts, state.env)

def get_hosts_and_effective_roles(self, arg_hosts, arg_roles, arg_exclude_hosts, env=None):
    """
    Return a tuple containing the host list the given task should be using
    and the roles being used.

    See :ref:`host-lists` for detailed documentation on how host lists are
    set.

    .. versionchanged:: 1.9
    """
    env = env or {'hosts': [], 'roles': [], 'exclude_hosts': []}
    roledefs = env.get('roledefs', {})
    # Command line per-task takes precedence over anything else.
    if arg_hosts or arg_roles:
        return merge(arg_hosts, arg_roles, arg_exclude_hosts, roledefs), arg_roles
    # Decorator-specific hosts/roles go next
    func_hosts = getattr(self, 'hosts', [])
    func_roles = getattr(self, 'roles', [])
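The excerpt stops here, but the order it begins to establish (per-task command-line values first, then decorator values, then the global env values described in Fabric's host-list documentation) can be sketched as follows. This is an illustration under simplifying assumptions, not the library's code: resolve_hosts and merge_sketch are hypothetical names, a single exclude list is applied at every level, and roledefs are assumed to be plain lists.

```python
def merge_sketch(hosts, roles, exclude, roledefs):
    """Expand roles to hosts, drop duplicates and excluded hosts."""
    role_hosts = []
    for role in roles:
        role_hosts.extend(roledefs[role])
    merged = []
    for host in list(hosts) + role_hosts:
        if host not in merged and host not in exclude:
            merged.append(host)
    return merged


def resolve_hosts(cli, decorator, env, exclude=()):
    """Return the host list from the first source that defines one."""
    roledefs = env.get("roledefs", {})
    # Precedence: command line per-task, then decorator, then env.
    for source in (cli, decorator, env):
        hosts = source.get("hosts", [])
        roles = source.get("roles", [])
        if hosts or roles:
            return merge_sketch(hosts, roles, exclude, roledefs)
    return []


if __name__ == "__main__":
    env = {"hosts": ["e1"], "roledefs": {"web": ["w1", "w2"]}}
    print(resolve_hosts({"roles": ["web"]}, {"hosts": ["dec1"]}, env))
    print(resolve_hosts({}, {"hosts": ["dec1"]}, env))
    print(resolve_hosts({}, {}, env))
```

Note that a higher-priority source replaces the lower ones entirely instead of being combined with them, which matches the early return in the excerpt.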