U

����� g(�����������������������@���sZ��d�Z�dddgZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddl
Z
ddlZddlZddlZddlZddlZddlZddlZddlZddlZddlmZmZmZmZmZ�ddlmZmZm Z m!Z!�ddlm"Z"m#Z#m$Z$�dd	lm%Z%m&Z&m'Z'�dd
l(m)Z)�dd��Z*dd
��Z+dd��Z,dd��Z,G�dd��de-�Z.G�dd��de-�Z/dS�)a���!Obtains input data needed by various subclasses of
hwrf.hwrftask.HWRFTask.  

This module implements the functionality described in
hwrf.hwrftask.HWRFTask.inputiter().  It takes many HWRF tasks, asks
them what input is needed and collates that information.  It has a
list of many possible input sources, and knows which ones are
available from which cluster.  It goes through available input sources
in priority order, obtaining the input data.�DataCatalog�InputSource�
in_date_range�����N)�alias�batchexe�checkrun�ExitStatusException�run)�deliver_file�
isnonempty�make_symlink�makedirs)�to_datetime�to_datetime_rel�to_timedelta)�InputSourceBadType�PartialTransfer�UnsupportedTransfer)�jloggerc�����������������C���s����t�d�}t|��}�|�d�D�]�}|�d�}|dk�r^t|�}|�t|�|�kr�|�t||�kr��dS�qt|dd���}t||d�|d����}|�t|�|�kr|�t||�kr�dS�qd	S�)
ab��!Is this time in the given time range?

    @param t A time as a ten digit number.  For example, 1830123118 is
      December 31, 1830 at 18:00 UTC.
    @param trange A comma-separated list of time ranges such as
    this:
    * 2015081412 --- 12:00 UTC on August 14, 2015
    * 2015081412-2015082318 --- From 12:00 UTC on August 14, 2015
      through 18:00 UTC on August 23, 2015
    * 2015081412-2015082318,2011010100-2011123123 --- From 12:00 UTC
      on August 14, 2015 through 18:00 UTC on August 23, 2015 and all
      of year 2011.
    @returns True if t falls in the range trange, or False otherwise.Z1800�,�-r���T�
�����������F)r���r����split�findr���)�tZtrange�epsilonZtrZidash�start�end��r ����7/lfs/h1/ops/prod/packages/hmon.v3.2.7/ush/hwrf/input.pyr������s"����
��c�����������������C���s8���t�j�tj�|����tjtj�|��tj�|��|ddd�S�)zX!Convenience function that opens a temporary file using
    tempfile.NamedTemporaryFile.z.tmpF)�prefix�dir�mode�suffix�delete)	�produtil�fileopr
����os�path�dirname�tempfile�NamedTemporaryFile�basename)�f�mr ���r ���r!����tempopenB���s����
���r1���c�����������������C���s����t����}|�d|��dd�|��dd�f���tt|������D�]b}|dks:|dkrPq:|�|�}t|tj�r�|�dt	|�|�
d�f���q:|�dt	|�t|�f���q:|�d��|���S��	z}!Makes a string version of a dataset+item dict as produced by
    hwrf_expt.inputiter() or hwrf.hwrftask.HWRFTask.inputiter()z%s(%s�datasetz(**no*dataset**)�itemz
(**no*item**)z, %s=%sz%Y-%m-%d_%H:%M:%S�)�
�io�StringIO�write�get�sorted�list�keys�
isinstance�datetime�str�strftime�repr�getvalue��d�s�k�vr ���r ���r!����strsrcL���s����
��
rI���c�����������������C���s����t����}|�d|��dd�|��dd�f���tt|������D�]b}|dks:|dkrPq:|�|�}t|tj�r�|�dt	|�|�
d�f���q:|�dt	|�t|�f���q:|�d��|���S�r2���r6���rD���r ���r ���r!���rI���^���s����
��
c�������������������@���s<���e�Zd�ZdZdd��Zdd��Zdd��Zd
d	d
�Zddd�ZdS�)r���a�
��!Provides the location of a file in an archive, on disk or on a
    remote server via sftp or ftp.

    This class is a collection of functions that know how to provide
    the location of a file in either an archive or a filesystem.  It
    does not know how to actually obtain the file.  This serves as the
    underlying "where is that file" implementation of InputSource.
    All of this is driven by a section in an hwrf.config.HWRFConfig
    object.

    For example, suppose one set up this configuration file:
    @code{.conf}
      [wcoss_fcst_nco]
      # WCOSS: Input locations for the production HWRF
      gfs               = /com/gfs/prod/gfs.{aYMD}/
      gdas1             = /com/gfs/prod/gdas.{aYMD}/
      gfs_sf            = gfs.t{aHH}z.sf{fahr:02d}
      gfs_sfcanl        = gfs.t{aHH}z.sfcanl
      gdas1_bufr        = gdas1.t{aHH}z.{obstype}.tm00.bufr_d
    @endcode

    In this example, "gfs" is a dataset, while "gfs_sfcanl" is an item
    in the dataset.  The DataCatalog.locate() function can find the
    location of a gfs_sf file given the inputs required for string
    expansion by hwrf.config.HWRFConfig.timestrinterp().  In this
    case, only the analysis time is required for the "{aYMD}" in the
    dataset location and "{aHH}" in the gfs_sfcanl filename.
    @code{.py}
      dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
      sfcanl=dc.locate("gfs","gfs_sfcanl")
      print sfcanl
    @endcode
    That code would print "/com/gfs/prod/gfs.20150818/gfs.t00z.sfcanl"
    which is the operational output path of the GFS surface analysis
    file for the analysis time in question.

    Suppose we wanted the spectral forecast file, "gfs_sf" instead,
    for forecast hour 54.  That also requires the forecast time
    ("ftime") in order to fill in the "{fahr:02d}" in the filename
    with the number 54.
    @code{.py}
      dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
      sf48a=dc.locate("gfs","gfs_sf",ftime="2015092006")
      sf48b=dc.locate("gfs","gfs_sf",ftime=48*3600)
      print sf48a
      print sf48b
    @endcode
    That code would print "/com/gfs/prod/gfs.20150818/gfs.t00z.sf54"
    twice.  Note that you can specify the forecast time as an absolute
    time, or as a number of seconds relative to the analysis time and 
    achieve the same effect either way.

    If we want the bufr file, we have to provide one more piece of
    information: the observation type, to fill in "{obstype}".
    @code{.py}
      dc=DataCatalog(conf,"wcoss_fcst_nco","2015091800")
      gpm=dc.locate("gdas1","gdas1_bufr",obstype="gpm")
      print gpm
    @endcode
    which prints "/com/gfs/prod/gdas.20150918/gdas1.t00z.gpm.tm00.bufr_d"
    c�����������������C���s,���||�_�t|t�std��||�_t|�|�_dS�)z�!DataCatalog constructor
        @param conf the configuration object, an hwrf.config.HWRFConfig
        @param section the section that provides location information
        @param anltime the default analysis time        z2In DataCatalog.__init__, section must be a string.N)�confr>���r@����	TypeError�sectionr����anltime)�selfrJ���rL���rM���r ���r ���r!����__init__����s
����
zDataCatalog.__init__c�����������������C���s8���t�|�jtj�r|�j�d�}n
t|�j�}dt|�j�|f�S�)z,!A string representation of this DataCatalogz%Y%m%d%HzDataCatalog(conf,%s,%s))r>���rM���r?���rA���r@���rB���rL���)rN����stimer ���r ���r!����__repr__����s����
zDataCatalog.__repr__c�����������������C���sH���zt��tdd�W�S��tjtttfk
rB�}�z
W�Y��dS�d}~X�Y�nX�dS�)z�!Is this dataset updated in real-time?

        @returns True if this dataset is updated in real-time, False
        otherwise.  By default, this will return True if
        conf[section,"rt_updated"] is set to "yes" or False otherwise.�
rt_updatedFN)rJ���ZgetboolrL����configparser�Error�KeyErrorrK����
ValueError)rN����er ���r ���r!���rR�������s����zDataCatalog.rt_updatedNc�����������������K���s��|dkr0|dk	r*|��dt|�t|�f���|�j}|dkr^|dk	rZ|��dt|�t|�f���|}t|�}t||�}|dk	r�|dk	r�t||�s�|dk	r�|��dt|�t|�t|�f���dS�|dk	r�|��dt|�t|�t|�t|�j�f���|�jj	|�jd|�d�||f|�S�)a:��!Internal function that performs string interpolation.
        
        This is an internal implementation function that you should
        not call directly.  It performs string interpolation using the
        underlying conf object.  This acts exactly like the expansions
        done in the hwrf.conf file: {stuff} is expanded to the
        contents of the "stuff" variable.  Expansions are done in the
        section specified in the constructor.  In addition, various a*
        and f* variables are expanded based on the analysis time
        ("atime") and forecast time ("ftime").  See
        hwrf.config.HWRFConfig.timestrinterp() for details.
        @param string the string being expanded
        @param atime Optional: the analysis time.  The default is self.anltime
        @param ftime Optional: the forecast time.
        @param logger Optional: a logging.Logger for log messages
        @param dates Optional: dates for which this datasource is valid.
          This is passed to in_date_range() for validation.  This is
          used to implement the InputSource date ranges.
        @param kwargs Additional keyword arguments are passed to the
          hwrf.config.HWRFConfig.timestrinterp() for string replacement.
        @returns The return value from string interpolation or None if
          nothing was found.Nz4{%s}: has no atime.  Will use atime=self.anltime=%s.z-{%s}: has no ftime.  Will use ftime=atime=%s.z{%s}: atime %s not in %sz1parsing {%s} with ftime=%s atime=%s in section %s�{�})
�infor@���rB���rM���r���r���r���rL���rJ���Z
timestrinterp)rN����string�atime�ftime�logger�dates�kwargsr ���r ���r!����parse����sR���������

������
����zDataCatalog.parsec�����������������K���s����|dk	r.|��dt|�t|�t|�t|�f���|�j|f||||d�|��}|dkrVdS�|�j|f|||d�|��}	d�|dd���dkr�||	�}
n|d�|	�}
|dk	r�|��d	t|�t|	�t|
�f���|
S�)
a��!Find the location of a requested piece of data.

        Locates the specified item for the specified dataset, at the
        given analysis time ("atime") and forecast time ("ftime").  If
        the requested data is known to not exist, returns None.  This
        should be overridden by subclasses.  The present
        implementation just does this: {dataset}/{item} expanding
        dataset and item with self.parse.  Any kwargs are passed
        along: this allows such things as ensemble ID, or switching
        between GRIB1 or GRIB2 via a keyword argument.
        @param dataset The name of the dataset.
        @param item The name of the item in the dataset.
        @param atime Optional: the analysis time.  The default is self.anltime.
        @param ftime Optional: the forecast time which can be anything
          accepted by hwrf.numerics.to_datetime_rel() relative to the
          analysis time.
        @param logger Optional: a logging.Logger for log messages.  If this
          is provided, several steps along the way of finding the data 
          location are logged.
        @param dates Optional: dates for which this datasource is valid.
          This is passed to in_date_range() for validation.  This is
          used to implement the InputSource date ranges.
        @param kwargs Additional keyword arguments are passed by
          parse() to the hwrf.config.HWRFConfig.timestrinterp() for
          string replacement.
        @return The path to the requested data or None if it is not found.Nz.locate item=%s atime=%s ftime=%s in dataset=%s)r\���r]���r^���r_���)r\���r]���r^���z/#�����r����/zresult %s %s => %s)rZ���rB���ra���r���)rN���r3���r4���r\���r]���r^���r_���r`����ds�it�resultr ���r ���r!����locate��s0���������
���zDataCatalog.locate)NNNN)NNNN)	�__name__�
__module__�__qualname__�__doc__rO���rQ���rR���ra���rg���r ���r ���r ���r!���r���o���s���=

1���c�������������������@���s����e�Zd�ZdZd'dd�Zd(dd�Zdd	��Zd)d
d�Zd*d
d�Zd+dd�Z	d,dd�Z
dd��Zdd��Zdd��Z
d-dd�Zdd��Zdd��Zd d!��Zd.d#d$�Zd/d%d&�ZdS�)0r���a/��!Fetch data from multiple sources.

    This class knows how to fetch data from remote clusters, or the
    local machine.  The data locations are specified by a several
    DataCatalog sections, each of which is given a priority, a valid
    set of dates and a file transfer mechanism.  Data catalogs are
    tried in priority order.  Files are obtained in multiple threads
    at once, and several file transfer mechanisms are understood:

    * file://  ---  obtain files on disk
    * ftp://   ---  contact an FTP server
    * sftp://  ---  contact a server over SSH.  SSH-based rsync is used.
    * htar://  ---  use the proprietary htar program to get a tape archive

    However, only one DataCatalog is examined at a time.  All threads
    work on that one DataCatalog until all data that can be obtained
    from it is done.  Then the threads exit, and new ones are spawned
    to examine the next DataCatalog.

    For example, suppose you are on the Jet supercomputer running a
    HISTORY (retrospective) simulation.  You set up this configuration
    section in your hwrf.conf config file:
    @code{.conf}
      [jet_sources_prod2014]
      jet_hist_PROD2014%location  = file:///
      jet_hist_PROD2014%histprio=90
      jet_hist_PROD2014%fcstprio=90

      prod15_data_sp%location=htar://
      prod15_data_sp%histprio=59
      prod15_data_sp%dates=2015011218-2015123118
      
      [jet_hist_PROD2014]
      @inc=gfs2014_naming
      inputroot2014=/lfs3/projects/hwrf-data/hwrf-input
      gfs={inputroot2014}/HISTORY/GFS.{aYYYY}/{aYMDH}/
      gfs_sfcanl = gfs.t{aHH}z.sfcanl
      
      [prod15_data_sp]
      inputroot=/NCEPPROD/2year/hpssprod/runhistory/rh{aYYYY}/{aYYYY}{aMM}/{aYMD}
      gfs={inputroot}/
      gfs_sfcanl = {gfs_tar}#./gfs.t{aHH}z.sfcanl
      
      [hwrfdata]
      inputroot=/pan2/projects/hwrfv3/John.Doe/hwrfdata
      gfs={inputroot}/hwrf.{aYMDH}/
      gfs_sfcanl = gfs.t{aHH}z.sfcanl
    @endcode
    and this is the code:
    @code{.py}
      is=InputSource(conf,"jet_sources_prod2014","2015071806")
      hwrfdata=DataCatalog(conf,"hwrfdata")
      is.get([
         {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071800"},
         {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071806"},
         {"dataset":"gfs", "item":"gfs_sfcanl","atime"="2015071812"} ],
         hwrfdata,realtime=False)
    @endcode

    In this example, the InputSource will look for three GFS surface
    analysis files.  It will search two possible locations for them:
    the on-disk Jet "PROD2014" history location and the NCO production
    tape files.  The disk location will be searched first because its
    history priority is 90, while the tape area has a priority of 59.

    Three files will show up eventually:

    * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071800/gfs.t00z.sfcanl
    * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071806/gfs.t06z.sfcanl
    * /pan2/projects/hwrfv3/John.Doe/hwrfdata/hwrf.2015071812/gfs.t12z.sfcanl

    Each file will come from either here:

    * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071800/gfs.t00z.sfcanl
    * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071806/gfs.t06z.sfcanl
    * /lfs3/projects/hwrf-data/hwrf-input/HISTORY/GFS.2015071812/gfs.t12z.sfcanl

    or here:

    * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071800gfs.tar ./gfs.t00z.sfcanl
    * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071806gfs.tar ./gfs.t06z.sfcanl
    * htar -xf /NCEPPROD/2year/hpssprod/runhistory/rh2015/201507/20150718/2015071812gfs.tar ./gfs.t12z.sfcanl    Nc��������������������s���||�_�||�_||�_dd������fdd�}t�|�|�_||�_t��|�_d|�_	t��|�_
d|�_t�tj
�|�_|dk	snt�|dk	szt�t|�|�_t|�|�_t�d�|�_|g}|�|d�r�|�||df��d���t�t�}	|D�]�}
|�|
�D�]�}|�d	�}|d
kr�|d
|��||d�d���}
}z|�|
|�|	|
�|<�W�q��tk
�rz�}�z.|dk	�rb|�d|
|t|�f���W�Y��q�W�5�d}~X�Y�q�X�q�q�t��}t|	����D�]�\}
}d
|k�r0d|k�s�d|k�r0|�dd�}|dk�r�t|�j�|
|�j�}nt d|f���d|k�r|d�}nd}|�j!||d
�|�dd�|�dd�|d��n|�d|
f���|�"t|
����q�|�rtt#j$�%d|�jd�&|�f���|��'���dS�)a���!InputSource constructor.
        @param conf    the hwrf.config.HWRFConfig to use for
          configuration info
        @param section the section that specifies the list of data catalogs
        @param anltime the default analysis time
        @param htar    the produtil.prog.Runner that runs htar
        @param logger  a logging.Logger for log messages
        @param hsi     the produtil.prog.Runner that runs hsic�������������������S���s���d�S��Nr ���r ���r ���r ���r!����none���s����z"InputSource.__init__.<locals>.nonec����������������������s
���t�����S�rl���)�collections�defaultdictr ����rm���r ���r!����dictnone���s����z&InputSource.__init__.<locals>.dictnoneTNz@incr����%r���r���z[%s] %s: key error: %s�location�histprio�fcstprio�typer���z2Do not know how to make a DataCatalog of type "%s"r_����1970010100-2038011818)rs���ru���rt���r_���zBBad source %s: must have location and either histprio or fcstprio.ztInput sources must ahve location and either histprio or fcstprio.  Check options in [%s]: %s and rerun launcher job.�, )(rJ���rL���rM���rn���ro����_sftp_dir_ok�_loggerr<����forecast�	_f_sorted�history�	_h_sorted�	threading�Lock�locks�AssertionErrorr����htar�hsi�validZ
has_option�extendr����dictr=���r���r:���rU����warningr@����itemsr���r����add�append�hwrf�
exceptions�InvalidInputSpecification�join�_sort)rN���rJ���rL���rM���r����r^���r����rq���Zsections�sourcesZsec�key�c�src�attrZke�badZdctype�dcr_���r ���rp���r!���rO������s�����	




����
��



���zInputSource.__init__rc���c�����������������C���s����|�j��dd�}d|ks$tj���dkr6tt|�d��}ntt|�d��}|r�|r�|dd||f�|f�}|dk	r�|�d|||t|�f���n4|d	d||f�f�}|dk	r�|�d
||t|�f���|S�)a���!Creates a produtil.prog.Runner for running rsync over ssh.

        Returns a Runner object (as in produtil.run) for executing
        rsync -e ssh.  This subroutine is used to implement
        workarounds for known bugs.
        @param netpart The netpart portion of the sftp URL.
        @param path The path portion of the sftp URL.
        @param dest The destination on the local disk.�rsync�jet)�-e�sshz
--protocolZ29)r����r����z-LvptgoDz%s:%sNz%s:%s=>%s = %sz-dzcheck(%s:%s) = %s)	rJ����getexer'����cluster�namer���r���rZ���rB���)rN����netpartr*����checkdir�destr^���r�����cmdr ���r ���r!����_rsync_ssh_exe���s����	zInputSource._rsync_ssh_exec�����������������C���s,���t�|�jdd��d�|�_t�|�jdd��d�|�_dS�)zT!Sorts the list of history and forecast DataCatalogs by
        decreasing priority.c�����������������S���s
���|�d��S��Nr���r �����xr ���r ���r!����<lambda>�������z#InputSource._sort.<locals>.<lambda>)r����c�����������������S���s
���|�d��S�r����r ���r����r ���r ���r!���r������r����N)r;���r{���r}���)rN���r ���r ���r!���r������s����zInputSource._sortc�����������������C���s����|dkr|dkrdS�|dkr d}t�j�|�}|dk	rT|�j�t|�||||f��d|�_|dk	r||�j�t|�||||f��d|�_dS�)aO��!Adds a DataCatalog to this InputSource.  

        Called automatically from the constructor to add a DataCatalog
        to this InputSource.  The list of add() calls is generated
        from the config section specified in the constructor.  You
        should never need to call this function unless you want to
        explicitly add more DataCatalog objects that are not listed in
        the config files.

        The location parameter is a URL from file, sftp, ftp or htar.
        Examples:

        * local files: file:///lfs3/projects/hwrf-data/hwrf-input/
        * scp:         sftp://Some.Username@dtn-zeus.rdhpcs.noaa.gov/
        * ftp:         ftp://anonymous@ftpprd.ncep.noaa.gov/
        * htar:        htar:///NCEPPROD/1year/hpssprod/runhistory/rh2012/201204/20120418/

        @warning Bad things will happen if you add the same source
          twice.  Bad things.
        @note If fcstprio and histprio are both None, this call has no
          effect.

        @param dc the DataCatelog object
        @param location the URL of the data source, including the
            username if needed.
        @param fcstprio the priority for using this source in FORECAST
            (real-time) mode.  If missing or None, the source will not
            be used in FORECAST mode.
        @param histprio the priority for using this source in HISTORY
            (retrospective) mode.  If missing or None,the source will
            not be used in HISTORY mode.

        @param dates Dates for which this source is valid.  This is
          passed to the trange argument of in_date_range(t,trange) Nrw���F)	�urllibra����urlparser{���r�����floatr|���r}���r~���)rN���r����rs���ru���rt���r_����parsedr ���r ���r!���r������s����#�zInputSource.add����c�����������
���
���C���s$��|dkr|�j�}|dk	r&|�d|f���t�d|�}|rX|���\}}|rJ|sbtd|���n
d|�}}d}z�z\|dk	r�|�d||f���tj	|||d�}|�
���|dk	s�t�|}d}d	td
|�<�|W�W��4S��t
k
r��}	�zdtd
|�<�W�5�d}	~	X�Y�nX�W�5�|dk	�r|dk	�r|�d��|����X�dS�)ag��!Opens an FTP connection

        Opens the specified ftp://user@host/... request subject to the
        specified timeout, logging to the specified logger (if present
        and non-Null).
        @param netpart The netpart portion of the URL
        @param logger the logging.Logger for log messages
        @param timeout the connection timeout in secondsNzopen_ftp %sz([a-zA-Z0-9_.-]+)+@(.+)z:FTP logins must be of the form user@host but you gave "%s"Z	anonymousz%In finally block, closing FTP stream.z
%s@%s: log in)�timeoutTzftp://F)rz���rZ����re�search�groupsZInvalidLoginr�����close�ftplibZFTPZloginr����r�����	Exception)
rN���r����r^���r�����r�user�hostr/����retvalrW���r ���r ���r!����open_ftp5��s>����	���
�
"


zInputSource.open_ftpc��������������
���C���sp���z |�j�||d�}t||d��W�dS��tk
rj�}�z,|dk	rT|�dt|�t|�f���W�Y��dS�d}~X�Y�nX�dS�)a'��!Checks to see if rsync can even access a remote server.
        @param netpart the netpart portion of the URL
        @param logger the logging.Logger for log messages
        @param timeout the connection timeout in seconds
        @returns True if the server is accessible and False otherwise)r�����r^���TNz%s: rsync cannot access: %sF)r����r���r����r����r@���)rN���r����r^���r�����dirpathr����rW���r ���r ���r!����rsync_check_access[��s�����zInputSource.rsync_check_accessTc	�����������������C���s*��|dkr|�j�}tj�|�}	tjj||dd�}
tj�|
�}	|dk	r\|�dt|�t|�t|
���|	j}|	j}|	j	}
d||
f�}|dkr�|��
|	|
|||
||||||||�
S�|dkr�|�j|��0�|��|	|
|||
||||||||�
W��5�Q�R���S�Q�R�X�n8|dk�r|��
|	|
|||
||||||||�
S�td	|
���dS�)
a���!Internal implementation function that fetches one file.

        You should not call this directly; it is meant to be called
        by "get" and re-implemented in subclasses.  This grabs one
        file, potentially from a remote location.  The URL for the
        base directory of some dataset is in dsurl, while the specific
        file is in urlmore.  The urlmore will be appended to the file
        part of dsurl via urljoin, and the resulting file will be
        transferred.
        @param streams a list used to store opened streams
        @param dc the DataCatalog being obtained
        @param dsurl the URL of the DataCatalog
        @param urlmore additional parts of the URL such as the
          reference or HTTP Get 
        @param dest The local disk destination
        @param logger the logging.Logger for log messages
        @param timeout the connection timeout in seconds
        @param realtime True for FORECAST mode, False for HISTORY mode.
        @returns True if successful, False if notNT)Zallow_fragmentsz%s + %s = %s�%s://%s�file�ftp�sftpzLCannot transfer this url: unsupported method (not htar, ftp, file or sftp): )rz���r����ra���r����ZurljoinrZ���rB����schemer*����netloc�_impl_fetch_filer�����_impl_fetch_ftp�_impl_fetch_sftpr���)rN����streamsr�����dsurl�urlmorer����r^���r�����realtimer�����joinedr����r*���r�����nr ���r ���r!����
fetch_filek��s������������������������������
��������������zInputSource.fetch_filec�����������������C���sn���|dk	r|��d|
|f���|
r&|���s@tj�|�rft�|tj�rfttj�|
�|d��t	||
d|d��ndS�dS�)a��!Fetches a file from local disk by making a symbolic link.
        @param parsed The parsed URL from urlparse.urlparse
        @param joined The joined URL from urlparse.urljoin
        @param scheme The data transfer scheme (ftp, sftp, etc.)
        @param path The URL path
        @param netpart the netpart portion of the URL.
        @param streams the array of transfer streams
        @param dc the DataCatalog for the remote data
        @param dsurl the dataset URL
        @param urlmore section and other parts of the URL
        @param dest the local disk destination
        @param logger the logging.Logger for messages, or None
        @param timeout connection timeout in seconds, ignored
        @param realtime True for FORECAST mode, False if not.  In
          FORECAST mode, the symbolic link is made even if the file
          does not exist, so long as the DataCatalog is marked as
          realtime (DataCatalog.rt_updated() returns True)
        @returns True on success, False if the file was not linkedNz%s: from local file %sr����T)�forcer^���F)
rZ���rR���r)���r*����exists�access�R_OKr
���r+���r���)rN���r����r����r����r*���r����r����r����r����r����r����r^���r����r����r ���r ���r!���r�������s����
��zInputSource._impl_fetch_filec��������������
���C���s~��d}�zBz�tj�|�}|�j|�|�}|dkr\|�d||f���|�j|||d�}||�j|�|<�|dkr�|�d||f���W�W���dS�|�d||f���ttj�|
�|d��t	|
d	��}|j
}W�5�Q�R�X�|�j|||d
�}t||d��t�
||
��d}W�nT�tjjk
�rD�}�z0|dk	�r*|��d|t|j�f���W�Y��W��dS�d}~X�Y�nX�W�5�|dk	�rx|dk	�rn|��d|f���t�|��X�dS�)
a+��!Fetches a file via rsync over ssh.
        @param parsed The parsed URL from urlparse.urlparse
        @param joined The joined URL from urlparse.urljoin
        @param scheme The data transfer scheme (ftp, sftp, etc.)
        @param path The URL path
        @param netpart the netpart portion of the URL.
        @param streams the array of transfer streams
        @param dc the DataCatalog for the remote data
        @param dsurl the dataset URL
        @param urlmore section and other parts of the URL
        @param dest the local disk destination
        @param logger the logging.Logger for messages, or None
        @param timeout connection timeout in seconds
        @param realtime True for FORECAST mode, False if not.  Ignored.
        @returns True on success, False if the file was not copiedNz(In finally block, deleting temp file %s.z%s:%s: check access.)r^���r����Fz%%s:%s: skip: directory inaccessibble.z%s:%s: can accessr�����wb)r����z%s: non-zero exit status %sT)r����r)����remover*���r+���ry���rZ���r����r
���r1���r����r����r����renamer'���r	���r���rB����
returncode)rN���r����r����r����r*���r����r����r����r����r����r����r^���r����r�����tempnamer�����okr/���r����rW���r ���r ���r!���r�������sL���������

�� 

zInputSource._impl_fetch_sftpc��������������
���C���sD��d||f�}||kr(|�j�|||d�||<�||�}d}z�ttj�|
�|d��t|
d��>}|j}|dk	rz|�	d||j|f���|�
d|j�|j��W�5�Q�R�X�|�|j�}|dk	r�tj�
|�}||kr�|dk	r�|�d	|||f���td
||f���|dk	�r|�	d|
|f���t�||
��d}W�5�|dk	�r>|�d|���t�|��X�dS�)
a!��!Fetches a file over FTP.
        @param parsed The parsed URL from urlparse.urlparse
        @param joined The joined URL from urlparse.urljoin
        @param scheme The data transfer scheme (ftp, sftp, etc.)
        @param path The URL path
        @param netpart the netpart portion of the URL.
        @param streams the array of transfer streams
        @param dc the DataCatalog for the remote data
        @param dsurl the dataset URL
        @param urlmore section and other parts of the URL
        @param dest the local disk destination
        @param logger the logging.Logger for messages, or None
        @param timeout connection timeout in seconds
        @param realtime True for FORECAST mode, False if not.  Ignored.
        @returns True on success, False if the file was not copiedr����)r^���r����Nz'In finally block, removing temp file %sr����r����z%s: pull %s => %szRETR z%%s: wrong size: %d local vs %d remotez>Could not transfer full file: only %d of %d bytes transferred.z%s: move from %sT)r����r����r)���r����r
���r*���r+���r1���r����rZ���Z
retrbinaryr9����size�getsizer���r����)rN���r����r����r����r*���r����r����r����r����r����r����r^���r����r����r�����streamr����r/���Zremote_sizeZ
local_sizer ���r ���r!���r�������sN����
�����

�zInputSource._impl_fetch_ftpc�����������������C���s0���|r|�j�s|�����|�jS�|�js&|�����|�jS�dS�)a,��!Returns the list of DataCatalog objects for FORECAST or
        HISTORY mode.
        @param realtime True for FORECAST mode, False for HISTORY
        @returns self.forecast or self.history
        @post _sort() has been called, sorting self.forecast and
          self.history in order of priorityN)r|���r����r{���r~���r}���)rN���r����r ���r ���r!����list_for��s������zInputSource.list_forc
��������������	���C���s���t�jjdddd����}
t|
t�jj�s(t�|�jdk	s6t�|�jdk	r�|�jddd|d�f�d	k}t||d
�}|dkr�|�d|t|�f���|�j	d
|f�dd��t
|����D����|
j�}|�
d|
jf���tt�jj|
jd��D�]}|�
|��q�t||d
�}|dk�r0|�
d|���td�D�]}t�d���qt||d
�}|dk�rH|�
d|���|dk	�rh|�
d|t|�f���t��}t��}t
|����D�]�\}}|d�}tj�|
j|�}|�d||f���tj�|��rDttj�|�|d
��t||d|d��|dd��D�]"}|�d||f���|�|���q�|j�tj�||
j�}t �!dd|�}|�|��n8tj�||
j�}t �!dd|�}|�|��|�d|f����q�|�r�t"t
|��}|�d|d�|�f���|�r�t"t
|��}|�d |d�|�f���|�r�|�s�|�
d!|f���W�5�Q�R�X�dS�)"a���!Fetches an archive from HPSS
        @param archpath path to the archive on HPSS
        @param parts list of required archive elements as integer index
          within the done argument
        @param[out] done list of bool, set to True if the part was obtained
        @param prio the priority of this input source
        @param loc,parsed,dc,data,target_dt,realtime,skip_existing Ignored.
        @param logger the logging.Logger for log messageszpull.F)r"����cdZ
keep_on_errorNr:���r����:z.idxz	/dev/nullr����r���zC%s.idx: exit status %d dumping index file. Htar will probably fail.z-xpfc�����������������S���s���g�|�]}|�qS�r ���r ���)�.0�pr ���r ���r!����
<listcomp>C��s�����z1InputSource._impl_get_archive.<locals>.<listcomp>z%s: list contents)r*���z>non-zero exit status %d from htar; will retry in five seconds.�2���g�������?z9non-zero exit status %d from htar; will keep going anywayz%s: pull %d filesz%s: check for this at %s)�keepr^���r���z
%s: add %dz^(../)+��z%s: does not existz%s: does not have: %srx���z%s: has files: %sz0%s: gleefully reporting all desired files found.)#r'���r����ZTempDirr>���r����r����r	���r�����intr����r<���r=���r+���rZ���r@���ZlistingZListing�range�time�sleep�len�setr����r)���r*���r�����debugr����r
���r
���r�����relpathr�����subr;���)rN����archpath�parts�done�prio�locr����r�����data�	target_dcr����r^����
skip_existingZtd�i�errr�����line�statr����ZnopeZyup�filepartZtgti�tgtr����Zrelfile�missing�foundr ���r ���r!����_impl_get_archive-��s�����

�

���
�
�

�
�����zInputSource._impl_get_archivec��������������
���C���s��|��d�}|dkrh|d|��}||d�d��}||
krV||
|�krV|
|�|��|��n||g|
|�|<�n�|dd��dkr�|�d|f���dS�z&|�j|	||||||
d�r�|�|��W�nT�ttfk
�r�}�z0|dk	r�|jd	t|�t|�t|�f�d
d��W�5�d}~X�Y�nX�dS�)a���!Obtain one or more files.
        @param i The index in done of the file being fetched
        @param done an array of logical flags telling which files are transferred
        @param src the source location
        @param tgt the target location
        @param prio the numerical priority 
        @param loc the on-disk destination
        @param parsed the parsed URL as output by urlparse.urlparse
        @param dc the DataCatalog
        @param streams the array of transfer streams
            @param archives a double-nested dict of lists, mapping from
          archive name to file part to index within done of the file
          in question
        @param target_dc the DataCatalog of the target locations
        @param realtime True for FORECAST mode, False for HISTORY mode
        @param logger the logging.Logger for log messages
        @param skip_existing if True, do not re-download files that
        already exist on disk (in the target_dc)�#r���r���N����zhtar:z %s: no # in path - skipping this)r^���r����zfetching %s=>%s: %sT��exc_info)r���r����r����r����r�����EnvironmentErrorr���r@���)rN���r����r����r����r����r����r����r����r����r�����archivesr����r����r����r^���r�����archsepZarchr����rW���r ���r ���r!����_impl_get_filew��s>����
���������zInputSource._impl_get_filec�����������	���	���C���s`���t����}|�d��|D�]4\}}}}}|�dt|�t|�t|�t|�f���q|���}|����|S�)a���!Generates a string containing a human-readable, prioritized
        list of data sources.
        @param dclist The data source list from list_for()
        @returns A multi-line string containing the table.

        Example:
            Prioritized list of data sources:
            PRIO-   LOCATION = SOURCE @ DATES
            100 -   file:/// = DataCatalog(conf,'wcoss_fcst_PROD2014',2015080518) @ '1970010100-2038011818'
            098 -   file:/// = DataCatalog(conf,'wcoss_prepbufrnr_PROD2014',2015080518) @ '1970010100-2038011818'
            097 -    file:// = DataCatalog(conf,'zhan_gyre',2015080518) @ '2011060718-2011111200,2013051800-2013091018'zDPrioritized list of data sources:
PRIO-   LOCATION = SOURCE @ DATES
z%03d - %10s = %s @ %s
)r7���r8���r9���r����r@���rB���rC���r����)	rN����dclistrF���r����r����r����r����r_���Zsvr ���r ���r!����	priotable���s����
����
zInputSource.priotableFc�����������������C���s���|dkr|�j�}|��|�}t��}|�|��|���|D��]�\}}	}
}}|	dk	sNt�|dk	sZt�|
dk	sft�|dk	srt�|dk	s~t�|
j}
|
j}|�dt|�t	|	�t
|�t
|�f���|
dkr�|��||�s�|�d|f���q2n|
dkr�|�d|	|
f���q2t
��}t�t
�}d}�z�tj�d|���v}d}t��}t�d
t|�t	|	�t
|�t
|�f���|D��]�}|d7�}||k�rn�qRd|k�s|t�|jf�|�}|dk�r��qR||k�r�|dk	�rR|�d|f����qRtj�|��r�|�r�|dk	�r�|�d|f���|�|���qR|dk	�r|�dt
|�t
|�f���d}|dk	�rF|�dt
|�t
|�f���z|jf�d|i|��}W�nD�tk
�r��}�z$|�d|t	|�f���W�Y���qRW�5�d}~X�Y�nX�|dk�r��qR|dk	�r�|�dt |�t
|�f���|�|��|j!|�j"||||||	|
||||||||gd���qR|�#���t|����D�]^\}}t$|�dk�rZ|dk	�r&|�d|f����q&|j!|�j%|||||	|
||||||gd���q&|�#���W�5�Q�R�X�W�5�|dk	�r�|�d��t|����D�]`\}}z|����W�nH�tk
�r�}�z(|dk	�r|jd|t	|�f�d	d
��W�5�d}~X�Y�nX��q�X�~q2t�dt|�t	|	�t
|�t
|�f���d}d}|D�]�}|d7�}||k�rp�qV|jf�|�}tj�|��r��qV|�&dd��r�|dk	�r�|�dt
|�f���n"|dk	�r�|�dt
|�f���d	}�qV|�S�)a��!Transfers the specified set of data to the specified
        target.  The "target_dc" is a DataCatalog that specifies the
        destination filenames.  The "realtime" argument is True for
        FORECAST (real-time) mode runs, and False for HISTORY
        (retrospective) mode runs.  The "data" argument should be an
        iterable (list, tuple, etc.) where each element is a dict-like
        object that describes one file to obtain.  Each dict contains:

          dataset - string name of the dataset (gfs, gdas1, gefs,
            enkf, etc.)
          item - string name of the object (ie.: sf, sfcanl, bufr)
          atime - Optional: a datetime.datetime specifying the
            analysis time.  Default is the atime from the
            InputSource's constructor.
          ftime - Optional: a datetime.datetime specifying the
            forecast time.
          ...others... - any other keyword arguments will be sent to
            the .location functions in any of this InputSource's
            DataCatalog objects.Nz Consider: %03d - %10s = %s @ %s
r����z%s: cannot access; will skip)r����r����r����z'%s: invalid transfer mode %s; will skip�"In finally block, closing streams.�%Exception while closing stream %s: %sTr��������r���z!Pull from: %03d - %10s = %s @ %s
r���r_���z%s: already processing thisz%s: already existsz%s => %sz	(unknown)zsearch for %s in %sz%s: key error %szSRC %s => %s)�argsz%s: nothing to pull; skipz6Exited input loop after source: %03d - %10s = %s @ %s
F�optionalzmissing optional data: %szMISSING INPUT: %s)'rz���r����r����rZ���r��r����r����r����r����r@���rB���r�����errorr����rn���ro���r����r<���r����r����r����r'����workpoolZWorkPoolr���rg���r)���r*���r����r����r����rU���rI���Zadd_workr��Zbarrierr����r����r:���)rN���r����r����r����r^���r����r��r����r����r����r����r����r_���r����r����r����r��r
��r����r����rW���r�����seenrE���r����r����rG���r����r����r����r ���r ���r!���r:������s*����
�����
����

�







�

�������������
������������



������


zInputSource.getc�����������������K���s��|dkr|�j�}t��}ztt|�}|D�]b\}
}}}|j	f�||d�|��}|dkrNq"|�
d�}|dkrjtd	��q"|�j||||||d
�r"�q�q"W�5�|dk	r�|�d��t|����D�]\\}	}
z|
����W�q��tk
�r�}�z&|dk	r�|jd|	t|�f�dd��W�5�d}~X�Y�q�X�q�X�dS�)a,��!This is a simple wrapper around fetch_file that gets only
        one file.  It will fail if the file requires pulling an
        archive.
        @param dataset the dataset to transfer
        @param item the desired item in the dataset
        @param dest the on-disk destination filename
        @param logger a logging.Logger for log messages
        @param timeout the connection timeout in seconds
        @param realtime True for FORECAST mode, False for HISTORY mode
        @param kwargs extra keyword arguments are passed to DataCatalog.locate()Nr��r��Tr����)r3���r4���r����r���zYSource is in an archive.  De-archiving is not supported by "get_one."  Use "get" instead.r����)
rz���r����r����r<���r����r����r����r@���r����rg���r����NotImplementedErrorr����)rN���r3���r4���r����r^���r����r����r`���r����r����r����rW���r��r����r����r����r����r����r��r ���r ���r!����get_one=��s:������
�


��zInputSource.get_one)NNN)Nrc���NN)NNN)Nr����)Nr����rc���)Nr����T)T)FNT)Nr����T)rh���ri���rj���rk���rO���r����r����r����r����r����r����r����r����r����r����r����r��r��r:���r��r ���r ���r ���r!���r���2��s.���R
e

-
&
�����
123
J/���
�)0rk����__all__rn���r)���r����r,���rS���Zurllib.parser����r����r����r���r����r?���r7����produtil.runr'����produtil.cluster�produtil.fileop�produtil.cdZprodutil.workpoolZprodutil.listing�
hwrf.numericsr�����hwrf.exceptionsr���r���r���r���r	���r
���r���r���r
���r���r���r���r���r���r����produtil.logr���r���r1���rI����objectr���r���r ���r ���r ���r!����<module>���s ���
`0#
�D