RSS

(root)/iphone/common : /source/SocketMessenger.m (revision 100)

To get this branch, use:
bzr branch /browse/iphone/common
Line Revision Contents
1 52
//
2
//  SocketMessenger.m
3
//  tappity
4
//
5
//  Created by döme on 30.10.2009.
6 54
7 52
//
8
9 54
/*
10
 * Copyright (c) 2009 Doemoetoer Gulyas.
11
 * All rights reserved.
12
 *
13
 * Redistribution and use in source and binary forms, with or without
14
 * modification, are permitted provided that the following conditions
15
 * are met:
16
 * 1. Redistributions of source code must retain the above copyright
17
 *    notice, this list of conditions and the following disclaimer.
18
 * 2. Redistributions in binary form must reproduce the above copyright
19
 *    notice, this list of conditions and the following disclaimer in the
20
 *    documentation and/or other materials provided with the distribution.
21
 * 3. The name of the author may not be used to endorse or promote products
22
 *    derived from this software without specific prior written permission.
23
 *
24
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
25
 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
26
 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
27
 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
28
 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
29
 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
30
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
31
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
32
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
33
 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
34
 */
35
36 52
#include <unistd.h>
37
#include <stdint.h>
38
#include <fcntl.h>
39 62
#include <sys/types.h>
40
#include <sys/time.h>
41 52
#include <termios.h>
42
#include <netinet/in.h>
43 62
#include <sys/socket.h>
44 52
#include <arpa/inet.h>
45
#include <netdb.h>
46
47
#import "SocketMessenger.h"
48
49
// String constants exported by this file.
// NOTE(review): none of them is referenced inside this file; presumably they
// are dictionary/notification keys for clients — confirm against SocketMessenger.h.
NSString* SocketMessageIdKey	= @"SocketMessageId";
50
NSString* SocketMessageDataKey	= @"SocketMessageData";
51 55
NSString* SocketMessengerKey	= @"SocketMessenger";
52 52
53 55
//const int kSocketMessengerTerminateMsg = -1;
54 53
55 94
// Applies the socket options used for the listening socket and for accepted
// connections: SO_REUSEADDR plus send and receive timeouts.
//
// SO_SNDTIMEO / SO_RCVTIMEO expect a struct timeval. The previous code passed
// a plain int, which setsockopt() rejects on BSD-derived systems, so no
// timeout was ever installed. Failures are now reported instead of ignored,
// and invalid descriptors (e.g. the result of a failed accept()) are skipped.
static void _setStandardSocketOpts(int socket)
{
	if (socket < 0)
		return;

	int yes = 1;
	if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
		perror("setsockopt(SO_REUSEADDR)");

	// 2 seconds. NOTE(review): the original integer value 2000 is read as
	// milliseconds here — confirm that this is the intended timeout.
	struct timeval timeout;
	timeout.tv_sec = 2;
	timeout.tv_usec = 0;

	if (setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) == -1)
		perror("setsockopt(SO_SNDTIMEO)");
	if (setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) == -1)
		perror("setsockopt(SO_RCVTIMEO)");
}
63
64
65
@implementation SocketMessengerService
66
{
67
	int				listenSocket;
68
	NSNetService*	netService;
69
	
70
	uint16_t		servicePort;
71
	NSString*		serviceName;
72
	NSString*		protocolName;
73
	
74
	dispatch_queue_t					delegateQueue;
75
	
76
	dispatch_source_t	listenDispatchSource;
77
78
	SocketMessengerServiceErrorHandler errorHandler;
79
	SocketMessengerServiceConnectionCallback connectionCallback;
80
}
81
82
@synthesize errorHandler, connectionCallback;
83
84
// Creates a service that is not listening yet and that delivers its
// callbacks on the main queue until a different delegate queue is set.
- (id) init
{
	self = [super init];
	if (self == nil)
		return nil;

	// -1 marks "no listening socket"; -isListening relies on this value.
	listenSocket = -1;

	// Goes through the setter so that the queue is retained.
	[self setDelegateQueue: dispatch_get_main_queue()];

	return self;
}
96
97
// Stops the service and gives back the delegate queue.
//
// -setDelegateQueue: retains the queue, but nothing ever released it again,
// so a custom delegate queue was leaked with every service instance.
- (void) dealloc
{
	// Blocks until the listening source's cancel handler has closed the socket.
	[self stopServiceSync];

	if (delegateQueue)
	{
		dispatch_release(delegateQueue);
		delegateQueue = NULL;
	}
}
101
102
// Overrides -finalize without adding any work of its own: the body only
// forwards to the superclass implementation.
- (void) finalize
103
{
104
	[super finalize];
105
}
106
107
// Publishes the listening port as a Bonjour service, replacing any service
// this object published earlier.
//
// domain   - registration domain; nil/empty selects the default domains.
// protocol - service type passed to NSNetService; must not be empty.
// name     - service name; nil/empty selects the default name.
//
// Returns YES once -publish has been requested, NO if the protocol is
// missing, the service is not listening, or NSNetService could not be created.
// -startService deliberately calls this with possibly incomplete data
// ("attempting to start anyway"), so a missing protocol is reported through
// the return value instead of tripping an assert().
- (BOOL) enableBonjourWithDomain: (NSString*) domain applicationProtocol: (NSString*) protocol name: (NSString*) name
{
	if(![domain length])
		domain = @""; //Will use default Bonjour registration domains, typically just ".local"
	if(![name length])
		name = @""; //Will use default Bonjour name, e.g. the name assigned to the device in iTunes

	if (![protocol length] || (listenSocket == -1))
	{
		NSLog(@"Cannot enable Bonjour: %@", [protocol length] ? @"service is not listening" : @"no application protocol given");
		return NO;
	}

	NSLog(@"Bonjour port: %d", servicePort);

	// Retire a previously published service before replacing it (no-op for nil).
	[netService stop];

	netService = [[NSNetService alloc] initWithDomain: domain type: protocol name: name port: servicePort];
	if(netService == nil)
		return NO;

	[netService setDelegate: self];
//	[server.netService scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSRunLoopCommonModes];
	[netService publish];

	return YES;
}
130
131
// Event handler of the listening dispatch source: accepts one pending
// connection, wraps it in a SocketMessenger, announces it through
// connectionCallback (asynchronously on the delegate queue) and starts it.
//
// The result of accept() is now checked before anything else touches the
// descriptor or errno. Previously setsockopt() was called on -1 first, which
// overwrote errno, and the EWOULDBLOCK branch fell through and started a
// messenger on descriptor -1.
- (void) acceptConnection
{
	NSLog(@"Accepting connection...");

	struct sockaddr_in	peerAddress;
	socklen_t	peerAddressSize = sizeof(peerAddress);
	int newSocket = accept(listenSocket, (struct sockaddr *)&peerAddress, &peerAddressSize);

	if (newSocket == -1)
	{
		int acceptErrno = errno;

		// Nothing to accept right now; the source fires again when there is.
		if ((acceptErrno == EWOULDBLOCK) || (acceptErrno == EAGAIN) || (acceptErrno == EINTR))
			return;

		printf("Error accepting connection.\n");

		// A real failure shuts the listener down and is reported to the owner.
		if (listenDispatchSource)
			dispatch_source_cancel(listenDispatchSource);

		if (errorHandler)
			dispatch_async(delegateQueue, ^(void) {
				errorHandler([NSError errorWithDomain: @"SocketMessenger.accept" code: -1 userInfo: nil], self);
			});
		return;
	}

	_setStandardSocketOpts(newSocket);

	SocketMessenger* messenger = [[SocketMessenger alloc] init];

	if (connectionCallback)
	{
		NSLog(@"Dispatching connection callback...");
		dispatch_async(delegateQueue, ^(void) {
			connectionCallback(messenger, self);
		});
	}
	NSLog(@"Starting messenger");

	[messenger startWithSocket: newSocket];

	NSLog(@"Accepted connection.");
}
182
183
184
185
// Opens the IPv4 listening socket on servicePort, installs a dispatch source
// that calls -acceptConnection for incoming connections and, if Bonjour
// information was configured, publishes the service.
//
// After a successful start servicePort holds the port actually bound (read
// back with getsockname()).
//
// socket()/bind()/listen()/getsockname() failures used to be guarded only by
// assert(): an abort in debug builds and silently ignored with NDEBUG. They
// now close the socket, leave the service in the "not listening" state and
// are reported through errorHandler with an NSPOSIXErrorDomain error.
// Calling this while already listening is ignored instead of leaking the
// existing socket.
- (void) startService
{
	if (listenSocket != -1)
	{
		NSLog(@"SocketMessengerService is already listening on port %d", servicePort);
		return;
	}

	// Shared failure path; must be invoked right after the failing call so
	// that errno still belongs to it.
	void (^failStart)(NSString*) = ^(NSString* step) {
		int failureErrno = errno;

		NSLog(@"SocketMessengerService: %@ failed: %s", step, strerror(failureErrno));

		if (listenSocket != -1)
		{
			close(listenSocket);
			listenSocket = -1;
		}

		if (errorHandler)
			dispatch_async(delegateQueue, ^(void) {
				errorHandler([NSError errorWithDomain: NSPOSIXErrorDomain code: failureErrno userInfo: nil], self);
			});
	};

	struct sockaddr_in myAddress;
	memset(&myAddress, 0, sizeof(myAddress));

	myAddress.sin_family = AF_INET;					// host byte order
	myAddress.sin_port = htons(servicePort);		// short, network byte order, any port
	myAddress.sin_addr.s_addr = htonl(INADDR_ANY);	// auto-fill with my IP

	listenSocket = socket(PF_INET, SOCK_STREAM, 0);
	if (listenSocket == -1)
	{
		failStart(@"socket()");
		return;
	}

	_setStandardSocketOpts(listenSocket);

	if (bind(listenSocket, (struct sockaddr *)&myAddress, sizeof(myAddress)) == -1)
	{
		failStart(@"bind()");
		return;
	}

	if (listen(listenSocket, 1) == -1)
	{
		failStart(@"listen()");
		return;
	}

	// Read the address back to learn which port was actually bound.
	socklen_t slen = sizeof(myAddress);
	if (getsockname(listenSocket, (struct sockaddr *)&myAddress, &slen) == -1)
	{
		failStart(@"getsockname()");
		return;
	}
	servicePort = ntohs(myAddress.sin_port);

	listenDispatchSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, listenSocket, 0, dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));

	// The cancel handler owns the teardown: -stopServiceSync waits for
	// listenDispatchSource to become NULL.
	dispatch_source_set_cancel_handler(listenDispatchSource, ^{
		close(listenSocket);
		listenSocket = -1;
		dispatch_release(listenDispatchSource);
		listenDispatchSource = NULL;
	});

	dispatch_source_set_event_handler(listenDispatchSource, ^{ [self acceptConnection]; });

	dispatch_resume(listenDispatchSource);

	BOOL bonjourPartial = protocolName || serviceName;
	BOOL bonjourComplete = protocolName && serviceName;

	if (bonjourPartial && !bonjourComplete)
		NSLog(@"Bonjour service information incomplete, attempting to start anyway.");

	if (bonjourPartial)
	{
		if([self enableBonjourWithDomain: @"" applicationProtocol: protocolName name: serviceName])
			NSLog(@"socket messenger servie bounjour advertisments up and running");
		else
			NSLog(@"Bonjour service startup failed");
	}
}
236
237
// Withdraws the Bonjour service, cancels the listening source and blocks
// until that source's cancel handler has run (the handler is what closes the
// socket and resets listenDispatchSource to NULL).
// NOTE(review): this is a polling wait on an ivar written from another
// queue; it never returns if the cancel handler cannot run.
- (void) stopServiceSync
{
	// Messaging nil is a no-op, so this is safe when nothing was published.
	NSNetService* publishedService = netService;
	netService = nil;
	[publishedService stop];

	if (listenDispatchSource != NULL)
		dispatch_source_cancel(listenDispatchSource);

	for (;;)
	{
		if (listenDispatchSource == NULL)
			break;
		usleep(1);
	}
}
250
251
// Replaces the queue on which errorHandler and connectionCallback are
// invoked. The new queue is retained before the old one is released, so
// passing the current queue again is safe.
//
// aQueue may be nil; dispatch_retain() must not be called with NULL, so the
// retain is guarded exactly like in the SocketMessenger queue setters.
- (void) setDelegateQueue: (dispatch_queue_t) aQueue
{
	if (aQueue)
		dispatch_retain(aQueue);

	if (delegateQueue)
		dispatch_release(delegateQueue);
	delegateQueue = aQueue;
}
259
260
// YES while a listening socket exists, i.e. listenSocket is not the -1
// sentinel that -init and the listening source's cancel handler store.
- (BOOL) isListening
{
	BOOL hasListenSocket = (listenSocket != -1);
	return hasListenSocket;
}
264
265
@synthesize protocolName, serviceName, servicePort;
266
@end
267
268 53
269 52
// Declarations of internal SocketMessenger methods, so they can be called
// ahead of their definitions in the implementation below.
// NOTE(review): no implementation of -startListening is visible in this
// file — confirm whether the declaration is stale.
@interface SocketMessenger (Private)
270
- (void) startCommsWorkThreads;
271 53
- (void) startWatchdogThread;
272
- (void) startListening;
273 52
@end
274
275
@implementation SocketMessenger
276 94
{
277
	int commsSocket;
278
		
279
	dispatch_io_t		socketChannel;
280
	
281
	dispatch_source_t	watchdogDispatchSource;
282
	dispatch_source_t	servicePollDispatchSource;
283
	
284
	dispatch_queue_t	readQueue;
285
	dispatch_queue_t	writeQueue;
286
	dispatch_queue_t	watchdogQueue;
287
	dispatch_queue_t	delegateQueue;
288
		
289
		
290
	BOOL		automaticallyReconnect;
291
	BOOL		errorOccured;
292
	
293
	NSNetService* remoteService;
294
	
295
	SocketMessengerPacketReadHandler readHandler;
296
	SocketMessengerConnectionBeginCallback connectionBeginCallback;
297
	SocketMessengerConnectionEndCallback connectionEndCallback;
298
}
299 52
300
// Creates an unconnected messenger with automatic reconnection enabled and
// the default queues: main queue for reads and delegate callbacks, the
// default-priority global queue for writes and the low-priority global queue
// for the watchdog.
- (id) init
{
	self = [super init];
	if (self == nil)
		return nil;

	// -1 marks "no socket"; -isConnected and the watchdog test for it.
	commsSocket = -1;
	automaticallyReconnect = YES;

	// All queues go through their setters so that they are retained.
	[self setReadQueue: dispatch_get_main_queue()];
	[self setDelegateQueue: dispatch_get_main_queue()];
	[self setWriteQueue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0)];
	[self setWatchdogQueue: dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_LOW, 0)];

	return self;
}
318
319 55
// YES while the messenger holds a socket, i.e. commsSocket is not the -1
// sentinel stored by -init and by the I/O channel's cleanup handler.
- (BOOL) isConnected
{
	BOOL hasSocket = (commsSocket != -1);
	return hasSocket;
}
323
324 97
// Serializes `plist` as a binary property list and sends it as one packet
// tagged with `messageId`.
//
// plist     - a property-list object.
// messageId - identifier written into the packet header by
//             -sendPacket:withIdentifier:.
//
// A value that cannot be serialized is logged and dropped. The previous code
// used the deprecated serializer with its error discarded and relied on
// assert(data), i.e. an abort in debug builds and an empty packet with NDEBUG.
- (void) sendPlist: (id) plist withIdentifier: (NSInteger) messageId
{
	NSError* serializationError = nil;
	NSData* data = nil;

	if (plist)
		data = [NSPropertyListSerialization dataWithPropertyList: plist format: NSPropertyListBinaryFormat_v1_0 options: 0 error: &serializationError];

	if (!data)
	{
		NSLog(@"SocketMessenger could not serialize plist for message %ld: %@", (long)messageId, serializationError);
		return;
	}

	// The default destructor makes dispatch_data_create() copy the bytes, so
	// the dispatch data does not depend on the lifetime of `data`.
	dispatch_data_t dispatchData = dispatch_data_create([data bytes], [data length], dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), DISPATCH_DATA_DESTRUCTOR_DEFAULT);

	[self sendPacket: dispatchData withIdentifier: messageId];

	dispatch_release(dispatchData);
}
336
337 94
338
// Writes one packet to the socket channel.
//
// Wire format: two big-endian 32-bit words followed by the payload. The
// first word is the number of bytes that follow it (payload size + 4), the
// second word is `messageId`.
//
// payload   - packet body; NULL is treated as an empty body.
// messageId - packet identifier; only its low 32 bits are transmitted.
//
// When no channel is open the packet is dropped silently, as before. Write
// errors are logged and set errorOccured. Payloads whose length does not fit
// the 32-bit length field are rejected instead of sending a wrapped length.
- (void) sendPacket: (dispatch_data_t) payload withIdentifier: (NSInteger) messageId
{
	if (!payload)
		payload = dispatch_data_empty;

	size_t payloadSize = dispatch_data_get_size(payload);
	if (payloadSize > (size_t)UINT32_MAX - 4)
	{
		NSLog(@"SocketMessenger cannot send message %ld: payload of %lu bytes is too large", (long)messageId, (unsigned long)payloadSize);
		return;
	}

	// Read the ivar once: another queue may reset it to NULL at any time.
	dispatch_io_t channel = socketChannel;
	if (!channel)
		return;

	uint32_t header[2] = {CFSwapInt32HostToBig((uint32_t)(payloadSize + 4)), CFSwapInt32HostToBig((uint32_t)messageId)};

	// Default destructor: the stack buffer is copied.
	dispatch_data_t headerData = dispatch_data_create(header, sizeof(header), NULL, DISPATCH_DATA_DESTRUCTOR_DEFAULT);
	dispatch_data_t packet = dispatch_data_create_concat(headerData, payload);

	dispatch_io_write(channel, 0, packet, writeQueue, ^(bool done, dispatch_data_t remaining, int error){
		if(error)
		{
			NSLog(@"write error %d", error);
			errorOccured = YES;
		}
	});

	dispatch_release(headerData);
	dispatch_release(packet);
}
360
361
362
363
// Adopts an already connected socket (used by SocketMessengerService for
// accepted connections) and starts reading from it. The messenger must not
// hold a socket yet.
//
// Invalid descriptors are refused: handing -1 to the I/O channel would only
// fail later, inside the channel's cleanup handler.
- (void) startWithSocket: (int) socket
{
	assert(commsSocket == -1);

	if (socket < 0)
	{
		NSLog(@"SocketMessenger refusing to start with invalid socket %d", socket);
		return;
	}

	commsSocket = socket;

	[self startCommsWorkThreads];
}
370
371 53
// Resolves host name and port of remoteService, connects to the first
// address that accepts the connection and starts the I/O channel and the
// watchdog. The connect is a blocking call on the calling thread/queue.
//
// Changes compared to the previous version:
//  - a failed lookup or connect no longer calls exit(); the failure is
//    flagged and left to the watchdog, which either retries
//    (automaticallyReconnect) or invokes connectionEndCallback;
//  - a missing host name is treated as a failure; with AI_PASSIVE and a NULL
//    node getaddrinfo() would have returned the wildcard address;
//  - the port is formatted as long (NSInteger with %d is a format mismatch);
//  - the addrinfo list is freed on every path;
//  - the watchdog is only started if it is not running, because the watchdog
//    itself calls this method when reconnecting.
- (void) connectToRemoteService
{
	NSInteger	port = [remoteService port];
	NSString*	hostName = [remoteService hostName];

	NSLog(@"SocketMessenger attempting to connect to %@ : %d", hostName, (int)port);

	int sockfd = -1;

	if ([hostName length])
	{
		struct addrinfo hints, *servinfo = NULL, *p = NULL;

		memset(&hints, 0, sizeof hints);
		hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
		hints.ai_socktype = SOCK_STREAM;

		int rv = getaddrinfo([hostName UTF8String], [[NSString stringWithFormat: @"%ld", (long)port] UTF8String], &hints, &servinfo);
		if (rv != 0)
		{
			fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
		}
		else
		{
			// loop through all the results and connect to the first we can
			for(p = servinfo; p != NULL; p = p->ai_next)
			{
				sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
				if (sockfd == -1)
				{
					perror("socket");
					continue;
				}

				if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
				{
					perror("connect");
					close(sockfd);
					sockfd = -1;
					continue;
				}

				break; // if we get here, we must have connected successfully
			}

			freeaddrinfo(servinfo); // all done with this structure
		}
	}
	else
	{
		fprintf(stderr, "no host name to connect to\n");
	}

	if (sockfd == -1)
	{
		fprintf(stderr, "failed to connect\n");

		// Let the watchdog apply the reconnect policy.
		errorOccured = YES;
		if (!watchdogDispatchSource)
			[self startWatchdogThread];
		return;
	}

	NSLog(@"SocketMessenger connected");

	self->commsSocket = sockfd;

	[self startCommsWorkThreads];
	if (!watchdogDispatchSource)
		[self startWatchdogThread];
}
438
439 53
// Remembers `service` as the remote endpoint and immediately connects to it
// through -connectToRemoteService, which reads host name and port from it.
- (void) connectToService: (NSNetService*) service
440
{
441
	// Kept in an ivar so that later reconnects can target the same service.
	remoteService = service;
442
	[self connectToRemoteService];
443
}
444
445
// Closes the socket I/O channel, if there is one. The ivar is cleared
// before the channel is closed; closing with DISPATCH_IO_STOP and releasing
// the channel is what eventually triggers the channel's cleanup handler.
- (void) stopCommsWorkThreads
{
	dispatch_io_t channel = socketChannel;
	if (channel == NULL)
		return;

	socketChannel = NULL;
	dispatch_io_close(channel, DISPATCH_IO_STOP);
	dispatch_release(channel);
}
457
458
// Cancels the watchdog timer and blocks until its cancel handler has run
// (the handler releases the source and resets watchdogDispatchSource to NULL).
// NOTE(review): this is a polling wait on an ivar written from the watchdog
// queue — presumably it must not be called from the watchdog handler itself.
- (void) stopWatchdogThread
{
	if (watchdogDispatchSource != NULL)
		dispatch_source_cancel(watchdogDispatchSource);

	for (;;)
	{
		if (watchdogDispatchSource == NULL)
			break;
		usleep(1);
	}
}
465
466
// Starts a 1-second repeating timer on watchdogQueue that supervises the
// connection. When errorOccured is set or the socket is gone, the timer
// stops the I/O channel and then either reconnects (automaticallyReconnect)
// or invokes connectionEndCallback on the delegate queue and cancels itself.
//
// Does nothing if a watchdog is already running. -connectToRemoteService
// calls this after every (re)connect; starting a second timer overwrote
// watchdogDispatchSource, leaked the first source and left several timers
// firing.
- (void) startWatchdogThread
{
	if (watchdogDispatchSource)
		return;

	watchdogDispatchSource = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, watchdogQueue);
	if (!watchdogDispatchSource)
	{
		NSLog(@"SocketMessenger could not create the watchdog timer");
		return;
	}

	dispatch_source_set_timer(watchdogDispatchSource, dispatch_walltime(DISPATCH_TIME_NOW, 0), 1ull*1000000000ull, 0ull); // 1s

	dispatch_source_set_event_handler(watchdogDispatchSource, ^(void) {
		if (errorOccured || (commsSocket == -1))
		{
			[self stopCommsWorkThreads];
			NSLog(@"watchdog noticed connection error");
			if (automaticallyReconnect)
			{
				NSLog(@"restarting client");
				[self connectToRemoteService];
			}
			else
			{
				if (connectionEndCallback)
					dispatch_async(delegateQueue, ^{ connectionEndCallback(self); });

				dispatch_source_cancel(watchdogDispatchSource);
			}
		}
	});

	// -stopWatchdogThread waits for the ivar to become NULL.
	dispatch_source_set_cancel_handler(watchdogDispatchSource,  ^{
		dispatch_release(watchdogDispatchSource);
		watchdogDispatchSource = NULL;
	});

	dispatch_resume(watchdogDispatchSource);
}
500
501
502 52
// Wraps commsSocket in a dispatch I/O channel and starts the read loop.
//
// Incoming stream format (the counterpart of -sendPacket:withIdentifier:):
// a 4-byte big-endian length, followed by that many bytes consisting of a
// 4-byte big-endian packet identifier and the payload. A length of zero is
// tolerated and simply followed by the next header.
//
// headerReader reads the length, payloadReader reads the rest, hands
// (identifier, payload) to readHandler on readQueue and schedules the next
// header read. connectionBeginCallback is dispatched to the delegate queue
// once the channel is set up.
//
// Fixes compared to the previous version:
//  - the cleanup handler closes the descriptor this channel was created with
//    and resets commsSocket only if it still refers to that descriptor;
//    before, a cleanup running after a reconnect closed the new connection;
//  - end of stream, read errors and truncated or undersized packets set
//    errorOccured (the condition the watchdog polls) instead of being
//    ignored or hitting an assert();
//  - handlers belonging to a channel that has been stopped or replaced
//    neither flag errors nor schedule further reads;
//  - a NULL data argument from libdispatch is treated as empty data;
//  - the unused combined `reader` block was removed.
- (void) startCommsWorkThreads
{
	[self stopCommsWorkThreads];

	errorOccured = NO;

	// State shared by the two read handlers.
	__block dispatch_data_t incompleteData = dispatch_data_empty;	// bytes of the current read so far
	__block uint32_t amountToRead = 0;								// length announced by the last header
	__block void (^headerReader)(_Bool done, dispatch_data_t newData, int error) = nil;
	__block void (^payloadReader)(_Bool done, dispatch_data_t newData, int error) = nil;

	dispatch_retain(incompleteData);

	// The descriptor this particular channel owns.
	const int channelSocket = commsSocket;

	dispatch_io_t channel = dispatch_io_create(DISPATCH_IO_STREAM, channelSocket, readQueue, ^(int error) {
		close(channelSocket);
		if (commsSocket == channelSocket)
			commsSocket = -1;

		if (incompleteData)
		{
			dispatch_release(incompleteData);
			incompleteData = NULL;
		}

		// The handlers reference each other; drop them with the channel.
		headerReader = nil;
		payloadReader = nil;
	});

	if (!channel)
	{
		// No channel means no cleanup handler, so tidy up here.
		NSLog(@"SocketMessenger could not create an I/O channel for socket %d", channelSocket);
		dispatch_release(incompleteData);
		incompleteData = NULL;
		if (channelSocket != -1)
			close(channelSocket);
		commsSocket = -1;
		errorOccured = YES;
		return;
	}

	socketChannel = channel;

	dispatch_io_set_low_water(channel, 0);
	dispatch_io_set_high_water(channel, SIZE_MAX);

	headerReader = ^(_Bool done, dispatch_data_t newData, int error)
	{
		if (!newData)
			newData = dispatch_data_empty;

		dispatch_data_t data = dispatch_data_create_concat(incompleteData, newData);
		dispatch_release(incompleteData);

		if (!done)
		{
			// Partial delivery: keep accumulating until the read completes.
			incompleteData = data;
			return;
		}

		incompleteData = dispatch_data_empty;
		dispatch_retain(incompleteData);

		// A completed header read must deliver exactly 4 bytes; fewer means
		// end of stream or an error.
		BOOL haveHeader = (dispatch_data_get_size(data) == 4);
		if (haveHeader)
		{
			const void* headerBuf = NULL;
			dispatch_data_t headerMap = dispatch_data_create_map(data, &headerBuf, NULL);
			memcpy(&amountToRead, headerBuf, 4);
			amountToRead = CFSwapInt32BigToHost(amountToRead);
			dispatch_release(headerMap);
		}
		dispatch_release(data);

		// Ignore the outcome if this channel was stopped or replaced meanwhile.
		if (socketChannel != channel)
			return;

		if (error)
			NSLog(@"oops, error while reading header: %i", error);

		if (error || !haveHeader)
			errorOccured = YES;
		else if (amountToRead)
			dispatch_io_read(channel, 0, amountToRead, readQueue, payloadReader);
		else
			dispatch_io_read(channel, 0, 4, readQueue, headerReader);
	};

	payloadReader = ^(_Bool done, dispatch_data_t newData, int error)
	{
		if (!newData)
			newData = dispatch_data_empty;

		dispatch_data_t data = dispatch_data_create_concat(incompleteData, newData);
		dispatch_release(incompleteData);

		if (!done)
		{
			incompleteData = data;
			return;
		}

		incompleteData = dispatch_data_empty;
		dispatch_retain(incompleteData);

		// A packet is only valid if it arrived completely and is large
		// enough to contain the 4-byte identifier.
		size_t dataSize = dispatch_data_get_size(data);
		BOOL packetComplete = (dataSize == amountToRead) && (dataSize >= 4);
		amountToRead = 0;

		if (packetComplete)
		{
			dispatch_data_t packetIdData = dispatch_data_create_subrange(data, 0, 4);
			dispatch_data_t payloadData = dispatch_data_create_subrange(data, 4, dataSize-4);

			const void* idBuf = NULL;
			uint32_t packetIdentifier = 0;
			dispatch_data_t idMap = dispatch_data_create_map(packetIdData, &idBuf, NULL);
			memcpy(&packetIdentifier, idBuf, 4);
			packetIdentifier = CFSwapInt32BigToHost(packetIdentifier);

			if (readHandler)
				readHandler(packetIdentifier, payloadData);

			dispatch_release(idMap);
			dispatch_release(payloadData);
			dispatch_release(packetIdData);
		}
		dispatch_release(data);

		if (socketChannel != channel)
			return;

		if (error)
			NSLog(@"oops, error while reading payload: %i", error);

		if (error || !packetComplete)
			errorOccured = YES;
		else
			dispatch_io_read(channel, 0, 4, readQueue, headerReader);
	};

	if (connectionBeginCallback)
		dispatch_async(delegateQueue, ^{ connectionBeginCallback(self); });

	dispatch_io_read(channel, 0, 4, readQueue, headerReader);
}
709
710
711
712
713
714
// Replaces the queue used for connectionBeginCallback/connectionEndCallback.
// The new queue (may be nil) is retained before the previous one is released.
- (void) setDelegateQueue: (dispatch_queue_t) aQueue
{
	dispatch_queue_t previousQueue = delegateQueue;

	if (aQueue != NULL)
		dispatch_retain(aQueue);
	delegateQueue = aQueue;

	if (previousQueue != NULL)
		dispatch_release(previousQueue);
}
723
724
725
// Replaces the queue on which the I/O channel's read handlers and cleanup
// handler run. The new queue (may be nil) is retained before the previous
// one is released.
- (void) setReadQueue: (dispatch_queue_t) aQueue
{
	dispatch_queue_t previousQueue = readQueue;

	if (aQueue != NULL)
		dispatch_retain(aQueue);
	readQueue = aQueue;

	if (previousQueue != NULL)
		dispatch_release(previousQueue);
}
734
735
736
// Replaces the queue on which write completion handlers run. The new queue
// (may be nil) is retained before the previous one is released.
- (void) setWriteQueue: (dispatch_queue_t) aQueue
{
	dispatch_queue_t previousQueue = writeQueue;

	if (aQueue != NULL)
		dispatch_retain(aQueue);
	writeQueue = aQueue;

	if (previousQueue != NULL)
		dispatch_release(previousQueue);
}
745
746
// Replaces the queue the watchdog timer is created on. The new queue (may be
// nil) is retained before the previous one is released.
- (void) setWatchdogQueue: (dispatch_queue_t) aQueue
{
	dispatch_queue_t previousQueue = watchdogQueue;

	if (aQueue != NULL)
		dispatch_retain(aQueue);
	watchdogQueue = aQueue;

	if (previousQueue != NULL)
		dispatch_release(previousQueue);
}
755
756
// Shuts the connection down: first the watchdog timer, then the I/O channel.
// -stopWatchdogThread returns only after the timer's cancel handler has run.
- (void) terminateConnectionSync
757 53
{
758
	// Watchdog first — presumably so that it cannot notice the closed
	// connection and reconnect.
	[self stopWatchdogThread];
759
	[self stopCommsWorkThreads];
760
}
761
762 52
// Releases the four dispatch queues by passing nil to their setters, which
// release the currently stored queue.
- (void) dealloc
{
	[self setDelegateQueue: nil];
	[self setWriteQueue: nil];
	[self setReadQueue: nil];
	[self setWatchdogQueue: nil];
}
772
773
@synthesize automaticallyReconnect, delegateQueue, readQueue, writeQueue, watchdogQueue, readHandler, connectionBeginCallback, connectionEndCallback;
774
775
@end
776
777
778
779
780
781
@implementation NSData (SocketMessengerExtensions)

// Calls `block` synchronously with a dispatch data object holding a copy of
// the receiver's bytes (the default destructor makes dispatch_data_create()
// copy). The object is released when the block returns, so the block has to
// retain it if it wants to keep it.
//
// A nil block is now a no-op; invoking a nil block crashes.
- (void) applyDispatchData: (void (^)(dispatch_data_t data)) block
{
	if (!block)
		return;

	dispatch_data_t data = dispatch_data_create([self bytes], [self length], dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), DISPATCH_DATA_DESTRUCTOR_DEFAULT);
	if (!data)
		return;

	block(data);

	dispatch_release(data);
}

@end
793
794

Loggerhead 1.17 is a web-based interface for Bazaar branches