楼主
|
写在前面:
ansible是一个非常棒的运维工具,可以远程批量执行命令、上传文件等自动化运维操作,由于要搞配置管理,初始化等批量操作,而自己对ansible相对熟悉,因此选择了ansible playbook。不过在调用playbook API的过程中,发现原始api并不能满足我的需求,网络上多数文档还是1.0版本,因此下载了2.0源码查看,重写了部分类。因此这里总结学习,也给有相同需求的朋友参考
ansible-playbook使用
一、ansible-playbook简单介绍
playbook字面意思看就是剧本,其实也很形象,ansible-playbook就是事先定义好很多task(任务)放在.yml文件中,执行的时候就像剧本一样,依次往下演(执行)
- #/tmp/xx.yml
- ---
- - hosts: all #hosts中指定
- tasks:
- - name: "ls /tmp" #- name:任务名称,下面是任务过程
- shell: 'ls /tmp/'
复制代码
ansible-playbook执行:
二、ansible-playbook api介绍
ansible-playbook api由ansible官方提供接口,用于在python代码中更灵活的使用,但官方只提供了ansible-api的参考文档,且2.0以后变化较大,api编写更复杂,但更为灵活
官方参考: http://docs.ansible.com/ansib...
一个基本的ansible-playbook api调用
- # -*- coding:utf-8 -*-
- # 描述:
- # V1 WZJ 2016-12-20
- from collections import namedtuple
- from ansible.parsing.dataloader import DataLoader
- from ansible.vars import VariableManager
- from ansible.inventory import Inventory
- from ansible.executor.playbook_executor import PlaybookExecutor
- # 用来加载解析yaml文件或JSON内容,并且支持vault的解密
- loader = DataLoader()
- # 管理变量的类,包括主机,组,扩展等变量,之前版本是在 inventory中的
- variable_manager = VariableManager()
- # 根据inventory加载对应变量,此处host_list参数可以有两种格式:
- # 1: hosts文件(需要),
- # 2: 可以是IP列表,此处使用IP列表
- inventory = Inventory(loader=loader, variable_manager=variable_manager,host_list=['172.16.1.121'])
- variable_manager.set_inventory(inventory)
- # 设置密码,需要是dict类型
- passwords=dict(conn_pass='your password')
- # 初始化需要的对象
- Options = namedtuple('Options',
- ['connection',
- 'remote_user',
- 'ask_sudo_pass',
- 'verbosity',
- 'ack_pass',
- 'module_path',
- 'forks',
- 'become',
- 'become_method',
- 'become_user',
- 'check',
- 'listhosts',
- 'listtasks',
- 'listtags',
- 'syntax',
- 'sudo_user',
- 'sudo'])
- # 初始化需要的对象
- options = Options(connection='smart',
- remote_user='root',
- ack_pass=None,
- sudo_user='root',
- forks=5,
- sudo='yes',
- ask_sudo_pass=False,
- verbosity=5,
- module_path=None,
- become=True,
- become_method='sudo',
- become_user='root',
- check=None,
- listhosts=None,
- listtasks=None,
- listtags=None,
- syntax=None)
- # playbooks就填写yml文件即可,可以有多个,以列表形式
- playbook = PlaybookExecutor(playbooks=['/tmp/xx.yml'],inventory=inventory,
- variable_manager=variable_manager,
- loader=loader,options=options,passwords=passwords)
- # 开始执行
- playbook.run()
复制代码
执行上面脚本:
可以发现,这个api调用执行虽然成功了,但是并没有什么详细输出(这个跟一个名叫CallbackBase的类有关)
重写palybook api
1 为什么重写?
在实际应用中,有许多地方是不太满足需求的
1.比如inventory有host group(主机组)的概念,但是默认都放在all组里面去了,导致在编写yml的时候,无法对不同IP组进行不同的操作
2.console输出结果有时候需要自定义
3.调用api需要更为灵活
当然除了以上需求外,还有一些其它意义,比如阅读源码有助于熟悉ansible的工作方式,也提升自己的脚本能力
注意:所谓重写只是在源码的基础上继承并修改了部分代码,有可能导致ansible不稳定的情况,另外只有在有中文注释的地方才是重写内容
2 ansible源码文件简单介绍
源码下载地址:https://github.com/ansible/an...
其实ansible我们所用到的源码都在lib里面,而需要关注的也主要是三个文件夹和一些类:
用于解析和聚合host,group等环境变量: lib/ansible/inventory/__init__.py 中 Inventory 类
用于console输出: lib/ansible/plugins/callback/default.py 中 CallbackModule 类
用于playbook的解析工作: lib/executor/playbook_executor.py 中 PlaybookExecutor 类
playbook底层用到的任务队列:lib/executor/task_queue_manager.py 中 TaskQueueManager 类
几个重写的类的树状图:
一、Inventory类重写
1.修改可以为IP传入group name2.额外的自定义参数,如:{"test":1000}
- # -*- coding:utf-8 -*-
- # 描述:
- # V1 WZJ 2016-12-20 Inventory重写封装api基本功能
- import fnmatch
- from ansible.compat.six import string_types, iteritems
- from ansible import constants as C
- from ansible.errors import AnsibleError
- from ansible.inventory.dir import InventoryDirectory, get_file_parser
- from ansible.inventory.group import Group
- from ansible.inventory.host import Host
- from ansible.module_utils._text import to_bytes, to_text
- from ansible.parsing.utils.addresses import parse_address
- from ansible.plugins import vars_loader
- from ansible.utils.vars import combine_vars
- from ansible.utils.path import unfrackpath
- try:
- from __main__ import display
- except ImportError:
- from ansible.utils.display import Display
- display = Display()
- HOSTS_PATTERNS_CACHE = {}
- from ansible.inventory import Inventory
- class YunweiInventory(Inventory):
- '''重写Inventory'''
- def __init__(self, loader, variable_manager, group_name, ext_vars=None,host_list=C.DEFAULT_HOST_LIST):
- # the host file file, or script path, or list of hosts
- # if a list, inventory data will NOT be loaded
- # self.host_list = unfrackpath(host_list, follow=False)
- # 传入的hosts
- self.host_list = host_list
- # 传入的项目名也是组名
- self.group_name = group_name
- # 传入的外部变量,格式为字典
- self.ext_vars = ext_vars
- # caching to avoid repeated calculations, particularly with
- # external inventory scripts.
- self._vars_per_host = {}
- self._vars_per_group = {}
- self._hosts_cache = {}
- self._pattern_cache = {}
- self._group_dict_cache = {}
- self._vars_plugins = []
- self._basedir = self.basedir()
- # Contains set of filenames under group_vars directories
- self._group_vars_files = self._find_group_vars_files(self._basedir)
- self._host_vars_files = self._find_host_vars_files(self._basedir)
- # to be set by calling set_playbook_basedir by playbook code
- self._playbook_basedir = None
- # the inventory object holds a list of groups
- self.groups = {}
- # a list of host(names) to contain current inquiries to
- self._restriction = None
- self._subset = None
- # clear the cache here, which is only useful if more than
- # one Inventory objects are created when using the API directly
- self.clear_pattern_cache()
- self.clear_group_dict_cache()
- self.parse_inventory(host_list)
- def parse_inventory(self, host_list):
- if isinstance(host_list, string_types):
- if "," in host_list:
- host_list = host_list.split(",")
- host_list = [ h for h in host_list if h and h.strip() ]
- self.parser = None
- # Always create the 'all' and 'ungrouped' groups, even if host_list is
- # empty: in this case we will subsequently an the implicit 'localhost' to it.
- ungrouped = Group('ungrouped')
- all = Group('all')
- all.add_child_group(ungrouped)
- # 加一个本地IP
- local_group = Group('local')
- # 自定义组名
- zdy_group_name = Group(self.group_name)
- self.groups = {self.group_name:zdy_group_name,"all":all,"ungrouped":ungrouped,"local":local_group}
- #self.groups = dict(all=all, ungrouped=ungrouped)
- if host_list is None:
- pass
- elif isinstance(host_list, list):
- # 默认添加一个本地IP
- (lhost, lport) = parse_address('127.0.0.1', allow_ranges=False)
- new_host = Host(lhost, lport)
- local_group.add_host(new_host)
- for h in host_list:
- try:
- (host, port) = parse_address(h, allow_ranges=False)
- except AnsibleError as e:
- display.vvv("Unable to parse address from hostname, leaving unchanged: %s" % to_text(e))
- host = h
- port = None
- new_host = Host(host, port)
- if h in C.LOCALHOST:
- # set default localhost from inventory to avoid creating an implicit one. Last localhost defined 'wins'.
- if self.localhost is not None:
- display.warning("A duplicate localhost-like entry was found (%s). First found localhost was %s" % (h, self.localhost.name))
- display.vvvv("Set default localhost to %s" % h)
- self.localhost = new_host
- # 为组添加host
- zdy_group_name.add_host(new_host)
- # 为主机组添加额外参数
- # 添加外部变量
- if self.ext_vars and isinstance(self.ext_vars,dict):
- for k,v in self.ext_vars.items():
- zdy_group_name.set_variable(k,v)
- local_group.set_variable(k,v)
- elif self._loader.path_exists(host_list):
- # TODO: switch this to a plugin loader and a 'condition' per plugin on which it should be tried, restoring 'inventory pllugins'
- if self.is_directory(host_list):
- # Ensure basedir is inside the directory
- host_list = os.path.join(self.host_list, "")
- self.parser = InventoryDirectory(loader=self._loader, groups=self.groups, filename=host_list)
- else:
- self.parser = get_file_parser(host_list, self.groups, self._loader)
- vars_loader.add_directory(self._basedir, with_subdir=True)
- if not self.parser:
- # should never happen, but JIC
- raise AnsibleError("Unable to parse %s as an inventory source" % host_list)
- else:
- display.warning("Host file not found: %s" % to_text(host_list))
- self._vars_plugins = [ x for x in vars_loader.all(self) ]
- # set group vars from group_vars/ files and vars plugins
- for g in self.groups:
- group = self.groups[g]
- group.vars = combine_vars(group.vars, self.get_group_variables(group.name))
- self.get_group_vars(group)
- # get host vars from host_vars/ files and vars plugins
- for host in self.get_hosts(ignore_limits=True, ignore_restrictions=True):
- host.vars = combine_vars(host.vars, self.get_host_variables(host.name))
- self.get_host_vars(host)
复制代码
二、collback重写,用于自定义输出
1.以callback.default.CallbackModule为基类继承
2.自定义了输出格式,正确输出情况,不需要的输出就忽略了,只留下几个有用的stdout
3.最后的输出结果:
- #-*- coding:utf-8 -*-
- # 描述:
- # V1 WZJ 2016-12-19 CallBack重写封装api基本功能,用于console输出
- import json
- from ansible import constants as C
- from ansible.plugins.callback.default import CallbackModule
- try:
- from __main__ import display
- except ImportError:
- from ansible.utils.display import Display
- display = Display()
- class YunweiCallback(CallbackModule):
- """重写console输出日志"""
- # 重写2.0版本正确stdout
- def v2_runner_on_ok(self, result):
- if self._play.strategy == 'free' and self._last_task_banner != result._task._uuid:
- self._print_task_banner(result._task)
- self._clean_results(result._result, result._task.action)
- #delegated_vars = result._result.get('_ansible_delegated_vars', None)
- delegated_vars = self._dump_results(result._result)
- #delegated_vars = result._result
- #n_delegated_vars = self._dump_results(result)
- #print n_delegated_vars
- self._clean_results(result._result, result._task.action)
- if result._task.action in ('include', 'include_role'):
- return
- elif result._result.get('changed', False):
- if delegated_vars:
- # 自定义输出
- zdy_msg = self.zdy_stdout(json.loads(delegated_vars))
- if zdy_msg:
- msg = "changed: [%s]%s" % (result._host.get_name(), zdy_msg)
- else:
- msg = "changed: [%s -> %s]" % (result._host.get_name(), delegated_vars)
- else:
- msg = "changed: [%s]" % result._host.get_name()
- color = C.COLOR_CHANGED
- # 判断是否是第一步 setup
- elif result._result.get('ansible_facts',False):
- msg = "ok: [ %s | %s ]" % (str(result._host),str(result._host.get_groups()))
- color = C.COLOR_OK
- else:
- if delegated_vars:
- # 自定义输出
- zdy_msg = self.zdy_stdout(json.loads(delegated_vars))
- if zdy_msg:
- msg = "ok: [%s]%s" % (result._host.get_name(), zdy_msg)
- else:
- msg = "ok: [%s -> %s]" % (result._host.get_name(), delegated_vars)
- else:
- msg = "ok: [%s]" % result._host.get_name()
- color = C.COLOR_OK
- if result._task.loop and 'results' in result._result:
- self._process_items(result)
- else:
- self._display.display(msg, color=color)
- self._handle_warnings(result._result)
- # 自定义输出,格式清晰一些
- def zdy_stdout(self,result):
- msg = ''
- if result.get('delta',False):
- msg += u'\t执行时间:%s'%result['delta']
- if result.get('cmd', False):
- msg += u'\n执行命令:%s'%result['cmd']
- if result.get('stderr',False):
- msg += u'\n错误输出:\n%s'%result['stderr']
- if result.get('stdout',False):
- msg += u'\n正确输出:\n%s'%result['stdout']
- if result.get('warnings',False):
- msg += u'\n警告:%s'%result['warnings']
- return msg
复制代码
三、封装PlaybookExecutor
1.封装为更容易调用的playbook api调用2.定制化一部分参数
- # -*- coding:utf-8 -*-
- # 描述:
- # V1 WZJ 2016-12-19 PlayBook重写封装api基本功能
- import json
- from ansible import constants as C
- from collections import namedtuple
- from ansible.parsing.dataloader import DataLoader
- from ansible.vars import VariableManager
- from ansible.playbook.play import Play
- from ansible.executor.task_queue_manager import TaskQueueManager
- from ansible.executor.playbook_executor import PlaybookExecutor
- from callback import YunweiCallback
- from ansible.utils.ssh_functions import check_for_controlpersist
- # 调用自定义Inventory
- from inventory import YunweiInventory as Inventory
- try:
- from __main__ import display
- except ImportError:
- from ansible.utils.display import Display
- display = Display()
- class YunweiPlaybookExecutor(PlaybookExecutor):
- '''重写PlayBookExecutor'''
- def __init__(self, playbooks, inventory, variable_manager, loader, options, passwords, stdout_callback=None):
- self._playbooks = playbooks
- self._inventory = inventory
- self._variable_manager = variable_manager
- self._loader = loader
- self._options = options
- self.passwords = passwords
- self._unreachable_hosts = dict()
- if options.listhosts or options.listtasks or options.listtags or options.syntax:
- self._tqm = None
- else:
- self._tqm = TaskQueueManager(inventory=inventory, variable_manager=variable_manager, loader=loader, options=options, passwords=self.passwords, stdout_callback=stdout_callback)
- # Note: We run this here to cache whether the default ansible ssh
- # executable supports control persist. Sometime in the future we may
- # need to enhance this to check that ansible_ssh_executable specified
- # in inventory is also cached. We can't do this caching at the point
- # where it is used (in task_executor) because that is post-fork and
- # therefore would be discarded after every task.
- check_for_controlpersist(C.ANSIBLE_SSH_EXECUTABLE)
- class PlayBookJob(object):
- '''封装一个playbook接口,提供给外部使用'''
- def __init__(self,playbooks,host_list,ssh_user='bbs',passwords='null',project_name='all',ack_pass=False,forks=5,ext_vars=None):
- self.playbooks = playbooks
- self.host_list = host_list
- self.ssh_user = ssh_user
- self.passwords = dict(vault_pass=passwords)
- self.project_name = project_name
- self.ack_pass = ack_pass
- self.forks = forks
- self.connection='smart'
- self.ext_vars = ext_vars
- ## 用来加载解析yaml文件或JSON内容,并且支持vault的解密
- self.loader = DataLoader()
- # 管理变量的类,包括主机,组,扩展等变量,之前版本是在 inventory中的
- self.variable_manager = VariableManager()
- # 根据inventory加载对应变量
- self.inventory = Inventory(loader=self.loader,
- variable_manager=self.variable_manager,
- group_name=self.project_name, # 项目名对应组名,区分当前执行的内容
- ext_vars=self.ext_vars,
- host_list=self.host_list)
- self.variable_manager.set_inventory(self.inventory)
- # 初始化需要的对象1
- self.Options = namedtuple('Options',
- ['connection',
- 'remote_user',
- 'ask_sudo_pass',
- 'verbosity',
- 'ack_pass',
- 'module_path',
- 'forks',
- 'become',
- 'become_method',
- 'become_user',
- 'check',
- 'listhosts',
- 'listtasks',
- 'listtags',
- 'syntax',
- 'sudo_user',
- 'sudo'
- ])
- # 初始化需要的对象2
- self.options = self.Options(connection=self.connection,
- remote_user=self.ssh_user,
- ack_pass=self.ack_pass,
- sudo_user=self.ssh_user,
- forks=self.forks,
- sudo='yes',
- ask_sudo_pass=False,
- verbosity=5,
- module_path=None,
- become=True,
- become_method='sudo',
- become_user='root',
- check=None,
- listhosts=None,
- listtasks=None,
- listtags=None,
- syntax=None
- )
- # 初始化console输出
- self.callback = YunweiCallback()
- # 直接开始
- self.run()
- def run(self):
- pb = None
- pb = YunweiPlaybookExecutor(
- playbooks = self.playbooks,
- inventory = self.inventory,
- variable_manager = self.variable_manager,
- loader = self.loader,
- options = self.options,
- passwords = self.passwords,
- stdout_callback = self.callback
- )
- result = pb.run()
- # daemo
- if __name__ == "__main__":
- PlayBookJob(playbooks=['xx.yml'],
- host_list=['10.45.176.2'],
- ssh_user='root',
- project_name="test",
- forks=20,
- ext_vars=None
- )
复制代码
参考:
ansible-api官方文档:http://docs.ansible.com/ansib...
一个非常好的ansible2.0 api调用文档,相见恨晚:https://serversforhackers.com...
ansible源码:https://github.com/ansible/an...
原文作者: 青云2016 来源:开发者头条
|
-
|