Reading n bytes from an InputStream in Swift 4

Time: 2022-01-17 20:10:08

I have a server that sends me messages over TCP, where the first 4 bytes of each message give the length of the rest of the message. So I need to:

1) read 4 bytes into a UInt32 (this works) and store it in bytes_expected

2) read bytes_expected bytes into message
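
The framing described in steps 1) and 2) can be illustrated with two small helpers. This is a sketch in Swift 5; makeFrame and decodeLength are hypothetical names, not part of the original code. Building the length from individual bytes sidesteps the pointer-alignment and byte-order details of loading a UInt32 straight from a buffer.

```swift
import Foundation

/// Hypothetical helper: builds one frame, a 4-byte big-endian length
/// followed by the payload bytes.
func makeFrame(_ payload: Data) -> Data {
    let n = UInt32(payload.count)
    var frame = Data([
        UInt8(truncatingIfNeeded: n >> 24),
        UInt8(truncatingIfNeeded: n >> 16),
        UInt8(truncatingIfNeeded: n >> 8),
        UInt8(truncatingIfNeeded: n),
    ])
    frame.append(payload)
    return frame
}

/// Hypothetical helper: decodes the big-endian length in the first 4 bytes.
/// Combining the bytes arithmetically avoids any pointer alignment concerns.
func decodeLength(_ header: Data) -> Int? {
    guard header.count >= 4 else { return nil }
    let value = header.prefix(4).reduce(UInt32(0)) { ($0 << 8) | UInt32($1) }
    return Int(value)
}

let frame = makeFrame(Data("hello".utf8))
print(Array(frame.prefix(4))) // [0, 0, 0, 5]
print(decodeLength(frame) ?? -1) // 5
```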

Right now my code looks like this:

private let inputStreamAccessQueue  = DispatchQueue(label: "SynchronizedInputStreamAccess")

func inputStreamHandler(_ event: Stream.Event) {
    switch event {
        case Stream.Event.hasBytesAvailable:
            self.handleInput()

        ...
    }
}

func handleInput() {
    // **QUESTION: Do I use this barrier wrong?**
    self.inputStreamAccessQueue.sync(flags: .barrier) {            
        guard let istr = self.inputStream else {
            log.error(self.buildLogMessage("InputStream is nil"))
            return
        }

        guard istr.hasBytesAvailable else {
            log.error(self.buildLogMessage("handleInput() called when inputstream has no bytes available"))
            return
        }

        let lengthbuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 4)
        defer { lengthbuffer.deallocate(capacity: 4) }
        let lenbytes_read = istr.read(lengthbuffer, maxLength: 4)

        guard lenbytes_read == 4 else {
            self.errorHandler(NetworkingError.InputError("Input Stream received \(lenbytes_read) (!=4) bytes"))
            return
        }

        let bytes_expected = Int(UnsafeRawPointer(lengthbuffer).load(as: UInt32.self).bigEndian)
        log.info(self.buildLogMessage("expect \(bytes_expected) bytes"))

        print("::DEBUG", call, "bytes_expected", bytes_expected)

        var message = ""
        var bytes_missing = bytes_expected
        while bytes_missing > 0 {
            //print("::DEBUG", call, "bytes_missing", bytes_missing)
            let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_missing)
            let bytes_read = istr.read(buffer, maxLength: bytes_missing)

            print("::DEBUG", call, "bytes_read", bytes_read)

            guard bytes_read > 0 else {
                print("bytes_read not > 0: \(bytes_read)")
                return
            }

            guard bytes_read <= bytes_missing else {
                print("Read more bytes than expected. missing=\(bytes_missing), read=\(bytes_read)")
                return
            }

            guard let partial_message = String(bytesNoCopy: buffer, length: bytes_read, encoding: .utf8, freeWhenDone: true) else {
                log.error("ERROR WHEN READING")
                return
            }

            message = message + partial_message
            bytes_missing -= bytes_read
        }

        self.handleMessage(message)
    }
}

My problem is that istr.read(buffer, maxLength: bytes_missing) does not always read the whole message at once, so I loop until I have read everything I expect. But I still see my app crash (rarely) because handleInput() is called again while another call to that method is still running. In that case, bytes_expected contains random values and the app crashes due to an illegal memory allocation.
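
The looping idea described above can be written as a small helper. InputStream.read(_:maxLength:) returns the number of bytes actually read, which can be less than maxLength, 0 at the end of the stream, or -1 on error. This is a sketch in Swift 5; readExactly is a hypothetical name. Because it keeps reading until the requested count arrives, it is only suitable where blocking is acceptable (for example a background thread with a blocking stream), not inside a run-loop event handler. It also shows why an early return is dangerous in this protocol: bytes that were already consumed are gone, so the next 4 bytes interpreted as a length may really be payload.

```swift
import Foundation

/// Reads exactly `count` bytes from `stream`, looping because a single
/// read(_:maxLength:) call may return fewer bytes than requested.
/// Returns nil if the stream ends or fails first. On a blocking stream this
/// waits for data, so it is not meant for a run-loop event handler.
func readExactly(_ count: Int, from stream: InputStream) -> Data? {
    var result = Data()
    var chunk = [UInt8](repeating: 0, count: max(count, 1))
    while result.count < count {
        let n = stream.read(&chunk, maxLength: count - result.count)
        guard n > 0 else { return nil } // 0 = end of stream, -1 = error
        result.append(chunk, count: n)
    }
    return result
}

// Usage with an in-memory stream holding one frame: length 3, payload "ABC".
let stream = InputStream(data: Data([0, 0, 0, 3, 65, 66, 67]))
stream.open()
if let header = readExactly(4, from: stream) {
    let length = header.reduce(0) { ($0 << 8) | Int($1) }
    if let payload = readExactly(length, from: stream) {
        print(length, String(decoding: payload, as: UTF8.self)) // 3 ABC
    }
}
stream.close()
```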

I thought I could avoid this by using the barrier. But it seems this does not work... Am I using the barrier wrong?
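
For context on the barrier question: a barrier flag only has an effect on a concurrent queue that you create yourself. DispatchQueue(label:) without attributes: .concurrent creates a serial queue, on which every block already runs exclusively, so the barrier in handleInput() changes nothing, and sync on that serial queue should already keep two handleInput() bodies from overlapping. The random bytes_expected values therefore more likely come from the stream getting out of step with the message boundaries: read(lengthbuffer, maxLength: 4) may return fewer than 4 bytes, and every early return discards bytes that were already consumed. The sketch below (Swift 5; SynchronizedCounter is a hypothetical name) shows the reader/writer pattern where a barrier does matter.

```swift
import Foundation
import Dispatch

/// Reader/writer wrapper: reads may run concurrently, writes use a barrier so
/// they run alone. The barrier only has this effect because the queue is
/// created with the .concurrent attribute.
final class SynchronizedCounter {
    private let queue = DispatchQueue(label: "SynchronizedCounter", attributes: .concurrent)
    private var storage = 0

    var value: Int {
        return queue.sync { storage } // concurrent read
    }

    func increment() {
        queue.sync(flags: .barrier) { storage += 1 } // exclusive write
    }
}

let counter = SynchronizedCounter()
DispatchQueue.concurrentPerform(iterations: 1000) { _ in
    counter.increment()
}
print(counter.value) // 1000
```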

2 answers

#1

My suggestion: don't fight against the asynchronous nature of network I/O. Whenever the Stream.Event.hasBytesAvailable event is signalled, read the available data and append it to a buffer. If the buffer contains enough data (the 4 length bytes plus the expected message length), process the message and remove it from the buffer. Otherwise do nothing and wait for more data.
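
The "read and collect" half of this approach can be sketched as a helper that drains whatever the stream can deliver right now into a caller-owned buffer, without ever waiting for a complete message. This assumes Swift 5; readAvailable is a hypothetical name, and in a real app it would be called from the StreamDelegate method stream(_:handle:) when the event is .hasBytesAvailable.

```swift
import Foundation

/// Hypothetical helper: drains whatever the stream can deliver right now and
/// appends it to `buffer`. Returns false on a read error.
func readAvailable(from stream: InputStream, into buffer: inout Data) -> Bool {
    let chunkSize = 4096
    var chunk = [UInt8](repeating: 0, count: chunkSize)
    while stream.hasBytesAvailable {
        let n = stream.read(&chunk, maxLength: chunkSize)
        if n < 0 { return false } // read error (see stream.streamError)
        if n == 0 { break }       // end of stream
        buffer.append(chunk, count: n)
    }
    return true
}

// Usage: an in-memory stream stands in for the socket here.
let input = InputStream(data: Data([0, 0, 0, 2, 104, 105]))
input.open()
var pending = Data()
let ok = readAvailable(from: input, into: &pending)
print(ok, pending.count) // true 6
input.close()
```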

The following (untested) code is meant as a demonstration. It shows only the parts that are relevant to this particular problem; initialization, the event handler, etc. are omitted for brevity.

class MessageReader {

    var buffer = Data(count: 1024) // Must be large enough for largest message + 4
    var bytesRead = 0 // Number of bytes read so far

    // Called from `handleInput` with a complete message.
    func processMessage(message: Data) {
        // ...
    }

    // Called from event handler if `Stream.Event.hasBytesAvailable` is signalled.
    func handleInput(istr: InputStream) {
        assert(bytesRead < buffer.count)

        // Read from input stream, appending to previously read data:
        let maxRead = buffer.count - bytesRead
        let amount = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
            istr.read(p + bytesRead, maxLength: maxRead)
        }
        guard amount > 0 else {
            // handle EOF or read error ...
            fatalError()
        }
        bytesRead += amount

        while bytesRead >= 4 {
            // Read message size:
            let messageSize = buffer.withUnsafeBytes { (p: UnsafePointer<UInt32>) in
                Int(UInt32(bigEndian: p.pointee))
            }
            let totalSize = 4 + messageSize
            guard totalSize <= buffer.count else {
                // Handle buffer too small for message situation ...
                fatalError()
            }

            if bytesRead < totalSize {
                break // Not enough data to read message.
            }

            // Buffer contains complete message now. Process it ...
            processMessage(message: buffer[4 ..< totalSize])

            // ... and remove it from the buffer:
            if totalSize < bytesRead {
                // Move remaining data to the front:
                buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
                    _ = memmove(p, p + totalSize, bytesRead - totalSize)
                }
            }
            bytesRead -= totalSize
        }
    }
}
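
The parsing loop above can also be sketched in Swift 5 without raw pointers or a fixed-size buffer. FrameParser, FrameError and the 16 MiB maxMessageSize default are hypothetical choices for illustration. The pending Data grows as needed, so a large message is not a fatalError, while the size limit still rejects a corrupt length header. Like the loop above, it extracts every complete message in the buffer, so several messages arriving in one read are all handled at once.

```swift
import Foundation

enum FrameError: Error {
    case messageTooLarge(Int)
}

/// Hypothetical type: accumulates incoming bytes and extracts complete
/// messages framed as a 4-byte big-endian length followed by the payload.
struct FrameParser {
    private var pending = Data()
    let maxMessageSize: Int // guard against a corrupt length header

    init(maxMessageSize: Int = 16 * 1024 * 1024) {
        self.maxMessageSize = maxMessageSize
    }

    /// Appends `chunk` and returns every message that is now complete, in order.
    /// Bytes of an incomplete message stay in `pending` until the next call.
    mutating func feed(_ chunk: Data) throws -> [Data] {
        pending.append(chunk)
        var messages: [Data] = []
        while pending.count >= 4 {
            let size = Int(pending.prefix(4).reduce(UInt32(0)) { ($0 << 8) | UInt32($1) })
            guard size <= maxMessageSize else { throw FrameError.messageTooLarge(size) }
            guard pending.count >= 4 + size else { break } // wait for more data
            let start = pending.startIndex + 4 // Data may not start at index 0
            messages.append(Data(pending[start ..< start + size]))
            pending.removeFirst(4 + size)
        }
        return messages
    }
}

var parser = FrameParser()
let first = try! parser.feed(Data([0, 0, 0, 2, 104]))  // header plus 1 of 2 payload bytes
let second = try! parser.feed(Data([105, 0, 0, 0, 0])) // rest of "hi", then an empty message
print(first.count, second.map { String(decoding: $0, as: UTF8.self) }) // 0 ["hi", ""]
```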

#2

Inspired by Martin R (https://*.com/a/48344040/3827381 - Thank you very much!) I came up with this solution:

var buffer = Data(count: 4096)
var offset = 0 // the index of the first byte that can be overwritten
var readState = 0
var missingMsgBytes = 0
var msg = ""

func handleInput(_ istr: InputStream) {
    assert(buffer.count >= 5, "buffer must be large enough to contain length info (4 bytes) and at least one payload byte => min 5 bytes buffer required")
    assert(offset < buffer.count, "offset \(offset) is not smaller than \(buffer.count)")

    let toRead = buffer.count - offset
    let read = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in istr.read(p + offset, maxLength: toRead) }
    guard read > 0 else {
        self.errorHandler(NetworkingError.InputError("Input Stream read returned \(read) (0 = end of stream, -1 = error) => Network error"))
        return
    }
    offset += read
    var msgStart = 0
    var msgEnd = 0

    if readState == 0 {
        if offset < 4 {
            return
        }
        missingMsgBytes = buffer[0...3].withUnsafeBytes { (p: UnsafePointer<UInt32>) in Int(UInt32(bigEndian: p.pointee)) }
        msgStart = 4
        msgEnd = offset
        readState = 1
    } else {
        msgStart = 0
        msgEnd = offset
    }

    var fullMessageRead = false

    if readState == 1 {
        let payloadRead = msgEnd - msgStart
        if payloadRead <= missingMsgBytes {
            // Every payload byte in the buffer belongs to the current message.
            // payloadRead is 0 if only the 4 length bytes have arrived so far.
            if payloadRead > 0 {
                msg += String(data: buffer[msgStart..<msgEnd], encoding: .utf8)!
                missingMsgBytes -= payloadRead
            }
            offset = 0 // header and payload consumed, so refill the buffer from the front
            fullMessageRead = (missingMsgBytes == 0)
        } else { // read more than was missing
            let msgStop = msgStart + missingMsgBytes // index just past the current message
            msg += String(data: buffer[msgStart..<msgStop], encoding: .utf8)!
            fullMessageRead = true
            // Move the bytes that already belong to the next message to the front of the buffer
            let leftover = msgEnd - msgStop
            buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
                _ = memmove(p, p + msgStop, leftover) // dst, src, number
            }
            offset = leftover
        }
    }

    if fullMessageRead {
        handleMessage(msg)
        readState = 0
        msg = ""
        missingMsgBytes = 0
    }
}
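
One detail worth checking in the code above: msg += String(data:encoding:)! decodes each chunk on its own. A multi-byte UTF-8 character (for example "é", which is 2 bytes) can be split across two reads; String(data:encoding:) then returns nil for the first chunk and the force-unwrap crashes. A minimal sketch of the alternative, assuming Swift 5: accumulate the payload as Data and decode only once the message is complete.

```swift
import Foundation

// "café" as UTF-8: "é" is the two bytes 0xC3 0xA9.
let payload = Data([0x63, 0x61, 0x66, 0xC3, 0xA9])
let firstChunk = payload.prefix(4) // ends in the middle of "é"
let secondChunk = payload.dropFirst(4)

// Decoding each chunk on its own fails for the split character,
// so a force-unwrap here would crash:
print(String(data: firstChunk, encoding: .utf8) as Any) // nil

// Accumulating raw bytes and decoding once the message is complete works:
var message = Data()
message.append(firstChunk)
message.append(secondChunk)
print(String(data: message, encoding: .utf8) ?? "invalid UTF-8") // café
```

A second point: after one message is handled, any further complete messages already in the buffer wait for the next hasBytesAvailable event, which may never come if the server sends nothing more. Looping over the buffer until no complete message is left, as in the first answer, avoids that.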

This solution can read messages of arbitrary size, because the payload is accumulated in msg across calls. The buffer size only determines how many bytes can be read per call: a bigger buffer means fewer read calls, which can make the app faster.

I have now tested the code for about an hour and it has not crashed; the old code crashed after 1-2 minutes. It seems to be working now.

But since I want to improve my programming knowledge, I'd like to ask: is anything in my code unnecessarily complicated, and does anyone see a bug that could still cause the app to crash or to read wrong data?