时序数据库入门:InfluxDB与TimescaleDB实践应用¶
学习目标¶
完成本教程后,你将能够:
- 理解时序数据库的基本概念和应用场景
- 掌握InfluxDB的安装、配置和基本操作
- 掌握TimescaleDB的安装、配置和基本操作
- 理解时序数据的建模方法
- 熟练进行时序数据的写入和查询
- 掌握时序数据库的性能优化技巧
- 在物联网场景中应用时序数据库
前置要求¶
在开始本教程之前,你需要:
知识要求: - 了解基本的数据库概念 - 熟悉SQL查询语言基础 - 了解基本的Linux命令 - 了解数据库设计基础(建议先学习数据库设计基础)
技能要求: - 能够使用命令行操作 - 会编写简单的SQL查询 - 了解基本的数据建模
硬件要求: - 64位操作系统(Windows 10/11、macOS、Linux) - 至少4GB内存(推荐8GB) - 至少10GB可用磁盘空间
概述¶
时序数据库(Time Series Database,TSDB)是专门用于存储和查询时间序列数据的数据库系统。时间序列数据是按时间顺序排列的数据点序列,每个数据点都包含时间戳和对应的值。
为什么学习时序数据库?
- 物联网应用:传感器数据、设备监控数据
- 系统监控:服务器性能指标、应用日志
- 金融交易:股票价格、交易记录
- 工业控制:生产线数据、设备状态
- 智能家居:环境数据、能耗统计
时序数据库的优势: - 高效的时间范围查询 - 优化的数据压缩 - 自动数据保留策略 - 强大的聚合和降采样功能 - 专为时序数据优化的存储引擎
什么是时序数据库¶
时序数据的特点¶
时序数据具有以下典型特征:
- 时间戳:每条数据都有明确的时间标记
- 顺序性:数据按时间顺序产生
- 高频率:数据产生频率高,写入量大
- 不可变:历史数据通常不会修改
- 时间相关:查询通常基于时间范围
时序数据示例:
传感器温度数据:
时间戳 | 设备ID | 位置 | 温度值
2026-03-08 10:00:00 | dev001 | room1 | 23.5
2026-03-08 10:00:10 | dev001 | room1 | 23.6
2026-03-08 10:00:20 | dev001 | room1 | 23.4
2026-03-08 10:00:30 | dev001 | room1 | 23.7
时序数据库 vs 传统数据库¶
| 特性 | 时序数据库 | 传统关系型数据库 |
|---|---|---|
| 写入性能 | 极高(百万级/秒) | 中等 |
| 时间查询 | 优化的时间索引 | 需要额外索引 |
| 数据压缩 | 高效压缩(10-100倍) | 一般压缩 |
| 数据保留 | 自动过期策略 | 手动删除 |
| 聚合查询 | 内置时间聚合 | 需要复杂SQL |
| 存储成本 | 低 | 高 |
常见时序数据库对比¶
| 数据库 | 类型 | 特点 | 适用场景 |
|---|---|---|---|
| InfluxDB | 专用TSDB | 易用、功能完整 | 监控、物联网 |
| TimescaleDB | PostgreSQL扩展 | SQL兼容、功能强大 | 需要SQL的场景 |
| Prometheus | 监控专用 | 内置告警 | 系统监控 |
| OpenTSDB | HBase之上 | 高可扩展 | 大规模数据 |
准备工作¶
系统要求¶
通用要求: - 64位操作系统 - 至少4GB内存 - 至少10GB可用磁盘空间 - 稳定的网络连接
使用Docker安装(推荐)¶
我们将使用Docker来快速搭建实验环境,这样可以避免复杂的安装配置。
前置条件: - 已安装Docker(参考Docker容器技术入门)
创建工作目录:
步骤1:InfluxDB入门¶
1.1 安装InfluxDB¶
使用Docker安装¶
# 拉取InfluxDB 2.x镜像
docker pull influxdb:2.7
# 创建数据目录
mkdir -p ~/tsdb-tutorial/influxdb-data
# 运行InfluxDB容器
docker run -d \
--name influxdb \
-p 8086:8086 \
-v ~/tsdb-tutorial/influxdb-data:/var/lib/influxdb2 \
influxdb:2.7
# 查看容器状态
docker ps | grep influxdb
# 查看日志
docker logs influxdb
初始化配置¶
# 访问Web界面进行初始化
# 打开浏览器访问: http://localhost:8086
# 或使用CLI初始化
docker exec influxdb influx setup \
--username admin \
--password adminpassword123 \
--org myorg \
--bucket mybucket \
--retention 30d \
--force
配置说明:
- username: 管理员用户名
- password: 管理员密码(至少8位)
- org: 组织名称
- bucket: 数据桶名称(类似数据库)
- retention: 数据保留时间(30天)
1.2 InfluxDB核心概念¶
数据模型¶
InfluxDB使用以下概念组织数据:
组织 (Organization)
└── 数据桶 (Bucket)
└── 测量 (Measurement)
├── 标签 (Tags) - 索引的元数据
├── 字段 (Fields) - 实际的数据值
└── 时间戳 (Timestamp)
概念说明:
| 概念 | 说明 | 示例 |
|---|---|---|
| Organization | 组织,顶层容器 | myorg |
| Bucket | 数据桶,存储数据 | sensor_data |
| Measurement | 测量,类似表名 | temperature |
| Tags | 标签,索引的元数据 | device_id=dev001, location=room1 |
| Fields | 字段,实际数据值 | value=23.5 |
| Timestamp | 时间戳 | 2026-03-08T10:00:00Z |
数据点示例:
measurement: temperature
tags: device_id=dev001, location=room1
fields: value=23.5, humidity=65.2
timestamp: 2026-03-08T10:00:00Z
完整表示:
temperature,device_id=dev001,location=room1 value=23.5,humidity=65.2 1709892000000000000
1.3 使用InfluxDB CLI¶
进入CLI¶
# 进入容器的交互式shell
docker exec -it influxdb bash
# 启动influx CLI
influx
# 或直接执行命令
docker exec influxdb influx --help
基本命令¶
# 查看组织
influx org list
# 查看数据桶
influx bucket list
# 创建新的数据桶
influx bucket create \
--name iot_data \
--org myorg \
--retention 90d
# 查看令牌(用于API访问)
influx auth list
# 创建新令牌
influx auth create \
--org myorg \
--read-bucket iot_data \
--write-bucket iot_data
1.4 写入数据¶
使用Line Protocol写入¶
Line Protocol是InfluxDB的数据格式:
格式:
measurement,tag1=value1,tag2=value2 field1=value1,field2=value2 timestamp
示例:
temperature,device=dev001,location=room1 value=23.5 1709892000000000000
使用CLI写入:
# 写入单条数据
docker exec influxdb influx write \
--bucket mybucket \
--org myorg \
--precision s \
'temperature,device=dev001,location=room1 value=23.5'
# 写入多条数据
docker exec influxdb influx write \
--bucket mybucket \
--org myorg \
--precision s \
'temperature,device=dev001,location=room1 value=23.5
temperature,device=dev001,location=room1 value=23.6
temperature,device=dev002,location=room2 value=24.1'
使用Python写入¶
创建 influx_write.py:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
import random
import time
# 连接配置
url = "http://localhost:8086"
token = "your-token-here" # 从influx auth list获取
org = "myorg"
bucket = "mybucket"
# 创建客户端
client = InfluxDBClient(url=url, token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
# 写入单条数据
point = Point("temperature") \
.tag("device", "dev001") \
.tag("location", "room1") \
.field("value", 23.5) \
.field("humidity", 65.2) \
.time(datetime.utcnow())
write_api.write(bucket=bucket, record=point)
print("写入单条数据成功")
# 批量写入模拟数据
print("开始写入模拟数据...")
for i in range(100):
# 模拟3个设备的数据
for device_id in ["dev001", "dev002", "dev003"]:
temp = 20 + random.uniform(-2, 5)
humidity = 60 + random.uniform(-10, 10)
point = Point("temperature") \
.tag("device", device_id) \
.tag("location", f"room{device_id[-1]}") \
.field("value", round(temp, 2)) \
.field("humidity", round(humidity, 2))
write_api.write(bucket=bucket, record=point)
time.sleep(0.1) # 每100ms写入一次
if (i + 1) % 10 == 0:
print(f"已写入 {(i + 1) * 3} 条数据")
print("数据写入完成!")
client.close()
安装依赖并运行:
1.5 查询数据¶
使用Flux查询语言¶
Flux是InfluxDB 2.x的查询语言,功能强大且灵活。
基本查询:
// 查询最近1小时的数据
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
// 查询特定设备的数据
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.device == "dev001")
// 聚合查询 - 计算平均值
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> aggregateWindow(every: 5m, fn: mean)
// 多字段查询
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value" or r._field == "humidity")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
使用Python查询¶
创建 influx_query.py:
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta
# 连接配置
url = "http://localhost:8086"
token = "your-token-here"
org = "myorg"
bucket = "mybucket"
# 创建客户端
client = InfluxDBClient(url=url, token=token, org=org)
query_api = client.query_api()
# 查询最近1小时的数据
query = f'''
from(bucket: "{bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
'''
result = query_api.query(query=query)
# 处理查询结果
print("查询结果:")
for table in result:
for record in table.records:
print(f"时间: {record.get_time()}, "
f"设备: {record.values.get('device')}, "
f"位置: {record.values.get('location')}, "
f"温度: {record.get_value()}")
# 聚合查询 - 每5分钟的平均值
query_agg = f'''
from(bucket: "{bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r._field == "value")
|> aggregateWindow(every: 5m, fn: mean)
'''
result_agg = query_api.query(query=query_agg)
print("\n5分钟平均值:")
for table in result_agg:
for record in table.records:
print(f"时间: {record.get_time()}, "
f"设备: {record.values.get('device')}, "
f"平均温度: {record.get_value():.2f}")
client.close()
1.6 数据保留策略¶
InfluxDB可以自动删除过期数据:
# 创建带保留策略的bucket
influx bucket create \
--name short_term_data \
--org myorg \
--retention 7d
# 更新现有bucket的保留策略
influx bucket update \
--name mybucket \
--retention 30d
# 查看bucket信息
influx bucket list
步骤2:TimescaleDB入门¶
2.1 安装TimescaleDB¶
TimescaleDB是PostgreSQL的扩展,完全兼容SQL。
使用Docker安装¶
# 拉取TimescaleDB镜像
docker pull timescale/timescaledb:latest-pg15
# 创建数据目录
mkdir -p ~/tsdb-tutorial/timescaledb-data
# 运行TimescaleDB容器
docker run -d \
--name timescaledb \
-p 5432:5432 \
-e POSTGRES_PASSWORD=password \
-v ~/tsdb-tutorial/timescaledb-data:/var/lib/postgresql/data \
timescale/timescaledb:latest-pg15
# 查看容器状态
docker ps | grep timescaledb
# 查看日志
docker logs timescaledb
连接数据库¶
# 使用psql连接
docker exec -it timescaledb psql -U postgres
# 或从主机连接(需要安装postgresql-client)
psql -h localhost -U postgres -d postgres
2.2 TimescaleDB核心概念¶
Hypertable(超表)¶
Hypertable是TimescaleDB的核心概念,它是一个抽象层,底层自动分区为多个chunks。
普通表 (Regular Table)
└── 转换为 Hypertable
└── 自动分区为 Chunks(按时间)
├── Chunk 1: 2026-03-01 to 2026-03-07
├── Chunk 2: 2026-03-08 to 2026-03-14
└── Chunk 3: 2026-03-15 to 2026-03-21
优势: - 自动分区管理 - 高效的时间范围查询 - 透明的数据压缩 - 保留策略自动化
2.3 创建Hypertable¶
-- 连接到数据库
\c postgres
-- 创建TimescaleDB扩展
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- 创建普通表
CREATE TABLE sensor_data (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
);
-- 转换为Hypertable
SELECT create_hypertable('sensor_data', 'time');
-- 创建索引以提高查询性能
CREATE INDEX ON sensor_data (device_id, time DESC);
CREATE INDEX ON sensor_data (location, time DESC);
-- 查看hypertable信息
\d+ sensor_data
2.4 写入数据¶
使用SQL写入¶
-- 写入单条数据
INSERT INTO sensor_data (time, device_id, location, temperature, humidity)
VALUES (NOW(), 'dev001', 'room1', 23.5, 65.2);
-- 批量写入
INSERT INTO sensor_data (time, device_id, location, temperature, humidity)
VALUES
(NOW(), 'dev001', 'room1', 23.5, 65.2),
(NOW(), 'dev002', 'room2', 24.1, 63.8),
(NOW(), 'dev003', 'room3', 22.8, 67.5);
-- 查看数据
SELECT * FROM sensor_data ORDER BY time DESC LIMIT 10;
使用Python写入¶
创建 timescale_write.py:
import psycopg2
from datetime import datetime, timedelta
import random
import time
# 数据库连接配置
conn = psycopg2.connect(
host="localhost",
port=5432,
database="postgres",
user="postgres",
password="password"
)
cur = conn.cursor()
# 创建表和hypertable(如果不存在)
cur.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
);
""")
# 检查是否已经是hypertable
cur.execute("""
SELECT * FROM timescaledb_information.hypertables
WHERE hypertable_name = 'sensor_data';
""")
if cur.fetchone() is None:
cur.execute("SELECT create_hypertable('sensor_data', 'time');")
print("创建hypertable成功")
conn.commit()
# 批量写入模拟数据
print("开始写入模拟数据...")
batch_size = 100
total_records = 1000
for batch in range(total_records // batch_size):
records = []
for i in range(batch_size):
# 模拟3个设备的数据
for device_id in ["dev001", "dev002", "dev003"]:
timestamp = datetime.now() - timedelta(seconds=i)
temp = 20 + random.uniform(-2, 5)
humidity = 60 + random.uniform(-10, 10)
location = f"room{device_id[-1]}"
records.append((timestamp, device_id, location, temp, humidity))
# 批量插入
cur.executemany(
"INSERT INTO sensor_data (time, device_id, location, temperature, humidity) "
"VALUES (%s, %s, %s, %s, %s)",
records
)
conn.commit()
print(f"已写入 {(batch + 1) * batch_size * 3} 条数据")
print("数据写入完成!")
# 查看统计信息
cur.execute("SELECT COUNT(*) FROM sensor_data;")
count = cur.fetchone()[0]
print(f"总记录数: {count}")
cur.close()
conn.close()
安装依赖并运行:
2.5 查询数据¶
基本查询¶
-- 查询最近1小时的数据
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
-- 查询特定设备的数据
SELECT * FROM sensor_data
WHERE device_id = 'dev001'
AND time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
-- 查询特定时间范围
SELECT * FROM sensor_data
WHERE time BETWEEN '2026-03-08 10:00:00' AND '2026-03-08 11:00:00'
ORDER BY time DESC;
聚合查询¶
-- 计算每个设备的平均温度
SELECT
device_id,
AVG(temperature) as avg_temp,
MIN(temperature) as min_temp,
MAX(temperature) as max_temp,
COUNT(*) as count
FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY device_id;
-- 使用time_bucket进行时间分组(每5分钟)
SELECT
time_bucket('5 minutes', time) AS bucket,
device_id,
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity
FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, device_id
ORDER BY bucket DESC, device_id;
-- 计算移动平均
SELECT
time,
device_id,
temperature,
AVG(temperature) OVER (
PARTITION BY device_id
ORDER BY time
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) as moving_avg
FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC;
使用Python查询¶
创建 timescale_query.py:
import psycopg2
import pandas as pd
from datetime import datetime, timedelta
# 连接数据库
conn = psycopg2.connect(
host="localhost",
port=5432,
database="postgres",
user="postgres",
password="password"
)
# 查询最近1小时的数据
query = """
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
ORDER BY time DESC
LIMIT 100;
"""
df = pd.read_sql_query(query, conn)
print("最近的数据:")
print(df.head())
# 聚合查询 - 每5分钟的平均值
query_agg = """
SELECT
time_bucket('5 minutes', time) AS bucket,
device_id,
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity,
COUNT(*) as count
FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket, device_id
ORDER BY bucket DESC, device_id;
"""
df_agg = pd.read_sql_query(query_agg, conn)
print("\n5分钟聚合数据:")
print(df_agg)
# 统计每个设备的数据
query_stats = """
SELECT
device_id,
location,
COUNT(*) as count,
AVG(temperature) as avg_temp,
MIN(temperature) as min_temp,
MAX(temperature) as max_temp,
STDDEV(temperature) as stddev_temp
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY device_id, location
ORDER BY device_id;
"""
df_stats = pd.read_sql_query(query_stats, conn)
print("\n设备统计信息:")
print(df_stats)
conn.close()
2.6 数据压缩¶
TimescaleDB支持自动压缩以节省存储空间:
-- 启用压缩
ALTER TABLE sensor_data SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id',
timescaledb.compress_orderby = 'time DESC'
);
-- 添加压缩策略(自动压缩7天前的数据)
SELECT add_compression_policy('sensor_data', INTERVAL '7 days');
-- 手动压缩特定chunk
SELECT compress_chunk(i)
FROM show_chunks('sensor_data', older_than => INTERVAL '7 days') i;
-- 查看压缩状态
SELECT
chunk_name,
compression_status,
before_compression_total_bytes,
after_compression_total_bytes,
before_compression_total_bytes::float / after_compression_total_bytes::float as compression_ratio
FROM chunk_compression_stats('sensor_data');
2.7 数据保留策略¶
自动删除旧数据:
-- 添加保留策略(自动删除30天前的数据)
SELECT add_retention_policy('sensor_data', INTERVAL '30 days');
-- 查看保留策略
SELECT * FROM timescaledb_information.jobs
WHERE proc_name = 'policy_retention';
-- 删除保留策略
SELECT remove_retention_policy('sensor_data');
步骤3:性能优化¶
3.1 InfluxDB性能优化¶
批量写入¶
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS, ASYNCHRONOUS
import time
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="myorg")
# 使用批量写入API
write_api = client.write_api(write_options=ASYNCHRONOUS)
# 批量写入
points = []
for i in range(1000):
point = Point("temperature") \
.tag("device", f"dev{i%10:03d}") \
.field("value", 20 + i % 10)
points.append(point)
write_api.write(bucket="mybucket", record=points)
write_api.close()
client.close()
优化查询¶
// ❌ 不好的查询 - 扫描所有数据
from(bucket: "mybucket")
|> range(start: 0)
|> filter(fn: (r) => r._measurement == "temperature")
// ✅ 好的查询 - 限制时间范围
from(bucket: "mybucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.device == "dev001")
// ✅ 使用降采样减少数据量
from(bucket: "mybucket")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "temperature")
|> aggregateWindow(every: 5m, fn: mean)
配置优化¶
# 调整缓存大小(在docker-compose.yml中)
environment:
- INFLUXD_CACHE_MAX_MEMORY_SIZE=1073741824 # 1GB
- INFLUXD_CACHE_SNAPSHOT_MEMORY_SIZE=26214400 # 25MB
3.2 TimescaleDB性能优化¶
索引优化¶
-- 创建复合索引
CREATE INDEX ON sensor_data (device_id, time DESC);
CREATE INDEX ON sensor_data (location, time DESC);
-- 创建部分索引(只索引最近的数据)
CREATE INDEX ON sensor_data (device_id, time DESC)
WHERE time > NOW() - INTERVAL '7 days';
-- 查看索引使用情况
EXPLAIN ANALYZE
SELECT * FROM sensor_data
WHERE device_id = 'dev001'
AND time > NOW() - INTERVAL '1 hour';
批量插入优化¶
import psycopg2
from psycopg2.extras import execute_values
conn = psycopg2.connect(
host="localhost",
database="postgres",
user="postgres",
password="password"
)
cur = conn.cursor()
# 准备大量数据
records = [(datetime.now(), f'dev{i%10:03d}', f'room{i%5}', 20+i%10, 60+i%20)
for i in range(10000)]
# 使用execute_values批量插入(更快)
execute_values(
cur,
"INSERT INTO sensor_data (time, device_id, location, temperature, humidity) VALUES %s",
records,
page_size=1000
)
conn.commit()
cur.close()
conn.close()
分区优化¶
-- 调整chunk时间间隔(默认7天)
SELECT set_chunk_time_interval('sensor_data', INTERVAL '1 day');
-- 查看chunk信息
SELECT
chunk_name,
range_start,
range_end,
pg_size_pretty(total_bytes) as size
FROM timescaledb_information.chunks
WHERE hypertable_name = 'sensor_data'
ORDER BY range_start DESC;
3.3 通用优化建议¶
数据建模最佳实践¶
✅ 好的做法:
1. 使用标签/tag存储元数据(设备ID、位置等)
2. 使用字段/field存储测量值(温度、湿度等)
3. 合理设置数据保留期
4. 使用批量写入
5. 创建适当的索引
❌ 避免的做法:
1. 在字段中存储高基数数据(如UUID)
2. 过度使用标签(标签过多影响性能)
3. 频繁的单条写入
4. 查询时不限制时间范围
5. 存储不必要的历史数据
监控和调优¶
-- TimescaleDB: 查看数据库大小
SELECT
hypertable_name,
pg_size_pretty(hypertable_size(format('%I.%I', hypertable_schema, hypertable_name)::regclass)) as size
FROM timescaledb_information.hypertables;
-- 查看最慢的查询
SELECT
query,
calls,
total_time,
mean_time,
max_time
FROM pg_stat_statements
ORDER BY mean_time DESC
LIMIT 10;
步骤4:物联网应用实践¶
4.1 完整的物联网数据采集系统¶
让我们构建一个完整的物联网数据采集和分析系统。
项目结构¶
iot-tsdb-project/
├── docker-compose.yml
├── mqtt_to_influx.py
├── mqtt_to_timescale.py
├── data_generator.py
└── requirements.txt
docker-compose.yml¶
version: '3.8'
services:
# MQTT Broker
mosquitto:
image: eclipse-mosquitto:2
ports:
- "1883:1883"
volumes:
- ./mosquitto.conf:/mosquitto/config/mosquitto.conf
networks:
- iot-network
# InfluxDB
influxdb:
image: influxdb:2.7
ports:
- "8086:8086"
environment:
- DOCKER_INFLUXDB_INIT_MODE=setup
- DOCKER_INFLUXDB_INIT_USERNAME=admin
- DOCKER_INFLUXDB_INIT_PASSWORD=adminpassword123
- DOCKER_INFLUXDB_INIT_ORG=myorg
- DOCKER_INFLUXDB_INIT_BUCKET=iot_data
- DOCKER_INFLUXDB_INIT_RETENTION=30d
volumes:
- influxdb-data:/var/lib/influxdb2
networks:
- iot-network
# TimescaleDB
timescaledb:
image: timescale/timescaledb:latest-pg15
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=password
volumes:
- timescaledb-data:/var/lib/postgresql/data
networks:
- iot-network
# Grafana
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana-data:/var/lib/grafana
depends_on:
- influxdb
- timescaledb
networks:
- iot-network
volumes:
influxdb-data:
timescaledb-data:
grafana-data:
networks:
iot-network:
driver: bridge
mosquitto.conf¶
data_generator.py¶
模拟物联网设备发送数据:
import paho.mqtt.client as mqtt
import json
import time
import random
from datetime import datetime
# MQTT配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensors/data"
# 创建MQTT客户端
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT, 60)
print("开始发送模拟传感器数据...")
try:
while True:
# 模拟3个设备的数据
for device_id in ["dev001", "dev002", "dev003"]:
# 生成模拟数据
data = {
"device_id": device_id,
"location": f"room{device_id[-1]}",
"timestamp": datetime.utcnow().isoformat(),
"temperature": round(20 + random.uniform(-2, 5), 2),
"humidity": round(60 + random.uniform(-10, 10), 2),
"pressure": round(1013 + random.uniform(-5, 5), 2)
}
# 发送到MQTT
client.publish(MQTT_TOPIC, json.dumps(data))
print(f"发送数据: {data}")
time.sleep(5) # 每5秒发送一次
except KeyboardInterrupt:
print("\n停止发送数据")
client.disconnect()
mqtt_to_influx.py¶
从MQTT接收数据并写入InfluxDB:
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import json
from datetime import datetime
# MQTT配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensors/data"
# InfluxDB配置
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "your-token-here"
INFLUX_ORG = "myorg"
INFLUX_BUCKET = "iot_data"
# 创建InfluxDB客户端
influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = influx_client.write_api(write_options=SYNCHRONOUS)
# MQTT回调函数
def on_connect(client, userdata, flags, rc):
print(f"连接到MQTT Broker,返回码: {rc}")
client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
try:
# 解析JSON数据
data = json.loads(msg.payload.decode())
# 创建InfluxDB数据点
point = Point("sensor_data") \
.tag("device_id", data["device_id"]) \
.tag("location", data["location"]) \
.field("temperature", float(data["temperature"])) \
.field("humidity", float(data["humidity"])) \
.field("pressure", float(data["pressure"])) \
.time(datetime.fromisoformat(data["timestamp"]))
# 写入InfluxDB
write_api.write(bucket=INFLUX_BUCKET, record=point)
print(f"写入InfluxDB: {data['device_id']} - {data['temperature']}°C")
except Exception as e:
print(f"错误: {e}")
# 创建MQTT客户端
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
# 连接并开始循环
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
print("MQTT to InfluxDB 桥接已启动...")
mqtt_client.loop_forever()
mqtt_to_timescale.py¶
从MQTT接收数据并写入TimescaleDB:
import paho.mqtt.client as mqtt
import psycopg2
from psycopg2.extras import execute_values
import json
from datetime import datetime
# MQTT配置
MQTT_BROKER = "localhost"
MQTT_PORT = 1883
MQTT_TOPIC = "sensors/data"
# TimescaleDB配置
conn = psycopg2.connect(
host="localhost",
port=5432,
database="postgres",
user="postgres",
password="password"
)
# 创建表(如果不存在)
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
location TEXT NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION
);
""")
# 创建hypertable
cur.execute("""
SELECT * FROM timescaledb_information.hypertables
WHERE hypertable_name = 'sensor_data';
""")
if cur.fetchone() is None:
cur.execute("SELECT create_hypertable('sensor_data', 'time');")
print("创建hypertable成功")
conn.commit()
# MQTT回调函数
def on_connect(client, userdata, flags, rc):
print(f"连接到MQTT Broker,返回码: {rc}")
client.subscribe(MQTT_TOPIC)
def on_message(client, userdata, msg):
try:
# 解析JSON数据
data = json.loads(msg.payload.decode())
# 插入TimescaleDB
cur.execute(
"""
INSERT INTO sensor_data (time, device_id, location, temperature, humidity, pressure)
VALUES (%s, %s, %s, %s, %s, %s)
""",
(
datetime.fromisoformat(data["timestamp"]),
data["device_id"],
data["location"],
float(data["temperature"]),
float(data["humidity"]),
float(data["pressure"])
)
)
conn.commit()
print(f"写入TimescaleDB: {data['device_id']} - {data['temperature']}°C")
except Exception as e:
print(f"错误: {e}")
conn.rollback()
# 创建MQTT客户端
mqtt_client = mqtt.Client()
mqtt_client.on_connect = on_connect
mqtt_client.on_message = on_message
# 连接并开始循环
mqtt_client.connect(MQTT_BROKER, MQTT_PORT, 60)
print("MQTT to TimescaleDB 桥接已启动...")
mqtt_client.loop_forever()
requirements.txt¶
4.2 运行完整系统¶
# 1. 启动所有服务
docker-compose up -d
# 2. 等待服务启动(约30秒)
docker-compose ps
# 3. 安装Python依赖
pip install -r requirements.txt
# 4. 在不同终端运行各个组件
# 终端1: 启动数据生成器
python data_generator.py
# 终端2: 启动InfluxDB桥接
python mqtt_to_influx.py
# 终端3: 启动TimescaleDB桥接
python mqtt_to_timescale.py
4.3 数据可视化¶
访问Grafana进行数据可视化:
- 打开浏览器访问 http://localhost:3000
- 登录(用户名: admin, 密码: admin)
- 添加数据源:
- InfluxDB: http://influxdb:8086
-
PostgreSQL (TimescaleDB): host=timescaledb, database=postgres
-
创建仪表板显示实时数据
故障排除¶
问题1:InfluxDB无法启动¶
现象:
可能原因: 1. 数据目录权限问题 2. 端口被占用 3. 数据文件损坏
解决方法:
# 1. 检查端口占用
netstat -ano | findstr :8086 # Windows
lsof -i :8086 # Linux/macOS
# 2. 修改数据目录权限
sudo chown -R 1000:1000 ~/tsdb-tutorial/influxdb-data
# 3. 重新创建容器
docker stop influxdb
docker rm influxdb
rm -rf ~/tsdb-tutorial/influxdb-data/*
# 重新运行docker run命令
问题2:TimescaleDB连接失败¶
现象:
可能原因: 1. 容器未启动 2. 端口映射错误 3. 密码错误
解决方法:
# 1. 检查容器状态
docker ps | grep timescaledb
# 2. 查看日志
docker logs timescaledb
# 3. 测试连接
docker exec -it timescaledb psql -U postgres
# 4. 检查端口
docker port timescaledb
问题3:数据写入慢¶
现象: 写入速度很慢,延迟高
解决方法:
InfluxDB:
# 使用批量写入
from influxdb_client.client.write_api import ASYNCHRONOUS
write_api = client.write_api(write_options=ASYNCHRONOUS)
# 批量写入多个点
write_api.write(bucket="mybucket", record=points_list)
TimescaleDB:
# 使用execute_values批量插入
from psycopg2.extras import execute_values
execute_values(
cur,
"INSERT INTO sensor_data VALUES %s",
records,
page_size=1000
)
问题4:查询性能差¶
现象: 查询响应时间长
解决方法:
-- TimescaleDB: 使用EXPLAIN分析查询
EXPLAIN ANALYZE
SELECT * FROM sensor_data
WHERE device_id = 'dev001'
AND time > NOW() - INTERVAL '1 hour';
-- 添加索引
CREATE INDEX ON sensor_data (device_id, time DESC);
-- 限制查询时间范围
-- ❌ 不好
SELECT * FROM sensor_data WHERE device_id = 'dev001';
-- ✅ 好
SELECT * FROM sensor_data
WHERE device_id = 'dev001'
AND time > NOW() - INTERVAL '1 hour';
问题5:磁盘空间不足¶
现象:
解决方法:
# 1. 查看磁盘使用
df -h
# 2. 查看数据库大小
docker exec influxdb du -sh /var/lib/influxdb2
docker exec timescaledb du -sh /var/lib/postgresql/data
# 3. 设置数据保留策略
# InfluxDB: 在创建bucket时设置retention
# TimescaleDB: 使用retention policy
# 4. 手动清理旧数据
# InfluxDB
docker exec influxdb influx delete \
--bucket mybucket \
--start 1970-01-01T00:00:00Z \
--stop 2026-01-01T00:00:00Z
# TimescaleDB
docker exec timescaledb psql -U postgres -c \
"DELETE FROM sensor_data WHERE time < NOW() - INTERVAL '30 days';"
问题6:MQTT连接失败¶
现象:
解决方法:
# 1. 检查Mosquitto容器
docker ps | grep mosquitto
# 2. 查看日志
docker logs mosquitto
# 3. 测试MQTT连接
docker exec mosquitto mosquitto_sub -t "sensors/#" -v
# 4. 检查配置文件
cat mosquitto.conf
最佳实践¶
1. 数据建模¶
✅ 好的做法:
InfluxDB:
- 使用tag存储元数据(设备ID、位置、类型)
- 使用field存储测量值(温度、湿度、压力)
- tag值应该是有限的(低基数)
- 合理命名measurement
TimescaleDB:
- 时间列必须是TIMESTAMPTZ类型
- 为常用查询字段创建索引
- 使用合适的chunk时间间隔
- 考虑使用压缩
❌ 避免的做法:
- 在tag中存储高基数数据(如UUID、时间戳)
- 过多的tag(影响查询性能)
- 不设置数据保留策略
- 频繁的单条写入
2. 性能优化¶
# ✅ 批量写入
points = []
for i in range(1000):
points.append(create_point(i))
write_api.write(bucket="mybucket", record=points)
# ❌ 单条写入
for i in range(1000):
write_api.write(bucket="mybucket", record=create_point(i))
3. 查询优化¶
-- ✅ 限制时间范围
SELECT * FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
AND device_id = 'dev001';
-- ❌ 全表扫描
SELECT * FROM sensor_data
WHERE device_id = 'dev001';
-- ✅ 使用聚合减少数据量
SELECT
time_bucket('5 minutes', time) AS bucket,
AVG(temperature) as avg_temp
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY bucket;
4. 数据保留¶
# InfluxDB: 创建bucket时设置
influx bucket create \
--name short_term \
--retention 7d
# TimescaleDB: 添加保留策略
SELECT add_retention_policy('sensor_data', INTERVAL '30 days');
5. 监控和告警¶
-- 监控数据写入速率
SELECT
time_bucket('1 minute', time) AS bucket,
COUNT(*) as records_per_minute
FROM sensor_data
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY bucket
ORDER BY bucket DESC;
-- 检测异常值
SELECT * FROM sensor_data
WHERE temperature > 50 OR temperature < -10
AND time > NOW() - INTERVAL '1 hour';
总结¶
通过本教程,你已经学习了时序数据库的核心内容:
核心要点回顾¶
时序数据库基础: - ✅ 时序数据的特点和应用场景 - ✅ 时序数据库 vs 传统数据库 - ✅ InfluxDB和TimescaleDB的特点
InfluxDB: - ✅ 安装和配置 - ✅ 数据模型(Organization、Bucket、Measurement、Tags、Fields) - ✅ 使用Flux查询语言 - ✅ 数据写入和查询 - ✅ 数据保留策略
TimescaleDB: - ✅ 安装和配置 - ✅ Hypertable概念 - ✅ 使用标准SQL查询 - ✅ 数据压缩和保留 - ✅ 性能优化
实践应用: - ✅ 物联网数据采集系统 - ✅ MQTT集成 - ✅ 批量写入优化 - ✅ 查询性能优化 - ✅ 数据可视化
实践成果¶
完成本教程后,你应该能够: 1. 选择合适的时序数据库 2. 设计时序数据模型 3. 高效地写入和查询时序数据 4. 优化时序数据库性能 5. 构建完整的物联网数据采集系统
下一步学习建议¶
- 深入学习
- 数据可视化(Grafana)
- 实时流处理(Kafka Streams)
- 大数据处理(Spark)
-
机器学习应用
-
实践项目
- 智能家居监控系统
- 工业设备监控平台
- 环境监测系统
-
能源管理系统
-
进阶主题
- 高可用集群部署
- 数据备份和恢复
- 安全加固
- 性能调优
进阶挑战¶
尝试以下挑战来巩固学习:
挑战1:构建环境监测系统(90分钟)¶
任务: 1. 模拟多个环境传感器(温度、湿度、PM2.5、CO2) 2. 使用MQTT传输数据 3. 同时写入InfluxDB和TimescaleDB 4. 实现数据聚合和统计 5. 创建Grafana仪表板
要求: - 至少5个模拟设备 - 每秒产生数据 - 实时显示最新数据 - 显示历史趋势图 - 实现异常告警
挑战2:性能压测(60分钟)¶
任务: 1. 编写压测脚本 2. 测试InfluxDB和TimescaleDB的写入性能 3. 测试不同批量大小的影响 4. 对比两个数据库的查询性能 5. 生成性能报告
目标: - 达到10000条/秒的写入速率 - 查询响应时间 < 100ms - 资源使用率 < 80%
挑战3:数据分析应用(120分钟)¶
任务: 1. 收集一周的模拟数据 2. 实现数据清洗和预处理 3. 计算统计指标(均值、方差、异常值) 4. 实现数据降采样 5. 生成数据分析报告
要求: - 使用Python进行数据分析 - 使用Pandas处理数据 - 生成可视化图表 - 导出分析结果
常见问题¶
Q1: InfluxDB和TimescaleDB应该选择哪个?¶
A: 根据需求选择:
选择InfluxDB: - 纯时序数据场景 - 需要简单易用的解决方案 - 不需要复杂的关联查询 - 团队不熟悉SQL
选择TimescaleDB: - 需要SQL兼容性 - 有复杂的关联查询需求 - 已有PostgreSQL经验 - 需要事务支持
两者都用: - 大型系统可以同时使用 - InfluxDB用于实时监控 - TimescaleDB用于长期存储和分析
Q2: 如何选择数据保留期?¶
A: 考虑以下因素:
- 业务需求
- 法规要求(如医疗数据需保留7年)
-
分析需求(需要多长时间的历史数据)
-
存储成本
- 磁盘空间限制
-
云存储费用
-
查询性能
- 数据量越大,查询越慢
- 考虑使用降采样
建议: - 原始数据:7-30天 - 降采样数据(5分钟):90天 - 降采样数据(1小时):1年 - 降采样数据(1天):永久
Q3: 如何处理数据丢失?¶
A: 多层保护:
- 应用层
- 实现重试机制
-
使用消息队列缓冲
-
数据库层
- 定期备份
- 主从复制
-
集群部署
-
监控层
- 监控写入速率
- 告警异常情况
# 实现重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
def write_data(point):
write_api.write(bucket="mybucket", record=point)
Q4: 如何优化查询性能?¶
A: 多方面优化:
-
限制时间范围
-
使用索引
-
使用聚合
-
使用连续聚合(TimescaleDB)
Q5: 如何备份时序数据?¶
A: 备份策略:
InfluxDB:
# 使用influx backup
docker exec influxdb influx backup /backup/
# 恢复
docker exec influxdb influx restore /backup/
TimescaleDB:
# 使用pg_dump
docker exec timescaledb pg_dump -U postgres > backup.sql
# 恢复
docker exec -i timescaledb psql -U postgres < backup.sql
建议: - 每天自动备份 - 保留多个备份版本 - 定期测试恢复流程 - 异地备份
延伸阅读¶
推荐资源¶
官方文档: - InfluxDB官方文档 - TimescaleDB官方文档 - Flux查询语言指南
在线教程: - InfluxDB University - TimescaleDB教程
书籍推荐: - 《时序数据库原理与实践》 - 《InfluxDB实战》 - 《PostgreSQL实战》
视频课程: - InfluxDB官方YouTube频道 - TimescaleDB官方YouTube频道
相关技术¶
数据可视化: - Grafana - 时序数据可视化 - Kibana - 日志可视化 - Tableau - 商业智能
流处理: - Apache Kafka - 消息队列 - Apache Flink - 流处理 - Kafka Streams - 流处理库
监控工具: - Prometheus - 监控系统 - Telegraf - 数据采集 - Chronograf - InfluxDB可视化
参考资料¶
- InfluxDB官方文档 - https://docs.influxdata.com/
- TimescaleDB官方文档 - https://docs.timescale.com/
- Flux查询语言参考 - https://docs.influxdata.com/flux/v0.x/
- PostgreSQL文档 - https://www.postgresql.org/docs/
- 时序数据库对比 - https://db-engines.com/en/ranking/time+series+dbms
反馈与支持: - 如果你在学习过程中遇到问题,欢迎在评论区留言 - 发现文档错误或有改进建议,请提交Issue - 想要分享你的时序数据库实践经验,欢迎投稿
下一步学习: - 数据分析工具使用 - 学习Pandas和NumPy - 可视化框架应用 - 学习Grafana和Kibana - 实时数据流处理 - 学习Kafka Streams和Flink - 大数据处理技术 - 学习Hadoop和Spark
版权声明: 本文采用 CC BY-NC-SA 4.0 许可协议,欢迎分享和改编,但请注明出处。