Messagebuffer seems to work now.

This commit is contained in:
Marco 2023-11-20 21:46:25 +01:00
parent c21076c837
commit fcd3e32edf
4 changed files with 91 additions and 34 deletions

View File

@ -6,7 +6,7 @@ import (
"errors" "errors"
"log" "log"
"mchess_server/api" "mchess_server/api"
con "mchess_server/connection" conn "mchess_server/connection"
"mchess_server/types" "mchess_server/types"
"time" "time"
@ -16,7 +16,7 @@ import (
type Player struct { type Player struct {
Uuid uuid.UUID Uuid uuid.UUID
Conn *con.Connection Conn *conn.Connection
InGame bool InGame bool
wsConnEstablished chan bool wsConnEstablished chan bool
context context.Context context context.Context
@ -28,11 +28,12 @@ func NewPlayer(uuid uuid.UUID) *Player {
Conn: nil, Conn: nil,
InGame: false, InGame: false,
wsConnEstablished: make(chan bool), wsConnEstablished: make(chan bool),
context: context.Background(),
} }
} }
func (p *Player) SetConnection(ctx context.Context, conn *websocket.Conn) { func (p *Player) SetConnection(ctx context.Context, ws *websocket.Conn) {
p.Conn = con.NewConnection(con.WithWebsocket(conn)) p.Conn = conn.NewConnection(conn.WithWebsocket(ws), conn.WithContext(p.context))
p.context = ctx p.context = ctx
p.wsConnEstablished <- true p.wsConnEstablished <- true
} }

View File

@ -1,5 +1,7 @@
package connection package connection
import "sync"
type MessageBuffer struct { type MessageBuffer struct {
messages []message messages []message
getIndex int getIndex int
@ -7,6 +9,7 @@ type MessageBuffer struct {
size int size int
newDataInserted chan bool newDataInserted chan bool
firstWriteHappened bool firstWriteHappened bool
cond *sync.Cond
} }
type message struct { type message struct {
@ -15,6 +18,9 @@ type message struct {
} }
func newMessageBuffer(size int) *MessageBuffer { func newMessageBuffer(size int) *MessageBuffer {
mutex := &sync.Mutex{}
cond := sync.NewCond(mutex)
return &MessageBuffer{ return &MessageBuffer{
messages: make([]message, size), messages: make([]message, size),
size: size, size: size,
@ -22,10 +28,14 @@ func newMessageBuffer(size int) *MessageBuffer {
insertIndex: 0, insertIndex: 0,
newDataInserted: make(chan bool), newDataInserted: make(chan bool),
firstWriteHappened: false, firstWriteHappened: false,
cond: cond,
} }
} }
func (b *MessageBuffer) Insert(msg string) { func (b *MessageBuffer) Insert(msg string) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
oldMessage := b.messages[b.insertIndex] oldMessage := b.messages[b.insertIndex]
b.messages[b.insertIndex] = message{content: msg, new: true} b.messages[b.insertIndex] = message{content: msg, new: true}
@ -38,16 +48,15 @@ func (b *MessageBuffer) Insert(msg string) {
b.insertIndex = b.incrementAndWrapIndex(b.insertIndex) b.insertIndex = b.incrementAndWrapIndex(b.insertIndex)
b.firstWriteHappened = true b.firstWriteHappened = true
b.cond.Broadcast()
select {
case b.newDataInserted <- true:
default:
}
} }
func (b *MessageBuffer) Get() (string, error) { func (b *MessageBuffer) Get() (string, error) {
b.cond.L.Lock()
defer b.cond.L.Unlock()
if !b.firstWriteHappened { if !b.firstWriteHappened {
<-b.newDataInserted b.cond.Wait()
} }
var msg *message var msg *message
@ -57,7 +66,7 @@ func (b *MessageBuffer) Get() (string, error) {
msg.new = false msg.new = false
break break
} }
<-b.newDataInserted b.cond.Wait()
} }
b.getIndex = b.incrementAndWrapIndex(b.getIndex) b.getIndex = b.incrementAndWrapIndex(b.getIndex)

View File

@ -1,6 +1,9 @@
package connection package connection
import ( import (
"fmt"
"strconv"
"sync"
"testing" "testing"
"time" "time"
@ -19,7 +22,6 @@ var (
func Test_MessageBuffer_Add(t *testing.T) { func Test_MessageBuffer_Add(t *testing.T) {
buf := newMessageBuffer(3) buf := newMessageBuffer(3)
buf.newDataInserted = nil //otherwise, this would break the test
t.Run("insert without wrapping", func(t *testing.T) { t.Run("insert without wrapping", func(t *testing.T) {
buf.Insert("message-1") buf.Insert("message-1")
@ -31,18 +33,13 @@ func Test_MessageBuffer_Add(t *testing.T) {
buf.Insert("message-3") buf.Insert("message-3")
assert.Equal( assert.Equal(
t, t,
&MessageBuffer{ []message{
size: 3, {content: message1, new: true},
getIndex: 0, {content: message2, new: true},
insertIndex: 0, {content: message3, new: true},
firstWriteHappened: true,
messages: []message{
{content: message1, new: true},
{content: message2, new: true},
{content: message3, new: true},
},
}, },
buf) buf.messages,
)
}) })
t.Run("insert that causes wrapping", func(t *testing.T) { t.Run("insert that causes wrapping", func(t *testing.T) {
@ -87,7 +84,7 @@ func Test_MessageBuffer_GetWaitsForNewData(t *testing.T) {
assert.Equal(t, "message-1", msg) assert.Equal(t, "message-1", msg)
go func() { go func() {
timer := time.NewTimer(500 * time.Millisecond) timer := time.NewTimer(100 * time.Millisecond)
<-timer.C <-timer.C
buf.Insert("delayed-message") buf.Insert("delayed-message")
}() }()
@ -165,3 +162,38 @@ func Test_MessageBuffer_InsertCatchesUpWithRead(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, message3, msg) assert.Equal(t, message3, msg)
} }
func Test_MessageBuffer_FuckShitUp(t *testing.T) {
size := 10
buf := newMessageBuffer(size)
wg := sync.WaitGroup{}
wg.Add(2)
var readMsg = make([]string, 0)
go func() {
for i := 0; i < size*10; i++ {
msg, _ := buf.Get()
if msg == "99" {
break
}
fmt.Println("i = ", i, ": msg = ", msg)
readMsg = append(readMsg, msg)
}
wg.Done()
}()
go func() {
for i := 0; i < size*10; i++ {
if i%10 == 0 {
timer := time.NewTimer(1 * time.Millisecond)
<-timer.C
}
buf.Insert(strconv.Itoa(i))
}
wg.Done()
}()
wg.Wait()
fmt.Println(buf.messages)
fmt.Println(readMsg)
}

View File

@ -7,16 +7,29 @@ import (
) )
type Connection struct { type Connection struct {
ws *websocket.Conn ws *websocket.Conn
ctx context.Context
buffer MessageBuffer
} }
func NewConnection(options ...func(*Connection)) *Connection { func NewConnection(options ...func(*Connection)) *Connection {
connection := Connection{} connection := Connection{
buffer: *newMessageBuffer(100),
}
for _, option := range options { for _, option := range options {
option(&connection) option(&connection)
} }
if connection.ws != nil {
go func() {
for {
_, msg, _ := connection.ws.Read(connection.ctx)
connection.buffer.Insert(string(msg))
}
}()
}
return &connection return &connection
} }
@ -26,21 +39,23 @@ func WithWebsocket(ws *websocket.Conn) func(*Connection) {
} }
} }
func WithContext(ctx context.Context) func(*Connection) {
return func(c *Connection) {
c.ctx = ctx
}
}
func (conn *Connection) Write(ctx context.Context, msg []byte) error { func (conn *Connection) Write(ctx context.Context, msg []byte) error {
return conn.ws.Write(ctx, websocket.MessageText, msg) return conn.ws.Write(ctx, websocket.MessageText, msg)
} }
func (conn *Connection) Read(ctx context.Context) ([]byte, error) { func (conn *Connection) Read(ctx context.Context) ([]byte, error) {
var msg []byte msg, err := conn.buffer.Get()
var err error if err != nil {
for { return nil, err // Tell game-handler that connection was lost
_, msg, err = conn.ws.Read(ctx)
if err != nil {
return nil, err // Tell game-handler that connection was lost
}
} }
return msg, err return []byte(msg), err
} }
func (conn *Connection) Close(msg string) { func (conn *Connection) Close(msg string) {