qbkj/pypi/: qbtsdb-0.1.0 metadata and description
| author | makaspacex |
| author_email | makaspacex@outlook.com |
| classifiers |
|
| description_content_type | text/markdown |
| license | LICENSE |
| metadata_version | 2.3 |
| requires_dist |
|
| requires_python | >=3.12,<4.0 |
Because this project isn't in the mirror_whitelist,
no releases from root/pypi are included.
| File | Tox results | History |
|---|---|---|
qbtsdb-0.1.0-py3-none-any.whl
|
|
|
qbtsdb-0.1.0.tar.gz
|
|
qbtsdb
面向工业时序数据的数据库读写工具包,聚焦 InfluxDB 2 与 TimescaleDB。通过 FieldSpec 模型化字段定义,统一了数据写入、查询、补点、类型转换等常见工作流。
特性
- InfluxDB 2:用类定义 measurement/tag/field,
save()自动序列化为 line protocol,read_last/read_last_dataframe支持窗口聚合、缺失补点、类型转换。 - TimescaleDB:基于 Django ORM 的
time_bucket_gapfill包装,内置 last/locf/interpolate 填充策略,读取即得对齐的时间序列。 - 数据补全:支持按频率生成完整时间索引,对数值列插值或前值填充;字符串列可 last/locf 填充。
- 灵活来源:支持多数据源配置、自定义 InfluxDBClient 创建逻辑;TimescaleDB 读取可叠加额外过滤。
- 友好输出:可返回延迟实例化序列(
InfluxQuerySet)、Pandas DataFrame 或 values 风格字典,便于后续处理或序列化。
安装
项目基于 Python 3.12+。在当前仓库下即可安装:
pip install -e .
# 或使用 poetry
poetry install
InfluxDB 2 快速开始
- 定义数据源与模型:
from datetime import datetime, timedelta
from qbtsdb.influxdb2 import InfluxDB2Base, FloatField, StringField, Fill, Agg
# 注册 InfluxDB 连接(可配置多源)
InfluxDB2Base.configure_sources({
"default": {
"url": "http://localhost:8086",
"token": "<your-token>",
"org": "demo",
"bucket": "sensor",
}
})
class Sensor(InfluxDB2Base):
measurement = "sensor"
timezone = "Asia/Shanghai"
# tags 至少包含一个键(常用 name),field 为存储层 _field 名
temp = FloatField(tags={"name": "temp"}, field="value", fill=Fill.last, agg=Agg.mean, label="温度")
status = StringField(tags={"name": "status"}, field="svalue", fill=Fill.last, agg=Agg.last)
- 写入数据:
sensor = Sensor(temp=23.5, status="ok", time=datetime.now())
sensor.save() # 同步写入,返回 line protocol 字符串
# 调试时也可以只生成 LP:sensor.to_line_protocol()
- 读取最新窗口数据:
# 相对时间字符串(-10m/-2h)或 datetime 均可;freq 为聚合/补点周期
qs = Sensor.read_last(start_time="-10m", freq="10s", fields=["temp", "status"])
for row in qs: # InfluxQuerySet,按需实例化
print(row.time, row.temp, row.status)
df = Sensor.read_last_dataframe(
start_time=datetime.now() - timedelta(hours=1),
freq="1m",
fill_initial=True, # 边界也做 bfill/ffill
)
print(df.head())
常用参数:
create_empty_in_flux:在 Flux 端补空桶(默认开启),可配合本地 pandas 补点。do_gapfill:是否在 pandas 侧补点(支持 Fill.none/last/interpolate)。rename_columns_to_fieldname:把 tag 值列重命名为类字段名,便于直接访问。lazy:read_last返回的 InfluxQuerySet 是否提前实例化缓存。tolerance:容忍“当前时间-最新时间”的秒数,超过则抛异常。
TimescaleDB(Django)快速开始
将 DBaseModel 与 Django models.Model 混入,定义逻辑字段映射到存储的点位:
from django.db import models
from qbtsdb.timescaledb import DBaseModel, FieldSpec, Fill, Agg
class Metric(DBaseModel, models.Model):
time = models.DateTimeField()
name = models.CharField(max_length=64)
value = models.FloatField()
svalue = models.TextField(null=True, blank=True)
temp = FieldSpec(point="sensor_temp", dtype=float, fill=Fill.interpolate, agg=Agg.avg)
status = FieldSpec(point="sensor_status", dtype=str, fill=Fill.locf, agg=Agg.last)
# 读取按桶聚合且缺失补点后的 values QuerySet(dict)
rows = Metric.readlatest(
start=datetime(2025, 1, 1, 0, 0),
end=datetime(2025, 1, 1, 1, 0),
resolution="10 seconds",
fields=["temp", "status"],
extra_q=models.Q(name__startswith="sensor"), # 可选附加过滤
)
for r in rows:
print(r["ts"], r["temp"], r["status"])
说明:
- 内置 TimescaleDB 函数包装
time_bucket_gapfill、last、locf、interpolate,便于在 ORM 中使用。 FieldSpec.dtype支持float/str,fill可选none/locf/interpolate(插值仅数值),agg支持avg/max/min/last。readlatest会自动将 naive datetime 转为当前时区 aware 对象,避免时区混乱。
目录
qbtsdb/influxdb2/:InfluxDB 2 读写封装(字段定义、查询构造、补点、查询结果序列)。qbtsdb/timescaledb/:TimescaleDB + Django 读接口与函数包装。qbtsdb/influxdb2/example/:最小写入示例。
许可证
本项目使用 MIT License,详见 LICENSE。