百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

datax+gbload+GBase入库适配操作手顺

wptr33 2025-02-06 16:37 24 浏览

原文链接:datax+gbload+GBase入库适配操作手顺|GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
更多精彩内容尽在GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商

一、环境准备

1、开启vsftpd服务,注意修改,默认路径(gbload_server_9.5.25版本支持sftp协议)

userlist_enable=YES -- 设定userlist 可用,一般是deny。这个不强求。如果是,将对应用户从列表注销(ftpuser、user_list)

local_root=/opt/gbload -- 设定一个默认目录。对应下面
gbload_ftp_server_base_path 这个路径。

chroot_local_user=YES -- 限制在一个目录。

allow_writeable_chroot=YES -- 这个目录可写

2、安装gccli客户端

注意修改环境变量,参考配置文件

3、dispcli (区分python2 和3 需要申请特定注意。要python某个版本编译的)

放入目录,比如 /opt/dispcli (这里里面才是dispcli文件)

4、Java环境变量 (注意将sgloader 和jdbc 也配置进去。不一定有用,配置了肯定可以)

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64

export PATH=.:$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/opt/gbloader_test/ gbase-connector-java-8.3.81.53-build55.4.1-bin.jar:/opt/gbloader_test/sgLoader/sgloader-0.0.1.jar

二、gbloader部署

1、解压后有2个目录,api_file 、server_file ,比如放在/opt/gbload_server

2、创建目录用于放加载临时文件

mkdir -p /opt/ gbload_server /data/main

mkdir -p /opt/ gbload_server /data/copy

mkdir -p /opt/ gbload_server /data/kvstore

3、修改gbload_server的配置文件,在
/opt/gbload_server/server_file/下面,名字叫gbload_server.conf

修改的几个内容如下:

gbload_config_server_host=10.10.3.233 #就是部署本机的IP地址


gbload_config_extern_master_server=10.10.3.233 #就是部署本机的IP地址,以后如果加载机有扩展的情况下,这里写的是master 的IP地址,只有一台加载机则就是本机的IP地址


gbload_data_key_retention_time=2 #表加载临时信息的保存时间,单位位秒,设置2就行;


gbload_ftp_server_host_name=10.10.3.233 #就是部署本机的IP地址


gbload_ftp_server_user_passwd=gbase:gbase #连接本机ftp服务的用户名和密码,格式为user:passwd


gbload_ftp_server_base_path=
/opt/gbloader_test/server_file/ #本机ftp服务的主目录,用户由
gbload_ftp_server_user_passwd指定,加载服务必须对此目录有读写权限


gbload_dispatch_server_ip_port=10.10.3.235:8080 #dispatch服务的IP和端口


gbload_dispatch_client_path=
/opt/gbloader_test/dispatch_server/ #dispcli所在的路径


gbload_gnode_gbloader_path=
/opt/gbloader_test/server_file/ #gbloader所在的路径


gbload_database_server_host=10.10.3.109 #数据库节点的IP地址;需要制定本地gcluster/gnode使用的IP地址(GBase MPP Cluster的一个管理节点的IP)


gbload_file_count_per_controy_file=100000 #每个控制文件处理的数据文件个数

4、启动加载服务,在
/opt/gbload_server/server_file下执行nohup sh ./ gbload_server_monit.sh &即可,并通过ps –ef|grep gbload_server看进程是否启动

5、gbload加载测试样例

  1. 表结构

Create database testdb;

Use testdb;

CREATE TABLE "lineorder" (

"lo_orderkey" bigint(20) DEFAULT NULL,

"lo_linenumber" int(11) DEFAULT NULL,

"lo_custkey" int(11) DEFAULT NULL,

"lo_partkey" int(11) DEFAULT NULL,

"lo_suppkey" int(11) DEFAULT NULL,

"lo_orderdate" int(11) DEFAULT NULL,

"lo_orderpriority" varchar(15) DEFAULT NULL,

"lo_shippriority" varchar(1) DEFAULT NULL,

"lo_quantity" int(11) DEFAULT NULL,

"lo_extendedprice" int(11) DEFAULT NULL,

"lo_ordtotalprice" int(11) DEFAULT NULL,

"lo_discount" int(11) DEFAULT NULL,

"lo_revenue" int(11) DEFAULT NULL,

"lo_supplycost" int(11) DEFAULT NULL,

"lo_tax" int(11) DEFAULT NULL,

"lo_commitdate" int(11) DEFAULT NULL,

"lo_shipmode" varchar(10) DEFAULT NULL

) COMPRESS(5, 5) ENGINE=EXPRESS DISTRIBUTED BY('lo_orderkey');

  1. Java样例代码

以下代码用的时候,修改红色加粗的IP地址就行,就是运行gbload_server的这台机器的IP地址。

url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL

import java.io.*;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.util.ArrayList;

public class Lineorder implements Runnable{

String host; // 加载服务IP地址

String port = "8585"; // 端口

String port2 = "5258";

String db; // 数据库名

String table; // 要加载的表名

int psize; // 每次提交的行数

int thdCount; //线程数

static int sleepTime = 0; //每次提交sleep的毫秒数

String sgLoader;

String fileName ;

public Lineorder(String host, String db, String table, int psize, String sgLoader, String fileName){

this.host = host;

this.db = db;

this.table = table;

this.psize = psize;

this.sgLoader = sgLoader;

this.fileName = fileName;

}

@Override

public void run() {

// TODO Auto-generated method stub

String url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233;10.10.3.234" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL

try {

Class.forName("cn.gbase.jdbc.LoadDriver").newInstance();

} catch (Exception e) {

e.printStackTrace();

return;

}

Connection conn = null;

PreparedStatement pstmt = null;

try {

System.out.println(url);

conn = DriverManager.getConnection(url, "gbase", "gbase20110531");

System.out.println("getConnection done");

} catch (SQLException e1) {

e1.printStackTrace();

return;

}

if(conn == null){

System.err.println("conn is null");

return;

}

String sql = "insert into " + table + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

//String sql = "insert into lineorder (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

File file = new File(fileName);

BufferedReader reader = null;

try{

reader = new BufferedReader(new FileReader(file));

String tempString = null;

String[] lines = null;

pstmt = conn.prepareStatement(sql);

String threadName = Thread.currentThread().getName();

int line = 0;

while ((tempString = reader.readLine()) != null) {

lines = tempString.split("\\|");

pstmt.setString(1, lines[0]);

pstmt.setString(2, lines[1]);

pstmt.setString(3, lines[2]);

pstmt.setString(4, lines[3]);

pstmt.setString(5, lines[4]);

pstmt.setString(6, lines[5]);

pstmt.setString(7, lines[6]);

pstmt.setString(8, lines[7]);

pstmt.setString(9, lines[8]);

pstmt.setString(10, lines[9]);

pstmt.setString(11, lines[10]);

pstmt.setString(12, lines[11]);

pstmt.setString(13, lines[12]);

pstmt.setString(14, lines[13]);

pstmt.setString(15, lines[14]);

pstmt.setString(16, lines[15]);

pstmt.setString(17, lines[16]);

pstmt.addBatch();

line++;

if(line == psize){

line = 0;

pstmt.executeBatch();

conn.commit();

pstmt.clearBatch();

}

}

pstmt.executeBatch();

conn.commit();

pstmt.clearBatch();

reader.close();

System.out.println(threadName+": ==================>LOAD END! " );

}catch(Exception e){

e.printStackTrace();

}finally{

if (reader != null) {

try {

reader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

try {

if (pstmt != null)

pstmt.close();

} catch (SQLException e) {

e.printStackTrace();

}

try {

if(conn != null)

conn.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) {

String host = args[0]; // 加载服务IP地址

String db = args[1]; // 数据库名

String table=args[2]; // 要加载的表名

int psize = Integer.parseInt(args[3]) ;

int thdCount = Integer.parseInt(args[4]);

String sgLoader = args[5]; //sgLoader路径

String fileName = args[6]; //加载的文件名

ArrayList loadThreads = new ArrayList(thdCount);

long beginTime = System.currentTimeMillis();

// 启动加载线程。

for(int i=1; i<=thdCount; i++){

Lineorder gbl = new Lineorder(host,db,table,psize,sgLoader,fileName);

Thread thd = new Thread(gbl);

loadThreads.add(thd);

}

for (Thread t : loadThreads) {

t.start();

}

for (Thread t : loadThreads) {

try {

t.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

long endTime = System.currentTimeMillis();

double useTime = (endTime - beginTime) / 1000; // 秒

//double speed = (psize * pcount * thdCount ) / useTime;

//System.out.println("rowCount: "+ psize * pcount * thdCount );

System.out.println("time: "+ useTime + " sec");

//System.out.println("speed: " + speed + " row/sec");

}

}

  1. Java代码编译及执行

编译:javac Lineorder.java

执行:java Lineorder 192.168.2.188 testdb lineorder 10000 1
/opt/gbload_server/api_file/
/opt/gbload_server/data/lineorder.tbl

执行参数说明:

Lineorder为java编译出来的class文件

192.168.2.188为gbload_server运行所在机器的IP地址;

testdb:gbase 8a MPP cluster数据库中的数据库名称;

Lineorder:表名称;

10000:每次提交的数据量,建议10000~20000,但不是绝对,根据服务器的配置灵活调整,多测试几组数据找出最有的批量提交值;

1:线程数

/opt/ gbload_server /api_file/:sgloader-0.0.1.jar所在的路径


/opt/gbload_server/data/lineorder.tbl:需要加载的数据文件

  1. 测试加载,如下图所未,gbload加载成功

三、datax部署

1、解压datax.zip到/opt目录

2、解压公司的datax_plugin_build3.0.zip解压包,对应的plugin下面有reader和writer,放到datax的plugin下对应目录下面

3、用gbload_server的
gcluster-load-api-8.5.1.2-build_20241217080553.jar包,datax的plugin里gbase8amppgbloaderwriter和gbase8amppjdbcwriter对应的jar包,保持版本一致(gbload_server高版本与datax自带jar包兼容有问题)

4、 python datax.py -r mysqlreader -w gbase8amppgbloaderwriter 可以生成样例,可以根据生成的样例进行修改,生成json任务文件

5、执行python datax.py ${name}.json

任务执行最后,统计信息

原文链接:datax+gbload+GBase入库适配操作手顺|GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
更多精彩内容尽在GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商

相关推荐

开发者必看的八大Material Design开源项目

MaterialDesign是介于拟物和扁平之间的一种设计风格,自从它发布以来,便引起了很多开发者的关注,在这里小编介绍在Android开发者当中里最受青睐的八个MaterialDesign开源项...

另类插这么可爱,一定是…(另类t恤)

IT之家(www.ithome.com):另类插图:这么可爱,一定是…OSXMavericks和Yosemite打破了苹果对Mac操作系统传统的命名方式,使用加州的某些标志性景点来替换猫...

Android常用ADB命令(安卓adb工具是什么)

杀死应用①根据包名获取APP的PIDadbshellps|grep应用包名②执行kill命令...

微软Mac版PowerPoint测试Reading Order Pane功能

IT之家5月20日消息,微软公司昨日(5月19日)发布博文,邀请Microsoft365Insiders成员,测试macOS新版PowerPoint演示文稿应用,重点引入...

Visual Studio跨平台开发实战(4):Xamarin Android控制项介绍

前言不同于iOS,Xamarin在VisualStudio中针对Android,可以直接设计使用者界面.在本篇教学文章中,笔者会针对Android的专案目录结构以及基本控制项进行介绍,包...

用云存储30分钟快速搭建APP,你信吗?

背景不管你承认与否,移动互联的时代已经到来,这是一个移动互联的时代,手机已经是当今世界上引领潮流的趋势,大型的全球化企业和中小企业都把APP程序开发纳入到他们的企业发展策略当中。但随着手机APP上传的...

谷歌P图神器来了!不用学不用教,输入一句话,分分钟给结果

Pine发自凹非寺量子位|公众号QbitAI当你拍照片时,“模特不好好配合”怎么办?...

iOS文本编辑控件UITextField和UITextVie

记录一个菜鸟的IOS学习之旅,如能帮助正在学习的你,亦枫不胜荣幸;如路过的大神如指教几句,亦枫感激涕淋!细心的朋友可能已经注意到了,IOS学习之旅系列教程在本篇公众号的文章中,封面已经换成美女图片了,...

Android入门图文教程集锦(android 入门教程)

Android入门视频教程集锦AndroidStudio错误gradientandroid:endXattributenotfound...

如何使用Android自定义复合视图(如何使用android自定义复合视图)

在最近的一个客户应用中,我遇到了一个需求,根据选定的值来生成指定数量的编辑框字段,这样用户可以输入人物信息。最初我的想法是把这些逻辑放到Fragment中,只是根据选中值的变化来向线性布局容器中增加编...

原生安卓开发app的框架frida常用关键代码定位

前言有时候可能会对APP进行字符串加密等操作,这样的话你的变量名等一些都被混淆了,看代码就可能无从下手...

教程10 | 三分钟搞定一个智能输入法程序

一案例描述1、考核知识点网格布局线性布局样式和主题Toast2、练习目标掌握网格布局的使用掌握Toast的使用掌握线性布局的使用...

(Android 8.1) 功能与新特性(android的功能)

和你一起终身学习,这里是程序员AndroidAndroid8.1(API级别27)为用户和开发人员引入了各种新特性和功能。本文档重点介绍了开发人员的新功能。通过本章阅读,您将获取到以下内容:Andr...

怎样设置EditText内部文字被锁定不可删除和修改

在做项目的时候,我曾经遇到过这样的要求,就是跟百度贴吧客户端上的一样,在回复帖子的时候,在EditText中显示回复人的名字,而且这个名字不可以修改和删除,说白了就是不可操作,只能在后面输入内容。在E...

如何阻止 Android 活动启动时 EditText 获得焦点

技术背景在Android开发中,当活动启动时,EditText有时会自动获得焦点并弹出虚拟键盘,这可能不是用户期望的行为。为了提升用户体验,我们需要阻止...