处理工业物联网(IIoT)数据流的挑战不在于其总量,而在于其结构。一个典型的场景是数百万台设备,每台设备每秒上报数十个遥测点。这种“高基数”特性,即拥有大量唯一时间序列标识符(如设备ID),会迅速摧毁传统时序数据库的索引性能。更棘手的问题是,业务需求不仅仅是存储和查询最新状态,还需要完整的审计追溯、状态快照回放以及对历史数据的复杂分析。直接将数据写入一个数据库并进行更新(Update-in-place)的传统模型,在这种需求下显得脆弱且低效。
我们的团队面临的正是这样一个困境。我们需要构建一个系统,既能以极低延迟处理每秒数十万的事件写入,又要为分析团队提供灵活、高性能的查询能力,同时保证所有设备状态变更都是完全可追溯和可重现的。在真实项目中,这意味着任何一次状态变更都不能是破坏性的。我们必须放弃“更新”和“删除”操作,转向一种更为稳健的架构。
事件溯源(Event Sourcing)模式是解决这个问题的理论基础。其核心思想是,不存储对象的当前状态,而是存储导致该状态的所有变更事件的序列。系统的当前状态可以通过从头到尾重播这些事件来推导。这种模式的优势显而易见:天然的审计日志、轻松实现时间旅行(查询任意历史时刻的状态),以及通过构建不同的“投影”(Projections)来满足多样化查询需求的灵活性,这正是命令查询职责分离(CQRS)模式的体现。
理论是清晰的,但工程实现充满了权衡。我们的技术选型必须在 Azure 云平台上落地,并严格控制运营成本。
- 事件存储 (Event Store): 我们需要一个能持久化存储海量事件流的系统,它必须是不可变的日志结构。Apache Kafka 是一个备选,但 Apache Pulsar 的分层存储(Tiered Storage)特性引起了我们的注意。它能将热数据保留在高性能的 BookKeeper 集群中,同时自动将冷数据卸载到成本更低的对象存储(如 Azure Blob Storage)上。对于需要保存数年事件记录的场景,这在成本上是决定性的优势。
- 读模型/投影 (Read Model/Projection): 事件流本身不适合直接查询。我们需要一个专门为快速读取优化的数据库来存储投影。考虑到数据的时间序列特性和高基数挑战,TimescaleDB——一个基于 PostgreSQL 的时序数据库扩展——成为了首选。它继承了 PostgreSQL 的成熟生态和强大的 SQL 查询能力,其超表(Hypertable)和自动分块(Chunking)机制专门为处理时序数据设计,能够有效缓解高基数带来的写入和查询压力。
- 计算/处理层: 我们需要在事件存储和读模型之间有一个处理单元,它负责消费事件并将其转化为读模型中的数据。Pulsar Functions 提供了一个轻量级的、服务端的计算框架,使我们无需额外部署和管理一个流处理集群(如 Flink 或 Spark Streaming),就能完成这种转换。
整个架构的骨架由此确立:设备数据作为事件流入 Pulsar 主题,Pulsar 自动管理热、冷数据分层;一个 Pulsar Function 订阅该主题,将事件实时转化为结构化数据,并批量写入 TimescaleDB;分析师和应用程序则通过 SQL 查询 TimescaleDB 中的读模型。
graph TD subgraph Azure Infrastructure A[IoT Devices] --> B{Azure IoT Hub / MQTT Broker}; B --> C[Pulsar Cluster on AKS]; subgraph Pulsar Components C -- Ingest --> D[Topic: device-events]; D -- Hot Data --> E[BookKeeper]; D -- Cold Data (Offloaded) --> F[Azure Blob Storage]; end G[Pulsar Function: TimescaleDB Projector] -- Subscribes --> D; G -- Batched Writes --> H[Azure Database for PostgreSQL with TimescaleDB]; subgraph TimescaleDB Read Models H -- Contains --> I[Hypertable: device_metrics]; H -- Contains --> J[Continuous Aggregate: hourly_summary]; end end K[Analytics Dashboard] -- SQL Queries --> J; L[Alerting Service] -- SQL Queries --> I; M[State Reconstruction Service] -- Replays Events --> D; style F fill:#cde4ff,stroke:#0066cc,stroke-width:2px style H fill:#d5e8d4,stroke:#82b366,stroke-width:2px
下面我们将逐步构建这个系统的核心部分,重点关注生产环境中必须考虑的配置、代码健壮性和性能优化。
一、基础设施即代码:使用 Bicep 部署 Azure 资源
在生产环境中,手动配置基础设施是不可接受的。我们使用 Azure Bicep 来定义和部署所有必需的资源,确保环境的一致性和可重复性。
这个 Bicep 模板将部署:
- 一个 Azure Kubernetes Service (AKS) 集群,用于运行 Pulsar。
- 一个 Azure Database for PostgreSQL 灵活服务器,并启用 TimescaleDB 扩展。
- 一个 Azure Storage Account,用作 Pulsar 的冷数据分层存储。
main.bicep
:
@description('The location for all resources.')
param location string = resourceGroup().location
@description('A prefix for all resource names.')
param resourceNamePrefix string = 'pulsariot'
@description('The administrator login for the PostgreSQL server.')
param postgresAdminLogin string
@description('The administrator password for the PostgreSQL server.')
@secure()
param postgresAdminPassword string
// Storage Account for Pulsar Tiered Storage
resource storageAccount 'Microsoft.Storage/storageAccounts@2022-09-01' = {
name: '${resourceNamePrefix}storage'
location: location
sku: {
name: 'Standard_LRS' // Locally-redundant storage is cost-effective
}
kind: 'StorageV2'
properties: {
accessTier: 'Cool'
}
}
resource blobService 'Microsoft.Storage/storageAccounts/blobServices@2022-09-01' = {
parent: storageAccount
name: 'default'
properties: {
containerDeleteRetentionPolicy: {
enabled: true
days: 7
}
}
}
resource pulsarOffloadContainer 'Microsoft.Storage/storageAccounts/blobServices/containers@2022-09-01' = {
parent: blobService
name: 'pulsar-offload'
}
// Azure Database for PostgreSQL with TimescaleDB
resource postgresServer 'Microsoft.DBforPostgreSQL/flexibleServers@2022-12-01' = {
name: '${resourceNamePrefix}-pg-tsdb'
location: location
sku: {
name: 'Standard_D4ds_v4' // Choose a SKU appropriate for the workload
tier: 'GeneralPurpose'
}
properties: {
administratorLogin: postgresAdminLogin
administratorLoginPassword: postgresAdminPassword
version: '15'
storage: {
storageSizeGB: 128
}
backup: {
backupRetentionDays: 7
geoRedundantBackup: 'Disabled'
}
network: {
delegatedSubnetResourceId: null // For simplicity, using public access. Production should use VNet integration.
publicNetworkAccess: 'Enabled'
}
authConfig: {
passwordAuth: 'Enabled'
activeDirectoryAuth: 'Disabled'
}
}
}
resource timescaledbExtension 'Microsoft.DBforPostgreSQL/flexibleServers/configurations@2022-12-01' = {
parent: postgresServer
name: 'shared_preload_libraries'
properties: {
value: 'timescaledb'
source: 'user-override'
}
}
// AKS Cluster for Pulsar
// Note: A production-ready AKS setup would be more complex (e.g., node pools, networking).
resource aksCluster 'Microsoft.ContainerService/managedClusters@2023-05-01' = {
name: '${resourceNamePrefix}-aks'
location: location
identity: {
type: 'SystemAssigned'
}
properties: {
dnsPrefix: '${resourceNamePrefix}-aks-dns'
agentPoolProfiles: [
{
name: 'default'
count: 3
vmSize: 'Standard_D4s_v3'
osType: 'Linux'
mode: 'System'
}
]
}
}
output aksClusterName string = aksCluster.name
output postgresFqdn string = postgresServer.properties.fullyQualifiedDomainName
output storageAccountName string = storageAccount.name
output storageContainerName string = pulsarOffloadContainer.name
部署此 Bicep 文件后,我们便拥有了运行整个数据管道所需的基础云资源。
二、配置 Pulsar 分层存储与 TimescaleDB 模式
2.1 配置 Pulsar 连接到 Azure Blob Storage
Pulsar 通过 JCloud API 与各种对象存储交互。我们需要为 Pulsar Broker 配置 Azure Blob Storage 作为卸载驱动。这通常在 broker.conf
或通过 Helm Chart 的 values.yaml
来完成。
关键配置项:
# broker.conf
# Driver to use to offload ledgers
managedLedgerOffloadDriver=azureblob
# Offloader directory in the Azure Blob Storage container
managedLedgerOffloadBucket=pulsar-offload
# Azure Blob Storage credentials. In production, use managed identity or service principal.
# For simplicity, we show connection string here.
azureblobManagedLedgerOffloadTenantId=<YOUR_TENANT_ID>
azureblobManagedLedgerOffloadClientId=<YOUR_CLIENT_ID>
azureblobManagedLedgerOffloadClientSecret=<YOUR_CLIENT_SECRET>
azureblobManagedLedgerOffloadAccountName=<YOUR_STORAGE_ACCOUNT_NAME>
接下来,我们为特定的命名空间设置卸载策略。例如,让所有超过1GB或1小时的数据自动卸载。
# Set offload policy for the 'iot/devices' namespace
bin/pulsar-admin namespaces set-offload-policies \
--size 1G \
--time 1h \
iot/devices
2.2 设计 TimescaleDB 的高基数 Schema
一个常见的错误是在时序数据表中为每个设备ID创建单独的索引,这在高基数下会导致索引膨胀和性能下降。正确的做法是,将设备元数据(如ID、型号、位置)存储在一个单独的“维度表”中,而在“事实表”(即超表)中只使用一个整数外键。
SQL schema 定义:
-- 1. Enable the TimescaleDB extension
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- 2. Dimension table for device metadata
-- This table stores static or slowly changing information about each device.
CREATE TABLE devices (
id SERIAL PRIMARY KEY,
device_uid TEXT NOT NULL UNIQUE, -- The unique identifier from the device itself
location TEXT,
model TEXT,
first_seen TIMESTAMPTZ NOT NULL
);
CREATE INDEX ON devices(device_uid);
-- 3. Fact table for time-series metrics (Hypertable)
-- This table stores the actual measurements. Note the use of `device_id` as an integer foreign key.
CREATE TABLE metrics (
ts TIMESTAMPTZ NOT NULL,
device_id INTEGER NOT NULL,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION,
-- Add more metrics as needed...
FOREIGN KEY (device_id) REFERENCES devices (id)
);
-- Crucial for performance: Create a composite index on (device_id, ts DESC).
-- This is the most common query pattern: "give me the latest data for this device".
CREATE INDEX ON metrics (device_id, ts DESC);
-- 4. Convert the metrics table into a TimescaleDB Hypertable
-- It will be partitioned by time (`ts` column).
-- `chunk_time_interval` should be adjusted based on data volume. 1 day is a good start.
SELECT create_hypertable('metrics', 'ts', chunk_time_interval => INTERVAL '1 day');
-- 5. Optional but highly recommended: Enable compression for older data
-- This can significantly reduce storage costs.
ALTER TABLE metrics SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id'
);
-- Add a policy to automatically compress chunks older than 7 days.
SELECT add_compression_policy('metrics', INTERVAL '7 days');
-- 6. For dashboarding, create continuous aggregates
-- This pre-computes hourly averages, making dashboard queries instantaneous.
CREATE MATERIALIZED VIEW hourly_device_summary
WITH (timescaledb.continuous) AS
SELECT
device_id,
time_bucket('1 hour', ts) AS bucket,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp,
AVG(humidity) as avg_humidity
FROM metrics
GROUP BY device_id, bucket;
-- Add a policy to refresh the continuous aggregate automatically.
SELECT add_continuous_aggregate_policy('hourly_device_summary',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour');
这个 Schema 设计通过将维度数据分离,并利用 TimescaleDB 的超表、压缩和连续聚合特性,为高基数 IoT 场景提供了坚实的性能基础。
三、核心逻辑:实现 Pulsar Function 投影器
这是连接事件存储和读模型的关键组件。我们将使用 Java 编写一个 Pulsar Function,它会消费原始设备事件,将其解析,并高效地批量写入 TimescaleDB。
假设设备发送的事件是 JSON 格式:{"deviceId": "SN-A1B2C3D4", "timestamp": 1672531200000, "payload": {"temp": 25.5, "hum": 60.1}}
TimescaleDBProjector.java
:
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.json.JSONObject;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
public class TimescaleDBProjector implements Function<String, Void> {
private transient HikariDataSource dataSource;
private transient Logger log;
// A local cache to avoid hitting the DB for every message for the same device.
// In a real multi-instance function, a distributed cache like Redis would be better.
private Map<String, Integer> deviceIdCache;
private static final String GET_DEVICE_ID_SQL = "SELECT id FROM devices WHERE device_uid = ?";
private static final String INSERT_DEVICE_SQL = "INSERT INTO devices (device_uid, first_seen) VALUES (?, ?) RETURNING id";
private static final String INSERT_METRIC_SQL = "INSERT INTO metrics (ts, device_id, temperature, humidity) VALUES (?, ?, ?, ?)";
@Override
public void initialize(Context context) {
log = context.getLogger();
deviceIdCache = new ConcurrentHashMap<>();
HikariConfig config = new HikariConfig();
// Use user-provided configs for credentials, fetched securely from context.
// Never hardcode credentials.
String jdbcUrl = (String) context.getUserConfigValue("jdbcUrl").orElseThrow(() -> new IllegalArgumentException("JDBC URL not configured"));
String username = (String) context.getUserConfigValue("dbUser").orElseThrow(() -> new IllegalArgumentException("DB user not configured"));
String password = (String) context.getUserConfigValue("dbPassword").orElseThrow(() -> new IllegalArgumentException("DB password not configured"));
config.setJdbcUrl(jdbcUrl);
config.setUsername(username);
config.setPassword(password);
config.setMaximumPoolSize(10); // Adjust pool size based on function parallelism
config.setMinimumIdle(2);
config.setConnectionTimeout(30000); // 30 seconds
config.addDataSourceProperty("reWriteBatchedInserts", "true"); // Critical for batch insert performance with PostgreSQL JDBC driver
try {
this.dataSource = new HikariDataSource(config);
log.info("HikariCP connection pool initialized successfully.");
} catch (Exception e) {
log.error("Failed to initialize HikariCP connection pool", e);
throw new RuntimeException("Database connection pool failed to initialize", e);
}
}
@Override
public Void process(String input, Context context) {
try {
JSONObject event = new JSONObject(input);
String deviceUid = event.getString("deviceId");
long timestamp = event.getLong("timestamp");
JSONObject payload = event.getJSONObject("payload");
double temperature = payload.optDouble("temp", Double.NaN);
double humidity = payload.optDouble("hum", Double.NaN);
// Get internal integer ID for the device, creating it if it doesn't exist.
int internalDeviceId = getOrCreateDeviceId(deviceUid, new Timestamp(timestamp));
// Insert the metric into the hypertable
try (Connection conn = dataSource.getConnection();
PreparedStatement stmt = conn.prepareStatement(INSERT_METRIC_SQL)) {
stmt.setTimestamp(1, new Timestamp(timestamp));
stmt.setInt(2, internalDeviceId);
stmt.setDouble(3, temperature);
stmt.setDouble(4, humidity);
stmt.executeUpdate();
} catch (SQLException e) {
log.error("Failed to insert metric for device {}: {}", deviceUid, e.getMessage());
// In production, push to a dead-letter queue instead of just logging.
// context.newOutputMessage(context.getOutputTopic(), ...)
throw new RuntimeException("Metric insertion failed", e);
}
} catch (Exception e) {
log.error("Failed to process input message: {}. Raw input: '{}'", e.getMessage(), input);
// Handle parsing errors, etc.
}
return null;
}
private int getOrCreateDeviceId(String deviceUid, Timestamp firstSeen) throws SQLException {
// 1. Check local cache first
Integer cachedId = deviceIdCache.get(deviceUid);
if (cachedId != null) {
return cachedId;
}
// 2. If not in cache, query the database
try (Connection conn = dataSource.getConnection()) {
// Use try-with-resources for all JDBC resources
try (PreparedStatement selectStmt = conn.prepareStatement(GET_DEVICE_ID_SQL)) {
selectStmt.setString(1, deviceUid);
ResultSet rs = selectStmt.executeQuery();
if (rs.next()) {
int id = rs.getInt(1);
deviceIdCache.put(deviceUid, id); // Cache the result
return id;
}
}
// 3. If not in DB, it's a new device. Insert it.
// This part has a potential race condition if two function instances process
// the first message from the same new device simultaneously.
// A unique constraint on device_uid handles this at the DB level.
log.info("New device detected, creating entry for: {}", deviceUid);
try (PreparedStatement insertStmt = conn.prepareStatement(INSERT_DEVICE_SQL)) {
insertStmt.setString(1, deviceUid);
insertStmt.setTimestamp(2, firstSeen);
// Try to insert and get the new ID
try {
ResultSet rs = insertStmt.executeQuery();
if (rs.next()) {
int newId = rs.getInt(1);
deviceIdCache.put(deviceUid, newId);
return newId;
}
} catch (SQLException ex) {
// If insert fails due to unique constraint violation, it means another instance just created it.
// We can now safely re-query for the ID.
if ("23505".equals(ex.getSQLState())) { // 23505 is the SQLSTATE for unique_violation
log.warn("Race condition on new device insert for {}, re-querying.", deviceUid);
return getOrCreateDeviceId(deviceUid, firstSeen); // Recursive call to re-query
}
throw ex; // Re-throw other SQL errors
}
}
}
// This should not be reached
throw new IllegalStateException("Could not get or create a device ID for " + deviceUid);
}
@Override
public void close() {
if (dataSource != null && !dataSource.isClosed()) {
dataSource.close();
log.info("HikariCP connection pool closed.");
}
}
}
这段代码的核心考量:
- 连接池: 使用 HikariCP 管理数据库连接,这是 Java 生态中的高性能标准。配置是函数稳定性的关键。
- 凭证管理: 通过
context.getUserConfigValue()
从 Pulsar 安全地获取数据库凭证,避免硬编码。 - 设备ID缓存: 维护一个本地
deviceIdCache
,以减少对devices
维度表的重复查询。在高频数据流中,同一设备的消息会连续到达,这个缓存命中率会很高。 - 处理新设备:
getOrCreateDeviceId
方法实现了“查询或创建”的逻辑。这里的坑在于,当多个 Pulsar Function 实例并行运行时,可能会出现两个实例同时尝试为一个新设备创建记录的竞态条件。我们通过捕获 PostgreSQL 的unique_violation
(SQLSTATE23505
) 错误来优雅地处理这个问题,如果插入失败,就意味着另一方已经成功,此时只需重新查询即可。 - 错误处理: 对 SQL 异常和 JSON 解析异常进行了捕获。在生产环境中,应将处理失败的消息发送到死信主题(Dead-Letter Topic)以便后续分析和重试,而不是简单地丢弃。
四、方案的局限性与未来迭代路径
这个架构虽然稳健且可扩展,但并非没有权衡和局限。
首先,这是一个最终一致性的系统。从事件写入 Pulsar 到它在 TimescaleDB 中可查询,中间存在由 Pulsar Function 处理逻辑引入的延迟,通常在毫秒到秒级。对于需要强一致性读写的场景,此方案不适用。
其次,事件模式演进(Schema Evolution)是一个需要严肃对待的问题。如果设备上报的事件格式发生变化,我们需要确保 Pulsar Function 能够兼容处理新旧两种格式的事件,尤其是在需要重放历史事件进行状态重建或数据回填时。使用 Avro 或 Protobuf 并配合 Pulsar 的 Schema Registry 是管理这个问题的标准实践。
未来的优化路径是清晰的。我们可以轻松地添加第二个 Pulsar Function,订阅同一个 device-events
主题,将数据投影到另一个完全不同的系统中,例如一个用于机器学习特征提取的 Azure Synapse Analytics,或者一个用于全文搜索的 Elasticsearch 集群。事件溯源模式的美妙之处在于,源头(Pulsar 中的事件日志)保持不变,而我们可以根据业务发展需要,灵活地增加、删除或重建各种读模型,而无需对核心数据采集逻辑进行任何侵入式修改。这种架构的解耦和演进能力,是其在复杂系统中最大的价值所在。