在这个Demo中,为了前后端的一致性,服务器需要进行物理运算与权威结果的发布。为了不使技术栈过于复杂,前后端都使用Unity来编写,其中后端使用无头模式。
消息的组成
Demo使用了ProtocolBuffer3来作为通信的中间载体。具体请看上一篇。而序列化的消息都由消息头和pb3序列化信息组成。
消息头
消息头的数据如下所示:
// Total header size in bytes: package_id(2) + ACK_ID(2) + ACK_bit(4) + msg_id(2) + UUID(4).
public static readonly Int16 headByteNum = 2 + 2 + 4 + 2 + 4;
public UInt16 package_id;// sequence id of this outgoing packet
public UInt16 ACK_ID;// newest package id received from the peer, acknowledged by this header
public Int32 ACK_bit;// bitmask of additional ACKs relative to ACK_ID (bit n => ACK_ID - 1 - n)
public UInt16 msg_id;// message type id, used to select the pb3 parser for the body
public UInt32 UUID;// unique id of the player that sent the packet
package_id是这个包的序号,ACK返回的应答就是针对它的。为了节约空间,在分析了发包的频次之后可以适当地减少package_id的长度,在到达了变量类型的最大值之后可以回到最小值重新计数。
其中 ACK_ID为当前收到的最后面的消息的ID,需要告诉通信的另一边已收到。
而ACK_bit则是基于上述ACK_ID的位偏移的收到信息。比如ACK_ID为20,ACK_bit为0101,则[16,17,18,19]中明确收到了17与19。这么做是因为一方面按位存储节约了通信的数据量,另一方面过早的消息其实已经可以认为它失去了应答的价值。
msg_id用以定义消息主体pb3的具体类型用以反序列化处理。这一步会在后面的消息中心中具体阐述。
UUID是用以区分发包的主体是哪个玩家的。
消息头的ACK添加和获取代码如下所示:
/// <summary>
/// Record an additional ACK as a bit offset relative to ACK_ID.
/// Bit n of ACK_bit acknowledges package (ACK_ID - 1 - n); ids wrap around
/// at the UInt16 boundary.
/// </summary>
/// <param name="other_ACK_ID">Package id to acknowledge; ignored when it equals
/// ACK_ID or lies more than 32 ids behind it.</param>
public void SetOtherACK(UInt16 other_ACK_ID)
{
    if (ACK_ID == other_ACK_ID)
        return;
    int offset;
    if (ACK_ID < other_ACK_ID)
        // Sequence wrapped: other_ACK_ID is "behind" ACK_ID across the UInt16 boundary,
        // e.g. ACK_ID = 2, other = 65534 -> offset = 3.
        offset = UInt16.MaxValue - (other_ACK_ID - ACK_ID);
    else
        offset = ACK_ID - other_ACK_ID - 1;
    // ACK_bit is a 32-bit mask, so only offsets 0..31 are representable.
    // BUGFIX: the original check was `offset > 32`; in C# the shift count of an
    // Int32 is masked to 5 bits, so `1 << 32` equals `1 << 0` and offset 32
    // would falsely acknowledge package ACK_ID - 1.
    if (offset >= 32)
        return;
    Int32 mask = 1 << offset;
    ACK_bit = ACK_bit | mask;
}
/// <summary>
/// Expand this header's ACK fields into the full list of acknowledged package
/// ids: ACK_ID itself plus every id marked in the ACK_bit mask
/// (bit n => ACK_ID - 1 - n), with wrap-around at the UInt16 boundary.
/// </summary>
/// <returns>All acknowledged package ids, ACK_ID first.</returns>
public List<UInt16> GetAllACK()
{
    List<UInt16> answer = new List<UInt16>();
    answer.Add(ACK_ID);
    for (int offset = 0; offset < 32; offset++)
    {
        Int32 mask = 1 << offset;
        // BUGFIX: the original tested `(ACK_bit & mask) > 0`, which is false for
        // offset 31 because `1 << 31` is negative as an Int32 — bit 31 was never
        // read even when set. `!= 0` handles the sign bit correctly.
        bool isHaveACK = (ACK_bit & mask) != 0;
        if (isHaveACK)
        {
            int ack = ACK_ID - (offset + 1);
            if (ack < 0)
            {
                ack += UInt16.MaxValue + 1;// ids wrap around at the UInt16 boundary
            }
            answer.Add((UInt16)ack);
        }
    }
    return answer;
}
消息处理中心
消息中心主要负责两件事情:
- 对收到的消息进行处理
- 对超时的消息进行处理
主要数据如下所示:
// Epoch origin for timestamps (Unix epoch; NOTE(review): kind is Unspecified — confirm UTC is intended)
private readonly static DateTime utc = new DateTime(1970, 1, 1);
// Network logic frames per second (NOTE(review): "Fram" is a typo for "Frame",
// but the constant is public so renaming could break callers)
public const float logicFramRate = 30f;
// Heartbeat interval in ms (NOTE(review): "Million" appears to mean "Milli")
public const int heartMillionSecond = (int)(1000.0f / logicFramRate);
// Timeout in ms after which an un-ACKed packet is considered lost
public const int outTime = heartMillionSecond * 6;
// Loss-rate / RTT thresholds beyond which the connection is treated as dropped
public const float disconnectMaxLossRate = 0.1f;
public const int disconnectMaxRTT = heartMillionSecond * 12;
/// <summary>
/// Parsers/callbacks for client-to-server (CS) messages, keyed by msg_id.
/// </summary>
private static Dictionary<UInt16, MsgHandleData> CSMessageParserDic = new Dictionary<UInt16, MsgHandleData>();
/// <summary>
/// Parsers/callbacks for server-to-client (SC) messages, keyed by msg_id.
/// </summary>
private static Dictionary<UInt16, MsgHandleData> SCMessageParserDic = new Dictionary<UInt16, MsgHandleData>();
/// <summary>
/// Timeout handlers for CS packets, keyed by msg_id.
/// </summary>
private static Dictionary<UInt16, Action> CSMessageOutTimeDic = new Dictionary<ushort, Action>();
/// <summary>
/// Timeout handlers for SC packets, keyed by msg_id.
/// </summary>
private static Dictionary<UInt16, Action> SCMessageOutTimeDic = new Dictionary<ushort, Action>();
其中很多都是用以控制收发的常量。
对收到的消息进行处理
首先,由需要响应消息的代码来注册特定消息的处理流程,注册完成后收到特定的消息则交由对应的回调和解析代码去处理。
从客户端到服务器(cs)的具体代码如下:
/// <summary>
/// Registration record for one message type: its id, the protobuf parser used
/// to deserialize the body, and the callback invoked with the parsed result.
/// </summary>
private class MsgHandleData
{
    public int msgId;
    public MessageParser messageParser;
    public Action<UDPHead, object> callback;
}
/// <summary>
/// Register a parser and callback for a client-to-server (CS) message id.
/// Logs an error and ignores the call if the id is already registered.
/// </summary>
public static void AddMsgListener_CS(CS_MSG_ID cs_msg_id, MessageParser parser, Action<UDPHead, object> callback)
{
    UInt16 msgId = (UInt16)cs_msg_id;
    if (CSMessageParserDic.ContainsKey(msgId))
    {
        Debug.LogError($"重复的CS监听注册 msgId:{msgId}");
        return;
    }
    CSMessageParserDic.Add(msgId, new MsgHandleData
    {
        msgId = msgId,
        messageParser = parser,
        callback = callback,
    });
}
/// <summary>
/// Unregister the listener for a client-to-server (CS) message id.
/// Safe to call for an id that was never registered.
/// </summary>
public static void RemoveMsgListener_CS(CS_MSG_ID cs_msg_id)
{
    // Dictionary.Remove is a no-op (returning false) for a missing key, so the
    // original ContainsKey pre-check was a redundant second hash lookup.
    CSMessageParserDic.Remove((UInt16)cs_msg_id);
}
/// <summary>
/// Dispatch a received client-to-server packet: look up the handler registered
/// for head.msg_id, parse the payload that follows the header, and invoke the
/// callback. Packets with no registered handler are silently dropped.
/// </summary>
public static void OnReceiveCSMsg(UDPHead head, byte[] msg, int startIndex)
{
    if (CSMessageParserDic.TryGetValue(head.msg_id, out MsgHandleData handler))
    {
        int payloadLength = msg.Length - startIndex;
        // Parse errors are deliberately not caught here ("let it break"):
        // a malformed payload should surface loudly during development.
        object parsed = handler.messageParser.ParseFrom(msg, startIndex, payloadLength);
        handler.callback.Invoke(head, parsed);
    }
}
对超时的消息进行处理
在消息中心中需要实现的与收到类似,也是注册和触发。具体的超时会在客户端和服务器中讲到。
异步收发的实现
客户端Socket
使用C#的UdpClient作为基本的通信用socket。它本身是阻塞形式的,这里使用了一个async Task来开启多线程;在收到新的消息之后,为了和unity本身的主逻辑通信,使用了一个线程安全队列来注册处理新到来的消息。
namespace MyNetwork
{
    /// <summary>
    /// Low-level UDP transport: owns the socket, sends framed messages
    /// (header + protobuf payload) and pumps received datagrams back to the
    /// Unity main thread via MonoDriver.
    /// </summary>
    public class ClientSocket
    {
        IPEndPoint serverEndPoint;// remote server endpoint
        UdpClient udpClient;// underlying UDP socket
        byte[] msg_buff;// reusable send buffer (NOTE(review): assumes SendMessage is called from a single thread — confirm)
        Task receiveTask; // background receive loop
        bool isWait2Receiving;// true while the receive loop should keep running
        Action<byte[]> receiveAction;// invoked (marshalled to the main thread) for each received datagram
        /// <summary>
        /// Create the transport.
        /// </summary>
        /// <param name="receiveAction">Callback for each received datagram.</param>
        /// <param name="buffSize">Maximum size in bytes of an outgoing packet.</param>
        public ClientSocket(Action<byte[]> receiveAction, int buffSize = 1024)
        {
            this.receiveAction = receiveAction;
            msg_buff = new byte[buffSize];
        }
        /// <summary>
        /// Open the UDP socket and start the asynchronous receive loop.
        /// No-op if already receiving.
        /// </summary>
        /// <param name="hostName">Server IP address as a dotted string.</param>
        /// <param name="port">Server port.</param>
        public void StartUDP(string hostName, int port)
        {
            if (isWait2Receiving)
            {
                return;
            }
            serverEndPoint = new IPEndPoint(IPAddress.Parse(hostName), port);
            udpClient = new UdpClient();
            udpClient.Connect(serverEndPoint);
            isWait2Receiving = true;
            receiveTask = ReceiveMessage();
        }
        /// <summary>
        /// Stop the receive loop and close the socket. No-op if not receiving.
        /// </summary>
        public void StopUDP()
        {
            if (!isWait2Receiving)
            {
                return;
            }
            // BUGFIX: clear the flag BEFORE closing the socket. The original
            // closed first, so the receive loop could observe a disposed/null
            // udpClient while isWait2Receiving was still true and spin through
            // its catch block logging spurious errors.
            isWait2Receiving = false;
            udpClient.Close();
            udpClient = null;
        }
        /// <summary>
        /// Send one message: header bytes followed by the protobuf payload,
        /// assembled into the shared send buffer. Oversized messages are
        /// dropped with an error log instead of being fragmented.
        /// </summary>
        public void SendMessage(UDPHead head, IMessage message)
        {
            byte[] headData = head.ToByteArray();
            byte[] data = message.ToByteArray();
            int size = UDPHead.headByteNum + data.Length;
            if (size > msg_buff.Length)
            {
                Debug.LogError("SendMessage prefix size or buff overflow error");
                return;
            }
            headData.CopyTo(msg_buff, 0);
            data.CopyTo(msg_buff, UDPHead.headByteNum);
            udpClient?.Send(msg_buff, size);
        }
        /// <summary>
        /// Receive loop: awaits datagrams and forwards them to the main thread.
        /// Runs until StopUDP() clears isWait2Receiving.
        /// </summary>
        private async Task ReceiveMessage()
        {
            while (isWait2Receiving)
            {
                try
                {
                    UdpReceiveResult result = await udpClient.ReceiveAsync();
                    MonoDriver.Instance.Add2OtherThreadWaitUpdateCall(() => receiveAction?.Invoke(result.Buffer));
                }
                catch (ObjectDisposedException)
                {
                    // Socket was closed by StopUDP(); exit quietly.
                    break;
                }
                catch (Exception e)
                {
                    // A close between the while-check and the await can surface as
                    // other exception types too; only report errors while still live.
                    if (!isWait2Receiving)
                        break;
                    MonoDriver.Instance.Add2OtherThreadWaitUpdateCall(() => Debug.LogError("ReceiveMessage Error" + e + e.StackTrace));
                }
            }
        }
    }
}
客户端
客户端由收发Socket,消息头处理,ACK处理,心跳保持等模块组成。
为了UDP的可靠性,客户端需要记录已发出的包,并在没有收到对应ACK时做出一定的操作(重传、掉线、报错等)。
为了进行拥塞控制和断线的检查,客户端需要计算RTT(客户端到服务器的往返时间)和lossRate(丢包率);并根据得到的值去控制心跳频率、发包频率等(尚未实现)。
收发消息:
收到消息时,会把消息头里的package_id记录到需要回复的队列中。
当需要发出消息时,组合这些需要回复的id形成新的消息头。
/// <summary>
/// Send a message to the server. Records the outgoing package in the
/// wait-for-ACK table (for RTT / loss tracking), piggybacks ACKs for the most
/// recently received packets onto the header, transmits, then advances
/// package_id.
/// </summary>
/// <param name="msg_id">Message type id the receiver uses to pick a parser.</param>
/// <param name="message">Protobuf message body.</param>
public void SendMessage(UInt16 msg_id, IMessage message)
{
    lock (msgWaitACKDic)
    {
        var now = NetworkCenter.GetTimeStamp();
        // NOTE(review): Add throws if package_id wraps around while an old entry
        // with the same id is still un-ACKed — presumably rare; confirm.
        msgWaitACKDic.Add(package_id, new WaitACKData(now, msg_id));
        // Pairs with Interlocked.Read in the heartbeat task.
        Interlocked.Exchange(ref newestMsgSendTime, now);
        //LogSystem.DevLog($"msgWaitACKDic添加{package_id}");
        sendCount++;
    }
    UDPHead head = new UDPHead(this.UUID, package_id, msg_id);
    #region 生成ACK
    // The newest received package id becomes ACK_ID; up to 32 older ids from
    // the pending-ACK window are encoded as bit offsets via SetOtherACK.
    // NOTE(review): wait2AckPackageIds is read here without a lock — assumes
    // send and receive both run on the main thread; confirm.
    LinkedListNode<UInt16> node = wait2AckPackageIds.Last;
    if (node == null)
    {
        head.ACK_ID = 0;
    }
    else
    {
        head.ACK_ID = node.Value;
    }
    for (int i = 0; i < 32; i++)
    {
        node = node?.Previous;
        if (node == null)
            break;
        head.SetOtherACK(node.Value);
    }
    #endregion
    clientSocket?.SendMessage(head, message);
    PackageIdIncrease();
}
/// <summary>
/// Handle a raw datagram from the server: decode the header, queue its
/// package_id so a future outgoing header can acknowledge it (window capped at
/// 32 to match the ACK_bit width), consume the piggybacked ACKs for RTT, then
/// hand the payload to the message center.
/// </summary>
private void OnReceiveMessage(byte[] rawData)
{
    var head = new UDPHead(rawData);
    // Remember this packet so we can acknowledge it later; keep at most 32
    // pending ids because a header carries ACK_ID plus 32 offset bits.
    wait2AckPackageIds.AddLast(head.package_id);
    while (wait2AckPackageIds.Count > 32)
        wait2AckPackageIds.RemoveFirst();
    GetACKAndGenRTT(head);
    NetworkCenter.OnReceiveSCMsg(head, rawData, UDPHead.headByteNum);
}
在收到消息时,利用其中的ACK信息和本地记录的发送消息的时间,可以计算出对应的RTT,有了RTT还可以作为掉线的判别依据:
/// <summary>
/// Consume the ACKs piggybacked on an incoming header: remove each
/// acknowledged package from the wait table, fold its round trip into the
/// smoothed RTT (exponential moving average, alpha = 0.1), then re-evaluate
/// the disconnect thresholds.
/// </summary>
private void GetACKAndGenRTT(UDPHead head)
{
    lock (msgWaitACKDic)
    {
        List<UInt16> ACKs = head.GetAllACK();
        var now = NetworkCenter.GetTimeStamp();
        foreach (UInt16 ack in ACKs)
        {
            if (msgWaitACKDic.TryGetValue(ack, out WaitACKData waitACKData))
            {
                msgWaitACKDic.Remove(ack);
                //LogSystem.DevLog($"msgWaitACKDic收到回复移除{ack} msgId:{waitACKData.msg_id}");
                RTT = (long)(RTT * 0.9f + (now - waitACKData.sendTimeStamp) * 0.1f);
            }
        }
        Check2Disconnect();
    }
    // (Removed a commented-out dead-code stub that referenced an unfinished
    // clientSendState notification.)
}
/// <summary>
/// If the smoothed RTT or the loss rate exceeds the configured thresholds,
/// report the disconnect on the main thread and shut the client down.
/// </summary>
private void Check2Disconnect()
{
    if (RTT > NetworkCenter.disconnectMaxRTT || lossRate > NetworkCenter.disconnectMaxLossRate)
    {
        MonoDriver.Instance.Add2OtherThreadWaitUpdateCall(() =>
        {
            LogSystem.DevLogWarning($"<color=red>网络过差,失去连接 RTT:{RTT} lossRate:{lossRate}</color>");
            disConnectAction?.Invoke();
        });
        // NOTE(review): this runs inside lock(msgWaitACKDic) (called from the ACK
        // path and from the timeout task). StopClient() Wait()s on the background
        // tasks — if this executes on the timeout task itself that looks like a
        // self-deadlock; confirm which thread reaches here.
        StopClient();
    }
}
对于消息过期和心跳,我都使用了async Task来多线程的处理这些任务,以此增加效率。使用了lock(Object)与Interlocked等方式来保证线程安全。CancellationTokenSource则在关闭时正确的结束线程。
/// <summary>
/// Background task: periodically scans the wait-for-ACK table, treats entries
/// older than outTimeMillisecond as lost (counts them toward the loss rate,
/// folds the timeout into the smoothed RTT, and fires per-message timeout
/// handlers on the main thread), then checks the disconnect thresholds.
/// NOTE(review): async void makes exceptions here unobservable; returning Task
/// would be safer but callers appear to track a separate task handle — confirm
/// before changing the signature.
/// </summary>
private async void CheckOutTime(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        // NOTE(review): token is not passed to Delay, so cancellation only takes
        // effect after the current interval; passing it would require catching
        // TaskCanceledException in this async void method.
        await Task.Delay(outTimeMillisecond);
        lock (msgWaitACKDic)
        {
            var now = NetworkCenter.GetTimeStamp();
            wait2RemoveList.Clear();
            if (msgWaitACKDic.Count > 0)
            {
                // Pass 1: collect timed-out keys (cannot remove while enumerating).
                foreach (KeyValuePair<ushort, WaitACKData> item in msgWaitACKDic)
                {
                    bool isOutTime = now - item.Value.sendTimeStamp > outTimeMillisecond;
                    if (isOutTime)
                    {
                        wait2RemoveList.Add(item.Key);
                        lossCount++;
                    }
                }
                // Pass 2: remove them and notify the timeout handlers.
                foreach (ushort key in wait2RemoveList)
                {
                    // A lost packet contributes the full timeout to the RTT average.
                    RTT = (long)(RTT * 0.9f + outTimeMillisecond * 0.1f);
                    UInt16 msg_id = msgWaitACKDic[key].msg_id;
                    msgWaitACKDic.Remove(key);
                    LogSystem.DevLogWarning($"msgWaitACKDic移除超时{key} msgId:{msg_id}");
                    MonoDriver.Instance.Add2OtherThreadWaitUpdateCall(() =>
                    {
                        NetworkCenter.OnCSMsgTimeOut(msg_id);
                    });
                }
            }
            // BUGFIX: the original computed `lossCount / sendCount` unconditionally,
            // which divides by zero when nothing was sent this window (and, if the
            // counters are integers, truncates the ratio to 0). Guard the window
            // and force float division.
            if (sendCount > 0)
                lossRate = lossRate * 0.8f + ((float)lossCount / sendCount) * 0.2f;
            lossCount = 0;
            sendCount = 0;
            Check2Disconnect();
        }
    }
}
/// <summary>
/// Background task: once per logic interval after the handshake, if no packet
/// has been sent within the last interval, queues a heartbeat message (sent
/// from the main thread) so the server keeps seeing traffic.
/// NOTE(review): async void and a Delay without the token — cancellation only
/// takes effect after the current interval, and exceptions are unobservable.
/// </summary>
private async void TrySendHeartBeatMsg(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        await Task.Delay(NetworkCenter.heartMillionSecond);
        var now = NetworkCenter.GetTimeStamp();
        // Pairs with the Interlocked.Exchange in SendMessage.
        if (now - Interlocked.Read(ref newestMsgSendTime) > NetworkCenter.heartMillionSecond)
        {
            CS_HeartBeat heartBeat = new CS_HeartBeat();
            heartBeat.TimeStamp = now;
            MonoDriver.Instance.Add2OtherThreadWaitUpdateCall(() =>
            {
                SendMessage((UInt16)CS_MSG_ID.CsHeartBeat, heartBeat);
            });
        }
    }
}
/// <summary>
/// Shut the client down: cancel and join the heartbeat and timeout tasks,
/// close the socket, then reset all connection state and statistics.
/// NOTE(review): the Wait() calls block — invoking this from one of those
/// background tasks (e.g. via Check2Disconnect inside CheckOutTime) would
/// wait on itself; confirm which thread calls this.
/// </summary>
public void StopClient()
{
    cancleHeartBeat?.Cancel();
    sendHeartBeatTask?.Wait();
    clientSocket.StopUDP();
    cancleCheckOutTime?.Cancel();
    checkOutTimetTask?.Wait();
    isClientRunning = false;
    clientSendState = ClientSendState.good;
    package_id = 0;
    newestMsgSendTime = 0;
    RTT = 0;
    lossRate = 0;
    lossCount = 0;
    sendCount = 0;
    // NOTE(review): these collections are guarded by lock(msgWaitACKDic)
    // elsewhere; clearing them unguarded assumes the background tasks have
    // fully stopped by this point.
    msgWaitACKDic.Clear();
    wait2AckPackageIds.Clear();
}
服务端的主体逻辑与客户端大同小异,只是多了一层一对多的处理,有空再写了。
如何处理UDP包过大,引起传输效率变低的问题
没考虑过耶