Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
fff83ac
finish connection function
stuBirdFly Aug 21, 2024
ab20c61
Merge pull request #48 from stuBirdFly/client_1.3
stuBirdFly Aug 23, 2024
ba83fdf
hbase 1.x compatibility: filter relevant (#51)
miyuan-ljr Aug 28, 2024
f388433
add scan lease test (#53)
stuBirdFly Aug 29, 2024
42b0e29
support allowPatialResult (#56)
stuBirdFly Aug 31, 2024
cd4efbe
[Chore] use obkv-table-client 1.2.13-SNAPSHOT for testing (#57)
shenyunlong Sep 4, 2024
fa09aa9
init bufferedMutator
JackShi148 Sep 11, 2024
41a02a8
finish validateFamily and asyncExecute
JackShi148 Sep 11, 2024
4731710
correct log in OHBufferedMutatorImpl
JackShi148 Sep 11, 2024
1755b26
pass self-test
JackShi148 Sep 12, 2024
3d9c6f1
format code
JackShi148 Sep 12, 2024
7f4cee2
get bugfix
stuBirdFly Sep 12, 2024
6fcc36b
Merge pull request #63 from stuBirdFly/client_1.3
stuBirdFly Sep 12, 2024
9d6cb88
add retry when batch fails
JackShi148 Sep 13, 2024
cc17288
remove test print
JackShi148 Sep 13, 2024
d07e205
Merge branch 'HBase_1.x_compatible' into 1_x_comp
JackShi148 Sep 13, 2024
46167f9
format code
JackShi148 Sep 13, 2024
48f22ab
make interface more generalized
JackShi148 Sep 13, 2024
bea44d3
test (#59)
miyuan-ljr Sep 13, 2024
9c9e1f9
random row filter (#62)
miyuan-ljr Sep 13, 2024
5aed422
format BufferedMutator test case
JackShi148 Sep 14, 2024
cc3c244
remove redundancy, add some comments
JackShi148 Sep 14, 2024
a428d84
fix (#66)
miyuan-ljr Sep 14, 2024
0f5ab57
fix type of a bufferedMutator. Optimize by review
JackShi148 Sep 14, 2024
e00d183
OHBufferedMutator in OBKV Hbase 1_x_comp (#64)
JackShi148 Sep 14, 2024
82b6365
hbase_multi_column_family_dev (#67)
maochongxin Sep 17, 2024
4ae3827
OHBufferedMutator set and use runtimeBatchExecutor in ObTableClient
JackShi148 Sep 18, 2024
ba4828c
OHBufferedMutator set and add runtimeBatchExecutor in ObTableClient (…
JackShi148 Sep 18, 2024
d5e2085
Merge remote-tracking branch 'refs/remotes/obkv/HBase_1.x_compatible'…
JackShi148 Sep 18, 2024
ca5afb0
Bugfix/sql and conflict fixes (#69)
maochongxin Sep 18, 2024
a8270aa
Merge remote-tracking branch 'obkv/HBase_1.x_compatible' into 1_x_comp
JackShi148 Sep 20, 2024
292b9b6
[Chore] refresh code from master branch and upgrade table client vers…
shenyunlong Sep 20, 2024
da93bc6
multi-namespace in one HBase client
JackShi148 Sep 23, 2024
24a6499
merge obkv/HBase_1.x_compatible
JackShi148 Sep 23, 2024
2b2f414
format code
JackShi148 Sep 23, 2024
8261f43
set rpcConnectTimeout to tableClient
JackShi148 Sep 23, 2024
b53d5cb
multi-namespace in ocp mode and odp mode
JackShi148 Sep 24, 2024
897a8f3
revert self-defined pom xml
JackShi148 Sep 24, 2024
21ccb79
use the test as initial testing case
JackShi148 Sep 24, 2024
c748ffd
format code
JackShi148 Sep 24, 2024
680c0ba
Merge branch 'branch-1.3' into 1_x_comp
JackShi148 Oct 11, 2024
1c18df3
use 'default' database when param_url misses the database parameter
JackShi148 Oct 13, 2024
b509906
add new test case testing multi-namespace, remove some comments
JackShi148 Oct 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 40 additions & 5 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query.*;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryAsyncStreamResult;
import com.alipay.oceanbase.rpc.stream.ObTableClientQueryStreamResult;
import com.alipay.oceanbase.rpc.table.ObHBaseParams;
import com.alipay.oceanbase.rpc.table.ObKVParams;
import com.alipay.sofa.common.thread.SofaThreadPoolExecutor;
Expand Down Expand Up @@ -195,8 +196,9 @@ public OHTable(Configuration configuration, String tableName) throws IOException
long keepAliveTime = configuration.getLong(HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME,
DEFAULT_HBASE_HTABLE_THREAD_KEEP_ALIVE_TIME);
this.executePool = createDefaultThreadPoolExecutor(1, maxThreads, keepAliveTime);
this.obTableClient = ObTableClientManager
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
this.tableNameString, ohConnectionConf));

finishSetUp();
}
Expand Down Expand Up @@ -242,8 +244,9 @@ public OHTable(Configuration configuration, final byte[] tableName,
this.tableNameString = Bytes.toString(tableName);
this.executePool = executePool;
this.cleanupPoolOnClose = false;
this.obTableClient = ObTableClientManager
.getOrCreateObTableClient(new OHConnectionConfiguration(configuration));
OHConnectionConfiguration ohConnectionConf = new OHConnectionConfiguration(configuration);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
this.tableNameString, ohConnectionConf));

finishSetUp();
}
Expand All @@ -260,6 +263,7 @@ public OHTable(Configuration configuration, final byte[] tableName,
* @param executePool ExecutorService to be used.
* @throws IllegalArgumentException if the param error
*/
@InterfaceAudience.Private
public OHTable(final byte[] tableName, final ObTableClient obTableClient,
final ExecutorService executePool) {
checkArgument(tableName != null, "tableNameString is blank.");
Expand Down Expand Up @@ -306,7 +310,8 @@ public OHTable(TableName tableName, Connection connection,
DEFAULT_HBASE_HTABLE_PUT_WRITE_BUFFER_CHECK);
this.writeBufferSize = connectionConfig.getWriteBufferSize();
this.tableName = tableName.getName();
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(connectionConfig);
this.obTableClient = ObTableClientManager.getOrCreateObTableClient(setUserDefinedNamespace(
this.tableNameString, connectionConfig));
}

/**
Expand Down Expand Up @@ -364,6 +369,36 @@ private void finishSetUp() {
WRITE_BUFFER_SIZE_DEFAULT);
}

private OHConnectionConfiguration setUserDefinedNamespace(String tableNameString,
OHConnectionConfiguration ohConnectionConf)
throws IOException {
if (tableNameString.indexOf(':') != -1) {
String[] params = tableNameString.split(":");
if (params.length != 2) {
throw new IllegalArgumentException("Please check the format of self-defined "
+ "namespace and qualifier: { "
+ tableNameString + " }");
}
String database = params[0];
checkArgument(isNotBlank(database), "self-defined namespace cannot be blank or null { "
+ tableNameString + " }");
if (ohConnectionConf.isOdpMode()) {
ohConnectionConf.setDatabase(database);
} else {
String databaseSuffix = "database=" + database;
String paramUrl = ohConnectionConf.getParamUrl();
int databasePos = paramUrl.indexOf("database");
if (databasePos == -1) {
paramUrl += "&" + databaseSuffix;
} else {
paramUrl = paramUrl.substring(0, databasePos) + databaseSuffix;
}
ohConnectionConf.setParamUrl(paramUrl);
}
}
return ohConnectionConf;
}

@Override
public byte[] getTableName() {
return tableName;
Expand Down
35 changes: 35 additions & 0 deletions src/main/java/com/alipay/oceanbase/hbase/OHTablePool.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@

public class OHTablePool implements Closeable {

private String originTabelName = null;
private final PoolMap<String, HTableInterface> tables;
private final int maxSize;
private final PoolMap.PoolType poolType;
Expand Down Expand Up @@ -314,6 +315,9 @@ int getCurrentPoolSize(String tableName) {
* @param paramUrl the table root server http url
*/
public void setParamUrl(final String tableName, String paramUrl) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_PARAM_URL, Bytes.toBytes(paramUrl));
}

Expand All @@ -334,6 +338,9 @@ public String getParamUrl(final String tableName) {
* @param fullUserName the table login username
*/
public void setFullUserName(final String tableName, final String fullUserName) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_FULL_USER_NAME, Bytes.toBytes(fullUserName));
}

Expand All @@ -354,6 +361,9 @@ public String getFullUserName(final String tableName) {
* @param password the table login password
*/
public void setPassword(final String tableName, final String password) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_PASSWORD, Bytes.toBytes(password));
}

Expand All @@ -374,6 +384,9 @@ public String getPassword(final String tableName) {
* @param sysUserName the sys username
*/
public void setSysUserName(final String tableName, final String sysUserName) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_SYS_USER_NAME, Bytes.toBytes(sysUserName));
}

Expand All @@ -394,6 +407,9 @@ public String getSysUserName(final String tableName) {
* @param sysPassword the sys user password
*/
public void setSysPassword(final String tableName, final String sysPassword) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_SYS_PASSWORD, Bytes.toBytes(sysPassword));
}

Expand Down Expand Up @@ -481,6 +497,10 @@ public long getWriteBufferSize(String tableName) {
.toLong(attr);
}

public String getOriginTableName() {
return this.originTabelName;
}

/**
* Sets the operation timeout for the specified tables in this pool.
*
Expand Down Expand Up @@ -520,6 +540,9 @@ public int getOperationTimeout(String tableName) {
* @param odpAddr ODP address
*/
public void setOdpAddr(final String tableName, String odpAddr) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_ADDR, Bytes.toBytes(odpAddr));
}

Expand All @@ -545,6 +568,9 @@ public String getOdpAddr(String tableName) {
* @param odpPort ODP port
*/
public void setOdpPort(final String tableName, int odpPort) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_PORT, Bytes.toBytes(odpPort));
}

Expand All @@ -564,6 +590,9 @@ public int getOdpPort(String tableName) {
* @param odpMode ODP mode
*/
public void setOdpMode(final String tableName, boolean odpMode) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_ODP_MODE, Bytes.toBytes(odpMode));
}

Expand All @@ -583,6 +612,9 @@ public boolean getOdpMode(String tableName) {
* @param database ODP database name
*/
public void setDatabase(final String tableName, String database) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableAttribute(tableName, HBASE_OCEANBASE_DATABASE, Bytes.toBytes(database));
}

Expand Down Expand Up @@ -653,6 +685,9 @@ public Object getTableExtendAttribute(String tableName, String attributeName) {
}

public void setRuntimeBatchExecutor(String tableName, ExecutorService runtimeBatchExecutor) {
if (originTabelName == null) {
originTabelName = tableName;
}
setTableExtendAttribute(tableName, HBASE_OCEANBASE_BATCH_EXECUTOR, runtimeBatchExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,8 @@ public final class OHConstants {

public static final int MAX_KEYVALUE_SIZE_DEFAULT = -1;

public static final String SOCKET_TIMEOUT = "ipc.socket.timeout";

public static final int DEFAULT_SOCKET_TIMEOUT = 20000; // 20 seconds

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,29 @@
import java.util.Properties;

import static com.alipay.oceanbase.hbase.constants.OHConstants.*;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.hadoop.hbase.ipc.RpcClient.DEFAULT_SOCKET_TIMEOUT_CONNECT;
import static org.apache.hadoop.hbase.ipc.RpcClient.SOCKET_TIMEOUT_CONNECT;

@InterfaceAudience.Private
public class OHConnectionConfiguration {
private String paramUrl;
private String database;
private final Properties properties;
private final String paramUrl;
private final String fullUsername;
private final String password;
private final String sysUsername;
private final String sysPassword;
private final String odpAddr;
private final int odpPort;
private final boolean odpMode;
private final String database;
private final long writeBufferSize;
private final int operationTimeout;
private final int scannerCaching;
private final long scannerMaxResultSize;
private final int maxKeyValueSize;
private final int rpcTimeout;
private final int rpcConnectTimeout;

public OHConnectionConfiguration(Configuration conf) {
this.paramUrl = conf.get(HBASE_OCEANBASE_PARAM_URL);
Expand All @@ -54,11 +58,27 @@ public OHConnectionConfiguration(Configuration conf) {
this.odpAddr = conf.get(HBASE_OCEANBASE_ODP_ADDR);
this.odpPort = conf.getInt(HBASE_OCEANBASE_ODP_PORT, -1);
this.odpMode = conf.getBoolean(HBASE_OCEANBASE_ODP_MODE, false);
this.database = conf.get(HBASE_OCEANBASE_DATABASE);
String database = conf.get(HBASE_OCEANBASE_DATABASE);
if (isBlank(database)) {
database = "default";
}
this.database = database;
this.writeBufferSize = conf.getLong(WRITE_BUFFER_SIZE_KEY, WRITE_BUFFER_SIZE_DEFAULT);
this.operationTimeout = conf.getInt("hbase.client.operation.timeout", 1200000);
this.rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
int rpcConnectTimeout = -1;
if (conf.get(SOCKET_TIMEOUT_CONNECT) != null) {
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
} else {
if (conf.get(SOCKET_TIMEOUT) != null) {
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT, DEFAULT_SOCKET_TIMEOUT);
} else {
rpcConnectTimeout = conf.getInt(SOCKET_TIMEOUT_CONNECT,
DEFAULT_SOCKET_TIMEOUT_CONNECT);
}
}
this.rpcConnectTimeout = rpcConnectTimeout;
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", Integer.MAX_VALUE);
this.scannerMaxResultSize = conf.getLong("hbase.client.scanner.max.result.size",
WRITE_BUFFER_SIZE_DEFAULT);
Expand All @@ -72,6 +92,14 @@ public OHConnectionConfiguration(Configuration conf) {
}
}

public void setParamUrl(String paramUrl) {
this.paramUrl = paramUrl;
}

public void setDatabase(String database) {
this.database = database;
}

public long getWriteBufferSize() {
return this.writeBufferSize;
}
Expand All @@ -92,6 +120,10 @@ public int getRpcTimeout() {
return this.rpcTimeout;
}

public int getRpcConnectTimeout() {
return this.rpcConnectTimeout;
}

public long getScannerMaxResultSize() {
return this.scannerMaxResultSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public OHTableFactory(Configuration conf, OHTablePool tablePool,
public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
try {
String tableNameStr = Bytes.toString(tableName);
tableNameStr = tableNameStr.equals(this.tablePool.getOriginTableName()) ? tableNameStr
: this.tablePool.getOriginTableName();

OHTable ht = new OHTable(adjustConfiguration(copyConfiguration(config), tableNameStr),
tableName, this.threadPool);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.alipay.oceanbase.rpc.constant.Constants;
import com.google.common.base.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.ConnectionConfiguration;

import java.io.IOException;
import java.util.Map;
Expand Down Expand Up @@ -58,7 +60,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
checkArgument(isNotBlank(connectionConfig.getParamUrl()), HBASE_OCEANBASE_PARAM_URL
+ " is blank");
obTableClientKey = new ObTableClientKey();
obTableClientKey.setParamUrl(connectionConfig.getParamUrl());
String paramUrl = connectionConfig.getParamUrl();
if (!paramUrl.contains("database")) {
paramUrl += "&database=default";
}
obTableClientKey.setParamUrl(paramUrl);
obTableClientKey.setSysUserName(connectionConfig.getSysUsername());
if (connectionConfig.getSysPassword() == null) {
obTableClientKey.setSysPassword(Constants.EMPTY_STRING);
Expand All @@ -80,11 +86,11 @@ public static ObTableClient getOrCreateObTableClient(OHConnectionConfiguration c
obTableClientKey.getProperties().put(property.getKey(), property.getValue());
}

return getOrCreateObTableClient(obTableClientKey);
return getOrCreateObTableClient(obTableClientKey, connectionConfig.getRpcConnectTimeout());
}

public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey)
throws IOException {
public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableClientKey,
int rpcConnectTimeout) throws IOException {
if (OB_TABLE_CLIENT_INSTANCE.get(obTableClientKey) == null) {
ReentrantLock tmp = new ReentrantLock();
ReentrantLock lock = OB_TABLE_CLIENT_LOCK.putIfAbsent(obTableClientKey, tmp);
Expand All @@ -109,6 +115,7 @@ public static ObTableClient getOrCreateObTableClient(ObTableClientKey obTableCli
}
obTableClient.setFullUserName(obTableClientKey.getFullUserName());
obTableClient.setPassword(obTableClientKey.getPassword());
obTableClient.setRpcConnectTimeout(rpcConnectTimeout);
obTableClient.init();
OB_TABLE_CLIENT_INSTANCE.put(obTableClientKey, obTableClient);
}
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,21 @@ public void testFilter() throws Exception {
tryPut(hTable, putKey2Column2Value1);
tryPut(hTable, putKey2Column2Value2);

// time may be different
// +---------+-----+----------------+--------+
// | K | Q | T | V |
// +---------+-----+----------------+--------+
// | getKey1 | abc | -1728834971469 | value1 |
// | getKey1 | abc | -1728834971399 | value2 |
// | getKey1 | abc | -1728834971330 | value1 |
// | getKey1 | def | -1728834971748 | value2 |
// | getKey1 | def | -1728834971679 | value1 |
// | getKey1 | def | -1728834971609 | value2 |
// | getKey1 | def | -1728834971540 | value1 |
// | getKey2 | def | -1728834971887 | value2 |
// | getKey2 | def | -1728834971818 | value1 |
// +---------+-----+----------------+--------+

filter = new ColumnPrefixFilter(Bytes.toBytes("e"));
get = new Get(toBytes(key1));
get.setMaxVersions(10);
Expand Down
Loading