Skip to content

并发访问

Go Cursor SDK 设计为线程安全的,可以在多个 goroutine 中安全地并发使用。

线程安全保证

CursorClient

CursorClient 及其所有 Reader 都是线程安全的:

go
client, err := cursor.NewCursorClient(nil)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

var wg sync.WaitGroup

// 并发读取不同类型的数据
wg.Add(3)

go func() {
    defer wg.Done()
    sessions, _ := client.Sessions().ListSessions()
    fmt.Printf("Sessions: %d\n", len(sessions))
}()

go func() {
    defer wg.Done()
    composers, _ := client.Composers().ListComposers()
    fmt.Printf("Composers: %d\n", len(composers))
}()

go func() {
    defer wg.Done()
    stats, _ := client.Stats().GetDailyStats(time.Now())
    fmt.Printf("Stats: %+v\n", stats)
}()

wg.Wait()

并发模式

并行数据读取

同时读取多个数据源以提高性能:

go
type Result struct {
    Sessions  []cursor.Session
    Composers []cursor.Composer
    Stats     *cursor.DailyStats
    Error     error
}

func fetchAllData(client *cursor.CursorClient) *Result {
    result := &Result{}
    var wg sync.WaitGroup
    
    wg.Add(3)
    
    // 并发读取 sessions
    go func() {
        defer wg.Done()
        sessions, err := client.Sessions().ListSessions()
        if err != nil {
            result.Error = err
            return
        }
        result.Sessions = sessions
    }()
    
    // 并发读取 composers
    go func() {
        defer wg.Done()
        composers, err := client.Composers().ListComposers()
        if err != nil {
            result.Error = err
            return
        }
        result.Composers = composers
    }()
    
    // 并发读取 stats
    go func() {
        defer wg.Done()
        stats, err := client.Stats().GetDailyStats(time.Now())
        if err != nil {
            result.Error = err
            return
        }
        result.Stats = stats
    }()
    
    wg.Wait()
    return result
}

批量处理

使用 worker pool 模式处理大量数据:

go
func processSessions(client *cursor.CursorClient, workerCount int) error {
    sessions, err := client.Sessions().ListSessions()
    if err != nil {
        return err
    }
    
    // 创建任务通道
    jobs := make(chan cursor.Session, len(sessions))
    results := make(chan error, len(sessions))
    
    // 启动 workers
    var wg sync.WaitGroup
    for i := 0; i < workerCount; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for session := range jobs {
                // 处理 session
                if err := processSession(session); err != nil {
                    results <- err
                    continue
                }
                results <- nil
            }
        }()
    }
    
    // 发送任务
    for _, session := range sessions {
        jobs <- session
    }
    close(jobs)
    
    // 等待完成
    wg.Wait()
    close(results)
    
    // 检查错误
    for err := range results {
        if err != nil {
            return err
        }
    }
    
    return nil
}

func processSession(session cursor.Session) error {
    // 处理逻辑
    return nil
}

并发查询

同时查询多个时间范围的数据:

go
func getStatsForDateRange(client *cursor.CursorClient, start, end time.Time) ([]cursor.DailyStats, error) {
    days := int(end.Sub(start).Hours() / 24)
    results := make([]cursor.DailyStats, days+1)
    errors := make(chan error, days+1)
    
    var wg sync.WaitGroup
    
    for i := 0; i <= days; i++ {
        wg.Add(1)
        go func(index int) {
            defer wg.Done()
            date := start.AddDate(0, 0, index)
            stats, err := client.Stats().GetDailyStats(date)
            if err != nil {
                errors <- err
                return
            }
            results[index] = *stats
            errors <- nil
        }(i)
    }
    
    wg.Wait()
    close(errors)
    
    // 检查错误
    for err := range errors {
        if err != nil {
            return nil, err
        }
    }
    
    return results, nil
}

缓存与并发

启用缓存可以提高并发访问的性能:

go
config := &cursor.ClientConfig{
    EnableCache: true,
    CacheTTL:    10 * time.Minute,
}

client, err := cursor.NewCursorClient(config)
if err != nil {
    log.Fatal(err)
}
defer client.Close()

// 多个 goroutine 可以共享缓存
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        // 第一次调用会读取数据库,后续调用使用缓存
        sessions, _ := client.Sessions().ListSessions()
        fmt.Printf("Got %d sessions\n", len(sessions))
    }()
}
wg.Wait()

// 查看缓存统计
stats := client.Cache().GetCacheStats()
fmt.Printf("Cache hit rate: %.2f%%\n", stats.HitRate*100)

资源管理

连接池

SDK 内部使用连接池管理数据库连接:

go
// 客户端会自动管理连接池
client, err := cursor.NewCursorClient(nil)
if err != nil {
    log.Fatal(err)
}
defer client.Close() // 关闭所有连接

// 多个 goroutine 共享连接池
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        sessions, _ := client.Sessions().ListSessions()
        // 处理数据
        _ = sessions
    }()
}
wg.Wait()

超时控制

使用 context 控制并发操作的超时:

go
func fetchWithTimeout(client *cursor.CursorClient, timeout time.Duration) error {
    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()
    
    done := make(chan error, 1)
    
    go func() {
        sessions, err := client.Sessions().ListSessions()
        if err != nil {
            done <- err
            return
        }
        // 处理 sessions
        _ = sessions
        done <- nil
    }()
    
    select {
    case err := <-done:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

最佳实践

1. 复用客户端实例

go
// ✅ 好的做法:复用客户端
var globalClient *cursor.CursorClient

func init() {
    var err error
    globalClient, err = cursor.NewCursorClient(nil)
    if err != nil {
        log.Fatal(err)
    }
}

func handler1() {
    sessions, _ := globalClient.Sessions().ListSessions()
    // 使用 sessions
}

func handler2() {
    composers, _ := globalClient.Composers().ListComposers()
    // 使用 composers
}
go
// ❌ 不好的做法:每次创建新客户端
func handler() {
    client, _ := cursor.NewCursorClient(nil)
    defer client.Close()
    sessions, _ := client.Sessions().ListSessions()
    // 使用 sessions
}

2. 使用缓存减少数据库访问

go
config := &cursor.ClientConfig{
    EnableCache: true,
    CacheTTL:    5 * time.Minute,
}
client, _ := cursor.NewCursorClient(config)

3. 限制并发数量

go
// 使用 semaphore 限制并发
sem := make(chan struct{}, 10) // 最多 10 个并发

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        
        sem <- struct{}{}        // 获取信号量
        defer func() { <-sem }() // 释放信号量
        
        sessions, _ := client.Sessions().ListSessions()
        // 处理 sessions
        _ = sessions
    }()
}
wg.Wait()

4. 错误处理

go
type SafeResult struct {
    mu       sync.Mutex
    sessions []cursor.Session
    err      error
}

func (r *SafeResult) SetSessions(sessions []cursor.Session) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.sessions = sessions
}

func (r *SafeResult) SetError(err error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.err == nil {
        r.err = err
    }
}

func (r *SafeResult) Get() ([]cursor.Session, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    return r.sessions, r.err
}

性能考虑

并发数量

  • 对于 I/O 密集型操作,可以使用较多的 goroutine
  • 对于 CPU 密集型操作,goroutine 数量应接近 CPU 核心数
  • 使用 runtime.NumCPU() 获取 CPU 核心数
go
workerCount := runtime.NumCPU() * 2

批量操作

批量操作比单个操作更高效:

go
// ✅ 好的做法:批量读取
sessions, err := client.Sessions().ListSessions()

// ❌ 不好的做法:逐个读取
for _, id := range sessionIDs {
    session, _ := client.Sessions().GetSession(id)
    // 处理 session
}

监控与调试

并发统计

go
var (
    activeGoroutines int64
    totalRequests    int64
)

func trackGoroutine() func() {
    atomic.AddInt64(&activeGoroutines, 1)
    atomic.AddInt64(&totalRequests, 1)
    
    return func() {
        atomic.AddInt64(&activeGoroutines, -1)
    }
}

func worker() {
    defer trackGoroutine()()
    
    // 执行工作
    sessions, _ := client.Sessions().ListSessions()
    _ = sessions
}

// 定期打印统计信息
go func() {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    
    for range ticker.C {
        active := atomic.LoadInt64(&activeGoroutines)
        total := atomic.LoadInt64(&totalRequests)
        fmt.Printf("Active: %d, Total: %d\n", active, total)
    }
}()

相关文档

Released under the MIT License.