使用 Java 访问 ODPS(MaxCompute)表并导出数据


获取调用SDK授权账号

  • 1. 使用用户账号登录阿里云平台网站
  • 2. 按照如下步骤获取授权账号对应的 AccessKey ID 和 AccessKey Secret(密钥)
  • 3.获取项目名称和endpoint

如下图所示,任意新建一个临时查询,点击执行后,在日志中找到 Logview 链接:


  • Logview 链接中的参数 h=xxxx(xxxx 即为 endpoint)
  • Logview 链接中的参数 p=yyyy(yyyy 即为项目名称)


调用 SDK 访问数据中台的表

  • 1. 新建一个 Maven 项目
mvn archetype:generate \
-DgroupId=cn.pingan \
-DartifactId=test_odps \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DarchetypeCatalog=local \
-DinteractiveMode=false


  • 2. 根据需要在 pom.xml 中添加依赖包(本文示例只需要 odps-sdk-core,其余依赖已注释,可按需启用)
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>0.37.9-public</version>
</dependency>

<!--<dependency>-->
    <!--<groupId>com.aliyun.odps</groupId>-->
    <!--<artifactId>odps-sdk-commons</artifactId>-->
    <!--<version>0.37.9-public</version>-->
<!--</dependency>-->

<!--<dependency>-->
    <!--<groupId>com.aliyun.odps</groupId>-->
    <!--<artifactId>odps-sdk-udf</artifactId>-->
    <!--<version>0.37.9-public</version>-->
<!--</dependency>-->

<!--<dependency>-->
    <!--<groupId>com.aliyun.odps</groupId>-->
    <!--<artifactId>odps-sdk-mapred</artifactId>-->
    <!--<version>0.37.9-public</version>-->
<!--</dependency>-->

<!--<dependency>-->
    <!--<groupId>com.aliyun.odps</groupId>-->
    <!--<artifactId>odps-sdk-graph</artifactId>-->
    <!--<version>0.37.9-public</version>-->
<!--</dependency>-->
  • 3. 读取少于 10000 条记录的数据

通过 SQLTask 执行 SQL 查询并获取结果时,返回的记录数最多为 10000 条。

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.task.SQLTask;
import java.util.List;

/**
 * Runs a small SQL query through {@link SQLTask} and prints every result row.
 *
 * <p>Suitable only for small result sets: results fetched this way are limited to 10000 records.
 * Fill in the credentials, endpoint and project before running.
 */
public class TaskSql {
    private static final String accessId = "<your key>";
    private static final String accessKey = "<your secret>";
    private static final String endPoint = "";
    private static final String project = "";
    private static final String sql = "select 1 id,'test' as name union all select 2 id,'tasksql' as name;";

    /**
     * Executes {@link #sql}, waits for it to finish and prints each row as
     * {@code col0,col1,name}.
     *
     * @throws IllegalStateException if submitting or running the SQL task fails; the original
     *     {@link OdpsException} is attached as the cause so the JVM exits with a non-zero status
     */
    public static void main(String[] args) {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(endPoint);
        odps.setDefaultProject(project);
        try {
            Instance instance = SQLTask.run(odps, sql);
            instance.waitForSuccess();
            List<Record> records = SQLTask.getResult(instance);
            for (Record r : records) {
                // String.valueOf avoids an NPE when a column value is NULL.
                System.out.println(
                    String.valueOf(r.get(0)) + "," +
                    String.valueOf(r.get(1)) + "," +
                    r.getString("name")
                );
            }
        } catch (OdpsException e) {
            // Previously swallowed: a failed query still exited with status 0.
            throw new IllegalStateException("SQL task failed: " + sql, e);
        }
    }
}

4. 配合 Tunnel 导出大批量数据:先通过 SQL 把查询结果写入一张临时表,再用 Tunnel 下载该表,从而绕过上述 10000 条的限制

import com.aliyun.odps.Instance;
import com.aliyun.odps.Odps;
import com.aliyun.odps.OdpsException;
import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.data.RecordReader;
import com.aliyun.odps.task.SQLTask;
import com.aliyun.odps.tunnel.TableTunnel;
import com.aliyun.odps.tunnel.TableTunnel.DownloadSession;
import com.aliyun.odps.tunnel.TunnelException;
import java.io.IOException;
import java.util.UUID;

/**
 * Exports a potentially large SQL result from MaxCompute.
 *
 * <p>The query result is first materialized into a randomly named temporary table
 * ({@code create table ... lifecycle 1 as <sql>}), then downloaded through the Tunnel API, and
 * finally the temporary table is dropped. The 1-day lifecycle is only a fallback in case the drop
 * fails.
 *
 * Created by darebeat on 2021/7/11.
 */
public class TunnelSql {
    private static final String accessId = "<your key>";
    private static final String accessKey = "<your secret>";
    private static final String endPoint = "";
    private static final String project = "";
    // Query to export; appended verbatim after "create table <table> lifecycle 1 as ".
    private static final String sql = "select '1' id,'test' as name union all select '2' id,'tasksql' as name;";

    // A random string is used as the name of the temporary table that holds the exported data.
    private static final String table = "t_export_" + UUID.randomUUID().toString().replace("-", "_");
    // Single shared client used by every step below.
    private static final Odps odps = getOdps();

    /**
     * Creates the temporary table, downloads it via Tunnel, and always tries to drop it afterwards.
     *
     * @throws IllegalStateException if creating or downloading the table fails (cause attached)
     */
    public static void main(String[] args) {
        System.out.println(table);
        try {
            runSql();
            // Only reached if the temporary table was created successfully.
            tunnel();
        } finally {
            dropTable();
        }
    }

    /*
     * Downloads the temporary table produced by runSql() through a Tunnel download session.
     * The record reader is closed even if reading fails part-way.
     * */
    private static void tunnel() {
        TableTunnel tunnel = new TableTunnel(odps);
        try {
            DownloadSession downloadSession = tunnel.createDownloadSession(project, table);
            System.out.println("Session Status is : "+ downloadSession.getStatus().toString());
            long count = downloadSession.getRecordCount();
            System.out.println("RecordCount is: " + count);
            // Loop-invariant: fetch the schema once instead of once per record.
            TableSchema schema = downloadSession.getSchema();
            RecordReader recordReader = downloadSession.openRecordReader(0, count);
            try {
                Record record;
                while ((record = recordReader.read()) != null) {
                    consumeRecord(record, schema);
                }
            } finally {
                recordReader.close();
            }
        } catch (TunnelException e) {
            throw new IllegalStateException("Tunnel download of table " + table + " failed", e);
        } catch (IOException e) {
            throw new IllegalStateException("I/O error while reading table " + table, e);
        }
    }

    /*
     * Handles one downloaded record.
     * For small results you can simply print them and copy the output; in real use you could
     * write them to a local file with java.io, or store them on a remote server instead.
     * NOTE: the column names read here match the sample `sql` above; adjust them for other queries.
     * */
    private static void consumeRecord(Record record, TableSchema schema) {
        // TODO do your custom data export operation
        System.out.println(record.getString("id")+","+record.getString("name"));
    }

    /*
     * Runs the SQL and stores its result in the temporary table so Tunnel can download it.
     * The table lifecycle is 1 day, so little storage is wasted if dropTable() fails.
     * */
    private static void runSql() {
        String ddl = "create table " + table + " lifecycle 1 as " + sql;
        System.out.println(ddl);
        try {
            Instance instance = SQLTask.run(odps, ddl);
            instance.waitForSuccess();
        } catch (OdpsException e) {
            throw new IllegalStateException("Failed to create export table " + table, e);
        }
    }

    /*
     * Best-effort removal of the temporary table. Failures are only reported, so that an error
     * here never masks the original failure from runSql()/tunnel(); the 1-day lifecycle set in
     * runSql() is the fallback.
     * */
    private static void dropTable() {
        try {
            Instance instance = SQLTask.run(odps, "drop table if exists " + table + ";");
            instance.waitForSuccess();
        } catch (OdpsException e) {
            System.err.println("Could not drop temporary table " + table + ": " + e.getMessage());
        }
    }

    /*
     * Builds the MaxCompute client from the configured credentials, endpoint and default project.
     * */
    private static Odps getOdps() {
        Account account = new AliyunAccount(accessId, accessKey);
        Odps odps = new Odps(account);
        odps.setEndpoint(endPoint);
        odps.setDefaultProject(project);
        return odps;
    }
}

文章作者: darebeat
版权声明: 本博客所有文章除特别声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 darebeat !
  目录