Cleanup and upgrade to work with Accumulo 1.6.0.

* Increment Accumulo Version to 1.6.0
* Fix accumulo dependency versions
  - Harmonize guava version for Accumulo.
  - Automatically manage slf4j version.
* Clean up depr. code
* Clean up return codes and error logging
This commit is contained in:
Mike Drob 2014-02-05 12:13:46 -05:00 коммит произвёл Sean Busbey
Родитель a3cf980f93
Коммит 6e5177fd8b
3 изменённых файлов: 42 добавлений и 51 удалений

Просмотреть файл

@ -42,6 +42,7 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@ -56,7 +57,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>r08</version>
<version>15.0</version>
</dependency>
<dependency>
<groupId>com.yahoo.ycsb</groupId>
@ -93,13 +94,4 @@
<url>http://repository.apache.org/snapshots</url>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.5.8</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>

Просмотреть файл

@ -1,38 +1,40 @@
package com.yahoo.ycsb.db;
import com.yahoo.ycsb.ByteIterator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.Vector;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.CleanUp;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import com.yahoo.ycsb.ByteArrayByteIterator;
import com.yahoo.ycsb.ByteIterator;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;
import com.yahoo.ycsb.ByteArrayByteIterator;
public class AccumuloClient extends DB {
// Error code constants.
@ -59,20 +61,19 @@ public class AccumuloClient extends DB {
@Override
public void init() {
public void init() throws DBException {
_colFam = new Text(getProperties().getProperty("accumulo.columnFamily"));
_inst = new ZooKeeperInstance(getProperties().getProperty("accumulo.instanceName"),
getProperties().getProperty("accumulo.zooKeepers"));
try {
_connector = _inst.getConnector(getProperties().getProperty("accumulo.username"),
getProperties().getProperty("accumulo.password"));
String principal = getProperties().getProperty("accumulo.username");
AuthenticationToken token = new PasswordToken(getProperties().getProperty("accumulo.password"));
_connector = _inst.getConnector(principal, token);
} catch (AccumuloException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new DBException(e);
} catch (AccumuloSecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new DBException(e);
}
_PC_FLAG = getProperties().getProperty("accumulo.PC_FLAG","none");
@ -135,13 +136,16 @@ public class AccumuloClient extends DB {
}
}
long bwSize = Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000"));
long bwMaxLatency = Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000"));
int bwThreads = Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1"));
_bw = _connector.createBatchWriter(table, bwSize, bwMaxLatency, bwThreads);
BatchWriterConfig bwc = new BatchWriterConfig();
bwc.setMaxLatency(Long.parseLong(getProperties().getProperty("accumulo.batchWriterMaxLatency", "30000")), TimeUnit.MILLISECONDS);
bwc.setMaxMemory(Long.parseLong(getProperties().getProperty("accumulo.batchWriterSize", "100000")));
bwc.setMaxWriteThreads(Integer.parseInt(getProperties().getProperty("accumulo.batchWriterThreads", "1")));
_bw = _connector.createBatchWriter(table, bwc);
// Create our scanners
_singleScanner = _connector.createScanner(table, Constants.NO_AUTHS);
_scanScanner = _connector.createScanner(table, Constants.NO_AUTHS);
_singleScanner = _connector.createScanner(table, Authorizations.EMPTY);
_scanScanner = _connector.createScanner(table, Authorizations.EMPTY);
_table = table; // Store the name of the table we have open.
}
@ -149,8 +153,9 @@ public class AccumuloClient extends DB {
/**
* Gets a scanner from Accumulo over one row
*
* @param row
* @return
* @param row the row to scan
* @param fields the set of columns to scan
* @return an Accumulo {@link Scanner} bound to the given row and columns
*/
private Scanner getRow(Text row, Set<String> fields)
{
@ -188,7 +193,7 @@ public class AccumuloClient extends DB {
System.err.println("Error trying to reading Accumulo table" + key + e);
return ServerError;
}
return 0;
return Ok;
}
@ -252,7 +257,7 @@ public class AccumuloClient extends DB {
currentHM.put(entry.getKey().getColumnQualifier().toString(), new ByteArrayByteIterator(buf));
}
return 0;
return Ok;
}
@ -281,12 +286,13 @@ public class AccumuloClient extends DB {
keyNotification(key);
}
} catch (MutationsRejectedException e) {
// TODO Auto-generated catch block
System.err.println("Error performing update.");
e.printStackTrace();
return ServerError;
}
return 0;
return Ok;
}
@Override
@ -303,26 +309,19 @@ public class AccumuloClient extends DB {
return ServerError;
}
try {
deleteRow(new Text(key));
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TableNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
} catch (RuntimeException e) {
System.err.println("Error performing delete.");
e.printStackTrace();
return ServerError;
}
return 0;
return Ok;
}
// These functions are adapted from RowOperations.java:
private void deleteRow(Text row) throws TableNotFoundException {
private void deleteRow(Text row) {
deleteRow(getRow(row, null));
}
@ -430,7 +429,7 @@ public class AccumuloClient extends DB {
// TODO Auto-generated catch block
e.printStackTrace();
}
return 0;
return Ok;
}
}

Просмотреть файл

@ -45,7 +45,7 @@
<properties>
<maven.assembly.version>2.2.1</maven.assembly.version>
<hbase.version>0.92.1</hbase.version>
<accumulo.version>1.4.5</accumulo.version>
<accumulo.version>1.6.0</accumulo.version>
<cassandra.version>0.7.0</cassandra.version>
<infinispan.version>7.1.0.CR1</infinispan.version>
<openjpa.jdbc.version>2.1.1</openjpa.jdbc.version>