U g@sZdZdddgZddlZddlZddlZddlZddlZddlZddl Z ddl Z ddl Z ddl Z ddl Z ddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZmZmZmZmZddlmZm Z m!Z!m"Z"ddlm#Z#m$Z$m%Z%dd lm&Z&m'Z'm(Z(dd l)m*Z*d dZ+d d Z,ddZ-ddZ-Gddde.Z/Gddde.Z0dS)a!Obtains input data needed by various subclasses of hafs.hafstask.HAFSTask. This module implements the functionality described in hafs.hafstask.HAFSTask.inputiter(). It takes many HAFS tasks, asks them what input is needed and collates that information. It has a list of many possible input sources, and knows which ones are available from which cluster. It goes through available input sources in priority order, obtaining the input data. DataCatalog InputSource in_date_rangeN)aliasbatchexecheckrunExitStatusExceptionrun) deliver_file isnonempty make_symlinkmakedirs) to_datetimeto_datetime_rel to_timedelta)InputSourceBadTypePartialTransferUnsupportedTransfer)jloggercCstd}t|}|dD]}|d}|dkr^t|}|t| |kr|t||krdSqt|dd}t||d|d}|t| |kr|t||krdSqd S) ab!Is this time in the given time range? @param t A time as a ten digit number. For example, 1830123118 is December 31, 1830 at 18:00 UTC. @param trange A comma-separated list of time ranges such as this: * 2019081412 --- 12:00 UTC on August 14, 2019 * 2019081412-2019082318 --- From 12:00 UTC on August 14, 2019 through 18:00 UTC on August 23, 2019 * 2019081412-2019082318,2011010100-2011123123 --- From 12:00 UTC on August 14, 2019 through 18:00 UTC on August 23, 2019 and all of year 2011. @returns True if t falls in the range trange, or False otherwise.Z1800,-rT  F)rrsplitfindr)tZtrangeepsilontrZidashstartendr!7/lfs/h1/ops/para/packages/hafs.v2.0.8/ush/hafs/input.pyrs"   cCs8tjtj|tjtj|tj||dddS)zX!Convenience function that opens a temporary file using tempfile.NamedTemporaryFile.z.tmpF)prefixdirmodesuffixdelete) produtilfileopr ospathdirnametempfileNamedTemporaryFilebasename)fmr!r!r"tempopenBs r2cCst}|d|dd|ddftt|D]b}|dks:|dkrPq:||}t|tjr|dt || dfq:|dt |t |fq:|d| S zd!Makes a string version of a dataset+item dict as produced by hafs.hafstask.HAFSTask.inputiter()z%s(%sdatasetz(**no*dataset**)itemz (**no*item**)z, %s=%sz%Y-%m-%d_%H:%M:%S) ioStringIOwritegetsortedlistkeys isinstancedatetimestrstrftimereprgetvaluedskvr!r!r"strsrcLs   rJcCst}|d|dd|ddftt|D]b}|dks:|dkrPq:||}t|tjr|dt || dfq:|dt |t |fq:|d| Sr3r7rEr!r!r"rJ^s   c@s<eZdZdZddZddZddZd d d Zdd d ZdS)ra !Provides the location of a file in an archive, on disk or on a remote server via sftp or ftp. This class is a collection of functions that know how to provide the location of a file in either an archive or a filesystem. It does not know how to actually obtain the file. This serves as the underlying "where is that file" implementation of InputSource. All of this is driven by a section in an hafs.config.HAFSConfig object. For example, suppose one set up this configuration file: @code{.conf} [wcoss_fcst_nco] # WCOSS: Input locations for the production HAFS gfs = /com/gfs/prod/gfs.{aYMD}/ gdas1 = /com/gfs/prod/gdas.{aYMD}/ gfs_sf = gfs.t{aHH}z.sf{fahr:02d} gfs_sfcanl = gfs.t{aHH}z.sfcanl gdas1_bufr = gdas1.t{aHH}z.{obstype}.tm00.bufr_d @endcode In this example, "gfs" is a dataset, while "gfs_sfcanl" is an item in the dataset. The DataCatalog.locate() function can find the location of a gfs_sf file given the inputs required for string expansion by hafs.config.HAFSConfig.timestrinterp(). In this case, only the analysis time is required for the "{aYMD}" in the dataset location and "{aHH}" in the gfs_sfcanl filename. @code{.py} dc=DataCatalog(conf,"wcoss_fcst_nco","2019091800") sfcanl=dc.locate("gfs","gfs_sfcanl") print sfcanl @endcode That code would print "/com/gfs/prod/gfs.20190818/gfs.t00z.sfcanl" which is the operational output path of the GFS surface analysis file for the analysis time in question. Suppose we wanted the spectral forecast file, "gfs_sf" instead, for forecast hour 54. That also requires the forecast time ("ftime") in order to fill in the "{fahr:02d}" in the filename with the number 54. @code{.py} dc=DataCatalog(conf,"wcoss_fcst_nco","2019091800") sf48a=dc.locate("gfs","gfs_sf",ftime="2019092006") sf48b=dc.locate("gfs","gfs_sf",ftime=48*3600) print sf48a print sf48b @endcode That code would print "/com/gfs/prod/gfs.20190818/gfs.t00z.sf54" twice. Note that you can specify the forecast time as an absolute time, or as a number of seconds relative to the analysis time and achieve the same effect either way. If we want the bufr file, we have to provide one more piece of information: the observation type, to fill in "{obstype}". @code{.py} dc=DataCatalog(conf,"wcoss_fcst_nco","2019091800") gpm=dc.locate("gdas1","gdas1_bufr",obstype="gpm") print gpm @endcode which prints "/com/gfs/prod/gdas.20190918/gdas1.t00z.gpm.tm00.bufr_d" cCs,||_t|tstd||_t||_dS)z!DataCatalog constructor @param conf the configuration object, an hafs.config.HAFSConfig @param section the section that provides location information @param anltime the default analysis time z2In DataCatalog.__init__, section must be a string.N)confr?rA TypeErrorsectionranltime)selfrKrMrNr!r!r"__init__s  zDataCatalog.__init__cCs8t|jtjr|jd}n t|j}dt|j|fS)z,!A string representation of this DataCatalogz%Y%m%d%HzDataCatalog(conf,%s,%s))r?rNr@rBrArCrM)rOstimer!r!r"__repr__s zDataCatalog.__repr__c CsHzttddWStjtttfk rB}z WYdSd}~XYnXdS)z!Is this dataset updated in real-time? @returns True if this dataset is updated in real-time, False otherwise. By default, this will return True if conf[section,"rt_updated"] is set to "yes" or False otherwise. rt_updatedFN)rKgetboolrM configparserErrorKeyErrorrL ValueError)rOer!r!r"rSszDataCatalog.rt_updatedNcKs|dkr0|dk r*|dt|t|f|j}|dkr^|dk rZ|dt|t|f|}t|}t||}|dk r|dk rt||s|dk r|dt|t|t|fdS|dk r|dt|t|t|t|jf|jj |jd|d||f|S)a2!Internal function that performs string interpolation. This is an internal implementation function that you should not call directly. It performs string interpolation using the underlying conf object. This acts exactly like the expansions done in the hafs.conf file: {stuff} is expanded to the contents of the "stuff" variable. Expansions are done in the section specified in the constructor. In addition, various a* and f* variables are expanded based on the analysis time ("atime") and forecast time ("ftime"). See hafs.config.HAFSConfig.timestrinterp() for details. @param string the string being expanded @param atime Optional: the analysis time. The default is self.anltime @param ftime Optional: the forecast time. @param logger Optional: a logging.Logger for log messages @param dates Optional: dates for which this datasource is valid. This is passed to in_date_range() for validation. This is used to implement the InputSource date ranges. @param kwargs Additional keyword arguments are passed to the hafs.config.HAFSConfig.timestrinterp() for string replacement. @returns The return value from string interpolation or None if nothing was found.Nz4{%s}: has no atime. Will use atime=self.anltime=%s.z-{%s}: has no ftime. Will use ftime=atime=%s.z{%s}: atime %s not in %sz1parsing {%s} with ftime=%s atime=%s in section %s{}) inforArCrNrrrrMrK timestrinterp)rOstringatimeftimeloggerdateskwargsr!r!r"parsesR   zDataCatalog.parsec Ks|dk r.|dt|t|t|t|f|j|f||||d|}|dkrVdS|j|f|||d|} d|dddkr|| } n |d| } |dk r|d t|t| t| f| S) a!Find the location of a requested piece of data. Locates the specified item for the specified dataset, at the given analysis time ("atime") and forecast time ("ftime"). If the requested data is known to not exist, returns None. This should be overridden by subclasses. The present implementation just does this: {dataset}/{item} expanding dataset and item with self.parse. Any kwargs are passed along: this allows such things as ensemble ID, or switching between GRIB1 or GRIB2 via a keyword argument. @param dataset The name of the dataset. @param item The name of the item in the dataset. @param atime Optional: the analysis time. The default is self.anltime. @param ftime Optional: the forecast time which can be anything accepted by tcutil.numerics.to_datetime_rel() relative to the analysis time. @param logger Optional: a logging.Logger for log messages. If this is provided, several steps along the way of finding the data location are logged. @param dates Optional: dates for which this datasource is valid. This is passed to in_date_range() for validation. This is used to implement the InputSource date ranges. @param kwargs Additional keyword arguments are passed by parse() to the hafs.config.HAFSConfig.timestrinterp() for string replacement. @return The path to the requested data or None if it is not found.Nz.locate item=%s atime=%s ftime=%s in dataset=%s)r_r`rarb)r_r`raz/#r/zresult %s %s => %s)r\rCrdr) rOr4r5r_r`rarbrcdsitresultr!r!r"locates0  zDataCatalog.locate)NNNN)NNNN) __name__ __module__ __qualname____doc__rPrRrSrdrjr!r!r!r"ros= 1c@seZdZdZd'ddZd(ddZdd Zd)d d Zd*d dZd+ddZ d,ddZ ddZ ddZ ddZ d-ddZddZddZd d!Zd.d#d$Zd/d%d&ZdS)0ra!Fetch data from multiple sources. This class knows how to fetch data from remote clusters, or the local machine. The data locations are specified by a several DataCatalog sections, each of which is given a priority, a valid set of dates and a file transfer mechanism. Data catalogs are tried in priority order. Files are obtained in multiple threads at once, and several file transfer mechanisms are understood: * file:// --- obtain files on disk * ftp:// --- contact an FTP server * sftp:// --- contact a server over SSH. SSH-based rsync is used. * htar:// --- use the proprietary htar program to get a tape archive However, only one DataCatalog is examined at a time. All threads work on that one DataCatalog until all data that can be obtained from it is done. Then the threads exit, and new ones are spawned to examine the next DataCatalog.Nc s|||_||_||_ddfdd}t||_||_t|_d|_ t|_ d|_ tt j |_|dk snt|dk sztt||_t||_td|_|g}||dr|||dfdtt} |D]} || D]} | d } | d kr| d | | | d d} }z|| | | | |<Wqtk rz}z.|dk rb|d | | t|fWYqW5d}~XYqXqqt}| D]\} }d |kr,d|ksd|kr,|dd}|dkrt|j| |j}nt d|fd|kr|d}nd}|j!||d |dd|dd|dn|d| f|"t| q|rpt#j$%d|jd&|f|'dS)a!InputSource constructor. @param conf the hafs.config.HAFSConfig to use for configuration info @param section the section that specifies the list of data catalogs @param anltime the default analysis time @param htar the produtil.prog.Runner that runs htar @param logger a logging.Logger for log messages @param hsi the produtil.prog.Runner that runs hsicSsdSNr!r!r!r!r"noneRsz"InputSource.__init__..nonecs tSro) collections defaultdictr!rpr!r"dictnoneTsz&InputSource.__init__..dictnoneTNz@incr%rrz[%s] %s: key error: %slocationhistpriofcstpriotyperz2Do not know how to make a DataCatalog of type "%s"rb1970010100-2038011818)rvrxrwrbzBBad source %s: must have location and either histprio or fcstprio.ztInput sources must ahve location and either histprio or fcstprio. Check options in [%s]: %s and rerun launcher job., )(rKrMrNrqrr _sftp_dir_ok_loggerr=forecast _f_sortedhistory _h_sorted threadingLocklocksAssertionErrorrhtarhsivalid has_optionextendrdictr>rr;rWwarningrAitemsrraddappendhafs exceptionsInvalidInputSpecificationjoin_sort)rOrKrMrNrrarrtsectionssourcesseckeycsrcattrZkebadZdctypedcrbr!rsr"rPFs                   zInputSource.__init__rfcCs|jdd}d|ks$tjdkr6tt|d}ntt|d}|r|r|dd||f|f}|dk r|d|||t|fn4|d d||ff}|dk r|d ||t|f|S) a!Creates a produtil.prog.Runner for running rsync over ssh. Returns a Runner object (as in produtil.run) for executing rsync -e ssh. This subroutine is used to implement workarounds for known bugs. @param netpart The netpart portion of the sftp URL. @param path The path portion of the sftp URL. @param dest The destination on the local disk.rsyncjet)-esshz --protocolZ29)rrz-LvptgoDz%s:%sNz%s:%s=>%s = %sz-dzcheck(%s:%s) = %s) rKgetexer(clusternamerrr\rC)rOnetpartr+checkdirdestrarcmdr!r!r"_rsync_ssh_exes zInputSource._rsync_ssh_execCs,t|jddd|_t|jddd|_dS)zT!Sorts the list of history and forecast DataCatalogs by decreasing priority.cSs |d SNrr!xr!r!r"z#InputSource._sort..)rcSs |d Srr!rr!r!r"rrN)r<r~r)rOr!r!r"rszInputSource._sortcCs|dkr|dkrdS|dkr d}tj|}|dk rT|jt|||||fd|_|dk r||jt|||||fd|_dS)aI!Adds a DataCatalog to this InputSource. Called automatically from the constructor to add a DataCatalog to this InputSource. The list of add() calls is generated from the config section specified in the constructor. You should never need to call this function unless you want to explicitly add more DataCatalog objects that are not listed in the config files. The location parameter is a URL from file, sftp, ftp or htar. Examples: * local files: file:///lfs4/HFIP/hwrf-data/hafs-input/ * scp: sftp://Some.Username@dtn-zeus.rdhpcs.noaa.gov/ * ftp: ftp://anonymous@ftpprd.ncep.noaa.gov/ * htar: htar:///NCEPPROD/1year/hpssprod/runhistory/rh2012/201204/20120418/ @warning Bad things will happen if you add the same source twice. Bad things. @note If fcstprio and histprio are both None, this call has no effect. @param dc the DataCatelog object @param location the URL of the data source, including the username if needed. @param fcstprio the priority for using this source in FORECAST (real-time) mode. If missing or None, the source will not be used in FORECAST mode. @param histprio the priority for using this source in HISTORY (retrospective) mode. If missing or None,the source will not be used in HISTORY mode. @param dates Dates for which this source is valid. This is passed to the trange argument of in_date_range(t,trange) NrzF) urllibrdurlparser~rfloatrrr)rOrrvrxrwrbparsedr!r!r"rs# zInputSource.addc Cs$|dkr|j}|dk r&|d|ftd|}|rX|\}}|rJ|sbtd|n d|}}d}zz\|dk r|d||ftj |||d}| |dk st |}d}d t d |<|WW4St k r} zd t d |<W5d} ~ XYnXW5|dk r|dk r|d|XdS) ag!Opens an FTP connection Opens the specified ftp://user@host/... request subject to the specified timeout, logging to the specified logger (if present and non-Null). @param netpart The netpart portion of the URL @param logger the logging.Logger for log messages @param timeout the connection timeout in secondsNz open_ftp %sz([a-zA-Z0-9_.-]+)+@(.+)z:FTP logins must be of the form user@host but you gave "%s"Z anonymousz%In finally block, closing FTP stream.z %s@%s: log in)timeoutTzftp://F)r}r\researchgroupsZ InvalidLoginrcloseftplibFTPloginrr Exception) rOrrarruserhostr0retvalrYr!r!r"open_ftps>       "   zInputSource.open_ftpc Cspz |j||d}t||dWdStk rj}z,|dk rT|dt|t|fWYdSd}~XYnXdS)a'!Checks to see if rsync can even access a remote server. @param netpart the netpart portion of the URL @param logger the logging.Logger for log messages @param timeout the connection timeout in seconds @returns True if the server is accessible and False otherwise)rraTNz%s: rsync cannot access: %sF)rrrrrA)rOrrardirpathrrYr!r!r"rsync_check_accesss zInputSource.rsync_check_accessTc Cs*|dkr|j}tj|} tjj||dd} tj| } |dk r\|dt|t|t| | j} | j} | j } d| | f}| dkr| | | | | | |||||||| S| dkr|j |0| | | | | | |||||||| W5QRSQRXn8| dkr| | | | | | |||||||| Std | dS) a!Internal implementation function that fetches one file. You should not call this directly; it is meant to be called by "get" and re-implemented in subclasses. This grabs one file, potentially from a remote location. The URL for the base directory of some dataset is in dsurl, while the specific file is in urlmore. The urlmore will be appended to the file part of dsurl via urljoin, and the resulting file will be transferred. @param streams a list used to store opened streams @param dc the DataCatalog being obtained @param dsurl the URL of the DataCatalog @param urlmore additional parts of the URL such as the reference or HTTP Get @param dest The local disk destination @param logger the logging.Logger for log messages @param timeout the connection timeout in seconds @param realtime True for FORECAST mode, False for HISTORY mode. @returns True if successful, False if notNT)allow_fragmentsz %s + %s = %s%s://%sfileftpsftpzLCannot transfer this url: unsupported method (not htar, ftp, file or sftp): )r}rrdrurljoinr\rCschemer+netloc_impl_fetch_filer_impl_fetch_ftp_impl_fetch_sftpr)rOstreamsrdsurlurlmorerrarrealtimerjoinedrr+rnr!r!r" fetch_file,s     zInputSource.fetch_filecCsn| dk r| d| |f| r&|s@tj|rft|tjrfttj| | dt || d| dndSdS)a!Fetches a file from local disk by making a symbolic link. @param parsed The parsed URL from urlparse.urlparse @param joined The joined URL from urlparse.urljoin @param scheme The data transfer scheme (ftp, sftp, etc.) @param path The URL path @param netpart the netpart portion of the URL. @param streams the array of transfer streams @param dc the DataCatalog for the remote data @param dsurl the dataset URL @param urlmore section and other parts of the URL @param dest the local disk destination @param logger the logging.Logger for messages, or None @param timeout connection timeout in seconds, ignored @param realtime True for FORECAST mode, False if not. In FORECAST mode, the symbolic link is made even if the file does not exist, so long as the DataCatalog is marked as realtime (DataCatalog.rt_updated() returns True) @returns True on success, False if the file was not linkedNz%s: from local file %srT)forceraF) r\rSr*r+existsaccessR_OKr r,r )rOrrrr+rrrrrrrarrr!r!r"r]s   zInputSource._impl_fetch_filec Cs~d}zBztj|}|j||}|dkr\| d||f|j|| |d}||j||<|dkr| d||fWWdS| d||fttj| | dt | d  }|j }W5QRX|j |||d }t || dt || d}WnTtjjk rD}z0| dk r*| d |t|jfWYWdSd}~XYnXW5|dk rx| dk rn| d|ft|Xd S) a+!Fetches a file via rsync over ssh. @param parsed The parsed URL from urlparse.urlparse @param joined The joined URL from urlparse.urljoin @param scheme The data transfer scheme (ftp, sftp, etc.) @param path The URL path @param netpart the netpart portion of the URL. @param streams the array of transfer streams @param dc the DataCatalog for the remote data @param dsurl the dataset URL @param urlmore section and other parts of the URL @param dest the local disk destination @param logger the logging.Logger for messages, or None @param timeout connection timeout in seconds @param realtime True for FORECAST mode, False if not. Ignored. @returns True on success, False if the file was not copiedNz(In finally block, deleting temp file %s.z%s:%s: check access.)rarFz%%s:%s: skip: directory inaccessibble.z%s:%s: can accessrwb)rz%s: non-zero exit status %sT)rr*remover+r,r|r\rr r2rrrrenamer(r rrC returncode)rOrrrr+rrrrrrrarrtempnamerokr0rrYr!r!r"r{sL          zInputSource._impl_fetch_sftpc CsDd||f}||kr(|j|| | d||<||}d}zttj| | dt| d>}|j}| dk rz| d||j|f| d|j|j W5QRX| |j}|dk rtj |}||kr| dk r| d |||ftd ||f| dk r| d | |ft|| d}W5|dk r>| d|t|Xd S) a!!Fetches a file over FTP. @param parsed The parsed URL from urlparse.urlparse @param joined The joined URL from urlparse.urljoin @param scheme The data transfer scheme (ftp, sftp, etc.) @param path The URL path @param netpart the netpart portion of the URL. @param streams the array of transfer streams @param dc the DataCatalog for the remote data @param dsurl the dataset URL @param urlmore section and other parts of the URL @param dest the local disk destination @param logger the logging.Logger for messages, or None @param timeout connection timeout in seconds @param realtime True for FORECAST mode, False if not. Ignored. @returns True on success, False if the file was not copiedr)rarNz'In finally block, removing temp file %srrz%s: pull %s => %szRETR z%%s: wrong size: %d local vs %d remotez>Could not transfer full file: only %d of %d bytes transferred.z%s: move from %sT)rrr*rr r+r,r2rr\Z retrbinaryr:sizegetsizerr)rOrrrr+rrrrrrrarrrstreamrr0Z remote_sizeZ local_sizer!r!r"rsN         zInputSource._impl_fetch_ftpcCs0|r|js||jS|js&||jSdS)a,!Returns the list of DataCatalog objects for FORECAST or HISTORY mode. @param realtime True for FORECAST mode, False for HISTORY @returns self.forecast or self.history @post _sort() has been called, sorting self.forecast and self.history in order of priorityN)rrr~rr)rOrr!r!r"list_forszInputSource.list_forc  Cstjjdddd} t| tjjs(t|jdk s6t|jdk r|jddd|dfd k}t|| d }|d kr| d |t|f|j d |fdd| D| j }| d| j ft tjj| j dD]}| |qt|| d }|d kr,| d|tdD]}tdqt|| d }|d krD| d|| dk rd| d|t|ft}t}|D]\}}|d }tj| j |}| d||ftj|rsz1InputSource._impl_get_archive..z%s: list contents)r+z>non-zero exit status %d from htar; will retry in five seconds.2g?z9non-zero exit status %d from htar; will keep going anywayz%s: pull %d filesz%s: check for this at %s)keeprarz %s: add %dz^(../)+z%s: does not existz%s: does not have: %sr{z%s: has files: %sz0%s: gleefully reporting all desired files found.)#r(rTempDirr?rrr rintrr>r,r\rAlistingListingrangetimesleeplensetrr*r+rdebugrr r rrelpathrsubr<r=)rOarchpathpartsdonepriolocrrdata target_dcrra skip_existingtdierrrlinestatrZnopeZyupfilepartZtgtitgtrZrelfilemissingfoundr!r!r"_impl_get_archives                  zInputSource._impl_get_archivec Cs |d}|dkrh|d|}||dd}|| krV|| |krV| |||n||g| ||<n|dddkr|d|fdSz&|j| |||||| dr||WnTttfk r}z0|dk r|jd t|t|t|fd d W5d}~XYnXdS) a!Obtain one or more files. @param i The index in done of the file being fetched @param done an array of logical flags telling which files are transferred @param src the source location @param tgt the target location @param prio the numerical priority @param loc the on-disk destination @param parsed the parsed URL as output by urlparse.urlparse @param dc the DataCatalog @param streams the array of transfer streams @param archives a double-nested dict of lists, mapping from archive name to file part to index within done of the file in question @param target_dc the DataCatalog of the target locations @param realtime True for FORECAST mode, False for HISTORY mode @param logger the logging.Logger for log messages @param skip_existing if True, do not re-download files that already exist on disk (in the target_dc)#rrNzhtar:z %s: no # in path - skipping this)rarzfetching %s=>%s: %sTexc_info)rrrrrEnvironmentErrorrrA)rOrrrrrrrrrarchivesrrrrararchseparchrrYr!r!r"_impl_get_file8s>  zInputSource._impl_get_filec Cs`t}|d|D]4\}}}}}|dt|t|t|t|fq|}||S)a!Generates a string containing a human-readable, prioritized list of data sources. @param dclist The data source list from list_for() @returns A multi-line string containing the table. Example: Prioritized list of data sources: PRIO- LOCATION = SOURCE @ DATES 100 - file:/// = DataCatalog(conf,'wcoss_fcst_PROD2019',2019080518) @ '1970010100-2038011818' 098 - file:/// = DataCatalog(conf,'wcoss_prepbufrnr_PROD2019',2019080518) @ '1970010100-2038011818' 097 - file:// = DataCatalog(conf,'zhan_gyre',2019080518) @ '2011060718-2011111200,2013051800-2013091018'zDPrioritized list of data sources: PRIO- LOCATION = SOURCE @ DATES z%03d - %10s = %s @ %s )r8r9r:rrArCrDr) rOdclistrGrrrrrbsvr!r!r" priotablegs   zInputSource.priotableFcCs|dkr|j}||}t}||||D]\}} } } } | dk sNt|dk sZt| dk sft| dk srt| dk s~t| j} | j}|dt|t | t | t | f| dkr| ||s| d|fq2n| dkr| d| | fq2t }tt }d}ztjd |r}d }t}td t|t | t | t | f|D]}|d7}||krnqRd|ks|t|jf|}|dkrqR||kr|dk rR|d|fqRtj|r|r|dk r|d|f||qR|dk r|dt |t |fd}|dk rF|dt |t | fz| jfd| i|}WnDtk r}z$|d|t |fWYqRW5d}~XYnX|dkrqR|dk r|dt|t |f|||j |j!|||||| | | |||||||gdqR|"|D]^\}}t#|d krV|dk r"|d|fq"|j |j$||||| | | |||||g dq"|"W5QRXW5|dk r|d|D]`\}}z |WnHtk r}z(|dk r|jd|t |fd d W5d}~XYnXqX~q2tdt|t | t | t | fd }d}|D]}|d7}||krhqN|jf|}tj|rqN|%ddr|dk r|dt |fn"|dk r|dt |fd }qN| S)a!Transfers the specified set of data to the specified target. The "target_dc" is a DataCatalog that specifies the destination filenames. The "realtime" argument is True for FORECAST (real-time) mode runs, and False for HISTORY (retrospective) mode runs. The "data" argument should be an iterable (list, tuple, etc.) where each element is a dict-like object that describes one file to obtain. Each dict contains: dataset - string name of the dataset (gfs, gdas1, gefs, enkf, etc.) item - string name of the object (ie.: sf, sfcanl, bufr) atime - Optional: a datetime.datetime specifying the analysis time. Default is the atime from the InputSource's constructor. ftime - Optional: a datetime.datetime specifying the forecast time. ...others... - any other keyword arguments will be sent to the .location functions in any of this InputSource's DataCatalog objects.Nz Consider: %03d - %10s = %s @ %s rz%s: cannot access; will skip)rrrz'%s: invalid transfer mode %s; will skip"In finally block, closing streams.%Exception while closing stream %s: %sTrrz!Pull from: %03d - %10s = %s @ %s rrbz%s: already processing thisz%s: already existsz%s => %sz (unknown)zsearch for %s in %sz%s: key error %sz SRC %s => %s)argsz%s: nothing to pull; skipz6Exited input loop after source: %03d - %10s = %s @ %s Foptionalzmissing optional data: %szMISSING INPUT: %s)&r}rrr\rrrrrrArCrerrorrrqrrrrrrr(workpoolZWorkPoolrrjr*r+rrrrWrJZadd_workrZbarrierrr r;)rOrrrrarrrrrrrrbrrrrrrrrYrseenrFrrrHrrrr!r!r"r;|s*                                 zInputSource.getcKs|dkr|j}t}ztt|} | D]b\} }}}|jf||d|}|dkrNq"| d}|dkrjt d q"|j ||||||d r"qq"W5|dk r|d|D]Z\} } z | Wqtk r} z&|dk r|jd| t| fddW5d} ~ XYqXqXdS) a,!This is a simple wrapper around fetch_file that gets only one file. It will fail if the file requires pulling an archive. @param dataset the dataset to transfer @param item the desired item in the dataset @param dest the on-disk destination filename @param logger a logging.Logger for log messages @param timeout the connection timeout in seconds @param realtime True for FORECAST mode, False for HISTORY mode @param kwargs extra keyword arguments are passed to DataCatalog.locate()NrrTr)r4r5r rzYSource is in an archive. De-archiving is not supported by "get_one." Use "get" instead.r) r}rrrrrrArrjrNotImplementedErrorr)rOr4r5rrarrrcrrrrYrrrrrrrr!r!r"get_ones:      zInputSource.get_one)NNN)NrfNN)NNN)Nr)Nrrf)NrT)T)FNT)NrT)rkrlrmrnrPrrrrrrrrrrr rrr;r!r!r!r!r"r2s. e  - &  123 J/ )1rn__all__rqr*rr-rU urllib.parserrrrrr@r8 produtil.runr(produtil.clusterprodutil.fileop produtil.cdZprodutil.workpoolprodutil.listingtcutil.numericstcutilhafs.exceptionsrrrrrr r r r r rrrrrr produtil.logrrr2rJobjectrrr!r!r!r"s  `0 # D