yuan00yu 2019-06-27
背景
PyODPS DataFrame 提供了类似 pandas 的接口,来操作 ODPS 数据,同时也支持在本地使用 pandas,和使用数据库来执行。
PyODPS DataFrame 除了支持类似 pandas 的 map 和 apply 方法,也提供了 MapReduce API 来扩展 pandas 语法以适应大数据环境。
PyODPS 的自定义函数是序列化到 MaxCompute 上执行的,MaxCompute 的 Python 环境只包含了 numpy 这一个第三方包,用户常常问的问题是,如何在自定义函数里使用 pandas、scipy 或者 scikit-learn 这样的包含c代码的库?
现在,MaxCompute 在 sprint 27 及更高版本的 isolation,让在自定义函数中使用这些包成为可能。同时,
PyODPS也需要至少0.7.4版本 。接下来我会详细介绍使用步骤。
步骤
上传第三方包(只需做一次)
这个步骤只需要做一次,当 MaxCompute 资源里有了这些包,这一步直接跳过。
现在这些主流的 Python 包都提供了 whl 包,提供了各平台包含二进制文件的包,因此找到能在 MaxCompute 上能运行的包是第一步。
其次,要想在 MaxCompute 上运行,需要包含所有的依赖包,这个是比较繁琐的。我们可以看下各个包的依赖情况(删除表示已经包含)
包名 依赖
pandas numpy, python-dateutil, pytz, six scipy numpy scikit-learn numpy, scipy
所以,我们一共需要上传 python-dateutil、pytz、pandas、scipy、sklearn、six 这六个包,就能保证 pandas、scipy 和 scikit-learn 可用。
我们直接通过 http://mirrors.aliyun.com/pyp... 来找包。首先是 python-dateutils:http://mirrors.aliyun.com/pyp... 。我们找到最新版,这个包是纯 Python 的包,我们找到最新版的 zip 包,python-dateutil-2.6.0.zip,下载。
重命名为 python-dateutil.zip,通过 MaxCompute Console 上传资源。
add archive python-dateutil.zip;
pytz 一样,找到 pytz-2017.2.zip。上传不表。
six 找到 six-1.11.0.tar.gz。
接下来,是pandas,对于这种包含c的包,我们一定要找 名字中包含cp27-cp27m-manylinux1_x86_64 的whl包,这样才能在 MaxCompute 上正确执行。因此,这样我们找到最新版的包是:pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.whl。
这里我们把后缀改成zip,上传。
add archive pandas.zip;
其他包也是一样,因此,我们把它们都列出来:
包名 文件名 上传资源名
python-dateutil python-dateutil-2.6.0.zip python-dateutil.zip pytz pytz-2017.2.zip pytz.zip six six-1.11.0.tar.gz six.tar.gz pandas pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip pandas.zip scipy scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip scipy.zip scikit-learn scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip sklearn.zip
至此,全部包上传都已完成。
当然,我们全部上传也可以使用 PyODPS 的资源上传接口来完成,同样只需要操作一遍即可。至于用哪个,看个人喜好了。
编写代码验证
我们写一个简单的函数,里面用到了所有的库,最好是在函数里来 import 这些第三方库。
def test(x): from sklearn import datasets, svm from scipy import misc import numpy as np iris = datasets.load_iris() assert iris.data.shape == (150, 4) assert np.array_equal(np.unique(iris.target), [0, 1, 2]) clf = svm.LinearSVC() clf.fit(iris.data, iris.target) pred = clf.predict([[5.0, 3.6, 1.3, 0.25]]) assert pred[0] == 0 assert misc.face().shape is not None return x
这段代码只是示例,目标是用到以上所说的所有的包。
写完函数后,我们写一个简单的 map,记住, 运行时要确保 isolation 打开 ,如果不在 project 级别打开,可以在运行时打开,一个可以设置全局的选项:
from odps import options options.sql.settings = {'odps.isolation.session.enable': True}
也可以在 execute 方法上指定本次执行打开 isolation。
同样,我们可以在全局通过 options.df.libraries 指定用到的包,也可以在 execute 时指定。这里,我们要指定所有的包,包括依赖。下面就是调用刚刚定义的函数的例子。
hints = { 'odps.isolation.session.enable': True } libraries = ['python-dateutil.zip', 'pytz.zip', 'six.tar.gz', 'pandas.zip', 'scipy.zip', 'sklearn.zip'] iris = o.get_table('pyodps_iris').to_df() print iris[:1].sepal_length.map(test).execute(hints=hints, libraries=libraries)
可以看到,我们的函数顺利执行。
总结
对于要用到的第三方库及其依赖,如果已经上传,则可以直接编写代码,并指定用到的 libraries 即可;否则,需要按照教程上传第三方库。
可以看到,当第一步上传包做过后,以后每次使用都是优雅的,只需指定 libraries 就可以了。
原文链接
计算的时候总共分3步,1到2是第二组......lower: i. 这组数据中的小值 higher: j. 这组数据中的大值,fraction 是第三步中的小数部分,意思是当前这组数据的0到1的分位数
Series是一种类似于一维数组的对象,由一组数据以及一组与之对应的索引组成。 index: 索引序列,必须是唯一的,且与数据的长度相同. 如果没有传入索引参数,则默认会自动创建一个从0~N的整数索引