基于Morphlines的ETL开发流程
作者 : 费英林
目录
1.3. UDH Search 与 Morphline 的关系
Morphlines 是一个开源框架,它降低了 Hadoop 平台上开发和维护 ETL 流程的时间和成本,通过 Morphlines 可以抽取、转换并加载数据到 Apache Solr 、 HBase 、 HDFS 、企业数据仓库或者在线分析应用。我们可以很方便的通过配置文件对 morphline 的流程进行配置,不需要进行 Java 编码工作。
Morphlines 本身是一个类库,可以嵌入到任何 Java 代码里。一个 morphline 是一个转换命令集合,这些命令对记录进行加载、转换等等。记录是一个内存数据结构,由键值对组成,可能含有 blob 或 POJO 附件。
1. 系统概要
1.1. 背景介绍
在 Hadoop 平台上开发 ETL 应用是一项费时费力的工作,要编写一些复杂的处理程序,如 MapReduce 作业、 Oozie 工作流等等。为了适应多样化的应用需求,缩短应用开发周期,降低开发维护成本, Cloudera 公司开发了 Morphlines 框架。这个框架通过简单的配置文件来定义一个 ETL 流程,不需要任何 Java 编码即可满足大部分应用需求。如果既有的命令无法满足应用需求,程序员也可以写一些命令,这些命令可以方便的整合到 Morphlines 框架中。
1.2. 系统架构
1) 输入源: ETL 数据源, Morphlines 可以从 Flume 、 HBase 和 HDFS 中读取数据。
2 ) Morphline :一个自定义的工作流配置文件。
3 )输出: Morphline 可以输出数据到多个系统,如 HBase 、 HDFS 、 Solr 及企业数据仓库等等。
1.3. UDH Search与Morphline的关系
Morphline 最初是作为 Search 的一部分来开发的,后来为了有更多的开发者和使用者能够了解和扩展 Morphline 的功能, Morphline 形成了一个独立的开源项目。利用 Morphline 可以实现 Search 中的 ETL 阶段工作,也可以实现 Search 之外的一些 ETL 工作。本文以 Morphline 在 Search 中的应用为例,讲述代码开发和部署,以及一些需要注意的问题。下图是 Morphline 在 Search 整体架构中的角色应用。
2. 系统安装与配置
2.1. UDH Search服务安装
在集群上部署 UDH Search 服务,参见“专题 -UDH Search 系统构建及其应用”一文。需要配置 Lily HBase Batch Indexer 和 Lily HBase NRT Indexer ,分别用于索引的批量创建和实时创建,其中索引的实时创建是利用 HBase 的 Replication 机制实现的。
3. Morphline 命令开发
Morhpline 命令需要实现 Java 接口 Command 或它的子类 AbstractCommand ,另外还需要一个 CommandBuilder 接口的实现类,在这个 CommandBuilder 里定义 Morhpline 命令的名字。下面以删除某字段里 Html 标签为例,给出代码的具体实现。
3.1. CommandBuilder
在 Builder 里指定命令名称并创建命令实例(工厂方法)。
package com.yonyou.udhsearch.morphlines;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import com.typesafe.config.Config;
public final class DropHtmlTags implements CommandBuilder {
@Override
public Collection<String> getNames() {
return Collections.singletonList(" htmlDropper ");
}
@Override
public Command build(Config config, Command parent, Command child, MorphlineContext context) {
return new HtmlDropper(this, config, parent, child, context);
}
}
3.2. Command
在命令实现类里实现 doProcess(Record record) 方法,在这个方法加入我们需要的处理逻辑,在本例中是删除 Html 标签及回车换行符。
package com.yonyou.udhsearch.morphlines ;
import java.util.Collection;
import java.util.Collections;
import java.util.ListIterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import com.typesafe.config.Config;
private static final class HtmlDropper extends AbstractCommand {
private static final String regEx_script = "<script[^>]*?>[\\s\\S]*?<\\/script>";
private static final String regEx_style = "<style[^>]*?>[\\s\\S]*?<\\/style>"; //
private static final String regEx_html = "<[^>]+>";
private static final String regEx_space = "\\s*|\t|\r|\n";
private final String fieldName;
public HtmlDropper(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
super(builder, config, parent, child, context);
this.fieldName = getConfigs().getString(config, " field ");
validateArguments();
}
@Override
protected boolean doProcess(Record record) {
ListIterator iter = record.get(fieldName).listIterator();
while (iter.hasNext()) {
iter.set(transformFieldValue(iter.next()));
}
// pass record to next command in chain:
return super.doProcess(record);
}
/** Transforms the given input value to some output value */
private Object transformFieldValue(Object value) {
if (value == null) return "";
String htmlStr = value.toString();
try {
Pattern p_script = Pattern.compile(regEx_script, Pattern.CASE_INSENSITIVE);
Matcher m_script = p_script.matcher(htmlStr);
htmlStr = m_script.replaceAll("");
Pattern p_style = Pattern.compile(regEx_style, Pattern.CASE_INSENSITIVE);
Matcher m_style = p_style.matcher(htmlStr);
htmlStr = m_style.replaceAll("");
Pattern p_html = Pattern.compile(regEx_html, Pattern.CASE_INSENSITIVE);
Matcher m_html = p_html.matcher(htmlStr);
htmlStr = m_html.replaceAll("");
Pattern p_space = Pattern.compile(regEx_space, Pattern.CASE_INSENSITIVE);
Matcher m_space = p_space.matcher(htmlStr);
htmlStr = m_space.replaceAll("");
if (htmlStr.indexOf(" ") != -1)
htmlStr = htmlStr.replaceAll(" ", "");
} catch (Exception e) {
htmlStr = value.toString();
}
return htmlStr;
}
@Override
protected void doNotify(Record notification) {
LOG.debug("myNotification: {}", notification);
super.doNotify(notification);
}
}
4. Morphline 配置文件
在 Command 的构造函数里,我们指定了一个属性 ’ field ’ ,也就是说,我们可以通过 field 来指定对哪个字段进行操作。除了 field 属性,还需要在 Morphline 的配置文件里引入新开发的命令类。在这个例子里,对字段 data_f2 进行 Html 标签删除操作。如下所示:
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.morphline.**", "com.ngdata.**", " com.yonyou.udhsearch.morphlines.** "]
commands : [
{
extractHBaseCells {
mappings : [
{
inputColumn : "data:f1"
outputField : "data_f1"
type : string
source : value
}
{
inputColumn : "data:f2"
outputField : "data_f2"
type : string
source : value
}
]
}
}
{
htmlDropper {
field : data_f2
}
}
{ logTrace { format : "output record: {}", args : ["@{}"] } }
]
}
]
5. Morphline 部署运行
运行时,需要保证新开发的命令类在 MapReduce 作业或 Lily NRT Indexer 服务的类路径里。
5.1. MapReduce作业
对于 MR 作业,可直接将打好的 jar 包包含在命令行里,如下所示:
HADOOP_CLASSPATH=/usr/lib/hbase/hbase-protocol.jar \
hadoop --config /etc/hadoop/conf jar \
/usr/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar --conf \
/etc/hbase/conf/hbase-site.xml --libjars /root/tools/MyMorphlineCommands/bin/DropHtmlTags.jar \
-D 'mapred.child.java.opts=-Xmx500m' \
--hbase-indexer-file /root/search/hbase_html_tags_dropper/conf/hbase_html_tags_dropper_mapper.xml --zk-host \
127.0.0.1/solr --collection hbase_html_tags_dropper --go-live --log4j \
/etc/hbase/conf.dist/log4j.properties
5.2. Lily NRT Indexer
将 jar 包放在服务的 lib 目录下,即 /usr/lib/hbase-solr/lib/ ,重启服务。
Attachments:
专题_基于UDH的一种实时查询系统构建方案.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
图1.png (image/png)
图2.png (image/png)
图3.png (image/png)
图4.png (image/png)
专题-基于Morphlines的ETL开发流程.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)