package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	_ "github.com/mattn/go-sqlite3" // cgo driver (fastest option)
)

// LogRow is one captured HTTP request/response pair to be persisted into
// the http_logs table.
type LogRow struct {
	TSns     int64     // ts_ns column
	TxTime   time.Time // time the request was sent
	Proto    string    // protocol string of the request
	Method   string    // HTTP method, e.g. GET/POST
	URL      string    // request URL
	TxHeader string    // request headers
	TxBody   string    // request body
	Status   int       // HTTP status code of the response
	RxTime   time.Time // time the response was received
	RxHeader string    // response headers
	RxBody   string    // response body
	Modified bool      // whether the request was modified
}

// ErrBatcherClosed is returned by Write once Close has been called.
var ErrBatcherClosed = errors.New("batcher: closed")

// Batcher asynchronously buffers LogRows and commits them to SQLite in a
// single transaction per batch. A batch is flushed when any of these holds:
// it reaches maxRows rows, its estimated size reaches maxBytes, or
// flushEvery has elapsed on the ticker.
type Batcher struct {
	db         *sql.DB
	stmtInsert *sql.Stmt
	ch         chan LogRow
	wg         sync.WaitGroup

	maxRows    int           // row-count threshold, e.g. 1000
	maxBytes   int64         // estimated-bytes threshold, e.g. 2<<20 (2 MiB)
	flushEvery time.Duration // time threshold, e.g. 200ms

	// mu guards closed and the close of ch. Write holds the read lock while
	// sending, so Close cannot close ch underneath an in-flight send.
	mu        sync.RWMutex
	closed    bool
	closeOnce sync.Once
	closeErr  error

	errOnce      sync.Once
	lastErrStore atomic.Value // holds *error: the first flush error
}

// NewBatcher prepares db for logging and starts the background writer.
//
// Side effects on db: the pool is limited to a single open connection
// (SQLite allows one writer). The journal mode is switched to WAL, and
// synchronous is set to NORMAL. The http_logs table is created if missing.
//
// maxRows must be > 0 and flushEvery must be > 0. The channel between Write
// and the writer goroutine holds maxRows*4 rows. When it is full, Write
// blocks (backpressure).
func NewBatcher(db *sql.DB, maxRows int, maxBytes int64, flushEvery time.Duration) (*Batcher, error) {
	if maxRows <= 0 {
		return nil, fmt.Errorf("new batcher: maxRows must be > 0, got %d", maxRows)
	}
	if flushEvery <= 0 {
		return nil, fmt.Errorf("new batcher: flushEvery must be > 0, got %v", flushEvery)
	}

	// Pin the pool to one connection first, so the per-connection PRAGMA
	// below (synchronous) is applied to the connection that is later reused.
	// NOTE(review): if database/sql ever replaces that connection, the
	// synchronous setting is lost. Consider putting these options in the
	// driver DSN instead.
	db.SetMaxOpenConns(1)

	// PRAGMA tuning (example values; adjust as needed).
	if _, err := db.Exec(`PRAGMA journal_mode=WAL;`); err != nil {
		return nil, fmt.Errorf("set WAL: %w", err)
	}
	if _, err := db.Exec(`PRAGMA synchronous=NORMAL;`); err != nil {
		return nil, fmt.Errorf("set synchronous: %w", err)
	}

	if _, err := db.Exec(`
CREATE TABLE IF NOT EXISTS http_logs (
	ts_ns     INTEGER  NOT NULL,
	tx_time   DATETIME NOT NULL,
	proto     TEXT     NOT NULL,
	method    TEXT     NOT NULL,
	url       TEXT     NOT NULL,
	tx_header TEXT     NOT NULL,
	tx_body   TEXT     NOT NULL,
	status    INTEGER  NOT NULL,
	rx_time   DATETIME NOT NULL,
	rx_header TEXT     NOT NULL,
	rx_body   TEXT     NOT NULL,
	modified  INTEGER  NOT NULL
)`); err != nil {
		return nil, fmt.Errorf("create http_logs table: %w", err)
	}

	// Prepared once and reused for the lifetime of the Batcher.
	stmt, err := db.Prepare("INSERT INTO http_logs " +
		"(ts_ns, tx_time, proto, method, url, tx_header, tx_body, status, rx_time, rx_header, rx_body, modified) " +
		"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)")
	if err != nil {
		return nil, fmt.Errorf("prepare insert: %w", err)
	}

	b := &Batcher{
		db:         db,
		stmtInsert: stmt,
		ch:         make(chan LogRow, maxRows*4), // headroom so Write rarely blocks
		maxRows:    maxRows,
		maxBytes:   maxBytes,
		flushEvery: flushEvery,
	}
	b.wg.Add(1)
	go b.loop()
	return b, nil
}

// Close stops accepting rows and waits until every row already accepted by
// Write has been flushed. It then closes the prepared statement. It is safe
// to call more than once; later calls return the same result.
//
// Close returns only the statement-close error. Call Err afterwards to learn
// whether any flush (including the final one) failed.
func (b *Batcher) Close() error {
	b.closeOnce.Do(func() {
		b.mu.Lock()
		b.closed = true
		close(b.ch) // the loop drains remaining rows, flushes, and exits
		b.mu.Unlock()

		b.wg.Wait()
		b.closeErr = b.stmtInsert.Close()
	})
	return b.closeErr
}

// Err returns the first error produced by a background flush, or nil.
func (b *Batcher) Err() error {
	v := b.lastErrStore.Load()
	if v == nil {
		return nil
	}
	return *(v.(*error))
}

// recordErr remembers err if it is the first flush error seen.
func (b *Batcher) recordErr(err error) {
	b.errOnce.Do(func() { b.lastErrStore.Store(&err) })
}

// Write enqueues one row for asynchronous insertion. It blocks while the
// internal queue is full.
//
// It returns ErrBatcherClosed after Close. Otherwise it returns the first
// background flush error seen so far (nil if none). That error is sticky:
// the row is still enqueued.
func (b *Batcher) Write(row LogRow) error {
	b.mu.RLock()
	defer b.mu.RUnlock()
	if b.closed {
		return ErrBatcherClosed
	}
	b.ch <- row
	return b.Err()
}

// estimateSize approximates the storage cost of r: the sum of its text
// field lengths plus a fixed 184-byte overhead for the remaining columns.
func estimateSize(r *LogRow) int64 {
	return int64(184 +
		len(r.Proto) + len(r.Method) + len(r.URL) +
		len(r.TxHeader) + len(r.TxBody) +
		len(r.RxHeader) + len(r.RxBody))
}

// loop is the single writer goroutine. It accumulates rows and flushes them
// on the row, byte, or time threshold. When ch is closed, it drains the
// channel, performs a final flush, and exits.
func (b *Batcher) loop() {
	defer b.wg.Done()

	buf := make([]LogRow, 0, b.maxRows)
	var pendingBytes int64 // estimated bytes held in buf; owned by this goroutine

	ticker := time.NewTicker(b.flushEvery)
	defer ticker.Stop()

	flush := func() {
		if len(buf) == 0 {
			return
		}
		// A failed batch is dropped; only the first error is retained for Err.
		if err := b.flushBatch(buf); err != nil {
			b.recordErr(err)
		}
		// Zero the entries so large header/body strings are not kept alive
		// by the reused backing array.
		for i := range buf {
			buf[i] = LogRow{}
		}
		buf = buf[:0]
		pendingBytes = 0
	}

	for {
		select {
		case row, ok := <-b.ch:
			if !ok {
				flush()
				return
			}
			buf = append(buf, row)
			pendingBytes += estimateSize(&row)
			if len(buf) >= b.maxRows || pendingBytes >= b.maxBytes {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

// flushBatch inserts batch inside a single transaction. On any error, the
// transaction is rolled back and none of the batch is committed.
func (b *Batcher) flushBatch(batch []LogRow) error {
	ctx := context.Background()
	tx, err := b.db.BeginTx(ctx, nil)
	if err != nil {
		return fmt.Errorf("begin batch tx: %w", err)
	}
	// No-op (returns sql.ErrTxDone) once Commit has run.
	defer func() { _ = tx.Rollback() }()

	// Bind the shared prepared statement to this transaction once. It is
	// closed automatically when the transaction ends.
	stmt := tx.StmtContext(ctx, b.stmtInsert)
	for i := range batch {
		r := &batch[i]
		if _, err := stmt.ExecContext(ctx,
			r.TSns, r.TxTime, r.Proto, r.Method, r.URL,
			r.TxHeader, r.TxBody, r.Status, r.RxTime,
			r.RxHeader, r.RxBody, r.Modified,
		); err != nil {
			return fmt.Errorf("insert row %d of %d: %w", i+1, len(batch), err)
		}
	}
	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit batch: %w", err)
	}
	return nil
}