并发访问
Go Cursor SDK 设计为线程安全的,可以在多个 goroutine 中安全地并发使用。
线程安全保证
CursorClient
CursorClient 及其所有 Reader 都是线程安全的:
go
client, err := cursor.NewCursorClient(nil)
if err != nil {
log.Fatal(err)
}
defer client.Close()
var wg sync.WaitGroup
// 并发读取不同类型的数据
wg.Add(3)
go func() {
defer wg.Done()
sessions, _ := client.Sessions().ListSessions()
fmt.Printf("Sessions: %d\n", len(sessions))
}()
go func() {
defer wg.Done()
composers, _ := client.Composers().ListComposers()
fmt.Printf("Composers: %d\n", len(composers))
}()
go func() {
defer wg.Done()
stats, _ := client.Stats().GetDailyStats(time.Now())
fmt.Printf("Stats: %+v\n", stats)
}()
wg.Wait()并发模式
并行数据读取
同时读取多个数据源以提高性能:
go
type Result struct {
Sessions []cursor.Session
Composers []cursor.Composer
Stats *cursor.DailyStats
Error error
}
func fetchAllData(client *cursor.CursorClient) *Result {
result := &Result{}
var wg sync.WaitGroup
wg.Add(3)
// 并发读取 sessions
go func() {
defer wg.Done()
sessions, err := client.Sessions().ListSessions()
if err != nil {
result.Error = err
return
}
result.Sessions = sessions
}()
// 并发读取 composers
go func() {
defer wg.Done()
composers, err := client.Composers().ListComposers()
if err != nil {
result.Error = err
return
}
result.Composers = composers
}()
// 并发读取 stats
go func() {
defer wg.Done()
stats, err := client.Stats().GetDailyStats(time.Now())
if err != nil {
result.Error = err
return
}
result.Stats = stats
}()
wg.Wait()
return result
}批量处理
使用 worker pool 模式处理大量数据:
go
func processSessions(client *cursor.CursorClient, workerCount int) error {
sessions, err := client.Sessions().ListSessions()
if err != nil {
return err
}
// 创建任务通道
jobs := make(chan cursor.Session, len(sessions))
results := make(chan error, len(sessions))
// 启动 workers
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for session := range jobs {
// 处理 session
if err := processSession(session); err != nil {
results <- err
continue
}
results <- nil
}
}()
}
// 发送任务
for _, session := range sessions {
jobs <- session
}
close(jobs)
// 等待完成
wg.Wait()
close(results)
// 检查错误
for err := range results {
if err != nil {
return err
}
}
return nil
}
func processSession(session cursor.Session) error {
// 处理逻辑
return nil
}并发查询
同时查询多个时间范围的数据:
go
func getStatsForDateRange(client *cursor.CursorClient, start, end time.Time) ([]cursor.DailyStats, error) {
days := int(end.Sub(start).Hours() / 24)
results := make([]cursor.DailyStats, days+1)
errors := make(chan error, days+1)
var wg sync.WaitGroup
for i := 0; i <= days; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
date := start.AddDate(0, 0, index)
stats, err := client.Stats().GetDailyStats(date)
if err != nil {
errors <- err
return
}
results[index] = *stats
errors <- nil
}(i)
}
wg.Wait()
close(errors)
// 检查错误
for err := range errors {
if err != nil {
return nil, err
}
}
return results, nil
}缓存与并发
启用缓存可以提高并发访问的性能:
go
config := &cursor.ClientConfig{
EnableCache: true,
CacheTTL: 10 * time.Minute,
}
client, err := cursor.NewCursorClient(config)
if err != nil {
log.Fatal(err)
}
defer client.Close()
// 多个 goroutine 可以共享缓存
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
// 第一次调用会读取数据库,后续调用使用缓存
sessions, _ := client.Sessions().ListSessions()
fmt.Printf("Got %d sessions\n", len(sessions))
}()
}
wg.Wait()
// 查看缓存统计
stats := client.Cache().GetCacheStats()
fmt.Printf("Cache hit rate: %.2f%%\n", stats.HitRate*100)资源管理
连接池
SDK 内部使用连接池管理数据库连接:
go
// 客户端会自动管理连接池
client, err := cursor.NewCursorClient(nil)
if err != nil {
log.Fatal(err)
}
defer client.Close() // 关闭所有连接
// 多个 goroutine 共享连接池
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sessions, _ := client.Sessions().ListSessions()
// 处理数据
_ = sessions
}()
}
wg.Wait()超时控制
使用 context 控制并发操作的超时:
go
func fetchWithTimeout(client *cursor.CursorClient, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
done := make(chan error, 1)
go func() {
sessions, err := client.Sessions().ListSessions()
if err != nil {
done <- err
return
}
// 处理 sessions
_ = sessions
done <- nil
}()
select {
case err := <-done:
return err
case <-ctx.Done():
return ctx.Err()
}
}最佳实践
1. 复用客户端实例
go
// ✅ 好的做法:复用客户端
var globalClient *cursor.CursorClient
func init() {
var err error
globalClient, err = cursor.NewCursorClient(nil)
if err != nil {
log.Fatal(err)
}
}
func handler1() {
sessions, _ := globalClient.Sessions().ListSessions()
// 使用 sessions
}
func handler2() {
composers, _ := globalClient.Composers().ListComposers()
// 使用 composers
}go
// ❌ 不好的做法:每次创建新客户端
func handler() {
client, _ := cursor.NewCursorClient(nil)
defer client.Close()
sessions, _ := client.Sessions().ListSessions()
// 使用 sessions
}2. 使用缓存减少数据库访问
go
config := &cursor.ClientConfig{
EnableCache: true,
CacheTTL: 5 * time.Minute,
}
client, _ := cursor.NewCursorClient(config)3. 限制并发数量
go
// 使用 semaphore 限制并发
sem := make(chan struct{}, 10) // 最多 10 个并发
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
sem <- struct{}{} // 获取信号量
defer func() { <-sem }() // 释放信号量
sessions, _ := client.Sessions().ListSessions()
// 处理 sessions
_ = sessions
}()
}
wg.Wait()4. 错误处理
go
type SafeResult struct {
mu sync.Mutex
sessions []cursor.Session
err error
}
func (r *SafeResult) SetSessions(sessions []cursor.Session) {
r.mu.Lock()
defer r.mu.Unlock()
r.sessions = sessions
}
func (r *SafeResult) SetError(err error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.err == nil {
r.err = err
}
}
func (r *SafeResult) Get() ([]cursor.Session, error) {
r.mu.Lock()
defer r.mu.Unlock()
return r.sessions, r.err
}性能考虑
并发数量
- 对于 I/O 密集型操作,可以使用较多的 goroutine
- 对于 CPU 密集型操作,goroutine 数量应接近 CPU 核心数
- 使用
runtime.NumCPU()获取 CPU 核心数
go
workerCount := runtime.NumCPU() * 2批量操作
批量操作比单个操作更高效:
go
// ✅ 好的做法:批量读取
sessions, err := client.Sessions().ListSessions()
// ❌ 不好的做法:逐个读取
for _, id := range sessionIDs {
session, _ := client.Sessions().GetSession(id)
// 处理 session
}监控与调试
并发统计
go
var (
activeGoroutines int64
totalRequests int64
)
func trackGoroutine() func() {
atomic.AddInt64(&activeGoroutines, 1)
atomic.AddInt64(&totalRequests, 1)
return func() {
atomic.AddInt64(&activeGoroutines, -1)
}
}
func worker() {
defer trackGoroutine()()
// 执行工作
sessions, _ := client.Sessions().ListSessions()
_ = sessions
}
// 定期打印统计信息
go func() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
active := atomic.LoadInt64(&activeGoroutines)
total := atomic.LoadInt64(&totalRequests)
fmt.Printf("Active: %d, Total: %d\n", active, total)
}
}()