83 lines
1.6 KiB
Go
83 lines
1.6 KiB
Go
|
package connection
|
||
|
|
||
|
import "sync"
|
||
|
|
||
|
type MessageBuffer struct {
|
||
|
messages []message
|
||
|
getIndex int
|
||
|
insertIndex int
|
||
|
size int
|
||
|
newDataInserted chan bool
|
||
|
firstWriteHappened bool
|
||
|
cond *sync.Cond
|
||
|
}
|
||
|
|
||
|
// message is one slot of a MessageBuffer.
type message struct {
	// content is the text that was passed to Insert.
	content string
	// new is true from the moment the slot is written until Get hands the
	// content out; the zero value (false) marks a slot with nothing to read.
	new bool
}
|
||
|
|
||
|
func newMessageBuffer(size int) *MessageBuffer {
|
||
|
cond := sync.NewCond(&sync.Mutex{})
|
||
|
|
||
|
return &MessageBuffer{
|
||
|
messages: make([]message, size),
|
||
|
size: size,
|
||
|
getIndex: 0,
|
||
|
insertIndex: 0,
|
||
|
newDataInserted: make(chan bool),
|
||
|
firstWriteHappened: false,
|
||
|
cond: cond,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (b *MessageBuffer) Insert(msg string) {
|
||
|
b.cond.L.Lock()
|
||
|
defer b.cond.L.Unlock()
|
||
|
|
||
|
oldMessage := b.messages[b.insertIndex]
|
||
|
b.messages[b.insertIndex] = message{content: msg, new: true}
|
||
|
|
||
|
if b.firstWriteHappened &&
|
||
|
b.insertIndex == b.getIndex &&
|
||
|
oldMessage.new { // insertIndex caught up with getIndex
|
||
|
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
|
||
|
}
|
||
|
|
||
|
b.insertIndex = b.incrementAndWrapIndex(b.insertIndex)
|
||
|
|
||
|
b.firstWriteHappened = true
|
||
|
b.cond.Broadcast()
|
||
|
}
|
||
|
|
||
|
func (b *MessageBuffer) Get() (string, error) {
|
||
|
b.cond.L.Lock()
|
||
|
defer b.cond.L.Unlock()
|
||
|
|
||
|
if !b.firstWriteHappened {
|
||
|
b.cond.Wait()
|
||
|
}
|
||
|
|
||
|
var msg *message
|
||
|
for {
|
||
|
msg = &b.messages[b.getIndex]
|
||
|
if msg.new {
|
||
|
msg.new = false
|
||
|
break
|
||
|
}
|
||
|
b.cond.Wait()
|
||
|
}
|
||
|
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
|
||
|
|
||
|
return msg.content, nil
|
||
|
}
|
||
|
|
||
|
func (b MessageBuffer) incrementAndWrapIndex(index int) int {
|
||
|
newIndex := index + 1
|
||
|
if newIndex == b.size {
|
||
|
newIndex = 0
|
||
|
}
|
||
|
|
||
|
return newIndex
|
||
|
}
|