xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

9 s" D& t k8 n+ L
% v9 e9 d U9 J; \. n; d

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

' H: H: k9 I( u0 F4 J ( X; P5 N& G9 c U$ e" @- k% Z8 H1 L

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

- h' |5 g/ |& L- Z' j 8 _2 A7 `4 ?1 |: z

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

7 y+ J- U* Y' ?: O/ `

其中几个关键点解释一下:

; C2 q/ ~ v' T- @0 |3 o

(1)首先定义拟合正太分布的函数

- Q3 ]; ^3 y" h% Y6 C0 i8 o
def norm_fit(data):3 y$ b, ?% J+ x. L- R' j5 {/ {; ? S4 g loc, scale = st.norm.fit(data) 4 V d. o# r5 t3 g8 y' l7 _8 k return np.array([loc, scale])
' D2 \* X7 n3 m% p. K0 n" o

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

& |1 O. R4 p0 g- R

(2)xarray分块

4 |" Y; N" w" `/ W- J
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) 5 U; a& N [; l# n/ W' k x = x.chunk({"lev":5, "lat":100,"lon":100})
7 D! s+ F$ K* H# e5 O, N' x; b7 R

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

' _% |! k; w% k

(3)xarray.apply_ufunc函数

* l( ]8 ]% B% Z {6 c; m2 |0 D0 [9 ~8 x
result = xr.apply_ufunc(norm_fit,: G. G# C0 x6 B1 { x, & j( p1 x8 U# e input_core_dims=[["time"]],! X/ b3 m. U% z0 h output_core_dims=[["paras"]],, t. \- \8 m9 t% u+ E2 B4 H8 ^9 ~+ A dask="parallelized", 7 U6 p. z6 ^% E } output_dtypes=[np.float],, K, v) Q$ F1 Q9 |4 f' _ dask_gufunc_kwargs={output_sizes:{"paras":2}}1 |" A6 z5 Z# {9 b, x6 }9 h% t )
8 Y9 {6 C% B) f3 y0 V- i

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

7 M" X2 v; ?9 |# k4 w0 @6 g3 s: ~, |6 e* I" W' S

以下是全部代码:

2 o. _4 p: a' k) V/ ^( O# m
from scipy import stats as st0 k6 ^2 L1 P. b4 n% t & \4 j- x6 Z$ G- u% I# o' ^1 ^ import xarray as xr1 s3 ~# q& ^4 W import dask 1 v% x3 E# \3 a6 \4 d import numpy as np ) |! X8 i V2 n- l% {# n# O from dask.diagnostics import ProgressBar : B; ~9 }4 [' G# U0 { & A7 h6 j( p5 ` u/ ] def norm_fit(data):8 ~( b% T8 }/ j" Y8 I3 L! r$ U4 e loc, scale = st.norm.fit(data)! N) t Y4 l5 y) U) p6 [2 {- n- j: H return np.array([loc, scale]) 9 ]& Q) b: k4 n2 Q $ N) v% F5 c9 h0 e( S rs = np.random.RandomState(0)' f( \, c: r( B5 E3 U x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])8 h' A |2 Q- w% e/ C- Z x = x.chunk({"lev":5, "lat":100,"lon":100}) 5 |* D5 B" W- L/ U' p3 l2 q8 [, X. k- w# ]- a- K #使用apply_ufunc计算,并用dask的并行计算 ; q- z7 \! Q* R8 n& g result = xr.apply_ufunc(norm_fit,1 e3 [3 |" ^; n8 @3 i$ p0 l x,3 b0 k2 S1 K. N5 h# g9 }' ?+ Y1 r( J input_core_dims=[["time"]],9 J- i" w& \7 b/ q output_core_dims=[["paras"]], 0 P1 m# f& `/ [1 x* j& E dask="parallelized", & s# p- ^' c. I1 Q" A& k output_dtypes=[np.float], . M7 `- u, Z! u2 B8 H dask_gufunc_kwargs={output_sizes:{"paras":2}} ]! Z& q2 i( X; `9 P) x; y )' \% U& X: a6 k* w 0 O j4 g; _# m% b9 O #compute进行真实的计算并显示进度 |+ ~8 K1 J$ a4 w with ProgressBar():+ f; |' W2 I4 X* U result = results.compute() 9 @# `: ~+ w3 T( ~' A8 j, j9 z9 @ A- h9 w9 W #结果冲命名保存到nc文件8 x7 ~ p, N2 m" n( _ result = result.rename("norm_paras") s- u$ K* j- ?. I' v; [ result = result.to_netcdf("norm_fit_paras.nc",compute=False) , I8 R% H$ s7 w* \ with ProgressBar(): 1 b+ s6 N9 ^4 x& G" Y result.compute()
3 \6 d6 y( j! f5 S& e) Z' Q% }

转自:气海同途

' E* N# C; r/ F1 S; J$ v

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在2024-12-1
快速回复 返回顶部 返回列表