气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。
% ~% [$ {1 a% s" H7 n + j. M n) I9 f7 {( F! `
大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。 . s- F7 k% w2 Y
! j+ K' k2 g/ E. l: `) J
本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。
9 ]8 f2 u- d4 X: f7 e& F. _6 X ^4 b2 d5 U# y" S4 @
数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。 4 y" S" D) E% w7 z, {0 E) f4 n
其中几个关键点解释一下: % s: ?$ U3 c8 b
(1)首先定义拟合正太分布的函数
* _: c: `+ @8 H- T! T def norm_fit(data):
9 c) t/ d# l1 X loc, scale = st.norm.fit(data)
$ g" H; n& t3 l# M5 K- [4 R return np.array([loc, scale])
7 S, o; y9 V% W) K& F+ e! g 这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。
, H( x9 {/ x9 N* \0 d& Y2 e (2)xarray分块 ( S& G, r9 q7 r
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
% b$ O4 n/ K5 n9 O; O) ~ x = x.chunk({"lev":5, "lat":100,"lon":100}) ; E1 u! u) l. o5 l! D( M
xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。 ( [4 u' _7 T# R3 l. B- }
(3)xarray.apply_ufunc函数
t5 E# }5 W1 M0 P9 f result = xr.apply_ufunc(norm_fit,
2 a" F: N% A5 j7 e x,5 w* e; q0 ?6 M m; n: X
input_core_dims=[["time"]],
( |3 f* }3 t }' u0 a output_core_dims=[["paras"]],
% {- |; }$ x9 N" r/ u% e dask="parallelized",8 ]* W, d. ^% {2 ^2 b- U p! e( b/ F
output_dtypes=[np.float],
$ T$ p* y7 d7 u- f* q( ~6 g dask_gufunc_kwargs={output_sizes:{"paras":2}}
# \( R! g" Q+ _5 g )
" B0 o7 H+ F1 ^+ | apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)
/ S/ H. I# r9 d) `2 {+ a7 |0 W( p7 u2 e5 Q1 y5 S& w% H
以下是全部代码:
0 L( ]( J% \5 F2 I; a from scipy import stats as st
& g0 L. {9 R m& H: F) r9 w: T0 v$ |6 |- J$ ?
import xarray as xr9 Q7 S# L4 A) j9 g0 J7 j. ?. J
import dask
( m7 O8 Y! V! r import numpy as np# A& ^+ V. H, H+ w" D
from dask.diagnostics import ProgressBar, i6 Q/ E% ?% w
7 X" d/ ^8 ~: C
def norm_fit(data):
4 ], ?9 v5 v$ ~) i loc, scale = st.norm.fit(data)2 p4 T. l* t4 ]& A
return np.array([loc, scale])
' @: z, v7 U0 k6 l
M0 M: A( G% M8 r$ M+ Q rs = np.random.RandomState(0)- m/ u! t+ S/ G! j1 g3 S
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])
2 D( Q. a5 p0 e, z- o x = x.chunk({"lev":5, "lat":100,"lon":100})
* ?* M2 \7 a6 g* e7 M- |8 ]$ t( H$ c* A( E+ G( D" |) S5 a: p
#使用apply_ufunc计算,并用dask的并行计算
! Q9 p9 }% G6 }, M4 x5 j9 u# q result = xr.apply_ufunc(norm_fit,
2 X0 V) j, \! c) k# d x,8 x/ a! k% b% ^0 X* f5 |9 Y6 J
input_core_dims=[["time"]],; T% G% s, q* m, S2 A7 L- Z
output_core_dims=[["paras"]],5 A) y' m0 `2 j( q
dask="parallelized",
/ J9 |4 i7 L! W/ w% b. r output_dtypes=[np.float],
: u$ i! A! K( j dask_gufunc_kwargs={output_sizes:{"paras":2}}4 }5 ~9 x* x" E% F0 T. V+ ]3 [
)% b1 K4 ?: _' `8 V* C5 O# Y4 x8 d- G" }
/ l# }7 D* R- W, ~: U) L #compute进行真实的计算并显示进度9 R2 i, |5 ?& u! z0 d" ]
with ProgressBar():
7 I# l, `* A H5 e result = results.compute()
& ` ]+ l* |. m, S. S/ X1 ]: \# N7 _0 V+ F- C7 d
#结果冲命名保存到nc文件
+ ]" d2 o$ N9 c, `, A result = result.rename("norm_paras")& F8 E) K+ |3 ~2 ~; h
result = result.to_netcdf("norm_fit_paras.nc",compute=False)
' `4 ?/ Y [; ?! |$ S with ProgressBar():
4 T+ C- w* v& c8 D3 p$ t$ ?9 L result.compute()
- {- Z( v( H' T" I! J. I% ]/ F 转自:气海同途 4 [) Q' R# T( u7 X1 ~% x7 ^6 }
关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料 |