The Code-Bin
Links
Home
Add your code!
All Listings
About
Latest Entry
Featured Scripts
Author's Website
Latest Entries
FFMPEG Thumbnail Scr...
PHP, 0.8KB
Jul. 29, 10:24pm
John
Z80 Assembler, 190 bytes
Feb. 17, 3:36am
John
Z80 Assembler, 176 bytes
Sep. 13, 2:19am
John
Z80 Assembler, 77 bytes
Sep. 13, 2:18am
John
Z80 Assembler, 209 bytes
Sep. 13, 2:17am
TCP Client with compression
Posted by: Ryno | September 2, 2009 @ 9:20am
C# Code
[
Download
]
using System; using System.Collections.Generic; using System.Text; using System.Net.Sockets; using System.Diagnostics; using System.Threading; using System.Net; using MessageLib; using NetworkLayer.Hashing; using Ionic.Zlib; using NetworkLayer; namespace NetworkLayer.Client { public delegate void StringDelegate(object sender, string data); public delegate void MessageDelegate(object sender, Msg message); public class ClientModule { #region Events public event MessageDelegate OnAckTimedOut; public event MessageDelegate OnDataReceived; public event StringDelegate OnErrorOccured; public event StringDelegate OnConnect; public event StringDelegate OnDisconnect; public event StringDelegate OnReconnect; #endregion #region Variables // Back buffer. private byte[] backBuffer = new byte[StateObject.BufferSize * 10]; private int backBufferCount = 0; private int bbmsgSize = 0; public List<string> ackNeed = new List<string>(); public List<Msg> backLog = new List<Msg>(); public static int AckTimeoutCount = 200; public int ClientID = -1; public string ClientName = String.Empty; public string ClientType = String.Empty; public string ClientIP = String.Empty; private int msgID = 0; private bool _isCancel = false; public bool IsCancel { get { return _isCancel; } set { _isCancel = value; } } private bool _isReady = false; public bool isReady { get { return isReady; } } // Thread signal ManualResetEvent receiveDone = new ManualResetEvent(false); public ManualResetEvent clientDone = new ManualResetEvent(false); private Socket Client = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp); //Logging Logging.Logger logger = new Logging.Logger("clientModule.log", 5); #endregion #region Constructors public ClientModule() { try { string strHostName = Dns.GetHostName(); IPHostEntry iphostentry = Dns.GetHostEntry(strHostName); IPAddress ipaddress = iphostentry.AddressList[0]; this.ClientIP = ipaddress.ToString(); Client.SendBufferSize = 512; Client.ReceiveBufferSize = 512; } catch (Exception ex) { logger.Error("ClientModule(), "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } #endregion #region Connect Code public void Connect(EndPoint remoteEP) { try { clientDone.Reset(); Client.BeginConnect(remoteEP, new AsyncCallback(ConnectCallback), Client); clientDone.WaitOne(); } catch (Exception ex) { logger.Error("Connect(EndPoint), " + remoteEP.ToString()); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } private void ConnectCallback(IAsyncResult ar) { try { // Retrieve the socket from the state object. Socket client = (Socket)ar.AsyncState; // Complete the connection. client.EndConnect(ar); if (OnConnect != null) OnConnect(this, String.Format("Socket connected to {0}", client.RemoteEndPoint.ToString()); StateObject state = new StateObject(); state.workSocket = client; client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state); clientDone.WaitOne(); } catch (Exception ex) { logger.Error("ConnectCallback, "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } #endregion #region Receive Code private void ReceiveCallback(IAsyncResult ar) { try { String content = String.Empty; // Retrieve the state object and the handler socket // from the asynchronous state object. StateObject state = (StateObject)ar.AsyncState; Socket handler = state.workSocket; // Read data from the client socket. int bytesRead = handler.EndReceive(ar); //bool gettingMore = false; if (bytesRead > 0) { #region compressed Byte stream Management string xmlMessage = String.Empty; try { bbmsgSize = Int32.Parse(Encoding.ASCII.GetString(state.buffer, 0, 6)); } catch { if (backBufferCount > 6) bbmsgSize = Int32.Parse(Encoding.ASCII.GetString(backBuffer, 0, 6)); else { byte[] dummy = new byte[bytesRead + backBufferCount]; Buffer.BlockCopy(backBuffer, 0, dummy, 0, backBufferCount); Buffer.BlockCopy(state.buffer, 0, dummy, backBufferCount, bytesRead); bbmsgSize = Int32.Parse(Encoding.ASCII.GetString(dummy, 0, 6)); dummy = null; } } if (bbmsgSize > (bytesRead + backBufferCount)) { // Append data to the backBuffer Buffer.BlockCopy(state.buffer, 0, backBuffer, backBufferCount, bytesRead); backBufferCount += bytesRead; } else if (bbmsgSize == (bytesRead + backBufferCount)) { // Append data to the backBuffer Buffer.BlockCopy(state.buffer, 0, backBuffer, backBufferCount, bytesRead); backBufferCount += bytesRead; // deflate the buffer data xmlMessage = deflateBytes(backBuffer); // Clean the backBuffer backBuffer = new byte[StateObject.BufferSize * 10]; backBufferCount = 0; bbmsgSize = 0; } else if (bbmsgSize < (bytesRead + backBufferCount)) { // Append data to the backBuffer Buffer.BlockCopy(state.buffer, 0, backBuffer, backBufferCount, bytesRead); backBufferCount += bytesRead; // do this untill all messages in backBuffer handled while (true) { int sMsgSize = 0; // make sure sure backBuffer is bigger than 6 bytes if (backBufferCount > 6) sMsgSize = Int32.Parse(Encoding.ASCII.GetString(backBuffer, 0, 6)); else break; // make sure backBuffer is bigger than the msg that it wants to retrieve if (backBufferCount < sMsgSize) { bbmsgSize = sMsgSize; break; } // Copy out the msg from the backBuffer byte[] wBuffer = new byte[sMsgSize]; Buffer.BlockCopy(backBuffer, 0, wBuffer, 0, sMsgSize); // deflate the buffer data string xml = deflateBytes(wBuffer); //sb.Append(xml); //sb.Append("<delem>"); xmlMessage += xml.Trim() + "<delem>"; xml = null; // remove the message from the backBuffer // make sure its not the end of the buffer backBufferCount = backBufferCount - sMsgSize; if (backBufferCount == 0) { bbmsgSize = 0; backBuffer = new byte[StateObject.BufferSize * 10]; break; } // Copy the remaining backBuffer to the rBuffer (remaindingBuffer) // Then clear the backBuffer and put the remainding buffer back into // the backBuffer for future use. byte[] rBuffer = new byte[backBufferCount]; Buffer.BlockCopy(backBuffer, sMsgSize, rBuffer, 0, rBuffer.Length); backBuffer = new byte[StateObject.BufferSize * 10]; Buffer.BlockCopy(rBuffer, 0, backBuffer, 0, backBufferCount); } } // clear the reading buffer so that you can use it again state.buffer = new byte[StateObject.BufferSize]; #endregion if (xmlMessage != String.Empty) { string[] msgParts = xmlMessage.Split(new string[] { "<delem>" }, StringSplitOptions.RemoveEmptyEntries); for (int i = 0; i < msgParts.Length; i++) { Msg reader = new Msg(); Msg msg = reader.DeSerialize(msgParts[i]); MessageHandler(msg); } } content = string.Empty; receiveDone.Set(); } if (handler != null && handler.Connected == true) { handler.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0, new AsyncCallback(ReceiveCallback), state); receiveDone.WaitOne(); } } catch (Exception ex) { logger.Error("ReceiveCallback, "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } private string deflateBytes(byte[] wBuffer) { string xmlMessage = String.Empty; try { int msgSize = Int32.Parse(Encoding.ASCII.GetString(wBuffer, 0, 6)); int crcSize = Int32.Parse(Encoding.ASCII.GetString(wBuffer, 6, 2)); byte[] crcByte = new byte[crcSize]; Buffer.BlockCopy(wBuffer, 8, crcByte, 0, crcSize); byte[] workBuffer = new byte[msgSize - (8 + crcSize)]; Buffer.BlockCopy(wBuffer, 8 + crcSize, workBuffer, 0, msgSize - (8 + crcSize)); Crc32 crc = new Crc32(); byte[] checkSum = crc.ComputeHash(workBuffer); ZlibCodec compressor = new ZlibCodec(CompressionMode.Decompress); //int rc = compressor.InitializeDeflate(CompressionLevel.Level9); byte[] deflatedBuffer = new byte[51200]; compressor.InputBuffer = workBuffer; compressor.NextIn = 0; compressor.OutputBuffer = deflatedBuffer; compressor.NextOut = 0; compressor.AvailableBytesOut = deflatedBuffer.Length; compressor.AvailableBytesIn = workBuffer.Length; int rc1 = compressor.Inflate(FlushType.None); int rc2 = compressor.Inflate(FlushType.Finish); int rc3 = compressor.EndInflate(); xmlMessage = Encoding.ASCII.GetString(deflatedBuffer); xmlMessage = (xmlMessage.Split(new string[] { "</Msg>" }, StringSplitOptions.RemoveEmptyEntries)[0] + "</Msg>"); } catch (Exception ex) { logger.Error("deflateBytes, wBuffer:" + Encoding.ASCII.GetString(wBuffer)); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } return xmlMessage; } public void MessageHandler(Msg msg) { try { if (msg.Header != null) { switch (msg.Header.msgType) { case "handShake": handShake handshake = (handShake)msg.Body.Messages[0]; handshake.ClientName = this.ClientName; handshake.ClientType = this.ClientType; handshake.ClientIP = this.ClientIP; this.ClientID = handshake.ClientID; Header head = new Header(); head.fromSystem = this.ClientIP; head.msgType = "handShake"; head.toSystem = msg.Header.fromSystem; head.msgID = (msgID++).ToString(); Body body = new Body(); body.Add(handshake); Msg sendMessage = new Msg(head, body); _isReady = true; Send(sendMessage); break; default: OnDataReceived(this, msg); break; } // Signal that the connection has been made. clientDone.Set(); } else if (msg.ACK != null) { switch (msg.ACK.msgType) { case "handShake": OnDataReceived(this, msg); break; default: OnDataReceived(this, msg); break; } int index = 0; foreach (string id in ackNeed) { if (id == msg.ACK.msgID) { backLog.RemoveAt(index); ackNeed.Remove(id); break; } index++; } clientDone.Set(); } } catch (Exception ex) { logger.Error("MessageHandler, "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } #endregion #region Send Code public void Send(Msg message) { try { if (_isReady) { clientDone.Reset(); //Thread.Sleep(100); if (message.Header.msgID == null) message.Header.msgID = (this.msgID++).ToString(); lock (ackNeed) { ackNeed.Add(message.Header.msgID); backLog.Add(message); } string xmlMessage = message.Serialize();// + Convert.ToChar(3); #region Converting to byte[] compressing, hashing data and setting up Header // Convert the string data to byte data using ASCII encoding. // And compress the Bytes byte[] byteData = Encoding.ASCII.GetBytes(xmlMessage); byte[] decompByteData = new byte[xmlMessage.Length]; byte[] compressedByteData = new byte[xmlMessage.Length]; ZlibCodec compressor = new ZlibCodec(); int rc = compressor.InitializeDeflate(CompressionLevel.Level9); compressor.InputBuffer = Encoding.ASCII.GetBytes(xmlMessage); compressor.NextIn = 0; compressor.OutputBuffer = compressedByteData; compressor.NextOut = 0; compressor.AvailableBytesOut = xmlMessage.Length; compressor.AvailableBytesIn = xmlMessage.Length; int rc1 = compressor.Deflate(FlushType.None); int rc2 = compressor.Deflate(FlushType.Finish); int rc3 = compressor.EndDeflate(); byte[] finalByteData = new byte[((int)compressor.TotalBytesOut)]; Buffer.BlockCopy(compressedByteData, 0, finalByteData, 0, (int)compressor.TotalBytesOut); Crc32 crc = new Crc32(); byte[] computedHash = crc.ComputeHash(finalByteData); byte[] header = new byte[8 + computedHash.Length]; int headLen = header.Length; int bodyLen = (int)compressor.TotalBytesOut; int totalLen = headLen + bodyLen; byte[] lencomputedHash = new byte[2]; lencomputedHash = Encoding.ASCII.GetBytes(computedHash.Length.ToString()); byte[] lenBytes = new byte[6]; lenBytes = Encoding.ASCII.GetBytes(totalLen.ToString()); Buffer.BlockCopy(lenBytes, 0, header, 0, lenBytes.Length); Buffer.BlockCopy(lencomputedHash, 0, header, 6, lencomputedHash.Length); Buffer.BlockCopy(computedHash, 0, header, 8, computedHash.Length); byte[] hashedFinal = new byte[((int)compressor.TotalBytesOut) + header.Length]; Buffer.BlockCopy(header, 0, hashedFinal, 0, header.Length); Buffer.BlockCopy(finalByteData, 0, hashedFinal, header.Length, finalByteData.Length); // Testing multi messages //byte[] dummy3 = new byte[hashedFinal.Length * 3]; //for (int i = 0; i < 3; i++) //{ // Buffer.BlockCopy(hashedFinal, 0, dummy3, i * hashedFinal.Length, hashedFinal.Length); //} #endregion // Begin sending the data to the remote device. Client.BeginSend(hashedFinal, 0, hashedFinal.Length, 0, new AsyncCallback(SendCallback), Client); clientDone.WaitOne(); AckTimeOutObject ackTO = new AckTimeOutObject(); ackTO.timeout = AckTimeoutCount; ackTO.msgID = message.Header.msgID; Thread ackThread = new Thread(AckTimeout); ackThread.IsBackground = true; ackThread.Start(ackTO); } } catch (Exception ex) { logger.Error("Send, Message:\r\n" + message.Serialize()); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } public void SendACK(string msgType, string msgAction, string msgID, string toSystem, string msgProcessedBy) { try { ACK ack = new ACK(); ack.fromSystem = this.ClientIP; ack.toSystem = toSystem; ack.msgType = msgType; ack.msgID = msgID; ack.msgAction = msgAction; ack.msgProcessedBy = msgProcessedBy; Msg message = new Msg(ack); Send(message); } catch (Exception ex) { logger.Error("SendACK, "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } public void AckTimeout(object acktimeout) { AckTimeOutObject ackTO = (AckTimeOutObject)acktimeout; Thread.Sleep(ackTO.timeout); bool found = false; int index = 0; try { lock (ackNeed) { foreach (string id in ackNeed) { if (id == ackTO.msgID) { found = true; ackNeed.Remove(id); break; } index++; } if (found) { if (OnAckTimedOut != null) { OnAckTimedOut(this, backLog[index]); backLog.RemoveAt(index); } } } } catch (Exception ex) { string exMsg = ex.Message; if (exMsg.Contains("Collection was modified;")) { ackTO.timeout = 10; Thread ackThread = new Thread(AckTimeout); ackThread.IsBackground = true; ackThread.Start(ackTO); } } } public void SendCallback(IAsyncResult ar) { try { // Retrieve the socket from the state object. Socket handler = (Socket)ar.AsyncState; // Complete sending the data to the remote device. int bytesSent = handler.EndSend(ar); //Console.WriteLine("Sent {0} bytes to server.", bytesSent); clientDone.Set(); } catch (Exception ex) { logger.Error("SendCallback, "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } #endregion #region Keep Alive private void startKeepAlive() { Client.Poll(100, SelectMode.SelectError); } #endregion public void Shutdown() { try { Client.Shutdown(SocketShutdown.Both); Client.Close(); } catch (Exception ex) { logger.Error("Shutdown(), "); logger.Error("Error:\r\n" + ex.Message + "\r\nInnerException:\r\n" + ex.InnerException.Message); } } } }
Syntax Highlighting
[
Open in new window
]
Author Comments
none
Rating
4.39 / 8
74 Votes
http://codebin.yi.org/361
page generated in 0.00 seconds