一、准备工作¶
2.1.1 准备Ansible环境¶
1、准备两台机器,机器配置无需太高,比如1c2g,这里建议大家使用阿里云按量付费机器。





2、安装Ansible
两台机器都执行
说明:以下命令为阿里云alibaba cloud linux系统
dnf install python38 python38-pip ##新版本ansible需要python3.8或更高版本
pip3.8 install pip --upgrade ##升级pip版本
pip3.8 install ansible ##用pip安装ansible
3、配置Ansible
两台机器,其中一台机器作为控制端,另外一台作为被控制端
设置主机名
hostnamectl set-hostname aming01 ##控制端
hostnamectl set-hostname aming02 ##被控制端
编写hosts,两台机器都执行
编辑/etc/hosts 文件,最后面增加
192.168.118.34 aming01
192.168.118.35 aming02
做免密登录,在控制端执行
## 生成密钥对
ssh-keygen -t rsa
## 将公钥拷贝到被控制机
ssh-copy-id root@aming01 ## 本机到本机的免密
ssh-copy-id root@aming02 ## 本机到被控制端
创建主机清单配置,控制端执行
mkdir -p /etc/ansible
vi /etc/ansible/hosts ##最后面增加
[coze]
aming01 ansible_python_interpreter=/usr/bin/python3.8
aming02 ansible_python_interpreter=/usr/bin/python3.8
测试
ansible all --list-hosts ##列出所有主机组合主机
ansible aming02 -m command -a 'hostname' ##查看aming02主机名
2.1.2 编写ansible api服务脚本并开启API¶
1、在Ansible控制端编写api服务脚本
创建目录
mkdir /opt/ansible_api
cd /opt/ansible_api
编写脚本 main.py,内容如下:
from flask import Flask, request, jsonify
import ansible_runner
import os
import tempfile
import logging
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
app = Flask(__name__)
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
API_TOKEN = "<your-api-token>"
DEFAULT_INVENTORY_PATH = "/etc/ansible/hosts" # 系统默认inventory文件路径
PLAYBOOK_DIR = "/opt/ansible_api/playbook/" # playbook文件所在目录
# 确保playbook目录存在
if not os.path.exists(PLAYBOOK_DIR):
os.makedirs(PLAYBOOK_DIR)
logger.info(f"Created playbook directory: {os.path.abspath(PLAYBOOK_DIR)}")
def read_file_content(file_obj):
"""安全读取文件对象内容"""
if file_obj is None:
return ""
if hasattr(file_obj, 'read'):
try:
return file_obj.read()
except Exception as e:
logger.error(f"Error reading file: {e}")
return str(file_obj)
return str(file_obj)
def get_inventory_groups(inventory_path):
"""从inventory文件中获取所有组名"""
try:
loader = DataLoader()
inventory_manager = InventoryManager(loader=loader, sources=inventory_path)
groups = list(inventory_manager.groups.keys())
logger.info(f"Found groups in inventory: {groups}")
return groups
except Exception as e:
logger.error(f"Failed to get inventory groups: {e}")
return []
def get_inventory_hosts(inventory_path, group_name=None):
"""从inventory文件中获取主机列表"""
try:
loader = DataLoader()
inventory_manager = InventoryManager(loader=loader, sources=inventory_path)
if group_name and group_name in inventory_manager.groups:
group = inventory_manager.groups[group_name]
hosts = [host.name for host in group.hosts]
logger.info(f"Found hosts in group '{group_name}': {hosts}")
return hosts
else:
# 获取所有主机
hosts = [host.name for host in inventory_manager.get_hosts()]
logger.info(f"Found all hosts in inventory: {hosts}")
return hosts
except Exception as e:
logger.error(f"Failed to get inventory hosts: {e}")
return []
@app.route('/run', methods=['POST'])
def run_playbook():
# 验证Token
if request.headers.get('X-API-Token') != API_TOKEN:
return jsonify({"error": "Unauthorized"}), 401
# 获取请求参数
try:
data = request.get_json()
if not data:
return jsonify({"error": "Invalid JSON"}), 400
except Exception as e:
logger.error(f"JSON parse error: {e}")
return jsonify({"error": f"JSON parse error: {str(e)}"}), 400
playbook = data.get('playbook')
inventory = data.get('inventory', 'default') # 默认使用系统inventory
# 验证必填参数
if not playbook:
return jsonify({"error": "Missing playbook"}), 400
if not inventory:
return jsonify({"error": "Missing inventory"}), 400
# 处理playbook参数 - 安全检查并构建完整路径
# 移除路径中的任何目录部分,只保留文件名
playbook_filename = os.path.basename(playbook)
playbook_path = os.path.join(PLAYBOOK_DIR, playbook_filename)
# 检查playbook文件是否存在
if not os.path.isfile(playbook_path):
return jsonify({
"error": f"Playbook file '{playbook_filename}' not found in {os.path.abspath(PLAYBOOK_DIR)}",
"available_playbooks": os.listdir(PLAYBOOK_DIR) if os.path.exists(PLAYBOOK_DIR) else []
}), 400
# 创建临时工作目录
with tempfile.TemporaryDirectory() as tmp_dir:
# 处理inventory参数
limit = None # 用于限制执行的主机或组
if isinstance(inventory, str):
# 检查系统inventory文件是否存在
if os.path.exists(DEFAULT_INVENTORY_PATH):
# 获取系统inventory中的所有组
system_groups = get_inventory_groups(DEFAULT_INVENTORY_PATH)
if inventory.lower() in ['all', 'default']:
# 使用系统默认inventory,不限制组
inventory_path = DEFAULT_INVENTORY_PATH
logger.info(f"Using system default inventory: {inventory_path}")
elif inventory in system_groups:
# 使用系统inventory,并限制到指定组
inventory_path = DEFAULT_INVENTORY_PATH
limit = inventory
logger.info(f"Using system inventory with group limit: {limit}")
elif os.path.isfile(inventory):
# 使用指定的inventory文件路径
inventory_path = inventory
logger.info(f"Using specified inventory file: {inventory_path}")
else:
# 检查是否是主机列表(逗号分隔)
hosts = [host.strip() for host in inventory.split(',')]
# 检查是否有保留字"all"
if any(host.lower() == 'all' for host in hosts):
return jsonify({"error": "Cannot use 'all' as a hostname. It is a reserved word in Ansible."}), 400
# 检查是否有系统inventory中的组名
if any(host in system_groups for host in hosts):
return jsonify({
"error": f"Cannot use group names as hostnames. Found group names in: {hosts}. "
f"Use the group name directly to target all hosts in that group."
}), 400
# 创建临时inventory文件
inv_content = f"[all]\n" + "\n".join(hosts)
inv_path = os.path.join(tmp_dir, "inventory")
with open(inv_path, 'w') as f:
f.write(inv_content)
inventory_path = inv_path
logger.info(f"Created temporary inventory with hosts: {hosts}")
else:
# 系统inventory文件不存在,只能使用主机列表或指定文件
if os.path.isfile(inventory):
inventory_path = inventory
logger.info(f"Using specified inventory file: {inventory_path}")
else:
# 检查是否是主机列表(逗号分隔)
hosts = [host.strip() for host in inventory.split(',')]
# 检查是否有保留字"all"
if any(host.lower() == 'all' for host in hosts):
return jsonify({"error": "Cannot use 'all' as a hostname. It is a reserved word in Ansible."}), 400
# 创建临时inventory文件
inv_content = f"[all]\n" + "\n".join(hosts)
inv_path = os.path.join(tmp_dir, "inventory")
with open(inv_path, 'w') as f:
f.write(inv_content)
inventory_path = inv_path
logger.info(f"Created temporary inventory with hosts: {hosts}")
# 执行Ansible
try:
logger.info(f"Running playbook: {playbook_path}")
logger.info(f"Inventory: {inventory_path}")
if limit:
logger.info(f"Limiting to group/host: {limit}")
# 使用正确的参数调用ansible_runner(移除了extravars参数)
result = ansible_runner.run(
private_data_dir=tmp_dir,
playbook=playbook_path, # 使用完整路径
inventory=inventory_path,
limit=limit, # 添加limit参数
quiet=True # 禁止输出到控制台
)
# 获取输出内容
stdout_content = read_file_content(result.stdout)
stderr_content = read_file_content(result.stderr)
# 获取统计信息
stats = {}
if hasattr(result, 'stats'):
stats = result.stats
logger.info(f"Playbook execution completed with RC: {result.rc}")
return jsonify({
"status": "success",
"rc": result.rc,
"stdout": stdout_content,
"stderr": stderr_content,
"stats": stats
})
except Exception as e:
logger.error(f"Ansible execution failed: {e}", exc_info=True)
return jsonify({
"error": f"Ansible execution failed: {str(e)}",
"traceback": str(e.__traceback__) if hasattr(e, '__traceback__') else None
}), 500
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
2、安装依赖模块
python3.8 -m pip install flask ansible-runner
3、开启服务
nohup python3.8 main.py >> ansible_api.log 2>>ansible_api.log &
4、编写测试playbook
创建playbook目录,以后所有的playbook文件全部放到这里
mkdir /opt/ansible_api/playbook
cd /opt/ansible_api/playbook
vi test_playbook.yml #内容为
- name: Simple Example Playbook
hosts: all
become: yes # 使用sudo权限执行任务
tasks:
- name: Create a directory
ansible.builtin.file:
path: /tmp/ansible_test
state: directory
mode: '0755'
5、使用curl测试接口是否成功
curl -X POST http://localhost:5000/run \
-H "Content-Type: application/json" \
-H "X-API-Token: <your-api-token>" \
-d '{
"playbook": "test_playbook.yml",
"inventory": "aming01"
}'
2.1.3 编写playbook¶
起初阶段,我们可以先自行手动编写playbook,后期可以借助LLM自动生成。
几个常见的场景:重启某服务、执行一个shell脚本、查看系统负载、查看磁盘使用情况等
1、重启服务
在/opt/ansible_api/playbook目录下创建 restart_nginx.yml
---
- name: Restart Nginx with systemd module
hosts: all
become: yes
tasks:
- name: Restart nginx service
ansible.builtin.systemd:
name: nginx
state: restarted
2、执行shell脚本
在/opt/ansible_api/playbook目录下创建shell_script.yml
说明:shell脚本在ansible控制端,会自动传输到远程
---
- name: Execute shell script
hosts: all
tasks:
- name: Run local script on remote hosts
script: /path/to/local_script.sh # 本地脚本路径
args:
chdir: /tmp # 可选:指定远程执行目录
register: result
changed_when: false # 避免误报 changed 状态
- name: Print script output
debug:
var: result.stdout
3、查看系统负载
在/opt/ansible_api/playbook目录下创建get_systemload.yml
---
- name: Check System Load Average
hosts: all
gather_facts: no # 禁用 facts 收集以提高效率
tasks:
- name: Get system load via uptime
command: uptime
register: uptime_result
changed_when: false # 此命令不会改变系统状态
- name: Display load information
debug:
msg: |
Host: {{ inventory_hostname }}
System Info: {{ uptime_result.stdout }}
Load Average (1m/5m/15m): {{ uptime_result.stdout | regex_replace('.*load average: (.*)', '\\1') }}
4、查看磁盘
在/opt/ansible_api/playbook目录下创建get_diskinfo.yml
---
- name: Check Disk Usage
hosts: all
vars:
warn_threshold: 80 # 警告阈值(百分比)
crit_threshold: 90 # 严重阈值(百分比)
exclude_fs: # 排除的文件系统类型
- tmpfs
- devtmpfs
- squashfs
- overlay
- nfs
- cifs
tasks:
- name: Get disk usage information
shell: |
df -h | awk 'NR>1 {
gsub(/%/, "", $5);
if ($5 >= {{ crit_threshold }}) {
printf "\033[1;31mCRITICAL\033[0m";
} else if ($5 >= {{ warn_threshold }}) {
printf "\033[1;33mWARNING\033[0m";
} else {
printf "\033[1;32mOK\033[0m";
}
printf " %s %s %s %s%% %s\n", $1, $2, $3, $5, $6;
}' | grep -vE '{{ exclude_fs | join("|") }}'
register: disk_info
changed_when: false
failed_when: false
- name: Display disk usage report
debug:
msg: |
===== Disk Usage Report for {{ inventory_hostname }} =====
Status Filesystem Size Used Use% Mountpoint
---------------------------------------------------------
{{ disk_info.stdout }}
{% if disk_info.stderr %}
Errors encountered:
{{ disk_info.stderr }}
{% endif %}