构建基于Phoenix、InfluxDB与Ant Design的统一实时指标网关


我们面临的第一个问题是指标孤岛。数十个微服务各自通过不同的方式暴露Prometheus端点、写入日志或直接推送数据到消息队列。运维团队需要维护一个庞杂的监控栈,而开发团队想要排查一个跨服务的请求链路问题,则需要在多个系统之间来回跳转。我们需要一个统一的入口,一个高性能的指标网关,它能以极低的延迟接收所有服务的实时指标,进行持久化,并实时推送给前端监控大盘。

初步的构想是建立一个独立的Phoenix服务作为这个指标网关。为什么选择Phoenix而不是更主流的Spring Boot或Node.js?答案在于BEAM虚拟机。它为构建高并发、高容错的系统提供了无与伦比的基础。指标网关的核心挑战是同时处理成千上万个来自内部服务的HTTP短连接请求,以及维护数千个到前端监控面板的WebSocket长连接。BEAM基于Actor模型的轻量级进程,使得每个连接都可以由一个独立的进程来处理,系统资源开销极小,且进程间的隔离保证了单个连接的崩溃不会影响整个系统。这是一个天然的优势。

数据存储的选择相对直接:InfluxDB。作为时序数据库的领导者,其数据模型、高写入吞吐量和强大的查询语言(Flux或InfluxQL)都为指标存储量身定做。前端则采用Ant Design Pro,它提供了丰富的图表和布局组件,能够快速搭建复杂的数据可视化界面。

整个架构的核心,是这个Phoenix网关如何优雅地衔接两端:作为HTTP服务器接收数据,并作为WebSocket服务器推送数据。一个常见的错误是,在处理HTTP请求的Controller中直接向InfluxDB写入数据,并同步广播WebSocket消息。在生产环境中,这会带来两个致命问题:

  1. 写延迟放大: InfluxDB的写入操作有网络开销和磁盘I/O。如果每个HTTP请求都同步等待写入完成,网关的吞吐量将严重受限于InfluxDB的性能,并对上游服务造成反压。
  2. 广播风暴: 如果指标的写入频率非常高(例如每秒数千次),同步广播将导致WebSocket信道被大量小数据包淹没,对服务器和客户端都会造成巨大压力。

我们的解决方案是引入一个缓冲层,利用Elixir的GenServer实现一个异步批处理工作流。

graph TD
    subgraph 内部微服务集群
        ServiceA -->|HTTP POST /metrics| Gateway
        ServiceB -->|HTTP POST /metrics| Gateway
        ServiceC -->|HTTP POST /metrics| Gateway
    end

    subgraph Phoenix 指标网关
        Gateway(Phoenix Endpoint) -->|投递指标| MetricsBuffer[GenServer 异步缓冲]
        MetricsBuffer -- 定时/定量触发 --> BatchWriter[任务进程]
        BatchWriter -- 批量写入 --> InfluxDB[(InfluxDB)]
        BatchWriter -- 批量广播 --> PhoenixChannel[Phoenix Channel]
    end

    subgraph 浏览器客户端
        PhoenixChannel -- WebSocket推送 --> Frontend[React App]
        Frontend -- 状态更新 --> ZustandStore{Zustand 状态管理}
        ZustandStore -- 驱动渲染 --> AntDesignCharts[Ant Design 图表]
    end

第一步:构建异步批处理核心 MetricsBuffer

这个GenServer是整个系统的心脏。它接收来自Controller的指标数据,将其暂存在内存中,并根据预设的批次大小或时间间隔,将累积的数据一次性刷入InfluxDB并进行广播。

lib/metrics_gateway/metrics_buffer.ex:

defmodule MetricsGateway.MetricsBuffer do
  use GenServer
  require Logger

  @flush_interval :timer.seconds(5) # 每5秒强制刷一次
  @batch_size 1000 # 或者累积1000条记录就刷

  alias MetricsGateway.Influx.Writer

  # ==================================================================
  # Public API
  # ==================================================================

  def start_link(opts) do
    GenServer.start_link(__MODULE__, :ok, name: Keyword.get(opts, :name, __MODULE__))
  end

  @doc """
  异步接收单个指标点。
  这是提供给外部(如Controller)的接口。
  """
  def record(point, server \\ __MODULE__) do
    GenServer.cast(server, {:record, point})
  end

  # ==================================================================
  # GenServer Callbacks
  # ==================================================================

  @impl true
  def init(:ok) do
    Logger.info("MetricsBuffer started. Flushing every #{@flush_interval}ms or on batch size #{@batch_size}.")
    # 启动一个定时器,用于周期性地强制 flush
    Process.send_after(self(), :flush, @flush_interval)
    {:ok, %{buffer: [], count: 0}}
  end

  @impl true
  def handle_cast({:record, point}, state) do
    new_state = %{state | buffer: [point | state.buffer], count: state.count + 1}
    
    # 检查是否达到批处理大小
    if new_state.count >= @batch_size do
      # 异步执行 flush 操作,避免阻塞当前 GenServer
      Task.start(fn -> flush_data(new_state.buffer) end)
      {:noreply, %{buffer: [], count: 0}}
    else
      {:noreply, new_state}
    end
  end

  @impl true
  def handle_info(:flush, state) do
    # 定时器触发,即使缓冲区未满也要 flush
    if state.count > 0 do
      # 同样异步执行
      Task.start(fn -> flush_data(state.buffer) end)
    end
    
    # 重置定时器,形成循环
    Process.send_after(self(), :flush, @flush_interval)
    {:noreply, %{buffer: [], count: 0}}
  end

  # ==================================================================
  # Private Helpers
  # ==================================================================
  
  defp flush_data(buffer) do
    # buffer 是后进先出,所以需要反转以保持顺序
    points_to_write = Enum.reverse(buffer)

    # 1. 批量写入 InfluxDB
    case Writer.write_points(points_to_write) do
      :ok ->
        Logger.debug("Successfully flushed #{length(points_to_write)} points to InfluxDB.")
      {:error, reason} ->
        # 在真实项目中,这里应该有重试机制和死信队列
        Logger.error("Failed to flush points to InfluxDB. Reason: #{inspect(reason)}")
    end

    # 2. 批量广播到 Phoenix Channel
    # 我们将原始数据点分组,以便前端可以高效处理
    grouped_points =
      points_to_write
      |> Enum.group_by(& &1.measurement)
      |> Enum.map(fn {measurement, points} ->
        %{
          measurement: measurement,
          # 仅提取前端需要的数据,减小负载
          values: Enum.map(points, &%{tags: &1.tags, fields: &1.fields, timestamp: &1.timestamp})
        }
      end)
      
    if length(grouped_points) > 0 do
        MetricsGatewayWeb.Endpoint.broadcast!(
          "metrics:lobby", 
          "new_points", 
          %{data: grouped_points}
        )
    end
  end
end

这个GenServer的设计体现了几个关键的生产实践:

  • 解耦: Controller只负责接收和验证数据,然后立即castMetricsBuffer并返回响应,实现了请求处理与数据持久化的解耦。
  • 批处理: 显著降低了对InfluxDB的写入压力和网络开销。
  • 双重触发: 结合了数量和时间两种批处理触发机制,确保数据既能及时刷新,又能在高负载时高效处理。
  • 异步Flush: Task.start确保了实际的IO操作(写入DB和广播)不会阻塞MetricsBuffer进程,使其能持续不断地接收新指标。

第二步:实现InfluxDB写入模块

这个模块负责与InfluxDB通信。在生产环境中,配置必须外部化。

config/config.exs:

import Config

config :metrics_gateway, MetricsGateway.Influx.Writer,
  url: System.get_env("INFLUXDB_URL", "http://localhost:8086"),
  token: System.get_env("INFLUXDB_TOKEN"),
  org: System.get_env("INFLUXDB_ORG"),
  bucket: System.get_env("INFLUXDB_BUCKET")

lib/metrics_gateway/influx/writer.ex:

defmodule MetricsGateway.Influx.Writer do
  use Tesla
  require Logger

  # 从应用配置中读取InfluxDB连接信息
  @url Application.get_env(:metrics_gateway, __MODULE__)[:url]
  @token Application.get_env(:metrics_gateway, __MODULE__)[:token]
  @org Application.get_env(:metrics_gateway, __MODULE__)[:org]
  @bucket Application.get_env(:metrics_gateway, __MODULE__)[:bucket]

  # 定义Tesla客户端中间件
  plug Tesla.Middleware.BaseUrl, @url
  plug Tesla.Middleware.Headers, %{
    "Authorization" => "Token #{@token}",
    "Content-Type" => "text/plain; charset=utf-f"
  }
  plug Tesla.Middleware.JSON # 用于解析错误响应

  # InfluxDB v2 的写入API端点
  @write_path "/api/v2/write"

  # 定义一个数据点结构体,方便处理
  defstruct measurement: nil, tags: %{}, fields: %{}, timestamp: nil

  @doc """
  批量写入数据点。
  points 是一个 [%MetricsGateway.Influx.Writer{}] 列表。
  """
  def write_points(points) when is_list(points) and points != [] do
    body = points |> Enum.map(&to_line_protocol/1) |> Enum.join("\n")
    
    # 这里的关键是超时和错误处理
    opts = [
      params: [org: @org, bucket: @bucket, precision: "ns"],
      adapter: [hackney: [recv_timeout: 5000, connect_timeout: 2000]]
    ]

    case post(@write_path, body, opts) do
      {:ok, %Tesla.Env{status: status}} when status in 200..299 ->
        :ok
      {:ok, %Tesla.Env{status: status, body: body}} ->
        Logger.error("InfluxDB write failed with status #{status}. Body: #{inspect(body)}")
        {:error, {:http_error, status, body}}
      {:error, reason} ->
        Logger.error("InfluxDB request failed. Reason: #{inspect(reason)}")
        {:error, {:request_error, reason}}
    end
  end
  def write_points(_), do: :ok # 空列表直接返回

  @doc """
  将结构体转换为 InfluxDB Line Protocol 格式。
  格式: measurement,tag_key=tag_value field_key=field_value timestamp
  """
  def to_line_protocol(%__MODULE__{measurement: m, tags: t, fields: f, timestamp: ts}) do
    tags_str = t |> Enum.map(fn {k, v} -> "#{k}=#{v}" end) |> Enum.join(",")
    fields_str = f |> Enum.map(fn {k, v} -> format_field(k, v) end) |> Enum.join(",")

    # 时间戳是纳秒
    timestamp_str = if ts, do: " #{ts}", else: ""

    if tags_str == "" do
      "#{m} #{fields_str}#{timestamp_str}"
    else
      "#{m},#{tags_str} #{fields_str}#{timestamp_str}"
    end
  end

  defp format_field(key, value) when is_integer(value), do: "#{key}=#{value}i"
  defp format_field(key, value) when is_float(value), do: "#{key}=#{value}"
  defp format_field(key, value) when is_boolean(value), do: "#{key}=#{value}"
  defp format_field(key, value) when is_binary(value), do: ~s(#{key}="#{escape_string(value)}")

  defp escape_string(str) do
    # InfluxDB line protocol 对特殊字符需要转义
    String.replace(str, ~r/([",\\])/, "\\\\\\1")
  end
end

这个模块使用Tesla库来构建HTTP客户端,并实现了将Elixir数据结构转换为InfluxDB Line Protocol的核心逻辑,包括对不同数据类型的正确格式化和特殊字符的转义。

第三步:前端状态管理与高性能渲染

前端是这个系统成败的另一个关键。如果每秒收到数百个数据点,直接setState会导致React应用因频繁重渲染而卡死。这里的核心挑战是 UI渲染与数据接收的解耦

我们使用Zustand进行状态管理,因为它轻量且不依赖React Context,可以实现组件外的状态更新和细粒度的订阅。

stores/metricsStore.js:

import { create } from 'zustand';
import { produce } from 'immer';

// 定义一个时间序列数据的最大长度,防止内存无限增长
const MAX_DATA_POINTS = 500;

const useMetricsStore = create((set) => ({
  // 数据结构:{ [measurement_name]: { tags_hash: [{x: timestamp, y: value}, ...], ... } }
  timeSeriesData: {},

  // 这是从Phoenix Channel调用的核心函数
  appendDataPoints: (newData) => set(produce((draft) => {
    newData.data.forEach(measurementData => {
      const { measurement, values } = measurementData;
      
      if (!draft.timeSeriesData[measurement]) {
        draft.timeSeriesData[measurement] = {};
      }

      values.forEach(point => {
        // 使用tags的JSON字符串作为key,来区分不同的series
        const tagsHash = JSON.stringify(point.tags || {});
        
        if (!draft.timeSeriesData[measurement][tagsHash]) {
          draft.timeSeriesData[measurement][tagsHash] = [];
        }

        const series = draft.timeSeriesData[measurement][tagsHash];
        
        // 假设每个point的fields里只有一个value字段用于绘图
        const mainField = Object.keys(point.fields)[0];
        if (mainField) {
            series.push({
                // 时间戳转换为毫秒给图表库使用
                x: Math.floor(point.timestamp / 1_000_000), 
                y: point.fields[mainField],
            });
        }
        
        // 维持队列长度
        if (series.length > MAX_DATA_POINTS) {
          series.shift();
        }
      });
    });
  })),

  // 清空所有数据
  clearData: () => set({ timeSeriesData: {} }),
}));

export default useMetricsStore;

这里的状态管理有几个关键设计:

  • Immer集成: produce使得我们可以用“可变”的语法来安全地更新不可变状态,代码更直观。
  • 数据结构: 采用嵌套对象结构,方便按measurementtags快速索引到特定的时间序列。
  • 有界缓存: MAX_DATA_POINTS确保了浏览器内存不会被无限增长的数据撑爆。
  • 批量更新: appendDataPoints函数设计为接收MetricsBuffer广播的整个批次数据,一次性更新状态,而不是每个点更新一次。这与后端的批处理策略完美匹配。

接下来是在React组件中消费这些数据。

components/RealtimeChart.jsx:

import React, { useEffect, useRef } from 'react';
import { Line } from '@ant-design/plots';
import useMetricsStore from '../stores/metricsStore';
import shallow from 'zustand/shallow';

// 自定义 selector,用于精确订阅
// 只有当特定 measurement 和 tags 的数据点数量变化时才触发重渲染
const seriesSelector = (measurement, tags) => (state) => {
    const tagsHash = JSON.stringify(tags || {});
    return state.timeSeriesData[measurement]?.[tagsHash]?.length || 0;
};

const RealtimeChart = ({ measurement, tags, title }) => {
  const chartRef = useRef(null);
  const dataLength = useMetricsStore(seriesSelector(measurement, tags));

  useEffect(() => {
    if (chartRef.current) {
      // 这里的关键:我们不直接把数据作为React prop传递给图表组件
      // 而是通过 ref 获取图表实例,调用其 changeData 方法来增量更新
      // 这可以绕过React的Diff算法,实现极致的性能
      const tagsHash = JSON.stringify(tags || {});
      const data = useMetricsStore.getState().timeSeriesData[measurement]?.[tagsHash] || [];
      chartRef.current.changeData(data);
    }
  }, [dataLength, measurement, tags]); // 仅在数据点数量变化时执行

  const config = {
    padding: 'auto',
    xField: 'x',
    yField: 'y',
    xAxis: {
      type: 'time',
      mask: 'YYYY-MM-DD HH:mm:ss',
    },
    smooth: true,
    animation: false, // 关闭首次渲染动画,但保留数据更新动画
    onReady: (chart) => {
      chartRef.current = chart;
    },
  };

  return <Line {...config} />;
};

export default RealtimeChart;

这个RealtimeChart组件是性能优化的典范:

  1. 细粒度订阅: useMetricsStore(seriesSelector(...))确保只有与该图表相关的数据变化时,组件才会收到通知。如果其他图表的数据在更新,这个组件不会重渲染。
  2. 避免大数据作为Prop: 巨大的数据数组不作为prop传递给<Line>组件。这避免了每次数据更新时,React都需要深度比较这个大数组,造成性能损耗。
  3. 直接操作图表实例: 通过refonReady回调获取图表实例,然后调用changeData方法。这是大多数成熟图表库都支持的高性能API,它内部做了优化,只会重绘变化的部分,而不是整个图表。

最后,在顶层组件中设置Phoenix Channel的连接。

import { Socket } from 'phoenix';
import useMetricsStore from '../stores/metricsStore';

// This would be in your main App component or a layout component
useEffect(() => {
  const socket = new Socket("/socket", { params: { token: window.userToken } });
  socket.connect();

  const channel = socket.channel("metrics:lobby", {});
  const appendDataPoints = useMetricsStore.getState().appendDataPoints;

  channel.on("new_points", (payload) => {
    // 直接调用 store 的 action,不通过 React state
    appendDataPoints(payload);
  });

  channel.join()
    .receive("ok", resp => { console.log("Joined successfully", resp) })
    .receive("error", resp => { console.log("Unable to join", resp) });

  return () => {
    channel.leave();
    socket.disconnect();
  };
}, []);

这套架构解决了我们最初的痛点。它提供了一个统一、高性能的指标接收入口。通过在网关层实现异步批处理,我们保护了下游的InfluxDB,并平滑了流量峰值。Phoenix Channels与前端精细化的状态管理相结合,使得我们能够在浏览器中流畅地展示高频更新的实时数据,而不会牺牲用户体验。

当前方案的一个局限性在于MetricsBuffer GenServer本身是一个单点。如果该进程崩溃,内存中的数据将会丢失。在更严格的生产环境中,可以考虑使用Elixir的Registry来动态地启动多个Buffer进程,并根据指标的measurementtags进行哈希分发,以分散负载和风险。此外,对于失败的InfluxDB写入,当前只是记录日志,一个更健壮的实现会引入持久化的死信队列(如使用ETS或外部消息队列),以便后续进行重试。对于前端,当数据点密度过高时,可以考虑在appendDataPoints中实现降采样逻辑,进一步减轻渲染压力。


  目录