! -*-f90-*-
!***********************************************************************
!* GNU Lesser General Public License
!*
!* This file is part of the GFDL Flexible Modeling System (FMS).
!*
!* FMS is free software: you can redistribute it and/or modify it under
!* the terms of the GNU Lesser General Public License as published by
!* the Free Software Foundation, either version 3 of the License, or (at
!* your option) any later version.
!*
!* FMS is distributed in the hope that it will be useful, but WITHOUT
!* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
!* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
!* for more details.
!*
!* You should have received a copy of the GNU Lesser General Public
!* License along with FMS. If not, see .
!***********************************************************************
!
!
! Set user stack size.
!
!
! This sets the size of an array that is used for internal storage by
! mpp_domains. This array is used, for instance, to buffer the
! data sent and received in halo updates.
!
! This call has implied global synchronization. It should be
! placed somewhere where all PEs can call it.
!
!
! call mpp_domains_set_stack_size(n)
!
!
!
subroutine mpp_domains_set_stack_size(n)
!set the mpp_domains_stack variable to be at least n LONG words long
integer, intent(in) :: n
character(len=8) :: text
if( n.LE.mpp_domains_stack_size )return
#ifdef use_libSMA
call mpp_malloc( ptr_domains_stack, n, mpp_domains_stack_size )
call mpp_malloc( ptr_domains_stack_nonblock, n, mpp_domains_stack_size )
#else
if( allocated(mpp_domains_stack) )deallocate(mpp_domains_stack)
allocate( mpp_domains_stack(n) )
if( allocated(mpp_domains_stack_nonblock) )deallocate(mpp_domains_stack_nonblock)
allocate( mpp_domains_stack_nonblock(n) )
mpp_domains_stack_size = n
#endif
write( text,'(i8)' )n
if( mpp_pe().EQ.mpp_root_pe() )call mpp_error( NOTE, 'MPP_DOMAINS_SET_STACK_SIZE: stack size set to '//text//'.' )
return
end subroutine mpp_domains_set_stack_size
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
! !
! MPP_DOMAINS: overloaded operators (==, /=) !
! !
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
function mpp_domain1D_eq( a, b )
logical :: mpp_domain1D_eq
type(domain1D), intent(in) :: a, b
mpp_domain1D_eq = ( a%compute%begin.EQ.b%compute%begin .AND. &
a%compute%end .EQ.b%compute%end .AND. &
a%data%begin .EQ.b%data%begin .AND. &
a%data%end .EQ.b%data%end .AND. &
a%global%begin .EQ.b%global%begin .AND. &
a%global%end .EQ.b%global%end )
!compare pelists
! if( mpp_domain1D_eq )mpp_domain1D_eq = ASSOCIATED(a%list) .AND. ASSOCIATED(b%list)
! if( mpp_domain1D_eq )mpp_domain1D_eq = size(a%list(:)).EQ.size(b%list(:))
! if( mpp_domain1D_eq )mpp_domain1D_eq = ALL(a%list%pe.EQ.b%list%pe)
return
end function mpp_domain1D_eq
function mpp_domain1D_ne( a, b )
logical :: mpp_domain1D_ne
type(domain1D), intent(in) :: a, b
mpp_domain1D_ne = .NOT. ( a.EQ.b )
return
end function mpp_domain1D_ne
function mpp_domain2D_eq( a, b )
logical :: mpp_domain2D_eq
type(domain2D), intent(in) :: a, b
integer :: nt, n
mpp_domain2d_eq = size(a%x(:)) .EQ. size(b%x(:))
nt = size(a%x(:))
do n = 1, nt
if(mpp_domain2d_eq) mpp_domain2D_eq = a%x(n).EQ.b%x(n) .AND. a%y(n).EQ.b%y(n)
end do
if( mpp_domain2D_eq .AND. ((a%pe.EQ.NULL_PE).OR.(b%pe.EQ.NULL_PE)) )return !NULL_DOMAIN2D
!compare pelists
if( mpp_domain2D_eq )mpp_domain2D_eq = ASSOCIATED(a%list) .AND. ASSOCIATED(b%list)
if( mpp_domain2D_eq )mpp_domain2D_eq = size(a%list(:)).EQ.size(b%list(:))
if( mpp_domain2D_eq )mpp_domain2D_eq = ALL(a%list%pe.EQ.b%list%pe)
if( mpp_domain2D_eq )mpp_domain2D_eq = ALL(a%io_layout .EQ. b%io_layout)
if( mpp_domain2D_eq )mpp_domain2D_eq = a%symmetry .eqv. b%symmetry
return
end function mpp_domain2D_eq
!#####################################################################
function mpp_domain2D_ne( a, b )
logical :: mpp_domain2D_ne
type(domain2D), intent(in) :: a, b
mpp_domain2D_ne = .NOT. ( a.EQ.b )
return
end function mpp_domain2D_ne
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
! !
! MPP_GET and SET routiness: retrieve various components of domains !
! !
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
subroutine mpp_get_compute_domain1D( domain, begin, end, size, max_size, is_global )
type(domain1D), intent(in) :: domain
integer, intent(out), optional :: begin, end, size, max_size
logical, intent(out), optional :: is_global
if( PRESENT(begin) )begin = domain%compute%begin
if( PRESENT(end) )end = domain%compute%end
if( PRESENT(size) )size = domain%compute%size
if( PRESENT(max_size) )max_size = domain%compute%max_size
if( PRESENT(is_global) )is_global = domain%compute%is_global
return
end subroutine mpp_get_compute_domain1D
!#####################################################################
subroutine mpp_get_data_domain1D( domain, begin, end, size, max_size, is_global )
type(domain1D), intent(in) :: domain
integer, intent(out), optional :: begin, end, size, max_size
logical, intent(out), optional :: is_global
if( PRESENT(begin) )begin = domain%data%begin
if( PRESENT(end) )end = domain%data%end
if( PRESENT(size) )size = domain%data%size
if( PRESENT(max_size) )max_size = domain%data%max_size
if( PRESENT(is_global) )is_global = domain%data%is_global
return
end subroutine mpp_get_data_domain1D
!#####################################################################
subroutine mpp_get_global_domain1D( domain, begin, end, size, max_size )
type(domain1D), intent(in) :: domain
integer, intent(out), optional :: begin, end, size, max_size
if( PRESENT(begin) )begin = domain%global%begin
if( PRESENT(end) )end = domain%global%end
if( PRESENT(size) )size = domain%global%size
if( PRESENT(max_size) )max_size = domain%global%max_size
return
end subroutine mpp_get_global_domain1D
!#####################################################################
subroutine mpp_get_memory_domain1D( domain, begin, end, size, max_size, is_global )
type(domain1D), intent(in) :: domain
integer, intent(out), optional :: begin, end, size, max_size
logical, intent(out), optional :: is_global
if( PRESENT(begin) )begin = domain%memory%begin
if( PRESENT(end) )end = domain%memory%end
if( PRESENT(size) )size = domain%memory%size
if( PRESENT(max_size) )max_size = domain%memory%max_size
if( PRESENT(is_global) )is_global = domain%memory%is_global
return
end subroutine mpp_get_memory_domain1D
!#####################################################################
subroutine mpp_get_compute_domain2D( domain, xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size, &
x_is_global, y_is_global, tile_count, position )
type(domain2D), intent(in) :: domain
integer, intent(out), optional :: xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size
logical, intent(out), optional :: x_is_global, y_is_global
integer, intent(in), optional :: tile_count, position
integer :: tile, ishift, jshift
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_get_compute_domain( domain%x(tile), xbegin, xend, xsize, xmax_size, x_is_global )
call mpp_get_compute_domain( domain%y(tile), ybegin, yend, ysize, ymax_size, y_is_global )
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( PRESENT(xend) ) xend = xend + ishift
if( PRESENT(yend) ) yend = yend + jshift
if( PRESENT(xsize)) xsize = xsize + ishift
if( PRESENT(ysize)) ysize = ysize + jshift
if(PRESENT(xmax_size))xmax_size = xmax_size + ishift
if(PRESENT(ymax_size))ymax_size = ymax_size + jshift
return
end subroutine mpp_get_compute_domain2D
!#####################################################################
subroutine mpp_get_data_domain2D( domain, xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size, &
x_is_global, y_is_global, tile_count, position )
type(domain2D), intent(in) :: domain
integer, intent(out), optional :: xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size
logical, intent(out), optional :: x_is_global, y_is_global
integer, intent(in), optional :: tile_count, position
integer :: tile, ishift, jshift
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_get_data_domain( domain%x(tile), xbegin, xend, xsize, xmax_size, x_is_global )
call mpp_get_data_domain( domain%y(tile), ybegin, yend, ysize, ymax_size, y_is_global )
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( PRESENT(xend) ) xend = xend + ishift
if( PRESENT(yend) ) yend = yend + jshift
if( PRESENT(xsize)) xsize = xsize + ishift
if( PRESENT(ysize)) ysize = ysize + jshift
if(PRESENT(xmax_size))xmax_size = xmax_size + ishift
if(PRESENT(ymax_size))ymax_size = ymax_size + jshift
return
end subroutine mpp_get_data_domain2D
!#####################################################################
subroutine mpp_get_global_domain2D( domain, xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size, &
tile_count, position )
type(domain2D), intent(in) :: domain
integer, intent(out), optional :: xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size
integer, intent(in), optional :: tile_count, position
integer :: tile, ishift, jshift
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_get_global_domain( domain%x(tile), xbegin, xend, xsize, xmax_size )
call mpp_get_global_domain( domain%y(tile), ybegin, yend, ysize, ymax_size )
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( PRESENT(xend) ) xend = xend + ishift
if( PRESENT(yend) ) yend = yend + jshift
if( PRESENT(xsize)) xsize = xsize + ishift
if( PRESENT(ysize)) ysize = ysize + jshift
if(PRESENT(xmax_size))xmax_size = xmax_size + ishift
if(PRESENT(ymax_size))ymax_size = ymax_size + jshift
return
end subroutine mpp_get_global_domain2D
!#####################################################################
subroutine mpp_get_memory_domain2D( domain, xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size, &
x_is_global, y_is_global, position)
type(domain2D), intent(in) :: domain
integer, intent(out), optional :: xbegin, xend, ybegin, yend, xsize, xmax_size, ysize, ymax_size
logical, intent(out), optional :: x_is_global, y_is_global
integer, intent(in), optional :: position
integer :: tile, ishift, jshift
tile = 1
call mpp_get_memory_domain( domain%x(tile), xbegin, xend, xsize, xmax_size, x_is_global )
call mpp_get_memory_domain( domain%y(tile), ybegin, yend, ysize, ymax_size, y_is_global )
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( PRESENT(xend) ) xend = xend + ishift
if( PRESENT(yend) ) yend = yend + jshift
if( PRESENT(xsize)) xsize = xsize + ishift
if( PRESENT(ysize)) ysize = ysize + jshift
if(PRESENT(xmax_size))xmax_size = xmax_size + ishift
if(PRESENT(ymax_size))ymax_size = ymax_size + jshift
return
end subroutine mpp_get_memory_domain2D
!#####################################################################
subroutine mpp_set_compute_domain1D( domain, begin, end, size, is_global )
type(domain1D), intent(inout) :: domain
integer, intent(in), optional :: begin, end, size
logical, intent(in), optional :: is_global
if(present(begin)) domain%compute%begin = begin
if(present(end)) domain%compute%end = end
if(present(size)) domain%compute%size = size
if(present(is_global)) domain%compute%is_global = is_global
end subroutine mpp_set_compute_domain1D
!#####################################################################
subroutine mpp_set_compute_domain2D( domain, xbegin, xend, ybegin, yend, xsize, ysize, &
x_is_global, y_is_global, tile_count )
type(domain2D), intent(inout) :: domain
integer, intent(in), optional :: xbegin, xend, ybegin, yend, xsize, ysize
logical, intent(in), optional :: x_is_global, y_is_global
integer, intent(in), optional :: tile_count
integer :: tile
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_set_compute_domain(domain%x(tile), xbegin, xend, xsize, x_is_global)
call mpp_set_compute_domain(domain%y(tile), ybegin, yend, ysize, y_is_global)
end subroutine mpp_set_compute_domain2D
!#####################################################################
subroutine mpp_set_data_domain1D( domain, begin, end, size, is_global )
type(domain1D), intent(inout) :: domain
integer, intent(in), optional :: begin, end, size
logical, intent(in), optional :: is_global
if(present(begin)) domain%data%begin = begin
if(present(end)) domain%data%end = end
if(present(size)) domain%data%size = size
if(present(is_global)) domain%data%is_global = is_global
end subroutine mpp_set_data_domain1D
!#####################################################################
subroutine mpp_set_data_domain2D( domain, xbegin, xend, ybegin, yend, xsize, ysize, &
x_is_global, y_is_global, tile_count )
type(domain2D), intent(inout) :: domain
integer, intent(in), optional :: xbegin, xend, ybegin, yend, xsize, ysize
logical, intent(in), optional :: x_is_global, y_is_global
integer, intent(in), optional :: tile_count
integer :: tile
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_set_data_domain(domain%x(tile), xbegin, xend, xsize, x_is_global)
call mpp_set_data_domain(domain%y(tile), ybegin, yend, ysize, y_is_global)
end subroutine mpp_set_data_domain2D
!#####################################################################
subroutine mpp_set_global_domain1D( domain, begin, end, size)
type(domain1D), intent(inout) :: domain
integer, intent(in), optional :: begin, end, size
if(present(begin)) domain%global%begin = begin
if(present(end)) domain%global%end = end
if(present(size)) domain%global%size = size
end subroutine mpp_set_global_domain1D
!#####################################################################
subroutine mpp_set_global_domain2D( domain, xbegin, xend, ybegin, yend, xsize, ysize, tile_count )
type(domain2D), intent(inout) :: domain
integer, intent(in), optional :: xbegin, xend, ybegin, yend, xsize, ysize
integer, intent(in), optional :: tile_count
integer :: tile
tile = 1
if(present(tile_count)) tile = tile_count
call mpp_set_global_domain(domain%x(tile), xbegin, xend, xsize)
call mpp_set_global_domain(domain%y(tile), ybegin, yend, ysize)
end subroutine mpp_set_global_domain2D
!#####################################################################
!
!
! Retrieve 1D components of 2D decomposition.
!
!
! It is sometime necessary to have direct recourse to the domain1D types
! that compose a domain2D object. This call retrieves them.
!
!
! call mpp_get_domain_components( domain, x, y )
!
!
!
!
subroutine mpp_get_domain_components( domain, x, y, tile_count )
type(domain2D), intent(in) :: domain
type(domain1D), intent(inout), optional :: x, y
integer, intent(in), optional :: tile_count
integer :: tile
tile = 1
if(present(tile_count)) tile = tile_count
if( PRESENT(x) )x = domain%x(tile)
if( PRESENT(y) )y = domain%y(tile)
return
end subroutine mpp_get_domain_components
!#####################################################################
subroutine mpp_get_compute_domains1D( domain, begin, end, size )
type(domain1D), intent(in) :: domain
integer, intent(out), optional, dimension(:) :: begin, end, size
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: must first call mpp_domains_init.' )
!we use shape instead of size for error checks because size is used as an argument
if( PRESENT(begin) )then
if( any(shape(begin).NE.shape(domain%list)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: begin array size does not match domain.' )
begin(:) = domain%list(:)%compute%begin
end if
if( PRESENT(end) )then
if( any(shape(end).NE.shape(domain%list)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: end array size does not match domain.' )
end(:) = domain%list(:)%compute%end
end if
if( PRESENT(size) )then
if( any(shape(size).NE.shape(domain%list)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: size array size does not match domain.' )
size(:) = domain%list(:)%compute%size
end if
return
end subroutine mpp_get_compute_domains1D
!#####################################################################
subroutine mpp_get_compute_domains2D( domain, xbegin, xend, xsize, ybegin, yend, ysize, position )
type(domain2D), intent(in) :: domain
integer, intent(out), optional, dimension(:) :: xbegin, xend, xsize, ybegin, yend, ysize
integer, intent(in ), optional :: position
integer :: i, ishift, jshift
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: must first call mpp_domains_init.' )
if( PRESENT(xbegin) )then
if( size(xbegin(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: xbegin array size does not match domain.' )
do i = 1, size(xbegin(:))
xbegin(i) = domain%list(i-1)%x(1)%compute%begin
end do
end if
if( PRESENT(xend) )then
if( size(xend(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: xend array size does not match domain.' )
do i = 1, size(xend(:))
xend(i) = domain%list(i-1)%x(1)%compute%end + ishift
end do
end if
if( PRESENT(xsize) )then
if( size(xsize(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: xsize array size does not match domain.' )
do i = 1, size(xsize(:))
xsize(i) = domain%list(i-1)%x(1)%compute%size + ishift
end do
end if
if( PRESENT(ybegin) )then
if( size(ybegin(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: ybegin array size does not match domain.' )
do i = 1, size(ybegin(:))
ybegin(i) = domain%list(i-1)%y(1)%compute%begin
end do
end if
if( PRESENT(yend) )then
if( size(yend(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: yend array size does not match domain.' )
do i = 1, size(yend(:))
yend(i) = domain%list(i-1)%y(1)%compute%end + jshift
end do
end if
if( PRESENT(ysize) )then
if( size(ysize(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_COMPUTE_DOMAINS: ysize array size does not match domain.' )
do i = 1, size(ysize(:))
ysize(i) = domain%list(i-1)%y(1)%compute%size + jshift
end do
end if
return
end subroutine mpp_get_compute_domains2D
!#####################################################################
subroutine mpp_get_domain_extents1D(domain, xextent, yextent)
type(domain2d), intent(in) :: domain
integer, dimension(0:), intent(inout) :: xextent, yextent
integer :: n
if(domain%ntiles .NE. 1) call mpp_error(FATAL,"mpp_domains_util.inc(mpp_get_domain_extents1D): "// &
"ntiles is more than 1, please use mpp_get_domain_extents2D")
if(size(xextent) .NE. size(domain%x(1)%list(:))) call mpp_error(FATAL,"mpp_domains_util.inc(mpp_get_domain_extents1D): "// &
"size(xextent) does not equal to size(domain%x(1)%list(:)))")
if(size(yextent) .NE. size(domain%y(1)%list(:))) call mpp_error(FATAL,"mpp_domains_util.inc(mpp_get_domain_extents1D): "// &
"size(yextent) does not equal to size(domain%y(1)%list(:)))")
do n = 0, size(domain%x(1)%list(:))-1
xextent(n) = domain%x(1)%list(n)%compute%size
enddo
do n = 0, size(domain%y(1)%list(:))-1
yextent(n) = domain%y(1)%list(n)%compute%size
enddo
end subroutine mpp_get_domain_extents1D
!#####################################################################
! This will return xextent and yextent for each tile
subroutine mpp_get_domain_extents2D(domain, xextent, yextent)
type(domain2d), intent(in) :: domain
integer, dimension(:,:), intent(inout) :: xextent, yextent
integer :: ntile, nlist, n, m, ndivx, ndivy, tile, pos
ntile = domain%ntiles
nlist = size(domain%list(:))
if(size(xextent,2) .ne. ntile .or. size(yextent,2) .ne. ntile) call mpp_error(FATAL, &
"mpp_domains_utile.inc: the second dimension size of xextent/yextent is not correct")
ndivx = size(xextent,1); ndivy = size(yextent,1)
do n = 0, nlist-1
if(ANY(domain%list(n)%x(:)%pos>ndivx-1) ) call mpp_error(FATAL, &
"mpp_domains_utile.inc: first dimension size of xextent is less than the x-layout in some tile")
if(ANY(domain%list(n)%y(:)%pos>ndivy-1) ) call mpp_error(FATAL, &
"mpp_domains_utile.inc: first dimension size of yextent is less than the y-layout in some tile")
end do
xextent = 0; yextent=0
do n = 0, nlist-1
do m = 1, size(domain%list(n)%tile_id(:))
tile = domain%list(n)%tile_id(m)
pos = domain%list(n)%x(m)%pos+1
if(xextent(pos, tile) == 0) xextent(pos,tile) = domain%list(n)%x(m)%compute%size
pos = domain%list(n)%y(m)%pos+1
if(yextent(pos, tile) == 0) yextent(pos,tile) = domain%list(n)%y(m)%compute%size
end do
end do
end subroutine mpp_get_domain_extents2D
!#####################################################################
function mpp_get_domain_pe(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_domain_pe
mpp_get_domain_pe = domain%pe
end function mpp_get_domain_pe
function mpp_get_domain_tile_root_pe(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_domain_tile_root_pe
mpp_get_domain_tile_root_pe = domain%tile_root_pe
end function mpp_get_domain_tile_root_pe
function mpp_get_io_domain(domain)
type(domain2d), intent(in) :: domain
type(domain2d), pointer :: mpp_get_io_domain
if(ASSOCIATED(domain%io_domain)) then
mpp_get_io_domain => domain%io_domain
else
mpp_get_io_domain => NULL()
endif
end function mpp_get_io_domain
!#####################################################################
!
!
!
!
!
subroutine mpp_get_pelist1D( domain, pelist, pos )
type(domain1D), intent(in) :: domain
integer, intent(out) :: pelist(:)
integer, intent(out), optional :: pos
integer :: ndivs
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_PELIST: must first call mpp_domains_init.' )
ndivs = size(domain%list(:))
if( size(pelist(:)).NE.ndivs ) &
call mpp_error( FATAL, 'MPP_GET_PELIST: pelist array size does not match domain.' )
pelist(:) = domain%list(0:ndivs-1)%pe
if( PRESENT(pos) )pos = domain%pos
return
end subroutine mpp_get_pelist1D
!#####################################################################
!
!
!
!
!
subroutine mpp_get_pelist2D( domain, pelist, pos )
type(domain2D), intent(in) :: domain
integer, intent(out) :: pelist(:)
integer, intent(out), optional :: pos
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_PELIST: must first call mpp_domains_init.' )
if( size(pelist(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'MPP_GET_PELIST: pelist array size does not match domain.' )
pelist(:) = domain%list(:)%pe
if( PRESENT(pos) )pos = domain%pos
return
end subroutine mpp_get_pelist2D
!#####################################################################
!
!
!
!
subroutine mpp_get_layout1D( domain, layout )
type(domain1D), intent(in) :: domain
integer, intent(out) :: layout
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_LAYOUT: must first call mpp_domains_init.' )
layout = size(domain%list(:))
return
end subroutine mpp_get_layout1D
!#####################################################################
!
!
!
!
subroutine mpp_get_layout2D( domain, layout )
type(domain2D), intent(in) :: domain
integer, intent(out) :: layout(2)
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'MPP_GET_LAYOUT: must first call mpp_domains_init.' )
layout(1) = size(domain%x(1)%list(:))
layout(2) = size(domain%y(1)%list(:))
return
end subroutine mpp_get_layout2D
!#####################################################################
!
!
! Returns the shift value in x and y-direction according to domain position..
!
!
! When domain is symmetry, one extra point maybe needed in
! x- and/or y-direction. This routine will return the shift value based
! on the position
!
!
! call mpp_get_domain_shift( domain, ishift, jshift, position )
!
!
! predefined data contains 2-d domain decomposition.
!
!
! return value will be 0 or 1.
!
!
! position of data. Its value can be CENTER, EAST, NORTH or CORNER.
!
!
subroutine mpp_get_domain_shift(domain, ishift, jshift, position)
type(domain2D), intent(in) :: domain
integer, intent(out) :: ishift, jshift
integer, optional, intent(in) :: position
integer :: pos
ishift = 0 ; jshift = 0
pos = CENTER
if(present(position)) pos = position
if(domain%symmetry) then ! shift is non-zero only when the domain is symmetry.
select case(pos)
case(CORNER)
ishift = 1; jshift = 1
case(EAST)
ishift = 1
case(NORTH)
jshift = 1
end select
end if
end subroutine mpp_get_domain_shift
!#####################################################################
subroutine mpp_get_neighbor_pe_1d(domain, direction, pe)
! Return PE to the righ/left of this PE-domain.
type(domain1D), intent(inout) :: domain
integer, intent(in) :: direction
integer, intent(out) :: pe
integer ipos, ipos2, npx
pe = NULL_PE
npx = size(domain%list(:)) ! 0..npx-1
ipos = domain%pos
select case (direction)
case (:-1)
! neighbor on the left
ipos2 = ipos - 1
if(ipos2 < 0) then
if(domain%cyclic) then
ipos2 = npx-1
else
ipos2 = -999
endif
endif
case (0)
! identity
ipos2 = ipos
case (1:)
! neighbor on the right
ipos2 = ipos + 1
if(ipos2 > npx-1) then
if(domain%cyclic) then
ipos2 = 0
else
ipos2 = -999
endif
endif
end select
if(ipos2 >= 0) pe = domain%list(ipos2)%pe
end subroutine mpp_get_neighbor_pe_1d
!#####################################################################
subroutine mpp_get_neighbor_pe_2d(domain, direction, pe)
! Return PE North/South/East/West of this PE-domain.
! direction must be NORTH, SOUTH, EAST or WEST.
type(domain2D), intent(inout) :: domain
integer, intent(in) :: direction
integer, intent(out) :: pe
integer ipos, jpos, npx, npy, ix, iy, ipos0, jpos0
pe = NULL_PE
npx = size(domain%x(1)%list(:)) ! 0..npx-1
npy = size(domain%y(1)%list(:)) ! 0..npy-1
ipos0 = domain%x(1)%pos
jpos0 = domain%y(1)%pos
select case (direction)
case (NORTH)
ix = 0
iy = 1
case (NORTH_EAST)
ix = 1
iy = 1
case (EAST)
ix = 1
iy = 0
case (SOUTH_EAST)
ix = 1
iy =-1
case (SOUTH)
ix = 0
iy =-1
case (SOUTH_WEST)
ix =-1
iy =-1
case (WEST)
ix =-1
iy = 0
case (NORTH_WEST)
ix =-1
iy = 1
case default
call mpp_error( FATAL, &
& 'MPP_GET_NEIGHBOR_PE_2D: direction must be either NORTH, ' &
& // 'SOUTH, EAST, WEST, NORTH_EAST, SOUTH_EAST, SOUTH_WEST or NORTH_WEST')
end select
ipos = ipos0 + ix
jpos = jpos0 + iy
if( (ipos < 0 .or. ipos > npx-1) .and. domain%x(1)%cyclic ) then
! E/W cyclic domain
ipos = modulo(ipos, npx)
endif
if( (ipos < 0 .and. btest(domain%fold,WEST)) .or. &
& (ipos > npx-1 .and. btest(domain%fold,EAST)) ) then
! E or W folded domain
ipos = ipos0
jpos = npy-jpos-1
endif
if( (jpos < 0 .or. jpos > npy-1) .and. domain%y(1)%cyclic ) then
! N/S cyclic
jpos = modulo(jpos, npy)
endif
if( (jpos < 0 .and. btest(domain%fold,SOUTH)) .or. &
& (jpos > npy-1 .and. btest(domain%fold,NORTH)) ) then
! N or S folded
ipos = npx-ipos-1
jpos = jpos0
endif
! get the PE number
pe = NULL_PE
if(ipos >= 0 .and. ipos <= npx-1 .and. jpos >= 0 .and. jpos <= npy-1) then
pe = domain%pearray(ipos, jpos)
endif
end subroutine mpp_get_neighbor_pe_2d
!#######################################################################
subroutine nullify_domain2d_list(domain)
type(domain2d), intent(inout) :: domain
domain%list =>NULL()
end subroutine nullify_domain2d_list
!#######################################################################
function mpp_domain_is_symmetry(domain)
type(domain2d), intent(in) :: domain
logical :: mpp_domain_is_symmetry
mpp_domain_is_symmetry = domain%symmetry
return
end function mpp_domain_is_symmetry
!#######################################################################
function mpp_domain_is_initialized(domain)
type(domain2d), intent(in) :: domain
logical :: mpp_domain_is_initialized
mpp_domain_is_initialized = domain%initialized
return
end function mpp_domain_is_initialized
!#######################################################################
!--- private routine used only for mpp_update_domains. This routine will
!--- compare whalo, ehalo, shalo, nhalo with the halo size when defining "domain"
!--- to decide if update is needed. Also it check the sign of whalo, ehalo, shalo and nhalo.
function domain_update_is_needed(domain, whalo, ehalo, shalo, nhalo)
type(domain2d), intent(in) :: domain
integer, intent(in) :: whalo, ehalo, shalo, nhalo
logical :: domain_update_is_needed
domain_update_is_needed = .true.
if(whalo == 0 .AND. ehalo==0 .AND. shalo == 0 .AND. nhalo==0 ) then
domain_update_is_needed = .false.
if( debug )call mpp_error(NOTE, &
'mpp_domains_util.inc: halo size to be updated are all zero, no update will be done')
return
end if
if( (whalo == -domain%whalo .AND. domain%whalo .NE. 0) .or. &
(ehalo == -domain%ehalo .AND. domain%ehalo .NE. 0) .or. &
(shalo == -domain%shalo .AND. domain%shalo .NE. 0) .or. &
(nhalo == -domain%nhalo .AND. domain%nhalo .NE. 0) ) then
domain_update_is_needed = .false.
call mpp_error(NOTE, 'mpp_domains_util.inc: at least one of w/e/s/n halo size to be updated '// &
'is the inverse of the original halo when defining domain, no update will be done')
return
end if
end function domain_update_is_needed
!#######################################################################
! this routine found the domain has the same halo size with the input
! whalo, ehalo,
function search_update_overlap(domain, whalo, ehalo, shalo, nhalo, position)
type(domain2d), intent(inout) :: domain
integer, intent(in) :: whalo, ehalo, shalo, nhalo
integer, intent(in) :: position
type(overlapSpec), pointer :: search_update_overlap
type(overlapSpec), pointer :: update_ref
type(overlapSpec), pointer :: check => NULL()
integer :: ishift, jshift, shift
shift = 0; if(domain%symmetry) shift = 1
select case(position)
case (CENTER)
update_ref => domain%update_T
ishift = 0; jshift = 0
case (CORNER)
update_ref => domain%update_C
ishift = shift; jshift = shift
case (NORTH)
update_ref => domain%update_N
ishift = 0; jshift = shift
case (EAST)
update_ref => domain%update_E
ishift = shift; jshift = 0
case default
call mpp_error(FATAL,"mpp_domains_util.inc(search_update_overlap): position should be CENTER|CORNER|EAST|NORTH")
end select
search_update_overlap => update_ref
do
if(whalo == search_update_overlap%whalo .AND. ehalo == search_update_overlap%ehalo .AND. &
shalo == search_update_overlap%shalo .AND. nhalo == search_update_overlap%nhalo ) then
exit ! found domain
endif
!--- if not found, switch to next
if(.NOT. ASSOCIATED(search_update_overlap%next)) then
allocate(search_update_overlap%next)
search_update_overlap => search_update_overlap%next
if(domain%fold .NE. 0) then
call compute_overlaps(domain, position, search_update_overlap, check, &
ishift, jshift, 0, 0, whalo, ehalo, shalo, nhalo)
else
call set_overlaps(domain, update_ref, search_update_overlap, whalo, ehalo, shalo, nhalo )
endif
exit
else
search_update_overlap => search_update_overlap%next
end if
end do
update_ref => NULL()
end function search_update_overlap
!#######################################################################
! this routine found the check at certain position
function search_check_overlap(domain, position)
type(domain2d), intent(in) :: domain
integer, intent(in) :: position
type(overlapSpec), pointer :: search_check_overlap
select case(position)
case (CENTER)
search_check_overlap => NULL()
case (CORNER)
search_check_overlap => domain%check_C
case (NORTH)
search_check_overlap => domain%check_N
case (EAST)
search_check_overlap => domain%check_E
case default
call mpp_error(FATAL,"mpp_domains_util.inc(search_check_overlap): position should be CENTER|CORNER|EAST|NORTH")
end select
end function search_check_overlap
!#######################################################################
! this routine found the bound at certain position
function search_bound_overlap(domain, position)
type(domain2d), intent(in) :: domain
integer, intent(in) :: position
type(overlapSpec), pointer :: search_bound_overlap
select case(position)
case (CENTER)
search_bound_overlap => NULL()
case (CORNER)
search_bound_overlap => domain%bound_C
case (NORTH)
search_bound_overlap => domain%bound_N
case (EAST)
search_bound_overlap => domain%bound_E
case default
call mpp_error(FATAL,"mpp_domains_util.inc(search_bound_overlap): position should be CENTER|CORNER|EAST|NORTH")
end select
end function search_bound_overlap
!########################################################################
! return the tile_id on current pe
function mpp_get_tile_id(domain)
type(domain2d), intent(in) :: domain
integer, dimension(size(domain%tile_id(:))) :: mpp_get_tile_id
mpp_get_tile_id = domain%tile_id
return
end function mpp_get_tile_id
!#######################################################################
! return the tile_id on current pelist. one-tile-per-pe is assumed.
subroutine mpp_get_tile_list(domain, tiles)
type(domain2d), intent(in) :: domain
integer, intent(inout) :: tiles(:)
integer :: i
if( size(tiles(:)).NE.size(domain%list(:)) ) &
call mpp_error( FATAL, 'mpp_get_tile_list: tiles array size does not match domain.' )
do i = 1, size(tiles(:))
if(size(domain%list(i-1)%tile_id(:)) > 1) call mpp_error( FATAL, &
'mpp_get_tile_list: only support one-tile-per-pe now, contact developer');
tiles(i) = domain%list(i-1)%tile_id(1)
end do
end subroutine mpp_get_tile_list
!########################################################################
! return number of tiles in mosaic
function mpp_get_ntile_count(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_ntile_count
mpp_get_ntile_count = domain%ntiles
return
end function mpp_get_ntile_count
!########################################################################
! return number of tile on current pe
function mpp_get_current_ntile(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_current_ntile
mpp_get_current_ntile = size(domain%tile_id(:))
return
end function mpp_get_current_ntile
!#######################################################################
! return if current pe is the root pe of the tile, if number of tiles on current pe
! is greater than 1, will return true, if isc==isg and jsc==jsg also will return true,
! otherwise false will be returned.
function mpp_domain_is_tile_root_pe(domain)
type(domain2d), intent(in) :: domain
logical :: mpp_domain_is_tile_root_pe
mpp_domain_is_tile_root_pe = domain%pe == domain%tile_root_pe;
end function mpp_domain_is_tile_root_pe
!#########################################################################
! return number of processors used on current tile.
function mpp_get_tile_npes(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_tile_npes
integer :: i, tile
!--- When there is more than one tile on this pe, we assume each tile will be
!--- limited to this pe.
if(size(domain%tile_id(:)) > 1) then
mpp_get_tile_npes = 1
else
mpp_get_tile_npes = 0
tile = domain%tile_id(1)
do i = 0, size(domain%list(:))-1
if(tile == domain%list(i)%tile_id(1) ) mpp_get_tile_npes = mpp_get_tile_npes + 1
end do
endif
end function mpp_get_tile_npes
!########################################################################
! get the processors list used on current tile.
subroutine mpp_get_tile_pelist(domain, pelist)
type(domain2d), intent(in) :: domain
integer, intent(inout) :: pelist(:)
integer :: npes_on_tile
integer :: i, tile, pos
npes_on_tile = mpp_get_tile_npes(domain)
if(size(pelist(:)) .NE. npes_on_tile) call mpp_error(FATAL, &
"mpp_domains_util.inc(mpp_get_tile_pelist): size(pelist) does not equal npes on current tile")
tile = domain%tile_id(1)
pos = 0
do i = 0, size(domain%list(:))-1
if(tile == domain%list(i)%tile_id(1)) then
pos = pos+1
pelist(pos) = domain%list(i)%pe
endif
enddo
return
end subroutine mpp_get_tile_pelist
!#####################################################################
subroutine mpp_get_tile_compute_domains( domain, xbegin, xend, ybegin, yend, position )
type(domain2D), intent(in) :: domain
integer, intent(out), dimension(:) :: xbegin, xend, ybegin, yend
integer, intent(in ), optional :: position
integer :: i, ishift, jshift
integer :: npes_on_tile, pos, tile
call mpp_get_domain_shift( domain, ishift, jshift, position )
if( .NOT.module_is_initialized ) &
call mpp_error( FATAL, 'mpp_get_compute_domains2D: must first call mpp_domains_init.' )
npes_on_tile = mpp_get_tile_npes(domain)
if(size(xbegin(:)) .NE. npes_on_tile) call mpp_error(FATAL, &
"mpp_domains_util.inc(mpp_get_compute_domains2D): size(xbegin) does not equal npes on current tile")
if(size(xend(:)) .NE. npes_on_tile) call mpp_error(FATAL, &
"mpp_domains_util.inc(mpp_get_compute_domains2D): size(xend) does not equal npes on current tile")
if(size(ybegin(:)) .NE. npes_on_tile) call mpp_error(FATAL, &
"mpp_domains_util.inc(mpp_get_compute_domains2D): size(ybegin) does not equal npes on current tile")
if(size(yend(:)) .NE. npes_on_tile) call mpp_error(FATAL, &
"mpp_domains_util.inc(mpp_get_compute_domains2D): size(yend) does not equal npes on current tile")
tile = domain%tile_id(1)
pos = 0
do i = 0, size(domain%list(:))-1
if(tile == domain%list(i)%tile_id(1)) then
pos = pos+1
xbegin(pos) = domain%list(i)%x(1)%compute%begin
xend (pos) = domain%list(i)%x(1)%compute%end + ishift
ybegin(pos) = domain%list(i)%y(1)%compute%begin
yend (pos) = domain%list(i)%y(1)%compute%end + jshift
endif
enddo
return
end subroutine mpp_get_tile_compute_domains
!#############################################################################
function mpp_get_num_overlap(domain, action, p, position)
type(domain2d), intent(in) :: domain
integer, intent(in) :: action
integer, intent(in) :: p
integer, optional, intent(in) :: position
integer :: mpp_get_num_overlap
type(overlapSpec), pointer :: update => NULL()
integer :: pos
pos = CENTER
if(present(position)) pos = position
select case(pos)
case (CENTER)
update => domain%update_T
case (CORNER)
update => domain%update_C
case (EAST)
update => domain%update_E
case (NORTH)
update => domain%update_N
case default
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_num_overlap): invalid option of position")
end select
if(action == EVENT_SEND) then
if(p< 1 .OR. p > update%nsend) call mpp_error( FATAL, &
"mpp_domains_mod(mpp_get_num_overlap): p should be between 1 and update%nsend")
mpp_get_num_overlap = update%send(p)%count
else if(action == EVENT_RECV) then
if(p< 1 .OR. p > update%nrecv) call mpp_error( FATAL, &
"mpp_domains_mod(mpp_get_num_overlap): p should be between 1 and update%nrecv")
mpp_get_num_overlap = update%recv(p)%count
else
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_num_overlap): invalid option of action")
end if
end function mpp_get_num_overlap
!#############################################################################
subroutine mpp_get_update_size(domain, nsend, nrecv, position)
type(domain2d), intent(in) :: domain
integer, intent(out) :: nsend, nrecv
integer, optional, intent(in) :: position
integer :: pos
pos = CENTER
if(present(position)) pos = position
select case(pos)
case (CENTER)
nsend = domain%update_T%nsend
nrecv = domain%update_T%nrecv
case (CORNER)
nsend = domain%update_C%nsend
nrecv = domain%update_C%nrecv
case (EAST)
nsend = domain%update_E%nsend
nrecv = domain%update_E%nrecv
case (NORTH)
nsend = domain%update_N%nsend
nrecv = domain%update_N%nrecv
case default
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_update_size): invalid option of position")
end select
end subroutine mpp_get_update_size
!#############################################################################
subroutine mpp_get_update_pelist(domain, action, pelist, position)
type(domain2d), intent(in) :: domain
integer, intent(in) :: action
integer, intent(inout) :: pelist(:)
integer, optional, intent(in) :: position
type(overlapSpec), pointer :: update => NULL()
integer :: pos, p
pos = CENTER
if(present(position)) pos = position
select case(pos)
case (CENTER)
update => domain%update_T
case (CORNER)
update => domain%update_C
case (EAST)
update => domain%update_E
case (NORTH)
update => domain%update_N
case default
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_update_pelist): invalid option of position")
end select
if(action == EVENT_SEND) then
if(size(pelist) .NE. update%nsend) call mpp_error( FATAL, &
"mpp_domains_mod(mpp_get_update_pelist): size of pelist does not match update%nsend")
do p = 1, update%nsend
pelist(p) = update%send(p)%pe
enddo
else if(action == EVENT_RECV) then
if(size(pelist) .NE. update%nrecv) call mpp_error( FATAL, &
"mpp_domains_mod(mpp_get_update_pelist): size of pelist does not match update%nrecv")
do p = 1, update%nrecv
pelist(p) = update%recv(p)%pe
enddo
else
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_update_pelist): invalid option of action")
end if
end subroutine mpp_get_update_pelist
!#############################################################################
subroutine mpp_get_overlap(domain, action, p, is, ie, js, je, dir, rot, position)
type(domain2d), intent(in) :: domain
integer, intent(in) :: action
integer, intent(in) :: p
integer, dimension(:), intent(out) :: is, ie, js, je
integer, dimension(:), intent(out) :: dir, rot
integer, optional, intent(in) :: position
type(overlapSpec), pointer :: update => NULL()
type(overlap_type), pointer :: overlap => NULL()
integer :: count, pos
pos = CENTER
if(present(position)) pos = position
select case(pos)
case (CENTER)
update => domain%update_T
case (CORNER)
update => domain%update_C
case (EAST)
update => domain%update_E
case (NORTH)
update => domain%update_N
case default
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_overlap): invalid option of position")
end select
if(action == EVENT_SEND) then
overlap => update%send(p)
else if(action == EVENT_RECV) then
overlap => update%recv(p)
else
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_overlap): invalid option of action")
end if
count = overlap%count
if(size(is(:)) .NE. count .OR. size(ie (:)) .NE. count .OR. size(js (:)) .NE. count .OR. &
size(je(:)) .NE. count .OR. size(dir(:)) .NE. count .OR. size(rot(:)) .NE. count ) &
call mpp_error( FATAL, "mpp_domains_mod(mpp_get_overlap): size mismatch between number of overlap and array size")
is = overlap%is (1:count)
ie = overlap%ie (1:count)
js = overlap%js (1:count)
je = overlap%je (1:count)
dir = overlap%dir (1:count)
rot = overlap%rotation(1:count)
update => NULL()
overlap => NULL()
end subroutine mpp_get_overlap
!##################################################################
function mpp_get_domain_name(domain)
type(domain2d), intent(in) :: domain
character(len=NAME_LENGTH) :: mpp_get_domain_name
mpp_get_domain_name = domain%name
end function mpp_get_domain_name
!#################################################################
function mpp_get_domain_root_pe(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_domain_root_pe
mpp_get_domain_root_pe = domain%list(0)%pe
end function mpp_get_domain_root_pe
!#################################################################
function mpp_get_domain_npes(domain)
type(domain2d), intent(in) :: domain
integer :: mpp_get_domain_npes
mpp_get_domain_npes = size(domain%list(:))
return
end function mpp_get_domain_npes
!################################################################
subroutine mpp_get_domain_pelist(domain, pelist)
type(domain2d), intent(in) :: domain
integer, intent(out) :: pelist(:)
integer :: p
if(size(pelist(:)) .NE. size(domain%list(:)) ) then
call mpp_error(FATAL, "mpp_get_domain_pelist: size(pelist(:)) .NE. size(domain%list(:)) ")
endif
do p = 0, size(domain%list(:))-1
pelist(p+1) = domain%list(p)%pe
enddo
return
end subroutine mpp_get_domain_pelist
!#################################################################
function mpp_get_io_domain_layout(domain)
type(domain2d), intent(in) :: domain
integer, dimension(2) :: mpp_get_io_domain_layout
mpp_get_io_domain_layout = domain%io_layout
end function mpp_get_io_domain_layout
!################################################################
function get_rank_send(domain, overlap_x, overlap_y, rank_x, rank_y, ind_x, ind_y)
type(domain2D), intent(in) :: domain
type(overlapSpec), intent(in) :: overlap_x, overlap_y
integer, intent(out) :: rank_x, rank_y, ind_x, ind_y
integer :: get_rank_send
integer :: nlist, nsend_x, nsend_y
nlist = size(domain%list(:))
nsend_x = overlap_x%nsend
nsend_y = overlap_y%nsend
rank_x = nlist+1
rank_y = nlist+1
if(nsend_x>0) rank_x = overlap_x%send(1)%pe - domain%pe
if(nsend_y>0) rank_y = overlap_y%send(1)%pe - domain%pe
if(rank_x .LT. 0) rank_x = rank_x + nlist
if(rank_y .LT. 0) rank_y = rank_y + nlist
get_rank_send = min(rank_x, rank_y)
ind_x = nsend_x + 1
ind_y = nsend_y + 1
if(get_rank_send < nlist+1) then
if(nsend_x>0) ind_x = 1
if(nsend_y>0) ind_y = 1
endif
end function get_rank_send
!############################################################################
function get_rank_recv(domain, overlap_x, overlap_y, rank_x, rank_y, ind_x, ind_y)
type(domain2D), intent(in) :: domain
type(overlapSpec), intent(in) :: overlap_x, overlap_y
integer, intent(out) :: rank_x, rank_y, ind_x, ind_y
integer :: get_rank_recv
integer :: nlist, nrecv_x, nrecv_y
nlist = size(domain%list(:))
nrecv_x = overlap_x%nrecv
nrecv_y = overlap_y%nrecv
rank_x = -1
rank_y = -1
if(nrecv_x>0) then
rank_x = overlap_x%recv(1)%pe - domain%pe
if(rank_x .LE. 0) rank_x = rank_x + nlist
endif
if(nrecv_y>0) then
rank_y = overlap_y%recv(1)%pe - domain%pe
if(rank_y .LE. 0) rank_y = rank_y + nlist
endif
get_rank_recv = max(rank_x, rank_y)
ind_x = nrecv_x + 1
ind_y = nrecv_y + 1
if(get_rank_recv < nlist+1) then
if(nrecv_x>0) ind_x = 1
if(nrecv_y>0) ind_y = 1
endif
end function get_rank_recv
function get_vector_recv(domain, update_x, update_y, ind_x, ind_y, start_pos, pelist)
type(domain2D), intent(in) :: domain
type(overlapSpec), intent(in) :: update_x, update_y
integer, intent(out) :: ind_x(:), ind_y(:)
integer, intent(out) :: start_pos(:)
integer, intent(out) :: pelist(:)
integer :: nlist, nrecv_x, nrecv_y, ntot, n
integer :: ix, iy, rank_x, rank_y, cur_pos
integer :: get_vector_recv
nlist = size(domain%list(:))
nrecv_x = update_x%nrecv
nrecv_y = update_y%nrecv
ntot = nrecv_x + nrecv_y
n = 1
ix = 1
iy = 1
ind_x = -1
ind_y = -1
get_vector_recv = 0
cur_pos = 0
do while (n<=ntot)
if(ix <= nrecv_x ) then
rank_x = update_x%recv(ix)%pe-domain%pe
if(rank_x .LE. 0) rank_x = rank_x + nlist
else
rank_x = -1
endif
if(iy <= nrecv_y ) then
rank_y = update_y%recv(iy)%pe-domain%pe
if(rank_y .LE. 0) rank_y = rank_y + nlist
else
rank_y = -1
endif
get_vector_recv = get_vector_recv + 1
start_pos(get_vector_recv) = cur_pos
if( rank_x == rank_y ) then
n = n+2
ind_x (get_vector_recv) = ix
ind_y (get_vector_recv) = iy
cur_pos = cur_pos + update_x%recv(ix)%totsize + update_y%recv(iy)%totsize
pelist(get_vector_recv) = update_x%recv(ix)%pe
ix = ix + 1
iy = iy + 1
else if ( rank_x > rank_y ) then
n = n+1
ind_x (get_vector_recv) = ix
ind_y (get_vector_recv) = -1
cur_pos = cur_pos + update_x%recv(ix)%totsize
pelist(get_vector_recv) = update_x%recv(ix)%pe
ix = ix + 1
else if ( rank_y > rank_x ) then
n = n+1
ind_x (get_vector_recv) = -1
ind_y (get_vector_recv) = iy
cur_pos = cur_pos + update_y%recv(iy)%totsize
pelist(get_vector_recv) = update_y%recv(iy)%pe
iy = iy+1
endif
end do
end function get_vector_recv
function get_vector_send(domain, update_x, update_y, ind_x, ind_y, start_pos, pelist)
type(domain2D), intent(in) :: domain
type(overlapSpec), intent(in) :: update_x, update_y
integer, intent(out) :: ind_x(:), ind_y(:)
integer, intent(out) :: start_pos(:)
integer, intent(out) :: pelist(:)
integer :: nlist, nsend_x, nsend_y, ntot, n
integer :: ix, iy, rank_x, rank_y, cur_pos
integer :: get_vector_send
nlist = size(domain%list(:))
nsend_x = update_x%nsend
nsend_y = update_y%nsend
ntot = nsend_x + nsend_y
n = 1
ix = 1
iy = 1
ind_x = -1
ind_y = -1
get_vector_send = 0
cur_pos = 0
do while (n<=ntot)
if(ix <= nsend_x ) then
rank_x = update_x%send(ix)%pe-domain%pe
if(rank_x .LT. 0) rank_x = rank_x + nlist
else
rank_x = nlist+1
endif
if(iy <= nsend_y ) then
rank_y = update_y%send(iy)%pe-domain%pe
if(rank_y .LT. 0) rank_y = rank_y + nlist
else
rank_y = nlist+1
endif
get_vector_send = get_vector_send + 1
start_pos(get_vector_send) = cur_pos
if( rank_x == rank_y ) then
n = n+2
ind_x (get_vector_send) = ix
ind_y (get_vector_send) = iy
cur_pos = cur_pos + update_x%send(ix)%totsize + update_y%send(iy)%totsize
pelist (get_vector_send) = update_x%send(ix)%pe
ix = ix + 1
iy = iy + 1
else if ( rank_x < rank_y ) then
n = n+1
ind_x (get_vector_send) = ix
ind_y (get_vector_send) = -1
cur_pos = cur_pos + update_x%send(ix)%totsize
pelist (get_vector_send) = update_x%send(ix)%pe
ix = ix + 1
else if ( rank_y < rank_x ) then
n = n+1
ind_x (get_vector_send) = -1
ind_y (get_vector_send) = iy
cur_pos = cur_pos + update_y%send(iy)%totsize
pelist (get_vector_send) = update_y%send(iy)%pe
iy = iy+1
endif
end do
end function get_vector_send
!############################################################################
function get_rank_unpack(domain, overlap_x, overlap_y, rank_x, rank_y, ind_x, ind_y)
type(domain2D), intent(in) :: domain
type(overlapSpec), intent(in) :: overlap_x, overlap_y
integer, intent(out) :: rank_x, rank_y, ind_x, ind_y
integer :: get_rank_unpack
integer :: nlist, nrecv_x, nrecv_y
nlist = size(domain%list(:))
nrecv_x = overlap_x%nrecv
nrecv_y = overlap_y%nrecv
rank_x = nlist+1
rank_y = nlist+1
if(nrecv_x>0) rank_x = overlap_x%recv(nrecv_x)%pe - domain%pe
if(nrecv_y>0) rank_y = overlap_y%recv(nrecv_y)%pe - domain%pe
if(rank_x .LE.0) rank_x = rank_x + nlist
if(rank_y .LE.0) rank_y = rank_y + nlist
get_rank_unpack = min(rank_x, rank_y)
ind_x = 0
ind_y = 0
if(get_rank_unpack < nlist+1) then
if(nrecv_x >0) ind_x = nrecv_x
if(nrecv_y >0) ind_y = nrecv_y
endif
end function get_rank_unpack
function get_mesgsize(overlap, do_dir)
type(overlap_type), intent(in) :: overlap
logical, intent(in) :: do_dir(:)
integer :: get_mesgsize
integer :: n, dir
get_mesgsize = 0
do n = 1, overlap%count
dir = overlap%dir(n)
if(do_dir(dir)) then
get_mesgsize = get_mesgsize + overlap%msgsize(n)
end if
end do
end function get_mesgsize
!#############################################################################
subroutine mpp_set_domain_symmetry(domain, symmetry)
type(domain2D), intent(inout) :: domain
logical, intent(in ) :: symmetry
domain%symmetry = symmetry
end subroutine mpp_set_domain_symmetry
subroutine mpp_copy_domain1D(domain_in, domain_out)
type(domain1D), intent(in) :: domain_in
type(domain1D), intent(inout) :: domain_out
domain_out%compute = domain_in%compute
domain_out%data = domain_in%data
domain_out%global = domain_in%global
domain_out%memory = domain_in%memory
domain_out%cyclic = domain_in%cyclic
domain_out%pe = domain_in%pe
domain_out%pos = domain_in%pos
end subroutine mpp_copy_domain1D
!#################################################################
!z1l: This is not fully implemented. The current purpose is to make
! it work in read_data.
subroutine mpp_copy_domain2D(domain_in, domain_out)
type(domain2D), intent(in) :: domain_in
type(domain2D), intent(inout) :: domain_out
integer :: n, ntiles
domain_out%id = domain_in%id
domain_out%pe = domain_in%pe
domain_out%fold = domain_in%fold
domain_out%pos = domain_in%pos
domain_out%symmetry = domain_in%symmetry
domain_out%whalo = domain_in%whalo
domain_out%ehalo = domain_in%ehalo
domain_out%shalo = domain_in%shalo
domain_out%nhalo = domain_in%nhalo
domain_out%ntiles = domain_in%ntiles
domain_out%max_ntile_pe = domain_in%max_ntile_pe
domain_out%ncontacts = domain_in%ncontacts
domain_out%rotated_ninety = domain_in%rotated_ninety
domain_out%initialized = domain_in%initialized
domain_out%tile_root_pe = domain_in%tile_root_pe
domain_out%io_layout = domain_in%io_layout
domain_out%name = domain_in%name
ntiles = size(domain_in%x(:))
allocate(domain_out%x(ntiles), domain_out%y(ntiles), domain_out%tile_id(ntiles) )
do n = 1, ntiles
call mpp_copy_domain1D(domain_in%x(n), domain_out%x(n))
call mpp_copy_domain1D(domain_in%y(n), domain_out%y(n))
enddo
domain_out%tile_id = domain_in%tile_id
return
end subroutine mpp_copy_domain2D
!######################################################################
subroutine set_group_update(group, domain)
type(mpp_group_update_type), intent(inout) :: group
type(domain2D), intent(inout) :: domain
integer :: nscalar, nvector, nlist
integer :: nsend, nrecv, nsend_old, nrecv_old
integer :: nsend_s, nsend_x, nsend_y
integer :: nrecv_s, nrecv_x, nrecv_y
integer :: update_buffer_pos, tot_recv_size, tot_send_size
integer :: msgsize_s, msgsize_x, msgsize_y, msgsize
logical :: recv_s(8), send_s(8)
logical :: recv_x(8), send_x(8), recv_y(8), send_y(8)
integer :: ntot, n, l, m, ksize
integer :: i_s, i_x, i_y, rank_s, rank_x, rank_y, rank
integer :: ind_s(3*MAXOVERLAP)
integer :: ind_x(3*MAXOVERLAP)
integer :: ind_y(3*MAXOVERLAP)
integer :: pelist(3*MAXOVERLAP), to_pe_list(3*MAXOVERLAP)
integer :: buffer_pos_recv(3*MAXOVERLAP), buffer_pos_send(3*MAXOVERLAP)
integer :: recv_size(3*MAXOVERLAP), send_size(3*MAXOVERLAP)
integer :: position_x, position_y, npack, nunpack, dir
integer :: pack_buffer_pos, unpack_buffer_pos
integer :: omp_get_num_threads, nthreads
character(len=8) :: text
type(overlap_type), pointer :: overPtr => NULL()
type(overlapSpec), pointer :: update_s => NULL()
type(overlapSpec), pointer :: update_x => NULL()
type(overlapSpec), pointer :: update_y => NULL()
nscalar = group%nscalar
nvector = group%nvector
!--- get the overlap data type
select case(group%gridtype)
case (AGRID)
position_x = CENTER
position_y = CENTER
case (BGRID_NE, BGRID_SW)
position_x = CORNER
position_y = CORNER
case (CGRID_NE, CGRID_SW)
position_x = EAST
position_y = NORTH
case (DGRID_NE, DGRID_SW)
position_x = NORTH
position_y = EAST
case default
call mpp_error(FATAL, "set_group_update: invalid value of gridtype")
end select
if(nscalar>0) then
update_s => search_update_overlap(domain, group%whalo_s, group%ehalo_s, &
group%shalo_s, group%nhalo_s, group%position)
endif
if(nvector>0) then
update_x => search_update_overlap(domain, group%whalo_v, group%ehalo_v, &
group%shalo_v, group%nhalo_v, position_x)
update_y => search_update_overlap(domain, group%whalo_v, group%ehalo_v, &
group%shalo_v, group%nhalo_v, position_y)
endif
if(nscalar > 0) then
recv_s = group%recv_s
send_s = recv_s
endif
if(nvector > 0) then
recv_x = group%recv_x
send_x = recv_x
recv_y = group%recv_y
send_y = recv_y
end if
nlist = size(domain%list(:))
group%initialized = .true.
nsend_s = 0; nsend_x = 0; nsend_y = 0
nrecv_s = 0; nrecv_x = 0; nrecv_y = 0
if(nscalar > 0) then
!--- This check could not be done because of memory domain
! if( group%isize_s .NE. (group%ie_s-group%is_s+1) .OR. group%jsize_s .NE. (group%je_s-group%js_s+1)) &
! call mpp_error(FATAL, "set_group_update: mismatch of size of the field and domain memory domain")
nsend_s = update_s%nsend
nrecv_s = update_s%nrecv
endif
!--- ksize_s must equal ksize_v
if(nvector > 0 .AND. nscalar > 0) then
if(group%ksize_s .NE. group%ksize_v) then
call mpp_error(FATAL, "set_group_update: ksize_s and ksize_v are not equal")
endif
ksize = group%ksize_s
else if (nscalar > 0) then
ksize = group%ksize_s
else if (nvector > 0) then
ksize = group%ksize_v
else
call mpp_error(FATAL, "set_group_update: nscalar and nvector are all 0")
endif
nthreads = 1
!$OMP PARALLEL
!$ nthreads = omp_get_num_threads()
!$OMP END PARALLEL
if( nthreads > nthread_control_loop ) then
group%k_loop_inside = .FALSE.
else
group%k_loop_inside = .TRUE.
endif
if(nvector > 0) then
!--- This check could not be done because of memory domain
! if( group%isize_x .NE. (group%ie_x-group%is_x+1) .OR. group%jsize_x .NE. (group%je_x-group%js_x+1)) &
! call mpp_error(FATAL, "set_group_update: mismatch of size of the fieldx and domain memory domain")
! if( group%isize_y .NE. (group%ie_y-group%is_y+1) .OR. group%jsize_y .NE. (group%je_y-group%js_y+1)) &
! call mpp_error(FATAL, "set_group_update: mismatch of size of the fieldy and domain memory domain")
nsend_x = update_x%nsend
nrecv_x = update_x%nrecv
nsend_y = update_y%nsend
nrecv_y = update_y%nrecv
endif
!figure out message size for each processor.
ntot = nrecv_s + nrecv_x + nrecv_y
if(ntot > 3*MAXOVERLAP) call mpp_error(FATAL, "set_group_update: ntot is greater than 3*MAXOVERLAP")
n = 1
i_s = 1
i_x = 1
i_y = 1
ind_s = -1
ind_x = -1
ind_y = -1
nrecv = 0
do while(n<=ntot)
if( i_s <= nrecv_s ) then
rank_s = update_s%recv(i_s)%pe-domain%pe
if(rank_s .LE. 0) rank_s = rank_s + nlist
else
rank_s = -1
endif
if( i_x <= nrecv_x ) then
rank_x = update_x%recv(i_x)%pe-domain%pe
if(rank_x .LE. 0) rank_x = rank_x + nlist
else
rank_x = -1
endif
if( i_y <= nrecv_y ) then
rank_y = update_y%recv(i_y)%pe-domain%pe
if(rank_y .LE. 0) rank_y = rank_y + nlist
else
rank_y = -1
endif
nrecv = nrecv + 1
rank = maxval((/rank_s, rank_x, rank_y/))
if(rank == rank_s) then
n = n + 1
ind_s(nrecv) = i_s
pelist(nrecv) = update_s%recv(i_s)%pe
i_s = i_s + 1
endif
if(rank == rank_x) then
n = n + 1
ind_x(nrecv) = i_x
pelist(nrecv) = update_x%recv(i_x)%pe
i_x = i_x + 1
endif
if(rank == rank_y) then
n = n + 1
ind_y(nrecv) = i_y
pelist(nrecv) = update_y%recv(i_y)%pe
i_y = i_y + 1
endif
enddo
nrecv_old = nrecv
nrecv = 0
update_buffer_pos = 0
tot_recv_size = 0
!--- setup for recv
do l = 1, nrecv_old
msgsize_s = 0
msgsize_x = 0
msgsize_y = 0
m = ind_s(l)
if(m>0) msgsize_s = get_mesgsize(update_s%recv(m), recv_s)*ksize*nscalar
m = ind_x(l)
if(m>0) msgsize_x = get_mesgsize(update_x%recv(m), recv_x)*ksize*nvector
m = ind_y(l)
if(m>0) msgsize_y = get_mesgsize(update_y%recv(m), recv_y)*ksize*nvector
msgsize = msgsize_s + msgsize_x + msgsize_y
if( msgsize.GT.0 )then
tot_recv_size = tot_recv_size + msgsize
nrecv = nrecv + 1
if(nrecv > MAXOVERLAP) then
call mpp_error(FATAL, "set_group_update: nrecv is greater than MAXOVERLAP, increase MAXOVERLAP")
endif
group%from_pe(nrecv) = pelist(l)
group%recv_size(nrecv) = msgsize
group%buffer_pos_recv(nrecv) = update_buffer_pos
update_buffer_pos = update_buffer_pos + msgsize
end if
end do
group%nrecv = nrecv
!--- setup for unpack
nunpack = 0
unpack_buffer_pos = 0
do l = 1, nrecv_old
m = ind_s(l)
if(m>0) then
overptr => update_s%recv(m)
do n = 1, overptr%count
dir = overptr%dir(n)
if(recv_s(dir)) then
nunpack = nunpack + 1
if(nunpack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: nunpack is greater than MAXOVERLAP, increase MAXOVERLAP 1")
group%unpack_type(nunpack) = FIELD_S
group%unpack_buffer_pos(nunpack) = unpack_buffer_pos
group%unpack_rotation(nunpack) = overptr%rotation(n)
group%unpack_is(nunpack) = overptr%is(n)
group%unpack_ie(nunpack) = overptr%ie(n)
group%unpack_js(nunpack) = overptr%js(n)
group%unpack_je(nunpack) = overptr%je(n)
group%unpack_size(nunpack) = overptr%msgsize(n)*nscalar
unpack_buffer_pos = unpack_buffer_pos + group%unpack_size(nunpack)*ksize
end if
end do
end if
m = ind_x(l)
if(m>0) then
overptr => update_x%recv(m)
do n = 1, overptr%count
dir = overptr%dir(n)
if(recv_x(dir)) then
nunpack = nunpack + 1
if(nunpack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: nunpack is greater than MAXOVERLAP, increase MAXOVERLAP 2")
group%unpack_type(nunpack) = FIELD_X
group%unpack_buffer_pos(nunpack) = unpack_buffer_pos
group%unpack_rotation(nunpack) = overptr%rotation(n)
group%unpack_is(nunpack) = overptr%is(n)
group%unpack_ie(nunpack) = overptr%ie(n)
group%unpack_js(nunpack) = overptr%js(n)
group%unpack_je(nunpack) = overptr%je(n)
group%unpack_size(nunpack) = overptr%msgsize(n)*nvector
unpack_buffer_pos = unpack_buffer_pos + group%unpack_size(nunpack)*ksize
end if
end do
end if
m = ind_y(l)
if(m>0) then
overptr => update_y%recv(m)
do n = 1, overptr%count
dir = overptr%dir(n)
if(recv_y(dir)) then
nunpack = nunpack + 1
if(nunpack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: nunpack is greater than MAXOVERLAP, increase MAXOVERLAP 3")
group%unpack_type(nunpack) = FIELD_Y
group%unpack_buffer_pos(nunpack) = unpack_buffer_pos
group%unpack_rotation(nunpack) = overptr%rotation(n)
group%unpack_is(nunpack) = overptr%is(n)
group%unpack_ie(nunpack) = overptr%ie(n)
group%unpack_js(nunpack) = overptr%js(n)
group%unpack_je(nunpack) = overptr%je(n)
group%unpack_size(nunpack) = overptr%msgsize(n)*nvector
unpack_buffer_pos = unpack_buffer_pos + group%unpack_size(nunpack)*ksize
end if
end do
end if
end do
group%nunpack = nunpack
if(update_buffer_pos .NE. unpack_buffer_pos ) call mpp_error(FATAL, &
"set_group_update: update_buffer_pos .NE. unpack_buffer_pos")
!figure out message size for each processor.
ntot = nsend_s + nsend_x + nsend_y
n = 1
i_s = 1
i_x = 1
i_y = 1
ind_s = -1
ind_x = -1
ind_y = -1
nsend = 0
do while(n<=ntot)
if( i_s <= nsend_s ) then
rank_s = update_s%send(i_s)%pe-domain%pe
if(rank_s .LT. 0) rank_s = rank_s + nlist
else
rank_s = nlist+1
endif
if( i_x <= nsend_x ) then
rank_x = update_x%send(i_x)%pe-domain%pe
if(rank_x .LT. 0) rank_x = rank_x + nlist
else
rank_x = nlist+1
endif
if( i_y <= nsend_y ) then
rank_y = update_y%send(i_y)%pe-domain%pe
if(rank_y .LT. 0) rank_y = rank_y + nlist
else
rank_y = nlist+1
endif
nsend = nsend + 1
rank = minval((/rank_s, rank_x, rank_y/))
if(rank == rank_s) then
n = n + 1
ind_s(nsend) = i_s
pelist(nsend) = update_s%send(i_s)%pe
i_s = i_s + 1
endif
if(rank == rank_x) then
n = n + 1
ind_x(nsend) = i_x
pelist(nsend) = update_x%send(i_x)%pe
i_x = i_x + 1
endif
if(rank == rank_y) then
n = n + 1
ind_y(nsend) = i_y
pelist(nsend) = update_y%send(i_y)%pe
i_y = i_y + 1
endif
enddo
nsend_old = nsend
nsend = 0
tot_send_size = 0
do l = 1, nsend_old
msgsize_s = 0
msgsize_x = 0
msgsize_y = 0
m = ind_s(l)
if(m>0) msgsize_s = get_mesgsize(update_s%send(m), send_s)*ksize*nscalar
m = ind_x(l)
if(m>0) msgsize_x = get_mesgsize(update_x%send(m), send_x)*ksize*nvector
m = ind_y(l)
if(m>0) msgsize_y = get_mesgsize(update_y%send(m), send_y)*ksize*nvector
msgsize = msgsize_s + msgsize_x + msgsize_y
if( msgsize.GT.0 )then
tot_send_size = tot_send_size + msgsize
nsend = nsend + 1
if(nsend > MAXOVERLAP) then
call mpp_error(FATAL, "set_group_update: nsend is greater than MAXOVERLAP, increase MAXOVERLAP")
endif
send_size(nsend) = msgsize
group%to_pe(nsend) = pelist(l)
group%buffer_pos_send(nsend) = update_buffer_pos
group%send_size(nsend) = msgsize
update_buffer_pos = update_buffer_pos + msgsize
end if
end do
group%nsend = nsend
!--- setup for pack
npack = 0
pack_buffer_pos = unpack_buffer_pos
do l = 1, nsend_old
m = ind_s(l)
if(m>0) then
overptr => update_s%send(m)
do n = 1, overptr%count
dir = overptr%dir(n)
if(send_s(dir)) then
npack = npack + 1
if(npack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: npack is greater than MAXOVERLAP, increase MAXOVERLAP 1")
group%pack_type(npack) = FIELD_S
group%pack_buffer_pos(npack) = pack_buffer_pos
group%pack_rotation(npack) = overptr%rotation(n)
group%pack_is(npack) = overptr%is(n)
group%pack_ie(npack) = overptr%ie(n)
group%pack_js(npack) = overptr%js(n)
group%pack_je(npack) = overptr%je(n)
group%pack_size(npack) = overptr%msgsize(n)*nscalar
pack_buffer_pos = pack_buffer_pos + group%pack_size(npack)*ksize
end if
end do
end if
m = ind_x(l)
if(m>0) then
overptr => update_x%send(m)
do n = 1, overptr%count
dir = overptr%dir(n)
!--- nonsym_edge update is not for rotation of 90 or -90 degree ( cubic sphere grid )
if( group%nonsym_edge .and. (overptr%rotation(n)==NINETY .or. &
overptr%rotation(n)==MINUS_NINETY) ) then
call mpp_error(FATAL, 'set_group_update: flags=NONSYMEDGEUPDATE is not compatible '// &
'with 90 or -90 degree rotation (normally cubic sphere grid' )
endif
if(send_x(dir)) then
npack = npack + 1
if(npack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: npack is greater than MAXOVERLAP, increase MAXOVERLAP 2")
group%pack_type(npack) = FIELD_X
group%pack_buffer_pos(npack) = pack_buffer_pos
group%pack_rotation(npack) = overptr%rotation(n)
group%pack_is(npack) = overptr%is(n)
group%pack_ie(npack) = overptr%ie(n)
group%pack_js(npack) = overptr%js(n)
group%pack_je(npack) = overptr%je(n)
group%pack_size(npack) = overptr%msgsize(n)*nvector
pack_buffer_pos = pack_buffer_pos + group%pack_size(npack)*ksize
end if
end do
end if
m = ind_y(l)
if(m>0) then
overptr => update_y%send(m)
do n = 1, overptr%count
dir = overptr%dir(n)
if( group%nonsym_edge .and. (overptr%rotation(n)==NINETY .or. &
overptr%rotation(n)==MINUS_NINETY) ) then
call mpp_error(FATAL, 'set_group_update: flags=NONSYMEDGEUPDATE is not compatible '// &
'with 90 or -90 degree rotation (normally cubic sphere grid' )
endif
if(send_y(dir)) then
npack = npack + 1
if(npack > MAXOVERLAP) call mpp_error(FATAL, &
"set_group_update: npack is greater than MAXOVERLAP, increase MAXOVERLAP 3")
group%pack_type(npack) = FIELD_Y
group%pack_buffer_pos(npack) = pack_buffer_pos
group%pack_rotation(npack) = overptr%rotation(n)
group%pack_is(npack) = overptr%is(n)
group%pack_ie(npack) = overptr%ie(n)
group%pack_js(npack) = overptr%js(n)
group%pack_je(npack) = overptr%je(n)
group%pack_size(npack) = overptr%msgsize(n)*nvector
pack_buffer_pos = pack_buffer_pos + group%pack_size(npack)*ksize
end if
end do
end if
end do
group%npack = npack
if(update_buffer_pos .NE. pack_buffer_pos ) call mpp_error(FATAL, &
"set_group_update: update_buffer_pos .NE. pack_buffer_pos")
!--- make sure the buffer is large enough
mpp_domains_stack_hwm = max( mpp_domains_stack_hwm, tot_recv_size+tot_send_size )
if( mpp_domains_stack_hwm.GT.mpp_domains_stack_size )then
write( text,'(i8)' )mpp_domains_stack_hwm
call mpp_error( FATAL, 'set_group_update: mpp_domains_stack overflow, '// &
'call mpp_domains_set_stack_size('//trim(text)//') from all PEs.' )
end if
group%tot_msgsize = tot_recv_size+tot_send_size
end subroutine set_group_update
!######################################################################
subroutine mpp_clear_group_update(group)
type(mpp_group_update_type), intent(inout) :: group
group%nscalar = 0
group%nvector = 0
group%nsend = 0
group%nrecv = 0
group%npack = 0
group%nunpack = 0
group%initialized = .false.
end subroutine mpp_clear_group_update
!#####################################################################
function mpp_group_update_initialized(group)
type(mpp_group_update_type), intent(in) :: group
logical :: mpp_group_update_initialized
mpp_group_update_initialized = group%initialized
end function mpp_group_update_initialized
!#####################################################################
function mpp_group_update_is_set(group)
type(mpp_group_update_type), intent(in) :: group
logical :: mpp_group_update_is_set
mpp_group_update_is_set = (group%nscalar > 0 .OR. group%nvector > 0)
end function mpp_group_update_is_set