In [None]:
%matplotlib inline


# Topography source

This is a simple topography source case showing the ability of freshkiss3d to apply
source terms on toporgaphy to simulate bed movement induced phenomena like tsunamis.


In [None]:
import os, sys
import argparse
import numpy as np
import random as rd
import matplotlib.pylab as plt
import freshkiss3d as fk
import freshkiss3d.extra.plots as fk_plt
from scipy.interpolate import griddata

os.system('rm -r outputs')

#sphinx_gallery_thumbnail_number = 5
parser = argparse.ArgumentParser()
parser.add_argument('--nographics', action='store_true')
args = parser.parse_args()



def load_landslice(dir_path_z_variations):
    files_path_z_variations = os.listdir(dir_path_z_variations)
     
    times = []
    xt_grid = []
    yt_grid = []
    zbt_grid = []
    for ffile in files_path_z_variations:
        path_ffile = dir_path_z_variations+'/'+ffile
        
        x_grid = []
        y_grid = []
        zb_grid = []
        with open(path_ffile, "r") as f:
            time = f.readline()
            time = float(time)
            times.append(time)
            
            lines = f.readlines()
            for line in lines:    
                line = line.split(";")
                line = [float(i) for i in line]
                x_grid.append(line[0])
                y_grid.append(line[1])
                zb_grid.append(line[2])
            x_grid = np.array(x_grid, dtype='d')
            y_grid = np.array(y_grid, dtype='d')
            zb_grid = np.array(zb_grid, dtype='d')
            
            xt_grid.append(x_grid)
            yt_grid.append(y_grid)
            zbt_grid.append(zb_grid)
            
    xt_grid = [i for _,i in sorted(zip(times, xt_grid))]
    yt_grid = [i for _,i in sorted(zip(times, yt_grid))]
    zbt_grid = [i for _,i in sorted(zip(times, zbt_grid))]
    times = sorted(times)
    
    return xt_grid, yt_grid, zbt_grid, times



def msh_points_into_data_domain(x_msh, y_msh, max_x_grid, min_x_grid, \
    max_y_grid, min_y_grid):
        
    x_msh_in_grid = []
    y_msh_in_grid = []
    C_msh_in_grid = []
    C = 0
    for x, y in zip(x_msh, y_msh):
        if min_x_grid < x < max_x_grid:
            if min_y_grid < y < max_y_grid:
                x_msh_in_grid.append(x)
                y_msh_in_grid.append(y)
                C_msh_in_grid.append(C)
        C += 1
    return x_msh_in_grid, y_msh_in_grid, C_msh_in_grid
           
           
            
def load_zb_init(x_msh, y_msh, topo_xy, x_msh_in_grid, y_msh_in_grid, \
    zt_msh_in_grid):
    
    NC = len(x_msh) 
    zb = np.empty(NC, dtype = 'd')
    for C in range(NC):
        zb[C] = topo_xy(x_msh[C], y_msh[C])
    
    zb_0 = {}
    c = 0
    for x_fk, y_fk in zip(x_msh, y_msh):
        zb_0[(x_fk, y_fk)] = zb[c]
        c += 1

    c = 0
    for x, y in zip(x_msh_in_grid, y_msh_in_grid):
        if (x, y) in zb_0:
            zb_0[(x, y)] += zt_msh_in_grid[0][c] 
        c += 1
        
    zb_0 = np.array(list(zb_0.values()),dtype='d') 
    return zb_0  




def interpolate_topography_data_on_freshkiss_mesh(x_msh, y_msh, xt_data, yt_data, \
    zbt_data, interpolate_method='nearest'):

    if interpolate_method not in ('nearest', 'linear', 'cubic'):
        raise ValueError(
            "Choose between these interpolate methods :\n"
            "'nearest', 'linear', 'cubic'\n"
            )
            
    zt_msh = []
    if interpolate_method=='nearest':    
        for x_data, y_data, zb_data in zip(xt_data, yt_data, zbt_data):       
            zt_msh.append(griddata((x_data, y_data), \
                zb_data, (x_msh, y_msh), \
                method = 'nearest'))

    if interpolate_method=='linear':
        for x_data, y_data, zb_data in zip(xt_data, yt_data, zbt_data):       
            zt_msh.append(griddata((x_data, y_data), \
                zb_data, (x_msh, y_msh), \
                method = 'linear'))

    if interpolate_method=='cubic':
        for x_data, y_data, zb_data in zip(xt_data, yt_data, zbt_data):       
            zt_msh.append(griddata((x_data, y_data), \
                zb_data, (x_msh, y_msh), \
                method = 'cubic'))

    print("Using method {} to set data to Freshkiss3d.\n"\
    .format(interpolate_method))
    
    return zt_msh

## Time loop:



In [None]:
FINAL_TIME = 15.
simutime = fk.SimuTime(final_time=FINAL_TIME, time_iteration_max=20000, \
second_order=True)

create_figure_scheduler = fk.schedules(times=[0., 1., 5., 9., 12., 15.])

## Mesh:



In [None]:
dir_path = os.path.abspath(os.path.dirname(sys.argv[0]))
TG, vertex_labels, boundary_labels = fk.read_msh(dir_path + '/inputs/square2.mesh')
x = np.asarray(TG.x)
y = np.asarray(TG.y)

x *= 10.
y *= 10.

triangular_mesh = fk.TriangularMesh(TG, vertex_labels, boundary_labels)

if not args.nographics:
  fk_plt.plot_mesh(triangular_mesh)


x_msh = np.asarray(TG.x)
y_msh = np.asarray(TG.y)

## Topography source term: 

``ExternalEffects`` objects are called in ``problem.solve()`` at each time step. In
this example external effects are ``TopographySource``.
There can be as much external effects as needed as long as they are stored in a
dictionary type.

Let us consider four cases:

 * In the first case we define continuous topography source over time to simulate a slowly moving bed.
 * In the second case we define sparse topography source over time to simulate sudden modification of the bed like a landslide.
 * In the third case we read several files which contain the topography coordinates (x, y, zb) 
 * for several times for the whole domain.
 * In the fourth case we read several files which contain the topography coordinates (x, y, zb) 
 * for several times for the moved part of the domain

<div class="alert alert-info"><h4>Note</h4><p>Note that in both cases (continuous or sparse over time), the source can either be 
   homogeneous or heterogeneous in space i.e. user can set either a constant set of sources 
   with the ``sources`` parameter or a function (or set of function) of x, y with
   the ``sources_funct`` parameter. User can also set a vector of size (NC) containing
   the source term on each cell of the mesh with the ``sources_cell`` parameter.
   For given coordinates, we can interpolate the data on the mesh with different methods :
   nearest (neighbour), linear or cubic.</p></div>



In [None]:
CASE = 4


if CASE == 1:
    # The bed movement rate in set in m/s with ``source_unit='m/s'``.
    def bed_source_rate(x, y, x_0=90., y_0=100.):
        return .2*np.exp(-.005*((x-(x_0+5.))**2+(y-y_0)**2)) \
             - .2*np.exp(-.005*((x-(x_0-5.))**2+(y-y_0)**2))

    if not args.nographics:
      fk_plt.plot_init_1d(triangular_mesh, bed_source_rate, 
                          absc='x', ordo='Bed source rate $m/s$', title='Source term')
    
    
    continuously_moving_bed = fk.TopographySource(
        times=[1., 10.],
        sources_funct=bed_source_rate,
        source_unit='m/s')
    external_effects = {"bed": continuously_moving_bed}



elif CASE == 2:
    # The landslide in set in m with ``source_unit='m'`` and occurs at :math:`t=1s`
    def landslide_source(x, y, x_0=90., y_0=100.):
        return 2.0*np.exp(-.005*((x-(x_0+5.))**2+(y-y_0)**2)) \
             - 2.0*np.exp(-.005*((x-(x_0-5.))**2+(y-y_0)**2))

    if not args.nographics:
      fk_plt.plot_init_1d(triangular_mesh, landslide_source, 
                          absc='x', ordo='Landslide source $m$', title='Source term')
    
    
    tsunami = fk.TopographySource(
        times=[1.],
        sources_funct=landslide_source,
        source_unit='m')
    external_effects = {"tsunami": tsunami}


    
elif CASE == 3:
    dir_path_z_variations = dir_path + "/inputs/landslide"
    
    xt_grid, yt_grid, zbt_grid, times = \
        load_landslice(dir_path_z_variations)
    
    zt_msh = interpolate_topography_data_on_freshkiss_mesh(\
        x_msh, y_msh, xt_grid, yt_grid, zbt_grid)

    Dzt_msh = []
    for i in range(len(zt_msh)-1):
        Dzt_msh.append(zt_msh[i+1] - zt_msh[i])
    
    tsunami = fk.TopographySource(
        times = times[1::],
        sources_cell = Dzt_msh,
        source_unit = 'm')
    external_effects = {"tsunami": tsunami}
    


elif CASE == 4:
    dir_path_z_variations = dir_path + "/inputs/landslide_part_domain"
    
    xt_grid, yt_grid, zbt_grid, times = \
        load_landslice(dir_path_z_variations)

    max_x_grid = max(xt_grid[0])
    min_x_grid = min(xt_grid[0])
    max_y_grid = max(yt_grid[0])
    min_y_grid = min(yt_grid[0])
            
    x_msh_in_grid, y_msh_in_grid, C_msh_in_grid = \
        msh_points_into_data_domain(x_msh, y_msh, max_x_grid, \
        min_x_grid, max_y_grid, min_y_grid)    
    
    zt_msh_in_grid = interpolate_topography_data_on_freshkiss_mesh(\
        x_msh_in_grid, y_msh_in_grid, xt_grid, yt_grid, zbt_grid)
    
    Dzt_msh = []
    for i in range(len(zt_msh_in_grid)-1):
        Dzt_msh.append(zt_msh_in_grid[i+1] - zt_msh_in_grid[i])
    
    tsunami = fk.TopographySource(
        times = times[1::],
        sources_cell = Dzt_msh,
        cells = C_msh_in_grid,
        source_unit = 'm')
    external_effects = {"tsunami": tsunami}
    


else:
    external_effects = None

## Layers:



In [None]:
FREE_SURFACE = 8.0
X_1 = 10. ; X_2 = 100.
def topo(x, y):
    if x < X_1:
        topo = 10.0
    elif x > X_2:
        topo = 0.0
    else:
        topo = -(10.0/(X_2-X_1))*(x - X_2)
    return topo

NL = 2
    
if CASE == 3:
    NC = triangular_mesh.NV
    zb_0 = np.zeros(NC, dtype='d')
    for C in range(NC):
        zb_0[C] = topo(x_msh[C], y_msh[C]) + zt_msh[0][C]

    layer = fk.Layer(NL, triangular_mesh, topography=zb_0)
    
    
elif CASE == 4:
    zb_0 = load_zb_init(x_msh, y_msh, topo, x_msh_in_grid, y_msh_in_grid, \
        zt_msh_in_grid)
    
    layer = fk.Layer(NL, triangular_mesh, topography=zb_0)
    

else:
    layer = fk.Layer(NL, triangular_mesh, topography_funct=topo)

## Primitives:



In [None]:
primitives = fk.Primitive(triangular_mesh, layer, free_surface=FREE_SURFACE)

## Boundary conditions:



In [None]:
slides = [fk.Slide(ref=r) for r in [1,2,3,4]]

## Writter:



In [None]:
vtk_writer = fk.VTKWriter(triangular_mesh, scheduler=fk.schedules(count=10),
                          scale_h=3.)

## Problem definition:



In [None]:
problem = fk.Problem(simutime, triangular_mesh,
                     layer, primitives,
                     slides=slides,
                     external_effects=external_effects,
                     vtk_writer=vtk_writer,
                     custom_funct={'plot':fk_plt.plot_freesurface_topo_3d},
                     custom_funct_scheduler=create_figure_scheduler)

## Problem solving:



In [None]:
problem.solve()
if not args.nographics:
  plt.show()