[Go]消息发布订阅的代码 单元测试始终不通过 请教一下

216 天前
 anviod

单元测试代码

func TestDynamicExpansion(t *testing.T) 测试不通过

package eventBus

import (
	"testing"
	"time"
)
// 该测试不通过
func TestDynamicExpansion(t *testing.T) {
	ps, err := NewPubsub(2) // 初始化缓冲区大小为 2
	if err != nil {
		t.Fatalf("Failed to create pubsub: %v", err)
	}
	defer ps.Close()

	ch, err := ps.Subscribe("test")
	if err != nil {
		t.Fatalf("Failed to subscribe to topic: %v", err)
	}

	// 发布 3 条消息,触发动态扩容
	ps.Publish("test", "msg1")
	ps.Publish("test", "msg2")
	ps.Publish("test", "msg3")

	// 读取消息
	expectedMessages := map[string]bool{"msg1": false, "msg2": false, "msg3": false}
	for i := 0; i < 3; i++ {
		msg := <-ch
		strMsg := msg.(string) // 类型断言
		if _, ok := expectedMessages[strMsg]; !ok {
			t.Errorf("Unexpected message: %v", strMsg)
		}
		expectedMessages[strMsg] = true
	}

	for msg, received := range expectedMessages {
		if !received {
			t.Errorf("Expected message %v not received", msg)
		}
	}

	ps.Unsubscribe("test", ch)
	ps.Close()
}

func TestBus_SubscribeAndPublish(t *testing.T) {
	ps, err := NewPubsub(10)
	if err != nil {
		t.Fatalf("Failed to create pubsub: %v", err)
	}
	defer ps.Close()

	topic := "test_topic"
	ch, err := ps.Subscribe(topic)
	if err != nil {
		t.Fatalf("Failed to subscribe to topic: %v", err)
	}

	msg := "test_message"
	go func() {
		if err := ps.Publish(topic, msg); err != nil {
			t.Errorf("Failed to publish message: %v", err)
		}
	}()

	select {
	case received := <-ch:
		if received != msg {
			t.Errorf("Expected message %v, but got %v", msg, received)
		}
	case <-time.After(time.Second):
		t.Errorf("Timeout waiting for message")
	}
}

func TestBus_UnSubscribe(t *testing.T) {
	ps, err := NewPubsub(10)
	if err != nil {
		t.Fatalf("Failed to create pubsub: %v", err)
	}
	defer ps.Close()

	topic := "test_topic"
	ch, err := ps.Subscribe(topic)
	if err != nil {
		t.Fatalf("Failed to subscribe to topic: %v", err)
	}

	if err := ps.Unsubscribe(topic, ch); err != nil {
		t.Fatalf("Failed to unsubscribe from topic: %v", err)
	}

	msg := "test_message"
	go func() {
		if err := ps.Publish(topic, msg); err != nil {
			t.Errorf("Failed to publish message: %v", err)
		}
	}()

	select {
	case <-ch:
		t.Errorf("Received message after unsubscribe")
	case <-time.After(time.Second):
		// Expected timeout
	}
}

func BenchmarkBus_Publish(b *testing.B) {
	ps, err := NewPubsub(100)
	if err != nil {
		b.Fatalf("Failed to create pubsub: %v", err)
	}
	defer ps.Close()

	topic := "benchmark_topic"
	ch, err := ps.Subscribe(topic)
	if err != nil {
		b.Fatalf("Failed to subscribe to topic: %v", err)
	}

	msg := "benchmark_message"
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		go func() {
			if err := ps.Publish(topic, msg); err != nil {
				b.Errorf("Failed to publish message: %v", err)
			}
		}()

		select {
		case <-ch:
		case <-time.After(time.Second):
			b.Errorf("Timeout waiting for message")
		}
	}
}

func BenchmarkBus_ConcurrentPublish(b *testing.B) {
	ps, err := NewPubsub(2048)
	if err != nil {
		b.Fatalf("Failed to create pubsub: %v", err)
	}
	defer ps.Close()

	topic := "benchmark_topic"
	ch, err := ps.Subscribe(topic)
	if err != nil {
		b.Fatalf("Failed to subscribe to topic: %v", err)
	}

	msg := "benchmark_message"
	b.ResetTimer()

	for i := 0; i < b.N; i++ {
		go func() {
			if err := ps.Publish(topic, msg); err != nil {
				b.Errorf("Failed to publish message: %v", err)
			}
		}()

		select {
		case <-ch:
		case <-time.After(time.Second):
			b.Errorf("Timeout waiting for message")
		}
	}
}

func TestPubsub(t *testing.T) {
	ps, err := NewPubsub(10)
	if err != nil {
		t.Fatalf("Failed to create pubsub: %v", err)
	}

	topic := "testTopic"
	msg := "testMessage"

	// Subscribe to the topic
	ch, err := ps.Subscribe(topic)
	if err != nil {
		t.Fatalf("Failed to subscribe to topic: %v", err)
	}

	// Publish a message to the topic
	err = ps.Publish(topic, msg)
	if err != nil {
		t.Fatalf("Failed to publish message: %v", err)
	}

	// Verify the message is received
	select {
	case receivedMsg := <-ch:
		if receivedMsg != msg {
			t.Fatalf("Expected message %v, but got %v", msg, receivedMsg)
		}
	case <-time.After(time.Second):
		t.Fatal("Timeout waiting for message")
	}

	// Unsubscribe from the topic
	err = ps.Unsubscribe(topic, ch)
	if err != nil {
		t.Fatalf("Failed to unsubscribe from topic: %v", err)
	}

	// Close the pubsub
	err = ps.Close()
	if err != nil {
		t.Fatalf("Failed to close pubsub: %v", err)
	}
}

源代码如下

package eventBus

import (
	"errors"
	"sync"
	"time"
)

const (
	pubTimeout = time.Millisecond * 10
)

var (
	ErrPubsubTimeout = errors.New("failed to send message to topic because of timeout")
	ErrChannelFull   = errors.New("channel is full")
)

type Pubsub interface {
	Publish(topic string, msg interface{}) error
	Subscribe(topic string) (chan interface{}, error)
	Unsubscribe(topic string, ch chan interface{}) error
	Close() error
}

type pubsub struct {
	size     int
	channels map[string]map[chan interface{}]struct{}
	mu       sync.RWMutex
}

var channelPool *sync.Pool

// NewPubsub 初始化 pubsub 系统,使用默认的通道大小
func NewPubsub(size int) (Pubsub, error) {
	// 初始化通道池,使用给定的大小作为默认缓冲区大小
	channelPool = &sync.Pool{
		New: func() interface{} {
			return make(chan interface{}, size)
		},
	}

	return &pubsub{
		size:     size,
		channels: make(map[string]map[chan interface{}]struct{}),
	}, nil
}

// getChannelFromPool 从池中获取一个通道
func getChannelFromPool() chan interface{} {
	return channelPool.Get().(chan interface{})
}

// putChannelToPool 将通道放回池中,并清空其内容
func putChannelToPool(ch chan interface{}) {
	for len(ch) > 0 {
		<-ch
	}
	channelPool.Put(ch)
}

// Publish 向订阅者发送消息
func (m *pubsub) Publish(topic string, msg interface{}) error {
	m.mu.RLock() // 读锁,允许并发读取
	defer m.mu.RUnlock()

	if chs, ok := m.channels[topic]; ok {
		for ch := range chs {
			if err := m.publish(topic, ch, msg); err != nil {
				return err
			}
		}
	}
	return nil
}

// publish 尝试向单个通道发送消息,并处理通道扩容
func (m *pubsub) publish(topic string, ch chan interface{}, msg interface{}) error {
	// 尝试向现有通道发送消息
	select {
	case ch <- msg: // 尝试发送消息
		return nil
	default:
		// 通道已满,需要动态扩容
		newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍

		// 使用锁确保旧通道被正确替换为新通道
		m.mu.Lock()
		defer m.mu.Unlock()

		// 将旧通道中的消息移动到新通道
		go func() {
			for v := range ch {
				newCh <- v
			}
		}()

		// 更新通道映射,指向新通道
		if chs, ok := m.channels[topic]; ok {
			delete(chs, ch)         // 删除旧通道
			chs[newCh] = struct{}{} // 添加新扩容的通道
		}

		// 尝试向新扩容的通道发送消息
		select {
		case newCh <- msg:
			return nil
		case <-time.After(pubTimeout):
			return ErrPubsubTimeout
		}
	}
}

// Subscribe 为给定主题创建一个新的订阅者通道
func (m *pubsub) Subscribe(topic string) (chan interface{}, error) {
	ch := getChannelFromPool() // 从池中获取一个通道

	m.mu.Lock()
	defer m.mu.Unlock()

	if _, ok := m.channels[topic]; !ok {
		m.channels[topic] = make(map[chan interface{}]struct{})
	}
	m.channels[topic][ch] = struct{}{} // 存储通道到主题的映射中
	return ch, nil
}

// Unsubscribe 从给定主题中移除订阅者通道
func (m *pubsub) Unsubscribe(topic string, ch chan interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if chs, ok := m.channels[topic]; ok {
		delete(chs, ch)      // 从主题的订阅者列表中移除通道
		putChannelToPool(ch) // 将通道放回池中
	}
	return nil
}

// Close 关闭 pubsub 系统,关闭所有通道并清理资源
func (m *pubsub) Close() error {
	m.mu.Lock()
	defer m.mu.Unlock()

	for topic, chs := range m.channels {
		for ch := range chs {
			close(ch)
		}
		delete(m.channels, topic)
	}
	return nil
}

测试结果

Running tool: C:\Program Files\Go\bin\go.exe test -timeout 30s -run ^TestDynamicExpansion$ codex/src/eventBus

panic: test timed out after 30s
	running tests:
		TestDynamicExpansion (30s)

goroutine 7 [running]:
testing.(*M).startAlarm.func1()
	C:/Program Files/Go/src/testing/testing.go:2484 +0x394
created by time.goFunc
	C:/Program Files/Go/src/time/sleep.go:215 +0x2d

goroutine 1 [chan receive]:
testing.(*T).Run(0xc000003340, {0xef7e20?, 0x7ffef7500e50?}, 0xf02878)
	C:/Program Files/Go/src/testing/testing.go:1859 +0x414
testing.runTests.func1(0xc000003340)
	C:/Program Files/Go/src/testing/testing.go:2279 +0x37
testing.tRunner(0xc000003340, 0xc00002bc70)
	C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
testing.runTests(0xc0000080d8, {0x102fde0, 0x4, 0x4}, {0x1035d00?, 0x7?, 0x1034b80?})
	C:/Program Files/Go/src/testing/testing.go:2277 +0x4b4
testing.(*M).Run(0xc00007a320)
	C:/Program Files/Go/src/testing/testing.go:2142 +0x64a
main.main()
	_testmain.go:55 +0x9b

goroutine 6 [sync.RWMutex.Lock]:
sync.runtime_SemacquireRWMutex(0xd75301?, 0x40?, 0xec97c0?)
	C:/Program Files/Go/src/runtime/sema.go:105 +0x25
sync.(*RWMutex).Lock(0x8080808080808074?)
	C:/Program Files/Go/src/sync/rwmutex.go:155 +0x6a
codex/src/eventBus.(*pubsub).publish(0xc000020a80, {0xef3abd, 0x4}, 0xc00012a2a0, {0xec3120, 0xf2f220})
	d:/code/X/codex/src/eventBus/bus.go:87 +0xc5
codex/src/eventBus.(*pubsub).Publish(0xc000020a80, {0xef3abd, 0x4}, {0xec3120, 0xf2f220})
	d:/code/X/codex/src/eventBus/bus.go:68 +0x14b
codex/src/eventBus.TestDynamicExpansion(0xc000003500)
	d:/code/X/codex/src/eventBus/bus_test.go:23 +0x1fb
testing.tRunner(0xc000003500, 0xf02878)
	C:/Program Files/Go/src/testing/testing.go:1792 +0xcb
created by testing.(*T).Run in goroutine 1
	C:/Program Files/Go/src/testing/testing.go:1851 +0x3f6
FAIL	codex/src/eventBus	30.025s
FAIL
1171 次点击
所在节点    Go 编程语言
3 条回复
anviod
216 天前
问题似乎是由于 sync.RWMutex.Lock 阻塞导致的,尤其是在 publish 方法中的扩容逻辑。
在扩容时我的代码里面使用了锁,但由于 publish 方法中同时有多个 goroutine 竞争访问共享资源,导致了死锁。
现在彻底没有了头绪,向大家请教一下🤦‍♂️
pike0002
216 天前
@anviod 死锁了。你不能在一个 goroutine 里面获取读锁之后又获取写锁
Dganzh
215 天前
func (m *pubsub) Publish(topic string, msg interface{}) error {
m.mu.RLock() // 读锁,允许并发读取
defer m.mu.RUnlock()

后续又调用 m.publish 的 default 那里又获取写锁
// 通道已满,需要动态扩容
newCh := make(chan interface{}, cap(ch)*2) // 扩容为原来的两倍

// 使用锁确保旧通道被正确替换为新通道
m.mu.Lock()
defer m.mu.Unlock()

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/1112268

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX