RSS

(root)/iphone/tappity : 52 : common/source/SocketMessenger.m

To get this branch, use:
bzr branch /browse/iphone/tappity

« back to all changes in this revision

Viewing changes to common/source/SocketMessenger.m

Dömötör Gulyás
2009-10-31 12:32:32
Revision ID: dognotdog@gmail.com-20091031113232-lvx5g6lwnze2ugpu
moves networking into common class

Show diffs side-by-side

added added

removed removed

 
1
//
 
2
//  SocketMessenger.m
 
3
//  tappity
 
4
//
 
5
//  Created by döme on 30.10.2009.
 
6
//  Copyright 2009 __MyCompanyName__. All rights reserved.
 
7
//
 
8
 
 
9
#include <unistd.h>
 
10
#include <stdint.h>
 
11
#include <sys/socket.h>
 
12
#include <fcntl.h>
 
13
#include <termios.h>
 
14
#include <netinet/in.h>
 
15
#include <arpa/inet.h>
 
16
#include <sys/types.h>
 
17
#include <netdb.h>
 
18
#include <netinet/in.h>
 
19
#include <sys/time.h>
 
20
 
 
21
#import "SocketMessenger.h"
 
22
 
 
23
// Key under which a message dictionary stores the message identifier (an NSNumber).
NSString* SocketMessageIdKey    = @"SocketMessageId";
 
24
// Key under which a message dictionary stores the message payload (an NSData).
NSString* SocketMessageDataKey  = @"SocketMessageData";
 
25
 
 
26
// Private interface. Declared as a class extension (rather than a named
// category) so the compiler verifies that the method is implemented in the
// main @implementation block.
@interface SocketMessenger ()
// Starts the sending and receiving worker threads, then calls
// -connectionWasEstablished if the receiver responds to it.
- (void) startCommsWorkThreads;
@end
 
29
 
 
30
@implementation SocketMessenger
 
31
 
 
32
// Initializer: marks the comms socket as "not connected" (-1) and creates
// the condition used to guard and signal the send queue.
- (id) init
{
	self = [super init];

	if (self)
	{
		commsSocket = -1;
		sendLock = [[NSCondition alloc] init];
	}

	return self;
}
 
43
 
 
44
/**
 * Reports whether the worker thread registered under `key` is still flagged
 * as active in activeThreads.
 *
 * @param key  Thread key handed to the worker by -runThreadWithSelector:.
 * @return YES while an entry with a true value exists for `key`; NO once the
 *         entry has been removed (or was never added).
 */
- (BOOL) threadActive: (id) key
{
	BOOL active = NO;

	// activeThreads is mutated under @synchronized(self) by
	// -runThreadWithSelector: and -threadWillExit: on other threads, and
	// NSMutableDictionary is not safe to read during a concurrent mutation,
	// so the lookup has to take the same lock.
	@synchronized (self)
	{
		active = [[activeThreads objectForKey: key] boolValue];
	}

	return active;
}
 
48
 
 
49
// Called by a worker thread just before it finishes: drops the thread's
// entry from activeThreads and, when that was the last registered thread,
// closes the comms socket and marks it as unset (-1).
- (void) threadWillExit: (id) key
{
	@synchronized (self)
	{
		[activeThreads removeObjectForKey: key];

		// Other workers are still registered: leave the socket alone.
		if ([activeThreads count] != 0)
			return;

		close(commsSocket);
		commsSocket = -1;
	}
}
 
64
 
 
65
/**
 * Worker-thread entry point that reads framed messages from commsSocket.
 *
 * Framing, as read below: an 8-byte header made of two big-endian uint32
 * values (message id, then payload length), followed by the payload bytes.
 * Every completed message is passed to -dataReceived: as a dictionary
 * holding SocketMessageIdKey and SocketMessageDataKey; the call is made on
 * the main thread when receiveDataOnMainThread is set, otherwise on this
 * thread.
 *
 * The loop runs while -threadActive: returns YES for `info` and ends on a
 * select/recv error or when the peer closes the connection.
 *
 * @param info  Thread key created by -runThreadWithSelector:; it was retained
 *              for this thread and is released here before returning.
 */
- (void) receivingThread: (id) info
{
	NSAutoreleasePool* pool = [[NSAutoreleasePool alloc] init];

	@synchronized(self)
	{
		[self retain];
	}

	while ([self threadActive: info])
	{
		int sock = -1;

		@synchronized(self)
		{
			sock = commsSocket;
		}

		// Check for "no socket" BEFORE touching the fd_sets: FD_SET with a
		// negative descriptor is undefined behaviour.
		if (sock < 0)
		{
			//printf("No sockets, sleeping for a bit...\n");
			usleep(1000000);
			continue;
		}

		struct timeval tv;
		fd_set readfds;
		fd_set errorfds;

		tv.tv_sec = 1;
		tv.tv_usec = 0;

		FD_ZERO(&readfds);
		FD_ZERO(&errorfds);
		FD_SET(sock, &readfds);
		FD_SET(sock, &errorfds);

		if (select(sock + 1, &readfds, NULL, &errorfds, &tv) < 0)
		{
			// A signal interrupting select is not a connection failure.
			if (errno == EINTR)
				continue;

			perror("select");
			break;
		}

		// Timed out with nothing to read: loop to re-check the active flag.
		if (!FD_ISSET(sock, &readfds))
			continue;

		BOOL messageFinished = NO;
		//NSLog(@"receiving...");

		if (!expectedMessageSize)
		{
			// No message in progress: try to read a complete header. The
			// header is peeked first so that a partially arrived header
			// stays in the socket buffer until all 8 bytes are available.
			uint32_t header[2] = {0,0};
			ssize_t actuallyRead = recv(sock, header, sizeof(header), MSG_PEEK);

			if (actuallyRead == (ssize_t)sizeof(header))
			{
				if (recv(sock, header, sizeof(header), 0) != (ssize_t)sizeof(header))
				{
					printf("Connection dropped with error.\n");
					break;
				}

				expectedMessageSize = ntohl(header[1]);
				currentMessageId = ntohl(header[0]);
				currentData = [[NSMutableData alloc] initWithLength: expectedMessageSize];

				// A zero-length payload is complete as soon as its header
				// has been read. This is decided only here, so a partial or
				// failed header read can no longer deliver a bogus message.
				if (!expectedMessageSize)
					messageFinished = YES;
			}
			else if (actuallyRead == -1)
			{
				if (errno != ETIMEDOUT && errno != EAGAIN && errno != EINTR)
				{
					printf("Connection dropped with error.\n");
					break;
				}
			}
			else if (actuallyRead == 0)
			{
				printf("remote socket closed.\n");
				break;
			}
			// Otherwise only part of the header has arrived; try again on
			// the next pass.
		}
		else
		{
			// Continue filling the payload buffer of the current message.
			size_t readAmount = expectedMessageSize - currentlyRead;
			ssize_t actuallyRead = recv(sock, (uint8_t*)[currentData mutableBytes] + currentlyRead, readAmount, 0);

			if (actuallyRead == -1)
			{
				if (errno == EINTR || errno == EAGAIN)
					continue;

				printf("Connection dropped with error.\n");
				break;
			}
			else if (actuallyRead == 0)
			{
				printf("remote socket closed.\n");
				break;
			}

			currentlyRead += actuallyRead;

			if (currentlyRead == expectedMessageSize)
				messageFinished = YES;
		}

		if (messageFinished)
		{
			// Per-message pool: without it every dictionary and number
			// created here would stay alive until the thread exits.
			NSAutoreleasePool* messagePool = [[NSAutoreleasePool alloc] init];

			@synchronized(self)
			{
				expectedMessageSize = 0;
			}

			NSDictionary* message = [NSDictionary dictionaryWithObjectsAndKeys: [NSNumber numberWithInt: currentMessageId], SocketMessageIdKey, currentData, SocketMessageDataKey, nil];

			if (receiveDataOnMainThread)
				[self performSelectorOnMainThread: @selector(dataReceived:) withObject: message waitUntilDone: NO];
			else
				[self dataReceived: message];

			// The dictionary holds its own reference to the payload, so the
			// reference taken by alloc/init must be given up here; dropping
			// the pointer without releasing leaked every received message.
			[currentData release];
			currentData = nil;
			currentlyRead = 0;

			[messagePool drain];
		}
	}

	// NOTE(review): a message that was only partially received when the loop
	// ended is left in currentData/expectedMessageSize/currentlyRead; confirm
	// whether that state should be reset before a later connection reuses it.

	[self threadWillExit: info];

	@synchronized(self)
	{
		[self release];
	}

	[info release];
	[pool drain];
}
 
208
 
 
209
 
 
210
// Registers a new worker under a fresh numeric key in activeThreads (created
// on first use) and detaches a thread running `selector` on self, passing
// that key as the thread's argument.
- (void) runThreadWithSelector: (SEL) selector
{
	@synchronized(self)
	{
		if (activeThreads == nil)
			activeThreads = [[NSMutableDictionary alloc] init];

		// This +1 reference is handed to the new thread; the thread methods
		// in this file release their `info` argument before returning.
		NSNumber* threadKey = [[NSNumber alloc] initWithInt: threadIds++];
		NSNumber* activeFlag = [NSNumber numberWithBool: YES];

		[activeThreads setObject: activeFlag forKey: threadKey];
		[NSThread detachNewThreadSelector: selector toTarget: self withObject: threadKey];
	}
}
 
222
 
 
223
// Queues one message for transmission: wraps the identifier and the payload
// in a dictionary, appends it to sendQueue (created on first use) and
// signals sendLock so a waiting sending thread can pick it up.
- (void) sendData: (NSData*) tapData withIdentifier: (NSInteger) messageId
{
	NSNumber* idNumber = [NSNumber numberWithInteger: messageId];
	NSDictionary* message = [NSDictionary dictionaryWithObjectsAndKeys:
		idNumber, SocketMessageIdKey,
		tapData, SocketMessageDataKey,
		nil];

	[sendLock lock];

	if (sendQueue == nil)
		sendQueue = [[NSMutableArray alloc] init];
	[sendQueue addObject: message];

	[sendLock signal];
	[sendLock unlock];
}
 
236
 
 
237
 
 
238
// Writes all `length` bytes starting at `bytes` to `fd`, resuming after
// partial sends. Returns NO (after reporting through perror) as soon as
// send() fails; a zero-length buffer succeeds without calling send().
static BOOL _sendAll(int fd, const void* bytes, size_t length)
{
	// Byte-typed cursor: advancing a uint32_t* by a byte count (as the
	// header loop used to do) skips four times too far after a partial send.
	const uint8_t* cursor = (const uint8_t*)bytes;
	size_t remaining = length;

	while (remaining > 0)
	{
		ssize_t sent = send(fd, cursor, remaining, 0);

		if (sent == -1)
		{
			// Interrupted by a signal before anything was sent: retry.
			if (errno == EINTR)
				continue;

			perror("send");
			return NO;
		}

		cursor += sent;
		remaining -= (size_t)sent;
	}

	return YES;
}

/**
 * Worker-thread entry point that drains sendQueue onto commsSocket.
 *
 * Waits on sendLock until the queue is non-empty, removes the oldest message
 * and writes it as an 8-byte header (message id and payload length, both
 * big-endian uint32) followed by the payload. The loop runs while
 * -threadActive: returns YES for `info` and ends on the first send failure.
 *
 * @param info  Thread key created by -runThreadWithSelector:; it was retained
 *              for this thread and is released here before returning.
 */
- (void) sendingThread: (id) info
{
	NSAutoreleasePool* pool = [[NSAutoreleasePool alloc] init];

	@synchronized(self)
	{
		[self retain];
	}

	while ([self threadActive: info])
	{
		[sendLock lock];

		while (![sendQueue count])
			[sendLock wait];

		//NSLog(@"sending");

		// Retain before removal so the message outlives its queue slot.
		NSDictionary* message = [[sendQueue objectAtIndex: 0] retain];
		[sendQueue removeObjectAtIndex: 0];

		[sendLock unlock];

		NSData* payload = [message objectForKey: SocketMessageDataKey];
		uint32_t messageId = [[message objectForKey: SocketMessageIdKey] intValue];
		uint32_t header[2] = {htonl(messageId), htonl((uint32_t)[payload length])};
		int sock = -1;

		@synchronized(self)
		{
			sock = commsSocket;
		}

		// The payload is only attempted once the header went out completely.
		BOOL delivered = _sendAll(sock, header, sizeof(header))
			&& _sendAll(sock, [payload bytes], [payload length]);

		[message release];

		if (!delivered)
			break;
	}

	[self threadWillExit: info];

	@synchronized(self)
	{
		[self release];
	}

	[info release];
	[pool drain];
}
 
318
 
 
319
/**
 * Opens a TCP connection to the host and port of `service` and, on success,
 * stores the socket in commsSocket and starts the worker threads.
 *
 * On any failure (no host name, name lookup error, no address accepting the
 * connection) the problem is logged and the method returns without changing
 * commsSocket or starting threads; it no longer terminates the process.
 *
 * @param service  Net service whose -hostName and -port are used.
 */
- (void) connectToService: (NSNetService*) service
{
	NSInteger	port = [service port];
	NSString*	hostName = [service hostName];

	// NSInteger is wider than int on 64-bit targets, so format it as long.
	NSLog(@"SocketMessenger attempting to connect to %@ : %ld", hostName, (long)port);

	if (hostName == nil)
	{
		// With no node name getaddrinfo would return local addresses
		// instead of the peer's.
		NSLog(@"SocketMessenger cannot connect: service has no host name");
		return;
	}

//	_setStandardSocketOpts(socket);

	struct addrinfo hints, *servinfo = NULL, *p = NULL;

	memset(&hints, 0, sizeof hints);
	hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
	hints.ai_socktype = SOCK_STREAM;
	//hints.ai_protocol = IPPROTO_TCP;
	// AI_PASSIVE is not set: it asks for addresses suitable for bind(), and
	// this socket is used with connect().

	NSString* portString = [NSString stringWithFormat: @"%ld", (long)port];
	int rv = getaddrinfo([hostName UTF8String], [portString UTF8String], &hints, &servinfo);

	if (rv != 0)
	{
		fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
		return;
	}

	// loop through all the results and connect to the first we can
	int sockfd = -1;

	for (p = servinfo; p != NULL; p = p->ai_next)
	{
		sockfd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);

		if (sockfd == -1)
		{
			perror("socket");
			continue;
		}

		if (connect(sockfd, p->ai_addr, p->ai_addrlen) == -1)
		{
			// Report before close(), which may overwrite errno.
			perror("connect");
			close(sockfd);
			sockfd = -1;
			continue;
		}

		break; // if we get here, we must have connected successfully
	}

	freeaddrinfo(servinfo); // all done with this structure

	if (sockfd == -1)
	{
		// looped off the end of the list with no connection
		fprintf(stderr, "failed to connect\n");
		return;
	}

	NSLog(@"SocketMessenger connected");

	// Worker threads read commsSocket under this lock.
	@synchronized(self)
	{
		commsSocket = sockfd;
	}

	[self startCommsWorkThreads];
}
 
383
 
 
384
// Launches the sending worker, then the receiving worker, and finally calls
// -connectionWasEstablished when the receiver responds to that selector.
- (void) startCommsWorkThreads
{
	SEL workers[] = { @selector(sendingThread:), @selector(receivingThread:) };
	size_t workerCount = sizeof(workers) / sizeof(workers[0]);

	for (size_t i = 0; i < workerCount; i++)
		[self runThreadWithSelector: workers[i]];

	if (![self respondsToSelector: @selector(connectionWasEstablished)])
		return;

	[self connectionWasEstablished];
}
 
392
 
 
393
// Applies the options every socket of this class gets: address reuse and a
// two-second send/receive timeout. Failures are reported through perror and
// are otherwise non-fatal. An invalid (negative) descriptor is ignored so
// that the caller's errno is left untouched.
static void _setStandardSocketOpts(int socket)
{
	if (socket < 0)
		return;

	int yes = 1;
	if (setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1)
		perror("setsockopt(SO_REUSEADDR)");

	// SO_SNDTIMEO / SO_RCVTIMEO take a struct timeval. The previous code
	// passed an int holding 2000, which setsockopt rejects, so no timeout
	// was ever installed. 2000 is read as milliseconds, i.e. two seconds.
	struct timeval timeout;
	timeout.tv_sec = 2;
	timeout.tv_usec = 0;

	if (setsockopt(socket, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)) == -1)
		perror("setsockopt(SO_SNDTIMEO)");
	if (setsockopt(socket, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout)) == -1)
		perror("setsockopt(SO_RCVTIMEO)");
}
 
401
 
 
402
 
 
403
/**
 * Worker-thread entry point that waits for incoming connections on
 * server.listenSocket.
 *
 * For each accepted connection it replaces commsSocket with the new socket
 * (closing the previous one if there was one), stops and releases the
 * published net service, and starts the comms worker threads. The loop runs
 * while -threadActive: returns YES for `info` and ends on a select or accept
 * error.
 *
 * @param info  Thread key created by -runThreadWithSelector:; it was retained
 *              for this thread and is released here before returning.
 */
- (void) acceptThread: (id) info
{
	NSAutoreleasePool* pool = [[NSAutoreleasePool alloc] init];

	@synchronized(self)
	{
		[self retain];
	}

	while ([self threadActive: info])
	{
		int lsock = server.listenSocket;

		// Validate the descriptor before it reaches FD_SET, which is
		// undefined for negative values.
		if (lsock < 0)
		{
			printf("No sockets, sleeping for a bit...\n");
			usleep(1000000);
			continue;
		}

		struct timeval tv;
		fd_set readfds;
		fd_set errorfds;

		tv.tv_sec = 1;
		tv.tv_usec = 0;

		FD_ZERO(&readfds);
		FD_ZERO(&errorfds);
		FD_SET(lsock, &readfds);
		FD_SET(lsock, &errorfds);

		//NSLog(@"listening for connection...");

		if (select(lsock + 1, &readfds, NULL, &errorfds, &tv) < 0)
		{
			// A signal interrupting select is not a listener failure.
			if (errno == EINTR)
				continue;

			perror("select");
			break;
		}

		// Timed out with no pending connection: re-check the active flag.
		if (!FD_ISSET(lsock, &readfds))
			continue;

		NSLog(@"Accepting connection...");

		struct sockaddr_in	peerAddress;
		socklen_t		sinSize = sizeof(peerAddress);
		int newSocket = accept(lsock, (struct sockaddr *)&peerAddress, &sinSize);

		// Check the result before doing anything else, so errno still
		// belongs to accept().
		if (newSocket == -1)
		{
			// The pending connection disappeared, or a signal arrived.
			// Keep listening; previously this fell through and installed
			// -1 as the comms socket, closing the live connection.
			if (errno == EWOULDBLOCK || errno == EINTR)
				continue;

			printf("Error accepting connection.\n");
			break;
		}

		_setStandardSocketOpts(newSocket);

		@synchronized(self)
		{
			// -1 means "no connection"; any non-negative value, including
			// 0, is a descriptor that has to be closed.
			if (commsSocket >= 0)
				close(commsSocket);

			commsSocket = newSocket;
		}

		// A peer is connected: stop advertising the service.
		[server.netService stop];
		[server.netService release];
		server.netService = nil;

		[self startCommsWorkThreads];

		NSLog(@"Accepted connection.");
	}

	// Unregister like the other workers do, so a finished accept thread does
	// not keep activeThreads non-empty forever.
	[self threadWillExit: info];

	@synchronized(self)
	{
		[self release];
	}

	[info release];
	[pool drain];
}
 
500
 
 
501
 
 
502
// Publishes (or re-publishes) the Bonjour service for the listening socket.
// Any previously published service is stopped and released first. Returns NO
// when the NSNetService could not be created, YES after -publish was sent.
- (BOOL) enableBonjourWithDomain:(NSString*)domain applicationProtocol:(NSString*)protocol name:(NSString*)name
{
	// An empty domain selects the default Bonjour registration domains, typically just ".local"
	if ([domain length] == 0)
		domain = @"";

	// An empty name selects the default Bonjour name, e.g. the name assigned to the device in iTunes
	if ([name length] == 0)
		name = @"";

	assert([protocol length] && server.listenSocket);

	// sin_port is kept in network byte order; convert once for logging and publishing.
	int advertisedPort = ntohs(server.myAddress.sin_port);
	NSLog(@"tappity port: %d", advertisedPort);

	[server.netService stop];
	[server.netService release];

	server.netService = [[NSNetService alloc] initWithDomain: domain type: protocol name: name port: advertisedPort];
	if (!server.netService)
		return NO;

	[server.netService setDelegate:self];
//	[server.netService scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSRunLoopCommonModes];
	[server.netService publish];

	return YES;
}
 
526
 
 
527
 
 
528
/**
 * Creates the IPv4 listening socket on server.portnum (all interfaces),
 * starts the accept thread and publishes the Bonjour service.
 *
 * Socket, bind and listen failures are reported through perror; the method
 * then closes the socket, sets server.listenSocket to -1 and returns without
 * starting the accept thread or Bonjour. These checks used to be assert()s,
 * which are compiled out when NDEBUG is defined.
 */
- (void) startListening
{
	server.myAddress.sin_family = AF_INET;					// host byte order
	server.myAddress.sin_port = htons(server.portnum);		// short, network byte order, any port
	server.myAddress.sin_addr.s_addr = htonl(INADDR_ANY);	// auto-fill with my IP

	server.listenSocket = socket(PF_INET, SOCK_STREAM, 0);
	if (server.listenSocket == -1)
	{
		perror("socket");
		return;
	}

	_setStandardSocketOpts(server.listenSocket);

	if (bind(server.listenSocket, (struct sockaddr *)&server.myAddress, sizeof(server.myAddress)) == -1)
	{
		perror("bind");
		close(server.listenSocket);
		server.listenSocket = -1;
		return;
	}

	// When portnum is 0 the system picks the port. Read the bound address
	// back so that the port advertised over Bonjour is the real one, not 0.
	socklen_t addressLength = sizeof(server.myAddress);
	if (getsockname(server.listenSocket, (struct sockaddr *)&server.myAddress, &addressLength) == -1)
		perror("getsockname");

	if (listen(server.listenSocket, 1) == -1)
	{
		perror("listen");
		close(server.listenSocket);
		server.listenSocket = -1;
		return;
	}

	[self runThreadWithSelector: @selector(acceptThread:)];

	if([self enableBonjourWithDomain: @"" applicationProtocol: server.protocolName name: server.serviceName])
		NSLog(@"tappity bounjour advertisments up and running");
}
 
551
 
 
552
/**
 * Stores the service name, protocol and port in `server` and starts
 * listening for connections.
 *
 * @param name      Service name later passed to Bonjour by -startListening.
 * @param protocol  Service type later passed to Bonjour by -startListening.
 * @param pnum      TCP port to listen on.
 */
- (void) startBonjourServerWithName: (NSString*) name protocol: (NSString*) protocol port: (int) pnum
{
	NSLog(@"starting tappity server");

	// Retain the incoming object BEFORE releasing the stored one: if the
	// caller passes the object that is already stored, releasing first could
	// deallocate it before it is retained again.
	[name retain];
	[server.serviceName release];
	server.serviceName = name;

	[protocol retain];
	[server.protocolName release];
	server.protocolName = protocol;

	server.portnum = pnum;

	[self startListening];
}
 
568
 
 
569
// Releases everything this object owns. The worker threads each retain self
// while they run, so this is only reached once none of them is executing.
- (void) dealloc
{
	[server.serviceName release];
	// Retained in -startBonjourServerWithName:protocol:port: but previously
	// never released.
	[server.protocolName release];

	// The service's delegate is self; clear it so the service cannot message
	// a deallocated object if something else still holds the service.
	[server.netService setDelegate: nil];
	[server.netService stop];
	[server.netService release];

	[activeThreads release];
	[sendQueue release];
	[sendLock release];
	// Payload buffer of a message that was still being received, if any.
	[currentData release];

	// -1 marks "no connection" (set in -init and whenever the socket is
	// closed); anything else is a descriptor still owned by this object.
	if (commsSocket >= 0)
		close(commsSocket);

	// NOTE(review): server.listenSocket is not closed here because an unset
	// value cannot be told apart from descriptor 0 in this file; confirm how
	// it is initialised before adding a close().

	[super dealloc];
}
 
582
 
 
583
 
 
584
@end

Loggerhead 1.17 is a web-based interface for Bazaar branches