using System; using System.Collections.Generic; using System.IO; using System.Net; using System.Net.Sockets; using System.Text; using UnityEngine; public class UidGenerator { const int VALUE_START = 10000; long m_Epoch2020 = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks / 10000; long m_Time = 0; uint m_Value = VALUE_START; public long TimeSince2020(long time) { return (time - m_Epoch2020) / 1000; } public long Generate(byte flag) { //Debug.Log((DateTime.UtcNow.Ticks / 10000).ToString()); long time = TimeSince2020(DateTime.UtcNow.Ticks / 10000); if (time > m_Time) { m_Time = time; m_Value = VALUE_START; } else { m_Value++; if (m_Value > ushort.MaxValue) { m_Time++; m_Value = VALUE_START; } } ulong result = 0; result |= ((ulong)flag & byte.MaxValue) << 56; result |= ((ulong)m_Time & 0xFFFFFFFFFF) << 16; result |= ((ulong)m_Value & ushort.MaxValue); return (long)result; } } public class RecvPacketPraser { const int HEAD_SIZE = 2; const int MAX_PACKET_SIZE = 2048; public enum ParserState { PacketSize, PacketBody } public ParserState State { get; private set; } = ParserState.PacketSize; public MemoryStream MemoryStream { get; private set; } int packetSize; byte[] headCache = new byte[HEAD_SIZE]; byte[] packetCache = new byte[MAX_PACKET_SIZE]; public bool Parse(CircularBuffer buffer) { switch (State) { case ParserState.PacketSize: if (buffer.Length < HEAD_SIZE) { return false; } MemoryStream = null; buffer.Read(headCache, 0, HEAD_SIZE); packetSize = headCache[0] << 8 | headCache[1]; if (packetSize > MAX_PACKET_SIZE) { throw new Exception($"Size error {packetSize}"); } else if (packetSize == 0) { return true; } State = ParserState.PacketBody; break; case ParserState.PacketBody: if (buffer.Length < packetSize) { return false; } buffer.Read(packetCache, 0, packetSize); MemoryStream = new MemoryStream(packetCache, 0, packetSize, true, true); State = ParserState.PacketSize; break; } return true; } } public class CircularBuffer { public int ChunkSize { get; private set; } public int FirstIndex; public int LastIndex; Queue bufferCache = new Queue(); Queue bufferQueue = new Queue(); byte[] lastBuffer; public CircularBuffer(int chunkSize = 2048) { ChunkSize = chunkSize; AddLast(); } public int Length { get { int c = 0; if (bufferQueue.Count == 0) { c = 0; } else { c = (bufferQueue.Count - 1) * ChunkSize + LastIndex - FirstIndex; } if (c < 0) { throw new Exception($"CircularBuffer count < 0: BufferCount:{bufferQueue.Count} LastIndex:{LastIndex} FirstIndex{FirstIndex}"); } return c; } } void AddLast() { byte[] buffer; if (bufferCache.Count > 0) { buffer = bufferCache.Dequeue(); } else { buffer = new byte[ChunkSize]; } bufferQueue.Enqueue(buffer); lastBuffer = buffer; } public void RemoveFirst() { bufferCache.Enqueue(bufferQueue.Dequeue()); } public byte[] First { get { if (bufferQueue.Count == 0) { AddLast(); } return bufferQueue.Peek(); } } public byte[] Last { get { if (this.bufferQueue.Count == 0) { this.AddLast(); } return lastBuffer; } } public int Read(byte[] buffer, int offset, int count) { if (buffer.Length < offset + count) { throw new Exception($"buffer length , count, lenght{buffer.Length} offset{offset} count{count}"); } int length = Length; if (length < count) { count = length; } int alreadyCopyCount = 0; while (alreadyCopyCount < count) { // 需要拷贝总数量 int n = count - alreadyCopyCount; // 可以拷贝完 if (ChunkSize - FirstIndex > n) { Array.Copy(First, FirstIndex, buffer, alreadyCopyCount + offset, n); FirstIndex += n; alreadyCopyCount += n; } // 需要继续拷贝 else { Array.Copy(First, FirstIndex, buffer, alreadyCopyCount + offset, ChunkSize - FirstIndex); alreadyCopyCount += ChunkSize - FirstIndex; FirstIndex = 0; RemoveFirst(); } } return count; } public void Write(byte[] buffer, int offset, int count) { int alreadyCopyCount = 0; while (alreadyCopyCount < count) { if (LastIndex == ChunkSize) { AddLast(); LastIndex = 0; } // 需要拷贝总数量 int n = count - alreadyCopyCount; // 可以拷贝完 if (ChunkSize - LastIndex > n) { Array.Copy(buffer, alreadyCopyCount + offset, lastBuffer, LastIndex, n); LastIndex += n; alreadyCopyCount += n; } else { Array.Copy(buffer, alreadyCopyCount + offset, lastBuffer, LastIndex, ChunkSize - LastIndex); alreadyCopyCount += ChunkSize - LastIndex; LastIndex = ChunkSize; } } } } public class ErrorCode { // 110000 以上避免与SocketError冲突 public const int ERR_PeerDisconnect = 110001; public const int ERR_SocketError = 110002; public const int ERR_RecvTimeout = 110003; } public class SocketHandler { public const int RECV_BUFFER_SIZE = 2048; const int HEAD_SIZE = 2; static int idGen = 0; private int id; public int Id => id; public Action RecvCallback; public Action ErrCallback; public bool Disposed { get; internal set; } = false; Socket socket; bool isSending = false; byte[] buffer = new byte[RECV_BUFFER_SIZE]; byte[] headBuffer = new byte[HEAD_SIZE]; RecvPacketPraser parser = new RecvPacketPraser(); CircularBuffer sendBuffer = new CircularBuffer(); CircularBuffer recvBuffer = new CircularBuffer(); public SocketHandler(Socket socket) { id = ++idGen; this.socket = socket; socket.BeginReceive(buffer, 0, RECV_BUFFER_SIZE, SocketFlags.None, OnReceive, socket); } /// /// 线程不安全 /// /// public void Send(byte[] data) { int len = data.Length; headBuffer[0] = (byte)((len & 0xFF00) >> 8); headBuffer[1] = (byte)(len & 0xFF); sendBuffer.Write(headBuffer, 0, HEAD_SIZE); sendBuffer.Write(data, 0, len); } /// /// 线程不安全 /// /// public void Send(string msg) { Send(Encoding.UTF8.GetBytes(msg)); } public void Send(IList data) { int len = 0; for (int i = 0; i < data.Count; ++i) { len += data[i].Length; } headBuffer[0] = (byte)((len & 0xFF00) >> 8); headBuffer[1] = (byte)(len & 0xFF); sendBuffer.Write(headBuffer, 0, HEAD_SIZE); for (int i = 0; i < data.Count; ++i) { sendBuffer.Write(data[i], 0, data[i].Length); } } void OnSendComplete(IAsyncResult result) { try { int sendSize = socket.EndSend(result, out SocketError err); if (err != SocketError.Success) { OnError((int)err); return; } Debug.Log($"SocketHandler Sent {sendSize} {err}"); lock (sendBuffer) { sendBuffer.FirstIndex += sendSize; if (sendBuffer.FirstIndex == sendBuffer.ChunkSize) { sendBuffer.FirstIndex = 0; sendBuffer.RemoveFirst(); } isSending = false; } } catch (Exception e) { OnError(ErrorCode.ERR_SocketError); } } void OnReceive(IAsyncResult result) { try { int recvSize = socket.EndReceive(result, out SocketError err); if (err != SocketError.Success) { OnError((int)err); return; } if (recvSize > 0) { Debug.Log($"SocketHandler Recv {recvSize} {err}"); lock (recvBuffer) { recvBuffer.Write(buffer, 0, recvSize); } } else { OnError(ErrorCode.ERR_PeerDisconnect); return; } } catch (Exception e) { OnError(ErrorCode.ERR_SocketError); return; } if (socket.Connected) { socket.BeginReceive(buffer, 0, RECV_BUFFER_SIZE, SocketFlags.None, OnReceive, socket); } } void OnError(int err) { SocketManager.Disconnect(this); ErrCallback.Invoke(err); } void StartSend() { if (socket == null || !socket.Connected) { isSending = false; return; } if (isSending) { return; } if (sendBuffer.Length == 0) { isSending = false; return; } lock (sendBuffer) { isSending = true; int sendSize = sendBuffer.ChunkSize - sendBuffer.FirstIndex; if (sendSize > sendBuffer.Length) { sendSize = sendBuffer.Length; } socket.BeginSend(sendBuffer.First, sendBuffer.FirstIndex, sendSize, SocketFlags.None, OnSendComplete, socket); //socket.Send(sendBuffer.First, sendBuffer.FirstIndex, sendSize, SocketFlags.None); } } void StartRecv() { if (socket == null || !socket.Connected) { return; } if (recvBuffer.Length == 0) { return; } lock (recvBuffer) { while (parser.Parse(recvBuffer)) { if (parser.MemoryStream != null) { RecvCallback.Invoke(parser.MemoryStream); } } } } internal void Tick() { if (!socket.Connected) { return; } StartSend(); StartRecv(); } } public static class SocketManager { public const int MAX_PACKET_SIZE = 2048; static Dictionary mapSockets = new Dictionary(); static Dictionary socketHandlers = new Dictionary(); public static SocketHandler Connect(IPEndPoint endPoint) { Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); //socket.Connect(endPoint); //Debug.Log("Manager Connected Sync"); SocketHandler handler = new SocketHandler(socket); mapSockets.Add(handler.Id, socket); socketHandlers.Add(handler.Id, handler); socket.BeginConnect(endPoint, OnBeginConnect, handler); return handler; } public static void Disconnect(SocketHandler handler) { if (handler.Disposed) { return; } if (!mapSockets.TryGetValue(handler.Id, out Socket socket)) { return; } try { if (socket.Connected) { socket.Disconnect(false); } } catch(SocketException e) { handler.ErrCallback.Invoke(e.ErrorCode); } catch (Exception e) { handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError); } handler.Disposed = true; if (mapSockets.ContainsKey(handler.Id)) { mapSockets.Remove(handler.Id); } if (socketHandlers.ContainsKey(handler.Id)) { socketHandlers.Remove(handler.Id); } Debug.Log("SocketManager Disconnected"); } public static void Tick() { foreach (var handler in socketHandlers.Values) { handler.Tick(); } } static void OnBeginConnect(IAsyncResult result) { //Socket socket = result.AsyncState as Socket; SocketHandler handler = result.AsyncState as SocketHandler; if (!mapSockets.TryGetValue(handler.Id, out Socket socket)) { Debug.LogError("SocketManager Unmanaged socket connected"); return; } try { //if (socket.Connected) //{ socket.EndConnect(result); Debug.Log("SocketManager Connected Async"); //} } catch (SocketException e) { Disconnect(handler); handler.ErrCallback.Invoke(e.ErrorCode); //handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError); } catch (Exception e) { Disconnect(handler); handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError); } } } public static class LuaSocketManager { static UidGenerator uidGenerator = new UidGenerator(); static SocketHandler handler = null; static Queue msgBuffer = new Queue(); static byte[] byteCache = new byte[1]; static byte[] longCache = new byte[8]; static List contentSegments = new List(); /// /// 保存最近一次发生错误的错误码 /// public static int ErrorCode { get; private set; } public static void Connect(string ipAddress, int port) { if (handler != null) { Debug.LogWarning("LuaSocket 存在未断开的会话,断开会话"); SocketManager.Disconnect(handler); } IPEndPoint targetEndPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port); handler = SocketManager.Connect(targetEndPoint); handler.RecvCallback = OnReceive; handler.ErrCallback = OnError; } public static void Disconnect() { SocketManager.Disconnect(handler); handler = null; } /// /// 更新,发送缓冲的信息,解析接受到的信息 /// /// true为正常连接, false为已断开连接 public static bool Update() { if (handler == null || handler.Disposed) { Debug.Log("LuaSocket 会话已中断或未开始"); return false; } SocketManager.Tick(); return true; } /// /// 发送信息 /// /// 内容 /// 是否需要排序 /// false发送过程中发生错误,对方不会接收到信息。true为未检测到错误,但不能肯定对方成功接收 public static bool Send(string msg, bool sequence = true) { if (handler == null) { Debug.LogWarning("LuaSocket 会话不存在,确保发送信息前先调用Connect方法"); return false; } if (handler.Disposed) { Debug.LogWarning("LuaSocket 会话已弃用,请重新连接"); return false; } //if (sequence) //{ // long header = uidGenerator.Generate(0x8F); // contentSegments.Add(longCache); // contentSegments.Add(Encoding.UTF8.GetBytes(msg)); //} //else //{ // byteCache[0] = 0x00; // contentSegments.Add(byteCache); // contentSegments.Add(Encoding.UTF8.GetBytes(msg)); //} //handler.Send(contentSegments); handler.Send(msg); return true; } public static string RecvMessage() { if (msgBuffer.Count == 0) { return null; } return msgBuffer.Dequeue(); } public static long GenerateUid(byte flag = 0x80) { return uidGenerator.Generate(flag); } static void OnReceive(MemoryStream stream) { msgBuffer.Enqueue(Encoding.UTF8.GetString(stream.GetBuffer(), (int)stream.Position, (int)stream.Length)); } static void OnError(int err) { Debug.LogError($"LuaSocket 发生错误 {err}"); ErrorCode = err; handler = null; } }