Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 53 additions & 78 deletions src/main/java/com/alipay/oceanbase/hbase/OHTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1966,24 +1966,23 @@ private ObTableQuery buildObTableQuery(final Get get, Collection<byte[]> columnQ
public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> rowList,
List<byte[]> qualifiers) {
ObTableBatchOperation batch = new ObTableBatchOperation();
OHOpType opType;
ObTableOperationType opType;
Map<String, Integer> indexMap = new HashMap<>();
for (Mutation row : rowList) {
if (row instanceof Put) {
opType = OHOpType.Put;
opType = INSERT_OR_UPDATE;
} else if (row instanceof Delete) {
opType = OHOpType.Delete;
opType = DEL;
} else if (row instanceof Increment) {
opType = OHOpType.Increment;
opType = INCREMENT;
} else if (row instanceof Append) {
opType = OHOpType.Append;
opType = APPEND;
} else {
throw new FeatureNotSupportedException("not supported other type");
}
Set<Map.Entry<byte[], List<KeyValue>>> familyCellMap = row.getFamilyMap().entrySet();

for (Map.Entry<byte[], List<KeyValue>> familyWithCells : familyCellMap) {
if (opType == OHOpType.Increment || opType == OHOpType.Append) {
if (opType == INCREMENT || opType == APPEND) {
indexMap.clear();
for (int i = 0; i < familyWithCells.getValue().size(); i++) {
Cell cell = familyWithCells.getValue().get(i);
Expand All @@ -1992,15 +1991,17 @@ public static ObTableBatchOperation buildObTableBatchOperation(List<Mutation> ro
}
for (Map.Entry<String, Integer> entry : indexMap.entrySet()) {
qualifiers.add(entry.getKey().getBytes());
batch.addTableOperation(buildObTableOperation(familyWithCells.getValue().get(entry.getValue()), opType, row.getTTL()));
batch.addTableOperation(buildObTableOperation(
familyWithCells.getValue().get(entry.getValue()), opType,
row.getTTL()));
}
} else {
for (KeyValue cell : familyWithCells.getValue()) {
batch.addTableOperation(buildObTableOperation(cell, opType, row.getTTL()));
batch.addTableOperation(
buildObTableOperation(cell, opType, row.getTTL()));
}
}
}

}
batch.setSamePropertiesNames(true);
return batch;
Expand All @@ -2010,10 +2011,8 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
ObTableOperationType operationType,
boolean isTableGroup, Long TTL) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
switch (operationType) {
case INSERT_OR_UPDATE:
case APPEND:
case INCREMENT:
switch (kvType) {
case Put:
String[] property_columns = V_COLUMNS;
Object[] property = new Object[] { CellUtil.cloneValue(kv) };
if (TTL != Long.MAX_VALUE) {
Expand All @@ -2024,36 +2023,29 @@ private com.alipay.oceanbase.rpc.mutation.Mutation buildMutation(KeyValue kv,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() },
property_columns, property);
case DEL:
switch (kvType) {
case Delete:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() },
null, null);
case Maximum:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), null, -kv.getTimestamp() }, null, null);
case DeleteColumn:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() },
null, null);
case DeleteFamily:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null,
-kv.getTimestamp() }, null, null);
case DeleteFamilyVersion:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(
DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null,
kv.getTimestamp() }, null, null);
default:
throw new IllegalArgumentException("illegal mutation type " + kvType);
}
case Delete:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), kv.getTimestamp() }, null, null);
case Maximum:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(DEL, ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), null, -kv.getTimestamp() }, null, null);
case DeleteColumn:
return com.alipay.oceanbase.rpc.mutation.Mutation
.getInstance(DEL, ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), kv.getQualifier(), -kv.getTimestamp() }, null,
null);
case DeleteFamily:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(
DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null,
-kv.getTimestamp() }, null, null);
case DeleteFamilyVersion:
return com.alipay.oceanbase.rpc.mutation.Mutation.getInstance(
DEL,
ROW_KEY_COLUMNS,
new Object[] { kv.getRow(), isTableGroup ? kv.getQualifier() : null,
kv.getTimestamp() }, null, null);
default:
throw new IllegalArgumentException("illegal mutation type " + operationType);
}
Expand Down Expand Up @@ -2178,48 +2170,35 @@ private BatchOperation buildBatchOperation(String tableName, List<? extends Row>
return batch;
}

public static ObTableOperation buildObTableOperation(KeyValue kv, OHOpType operationType,
public static ObTableOperation buildObTableOperation(KeyValue kv,
ObTableOperationType operationType,
Long TTL) {
KeyValue.Type kvType = KeyValue.Type.codeToType(kv.getType());
String[] property_columns = V_COLUMNS;
Object[] property = new Object[] { CellUtil.cloneValue(kv) };
if (TTL != Long.MAX_VALUE) {
property_columns = PROPERTY_COLUMNS;
property = new Object[] { CellUtil.cloneValue(kv), TTL };
}
switch (operationType) {
switch (kvType) {
case Put:
case Increment:
case Append:
ObTableOperationType type;
if (operationType == OHOpType.Put) {
type = INSERT_OR_UPDATE;
} else if (operationType == OHOpType.Increment) {
type = INCREMENT;
} else {
type = APPEND;
}
return getInstance(
type,
operationType,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp() }, property_columns, property);
case Delete:
KeyValue.Type delType = KeyValue.Type.codeToType(kv.getTypeByte());
if (delType == KeyValue.Type.Delete) {
return getInstance(
DEL,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp() }, null, null);
} else if (delType == KeyValue.Type.DeleteColumn) {
return getInstance(
DEL,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
-kv.getTimestamp() }, null, null);
} else if (delType == KeyValue.Type.DeleteFamily) {
return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() },
null, null);
} else {
throw new IllegalArgumentException("illegal delete type " + operationType);
}
return getInstance(
DEL,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp() }, null, null);
case DeleteColumn:
return getInstance(
DEL,
new Object[] { CellUtil.cloneRow(kv), CellUtil.cloneQualifier(kv),
-kv.getTimestamp() }, null, null);
case DeleteFamily:
return getInstance(DEL, new Object[] { kv.getRow(), null, -kv.getTimestamp() },
null, null);
default:
throw new IllegalArgumentException("illegal mutation type " + operationType);
}
Expand Down Expand Up @@ -2331,8 +2310,4 @@ public byte[][] getEndKeys() throws IOException {
public Pair<byte[][], byte[][]> getStartEndKeys() throws IOException {
return new Pair<>(getStartKeys(), getEndKeys());
}

public enum OHOpType {
Put, Append, Delete, Increment
}
}
46 changes: 46 additions & 0 deletions src/test/java/com/alipay/oceanbase/hbase/HTableTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -5108,6 +5108,52 @@ public void testAppend() throws IOException {
}
}

@Test
public void testHbasePutDeleteCell() throws Exception {
final byte[] rowKey = Bytes.toBytes("12345");
final byte[] family = Bytes.toBytes("family1");

Put put = new Put(rowKey);
put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a"));
put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b"));
put.add(family, Bytes.toBytes("C"), Bytes.toBytes("c"));
put.add(family, Bytes.toBytes("D"), Bytes.toBytes("d"));
hTable.put(put);
// get row back and assert the values
Get get = new Get(rowKey);
get.addFamily(family);
Result result = hTable.get(get);
assertTrue("Column A value should be a",
Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a"));
assertTrue("Column B value should be b",
Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b"));
assertTrue("Column C value should be c",
Bytes.toString(result.getValue(family, Bytes.toBytes("C"))).equals("c"));
assertTrue("Column D value should be d",
Bytes.toString(result.getValue(family, Bytes.toBytes("D"))).equals("d"));
// put the same row again with C column deleted
put = new Put(rowKey);
put.add(family, Bytes.toBytes("A"), Bytes.toBytes("a1"));
put.add(family, Bytes.toBytes("B"), Bytes.toBytes("b1"));
KeyValue marker = new KeyValue(rowKey, family, Bytes.toBytes("C"),
HConstants.LATEST_TIMESTAMP, KeyValue.Type.DeleteColumn);
put.add(family, Bytes.toBytes("D"), Bytes.toBytes("d1"));
put.add(marker);
hTable.put(put);
// get row back and assert the values
get = new Get(rowKey);
get.addFamily(family);
result = hTable.get(get);
assertTrue("Column A value should be a1",
Bytes.toString(result.getValue(family, Bytes.toBytes("A"))).equals("a1"));
assertTrue("Column B value should be b1",
Bytes.toString(result.getValue(family, Bytes.toBytes("B"))).equals("b1"));
System.out.println(result.getValue(family, Bytes.toBytes("C")));
assertTrue("Column C should not exist", result.getValue(family, Bytes.toBytes("C")) == null);
assertTrue("Column D value should be d1",
Bytes.toString(result.getValue(family, Bytes.toBytes("D"))).equals("d1"));
}

@Test
public void testCellTTL() throws Exception {
String key1 = "key1";
Expand Down
44 changes: 22 additions & 22 deletions src/test/java/com/alipay/oceanbase/hbase/util/ObHTableTestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class ObHTableTestUtil {

public static void prepareClean(List<String> tableGroupList) throws Exception {
for (String tableGroup : tableGroupList) {
tableNameList.addAll(getOTableNameList(tableGroup));
tableNameList.addAll(getOHTableNameList(tableGroup));
}
}

Expand Down Expand Up @@ -120,30 +120,30 @@ public static OHTableClient newOHTableClient(String tableName) {
return new OHTableClient(tableName, newConfiguration());
}

static public List<String> getOTableNameList(String tableGroup) throws IOException {
// 读取建表语句
List<String> res = new LinkedList<>();
String sql = new String(Files.readAllBytes(Paths.get(NativeHBaseUtil.SQL_PATH)));
String[] sqlList = sql.split(";");
Map<String, HTableDescriptor> tableMap = new LinkedHashMap<>();
for (String singleSql : sqlList) {
String realTableName;
if (singleSql.contains("CREATE TABLE ")) {
singleSql.trim();
String[] splits = singleSql.split(" ");
String tableGroupName = splits[2].substring(1, splits[2].length() - 1);
if (tableGroupName.contains(":")) {
String[] tmpStr = tableGroupName.split(":", 2);
tableGroupName = tmpStr[1];
}
realTableName = tableGroupName.split("\\$", 2)[0];
if (realTableName.equals(tableGroup)) {
res.add(tableGroupName);
}
static public List<String> getOHTableNameList(String tableGroup) throws IOException {
// 读取建表语句
List<String> res = new LinkedList<>();
String sql = new String(Files.readAllBytes(Paths.get(NativeHBaseUtil.SQL_PATH)));
String[] sqlList = sql.split(";");
Map<String, HTableDescriptor> tableMap = new LinkedHashMap<>();
for (String singleSql : sqlList) {
String realTableName;
if (singleSql.contains("CREATE TABLE ")) {
singleSql.trim();
String[] splits = singleSql.split(" ");
String tableGroupName = splits[2].substring(1, splits[2].length() - 1);
if (tableGroupName.contains(":")) {
String[] tmpStr = tableGroupName.split(":", 2);
tableGroupName = tmpStr[1];
}
realTableName = tableGroupName.split("\\$", 2)[0];
if (realTableName.equals(tableGroup)) {
res.add(tableGroupName);
}
}
return res;
}
return res;
}

static public Connection getConnection() {
try {
Expand Down