一个棘手的需求摆在面前:为高吞吐量的用户行为日志(每秒数万次页面浏览)构建一个近实时的监控仪表盘。传统方案,如使用ELK或直连时序数据库的前端轮询,因其高昂的实时查询成本和复杂的后端维护而被否决。我们的目标是极致的性能、低廉的成本和最小的运维负担。这意味着最终的用户界面必须是静态的,可以通过CDN在全球分发。但这又引出了核心矛盾:一个静态站点(SSG)如何展示一个动态变化的流式数据源?
我们的破局思路是构建一个“流式驱动的静态站点生成管道”。每当流处理系统计算出一个新的聚合结果时,它不是将数据写入数据库,而是直接触发一次静态站点的重新构建和部署。这个想法听起来有些疯狂,但它能完美地结合流计算的实时性和静态站点的性能优势。
整个系统的技术选型围绕这个核心展开:
- 流处理引擎:
Apache Flink
。其强大的状态管理和精确一次(Exactly-Once)语义是保证数据准确性的基石。 - 应用架构:
Clean Architecture
。流处理任务的逻辑会变得复杂,将其业务逻辑与Flink框架本身解耦,对可测试性和长期可维护性至关重要。 - 云基础设施:
AWS
。利用其托管服务(Kinesis, S3, Lambda)来构建一个事件驱动的、无服务器的管道,将运维成本降至最低。 - 前端呈现:
SSG (Static Site Generation)
。我们选择Hugo,因为它惊人的构建速度,但任何SSG框架都适用。
最终的架构图如下:
graph TD subgraph AWS Cloud A[Client Web App] -- PageView Event --> B(Amazon Kinesis Data Streams) B -- events --> C{Amazon Kinesis Data Analytics for Flink} C -- Checkpoint & State --> S3_State(S3 Bucket for Flink State) C -- Aggregated Data --> D(S3 Bucket for Data) D -- S3:ObjectCreated Event --> E(AWS Lambda Function) E -- reads data --> D E -- generates site --> F(S3 Bucket for Website) F -- serves content --> G(Amazon CloudFront) end H[End User] -- views dashboard --> G style C fill:#f9f,stroke:#333,stroke-width:2px style E fill:#f9f,stroke:#333,stroke-width:2px
用整洁架构(Clean Architecture)组织Flink作业
在真实项目中,直接在Flink的MapFunction
或ProcessFunction
中堆砌业务逻辑是一场灾难。它使得代码难以单元测试,并且与Flink的API深度耦合。我们采用Clean Architecture将代码分为四个层次,即使在数据流处理的场景下也同样适用。
1. Domain Layer (领域层)
这是最核心的层,包含纯粹的业务实体和规则,没有任何外部依赖。
// src/main/java/com/example/domain/entity/PageViewEvent.java
package com.example.domain.entity;
import java.time.Instant;
/**
* 代表一次页面浏览事件的领域实体.
* 这是最纯粹的数据结构, POJO.
*/
public class PageViewEvent {
private String userId;
private String pageUrl;
private Instant timestamp;
private String region;
// Constructors, Getters, Setters...
}
// src/main/java/com/example/domain/entity/SiteMetrics.java
package com.example.domain.entity;
import java.util.HashMap;
import java.util.Map;
/**
* 站点指标的聚合结果.
* 同样是纯粹的领域对象.
*/
public class SiteMetrics {
// Key: 页面URL, Value: 浏览次数
private Map<String, Long> pageViews = new HashMap<>();
// Key: 地区, Value: 用户数
private Map<String, Long> uniqueUsersByRegion = new HashMap<>();
private long totalViews;
// Getters, and methods to update metrics
public void addPageView(String url) {
pageViews.merge(url, 1L, Long::sum);
totalViews++;
}
public void addRegionUser(String region) {
uniqueUsersByRegion.merge(region, 1L, Long::sum);
}
// ... other business logic
}
2. Use Case Layer (用例层)
这一层定义了应用程序的具体操作,它编排领域实体来完成业务目标。接口是关键,它定义了外部层必须实现的数据访问方式。
// src/main/java/com/example/usecase/UpdateSiteMetricsUseCase.java
package com.example.usecase;
import com.example.domain.entity.PageViewEvent;
import com.example.domain.entity.SiteMetrics;
/**
* 用例: 更新站点指标.
* 定义了业务流程, 但不关心如何实现.
*/
public class UpdateSiteMetricsUseCase {
/**
* 定义一个接口, 用例依赖于此接口来获取和保存指标状态.
* Flink层将实现这个接口, 使用Flink的ValueState来持久化.
*/
public interface SiteMetricsStateGateway {
SiteMetrics get();
void save(SiteMetrics metrics);
}
private final SiteMetricsStateGateway stateGateway;
public UpdateSiteMetricsUseCase(SiteMetricsStateGateway stateGateway) {
this.stateGateway = stateGateway;
}
/**
* 执行用例: 处理单个页面浏览事件.
* @param event A single PageViewEvent.
*/
public void processPageView(PageViewEvent event) {
SiteMetrics currentMetrics = stateGateway.get();
if (currentMetrics == null) {
currentMetrics = new SiteMetrics();
}
currentMetrics.addPageView(event.getPageUrl());
// 这里的业务逻辑可以更复杂, 比如需要判断用户是否为当日首次访问来更新 uniqueUsersByRegion
// ...
stateGateway.save(currentMetrics);
}
/**
* 获取当前的指标状态.
* @return The current SiteMetrics.
*/
public SiteMetrics getCurrentMetrics() {
return stateGateway.get();
}
}
这个设计的核心在于 SiteMetricsStateGateway
接口。用例层不关心状态是存在内存、数据库还是Flink的状态后端里,它只管调用接口。这使得我们可以轻松地对UpdateSiteMetricsUseCase
进行单元测试,只需模拟一个Map
来实现SiteMetricsStateGateway
即可。
3. Interface Adapters Layer (接口适配器层)
这层是连接用例层和外部框架(如Flink)的桥梁。
// src/main/java/com/example/adapter/flink/FlinkSiteMetricsStateAdapter.java
package com.example.adapter.flink;
import com.example.domain.entity.SiteMetrics;
import com.example.usecase.UpdateSiteMetricsUseCase;
import org.apache.flink.api.common.state.ValueState;
/**
* Flink状态网关的实现.
* 它使用Flink的ValueState来持久化SiteMetrics.
* 这个适配器将Flink的底层状态API适配到我们的用例层定义的接口.
*/
public class FlinkSiteMetricsStateAdapter implements UpdateSiteMetricsUseCase.SiteMetricsStateGateway {
private final ValueState<SiteMetrics> state;
public FlinkSiteMetricsStateAdapter(ValueState<SiteMetrics> state) {
this.state = state;
}
@Override
public SiteMetrics get() {
try {
return state.value();
} catch (Exception e) {
// 在真实项目中, 异常处理需要更健壮
throw new RuntimeException("Failed to get value from Flink state", e);
}
}
@Override
public void save(SiteMetrics metrics) {
try {
state.update(metrics);
} catch (Exception e) {
throw new RuntimeException("Failed to update Flink state", e);
}
}
}
现在,我们可以创建一个Flink的ProcessFunction
,它使用UpdateSiteMetricsUseCase
来处理业务逻辑,而不是自己实现。
// src/main/java/com/example/adapter/flink/MetricsProcessFunction.java
package com.example.adapter.flink;
import com.example.domain.entity.PageViewEvent;
import com.example.domain.entity.SiteMetrics;
import com.example.usecase.UpdateSiteMetricsUseCase;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
public class MetricsProcessFunction extends KeyedProcessFunction<String, PageViewEvent, SiteMetrics> {
// 短暂状态, Flink会为每个key管理一个实例
private transient ValueState<SiteMetrics> metricsState;
private transient UpdateSiteMetricsUseCase useCase;
@Override
public void open(Configuration parameters) throws Exception {
// 状态描述符, 定义状态的名称和类型
ValueStateDescriptor<SiteMetrics> descriptor = new ValueStateDescriptor<>(
"siteMetrics",
TypeInformation.of(SiteMetrics.class)
);
metricsState = getRuntimeContext().getState(descriptor);
// 初始化用例, 注入Flink状态适配器
UpdateSiteMetricsUseCase.SiteMetricsStateGateway gateway = new FlinkSiteMetricsStateAdapter(metricsState);
this.useCase = new UpdateSiteMetricsUseCase(gateway);
}
@Override
public void processElement(PageViewEvent value, Context ctx, Collector<SiteMetrics> out) throws Exception {
// 调用用例来处理事件, 完全屏蔽了业务逻辑细节
useCase.processPageView(value);
// 这里我们选择在每个事件后都输出当前状态
// 在生产环境中, 可能会使用计时器来定期输出, 以减少输出量
SiteMetrics currentMetrics = useCase.getCurrentMetrics();
if (currentMetrics != null) {
out.collect(currentMetrics);
}
}
}
4. Frameworks and Drivers Layer (框架与驱动层)
这是最外层,负责配置和启动Flink作业,包括数据源(Source)、数据槽(Sink)和执行环境。
实现一个生产级的自定义Sink
Flink处理完的数据需要一个目的地。我们的目的地是S3,并且写入操作需要触发Lambda。因此,一个自定义的SinkFunction
是必需的。这个Sink需要做到几点:
- 原子性写入: 使用“写入临时文件再重命名”的模式,避免下游读到不完整的数据。
- 批处理: 避免为每条记录都创建一个S3对象,我们将数据缓冲到一定大小或超时后再写入。
- 容错: 必须与Flink的Checkpoint机制集成,保证精确一次语义。
我们将使用 Flink 1.15+ 的新版 Sink API,它比老的 SinkFunction
接口更强大且易于实现。
// src/main/java/com/example/adapter/aws/S3JsonSink.java
package com.example.adapter.aws;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.formats.json.JsonSerializationSchema;
import org.apache.flink.connector.file.sink.FileSink;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
/**
* 一个配置好的FileSink, 用于将SiteMetrics对象以JSON格式写入S3.
* FileSink是Flink提供的强大的、支持精确一次语义的文件Sink.
*/
public class S3JsonSink {
/**
* 创建一个写入S3的FileSink.
* @param s3BucketPath S3桶的路径, 例如 "s3://my-bucket/data/"
* @return 配置好的FileSink实例
*/
public static FileSink<SiteMetrics> create(String s3BucketPath) {
return FileSink
// 1. 指定输出格式为行式输出
.forRowFormat(new Path(s3BucketPath), new JsonSerializationSchema<SiteMetrics>())
// 2. 配置分桶策略, 我们按小时创建文件夹
.withBucketAssigner(new DateTimeBucketAssigner())
// 3. 配置滚动策略: 每次Checkpoint时, 将正在写入的文件关闭并变为可读状态.
// 这是实现精确一次的关键, Flink会保证Checkpoint成功后, 这些文件才会被“提交”.
.withRollingPolicy(OnCheckpointRollingPolicy.build())
// 4. 指定文件名前缀和后缀
.withOutputFileConfig(
OutputFileConfig.builder()
.withPartPrefix("metrics")
.withPartSuffix(".json")
.build()
)
.build();
}
/**
* 自定义序列化逻辑,以支持Java 8 Time API.
*/
private static class JsonSerializationSchema<T> implements org.apache.flink.api.common.serialization.SerializationSchema<T> {
private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new JavaTimeModule());
@Override
public byte[] serialize(T element) {
try {
// 添加换行符,以便每个JSON对象占一行
return (MAPPER.writeValueAsString(element) + "\n").getBytes();
} catch (IOException e) {
throw new RuntimeException("Failed to serialize object to JSON", e);
}
}
}
/**
* 按时间分桶的分配器.
* 例如, "s3://my-bucket/data/2023-10-27--11/..."
*/
public static class DateTimeBucketAssigner implements BucketAssigner<SiteMetrics, String>, Serializable {
private static final long serialVersionUID = 1L;
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd--HH").withZone(ZoneId.of("UTC"));
@Override
public String getBucketId(SiteMetrics element, Context context) {
return FORMATTER.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}
}
}
FileSink
是一个高度优化的Sink,它内部已经处理了与Checkpoint集成的所有复杂逻辑,包括两阶段提交,确保了端到端的精确一次。
组装并部署到AWS
现在我们将所有部分串联起来。
Flink 作业入口点:
// src/main/java/com/example/StreamingJob.java
package com.example;
import com.example.adapter.aws.S3JsonSink;
import com.example.adapter.flink.MetricsProcessFunction;
import com.example.domain.entity.PageViewEvent;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kinesis.source.KinesisSource;
import org.apache.flink.connector.kinesis.source.enumerator.InitialPosition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.formats.json.JsonDeserializationSchema;
import software.amazon.awssdk.regions.Region;
public class StreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从环境变量或参数中获取配置, 这是生产实践
String streamName = System.getenv("KINESIS_STREAM_NAME");
String region = System.getenv("AWS_REGION");
String s3SinkPath = System.getenv("S3_SINK_PATH");
// 1. 配置Kinesis Source
KinesisSource<PageViewEvent> kinesisSource = KinesisSource.<PageViewEvent>builder()
.setStreamArn("arn:aws:kinesis:" + region + ":<ACCOUNT_ID>:stream/" + streamName)
.setStartingPosition(InitialPosition.LATEST)
.setDeserializationSchema(new JsonDeserializationSchema<>(PageViewEvent.class))
.setRegion(Region.of(region))
.build();
// 2. 创建数据流
DataStream<PageViewEvent> stream = env.fromSource(kinesisSource, WatermarkStrategy.noWatermarks(), "Kinesis Source");
// 3. 核心处理逻辑
DataStream<SiteMetrics> metricsStream = stream
// 假设我们只有一个全局指标, 所以用一个固定的key
.keyBy(event -> "global_metrics")
.process(new MetricsProcessFunction());
// 4. 配置S3 Sink
metricsStream.sinkTo(S3JsonSink.create(s3SinkPath));
env.execute("Real-time Site Metrics SSG Pipeline");
}
}
AWS Lambda 函数 (Node.js)
当Flink作业将聚合后的 metrics-....json
文件写入S3时,会触发一个S3事件。这个事件会调用我们的Lambda函数。
// lambda/ssg-builder/index.js
const { execSync } = require('child_process');
const { S3Client, GetObjectCommand, PutObjectCommand, ListObjectsV2Command, DeleteObjectsCommand } = require("@aws-sdk/client-s3");
const fs = require('fs');
const path = require('path');
const os = require('os');
const s3Client = new S3Client({ region: process.env.AWS_REGION });
const SITE_BUCKET = process.env.SITE_BUCKET_NAME;
exports.handler = async (event) => {
// 1. 从S3事件中获取数据源文件信息
const sourceBucket = event.Records[0].s3.bucket.name;
const sourceKey = decodeURIComponent(event.Records[0].s3.object.key.replace(/\+/g, ' '));
const tmpdir = os.tmpdir();
console.log(`Processing data file: s3://${sourceBucket}/${sourceKey}`);
try {
// 2. 下载聚合数据文件
const getObjectParams = { Bucket: sourceBucket, Key: sourceKey };
const dataFileContent = await s3Client.send(new GetObjectCommand(getObjectParams));
const dataBody = await streamToString(dataFileContent.Body);
// Flink的FileSink可能写入多个JSON对象, 每行一个
const metrics = dataBody.trim().split('\n').map(line => JSON.parse(line)).pop(); // 取最新的一个
const dataPath = path.join(tmpdir, 'data');
if (!fs.existsSync(dataPath)) fs.mkdirSync(dataPath);
fs.writeFileSync(path.join(dataPath, 'metrics.json'), JSON.stringify(metrics, null, 2));
// 3. 运行SSG (Hugo) 构建
// Lambda环境中需要包含hugo可执行文件或作为Lambda Layer
const hugoPath = path.join(__dirname, 'hugo'); // 假设hugo可执行文件与index.js在同一目录
const siteSourcePath = path.join(__dirname, 'site'); // 假设hugo站点源码也在部署包内
console.log('Running Hugo build...');
// Hugo默认会在 `public` 目录下生成站点
const outputPath = path.join(tmpdir, 'public');
execSync(`${hugoPath} --source ${siteSourcePath} --destination ${outputPath} --contentDir ${tmpdir} --dataDir ${tmpdír}`, { stdio: 'inherit' });
console.log('Hugo build complete.');
// 4. 清理并上传新站点到S3
await emptyS3Directory(SITE_BUCKET, '');
await uploadDirectory(outputPath, SITE_BUCKET, '');
console.log('Site successfully deployed to S3.');
return { statusCode: 200, body: 'Deployment successful.' };
} catch (error) {
console.error('Error during SSG build and deployment:', error);
throw error; // 让Lambda调用失败重试
}
};
// ... (helper functions streamToString, uploadDirectory, emptyS3Directory)
// 这些辅助函数负责处理流、递归上传目录和清空S3目录
局限性与未来展望
这个架构虽然优雅,但并非没有权衡。首先,”近实时”的延迟取决于Flink的Checkpoint间隔和SSG的构建时长。如果Checkpoint间隔设置为1分钟,Hugo构建需要5秒,那么数据的端到端延迟大约在1分钟左右,这对于许多监控场景已经足够,但不是真正的毫秒级实时。
其次,如果数据更新非常频繁(例如每秒一次),持续触发Lambda和SSG构建可能会导致巨大的成本和构建任务的堆积。解决方案可以是增加Flink作业的聚合窗口,或者在Lambda前增加一个SQS队列和Step Function来对构建任务进行节流(Throttling)和去重(Debouncing)。
最后,这个模式非常适合展示聚合性、概览性的数据。如果需要下钻查询或探索原始事件,它就无能为力了。一个更完整的系统可能会将原始事件流的另一分支写入一个可查询的存储(如OpenSearch),将这个SSG仪表盘作为系统状态的高性能“快照”视图。