在kettle 抽取mongodb中转换mongodb数据,组内存超过限制怎么办

Kettle是一款国外开源的ETL工具,纯java编写,可以在Window、Linux、Unix上运行,数据抽取高效稳定。作为一个数据抽取工具,在建立数据仓库的过程中,免不了需要从不同的数据源中抽取到集中的数据仓库中,下面就拿mongodb抽取到mysql为例,简述抽取数据的步骤:1. 新建“转换”:test2. 在test中新建“MongoDB Input”,填写数据源信息等,其中有一点比较重要,fields标签页中,取消选中“Output single JSON field”。如下图:![image](http://firewarm.coding.me/images/kettle/kettle-mongo-mysql.png)3. 在test中新建“字段选择”,选择要转换的字段,如下图![image](http://firewarm.coding.me/images/kettle/kettle-mongo-mysql2.png)4. 新建“插入/更新”,选择mysql的数据源、表等,选择字段对应关系![image](http://firewarm.coding.me/images/kettle/kettle-mongo-mysql3.png)5. 然后依次画上相关的连线即可![image](http://firewarm.coding.me/images/kettle/kettle-mongo-mysql4.png)6. 然后执行处理即可
本文已收录于以下专栏:
相关文章推荐
kettle从MySQL中读取数据并写入MongoDB的教程
1、从MySQL中读取数据
1.1创建“数据库连接”
点击“DB连接”,
会弹出如下页面。填写连接MySQL的信息,点击“测试”,查看...
最近有一个调优的项目设计到MongoDB数据转移到Mysql库进行数据分析。
以下是利用kettle对数据的转换的介绍:
1:MongoDb的查询:
  主要是基于json格式:具体的查询可以见连接M...
1.源数据库新增一条记录,目标库同时新增一条记录;
2.源数据库修改一条记录,目标库同时修改该条记录;
示例用到三个Kettle组件
下面详细说下每个组件的配...
如果你在做cxf,并且你想做全注销
java.lang.AbstractMethodError: javax.ws.rs.core.UriBuilder.uri(Ljava/lang/S)Ljavax/ws/rs...
我在生成text响应是是没有问题的:
@Path(&myresource&)
public class MyResource {
@Produces(&text/plain&)
先把 spring搞起来吧:
Jersey与Spring集成时报的错误:
java.lang.AbstractMethodError: javax.ws.rs.core.UriBuilder.uri(Ljava/lang/Stri...
java.lang.AbstractMethodError: javax.ws.rs.core.UriBuilder.uri(Ljava/lang/S)Ljavax/ws/rs/core/...
他的最新文章
讲师:AI100
讲师:谢梁
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)关注51Testing
使用Kettle增量抽取MongoDB数据实践
发表于: 10:20 &作者:JC5353 & 来源:51Testing软件测试网采编
推荐标签:
  需求: 增量抽取MongoDB数据并加载到MSSQL  由于不能使用关系型的自定义, 所以主要遇到的问题有:  1、增量时间的查询和参数控制  2、ETL的批次信息和调用参数的写入  第一个问题的解决如下:  使用命名参数在Query页中进行过滤, 一开始会担心${}的引用方式会用Mongo的语法冲突, 后发现运行正常  第二个问题:  先为结果增加常量值, 如常量值固定则直接写死, 不固定的常量值先设置为空串, 在后面使用字符串替换组件传入命名参数, 最后用字段选择把空串的常量值移除
搜索风云榜
51Testing官方微信
51Testing官方微博
测试知识全知道2892人阅读
Oracle与数据处理(43)
使用Kettle抽取MongoDB的数据到Oracle数据等关系型数据库中还是比较简单的,因为Mongo中存储的都是json格式的数据,所以取出数据后按照json的格式拆分出来就行啦。
整体流程图:
1.获取MongoDB数据,各个选项卡填入对应的值即可:
2.字段选择,获取到的字段并不是都需要的,最好加上字段选择,只选择需要的字段
3.表输出,即可连接数据库,按照对应的映射关系,同步数据了。
以上便是MongoDB同步数据到关系型数据库的kettle操作方式了,我的kettle版本的5.3,较低版本可能没有第一个组件的。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:35591次
排名:千里之外
原创:52篇
(3)(2)(2)(3)(1)(2)(2)(4)(1)(2)(3)(1)(3)(2)(6)(1)(2)(4)(2)(5)(3)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'3766人阅读
NOSQL(49)
MongoDB数据量大于2亿后遇到的问题 及原因分析
一、数据增长情况
&& &每月增长量最大达到了1.9亿,每天增长约300W-500W
&& &(增长数据具体可看页尾)
二、遇到的情况及解决方法
&& &1.数据量过大,并且都集中在一个表,所以此表数据插入变慢。
&& &&& &表索引越多越明显,
&& &&& &优化处理方法:
&& &&& &&& &1.优化索引,以前的startTime日期字段索引,
&& &&& &&& &修改为客户端用日期生成ObjectId,再用_id 来进行查找。
&& &&& &&& &2.TraceId 字段(一个TraceId 对应多条记录)计划也删除,后面再用ES 系统先查询到_id 后,
&& &&& &&& &再从mongoDB 进行查找。
&& &&& &原因分析:
&& &&& &&& &当表数据增长到千万级后,内存数据中的索引数据增加,内存越来越不够用,需要刷新脏数据增多,&& &&& &&& &
&& &&& &&& &mongostat 分析的 dirty % & 3,后从16G 内存升级到32G 内存后,情况稍有好转。
&& &2.数据量过大后,从节点时尔出现CPU load 负载过高,从节点尤其明显。
&& &&& &在把表重命名,新数据插入到新表处理后:
&& &&& &db.TraceLogs.renameCollection(&TraceLogs_bak170210&);
&& &&& &(新数据插入时,会自动生成表TraceLogs)
&& &&& &历史数据表统计信息&& &
&& &&& &&& &Log:PRIMARY& db.TraceLogs_bak170210.stats()
&& &&& &&& &{
&& &&& &&& &&& &&ns& : &RavenLogs.TraceLogs_bak170210&,
&& &&& &&& &&& &&count& : ,
&& &&& &&& &&& &&size& : ,
&& &&& &&& &&& &&avgObjSize& : 2251,
&& &&& &&& &&& &&storageSize& : 444,613,255,168,
&& &&& &&& &&& &.....
&& &&& &&& &&& &&nindexes& : 2,
&& &&& &&& &&& &&totalIndexSize& : ,
&& &&& &&& &&& &&indexSizes& : {
&& &&& &&& &&& &&& &&_id_& : 3,973,029,888,
&& &&& &&& &&& &&& &&TraceId_1& : 11,302,027,264
&& &&& &&& &&& &},
&& &&& &&& &&& &&ok& : 1
&& &&& &&& &}
&& &&& &&& &从此统计信息中可以看到:
&& &&& &&& &&& &&& &表存储大小:&& &444G,
&& &&& &&& &&& &&& &索引 _id_ 3.9G, TraceId_1 大小:11G
&& &&& &再次查看数据库性能
&& &&& &从以前的:
&& &&& &load average: & 5.47, 5.47, 5.47
&& &&& &降到了:
&& &&& &load average: 0.88, 1.34, 1.69
&& &&& &(主从节点,皆已下降)
&& &&& &在做历史数据迁移期间,又升到了& 8 并且时频繁出现。
&& &&& &完成数据迁移后,回落到& 2 & load avg &: 4 之间&& &&& &(升级到MongoDB3.4 之后)
&& &&& &&& &
&& &&& &原因分析:
&& &&& &&& &个人认为,主因还是因为内存不够。索引+热数据远远超过了16G的MongoDB使用内存。
&& &&& &&& &从而导致大量的IO,相对的CPU load 也上去了。
&& &&& &&& &在把原大表TraceLogs 改名后(TraceLogs_bak170210),大量的热块数据已被清除出内存,
&& &3.此前数据库从节点内存升级后(16G --& 32G),参数配置不当,节点实例当机情况:
&& &&& &wiredTiger:
&& &&& &&& &engineConfig:
&& &&&&&& cacheSizeGB: 28&& &(限制mongoDB 使用内存最大值)
&& &&& &后调整为默认值
&& &&& &&& &&& &#cacheSizeGB: 28&& &(限制mongoDB 使用内存最大值),默认值为50%
&& &&& &mongoDB实例恢复正常,但CPU load 也一直居高不下。
&& &&& &原因分析:
&& &&& &&& &系统使用内存太少,可能是磁盘缓存过低,而无法读写数据,但在mongoDB 日志中,
&& &&& &&& &无法找到原因。只是看到实例被关闭。
&& &4.因为oplog 同步表最大设置值(oplogSizeMB)为50G, 但50G 只能保存52h 的数量变化量。
&& &想添加新的从节点时,当同步完成数据后,已过了oplog 的窗口期.
&& &(oplogSizeMB的大小要大于数据同步完成+索引建立完成的时间段内生成的数据量,
&& &当同步完成后,从节点从主节点读oplog表的数据,发现最小的同步时间,已大于从节点中
&& &同步开始时的时间了,这就是窗口期已过期)
&& &&& &数据量大后,重新创建索引的时间特别惊人,一个索引需要10多个小时。
&& &&& &500G 存储量,总共需要3天左右的数据完成节点的数据同步及索引创建。
&& &&& &后面计划在添加节点前,做以下调整:
&& &&& &1.把数据库升级到3.4 版本,此版本在新节点数据同步,创建索引上,号称已有很大的改善。
&& &&& &2.删除能够优化的索引,如果索引无法优化,也可以考虑先把某个索引删除,节点完成后,再重新建立
经验总结:
&& &1.索引的优化,尽可能的发挥主键索引的功能,比如上面说到的,使用日期范围自己生成_id 范围,用_id字段进行查询,
&& &能不建立索引,就不建立。在大增长的表中,极其重要。
&& &2.数据库服务器的内存配置上,内存&索引大小,或者是配置到 内存&=索引大小+热数据大小 还是有必要的。
&& &3.数据库服务器的磁盘配置上,如果是云服务器,尽量采用高效云盘。使用EXT4,或者使用NFS 格式也是有必要的。
&& &4.如果一个库有多个表的数据达到亿级时,可能也是考虑使用分片集群的时候,特别是如果此表是做为主业务
&& &数据库的情况。
---------- 表数据增长情况 ------------------
2/1/2017,&& &3152283
2/2/2017,&& &3394489
2/3/2017,&& &3524487
2/4/2017,&& &3511386
2/5/2017,&& &3870305
2/6/2017,&& &3056966
2/7/2017,&& &3022927
2/8/2017,&& &3484463
2/9/2017,&& &4033520
--------------------------
&2016/12:&& &
&2017/01:&& &
&2017/02:&& &
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:863917次
积分:9674
积分:9674
排名:第1932名
原创:211篇
转载:91篇
评论:46条
(3)(3)(3)(3)(2)(6)(4)(2)(1)(3)(4)(1)(2)(3)(1)(2)(2)(4)(5)(3)(4)(6)(4)(2)(13)(4)(8)(5)(6)(12)(13)(5)(2)(5)(6)(6)(6)(1)(6)(4)(5)(20)(2)(4)(1)(1)(2)(1)(9)(1)(1)(3)(1)(1)(4)(1)(2)(4)(1)(1)(1)(1)(1)(1)(6)(1)(1)(6)(7)(6)(4)(3)(2)(5)(6)(7)(4)
(window.slotbydup = window.slotbydup || []).push({
id: '4740881',
container: s,
size: '200,200',
display: 'inlay-fix'一、实现目标
  源数据库的数据更新或者删除之后,目标数据库的数据跟着更新或删除,整体流程截图如下:
一、准备工作
源数据库ORACLE& 目标数据库MongoDB,在源数据库添加删除、更新触发器
二、操作步骤
添加表输入组件,连接ORACLE触发器记录表
添加JAVA代码组件,进行步骤跳转,根据输入的数据判断是删除或者更新,如果是删除,则跳转至MongoDB Delete步骤中,如果是更新的话,跳转至字段选择步骤中。JAVA代码中的详细信息如下:
import java.util.L
import org.pentaho.di.core.exception.KettleE
import org.pentaho.di.core.row.RowDataU
import org.pentaho.di.core.row.RowM
import org.pentaho.di.core.row.RowMetaI
import org.pentaho.di.core.row.ValueM
import org.pentaho.di.trans.T
import org.pentaho.di.trans.TransM
private Object[] previousR//上一行
private RowSet t1 = null;//业务表步骤
private RowSet t2 = null;//删除步骤
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException
Object[] r = getRow(); //获取输入行
if ( first ) {
if ( getInputRowMeta() == null ) {
setOutputDone();//设置输出完成
return false;
if ( r == null ) { // 如果当前行为null
if ( previousRow != null ) {//如果上一行不为null
//是最后一行
boolean valid=true;
previousRow = createOutputRow(previousRow, data.outputRowMeta.size());
Trans trans=getTrans();//获取转换实例
if (trans != null){
String sync_val = get(Fields.In, "ID").getString(previousRow);//获取ID
trans.setVariable("LAST_SYNC_VAL", sync_val);//设置变量的值
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){//验证通过
putRowTo(data.outputRowMeta, previousRow,t1);
putRowTo(data.outputRowMeta, previousRow,t2);
setOutputDone();//设置输出完成
return false;//返回false表示不用再继续处理processRow
if ( !first ) {//不是第一次执行,因为第一次执行时previousRow一定是Null
//不是最后一行
boolean valid=true;
String OpType = get(Fields.In, "DATATYPE").getString(previousRow);//获取操作类型是删除还是更新
String keyid= get(Fields.In, "DATAID").getString(previousRow);//获取操作类型是删除还是更新
//Object[] rowData = RowDataUtil.allocateRowData(data.outputRowMeta.size());
//get(Fields.Out, "KEYID").setValue(rowData,keyid);
//putRowTo(data.outputRowMeta, previousRow,t2);
if(OpType.equals("UPDATE")){
putRowTo(data.outputRowMeta, previousRow,t1);
putRowTo(data.outputRowMeta, previousRow,t2);
previousRow =//把当前行设为下一次执行的上一行
if ( first ) {//如果是首次执行
first = false;
t1 = findTargetRowSet("dataupdate");//业务表步骤
t2 = findTargetRowSet("datadelete");//数据删除步骤
return true;//返回true表示还要继续处理processRow
3.如果跳转至了MongoDB Delete,则根据ID对目标库进行删除。Mongodb delete组件配置如下:
  JSON query中的{ID:"?{DATAID}"}表示删除ID等于传进来的参数DATAID的所有数据,Execute for each row要选择上,表示执行每一行数据。 & & & & &
  4.如果通过JAVA代码2判断为更新的话,则流程将跳转至字段选择组件,只获取主键ID,此步骤非常重要,因为要根据ID去源表中获取等更新的那条数据。
5.选择表输入组件,该步骤是根据上一步传入的ID获取待更新的那一条数据
PS:获取SQL查询语句:此处写入SQL语句,里边的?是变量替换,下边要勾选上"替换SQL语句里的变量",从步骤插入数据要选择上一步,勾选上执行每一行。
6.下边的步骤:流查询、JAVA代码是对数据进行清洗,字典替换,此处不再解释
7.最后一步:Mongodb output输出需要详细设置
output options选项卡勾选update &modifier update
Mongo文档字段配置:ID为主键匹配字段,匹配字段更新为Y 修改器设置为N/A表示不对主键更新
阅读(...) 评论()}

我要回帖

更多关于 kettle 抽取mongodb 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信