aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--man/psmq-sub.157
-rw-r--r--src/psmq-pub.c11
-rw-r--r--src/psmq-sub.c62
-rw-r--r--src/psmqd.c7
-rwxr-xr-xtst/psmq-progs.sh32
5 files changed, 139 insertions, 30 deletions
diff --git a/man/psmq-sub.1 b/man/psmq-sub.1
index da18f86..0298593 100644
--- a/man/psmq-sub.1
+++ b/man/psmq-sub.1
@@ -66,6 +66,63 @@ to a file, where logs from incoming messages shall be stored.
If file cannot be opened, program will refuse to start.
Optional argument, if not passed, messages will be printed to
.BR stdout .
+.PP
+Data will be printed in two ways depending on type of data received.
+When received data is simple ascii string, payload will be printed
+in the same line as info, thus one line per received message will
+be received.
+When data contains
+.B non-printable
+character, output will be hexdump like.
+.PP
+Line is printed in format:
+.br
+.BI p: n
+.I topic
+.BI data( llll ): payload
+.TP
+.BI p: n
+is a priority number
+.TP
+.I topic
+is a topic message was published on
+.TP
+.BI ( llll )
+number of bytes in payload
+.TP
+.I payload
+received payload
+.PP
+Check following example to better understand the format.
+Output is a snippet from a source code, sixth line is printed in binary
+since it contains utf8 character which encoding contain non-printable
+charater.
+Rest of lines are printed in single line as a string.
+.PP
+.nf
+ p:0 /source data( 37): switch (psmqd_cfg_init(argc, argv))
+ p:0 /source data( 3): {
+ p:0 /source data( 9): case 0:
+ p:0 /source data( 37): /* no errors in parsing arguments,
+ p:0 /source data( 32): * continue program execution
+ p:0 /source data(40)
+ 0x0000 09 09 20 2a 20 75 74 66 2d 38 20 61 74 74 61 63 .. * utf-8 attac
+ 0x0010 6b 2c 20 68 65 72 65 20 69 74 20 63 6f 6d 65 73 k, here it comes
+ 0x0020 21 20 c5 82 20 2a 2f 00 ! .. */.
+ p:0 /source data( 9): break;
+ p:0 /source data( 1):
+ p:0 /source data( 10): case -2:
+ p:0 /source data( 10): case -3:
+ p:0 /source data( 39): /* help or version was printed, exit
+ p:0 /source data( 30): * program without error */
+ p:0 /source data( 12): return 0;
+ p:0 /source data( 1):
+ p:0 /source data( 10): default:
+ p:0 /source data( 32): /* error occured when parsing
+ p:0 /source data( 23): * arguments, die */
+ p:0 /source data( 12): return 1;
+ p:0 /source data( 3): }
+fi
.SH EXAMPLES
.TP
Listen to single topic
diff --git a/src/psmq-pub.c b/src/psmq-pub.c
index 1eee94b..679a8f2 100644
--- a/src/psmq-pub.c
+++ b/src/psmq-pub.c
@@ -123,6 +123,7 @@ static void send_stdin
{
unsigned int topiclen; /* length of topic string */
unsigned int linemax; /* max allowed length of line buf */
+ unsigned int paylen; /* length of payload to send */
char line[PSMQ_MSG_MAX]; /* single line read from stdin */
/*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
@@ -178,10 +179,12 @@ static void send_stdin
return;
}
- /* now we have full line in buffer, ship
- * it. -1 is just so publish don't send
- * newline character */
- if (publish(psmq, topic, line, strlen(line) - 1, prio) != 0)
+ /* now we have full line in buffer, ship it. */
+ paylen = strlen(line);
+ /* remove new line character from string */
+ line[paylen - 1] = '\0';
+ /* send valid string with null terminator */
+ if (publish(psmq, topic, line, paylen, prio) != 0)
return;
}
}
diff --git a/src/psmq-sub.c b/src/psmq-sub.c
index 6ecd2a5..e068de7 100644
--- a/src/psmq-sub.c
+++ b/src/psmq-sub.c
@@ -27,6 +27,7 @@
# include "psmq-config.h"
#endif
+#include <ctype.h>
#include <embedlog.h>
#include <errno.h>
#include <stdlib.h>
@@ -87,6 +88,46 @@ static void sigint_handler
/* ==========================================================================
+ Check whether payload is binary data or not. It's treated as binary when
+ at least one byte is non-printable character.
+ ========================================================================== */
+
+
+static int is_payload_binary
+(
+ unsigned char *payload,
+ unsigned short paylen
+)
+{
+ unsigned short i;
+ /*~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~*/
+
+
+ /* treat no data as binary data */
+ if (paylen == 0)
+ return 1;
+
+ /* check all but the very last character */
+ for (i = 0; i != paylen - 1; ++i)
+ if (!isprint(payload[i]) && !isspace(payload[i]))
+ return 1;
+
+ /* if last string is null, we are dealing
+ * with proper string, data NOT binary */
+ if (payload[i] == '\0')
+ return 0;
+
+ /* something else got, even if this is
+ * printable character we cannot use it
+ * to print it as string as there is no
+ * null terminator. And anyway, string
+ * without null terminator is not a
+ * string. */
+ return 1;
+}
+
+
+/* ==========================================================================
Called by us when we receive message from broker.
========================================================================== */
@@ -115,11 +156,23 @@ static int on_receive
errno = msg->ctrl.data;
return -1;
+ case PSMQ_CTRL_CMD_IOCTL:
+ el_oprint(OELN, "reply timeout set 100");
+ return 0;
+
case PSMQ_CTRL_CMD_PUBLISH:
- el_oprint(ELN, &psmqs_out, "topic: %s, priority: %u, paylen: %hu%s",
- topic, prio, paylen, paylen ? ", payload:" : "");
- if (paylen)
+ if (is_payload_binary(payload, paylen))
+ {
+ el_oprint(ELN, &psmqs_out, "p:%u %s data(%hu)",
+ prio, topic, paylen);
el_opmemory(ELN, &psmqs_out, payload, paylen);
+ }
+ else
+ {
+ el_oprint(ELN, &psmqs_out, "p:%u %s data(%4hu): %s",
+ prio, topic, paylen, payload);
+ }
+
return 0;
default:
@@ -349,6 +402,9 @@ int psmq_sub_main
return 1;
}
+ if (psmq_ioctl(&psmq, PSMQ_IOCTL_REPLY_TIMEOUT, 100) != 0)
+ el_operror(OELW, "failed to set reply timeout, data might be lost");
+
el_oprint(OELN, "start receiving data");
while (run)
diff --git a/src/psmqd.c b/src/psmqd.c
index b59a813..f0411a6 100644
--- a/src/psmqd.c
+++ b/src/psmqd.c
@@ -105,7 +105,8 @@ int psmqd_main
{
case 0:
/* no errors in parsing arguments,
- * continue program execution */
+ * continue program execution
+ * utf-8 attack, here it comes! ł */
break;
case -2:
@@ -115,8 +116,8 @@ int psmqd_main
return 0;
default:
- /* error occured when parsing arguments,
- * die */
+ /* error occured when parsing
+ * arguments, die */
return 1;
}
diff --git a/tst/psmq-progs.sh b/tst/psmq-progs.sh
index 87765db..d4e3f6f 100755
--- a/tst/psmq-progs.sh
+++ b/tst/psmq-progs.sh
@@ -391,7 +391,7 @@ psmq_pub_from_stdin()
{
start_psmqs
echo "t" | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1
- mt_fail "psmq_grep "t.." \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"2): t\" \"${psmqs_stdout}\""
stop_psmqs
}
psmq_pub_from_stdin_max_line()
@@ -457,9 +457,8 @@ psmq_pub_with_prio()
start_psmqs
msg="m"
${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -m${msg} -p2
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: 2, payload\" \
- \"${psmqs_stdout}\""
- mt_fail "psmq_grep \"${msg}.\" \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data(\" \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"2): m\" \"${psmqs_stdout}\""
stop_psmqs
}
psmq_pub_with_invalid_prio()
@@ -475,16 +474,15 @@ psmq_pub_from_stdin_with_prio()
{
start_psmqs
echo t | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: 3, payload\" \
- \"${psmqs_stdout}\""
- mt_fail "psmq_grep \"t..\" \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data(\" \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"2): t\" \"${psmqs_stdout}\""
stop_psmqs
}
psmq_pub_empty_message()
{
start_psmqs
${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2 -e
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: 0\" \
+ mt_fail "psmq_grep \"p:2 /1 data(0)\" \
\"${psmqs_stdout}\""
stop_psmqs
}
@@ -495,8 +493,7 @@ psmq_pub_binary_single()
count=$((psmq_msg_max - 1 - 3))
dd if=/dev/urandom of=$msg bs=1 count=${count} 2>/dev/null
cat $msg | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2 -B
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: $count, \" \
- \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data($count)\" \"${psmqs_stdout}\""
rm $msg
stop_psmqs
}
@@ -507,8 +504,7 @@ psmq_pub_binary_max()
count=$((psmq_msg_max - 3))
dd if=/dev/urandom of=$msg bs=1 count=${count} 2>/dev/null
cat $msg | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2 -B
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: $count, \" \
- \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data($count)\" \"${psmqs_stdout}\""
rm $msg
stop_psmqs
}
@@ -520,10 +516,8 @@ psmq_pub_binary_split()
splt_count=$((count - 1))
dd if=/dev/urandom of=$msg bs=1 count=${count} 2>/dev/null
cat $msg | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2 -B
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: $splt_count\" \
- \"${psmqs_stdout}\""
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: 1, \" \
- \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data($splt_count)\" \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data(1)\" \"${psmqs_stdout}\""
rm $msg
stop_psmqs
}
@@ -538,11 +532,9 @@ psmq_pub_binary_many_split()
cat $msg | ${psmqp_bin} -n${psmqp_name} -b${broker_name} -t/1 -p2 -B
# first check for last split, this will make sure that previous
# splits are in a log file as well
- mt_fail "psmq_grep \"topic: /1, priority: 2, paylen: 1, \" \
- \"${psmqs_stdout}\""
+ mt_fail "psmq_grep \"p:2 /1 data(1)\" \"${psmqs_stdout}\""
# now do custom grep and check if we have 4 full splits
- split_count=$(grep "topic: /1, priority: 2, paylen: $splt_count" \
- $psmqs_stdout | wc -l)
+ split_count=$(grep "p:2 /1 data($splt_count)" $psmqs_stdout | wc -l)
mt_fail "[ $split_count -eq 4 ]"
rm $msg
stop_psmqs