基于观测云 DataKit 实现 H3C 路由器有源 Ping 链路质量监控

    banner-2.png

    背景:为什么传统监控不够?

    在企业网络运维中,仅依赖设备存活(如 ICMP 或 SNMP)无法全面反映链路质量。尤其在多出口、策略路由或 VRF 场景下,需要从指定源地址发起探测,以真实反映业务路径质量。但是设备存活监控(ICMP/SNMP)只能回答"通不通",无法回答"好不好"。尤其在以下场景,传统方案存在明显盲区:

    场景 传统监控局限 业务影响
    多出口链路 默认路由探测,无法覆盖备用链路 主链路故障时,备用链路质量未知
    策略路由(PBR) 探测源地址与实际业务流不一致 监控正常,但用户访问卡顿
    VRF 隔离网络 跨 VRF 路由不可见 私网互通问题无法提前发现
    云专线/SD-WAN 公网探测路径与专线路径分离 专线质量劣化无感知

    为了弥补传统监控盲区,下面借助观测云,实现从指定源地址发起探测,真实还原业务流量的实际转发路径。

    方案架构:DataKit + SSH 有源探测

    本方案基于观测云 DataKit 的 PythonD 插件,通过 SSH 登录 H3C 路由器执行有源 Ping,实现以下目标:

    • 多链路质量监控(指定源 IP)
    • 延迟 / 丢包 / 抖动指标采集
    • 统一指标上报观测云

    整体架构

    关键技术选型

    组件 选型 理由
    采集器 DataKit PythonD 观测云原生插件,支持自定义 Point 上报
    SSH 库 Paramiko Python 生态成熟,支持密钥认证、长连接复用
    目标设备 H3C 路由器 支持 -a 参数指定源地址,Comware 系统通用
    传输协议 HTTP/HTTPS 观测云标准接入,支持压缩与批量上报

    核心能力:有源 Ping 原理

    H3C 有源 Ping 命令

    H3C 支持通过指定源地址进行 Ping。

    示例:

    ping -a 172.30.253.1 172.30.253.6
    Ping 172.30.253.6 (172.30.253.6) from 172.30.253.1: 56 data bytes, press CTRL_C to break
    56 bytes from 172.30.253.6: icmp_seq=0 ttl=255 time=3.246 ms
    56 bytes from 172.30.253.6: icmp_seq=1 ttl=255 time=48.882 ms
    56 bytes from 172.30.253.6: icmp_seq=2 ttl=255 time=0.522 ms
    56 bytes from 172.30.253.6: icmp_seq=3 ttl=255 time=0.547 ms
    56 bytes from 172.30.253.6: icmp_seq=4 ttl=255 time=0.561 ms
    
    --- Ping statistics for 172.30.253.6 ---
    5 packet(s) transmitted, 5 packet(s) received, 0.0% packet loss
    

    指标设计:完整链路质量画像

    核心指标集

    指标名 类型 说明
    status int 连通性(1=成功,0=失败)
    packet_loss float 丢包率 (%)
    latency_avg float 平均延迟(ms)
    latency_min float 最小延迟(ms)
    latency_max float 最大延迟(ms)
    latency_stddev float 抖动

    接入步骤

    安装 DataKit

    登录能连接路由器的 Linux 主机,安装文档注册观测云,安装 DataKit。

    开启 Pythond 采集器

    cd /usr/local/datakit/conf.d
    cp samples/pythond.conf.sample pythond.conf
    

    编辑 pythond.conf

    cd /usr/local/datakit/python.d
    mkdir route_demo
    

    在 route_demo 下新建 route_demo.py 文件,内容如下:

    from datakit_framework import DataKitFramework
    import paramiko
    import re
    import time
    import json
    
    class H3CPingMonitor(DataKitFramework):
        name = 'H3CPingMonitor'
        interval = 60  # 每60秒执行一次
        
        def __init__(self, **kwargs):
            super().__init__(ip='127.0.0.1', port=9529)
            # H3C路由器配置
            self.router_ip = '172.30.250.201'
            self.router_port = 22
            self.username = 'test1'      # 修改为你的H3C用户名
            self.password = 'xxxx'   # 修改为你的H3C密码
            # 目标ping地址列表
            self.target_ips = ['172.30.253.6']
            # 源地址(有源ping的源接口IP,根据实际情况修改)
            self.source_ip = '172.30.253.1'  # 例如: '10.0.0.1',如果为None则使用默认源
            
        def ssh_connect(self):
            """建立SSH连接到H3C路由器"""
            try:
                client = paramiko.SSHClient()
                client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
                client.connect(
                    hostname=self.router_ip,
                    port=self.router_port,
                    username=self.username,
                    password=self.password,
                    timeout=30,
                    allow_agent=False,
                    look_for_keys=False
                )
                return client
            except Exception as e:
                print(f"SSH连接失败: {str(e)}")
                return None
        
        def parse_ping_output(self, output):
            """
            解析H3C ping输出,提取延迟指标
            返回: dict 包含 success, packet_loss, latency_min, latency_avg, latency_max, latency_stddev
            """
            result = {
                'success': False,
                'packet_loss': 100,
                'latency_min': None,
                'latency_avg': None,
                'latency_max': None,
                'latency_stddev': None
            }
            
            # 检查是否成功(0%丢包)
            if re.search(r'0\.0%\s+packet\s+loss|0%\s+packet\s+loss', output):
                result['success'] = True
                result['packet_loss'] = 0
            else:
                # 提取丢包率
                loss_match = re.search(r'(\d+\.?\d*)%\s+packet\s+loss', output)
                if loss_match:
                    result['packet_loss'] = float(loss_match.group(1))
                    result['success'] = result['packet_loss'] < 100
            
            # 提取延迟统计: round-trip min/avg/max/std-dev = 0.522/10.752/48.882/19.094 ms
            stats_pattern = r'round-trip\s+min/avg/max/std-dev\s*=\s*([\d.]+)/([\d.]+)/([\d.]+)/([\d.]+)\s*ms'
            stats_match = re.search(stats_pattern, output)
            
            if stats_match:
                result['latency_min'] = float(stats_match.group(1))
                result['latency_avg'] = float(stats_match.group(2))
                result['latency_max'] = float(stats_match.group(3))
                result['latency_stddev'] = float(stats_match.group(4))
            else:
                # 备用方案:从每行time=提取计算
                times = re.findall(r'time=([\d.]+)\s*ms', output)
                if times:
                    times = [float(t) for t in times]
                    result['latency_min'] = min(times)
                    result['latency_avg'] = sum(times) / len(times)
                    result['latency_max'] = max(times)
            
            return result
        
        def execute_ping(self, client, target_ip):
            """
            在H3C路由器上执行有源ping命令
            H3C命令格式: ping -a <source-ip> <destination-ip>
            """
            try:
                # 打开交互式shell
                shell = client.invoke_shell()
                shell.settimeout(30)
                
                # 等待初始提示符
                time.sleep(1)
                shell.recv(65535).decode('utf-8', errors='ignore')
                
                # 构建有源ping命令
                if self.source_ip:
                    ping_cmd = f"ping -a {self.source_ip} {target_ip}\n"
                else:
                    ping_cmd = f"ping {target_ip}\n"
                
                print(f"执行命令: {ping_cmd.strip()}")
                shell.send(ping_cmd)
                
                # 等待命令执行完成
                time.sleep(2)
                
                # 接收输出
                output = ""
                start_time = time.time()
                while True:
                    if shell.recv_ready():
                        chunk = shell.recv(65535).decode('utf-8', errors='ignore')
                        output += chunk
                        # 检测到统计信息或超时退出
                        if "Ping statistics" in output and "round-trip" in output:
                            time.sleep(0.3)
                            if shell.recv_ready():
                                output += shell.recv(65535).decode('utf-8', errors='ignore')
                            break
                    elif time.time() - start_time > 10:
                        break
                    else:
                        time.sleep(0.2)
                
                shell.close()
                
                print(f"Ping {target_ip} 输出:\n{output}")
                
                # 解析ping结果
                parsed = self.parse_ping_output(output)
                
                return {
                    'target': target_ip,
                    'status': 1 if parsed['success'] else 0,
                    'packet_loss': parsed['packet_loss'],
                    'latency_min': parsed['latency_min'],
                    'latency_avg': parsed['latency_avg'],
                    'latency_max': parsed['latency_max'],
                    'latency_stddev': parsed['latency_stddev'],
                    'output': output[:500]
                }
                
            except Exception as e:
                print(f"执行ping命令失败 {target_ip}: {str(e)}")
                return {
                    'target': target_ip,
                    'status': 0,
                    'packet_loss': 100,
                    'latency_min': None,
                    'latency_avg': None,
                    'latency_max': None,
                    'latency_stddev': None,
                    'output': str(e)
                }
        
        def run(self):
            print(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] 开始H3C路由器Ping监控...")
            
            # 建立SSH连接
            client = self.ssh_connect()
            if not client:
                # 连接失败,上报所有目标为失败状态
                data = []
                for ip in self.target_ips:
                    fields = {
                        "status": 0,
                        "packet_loss": 100,
                        "error": "ssh_connection_failed"
                    }
                    data.append({
                        "measurement": "h3c_ping_monitor",
                        "tags": {
                            "router_ip": self.router_ip,
                            "target_ip": ip,
                            "source_ip": self.source_ip or "default"
                        },
                        "fields": fields
                    })
                
                in_data = {
                    'M': data,
                    'input': "h3c_ping_monitor"
                }
                return self.report(in_data)
            
            try:
                # 执行所有ping检测
                ping_results = []
                for target_ip in self.target_ips:
                    result = self.execute_ping(client, target_ip)
                    ping_results.append(result)
                    time.sleep(1)
                
                # 构建上报数据
                data = []
                for result in ping_results:
                    fields = {
                        "status": result['status'],
                        "packet_loss": result['packet_loss']
                    }
                    
                    # 添加延迟指标(仅当ping成功且有数据时)
                    if result['status'] == 1:
                        if result['latency_avg'] is not None:
                            fields["latency_avg"] = result['latency_avg']
                        if result['latency_min'] is not None:
                            fields["latency_min"] = result['latency_min']
                        if result['latency_max'] is not None:
                            fields["latency_max"] = result['latency_max']
                        if result['latency_stddev'] is not None:
                            fields["latency_stddev"] = result['latency_stddev']
                    
                    data.append({
                        "measurement": "h3c_ping_monitor",
                        "tags": {
                            "router_ip": self.router_ip,
                            "target_ip": result['target'],
                            "source_ip": self.source_ip or "default"
                        },
                        "fields": fields
                    })
                
                in_data = {
                    'M': data,
                    'input': "h3c_ping_monitor"
                }
                
                print(f"上报数据: {json.dumps(in_data, indent=2)}")
                
                # 上报到DataKit
                return self.report(in_data)
                
            finally:
                client.close()
                print("SSH连接已关闭")
    
    
    # 如果直接运行此脚本进行测试
    if __name__ == '__main__':
        monitor = H3CPingMonitor()
        monitor.run()
    

    重启 DataKit

    datakit service -R
    

    监控视图

    使用采集到的指标,自定义仪表板,效果如下。

    总结

    本方案通过 DataKit PythonD + SSH,实现了对 H3C 路由器的有源链路质量监控,具备以下优势:

    • 支持真实业务路径检测(指定源 IP)
    • 指标体系完整(延迟 / 丢包 / 抖动)
    • 易扩展(多目标、多源、并发)
    • 可直接接入观测云告警体系

    联系我们

    加入社区

    微信扫码
    加入官方交流群

    立即体验

    在线开通,按量计费,真正的云服务!

    立即开始

    选择观测云版本

    代码托管平台