package eventBus
import (
"testing"
"time"
)
// 该测试不通过
func TestDynamicExpansion(t *testing.T) {
ps, err := NewPubsub(2) // 初始化缓冲区大小为 2
if err != nil {
t.Fatalf("Failed to create pubsub: %v", err)
}
defer ps.Close()
ch, err := ps.Subscribe("test")
if err != nil {
t.Fatalf("Failed to subscribe to topic: %v", err)
}
// 发布 3 条消息,触发动态扩容
ps.Publish("test", "msg1")
ps.Publish("test", "msg2")
ps.Publish("test", "msg3")
// 读取消息
expectedMessages := map[string]bool{"msg1": false, "msg2": false, "msg3": false}
for i := 0; i < 3; i++ {
msg := <-ch
strMsg := msg.(string) // 类型断言
if _, ok := expectedMessages[strMsg]; !ok {
t.Errorf("Unexpected message: %v", strMsg)
}
expectedMessages[strMsg] = true
}
for msg, received := range expectedMessages {
if !received {
t.Errorf("Expected message %v not received", msg)
}
}
ps.Unsubscribe("test", ch)
ps.Close()
}
func TestBus_SubscribeAndPublish(t *testing.T) {
ps, err := NewPubsub(10)
if err != nil {
t.Fatalf("Failed to create pubsub: %v", err)
}
defer ps.Close()
topic := "test_topic"
ch, err := ps.Subscribe(topic)
if err != nil {
t.Fatalf("Failed to subscribe to topic: %v", err)
}
msg := "test_message"
go func() {
if err := ps.Publish(topic, msg); err != nil {
t.Errorf("Failed to publish message: %v", err)
}
}()
select {
case received := <-ch:
if received != msg {
t.Errorf("Expected message %v, but got %v", msg, received)
}
case <-time.After(time.Second):
t.Errorf("Timeout waiting for message")
}
}
func TestBus_UnSubscribe(t *testing.T) {
ps, err := NewPubsub(10)
if err != nil {
t.Fatalf("Failed to create pubsub: %v", err)
}
defer ps.Close()
topic := "test_topic"
ch, err := ps.Subscribe(topic)
if err != nil {
t.Fatalf("Failed to subscribe to topic: %v", err)
}
if err := ps.Unsubscribe(topic, ch); err != nil {
t.Fatalf("Failed to unsubscribe from topic: %v", err)
}
msg := "test_message"
go func() {
if err := ps.Publish(topic, msg); err != nil {
t.Errorf("Failed to publish message: %v", err)
}
}()
select {
case <-ch:
t.Errorf("Received message after unsubscribe")
case <-time.After(time.Second):
// Expected timeout
}
}
func BenchmarkBus_Publish(b *testing.B) {
ps, err := NewPubsub(100)
if err != nil {
b.Fatalf("Failed to create pubsub: %v", err)
}
defer ps.Close()
topic := "benchmark_topic"
ch, err := ps.Subscribe(topic)
if err != nil {
b.Fatalf("Failed to subscribe to topic: %v", err)
}
msg := "benchmark_message"
b.ResetTimer()
for i := 0; i < b.N; i++ {
go func() {
if err := ps.Publish(topic, msg); err != nil {
b.Errorf("Failed to publish message: %v", err)
}
}()
select {
case <-ch:
case <-time.After(time.Second):
b.Errorf("Timeout waiting for message")
}
}
}
func BenchmarkBus_ConcurrentPublish(b *testing.B) {
ps, err := NewPubsub(2048)
if err != nil {
b.Fatalf("Failed to create pubsub: %v", err)
}
defer ps.Close()
topic := "benchmark_topic"
ch, err := ps.Subscribe(topic)
if err != nil {
b.Fatalf("Failed to subscribe to topic: %v", err)
}
msg := "benchmark_message"
b.ResetTimer()
for i := 0; i < b.N; i++ {
go func() {
if err := ps.Publish(topic, msg); err != nil {
b.Errorf("Failed to publish message: %v", err)
}
}()
select {
case <-ch:
case <-time.After(time.Second):
b.Errorf("Timeout waiting for message")
}
}
}
func TestPubsub(t *testing.T) {
ps, err := NewPubsub(10)
if err != nil {
t.Fatalf("Failed to create pubsub: %v", err)
}
topic := "testTopic"
msg := "testMessage"
// Subscribe to the topic
ch, err := ps.Subscribe(topic)
if err != nil {
t.Fatalf("Failed to subscribe to topic: %v", err)
}
// Publish a message to the topic
err = ps.Publish(topic, msg)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
// Verify the message is received
select {
case receivedMsg := <-ch:
if receivedMsg != msg {
t.Fatalf("Expected message %v, but got %v", msg, receivedMsg)
}
case <-time.After(time.Second):
t.Fatal("Timeout waiting for message")
}
// Unsubscribe from the topic
err = ps.Unsubscribe(topic, ch)
if err != nil {
t.Fatalf("Failed to unsubscribe from topic: %v", err)
}
// Close the pubsub
err = ps.Close()
if err != nil {
t.Fatalf("Failed to close pubsub: %v", err)
}
}
package eventBus
import (
"errors"
"sync"
"time"
)
const (
pubTimeout = time.Millisecond * 10
)
var (
ErrPubsubTimeout = errors.New("failed to send message to topic because of timeout")
ErrChannelFull = errors.New("channel is full")
)
type Pubsub interface {
Publish(topic string, msg interface{}) error
Subscribe(topic string) (chan interface{}, error)
Unsubscribe(topic string, ch chan interface{}) error
Close() error
}
type pubsub struct {
size int
channels map[string]map[chan interface{}]struct{}
mu sync.RWMutex
}
var channelPool *sync.Pool
// NewPubsub 初始化 pubsub 系统,使用默认的通道大小
func NewPubsub(size int) (Pubsub, error) {
// 初始化通道池,使用给定的大小作为默认缓冲区大小
channelPool = &sync.Pool{
New: func() interface{} {
return make(chan interface{}, size)
},
}
return &pubsub{
size: size,
channels: make(map[string]map[chan interface{}]struct{}),
}, nil
}
// getChannelFromPool 从池中获取一个通道
func getChannelFromPool() chan interface{} {
return channelPool.Get().(chan interface{})
}
// putChannelToPool 将通道放回池中,并清空其内容
func putChannelToPool(ch chan interface{}) {
for len(ch) > 0 {
<-ch
}
channelPool.Put(ch)
}
// Publish 向订阅者发送消息
func (m *pubsub) Publish(topic string, msg interface{}) error {
m.mu.RLock() // 读锁,允许并发读取
defer m.mu.RUnlock()
if chs, ok := m.channels[topic]; ok {
for ch := range chs {
if err := m.publish(topic, ch, msg); err != nil {
return err
}
}
}
return nil
}
// publish 尝试向单个通道发送消息,并处理通道扩容
func (m *pubsub) publish(topic string, ch chan interface{}, msg interface{}) error {
// 尝试向现有通道发送消息
select {
case ch <- msg: // 尝试发送消息
return nil
default:
// 通道已满,需要动态扩容
newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍
// 使用锁确保旧通道被正确替换为新通道
m.mu.Lock()
defer m.mu.Unlock()
// 将旧通道中的消息移动到新通道
go func() {
for v := range ch {
newCh <- v
}
}()
// 更新通道映射,指向新通道
if chs, ok := m.channels[topic]; ok {
delete(chs, ch) // 删除旧通道
chs[newCh] = struct{}{} // 添加新扩容的通道
}
// 尝试向新扩容的通道发送消息
select {
case newCh <- msg:
return nil
case <-time.After(pubTimeout):
return ErrPubsubTimeout
}
}
}
// Subscribe 为给定主题创建一个新的订阅者通道
func (m *pubsub) Subscribe(topic string) (chan interface{}, error) {
ch := getChannelFromPool() // 从池中获取一个通道
m.mu.Lock()
defer m.mu.Unlock()
if _, ok := m.channels[topic]; !ok {
m.channels[topic] = make(map[chan interface{}]struct{})
}
m.channels[topic][ch] = struct{}{} // 存储通道到主题的映射中
return ch, nil
}
// Unsubscribe 从给定主题中移除订阅者通道
func (m *pubsub) Unsubscribe(topic string, ch chan interface{}) error {
m.mu.Lock()
defer m.mu.Unlock()
if chs, ok := m.channels[topic]; ok {
delete(chs, ch) // 从主题的订阅者列表中移除通道
putChannelToPool(ch) // 将通道放回池中
}
return nil
}
// Close 关闭 pubsub 系统,关闭所有通道并清理资源
func (m *pubsub) Close() error {
m.mu.Lock()
defer m.mu.Unlock()
for topic, chs := range m.channels {
for ch := range chs {
close(ch)
}
delete(m.channels, topic)
}
return nil
}
Running tool: C:\Program Files\Go\bin\go.exe test -timeout 30s -run ^TestDynamicExpansion$ codex/src/eventBus
panic: test timed out after 30s
running tests:
TestDynamicExpansion (30s)
goroutine 7 [running]:
testing.(*M).startAlarm.func1()
C:/Program Files/Go/src/testing/testing.go:2484 +0x394
created by time.goFunc
C:/Program Files/Go/src/time/sleep.go:215 +0x2d
goroutine 1 [chan receive]:
testing.(*T).Run(0xc000003340, {0xef7e20?, 0x7ffef7500e50?}, 0xf02878)
C:/Program Files/Go/src/testing/testing.go:1859 +0x414
testing.runTests.func1(0xc000003340)
C:/Program Files/Go/src/testing/testing.go:2279 +0x37
testing.tRunner(0xc000003340, 0xc00002bc70)
C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
testing.runTests(0xc0000080d8, {0x102fde0, 0x4, 0x4}, {0x1035d00?, 0x7?, 0x1034b80?})
C:/Program Files/Go/src/testing/testing.go:2277 +0x4b4
testing.(*M).Run(0xc00007a320)
C:/Program Files/Go/src/testing/testing.go:2142 +0x64a
main.main()
_testmain.go:55 +0x9b
goroutine 6 [sync.RWMutex.Lock]:
sync.runtime_SemacquireRWMutex(0xd75301?, 0x40?, 0xec97c0?)
C:/Program Files/Go/src/runtime/sema.go:105 +0x25
sync.(*RWMutex).Lock(0x8080808080808074?)
C:/Program Files/Go/src/sync/rwmutex.go:155 +0x6a
codex/src/eventBus.(*pubsub).publish(0xc000020a80, {0xef3abd, 0x4}, 0xc00012a2a0, {0xec3120, 0xf2f220})
d:/code/X/codex/src/eventBus/bus.go:87 +0xc5
codex/src/eventBus.(*pubsub).Publish(0xc000020a80, {0xef3abd, 0x4}, {0xec3120, 0xf2f220})
d:/code/X/codex/src/eventBus/bus.go:68 +0x14b
codex/src/eventBus.TestDynamicExpansion(0xc000003500)
d:/code/X/codex/src/eventBus/bus_test.go:23 +0x1fb
testing.tRunner(0xc000003500, 0xf02878)
C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
created by testing.(*T).Run in goroutine 1
C:/Program Files/Go/src/testing/testing.go:1851 +0x3f6
FAIL codex/src/eventBus 30.025s
FAIL
![]() |
1
anviod OP 问题似乎是由于 sync.RWMutex.Lock 阻塞导致的,尤其是在 publish 方法中的扩容逻辑。
在扩容时我的代码里面使用了锁,但由于 publish 方法中同时有多个 goroutine 竞争访问共享资源,导致了死锁。 现在彻底没有了头绪,向大家请教一下🤦♂️ |
3
Dganzh 6 小时 52 分钟前
func (m *pubsub) Publish(topic string, msg interface{}) error {
m.mu.RLock() // 读锁,允许并发读取 defer m.mu.RUnlock() 后续又调用 m.publish 的 default 那里又获取写锁 // 通道已满,需要动态扩容 newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍 // 使用锁确保旧通道被正确替换为新通道 m.mu.Lock() defer m.mu.Unlock() |