C# LiteDB 時間序列數據處理完整指南
作者:iamrick
LiteDB 提供了一個輕量、靈活的解決方案,用于在 C# 中處理時間序列數據。通過合理的數據模型和查詢策略,可以高效地存儲和分析時間序列信息。
時間序列數據是一種特殊的數據類型,它按照時間順序記錄和組織數據點,每個數據點都與特定的時間戳相關聯。這種數據形式在現代數據分析中扮演著極其重要的角色。
主要特征:
- 時間維度:每個數據點都有明確的時間標記
- 順序性:數據按時間先后順序排列
- 連續性:通常以固定或變化的時間間隔收集
- 趨勢性:可能展現出特定的模式、周期或趨勢
應用場景舉例:
- 工業領域:設備傳感器實時監測數據,如溫度、壓力、振動等
- 金融市場:股票價格、匯率、交易量的每日記錄
- 環境監測:氣溫、濕度、空氣質量等定期采樣數據
- 互聯網運維:服務器性能指標、網絡流量、用戶訪問量等
- 物聯網設備:智能家居設備的使用數據、能源消耗記錄
Nuget 安裝LiteDB包
圖片
數據模型設計
時間序列數據模型
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
/// <summary>
/// 時間序列數據點模型
/// </summary>
public class TimeSeriesDataPoint
{
/// <summary>
/// 唯一標識符
/// </summary>
[BsonId]
public ObjectId Id { get; set; }
/// <summary>
/// 數據記錄時間戳
/// </summary>
public DateTime Timestamp { get; set; }
/// <summary>
/// 數據源或傳感器標識
/// </summary>
public string SourceId { get; set; }
/// <summary>
/// 數值類型數據
/// </summary>
public double Value { get; set; }
/// <summary>
/// 可選的額外元數據
/// </summary>
public Dictionary<string, object> Metadata { get; set; }
}
}
LiteDB 時間序列數據操作
數據插入
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesRepository
{
private readonly LiteDatabase _database;
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public LiteDatabase Database => _database;
public TimeSeriesRepository(string databasePath)
{
_database = new LiteDatabase(databasePath);
_collection = _database.GetCollection<TimeSeriesDataPoint>("time_series_data");
// 創建時間戳和源ID的復合索引
_collection.EnsureIndex(x => x.Timestamp);
_collection.EnsureIndex(x => x.SourceId);
}
/// <summary>
/// 插入單個數據點
/// </summary>
public void InsertDataPoint(TimeSeriesDataPoint dataPoint)
{
_collection.Insert(dataPoint);
}
/// <summary>
/// 批量插入數據點
/// </summary>
public void InsertBatch(IEnumerable<TimeSeriesDataPoint> dataPoints)
{
_collection.InsertBulk(dataPoints);
}
}
}
數據查詢
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesQuery
{
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public TimeSeriesQuery(LiteDatabase database)
{
_collection = database.GetCollection<TimeSeriesDataPoint>("time_series_data");
}
/// <summary>
/// 查詢特定時間范圍內的數據
/// </summary>
public IEnumerable<TimeSeriesDataPoint> GetDataInTimeRange(
DateTime startTime,
DateTime endTime,
string sourceId = null)
{
var query = Query.And(
Query.GTE("Timestamp", startTime),
Query.LTE("Timestamp", endTime)
);
if (!string.IsNullOrEmpty(sourceId))
{
query = Query.And(query, Query.EQ("SourceId", sourceId));
}
return _collection.Find(query);
}
/// <summary>
/// 獲取最近的數據點
/// </summary>
public TimeSeriesDataPoint GetLatestDataPoint(string sourceId)
{
return _collection
.Find(x => x.SourceId == sourceId)
.OrderByDescending(x => x.Timestamp)
.FirstOrDefault();
}
}
}
數據聚合與分析
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using LiteDB;
namespace App07
{
public class TimeSeriesAnalytics
{
private readonly ILiteCollection<TimeSeriesDataPoint> _collection;
public TimeSeriesAnalytics(LiteDatabase database)
{
_collection = database.GetCollection<TimeSeriesDataPoint>("time_series_data");
}
/// <summary>
/// 計算時間范圍內的統計數據
/// </summary>
public (double Min, double Max, double Average)
CalculateStatistics(DateTime startTime, DateTime endTime, string sourceId)
{
var data = _collection.Find(x =>
x.Timestamp >= startTime &&
x.Timestamp <= endTime &&
x.SourceId == sourceId);
return (
data.Min(x => x.Value),
data.Max(x => x.Value),
data.Average(x => x.Value)
);
}
/// <summary>
/// 按時間間隔聚合數據
/// </summary>
public IEnumerable<(DateTime Interval, double AggregatedValue)>
AggregateByInterval(
DateTime startTime,
DateTime endTime,
string sourceId,
TimeSpan interval)
{
return _collection
.Find(x => x.Timestamp >= startTime &&
x.Timestamp <= endTime &&
x.SourceId == sourceId)
.GroupBy(x => Math.Floor((x.Timestamp - startTime).TotalMinutes / interval.TotalMinutes))
.Select(g => (
Interval: startTime.AddMinutes(g.Key * interval.TotalMinutes),
AggregatedValue: g.Average(x => x.Value)
));
}
}
}
使用示例
namespace App07
{
internal class Program
{
static void Main(string[] args)
{
var repository = new TimeSeriesRepository("timeseries.db");
var query = new TimeSeriesQuery(repository.Database);
var analytics = new TimeSeriesAnalytics(repository.Database);
// 創建一個隨機數生成器,用于模擬傳感器數據
var random = new Random();
Console.WriteLine("開始模擬數據寫入,按 'Q' 鍵退出...");
// 創建一個取消令牌源
using var cancellationTokenSource = new CancellationTokenSource();
// 啟動異步任務來寫入數據
Task.Run(async () =>
{
while (!cancellationTokenSource.Token.IsCancellationRequested)
{
var dataPoint = new TimeSeriesDataPoint
{
Timestamp = DateTime.Now,
SourceId = "sensor_001",
Value = 20 + random.NextDouble() * 10, // 生成20-30之間的隨機溫度
Metadata = new Dictionary<string, object>
{
{ "Location", "Main Room" },
{ "Unit", "Celsius" }
}
};
repository.InsertDataPoint(dataPoint);
Console.WriteLine($"寫入數據點 - 時間: {dataPoint.Timestamp}, 值: {dataPoint.Value:F2}°C");
// 等待1秒
await Task.Delay(1000, cancellationTokenSource.Token);
}
}, cancellationTokenSource.Token);
// 等待用戶按 'Q' 鍵退出
while (Console.ReadKey(true).Key != ConsoleKey.Q)
{
// 繼續循環直到按下 'Q' 鍵
}
// 取消數據寫入任務
cancellationTokenSource.Cancel();
Console.WriteLine("\n數據寫入已停止");
// 顯示一些統計信息
try
{
var lastMinute = DateTime.Now.AddMinutes(-1);
var stats = analytics.CalculateStatistics(
lastMinute,
DateTime.Now,
"sensor_001"
);
Console.WriteLine("\n最近一分鐘的統計數據:");
Console.WriteLine($"最小值: {stats.Min:F2}°C");
Console.WriteLine($"最大值: {stats.Max:F2}°C");
Console.WriteLine($"平均值: {stats.Average:F2}°C");
}
catch (Exception ex)
{
Console.WriteLine($"計算統計數據時出錯: {ex.Message}");
}
var result = query.GetLatestDataPoint("sensor_001");
Console.WriteLine($"\n最新數據點: 時間: {result.Timestamp}, 值: {result.Value:F2}°C");
Console.WriteLine("\n按任意鍵退出...");
Console.ReadKey();
}
}
}
圖片
圖片
注意事項
- LiteDB 適合中小型時間序列數據
- 對于海量數據,考慮使用專業的時序數據庫
- 定期備份數據庫文件
- 注意內存使用和查詢性能
- 使用復合索引
- 定期歸檔和清理舊數據
- 批量插入數據
- 選擇合適的數據粒度
- 考慮數據壓縮和分區策略
結論
LiteDB 提供了一個輕量、靈活的解決方案,用于在 C# 中處理時間序列數據。通過合理的數據模型和查詢策略,可以高效地存儲和分析時間序列信息。
責任編輯:武曉燕
來源:
技術老小子