使用Go、Nginx、Pulsar与SQLite构建高可用的边缘数据采集网关


我们面临一个棘手的工程问题:需要从部署在全球上千个边缘节点的设备上,实时采集大量的遥测数据。这些节点所处的网络环境极不稳定,从时常抖动的Wi-Fi到信号微弱的4G网络,无所不有。业务要求是数据绝对不能丢失,即便边缘节点与中心云端的连接中断数小时甚至数天。

一个简单的方案是边缘服务直接通过HTTP/gRPC将数据推送到云端API。但这在弱网环境下是灾难性的。任何一次网络波动、云端服务升级或负载均衡的瞬时抖动,都会导致数据推送失败。重试可以缓解一部分问题,但如果服务长时间不可用,内存中的重试队列会溢出,进程重启则会丢失所有待处理数据,这在生产环境中是不可接受的。

我们需要一个更健壮的方案。一个能在边缘侧“削峰填谷”并提供持久化保证的采集网关。初步构想是,这个网关应该具备以下特性:

  1. 高性能接收: 能够稳定处理瞬时的高并发数据写入请求。
  2. 本地持久化: 接收到的数据必须立即落盘,确保进程或机器重启后数据不丢失。
  3. 异步上传: 数据上传至中心云端的操作必须与数据接收解耦,在独立的流程中进行,并能处理长时间的网络中断。
  4. 轻量级: 边缘节点的资源通常有限,整个方案的资源占用(CPU、内存)必须很低。

基于这些考量,我们最终的技术选型决策是 Nginx + Go + SQLite + Pulsar 的组合。

  • Nginx: 作为流量入口。它在处理TLS终止、HTTP连接管理、速率限制等方面的能力是身经百战的。让Nginx处理这些网络边缘的脏活累活,Go服务本身则可以更专注于核心业务逻辑。
  • Go: 网关核心逻辑的实现语言。它的并发模型、低资源消耗和高效的性能,使其成为构建这类网络中间件的理想选择。
  • SQLite: 作为本地持久化缓冲。相比于直接写文件,SQLite提供了事务、ACID保证以及在数据积压时进行查询分析的便利性。开启WAL(Write-Ahead Logging)模式后,其写入性能非常出色,完全能满足高吞吐的写入需求,同时保证了数据的强一致性。
  • Pulsar: 作为云边数据传输的消息队列。Pulsar的几个特性在这里至关重要:首先,它的 geo-replication 能力天然适合全球分布式的边缘场景;其次,Pulsar客户端对网络中断有良好的容错和重连机制;最后,它的分片和多租户模型也便于在云端进行数据隔离和水平扩展。

整个数据流的设计如下:

graph TD
    A[外部数据源] -- HTTPS --> B(Nginx);
    B -- HTTP/1.1 --> C{Go HTTP服务};
    C -- 写入channel --> D[Go 写入协程];
    D -- 事务写入 --> E[(SQLite数据库)];
    F[Go 上传协程] -- 定时轮询 --> E;
    F -- 异步发送 --> G[Pulsar集群];
    C -- 202 Accepted --> A;

    subgraph 边缘节点
        B; C; D; E; F;
    end

    subgraph 中心云
        G;
    end

第一步: Nginx配置与Go HTTP入口

Nginx的角色是纯粹的代理和流量防护。配置非常直接,它监听443端口,处理TLS,然后将请求代理到本地的Go服务。

# /etc/nginx/conf.d/edge-gateway.conf

# 定义上游Go服务
upstream go_gateway {
    server 127.0.0.1:8080;
    keepalive 32;
}

server {
    listen 443 ssl http2;
    server_name your.edge.domain;

    # SSL/TLS 配置 (此处省略)
    # ssl_certificate /path/to/fullchain.pem;
    # ssl_certificate_key /path/to/privkey.pem;

    # 性能调优
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;

    # 安全头
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    # 日志
    access_log /var/log/nginx/access.log;
    error_log /var/log/nginx/error.log;

    location /ingest {
        # 速率限制: 每秒1000个请求,突发量200个
        limit_req zone=ingest_burst burst=200 nodelay;

        # 拒绝超大请求体
        client_max_body_size 1M;

        proxy_pass http://go_gateway;
        proxy_http_version 1.1;
        proxy_set_header Connection "keep-alive";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_connect_timeout 2s;
        proxy_send_timeout 5s;
        proxy_read_timeout 5s;
    }
}

# 在 nginx.conf 的 http 块中定义限流区域
limit_req_zone $binary_remote_addr zone=ingest_burst:10m rate=1000r/s;

Go服务则专注于接收经过Nginx预处理的请求。这里的关键点是,HTTP Handler不应执行任何耗时操作。它的唯一职责是:解析和校验请求体,然后迅速将数据推入一个内部的Go channel,并立即返回202 Accepted响应。这确保了API端点极高的响应速度和吞吐量。

// pkg/api/server.go
package api

import (
	"context"
	"encoding/json"
	"log/slog"
	"net/http"
	"time"
)

// IngestPayload 定义了接收的数据结构
type IngestPayload struct {
	SourceID string          `json:"source_id"`
	Data     json.RawMessage `json:"data"`
}

type Server struct {
	httpServer *http.Server
	dataChan   chan<- []byte // 只写channel,将数据发送给存储层
}

func NewServer(addr string, dataChan chan<- []byte) *Server {
	mux := http.NewServeMux()
	server := &Server{
		dataChan: dataChan,
	}
	mux.HandleFunc("/ingest", server.handleIngest)
	
	httpServer := &http.Server{
		Addr:         addr,
		Handler:      mux,
		ReadTimeout:  5 * time.Second,
		WriteTimeout: 10 * time.Second,
		IdleTimeout:  60 * time.Second,
	}
	server.httpServer = httpServer
	return server
}

func (s *Server) Start() error {
	slog.Info("starting HTTP server", "addr", s.httpServer.Addr)
	return s.httpServer.ListenAndServe()
}

func (s *Server) Shutdown(ctx context.Context) error {
	slog.Info("shutting down HTTP server")
	return s.httpServer.Shutdown(ctx)
}

func (s *Server) handleIngest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// 简单的校验,生产环境会更复杂
	var payload IngestPayload
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		slog.Error("failed to decode payload", "error", err)
		http.Error(w, "Bad request", http.StatusBadRequest)
		return
	}
	
	// 序列化回 []byte 以便通用处理
	rawPayload, err := json.Marshal(payload)
	if err != nil {
		slog.Error("failed to re-marshal payload", "error", err)
		http.Error(w, "Internal server error", http.StatusInternalServerError)
		return
	}

	// 将数据推入channel,这里使用了非阻塞发送
	// 如果channel满了,意味着后端处理不过来,需要给客户端施加压力
	select {
	case s.dataChan <- rawPayload:
		w.WriteHeader(http.StatusAccepted)
	default:
		// 这是背压机制的关键一环
		slog.Warn("data channel is full, rejecting request")
		http.Error(w, "Server busy, please retry later", http.StatusServiceUnavailable)
	}
}

第二步: SQLite作为持久化缓冲

这是整个方案的核心。一个专门的协程从dataChan中消费数据,并将其批量写入SQLite。

数据库表结构很简单:

CREATE TABLE IF NOT EXISTS event_queue (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    payload BLOB NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending', -- pending, sending, sent
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_status_created_at ON event_queue (status, created_at);

使用SQLite时,有几个关键的配置和实践必须遵循,以确保性能和稳定性:

  1. WAL模式: PRAGMA journal_mode=WAL;。这允许读写并发,极大地提高了写入性能。
  2. 单一写入者: 虽然WAL允许多个读取者,但SQLite的写入仍然是串行的。最佳实践是在Go应用中只用一个协程(或一个专用的数据库连接)来执行所有写操作。这避免了锁争用。
  3. 批量事务: 不要每条消息都开启一个事务。应该将多条消息捆绑在一个事务中提交,这能将写入性能提升几个数量级。
// pkg/storage/sqlite.go
package storage

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"time"

	_ "github.com/mattn/go-sqlite3"
)

const (
	batchSize    = 100
	batchTimeout = 500 * time.Millisecond
)

type SQLiteStore struct {
	db       *sql.DB
	dataChan <-chan []byte
}

func NewSQLiteStore(dbPath string, dataChan <-chan []byte) (*SQLiteStore, error) {
	db, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_journal_mode=WAL&_busy_timeout=5000", dbPath))
	if err != nil {
		return nil, fmt.Errorf("failed to open sqlite db: %w", err)
	}

	// 关键:对于写操作,限制连接数为1以避免锁争用
	db.SetMaxOpenConns(1)

	// 创建表结构
	// ... (执行上面的CREATE TABLE和CREATE INDEX语句)
	
	return &SQLiteStore{
		db:       db,
		dataChan: dataChan,
	}, nil
}

// StartWriterLoop 启动一个协程,负责将数据从channel写入数据库
func (s *SQLiteStore) StartWriterLoop(ctx context.Context) {
	go func() {
		ticker := time.NewTicker(batchTimeout)
		defer ticker.Stop()

		batch := make([][]byte, 0, batchSize)

		for {
			select {
			case <-ctx.Done():
				// 在退出前,将最后一批数据写入
				if len(batch) > 0 {
					s.flushBatch(batch)
				}
				slog.Info("sqlite writer loop stopped")
				return
			case data, ok := <-s.dataChan:
				if !ok { // channel被关闭
					if len(batch) > 0 {
						s.flushBatch(batch)
					}
					slog.Info("data channel closed, sqlite writer loop stopping")
					return
				}
				batch = append(batch, data)
				if len(batch) >= batchSize {
					s.flushBatch(batch)
					batch = make([][]byte, 0, batchSize) // 重置batch
				}
			case <-ticker.C:
				if len(batch) > 0 {
					s.flushBatch(batch)
					batch = make([][]byte, 0, batchSize) // 重置batch
				}
			}
		}
	}()
}

func (s *SQLiteStore) flushBatch(batch [][]byte) {
	tx, err := s.db.Begin()
	if err != nil {
		slog.Error("failed to begin transaction", "error", err)
		return
	}
	defer tx.Rollback() // 如果没有Commit,则自动回滚

	stmt, err := tx.Prepare("INSERT INTO event_queue (payload, status) VALUES (?, 'pending')")
	if err != nil {
		slog.Error("failed to prepare statement", "error", err)
		return
	}
	defer stmt.Close()

	for _, data := range batch {
		if _, err := stmt.Exec(data); err != nil {
			slog.Error("failed to execute statement in batch", "error", err)
			// 遇到错误,整个事务回滚,这批数据会丢失。
			// 生产级代码需要更精细的错误处理,比如重试或将坏数据移到死信表。
			return
		}
	}

	if err := tx.Commit(); err != nil {
		slog.Error("failed to commit transaction", "error", err)
	} else {
		slog.Info("flushed batch to sqlite", "count", len(batch))
	}
}

// ... FetchPendingEvents 和 UpdateEventsStatus 方法将在下一步实现

第三步: Pulsar数据上传与状态回写

另一个独立的协程,我们称之为Dispatcher,负责将持久化的数据上传到Pulsar。

Dispatcher的逻辑是:

  1. 定期从SQLite中拉取一批pending状态的事件。
  2. 使用Pulsar的异步Send方法发送消息。这样做不会阻塞Dispatcher的主循环。
  3. 在Pulsar客户端的发送回调中,如果发送成功,则将对应事件的ID加入一个待更新列表。
  4. 批量更新SQLite中已成功发送的事件状态为sent或直接删除。这里的批量更新同样重要。
// pkg/dispatcher/pulsar.go
package dispatcher

import (
	"context"
	"log/slog"
	"sync"
	"time"
	"strconv"

	"github.com/apache/pulsar-client-go/pulsar"
)

// Storage a defines the interface we need from our storage layer.
type Storage interface {
	FetchPendingEvents(limit int) (map[int64][]byte, error)
	UpdateEventsStatus(ids []int64, status string) error
	DeleteEvents(ids []int64) error
}

type PulsarDispatcher struct {
	store    Storage
	producer pulsar.Producer
	client   pulsar.Client
}

func NewPulsarDispatcher(pulsarURL, topic string, store Storage) (*PulsarDispatcher, error) {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               pulsarURL,
		ConnectionTimeout: 5 * time.Second,
		OperationTimeout:  5 * time.Second,
	})
	if err != nil {
		return nil, err
	}

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic:                topic,
		BatchingMaxMessages:  1000,
		BatchingMaxPublishDelay: 100 * time.Millisecond,
		BlockIfQueueFull:     true, // 背压到dispatcher
	})
	if err != nil {
		return nil, err
	}

	return &PulsarDispatcher{
		store:    store,
		producer: producer,
		client:   client,
	}, nil
}

func (d *PulsarDispatcher) Start(ctx context.Context) {
	slog.Info("starting pulsar dispatcher loop")
	ticker := time.NewTicker(2 * time.Second) // 轮询周期
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			slog.Info("pulsar dispatcher loop stopped")
			return
		case <-ticker.C:
			d.processBatch(ctx)
		}
	}
}

func (d *PulsarDispatcher) processBatch(ctx context.Context) {
	// 1. 获取待处理事件
	events, err := d.store.FetchPendingEvents(500) // 每次处理500条
	if err != nil {
		slog.Error("failed to fetch pending events", "error", err)
		return
	}
	if len(events) == 0 {
		return
	}

	slog.Info("processing event batch", "count", len(events))

	var wg sync.WaitGroup
	successfulIDs := make(chan int64, len(events))
	
	idsToProcess := make([]int64, 0, len(events))
	for id := range events {
		idsToProcess = append(idsToProcess, id)
	}

	// 2. 将事件状态更新为 'sending',防止被其他dispatcher实例(如果未来扩展)重复获取
	// 这是一个重要的步骤,确保了处理的幂等性。
	if err := d.store.UpdateEventsStatus(idsToProcess, "sending"); err != nil {
		slog.Error("failed to update events to sending status", "error", err)
		return
	}

	// 3. 异步发送
	for id, payload := range events {
		wg.Add(1)
		d.producer.SendAsync(ctx, &pulsar.ProducerMessage{
			Payload: payload,
			Properties: map[string]string{
				"event_id": strconv.FormatInt(id, 10), // 将DB ID放入属性,用于追踪
			},
		}, func(msgID pulsar.MessageID, prodMsg *pulsar.ProducerMessage, err error) {
			defer wg.Done()
			dbID, _ := strconv.ParseInt(prodMsg.Properties["event_id"], 10, 64)

			if err != nil {
				slog.Error("failed to send message to pulsar", "db_id", dbID, "error", err)
				// 发送失败,需要一个错误处理机制,例如将状态更新回 'pending' 并增加重试次数
				// 为简化起见,我们暂时只记录日志,它会在下一次轮询中被重试
				d.store.UpdateEventsStatus([]int64{dbID}, "pending")
			} else {
				slog.Debug("message sent successfully", "db_id", dbID, "msg_id", msgID)
				successfulIDs <- dbID
			}
		})
	}

	wg.Wait()
	close(successfulIDs)

	// 4. 收集所有成功ID并批量更新数据库
	finalIDs := make([]int64, 0, len(events))
	for id := range successfulIDs {
		finalIDs = append(finalIDs, id)
	}

	if len(finalIDs) > 0 {
		// 在真实项目中,删除是更常见的操作以回收空间
		if err := d.store.DeleteEvents(finalIDs); err != nil {
			slog.Error("failed to delete sent events", "error", err)
		} else {
			slog.Info("successfully deleted sent events", "count", len(finalIDs))
		}
	}
}

func (d *PulsarDispatcher) Close() {
	d.producer.Close()
	d.client.Close()
}

现在需要回去补全SQLiteStore中Dispatcher所需的方法。

// pkg/storage/sqlite.go (追加)

func (s *SQLiteStore) FetchPendingEvents(limit int) (map[int64][]byte, error) {
	rows, err := s.db.Query("SELECT id, payload FROM event_queue WHERE status = 'pending' ORDER BY created_at ASC LIMIT ?", limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	events := make(map[int64][]byte)
	for rows.Next() {
		var id int64
		var payload []byte
		if err := rows.Scan(&id, &payload); err != nil {
			return nil, err
		}
		events[id] = payload
	}
	return events, nil
}

func (s *SQLiteStore) UpdateEventsStatus(ids []int64, status string) error {
	// ... 实现批量UPDATE的逻辑,使用 IN 子句 ...
}

func (s *SQLiteStore) DeleteEvents(ids []int64) error {
	// ... 实现批量DELETE的逻辑,使用 IN 子句 ...
}

第四步: 整合与优雅停机

最后,在main函数中,我们需要将所有组件串联起来,并确保在接收到SIGINTSIGTERM信号时,应用能够优雅地关闭。这包括停止接收新请求,等待处理中的任务完成,然后关闭数据库和Pulsar连接。

// cmd/gateway/main.go
package main

import (
	"context"
	"log/slog"
	"os"
	"os/signal"
	"syscall"
	"time"

	"golang.org/x/sync/errgroup"

	"your-project/pkg/api"
	"your-project/pkg/dispatcher"
	"your-project/pkg/storage"
)

func main() {
	// ... 配置加载 ...
	
	// 创建一个带cancel的上下文,用于协调所有协程的退出
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// 1. 初始化数据通道
	dataChan := make(chan []byte, 1024) // 缓冲大小是一个需要调优的参数

	// 2. 初始化SQLite存储
	store, err := storage.NewSQLiteStore("./events.db", dataChan)
	if err != nil {
		slog.Error("failed to init sqlite store", "error", err)
		os.Exit(1)
	}

	// 3. 初始化Pulsar Dispatcher
	pulsarDispatcher, err := dispatcher.NewPulsarDispatcher("pulsar://localhost:6650", "persistent://public/default/edge-events", store)
	if err != nil {
		slog.Error("failed to init pulsar dispatcher", "error", err)
		os.Exit(1)
	}
	defer pulsarDispatcher.Close()

	// 4. 初始化API服务器
	apiServer := api.NewServer(":8080", dataChan)

	// 使用 errgroup 来管理各个组件的生命周期
	g, gCtx := errgroup.WithContext(ctx)

	// 启动SQLite写入协程
	store.StartWriterLoop(gCtx)

	// 启动Pulsar上传协程
	g.Go(func() error {
		pulsarDispatcher.Start(gCtx)
		return nil
	})

	// 启动HTTP服务器
	g.Go(func() error {
		return apiServer.Start()
	})
	
	// 监听服务器关闭
	g.Go(func() error {
		<-gCtx.Done() // 等待取消信号
		slog.Info("shutdown signal received, initiating graceful shutdown")
		
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()

		// 先关闭HTTP服务器,不再接收新请求
		if err := apiServer.Shutdown(shutdownCtx); err != nil {
			slog.Error("http server shutdown failed", "error", err)
		}

		// 关闭数据通道,这将使SQLite writer处理完剩余数据后退出
		close(dataChan)
		
		return nil
	})

	// 等待所有协程结束
	if err := g.Wait(); err != nil && err != http.ErrServerClosed {
		slog.Error("errgroup exited with error", "error", err)
	}
	slog.Info("gateway shut down gracefully")
}

方案的局限性与未来迭代

这个架构解决了在不可靠网络环境下无损数据采集的核心痛点。然而,它并非没有缺点,在真实生产环境中,还需要考虑以下几点:

  1. 磁盘空间管理: SQLite数据库会无限增长。必须实现一个清理策略,例如定期删除已经发送成功且超过一定时间的旧数据。同时,需要监控磁盘使用率,当磁盘空间不足时,应停止接收新数据(返回503)以防止写满磁盘。
  2. 数据可观测性: 当前方案缺少关键指标的监控。需要引入Prometheus等监控系统,暴露如队列深度(SQLite中pending状态的记录数)、收发速率、Pulsar发送成功/失败率等指标。
  3. 死信队列: 对于某些格式错误或业务逻辑上无法处理的数据,在多次重试后应该将其从主流程中移出,存入一个“死信队列”(可以是SQLite中的另一张表),供后续人工分析,而不是无限次地重试,阻塞正常数据。
  4. 本地高可用: 当前设计中,Go进程本身是单点。虽然重启后数据不丢失,但在进程崩溃期间服务不可用。在要求更高的场景下,可以考虑在边缘节点上运行多个实例,通过本地的leader选举机制(如使用Consul或Etcd)来确保总有一个实例在工作。
  5. 配置动态化: 硬编码的配置(如Pulsar地址、Topic)不利于管理。可以通过集成一个配置中心或启动时从远程API拉取配置来使其更灵活。

  目录