/* *=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=* */ /* ** Copyright UCAR (c) 1990 - 2016 */ /* ** University Corporation for Atmospheric Research (UCAR) */ /* ** National Center for Atmospheric Research (NCAR) */ /* ** Boulder, Colorado, USA */ /* ** BSD licence applies - redistribution and use in source and binary */ /* ** forms, with or without modification, are permitted provided that */ /* ** the following conditions are met: */ /* ** 1) If the software is modified to produce derivative works, */ /* ** such modified software should be clearly marked, so as not */ /* ** to confuse it with the version available from UCAR. */ /* ** 2) Redistributions of source code must retain the above copyright */ /* ** notice, this list of conditions and the following disclaimer. */ /* ** 3) Redistributions in binary form must reproduce the above copyright */ /* ** notice, this list of conditions and the following disclaimer in the */ /* ** documentation and/or other materials provided with the distribution. */ /* ** 4) Neither the name of UCAR nor the names of its contributors, */ /* ** if any, may be used to endorse or promote products derived from */ /* ** this software without specific prior written permission. */ /* ** DISCLAIMER: THIS SOFTWARE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS */ /* ** OR IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED */ /* ** WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. */ /* *=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=* */ /****************************************************************** * mbq.c * * Message buffer queue - MBQ module * * Mike Dixon, RAP, NCAR, P.O.Box 3000, Boulder, CO, 80303 * * Some of this code was copied from owws_mb. * * March 1997 * ******************************************************************/ /************************************************************************ The MBQ module is a wrapper around MB. It allows the user to write and then read them without the reader having prior knowldge about their size. This is accomplished by sending 2 BM messages for every user message. The first is a header, MBQ_header_t, which stores the message size. The second message is the user message itself. On the reading side the header, which is of known length, is read first. Since this holds the size of the following message the reader can allocate the required memory for the body of the message, which is aquired in a second read. The header has two fields which the user can load up to indentify the type of message. These are the 'type' and 'subtype' fields. They are not used by the MBQ module itself, but are passed through for use by the reading program. The error messages generated by MB are negative integers. Where appropriate these errors are passed up to the user. Refer to mb.doc for details on decoding these errors. *************************************************************************/ #include #include #include #include #include #include #include #include #include #include static void check_init(MBQ_handle_t *handle) { assert(handle->magic_cookie == MBQ_MAGIC); } /******************* * MBQ_init_handle() * * Initialize MBQ handle * * This must be done before using the handle for any * other function. */ void MBQ_init_handle(MBQ_handle_t *handle, char *prog_name, int debug) { MEM_zero(*handle); handle->prog_name = umalloc(strlen(prog_name) + 1); strcpy(handle->prog_name, prog_name); handle->debug = debug; handle->latest_id_read = -1; handle->magic_cookie = MBQ_MAGIC; } /*************** * MBQ_create() * * This function creates a new RDWR message buffer with the given * max buffer size and and maximum number of messages. * * Parameters: * handle - MBQ handle * mbq_path - name of the message buffer which should already be created. * max_buf_size - the maximum size of the buffer * max_n_msgs - the maximum number of MB messages. * * Return value: * 0 on success, * On error, negative status returned from the _MB_open() call. * Refer to mb.doc. */ int MBQ_create (MBQ_handle_t *handle, char *mbq_path, int max_buf_size, int max_n_msgs) { int status; int maxn; int msg_size; int u_mask; int permissions; _MB_status mb_st; check_init(handle); /* * store file path */ if (handle->mbq_path == NULL) { handle->mbq_path = umalloc(strlen(mbq_path) + 1); } else { handle->mbq_path = urealloc(handle->mbq_path, strlen(mbq_path) + 1); } strcpy(handle->mbq_path, mbq_path); /* * compute params for _MB_open * * We need 2 * max_n_msgs, because we read/write a header for * each message. * * Our mean msg size is computed with this taken into account. */ maxn = max_n_msgs * 2; msg_size = max_buf_size / maxn; /* * set the file permissions based on the umask */ u_mask = umask(0); /* query the umask */ umask(u_mask); /* reset it */ permissions = ~u_mask & 0666; /* * Create an MB for INIT and SEQUENTIAL */ status = _MB_open (mbq_path, MB_INIT | MB_RDWR | MB_SEQUENTIAL, permissions, msg_size, maxn); handle->mbid = status; if (status < 0) { /* * Failed to open MB for read/write. */ fprintf(stderr, "ERROR - %s:MBQ_create\n", handle->prog_name); fprintf(stderr, "Cannot create MBQ '%s'\n", mbq_path); return (status); } else { if (handle->debug) { fprintf(stderr, "%s:MBQ_create\n", handle->prog_name); fprintf(stderr, "Successfully created '%s'\n", handle->mbq_path); } } /* * get size */ status = _MB_stat (handle->mbid, &mb_st); if (status != MB_SUCCESS) { return (status); } handle->mb_size = mb_st.size; handle->maxn_msgs = mb_st.maxn_msgs; if (handle->debug) { fprintf(stderr, "mb size: %d\n", handle->mb_size); fprintf(stderr, "maxn msgs: %d\n", handle->maxn_msgs); } return (0); } /******************** * MBQ_open() * * This function opens an MB RDONLY. * * Parameters: * handle - MBQ handle * mbq_path - name of the message buffer which should already be created. * * Return value: * 0 on success, * On error, negative status returned from the MB_open() call. * Refer to mb.doc. */ int MBQ_open (MBQ_handle_t *handle, char *mbq_path) { int status; check_init(handle); /* * store file path */ if (handle->mbq_path == NULL) { handle->mbq_path = umalloc(strlen(mbq_path) + 1); } else { handle->mbq_path = urealloc(handle->mbq_path, strlen(mbq_path) + 1); } strcpy(handle->mbq_path, mbq_path); /* * Open the MB */ status = _MB_open (mbq_path, 0, 0, 0, 0); handle->mbid = status; if (status < 0) { /* * Failed to open MB for readonly */ fprintf(stderr, "ERROR - %s:MBQ_opsn\n", handle->prog_name); fprintf(stderr, "Cannot open MBQ '%s'\n", mbq_path); return (status); } else { if (handle->debug) { fprintf(stderr, "%s:MBQ_open\n", handle->prog_name); fprintf(stderr, "Successfully opened '%s'\n", handle->mbq_path); } return (0); } } /******************** * MBQ_write() * * This function writes a message to an MBQ. * * This actually writes two messages: * * 1. MBQ_header_t struct filled out appropriately * 2. message itself * * Parameters: * handle - MBQ handle * message - message array * mess_len - message length in bytes * mess_type - user-defined and used message type * mess_subtype - user-defined and used message subtype * * Note: the type and subtype are not used by the MBQ module but * are passed through so the reading routine can determine * something about the message from the header. * * Return value: * 0 on success, * On error, negative status returned from the _MB_write() call. * Refer to mb.doc. * */ int MBQ_write (MBQ_handle_t *handle, void *message, int mess_len, int mess_type, int mess_subtype) { int status; check_init(handle); /* * set and encode the header */ handle->hdr.magic_cookie = MBQ_MAGIC; handle->hdr.len = mess_len; handle->hdr.type = mess_type; handle->hdr.subtype = mess_subtype; BE_from_array_32(&handle->hdr, sizeof(MBQ_header_t)); /* * send the header */ status = _MB_write(handle->mbid, (char *) &handle->hdr, sizeof(MBQ_header_t), MB_NEW_ID, NULL); if (status != sizeof(MBQ_header_t)) { fprintf(stderr, "ERROR - %s:MBQ_write\n", handle->prog_name); fprintf(stderr, "Cannot write header in MBQ '%s'\n", handle->mbq_path); return (status); } /* * send the body */ status = _MB_write(handle->mbid, (char *) message, mess_len, MB_NEW_ID, NULL); if (status != mess_len) { fprintf(stderr, "ERROR - %s:MBQ_write\n", handle->prog_name); fprintf(stderr, "Cannot write %d bytes of message in MBQ '%s'\n", mess_len, handle->mbq_path); return (status); } /* * success */ if (handle->debug) { fprintf(stderr, "%s:MBQ_write\n", handle->prog_name); fprintf(stderr, "Successfully wrote %d bytes to '%s'\n", mess_len, handle->mbq_path); } return (0); } /************************ * MBQ_message_waiting() * * Check if a message is waiting in the queue. * * Returns: * 1 if YES * 0 if NO * On error, negative status returned from _MB_stat() call * Refer to mb.doc. */ int MBQ_message_waiting (MBQ_handle_t *handle) { int status; _MB_status mb_st; check_init(handle); /* * get status */ status = _MB_stat (handle->mbid, &mb_st); if (status != MB_SUCCESS) { return (status); } if (mb_st.n_msgs == 0) { return (0); } if (mb_st.latest_id == handle->latest_id_read) { return (0); } else { return (1); } } /******************** * MBQ_read() * * This function reads a message from an MBQ * * This actually reads two messages: * * 1. MBQ_header_t struct which gives data on the message. * 2. message itself * * Parameters: * handle - MBQ handle * * Return value: * nbytes_read on success; * 0 if no message is available; * On error, negative status returned from the _MB_read() call. * Refer to mb.doc. */ int MBQ_read (MBQ_handle_t *handle) { int correct_magic = FALSE; int status; int mess_id; check_init(handle); /* * read the header */ while (!correct_magic) { status = _MB_read(handle->mbid, (char *) &handle->hdr, sizeof(MBQ_header_t), MB_CURRENT, (unsigned int *) &mess_id); if (status != sizeof(MBQ_header_t)) { if (status == MB_NOT_FOUND) { /* * no message */ return (0); } else if (status == MB_TOO_SMALL) { /* * buffer too small for hdr - skip to next one */ _MB_seek(handle->mbid, 1, MB_CURRENT); continue; } else { fprintf(stderr, "ERROR - MBQ_read\n"); fprintf(stderr, "Cannot read header in MBQ '%s'\n", handle->mbq_path); return (status); } } /* if (status != sizeof(MBQ_header_t)) */ handle->latest_id_read = mess_id; /* * decode the header */ BE_to_array_32(&handle->hdr, sizeof(MBQ_header_t)); if (handle->hdr.magic_cookie == MBQ_MAGIC) { correct_magic = TRUE; } } /* while (!correct_magic) */ /* * allocate memory for message */ if (handle->msg == NULL) { handle->msg = umalloc(handle->hdr.len); } else { if (handle->hdr.len > handle->n_msg_alloc) { handle->msg = urealloc(handle->msg, handle->hdr.len); handle->n_msg_alloc = handle->hdr.len; } } /* * read the message */ status = _MB_read(handle->mbid, handle->msg, handle->hdr.len, MB_CURRENT, (unsigned int *) &mess_id); if (status != handle->hdr.len) { if (status == MB_NOT_FOUND) { /* * no message */ return (0); } else { fprintf(stderr, "ERROR - MBQ_read\n"); fprintf(stderr, "Cannot read message from MBQ '%s'\n", handle->mbq_path); return (status); } } /* if (status != handle->hdr.len) */ handle->latest_id_read = mess_id; /* * success - return nbytes read */ if (handle->debug) { fprintf(stderr, "%s:MBQ_read\n", handle->prog_name); fprintf(stderr, "Successfully read %d bytes from '%s'\n", handle->hdr.len, handle->mbq_path); } return (handle->hdr.len); } /******************** * MBQ_read_block() * * This function reads a message from an MBQ - it blocks until * a message is received. * * This actually reads two messages: * * 1. MBQ_header_t struct which gives data on the message. * 2. message itself * * Parameters: * handle - MBQ handle * msecs_sleep - number of millisecs to sleep between reads * while waiting for a message to arrive. * * Return value: * nbytes_read on success; * On error, negative status returned from the _MB_read() call. * Refer to mb.doc. */ int MBQ_read_block (MBQ_handle_t *handle, int msecs_sleep) { int usecs_sleep; int done = FALSE; int status; check_init(handle); usecs_sleep = msecs_sleep * 1000; while (!done) { status = MBQ_message_waiting (handle); if (status < 0) { /* * error */ return (status); } else if (status == 0) { /* * no message - sleep */ uusleep(usecs_sleep); } else { /* * message to be read */ status = MBQ_read(handle); done = TRUE; } } /* while (!done) */ return (status); } /******************** * MBQ_seek_end() * * Seek to the end of the MBQ * * Return value: * 0 on success; * On error, negative status returned from the _MB_seek() call. * Refer to mb.doc. */ int MBQ_seek_end (MBQ_handle_t *handle) { int status; _MB_status mb_st; check_init(handle); /* * get status */ status = _MB_stat (handle->mbid, &mb_st); if (status != MB_SUCCESS) { return (status); } /* * set latest message read to latest_id in stat */ handle->latest_id_read = mb_st.latest_id; /* * Seek to latest_id. */ status = _MB_seek (handle->mbid, 0, MB_LATEST); if (status < 0) { return (status); } else { return (0); } } /******************** * MBQ_close() * * Close the MBQ * */ void MBQ_close (MBQ_handle_t *handle) { check_init(handle); /* * Close the MB */ if (_MB_close (handle->mbid) != MB_SUCCESS) { fprintf(stderr, "ERROR closing MBQ '%s'\n", handle->mbq_path); } } /******************** * MBQ_free() * * Free memory assocaited with the MBQ * */ void MBQ_free (MBQ_handle_t *handle) { check_init(handle); if (handle->prog_name != NULL) { ufree (handle->prog_name); } if (handle->mbq_path != NULL) { ufree (handle->mbq_path); } if (handle->msg != NULL) { ufree (handle->msg); } MEM_zero(*handle); }