知识库 : 基于Morphlines的ETL开发流程

 

 

Edit Document

 

 

 

 

 

 

 

 

 

 

基于Morphlines的ETL开发流程

 

 

 

作者 费英林

 


目录

1.               系统概要

1.1.               背景介绍

1.2.               系统架构

1.3.               UDH Search Morphline 的关系

2.               系统安装与配置

2.1.               UDH Search 服务安装

3.               Morphline 命令开发

3.1.               CommandBuilder

3.2.               Command

4.               Morphline 配置文件

5.               Morphline 部署运行

5.1.               MapReduce 作业

5.2.               Lily NRT Indexer

 

Morphlines 是一个开源框架,它降低了 Hadoop 平台上开发和维护 ETL 流程的时间和成本,通过 Morphlines 可以抽取、转换并加载数据到 Apache Solr HBase HDFS 、企业数据仓库或者在线分析应用。我们可以很方便的通过配置文件对 morphline 的流程进行配置,不需要进行 Java 编码工作。

Morphlines 本身是一个类库,可以嵌入到任何 Java 代码里。一个 morphline 是一个转换命令集合,这些命令对记录进行加载、转换等等。记录是一个内存数据结构,由键值对组成,可能含有 blob POJO 附件。

1.     系统概要

1.1.   背景介绍

Hadoop 平台上开发 ETL 应用是一项费时费力的工作,要编写一些复杂的处理程序,如 MapReduce 作业、 Oozie 工作流等等。为了适应多样化的应用需求,缩短应用开发周期,降低开发维护成本, Cloudera 公司开发了 Morphlines 框架。这个框架通过简单的配置文件来定义一个 ETL 流程,不需要任何 Java 编码即可满足大部分应用需求。如果既有的命令无法满足应用需求,程序员也可以写一些命令,这些命令可以方便的整合到 Morphlines 框架中。

1.2.   系统架构

QQ截图20150730142319.jpg

1) 输入源: ETL 数据源, Morphlines 可以从 Flume HBase HDFS 中读取数据。

2 Morphline :一个自定义的工作流配置文件。

3 )输出: Morphline 可以输出数据到多个系统,如 HBase HDFS Solr 及企业数据仓库等等。

1.3.   UDH Search与Morphline的关系

Morphline 最初是作为 Search 的一部分来开发的,后来为了有更多的开发者和使用者能够了解和扩展 Morphline 的功能, Morphline 形成了一个独立的开源项目。利用 Morphline 可以实现 Search 中的 ETL 阶段工作,也可以实现 Search 之外的一些 ETL 工作。本文以 Morphline Search 中的应用为例,讲述代码开发和部署,以及一些需要注意的问题。下图是 Morphline Search 整体架构中的角色应用。

QQ截图20150730142349.jpg

2.     系统安装与配置

2.1.   UDH Search服务安装

    在集群上部署 UDH Search 服务,参见“专题 -UDH Search 系统构建及其应用”一文。需要配置 Lily HBase Batch Indexer Lily HBase NRT Indexer ,分别用于索引的批量创建和实时创建,其中索引的实时创建是利用 HBase Replication 机制实现的。

3.     Morphline 命令开发

                 Morhpline 命令需要实现 Java 接口 Command 或它的子类 AbstractCommand ,另外还需要一个 CommandBuilder 接口的实现类,在这个 CommandBuilder 里定义 Morhpline 命令的名字。下面以删除某字段里 Html 标签为例,给出代码的具体实现。

3.1.   CommandBuilder

Builder 里指定命令名称并创建命令实例(工厂方法)。

package com.yonyou.udhsearch.morphlines;

 

import org.kitesdk.morphline.api.Command;

import org.kitesdk.morphline.api.CommandBuilder;

import org.kitesdk.morphline.api.MorphlineContext;

 

import com.typesafe.config.Config;

 

public final class DropHtmlTags implements CommandBuilder {

  @Override

  public Collection<String> getNames() {

    return Collections.singletonList(" htmlDropper ");

  }

 

  @Override

  public Command build(Config config, Command parent, Command child, MorphlineContext context) {

    return new HtmlDropper(this, config, parent, child, context);

  }

}

3.2.   Command

在命令实现类里实现 doProcess(Record record) 方法,在这个方法加入我们需要的处理逻辑,在本例中是删除 Html 标签及回车换行符。

package com.yonyou.udhsearch.morphlines ;

 

import java.util.Collection;

import java.util.Collections;

import java.util.ListIterator;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

 

import org.kitesdk.morphline.api.Record;

import org.kitesdk.morphline.base.AbstractCommand;

import com.typesafe.config.Config;

 

private static final class HtmlDropper extends AbstractCommand {

 

              private static final String regEx_script = "<script[^>]*?>[\\s\\S]*?<\\/script>";

              private static final String regEx_style = "<style[^>]*?>[\\s\\S]*?<\\/style>"; //

              private static final String regEx_html = "<[^>]+>";

              private static final String regEx_space = "\\s*|\t|\r|\n";

              private final String fieldName;

 

 

              public HtmlDropper(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {

                super(builder, config, parent, child, context);

                this.fieldName = getConfigs().getString(config, " field ");

                validateArguments();

              }

 

              @Override

              protected boolean doProcess(Record record) {

                ListIterator iter = record.get(fieldName).listIterator();

                while (iter.hasNext()) {

                            iter.set(transformFieldValue(iter.next()));

                }

               

                // pass record to next command in chain:

                return super.doProcess(record);

              }

 

              /** Transforms the given input value to some output value */

              private Object transformFieldValue(Object value) {

                if (value == null) return "";

                String htmlStr = value.toString();

 

                try {

                Pattern p_script = Pattern.compile(regEx_script, Pattern.CASE_INSENSITIVE);

                Matcher m_script = p_script.matcher(htmlStr);

                htmlStr = m_script.replaceAll("");

 

                Pattern p_style = Pattern.compile(regEx_style, Pattern.CASE_INSENSITIVE);

                Matcher m_style = p_style.matcher(htmlStr);

                htmlStr = m_style.replaceAll("");

 

                Pattern p_html = Pattern.compile(regEx_html, Pattern.CASE_INSENSITIVE);

                Matcher m_html = p_html.matcher(htmlStr);

                htmlStr = m_html.replaceAll("");

 

                Pattern p_space = Pattern.compile(regEx_space, Pattern.CASE_INSENSITIVE);

                Matcher m_space = p_space.matcher(htmlStr);

                htmlStr = m_space.replaceAll("");

               

                if (htmlStr.indexOf("&nbsp;") != -1)

                htmlStr = htmlStr.replaceAll("&nbsp;", "");

               

                } catch (Exception e) {

                              htmlStr = value.toString();

                }

               

                return htmlStr;

              }

 

              @Override

              protected void doNotify(Record notification) {

                LOG.debug("myNotification: {}", notification);

                super.doNotify(notification);

              }

 

4.     Morphline 配置文件

Command 的构造函数里,我们指定了一个属性 field ,也就是说,我们可以通过 field 来指定对哪个字段进行操作。除了 field 属性,还需要在 Morphline 的配置文件里引入新开发的命令类。在这个例子里,对字段 data_f2 进行 Html 标签删除操作。如下所示:

morphlines : [

  {

    id : morphline1

    importCommands : ["org.kitesdk.morphline.**", "com.ngdata.**", " com.yonyou.udhsearch.morphlines.** "]

 

    commands : [                   

      {

        extractHBaseCells {

          mappings : [

            {

              inputColumn : "data:f1"

              outputField : "data_f1"

              type : string

              source : value

            }

            {

              inputColumn : "data:f2"

              outputField : "data_f2"

              type : string

              source : value

            }

          ]

        }

      }

      {

        htmlDropper {

          field : data_f2

        }

      }

      { logTrace { format : "output record: {}", args : ["@{}"] } }   

    ]

  }

]

5.     Morphline 部署运行

运行时,需要保证新开发的命令类在 MapReduce 作业或 Lily NRT Indexer 服务的类路径里。

5.1.   MapReduce作业

对于 MR 作业,可直接将打好的 jar 包包含在命令行里,如下所示:

HADOOP_CLASSPATH=/usr/lib/hbase/hbase-protocol.jar \

hadoop --config /etc/hadoop/conf jar \

/usr/lib/hbase-solr/tools/hbase-indexer-mr-*-job.jar --conf \

/etc/hbase/conf/hbase-site.xml --libjars /root/tools/MyMorphlineCommands/bin/DropHtmlTags.jar \

-D 'mapred.child.java.opts=-Xmx500m' \

--hbase-indexer-file /root/search/hbase_html_tags_dropper/conf/hbase_html_tags_dropper_mapper.xml --zk-host \

127.0.0.1/solr --collection hbase_html_tags_dropper --go-live --log4j \

/etc/hbase/conf.dist/log4j.properties

5.2.   Lily NRT Indexer

jar 包放在服务的 lib 目录下,即 /usr/lib/hbase-solr/lib/ ,重启服务。

 

Attachments:

基于UDH的一种实时查询系统构建方案 - 知识库 - 数据处理平台知识库.htm (text/html)
专题_基于UDH的一种实时查询系统构建方案.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)
图1.png (image/png)
图2.png (image/png)
图3.png (image/png)
图4.png (image/png)
专题-基于Morphlines的ETL开发流程.docx (application/vnd.openxmlformats-officedocument.wordprocessingml.document)