Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Appearance settings
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 40 additions & 6 deletions 46 LoopLibrary/BaseLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,20 @@
using System.Threading.Tasks;
using DebugPrintLibrary;
using EventServerCore;
using System.Linq;
using System.Collections.Generic;

namespace LoopLibrary
{
public abstract class BaseLoop<T>
{
public readonly ulong id;
private readonly int _interval;
private int _interval;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private int _count;
public CancellationTokenSource Cts => _cts;
private readonly int _maxTimespans = 100;
private Queue<double> _timespans = new Queue<double>();

protected BaseLoop(int interval, ulong id = 1)
{
Expand All @@ -34,9 +38,19 @@ private async Task<Result<T>> Loop(CancellationToken token)
{
var delay = Task.Delay(_interval, token);

await Update(_count);
_count++;

var start = DateTime.Now;
await Update(_count++);
var timespan = DateTime.Now - start;

lock (_timespans)
{
_timespans.Enqueue(timespan.TotalMilliseconds);
if (_timespans.Count > _maxTimespans)
{
_timespans.Dequeue();
}
}

await delay;
}
}
Expand All @@ -57,7 +71,7 @@ private async Task<Result<T>> Loop(CancellationToken token)
}
}

protected virtual void Start(){}
protected virtual void Start() { }

protected abstract Task Update(int count);

Expand All @@ -68,12 +82,32 @@ protected virtual void OnCancel()

public void Done(T result)
{
throw new OperationCompletedException<Result<T>>(new Result<T>(true ,result));
throw new OperationCompletedException<Result<T>>(new Result<T>(true, result));
}

public void Cancel()
{
throw new OperationCanceledException();
}

public double GetFps()
{
lock (_timespans)
{
if (_timespans.Count > 0)
{
return 1000 / _timespans.Average();
}
else
{
return 0;
}
}
}

public void SetInterval(int ms)
{
_interval = ms;
}
}
}
1 change: 1 addition & 0 deletions 1 StreamClient/StreamClient.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<Nullable>enable</Nullable>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<ItemGroup>
Expand Down
12 changes: 8 additions & 4 deletions 12 StreamServer.Test/PacketTest.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;
using System.Linq;
using CommonLibrary;
using Xunit;
using Xunit.Abstractions;
Expand All @@ -20,7 +21,8 @@ public void PacketTest1()
var packet = new MinimumAvatarPacket(1, new Vector3(2, 5, 6), 10,
new Vector4(2, 4, 3, 120), 0);
Assert.True(packet.CheckRange(), "packet.CheckRange()");
var buff = Utility.PacketsToBuffer(new List<MinimumAvatarPacket> {packet});
var packets = new List<MinimumAvatarPacket> {packet};
var buff = Utility.PacketsToBuffer(ref packets);
var decodedPacket = Utility.BufferToPackets(buff);
Assert.Equal(packet.PaketId, decodedPacket![0].PaketId);
Assert.Equal(packet.Position, decodedPacket![0].Position);
Expand All @@ -36,7 +38,8 @@ public void PacketTest2()
var packet2 = new MinimumAvatarPacket(2, new Vector3((short) 7.5f, (short) 2.6f, (short) 9.2f), 99,
new Vector4(1, 9, 6, -1), 0);
Assert.True(packet1.CheckRange() && packet2.CheckRange(), "packet1.CheckRange() && packet2.CheckRange()");
var buff = Utility.PacketsToBuffer(new List<MinimumAvatarPacket> {packet1, packet2});
var packets = new List<MinimumAvatarPacket> {packet1, packet2};
var buff = Utility.PacketsToBuffer(ref packets);
var decodedPacket = Utility.BufferToPackets(buff);

Assert.Equal(packet1.PaketId, decodedPacket![0].PaketId);
Expand All @@ -56,7 +59,8 @@ public void PacketTest3()
var packet = new MinimumAvatarPacket(3, new Vector3(), 0,
new Vector4(), 0);
Assert.True(packet.CheckRange(), "packet.CheckRange()");
var buff = Utility.PacketsToBuffer(new List<MinimumAvatarPacket> {packet});
var packets = new List<MinimumAvatarPacket> {packet};
var buff = Utility.PacketsToBuffer(ref packets);
var decodedPacket = Utility.BufferToPackets(buff);
Assert.Equal(packet.PaketId, decodedPacket![0].PaketId);
Assert.Equal(packet.Position, decodedPacket![0].Position);
Expand All @@ -77,7 +81,7 @@ public static void PacketTest4()
Assert.True(packet.CheckRange(), $"packet.CheckRange({i})");
}

var buffs = Utility.PacketsToBuffers(packets);
var buffs = Utility.PacketsToBuffers(ref packets).ToList();

const int nSize = 29;
for (int i = 0; i < buffs.Count; ++i)
Expand Down
17 changes: 15 additions & 2 deletions 17 StreamServer/Source/StreamServer/Entry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ static void Main(string[] args)
{
UdpClient udpClient = new UdpClient(5577);
var input = new InputLoop(udpClient, 2, 1);
var output = new OutputLoop(udpClient, 1000, 2);
var statusCheck = new StatusCheckLoop(1000, 3);
var output = new OutputLoop(udpClient, 10, 2);
var statusCheck = new StatusCheckLoop(1000, 3, input, output);
input.Run();
output.Run();
statusCheck.Run();
Expand All @@ -28,6 +28,19 @@ static void Main(string[] args)
Thread.Sleep(100);
break;
}
else
{
try
{
var interval = Int32.Parse(line);
output.SetInterval(interval);
Printer.PrintDbg($"Set output interval to {interval}");
}
catch (System.Exception)
{
continue;
}
}
}
}
}
Expand Down
13 changes: 6 additions & 7 deletions 13 StreamServer/Source/StreamServer/InputLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public InputLoop(UdpClient udpClient, int interval, ulong id)
{
udp = udpClient;
}

protected override void Start()
{
IPEndPoint localEndPoint = (IPEndPoint)udp.Client.LocalEndPoint;
Expand All @@ -32,13 +32,12 @@ protected override async Task Update(int count)
{
try
{
UdpReceiveResult res;
res = await udp.ReceiveAsync();
var process = PacketProcessor.Process(res);
tasks.Add(process);
} catch (SocketException e)
UdpReceiveResult res = await udp.ReceiveAsync();
tasks.Add(Task.Run(() => PacketProcessor.Process(res)));
}
catch (SocketException e)
{
if (e.ErrorCode != 10054) //Client Disconnected.
if (e.ErrorCode != 10054) // Client Disconnected.
Printer.PrintDbg(e, id);
}
}
Expand Down
3 changes: 1 addition & 2 deletions 3 StreamServer/Source/StreamServer/ModelManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ namespace StreamServer
{
public class ModelManager
{
private ModelManager(){}
private ModelManager() { }
private static ModelManager? _instance;
public static ModelManager Instance => _instance ??= new ModelManager();

public readonly ConcurrentDictionary<ulong, User> Users = new ConcurrentDictionary<ulong, User>();
}
}
33 changes: 23 additions & 10 deletions 33 StreamServer/Source/StreamServer/OutputLoop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;
using System.Linq;
using CommonLibrary;
using CommonLibrary.ExtensionMethod;
using DebugPrintLibrary;
using EventServerCore;
using LoopLibrary;
Expand All @@ -20,7 +22,7 @@ public OutputLoop(UdpClient udpClient, int interval, ulong id)
{
udp = udpClient;
}

protected override void Start()
{
var localEndPoint = udp.Client.LocalEndPoint as IPEndPoint;
Expand All @@ -35,11 +37,15 @@ protected override async Task Update(int count)
List<User> users = new List<User>();
foreach (var kvp in ModelManager.Instance.Users)
{
if(kvp.Value == null) continue;
if (kvp.Value == null)
continue;
var user = kvp.Value;

MinimumAvatarPacket? packet = user.CurrentPacket;
{
if (user.IsConnected && packet != null && DateTime.Now - user.DateTimeBox!.LastUpdated > new TimeSpan(0, 0, 1))
// 最終パケットが1秒以上前だったら切断したものとする
if (user.IsConnected && packet != null
&& DateTime.Now - user.DateTimeBox!.LastUpdated > new TimeSpan(0, 0, 1))
{
#if DEBUG
Printer.PrintDbg($"Disconnected: [{user.UserId.ToString()}] " +
Expand All @@ -50,15 +56,22 @@ protected override async Task Update(int count)
ModelManager.Instance.Users.TryRemove(kvp.Key, out var dummy);
}
}
if (user.IsConnected) users.Add(user);
if (packet != null) packets.Add(packet);

// 有効なユーザとその最新のパケットをまとめる
if (user.IsConnected)
users.Add(user);
if (packet != null)
packets.Add(packet);
}
List<Task> tasks = new List<Task>();
foreach (var user in users)

// 論理プロセッサ分に分割して並列処理
await Task.WhenAll(users.SplitInto(Environment.ProcessorCount).Select(userList => Task.Run(async () =>
{
tasks.Add(PacketSender.Send(user, packets, udp));
}
await Task.WhenAll(tasks);
foreach (var user in userList)
{
await PacketSender.Send(user, packets, udp);
}
})));
}
}
}
44 changes: 17 additions & 27 deletions 44 StreamServer/Source/StreamServer/PacketProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,28 @@ namespace StreamServer
{
public static class PacketProcessor
{
public static async Task Process(UdpReceiveResult res)
public static void Process(UdpReceiveResult res)
{
try
var packet = Utility.BufferToPacket(res.Buffer);
if (!ValidatePacket(packet))
{
await Task.Run(() =>
{
var packet = Utility.BufferToPacket(res.Buffer);
if (!ValidatePacket(packet))
{
return;
}
var users = ModelManager.Instance.Users;

User? user;
if (!users.TryGetValue(packet.PaketId, out user))
{
// RemoteEndPointから来た最初のパケットの場合
user = users[packet.PaketId] = new User(packet.PaketId);
user.RemoteEndPoint = res.RemoteEndPoint;
Printer.PrintDbg($"Connected: [{user.UserId.ToString()}] " +
$"({res.RemoteEndPoint.Address}: {res.RemoteEndPoint.Port.ToString()})");
}

user.CurrentPacket = packet;
user.DateTimeBox = new DateTimeBox(DateTime.Now);
user.IsConnected = true;
});
return;
}
catch (Exception e)
var users = ModelManager.Instance.Users;

User? user;
if (!users.TryGetValue(packet.PaketId, out user))
{
Printer.PrintDbg(e);
// RemoteEndPointから来た最初のパケットの場合
user = users[packet.PaketId] = new User(packet.PaketId);
user.RemoteEndPoint = res.RemoteEndPoint;
Printer.PrintDbg($"Connected: [{user.UserId.ToString()}] " +
$"({res.RemoteEndPoint.Address}: {res.RemoteEndPoint.Port.ToString()})");
}

user.CurrentPacket = packet;
user.DateTimeBox = new DateTimeBox(DateTime.Now);
user.IsConnected = true;
}

private static bool ValidatePacket(in MinimumAvatarPacket? packet)
Expand Down
50 changes: 19 additions & 31 deletions 50 StreamServer/Source/StreamServer/PacketSender.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
Expand All @@ -14,40 +14,28 @@ public static class PacketSender
{
public static async Task Send(User user, List<MinimumAvatarPacket> packets, UdpClient udp)
{
await Task.Run(async () =>
var packetCopy = packets.ToList();
const int maxCount = 100;
if (user.CurrentPacket != null)
{
var packetCopy = packets.ToList();
if (user.CurrentPacket != null)
{
var selfPosition = user.CurrentPacket.Position;

packetCopy.HeapSort((a, b) =>
{
var aSquare = Vector3.Square(a.Position, selfPosition);
var bSquare = Vector3.Square(b.Position, selfPosition);
var comp = aSquare < bSquare ? -1 : 1;
return comp;
});
}
var selfPosition = user.CurrentPacket.Position;

if (packetCopy.Count > 100)
packetCopy = packetCopy.GetRange(0, 100);
var buffs = Utility.PacketsToBuffers(packetCopy);
var tasks = new List<Task>();
foreach (var buf in buffs)
packetCopy.HeapSort((a, b) =>
{
tasks.Add(udp.SendAsync(buf, buf.Length, user.RemoteEndPoint));
}
var aSquare = Vector3.Square(a.Position, selfPosition);
var bSquare = Vector3.Square(b.Position, selfPosition);
var comp = aSquare < bSquare ? -1 : 1;
return comp;
}, maxCount);
}

try
{
await Task.WhenAll(tasks);
}
catch (Exception e)
{
Printer.PrintDbg(e);
}
});
if (packetCopy.Count > maxCount)
packetCopy = packetCopy.GetRange(0, maxCount);
var buffs = Utility.PacketsToBuffers(ref packetCopy);
foreach (var buf in buffs)
{
await udp.SendAsync(buf, buf.Length, user.RemoteEndPoint);
}
}
}
}
Loading
Morty Proxy This is a proxified and sanitized view of the page, visit original site.