// *=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=* // ** Copyright UCAR (c) 1990 - 2016 // ** University Corporation for Atmospheric Research (UCAR) // ** National Center for Atmospheric Research (NCAR) // ** Boulder, Colorado, USA // ** BSD licence applies - redistribution and use in source and binary // ** forms, with or without modification, are permitted provided that // ** the following conditions are met: // ** 1) If the software is modified to produce derivative works, // ** such modified software should be clearly marked, so as not // ** to confuse it with the version available from UCAR. // ** 2) Redistributions of source code must retain the above copyright // ** notice, this list of conditions and the following disclaimer. // ** 3) Redistributions in binary form must reproduce the above copyright // ** notice, this list of conditions and the following disclaimer in the // ** documentation and/or other materials provided with the distribution. // ** 4) Neither the name of UCAR nor the names of its contributors, // ** if any, may be used to endorse or promote products derived from // ** this software without specific prior written permission. // ** DISCLAIMER: THIS SOFTWARE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS // ** OR IMPLIED WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED // ** WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. // *=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=*=* /////////////////////////////////////////////////////////////// // DsSpdbMsg.cc // // DsSpdbMsg object // // Mike Dixon, RAP, NCAR, P.O.Box 3000, Boulder, CO, 80307-3000, USA // // March 1999 // /////////////////////////////////////////////////////////////// // // The DsSpdbMsg object provides the message protocol for // the Spdb service. // /////////////////////////////////////////////////////////////// #include #include #include #include #include #include #include #include using namespace std; //////////////////////////////////////////////////////////// // Constructor DsSpdbMsg::DsSpdbMsg(memModel_t mem_model /* = CopyMem */ ) : DsServerMsg(mem_model) { _appName = "unknown"; MEM_zero(_info); MEM_zero(_info2); MEM_zero(_horizLimits); MEM_zero(_vertLimits); _horizLimitsSet = false; _vertLimitsSet = false; clearData(); } ///////////////////////////// // Copy constructor // DsSpdbMsg::DsSpdbMsg(const DsSpdbMsg &rhs) { if (this != &rhs) { _copy(rhs); } } //////////////////////////////////////////////////////////// // destructor DsSpdbMsg::~DsSpdbMsg() { } ////////////////////////////// // Assignment // DsSpdbMsg &DsSpdbMsg::operator=(const DsSpdbMsg &rhs) { return _copy(rhs); } //////////////////////////////////////////////////////////// // clear data members void DsSpdbMsg::clearData() { _errorOccurred = false; _errorStr.clear(); _refBuf.free(); _auxBuf.free(); _dataBuf.free(); _timeList.clear(); } /////////////////////////////////////////////// // assemble a DS_SPDB_PUT message // // The chunk_data is passed in uncompressed. // The desired compression is indicated by data_buf_compression. void *DsSpdbMsg::assemblePut(const string &app_name, const string &url_str, int prod_id, const string &prod_label, mode_enum_t request_mode, Spdb::lead_time_storage_t lead_time_storage, int n_chunks, const MemBuf &ref_buf, const MemBuf &aux_buf, const MemBuf &data_buf, bool respect_zero_types, Spdb::compression_t data_buf_compression) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_PUT, request_mode); // indicate that this is the start of a series of put messages setCategory(StartPut); // url setUrlStr(url_str); // load up _info and _info2 _info.prod_id = prod_id; _info.n_chunks = n_chunks; _info.respect_zero_types = respect_zero_types; STRncopy(_info.prod_label, prod_label.c_str(), SPDB_LABEL_MAX); _info2.lead_time_storage = lead_time_storage; _info2.data_buf_compression = Spdb::COMPRESSION_NONE; // load up refs _refBuf.concat(ref_buf); _auxBuf.concat(aux_buf); // load up data buffer, handle compression as appropriate _dataBuf.concat(data_buf); compressDataBuf(data_buf_compression); // clear message parts clearParts(); // make local copies of relevant parts and byte-swap info_t info = _info; BEfromInfo(info); info2_t info2 = _info2; BEfromInfo2(info2); MemBuf refBuf(_refBuf); Spdb::chunk_refs_to_BE((Spdb::chunk_ref_t *) refBuf.getPtr(), n_chunks); MemBuf auxBuf(_auxBuf); Spdb::aux_refs_to_BE((Spdb::aux_ref_t *) auxBuf.getPtr(), n_chunks); // add parts addClientHost(); addClientIpaddr(); addClientUser(); if (app_name.size() > 0) { addPart(DS_SPDB_APP_NAME_PART, app_name.size() + 1, app_name.c_str()); } addPart(DS_SPDB_URL_PART, url_str.size() + 1, url_str.c_str()); addPart(DS_SPDB_INFO_PART, sizeof(info_t), &info); addPart(DS_SPDB_INFO2_PART, sizeof(info2_t), &info2); addPart(DS_SPDB_CHUNK_REF_PART, refBuf.getLen(), refBuf.getPtr()); addPart(DS_SPDB_AUX_REF_PART, auxBuf.getLen(), auxBuf.getPtr()); if (_auxXml.size() > 0) { addPart(DS_SPDB_AUX_XML_PART, _auxXml.size() + 1, _auxXml.c_str()); } addPart(DS_SPDB_CHUNK_DATA_PART, _dataBuf.getLen(), _dataBuf.getPtr()); // assemble void *msg = DsMessage::assemble(); if (_debug) { cerr << "------------- DsSpdbMsg::assemblePut --------------" << endl; print(cerr); cerr << "---------------------------------------------------" << endl; } return msg; } /////////////////////////////////////////////// // assemble put return // void *DsSpdbMsg::assemblePutReturn(mode_enum_t request_mode, const bool errorOccurred /* = false*/, const char *errorStr /* = NULL*/ ) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_PUT_RETURN, request_mode, errorOccurred? -1 : 0); // indicate that this is the end of a series of messages setCategory(EndSeries); // clear message parts clearParts(); // add parts if (errorOccurred && errorStr != NULL) { addPart(DS_SPDB_ERRORSTR_PART, strlen(errorStr) + 1, errorStr); } // assemble return (DsMessage::assemble()); } /////////////////////////////////////////////// // assemble DS_SPDB_GET messages // void *DsSpdbMsg::assembleGetExact(const string &url_str, time_t request_time, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.request_time = request_time; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_EXACT)); } void *DsSpdbMsg::assembleGetClosest(const string &url_str, time_t request_time, int time_margin, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.request_time = request_time; _info.time_margin = time_margin; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_CLOSEST)); } void *DsSpdbMsg::assembleGetInterval(const string &url_str, time_t start_time, time_t end_time, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.start_time = start_time; _info.end_time = end_time; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_INTERVAL)); } void *DsSpdbMsg::assembleGetValid(const string &url_str, time_t request_time, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.request_time = request_time; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_VALID)); } void *DsSpdbMsg::assembleGetLatest(const string &url_str, int time_margin, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.time_margin = time_margin; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_LATEST)); } void *DsSpdbMsg::assembleGetFirstBefore(const string &url_str, time_t request_time, int time_margin, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.request_time = request_time; _info.time_margin = time_margin; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info.respect_zero_types = respect_zero_types; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_FIRST_BEFORE)); } void *DsSpdbMsg::assembleGetFirstAfter(const string &url_str, time_t request_time, int time_margin, int data_type, int data_type2, int get_refs_only, int respect_zero_types, int get_unique, bool check_write_time_on_get, time_t latest_valid_write_time, Spdb::compression_t data_buf_compression) { clearData(); _urlStr = url_str; // load up local _info _info.request_time = request_time; _info.time_margin = time_margin; _info.data_type = data_type; _info.data_type2 = data_type2; _info.get_refs_only = get_refs_only; _info.get_unique = get_unique; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; _info2.data_buf_compression = data_buf_compression; return (_assembleGet(DS_SPDB_GET_MODE_FIRST_AFTER)); } void *DsSpdbMsg::assembleGetTimes(const string &url_str, bool check_write_time_on_get, time_t latest_valid_write_time) { clearData(); _urlStr = url_str; // load up local _info _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; return (_assembleGet(DS_SPDB_GET_MODE_TIMES)); } /////////////////////////////////////////////////////// // assemble a DS_SPDB_GET DATA success return message // // The chunk_data is passed in uncompressed. // The desired compression is indicated by data_buf_compression. void *DsSpdbMsg::assembleGetDataSuccessReturn(mode_enum_t request_mode, const info_t &getInfo, const MemBuf &refBuf, const MemBuf &auxBuf, const MemBuf &dataBuf, Spdb::compression_t data_buf_compression) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_GET_RETURN, request_mode); // indicate that this is the end of a series of messages setCategory(EndSeries); // load up _info _info = getInfo; _info2.data_buf_compression = Spdb::COMPRESSION_NONE; // load up refs _refBuf.free(); _refBuf.concat(refBuf); _auxBuf.free(); _auxBuf.concat(auxBuf); // load up data _dataBuf.free(); _dataBuf.concat(dataBuf); // compress as required compressDataBuf(data_buf_compression); // create local copies, byte-swap info_t locInfo = _info; BEfromInfo(locInfo); info2_t locInfo2 = _info2; BEfromInfo2(locInfo2); MemBuf refCopy(_refBuf); Spdb::chunk_refs_to_BE((Spdb::chunk_ref_t *) refCopy.getPtr(), _info.n_chunks); MemBuf auxCopy(_auxBuf); Spdb::aux_refs_to_BE((Spdb::aux_ref_t *) auxCopy.getPtr(), _info.n_chunks); // clear message parts clearParts(); // add parts addPart(DS_SPDB_INFO_PART, sizeof(info_t), &locInfo); addPart(DS_SPDB_INFO2_PART, sizeof(info2_t), &locInfo2); addPart(DS_SPDB_CHUNK_REF_PART, refCopy.getLen(), refCopy.getPtr()); addPart(DS_SPDB_AUX_REF_PART, auxCopy.getLen(), auxCopy.getPtr()); if (!_info.get_refs_only) { addPart(DS_SPDB_CHUNK_DATA_PART, _dataBuf.getLen(), _dataBuf.getPtr()); } void *msg = DsMessage::assemble(); if (_debug) { cerr << "------ DsSpdbMsg::assembleGetDataSuccessReturn ---------" << endl; print(cerr); cerr << "--------------------------------------------------------" << endl; } return msg; } /////////////////////////////////////////////////////////// // assemble a DS_SPDB_GET TIMES-mode success return message // void *DsSpdbMsg::assembleGetTimesSuccessReturn(mode_enum_t request_mode, const info_t &info) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_GET_RETURN, request_mode); // indicate that this is the end of a series of messages setCategory(EndSeries); // load up _info // convert copy into BE byte order _info = info; info_t locInfo(info); BEfromInfo(locInfo); // clear message parts clearParts(); // add parts addPart(DS_SPDB_INFO_PART, sizeof(info_t), &locInfo); // assemble return (DsMessage::assemble()); } /////////////////////////////////////////////////////////// // assemble a DS_SPDB_GET_MODE_TIME_LIST message // void *DsSpdbMsg::assembleCompileTimeList(const string &url_str, time_t start_time, time_t end_time, size_t min_time_interval, bool check_write_time_on_get, time_t latest_valid_write_time) { clearData(); _urlStr = url_str; // load up local _info _info.start_time = start_time; _info.end_time = end_time; _info.time_margin = min_time_interval; _info2.check_write_time_on_get = check_write_time_on_get; _info2.latest_valid_write_time = latest_valid_write_time; return (_assembleGet(DS_SPDB_GET_MODE_TIME_LIST)); } /////////////////////////////////////////////////////////////// // assemble a DS_SPDB_GET_MODE_TIME_LIST success return message // void *DsSpdbMsg::assembleCompileTimeListSuccessReturn (mode_enum_t request_mode, const info_t &info, const vector &time_list) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_GET_RETURN, request_mode); // indicate that this is the end of a series of messages setCategory(EndSeries); // load up _info // convert copy into BE byte order _info = info; info_t locInfo(info); BEfromInfo(locInfo); // load up time list in BE byte order int ntimes = time_list.size(); TaArray timesArray; si32 *times = timesArray.alloc(ntimes); for (int i = 0; i < ntimes; i++) { times[i] = (int) time_list[i]; } BE_from_array_32(times, ntimes * sizeof(si32)); // clear message parts clearParts(); // add parts addPart(DS_SPDB_INFO_PART, sizeof(info_t), &locInfo); addPart(DS_SPDB_TIME_LIST_PART, ntimes * sizeof(si32), times); void *msg = DsMessage::assemble(); if (_debug) { cerr << "--- DsSpdbMsg::assembleCompileTimeListSuccessReturn ---" << endl; print(cerr); cerr << "-------------------------------------------------------" << endl; } return msg; } /////////////////////////////////////////////// // assemble get error return // void *DsSpdbMsg::assembleGetErrorReturn(mode_enum_t request_mode, const char *errorStr /* = NULL*/ ) { clearData(); // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_GET_RETURN, request_mode, -1); // indicate that this is the end of a series of messages setCategory(EndSeries); // clear message parts clearParts(); // add parts if (errorStr != NULL) { addPart(DS_SPDB_ERRORSTR_PART, strlen(errorStr) + 1, errorStr); } // assemble void *msg = DsMessage::assemble(); if (_debug) { cerr << "--------- DsSpdbMsg::assembleGetErrorReturn -----------" << endl; print(cerr); cerr << "-------------------------------------------------------" << endl; } return msg; } /////////////////////////////////////////////////////// // generic assemble using member data void *DsSpdbMsg::assemble(int subtype, int mode, category_t category) { // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, subtype, mode); // indicate that this is the end of a series of messages setCategory(category); // convert info copy into BE byte order info_t locInfo(_info); BEfromInfo(locInfo); info2_t locInfo2(_info2); BEfromInfo2(locInfo2); // load up local refs to BE byte order MemBuf locRefBuf(_refBuf); Spdb::chunk_refs_to_BE((Spdb::chunk_ref_t *) locRefBuf.getPtr(), _info.n_chunks); MemBuf locAuxBuf(_auxBuf); Spdb::aux_refs_to_BE((Spdb::aux_ref_t *) locAuxBuf.getPtr(), _info.n_chunks); // clear message parts clearParts(); // add parts addClientHost(); addClientIpaddr(); addClientUser(); if (_appName.size() > 0) { addPart(DS_SPDB_APP_NAME_PART, _appName.size() + 1, _appName.c_str()); } if (_urlStr.size() > 0) { addPart(DS_SPDB_URL_PART, _urlStr.size() + 1, _urlStr.c_str()); } if (_errorOccurred) { if (_errorStr.size() > 0) { addPart(DS_SPDB_URL_PART, _errorStr.size() + 1, _errorStr.c_str()); } } addPart(DS_SPDB_INFO_PART, sizeof(info_t), &locInfo); addPart(DS_SPDB_INFO2_PART, sizeof(info2_t), &locInfo2); if (locRefBuf.getLen() > 0) { addPart(DS_SPDB_CHUNK_REF_PART, locRefBuf.getLen(), locRefBuf.getPtr()); } if (locAuxBuf.getLen() > 0) { addPart(DS_SPDB_AUX_REF_PART, locAuxBuf.getLen(), locAuxBuf.getPtr()); } if (_auxXml.size() > 0) { addPart(DS_SPDB_AUX_XML_PART, _auxXml.size() + 1, _auxXml.c_str()); } if (_dataBuf.getLen() > 0) { addPart(DS_SPDB_CHUNK_DATA_PART, _dataBuf.getLen(), _dataBuf.getPtr()); } // assemble void *msg = DsMessage::assemble(); if (_debug) { cerr << "------------- DsSpdbMsg::assemble --------------" << endl; print(cerr); cerr << "------------------------------------------------" << endl; } return msg; } ///////////////////////////////////// // override the disassemble function // int DsSpdbMsg::disassemble(const void *in_msg, ssize_t msg_len, bool delay_uncompression /* = false */) { clearData(); // peek at the header to make sure we're looking at the // right type of message if (decodeHeader(in_msg, msg_len)) { cerr << "ERROR - DsSpdbMsg::disassemble" << endl; cerr << "Bad message header" << endl; cerr << "Message len: " << msg_len << endl; return (-1); } if (_type != DS_MESSAGE_TYPE_SPDB) { cerr << "ERROR - DsSpdbMsg::disassemble" << endl; cerr << "Unknown message type: " << _type << endl; cerr << "Message len: " << msg_len << endl; printHeader(&cerr, ""); return (-1); } // disassemble the message using the base-class routine if (DsMessage::disassemble(in_msg, msg_len)) { cerr << "ERROR - DsSpdbMsg::disassemble" << endl; cerr << "ERROR in DsMessage::disassemble()" << endl; return (-1); } if (_flags) { _errorOccurred = true; } // set data members to parts if (partExists(DS_SPDB_URL_PART)) { _urlStr = (char *) getPartByType(DS_SPDB_URL_PART)->getBuf(); } if (partExists(DS_SPDB_APP_NAME_PART)) { _appName = (char *) getPartByType(DS_SPDB_APP_NAME_PART)->getBuf(); } if (partExists(DS_SPDB_ERRORSTR_PART)) { _errorStr = (char *) getPartByType(DS_SPDB_ERRORSTR_PART)->getBuf(); } if (partExists(DS_SPDB_INFO_PART)) { memcpy(&_info, getPartByType(DS_SPDB_INFO_PART)->getBuf(), sizeof(info_t)); BEtoInfo(_info); } if (partExists(DS_SPDB_INFO2_PART)) { memcpy(&_info2, getPartByType(DS_SPDB_INFO2_PART)->getBuf(), sizeof(info2_t)); BEtoInfo2(_info2); } if (partExists(DS_SPDB_HORIZ_LIMITS_PART)) { memcpy(&_horizLimits, getPartByType(DS_SPDB_HORIZ_LIMITS_PART)->getBuf(), sizeof(horiz_limits_t)); BE_to_array_32(&_horizLimits, sizeof(_horizLimits)); _horizLimitsSet = true; } if (partExists(DS_SPDB_VERT_LIMITS_PART)) { memcpy(&_vertLimits, getPartByType(DS_SPDB_VERT_LIMITS_PART)->getBuf(), sizeof(vert_limits_t)); BE_to_array_32(&_vertLimits, sizeof(_vertLimits)); _vertLimitsSet = true; } if (partExists(DS_SPDB_CHUNK_REF_PART)) { DsMsgPart *part = getPartByType(DS_SPDB_CHUNK_REF_PART); const void *buf = part->getBuf(); ssize_t len = part->getLength(); _refBuf.free(); _refBuf.add(buf, len); Spdb::chunk_refs_from_BE((Spdb::chunk_ref_t *) _refBuf.getPtr(), len / sizeof(Spdb::chunk_ref_t)); } if (partExists(DS_SPDB_AUX_REF_PART)) { DsMsgPart *part = getPartByType(DS_SPDB_AUX_REF_PART); const void *buf = part->getBuf(); ssize_t len = part->getLength(); _auxBuf.free(); _auxBuf.add(buf, len); Spdb::aux_refs_from_BE((Spdb::aux_ref_t *) _auxBuf.getPtr(), len / sizeof(Spdb::aux_ref_t)); } if (partExists(DS_SPDB_CHUNK_DATA_PART)) { DsMsgPart *part = getPartByType(DS_SPDB_CHUNK_DATA_PART); _dataBuf.free(); _dataBuf.add(part->getBuf(), part->getLength()); if (!delay_uncompression) { // uncompress now uncompressDataBuf(); } } if (partExists(DS_SPDB_TIME_LIST_PART)) { DsMsgPart *part = getPartByType(DS_SPDB_TIME_LIST_PART); ssize_t nTimes = part->getLength() / sizeof(si32); si32 *times = (si32 *) part->getBuf(); BE_from_array_32(times, part->getLength()); for (ssize_t i = 0; i < nTimes; i++) { _timeList.push_back(times[i]); } } if (partExists(DS_SPDB_AUX_XML_PART)) { _auxXml = (char *) getPartByType(DS_SPDB_AUX_XML_PART)->getBuf(); } if (_debug) { cerr << "------------- DsSpdbMsg::disassemble --------------" << endl; print(cerr); cerr << "---------------------------------------------------" << endl; } return (0); } ///////////////////////////////////////////////////////////// // compress the data buffer void DsSpdbMsg::compressDataBuf(Spdb::compression_t compression) { if (compression == Spdb::COMPRESSION_NONE) { uncompressDataBuf(); return; } Spdb::compression_t currentCompression = dataBufCompression(); if (compression == currentCompression) { // already compressed correctly return; } if (currentCompression != Spdb::COMPRESSION_NONE) { uncompressDataBuf(); } // determine toolsa compression method ta_compression_method_t compress_method = TA_COMPRESSION_GZIP; if (compression == Spdb::COMPRESSION_BZIP2) { compress_method = TA_COMPRESSION_BZIP; } // compress unsigned int nbytesCompressed; void *compressedData = ta_compress(compress_method, _dataBuf.getPtr(), _dataBuf.getLen(), &nbytesCompressed); if (compressedData == NULL) { // failed to compress return; } // success _dataBuf.free(); _dataBuf.add(compressedData, nbytesCompressed); // free up ta_compress_free(compressedData); // set compression status _info2.data_buf_compression = compression; } ///////////////////////////////////// // uncompress the data buffer void DsSpdbMsg::uncompressDataBuf() { Spdb::compression_t currentCompression = dataBufCompression(); // check if it is compressed if (currentCompression == Spdb::COMPRESSION_NONE) { // nothing to do return; } // uncompress unsigned int nbytesUncompressed; void *uncompressed = ta_decompress(_dataBuf.getPtr(), &nbytesUncompressed); if (uncompressed == NULL) { cerr << "WARNING - DsSpdbMsg::uncompressDataBuf" << endl; cerr << " Cannot uncompress data buffer" << endl; _info2.data_buf_compression = Spdb::COMPRESSION_NONE; return; } _dataBuf.free(); _dataBuf.add(uncompressed, nbytesUncompressed); ta_compress_free(uncompressed); // set compression status _info2.data_buf_compression = Spdb::COMPRESSION_NONE; } ///////////////////////////////////// // get data buffer compression state Spdb::compression_t DsSpdbMsg::dataBufCompression() const { return (Spdb::compression_t) _info2.data_buf_compression; } //////////////// // print message // void DsSpdbMsg::print(ostream &out, const char *spacer) const { out << spacer << "====== DsSpdbMsg ======" << endl; out << spacer << "clientHost: " << getClientHost() << endl; out << spacer << "clientIpaddr: " << getClientIpaddr() << endl; out << spacer << "clientUser: " << getClientUser() << endl; out << spacer << " Message subType: " << subtype2Str(_subType) << endl; out << spacer << " url: " << _urlStr << endl; out << spacer << " mode: " << mode2Str(_mode) << endl; switch (_subType) { case DS_SPDB_PUT: if (_info2.lead_time_storage == Spdb::LEAD_TIME_IN_DATA_TYPE) { out << spacer << " Lead time: set in data_type" << endl; } else if (_info2.lead_time_storage == Spdb::LEAD_TIME_IN_DATA_TYPE2) { out << spacer << " Lead time: set in data_type2" << endl; } switch (_mode) { case DS_SPDB_PUT_MODE_OVER: case DS_SPDB_PUT_MODE_ADD: case DS_SPDB_PUT_MODE_ADD_UNIQUE: case DS_SPDB_PUT_MODE_ONCE: out << spacer << " nChunks: " << _info.n_chunks << endl; break; default: {} } break; case DS_SPDB_PUT_RETURN: if (_flags) { out << spacer << " error occurred." << endl; out << spacer << _errorStr << endl; } break; case DS_SPDB_GET: switch (_mode) { case DS_SPDB_GET_MODE_EXACT: out << spacer << " request_time: " << utimstr(_info.request_time) << endl; break; case DS_SPDB_GET_MODE_CLOSEST: out << spacer << " request_time: " << utimstr(_info.request_time) << endl; out << spacer << " time_margin: " << _info.time_margin << endl; break; case DS_SPDB_GET_MODE_INTERVAL: out << spacer << " start_time: " << utimstr(_info.start_time) << endl; out << spacer << " end_time: " << utimstr(_info.end_time) << endl; break; case DS_SPDB_GET_MODE_VALID: out << spacer << " request_time: " << utimstr(_info.request_time) << endl; break; case DS_SPDB_GET_MODE_LATEST: out << spacer << " time_margin: " << _info.time_margin << endl; break; case DS_SPDB_GET_MODE_FIRST_BEFORE: out << spacer << " request_time: " << utimstr(_info.request_time) << endl; out << spacer << " time_margin: " << _info.time_margin << endl; break; case DS_SPDB_GET_MODE_FIRST_AFTER: out << spacer << " request_time: " << utimstr(_info.request_time) << endl; out << spacer << " time_margin: " << _info.time_margin << endl; break; case DS_SPDB_GET_MODE_TIMES: break; case DS_SPDB_GET_MODE_TIME_LIST: out << spacer << " start_time: " << utimstr(_info.start_time) << endl; out << spacer << " end_time: " << utimstr(_info.end_time) << endl; out << spacer << " min_interval: " << _info.time_margin << endl; break; default: break; } out << spacer << " dataType: " << _info.data_type << endl; out << spacer << " dataType2: " << _info.data_type2 << endl; if (_info.get_unique == Spdb::UniqueOff) { out << spacer << " Get unique: off" << endl; } else if (_info.get_unique == Spdb::UniqueLatest) { out << spacer << " Get unique: latest" << endl; } else if (_info.get_unique == Spdb::UniqueEarliest) { out << spacer << " Get unique: earliest" << endl; } else { out << spacer << " Get unique: unknown" << endl; } if (_info.get_refs_only) { out << spacer << " Get refs only: true" << endl; } else { out << spacer << " Get refs only: false" << endl; } if (_info.respect_zero_types) { out << spacer << " Respect zero types: true" << endl; } else { out << spacer << " Respect zero types: false" << endl; } if (_info2.check_write_time_on_get) { out << spacer << " Check write time: true" << endl; out << spacer << " Latest valid write time: " << utimstr(_info2.latest_valid_write_time) << endl; } if (_info2.check_write_time_on_get) { out << spacer << " Check write time: true" << endl; out << spacer << " Latest valid write time: " << utimstr(_info2.latest_valid_write_time) << endl; } if (_info2.data_buf_compression == Spdb::COMPRESSION_NONE) { out << spacer << " Data buf compression: none" << endl; } else if (_info2.data_buf_compression == Spdb::COMPRESSION_GZIP) { out << spacer << " Data buf compression: gzip" << endl; } else if (_info2.data_buf_compression == Spdb::COMPRESSION_BZIP2) { out << spacer << " Data buf compression: bzip2" << endl; } if (_horizLimitsSet) { out << spacer << " Horiz limits:" << endl; out << spacer << " Min lat: " << _horizLimits.min_lat << endl; out << spacer << " Min lon: " << _horizLimits.min_lon << endl; out << spacer << " Max lat: " << _horizLimits.max_lat << endl; out << spacer << " Max lon: " << _horizLimits.max_lon << endl; } if (_vertLimitsSet) { out << spacer << " Vert limits:" << endl; out << spacer << " Min ht: " << _vertLimits.min_ht << endl; out << spacer << " Max ht: " << _vertLimits.max_ht << endl; } break; case DS_SPDB_GET_RETURN: switch (_mode) { case DS_SPDB_GET_MODE_TIMES: out << spacer << " start_time: " << utimstr(_info.start_time) << endl; out << spacer << " end_time: " << utimstr(_info.end_time) << endl; out << spacer << " last_valid_time: " << utimstr(_info.last_valid_time) << endl; break; case DS_SPDB_GET_MODE_EXACT: case DS_SPDB_GET_MODE_CLOSEST: case DS_SPDB_GET_MODE_INTERVAL: case DS_SPDB_GET_MODE_VALID: case DS_SPDB_GET_MODE_LATEST: case DS_SPDB_GET_MODE_FIRST_BEFORE: case DS_SPDB_GET_MODE_FIRST_AFTER: out << spacer << " nChunks: " << _info.n_chunks << endl; if (_info2.data_buf_compression == Spdb::COMPRESSION_NONE) { out << spacer << " Data buf compression: none" << endl; } else if (_info2.data_buf_compression == Spdb::COMPRESSION_GZIP) { out << spacer << " Data buf compression: gzip" << endl; } else if (_info2.data_buf_compression == Spdb::COMPRESSION_BZIP2) { out << spacer << " Data buf compression: bzip2" << endl; } break; default: break; } if (_flags) { out << spacer << " error occurred." << endl; out << spacer << _errorStr << endl; } break; default: break; } // switch (_subType) if (_auxXml.size() > 0) { out << spacer << " auxXml:" << endl; out << spacer << _auxXml << endl; } if (_refBuf.getLen() > 0) { out << spacer << " Chunk ref buf size: " << _refBuf.getLen() << endl; } if (_auxBuf.getLen() > 0) { out << spacer << " Aux ref buf size: " << _auxBuf.getLen() << endl; } if (_dataBuf.getLen() > 0) { out << spacer << " Data buf size: " << _dataBuf.getLen() << endl; } if (lengthAssembled() > 0) { out << spacer << " Assembled message size: " << lengthAssembled() << endl; } out << spacer << "=======================" << endl; printHeader(out, spacer); // printPartHeaders(out, spacer); out << spacer << "=======================" << endl; } /////////////////////////////////////////////// // generic assemble for a DS_SPDB_GET message // void *DsSpdbMsg::_assembleGet(const mode_enum_t request_mode) { // set header attributes setHdrAttr(DS_MESSAGE_TYPE_SPDB, DS_SPDB_GET, request_mode); // indicate that this is the start of a series of get messages setCategory(StartGet); // convert local info to BE byte order info_t info(_info); BEfromInfo(info); info2_t info2(_info2); BEfromInfo2(info2); // clear message parts clearParts(); // add parts addClientHost(); addClientIpaddr(); addClientUser(); addPart(DS_SPDB_URL_PART, _urlStr.size() + 1, _urlStr.c_str()); addPart(DS_SPDB_INFO_PART, sizeof(info_t), &info); addPart(DS_SPDB_INFO2_PART, sizeof(info2_t), &info2); if (_auxXml.size() > 0) { addPart(DS_SPDB_AUX_XML_PART, _auxXml.size() + 1, _auxXml.c_str()); } // horizontal limits if (_horizLimitsSet) { horiz_limits_t hlimits = _horizLimits; BE_from_array_32(&hlimits, sizeof(hlimits)); addPart(DS_SPDB_HORIZ_LIMITS_PART, sizeof(hlimits), &hlimits); } // vertical limits if (_vertLimitsSet) { vert_limits_t vlimits = _vertLimits; BE_from_array_32(&vlimits, sizeof(vlimits)); addPart(DS_SPDB_VERT_LIMITS_PART, sizeof(vlimits), &vlimits); } // assemble void *msg = DsMessage::assemble(); if (_debug) { cerr << "------------- DsSpdbMsg::_assembleGet --------------" << endl; print(cerr); cerr << "----------------------------------------------------" << endl; } return msg; } //////////////////////////////////////////////// // Function to return the reference time. time_t DsSpdbMsg::getRefTime() const { // // Only relevant for get functions - return 0 otherwise. // if (_subType != DS_SPDB_GET) return 0L; switch (_mode) { // // For LATEST and INTERVAL, return 'now' and end_time respectively. // Also return end_time for TIME_LIST mode. // case DS_SPDB_GET_MODE_LATEST: return time(NULL); break; // case DS_SPDB_GET_MODE_INTERVAL: case DS_SPDB_GET_MODE_TIME_LIST: return _info.end_time; break; // // Mostly, we return the request time. // This is the default, but I'll list // all cases before I give the default label, just // so that the circumstances are clear. // case DS_SPDB_GET_MODE_VALID: case DS_SPDB_GET_MODE_CLOSEST: case DS_SPDB_GET_MODE_EXACT: case DS_SPDB_GET_MODE_FIRST_BEFORE: case DS_SPDB_GET_MODE_FIRST_AFTER: case DS_SPDB_GET_MODE_TIMES: default : return _info.request_time; break; } // // Should never get here. // return 0L; } ////////////////////////////////// // Convert to BE from info struct // void DsSpdbMsg::BEfromInfo(info_t &info) { BE_from_array_32(&info, sizeof(info_t) - SPDB_LABEL_MAX); } ////////////////////////////////// // Convert from BE to info struct // void DsSpdbMsg::BEtoInfo(info_t &info) { BE_to_array_32(&info, sizeof(info_t) - SPDB_LABEL_MAX); } ////////////////////////////////// // Convert to BE from info2 struct // void DsSpdbMsg::BEfromInfo2(info2_t &info2) { BE_from_array_32(&info2, sizeof(info2_t)); } ////////////////////////////////// // Convert from BE to info2 struct // void DsSpdbMsg::BEtoInfo2(info2_t &info2) { BE_to_array_32(&info2, sizeof(info2_t)); } ///////////////////////////////// // set or clear horizontal limits // // Only relevant for requests to servers which can interpret // the SPDB data, e.g. the Symprod servers. void DsSpdbMsg::setHorizLimits(double min_lat, double min_lon, double max_lat, double max_lon) { _horizLimits.min_lat = min_lat; _horizLimits.min_lon = min_lon; _horizLimits.max_lat = max_lat; _horizLimits.max_lon = max_lon; _horizLimitsSet = true; } void DsSpdbMsg::clearHorizLimits() { _horizLimitsSet = false; } /////////////////////////////// // set or clear vertical limits // // heights will generally be specified in km msl, though there // may be instances in which the client and server agree on // a different convention. // // Only relevant for requests to servers which can interpret // the SPDB data, e.g. the Symprod servers. void DsSpdbMsg::setVertLimits(double min_ht, double max_ht) { _vertLimits.min_ht = min_ht; _vertLimits.max_ht = max_ht; _vertLimitsSet = true; } void DsSpdbMsg::clearVertLimits() { _vertLimitsSet = false; } ////////////////////////////////////////////////////////////////// // set the auxiliary XML buffer // This may be used to pass extra information from a client // to a server // The contents of the XML must be agreed upon between the client // and server. This is not part of the SPDB protocol. void DsSpdbMsg::setAuxXml(const string &xml) { _auxXml = xml; } void DsSpdbMsg::clearAuxXml() { _auxXml.clear(); } //////////////////////////////////////////////// // return string representation of subtype string DsSpdbMsg::subtype2Str(int subtype) { switch(subtype) { case DS_SPDB_PUT: return "DS_SPDB_PUT"; case DS_SPDB_GET: return "DS_SPDB_GET"; case DS_SPDB_PUT_RETURN: return "DS_SPDB_PUT_RETURN"; case DS_SPDB_GET_RETURN: return "DS_SPDB_GET_RETURN"; default: return "UNKNOWN"; } } //////////////////////////////////////////////// // return string representation of mode string DsSpdbMsg::mode2Str(int mode) { switch(mode) { case DS_SPDB_PUT_MODE_ADD_UNIQUE: return "DS_SPDB_PUT_MODE_ADD_UNIQUE"; case DS_SPDB_PUT_MODE_ERASE: return "DS_SPDB_PUT_MODE_ERASE"; case DS_SPDB_PUT_MODE_OVER: return "DS_SPDB_PUT_MODE_OVER"; case DS_SPDB_PUT_MODE_ADD: return "DS_SPDB_PUT_MODE_ADD"; case DS_SPDB_PUT_MODE_ONCE: return "DS_SPDB_PUT_MODE_ONCE"; case DS_SPDB_GET_MODE_EXACT: return "DS_SPDB_GET_MODE_EXACT"; case DS_SPDB_GET_MODE_CLOSEST: return "DS_SPDB_GET_MODE_CLOSEST"; case DS_SPDB_GET_MODE_INTERVAL: return "DS_SPDB_GET_MODE_INTERVAL"; case DS_SPDB_GET_MODE_VALID: return "DS_SPDB_GET_MODE_VALID"; case DS_SPDB_GET_MODE_LATEST: return "DS_SPDB_GET_MODE_LATEST"; case DS_SPDB_GET_MODE_FIRST_BEFORE: return "DS_SPDB_GET_MODE_FIRST_BEFORE"; case DS_SPDB_GET_MODE_FIRST_AFTER: return "DS_SPDB_GET_MODE_FIRST_AFTER"; case DS_SPDB_GET_MODE_TIMES: return "DS_SPDB_GET_MODE_TIMES"; case DS_SPDB_GET_MODE_TIME_LIST: return "DS_SPDB_GET_MODE_TIME_LIST"; default: return "UNKNOWN"; } } //////////////////////////////////////////////// // return string representation of part string DsSpdbMsg::part2Str(int part) { switch(part) { case DS_SPDB_URL_PART: return "DS_SPDB_URL_PART"; case DS_SPDB_INFO_PART: return "DS_SPDB_INFO_PART"; case DS_SPDB_CHUNK_REF_PART: return "DS_SPDB_CHUNK_REF_PART"; case DS_SPDB_CHUNK_DATA_PART: return "DS_SPDB_CHUNK_DATA_PART"; case DS_SPDB_ERRORSTR_PART: return "DS_SPDB_ERRORSTR_PART"; case DS_SPDB_HORIZ_LIMITS_PART: return "DS_SPDB_HORIZ_LIMITS_PART"; case DS_SPDB_VERT_LIMITS_PART: return "DS_SPDB_VERT_LIMITS_PART"; case DS_SPDB_INFO2_PART: return "DS_SPDB_INFO2_PART"; case DS_SPDB_TIME_LIST_PART: return "DS_SPDB_TIME_LIST_PART"; case DS_SPDB_APP_NAME_PART: return "DS_SPDB_APP_NAME_PART"; case DS_SPDB_AUX_REF_PART: return "DS_SPDB_AUX_REF_PART"; case DS_SPDB_AUX_XML_PART: return "DS_SPDB_AUX_XML_PART"; default: return "UNKNOWN"; } } ////////////////////////////// // copy contents DsSpdbMsg &DsSpdbMsg::_copy(const DsSpdbMsg &rhs) { if (&rhs == this) { return *this; } // base class copy DsServerMsg::_copy(rhs); // copy members _appName = rhs._appName; _urlStr = rhs._urlStr; _errorOccurred = rhs._errorOccurred; _errorStr = rhs._errorStr; _info = rhs._info; _info2 = rhs._info2; _horizLimits = rhs._horizLimits; _vertLimits = rhs._vertLimits; _horizLimitsSet = rhs._horizLimitsSet; _vertLimitsSet = rhs._vertLimitsSet; _refBuf = rhs._refBuf; _auxBuf = rhs._auxBuf; _dataBuf = rhs._dataBuf; _auxXml = rhs._auxXml; _timeList = rhs._timeList; return *this; }