背景
现在市面上有不少的SQ on HBase的产品:Trafodion、Impala、Greplum等等,但是很少有支持触发器功能的产品,也许是因为在这种应用场景下很少使用到触发器,但不可否认触发器是一项非常有用的功能,本文主要针对一些缺少触发器功能的产品提供一种有趣实现的思路。
概览
如下图所示:
- 当有数据插入到SQEngine中时,数据会存储至HBase.
- 在HBase中借助Observer,当有数据触发时会通知BaseRegionObserver, 根据项目需要实现prePut、PostPut等方法完成Event的处理,在本例中,会以HTTP请求的方式发送至Trigger Server。
- 执行PLSQL或者是SQL, 此步骤可以根据产品自己的特点决定,如果有PLSQL功能则Trigger内容中可以书写PLSQL语法,反之可以以SQL的形式执行。
- PLSQL最终还是要通过SQ Engine执行,所以这一步一般数据数据库本身功能。
实现HBase Observer
继承BaseRegionObserver,实现相应的方法,本例中实现put,也就是在插入/更新时触发操作。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.ByteArrayComparable;import org.apache.hadoop.hbase.filter.CompareFilter;import org.apache.hadoop.hbase.regionserver.RegionScanner;import org.apache.hadoop.hbase.regionserver.wal.WALEdit;import org.apache.http.HttpResponse;import org.apache.http.client.HttpClient;import org.apache.http.client.methods.HttpGet;import org.apache.http.impl.client.DefaultHttpClient;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import java.net.URI;import java.io.IOException;import java.util.List;public class EsgObserver extends BaseRegionObserver { @Override public void prePut(ObserverContexte, Put put, WALEdit edit, Durability durability) throws IOException { String tableName = e.getEnvironment().getRegion().getTableDesc().getNameAsString(); e.getEnvironment().getRegionInfo().getRegionId(); if(put.getTimeStamp() != ts){ sendRequest(tableName,"prePut"); } this.ts = put.getTimeStamp();}private void sendRequest(String tableName, String API){ HttpClient client = new DefaultHttpClient(); HttpGet get = new HttpGet(); HttpResponse response; String url, ip, port; Configuration conf = HBaseConfiguration.create(); try { Connection conn = ConnectionFactory.createConnection(conf); Table t = conn.getTable(TableName.valueOf(tableName)); HTableDescriptor htd = t.getTableDescriptor(); String value = htd.getValue(“coprocessor$4”); String args = value.split(“\\|”)[3]; ip = (args.split(“,”)[0]).split(“=”)[1];port = (args.split(“,”)[1]).split(“=”)[1]; url = “http://” + ip + “:” + port + “/esg/exec/”; } catch (Exception e) { logger.error(e.getMessage(), e); return; } String table = tableName.split(“:”)[1]; String uri = url + API + “?tableName=” + table; logger.info(“uri:” + uri); try{ get.setURI(new URI(uri)); response = client.execute(get); logger.info(“request response code:” + response.getStatusLine().getStatusCode()); logger.info(“request response reason:” + response.getStatusLine().getReasonPhrase()); } catch (Exception e){ logger.error(e.getMessage(),e); } }}
Trigger Server的核心实现
Trigger Server中采用了Spring boot + Antlr4实现。其中,Spring boot负责web 项目的搭建,Antlr4实现触发器语句的解析分离。
语句解析如下:
语句解析主要进行触发创建、修改、删除语句的解析。
grammar Sql;@header { package com.esg.hbase.generated;}program : block EOF;//block: T_CREATE T_OR T_REPLACE;block: create_trigger_stmt | drop_stmt | switch_trigger_stmt;create_trigger_stmt: T_CREATE has_replace T_TRIGGER trigger_name before_after cuid_ops T_ON table_name for_each_row plsql_block;has_replace: (T_OR T_REPLACE)? ;trigger_name: L_ID;before_after: (T_BEFORE | T_AFTER);cuid_ops: (T_INSERT | T_DELETE | T_UPDATE T_OF L_ID);table_name: L_ID;for_each_row: (T_FOR T_EACH T_ROW)?;plsql_block: L_PLSQL_BLOCK;drop_stmt: T_DROP T_TRIGGER trigger_name T_SEMICOLON?;enable_disable: (T_ENABLE | T_DISABLE);switch_trigger_stmt: (switch_trigger_all_stmt | switch_trigger_specify_stmt);switch_trigger_specify_stmt: T_ALTER T_TRIGGER trigger_name enable_disable;switch_trigger_all_stmt: T_ALTER T_TABLE table_name enable_disable T_ALL T_TRIGGERS;
核心接收请求的类
import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Statement;import java.util.Iterator;import java.util.List;@RequestMapping("esg/exec") @RestController public class ExecutorController { @Autowired private TriggerService triggerService; @GetMapping("/prePut") public void prePut(String tableName) { Listtriggers = this.triggerService.find(tableName, true, 1); Iterator iter = triggers.iterator(); while(iter.hasNext()){ Trigger trigger = iter.next(); if (trigger.getHasEnabled()){ String plsql = trigger.getPlsql(); executeQuery(url, username, password, plsql); } } } }
总结
本文提供了一种在SQL-on-HBase数据库上实现触发器的一种思路,这种方式实现了触发器功能的独立部署执行,方便简洁,借助于Web系统完成触发器的执行。此方案可以实现数据同步,例如同步到Kafka消息系统中,同步到其他库中等,有一些适合的应用场景。
但是,此方法也有着较为明显的缺陷,性能上可能会带来比较大的问题,所有的触发事件都需要经过同一个web服务器,增加了Web系统的负担,一旦有大量数据插入修改时,性能会成为瓶颈。事务方面,触发器中的语句和最初的插入语句不属于同一个事务,会造成数据不一致。容错也不在此方案的考虑范围内。本文为抛砖引玉,欢迎各位讨论提出更好的更合理方案。