RSS

(root)/iphone/tappity : 53 : common/source/SocketMessenger.m

To get this branch, use:
bzr branch /browse/iphone/tappity

« back to all changes in this revision

Viewing changes to common/source/SocketMessenger.m

Dömötör Gulyás
2009-11-06 16:51:25
Revision ID: dognotdog@gmail.com-20091106155125-ui432mcq9jgp6332
improvements on socket messenger and tappity

Show diffs side-by-side

added added

removed removed

23
23
NSString* SocketMessageIdKey    = @"SocketMessageId";
24
24
NSString* SocketMessageDataKey  = @"SocketMessageData";
25
25
 
 
26
const int kSocketMessengerTerminateMsg = -1;
 
27
 
 
28
 
26
29
@interface SocketMessenger (Private)
27
30
- (void) startCommsWorkThreads;
 
31
- (void) startWatchdogThread;
 
32
- (void) startListening;
28
33
@end
29
34
 
30
35
@implementation SocketMessenger
35
40
                return nil;
36
41
                
37
42
        commsSocket = -1;
 
43
        server.listenSocket = -1;
 
44
        
 
45
        automaticallyReconnect = YES;
38
46
        
39
47
        sendLock = [[NSCondition alloc] init];
 
48
        sendQueue = [[NSMutableArray alloc] init];
40
49
                
41
50
        return self;
42
51
}
71
80
                [self retain];
72
81
        }
73
82
 
74
 
        while ([self threadActive: info])
 
83
        while (rxThreadShouldRun)
75
84
        {
76
85
                struct timeval tv;
77
86
                fd_set readfds;
95
104
                FD_SET(socket, &readfds);
96
105
                FD_SET(socket, &errorfds);
97
106
                maxSocket = MAX(maxSocket, socket);
98
 
                
99
 
                if (maxSocket < 0)
100
 
                {
101
 
                        //printf("No sockets, sleeping for a bit...\n");
102
 
                        usleep(1000000);
103
 
                        continue;
104
 
                }
105
107
 
106
108
                if (select(maxSocket+1, &readfds, &writefds, &errorfds, &tv) < 0)
107
109
                {
108
110
                        perror("select");
109
 
                        break;
 
111
                        goto SELECT_ERR;
110
112
                }
 
113
                
 
114
                if (FD_ISSET(socket, &errorfds))
 
115
                        goto SELECT_ERR;
 
116
 
111
117
 
112
118
                if (FD_ISSET(socket, &readfds))
113
119
                {
129
135
                                }
130
136
                                else if (actuallyRead == -1)
131
137
                                {
132
 
                                        //close(commsSocket);
133
 
                                        //self->commsSocket = 0;
134
138
                                        if (errno != ETIMEDOUT)
135
139
                                        {
136
140
                                                printf("Connection dropped with error.\n");
137
 
                                                break;
 
141
                                                goto RECV_ERR;
138
142
                                        }
139
143
                                }
140
144
                                else if (actuallyRead == 0)
141
145
                                {
142
146
                                        printf("remote socket closed.\n");
143
 
                                        break;
 
147
                                        goto RECV_ERR;
144
148
                                }
145
149
                                
146
150
                                if (!expectedMessageSize)
155
159
                                if (actuallyRead == -1)
156
160
                                {
157
161
                                        printf("Connection dropped with error.\n");
158
 
                                        break;
 
162
                                        goto RECV_ERR;
159
163
                                }
160
164
                                else if (actuallyRead == 0)
161
165
                                {
162
166
                                        printf("remote socket closed.\n");
163
 
                                        break;
 
167
                                        goto RECV_ERR;
164
168
                                }
165
169
 
166
170
                                currentlyRead += actuallyRead;
171
175
 
172
176
                        if (messageFinished)
173
177
                        {
 
178
                                if (currentMessageId == kSocketMessengerTerminateMsg)
 
179
                                {
 
180
                                        break;
 
181
                                }
174
182
                                @synchronized(self)
175
183
                                {
176
184
                                        expectedMessageSize = 0;
182
190
                                        */
183
191
                                        
184
192
                                }
 
193
                                
 
194
                                //NSLog(@"dataReceived (%d) #%d", (int) [currentData length], currentMessageId);
185
195
 
186
196
                                if (receiveDataOnMainThread)
187
 
                                        [self performSelectorOnMainThread: @selector(dataReceived:) withObject: [NSDictionary dictionaryWithObjectsAndKeys: [NSNumber numberWithInt: currentMessageId], SocketMessageIdKey, currentData, SocketMessageDataKey, nil] waitUntilDone: NO];
 
197
                                        [delegate performSelectorOnMainThread: @selector(dataReceived:) withObject: [NSDictionary dictionaryWithObjectsAndKeys: [NSNumber numberWithInt: currentMessageId], SocketMessageIdKey, currentData, SocketMessageDataKey, nil] waitUntilDone: NO];
188
198
                                else
189
 
                                        [self dataReceived: [NSDictionary dictionaryWithObjectsAndKeys: [NSNumber numberWithInt: currentMessageId], SocketMessageIdKey, currentData, SocketMessageDataKey, nil]];
 
199
                                        [delegate dataReceived: [NSDictionary dictionaryWithObjectsAndKeys: [NSNumber numberWithInt: currentMessageId], SocketMessageIdKey, currentData, SocketMessageDataKey, nil]];
190
200
 
191
201
                                currentData = nil;
192
202
                                currentlyRead = 0;
193
203
                        }
194
204
 
195
205
                }
 
206
 
 
207
                continue;
 
208
 
 
209
RECV_ERR:
 
210
SELECT_ERR:
 
211
                close(commsSocket);
 
212
                self->commsSocket = -1;
 
213
                break;
 
214
 
196
215
        }
197
216
        
 
217
        rxThreadActive = 0;
 
218
        
198
219
        [self threadWillExit: info];
199
220
 
200
221
        @synchronized(self)
207
228
}
208
229
 
209
230
 
210
 
- (void) runThreadWithSelector: (SEL) selector
 
231
- (void) runThreadWithTarget: (id) target selector: (SEL) selector
211
232
{
212
233
        @synchronized(self)
213
234
        {
216
237
                id number = [NSNumber numberWithInt: threadIds++];
217
238
 
218
239
                [activeThreads setObject: [NSNumber numberWithBool: YES] forKey: number];
219
 
                [NSThread detachNewThreadSelector: selector toTarget: self withObject: [number retain]];
 
240
                [NSThread detachNewThreadSelector: selector toTarget: target withObject: [number retain]];
220
241
        }
221
242
}
222
243
 
237
258
 
238
259
- (void) sendingThread: (id) info
239
260
{
 
261
//      pthread_setname_np("sendingThread");
 
262
 
240
263
        NSAutoreleasePool* pool = [[NSAutoreleasePool alloc] init];
241
264
 
242
265
        @synchronized(self)
244
267
                [self retain];
245
268
        }
246
269
 
247
 
        while ([self threadActive: info])
 
270
        while (txThreadShouldRun)
248
271
        {
249
272
                [sendLock lock];
250
273
                        while (![sendQueue count])
269
292
                {
270
293
                        socket = commsSocket;
271
294
                }
 
295
                
 
296
                //printf("sending %d bytes\n", (int) sizeToSend);
272
297
 
273
298
                while (dataSent < sizeToSend)
274
299
                {
275
300
                        if ((err = send(socket, header + dataSent, sizeToSend - dataSent, 0)) == -1)
276
301
                        {
277
302
                                perror("send");
278
 
                                break;
 
303
                                goto SEND_ERR;
279
304
                        }
280
305
                        else
281
306
                                dataSent += err;
290
315
                                if ((err = send(socket, [data bytes] + dataSent, sizeToSend - dataSent, 0)) == -1)
291
316
                                {
292
317
                                        perror("send");
293
 
                                        break;
 
318
                                        goto SEND_ERR;
294
319
                                }
295
320
                                else
296
321
                                        dataSent += err;
297
322
                                        
298
323
                        }
299
324
                }
300
 
                [dict release];
301
325
                
302
326
                if (err == -1)
303
 
                        break;
304
 
                
 
327
                        goto SEND_ERR;
 
328
 
 
329
                [dict release];
 
330
                continue;
 
331
 
 
332
SEND_ERR:
 
333
                close(commsSocket);
 
334
                commsSocket = -1;
 
335
                [dict release];
 
336
                break;
305
337
        }
306
338
        
307
 
 
 
339
        txThreadActive = 0;
 
340
        
308
341
        [self threadWillExit: info];
309
342
 
310
343
        @synchronized(self)
316
349
        [pool drain];
317
350
}
318
351
 
319
 
- (void) connectToService: (NSNetService*) service
 
352
- (void) connectToRemoteService
320
353
{
321
 
        NSInteger       port = [service port];
322
 
        NSString*       hostName = [service hostName];
 
354
        NSInteger       port = [remoteService port];
 
355
        NSString*       hostName = [remoteService hostName];
323
356
        
324
357
        NSLog(@"SocketMessenger attempting to connect to %@ : %d", hostName, port);
325
358
 
371
404
        }
372
405
 
373
406
        freeaddrinfo(servinfo); // all done with this structure
 
407
        
 
408
        assert(sockfd != -1);
374
409
 
375
410
        NSLog(@"SocketMessenger connected");
376
411
 
378
413
        self->commsSocket = sockfd;
379
414
        
380
415
        [self startCommsWorkThreads];
 
416
        [self startWatchdogThread];
381
417
                
382
418
}
383
419
 
 
420
- (void) connectToService: (NSNetService*) service
 
421
{
 
422
        [service retain];
 
423
        [remoteService release];
 
424
        remoteService = service;
 
425
        [self connectToRemoteService];
 
426
}
 
427
 
 
428
- (void) stopCommsWorkThreads
 
429
{
 
430
        while (rxThreadActive || txThreadActive)
 
431
        {
 
432
                rxThreadShouldRun = 0;
 
433
                txThreadShouldRun = 0;
 
434
                [sendLock lock];
 
435
                [sendLock signal];
 
436
                [sendLock unlock];
 
437
                usleep(10000);
 
438
        }
 
439
}
 
440
 
 
441
- (void) stopWatchdogThread
 
442
{
 
443
        while (wdThreadActive)
 
444
        {
 
445
                wdThreadShouldRun = 0;
 
446
                usleep(10000);
 
447
        }
 
448
}
 
449
 
 
450
- (void) startWatchdogThread
 
451
{
 
452
        //[self stopWatchdogThread];
 
453
 
 
454
        if (!wdThreadActive)
 
455
        {
 
456
                wdThreadShouldRun = 1;
 
457
                @synchronized(self)
 
458
                {
 
459
                        wdThreadActive++;
 
460
                }
 
461
                [self runThreadWithTarget: self selector: @selector(watchdogThread:)];
 
462
        }
 
463
}
 
464
 
 
465
- (void) stopAcceptThread
 
466
{
 
467
        while (server.acThreadActive)
 
468
        {
 
469
                server.acThreadShouldRun = 0;
 
470
                usleep(10000);
 
471
        }
 
472
}
 
473
 
 
474
- (void) startAcceptThread
 
475
{
 
476
        [self stopAcceptThread];
 
477
 
 
478
        server.acThreadShouldRun = 1;
 
479
        server.acThreadActive = 1;
 
480
 
 
481
        [self runThreadWithTarget: self selector: @selector(acceptThread:)];
 
482
}
 
483
 
 
484
 
384
485
- (void) startCommsWorkThreads
385
486
{
386
 
        [self runThreadWithSelector: @selector(sendingThread:)];
387
 
        [self runThreadWithSelector: @selector(receivingThread:)];
388
 
        
389
 
        if ([self respondsToSelector: @selector(connectionWasEstablished)])
390
 
                [self connectionWasEstablished];
 
487
        [self stopCommsWorkThreads];
 
488
        
 
489
        rxThreadShouldRun = 1;
 
490
        rxThreadActive = 1;
 
491
        txThreadShouldRun = 1;
 
492
        txThreadActive = 1;
 
493
 
 
494
        [self runThreadWithTarget: self selector: @selector(sendingThread:)];
 
495
        [self runThreadWithTarget: self selector: @selector(receivingThread:)];
 
496
        
 
497
        if ([delegate respondsToSelector: @selector(connectionWasEstablished:)])
 
498
                [delegate performSelectorOnMainThread: @selector(connectionWasEstablished:) withObject: self waitUntilDone: NO];
391
499
}
392
500
 
393
501
static void _setStandardSocketOpts(int socket)
409
517
                [self retain];
410
518
        }
411
519
 
412
 
        while ([self threadActive: info])
 
520
        while (server.acThreadShouldRun)
413
521
        {
414
522
 
415
523
                struct timeval tv;
430
538
                FD_SET(lsock, &readfds);
431
539
                FD_SET(lsock, &errorfds);
432
540
                maxSocket = MAX(maxSocket, lsock);
433
 
                
434
 
                if (maxSocket < 0)
435
 
                {
436
 
                        printf("No sockets, sleeping for a bit...\n");
437
 
                        usleep(1000000);
438
 
                        continue;
439
 
                }
440
 
                
 
541
                                
441
542
                //NSLog(@"listening for connection...");
442
543
 
443
544
                if (select(maxSocket+1, &readfds, NULL, &errorfds, &tv) < 0)
444
545
                {
445
546
                        perror("select");
 
547
                        goto SELECT_ERR;
446
548
                        break;
447
549
                }
448
550
 
465
567
                                else
466
568
                                {
467
569
                                        printf("Error accepting connection.\n");
 
570
                                        goto ACCEPT_ERR;
468
571
                                        break;
469
572
                                }
470
573
                        }
482
585
                        [server.netService release];
483
586
                        server.netService = nil;
484
587
                        
 
588
                        close(server.listenSocket);
 
589
                        server.listenSocket = -1;
 
590
                        
485
591
                        [self startCommsWorkThreads];
486
592
                        
487
593
                        NSLog(@"Accepted connection.");
488
 
                }
489
 
 
490
 
        }
 
594
                        
 
595
                        goto ACCEPT_SUCCESS;
 
596
                }
 
597
                
 
598
                continue;
 
599
 
 
600
ACCEPT_ERR:
 
601
SELECT_ERR:
 
602
                close(server.listenSocket);
 
603
                server.listenSocket = -1;
 
604
                break;
 
605
 
 
606
ACCEPT_SUCCESS:
 
607
                break;
 
608
        }
 
609
        
 
610
        server.acThreadActive = 0;
 
611
 
 
612
        @synchronized(self)
 
613
        {
 
614
                [self release];
 
615
        }
 
616
 
 
617
        [info release];
 
618
        [pool drain];
 
619
}
 
620
 
 
621
- (void) watchdogThread: (id) info
 
622
{
 
623
        NSAutoreleasePool* pool = [[NSAutoreleasePool alloc] init];
 
624
 
 
625
        @synchronized(self)
 
626
        {
 
627
                [self retain];
 
628
        }
 
629
 
 
630
        while (wdThreadShouldRun)
 
631
        {
 
632
                if (isServing)
 
633
                {
 
634
                        if ((commsSocket == -1) && (server.listenSocket == -1))
 
635
                        {
 
636
                                NSLog(@"watchdog noticed connection error");
 
637
                                if (automaticallyReconnect)
 
638
                                {
 
639
                                        NSLog(@"restarting server");
 
640
                                        [self stopCommsWorkThreads];
 
641
                                        [self startListening];
 
642
                                }
 
643
                                else 
 
644
                                {
 
645
                                        if ([delegate respondsToSelector: @selector(connectionWasTerminated:)])
 
646
                                                [delegate performSelectorOnMainThread: @selector(connectionWasTerminated:) withObject: self waitUntilDone: NO];
 
647
                                        break;
 
648
                                }
 
649
                        }
 
650
                }
 
651
                else
 
652
                {
 
653
                        if (commsSocket == -1)
 
654
                        {
 
655
                                NSLog(@"watchdog noticed connection error");
 
656
                                if (automaticallyReconnect)
 
657
                                {
 
658
                                        NSLog(@"restarting client");
 
659
                                        [self stopCommsWorkThreads];
 
660
                                        [self connectToRemoteService];
 
661
                                }
 
662
                                else 
 
663
                                {
 
664
                                        if ([delegate respondsToSelector: @selector(connectionWasTerminated:)])
 
665
                                                [delegate performSelectorOnMainThread: @selector(connectionWasTerminated:) withObject: self waitUntilDone: NO];
 
666
                                        break;
 
667
                                }
 
668
                        }
 
669
                }
 
670
                usleep(1000000);
 
671
                continue;
 
672
 
 
673
WATCH_ERR:
 
674
                assert(0);
 
675
                break;
 
676
 
 
677
        }
 
678
 
 
679
        @synchronized(self)
 
680
        {
 
681
                wdThreadActive--;
 
682
        }
 
683
        
491
684
 
492
685
        @synchronized(self)
493
686
        {
508
701
        
509
702
        assert([protocol length] && server.listenSocket);
510
703
        
511
 
        NSLog(@"tappity port: %d", ntohs(server.myAddress.sin_port));
 
704
        NSLog(@"tappity port: %d", server.portnum);
512
705
 
513
706
        [server.netService stop];
514
707
        [server.netService release];
515
708
 
516
 
        server.netService = [[NSNetService alloc] initWithDomain: domain type: protocol name: name port: ntohs(server.myAddress.sin_port)];
 
709
        server.netService = [[NSNetService alloc] initWithDomain: domain type: protocol name: name port: server.portnum];
517
710
        if(server.netService == nil)
518
711
                return NO;
519
712
        
520
 
        [server.netService setDelegate:self];
 
713
        [server.netService setDelegate: self];
521
714
//      [server.netService scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSRunLoopCommonModes];
522
715
        [server.netService publish];
523
716
        
525
718
}
526
719
 
527
720
 
 
721
 
528
722
- (void) startListening
529
723
{
530
 
        server.myAddress.sin_family = AF_INET;                                  // host byte order
531
 
        server.myAddress.sin_port = htons(server.portnum);              // short, network byte order, any port
532
 
        server.myAddress.sin_addr.s_addr = htonl(INADDR_ANY);   // auto-fill with my IP
 
724
        struct sockaddr_in myAddress;
 
725
        memset(&myAddress, 0, sizeof(myAddress));
 
726
 
 
727
        myAddress.sin_family = AF_INET;                                 // host byte order
 
728
        myAddress.sin_port = htons(server.portnum);             // short, network byte order, any port
 
729
        myAddress.sin_addr.s_addr = htonl(INADDR_ANY);  // auto-fill with my IP
533
730
 
534
731
        server.listenSocket = socket(PF_INET, SOCK_STREAM, 0);
535
732
        assert(server.listenSocket != -1);
536
733
 
537
734
        _setStandardSocketOpts(server.listenSocket);
538
735
 
539
 
        int err = bind(server.listenSocket, (struct sockaddr *)&server.myAddress, sizeof(server.myAddress));
 
736
        int err = bind(server.listenSocket, (struct sockaddr *)&myAddress, sizeof(myAddress));
540
737
        assert(-1 != err);
541
738
        
542
739
        err = listen(server.listenSocket, 1);
543
740
        assert(-1 != err);
544
741
 
545
 
        [self runThreadWithSelector: @selector(acceptThread:)];
 
742
        [self startAcceptThread];
 
743
        [self startWatchdogThread];
546
744
        
547
745
        if([self enableBonjourWithDomain: @"" applicationProtocol: server.protocolName name: server.serviceName])
548
746
                NSLog(@"tappity bounjour advertisments up and running");
561
759
        server.protocolName = [protocol retain];
562
760
        
563
761
        server.portnum = pnum;
 
762
 
 
763
        isServing = YES;
564
764
        
565
765
        [self startListening];
566
766
 
567
767
}
568
768
 
 
769
- (void) terminateConnection
 
770
{
 
771
        [self stopWatchdogThread];
 
772
        [self stopAcceptThread];
 
773
        [self stopCommsWorkThreads];
 
774
}
 
775
 
569
776
- (void) dealloc
570
777
{
 
778
 
571
779
        [server.serviceName release];
572
780
 
573
781
        [server.netService stop];
577
785
        [sendQueue release];
578
786
        [sendLock release];
579
787
 
 
788
        [remoteService release];
 
789
 
580
790
        [super dealloc];
581
791
}
582
792
 
 
793
@synthesize delegate, receiveDataOnMainThread, automaticallyReconnect;
583
794
 
584
795
@end

Loggerhead 1.17 is a web-based interface for Bazaar branches