202 lines
4.7 KiB
Go
202 lines
4.7 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
_ "github.com/mattn/go-sqlite3" // cgo 版(极致性能)
|
||
)
|
||
|
||
// LogRow is one captured HTTP exchange; each value is inserted as a single
// row of the http_logs table.
type LogRow struct {
	TSns   int64     // value for the ts_ns column (presumably a nanosecond timestamp — confirm)
	TxTime time.Time // when the request was sent

	// Request side.
	Proto, Method, URL string // protocol, HTTP method (e.g. GET/POST), and URL
	TxHeader, TxBody   string // request headers and body

	// Response side.
	Status int       // HTTP status code
	RxTime time.Time // when the response was received

	RxHeader, RxBody string // response headers and body

	Modified bool // whether the request was modified
}
|
||
|
||
// Batcher:按“字节阈值/条数阈值/时间阈值”触发批量提交
|
||
type Batcher struct {
|
||
db *sql.DB
|
||
stmtInsert *sql.Stmt
|
||
ch chan LogRow
|
||
wg sync.WaitGroup
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
maxRows int // 条数阈值,例如 1000
|
||
maxBytes int64 // 字节阈值,例如 2<<20 (2MB)
|
||
flushEvery time.Duration // 时间阈值,例如 200ms
|
||
curBytes atomic.Int64 // 原子计数(近似值)
|
||
errOnce sync.Once
|
||
lastErrStore atomic.Value // *error
|
||
}
|
||
|
||
func NewBatcher(db *sql.DB, maxRows int, maxBytes int64, flushEvery time.Duration) (*Batcher, error) {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
// PRAGMA 调优(仅示例,可按需调整)
|
||
if _, err := db.Exec(`PRAGMA journal_mode=WAL;`); err != nil {
|
||
return nil, fmt.Errorf("set WAL: %w", err)
|
||
}
|
||
if _, err := db.Exec(`PRAGMA synchronous=NORMAL;`); err != nil {
|
||
return nil, fmt.Errorf("set synchronous: %w", err)
|
||
}
|
||
// 只开 1 个写连接(SQLite 单写者)
|
||
db.SetMaxOpenConns(1)
|
||
|
||
// 建表(示例)
|
||
if _, err := db.Exec(`
|
||
CREATE TABLE IF NOT EXISTS http_logs (
|
||
ts_ns INTEGER NOT NULL,
|
||
tx_time DATETIME NOT NULL,
|
||
proto TEXT NOT NULL,
|
||
method TEXT NOT NULL,
|
||
url TEXT NOT NULL,
|
||
tx_header TEXT NOT NULL,
|
||
tx_body TEXT NOT NULL,
|
||
status INTEGER NOT NULL,
|
||
rx_time DATETIME NOT NULL,
|
||
rx_header TEXT NOT NULL,
|
||
rx_body TEXT NOT NULL,
|
||
modified INTEGER NOT NULL
|
||
)`); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 预编译语句(长期复用)
|
||
stmt, err := db.Prepare("INSERT INTO http_logs " +
|
||
"(ts_ns, tx_time, proto, method, url, tx_header, tx_body, status, rx_time, rx_header, rx_body, modified) " +
|
||
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
b := &Batcher{
|
||
db: db,
|
||
stmtInsert: stmt,
|
||
ch: make(chan LogRow, maxRows*4), // 适当留余量
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
maxRows: maxRows,
|
||
maxBytes: maxBytes,
|
||
flushEvery: flushEvery,
|
||
}
|
||
b.curBytes.Store(0)
|
||
b.wg.Add(1)
|
||
go b.loop()
|
||
return b, nil
|
||
}
|
||
|
||
func (b *Batcher) Close() error {
|
||
b.cancel()
|
||
b.wg.Wait()
|
||
_ = b.stmtInsert.Close()
|
||
return nil
|
||
}
|
||
|
||
func (b *Batcher) Err() error {
|
||
v := b.lastErrStore.Load()
|
||
if v == nil {
|
||
return nil
|
||
}
|
||
return *(v.(*error))
|
||
}
|
||
|
||
// Write:投递一条日志(异步)。返回可能的累积错误(如果后台 flush 失败)。
|
||
func (b *Batcher) Write(row LogRow) error {
|
||
// 估算本条大小(粗略:字段长度之和 + 头部开销)
|
||
est := int64(184 +
|
||
len(row.Proto) +
|
||
len(row.Method) +
|
||
len(row.URL) +
|
||
len(row.TxHeader) +
|
||
len(row.TxBody) +
|
||
len(row.TxHeader) +
|
||
len(row.RxBody))
|
||
b.curBytes.Add(est)
|
||
|
||
select {
|
||
case b.ch <- row:
|
||
return b.Err()
|
||
case <-b.ctx.Done():
|
||
return b.Err()
|
||
}
|
||
}
|
||
|
||
func (b *Batcher) loop() {
|
||
defer b.wg.Done()
|
||
|
||
buf := make([]LogRow, 0, b.maxRows)
|
||
ticker := time.NewTicker(b.flushEvery)
|
||
defer ticker.Stop()
|
||
|
||
flush := func() {
|
||
if len(buf) == 0 {
|
||
return
|
||
}
|
||
if err := b.flushBatch(buf); err != nil {
|
||
b.errOnce.Do(func() {
|
||
e := err
|
||
b.lastErrStore.Store(&e)
|
||
})
|
||
}
|
||
// 清空缓冲及字节计数
|
||
buf = buf[:0]
|
||
b.curBytes.Store(0)
|
||
}
|
||
|
||
for {
|
||
select {
|
||
case <-b.ctx.Done():
|
||
flush()
|
||
return
|
||
|
||
case <-ticker.C:
|
||
flush()
|
||
|
||
case row := <-b.ch:
|
||
buf = append(buf, row)
|
||
// 条数触发
|
||
if len(buf) >= b.maxRows {
|
||
flush()
|
||
continue
|
||
}
|
||
// 字节触发
|
||
if b.curBytes.Load() >= b.maxBytes {
|
||
flush()
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
func (b *Batcher) flushBatch(batch []LogRow) error {
|
||
// 单事务提交整批
|
||
tx, err := b.db.Begin()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
// 复用预编译语句:在事务上下文中临时 Re-prepare 性能更好,但简单起见直接用全局 stmt。
|
||
// 如需极致性能,可在此 tx.Prepare 一次专用 stmtInsertTX。
|
||
for _, r := range batch {
|
||
if _, err := tx.Stmt(b.stmtInsert).Exec(
|
||
r.TSns, r.TxTime, r.Proto, r.Method, r.URL, r.TxHeader, r.TxBody, r.Status, r.RxTime, r.RxHeader, r.RxBody, r.Modified,
|
||
); err != nil {
|
||
_ = tx.Rollback()
|
||
return err
|
||
}
|
||
}
|
||
return tx.Commit()
|
||
}
|