Compare commits
2 Commits
d951fab25b
...
fcd3e32edf
Author | SHA1 | Date | |
---|---|---|---|
fcd3e32edf | |||
c21076c837 |
@ -6,7 +6,7 @@ import (
|
||||
"errors"
|
||||
"log"
|
||||
"mchess_server/api"
|
||||
con "mchess_server/connection"
|
||||
conn "mchess_server/connection"
|
||||
"mchess_server/types"
|
||||
"time"
|
||||
|
||||
@ -16,7 +16,7 @@ import (
|
||||
|
||||
type Player struct {
|
||||
Uuid uuid.UUID
|
||||
Conn *con.Connection
|
||||
Conn *conn.Connection
|
||||
InGame bool
|
||||
wsConnEstablished chan bool
|
||||
context context.Context
|
||||
@ -28,11 +28,12 @@ func NewPlayer(uuid uuid.UUID) *Player {
|
||||
Conn: nil,
|
||||
InGame: false,
|
||||
wsConnEstablished: make(chan bool),
|
||||
context: context.Background(),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Player) SetConnection(ctx context.Context, conn *websocket.Conn) {
|
||||
p.Conn = con.NewConnection(con.WithWebsocket(conn))
|
||||
func (p *Player) SetConnection(ctx context.Context, ws *websocket.Conn) {
|
||||
p.Conn = conn.NewConnection(conn.WithWebsocket(ws), conn.WithContext(p.context))
|
||||
p.context = ctx
|
||||
p.wsConnEstablished <- true
|
||||
}
|
||||
|
@ -1,54 +1,76 @@
|
||||
package connection
|
||||
|
||||
import "fmt"
|
||||
import "sync"
|
||||
|
||||
type MessageBuffer struct {
|
||||
messages []*string
|
||||
messages []message
|
||||
getIndex int
|
||||
insertIndex int
|
||||
size int
|
||||
newDataInserted chan bool
|
||||
firstWriteHappened bool
|
||||
cond *sync.Cond
|
||||
}
|
||||
|
||||
type message struct {
|
||||
content string
|
||||
new bool
|
||||
}
|
||||
|
||||
func newMessageBuffer(size int) *MessageBuffer {
|
||||
mutex := &sync.Mutex{}
|
||||
cond := sync.NewCond(mutex)
|
||||
|
||||
return &MessageBuffer{
|
||||
messages: make([]*string, size),
|
||||
messages: make([]message, size),
|
||||
size: size,
|
||||
getIndex: 0,
|
||||
insertIndex: 0,
|
||||
newDataInserted: make(chan bool),
|
||||
firstWriteHappened: false,
|
||||
cond: cond,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *MessageBuffer) Insert(msg string) {
|
||||
b.messages[b.insertIndex] = &msg
|
||||
b.insertIndex = b.incrementAndWrapIndex(b.insertIndex)
|
||||
b.cond.L.Lock()
|
||||
defer b.cond.L.Unlock()
|
||||
|
||||
select {
|
||||
case b.newDataInserted <- true:
|
||||
default:
|
||||
}
|
||||
oldMessage := b.messages[b.insertIndex]
|
||||
b.messages[b.insertIndex] = message{content: msg, new: true}
|
||||
|
||||
if b.firstWriteHappened && b.insertIndex-1 == b.getIndex { // insertIndex caught up with getIndex
|
||||
if b.firstWriteHappened &&
|
||||
b.insertIndex == b.getIndex &&
|
||||
oldMessage.new { // insertIndex caught up with getIndex
|
||||
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
|
||||
}
|
||||
|
||||
b.insertIndex = b.incrementAndWrapIndex(b.insertIndex)
|
||||
|
||||
b.firstWriteHappened = true
|
||||
b.cond.Broadcast()
|
||||
}
|
||||
|
||||
func (b *MessageBuffer) Get() (string, error) {
|
||||
if !b.firstWriteHappened || b.messages[b.getIndex] == nil {
|
||||
<-b.newDataInserted
|
||||
b.cond.L.Lock()
|
||||
defer b.cond.L.Unlock()
|
||||
|
||||
if !b.firstWriteHappened {
|
||||
b.cond.Wait()
|
||||
}
|
||||
|
||||
msg := b.messages[b.getIndex]
|
||||
if msg == nil {
|
||||
return "", fmt.Errorf("error getting value from buffer: value was nil")
|
||||
var msg *message
|
||||
for {
|
||||
msg = &b.messages[b.getIndex]
|
||||
if msg.new {
|
||||
msg.new = false
|
||||
break
|
||||
}
|
||||
b.cond.Wait()
|
||||
}
|
||||
b.getIndex = b.incrementAndWrapIndex(b.getIndex)
|
||||
|
||||
return *msg, nil
|
||||
return msg.content, nil
|
||||
}
|
||||
|
||||
func (b MessageBuffer) incrementAndWrapIndex(index int) int {
|
||||
|
@ -1,6 +1,9 @@
|
||||
package connection
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -12,11 +15,13 @@ var (
|
||||
message2 = "message-2"
|
||||
message3 = "message-3"
|
||||
message4 = "message-4"
|
||||
message5 = "message-5"
|
||||
message6 = "message-6"
|
||||
message7 = "message-7"
|
||||
)
|
||||
|
||||
func Test_MessageBuffer_Add(t *testing.T) {
|
||||
buf := newMessageBuffer(3)
|
||||
buf.newDataInserted = nil //otherwise, this would break the test
|
||||
|
||||
t.Run("insert without wrapping", func(t *testing.T) {
|
||||
buf.Insert("message-1")
|
||||
@ -28,24 +33,23 @@ func Test_MessageBuffer_Add(t *testing.T) {
|
||||
buf.Insert("message-3")
|
||||
assert.Equal(
|
||||
t,
|
||||
&MessageBuffer{
|
||||
size: 3,
|
||||
getIndex: 0,
|
||||
insertIndex: 0,
|
||||
firstWriteHappened: true,
|
||||
messages: []*string{&message1, &message2, &message3},
|
||||
[]message{
|
||||
{content: message1, new: true},
|
||||
{content: message2, new: true},
|
||||
{content: message3, new: true},
|
||||
},
|
||||
buf)
|
||||
buf.messages,
|
||||
)
|
||||
})
|
||||
|
||||
t.Run("insert that causes wrapping", func(t *testing.T) {
|
||||
buf.Insert("message-4")
|
||||
assert.Equal(
|
||||
t,
|
||||
[]*string{
|
||||
&message4,
|
||||
&message2,
|
||||
&message3,
|
||||
[]message{
|
||||
{content: message4, new: true},
|
||||
{content: message2, new: true},
|
||||
{content: message3, new: true},
|
||||
},
|
||||
buf.messages)
|
||||
})
|
||||
@ -80,7 +84,7 @@ func Test_MessageBuffer_GetWaitsForNewData(t *testing.T) {
|
||||
assert.Equal(t, "message-1", msg)
|
||||
|
||||
go func() {
|
||||
timer := time.NewTimer(50 * time.Millisecond)
|
||||
timer := time.NewTimer(100 * time.Millisecond)
|
||||
<-timer.C
|
||||
buf.Insert("delayed-message")
|
||||
}()
|
||||
@ -96,10 +100,22 @@ func Test_MessageBuffer_IndexesAreCorrectAfterOverwritingOldData(t *testing.T) {
|
||||
buf.Insert("message-1")
|
||||
buf.Insert("message-2")
|
||||
|
||||
assert.Equal(t, []*string{&message1, &message2}, buf.messages)
|
||||
assert.Equal(
|
||||
t,
|
||||
[]message{
|
||||
{content: message1, new: true},
|
||||
{content: message2, new: true},
|
||||
},
|
||||
buf.messages)
|
||||
|
||||
buf.Insert("message-3")
|
||||
assert.Equal(t, []*string{&message3, &message2}, buf.messages)
|
||||
assert.Equal(
|
||||
t,
|
||||
[]message{
|
||||
{content: message3, new: true},
|
||||
{content: message2, new: true},
|
||||
},
|
||||
buf.messages)
|
||||
|
||||
msg, err := buf.Get()
|
||||
assert.NoError(t, err)
|
||||
@ -110,12 +126,11 @@ func Test_MessageBuffer_GetWaitsForNewDataIfOldOneWasAlreadyGotten(t *testing.T)
|
||||
buf := newMessageBuffer(2)
|
||||
|
||||
buf.Insert(message1)
|
||||
buf.Insert(message2)
|
||||
|
||||
msg, err := buf.Get()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, message1, msg)
|
||||
|
||||
buf.Insert(message2)
|
||||
msg, err = buf.Get()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, message2, msg)
|
||||
@ -130,3 +145,55 @@ func Test_MessageBuffer_GetWaitsForNewDataIfOldOneWasAlreadyGotten(t *testing.T)
|
||||
assert.Equal(t, message3, msg)
|
||||
|
||||
}
|
||||
|
||||
func Test_MessageBuffer_InsertCatchesUpWithRead(t *testing.T) {
|
||||
buf := newMessageBuffer(5)
|
||||
|
||||
buf.Insert(message1)
|
||||
buf.Insert(message2)
|
||||
buf.Insert(message3)
|
||||
buf.Insert(message4)
|
||||
buf.Insert(message5)
|
||||
buf.Insert(message6)
|
||||
buf.Insert(message7)
|
||||
|
||||
msg, err := buf.Get()
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, message3, msg)
|
||||
}
|
||||
|
||||
func Test_MessageBuffer_FuckShitUp(t *testing.T) {
|
||||
size := 10
|
||||
buf := newMessageBuffer(size)
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
wg.Add(2)
|
||||
var readMsg = make([]string, 0)
|
||||
go func() {
|
||||
for i := 0; i < size*10; i++ {
|
||||
msg, _ := buf.Get()
|
||||
if msg == "99" {
|
||||
break
|
||||
}
|
||||
fmt.Println("i = ", i, ": msg = ", msg)
|
||||
readMsg = append(readMsg, msg)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for i := 0; i < size*10; i++ {
|
||||
if i%10 == 0 {
|
||||
timer := time.NewTimer(1 * time.Millisecond)
|
||||
<-timer.C
|
||||
}
|
||||
buf.Insert(strconv.Itoa(i))
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
fmt.Println(buf.messages)
|
||||
fmt.Println(readMsg)
|
||||
}
|
||||
|
@ -7,16 +7,29 @@ import (
|
||||
)
|
||||
|
||||
type Connection struct {
|
||||
ws *websocket.Conn
|
||||
ws *websocket.Conn
|
||||
ctx context.Context
|
||||
buffer MessageBuffer
|
||||
}
|
||||
|
||||
func NewConnection(options ...func(*Connection)) *Connection {
|
||||
connection := Connection{}
|
||||
connection := Connection{
|
||||
buffer: *newMessageBuffer(100),
|
||||
}
|
||||
|
||||
for _, option := range options {
|
||||
option(&connection)
|
||||
}
|
||||
|
||||
if connection.ws != nil {
|
||||
go func() {
|
||||
for {
|
||||
_, msg, _ := connection.ws.Read(connection.ctx)
|
||||
connection.buffer.Insert(string(msg))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return &connection
|
||||
}
|
||||
|
||||
@ -26,21 +39,23 @@ func WithWebsocket(ws *websocket.Conn) func(*Connection) {
|
||||
}
|
||||
}
|
||||
|
||||
func WithContext(ctx context.Context) func(*Connection) {
|
||||
return func(c *Connection) {
|
||||
c.ctx = ctx
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Connection) Write(ctx context.Context, msg []byte) error {
|
||||
return conn.ws.Write(ctx, websocket.MessageText, msg)
|
||||
}
|
||||
|
||||
func (conn *Connection) Read(ctx context.Context) ([]byte, error) {
|
||||
var msg []byte
|
||||
var err error
|
||||
for {
|
||||
_, msg, err = conn.ws.Read(ctx)
|
||||
if err != nil {
|
||||
return nil, err // Tell game-handler that connection was lost
|
||||
}
|
||||
msg, err := conn.buffer.Get()
|
||||
if err != nil {
|
||||
return nil, err // Tell game-handler that connection was lost
|
||||
}
|
||||
|
||||
return msg, err
|
||||
return []byte(msg), err
|
||||
}
|
||||
|
||||
func (conn *Connection) Close(msg string) {
|
||||
|
Loading…
Reference in New Issue
Block a user