SocketManager.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using UnityEngine;
  8. public class UidGenerator
  9. {
  10. const int VALUE_START = 10000;
  11. long m_Epoch2020 = new DateTime(2000, 1, 1, 0, 0, 0, DateTimeKind.Utc).Ticks / 10000;
  12. long m_Time = 0;
  13. uint m_Value = VALUE_START;
  14. public long TimeSince2020(long time)
  15. {
  16. return (time - m_Epoch2020) / 1000;
  17. }
  18. public long Generate(byte flag)
  19. {
  20. //Debug.Log((DateTime.UtcNow.Ticks / 10000).ToString());
  21. long time = TimeSince2020(DateTime.UtcNow.Ticks / 10000);
  22. if (time > m_Time)
  23. {
  24. m_Time = time;
  25. m_Value = VALUE_START;
  26. }
  27. else
  28. {
  29. m_Value++;
  30. if (m_Value > ushort.MaxValue)
  31. {
  32. m_Time++;
  33. m_Value = VALUE_START;
  34. }
  35. }
  36. ulong result = 0;
  37. result |= ((ulong)flag & byte.MaxValue) << 56;
  38. result |= ((ulong)m_Time & 0xFFFFFFFFFF) << 16;
  39. result |= ((ulong)m_Value & ushort.MaxValue);
  40. return (long)result;
  41. }
  42. }
  43. public class RecvPacketPraser
  44. {
  45. const int HEAD_SIZE = 2;
  46. const int MAX_PACKET_SIZE = 2048;
  47. public enum ParserState { PacketSize, PacketBody }
  48. public ParserState State { get; private set; } = ParserState.PacketSize;
  49. public MemoryStream MemoryStream { get; private set; }
  50. int packetSize;
  51. byte[] headCache = new byte[HEAD_SIZE];
  52. byte[] packetCache = new byte[MAX_PACKET_SIZE];
  53. public bool Parse(CircularBuffer buffer)
  54. {
  55. switch (State)
  56. {
  57. case ParserState.PacketSize:
  58. if (buffer.Length < HEAD_SIZE)
  59. {
  60. return false;
  61. }
  62. MemoryStream = null;
  63. buffer.Read(headCache, 0, HEAD_SIZE);
  64. packetSize = headCache[0] << 8 | headCache[1];
  65. if (packetSize > MAX_PACKET_SIZE)
  66. {
  67. throw new Exception($"Size error {packetSize}");
  68. }
  69. else if (packetSize == 0)
  70. {
  71. return true;
  72. }
  73. State = ParserState.PacketBody;
  74. break;
  75. case ParserState.PacketBody:
  76. if (buffer.Length < packetSize)
  77. {
  78. return false;
  79. }
  80. buffer.Read(packetCache, 0, packetSize);
  81. MemoryStream = new MemoryStream(packetCache, 0, packetSize, true, true);
  82. State = ParserState.PacketSize;
  83. break;
  84. }
  85. return true;
  86. }
  87. }
  88. public class CircularBuffer
  89. {
  90. public int ChunkSize { get; private set; }
  91. public int FirstIndex;
  92. public int LastIndex;
  93. Queue<byte[]> bufferCache = new Queue<byte[]>();
  94. Queue<byte[]> bufferQueue = new Queue<byte[]>();
  95. byte[] lastBuffer;
  96. public CircularBuffer(int chunkSize = 2048)
  97. {
  98. ChunkSize = chunkSize;
  99. AddLast();
  100. }
  101. public int Length
  102. {
  103. get
  104. {
  105. int c = 0;
  106. if (bufferQueue.Count == 0)
  107. {
  108. c = 0;
  109. }
  110. else
  111. {
  112. c = (bufferQueue.Count - 1) * ChunkSize + LastIndex - FirstIndex;
  113. }
  114. if (c < 0)
  115. {
  116. throw new Exception($"CircularBuffer count < 0: BufferCount:{bufferQueue.Count} LastIndex:{LastIndex} FirstIndex{FirstIndex}");
  117. }
  118. return c;
  119. }
  120. }
  121. void AddLast()
  122. {
  123. byte[] buffer;
  124. if (bufferCache.Count > 0)
  125. {
  126. buffer = bufferCache.Dequeue();
  127. }
  128. else
  129. {
  130. buffer = new byte[ChunkSize];
  131. }
  132. bufferQueue.Enqueue(buffer);
  133. lastBuffer = buffer;
  134. }
  135. public void RemoveFirst()
  136. {
  137. bufferCache.Enqueue(bufferQueue.Dequeue());
  138. }
  139. public byte[] First
  140. {
  141. get
  142. {
  143. if (bufferQueue.Count == 0)
  144. {
  145. AddLast();
  146. }
  147. return bufferQueue.Peek();
  148. }
  149. }
  150. public byte[] Last
  151. {
  152. get
  153. {
  154. if (this.bufferQueue.Count == 0)
  155. {
  156. this.AddLast();
  157. }
  158. return lastBuffer;
  159. }
  160. }
  161. public int Read(byte[] buffer, int offset, int count)
  162. {
  163. if (buffer.Length < offset + count)
  164. {
  165. throw new Exception($"buffer length , count, lenght{buffer.Length} offset{offset} count{count}");
  166. }
  167. int length = Length;
  168. if (length < count)
  169. {
  170. count = length;
  171. }
  172. int alreadyCopyCount = 0;
  173. while (alreadyCopyCount < count)
  174. {
  175. // 需要拷贝总数量
  176. int n = count - alreadyCopyCount;
  177. // 可以拷贝完
  178. if (ChunkSize - FirstIndex > n)
  179. {
  180. Array.Copy(First, FirstIndex, buffer, alreadyCopyCount + offset, n);
  181. FirstIndex += n;
  182. alreadyCopyCount += n;
  183. }
  184. // 需要继续拷贝
  185. else
  186. {
  187. Array.Copy(First, FirstIndex, buffer, alreadyCopyCount + offset, ChunkSize - FirstIndex);
  188. alreadyCopyCount += ChunkSize - FirstIndex;
  189. FirstIndex = 0;
  190. RemoveFirst();
  191. }
  192. }
  193. return count;
  194. }
  195. public void Write(byte[] buffer, int offset, int count)
  196. {
  197. int alreadyCopyCount = 0;
  198. while (alreadyCopyCount < count)
  199. {
  200. if (LastIndex == ChunkSize)
  201. {
  202. AddLast();
  203. LastIndex = 0;
  204. }
  205. // 需要拷贝总数量
  206. int n = count - alreadyCopyCount;
  207. // 可以拷贝完
  208. if (ChunkSize - LastIndex > n)
  209. {
  210. Array.Copy(buffer, alreadyCopyCount + offset, lastBuffer, LastIndex, n);
  211. LastIndex += n;
  212. alreadyCopyCount += n;
  213. }
  214. else
  215. {
  216. Array.Copy(buffer, alreadyCopyCount + offset, lastBuffer, LastIndex, ChunkSize - LastIndex);
  217. alreadyCopyCount += ChunkSize - LastIndex;
  218. LastIndex = ChunkSize;
  219. }
  220. }
  221. }
  222. }
  223. public class ErrorCode
  224. {
  225. // 110000 以上避免与SocketError冲突
  226. public const int ERR_PeerDisconnect = 110001;
  227. public const int ERR_SocketError = 110002;
  228. public const int ERR_RecvTimeout = 110003;
  229. }
  230. public class SocketHandler
  231. {
  232. public const int RECV_BUFFER_SIZE = 2048;
  233. const int HEAD_SIZE = 2;
  234. static int idGen = 0;
  235. private int id;
  236. public int Id => id;
  237. public Action<MemoryStream> RecvCallback;
  238. public Action<int> ErrCallback;
  239. public bool Disposed { get; internal set; } = false;
  240. Socket socket;
  241. bool isSending = false;
  242. byte[] buffer = new byte[RECV_BUFFER_SIZE];
  243. byte[] headBuffer = new byte[HEAD_SIZE];
  244. RecvPacketPraser parser = new RecvPacketPraser();
  245. CircularBuffer sendBuffer = new CircularBuffer();
  246. CircularBuffer recvBuffer = new CircularBuffer();
  247. public SocketHandler(Socket socket)
  248. {
  249. id = ++idGen;
  250. this.socket = socket;
  251. socket.BeginReceive(buffer, 0, RECV_BUFFER_SIZE, SocketFlags.None, OnReceive, socket);
  252. }
  253. /// <summary>
  254. /// 线程不安全
  255. /// </summary>
  256. /// <param name="data"></param>
  257. public void Send(byte[] data)
  258. {
  259. int len = data.Length;
  260. headBuffer[0] = (byte)((len & 0xFF00) >> 8);
  261. headBuffer[1] = (byte)(len & 0xFF);
  262. sendBuffer.Write(headBuffer, 0, HEAD_SIZE);
  263. sendBuffer.Write(data, 0, len);
  264. }
  265. /// <summary>
  266. /// 线程不安全
  267. /// </summary>
  268. /// <param name="data"></param>
  269. public void Send(string msg)
  270. {
  271. Send(Encoding.UTF8.GetBytes(msg));
  272. }
  273. public void Send(IList<byte[]> data)
  274. {
  275. int len = 0;
  276. for (int i = 0; i < data.Count; ++i)
  277. {
  278. len += data[i].Length;
  279. }
  280. headBuffer[0] = (byte)((len & 0xFF00) >> 8);
  281. headBuffer[1] = (byte)(len & 0xFF);
  282. sendBuffer.Write(headBuffer, 0, HEAD_SIZE);
  283. for (int i = 0; i < data.Count; ++i)
  284. {
  285. sendBuffer.Write(data[i], 0, data[i].Length);
  286. }
  287. }
  288. void OnSendComplete(IAsyncResult result)
  289. {
  290. try
  291. {
  292. int sendSize = socket.EndSend(result, out SocketError err);
  293. if (err != SocketError.Success)
  294. {
  295. OnError((int)err);
  296. return;
  297. }
  298. Debug.Log($"SocketHandler Sent {sendSize} {err}");
  299. lock (sendBuffer)
  300. {
  301. sendBuffer.FirstIndex += sendSize;
  302. if (sendBuffer.FirstIndex == sendBuffer.ChunkSize)
  303. {
  304. sendBuffer.FirstIndex = 0;
  305. sendBuffer.RemoveFirst();
  306. }
  307. isSending = false;
  308. }
  309. }
  310. catch (Exception e)
  311. {
  312. OnError(ErrorCode.ERR_SocketError);
  313. }
  314. }
  315. void OnReceive(IAsyncResult result)
  316. {
  317. try
  318. {
  319. int recvSize = socket.EndReceive(result, out SocketError err);
  320. if (err != SocketError.Success)
  321. {
  322. OnError((int)err);
  323. return;
  324. }
  325. if (recvSize > 0)
  326. {
  327. Debug.Log($"SocketHandler Recv {recvSize} {err}");
  328. lock (recvBuffer)
  329. {
  330. recvBuffer.Write(buffer, 0, recvSize);
  331. }
  332. }
  333. else
  334. {
  335. OnError(ErrorCode.ERR_PeerDisconnect);
  336. return;
  337. }
  338. }
  339. catch (Exception e)
  340. {
  341. OnError(ErrorCode.ERR_SocketError);
  342. return;
  343. }
  344. if (socket.Connected)
  345. {
  346. socket.BeginReceive(buffer, 0, RECV_BUFFER_SIZE, SocketFlags.None, OnReceive, socket);
  347. }
  348. }
  349. void OnError(int err)
  350. {
  351. SocketManager.Disconnect(this);
  352. ErrCallback.Invoke(err);
  353. }
  354. void StartSend()
  355. {
  356. if (socket == null || !socket.Connected)
  357. {
  358. isSending = false;
  359. return;
  360. }
  361. if (isSending)
  362. {
  363. return;
  364. }
  365. if (sendBuffer.Length == 0)
  366. {
  367. isSending = false;
  368. return;
  369. }
  370. lock (sendBuffer)
  371. {
  372. isSending = true;
  373. int sendSize = sendBuffer.ChunkSize - sendBuffer.FirstIndex;
  374. if (sendSize > sendBuffer.Length)
  375. {
  376. sendSize = sendBuffer.Length;
  377. }
  378. socket.BeginSend(sendBuffer.First, sendBuffer.FirstIndex, sendSize, SocketFlags.None, OnSendComplete, socket);
  379. //socket.Send(sendBuffer.First, sendBuffer.FirstIndex, sendSize, SocketFlags.None);
  380. }
  381. }
  382. void StartRecv()
  383. {
  384. if (socket == null || !socket.Connected)
  385. {
  386. return;
  387. }
  388. if (recvBuffer.Length == 0)
  389. {
  390. return;
  391. }
  392. lock (recvBuffer)
  393. {
  394. while (parser.Parse(recvBuffer))
  395. {
  396. if (parser.MemoryStream != null)
  397. {
  398. RecvCallback.Invoke(parser.MemoryStream);
  399. }
  400. }
  401. }
  402. }
  403. internal void Tick()
  404. {
  405. if (!socket.Connected)
  406. {
  407. return;
  408. }
  409. StartSend();
  410. StartRecv();
  411. }
  412. }
  413. public static class SocketManager
  414. {
  415. public const int MAX_PACKET_SIZE = 2048;
  416. static Dictionary<int, Socket> mapSockets = new Dictionary<int, Socket>();
  417. static Dictionary<int, SocketHandler> socketHandlers = new Dictionary<int, SocketHandler>();
  418. public static SocketHandler Connect(IPEndPoint endPoint)
  419. {
  420. Socket socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  421. //socket.Connect(endPoint);
  422. //Debug.Log("Manager Connected Sync");
  423. SocketHandler handler = new SocketHandler(socket);
  424. mapSockets.Add(handler.Id, socket);
  425. socketHandlers.Add(handler.Id, handler);
  426. socket.BeginConnect(endPoint, OnBeginConnect, handler);
  427. return handler;
  428. }
  429. public static void Disconnect(SocketHandler handler)
  430. {
  431. if (handler.Disposed)
  432. {
  433. return;
  434. }
  435. if (!mapSockets.TryGetValue(handler.Id, out Socket socket))
  436. {
  437. return;
  438. }
  439. try
  440. {
  441. if (socket.Connected)
  442. {
  443. socket.Disconnect(false);
  444. }
  445. }
  446. catch(SocketException e)
  447. {
  448. handler.ErrCallback.Invoke(e.ErrorCode);
  449. }
  450. catch (Exception e)
  451. {
  452. handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError);
  453. }
  454. handler.Disposed = true;
  455. if (mapSockets.ContainsKey(handler.Id))
  456. {
  457. mapSockets.Remove(handler.Id);
  458. }
  459. if (socketHandlers.ContainsKey(handler.Id))
  460. {
  461. socketHandlers.Remove(handler.Id);
  462. }
  463. Debug.Log("SocketManager Disconnected");
  464. }
  465. public static void Tick()
  466. {
  467. foreach (var handler in socketHandlers.Values)
  468. {
  469. handler.Tick();
  470. }
  471. }
  472. static void OnBeginConnect(IAsyncResult result)
  473. {
  474. //Socket socket = result.AsyncState as Socket;
  475. SocketHandler handler = result.AsyncState as SocketHandler;
  476. if (!mapSockets.TryGetValue(handler.Id, out Socket socket))
  477. {
  478. Debug.LogError("SocketManager Unmanaged socket connected");
  479. return;
  480. }
  481. try
  482. {
  483. //if (socket.Connected)
  484. //{
  485. socket.EndConnect(result);
  486. Debug.Log("SocketManager Connected Async");
  487. //}
  488. }
  489. catch (SocketException e)
  490. {
  491. Disconnect(handler);
  492. handler.ErrCallback.Invoke(e.ErrorCode);
  493. //handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError);
  494. }
  495. catch (Exception e)
  496. {
  497. Disconnect(handler);
  498. handler.ErrCallback.Invoke(ErrorCode.ERR_SocketError);
  499. }
  500. }
  501. }
  502. public static class LuaSocketManager
  503. {
  504. static UidGenerator uidGenerator = new UidGenerator();
  505. static SocketHandler handler = null;
  506. static Queue<string> msgBuffer = new Queue<string>();
  507. static byte[] byteCache = new byte[1];
  508. static byte[] longCache = new byte[8];
  509. static List<byte[]> contentSegments = new List<byte[]>();
  510. /// <summary>
  511. /// 保存最近一次发生错误的错误码
  512. /// </summary>
  513. public static int ErrorCode { get; private set; }
  514. public static void Connect(string ipAddress, int port)
  515. {
  516. if (handler != null)
  517. {
  518. Debug.LogWarning("LuaSocket 存在未断开的会话,断开会话");
  519. SocketManager.Disconnect(handler);
  520. }
  521. IPEndPoint targetEndPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
  522. handler = SocketManager.Connect(targetEndPoint);
  523. handler.RecvCallback = OnReceive;
  524. handler.ErrCallback = OnError;
  525. }
  526. public static void Disconnect()
  527. {
  528. SocketManager.Disconnect(handler);
  529. handler = null;
  530. }
  531. /// <summary>
  532. /// 更新,发送缓冲的信息,解析接受到的信息
  533. /// </summary>
  534. /// <returns>true为正常连接, false为已断开连接</returns>
  535. public static bool Update()
  536. {
  537. if (handler == null || handler.Disposed)
  538. {
  539. Debug.Log("LuaSocket 会话已中断或未开始");
  540. return false;
  541. }
  542. SocketManager.Tick();
  543. return true;
  544. }
  545. /// <summary>
  546. /// 发送信息
  547. /// </summary>
  548. /// <param name="msg">内容</param>
  549. /// <param name="sequence">是否需要排序</param>
  550. /// <returns>false发送过程中发生错误,对方不会接收到信息。true为未检测到错误,但不能肯定对方成功接收</returns>
  551. public static bool Send(string msg, bool sequence = true)
  552. {
  553. if (handler == null)
  554. {
  555. Debug.LogWarning("LuaSocket 会话不存在,确保发送信息前先调用Connect方法");
  556. return false;
  557. }
  558. if (handler.Disposed)
  559. {
  560. Debug.LogWarning("LuaSocket 会话已弃用,请重新连接");
  561. return false;
  562. }
  563. //if (sequence)
  564. //{
  565. // long header = uidGenerator.Generate(0x8F);
  566. // contentSegments.Add(longCache);
  567. // contentSegments.Add(Encoding.UTF8.GetBytes(msg));
  568. //}
  569. //else
  570. //{
  571. // byteCache[0] = 0x00;
  572. // contentSegments.Add(byteCache);
  573. // contentSegments.Add(Encoding.UTF8.GetBytes(msg));
  574. //}
  575. //handler.Send(contentSegments);
  576. handler.Send(msg);
  577. return true;
  578. }
  579. public static string RecvMessage()
  580. {
  581. if (msgBuffer.Count == 0)
  582. {
  583. return null;
  584. }
  585. return msgBuffer.Dequeue();
  586. }
  587. public static long GenerateUid(byte flag = 0x80)
  588. {
  589. return uidGenerator.Generate(flag);
  590. }
  591. static void OnReceive(MemoryStream stream)
  592. {
  593. msgBuffer.Enqueue(Encoding.UTF8.GetString(stream.GetBuffer(), (int)stream.Position, (int)stream.Length));
  594. }
  595. static void OnError(int err)
  596. {
  597. Debug.LogError($"LuaSocket 发生错误 {err}");
  598. ErrorCode = err;
  599. handler = null;
  600. }
  601. }