Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.*;

Expand Down Expand Up @@ -330,4 +331,259 @@ public void testMultiCFPut() throws Throwable {
public void testMultiCFPutBatch() throws Throwable {
FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMltiCFPutBatchImpl);
}

@Test
public void testPutOpt() throws Throwable {
FOR_EACH(tableNames, OHTableSecondaryPartPutTest::testPutOptImpl);
FOR_EACH(group2tableNames, OHTableSecondaryPartPutTest::testMultiCFPutOptImpl);
}

public static void testPutOptImpl(String tableName) throws Exception {
int NUM_QUALIFIERS = 10;
int NUM_PUTS = 10;
byte[] family = toBytes(getColumnFamilyName(tableName));
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(tableName));
hTable.init();
{ // 单put,10个Qualifier
Put put = new Put(toBytes("row1"));
List<String> qualifiers = new ArrayList<>();
for (int i = 1; i <= NUM_QUALIFIERS; i++) {
byte[] qualifier = toBytes("q" + i);
byte[] value = toBytes("value" + i);
put.addColumn(family, qualifier, value);
qualifiers.add(Bytes.toString(qualifier));
}
hTable.put(put);
// verify
Get get = new Get(Bytes.toBytes("row1"));
Result result = hTable.get(get);
Assert.assertEquals(NUM_QUALIFIERS, result.getFamilyMap(family).size());
qualifiers.forEach(q ->
Assert.assertNotNull("Qualifier " + q + " not found",
result.getValue(family, Bytes.toBytes(q)))
);
}

{ // 单put,10个Qualifier,指定相同timestamp
Put put = new Put(toBytes("row2"));
List<String> qualifiers = new ArrayList<>();
long timestamp = System.currentTimeMillis();
for (int i = 1; i <= NUM_QUALIFIERS; i++) {
byte[] qualifier = toBytes("q" + i);
byte[] value = toBytes("value" + i);
put.addColumn(family, qualifier, timestamp, value);
qualifiers.add(Bytes.toString(qualifier));
}
hTable.put(put);
// verify
Get get = new Get(Bytes.toBytes("row2"));
Result result = hTable.get(get);
Assert.assertEquals(NUM_QUALIFIERS, result.getFamilyMap(family).size());
qualifiers.forEach(q ->
Assert.assertNotNull("Qualifier " + q + " not found",
result.getValue(family, Bytes.toBytes(q)))
);
}
{ // batch put,相同key, 多个Qualifier
byte[] rowKey = Bytes.toBytes("batch_row");
List<Put> puts = new ArrayList<>();
List<String> qualifiers = new ArrayList<>();
for (int i = 1; i <= NUM_PUTS; i++) {
Put put = new Put(rowKey);
byte[] qualifier = Bytes.toBytes("batch_q" + i);
put.addColumn(family, qualifier, Bytes.toBytes("batch_val" + i));
puts.add(put);
qualifiers.add(Bytes.toString(qualifier));
}
hTable.put(puts);
// verify
Get get = new Get(rowKey);
Result result = hTable.get(get);
Assert.assertEquals(NUM_QUALIFIERS, result.getFamilyMap(family).size());
qualifiers.forEach(q ->
Assert.assertNotNull("Qualifier " + q + " not found",
result.getValue(family, Bytes.toBytes(q)))
);
}

{ // batch put,相同key,多个Qualifier,指定相同timestamp
byte[] rowKey = Bytes.toBytes("batch_row_ts");
List<Put> puts = new ArrayList<>();
List<String> qualifiers = new ArrayList<>();
for (int i = 1; i <= NUM_PUTS; i++) {
Put put = new Put(rowKey);
byte[] qualifier = Bytes.toBytes("batch_ts_q" + i);
put.addColumn(family, qualifier, Bytes.toBytes("batch_val" + i));
puts.add(put);
qualifiers.add(Bytes.toString(qualifier));
}
hTable.put(puts);
// verify
Get get = new Get(rowKey);
Result result = hTable.get(get);
Assert.assertEquals(NUM_QUALIFIERS, result.getFamilyMap(family).size());
qualifiers.forEach(q ->
Assert.assertNotNull("Qualifier " + q + " not found",
result.getValue(family, Bytes.toBytes(q)))
);
}
}

public static void testMultiCFPutOptImpl(Map.Entry<String, List<String>> entry) throws Exception {
int NUM_QUALIFIERS = 10;
OHTableClient hTable = ObHTableTestUtil.newOHTableClient(getTableName(entry.getKey()));
hTable.init();
{
Put put = new Put(toBytes("multi_cf_row"));
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
// 单put,10个Qualifier
List<String> qualifiers = new ArrayList<>();
for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] qualifier = toBytes("q" + i);
byte[] value = toBytes("value_" + i);
put.addColumn(family, qualifier, value);
qualifiers.add(Bytes.toString(qualifier));
}
}
hTable.put(put);
// verify
Get get = new Get(Bytes.toBytes("multi_cf_row"));
Result result = hTable.get(get);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Assert.assertEquals(NUM_QUALIFIERS,
result.getFamilyMap(family).size());

for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] val = result.getValue(family, Bytes.toBytes("q" + i));
Assert.assertEquals( "value_" + i,
Bytes.toString(val));
}
}
}

{ // 指定timestamp
long currentTimestamp = System.currentTimeMillis();
Put put = new Put(toBytes("multi_cf_ts_row"));
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
// 单put,10个Qualifier
List<String> qualifiers = new ArrayList<>();
for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] qualifier = toBytes("q" + i);
byte[] value = toBytes("value_" + i);
put.addColumn(family, qualifier, currentTimestamp, value);
qualifiers.add(Bytes.toString(qualifier));
}
}
hTable.put(put);
// verify
Get get = new Get(Bytes.toBytes("multi_cf_ts_row"));
Result result = hTable.get(get);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Assert.assertEquals(NUM_QUALIFIERS,
result.getFamilyMap(family).size());

for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] val = result.getValue(family, Bytes.toBytes("q" + i));
Assert.assertEquals( "value_" + i,
Bytes.toString(val));
}
}
}

{
byte[] ROW = Bytes.toBytes("batch_row");
List<Put> puts = new ArrayList<>();
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Put put = new Put(ROW);
for (int i = 0; i < NUM_QUALIFIERS; i++) {
put.addColumn(family,
Bytes.toBytes("q" + i),
Bytes.toBytes("value_" + i));
}
puts.add(put);
}
hTable.put(puts);
// verify
Get get = new Get(ROW);
Result result = hTable.get(get);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Assert.assertEquals(NUM_QUALIFIERS,
result.getFamilyMap(family).size());

for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] val = result.getValue(family, Bytes.toBytes("q" + i));
Assert.assertEquals( "value_" + i,
Bytes.toString(val));
}
}
}

{
long timestamp = System.currentTimeMillis();
byte[] ROW = Bytes.toBytes("batch_ts_row");
List<Put> puts = new ArrayList<>();
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
for (int i = 0; i < NUM_QUALIFIERS; i++) {
Put put = new Put(ROW);
put.addColumn(family,
Bytes.toBytes("q" + i),
timestamp,
Bytes.toBytes("value_" + i));
puts.add(put);
}
}
hTable.put(puts);
// verify
Get get = new Get(ROW);
Result result = hTable.get(get);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Assert.assertEquals(NUM_QUALIFIERS,
result.getFamilyMap(family).size());

for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] val = result.getValue(family, Bytes.toBytes("q" + i));
Assert.assertEquals( "value_" + i,
Bytes.toString(val));
}
}
}

{
byte[] ROW = Bytes.toBytes("batch_row_2");
List<Put> puts = new ArrayList<>();
for (int i = 0; i < NUM_QUALIFIERS; i++) {
Put put = new Put(ROW);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
put.addColumn(family,
Bytes.toBytes("q" + i),
Bytes.toBytes("value_" + i));
}
puts.add(put);
}
hTable.put(puts);
// verify
Get get = new Get(ROW);
Result result = hTable.get(get);
for (String tableName : entry.getValue()) {
byte[] family = toBytes(getColumnFamilyName(tableName));
Assert.assertEquals(NUM_QUALIFIERS,
result.getFamilyMap(family).size());

for (int i = 0; i < NUM_QUALIFIERS; i++) {
byte[] val = result.getValue(family, Bytes.toBytes("q" + i));
Assert.assertEquals( "value_" + i,
Bytes.toString(val));
}
}
}
}
}