aboutsummaryrefslogtreecommitdiffstats
path: root/lib/psmq.c
blob: 914d559fe83522ce3ec24cfa7f2a3d410fc34c69 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
/* ==========================================================================
    Licensed under BSD 2clause license See LICENSE file for more information
    Author: Michał Łyszczek <michal.lyszczek@bofc.pl>
   ==========================================================================
         ----------------------------------------------------------
        / this file contains all public functions of the psmq      \
        | library. Library is to make communication between client |
        \ and the broker an easy peasy task.                       /
         ----------------------------------------------------------
           \
            \
              _____   _________
             /     \_/         |
            |                 ||
            |                 ||
           |    ###\  /###   | |
           |     0  \/  0    | |
          /|                 | |
         / |        <        |\ \
        | /|                 | | |
        | |     \_______/   |  | |
        | |                 | / /
        /||                 /|||
           ----------------|
                | |    | |
                ***    ***
               /___\  /___\
   ==========================================================================
          _               __            __         ____ _  __
         (_)____   _____ / /__  __ ____/ /___     / __/(_)/ /___   _____
        / // __ \ / ___// // / / // __  // _ \   / /_ / // // _ \ / ___/
       / // / / // /__ / // /_/ // /_/ //  __/  / __// // //  __/(__  )
      /_//_/ /_/ \___//_/ \__,_/ \__,_/ \___/  /_/  /_//_/ \___//____/

   ========================================================================== */


#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <mqueue.h>
#include <stdarg.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>

#include "psmq-common.h"
#include "psmq.h"
#include "valid.h"


/* ==========================================================================
                  _                __           ____
    ____   _____ (_)_   __ ____ _ / /_ ___     / __/__  __ ____   _____ _____
   / __ \ / ___// /| | / // __ `// __// _ \   / /_ / / / // __ \ / ___// ___/
  / /_/ // /   / / | |/ // /_/ // /_ /  __/  / __// /_/ // / / // /__ (__  )
 / .___//_/   /_/  |___/ \__,_/ \__/ \___/  /_/   \__,_//_/ /_/ \___//____/
/_/
   ========================================================================== */


/* ==========================================================================
    Same as psmq_publish(), but also accepts psmq_msg.ctrl part of message
    ('cmd' and 'data'), to be able to send custom commands. Useful only for
    internal usage. Exported externally because tests use this function,
    but its usage won't be documented and it is not guaranteed to have
    stable API/ABI.

    Message data is laid out as: 'topic' with its terminating null (only
    when topic is not NULL), immediately followed by 'paylen' bytes of
    'payload' (only when payload is not NULL).

    Returns 0 on success or -1 on errors (including errors from mq_send())

    errno:
            EINVAL      psmq is invalid (null)
            EBADF       psmq was not properly initialized
            ENOBUFS     topic and/or payload are too big to fit into buffer
   ========================================================================== */


int psmq_publish_msg
(
	struct psmq     *psmq,     /* psmq object */
	char             cmd,      /* message command */
	unsigned char    data,     /* data for the control part of message */
	const char      *topic,    /* topic of message to be sent */
	const void      *payload,  /* payload of message to be sent */
	size_t           paylen,   /* length of payload buffer */
	unsigned int     prio      /* message priority */
)
{
	struct psmq_msg  pub;      /* buffer used to send out data to broker */
	size_t           topiclen; /* bytes taken by topic, including '\0' */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EBADF, psmq->qpub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* topic is optional (control messages like close
	 * or ioctl do not carry it), it takes no space
	 * in the buffer when it is not set */
	topiclen = topic ? strlen(topic) + 1 : 0;

	/* never write past pub.data. Checks are written
	 * as subtraction from buffer size, so they cannot
	 * wrap around for very big paylen */
	VALID(ENOBUFS, topiclen <= sizeof(pub.data));
	VALID(ENOBUFS, payload == NULL || paylen <= sizeof(pub.data) - topiclen);

	memset(&pub, 0x00, sizeof(pub));
	pub.ctrl.cmd = cmd;
	pub.ctrl.data = data;

	if (topic)
		memcpy(pub.data, topic, topiclen);

	/* payload may be NULL and it's ok, so set
	 * payload only if it was set, in other case
	 * pub.paylen stays 0 from the memset() */
	if (payload)
	{
		memcpy(pub.data + topiclen, payload, paylen);
		pub.paylen = paylen;
	}

	return mq_send(psmq->qpub, (char *)&pub, psmq_real_msg_size(pub), prio);
}


/* ==========================================================================
                       __     __ _          ____
        ____   __  __ / /_   / /(_)_____   / __/__  __ ____   _____ _____
       / __ \ / / / // __ \ / // // ___/  / /_ / / / // __ \ / ___// ___/
      / /_/ // /_/ // /_/ // // // /__   / __// /_/ // / / // /__ (__  )
     / .___/ \__,_//_.___//_//_/ \___/  /_/   \__,_//_/ /_/ \___//____/
    /_/
   ========================================================================== */


/* ==========================================================================
    Publishes message on 'topic' with 'payload' of size 'paylen' with 'prio'
    priority to broker defined in 'psmq'. This is used only to publish
    real (non-control) messages.

    Returns 0 on success or -1 on errors

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      topic is invalid (null)
            EBADF       psmq was not properly initialized
            EBADMSG     topic does not start from '/' character
            ENOBUFS     topic and/or payload are too big to fit into buffers
   ========================================================================== */


int psmq_publish
(
	struct psmq     *psmq,     /* psmq object */
	const char      *topic,    /* topic of message to be sent */
	const void      *payload,  /* payload of message to be sent */
	size_t           paylen,   /* length of payload buffer */
	unsigned int     prio      /* message priority */
)
{
	struct psmq_msg  pub;      /* used only to learn size of data buffer */
	size_t           topiclen; /* bytes taken by topic, including '\0' */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EINVAL, topic);

	/* topic (with null terminator) and payload share
	 * single buffer. Sizes are compared one by one
	 * against remaining space instead of being summed
	 * up, because sum could wrap around for huge
	 * paylen and wrongly pass the check */
	topiclen = strlen(topic) + 1;
	VALID(ENOBUFS, topiclen <= sizeof(pub.data));
	VALID(ENOBUFS, paylen <= sizeof(pub.data) - topiclen);
	VALID(EBADMSG, topic[0] == '/');

	/* psmq->fd is passed in control data of message */
	return psmq_publish_msg(psmq, PSMQ_CTRL_CMD_PUBLISH, psmq->fd,
			topic, payload, paylen, prio);
}


/* ==========================================================================
    Blocks until a message arrives on the subscribe queue of 'psmq' and
    stores it in caller provided 'msg' buffer. Both subscribed messages and
    control messages (like subscribe confirmation) are delivered through
    this function. Priority of received message is passed to mq_receive()
    through 'prio'. Call waits forever or until interrupted by signal.

    Returns 0 on success or -1 on errors

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      msg is invalid (null)
            EBADF       psmq was not properly initialized
            EINTR       The call was interrupted by a signal handler
            EAGAIN      The queue was empty, and the O_NONBLOCK flag was set
                        for the message queue
   ========================================================================== */


int psmq_receive
(
	struct psmq      *psmq,  /* psmq object */
	struct psmq_msg  *msg,   /* received message */
	unsigned int     *prio   /* message priority */
)
{
	VALID(EINVAL, psmq);
	VALID(EINVAL, msg);
	VALID(EBADF, psmq->qpub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* mq_receive() returns number of bytes read, caller
	 * is only interested in success (0) or failure (-1),
	 * errno is left as set by mq_receive() */
	if (mq_receive(psmq->qsub, (char *)msg, sizeof(*msg), prio) == -1)
		return -1;

	return 0;
}


/* ==========================================================================
    Same as psmq_receive(), but blocks only for amount of time specified
    by absolute timespec 'tp', unlike psmq_receive() which will block until
    message is received (or is interrupted by signal).

    When timeout occurs, msg is not modified.
    When tp is 0, function returns immediately.

    Returns 0 on success or -1 on errors

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      msg is invalid (null)
            EINVAL      tp is invalid (null)
            EINVAL      tp is invalid (tv_sec less than zero, or
                        tv_nsec less than zero or greater than 1000 mil)
            EBADF       psmq was not properly initialized
            EINTR       The call was interrupted by a signal handler
            ETIMEDOUT   call timed out before message could be received
            EAGAIN      The queue was empty, and the O_NONBLOCK flag was set
                        for the message queue

    notes:
            On qnx (up until 6.4.0 at least) when timeout occurs,
            mq_timedreceive will return EINTR instead of ETIMEDOUT
   ========================================================================== */


int psmq_timedreceive
(
	struct psmq      *psmq,  /* psmq object */
	struct psmq_msg  *msg,   /* received message */
	unsigned int     *prio,  /* message priority */
	struct timespec  *tp     /* absolute time to wait for timeout */
)
{
	VALID(EINVAL, psmq);
	VALID(EINVAL, msg);
	VALID(EINVAL, tp);
	VALID(EBADF, psmq->qpub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* number of received bytes is not interesting for
	 * the caller, translate result to 0 or -1, errno
	 * is left as set by mq_timedreceive() */
	if (mq_timedreceive(psmq->qsub, (char *)msg, sizeof(*msg), prio, tp) == -1)
		return -1;

	return 0;
}


/* ==========================================================================
    Same as psmq_timedreceive() but accepts time in milliseconds that shall
    pass before timeout should occur, instead of absolute timespec. 'ms' is
    converted into absolute timespec with psmq_ms_to_tp().

    When timeout occurs, msg is not modified.
    When ms is 0, function returns immediately.

    Returns 0 on success or -1 on errors

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      msg is invalid (null)
            EBADF       psmq was not properly initialized
            EINTR       The call was interrupted by a signal handler
            ETIMEDOUT   call timed out before message could be received
            EAGAIN      The queue was empty, and the O_NONBLOCK flag was set
                        for the message queue

    notes:
            On qnx (up until 6.4.0 at least) when timeout occurs,
            mq_timedreceive will return EINTR instead of ETIMEDOUT
   ========================================================================== */


int psmq_timedreceive_ms
(
	struct psmq      *psmq,  /* psmq object */
	struct psmq_msg  *msg,   /* received message */
	unsigned int     *prio,  /* message priority */
	size_t            ms     /* ms to wait until timeout occurs */
)
{
	struct timespec   deadline;  /* absolute time at which call times out */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/

	VALID(EINVAL, psmq);
	VALID(EINVAL, msg);
	VALID(EBADF, psmq->qpub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* mq_timedreceive() wants absolute time, so
	 * convert ms timeout right before the call */
	psmq_ms_to_tp(ms, &deadline);

	if (mq_timedreceive(psmq->qsub, (char *)msg,
				sizeof(*msg), prio, &deadline) == -1)
		return -1;

	return 0;
}


/* ==========================================================================
    Creates mqueue for receiving and opens connection to broker.

    Any stale queue named 'mqname' is unlinked first, then 'mqname' is
    created for reading and 'brokername' is opened for writing. Open
    request is sent to the broker and function waits for open reply. On
    any error both queues are closed, 'mqname' is unlinked and errno of
    the original failure is preserved.

    Returns 0 when broker sends back connection confirmation or -1 when
    error occurred.

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      brokername is invalid (null)
            EINVAL      brokername does not start with '/'
            EINVAL      mqname is invalid (null)
            EINVAL      mqname does not start with '/'
            EINVAL      maxmsg is 0 or less
            ENAMETOOLONG  mqname with its null terminator does not fit in
                        PSMQ_MSG_MAX bytes and thus cannot be sent to broker
            EACCES      Either brokername or mqname can't be opened due to
                        permissions
            EACCES      mqname or brokername contains more than one '/'
            ENFILE      system-wide limit on opened files has been reached
            EMFILE      per-process limit on opened files has been reached
            ENOENT      brokername does not exist
            ENOENT      mqname or brokername was just "/" and nothing else
            ENOMEM      not enough memory in the system
            ENOSPC      not enough space for the creation of a new queue
            ENOSPC      broker has no space left for another client
            EBADMSG     broker sent malformed open reply

    Non-zero value sent by the broker in open reply is also reported
    through errno.
   ========================================================================== */


int psmq_init
(
	struct psmq     *psmq,        /* psmq object to initialize */
	const char      *brokername,  /* name of the broker to connect to */
	const char      *mqname,      /* name of the receiving queue to create */
	int              maxmsg       /* max queued messages in mqname */
)
{
	struct mq_attr   mqa;         /* mqueue attributes */
	int              ack;         /* ACK from the broker after open */
	int              saved_errno; /* errno of the failure being reported */
	size_t           mqnamelen;   /* length of mqname string */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EINVAL, brokername);
	VALID(EINVAL, brokername[0] == '/');
	VALID(EINVAL, mqname);
	VALID(EINVAL, mqname[0] == '/');
	VALID(EINVAL, maxmsg > 0);
	mqnamelen = strlen(mqname);
	VALID(ENAMETOOLONG, mqnamelen < PSMQ_MSG_MAX);


	memset(psmq, 0x00, sizeof(struct psmq));
	memset(&mqa, 0x00, sizeof(mqa));

	mqa.mq_msgsize = sizeof(struct psmq_msg);
	mqa.mq_maxmsg = maxmsg;
	psmq->qsub = (mqd_t)-1;
	psmq->qpub = (mqd_t)-1;

	/* if staled queue exist, remove it, so it
	 * doesn't do weird stuff */
	mq_unlink(mqname);

	/* open subscribe queue, where we will receive
	 * data we subscribed to, and at the beginning
	 * is used to report error from broker to client */
	psmq->qsub = mq_open(mqname, O_RDONLY | O_CREAT, 0600, &mqa);
	if (psmq->qsub == (mqd_t)-1)
		return -1;

	/* open publish queue, this will be used to
	 * subscribe to topics at the start, and
	 * later this will be used to publish data on
	 * given topic. On failure go through common
	 * cleanup, so queue we have just created is
	 * also unlinked and not left in the system */
	psmq->qpub = mq_open(brokername, O_WRONLY);
	if (psmq->qpub == (mqd_t)-1)
		goto error;

	/* both queues have been created, that means
	 * we have enough memory to operate, now we
	 * need to register to broker, so it knows
	 * where to send messages */
	if (psmq_publish_msg(psmq, PSMQ_CTRL_CMD_OPEN, 0, mqname, NULL, 0, 0) != 0)
		goto error;

	/* check response from the broker on subscribe
	 * queue to check if broker managed to
	 * allocate memory for us and open queue on
	 * his side. Read from broker until we read
	 * open reply */
	for (;;)
	{
		struct psmq_msg  msg;  /* received psmq message */
		/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


		memset(&msg, 0x00, sizeof(msg));
		if ((ack = psmq_receive(psmq, &msg, NULL)) == -1)
			break;

		/* open response is supposed to send back
		 * message with open command, if that is
		 * not the case, this could mean that
		 * there may be some old data in queue.
		 * This can happen if we open queue from
		 * previously crashed session. Open
		 * response is first message we ever
		 * receive, so if there is anything else,
		 * these messages do not belong to us and
		 * we can safely discard them. */
		if (msg.ctrl.cmd != PSMQ_CTRL_CMD_OPEN)
			continue;

		/* control data of open reply carries 0 or
		 * errno from the broker */
		ack = msg.ctrl.data;

		/* no space left for us on the broker */
		if (msg.paylen == 0 && ack == ENOSPC)
			break;

		if (msg.paylen != 1)
		{
			/* we expected exactly 1 byte as fd,
			 * anything else is wrong */
			ack = EBADMSG;
			break;
		}

		psmq->fd = msg.data[0];
		break;
	}

	/* broker will return either 0 or errno to
	 * indicate what went wrong on his side, if
	 * ack is 0, broker will send file descriptor
	 * to use when communicating with him. ack is
	 * -1 when psmq_receive() failed, errno is
	 * already set in that case */
	if (ack != 0)
	{
		if (ack > 0)
			errno = ack;

		goto error;
	}

	/* ack received and fd is valid,
	 * we are victorious */
	return 0;

error:
	/* cleanup calls may fail and overwrite errno,
	 * but caller should see the reason why init
	 * failed, not why cleanup did */
	saved_errno = errno;

	if (psmq->qpub != (mqd_t)-1)
		mq_close(psmq->qpub);

	mq_close(psmq->qsub);
	mq_unlink(mqname);
	psmq->qpub = (mqd_t)-1;
	psmq->qsub = (mqd_t)-1;

	errno = saved_errno;
	return -1;
}


/* ==========================================================================
    Cleans up whatever has been allocated through the life cycle of 'psmq'.
    Also sends CLOSE command to broker, so it can cleanup and free space for
    another queue. After this call both queue descriptors in 'psmq' are
    marked as invalid.

    Returns 0 on success or -1 on errors

    errno:
            EINVAL      psmq is invalid (null)
            EBADF       psmq has not been initialized
   ========================================================================== */


int psmq_cleanup
(
	struct psmq  *psmq  /* psmq object to cleanup */
)
{
	VALID(EINVAL, psmq);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* tell broker we are leaving. Result is ignored
	 * on purpose, local resources are released no
	 * matter whether broker got the message or not */
	(void)psmq_publish_msg(psmq, PSMQ_CTRL_CMD_CLOSE, psmq->fd,
			NULL, NULL, 0, 0);

	/* close each queue and immediately mark
	 * its descriptor as invalid */
	(void)mq_close(psmq->qpub);
	psmq->qpub = (mqd_t)-1;

	(void)mq_close(psmq->qsub);
	psmq->qsub = (mqd_t)-1;

	return 0;
}


/* ==========================================================================
    Subscribes to 'psmq' broker on 'topic'. After this, broker will send
    back ACK message with information whether subscription was success or
    not. You can check this by calling psmq_receive() after this function.

    Returns 0 on success or -1 on error

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      topic is invalid (null)
            EINVAL      topic is empty ("")
            EBADMSG     topic does not start with '/' character
            EBADF       psmq has not been initialized
            ENOBUFS     topic is too long
   ========================================================================== */


int psmq_subscribe
(
	struct psmq  *psmq,  /* psmq object */
	const char   *topic  /* topic to register to */
)
{
	size_t        topiclen;  /* length of topic without null terminator */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EINVAL, topic);
	VALID(EINVAL, topic[0] != '\0');
	VALID(EBADMSG, topic[0] == '/');
	VALID(EBADF, psmq->qsub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* topic is sent with its null terminator, so
	 * string itself must be shorter than buffer */
	topiclen = strlen(topic);
	VALID(ENOBUFS, topiclen < PSMQ_MSG_MAX);

	/* send subscribe request to the broker, topic is
	 * the only content, there is no payload */
	return psmq_publish_msg(psmq, PSMQ_CTRL_CMD_SUBSCRIBE, psmq->fd,
			topic, NULL, 0, 0);
}


/* ==========================================================================
    Unsubscribes from 'topic'. After call to this function, broker will send
    back ACK reply with information whether command was success or not. You
    can check this by calling psmq_receive() after this.

    Returns 0 on success or -1 on error

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      topic is invalid (null)
            EINVAL      topic is empty ("")
            EBADMSG     topic does not start with '/' character
            EBADF       psmq has not been initialized
            ENOBUFS     topic is too long
   ========================================================================== */


int psmq_unsubscribe
(
	struct psmq  *psmq,  /* psmq object */
	const char   *topic  /* topic to unsubscribe from */
)
{
	size_t        topiclen;  /* length of topic without null terminator */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EINVAL, topic);
	VALID(EINVAL, topic[0] != '\0');
	VALID(EBADMSG, topic[0] == '/');
	VALID(EBADF, psmq->qsub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* topic is sent with its null terminator, so
	 * string itself must be shorter than buffer */
	topiclen = strlen(topic);
	VALID(ENOBUFS, topiclen < PSMQ_MSG_MAX);

	/* send unsubscribe request to the broker, topic
	 * is the only content, there is no payload */
	return psmq_publish_msg(psmq, PSMQ_CTRL_CMD_UNSUBSCRIBE, psmq->fd,
			topic, NULL, 0, 0);
}


/* ==========================================================================
    Sends ioctl to alter how broker interacts with client. Request number
    is sent as first byte of payload, followed by request specific data
    taken from variadic arguments.

    Supported requests:
            PSMQ_IOCTL_REPLY_TIMEOUT    expects single int argument in
                                        range from 0 to USHRT_MAX

    Returns 0 on success or -1 on error

    errno:
            EINVAL      psmq is invalid (null)
            EINVAL      req is not a valid ioctl request
            EINVAL      argument of req is out of range
            EBADF       psmq has not been initialized
   ========================================================================== */


int psmq_ioctl
(
	struct psmq   *psmq,        /* psmq object */
	int            req,         /* ioctl request to make */
	...                         /* variadic data for req */
)
{
	int            ret;         /* value to return from the function */
	int            val_int;     /* ap value treated as integer */
	unsigned short val_ushort;  /* ap value treated as unsigned short */
	va_list        ap;          /* variadic argument */
	char           buf[32];     /* buffer with ioctl data to send to broker */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	VALID(EINVAL, psmq);
	VALID(EBADF, psmq->qsub != (mqd_t)-1);
	VALID(EBADF, psmq->qsub != psmq->qpub);

	/* from now on, function must leave only through
	 * the bottom, so va_end() is always called after
	 * va_start() - returning without it is undefined
	 * behaviour */
	va_start(ap, req);
	memset(buf, 0x00, sizeof(buf));
	buf[0] = req;

	switch (req)
	{
	/* ==================================================================
	                       __        __   _                        __
	      ____ ___  ___   / /__ __  / /_ (_)__ _  ___  ___  __ __ / /_
	     / __// -_)/ _ \ / // // / / __// //  ' \/ -_)/ _ \/ // // __/
	    /_/   \__// .__//_/ \_, /  \__//_//_/_/_/\__/ \___/\_,_/ \__/
	             /_/       /___/
	   ================================================================== */

	case PSMQ_IOCTL_REPLY_TIMEOUT:
		/* va_arg cannot accept short as type, so we
		 * need this intermediate step to extract int
		 * first, and then assign it to ushort */
		val_int = va_arg(ap, int);

		/* value must survive conversion to unsigned
		 * short, negative one would silently turn into
		 * a big timeout */
		if (val_int < 0 || val_int > USHRT_MAX)
		{
			errno = EINVAL;
			ret = -1;
			break;
		}

		val_ushort = val_int;
		memcpy(buf + 1, &val_ushort, sizeof(val_ushort));
		ret = psmq_publish_msg(psmq, PSMQ_CTRL_CMD_IOCTL, psmq->fd, NULL,
				buf, 1 + sizeof(val_ushort), 0);
		break;

	default:
		errno = EINVAL;
		ret = -1;
		break;
	}

	va_end(ap);
	return ret;
}


/* ==========================================================================
    Sets time in ms, how long broker will wait for us to take some messages
    from mqueue, in case our mqueue is full. This is a typed wrapper that
    calls psmq_ioctl() with PSMQ_IOCTL_REPLY_TIMEOUT request.

    Note: if you set this to high value, and you are notoriously slow to get
    messages from the queue, this may severely impact performance of broker,
    as broker is single threaded by design, and will hang when it has to
    wait for client.

    Returns whatever psmq_ioctl() returns: 0 on success or -1 on error
   ========================================================================== */


int psmq_ioctl_reply_timeout
(
	struct psmq   *psmq,  /* psmq object */
	unsigned short val    /* timeout value in ms */
)
{
	int            ret;   /* result of the ioctl request */
	/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/


	/* val goes through variadic argument, so it is
	 * subject to default argument promotion there */
	ret = psmq_ioctl(psmq, PSMQ_IOCTL_REPLY_TIMEOUT, val);
	return ret;
}