Sunday, March 23, 2008

Part 2: How to Transfer Variable Length Messages With Async Sockets

In my previous post about transferring data asynchronously, I designed a small data exchange protocol for sending a message over the network. All I/O was done asynchronously using the BeginXXX/EndXXX pattern. The code in that post handled a single message only. In the real world, however, a connection rarely carries just one message. It is far more common for a peer to receive several messages over the same connection.



The data exchange protocol consists of messages prefixed by their size. The size prefix has a fixed length.

Prefixing data with its size is the cornerstone of the simple data transfer protocol introduced in the previous example. Because every message carries its own length, several messages can be sent over a single connection, and the remote peer can still tell them apart in the data stream.
This post provides a code sample that shows how to read multiple messages from the network.
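For reference, the size-prefixed framing described above could look like this on the sending side. This is only a minimal sketch, not the client code from the previous post: the `MessageFraming` class and `Frame` method are hypothetical names, and the 4-byte prefix is an assumption based on the `BitConverter.ToInt32` call in the server code below.

```csharp
using System;

// Hypothetical helper, not taken from the original post.
public static class MessageFraming
{
    // Assumption: the prefix is a 4-byte Int32, matching BitConverter.ToInt32 on the server.
    public const int PrefixSize = sizeof(int);

    // Returns a new array holding the size prefix followed by the payload,
    // ready to be passed to Send or BeginSend.
    public static byte[] Frame(byte[] payload)
    {
        if (payload == null) throw new ArgumentNullException("payload");

        byte[] prefix = BitConverter.GetBytes(payload.Length);
        byte[] framed = new byte[PrefixSize + payload.Length];
        Buffer.BlockCopy(prefix, 0, framed, 0, PrefixSize);
        Buffer.BlockCopy(payload, 0, framed, PrefixSize, payload.Length);
        return framed;
    }
}
```

Note that BitConverter uses the byte order of the machine it runs on, so both peers have to agree on the byte order of the prefix.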

What to expect when multiple messages arrive at the server?
When dealing with multiple messages, remember that a receive operation can return an arbitrary number of bytes read from the network. Typically that number is anywhere from 0 to the buffer length passed to the Receive or BeginReceive method, where 0 means the remote peer has closed the connection.
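To make the point about partial reads concrete, here is a small blocking sketch that keeps calling a receive function until a requested number of bytes has arrived. It is an illustration only and is not used by the async code in this post. `ReceiveHelpers`, `ReceiveExactly`, and the `receive` delegate are hypothetical names; with a real socket the delegate could be something like `(b, o, n) => socket.Receive(b, o, n, SocketFlags.None)`.

```csharp
using System;

// Hypothetical helper, not taken from the original post.
public static class ReceiveHelpers
{
    // 'receive' stands in for a call that fills buffer[offset..offset+size)
    // and returns how many bytes it actually stored (possibly fewer than size).
    // Returns false if the peer closed the connection before 'count' bytes arrived.
    public static bool ReceiveExactly(Func<byte[], int, int, int> receive,
                                      byte[] buffer, int count)
    {
        int total = 0;
        while (total < count)
        {
            // Ask only for what is still missing; the call may return less.
            int read = receive(buffer, total, count - total);
            if (read <= 0)
                return false; // connection closed by the peer
            total += read;
        }
        return true;
    }
}
```

The async callback below has to do the same bookkeeping, except that it cannot loop over blocking calls. It must remember its progress between callbacks instead.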

Our data exchange format is illustrated in the image above.
After receiving some number of bytes, the peer code must be able to tell which part of the message it has just received: is it part of the size prefix, or part of the message body?
Sometimes several messages arrive in a single Receive call.

Let's see what situations we can encounter while processing incoming data:
- the received data contains only the size prefix
- the received data contains part of the size prefix
- the received data contains the prefix and part of the message data
- the received data contains the prefix, the message data and part of the prefix of the next message
- the received data contains the prefix, the message, the prefix of the next message and part of its body.



When developing data processing code, expect all of the scenarios illustrated above to occur.
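The scenarios above can be reproduced without any sockets. The following is a minimal sketch of the same state machine as a standalone class, so it can be fed chunks split at arbitrary positions. `MessageFramer` and `Append` are hypothetical names that do not appear in the original code, and the 4-byte prefix is again an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper, not taken from the original post.
public class MessageFramer
{
    private const int PrefixSize = sizeof(int); // assumed 4-byte size prefix
    private readonly MemoryStream pending = new MemoryStream();
    private int bodySize = -1; // -1 means the size prefix is still incomplete

    // Feeds one chunk, as returned by a single receive call, and returns
    // every message that this chunk completed (possibly none, possibly several).
    public List<byte[]> Append(byte[] chunk, int offset, int count)
    {
        List<byte[]> messages = new List<byte[]>();
        while (true)
        {
            // We are collecting either the prefix or the body of known size.
            int target = bodySize < 0 ? PrefixSize : bodySize;
            int take = Math.Min(count, target - (int)pending.Length);
            pending.Write(chunk, offset, take);
            offset += take;
            count -= take;

            if (pending.Length < target)
                return messages; // chunk exhausted in mid-prefix or mid-body

            if (bodySize < 0)
            {
                // Prefix complete: switch to collecting the body.
                bodySize = BitConverter.ToInt32(pending.GetBuffer(), 0);
                if (bodySize < 0)
                    throw new InvalidDataException("Negative message size.");
            }
            else
            {
                // Body complete: hand the message over and expect a new prefix.
                messages.Add(pending.ToArray());
                bodySize = -1;
            }
            pending.SetLength(0); // truncating also moves Position back to 0
        }
    }
}
```

Calling `Append` once per received chunk returns zero, one, or several complete messages, no matter where the chunk boundaries fall relative to the prefix and the body.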

Here's the server code that handles the conditions described above. I present only the server callback function here. The client sending code can be found in the previous post.


private void ServerReadCallback(IAsyncResult ar)
{
    try
    {
        ServerState state = (ServerState)ar.AsyncState;
        Socket client = state.Client;
        SocketError socketError;

        int dataRead = client.EndReceive(ar, out socketError);
        int dataOffset = 0; //current read position inside state.Buffer
        int restOfData = 0;

        //a socket error or a zero-byte read means the connection is gone
        if (socketError != SocketError.Success || dataRead <= 0)
        {
            client.Close();
            return;
        }

        while (dataRead > 0)
        {
            //determine what the incoming data contains: size prefix or message body
            if (!state.DataSizeReceived)
            {
                if (state.Data.Length > 0)
                {   //part of the prefix was stored by an earlier read;
                    //take only what is available, the prefix may be split again
                    restOfData = Math.Min(PrefixSize - (int)state.Data.Length, dataRead);
                    state.Data.Write(state.Buffer, dataOffset, restOfData);
                    dataRead -= restOfData;
                    dataOffset += restOfData;
                }
                else if (dataRead >= PrefixSize)
                {   //store the whole size prefix
                    state.Data.Write(state.Buffer, dataOffset, PrefixSize);
                    dataRead -= PrefixSize;
                    dataOffset += PrefixSize;
                }
                else
                {   //store only part of the size prefix
                    state.Data.Write(state.Buffer, dataOffset, dataRead);
                    dataOffset += dataRead;
                    dataRead = 0;
                }

                if (state.Data.Length == PrefixSize)
                {   //the size prefix is complete
                    state.DataSize = BitConverter.ToInt32(state.Data.GetBuffer(), 0);
                    if (state.DataSize < 0)
                    {   //corrupt prefix - drop the connection
                        client.Close();
                        return;
                    }
                    state.DataSizeReceived = true;
                    //reset internal data stream
                    state.Data.Position = 0;
                    state.Data.SetLength(0);
                }
                else
                {   //only part of the prefix has arrived - issue another read
                    client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
                        SocketFlags.None, new AsyncCallback(ServerReadCallback),
                        state);
                    return;
                }
            }

            //at this point we know the size of the pending data
            if ((state.Data.Length + dataRead) >= state.DataSize)
            {   //we have all the data for this message
                restOfData = state.DataSize - (int)state.Data.Length;

                state.Data.Write(state.Buffer, dataOffset, restOfData);
                Console.WriteLine("Data message received. Size: {0}",
                    state.DataSize);

                //store received messages
                //lock(messages)
                //    messages.Add(state.Data.ToArray());

                dataOffset += restOfData;
                dataRead -= restOfData;

                //message received - cleanup internal memory stream
                state.Data.SetLength(0);
                state.Data.Position = 0;
                state.DataSizeReceived = false;
                state.DataSize = 0;

                if (dataRead == 0)
                {   //no more data remaining to process - issue another receive
                    client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
                        SocketFlags.None, new AsyncCallback(ServerReadCallback),
                        state);
                    return;
                }
                else
                    continue; //there's still some data to process in the buffer
            }
            else
            {   //there is still data pending, store what we've
                //received and issue another BeginReceive
                state.Data.Write(state.Buffer, dataOffset, dataRead);

                client.BeginReceive(state.Buffer, 0, state.Buffer.Length,
                    SocketFlags.None, new AsyncCallback(ServerReadCallback), state);

                dataRead = 0;
            }
        }
    }
    catch (Exception ex)
    {
        Console.WriteLine(ex.ToString());
    }
}
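
The callback relies on a ServerState class and a PrefixSize constant that are not shown here. For readers who do not have the previous post at hand, the following is a minimal sketch of what the state object might look like. It is inferred only from how the callback uses its members, so the field layout and the buffer size are assumptions. PrefixSize is assumed to be a constant equal to 4 (the size of an Int32) on the server class.

```csharp
using System.IO;
using System.Net.Sockets;

// Assumed shape of the per-connection state, inferred from ServerReadCallback.
public class ServerState
{
    // The connected client socket the callback reads from.
    public Socket Client;

    // Receive buffer handed to BeginReceive; the size here is an arbitrary choice.
    public byte[] Buffer = new byte[8192];

    // Accumulates the bytes of the size prefix, then the bytes of the message body.
    public MemoryStream Data = new MemoryStream();

    // Body size decoded from the prefix; meaningful only when DataSizeReceived is true.
    public int DataSize;

    // True once the complete size prefix has been read for the current message.
    public bool DataSizeReceived;
}
```

One such object would be created per accepted connection and passed as the state argument of the first BeginReceive call.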