Skip to content

Instantly share code, notes, and snippets.

@retr0h
Last active June 23, 2017 21:33
Show Gist options
  • Save retr0h/14abdaf70b1a3eab979bd72c283ba863 to your computer and use it in GitHub Desktop.
Save retr0h/14abdaf70b1a3eab979bd72c283ba863 to your computer and use it in GitHub Desktop.
import json
import ansible
import ansible.cli
import ansible.cli.playbook
import ansible.executor.task_queue_manager
import ansible.inventory
import ansible.parsing.dataloader
import ansible.playbook.play
import ansible.plugins.callback
import ansible.utils.vars
import ansible.vars
from collections import namedtuple
class ResultCallback(ansible.plugins.callback.CallbackBase):
    """Callback plugin that echoes task results to stdout.

    Successful results are printed as indented JSON keyed by host name;
    failed results are printed as the unformatted ``_result`` value.
    """

    def v2_runner_on_ok(self, result, **kwargs):
        """Print a successful task result as JSON keyed by the host's name.

        :param result: task result object; its ``_host`` and ``_result``
            attributes are read.
        :param kwargs: accepted for signature compatibility and ignored.
        """
        host = result._host
        # A parenthesised single-argument print behaves the same on
        # Python 2 (print statement) and Python 3 (print function).
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Print the ``_result`` payload of a failed task as-is.

        :param result: task result object; only ``_result`` is read.
        :param ignore_errors: accepted for signature compatibility and
            ignored.
        """
        print(result._result)
class AnsibleRunnerBase(object):
    """Interface for Ansible runners.

    The constructor only records ``host_list``; every operation raises
    ``NotImplementedError`` and is meant to be overridden by subclasses.
    """

    def __init__(self, host_list=None):
        """Store ``host_list`` on the instance.

        :param host_list: value kept verbatim as ``self.host_list``
            (``None`` when omitted).
        """
        self.host_list = host_list
        super(AnsibleRunnerBase, self).__init__()

    def get_hosts(self, pattern=None):
        """Placeholder for host lookup.

        :param pattern: unused here.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def get_variables(self, host):
        """Placeholder for per-host variable lookup.

        :param host: unused here.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    def run(self, host, module_name, module_args, **kwargs):
        """Placeholder for running a module against a host.

        :param host: unused here.
        :param module_name: unused here.
        :param module_args: unused here.
        :param kwargs: unused here.
        :raises NotImplementedError: always.
        """
        raise NotImplementedError
class AnsibleRunner(AnsibleRunnerBase):
    """Run the ``stockclerk.debug`` role against ``localhost``.

    NOTE(review): every statement below lives in the class body, so the play
    is executed once, when this class is defined, rather than when an
    instance is created. The intermediate objects (``options``, ``inventory``,
    ``play``, ``tqm``, ``result`` ...) remain available as class attributes.
    """

    # Option values handed to the TaskQueueManager below.
    Options = namedtuple('Options', [
        'connection', 'module_path', 'forks', 'become', 'become_method',
        'become_user', 'check'
    ])
    variable_manager = ansible.vars.VariableManager()
    loader = ansible.parsing.dataloader.DataLoader()
    options = Options(
        connection='local',
        module_path=None,
        forks=100,
        become=None,
        become_method=None,
        become_user=None,
        check=False)
    # NOTE(review): hard-coded vault password; looks like a placeholder --
    # do not reuse with real vaulted content.
    passwords = dict(vault_pass='secret')
    results_callback = ResultCallback()
    # NOTE(review): a run of this script logs
    # "[WARNING]: Host file not found: localhost"; presumably the plain
    # string is looked up as an inventory file. Confirm against the
    # installed Ansible version before changing ``host_list``.
    inventory = ansible.inventory.Inventory(
        loader=loader,
        variable_manager=variable_manager,
        host_list='localhost')
    variable_manager.set_inventory(inventory)
    # In-memory play definition: apply one role to localhost over the local
    # connection, with fact gathering disabled.
    play_source = dict(
        hosts='localhost',
        connection='local',
        gather_facts='no',
        roles=['stockclerk.debug'], )
    play = ansible.playbook.play.Play().load(
        play_source, variable_manager=variable_manager, loader=loader)
    # Pre-bind so the ``finally`` clause is safe even if constructing the
    # TaskQueueManager raises.
    tqm = None
    try:
        tqm = ansible.executor.task_queue_manager.TaskQueueManager(
            inventory=inventory,
            variable_manager=variable_manager,
            loader=loader,
            options=options,
            passwords=passwords,
            stdout_callback=results_callback, )
        result = tqm.run(play)
        # A parenthesised single-argument print behaves the same on
        # Python 2 (print statement) and Python 3 (print function).
        print(result)
    finally:
        if tqm is not None:
            tqm.cleanup()
# Create a runner instance. NOTE(review): the play appears to be executed when
# the AnsibleRunner class body is evaluated, not by this call -- confirm.
ar = AnsibleRunner()
$ ANSIBLE_LIBRARY=library ANSIBLE_ROLE_PATH=./roles python runner.py
[WARNING]: Host file not found: localhost
{
"invocation": {
"module_name": "debug",
"module_args": {
"var": "vars"
}
},
"changed": false,
"_ansible_verbose_always": true,
"vars": {
"omit": "__omit_place_holder__375436603ca5bc466ccfbb58e31a1a81f5491479",
"ansible_osrevision": "199506",
"ansible_user_gecos": "John Dewey",
"ansible_fips": false,
"ansible_service_mgr": "/sbin/launchd",
"ansible_vboxnet0": {
"macaddress": "0a:00:27:00:00:00",
"mtu": "1500",
"flags": [
"UP",
...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment