收藏本站 劰载中...网站公告 | 吾爱海洋论坛交流QQ群:835383472

xarray+dask处理大规模海洋气象数据

[复制链接]

气象海洋领域,常常会涉及到大规模的数据处理,比如高时空分辨率的模式数据、雷达数据和卫星数据等。

9 @# ^5 K* u9 J* Z8 o7 l3 ~& {
, m: Q" \# V& C* P, i+ g

大部分情况,我们一般只会涉及小规模数据处理,对计算效率并不会太过追求。但是当数据量变大时,低效的数据处理所耗费的时间非常明显,因此高效的数据处理方式尤为重要。

/ u" H9 q& t0 z- v" q. ` 9 u+ l& Q' r- C4 S- ]% G7 Q& d

本篇以拟合一个高维数据的正太分布参数为例,介绍如何使用xarray+dask加速数据处理。

P" Y. m4 T0 ?7 \/ C8 o) V " m7 }$ K' x2 [

数据维度X[time, lev, lat, lon],需要对三维空间每一点,沿着时间维度做正太分布拟合(正太分布拟合只是作为例子,这里可以定义你需要的操作函数)。

/ k* b# w- T& \: Y+ j* @

其中几个关键点解释一下:

0 n+ F" p# T1 Z% x0 W

(1)首先定义拟合正太分布的函数

" v6 n; q# n; K+ V
def norm_fit(data):% Q/ S; a, d v2 J( v. j5 t% v loc, scale = st.norm.fit(data)' }9 T! a# Z" e return np.array([loc, scale])
( f& n% A' m3 o8 K% w

这里需要注意的是,拟合的函数,输出参数,需要打包为一个数组。并且它的维度需要和后面aplly_ufun定义的输出维度一致。

9 W) T1 ?. B8 J. V! A7 _/ n) }4 D$ T

(2)xarray分块

& {% d* b5 K7 I. N) E+ A% V& r: e
x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"]) , q" P9 x3 W: i' K x = x.chunk({"lev":5, "lat":100,"lon":100})
9 @8 I J& _8 _. g1 v3 q8 ^

xarray结合dask可以将一个大型数组分成一个个数据块(chunk),需要注意的是我们需要沿着时间维操作,拟合需要整个时间维度的数据,因此时间维time不能分块,只能对其他维度分块。

) g4 v& C/ d7 L! P

(3)xarray.apply_ufunc函数

4 I9 W- R3 x; a7 X+ K' o9 b, @
result = xr.apply_ufunc(norm_fit,6 e6 q4 x6 Q: D x, $ R0 h e) c0 b" H' L; L& ^' U input_core_dims=[["time"]], & x0 ~- p6 f$ n% w9 e& @9 X+ A output_core_dims=[["paras"]], , C" _% E2 I+ t* s, C; S( S4 _ dask="parallelized", : s8 e: Z9 [! f/ f4 ] output_dtypes=[np.float], * p# C m( [) s8 d dask_gufunc_kwargs={output_sizes:{"paras":2}} 5 b' E! d% M" n$ g )
0 b/ h; |; S. s

apply_ufunc函数具体可以参考官网教程,这里只说使用时遇到的困难,即如何定义输出维度:输出维度是用output_core_dim定义的,将输出的拟合参数(期望和标准差)定义为paras维度,维度的大小为2,通过output_sizes参数设置。这样我们输入[time, lev, lat, lon]的数据,在每个空间点对时间维度拟合之后,输出的数据为[lev, lat, lon, paras]。(PS: 这里感谢深雨露大佬的指点)

) t% E+ L6 v+ b( [9 _ ( J4 r5 h( h, z/ a

以下是全部代码:

9 |* D) ^5 [( l& a3 W
from scipy import stats as st 0 [( Y* T6 J- Y7 s2 }0 E 3 o! H* @% q/ o, Q2 \ t import xarray as xr" Y/ c! t+ w; r import dask + E8 W, N5 Y5 `, x0 |8 e import numpy as np3 C: e5 u5 \1 b! X% ^6 z from dask.diagnostics import ProgressBar 6 H6 t& ~1 H3 G, X + q% z* n0 ^4 i' X3 J! v$ B! g def norm_fit(data):, a7 l2 W( D( |# O+ b v/ z- A; c4 e loc, scale = st.norm.fit(data) : o* S# L# n% V( Q6 F/ p return np.array([loc, scale]) + |& W" } d) r ( G$ g- ~0 H* m/ Y& H6 b4 ^9 t rs = np.random.RandomState(0) 0 W4 G7 [& J a |4 O5 O: u8 F" s x = xr.DataArray(rs.randn(500, 20, 1500, 1500), dims=["time", "lev", "lat","lon"])+ A/ q Y$ P8 W, } x = x.chunk({"lev":5, "lat":100,"lon":100})+ W8 ]! Z" }' q6 h - R& e" ]2 _6 p2 d7 ] #使用apply_ufunc计算,并用dask的并行计算2 J/ S% `! C, y R8 \ result = xr.apply_ufunc(norm_fit, 3 b2 X1 K0 t5 |1 X# I7 w x,0 a0 P6 F+ ~- m: a% _ input_core_dims=[["time"]],% z z, D5 j' @ output_core_dims=[["paras"]], " [& W. D {* F6 z6 D" O$ e dask="parallelized",3 V$ d5 p/ Y* Z/ r% |7 p output_dtypes=[np.float], 7 t& c: ?/ ]* f7 y' W! g w. N dask_gufunc_kwargs={output_sizes:{"paras":2}}( P- ~9 L- \2 w I )% n; U) Y2 N* U8 c$ v7 W3 q" S 1 N6 ~4 k q8 }' S2 A# D$ U* z #compute进行真实的计算并显示进度 ' L: I* i# g2 Q& F3 ? with ProgressBar(): ' U; d* U! V2 R" y0 q5 J+ x result = results.compute() 4 i" j3 m$ Q4 X7 R6 w, z6 m# H9 i( w; [7 s$ \/ ]! q$ V" Q #结果冲命名保存到nc文件 8 F, {/ j( E9 y9 q result = result.rename("norm_paras") : q2 m% |3 y' Q& l! M result = result.to_netcdf("norm_fit_paras.nc",compute=False)3 f0 T! u5 K# h- I* _* Z with ProgressBar():( a# n' Z0 G# Q( N2 k result.compute()
: g: p( c/ X2 w# K6 L$ f

转自:气海同途

8 I) j" a! k2 o/ M

关注【Ai尚研修科研技术平台】公众号,查看更多课程安排及免费数据资料

回复

举报 使用道具

相关帖子

全部回帖
暂无回帖,快来参与回复吧
懒得打字?点击右侧快捷回复 【吾爱海洋论坛发文有奖】
您需要登录后才可以回帖 登录 | 立即注册
各安天命
活跃在昨天 16:42
快速回复 返回顶部 返回列表