qbkj/pypi/: qbtsdb-0.1.0 metadata and description

Simple index

author makaspacex
author_email makaspacex@outlook.com
classifiers
  • License :: Other/Proprietary License
  • Programming Language :: Python :: 3
  • Programming Language :: Python :: 3.12
  • Programming Language :: Python :: 3.13
description_content_type text/markdown
license LICENSE
requires_dist
  • influxdb-client (>=1.49.0,<2.0.0)
  • pandas (>=2.3.3,<3.0.0)
  • django (==5.2)
requires_python >=3.12,<4.0

Because this project isn't in the mirror_whitelist, no releases from root/pypi are included.

File Tox results History
qbtsdb-0.1.0-py3-none-any.whl
Size
25 KB
Type
Python Wheel
Python
3
  • Replaced 1 time(s)
  • Uploaded to qbkj/pypi by qbkj 2025-12-22 05:27:10
qbtsdb-0.1.0.tar.gz
Size
22 KB
Type
Source
  • Replaced 1 time(s)
  • Uploaded to qbkj/pypi by qbkj 2025-12-22 05:27:10

qbtsdb

面向工业时序数据的数据库读写工具包,聚焦 InfluxDB 2 与 TimescaleDB。通过 FieldSpec 模型化字段定义,统一了数据写入、查询、补点、类型转换等常见工作流。

特性

安装

项目基于 Python 3.12+。在当前仓库下即可安装:

pip install -e .
# 或使用 poetry
poetry install

InfluxDB 2 快速开始

  1. 定义数据源与模型:
from datetime import datetime, timedelta
from qbtsdb.influxdb2 import InfluxDB2Base, FloatField, StringField, Fill, Agg

# 注册 InfluxDB 连接(可配置多源)
InfluxDB2Base.configure_sources({
    "default": {
        "url": "http://localhost:8086",
        "token": "<your-token>",
        "org": "demo",
        "bucket": "sensor",
    }
})

class Sensor(InfluxDB2Base):
    measurement = "sensor"
    timezone = "Asia/Shanghai"
    # tags 至少包含一个键(常用 name),field 为存储层 _field 名
    temp = FloatField(tags={"name": "temp"}, field="value", fill=Fill.last, agg=Agg.mean, label="温度")
    status = StringField(tags={"name": "status"}, field="svalue", fill=Fill.last, agg=Agg.last)
  1. 写入数据:
sensor = Sensor(temp=23.5, status="ok", time=datetime.now())
sensor.save()              # 同步写入,返回 line protocol 字符串
# 调试时也可以只生成 LP:sensor.to_line_protocol()
  1. 读取最新窗口数据:
# 相对时间字符串(-10m/-2h)或 datetime 均可;freq 为聚合/补点周期
qs = Sensor.read_last(start_time="-10m", freq="10s", fields=["temp", "status"])
for row in qs:  # InfluxQuerySet,按需实例化
    print(row.time, row.temp, row.status)

df = Sensor.read_last_dataframe(
    start_time=datetime.now() - timedelta(hours=1),
    freq="1m",
    fill_initial=True,   # 边界也做 bfill/ffill
)
print(df.head())

常用参数:

TimescaleDB(Django)快速开始

DBaseModel 与 Django models.Model 混入,定义逻辑字段映射到存储的点位:

from django.db import models
from qbtsdb.timescaledb import DBaseModel, FieldSpec, Fill, Agg

class Metric(DBaseModel, models.Model):
    time = models.DateTimeField()
    name = models.CharField(max_length=64)
    value = models.FloatField()
    svalue = models.TextField(null=True, blank=True)

    temp = FieldSpec(point="sensor_temp", dtype=float, fill=Fill.interpolate, agg=Agg.avg)
    status = FieldSpec(point="sensor_status", dtype=str, fill=Fill.locf, agg=Agg.last)

# 读取按桶聚合且缺失补点后的 values QuerySet(dict)
rows = Metric.readlatest(
    start=datetime(2025, 1, 1, 0, 0),
    end=datetime(2025, 1, 1, 1, 0),
    resolution="10 seconds",
    fields=["temp", "status"],
    extra_q=models.Q(name__startswith="sensor"),  # 可选附加过滤
)
for r in rows:
    print(r["ts"], r["temp"], r["status"])

说明:

目录

许可证

本项目使用 MIT License,详见 LICENSE