Possible Duplicate:
Is the received stream from a socket limited to a single send command?可能重复:从套接字接收的流是否仅限于单个发送命令?
Note: I see this question very complicated (hopefully not for you guys, that's why Im asking here lol) and I tried my best to explain it as simple and clear as possible.
注意:我发现这个问题非常复杂(希望对你们来说不是这样,这就是为什么我在这里问lol),我尽我最大的努力尽可能简单明了地解释它。
In my application, I'm continually receiving byte arrays in a fix sized buffer.
在我的应用程序中,我不断地在一个固定大小的缓冲区中接收字节数组。
These series of byte arrays that I'm receiving has been serialized 'binarily'.
我正在接收的这些字节数组已经被序列化为“二进制”。
However, sometimes the byte array received will be bigger than the fix sized buffer so I would need to store the current received byte array into a container and loop again to receive the remaining byte arrays coming in.
但是,有时接收到的字节数组会大于修复大小的缓冲区,因此我需要将当前接收到的字节数组存储到容器中,然后再次循环,以接收即将到来的剩余字节数组。
My question now is how to "concatenate" or "combine" or "join" all the "batches" of byte arrays I received ( and is stored in a container, possibly a queue of byte arrays) to form a single byte array and then de-serialize them?
我现在的问题是,如何“连接”或“合并”或“加入”我收到的所有字节数组的“批次”(并存储在容器中,可能是一个字节数组队列),以形成单个字节数组,然后对它们进行反序列化?
int bytesRead = client.EndReceive(ar);
if (bytesRead > 0)
{
// There might be more data, so store the data received so far.
// If the buffer was not filled, I have to get the number of bytes received as Thorsten Dittmar was saying, before queuing it
dataReceivedQueue.Enqueue(state.buffer);
// Get the rest of the data.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback_onQuery), state);
}
else
{
// All the data has arrived; put it in response.
response_onQueryHistory = ByteArrayToObject(functionThatCombinesBytes(dataReceivedQueue));
// Signal that all bytes have been received.
receiveDoneQuery.Set();
}
state.buffer is buffer where data are received. buffer is a byte array of size 4096. state is of type StateObject.
状态。缓冲区是接收数据的缓冲区。缓冲区是一个大小为4096的字节数组。状态类型为StateObject。
ByteArrayToObject(byte []) takes care of deserializing the data received and converting it back to its object form
ByteArrayToObject(byte[])负责对接收到的数据进行反序列化,并将其转换回对象形式
functionThatCombinesBytes(Queue) this function will receive a Queue of bytes and will "combine" all the bytes into one byte array
这个函数将接收一个字节队列,并将所有字节“组合”成一个字节数组
3 个解决方案
#1
3
Just because you are calling BeginReceive
with a buffer of a particular size, doesn't mean that it will necessarily entirely fill the buffer, so it's very likely that some of your queued buffers will actually only be partially filled with received data, and the remainder being zeros, this will almost certainly corrupt your combined stream if you simply concatenate them together since you're not also storing the number of bytes actually read into the buffer. You also appear to be reusing the same buffer each time, so you'll just be overwriting already-read data with new data.
仅仅因为你叫BeginReceive缓冲的大小,并不意味着它一定会完全填满缓冲区,所以很有可能你的一些队列缓冲区会只是部分充满了接收的数据,其余是0,这几乎肯定会腐败结合流如果你简单地连接在一起,因为你不是也存储实际上读取到缓冲区的字节数。您似乎每次都在重复使用相同的缓冲区,所以您只是在用新的数据重写已经读取的数据。
I would therefore suggest replacing your dataReceivedQueue
with a MemoryStream
, and using something like:
因此,我建议使用MemoryStream替换您的dataReceivedQueue,并使用以下方法:
if (bytesRead > 0)
{
// There might be more data, so store the data received so far.
memoryStream.Write(state.buffer, 0, bytesRead);
// Get the rest of the data.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback_onQuery), state);
}
else
{
// All the data has arrived; put it in response.
response_onQueryHistory = ByteArrayToObject(memoryStream.ToArray());
// Signal that all bytes have been received.
receiveDoneQuery.Set();
}
#2
2
First of all, unless your dataReceivedQueue
's type implements its own (or overrides Queue
's) Enqueue
method, your state.buffer
would be rewritten with each client.BeginReceive
call.
首先,除非您的dataReceivedQueue类型实现自己的(或重写队列的)Enqueue方法,否则就是您的状态。每个客户端都会重写缓冲区。BeginReceive电话。
You can simply add a MemoryStream
member to your StateObject
and append bytes to it as they come:
您可以简单地向您的StateObject添加一个MemoryStream成员,并在其出现时添加字节:
state.rawData.Seek(0, SeekOrigin.End);
state.rawData.Write(state.buffer, 0, bytesRead);
#3
1
First of all, you need to not only store the byte array, but also the number of bytes in the arrays that are actually valid. For example, each receive may not fully fill the buffer, thus the number of bytes is returned (bytesRead
in your code).
首先,您不仅需要存储字节数组,还需要存储实际上有效的数组中的字节数。例如,每个接收可能没有完全填充缓冲区,因此返回的字节数(bytesRead在您的代码中)。
If you had this, you could calculate the size of the final buffer by summing up the number of received bytes for each "batch".
如果您有这个,您可以通过计算每个“批”的接收字节数来计算最终缓冲区的大小。
After that you can - in a loop - use Array.Copy
to copy a "batch" to a specified position with a specified length into the target array.
之后,您可以在循环中使用数组。将“批处理”复制到目标数组中指定长度的指定位置。
For example, this could look like this:
例如,它可以是这样的:
// Batch is a class that contains the batch byte buffer and the number of bytes valid
int destinationPos = 0;
byte[] destination = new byte[<number of bytes in total>];
foreach (Batch b in batches)
{
Array.Copy(b.Bytes, 0, destination, destinationPos, b.ValidLength);
}
#1
3
Just because you are calling BeginReceive
with a buffer of a particular size, doesn't mean that it will necessarily entirely fill the buffer, so it's very likely that some of your queued buffers will actually only be partially filled with received data, and the remainder being zeros, this will almost certainly corrupt your combined stream if you simply concatenate them together since you're not also storing the number of bytes actually read into the buffer. You also appear to be reusing the same buffer each time, so you'll just be overwriting already-read data with new data.
仅仅因为你叫BeginReceive缓冲的大小,并不意味着它一定会完全填满缓冲区,所以很有可能你的一些队列缓冲区会只是部分充满了接收的数据,其余是0,这几乎肯定会腐败结合流如果你简单地连接在一起,因为你不是也存储实际上读取到缓冲区的字节数。您似乎每次都在重复使用相同的缓冲区,所以您只是在用新的数据重写已经读取的数据。
I would therefore suggest replacing your dataReceivedQueue
with a MemoryStream
, and using something like:
因此,我建议使用MemoryStream替换您的dataReceivedQueue,并使用以下方法:
if (bytesRead > 0)
{
// There might be more data, so store the data received so far.
memoryStream.Write(state.buffer, 0, bytesRead);
// Get the rest of the data.
client.BeginReceive(state.buffer, 0, StateObject.BufferSize, 0,
new AsyncCallback(ReceiveCallback_onQuery), state);
}
else
{
// All the data has arrived; put it in response.
response_onQueryHistory = ByteArrayToObject(memoryStream.ToArray());
// Signal that all bytes have been received.
receiveDoneQuery.Set();
}
#2
2
First of all, unless your dataReceivedQueue
's type implements its own (or overrides Queue
's) Enqueue
method, your state.buffer
would be rewritten with each client.BeginReceive
call.
首先,除非您的dataReceivedQueue类型实现自己的(或重写队列的)Enqueue方法,否则就是您的状态。每个客户端都会重写缓冲区。BeginReceive电话。
You can simply add a MemoryStream
member to your StateObject
and append bytes to it as they come:
您可以简单地向您的StateObject添加一个MemoryStream成员,并在其出现时添加字节:
state.rawData.Seek(0, SeekOrigin.End);
state.rawData.Write(state.buffer, 0, bytesRead);
#3
1
First of all, you need to not only store the byte array, but also the number of bytes in the arrays that are actually valid. For example, each receive may not fully fill the buffer, thus the number of bytes is returned (bytesRead
in your code).
首先,您不仅需要存储字节数组,还需要存储实际上有效的数组中的字节数。例如,每个接收可能没有完全填充缓冲区,因此返回的字节数(bytesRead在您的代码中)。
If you had this, you could calculate the size of the final buffer by summing up the number of received bytes for each "batch".
如果您有这个,您可以通过计算每个“批”的接收字节数来计算最终缓冲区的大小。
After that you can - in a loop - use Array.Copy
to copy a "batch" to a specified position with a specified length into the target array.
之后,您可以在循环中使用数组。将“批处理”复制到目标数组中指定长度的指定位置。
For example, this could look like this:
例如,它可以是这样的:
// Batch is a class that contains the batch byte buffer and the number of bytes valid
int destinationPos = 0;
byte[] destination = new byte[<number of bytes in total>];
foreach (Batch b in batches)
{
Array.Copy(b.Bytes, 0, destination, destinationPos, b.ValidLength);
}