百度360必应搜狗淘宝本站头条
当前位置:网站首页 > IT技术 > 正文

datax+gbload+GBase入库适配操作手顺

wptr33 2025-02-06 16:37 50 浏览

原文链接:datax+gbload+GBase入库适配操作手顺|GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
更多精彩内容尽在GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商

一、环境准备

1、开启vsftpd服务,注意修改,默认路径(gbload_server_9.5.25版本支持sftp协议)

userlist_enable=YES -- 设定userlist 可用,一般是deny。这个不强求。如果是,将对应用户从列表注销(ftpuser、user_list)

local_root=/opt/gbload -- 设定一个默认目录。对应下面
gbload_ftp_server_base_path 这个路径。

chroot_local_user=YES -- 限制在一个目录。

allow_writeable_chroot=YES -- 这个目录可写

2、安装gccli客户端

注意修改环境变量,参考配置文件

3、dispcli (区分python2 和3 需要申请特定注意。要python某个版本编译的)

放入目录,比如 /opt/dispcli (这里里面才是dispcli文件)

4、Java环境变量 (注意将sgloader 和jdbc 也配置进去。不一定有用,配置了肯定可以)

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.131-11.b12.el7.x86_64

export PATH=.:$PATH:$JAVA_HOME/bin

export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/opt/gbloader_test/ gbase-connector-java-8.3.81.53-build55.4.1-bin.jar:/opt/gbloader_test/sgLoader/sgloader-0.0.1.jar

二、gbloader部署

1、解压后有2个目录,api_file 、server_file ,比如放在/opt/gbload_server

2、创建目录用于放加载临时文件

mkdir -p /opt/ gbload_server /data/main

mkdir -p /opt/ gbload_server /data/copy

mkdir -p /opt/ gbload_server /data/kvstore

3、修改gbload_server的配置文件,在
/opt/gbload_server/server_file/下面,名字叫gbload_server.conf

修改的几个内容如下:

gbload_config_server_host=10.10.3.233 #就是部署本机的IP地址


gbload_config_extern_master_server=10.10.3.233 #就是部署本机的IP地址,以后如果加载机有扩展的情况下,这里写的是master 的IP地址,只有一台加载机则就是本机的IP地址


gbload_data_key_retention_time=2 #表加载临时信息的保存时间,单位位秒,设置2就行;


gbload_ftp_server_host_name=10.10.3.233 #就是部署本机的IP地址


gbload_ftp_server_user_passwd=gbase:gbase #连接本机ftp服务的用户名和密码,格式为user:passwd


gbload_ftp_server_base_path=
/opt/gbloader_test/server_file/ #本机ftp服务的主目录,用户由
gbload_ftp_server_user_passwd指定,加载服务必须对此目录有读写权限


gbload_dispatch_server_ip_port=10.10.3.235:8080 #dispatch服务的IP和端口


gbload_dispatch_client_path=
/opt/gbloader_test/dispatch_server/ #dispcli所在的路径


gbload_gnode_gbloader_path=
/opt/gbloader_test/server_file/ #gbloader所在的路径


gbload_database_server_host=10.10.3.109 #数据库节点的IP地址;需要制定本地gcluster/gnode使用的IP地址(GBase MPP Cluster的一个管理节点的IP)


gbload_file_count_per_controy_file=100000 #每个控制文件处理的数据文件个数

4、启动加载服务,在
/opt/gbload_server/server_file下执行nohup sh ./ gbload_server_monit.sh &即可,并通过ps –ef|grep gbload_server看进程是否启动

5、gbload加载测试样例

  1. 表结构

Create database testdb;

Use testdb;

CREATE TABLE "lineorder" (

"lo_orderkey" bigint(20) DEFAULT NULL,

"lo_linenumber" int(11) DEFAULT NULL,

"lo_custkey" int(11) DEFAULT NULL,

"lo_partkey" int(11) DEFAULT NULL,

"lo_suppkey" int(11) DEFAULT NULL,

"lo_orderdate" int(11) DEFAULT NULL,

"lo_orderpriority" varchar(15) DEFAULT NULL,

"lo_shippriority" varchar(1) DEFAULT NULL,

"lo_quantity" int(11) DEFAULT NULL,

"lo_extendedprice" int(11) DEFAULT NULL,

"lo_ordtotalprice" int(11) DEFAULT NULL,

"lo_discount" int(11) DEFAULT NULL,

"lo_revenue" int(11) DEFAULT NULL,

"lo_supplycost" int(11) DEFAULT NULL,

"lo_tax" int(11) DEFAULT NULL,

"lo_commitdate" int(11) DEFAULT NULL,

"lo_shipmode" varchar(10) DEFAULT NULL

) COMPRESS(5, 5) ENGINE=EXPRESS DISTRIBUTED BY('lo_orderkey');

  1. Java样例代码

以下代码用的时候,修改红色加粗的IP地址就行,就是运行gbload_server的这台机器的IP地址。

url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL

import java.io.*;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.SQLException;

import java.util.ArrayList;

public class Lineorder implements Runnable{

String host; // 加载服务IP地址

String port = "8585"; // 端口

String port2 = "5258";

String db; // 数据库名

String table; // 要加载的表名

int psize; // 每次提交的行数

int thdCount; //线程数

static int sleepTime = 0; //每次提交sleep的毫秒数

String sgLoader;

String fileName ;

public Lineorder(String host, String db, String table, int psize, String sgLoader, String fileName){

this.host = host;

this.db = db;

this.table = table;

this.psize = psize;

this.sgLoader = sgLoader;

this.fileName = fileName;

}

@Override

public void run() {

// TODO Auto-generated method stub

String url = "jdbc:gbload://" + host + ":" + port + "/" + db + "?failoverEnable=true&hostList=" + "10.10.3.233;10.10.3.234" + "&basePath=" + sgLoader + "&sgloaderPath=" + sgLoader + "&interval=120&characterEncoding=gbk&UseOneServer=false"; // URL

try {

Class.forName("cn.gbase.jdbc.LoadDriver").newInstance();

} catch (Exception e) {

e.printStackTrace();

return;

}

Connection conn = null;

PreparedStatement pstmt = null;

try {

System.out.println(url);

conn = DriverManager.getConnection(url, "gbase", "gbase20110531");

System.out.println("getConnection done");

} catch (SQLException e1) {

e1.printStackTrace();

return;

}

if(conn == null){

System.err.println("conn is null");

return;

}

String sql = "insert into " + table + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

//String sql = "insert into lineorder (lo_orderkey,lo_linenumber,lo_custkey,lo_partkey,lo_suppkey,lo_orderdate,lo_orderpriority,lo_shippriority,lo_quantity,lo_extendedprice,lo_ordtotalprice,lo_discount,lo_revenue,lo_supplycost,lo_tax,lo_commitdate,lo_shipmode) values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";

File file = new File(fileName);

BufferedReader reader = null;

try{

reader = new BufferedReader(new FileReader(file));

String tempString = null;

String[] lines = null;

pstmt = conn.prepareStatement(sql);

String threadName = Thread.currentThread().getName();

int line = 0;

while ((tempString = reader.readLine()) != null) {

lines = tempString.split("\\|");

pstmt.setString(1, lines[0]);

pstmt.setString(2, lines[1]);

pstmt.setString(3, lines[2]);

pstmt.setString(4, lines[3]);

pstmt.setString(5, lines[4]);

pstmt.setString(6, lines[5]);

pstmt.setString(7, lines[6]);

pstmt.setString(8, lines[7]);

pstmt.setString(9, lines[8]);

pstmt.setString(10, lines[9]);

pstmt.setString(11, lines[10]);

pstmt.setString(12, lines[11]);

pstmt.setString(13, lines[12]);

pstmt.setString(14, lines[13]);

pstmt.setString(15, lines[14]);

pstmt.setString(16, lines[15]);

pstmt.setString(17, lines[16]);

pstmt.addBatch();

line++;

if(line == psize){

line = 0;

pstmt.executeBatch();

conn.commit();

pstmt.clearBatch();

}

}

pstmt.executeBatch();

conn.commit();

pstmt.clearBatch();

reader.close();

System.out.println(threadName+": ==================>LOAD END! " );

}catch(Exception e){

e.printStackTrace();

}finally{

if (reader != null) {

try {

reader.close();

} catch (IOException e) {

e.printStackTrace();

}

}

try {

if (pstmt != null)

pstmt.close();

} catch (SQLException e) {

e.printStackTrace();

}

try {

if(conn != null)

conn.close();

} catch (SQLException e) {

e.printStackTrace();

}

}

}

public static void main(String[] args) {

String host = args[0]; // 加载服务IP地址

String db = args[1]; // 数据库名

String table=args[2]; // 要加载的表名

int psize = Integer.parseInt(args[3]) ;

int thdCount = Integer.parseInt(args[4]);

String sgLoader = args[5]; //sgLoader路径

String fileName = args[6]; //加载的文件名

ArrayList loadThreads = new ArrayList(thdCount);

long beginTime = System.currentTimeMillis();

// 启动加载线程。

for(int i=1; i<=thdCount; i++){

Lineorder gbl = new Lineorder(host,db,table,psize,sgLoader,fileName);

Thread thd = new Thread(gbl);

loadThreads.add(thd);

}

for (Thread t : loadThreads) {

t.start();

}

for (Thread t : loadThreads) {

try {

t.join();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

long endTime = System.currentTimeMillis();

double useTime = (endTime - beginTime) / 1000; // 秒

//double speed = (psize * pcount * thdCount ) / useTime;

//System.out.println("rowCount: "+ psize * pcount * thdCount );

System.out.println("time: "+ useTime + " sec");

//System.out.println("speed: " + speed + " row/sec");

}

}

  1. Java代码编译及执行

编译:javac Lineorder.java

执行:java Lineorder 192.168.2.188 testdb lineorder 10000 1
/opt/gbload_server/api_file/
/opt/gbload_server/data/lineorder.tbl

执行参数说明:

Lineorder为java编译出来的class文件

192.168.2.188为gbload_server运行所在机器的IP地址;

testdb:gbase 8a MPP cluster数据库中的数据库名称;

Lineorder:表名称;

10000:每次提交的数据量,建议10000~20000,但不是绝对,根据服务器的配置灵活调整,多测试几组数据找出最有的批量提交值;

1:线程数

/opt/ gbload_server /api_file/:sgloader-0.0.1.jar所在的路径


/opt/gbload_server/data/lineorder.tbl:需要加载的数据文件

  1. 测试加载,如下图所未,gbload加载成功

三、datax部署

1、解压datax.zip到/opt目录

2、解压公司的datax_plugin_build3.0.zip解压包,对应的plugin下面有reader和writer,放到datax的plugin下对应目录下面

3、用gbload_server的
gcluster-load-api-8.5.1.2-build_20241217080553.jar包,datax的plugin里gbase8amppgbloaderwriter和gbase8amppjdbcwriter对应的jar包,保持版本一致(gbload_server高版本与datax自带jar包兼容有问题)

4、 python datax.py -r mysqlreader -w gbase8amppgbloaderwriter 可以生成样例,可以根据生成的样例进行修改,生成json任务文件

5、执行python datax.py ${name}.json

任务执行最后,统计信息

原文链接:datax+gbload+GBase入库适配操作手顺|GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商
更多精彩内容尽在GBASE社区|天津南大通用数据技术股份有限公司|GBASE-致力于成为用户最信赖的数据库产品供应商

相关推荐

高性能并发队列Disruptor使用详解

基本概念Disruptor是一个高性能的异步处理框架,是一个轻量的Java消息服务JMS,能够在无锁的情况下实现队列的并发操作Disruptor使用环形数组实现了类似队列的功能,并且是一个有界队列....

Disruptor一个高性能队列_java高性能队列

Disruptor一个高性能队列前言说到队列比较熟悉的可能是ArrayBlockingQueue、LinkedBlockingQueue这两个有界队列,大多应用在线程池中使用能保证线程安全,但其安全性...

谈谈防御性编程_防御性策略

防御性编程对于程序员来说是一种良好的代码习惯,是为了保护自己的程序在不可未知的异常下,避免带来更大的破坏性崩溃,使得程序在错误发生时,依然能够云淡风轻的处理,但很多程序员入行很多年,写出的代码依然都是...

有人敲门,开水开了,电话响了,孩子哭了,你先顾谁?

前言哎呀,这种情况你肯定遇到过吧!正在家里忙活着,突然——咚咚咚有人敲门,咕噜咕噜开水开了,铃铃铃电话响了,哇哇哇孩子又哭了...我去,四件事一起来,人都懵了!你说先搞哪个?其实这跟我们写Java多线...

面试官:线程池如何按照core、max、queue的执行顺序去执行?

前言这是一个真实的面试题。前几天一个朋友在群里分享了他刚刚面试候选者时问的问题:"线程池如何按照core、max、queue的执行循序去执行?"。我们都知道线程池中代码执行顺序是:co...

深入剖析 Java 中线程池的多种实现方式

在当今高度并发的互联网软件开发领域,高效地管理和利用线程资源是提升程序性能的关键。Java作为一种广泛应用于后端开发的编程语言,为我们提供了丰富的线程池实现方式。今天,就让我们深入探讨Java中...

并发编程之《彻底搞懂Java线程》_java多线程并发解决方案详解

目录引言一、核心概念:线程是什么?...

Redis怎么实现延时消息_redis实现延时任务

一句话总结Redis可通过有序集合(ZSET)实现延时消息:将消息作为value,到期时间戳作为score存入ZSET。消费者轮询用ZRANGEBYSCORE获取到期消息,配合Lua脚本保证原子性获取...

CompletableFuture真的用对了吗?盘点它最容易被误用的5个场景

在Java并发编程中,CompletableFuture是处理异步任务的利器,但不少开发者在使用时踩过这些坑——线上服务突然雪崩、异常悄无声息消失、接口响应时间翻倍……本文结合真实案例,拆解5个最容易...

接口性能优化技巧,有点硬_接口性能瓶颈

背景我负责的系统到2021年初完成了功能上的建设,开始进入到推广阶段。随着推广的逐步深入,收到了很多好评的同时也收到了很多对性能的吐槽。刚刚收到吐槽的时候,我们的心情是这样的:...

禁止使用这5个Java类,每一个背后都有一段&quot;血泪史&quot;

某电商平台的支付系统突然报警:大量订单状态异常。排查日志发现,同一笔订单被重复支付了三次。事后复盘显示,罪魁祸首竟是一行看似无害的SimpleDateFormat代码。在Java开发中,这类因使用不安...

无锁队列Disruptor原理解析_无锁队列实现原理

队列比较队列...

Java并发队列与容器_java 并发队列

【前言:无论是大数据从业人员还是Java从业人员,掌握Java高并发和多线程是必备技能之一。本文主要阐述Java并发包下的阻塞队列和并发容器,其实研读过大数据相关技术如Spark、Storm等源码的,...

线程池工具及拒绝策略的使用_线程池处理策略

线程池的拒绝策略若线程池中的核心线程数被用完且阻塞队列已排满,则此时线程池的资源已耗尽,线程池将没有足够的线程资源执行新的任务。为了保证操作系统的安全,线程池将通过拒绝策略处理新添加的线程任务。...

【面试题精讲】ArrayBlockingQueue 和 LinkedBlockingQueue 区别?

有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准...