|
气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。 k8 |) k3 K: X; a
2 g" l' r3 }- V( }! }& ]9 h) \
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 3 a8 I6 I6 q+ D7 @) `* t# `
* i& j4 h2 Y2 z3 S5 \6 _ 本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。 0 }% G. v$ j+ z
/ b ?8 c+ U+ I. n; b
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。
: s/ X6 i( \: ? 其中几个关键点解释一下: ) U$ m8 ^* V) N7 K8 l6 o/ j
(1)首先定义拟合正太分布的函数 + D$ a g' p5 A) N) e9 ?5 ^% I: G
def norm_fit(data):
# q; v) c+ |9 T/ n6 w7 u" m ~4 `% s loc, scale = st.norm.fit(data) K9 l2 k$ D) m5 _6 [0 n9 ^, z3 d0 A+ M
return np.array([loc, scale]) 4 B) K; k' U5 d2 v
这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
7 z$ n/ x6 m+ g8 |! Q& P (2)xarray分块
( J8 q3 Q5 [7 T) | T! d \7 H x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
5 Z$ u/ W [0 U- g7 R7 w. t x = x.chunk({"lev":5, "lat":100,"lon":100})
) Q. X' t; d# K, e1 x xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。
) a+ J. X5 E* Z (3)xarray.apply_ufunc函数
2 ~" Q" h3 g1 p9 f) |9 j" ?; q result = xr.apply_ufunc(norm_fit,
2 O' v: ?$ }7 v4 d6 A- F7 p, Z x,1 ~' i2 X9 |& f
input_core_dims=[["time"]],2 {8 `8 L- U! s% m( i" M3 |
output_core_dims=[["paras"]]," B; r' p3 |7 }
dask="parallelized",
% d. @4 n& Z) z. J output_dtypes=[np.float],
0 N) h$ |5 @& J* w3 J5 p dask_gufunc_kwargs={output_sizes:{"paras":2}}
6 w% i# R+ [% F( k. Q! a0 R- A )
! Z3 S" v( \% A# ?0 g apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点) % c9 N$ \, Q5 T8 Q4 |- ^* j2 p
+ G# e+ D0 d7 P& C+ G( T# [/ @9 U; \ 以下是全部代码: 1 y) S9 W; C9 _3 ~6 y e' H! g6 T4 }
from scipy import stats as st: V) d7 R9 T; k/ J
+ z6 u1 I: _( A9 |( v2 k
import xarray as xr
6 E7 A! i8 y7 t. r import dask: }# M$ n3 Q+ \. {
import numpy as np
, H" n* h' z, l from dask.diagnostics import ProgressBar
G/ [* e- F, m! o9 z2 N+ D# N# \4 ~1 ?' T, W* ]8 \
def norm_fit(data):
- W0 S) j! B# n2 }3 o* A loc, scale = st.norm.fit(data)7 b% M' V( N( B4 R
return np.array([loc, scale])+ c# d) J6 r; q* W0 y$ b
5 s/ J. B ?% w, I+ ^5 B" n; l8 a' p B
rs = np.random.RandomState(0)
) K* t& L/ Y5 l* e5 i% ?$ ] x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])* j6 X" I5 f5 m
x = x.chunk({"lev":5, "lat":100,"lon":100})
1 J: J; z. ^% f v# m- Y9 U% h9 ]# y0 `7 u' t( j" K
#使用apply_ufunc计算,并用dask的并行计算' ?! w& X) h5 s
result = xr.apply_ufunc(norm_fit,
7 x! z5 z# J( l) G2 F1 s9 X" r, r' {- b x,
1 s2 w' ]/ |" ]% }% i% ? input_core_dims=[["time"]],8 [) q- }" X% D( q* E" W. a8 n
output_core_dims=[["paras"]],2 _) o% E! M& R# b( S& D
dask="parallelized",
$ B) U: I2 v4 h! a% P2 u output_dtypes=[np.float],
& u/ J1 N* j. v5 d( | dask_gufunc_kwargs={output_sizes:{"paras":2}}
8 Y: g- I; s: a# c ), @9 X9 ?' j# g' p
% O. S {* w6 V& p5 W
#compute进行真实的计算并显示进度$ U. S" N9 X* E. _
with ProgressBar():, r9 m2 l4 a4 G" V: g
result = results.compute()
* B$ C d, R5 N$ e! g$ t' e/ u7 j
#结果冲命名保存到nc文件# N V( n; V' S" X: z
result = result.rename("norm_paras")
5 f h0 x3 N$ A1 o3 f+ } result = result.to_netcdf("norm_fit_paras.nc",compute=False)) b! U9 g. c& b( q) G
with ProgressBar():! [0 s# ?; @& F. ]; l
result.compute() 6 A$ [# _% [/ K0 @
转自:气海同途 - }" r C" o$ F u2 t
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |