第2月第1天 GCDAsyncSocket dispatch_source_set_event_handler

时间:2023-03-08 20:17:54
/**
 * Bumps the shared CFStream thread retain count and, when the count becomes 1,
 * creates and starts the shared thread.
 *
 * The counter update and the thread creation both run synchronously on
 * cfstreamThreadSetupQueue, a serial queue that is created exactly once.
 */
+ (void)startCFStreamThreadIfNeeded
{
    LogTrace();

    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        cfstreamThreadRetainCount = 0;
        cfstreamThreadSetupQueue = dispatch_queue_create("GCDAsyncSocket-CFStreamThreadSetup", DISPATCH_QUEUE_SERIAL);
    });

    dispatch_sync(cfstreamThreadSetupQueue, ^{ @autoreleasepool {

        cfstreamThreadRetainCount += 1;

        if (cfstreamThreadRetainCount == 1)
        {
            // First user: spin up the shared thread.
            cfstreamThread = [[NSThread alloc] initWithTarget:self
                                                     selector:@selector(cfstreamThread)
                                                       object:nil];
            [cfstreamThread start];
        }
    }});
}

/**
 * Entry point of the shared CFStream thread.
 *
 * Names the thread, installs a placeholder timer and then keeps running the
 * current run loop in the default mode until the thread is cancelled or the
 * run loop reports that it could not be run.
 */
+ (void)cfstreamThread
{
    @autoreleasepool
    {
        [[NSThread currentThread] setName:GCDAsyncSocketThreadName];

        LogInfo(@"CFStreamThread: Started");

        // A run loop needs at least one input source or timer to run, so schedule
        // a timer whose interval is the time remaining until the distant future.
        NSTimeInterval practicallyNever = [[NSDate distantFuture] timeIntervalSinceNow];
        [NSTimer scheduledTimerWithTimeInterval:practicallyNever
                                         target:self
                                       selector:@selector(ignore:)
                                       userInfo:nil
                                        repeats:YES];

        NSThread *thread = [NSThread currentThread];
        NSRunLoop *runLoop = [NSRunLoop currentRunLoop];

        // The cancellation flag is re-checked before every pass of the run loop.
        while (![thread isCancelled] &&
               [runLoop runMode:NSDefaultRunLoopMode beforeDate:[NSDate distantFuture]])
        {
            // Nothing to do here; the loop condition drives the run loop.
        }

        LogInfo(@"CFStreamThread: Stopped");
    }
}

/**
 * Schedules this socket's streams on the shared CFStream thread, once.
 *
 * Asserts that the caller is on the socket queue (or its target queue) and that
 * both streams exist. Does nothing if kAddedStreamsToRunLoop is already set.
 *
 * @return Always YES.
 */
- (BOOL)addStreamsToRunLoop
{
    LogTrace();

    NSAssert(dispatch_get_specific(IsOnSocketQueueOrTargetQueueKey), @"Must be dispatched on socketQueue");
    NSAssert((readStream != NULL && writeStream != NULL), @"Read/Write stream is null");

    if (flags & kAddedStreamsToRunLoop)
    {
        // Already scheduled.
        return YES;
    }

    LogVerbose(@"Adding streams to runloop...");

    Class socketClass = [self class];
    [socketClass startCFStreamThreadIfNeeded];

    // Blocks until the shared thread has finished running scheduleCFStreams:.
    [[self class] performSelector:@selector(scheduleCFStreams:)
                         onThread:cfstreamThread
                       withObject:self
                    waitUntilDone:YES];

    flags |= kAddedStreamsToRunLoop;

    return YES;
}

  

一、GCDAsyncSocket的核心就是dispatch_source_set_event_handler

1.accept回调

            // Read-type dispatch source on the IPv4 listening descriptor, delivered on socketQueue.
            // The third argument (mask) is 0.
            accept4Source = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socket4FD, 0, socketQueue);

            // The handler uses these local copies instead of socket4FD / accept4Source
            // (presumably so the block captures plain values -- TODO confirm).
            int socketFD = socket4FD;
            dispatch_source_t acceptSource = accept4Source;

            dispatch_source_set_event_handler(accept4Source, ^{ @autoreleasepool {

                LogVerbose(@"event4Block");

                unsigned long i = 0;

                // The source's data value is treated as the number of pending connections.
                unsigned long numPendingConnections = dispatch_source_get_data(acceptSource);

                LogVerbose(@"numPendingConnections: %lu", numPendingConnections);

                // Accept until doAccept: fails or the reported number of connections is handled.
                while ([self doAccept:socketFD] && (++i < numPendingConnections))
                {
                    // Intentionally empty: all the work happens in the loop condition.
                }
            }});

2.read,write回调

    // One read source and one write source on the same descriptor, both delivered on socketQueue.
    // The third argument (mask) is 0.
    readSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, socketFD, 0, socketQueue);
    writeSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, socketFD, 0, socketQueue);

    // Setup event handlers

    dispatch_source_set_event_handler(readSource, ^{ @autoreleasepool {

        LogVerbose(@"readEventBlock");

        // Record the data value reported by the read source.
        socketFDBytesAvailable = dispatch_source_get_data(readSource);
        LogVerbose(@"socketFDBytesAvailable: %lu", socketFDBytesAvailable);

        // A positive value means there is data to read; otherwise the event is handled as EOF.
        if (socketFDBytesAvailable > 0)
        {
            [self doReadData];
        }
        else
        {
            [self doReadEOF];
        }
    }});

    dispatch_source_set_event_handler(writeSource, ^{ @autoreleasepool {

        LogVerbose(@"writeEventBlock");

        // Note that the socket can accept bytes, then try to write.
        flags |= kSocketCanAcceptBytes;
        [self doWriteData];
    }});

二、缓冲区

1.创建

如果创建GCDAsyncReadPacket时没有传入buffer,则该read packet没有缓冲区,socket可读时数据会先放入preBuffer

/**
 * Convenience overload: no caller-supplied buffer, offset 0, maxLength 0.
 *
 * @param data    Terminator passed through to the read packet.
 * @param timeout Timeout passed through to the read packet.
 * @param tag     Caller-defined tag passed through to the read packet.
 */
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:0 tag:tag];
}

/**
 * Convenience overload: caller-supplied buffer and offset, maxLength 0.
 */
- (void)readDataToData:(NSData *)data
           withTimeout:(NSTimeInterval)timeout
                buffer:(NSMutableData *)buffer
          bufferOffset:(NSUInteger)offset
                   tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:buffer bufferOffset:offset maxLength:0 tag:tag];
}

/**
 * Convenience overload: no caller-supplied buffer, offset 0, explicit maxLength.
 */
- (void)readDataToData:(NSData *)data withTimeout:(NSTimeInterval)timeout maxLength:(NSUInteger)length tag:(long)tag
{
    [self readDataToData:data withTimeout:timeout buffer:nil bufferOffset:0 maxLength:length tag:tag];
}

/**
 * Validates the arguments, builds a GCDAsyncReadPacket whose terminator is `data`,
 * and asynchronously appends it to readQueue on socketQueue.
 *
 * The request is dropped with a warning when:
 *   - `data` is empty,
 *   - `offset` is greater than the length of `buffer`, or
 *   - `maxLength` is non-zero but smaller than the terminator itself.
 *
 * @param data      Terminator; must be non-empty.
 * @param timeout   Timeout passed through to the read packet.
 * @param buffer    Optional buffer handed to the packet (may be nil).
 * @param offset    Start offset within `buffer`.
 * @param maxLength Length cap; 0 skips the cap-vs-terminator check
 *                  (presumably meaning "no limit" -- TODO confirm).
 * @param tag       Caller-defined tag passed through to the read packet.
 */
- (void)readDataToData:(NSData *)data
           withTimeout:(NSTimeInterval)timeout
                buffer:(NSMutableData *)buffer
          bufferOffset:(NSUInteger)offset
             maxLength:(NSUInteger)maxLength
                   tag:(long)tag
{
    if ([data length] == 0) {
        LogWarn(@"Cannot read: [data length] == 0");
        return;
    }
    if (offset > [buffer length]) {
        LogWarn(@"Cannot read: offset > [buffer length]");
        return;
    }
    if (maxLength > 0 && maxLength < [data length]) {
        LogWarn(@"Cannot read: maxLength > 0 && maxLength < [data length]");
        return;
    }

    // readLength is 0 because this read is delimited by the terminator.
    GCDAsyncReadPacket *packet = [[GCDAsyncReadPacket alloc] initWithData:buffer
                                                              startOffset:offset
                                                                maxLength:maxLength
                                                                  timeout:timeout
                                                               readLength:0
                                                               terminator:data
                                                                      tag:tag];

    dispatch_async(socketQueue, ^{ @autoreleasepool {

        LogTrace();

        // Only enqueue while the socket is started and reads/writes are not forbidden.
        if ((flags & kSocketStarted) && !(flags & kForbidReadsWrites))
        {
            [readQueue addObject:packet];
            [self maybeDequeueRead];
        }
    }});

    // Do not rely on the block being run in order to release the packet,
    // as the queue might get released without the block completing.
}

三、面向对象封装

1.socket可读时先用preBuffer接收数据,再拷贝到currentRead->buffer中,属于生产者-消费者模式。

    // Declarations excerpted from the socket class (presumably instance variables -- TODO confirm).
    GCDAsyncReadPacket *currentRead;
    GCDAsyncWritePacket *currentWrite;

    GCDAsyncSocketPreBuffer *preBuffer;

    // Excerpt from the read path: destination pointer handed to read().
    uint8_t *buffer;

    if (readIntoPreBuffer)
    {
        // Make room in the preBuffer and read straight into its write area.
        [preBuffer ensureCapacityForWrite:bytesToRead];

        buffer = [preBuffer writeBuffer];
    }

    // ... (code omitted in this excerpt) ...

    // Use the IPv6 descriptor when there is no IPv4 descriptor.
    int socketFD = (socket4FD == SOCKET_NULL) ? socket6FD : socket4FD;

    ssize_t result = read(socketFD, buffer, (size_t)bytesToRead);
    LogVerbose(@"read from socket = %i", (int)result);

    // ... (code omitted in this excerpt) ...

    if (readIntoPreBuffer)
    {
        // We just read a big chunk of data into the preBuffer

        [preBuffer didWrite:bytesRead];
        LogVerbose(@"read data into preBuffer - preBuffer.length = %zu", [preBuffer availableBytes]);

        // Search for the terminating sequence

        bytesToRead = [currentRead readLengthForTermWithPreBuffer:preBuffer found:&done];
        LogVerbose(@"copying %lu bytes from preBuffer", (unsigned long)bytesToRead);

        // Ensure there's room on the read packet's buffer

        [currentRead ensureCapacityForAdditionalDataOfLength:bytesToRead];

        // Copy bytes from prebuffer into read buffer

        uint8_t *readBuf = (uint8_t *)[currentRead->buffer mutableBytes] + currentRead->startOffset
                                                                         + currentRead->bytesDone;

        memcpy(readBuf, [preBuffer readBuffer], bytesToRead);

        // Remove the copied bytes from the prebuffer
        [preBuffer didRead:bytesToRead];
        LogVerbose(@"preBuffer.length = %zu", [preBuffer availableBytes]);

        // Update totals
        currentRead->bytesDone += bytesToRead;
        totalBytesReadForCurrentRead += bytesToRead;

        // Our 'done' variable was updated via the readLengthForTermWithPreBuffer:found: method above
    }

4.runloop