可视化框架实战:Grafana与Kibana仪表板设计¶
学习目标¶
完成本教程后,你将能够:
- 理解数据可视化的核心概念和最佳实践
- 掌握Grafana的安装、配置和使用
- 掌握Kibana的安装、配置和使用
- 设计专业的数据可视化仪表板
- 配置各种类型的图表和面板
- 实现实时数据监控和刷新
- 配置告警规则和通知
- 在物联网场景中应用可视化技术
前置要求¶
在开始本教程之前,你需要:
知识要求: - 了解时序数据库基础(建议先学习时序数据库入门) - 熟悉基本的数据库查询 - 了解HTTP和REST API基础 - 了解基本的JSON格式
技能要求: - 能够使用命令行操作 - 会编写简单的数据库查询 - 了解Docker基础(建议先学习Docker容器技术)
硬件要求: - 64位操作系统(Windows 10/11、macOS、Linux) - 至少4GB内存(推荐8GB) - 至少15GB可用磁盘空间
概述¶
数据可视化是将复杂的数据转化为直观图表的过程,帮助我们快速理解数据趋势、发现异常和做出决策。
为什么学习可视化框架?
- 实时监控:监控系统状态、设备运行情况
- 数据分析:发现数据趋势和模式
- 故障诊断:快速定位问题根源
- 业务决策:基于数据做出明智决策
- 团队协作:共享数据洞察
Grafana vs Kibana:
| 特性 | Grafana | Kibana |
|---|---|---|
| 主要用途 | 时序数据监控 | 日志分析和搜索 |
| 数据源 | 多种(InfluxDB、Prometheus等) | Elasticsearch |
| 学习曲线 | 较平缓 | 中等 |
| 图表类型 | 丰富的时序图表 | 丰富的分析图表 |
| 告警功能 | 强大 | 基础 |
| 适用场景 | 系统监控、IoT | 日志分析、APM |
准备工作¶
系统要求¶
通用要求: - 64位操作系统 - 至少4GB内存 - 至少15GB可用磁盘空间 - 稳定的网络连接
环境搭建¶
我们将使用Docker Compose搭建完整的可视化环境,包括: - Grafana(可视化平台) - Kibana(可视化平台) - InfluxDB(时序数据库) - Elasticsearch(搜索引擎) - Logstash(日志处理)
创建工作目录:
创建 docker-compose.yml:
version: '3.8'
services:
# Grafana
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_INSTALL_PLUGINS=grafana-clock-panel,grafana-simple-json-datasource
volumes:
- grafana-data:/var/lib/grafana
networks:
- viz-network
# InfluxDB (Grafana数据源)
influxdb:
image: influxdb:2.7
container_name: influxdb
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=adminpassword123
- DOCKER_INFLUXDB_INIT_ORG=myorg
- DOCKER_INFLUXDB_INIT_BUCKET=sensors
- DOCKER_INFLUXDB_INIT_RETENTION=30d
volumes:
- influxdb-data:/var/lib/influxdb2
networks:
- viz-network
# Elasticsearch (Kibana数据源)
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
container_name: elasticsearch
ports:
- "9200:9200"
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
networks:
- viz-network
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- viz-network
# Logstash (可选,用于日志处理)
logstash:
image: docker.elastic.co/logstash/logstash:8.11.0
container_name: logstash
ports:
- "5000:5000"
- "9600:9600"
volumes:
- ./logstash/pipeline:/usr/share/logstash/pipeline
environment:
- "LS_JAVA_OPTS=-Xms256m -Xmx256m"
depends_on:
- elasticsearch
networks:
- viz-network
volumes:
grafana-data:
influxdb-data:
elasticsearch-data:
networks:
viz-network:
driver: bridge
启动所有服务:
等待所有服务启动(约1-2分钟),然后验证:
# 验证Grafana
curl http://localhost:3000
# 验证InfluxDB
curl http://localhost:8086/health
# 验证Elasticsearch
curl http://localhost:9200
# 验证Kibana
curl http://localhost:5601/api/status
步骤1:Grafana入门¶
1.1 访问Grafana¶
打开浏览器访问:http://localhost:3000
首次登录:
- 用户名:admin
- 密码:admin
- 系统会提示修改密码(可以跳过)
1.2 Grafana界面概览¶
Grafana主界面包含以下部分:
顶部导航栏
├── Home - 主页
├── Dashboards - 仪表板列表
├── Explore - 数据探索
├── Alerting - 告警管理
└── Configuration - 配置
左侧菜单
├── Search - 搜索仪表板
├── Dashboards - 仪表板管理
├── Explore - 数据探索
├── Alerting - 告警规则
└── Configuration - 数据源、插件等
1.3 添加数据源¶
添加InfluxDB数据源¶
- 点击左侧菜单 Configuration → Data sources
- 点击 Add data source
- 选择 InfluxDB
- 配置连接信息:
Name: InfluxDB-Sensors
Query Language: Flux
URL: http://influxdb:8086
Organization: myorg
Token: [从InfluxDB获取]
Default Bucket: sensors
获取InfluxDB Token:
# 进入InfluxDB容器
docker exec -it influxdb bash
# 创建Token
influx auth create \
--org myorg \
--all-access \
--description "Grafana Token"
# 复制显示的Token
- 点击 Save & test,确认连接成功
1.4 准备测试数据¶
创建 generate_sensor_data.py 脚本生成模拟数据:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
import random
import time
# InfluxDB配置
url = "http://localhost:8086"
token = "your-token-here" # 替换为实际Token
org = "myorg"
bucket = "sensors"
# 创建客户端
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
print("开始生成传感器数据...")
try:
while True:
# 模拟3个设备的数据
for device_id in ["device_001", "device_002", "device_003"]:
# 生成随机数据
temperature = 20 + random.uniform(-5, 10)
humidity = 60 + random.uniform(-15, 15)
pressure = 1013 + random.uniform(-10, 10)
cpu_usage = random.uniform(10, 90)
memory_usage = random.uniform(20, 80)
# 创建数据点
point = Point("sensor_metrics") \
.tag("device_id", device_id) \
.tag("location", f"room_{device_id[-1]}") \
.tag("type", "environmental") \
.field("temperature", round(temperature, 2)) \
.field("humidity", round(humidity, 2)) \
.field("pressure", round(pressure, 2)) \
.field("cpu_usage", round(cpu_usage, 2)) \
.field("memory_usage", round(memory_usage, 2)) \
.time(datetime.utcnow())
# 写入数据
write_api.write(bucket=bucket, record=point)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 写入3条数据")
time.sleep(5) # 每5秒写入一次
except KeyboardInterrupt:
print("\n停止数据生成")
client.close()
安装依赖并运行:
1.5 创建第一个仪表板¶
创建新仪表板¶
- 点击左侧菜单 + → Dashboard
- 点击 Add new panel
配置时间序列图表¶
Panel 1: 温度趋势图
- 在Query编辑器中选择数据源:InfluxDB-Sensors
- 输入Flux查询:
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["_field"] == "temperature")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
- 在右侧Panel选项中配置:
- Title: 温度趋势
- Description: 实时温度监控
- Unit: Celsius (°C)
- Min: 0
-
Max: 50
-
在**Legend**中设置:
- Mode: List
-
Values: Last, Min, Max, Mean
-
点击右上角 Apply 保存
添加更多面板¶
Panel 2: 湿度仪表盘
- 点击右上角 Add panel
- 选择可视化类型:Gauge(仪表盘)
- 输入查询:
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["_field"] == "humidity")
|> last()
- 配置面板:
- Title: 当前湿度
- Unit: Percent (0-100)
- Min: 0
- Max: 100
- Thresholds:
- 0-40: 红色(过低)
- 40-70: 绿色(正常)
- 70-100: 黄色(过高)
Panel 3: CPU使用率
- 添加新面板,选择 Stat(统计值)
- 输入查询:
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["_field"] == "cpu_usage")
|> last()
- 配置:
- Title: CPU使用率
- Unit: Percent (0-100)
- Color mode: Value
- Graph mode: Area
- Thresholds:
- 0-60: 绿色
- 60-80: 黄色
- 80-100: 红色
Panel 4: 设备状态表格
- 添加新面板,选择 Table(表格)
- 输入查询:
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["_field"] == "temperature" or r["_field"] == "humidity")
|> last()
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
- 配置:
- Title: 设备状态总览
- 添加列转换以美化显示
1.6 仪表板布局和样式¶
调整面板布局¶
- 拖动面板调整位置
- 拖动面板边缘调整大小
- 建议布局:
- 顶部:关键指标(Stat面板)
- 中部:趋势图(Time series)
- 底部:详细表格(Table)
配置仪表板设置¶
- 点击右上角 Dashboard settings(齿轮图标)
- 配置:
- Name: IoT传感器监控
- Description: 实时监控物联网设备状态
- Tags: iot, sensors, monitoring
-
Time options:
- Timezone: Browser Time
- Auto refresh: 5s
- Refresh intervals: 5s, 10s, 30s, 1m, 5m
-
保存仪表板:点击右上角 Save dashboard
1.7 变量和模板¶
使用变量可以创建动态、可交互的仪表板。
创建设备选择变量¶
- 进入 Dashboard settings → Variables
- 点击 Add variable
- 配置:
Name: device
Label: 设备
Type: Query
Data source: InfluxDB-Sensors
Query:
import "influxdata/influxdb/schema"
schema.tagValues(
bucket: "sensors",
tag: "device_id"
)
Multi-value: true
Include All option: true
- 保存变量
在查询中使用变量¶
修改面板查询,添加设备过滤:
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["device_id"] =~ /^${device:regex}$/)
|> filter(fn: (r) => r["_field"] == "temperature")
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
现在仪表板顶部会显示设备选择下拉框,可以动态切换查看不同设备的数据。
1.8 告警配置¶
Grafana支持强大的告警功能。
创建告警规则¶
- 编辑温度面板
- 切换到 Alert 标签
- 点击 Create alert rule from this panel
- 配置告警条件:
Rule name: 高温告警
Evaluate every: 1m
For: 5m
Conditions:
- WHEN last() OF query(A, 1m, now)
- IS ABOVE 35
No Data: Alerting
Error: Alerting
- 配置通知:
- 在 Alerting → Contact points 中添加通知渠道
- 支持Email、Slack、Webhook等
告警通知示例(Webhook)¶
创建一个简单的告警接收服务 alert_receiver.py:
from flask import Flask, request, jsonify
import json
app = Flask(__name__)
@app.route('/webhook', methods=['POST'])
def webhook():
data = request.json
print("收到告警:")
print(json.dumps(data, indent=2))
# 这里可以添加自定义处理逻辑
# 例如:发送邮件、短信、推送通知等
return jsonify({"status": "ok"})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
在Grafana中配置Webhook: - URL: http://host.docker.internal:5000/webhook - HTTP Method: POST
步骤2:Kibana入门¶
2.1 访问Kibana¶
打开浏览器访问:http://localhost:5601
首次访问需要等待Kibana初始化(约1-2分钟)。
2.2 Kibana界面概览¶
Kibana主界面包含:
顶部导航
├── Discover - 数据探索
├── Visualize - 可视化
├── Dashboard - 仪表板
├── Canvas - 画布
└── Maps - 地图
左侧菜单
├── Analytics - 分析工具
├── Observability - 可观测性
├── Security - 安全
└── Management - 管理
2.3 准备日志数据¶
创建 generate_logs.py 生成模拟日志数据:
from elasticsearch import Elasticsearch
from datetime import datetime
import random
import time
import json
# 连接Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# 日志级别和消息模板
log_levels = ['INFO', 'WARNING', 'ERROR', 'DEBUG']
services = ['api-gateway', 'auth-service', 'data-processor', 'notification-service']
messages = {
'INFO': [
'Request processed successfully',
'User logged in',
'Data synchronized',
'Cache updated'
],
'WARNING': [
'High memory usage detected',
'Slow query detected',
'Connection pool nearly exhausted',
'Rate limit approaching'
],
'ERROR': [
'Database connection failed',
'Authentication failed',
'Timeout occurred',
'Invalid request format'
],
'DEBUG': [
'Processing request',
'Validating input',
'Executing query',
'Sending response'
]
}
print("开始生成日志数据...")
try:
counter = 0
while True:
# 生成随机日志
level = random.choice(log_levels)
service = random.choice(services)
message = random.choice(messages[level])
# 创建日志文档
log_doc = {
'@timestamp': datetime.utcnow().isoformat(),
'level': level,
'service': service,
'message': message,
'user_id': f'user_{random.randint(1, 100)}',
'request_id': f'req_{counter}',
'duration_ms': random.randint(10, 5000),
'status_code': random.choice([200, 201, 400, 401, 404, 500]),
'ip_address': f'192.168.1.{random.randint(1, 255)}',
'tags': random.sample(['production', 'api', 'backend', 'critical'], k=random.randint(1, 3))
}
# 索引到Elasticsearch
es.index(index='application-logs', document=log_doc)
counter += 1
if counter % 10 == 0:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 已生成 {counter} 条日志")
time.sleep(0.5) # 每0.5秒生成一条日志
except KeyboardInterrupt:
print(f"\n停止日志生成,共生成 {counter} 条日志")
安装依赖并运行:
2.4 创建索引模式¶
- 在Kibana中,点击左侧菜单 Management → Stack Management
- 在 Kibana 部分,点击 Index Patterns
- 点击 Create index pattern
- 输入索引模式:
application-logs* - 选择时间字段:
@timestamp - 点击 Create index pattern
2.5 数据探索(Discover)¶
- 点击左侧菜单 Discover
- 选择索引模式:
application-logs* - 设置时间范围:Last 15 minutes
探索功能:
-
搜索栏:使用KQL(Kibana Query Language)搜索
-
字段过滤:点击左侧字段添加到表格
- 时间直方图:查看日志分布
- 保存搜索:保存常用查询
2.6 创建可视化¶
可视化1:日志级别分布(饼图)¶
- 点击左侧菜单 Visualize Library
- 点击 Create visualization
- 选择 Pie(饼图)
- 选择索引:
application-logs* - 配置:
- Metrics: Count
- Buckets:
- Aggregation: Terms
- Field: level.keyword
- Size: 10
- 点击 Update 预览
- 保存可视化:命名为"日志级别分布"
可视化2:服务请求趋势(折线图)¶
- 创建新可视化,选择 Line
- 配置:
- Metrics: Count
- Buckets:
- X-axis: Date Histogram, Field: @timestamp, Interval: Auto
- Split series: Terms, Field: service.keyword
- 保存为"服务请求趋势"
可视化3:响应时间分布(直方图)¶
- 创建新可视化,选择 Vertical Bar
- 配置:
- Metrics: Average, Field: duration_ms
- Buckets:
- X-axis: Histogram, Field: duration_ms, Interval: 500
- 保存为"响应时间分布"
可视化4:错误率监控(指标)¶
- 创建新可视化,选择 Metric
- 配置:
- Metrics: Count
- Bucket:
- Split group: Filters
- Filter 1: level: "ERROR"
- Filter 2: level: "WARNING"
- 保存为"错误和警告计数"
可视化5:热力图(服务状态)¶
- 创建新可视化,选择 Heat Map
- 配置:
- Metrics: Count
- Buckets:
- X-axis: Terms, Field: service.keyword
- Y-axis: Terms, Field: level.keyword
- 保存为"服务状态热力图"
2.7 创建仪表板¶
- 点击左侧菜单 Dashboard
- 点击 Create dashboard
- 点击 Add from library
- 选择之前创建的所有可视��
- 调整面板布局:
- 顶部:关键指标(错误计数)
- 中部:趋势图和饼图
-
底部:热力图和直方图
-
配置仪表板:
- 点击右上角 Settings
- 设置自动刷新:30秒
-
设置时间范围:Last 1 hour
-
保存仪表板:命名为"应用日志监控"
2.8 高级查询和过滤¶
使用KQL查询¶
# 查找错误日志
level: "ERROR"
# 查找特定服务的慢请求
service: "api-gateway" AND duration_ms > 1000
# 组合条件
level: ("ERROR" OR "WARNING") AND service: "auth-service"
# 范围查询
status_code >= 400 AND status_code < 500
# 存在性查询
user_id: *
# 通配符查询
message: *failed*
创建过滤器¶
- 在Discover或Dashboard中
- 点击字段值旁的 + 添加过滤器
- 或点击顶部 Add filter 手动创建
保存和共享查询¶
- 在Discover中配置好查询和过滤器
- 点击 Save 保存搜索
- 在Dashboard中可以添加保存的搜索
步骤3:高级可视化技巧¶
3.1 Grafana高级功能¶
使用Transformations¶
Transformations可以在显示前转换查询数据。
示例:计算温度变化率
- 编辑温度面板
- 切换到 Transform 标签
- 添加转换:Add field from calculation
- Mode: Reduce row
- Calculation: Difference
-
Alias: temperature_change
-
添加转换:Filter by value
- Match: All conditions
- Condition: temperature_change > 5
使用Annotations¶
Annotations可以在图表上标记重要事件。
- 在Dashboard settings中,点击 Annotations
- 点击 Add annotation query
- 配置:
- Name: 系统事件
- Data source: InfluxDB-Sensors
- Query: 查询事件数据
- Text field: event_description
- Tags field: event_type
创建复合面板¶
使用 Row 组织相关面板:
- 点击 Add panel → Add a new row
- 命名行:例如"温度监控"
- 将相关面板拖入该行
- 可以折叠/展开行
3.2 Kibana高级功能¶
使用Lens快速创建可视化¶
Lens是Kibana的拖放式可视化工具。
- 点击 Visualize Library → Create visualization
- 选择 Lens
- 拖放字段到配置区:
- 拖动
@timestamp到 X轴 - 拖动
Count到 Y轴 -
拖动
level.keyword到 Break down by -
切换图表类型:Bar、Line、Area等
- 保存可视化
使用Canvas创建自定义布局¶
Canvas提供像素级的布局控制。
- 点击左侧菜单 Canvas
- 创建新工作簿
- 添加元素:
- Markdown: 添加文本说明
- Image: 添加图片
- Metric: 显示指标
-
Time series: 显示趋势
-
自定义样式和布局
- 保存工作簿
使用Vega自定义可视化¶
Vega允许创建完全自定义的可视化。
示例:创建自定义散点图
{
"$schema": "https://vega.github.io/schema/vega-lite/v5.json",
"data": {
"url": {
"index": "application-logs*",
"body": {
"size": 1000,
"query": {
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
}
}
},
"format": {"property": "hits.hits"}
},
"mark": "point",
"encoding": {
"x": {"field": "_source.duration_ms", "type": "quantitative"},
"y": {"field": "_source.status_code", "type": "quantitative"},
"color": {"field": "_source.level", "type": "nominal"}
}
}
3.3 性能优化¶
Grafana性能优化¶
查询优化:
// ❌ 不好 - 查询所有数据
from(bucket: "sensors")
|> range(start: -30d)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
// ✅ 好 - 限制时间范围和使用聚合
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> aggregateWindow(every: v.windowPeriod, fn: mean)
面板优化: - 限制查询返回的数据点数量 - 使用合适的刷新间隔 - 避免在单个仪表板中放置过多面板(建议<20个)
Kibana性能优化¶
索引优化:
# 设置索引生命周期管理
PUT _ilm/policy/logs_policy
{
"policy": {
"phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "7d"
}
}
},
"delete": {
"min_age": "30d",
"actions": {
"delete": {}
}
}
}
}
}
查询优化: - 使用时间范围过滤 - 避免使用通配符开头的查询 - 使用字段过滤减少返回数据
步骤4:实战项目 - 物联网监控系统¶
4.1 项目概述¶
构建一个完整的物联网设备监控系统,包括: - 实时设备状态监控 - 历史数据分析 - 异常告警 - 日志分析
4.2 系统架构¶
物联网设备
↓ (MQTT)
MQTT Broker (Mosquitto)
↓
数据处理服务
├→ InfluxDB (时序数据)
└→ Elasticsearch (日志数据)
↓
可视化层
├→ Grafana (实时监控)
└→ Kibana (日志分析)
4.3 完整的docker-compose配置¶
更新 docker-compose.yml:
version: '3.8'
services:
# MQTT Broker
mosquitto:
image: eclipse-mosquitto:2
container_name: mosquitto
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
networks:
- iot-network
# InfluxDB
influxdb:
image: influxdb:2.7
container_name: influxdb
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=adminpassword123
- DOCKER_INFLUXDB_INIT_ORG=myorg
- DOCKER_INFLUXDB_INIT_BUCKET=sensors
- DOCKER_INFLUXDB_INIT_RETENTION=30d
volumes:
- influxdb-data:/var/lib/influxdb2
networks:
- iot-network
# Elasticsearch
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
container_name: elasticsearch
ports:
- "9200:9200"
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- elasticsearch-data:/usr/share/elasticsearch/data
networks:
- iot-network
# Grafana
grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_INSTALL_PLUGINS=grafana-clock-panel
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
depends_on:
- influxdb
networks:
- iot-network
# Kibana
kibana:
image: docker.elastic.co/kibana/kibana:8.11.0
container_name: kibana
ports:
- "5601:5601"
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
depends_on:
- elasticsearch
networks:
- iot-network
volumes:
influxdb-data:
elasticsearch-data:
grafana-data:
networks:
iot-network:
driver: bridge
创建Mosquitto配置文件 mosquitto/config/mosquitto.conf:
listener 1883
allow_anonymous true
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
4.4 数据采集服务¶
创建 iot_data_collector.py:
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from elasticsearch import Elasticsearch
from datetime import datetime
import json
import threading
# 配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "iot/devices/#"
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-token-here"
INFLUX_ORG = "myorg"
INFLUX_BUCKET = "sensors"
ES_HOST = "http://localhost:9200"
# 初始化客户端
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
es = Elasticsearch([ES_HOST])
def on_connect(client, userdata, flags, rc):
print(f"连接到MQTT Broker,返回码: {rc}")
client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
try:
# 解析消息
topic = msg.topic
payload = json.loads(msg.payload.decode())
device_id = payload.get('device_id')
timestamp = datetime.fromisoformat(payload.get('timestamp', datetime.utcnow().isoformat()))
# 写入InfluxDB(时序数据)
if 'metrics' in payload:
metrics = payload['metrics']
point = Point("device_metrics") \
.tag("device_id", device_id) \
.tag("device_type", payload.get('device_type', 'unknown')) \
.tag("location", payload.get('location', 'unknown'))
for key, value in metrics.items():
if isinstance(value, (int, float)):
point = point.field(key, float(value))
point = point.time(timestamp)
write_api.write(bucket=INFLUX_BUCKET, record=point)
print(f"[InfluxDB] 写入设备 {device_id} 的指标数据")
# 写入Elasticsearch(日志数据)
if 'event' in payload or 'log_level' in payload:
log_doc = {
'@timestamp': timestamp.isoformat(),
'device_id': device_id,
'device_type': payload.get('device_type', 'unknown'),
'location': payload.get('location', 'unknown'),
'event': payload.get('event', ''),
'log_level': payload.get('log_level', 'INFO'),
'message': payload.get('message', ''),
'metadata': payload.get('metadata', {})
}
es.index(index='iot-device-logs', document=log_doc)
print(f"[Elasticsearch] 写入设备 {device_id} 的日志数据")
except Exception as e:
print(f"处理消息错误: {e}")
# 创建MQTT客户端
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
# 连接并启动
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
print("IoT数据采集服务已启动...")
mqtt_client.loop_forever()
4.5 设备模拟器¶
创建 iot_device_simulator.py:
import paho.mqtt.client as mqtt
import json
import time
import random
from datetime import datetime
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
# 模拟设备配置
devices = [
{
"device_id": "temp_sensor_001",
"device_type": "temperature_sensor",
"location": "warehouse_a"
},
{
"device_id": "humidity_sensor_001",
"device_type": "humidity_sensor",
"location": "warehouse_a"
},
{
"device_id": "pressure_sensor_001",
"device_type": "pressure_sensor",
"location": "production_line_1"
}
]
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT, 60)
print("开始模拟IoT设备...")
try:
while True:
for device in devices:
# 生成指标数据
metrics_data = {
"device_id": device["device_id"],
"device_type": device["device_type"],
"location": device["location"],
"timestamp": datetime.utcnow().isoformat(),
"metrics": {}
}
# 根据设备类型生成不同的指标
if device["device_type"] == "temperature_sensor":
metrics_data["metrics"] = {
"temperature": round(20 + random.uniform(-5, 15), 2),
"battery_level": round(random.uniform(20, 100), 2)
}
elif device["device_type"] == "humidity_sensor":
metrics_data["metrics"] = {
"humidity": round(50 + random.uniform(-20, 30), 2),
"battery_level": round(random.uniform(20, 100), 2)
}
elif device["device_type"] == "pressure_sensor":
metrics_data["metrics"] = {
"pressure": round(1013 + random.uniform(-20, 20), 2),
"battery_level": round(random.uniform(20, 100), 2)
}
# 发送指标数据
topic = f"iot/devices/{device['device_id']}/metrics"
client.publish(topic, json.dumps(metrics_data))
print(f"[{datetime.now().strftime('%H:%M:%S')}] 发送 {device['device_id']} 指标数据")
# 随机生成事件日志
if random.random() < 0.1: # 10%概率生成事件
event_data = {
"device_id": device["device_id"],
"device_type": device["device_type"],
"location": device["location"],
"timestamp": datetime.utcnow().isoformat(),
"event": random.choice(["startup", "shutdown", "error", "warning"]),
"log_level": random.choice(["INFO", "WARNING", "ERROR"]),
"message": random.choice([
"Device started successfully",
"Low battery warning",
"Sensor calibration required",
"Connection timeout"
])
}
topic = f"iot/devices/{device['device_id']}/events"
client.publish(topic, json.dumps(event_data))
print(f"[{datetime.now().strftime('%H:%M:%S')}] 发送 {device['device_id']} 事件数据")
time.sleep(5) # 每5秒发送一次
except KeyboardInterrupt:
print("\n停止设备模拟")
client.disconnect()
4.6 运行完整系统¶
# 1. 启动所有服务
docker-compose up -d
# 2. 等待服务启动
sleep 30
# 3. 在不同终端运行
# 终端1: 启动设备模拟器
python iot_device_simulator.py
# 终端2: 启动数据采集服务
python iot_data_collector.py
4.7 创建综合监控仪表板¶
Grafana仪表板¶
创建包含以下面板的仪表板:
- 设备状态总览(Stat面板)
- 在线设备数
- 离线设备数
-
告警数量
-
实时温度监控(Time series)
- 所有温度传感器的实时数据
-
使用变量切换设备
-
电池电量监控(Gauge)
- 显示所有设备的电池电量
-
低电量告警(<20%)
-
设备分布地图(Geomap,如果有地理位置数据)
-
数据质量监控(Table)
- 最后更新时间
- 数据完整性
Kibana仪表板¶
创建包含以下可视化的仪表板:
- 事件时间线(Line chart)
-
按时间显示事件数量
-
事件类型分布(Pie chart)
-
startup, shutdown, error, warning
-
设备日志表格(Data table)
- 最近的设备事件
-
可搜索和过滤
-
错误率趋势(Area chart)
-
按设备类型分组的错误率
-
设备活动热力图(Heat map)
- 设备 × 事件类型
故障排除¶
问题1:Grafana无法连接InfluxDB¶
现象:
可能原因: 1. InfluxDB未启动 2. Token配置错误 3. 网络连接问题
解决方法:
# 1. 检查InfluxDB状态
docker ps | grep influxdb
docker logs influxdb
# 2. 验证InfluxDB连接
curl http://localhost:8086/health
# 3. 重新生成Token
docker exec -it influxdb bash
influx auth list
influx auth create --org myorg --all-access
# 4. 在Grafana中更新Token
# Configuration → Data sources → InfluxDB → 更新Token
问题2:Kibana无法访问¶
现象: 浏览器显示"Kibana server is not ready yet"
可能原因: 1. Elasticsearch未就绪 2. Kibana初始化中 3. 内存不足
解决方法:
# 1. 检查Elasticsearch状态
curl http://localhost:9200/_cluster/health
# 2. 查看Kibana日志
docker logs kibana
# 3. 增加内存限制(在docker-compose.yml中)
environment:
- "NODE_OPTIONS=--max-old-space-size=2048"
# 4. 重启服务
docker-compose restart kibana
问题3:数据不显示¶
现象: 仪表板显示"No data"
可能原因: 1. 时间范围不正确 2. 查询语法错误 3. 数据未写入
解决方法:
# 1. 验证数据是否写入InfluxDB
docker exec -it influxdb influx query 'from(bucket:"sensors") |> range(start:-1h) |> limit(n:10)'
# 2. 验证Elasticsearch数据
curl http://localhost:9200/iot-device-logs/_search?size=10
# 3. 检查时间范围
# 在Grafana/Kibana中调整时间范围为"Last 15 minutes"
# 4. 检查查询语法
# 在Query Inspector中查看实际执行的查询
问题4:告警不触发¶
现象: 配置了告警规则但不触发
可能原因: 1. 告警条件未满足 2. 评估间隔太长 3. 通知渠道配置错误
解决方法:
# 1. 检查告警规则状态
# Grafana → Alerting → Alert rules
# 2. 测试通知渠道
# Alerting → Contact points → Test
# 3. 查看告警历史
# Alerting → Alert rules → 点击规则 → State history
# 4. 调整告警条件
# 降低阈值或缩短评估间隔进行测试
问题5:性能问题¶
现象: 仪表板加载慢,查询超时
可能原因: 1. 查询数据量过大 2. 时间范围太长 3. 面板过多
解决方法:
# 1. 优化查询
# 使用aggregateWindow减少数据点
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> aggregateWindow(every: v.windowPeriod, fn: mean)
# 2. 限制时间范围
# 设置默认时间范围为1小时或更短
# 3. 减少面板数量
# 将相关面板分组到不同的仪表板
# 4. 使用缓存
# 在Grafana中启用查询缓存
问题6:Docker容器内存不足¶
现象:
解决方法:
# 在docker-compose.yml中限制内存使用
services:
elasticsearch:
mem_limit: 1g
environment:
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
kibana:
mem_limit: 512m
最佳实践¶
1. 仪表板设计原则¶
信息层次¶
顶部:关键指标(KPI)
├── 使用Stat或Gauge显示
├── 突出显示异常值
└── 使用颜色编码状态
中部:趋势和分析
├── 使用Time series显示趋势
├── 使用Bar chart显示对比
└── 使用Pie chart显示分布
底部:详细数据
├── 使用Table显示明细
├── 使用Logs面板显示日志
└── 提供下钻功能
颜色使用¶
✅ 好的做法:
- 使用一致的颜色方案
- 绿色表示正常
- 黄色表示警告
- 红色表示错误
- 使用色盲友好的配色
❌ 避免的做法:
- 过度使用颜色
- 使用相似的颜色表示不同状态
- 忽略可访问性
命名规范¶
2. 查询优化¶
Grafana查询优化¶
// ❌ 不好 - 查询所有原始数据
from(bucket: "sensors")
|> range(start: -7d)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
// ✅ 好 - 使用时间变量和聚合
from(bucket: "sensors")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "sensor_metrics")
|> filter(fn: (r) => r["device_id"] =~ /^${device:regex}$/)
|> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|> yield(name: "mean")
Kibana查询优化¶
3. 告警策略¶
告警阈值设置¶
原则:
1. 基于历史数据设置合理阈值
2. 使用百分位数而非绝对值
3. 设置多级告警(警告、严重、紧急)
4. 避免告警疲劳
示例:
- 警告:CPU > 70% 持续5分钟
- 严重:CPU > 85% 持续5分钟
- 紧急:CPU > 95% 持续2分钟
告警通知¶
4. 性能优化¶
数据采样¶
# 对于长时间范围,使用降采样数据
if time_range > 7 days:
interval = "1h"
elif time_range > 1 day:
interval = "5m"
else:
interval = "30s"
缓存策略¶
Grafana缓存配置:
1. 启用查询缓存
2. 设置合理的TTL
3. 使用CDN缓存静态资源
Elasticsearch缓存:
1. 启用查询缓存
2. 启用字段数据缓存
3. 使用索引生命周期管理
5. 安全最佳实践¶
访问控制¶
Grafana:
1. 使用组织和团队管理权限
2. 为不同用户设置不同角色
3. 启用匿名访问时限制权限
Kibana:
1. 启用Elasticsearch安全功能
2. 使用空间隔离不同团队
3. 配置字段级安全
数据保护¶
6. 维护和监控¶
定期维护¶
# 1. 清理旧数据
# InfluxDB
influx delete --bucket sensors --start 1970-01-01T00:00:00Z --stop 2026-01-01T00:00:00Z
# Elasticsearch
curl -X POST "localhost:9200/iot-device-logs/_delete_by_query" -H 'Content-Type: application/json' -d'
{
"query": {
"range": {
"@timestamp": {
"lt": "now-30d"
}
}
}
}'
# 2. 优化索引
curl -X POST "localhost:9200/_forcemerge?max_num_segments=1"
# 3. 备份配置
# 导出Grafana仪表板
# 导出Kibana对象
监控可视化系统本身¶
总结¶
通过本教程,你已经掌握了数据可视化的核心技能:
核心要点回顾¶
Grafana: - ✅ 安装和配置Grafana - ✅ 连接多种数据源(InfluxDB、Prometheus等) - ✅ 创建各种类型的面板(Time series、Gauge、Stat、Table) - ✅ 使用变量创建动态仪表板 - ✅ 配置告警规则和通知 - ✅ 使用Transformations转换数据 - ✅ 优化查询性能
Kibana: - ✅ 安装和配置Kibana - ✅ 创建索引模式 - ✅ 使用Discover探索数据 - ✅ 创建多种可视化(Pie、Line、Bar、Heat Map等) - ✅ 构建综合仪表板 - ✅ 使用KQL查询语言 - ✅ 使用Lens快速创建可视化
实践应用: - ✅ 构建物联网监控系统 - ✅ 实时数据采集和处理 - ✅ 日志分析和可视化 - ✅ 告警和通知配置 - ✅ 性能优化技巧
实践成果¶
完成本教程后,你应该能够: 1. 选择合适的可视化工具 2. 设计专业的监控仪表板 3. 配置实时数据刷新 4. 实现智能告警 5. 优化可视化性能 6. 构建完整的监控系统
技能提升路径¶
初级 → 中级 → 高级
├── 基础概念 ├── 高级查询 ├── 集群部署
├── 简单图表 ├── 复杂仪表板 ├── 高可用架构
├── 数据连接 ├── 告警配置 ├── 性能调优
└── 基本操作 ├── 数据转换 ├── 自定义插件
└── 最佳实践 └── 企业级方案
进阶挑战¶
尝试以下挑战来巩固学习:
挑战1:构建多租户监控系统(120分钟)¶
任务: 1. 为不同客户创建独立的仪表板 2. 使用Grafana组织和团队功能 3. 实现数据隔离 4. 配置不同的告警策略 5. 创建客户自定义视图
要求: - 至少3个租户 - 每个租户有独立的数据和仪表板 - 实现权限隔离 - 提供租户管理界面
挑战2:实时异常检测系统(90分钟)¶
任务: 1. 实现基于统计的异常检测 2. 使用移动平均和标准差 3. 在Grafana中可视化异常点 4. 配置异常告警 5. 生成异常报告
要求: - 检测温度、湿度等指标的异常 - 实时标注异常点 - 自动发送告警 - 记录异常历史
挑战3:日志分析和故障诊断(100分钟)¶
任务: 1. 收集应用日志到Elasticsearch 2. 创建日志分析仪表板 3. 实现错误追踪 4. 构建故障诊断流程 5. 生成故障报告
要求: - 支持多种日志格式 - 实现日志关联分析 - 提供快速搜索功能 - 可视化错误趋势
挑战4:性能监控和优化(80分钟)¶
任务: 1. 监控系统性能指标 2. 识别性能瓶颈 3. 优化查询性能 4. 实现性能基线 5. 生成性能报告
目标: - 查询响应时间 < 100ms - 仪表板加载时间 < 2s - 支持100+并发用户 - 数据刷新延迟 < 5s
常见问题¶
Q1: Grafana和Kibana应该选择哪个?¶
A: 根据使用场景选择:
选择Grafana: - 主要监控时序数据(指标、传感器数据) - 需要实时监控和告警 - 数据源多样(InfluxDB、Prometheus、MySQL等) - 团队熟悉时序数据库
选择Kibana: - 主要分析日志数据 - 需要全文搜索功能 - 已使用Elasticsearch - 需要APM(应用性能监控)
两者都用: - 大型系统可以同时使用 - Grafana用于实时监控 - Kibana用于日志分析和故障排查
Q2: 如何设计有效的仪表板?¶
A: 遵循以下原则:
- 明确目标
- 确定仪表板的受众
- 明确要回答的问题
-
选择合适的指标
-
信息层次
- 最重要的信息放在顶部
- 使用大小和颜色突出重点
-
避免信息过载
-
交互性
- 使用变量实现动态过滤
- 提供下钻功能
-
支持时间范围选择
-
性能
- 限制面板数量(<20个)
- 优化查询
- 使用合适的刷新间隔
Q3: 如何处理大量数据?¶
A: 多层次优化:
-
数据层
-
查询层
-
显示层
Q4: 如何避免告警疲劳?¶
A: 智能告警策略:
- 合理设置阈值
- 基于历史数据
- 使用动态阈值
-
考虑业务影响
-
告警分级
-
告警聚合
- 合并相关告警
- 设置静默期
-
使用告警依赖
-
通知策略
- 不同级别不同通知方式
- 工作时间和非工作时间区分
- 升级机制
Q5: 如何备份和恢复仪表板?¶
A: 多种备份方式:
Grafana备份:
# 1. 使用API导出仪表板
curl -H "Authorization: Bearer YOUR_API_KEY" \
http://localhost:3000/api/dashboards/uid/DASHBOARD_UID \
> dashboard_backup.json
# 2. 备份Grafana数据库
docker exec grafana sqlite3 /var/lib/grafana/grafana.db ".backup '/tmp/grafana_backup.db'"
docker cp grafana:/tmp/grafana_backup.db ./grafana_backup.db
# 3. 使用Provisioning
# 将仪表板配置文件放在 provisioning/dashboards/ 目录
Kibana备份:
# 1. 导出Saved Objects
# Kibana UI → Stack Management → Saved Objects → Export
# 2. 使用API导出
curl -X POST "localhost:5601/api/saved_objects/_export" \
-H 'kbn-xsrf: true' \
-H 'Content-Type: application/json' \
-d '{"type": "dashboard"}' \
> kibana_dashboards.ndjson
# 3. 备份Elasticsearch索引
curl -X PUT "localhost:9200/_snapshot/my_backup/snapshot_1?wait_for_completion=true"
Q6: 如何实现跨团队协作?¶
A: 协作最佳实践:
-
组织结构
-
命名规范
-
文档和培训
- 维护仪表板文档
- 定期培训新成员
- 分享最佳实践
Q7: 如何监控可视化系统本身?¶
A: 元监控策略:
监控指标:
1. Grafana
- 查询响应时间
- 活跃用户数
- 告警触发频率
- 数据源健康状态
2. Kibana
- 查询性能
- 索引大小
- 集群健康状态
- 内存使用率
3. 数据库
- InfluxDB写入速率
- Elasticsearch查询延迟
- 磁盘使用率
- 连接数
实现方式: - 使用Prometheus监控Grafana - 使用Metricbeat监控Elasticsearch - 创建专门的监控仪表板
延伸阅读¶
推荐资源¶
官方文档: - Grafana官方文档 - Kibana官方文档 - InfluxDB文档 - Elasticsearch文档
在线教程: - Grafana Tutorials - Elastic Learning - Grafana Labs YouTube频道 - Elastic YouTube频道
书籍推荐: - 《Grafana实战》 - 《Elasticsearch权威指南》 - 《数据可视化实践》 - 《监控系统设计与实现》
社区资源: - Grafana Community Forums - Elastic Discuss - Reddit r/grafana - Stack Overflow
相关技术¶
监控工具: - Prometheus - 时序数据库和监控系统 - Zabbix - 企业级监控解决方案 - Datadog - SaaS监控平台 - New Relic - APM和监控
日志工具: - Logstash - 日志收集和处理 - Fluentd - 统一日志层 - Graylog - 日志管理平台 - Splunk - 企业级日志分析
可视化库: - D3.js - JavaScript可视化库 - ECharts - 百度开源图表库 - Plotly - 交互式图表库 - Chart.js - 简单的图表库
时序数据库: - Prometheus - 监控专用 - TimescaleDB - PostgreSQL扩展 - OpenTSDB - HBase之上 - VictoriaMetrics - 高性能TSDB
进阶主题¶
高级可视化: - 3D可视化 - 地理信息可视化 - 实时流数据可视化 - 机器学习可视化
企业级部署: - 高可用架构 - 负载均衡 - 数据备份和恢复 - 安全加固
性能优化: - 查询优化 - 索引优化 - 缓存策略 - 分布式架构
自动化运维: - Infrastructure as Code - 自动化部署 - 配置管理 - 监控即代码
参考资料¶
- Grafana官方文档 - https://grafana.com/docs/
- Kibana官方文档 - https://www.elastic.co/guide/en/kibana/
- InfluxDB文档 - https://docs.influxdata.com/
- Elasticsearch文档 - https://www.elastic.co/guide/en/elasticsearch/
- Flux查询语言 - https://docs.influxdata.com/flux/
- KQL查询语言 - https://www.elastic.co/guide/en/kibana/current/kuery-query.html
- 数据可视化最佳实践 - https://www.tableau.com/learn/articles/data-visualization
- 监控系统设计 - https://sre.google/books/
反馈与支持: - 如果你在学习过程中遇到问题,欢迎在评论区留言 - 发现文档错误或有改进建议,请提交Issue - 想要分享你的可视化实践经验,欢迎投稿
下一步学习: - 时序数据库入门 - 深入学习数据存储 - 数据分析工具使用 - 学习Pandas和NumPy - 大数据处理技术 - 学习Hadoop和Spark - 实时数据流处理 - 学习Kafka Streams和Flink
实践项目推荐: 1. 智能家居监控系统 2. 工业设备监控平台 3. 应用性能监控(APM) 4. 网络流量分析系统 5. 业务指标监控平台
版权声明: 本文采用 CC BY-NC-SA 4.0 许可协议,欢迎分享和改编,但请注明出处。
致谢: 感谢Grafana Labs和Elastic公司提供优秀的开源工具,感谢社区贡献者的无私分享。
更新日志: - 2026-03-08: 初始版本发布 - 包含Grafana和Kibana完整教程 - 添加物联网实战项目 - 提供故障排除指南
作者信息: - 作者:嵌入式知识平台 - 联系方式:[待添加] - 最后更新:2026-03-08