V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
anviod
V2EX  ›  Go 编程语言

[Go]消息发布订阅的代码 单元测试始终不通过 请教一下

  •  
  •   anviod · 17 小时 53 分钟前 · 296 次点击

    单元测试代码

    func TestDynamicExpansion(t *testing.T) 测试不通过

    package eventBus
    
    import (
    	"testing"
    	"time"
    )
    // 该测试不通过
    func TestDynamicExpansion(t *testing.T) {
    	ps, err := NewPubsub(2) // 初始化缓冲区大小为 2
    	if err != nil {
    		t.Fatalf("Failed to create pubsub: %v", err)
    	}
    	defer ps.Close()
    
    	ch, err := ps.Subscribe("test")
    	if err != nil {
    		t.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	// 发布 3 条消息,触发动态扩容
    	ps.Publish("test", "msg1")
    	ps.Publish("test", "msg2")
    	ps.Publish("test", "msg3")
    
    	// 读取消息
    	expectedMessages := map[string]bool{"msg1": false, "msg2": false, "msg3": false}
    	for i := 0; i < 3; i++ {
    		msg := <-ch
    		strMsg := msg.(string) // 类型断言
    		if _, ok := expectedMessages[strMsg]; !ok {
    			t.Errorf("Unexpected message: %v", strMsg)
    		}
    		expectedMessages[strMsg] = true
    	}
    
    	for msg, received := range expectedMessages {
    		if !received {
    			t.Errorf("Expected message %v not received", msg)
    		}
    	}
    
    	ps.Unsubscribe("test", ch)
    	ps.Close()
    }
    
    func TestBus_SubscribeAndPublish(t *testing.T) {
    	ps, err := NewPubsub(10)
    	if err != nil {
    		t.Fatalf("Failed to create pubsub: %v", err)
    	}
    	defer ps.Close()
    
    	topic := "test_topic"
    	ch, err := ps.Subscribe(topic)
    	if err != nil {
    		t.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	msg := "test_message"
    	go func() {
    		if err := ps.Publish(topic, msg); err != nil {
    			t.Errorf("Failed to publish message: %v", err)
    		}
    	}()
    
    	select {
    	case received := <-ch:
    		if received != msg {
    			t.Errorf("Expected message %v, but got %v", msg, received)
    		}
    	case <-time.After(time.Second):
    		t.Errorf("Timeout waiting for message")
    	}
    }
    
    func TestBus_UnSubscribe(t *testing.T) {
    	ps, err := NewPubsub(10)
    	if err != nil {
    		t.Fatalf("Failed to create pubsub: %v", err)
    	}
    	defer ps.Close()
    
    	topic := "test_topic"
    	ch, err := ps.Subscribe(topic)
    	if err != nil {
    		t.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	if err := ps.Unsubscribe(topic, ch); err != nil {
    		t.Fatalf("Failed to unsubscribe from topic: %v", err)
    	}
    
    	msg := "test_message"
    	go func() {
    		if err := ps.Publish(topic, msg); err != nil {
    			t.Errorf("Failed to publish message: %v", err)
    		}
    	}()
    
    	select {
    	case <-ch:
    		t.Errorf("Received message after unsubscribe")
    	case <-time.After(time.Second):
    		// Expected timeout
    	}
    }
    
    func BenchmarkBus_Publish(b *testing.B) {
    	ps, err := NewPubsub(100)
    	if err != nil {
    		b.Fatalf("Failed to create pubsub: %v", err)
    	}
    	defer ps.Close()
    
    	topic := "benchmark_topic"
    	ch, err := ps.Subscribe(topic)
    	if err != nil {
    		b.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	msg := "benchmark_message"
    	b.ResetTimer()
    
    	for i := 0; i < b.N; i++ {
    		go func() {
    			if err := ps.Publish(topic, msg); err != nil {
    				b.Errorf("Failed to publish message: %v", err)
    			}
    		}()
    
    		select {
    		case <-ch:
    		case <-time.After(time.Second):
    			b.Errorf("Timeout waiting for message")
    		}
    	}
    }
    
    func BenchmarkBus_ConcurrentPublish(b *testing.B) {
    	ps, err := NewPubsub(2048)
    	if err != nil {
    		b.Fatalf("Failed to create pubsub: %v", err)
    	}
    	defer ps.Close()
    
    	topic := "benchmark_topic"
    	ch, err := ps.Subscribe(topic)
    	if err != nil {
    		b.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	msg := "benchmark_message"
    	b.ResetTimer()
    
    	for i := 0; i < b.N; i++ {
    		go func() {
    			if err := ps.Publish(topic, msg); err != nil {
    				b.Errorf("Failed to publish message: %v", err)
    			}
    		}()
    
    		select {
    		case <-ch:
    		case <-time.After(time.Second):
    			b.Errorf("Timeout waiting for message")
    		}
    	}
    }
    
    func TestPubsub(t *testing.T) {
    	ps, err := NewPubsub(10)
    	if err != nil {
    		t.Fatalf("Failed to create pubsub: %v", err)
    	}
    
    	topic := "testTopic"
    	msg := "testMessage"
    
    	// Subscribe to the topic
    	ch, err := ps.Subscribe(topic)
    	if err != nil {
    		t.Fatalf("Failed to subscribe to topic: %v", err)
    	}
    
    	// Publish a message to the topic
    	err = ps.Publish(topic, msg)
    	if err != nil {
    		t.Fatalf("Failed to publish message: %v", err)
    	}
    
    	// Verify the message is received
    	select {
    	case receivedMsg := <-ch:
    		if receivedMsg != msg {
    			t.Fatalf("Expected message %v, but got %v", msg, receivedMsg)
    		}
    	case <-time.After(time.Second):
    		t.Fatal("Timeout waiting for message")
    	}
    
    	// Unsubscribe from the topic
    	err = ps.Unsubscribe(topic, ch)
    	if err != nil {
    		t.Fatalf("Failed to unsubscribe from topic: %v", err)
    	}
    
    	// Close the pubsub
    	err = ps.Close()
    	if err != nil {
    		t.Fatalf("Failed to close pubsub: %v", err)
    	}
    }
    
    

    源代码如下

    package eventBus
    
    import (
    	"errors"
    	"sync"
    	"time"
    )
    
    const (
    	pubTimeout = time.Millisecond * 10
    )
    
    var (
    	ErrPubsubTimeout = errors.New("failed to send message to topic because of timeout")
    	ErrChannelFull   = errors.New("channel is full")
    )
    
    type Pubsub interface {
    	Publish(topic string, msg interface{}) error
    	Subscribe(topic string) (chan interface{}, error)
    	Unsubscribe(topic string, ch chan interface{}) error
    	Close() error
    }
    
    type pubsub struct {
    	size     int
    	channels map[string]map[chan interface{}]struct{}
    	mu       sync.RWMutex
    }
    
    var channelPool *sync.Pool
    
    // NewPubsub 初始化 pubsub 系统,使用默认的通道大小
    func NewPubsub(size int) (Pubsub, error) {
    	// 初始化通道池,使用给定的大小作为默认缓冲区大小
    	channelPool = &sync.Pool{
    		New: func() interface{} {
    			return make(chan interface{}, size)
    		},
    	}
    
    	return &pubsub{
    		size:     size,
    		channels: make(map[string]map[chan interface{}]struct{}),
    	}, nil
    }
    
    // getChannelFromPool 从池中获取一个通道
    func getChannelFromPool() chan interface{} {
    	return channelPool.Get().(chan interface{})
    }
    
    // putChannelToPool 将通道放回池中,并清空其内容
    func putChannelToPool(ch chan interface{}) {
    	for len(ch) > 0 {
    		<-ch
    	}
    	channelPool.Put(ch)
    }
    
    // Publish 向订阅者发送消息
    func (m *pubsub) Publish(topic string, msg interface{}) error {
    	m.mu.RLock() // 读锁,允许并发读取
    	defer m.mu.RUnlock()
    
    	if chs, ok := m.channels[topic]; ok {
    		for ch := range chs {
    			if err := m.publish(topic, ch, msg); err != nil {
    				return err
    			}
    		}
    	}
    	return nil
    }
    
    // publish 尝试向单个通道发送消息,并处理通道扩容
    func (m *pubsub) publish(topic string, ch chan interface{}, msg interface{}) error {
    	// 尝试向现有通道发送消息
    	select {
    	case ch <- msg: // 尝试发送消息
    		return nil
    	default:
    		// 通道已满,需要动态扩容
    		newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍
    
    		// 使用锁确保旧通道被正确替换为新通道
    		m.mu.Lock()
    		defer m.mu.Unlock()
    
    		// 将旧通道中的消息移动到新通道
    		go func() {
    			for v := range ch {
    				newCh <- v
    			}
    		}()
    
    		// 更新通道映射,指向新通道
    		if chs, ok := m.channels[topic]; ok {
    			delete(chs, ch)         // 删除旧通道
    			chs[newCh] = struct{}{} // 添加新扩容的通道
    		}
    
    		// 尝试向新扩容的通道发送消息
    		select {
    		case newCh <- msg:
    			return nil
    		case <-time.After(pubTimeout):
    			return ErrPubsubTimeout
    		}
    	}
    }
    
    // Subscribe 为给定主题创建一个新的订阅者通道
    func (m *pubsub) Subscribe(topic string) (chan interface{}, error) {
    	ch := getChannelFromPool() // 从池中获取一个通道
    
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if _, ok := m.channels[topic]; !ok {
    		m.channels[topic] = make(map[chan interface{}]struct{})
    	}
    	m.channels[topic][ch] = struct{}{} // 存储通道到主题的映射中
    	return ch, nil
    }
    
    // Unsubscribe 从给定主题中移除订阅者通道
    func (m *pubsub) Unsubscribe(topic string, ch chan interface{}) error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	if chs, ok := m.channels[topic]; ok {
    		delete(chs, ch)      // 从主题的订阅者列表中移除通道
    		putChannelToPool(ch) // 将通道放回池中
    	}
    	return nil
    }
    
    // Close 关闭 pubsub 系统,关闭所有通道并清理资源
    func (m *pubsub) Close() error {
    	m.mu.Lock()
    	defer m.mu.Unlock()
    
    	for topic, chs := range m.channels {
    		for ch := range chs {
    			close(ch)
    		}
    		delete(m.channels, topic)
    	}
    	return nil
    }
    
    

    测试结果

    Running tool: C:\Program Files\Go\bin\go.exe test -timeout 30s -run ^TestDynamicExpansion$ codex/src/eventBus
    
    panic: test timed out after 30s
    	running tests:
    		TestDynamicExpansion (30s)
    
    goroutine 7 [running]:
    testing.(*M).startAlarm.func1()
    	C:/Program Files/Go/src/testing/testing.go:2484 +0x394
    created by time.goFunc
    	C:/Program Files/Go/src/time/sleep.go:215 +0x2d
    
    goroutine 1 [chan receive]:
    testing.(*T).Run(0xc000003340, {0xef7e20?, 0x7ffef7500e50?}, 0xf02878)
    	C:/Program Files/Go/src/testing/testing.go:1859 +0x414
    testing.runTests.func1(0xc000003340)
    	C:/Program Files/Go/src/testing/testing.go:2279 +0x37
    testing.tRunner(0xc000003340, 0xc00002bc70)
    	C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
    testing.runTests(0xc0000080d8, {0x102fde0, 0x4, 0x4}, {0x1035d00?, 0x7?, 0x1034b80?})
    	C:/Program Files/Go/src/testing/testing.go:2277 +0x4b4
    testing.(*M).Run(0xc00007a320)
    	C:/Program Files/Go/src/testing/testing.go:2142 +0x64a
    main.main()
    	_testmain.go:55 +0x9b
    
    goroutine 6 [sync.RWMutex.Lock]:
    sync.runtime_SemacquireRWMutex(0xd75301?, 0x40?, 0xec97c0?)
    	C:/Program Files/Go/src/runtime/sema.go:105 +0x25
    sync.(*RWMutex).Lock(0x8080808080808074?)
    	C:/Program Files/Go/src/sync/rwmutex.go:155 +0x6a
    codex/src/eventBus.(*pubsub).publish(0xc000020a80, {0xef3abd, 0x4}, 0xc00012a2a0, {0xec3120, 0xf2f220})
    	d:/code/X/codex/src/eventBus/bus.go:87 +0xc5
    codex/src/eventBus.(*pubsub).Publish(0xc000020a80, {0xef3abd, 0x4}, {0xec3120, 0xf2f220})
    	d:/code/X/codex/src/eventBus/bus.go:68 +0x14b
    codex/src/eventBus.TestDynamicExpansion(0xc000003500)
    	d:/code/X/codex/src/eventBus/bus_test.go:23 +0x1fb
    testing.tRunner(0xc000003500, 0xf02878)
    	C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
    created by testing.(*T).Run in goroutine 1
    	C:/Program Files/Go/src/testing/testing.go:1851 +0x3f6
    FAIL	codex/src/eventBus	30.025s
    FAIL
    
    anviod
        1
    anviod  
    OP
       17 小时 45 分钟前
    问题似乎是由于 sync.RWMutex.Lock 阻塞导致的,尤其是在 publish 方法中的扩容逻辑。
    在扩容时我的代码里面使用了锁,但由于 publish 方法中同时有多个 goroutine 竞争访问共享资源,导致了死锁。
    现在彻底没有了头绪,向大家请教一下🤦‍♂️
    pike0002
        2
    pike0002  
       16 小时 49 分钟前
    @anviod 死锁了。你不能在一个 goroutine 里面获取读锁之后又获取写锁
    Dganzh
        3
    Dganzh  
       6 小时 52 分钟前
    func (m *pubsub) Publish(topic string, msg interface{}) error {
    m.mu.RLock() // 读锁,允许并发读取
    defer m.mu.RUnlock()

    后续又调用 m.publish 的 default 那里又获取写锁
    // 通道已满,需要动态扩容
    newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍

    // 使用锁确保旧通道被正确替换为新通道
    m.mu.Lock()
    defer m.mu.Unlock()
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   964 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 20ms · UTC 21:50 · PVG 05:50 · LAX 13:50 · JFK 16:50
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.