工業環境離線數據緩存解決方案:C# SQLite 實踐指南
作者:iamrick
你是不是也遇到過類似的問題?工業物聯網項目都面臨網絡不穩定導致的數據丟失風險。今天就來分享一套完整的C#離線數據緩存解決方案,讓你的工業應用在斷網情況下也能穩如泰山!
你是不是也遇到過類似的問題?工業物聯網項目都面臨網絡不穩定導致的數據丟失風險。今天就來分享一套完整的C#離線數據緩存解決方案,讓你的工業應用在斷網情況下也能穩如泰山!
問題分析:工業環境的數據挑戰
典型痛點
- 網絡不穩定工廠環境信號差,經常斷網
- 數據量大設備24小時采集,數據量驚人
- 實時性要求高生產數據不能丟失
- 存儲成本云端存儲費用昂貴
核心需求
- 離線時本地存儲數據
- 網絡恢復后自動同步
- 數據完整性保證
- 高性能讀寫操作
解決方案:SQLite + C# 完美組合
技術選型理由
為什么選擇SQLite?
- ? 零配置,單文件數據庫
- ? 高并發讀寫性能
- ? 事務支持,保證數據一致性
- ? 跨平臺,適合嵌入式環境
?? 核心實現:分步驟詳解
步驟1:數據模型設計
// 工業數據模型 - 簡潔而全面
public class IndustrialDataModel
{
public int Id { get; set; } // 主鍵ID
public DateTime Timestamp { get; set; } // 采集時間戳
public string DeviceId { get; set; } // 設備編號
public double Temperature { get; set; } // 溫度值
public double Pressure { get; set; } // 壓力值
public bool IsSynced { get; set; } // 同步狀態標記
}
設計亮點:
IsSynced
字段是關鍵,標記數據是否已同步- 時間戳精確到毫秒,保證數據時序性
- 字段類型選擇兼顧性能和精度
步驟2:本地緩存倉儲實現
using System;
using System.Collections.Generic;
using System.Data.SQLite;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppSqliteLocalCache
{
publicclass LocalCacheRepository
{
privatestring _connectionString;
public LocalCacheRepository(string dbPath)
{
// 構造SQLite連接字符串
_connectionString = $"Data Source={dbPath};Version=3;";
InitializeDatabase();
}
/// <summary>
/// 初始化數據庫表結構 - 首次運行自動創建
/// </summary>
private void InitializeDatabase()
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
// 創建工業數據表,支持自增主鍵
command.CommandText = @"
CREATE TABLE IF NOT EXISTS IndustrialData (
Id INTEGER PRIMARY KEY AUTOINCREMENT,
Timestamp DATETIME,
DeviceId TEXT,
Temperature REAL,
Pressure REAL,
IsSynced INTEGER
)";
command.ExecuteNonQuery();
}
}
}
/// <summary>
/// 插入采集數據 - 高性能參數化查詢
/// </summary>
public void InsertData(IndustrialDataModel data)
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
// 使用參數化查詢防止SQL注入
command.CommandText = @"
INSERT INTO IndustrialData
(Timestamp, DeviceId, Temperature, Pressure, IsSynced)
VALUES (@Timestamp, @DeviceId, @Temperature, @Pressure, @IsSynced)";
command.Parameters.AddWithValue("@Timestamp", data.Timestamp);
command.Parameters.AddWithValue("@DeviceId", data.DeviceId);
command.Parameters.AddWithValue("@Temperature", data.Temperature);
command.Parameters.AddWithValue("@Pressure", data.Pressure);
command.Parameters.AddWithValue("@IsSynced", data.IsSynced ? 1 : 0);
command.ExecuteNonQuery();
}
}
}
/// <summary>
/// 獲取未同步數據 - 網絡恢復后批量上傳
/// </summary>
public List<IndustrialDataModel> GetUnsyncedData()
{
var unsyncedData = new List<IndustrialDataModel>();
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand("SELECT * FROM IndustrialData WHERE IsSynced = 0", connection))
{
using (var reader = command.ExecuteReader())
{
while (reader.Read())
{
// 安全的數據類型轉換
unsyncedData.Add(new IndustrialDataModel
{
Id = Convert.ToInt32(reader["Id"]),
Timestamp = Convert.ToDateTime(reader["Timestamp"]),
DeviceId = reader["DeviceId"].ToString(),
Temperature = Convert.ToDouble(reader["Temperature"]),
Pressure = Convert.ToDouble(reader["Pressure"]),
IsSynced = Convert.ToBoolean(reader["IsSynced"])
});
}
}
}
}
return unsyncedData;
}
/// <summary>
/// 批量標記為已同步 - 事務保證數據一致性
/// </summary>
public void MarkAsSynced(List<int> ids)
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
// 使用事務確保批量操作的原子性
using (var transaction = connection.BeginTransaction())
{
foreach (var id in ids)
{
using (var command = new SQLiteCommand(connection))
{
command.CommandText = "UPDATE IndustrialData SET IsSynced = 1 WHERE Id = @Id";
command.Parameters.AddWithValue("@Id", id);
command.ExecuteNonQuery();
}
}
transaction.Commit(); // 批量提交,提高性能
}
}
}
}
}
步驟3:數據同步服務
我這里是用的http post,實際用mq效果會更好。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
namespace AppSqliteLocalCache
{
publicclass DataSyncService
{
private readonly LocalCacheRepository _repository;
private readonly HttpClient _httpClient;
private readonly string _apiEndpoint;
public DataSyncService(LocalCacheRepository repository, string apiEndpoint)
{
_repository = repository;
_apiEndpoint = apiEndpoint;
_httpClient = new HttpClient();
}
/// <summary>
/// 智能同步:檢測網絡狀態并自動同步數據
/// </summary>
public async Task<bool> TrySyncData()
{
try
{
// 獲取待同步數據
var unsyncedData = _repository.GetUnsyncedData();
if (unsyncedData.Count == 0)
{
Console.WriteLine("? 無待同步數據");
returntrue;
}
// 分批上傳,避免單次請求過大
constint batchSize = 100;
var syncedIds = new List<int>();
for (int i = 0; i < unsyncedData.Count; i += batchSize)
{
var batch = unsyncedData.GetRange(i, Math.Min(batchSize, unsyncedData.Count - i));
if (await UploadBatch(batch))
{
// 記錄成功同步的ID
foreach (var item in batch)
{
syncedIds.Add(item.Id);
}
}
else
{
Console.WriteLine($"? 批次 {i / batchSize + 1} 同步失敗");
break;
}
}
// 更新同步狀態
if (syncedIds.Count > 0)
{
_repository.MarkAsSynced(syncedIds);
Console.WriteLine($"? 成功同步 {syncedIds.Count} 條數據");
}
return syncedIds.Count == unsyncedData.Count;
}
catch (Exception ex)
{
Console.WriteLine($"? 同步異常: {ex.Message}");
returnfalse;
}
}
/// <summary>
/// 批量上傳數據到云端API
/// </summary>
private async Task<bool> UploadBatch(List<IndustrialDataModel> batch)
{
try
{
var json = JsonSerializer.Serialize(batch);
var content = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
var response = await _httpClient.PostAsync(_apiEndpoint, content);
return response.IsSuccessStatusCode;
}
catch
{
returnfalse; // 網絡異常返回失敗
}
}
public void Dispose()
{
_httpClient?.Dispose();
}
}
}
步驟4:完整應用示例
using System.Text;
namespace AppSqliteLocalCache
{
internal class Program
{
static async Task Main(string[] args)
{
Console.OutputEncoding = Encoding.UTF8;
// 初始化本地緩存
var repository = new LocalCacheRepository("industrial_data.db");
var syncService = new DataSyncService(repository, "https://localhost:7284/api/Industrial");
Console.WriteLine("?? 工業數據采集系統啟動");
// 模擬數據采集和同步
var cts = new CancellationTokenSource();
// 啟動數據采集任務
var dataCollectionTask = StartDataCollection(repository, cts.Token);
// 啟動定時同步任務
var syncTask = StartPeriodicSync(syncService, cts.Token);
Console.WriteLine("按任意鍵停止系統...");
Console.ReadKey();
cts.Cancel();
await Task.WhenAll(dataCollectionTask, syncTask);
Console.WriteLine("?? 系統已停止");
}
/// <summary>
/// 模擬設備數據采集
/// </summary>
static async Task StartDataCollection(LocalCacheRepository repository, CancellationToken token)
{
var random = new Random();
while (!token.IsCancellationRequested)
{
try
{
// 模擬多設備數據采集
for (int deviceId = 1; deviceId <= 5; deviceId++)
{
var data = new IndustrialDataModel
{
Timestamp = DateTime.Now,
DeviceId = $"DEVICE_{deviceId:D3}",
Temperature = 20 + random.NextDouble() * 60, // 20-80°C
Pressure = 1 + random.NextDouble() * 9, // 1-10 bar
IsSynced = false
};
repository.InsertData(data);
}
Console.WriteLine($"?? {DateTime.Now:HH:mm:ss} 采集5個設備數據");
await Task.Delay(5000, token); // 每5秒采集一次
}
catch (OperationCanceledException)
{
break;
}
}
}
/// <summary>
/// 定時同步任務
/// </summary>
static async Task StartPeriodicSync(DataSyncService syncService, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
try
{
await syncService.TrySyncData();
await Task.Delay(30000, token); // 每30秒嘗試同步
}
catch (OperationCanceledException)
{
break;
}
}
}
}
}
圖片
性能優化技巧
數據庫優化
// 1. 創建索引提升查詢性能
private void CreateIndexes()
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
// 為常用查詢字段創建索引
command.CommandText = "CREATE INDEX IF NOT EXISTS idx_sync_status ON IndustrialData(IsSynced)";
command.ExecuteNonQuery();
command.CommandText = "CREATE INDEX IF NOT EXISTS idx_timestamp ON IndustrialData(Timestamp)";
command.ExecuteNonQuery();
}
}
}
// 2. 批量插入優化
public void BatchInsert(List<IndustrialDataModel> dataList)
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var transaction = connection.BeginTransaction())
{
using (var command = new SQLiteCommand(connection))
{
command.CommandText = @"
INSERT INTO IndustrialData
(Timestamp, DeviceId, Temperature, Pressure, IsSynced)
VALUES (@Timestamp, @DeviceId, @Temperature, @Pressure, @IsSynced)";
foreach (var data in dataList)
{
command.Parameters.Clear();
command.Parameters.AddWithValue("@Timestamp", data.Timestamp);
command.Parameters.AddWithValue("@DeviceId", data.DeviceId);
command.Parameters.AddWithValue("@Temperature", data.Temperature);
command.Parameters.AddWithValue("@Pressure", data.Pressure);
command.Parameters.AddWithValue("@IsSynced", data.IsSynced ? 1 : 0);
command.ExecuteNonQuery();
}
transaction.Commit(); // 批量提交大幅提升性能
}
}
}
}
生產環境注意事項
數據安全
- 定期備份設置自動備份機制
- 數據加密敏感數據需要加密存儲
- 訪問控制限制數據庫文件訪問權限
監控告警
// 監控數據庫大小,及時清理歷史數據
public long GetDatabaseSize(string dbPath)
{
var fileInfo = new FileInfo(dbPath);
return fileInfo.Length;
}
// 清理已同步的歷史數據
public void CleanupSyncedData(DateTime olderThan)
{
using (var connection = new SQLiteConnection(_connectionString))
{
connection.Open();
using (var command = new SQLiteCommand(connection))
{
command.CommandText = "DELETE FROM IndustrialData WHERE IsSynced = 1 AND Timestamp < @OlderThan";
command.Parameters.AddWithValue("@OlderThan", olderThan);
var deletedRows = command.ExecuteNonQuery();
Console.WriteLine($"??? 清理了 {deletedRows} 條歷史數據");
}
}
}
總結:三個核心要點
- 架構設計SQLite輕量級數據庫 + 參數化查詢 + 事務保證數據一致性
- 同步策略離線緩存 + 定時同步 + 分批上傳提升效率
- 性能優化索引優化 + 批量操作 + 定期清理保持高性能
這套解決方案已在多個工業項目中驗證,數據零丟失率達到99.9%以上!無論是制造業、能源行業還是智慧農業,都能輕松適配。
責任編輯:武曉燕
來源:
技術老小子