Building a Large-Scale Real-Time Audio/Video Data Pipeline on Phoenix and WebRTC to Power AI Analysis


The first problem we faced was not signaling, and not media transport, but data egress. With thousands of WebRTC audio/video streams flowing through our SFU (Selective Forwarding Unit) cluster, the data science team came to us with a requirement that sounded simple but was remarkably hard to deliver: real-time access to the raw data of those media streams, for training and serving a fraud-detection model. The traditional approach is to record first and analyze offline, but that is a non-starter when answers are needed within seconds. What we needed was a pipeline that turns WebRTC media streams into structured data streams in real time, a bridge between real-time communication and big-data analytics.

The initial proposal was to customize the media server itself (Mediasoup or Janus), capturing RTP packets in a plugin and pushing them to Kafka. That idea was rejected quickly. Deep customization of a C/C++ media engine means substantial development and maintenance cost, and every media-server upgrade risks catastrophic compatibility breakage. In real projects we aim for high cohesion and low coupling: control-plane and data-plane logic should stay as separate as possible.

In the end we decided to focus on the control plane and on orchestrating data capture, and the best fit for that role is Elixir/Phoenix. The BEAM's soft real-time scheduling, massive concurrency, and strong fault tolerance make it an ideal platform for managing the state of huge numbers of long-lived WebRTC connections. The core architectural idea: Phoenix acts as the signaling server and coordinator of the whole session lifecycle, exchanging signaling with clients and remotely instructing the media server to set up media routes; at the same time, we use the media server's existing RTP forwarding capability to sidecar selected tracks into a separate Elixir data pipeline, which parses, aggregates, and feeds the data to the downstream AI/ML systems.

graph TD
    subgraph "客户端 (Browser/Mobile)"
        A[Client]
    end

    subgraph "Phoenix 应用 (控制平面)"
        B(Phoenix Channel)
        C(Room GenServer)
        D(Peer GenServer)
        E(DynamicSupervisor)
    end

    subgraph "媒体服务器 (数据平面 - Mediasoup)"
        F[Mediasoup Worker]
        G[Router]
        H[WebRtcTransport]
        I[Producer]
        J[PlainTransport]
        K[RtpConsumer]
    end

    subgraph "Elixir 数据管道"
        L(RTP Listener)
        M(Broadway Pipeline)
        N(Kafka Producer)
    end

    subgraph "AI/数据科学平台"
        O[Kafka]
        P[ML Model Service]
    end

    A -- WebSocket/Signaling --> B
    B -- "join/leave/signal" --> C
    C -- "spawn/manage" --> E
    E -- "start_child" --> D
    C -- "room state" --> D
    D -- "Mediasoup API Calls" --> F
    F -- "create" --> G
    G -- "create" --> H
    G -- "create" --> J
    H -- "produces" --> I
    I -- "consumed by" --> K
    J -- "connects to" --> L
    K -- "forwards RTP to" --> J
    L -- "RTP Packets" --> M
    M -- "Structured Data" --> N
    N -- "Events" --> O
    O -- "Data Stream" --> P

The diagram above shows the lifecycle and data flow of the whole system. Phoenix does more than handle client signaling: crucially, it maintains a state machine for each connection in a Peer GenServer and acts as the single source of truth that drives the media server's behavior.

Core Component 1: The Peer GenServer State Machine

For every user who joins a room we start a Peer GenServer. This process is the user's logical avatar on the server and handles all signaling and media-server interaction for that user. This is far more robust than putting the logic directly in the Phoenix Channel process: a Channel process's lifetime is tied to the WebSocket connection, whereas the Peer process's lifetime is under our own control, so the Peer and its state can survive a brief network drop.

# lib/my_app/rtc/peer.ex
defmodule MyApp.Rtc.Peer do
  use GenServer
  require Logger

  alias MyApp.Rtc.MediasoupClient

  @enforce_keys [:peer_id, :room_id, :channel_pid]
  defstruct @enforce_keys ++
            [
              transport_id: nil,
              producer_ids: MapSet.new(),
              # This will hold the ID of the transport used for RTP forwarding
              data_forward_transport_id: nil,
              # This maps original producer_id to the data consumer_id
              data_forward_consumer_map: %{}
            ]

  # Client API
  def start_link(opts) do
    peer_id = opts[:peer_id]
    GenServer.start_link(__MODULE__, opts, name: via_tuple(peer_id))
  end

  def init(opts) do
    Logger.info("Peer process starting: #{inspect(opts)}")
    state = struct!(__MODULE__, opts)
    # The first action is to create a WebRTC transport on the media server
    {:ok, transport_options} = MediasoupClient.create_webrtc_transport()

    # Send transport info back to client via their channel
    send(state.channel_pid, {:transport_created, transport_options})

    updated_state = %{state | transport_id: transport_options.id}
    {:ok, updated_state}
  end

  # Internal Callbacks (handle signals from client channel)
  def handle_cast({:connect_transport, dtls_parameters}, state) do
    case MediasoupClient.connect_webrtc_transport(state.transport_id, dtls_parameters) do
      :ok ->
        Logger.info("Peer #{state.peer_id} transport connected")
        # Now the client is ready to produce media
        send(state.channel_pid, {:transport_connected})

      {:error, reason} ->
        Logger.error("Failed to connect transport for peer #{state.peer_id}: #{reason}")
    end
    {:noreply, state}
  end

  def handle_cast({:produce, kind, rtp_parameters}, state) do
    case MediasoupClient.create_producer(state.transport_id, kind, rtp_parameters) do
      {:ok, producer_id} ->
        Logger.info("Peer #{state.peer_id} created producer #{producer_id}")
        updated_producers = MapSet.put(state.producer_ids, producer_id)
        
        # *** CORE LOGIC FOR DATA PIPELINE ***
        # For every new video producer, we set up a corresponding data forwarder.
        # In a real app, you might only do this for specific tracks.
        new_state =
          if kind == "video" do
            setup_data_forwarding(producer_id, state)
          else
            state
          end

        send(state.channel_pid, {:producer_created, producer_id})
        {:noreply, %{new_state | producer_ids: updated_producers}}

      {:error, reason} ->
        Logger.error("Failed to create producer for peer #{state.peer_id}: #{reason}")
        {:noreply, state}
    end
  end

  # ... other handlers for consuming, etc.

  def terminate(reason, state) do
    Logger.warn("Peer process terminating for #{state.peer_id} due to #{inspect(reason)}")
    # CRITICAL: Cleanup mediasoup resources associated with this peer
    MediasoupClient.close_transport(state.transport_id)
    # Also close data forwarding transport and consumers
    if state.data_forward_transport_id, do: MediasoupClient.close_transport(state.data_forward_transport_id)
    :ok
  end

  # Private helpers
  defp setup_data_forwarding(producer_id, state) do
    # This function is the bridge. It orchestrates Mediasoup to create a side-path
    # for the media stream to be sent to our internal data pipeline.
    
    # Ensure we have a PlainTransport for RTP forwarding. Create if it doesn't exist.
    # A PlainTransport sends/receives raw RTP over UDP.
    {transport_id, new_state} =
      case state.data_forward_transport_id do
        nil ->
          # We define the listening IP/port of our data pipeline here.
          # This should come from config.
          config = Application.get_env(:my_app, :data_pipeline)
          
          {:ok, transport} = MediasoupClient.create_plain_transport(
            listen_ip: config[:listen_ip], # Mediasoup sends FROM here
            port: config[:port] # Mediasoup sends TO this port
          )
          Logger.info("Created PlainTransport #{transport.id} for data forwarding")
          {transport.id, %{state | data_forward_transport_id: transport.id}}

        id ->
          {id, state}
      end

    # Now, create a consumer on this new transport that consumes from the original producer.
    # This effectively forks the media stream.
    case MediasoupClient.create_consumer(transport_id, producer_id) do
      {:ok, consumer_id} ->
        Logger.info("Created data consumer #{consumer_id} for producer #{producer_id}")
        updated_map = Map.put(new_state.data_forward_consumer_map, producer_id, consumer_id)
        %{new_state | data_forward_consumer_map: updated_map}

      {:error, reason} ->
        Logger.error("Failed to create data consumer: #{reason}")
        # Return the state unmodified if forwarding setup fails
        state
    end
  end

  defp via_tuple(peer_id), do: {:via, Registry, {MyApp.PeerRegistry, peer_id}}
end

The heart of this code is setup_data_forwarding/2. When a user starts publishing (handle_cast({:produce, ...})), we do more than create a Producer on the WebRTC transport; we also do two extra things:

  1. Create a PlainTransport: check whether the Peer's state already holds a data_forward_transport_id. If not, call the media server's API to create a PlainTransport. A PlainTransport is a special transport that skips ICE/DTLS and sends/receives raw RTP directly over UDP; we configure it to send to the listen address and port of our internal data pipeline. This is a one-time operation: each Peer needs only one PlainTransport (a hedged sketch of the MediasoupClient facade follows this list).
  2. Create a Consumer: on that PlainTransport, create a Consumer for the new Producer. This is the key step: it effectively duplicates the media stream inside the media server, so one copy goes to the other participants over the WebRtcTransport, while the other goes to our own data pipeline over the PlainTransport.
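The code above treats MyApp.Rtc.MediasoupClient as a black box. Below is a minimal sketch of what the data-forwarding part of such a facade could look like, assuming mediasoup runs in a Node.js sidecar that exposes a small HTTP/JSON control API; the endpoint paths, payload shapes, and the use of the Req HTTP client are assumptions for illustration, not part of the original design.

# lib/my_app/rtc/mediasoup_client.ex (illustrative sketch, data-forwarding calls only)
defmodule MyApp.Rtc.MediasoupClient do
  # Base URL of a hypothetical Node.js mediasoup controller (an assumption).
  defp base_url, do: Application.get_env(:my_app, :mediasoup_api_url, "http://127.0.0.1:3000")

  # Asks the sidecar to create a PlainTransport and connect it to our pipeline's
  # ip/port, so RTP for any consumer on this transport is pushed to us over UDP.
  def create_plain_transport(opts) do
    post("/plain-transports", %{listenIp: opts[:listen_ip], destinationPort: opts[:port]})
  end

  # Forks the original producer's stream onto the given transport.
  def create_consumer(transport_id, producer_id) do
    with {:ok, %{id: consumer_id}} <-
           post("/transports/#{transport_id}/consumers", %{producerId: producer_id}),
         do: {:ok, consumer_id}
  end

  def close_transport(transport_id) do
    with {:ok, _} <- post("/transports/#{transport_id}/close", %{}), do: :ok
  end

  defp post(path, payload) do
    case Req.post(base_url() <> path, json: payload) do
      {:ok, %Req.Response{status: 200, body: body}} ->
        # Atomize top-level keys so callers can pattern match on %{id: id} or use transport.id.
        {:ok, Map.new(body, fn {k, v} -> {String.to_atom(k), v} end)}

      {:ok, %Req.Response{status: status}} ->
        {:error, "mediasoup controller returned HTTP #{status}"}

      {:error, reason} ->
        {:error, inspect(reason)}
    end
  end
end

The create_webrtc_transport/0, connect_webrtc_transport/2, and create_producer/3 calls used earlier would follow the same pattern; only the forwarding-related functions are shown here.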

Error handling and resource cleanup (terminate/2) are critical here. A common mistake is failing to clean up the resources a Peer created on the media server (transports, producers, consumers) when the Peer process exits abnormally; the resulting leaks eventually drag the whole media server down. Managing Peer processes with a DynamicSupervisor and making sure the terminate/2 callback actually runs are the foundation of a stable system. A sketch of that supervision wiring, and of how the Channel delegates to the Peer, follows.
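For reference, here is a minimal sketch of the wiring, assuming the names already used above (MyApp.PeerRegistry from via_tuple/1); MyApp.Rtc.PeerSupervisor, MyAppWeb.RoomChannel, and the event names are assumptions for illustration.

# lib/my_app/application.ex (excerpt)
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Must match the Registry name used in MyApp.Rtc.Peer.via_tuple/1.
      {Registry, keys: :unique, name: MyApp.PeerRegistry},
      {DynamicSupervisor, name: MyApp.Rtc.PeerSupervisor, strategy: :one_for_one}
      # ..., MyAppWeb.Endpoint, the data pipeline, etc.
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

# lib/my_app_web/channels/room_channel.ex (excerpt)
defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel

  def join("room:" <> room_id, %{"peer_id" => peer_id}, socket) do
    # The Channel only forwards signaling; the Peer owns the state and keeps
    # living if the WebSocket briefly drops and the client rejoins.
    case DynamicSupervisor.start_child(
           MyApp.Rtc.PeerSupervisor,
           {MyApp.Rtc.Peer, peer_id: peer_id, room_id: room_id, channel_pid: self()}
         ) do
      {:ok, _pid} -> :ok
      # Reconnect: the Peer survived; a real implementation would also tell it
      # about the new channel_pid.
      {:error, {:already_started, _pid}} -> :ok
    end

    {:ok, assign(socket, :peer_id, peer_id)}
  end

  def handle_in("connect_transport", %{"dtlsParameters" => dtls}, socket) do
    GenServer.cast(via(socket.assigns.peer_id), {:connect_transport, dtls})
    {:noreply, socket}
  end

  defp via(peer_id), do: {:via, Registry, {MyApp.PeerRegistry, peer_id}}
end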

Core Component 2: The Broadway Data Processing Pipeline

The media server is now streaming raw RTP packets to the UDP port we chose. We need an efficient, back-pressured, scalable way to receive and process them. Plain gen_udp would work, but a production-grade system also has to handle concurrency, batching, fault tolerance, and graceful shutdown. Broadway is the right tool for this class of problem.

We build a Broadway topology that starts with a custom GenStage producer responsible for listening on the UDP port and parsing RTP packets.

# lib/my_app/data_pipeline/rtp_producer.ex
defmodule MyApp.DataPipeline.RtpProducer do
  use GenStage
  require Logger

  def start_link(opts) do
    GenStage.start_link(__MODULE__, opts, name: __MODULE__)
  end

  def init(opts) do
    port = Keyword.fetch!(opts, :port)
    # Open the UDP socket.
    # The `:active => :once` is crucial for flow control. We only re-enable
    # receiving after we have processed the current packet.
    {:ok, socket} = :gen_udp.open(port, [:binary, active: :once, ip: {127, 0, 0, 1}])
    Logger.info("RTP Producer listening on UDP port #{port}")
    {:producer, %{socket: socket, buffer: [], pending_demand: 0}}
  end

  def handle_demand(demand, state) do
    # Broadway is asking for more data. Serve buffered events first, and remember
    # any demand we could not satisfy so incoming packets can be emitted directly.
    total_demand = state.pending_demand + demand
    {events, new_buffer} = Enum.split(state.buffer, total_demand)

    # Re-enable receiving UDP packets roughly in proportion to the unmet demand.
    # This is the core of our back-pressure mechanism; any excess datagrams are
    # simply buffered until the next demand arrives.
    remaining_demand = total_demand - length(events)
    if remaining_demand > 0 do
      activate_socket(state.socket, remaining_demand)
    end

    {:noreply, events, %{state | buffer: new_buffer, pending_demand: remaining_demand}}
  end

  def handle_info({:udp, socket, _ip, _port, packet}, state) do
    # We received a packet. Parse it.
    # In a real system, you'd use a proper RTP parsing library.
    # Here we just simulate extracting a payload and some metadata.
    case parse_rtp_packet(packet) do
      {:ok, event} when state.pending_demand > 0 ->
        # Downstream demand is outstanding: emit the event right away.
        {:noreply, [event], %{state | pending_demand: state.pending_demand - 1}}

      {:ok, event} ->
        # No outstanding demand: buffer it and leave the socket passive until
        # handle_demand/2 is called again.
        {:noreply, [], %{state | buffer: state.buffer ++ [event]}}

      {:error, _} ->
        # Malformed packet: ignore it and allow one more packet in its place.
        activate_socket(socket, 1)
        {:noreply, [], state}
    end
  end
  
  defp parse_rtp_packet(<<_version::2, _padding::1, _extension::1, _cc::4, _marker::1, payload_type::7, _sequence_number::16, _timestamp::32, ssrc::32, _csrcs::size(0)-unit(8), payload::binary>>) do
    # Simplified parsing. A real implementation would be more robust.
    # The SSRC tells us which stream this packet belongs to.
    # We can map this back to a peer/track if Mediasoup is configured to do so.
    {:ok, %{ssrc: ssrc, payload_type: payload_type, payload_size: byte_size(payload), received_at: System.os_time(:nanosecond)}}
  end

  # Fallback for packets too short to contain a 12-byte RTP header.
  defp parse_rtp_packet(_packet), do: {:error, :malformed}
  
  # A helper to safely re-activate the socket N times.
  defp activate_socket(socket, count) when count > 0 do
    :inet.setopts(socket, active: count)
  end
  defp activate_socket(_, _), do: :ok
end

The essence of this RtpProducer is how it combines GenStage's demand mechanism with gen_udp's :active => :once (or :active => N) modes to implement back-pressure.

  • When Broadway's consumers are ready for more data, handle_demand/2 is called.
  • handle_demand/2 drains the internal buffer first, records any demand it could not satisfy, and re-activates the UDP socket for at most that many additional packets, telling the BEAM VM to deliver no more than N further datagrams.
  • When a UDP packet arrives, handle_info/2 parses it and either emits it straight into the pipeline (if demand is outstanding) or buffers it; it never grants the socket additional receive quota on its own.
  • The socket is only re-activated when downstream asks for more data. This closes the loop: if downstream cannot keep up (say, Kafka writes are slow), demand stops, UDP reception pauses, and the kernel's UDP buffer eventually fills up and starts dropping packets. This is graceful degradation: it protects our application processes from being overwhelmed by the incoming flood (a quick way to exercise this locally is sketched right after this list).
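As a quick sanity check, once the Broadway pipeline defined below is running, you can push a hand-crafted packet at the producer from iex; the 5004 port and the header values here are just the defaults used in this article, not anything the pipeline requires.

# In iex -S mix, with RtpProducer listening on 127.0.0.1:5004.
{:ok, sock} = :gen_udp.open(0, [:binary])

# A minimal 12-byte RTP header: V=2, PT=96, seq=1, ts=0, SSRC=0xABCD, plus 4 payload bytes.
header = <<2::2, 0::1, 0::1, 0::4, 0::1, 96::7, 1::16, 0::32, 0xABCD::32>>
:ok = :gen_udp.send(sock, {127, 0, 0, 1}, 5004, header <> <<1, 2, 3, 4>>)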

Next comes the definition of the Broadway pipeline itself.

# lib/my_app/data_pipeline/pipeline.ex
defmodule MyApp.DataPipeline.Pipeline do
  use Broadway
  alias Broadway.Message
  require Logger

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {MyApp.DataPipeline.RtpProducer, port: 5004}, # Should be from config
        # Required for custom GenStage producers: wrap raw events into Broadway messages.
        transformer: {__MODULE__, :transform, []},
        concurrency: 1 # A single UDP socket listener
      ],
      processors: [
        default: [
          concurrency: System.schedulers_online() * 4 # Concurrently process packets
        ]
      ],
      batchers: [
        kafka_batcher: [
          batch_size: 100,
          batch_timeout: 200, # 200ms
          concurrency: System.schedulers_online() * 2
        ]
      ]
    )
  end

  # Wraps raw events from RtpProducer into %Broadway.Message{} structs.
  def transform(event, _opts) do
    %Message{data: event, acknowledger: {__MODULE__, :ack_id, []}}
  end

  # Nothing to acknowledge upstream (UDP is fire-and-forget), so this is a no-op.
  def ack(:ack_id, _successful, _failed), do: :ok

  @impl true
  def handle_message(_, %Message{data: event} = message, _) do
    # Stage 1: Initial processing/enrichment on a per-packet basis
    # For example, mapping SSRC to a more meaningful user_id/track_id
    # This mapping could be fetched from a shared ETS table managed by Room/Peer GenServers
    enriched_event = Map.put(event, :user_id, lookup_user_by_ssrc(event.ssrc))
    Message.update_data(message, fn _ -> enriched_event end)
  end

  @impl true
  def handle_batch(:kafka_batcher, messages, _batch_info, _context) do
    # Stage 2: Batch processing and publishing
    events =
      Enum.map(messages, fn message ->
        %{
          "userId" => message.data.user_id,
          "ssrc" => message.data.ssrc,
          "payloadType" => message.data.payload_type,
          "payloadSizeBytes" => message.data.payload_size,
          "receivedAt" => message.data.received_at
        }
      end)

    # In a real app, use a proper Kafka client library like :brod or :kafka_ex
    case MyApp.KafkaClient.produce("video_analytics_stream", events) do
      :ok ->
        Logger.debug("Successfully produced #{length(messages)} events to Kafka")
        messages # Return successful messages
      
      {:error, reason} ->
        Logger.error("Failed to produce to Kafka: #{reason}")
        # Mark all messages in the batch as failed, Broadway will handle retries/failures
        Enum.map(messages, &Message.failed(&1, "kafka_produce_failed"))
    end
  end

  defp lookup_user_by_ssrc(ssrc) do
    # Dummy implementation.
    # In production, this would query an ETS table or a Redis cache.
    # The Peer GenServer would be responsible for populating this table
    # with the SSRC it receives from Mediasoup upon producer creation.
    case :ets.lookup(:ssrc_to_user_map, ssrc) do
      [{^ssrc, user_id}] -> user_id
      [] -> "unknown_#{ssrc}"
    end
  end
end

This Broadway pipeline shows a typical multi-stage processing flow:

  1. Producer: RtpProducer listens on the UDP port and implements back-pressure.
  2. Processors: handle_message/3 processes individual messages in parallel. The key step here is enrichment. An RTP packet only carries an SSRC (synchronization source identifier), a random number; we need to map it back to a business-level user_id or track_id. That mapping can be obtained from the media server's API response when the Peer GenServer creates a Producer, then written into an ETS table shared by all processes, so handle_message can look it up cheaply (a sketch of that table follows this list).
  3. Batchers: handle_batch/4 aggregates processed messages into batches and writes them to Kafka in one call. This greatly improves throughput by reducing the number of round trips to the external system. Broadway's batcher also has a built-in timeout, so data is never delayed indefinitely when traffic is too low to fill a batch.
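Here is a minimal sketch of that shared table. MyApp.Rtc.SsrcMap is an assumed module name; the Peer GenServer would call register/2 with the SSRC it gets back from the media server when a producer is created, and the table name matches lookup_user_by_ssrc/1 above.

# lib/my_app/rtc/ssrc_map.ex (illustrative sketch, started in the application supervision tree)
defmodule MyApp.Rtc.SsrcMap do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

  # Called by the Peer GenServer after create_producer/3 succeeds.
  def register(ssrc, user_id), do: :ets.insert(:ssrc_to_user_map, {ssrc, user_id})

  # Called from terminate/2 so stale mappings don't accumulate.
  def unregister(ssrc), do: :ets.delete(:ssrc_to_user_map, ssrc)

  @impl true
  def init(nil) do
    # Public named table: Broadway processors read it directly without messaging
    # this process; writes are rare (per producer), reads happen per packet.
    :ets.new(:ssrc_to_user_map, [:named_table, :set, :public, read_concurrency: true])
    {:ok, %{}}
  end
end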

Limitations and Future Directions

This architecture solves the "last mile" between real-time media streams and the big-data stack, and it scales horizontally: the Phoenix nodes, media-server nodes, and data-pipeline nodes can all be scaled independently. It is not without limitations, though.

First, forwarding and processing raw RTP puts load on both the network and the CPU. In the current design we only extract metadata. If the AI models need raw video frames, the pipeline would have to integrate a decoder (such as FFmpeg), which is a major performance challenge. One possible optimization is to extract features directly on the media server via a lightweight plugin instead of forwarding the entire RTP stream.

Second, managing the SSRC-to-user-ID mapping is a potential source of complexity. ETS is fast, but keeping the tables eventually consistent across a distributed cluster requires extra machinery, such as Horde or another distributed registry.

Finally, this architecture only covers the outbound data path. If AI results need to be fed back to participants in real time (live captions, content-moderation warnings), we need a return path. That is typically built with Phoenix Channels: the AI service pushes its results to an internal API, which then broadcasts them over the Channel to the clients in the target room. This closes the loop into a fully data-driven, real-time interactive system, and it is the next step on our roadmap.
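For reference, a minimal sketch of that return path, assuming a conventional Phoenix setup: the MyAppWeb.AnalysisResultController name, the internal route, and the "analysis_result" event name are assumptions; MyAppWeb.Endpoint.broadcast/3 pushes the payload to every client subscribed to the room's Channel topic.

# lib/my_app_web/controllers/analysis_result_controller.ex (illustrative sketch)
defmodule MyAppWeb.AnalysisResultController do
  use MyAppWeb, :controller

  # POST /internal/analysis_results -- called by the ML service with its output.
  def create(conn, %{"room_id" => room_id, "result" => result}) do
    # Fan the result out to everyone joined to "room:<room_id>".
    MyAppWeb.Endpoint.broadcast("room:" <> room_id, "analysis_result", result)
    send_resp(conn, 204, "")
  end
end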

