1、用golang 写一个 消息队列,通过channel 多协程实现,一个写队列多个读队列
type MessageQueue struct {
mu sync.Mutex
queue chan string
readers []chan string
}
func NewMessageQueue() *MessageQueue {
return &MessageQueue{
queue: make(chan string, 10), // Buffer size 10, adjust as needed
readers: make([]chan string, 0),
}
}
func (mq *MessageQueue) AddReader(readerChan chan string) {
mq.mu.Lock()
defer mq.mu.Unlock()
mq.readers = append(mq.readers, readerChan)
}
func (mq *MessageQueue) Write(message string) {
mq.queue <- message
}
func (mq *MessageQueue) StartReaders() {
for _, readerChan := range mq.readers {
go func(ch chan string) {
for {
message, ok := <-ch
if !ok {
break
}
fmt.Println("Received:", message)
}
}(readerChan)
}
}
func main() {
messageQueue := NewMessageQueue()
// 启动多个 reader goroutine
for i := 0; i < 3; i++ {
readerChan := make(chan string, 10) // 缓冲区大小10,根据需要调整
messageQueue.AddReader(readerChan)
}
// 启动 reader goroutine
messageQueue.StartReaders()
// 将消息写入队列
for i := 1; i <= 5; i++ {
messageQueue.Write(fmt.Sprintf("Message %d", i))
time.Sleep(time.Second)
}
// 完成后关闭读者通道
messageQueue.mu.Lock()
for _, readerChan := range messageQueue.readers {
close(readerChan)
}
messageQueue.mu.Unlock()
// 留出时间让 reader goroutine 完成处理
time.Sleep(time.Second)
}
2、用golang 写一个 消息队列,通过channel 多协程实现,多个写队列多个读队列
package main
import (
"fmt"
"sync"
)
type MessageQueue1 struct {
mu sync.Mutex
queue []string
}
func NewMessageQueue1() *MessageQueue1 {
return &MessageQueue1{
queue: make([]string, 0),
}
}
func (mq *MessageQueue1) Write(message string) {
mq.mu.Lock()
defer mq.mu.Unlock()
mq.queue = append(mq.queue, message)
}
func (mq *MessageQueue1) Read() (string, bool) {
mq.mu.Lock()
defer mq.mu.Unlock()
if len(mq.queue) > 0 {
message := mq.queue[0]
mq.queue = mq.queue[1:]
return message, true
}
return "", false
}
func main() {
messageQueue1 := NewMessageQueue1()
var wg sync.WaitGroup
done := make(chan struct{})
// 启动多个写协程
for i := 1; i <= 8; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for j := 1; j <= 5; j++ {
message := fmt.Sprintf("-- Writer %d: Message %d", id, j)
messageQueue1.Write(message)
//time.Sleep(time.Millisecond * 1)
}
}(i)
}
// 启动多个读协程
for i := 0; i <= 7; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
for {
message, ok := messageQueue1.Read()
if !ok {
break
}
fmt.Printf("Reader %d received: %s\n", id, message)
}
}(i)
}
// 等待所有协程完成
go func() {
wg.Wait()
close(done)
}()
// 等待所有协程完成后结束程序
<-done
}
3、并发安全的全局计数器编写一个 Go 程序,实现一个并发安全的全局计数器,要求能够支持并发的增加和获取计数。请使用互斥锁或其他并发安全的机制,确保多个 goroutine同时访问时不会出现竞态条件。
type Counter struct {
Count int
mx sync.Mutex
}
func (c Counter) InCream() {
c.mx.Lock()
defer c.mx.Unlock()
c.Count++
}
func (c Counter) CountNumber() int {
c.mx.Lock() //并发安全
defer c.mx.Unlock()
return c.Count
}
func main() {
count := Counter{}
var wg sync.WaitGroup
for i := 0; i < 100; i++ { //并发的增加
wg.Add(1)
go func() {
count.InCream()
wg.Done()
}()
}
wg.Wait()
for i := 0; i < 20; i++ {
go func(i int) {
fmt.Printf("Count %v,%v \n", i, count.CountNumber())
}(i)
}
time.Sleep(1 * time.Second)
}
4、并发安全的缓存
实现一个带有过期时间的并发安全的缓存系统。
缓存应该支持设置键值对、获取键值对和定期清理过期的键值对。使用互斥锁或其他并发安全的机制确保多个 goroutine 同时访问时不会出现竟态条件。
package main
import (
"sync"
"time"
)
// CacheItem 表示缓存中的一个键值对
type CacheItem struct {
Value interface{}
Expiration int64 // 过期时间戳,单位秒
}
// ConcurrentCache 表示并发安全的缓存系统
type ConcurrentCache struct {
cache map[string]CacheItem
mutex sync.Mutex
}
// NewConcurrentCache 创建一个新的并发安全的缓存系统
func NewConcurrentCache() *ConcurrentCache {
return &ConcurrentCache{
cache: make(map[string]CacheItem),
}
}
// Set 设置缓存中的键值对,并指定过期时间(秒)
func (c *ConcurrentCache) Set(key string, value interface{}, expirationSeconds int64) {
c.mutex.Lock()
defer c.mutex.Unlock()
expirationTime := time.Now().Unix() + expirationSeconds
c.cache[key] = CacheItem{
Value: value,
Expiration: expirationTime,
}
}
// Get 获取缓存中指定键的值,如果键不存在或已过期则返回nil
func (c *ConcurrentCache) Get(key string) interface{} {
c.mutex.Lock()
defer c.mutex.Unlock()
item, exists := c.cache[key]
if !exists || time.Now().Unix() > item.Expiration {
// 键不存在或已过期
return nil
}
return item.Value
}
// CleanExpired 清理过期的键值对
func (c *ConcurrentCache) CleanExpired() {
c.mutex.Lock()
defer c.mutex.Unlock()
currentTime := time.Now().Unix()
for key, item := range c.cache {
if currentTime > item.Expiration {
delete(c.cache, key)
}
}
}
func main() {
// 示例用法
cache := NewConcurrentCache()
// 设置键值对,并指定过期时间为10秒
cache.Set("key1", "value1", 10)
// 获取键值对
result := cache.Get("key1")
if result != nil {
println(result.(string))
} else {
println("Key not found or expired.")
}
// 定期清理过期的键值对
cache.CleanExpired()
}