zabbix by julianobarbosa/claude-code-skills
npx skills add https://github.com/julianobarbosa/claude-code-skills --skill zabbix本技能提供通过 API 和官方 Python 库 zabbix_utils 自动化 Zabbix 监控操作的指导。
pip install zabbix-utils --break-system-packages
from zabbix_utils import ZabbixAPI
# 选项 1:用户名/密码
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(user="Admin", password="zabbix")
# 选项 2:API 令牌(Zabbix 5.4+,推荐)
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(token="your_api_token")
# 验证连接
print(api.api_version())
import os
from zabbix_utils import ZabbixAPI
api = ZabbixAPI(url=os.environ.get("ZABBIX_URL", "http://localhost/zabbix"))
api.login(token=os.environ["ZABBIX_TOKEN"])
所有 API 都遵循模式:api.<对象>.<方法>(),方法包括:、、、。
广告位招租
在这里展示您的产品或服务
触达数万 AI 开发者,精准高效
getcreateupdatedelete# 获取主机
hosts = api.host.get(output=["hostid", "host", "name"],
selectInterfaces=["ip"])
# 创建主机
api.host.create(
host="server01",
groups=[{"groupid": "2"}], # Linux 服务器
interfaces=[{
"type": 1, # 1=agent, 2=SNMP, 3=IPMI, 4=JMX
"main": 1,
"useip": 1,
"ip": "192.168.1.100",
"dns": "",
"port": "10050"
}],
templates=[{"templateid": "10001"}]
)
# 更新主机
api.host.update(hostid="10084", status=0) # 0=启用, 1=禁用
# 删除主机
api.host.delete("10084")
# 获取模板
templates = api.template.get(output=["templateid", "host", "name"],
selectHosts=["hostid", "name"])
# 将模板链接到主机
api.host.update(hostid="10084",
templates=[{"templateid": "10001"}])
# 从 XML 导入模板
with open("template.xml") as f:
api.configuration.import_(
source=f.read(),
format="xml",
rules={
"templates": {"createMissing": True, "updateExisting": True},
"items": {"createMissing": True, "updateExisting": True},
"triggers": {"createMissing": True, "updateExisting": True}
}
)
# 获取监控项
items = api.item.get(hostids="10084",
output=["itemid", "name", "key_"],
search={"key_": "system.cpu"})
# 创建监控项
api.item.create(
name="CPU 负载",
key_="system.cpu.load[percpu,avg1]",
hostid="10084",
type=0, # 0=Zabbix agent
value_type=0, # 0=float, 3=integer, 4=text
delay="30s",
interfaceid="1"
)
# 获取触发器
triggers = api.trigger.get(hostids="10084",
output=["triggerid", "description", "priority"],
selectFunctions="extend")
# 创建触发器
api.trigger.create(
description="{HOST.NAME} 上 CPU 过高",
expression="last(/server01/system.cpu.load[percpu,avg1])>5",
priority=3 # 0=未分类, 1=信息, 2=警告, 3=一般严重, 4=严重, 5=灾难
)
# 获取组
groups = api.hostgroup.get(output=["groupid", "name"])
# 创建组
api.hostgroup.create(name="生产环境/Web 服务器")
# 将主机添加到组
api.hostgroup.massadd(groups=[{"groupid": "5"}],
hosts=[{"hostid": "10084"}])
import time
# 创建维护
api.maintenance.create(
name="服务器维护",
active_since=int(time.time()),
active_till=int(time.time()) + 3600, # 1 小时
hostids=["10084"],
timeperiods=[{
"timeperiod_type": 0, # 一次性
"period": 3600
}]
)
# 获取当前问题
problems = api.problem.get(output=["eventid", "name", "severity"],
recent=True)
# 获取事件
events = api.event.get(hostids="10084",
time_from=int(time.time()) - 86400,
output="extend")
# 获取历史数据(value_type 必须与监控项的 value_type 匹配)
# 0=float, 1=character, 2=log, 3=integer, 4=text
history = api.history.get(
itemids="28269",
history=0, # float
time_from=int(time.time()) - 3600,
output="extend",
sortfield="clock",
sortorder="DESC"
)
from zabbix_utils import Sender
sender = Sender(server="zabbix.example.com", port=10051)
# 发送单个值
response = sender.send_value("hostname", "trap.key", "value123")
print(response) # {"processed": 1, "failed": 0, "total": 1}
# 发送多个值
from zabbix_utils import ItemValue
values = [
ItemValue("host1", "key1", "value1"),
ItemValue("host2", "key2", 42),
]
response = sender.send(values)
from zabbix_utils import Getter
agent = Getter(host="192.168.1.100", port=10050)
response = agent.get("system.uname")
print(response.value)
import csv
from zabbix_utils import ZabbixAPI
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(token="your_token")
with open("hosts.csv") as f:
for row in csv.DictReader(f):
try:
api.host.create(
host=row["hostname"],
groups=[{"groupid": row["groupid"]}],
interfaces=[{
"type": 1, "main": 1, "useip": 1,
"ip": row["ip"], "dns": "", "port": "10050"
}]
)
print(f"已创建: {row['hostname']}")
except Exception as e:
print(f"失败 {row['hostname']}: {e}")
# 获取所有主机
all_hosts = api.host.get(output=["hostid", "host"],
selectParentTemplates=["templateid"])
# 过滤没有特定模板的主机
template_id = "10001"
hosts_without = [h for h in all_hosts
if not any(t["templateid"] == template_id
for t in h.get("parentTemplates", []))]
triggers = api.trigger.get(
search={"description": "test"},
output=["triggerid"]
)
for t in triggers:
api.trigger.update(triggerid=t["triggerid"], status=1) # 1=禁用
| 类型 | 值 | 描述 |
|---|---|---|
| Zabbix agent | 0 | 主动检查 |
| Zabbix trapper | 2 | 被动,通过 sender 推送数据 |
| Simple check | 3 | ICMP、TCP 等 |
| Zabbix internal | 5 | 服务器内部指标 |
| Zabbix agent (active) | 7 | Agent 主动发起 |
| HTTP agent | 19 | HTTP/REST API 监控 |
| Dependent item | 18 | 从主监控项派生 |
| Script | 21 | 自定义脚本 |
| 类型 | 值 | 描述 |
|---|---|---|
| Float | 0 | 数值(浮点) |
| Character | 1 | 字符串 |
| Log | 2 | 日志文件 |
| Unsigned | 3 | 数值(整数) |
| Text | 4 | 文本 |
| 严重性 | 值 | 颜色 |
|---|---|---|
| Not classified | 0 | 灰色 |
| Information | 1 | 浅蓝色 |
| Warning | 2 | 黄色 |
| Average | 3 | 橙色 |
| High | 4 | 浅红色 |
| Disaster | 5 | 红色 |
from zabbix_utils import ZabbixAPI
from zabbix_utils.exceptions import APIRequestError
try:
api.host.create(host="duplicate_host", groups=[{"groupid": "2"}])
except APIRequestError as e:
print(f"API 错误: {e.message}")
print(f"代码: {e.code}")
import logging
logging.basicConfig(level=logging.DEBUG)
# 现在所有 API 调用都将被记录
查看 scripts/ 目录以获取现成的自动化脚本:
zabbix-bulk-hosts.py - 从 CSV 进行批量主机管理zabbix-maintenance.py - 创建/管理维护窗口zabbix-export.py - 将主机/模板导出为 JSON/XMLoutput=["field1", "field2"] 而不是 output="extend"limit 和 offsetmassadd、massupdate 进行批量更改每周安装次数
94
仓库
GitHub 星标数
41
首次出现
2026年1月24日
安全审计
安装于
opencode81
codex78
cursor74
gemini-cli73
github-copilot71
amp66
This skill provides guidance for automating Zabbix monitoring operations via the API and official Python library zabbix_utils.
pip install zabbix-utils --break-system-packages
from zabbix_utils import ZabbixAPI
# Option 1: Username/password
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(user="Admin", password="zabbix")
# Option 2: API token (Zabbix 5.4+, preferred)
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(token="your_api_token")
# Verify connection
print(api.api_version())
import os
from zabbix_utils import ZabbixAPI
api = ZabbixAPI(url=os.environ.get("ZABBIX_URL", "http://localhost/zabbix"))
api.login(token=os.environ["ZABBIX_TOKEN"])
All APIs follow pattern: api.<object>.<method>() with methods: get, create, update, delete.
# Get hosts
hosts = api.host.get(output=["hostid", "host", "name"],
selectInterfaces=["ip"])
# Create host
api.host.create(
host="server01",
groups=[{"groupid": "2"}], # Linux servers
interfaces=[{
"type": 1, # 1=agent, 2=SNMP, 3=IPMI, 4=JMX
"main": 1,
"useip": 1,
"ip": "192.168.1.100",
"dns": "",
"port": "10050"
}],
templates=[{"templateid": "10001"}]
)
# Update host
api.host.update(hostid="10084", status=0) # 0=enabled, 1=disabled
# Delete host
api.host.delete("10084")
# Get templates
templates = api.template.get(output=["templateid", "host", "name"],
selectHosts=["hostid", "name"])
# Link template to host
api.host.update(hostid="10084",
templates=[{"templateid": "10001"}])
# Import template from XML
with open("template.xml") as f:
api.configuration.import_(
source=f.read(),
format="xml",
rules={
"templates": {"createMissing": True, "updateExisting": True},
"items": {"createMissing": True, "updateExisting": True},
"triggers": {"createMissing": True, "updateExisting": True}
}
)
# Get items
items = api.item.get(hostids="10084",
output=["itemid", "name", "key_"],
search={"key_": "system.cpu"})
# Create item
api.item.create(
name="CPU Load",
key_="system.cpu.load[percpu,avg1]",
hostid="10084",
type=0, # 0=Zabbix agent
value_type=0, # 0=float, 3=integer, 4=text
delay="30s",
interfaceid="1"
)
# Get triggers
triggers = api.trigger.get(hostids="10084",
output=["triggerid", "description", "priority"],
selectFunctions="extend")
# Create trigger
api.trigger.create(
description="High CPU on {HOST.NAME}",
expression="last(/server01/system.cpu.load[percpu,avg1])>5",
priority=3 # 0=not classified, 1=info, 2=warning, 3=average, 4=high, 5=disaster
)
# Get groups
groups = api.hostgroup.get(output=["groupid", "name"])
# Create group
api.hostgroup.create(name="Production/Web Servers")
# Add hosts to group
api.hostgroup.massadd(groups=[{"groupid": "5"}],
hosts=[{"hostid": "10084"}])
import time
# Create maintenance
api.maintenance.create(
name="Server Maintenance",
active_since=int(time.time()),
active_till=int(time.time()) + 3600, # 1 hour
hostids=["10084"],
timeperiods=[{
"timeperiod_type": 0, # One-time
"period": 3600
}]
)
# Get current problems
problems = api.problem.get(output=["eventid", "name", "severity"],
recent=True)
# Get events
events = api.event.get(hostids="10084",
time_from=int(time.time()) - 86400,
output="extend")
# Get history (value_type must match item's value_type)
# 0=float, 1=character, 2=log, 3=integer, 4=text
history = api.history.get(
itemids="28269",
history=0, # float
time_from=int(time.time()) - 3600,
output="extend",
sortfield="clock",
sortorder="DESC"
)
from zabbix_utils import Sender
sender = Sender(server="zabbix.example.com", port=10051)
# Send single value
response = sender.send_value("hostname", "trap.key", "value123")
print(response) # {"processed": 1, "failed": 0, "total": 1}
# Send multiple values
from zabbix_utils import ItemValue
values = [
ItemValue("host1", "key1", "value1"),
ItemValue("host2", "key2", 42),
]
response = sender.send(values)
from zabbix_utils import Getter
agent = Getter(host="192.168.1.100", port=10050)
response = agent.get("system.uname")
print(response.value)
import csv
from zabbix_utils import ZabbixAPI
api = ZabbixAPI(url="https://zabbix.example.com")
api.login(token="your_token")
with open("hosts.csv") as f:
for row in csv.DictReader(f):
try:
api.host.create(
host=row["hostname"],
groups=[{"groupid": row["groupid"]}],
interfaces=[{
"type": 1, "main": 1, "useip": 1,
"ip": row["ip"], "dns": "", "port": "10050"
}]
)
print(f"Created: {row['hostname']}")
except Exception as e:
print(f"Failed {row['hostname']}: {e}")
# Get all hosts
all_hosts = api.host.get(output=["hostid", "host"],
selectParentTemplates=["templateid"])
# Filter hosts without specific template
template_id = "10001"
hosts_without = [h for h in all_hosts
if not any(t["templateid"] == template_id
for t in h.get("parentTemplates", []))]
triggers = api.trigger.get(
search={"description": "test"},
output=["triggerid"]
)
for t in triggers:
api.trigger.update(triggerid=t["triggerid"], status=1) # 1=disabled
| Type | Value | Description |
|---|---|---|
| Zabbix agent | 0 | Active checks |
| Zabbix trapper | 2 | Passive, data pushed via sender |
| Simple check | 3 | ICMP, TCP, etc. |
| Zabbix internal | 5 | Server internal metrics |
| Zabbix agent (active) | 7 | Agent-initiated |
| HTTP agent | 19 | HTTP/REST API monitoring |
| Dependent item | 18 | Derived from master item |
| Script | 21 | Custom scripts |
| Type | Value | Description |
|---|---|---|
| Float | 0 | Numeric (float) |
| Character | 1 | Character string |
| Log | 2 | Log file |
| Unsigned | 3 | Numeric (integer) |
| Text | 4 | Text |
| Severity | Value | Color |
|---|---|---|
| Not classified | 0 | Gray |
| Information | 1 | Light blue |
| Warning | 2 | Yellow |
| Average | 3 | Orange |
| High | 4 | Light red |
| Disaster | 5 | Red |
from zabbix_utils import ZabbixAPI
from zabbix_utils.exceptions import APIRequestError
try:
api.host.create(host="duplicate_host", groups=[{"groupid": "2"}])
except APIRequestError as e:
print(f"API Error: {e.message}")
print(f"Code: {e.code}")
import logging
logging.basicConfig(level=logging.DEBUG)
# Now all API calls will be logged
See scripts/ directory for ready-to-use automation:
zabbix-bulk-hosts.py - Bulk host management from CSVzabbix-maintenance.py - Create/manage maintenance windowszabbix-export.py - Export hosts/templates to JSON/XMLoutput=["field1", "field2"] instead of output="extend"limit and offsetmassadd, massupdate for bulk changesWeekly Installs
94
Repository
GitHub Stars
41
First Seen
Jan 24, 2026
Security Audits
Gen Agent Trust HubPassSocketPassSnykFail
Installed on
opencode81
codex78
cursor74
gemini-cli73
github-copilot71
amp66
Azure Data Explorer (Kusto) 查询技能:KQL数据分析、日志遥测与时间序列处理
130,600 周安装
Whale Index 自动镜像交易策略 | 跟随顶级交易者,智能评分与每日再平衡
121 周安装
WebSocket 安全最佳实践:预防 CSWSH 攻击与连接管理指南
126 周安装
macOS原生开发技能 - 提升Claude在macOS平台的集成与自动化能力
134 周安装
小说结局诊断指南:如何写出既必然又惊喜的完美结局 | 写作技巧与问题解决方案
93 周安装
GitHub自动化工具:通过Rube MCP与Composio自动化管理仓库、问题、PR和CI/CD
156 周安装
Hyperliquid 机会扫描器 v5:AI 驱动的量化交易工具,低代币消耗筛选永续合约机会
128 周安装