mirror of
https://github.com/strukturag/nextcloud-spreed-signaling.git
synced 2025-04-11 14:21:18 +00:00
Support bandwidth limits when selecting proxy to use.
This commit is contained in:
parent
a4b8a81734
commit
7e7a6d5c09
9 changed files with 677 additions and 72 deletions
29
api_proxy.go
29
api_proxy.go
|
@ -299,12 +299,41 @@ type PayloadProxyServerMessage struct {
|
|||
|
||||
// Type "event"
|
||||
|
||||
type EventProxyServerBandwidth struct {
|
||||
// Incoming is the bandwidth utilization for publishers in percent.
|
||||
Incoming *float64 `json:"incoming,omitempty"`
|
||||
// Outgoing is the bandwidth utilization for subscribers in percent.
|
||||
Outgoing *float64 `json:"outgoing,omitempty"`
|
||||
}
|
||||
|
||||
func (b *EventProxyServerBandwidth) String() string {
|
||||
if b.Incoming != nil && b.Outgoing != nil {
|
||||
return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=%.3f%%", *b.Incoming, *b.Outgoing)
|
||||
} else if b.Incoming != nil {
|
||||
return fmt.Sprintf("bandwidth: incoming=%.3f%%, outgoing=unlimited", *b.Incoming)
|
||||
} else if b.Outgoing != nil {
|
||||
return fmt.Sprintf("bandwidth: incoming=unlimited, outgoing=%.3f%%", *b.Outgoing)
|
||||
} else {
|
||||
return "bandwidth: incoming=unlimited, outgoing=unlimited"
|
||||
}
|
||||
}
|
||||
|
||||
func (b EventProxyServerBandwidth) AllowIncoming() bool {
|
||||
return b.Incoming == nil || *b.Incoming < 100
|
||||
}
|
||||
|
||||
func (b EventProxyServerBandwidth) AllowOutgoing() bool {
|
||||
return b.Outgoing == nil || *b.Outgoing < 100
|
||||
}
|
||||
|
||||
type EventProxyServerMessage struct {
|
||||
Type string `json:"type"`
|
||||
|
||||
ClientId string `json:"clientId,omitempty"`
|
||||
Load int64 `json:"load,omitempty"`
|
||||
Sid string `json:"sid,omitempty"`
|
||||
|
||||
Bandwidth *EventProxyServerBandwidth `json:"bandwidth,omitempty"`
|
||||
}
|
||||
|
||||
// Information on a proxy in the etcd cluster.
|
||||
|
|
|
@ -103,6 +103,8 @@ The running container can be configured through different environment variables:
|
|||
- `EXTERNAL_HOSTNAME`: The external hostname for remote streams. Will try to autodetect if omitted.
|
||||
- `TOKEN_ID`: Id of the token to use when connecting remote streams.
|
||||
- `TOKEN_KEY`: Private key for the configured token id.
|
||||
- `BANDWIDTH_INCOMING`: Optional incoming target bandwidth (in megabits per second).
|
||||
- `BANDWIDTH_OUTGOING`: Optional outgoing target bandwidth (in megabits per second).
|
||||
- `JANUS_URL`: Url to Janus server.
|
||||
- `MAX_STREAM_BITRATE`: Optional maximum bitrate for audio/video streams.
|
||||
- `MAX_SCREEN_BITRATE`: Optional maximum bitrate for screensharing streams.
|
||||
|
|
|
@ -52,6 +52,11 @@ if [ ! -f "$CONFIG" ]; then
|
|||
fi
|
||||
if [ -n "$TOKEN_KEY" ]; then
|
||||
sed -i "s|#token_key =.*|token_key = $TOKEN_KEY|" "$CONFIG"
|
||||
if [ -n "$BANDWIDTH_INCOMING" ]; then
|
||||
sed -i "s|#incoming =.*|incoming = $BANDWIDTH_INCOMING|" "$CONFIG"
|
||||
fi
|
||||
if [ -n "$BANDWIDTH_OUTGOING" ]; then
|
||||
sed -i "s|#outgoing =.*|outgoing = $BANDWIDTH_OUTGOING|" "$CONFIG"
|
||||
fi
|
||||
|
||||
HAS_ETCD=
|
||||
|
|
173
mcu_proxy.go
173
mcu_proxy.go
|
@ -340,6 +340,7 @@ type mcuProxyConnection struct {
|
|||
ip net.IP
|
||||
|
||||
load atomic.Int64
|
||||
bandwidth atomic.Pointer[EventProxyServerBandwidth]
|
||||
mu sync.Mutex
|
||||
closer *Closer
|
||||
closedDone *Closer
|
||||
|
@ -391,6 +392,7 @@ func newMcuProxyConnection(proxy *mcuProxy, baseUrl string, ip net.IP) (*mcuProx
|
|||
}
|
||||
conn.reconnectInterval.Store(int64(initialReconnectInterval))
|
||||
conn.load.Store(loadNotConnected)
|
||||
conn.bandwidth.Store(nil)
|
||||
conn.country.Store("")
|
||||
return conn, nil
|
||||
}
|
||||
|
@ -494,6 +496,10 @@ func (c *mcuProxyConnection) Load() int64 {
|
|||
return c.load.Load()
|
||||
}
|
||||
|
||||
func (c *mcuProxyConnection) Bandwidth() *EventProxyServerBandwidth {
|
||||
return c.bandwidth.Load()
|
||||
}
|
||||
|
||||
func (c *mcuProxyConnection) Country() string {
|
||||
return c.country.Load().(string)
|
||||
}
|
||||
|
@ -538,7 +544,10 @@ func (c *mcuProxyConnection) readPump() {
|
|||
}
|
||||
}()
|
||||
defer c.close()
|
||||
defer c.load.Store(loadNotConnected)
|
||||
defer func() {
|
||||
c.load.Store(loadNotConnected)
|
||||
c.bandwidth.Store(nil)
|
||||
}()
|
||||
|
||||
c.mu.Lock()
|
||||
conn := c.conn
|
||||
|
@ -1004,9 +1013,10 @@ func (c *mcuProxyConnection) processEvent(msg *ProxyServerMessage) {
|
|||
return
|
||||
case "update-load":
|
||||
if proxyDebugMessages {
|
||||
log.Printf("Load of %s now at %d", c, event.Load)
|
||||
log.Printf("Load of %s now at %d (%s)", c, event.Load, event.Bandwidth)
|
||||
}
|
||||
c.load.Store(event.Load)
|
||||
c.bandwidth.Store(event.Bandwidth)
|
||||
statsProxyBackendLoadCurrent.WithLabelValues(c.url.String()).Set(float64(event.Load))
|
||||
return
|
||||
case "shutdown-scheduled":
|
||||
|
@ -1738,27 +1748,27 @@ func (m *mcuProxy) removePublisher(publisher *mcuProxyPublisher) {
|
|||
delete(m.publishers, getStreamId(publisher.id, publisher.StreamType()))
|
||||
}
|
||||
|
||||
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
|
||||
connections := m.getSortedConnections(initiator)
|
||||
func (m *mcuProxy) createPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuPublisher {
|
||||
var maxBitrate int
|
||||
if streamType == StreamTypeScreen {
|
||||
maxBitrate = m.maxScreenBitrate
|
||||
} else {
|
||||
maxBitrate = m.maxStreamBitrate
|
||||
}
|
||||
if bitrate <= 0 {
|
||||
bitrate = maxBitrate
|
||||
} else {
|
||||
bitrate = min(bitrate, maxBitrate)
|
||||
}
|
||||
|
||||
for _, conn := range connections {
|
||||
if conn.IsShutdownScheduled() || conn.IsTemporary() {
|
||||
if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() {
|
||||
continue
|
||||
}
|
||||
|
||||
subctx, cancel := context.WithTimeout(ctx, m.proxyTimeout)
|
||||
defer cancel()
|
||||
|
||||
var maxBitrate int
|
||||
if streamType == StreamTypeScreen {
|
||||
maxBitrate = m.maxScreenBitrate
|
||||
} else {
|
||||
maxBitrate = m.maxStreamBitrate
|
||||
}
|
||||
if bitrate <= 0 {
|
||||
bitrate = maxBitrate
|
||||
} else {
|
||||
bitrate = min(bitrate, maxBitrate)
|
||||
}
|
||||
publisher, err := conn.newPublisher(subctx, listener, id, sid, streamType, bitrate, mediaTypes)
|
||||
if err != nil {
|
||||
log.Printf("Could not create %s publisher for %s on %s: %s", streamType, id, conn, err)
|
||||
|
@ -1769,11 +1779,61 @@ func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id st
|
|||
m.publishers[getStreamId(id, streamType)] = conn
|
||||
m.mu.Unlock()
|
||||
m.publisherWaiters.Wakeup()
|
||||
return publisher, nil
|
||||
return publisher
|
||||
}
|
||||
|
||||
statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc()
|
||||
return nil, fmt.Errorf("No MCU connection available")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mcuProxy) NewPublisher(ctx context.Context, listener McuListener, id string, sid string, streamType StreamType, bitrate int, mediaTypes MediaType, initiator McuInitiator) (McuPublisher, error) {
|
||||
connections := m.getSortedConnections(initiator)
|
||||
publisher := m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections, func(c *mcuProxyConnection) bool {
|
||||
bw := c.Bandwidth()
|
||||
return bw == nil || bw.AllowIncoming()
|
||||
})
|
||||
if publisher == nil {
|
||||
// No proxy has available bandwidth, select one with the lowest currently used bandwidth.
|
||||
connections2 := make([]*mcuProxyConnection, 0, len(connections))
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() != nil {
|
||||
connections2 = append(connections2, c)
|
||||
}
|
||||
}
|
||||
SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int {
|
||||
var incoming_a *float64
|
||||
if bw := a.Bandwidth(); bw != nil {
|
||||
incoming_a = bw.Incoming
|
||||
}
|
||||
|
||||
var incoming_b *float64
|
||||
if bw := b.Bandwidth(); bw != nil {
|
||||
incoming_b = bw.Incoming
|
||||
}
|
||||
|
||||
if incoming_a == nil && incoming_b == nil {
|
||||
return 0
|
||||
} else if incoming_a == nil && incoming_b != nil {
|
||||
return -1
|
||||
} else if incoming_a != nil && incoming_b == nil {
|
||||
return -1
|
||||
} else if *incoming_a < *incoming_b {
|
||||
return -1
|
||||
} else if *incoming_a > *incoming_b {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
publisher = m.createPublisher(ctx, listener, id, sid, streamType, bitrate, mediaTypes, initiator, connections2, func(c *mcuProxyConnection) bool {
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
if publisher == nil {
|
||||
statsProxyNobackendAvailableTotal.WithLabelValues(string(streamType)).Inc()
|
||||
return nil, fmt.Errorf("No MCU connection available")
|
||||
}
|
||||
|
||||
return publisher, nil
|
||||
}
|
||||
|
||||
func (m *mcuProxy) getPublisherConnection(publisher string, streamType StreamType) *mcuProxyConnection {
|
||||
|
@ -1820,6 +1880,30 @@ type proxyPublisherInfo struct {
|
|||
err error
|
||||
}
|
||||
|
||||
func (m *mcuProxy) createSubscriber(ctx context.Context, listener McuListener, id string, publisher string, streamType StreamType, publisherConn *mcuProxyConnection, connections []*mcuProxyConnection, isAllowed func(c *mcuProxyConnection) bool) McuSubscriber {
|
||||
for _, conn := range connections {
|
||||
if !isAllowed(conn) || conn.IsShutdownScheduled() || conn.IsTemporary() {
|
||||
continue
|
||||
}
|
||||
|
||||
var subscriber McuSubscriber
|
||||
var err error
|
||||
if conn == publisherConn {
|
||||
subscriber, err = conn.newSubscriber(ctx, listener, id, publisher, streamType)
|
||||
} else {
|
||||
subscriber, err = conn.newRemoteSubscriber(ctx, listener, id, publisher, streamType, publisherConn)
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err)
|
||||
continue
|
||||
}
|
||||
|
||||
return subscriber
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publisher string, streamType StreamType, initiator McuInitiator) (McuSubscriber, error) {
|
||||
var publisherInfo *proxyPublisherInfo
|
||||
if conn := m.getPublisherConnection(publisher, streamType); conn != nil {
|
||||
|
@ -1956,21 +2040,52 @@ func (m *mcuProxy) NewSubscriber(ctx context.Context, listener McuListener, publ
|
|||
return nil, publisherInfo.err
|
||||
}
|
||||
|
||||
if !publisherInfo.conn.IsSameCountry(initiator) {
|
||||
bw := publisherInfo.conn.Bandwidth()
|
||||
allowOutgoing := bw == nil || bw.AllowOutgoing()
|
||||
if !allowOutgoing || !publisherInfo.conn.IsSameCountry(initiator) {
|
||||
connections := m.getSortedConnections(initiator)
|
||||
if len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) {
|
||||
if !allowOutgoing || len(connections) > 0 && !connections[0].IsSameCountry(publisherInfo.conn) {
|
||||
// Connect to remote publisher through "closer" gateway.
|
||||
for _, conn := range connections {
|
||||
if conn.IsShutdownScheduled() || conn.IsTemporary() || conn == publisherInfo.conn {
|
||||
continue
|
||||
subscriber := m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections, func(c *mcuProxyConnection) bool {
|
||||
bw := c.Bandwidth()
|
||||
return bw == nil || bw.AllowOutgoing()
|
||||
})
|
||||
if subscriber == nil {
|
||||
connections2 := make([]*mcuProxyConnection, 0, len(connections))
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() != nil {
|
||||
connections2 = append(connections2, c)
|
||||
}
|
||||
}
|
||||
SlicesSortFunc(connections2, func(a *mcuProxyConnection, b *mcuProxyConnection) int {
|
||||
var outgoing_a *float64
|
||||
if bw := a.Bandwidth(); bw != nil {
|
||||
outgoing_a = bw.Outgoing
|
||||
}
|
||||
|
||||
subscriber, err := conn.newRemoteSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn)
|
||||
if err != nil {
|
||||
log.Printf("Could not create subscriber for %s publisher %s on %s: %s", streamType, publisher, conn, err)
|
||||
continue
|
||||
}
|
||||
var outgoing_b *float64
|
||||
if bw := b.Bandwidth(); bw != nil {
|
||||
outgoing_b = bw.Outgoing
|
||||
}
|
||||
|
||||
if outgoing_a == nil && outgoing_b == nil {
|
||||
return 0
|
||||
} else if outgoing_a == nil && outgoing_b != nil {
|
||||
return -1
|
||||
} else if outgoing_a != nil && outgoing_b == nil {
|
||||
return -1
|
||||
} else if *outgoing_a < *outgoing_b {
|
||||
return -1
|
||||
} else if *outgoing_a > *outgoing_b {
|
||||
return 1
|
||||
}
|
||||
return 0
|
||||
})
|
||||
subscriber = m.createSubscriber(ctx, listener, publisherInfo.id, publisher, streamType, publisherInfo.conn, connections2, func(c *mcuProxyConnection) bool {
|
||||
return true
|
||||
})
|
||||
}
|
||||
if subscriber != nil {
|
||||
return subscriber, nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -446,6 +446,8 @@ type TestProxyServerHandler struct {
|
|||
|
||||
mu sync.Mutex
|
||||
load atomic.Int64
|
||||
incoming atomic.Pointer[float64]
|
||||
outgoing atomic.Pointer[float64]
|
||||
clients map[string]*testProxyServerClient
|
||||
publishers map[string]*testProxyServerPublisher
|
||||
subscribers map[string]*testProxyServerSubscriber
|
||||
|
@ -523,36 +525,75 @@ func (h *TestProxyServerHandler) deleteSubscriber(id string) (*testProxyServerSu
|
|||
return sub, true
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) updateLoad(delta int64) {
|
||||
if delta == 0 {
|
||||
return
|
||||
func (h *TestProxyServerHandler) UpdateBandwidth(incoming float64, outgoing float64) {
|
||||
h.incoming.Store(&incoming)
|
||||
h.outgoing.Store(&outgoing)
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
msg := h.getLoadMessage(h.load.Load())
|
||||
for _, c := range h.clients {
|
||||
c.sendMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) Clear(incoming bool, outgoing bool) {
|
||||
if incoming {
|
||||
h.incoming.Store(nil)
|
||||
}
|
||||
if outgoing {
|
||||
h.outgoing.Store(nil)
|
||||
}
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
load := h.load.Add(delta)
|
||||
msg := h.getLoadMessage(h.load.Load())
|
||||
for _, c := range h.clients {
|
||||
go func(c *testProxyServerClient, load int64) {
|
||||
c.sendMessage(&ProxyServerMessage{
|
||||
Type: "event",
|
||||
Event: &EventProxyServerMessage{
|
||||
Type: "update-load",
|
||||
Load: load,
|
||||
},
|
||||
})
|
||||
}(c, load)
|
||||
c.sendMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) getLoadMessage(load int64) *ProxyServerMessage {
|
||||
msg := &ProxyServerMessage{
|
||||
Type: "event",
|
||||
Event: &EventProxyServerMessage{
|
||||
Type: "update-load",
|
||||
Load: load,
|
||||
},
|
||||
}
|
||||
|
||||
incoming := h.incoming.Load()
|
||||
outgoing := h.outgoing.Load()
|
||||
if incoming != nil || outgoing != nil {
|
||||
msg.Event.Bandwidth = &EventProxyServerBandwidth{
|
||||
Incoming: incoming,
|
||||
Outgoing: outgoing,
|
||||
}
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) updateLoad(delta int64) {
|
||||
if delta == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
load := h.load.Add(delta)
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
msg := h.getLoadMessage(load)
|
||||
for _, c := range h.clients {
|
||||
go c.sendMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) sendLoad(c *testProxyServerClient) {
|
||||
c.sendMessage(&ProxyServerMessage{
|
||||
Type: "event",
|
||||
Event: &EventProxyServerMessage{
|
||||
Type: "update-load",
|
||||
Load: h.load.Load(),
|
||||
},
|
||||
})
|
||||
msg := h.getLoadMessage(h.load.Load())
|
||||
c.sendMessage(msg)
|
||||
}
|
||||
|
||||
func (h *TestProxyServerHandler) removeClient(client *testProxyServerClient) {
|
||||
|
@ -810,6 +851,153 @@ func Test_ProxyWaitForPublisher(t *testing.T) {
|
|||
defer pub.Close(context.Background())
|
||||
}
|
||||
|
||||
func Test_ProxyPublisherBandwidth(t *testing.T) {
|
||||
CatchLogForTest(t)
|
||||
t.Parallel()
|
||||
server1 := NewProxyServerForTest(t, "DE")
|
||||
server2 := NewProxyServerForTest(t, "DE")
|
||||
mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{
|
||||
server1,
|
||||
server2,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
pub1Id := "the-publisher-1"
|
||||
pub1Sid := "1234567890"
|
||||
pub1Listener := &MockMcuListener{
|
||||
publicId: pub1Id + "-public",
|
||||
}
|
||||
pub1Initiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub1.Close(context.Background())
|
||||
|
||||
if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL {
|
||||
server1.UpdateBandwidth(100, 0)
|
||||
} else {
|
||||
server2.UpdateBandwidth(100, 0)
|
||||
}
|
||||
|
||||
// Wait until proxy has been updated
|
||||
for ctx.Err() == nil {
|
||||
mcu.connectionsMu.RLock()
|
||||
connections := mcu.connections
|
||||
mcu.connectionsMu.RUnlock()
|
||||
missing := true
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() != nil {
|
||||
missing = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !missing {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
pub2Id := "the-publisher-2"
|
||||
pub2id := "1234567890"
|
||||
pub2Listener := &MockMcuListener{
|
||||
publicId: pub2Id + "-public",
|
||||
}
|
||||
pub2Initiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub2.Close(context.Background())
|
||||
|
||||
if pub1.(*mcuProxyPublisher).conn.rawUrl == pub2.(*mcuProxyPublisher).conn.rawUrl {
|
||||
t.Errorf("servers should be different, got %s", pub1.(*mcuProxyPublisher).conn.rawUrl)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ProxyPublisherBandwidthOverload(t *testing.T) {
|
||||
CatchLogForTest(t)
|
||||
t.Parallel()
|
||||
server1 := NewProxyServerForTest(t, "DE")
|
||||
server2 := NewProxyServerForTest(t, "DE")
|
||||
mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{
|
||||
server1,
|
||||
server2,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
pub1Id := "the-publisher-1"
|
||||
pub1Sid := "1234567890"
|
||||
pub1Listener := &MockMcuListener{
|
||||
publicId: pub1Id + "-public",
|
||||
}
|
||||
pub1Initiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub1, err := mcu.NewPublisher(ctx, pub1Listener, pub1Id, pub1Sid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub1Initiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub1.Close(context.Background())
|
||||
|
||||
// If all servers are bandwidth loaded, select the one with the least usage.
|
||||
if pub1.(*mcuProxyPublisher).conn.rawUrl == server1.URL {
|
||||
server1.UpdateBandwidth(100, 0)
|
||||
server2.UpdateBandwidth(102, 0)
|
||||
} else {
|
||||
server1.UpdateBandwidth(102, 0)
|
||||
server2.UpdateBandwidth(100, 0)
|
||||
}
|
||||
|
||||
// Wait until proxy has been updated
|
||||
for ctx.Err() == nil {
|
||||
mcu.connectionsMu.RLock()
|
||||
connections := mcu.connections
|
||||
mcu.connectionsMu.RUnlock()
|
||||
missing := false
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() == nil {
|
||||
missing = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !missing {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
pub2Id := "the-publisher-2"
|
||||
pub2id := "1234567890"
|
||||
pub2Listener := &MockMcuListener{
|
||||
publicId: pub2Id + "-public",
|
||||
}
|
||||
pub2Initiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub2, err := mcu.NewPublisher(ctx, pub2Listener, pub2Id, pub2id, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pub2Initiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub2.Close(context.Background())
|
||||
|
||||
if pub1.(*mcuProxyPublisher).conn.rawUrl != pub2.(*mcuProxyPublisher).conn.rawUrl {
|
||||
t.Errorf("servers should be the same, got %s / %s", pub1.(*mcuProxyPublisher).conn.rawUrl, pub2.(*mcuProxyPublisher).conn.rawUrl)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ProxyPublisherLoad(t *testing.T) {
|
||||
CatchLogForTest(t)
|
||||
t.Parallel()
|
||||
|
@ -1015,3 +1203,144 @@ func Test_ProxySubscriberCountry(t *testing.T) {
|
|||
t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ProxySubscriberBandwidth(t *testing.T) {
|
||||
CatchLogForTest(t)
|
||||
t.Parallel()
|
||||
serverDE := NewProxyServerForTest(t, "DE")
|
||||
serverUS := NewProxyServerForTest(t, "US")
|
||||
mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{
|
||||
serverDE,
|
||||
serverUS,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
pubId := "the-publisher"
|
||||
pubSid := "1234567890"
|
||||
pubListener := &MockMcuListener{
|
||||
publicId: pubId + "-public",
|
||||
}
|
||||
pubInitiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub.Close(context.Background())
|
||||
|
||||
if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL {
|
||||
t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl)
|
||||
}
|
||||
|
||||
serverDE.UpdateBandwidth(0, 100)
|
||||
|
||||
// Wait until proxy has been updated
|
||||
for ctx.Err() == nil {
|
||||
mcu.connectionsMu.RLock()
|
||||
connections := mcu.connections
|
||||
mcu.connectionsMu.RUnlock()
|
||||
missing := true
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() != nil {
|
||||
missing = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if !missing {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
subListener := &MockMcuListener{
|
||||
publicId: "subscriber-public",
|
||||
}
|
||||
subInitiator := &MockMcuInitiator{
|
||||
country: "US",
|
||||
}
|
||||
sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer sub.Close(context.Background())
|
||||
|
||||
if sub.(*mcuProxySubscriber).conn.rawUrl != serverUS.URL {
|
||||
t.Errorf("expected server %s, go %s", serverUS.URL, sub.(*mcuProxySubscriber).conn.rawUrl)
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ProxySubscriberBandwidthOverload(t *testing.T) {
|
||||
CatchLogForTest(t)
|
||||
t.Parallel()
|
||||
serverDE := NewProxyServerForTest(t, "DE")
|
||||
serverUS := NewProxyServerForTest(t, "US")
|
||||
mcu := newMcuProxyForTestWithServers(t, []*TestProxyServerHandler{
|
||||
serverDE,
|
||||
serverUS,
|
||||
})
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
|
||||
defer cancel()
|
||||
|
||||
pubId := "the-publisher"
|
||||
pubSid := "1234567890"
|
||||
pubListener := &MockMcuListener{
|
||||
publicId: pubId + "-public",
|
||||
}
|
||||
pubInitiator := &MockMcuInitiator{
|
||||
country: "DE",
|
||||
}
|
||||
pub, err := mcu.NewPublisher(ctx, pubListener, pubId, pubSid, StreamTypeVideo, 0, MediaTypeVideo|MediaTypeAudio, pubInitiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer pub.Close(context.Background())
|
||||
|
||||
if pub.(*mcuProxyPublisher).conn.rawUrl != serverDE.URL {
|
||||
t.Errorf("expected server %s, go %s", serverDE.URL, pub.(*mcuProxyPublisher).conn.rawUrl)
|
||||
}
|
||||
|
||||
serverDE.UpdateBandwidth(0, 100)
|
||||
serverUS.UpdateBandwidth(0, 102)
|
||||
|
||||
// Wait until proxy has been updated
|
||||
for ctx.Err() == nil {
|
||||
mcu.connectionsMu.RLock()
|
||||
connections := mcu.connections
|
||||
mcu.connectionsMu.RUnlock()
|
||||
missing := false
|
||||
for _, c := range connections {
|
||||
if c.Bandwidth() == nil {
|
||||
missing = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !missing {
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Millisecond)
|
||||
}
|
||||
|
||||
subListener := &MockMcuListener{
|
||||
publicId: "subscriber-public",
|
||||
}
|
||||
subInitiator := &MockMcuInitiator{
|
||||
country: "US",
|
||||
}
|
||||
sub, err := mcu.NewSubscriber(ctx, subListener, pubId, StreamTypeVideo, subInitiator)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
defer sub.Close(context.Background())
|
||||
|
||||
if sub.(*mcuProxySubscriber).conn.rawUrl != serverDE.URL {
|
||||
t.Errorf("expected server %s, go %s", serverDE.URL, sub.(*mcuProxySubscriber).conn.rawUrl)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,20 @@ tokentype = static
|
|||
# self-signed certificates.
|
||||
#skipverify = false
|
||||
|
||||
[bandwidth]
|
||||
# Target bandwidth limit for incoming streams (in megabits per second).
|
||||
# Set to 0 to disable the limit. If the limit is reached, the proxy notifies
|
||||
# the signaling servers that another proxy should be used for publishing if
|
||||
# possible.
|
||||
#incoming = 1024
|
||||
|
||||
# Target bandwidth limit for outgoing streams (in megabits per second).
|
||||
# Set to 0 to disable the limit. If the limit is reached, the proxy notifies
|
||||
# the signaling servers that another proxy should be used for subscribing if
|
||||
# possible. Note that this might require additional outgoing bandwidth for the
|
||||
# remote streams.
|
||||
#outgoing = 1024
|
||||
|
||||
[tokens]
|
||||
# For token type "static": Mapping of <tokenid> = <publickey> of signaling
|
||||
# servers allowed to connect.
|
||||
|
|
|
@ -100,6 +100,11 @@ type ProxyServer struct {
|
|||
stopped atomic.Bool
|
||||
load atomic.Int64
|
||||
|
||||
maxIncoming int64
|
||||
currentIncoming atomic.Int64
|
||||
maxOutgoing int64
|
||||
currentOutgoing atomic.Int64
|
||||
|
||||
shutdownChannel chan struct{}
|
||||
shutdownScheduled atomic.Bool
|
||||
|
||||
|
@ -280,11 +285,32 @@ func NewProxyServer(r *mux.Router, version string, config *goconf.ConfigFile) (*
|
|||
log.Printf("No token id configured, remote streams will be disabled")
|
||||
}
|
||||
|
||||
maxIncoming, _ := config.GetInt("bandwidth", "incoming")
|
||||
if maxIncoming < 0 {
|
||||
maxIncoming = 0
|
||||
}
|
||||
if maxIncoming > 0 {
|
||||
log.Printf("Target bandwidth for incoming streams: %d MBit/s", maxIncoming)
|
||||
} else {
|
||||
log.Printf("Target bandwidth for incoming streams: unlimited")
|
||||
}
|
||||
maxOutgoing, _ := config.GetInt("bandwidth", "outgoing")
|
||||
if maxOutgoing < 0 {
|
||||
maxOutgoing = 0
|
||||
}
|
||||
if maxIncoming > 0 {
|
||||
log.Printf("Target bandwidth for outgoing streams: %d MBit/s", maxOutgoing)
|
||||
} else {
|
||||
log.Printf("Target bandwidth for outgoing streams: unlimited")
|
||||
}
|
||||
|
||||
result := &ProxyServer{
|
||||
version: version,
|
||||
country: country,
|
||||
welcomeMessage: string(welcomeMessage) + "\n",
|
||||
config: config,
|
||||
maxIncoming: int64(maxIncoming) * 1024 * 1024,
|
||||
maxOutgoing: int64(maxOutgoing) * 1024 * 1024,
|
||||
|
||||
shutdownChannel: make(chan struct{}),
|
||||
|
||||
|
@ -413,18 +439,7 @@ loop:
|
|||
}
|
||||
}
|
||||
|
||||
func (s *ProxyServer) updateLoad() {
|
||||
load := s.GetClientsLoad()
|
||||
if load == s.load.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
s.load.Store(load)
|
||||
if s.shutdownScheduled.Load() {
|
||||
// Server is scheduled to shutdown, no need to update clients with current load.
|
||||
return
|
||||
}
|
||||
|
||||
func (s *ProxyServer) newLoadEvent(load int64, incoming int64, outgoing int64) *signaling.ProxyServerMessage {
|
||||
msg := &signaling.ProxyServerMessage{
|
||||
Type: "event",
|
||||
Event: &signaling.EventProxyServerMessage{
|
||||
|
@ -432,7 +447,37 @@ func (s *ProxyServer) updateLoad() {
|
|||
Load: load,
|
||||
},
|
||||
}
|
||||
if s.maxIncoming > 0 || s.maxOutgoing > 0 {
|
||||
msg.Event.Bandwidth = &signaling.EventProxyServerBandwidth{}
|
||||
if s.maxIncoming > 0 {
|
||||
value := float64(incoming) / float64(s.maxIncoming) * 100
|
||||
msg.Event.Bandwidth.Incoming = &value
|
||||
}
|
||||
if s.maxOutgoing > 0 {
|
||||
value := float64(outgoing) / float64(s.maxOutgoing) * 100
|
||||
msg.Event.Bandwidth.Outgoing = &value
|
||||
}
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func (s *ProxyServer) updateLoad() {
|
||||
load, incoming, outgoing := s.GetClientsLoad()
|
||||
if load == s.load.Load() &&
|
||||
incoming == s.currentIncoming.Load() &&
|
||||
outgoing == s.currentOutgoing.Load() {
|
||||
return
|
||||
}
|
||||
|
||||
s.load.Store(load)
|
||||
s.currentIncoming.Store(incoming)
|
||||
s.currentOutgoing.Store(outgoing)
|
||||
if s.shutdownScheduled.Load() {
|
||||
// Server is scheduled to shutdown, no need to update clients with current load.
|
||||
return
|
||||
}
|
||||
|
||||
msg := s.newLoadEvent(load, incoming, outgoing)
|
||||
s.IterateSessions(func(session *ProxySession) {
|
||||
session.sendMessage(msg)
|
||||
})
|
||||
|
@ -576,13 +621,7 @@ func (s *ProxyServer) onMcuDisconnected() {
|
|||
}
|
||||
|
||||
func (s *ProxyServer) sendCurrentLoad(session *ProxySession) {
|
||||
msg := &signaling.ProxyServerMessage{
|
||||
Type: "event",
|
||||
Event: &signaling.EventProxyServerMessage{
|
||||
Type: "update-load",
|
||||
Load: s.load.Load(),
|
||||
},
|
||||
}
|
||||
msg := s.newLoadEvent(s.load.Load(), s.currentIncoming.Load(), s.currentOutgoing.Load())
|
||||
session.sendMessage(msg)
|
||||
}
|
||||
|
||||
|
@ -1283,15 +1322,21 @@ func (s *ProxyServer) HasClients() bool {
|
|||
return len(s.clients) > 0
|
||||
}
|
||||
|
||||
func (s *ProxyServer) GetClientsLoad() int64 {
|
||||
func (s *ProxyServer) GetClientsLoad() (load int64, incoming int64, outgoing int64) {
|
||||
s.clientsLock.RLock()
|
||||
defer s.clientsLock.RUnlock()
|
||||
|
||||
var load int64
|
||||
for _, c := range s.clients {
|
||||
load += int64(c.MaxBitrate())
|
||||
bitrate := int64(c.MaxBitrate())
|
||||
load += bitrate
|
||||
if _, ok := c.(signaling.McuPublisher); ok {
|
||||
incoming += bitrate
|
||||
} else if _, ok := c.(signaling.McuSubscriber); ok {
|
||||
outgoing += bitrate
|
||||
}
|
||||
}
|
||||
return load / 1024
|
||||
load = load / 1024
|
||||
return
|
||||
}
|
||||
|
||||
func (s *ProxyServer) GetClient(id string) signaling.McuClient {
|
||||
|
|
34
slices_go120.go
Normal file
34
slices_go120.go
Normal file
|
@ -0,0 +1,34 @@
|
|||
//go:build !go1.21
|
||||
|
||||
/**
|
||||
* Standalone signaling server for the Nextcloud Spreed app.
|
||||
* Copyright (C) 2024 struktur AG
|
||||
*
|
||||
* @author Joachim Bauch <bauch@struktur.de>
|
||||
*
|
||||
* @license GNU AGPL version 3 or any later version
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package signaling
|
||||
|
||||
import (
|
||||
"sort"
|
||||
)
|
||||
|
||||
func SlicesSortFunc[T any](l []T, f func(a T, b T) int) {
|
||||
sort.Slice(l, func(i, j int) bool {
|
||||
return f(l[i], l[j]) < 0
|
||||
})
|
||||
}
|
32
slices_go121.go
Normal file
32
slices_go121.go
Normal file
|
@ -0,0 +1,32 @@
|
|||
//go:build go1.21
|
||||
|
||||
/**
|
||||
* Standalone signaling server for the Nextcloud Spreed app.
|
||||
* Copyright (C) 2024 struktur AG
|
||||
*
|
||||
* @author Joachim Bauch <bauch@struktur.de>
|
||||
*
|
||||
* @license GNU AGPL version 3 or any later version
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
package signaling
|
||||
|
||||
import (
|
||||
"slices"
|
||||
)
|
||||
|
||||
func SlicesSortFunc[T any](l []T, f func(a T, b T) int) {
|
||||
slices.SortFunc(l, f)
|
||||
}
|
Loading…
Add table
Reference in a new issue