MaxCompute创建基于python的UDF函数并引用第三方包

MaxCompute创建基于python的UDF函数并引用第三方包,例如UDF中使用pandas包及其依赖的其他第三方包。


最终实现效果

image-20221117114015971



前提条件

  1. MaxCompute集群已安装Python环境,该案例中Python是3.7版本
  2. 已安装并配置MaxCompute客户端


使用pandas包

由于集群中Python是3.7版本,所以使用pandas包版本也必须使用cp37,该案例以pandas 1.1.5版本为基础。

  1. PyPI页面的Release history区域找到1.1.5版本,单击Download files后找到文件名为pandas-1.1.5-cp37-cp37m-manylinux1_x86_64.whl的pandas包进行下载。

    image-20221117134558114

  2. 修改下载的pandas包后缀为zip格式

  3. 通过MaxCompute客户端上传pandas包至MaxCompute项目空间,如果客户端使用的是本地终端,pandas-1.1.5-cp37-cp37m-manylinux1_x86_64.zip也可以为绝对路径。

    1
    odpscmd -e "use abi_ods_dev; add archive pandas-1.1.5-cp37-cp37m-manylinux1_x86_64.zip -f"
  4. 在DataWorks的业务流程/你的业务流程名称/MaxCompute/资源/目录下创建Python脚本,此处以udf_temp_qu.py为例,详细的UDF代码结构请参见Python 3 UDF

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    from odps.udf import annotate
    import json

    @annotate("bigint, bigint->string")
    class SampleUDF(object):

    def __init__(self):
    import sys
    sys.path.insert(0, 'work/pandas-1.1.5-cp37-cp37m-manylinux1_x86_64.zip')

    def evaluate(self, arg0, arg1):
    import pandas as pd
    df = pd.DataFrame([{'arg0': arg0, 'arg1': arg1}])
    df['arg2'] = df['arg0'] + df['arg1']
    return json.dumps(df.to_dict())

  5. 保存udf_temp_qu.py并提交

    image-20221117135642065

  6. 在DataWorks的业务流程/你的业务流程名称/MaxCompute/函数/目录下创建UDF函数,此处以udf_temp_qu为例,资源列表中需要引用第5步的资源名称和pandas的资源

    image-20221117135851076

  7. 保存udf_temp_qu并提交

  8. 测试UDF函数

    1
    2
    set odps.sql.python.version=cp37;
    select udf_temp_qu(1,2);

    从测试结果来看,提示pandas包依赖的其他第三方包缺失、例如numpy等

    image-20221117140305439

  9. 到pandas的官方GitHub1.1.x文档可以看到,pandas依赖了3个第三方包,然后下载对应PyPI上下载对应版本的whl包

    image-20221117141817917

  10. 将新下载的第三方包重复执行第一步到第七步

  11. 大功告成

image-20221117142331157


最后再看一下资源和函数的配置

image-20221117142853385 image-20221117142804534