mitm/db.go

202 lines
4.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"database/sql"
"fmt"
"sync"
"sync/atomic"
"time"
_ "github.com/mattn/go-sqlite3" // cgo 版(极致性能)
)
// LogRow is one logged HTTP exchange (request plus response). Each field is
// written to the like-named column of the http_logs table.
type LogRow struct {
	TSns int64 // stored in the ts_ns column; presumably a timestamp in nanoseconds (confirm with the producer)
	TxTime time.Time // time the request was sent
	Proto string // stored in the proto column; presumably the protocol name/version (confirm with the producer)
	Method string // HTTP method, e.g. GET or POST
	URL string // request URL
	TxHeader string // request (Tx) HTTP header
	TxBody string // request (Tx) HTTP body
	Status int // HTTP status of the response
	RxTime time.Time // time the response was received
	RxHeader string // response (Rx) HTTP header
	RxBody string // response (Rx) HTTP body
	Modified bool // whether the request has been modified
}
// Batcher collects LogRow values and commits them to the database in
// batches. A batch is committed when the byte threshold, the row-count
// threshold or the time threshold is reached.
type Batcher struct {
	db *sql.DB // database the batch transactions are started on
	stmtInsert *sql.Stmt // prepared INSERT into http_logs, reused for every row
	ch chan LogRow // queue carrying rows from Write to the background loop
	wg sync.WaitGroup // tracks the background loop goroutine; Close waits on it
	ctx context.Context // cancelled by Close; stops the loop and unblocks Write
	cancel context.CancelFunc // cancels ctx
	maxRows int // row-count threshold, e.g. 1000
	maxBytes int64 // byte threshold, e.g. 2<<20 (2MB)
	flushEvery time.Duration // time threshold, e.g. 200ms
	curBytes atomic.Int64 // atomic byte counter (approximate): Write adds to it, every flush resets it to zero
	errOnce sync.Once // ensures only the first flush error is recorded
	lastErrStore atomic.Value // holds a *error: the flush error recorded under errOnce
}
// NewBatcher prepares db for batched logging and starts the background
// goroutine that writes queued rows.
//
// It limits the pool to a single connection (SQLite allows one writer),
// switches the database to WAL journaling with synchronous=NORMAL, creates
// the http_logs table if it does not exist and prepares the INSERT statement
// that is reused for every row.
//
// maxRows, maxBytes and flushEvery are the row-count, byte-size and time
// thresholds that trigger a flush. An error is returned when db is nil,
// maxRows is negative or flushEvery is not positive; previously those inputs
// caused a panic (for flushEvery, inside the background goroutine).
//
// The caller keeps ownership of db. Call Close on the returned Batcher to
// stop the goroutine and release the prepared statement.
func NewBatcher(db *sql.DB, maxRows int, maxBytes int64, flushEvery time.Duration) (*Batcher, error) {
	if db == nil {
		return nil, fmt.Errorf("new batcher: db is nil")
	}
	if maxRows < 0 {
		return nil, fmt.Errorf("new batcher: maxRows must not be negative, got %d", maxRows)
	}
	if flushEvery <= 0 {
		// time.NewTicker panics on a non-positive interval.
		return nil, fmt.Errorf("new batcher: flushEvery must be positive, got %s", flushEvery)
	}

	// Single writer connection. This is done before the PRAGMAs because
	// synchronous is a per-connection setting: the statements below should
	// run on the one connection that will also be used for the inserts.
	db.SetMaxOpenConns(1)

	// PRAGMA tuning (example values; adjust as needed).
	if _, err := db.Exec(`PRAGMA journal_mode=WAL;`); err != nil {
		return nil, fmt.Errorf("set WAL: %w", err)
	}
	if _, err := db.Exec(`PRAGMA synchronous=NORMAL;`); err != nil {
		return nil, fmt.Errorf("set synchronous: %w", err)
	}

	// Create the log table (example schema).
	if _, err := db.Exec(`
		CREATE TABLE IF NOT EXISTS http_logs (
		ts_ns INTEGER NOT NULL,
		tx_time DATETIME NOT NULL,
		proto TEXT NOT NULL,
		method TEXT NOT NULL,
		url TEXT NOT NULL,
		tx_header TEXT NOT NULL,
		tx_body TEXT NOT NULL,
		status INTEGER NOT NULL,
		rx_time DATETIME NOT NULL,
		rx_header TEXT NOT NULL,
		rx_body TEXT NOT NULL,
		modified INTEGER NOT NULL
		)`); err != nil {
		return nil, fmt.Errorf("create http_logs table: %w", err)
	}

	// Prepared once and reused for the lifetime of the Batcher.
	stmt, err := db.Prepare("INSERT INTO http_logs " +
		"(ts_ns, tx_time, proto, method, url, tx_header, tx_body, status, rx_time, rx_header, rx_body, modified) " +
		"VALUES (?,?,?,?,?,?,?,?,?,?,?,?)")
	if err != nil {
		return nil, fmt.Errorf("prepare insert statement: %w", err)
	}

	// The context is created only after every fallible step has succeeded,
	// so no error path can return without calling cancel.
	ctx, cancel := context.WithCancel(context.Background())
	b := &Batcher{
		db:         db,
		stmtInsert: stmt,
		ch:         make(chan LogRow, maxRows*4), // some headroom over one batch
		ctx:        ctx,
		cancel:     cancel,
		maxRows:    maxRows,
		maxBytes:   maxBytes,
		flushEvery: flushEvery,
	}
	b.wg.Add(1)
	go b.loop()
	return b, nil
}
// Close stops the background goroutine, waits until it has finished its
// final flush, and closes the prepared insert statement. It does not close
// the underlying *sql.DB.
//
// It returns the error from closing the statement, if any; otherwise it
// returns the recorded flush error (the same value Err reports), so a failed
// final flush is not silently lost.
func (b *Batcher) Close() error {
	b.cancel()
	b.wg.Wait()
	if err := b.stmtInsert.Close(); err != nil {
		return fmt.Errorf("close insert statement: %w", err)
	}
	return b.Err()
}
// Err reports the flush error recorded by the background goroutine, or nil
// if no error has been stored.
func (b *Batcher) Err() error {
	// Only *error values are ever stored, so a failed assertion means the
	// store is still empty.
	if stored, ok := b.lastErrStore.Load().(*error); ok {
		return *stored
	}
	return nil
}
// Write queues one row for asynchronous insertion.
//
// It blocks while the queue is full, until either space becomes available or
// the Batcher is closed. When the row is queued, Write returns the recorded
// background flush error, if any (nil otherwise); a nil result therefore
// does not mean this particular row has been committed yet.
//
// When the Batcher has been closed and the row is dropped instead of queued,
// Write returns the recorded flush error if there is one, otherwise the
// context's cancellation error, so that the caller can tell the row was lost.
func (b *Batcher) Write(row LogRow) error {
	// Rough size estimate: a fixed per-row overhead plus the length of every
	// string field (both request and response header/body).
	est := int64(184 +
		len(row.Proto) +
		len(row.Method) +
		len(row.URL) +
		len(row.TxHeader) +
		len(row.TxBody) +
		len(row.RxHeader) +
		len(row.RxBody))
	b.curBytes.Add(est)

	select {
	case b.ch <- row:
		return b.Err()
	case <-b.ctx.Done():
		if err := b.Err(); err != nil {
			return err
		}
		return b.ctx.Err()
	}
}
// loop is the background goroutine started by NewBatcher. It buffers rows
// received from the queue and flushes them when the row-count threshold or
// the byte threshold is reached, on every tick of the flush interval, and
// once more on shutdown.
//
// Only the first flush error is recorded (it is what Err returns). The rows
// of a failed batch are discarded, not retried.
//
// The byte counter is reset to zero on every flush even though rows still
// waiting in the queue have already been added to it, so the byte threshold
// is only an approximation.
func (b *Batcher) loop() {
	defer b.wg.Done()

	buf := make([]LogRow, 0, b.maxRows)
	ticker := time.NewTicker(b.flushEvery)
	defer ticker.Stop()

	flush := func() {
		if len(buf) == 0 {
			return
		}
		if err := b.flushBatch(buf); err != nil {
			b.errOnce.Do(func() {
				e := err
				b.lastErrStore.Store(&e)
			})
		}
		// Zero the flushed rows before reusing the backing array, so the
		// header/body strings they reference can be garbage collected.
		for i := range buf {
			buf[i] = LogRow{}
		}
		buf = buf[:0]
		b.curBytes.Store(0)
	}

	for {
		select {
		case <-b.ctx.Done():
			// Shutdown: rows that are already queued must not be lost, so
			// drain the channel (without blocking) before the final flush.
			// Rows queued by a Write racing with shutdown after this drain
			// has finished can still be missed.
			for {
				select {
				case row := <-b.ch:
					buf = append(buf, row)
					if len(buf) >= b.maxRows {
						flush()
					}
				default:
					flush()
					return
				}
			}
		case <-ticker.C:
			flush()
		case row := <-b.ch:
			buf = append(buf, row)
			// Row-count trigger or byte-size trigger.
			if len(buf) >= b.maxRows || b.curBytes.Load() >= b.maxBytes {
				flush()
			}
		}
	}
}
// flushBatch inserts every row of batch inside a single transaction.
//
// If any insert fails the transaction is rolled back and the error is
// returned, so either the whole batch is committed or none of it is. An
// empty batch is a no-op.
func (b *Batcher) flushBatch(batch []LogRow) error {
	if len(batch) == 0 {
		return nil
	}

	tx, err := b.db.Begin()
	if err != nil {
		return fmt.Errorf("begin batch transaction: %w", err)
	}

	// Bind the long-lived prepared statement to this transaction once, not
	// once per row. database/sql closes the returned statement when the
	// transaction is committed or rolled back.
	stmt := tx.Stmt(b.stmtInsert)

	// Index the slice instead of ranging by value: LogRow is large and a
	// by-value range would copy every row.
	for i := range batch {
		r := &batch[i]
		if _, err := stmt.Exec(
			r.TSns, r.TxTime, r.Proto, r.Method, r.URL, r.TxHeader, r.TxBody,
			r.Status, r.RxTime, r.RxHeader, r.RxBody, r.Modified,
		); err != nil {
			// Best effort: the insert error is the one worth reporting.
			_ = tx.Rollback()
			return fmt.Errorf("insert log row %d of %d: %w", i+1, len(batch), err)
		}
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("commit batch of %d rows: %w", len(batch), err)
	}
	return nil
}