SQOOP-1545: Sqoop2: From/To: Add supported directions to Repository

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-10-09 08:36:35 -07:00 коммит произвёл Abraham Elmahrek
Родитель b600036573
Коммит fc74316fbd
11 изменённых файлов: 808 добавлений и 142 удалений

Просмотреть файл

@ -20,7 +20,9 @@ package org.apache.sqoop.common;
/**
* Represents which Directions are supported.
*/
public class SupportedDirections {
public class SupportedDirections implements Comparable<SupportedDirections> {
private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
private boolean from;
private boolean to;
@ -38,4 +40,79 @@ public class SupportedDirections {
return direction == Direction.FROM && from
|| direction == Direction.TO && to;
}
/**
* @return String "FROM", "TO", "FROM/TO", "".
*/
public String toString() {
StringBuffer buffer = new StringBuffer();
if (isDirectionSupported(Direction.FROM)) {
buffer.append(Direction.FROM);
if (isDirectionSupported(Direction.TO)) {
buffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
buffer.append(Direction.TO);
}
} else if (isDirectionSupported(Direction.TO)) {
buffer.append(Direction.TO);
}
return buffer.toString();
}
public static SupportedDirections fromString(String supportedDirections) {
boolean from = false, to = false;
if (supportedDirections != null && !supportedDirections.equals("")) {
for (String direction : supportedDirections.split("/")) {
switch (Direction.valueOf(direction)) {
case FROM:
from = true;
break;
case TO:
to = true;
break;
}
}
}
return new SupportedDirections(from, to);
}
public static SupportedDirections fromDirection(Direction direction) {
boolean from = false, to = false;
switch (direction) {
case FROM:
from = true;
break;
case TO:
to = true;
break;
}
return new SupportedDirections(from, to);
}
@Override
public int compareTo(SupportedDirections o) {
int hash = 0;
if (this.isDirectionSupported(Direction.FROM)) {
hash |= 1;
}
if (this.isDirectionSupported(Direction.TO)) {
hash |= 2;
}
int oHash = 0;
if (this.isDirectionSupported(Direction.FROM)) {
oHash |= 1;
}
if (this.isDirectionSupported(Direction.TO)) {
oHash |= 2;
}
return hash - oHash;
}
}

Просмотреть файл

@ -52,4 +52,23 @@ public class TestSupportedDirections {
Assert.assertFalse(
supportedDirections.isDirectionSupported(Direction.TO));
}
@Test
public void testToString() {
// Both
SupportedDirections supportedDirections = new SupportedDirections(true, true);
Assert.assertEquals("FROM/TO", supportedDirections.toString());
// FROM
supportedDirections = new SupportedDirections(true, false);
Assert.assertEquals("FROM", supportedDirections.toString());
// TO
supportedDirections = new SupportedDirections(false, true);
Assert.assertEquals("TO", supportedDirections.toString());
// NONE
supportedDirections = new SupportedDirections(false, false);
Assert.assertEquals("", supportedDirections.toString());
}
}

Просмотреть файл

@ -181,6 +181,14 @@ public enum DerbyRepoError implements ErrorCode {
DERBYREPO_0044("Update of driver config failed"),
DERBYREPO_0045("Can't retrieve all connectors"),
DERBYREPO_0046("Could not add directions"),
DERBYREPO_0047("Could not get ID of recently added direction"),
DERBYREPO_0048("Could not register config direction"),
DERBYREPO_0049("Could not set connector direction")
;
private final String message;

Просмотреть файл

@ -33,13 +33,16 @@ import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.DirectionError;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.common.SupportedDirections;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.connector.ConnectorManagerUtils;
import org.apache.sqoop.model.MBooleanInput;
@ -88,6 +91,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
*/
private static final String CONNECTOR_HDFS = "hdfs-connector";
private static final String LINK_HDFS = "hdfs-link";
private JdbcRepositoryContext repoContext;
/**
@ -121,7 +126,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
// Register the job config type, since driver config is per job
registerConfigs(null, null, mDriver.getDriverConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014, mDriver.toString(), ex);
@ -150,14 +155,17 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
// Register link type config
registerConfigs(connectorId, null, mc.getLinkConfig().getConfigs(),
MConfigType.LINK.name(), baseConfigStmt, baseInputStmt);
MConfigType.LINK.name(), baseConfigStmt, baseInputStmt, conn);
// Register both from/to job type config
registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
if (mc.getSupportedDirections().isDirectionSupported(Direction.FROM)) {
registerConfigs(connectorId, Direction.FROM, mc.getConfig(Direction.FROM).getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
}
if (mc.getSupportedDirections().isDirectionSupported(Direction.TO)) {
registerConfigs(connectorId, Direction.TO, mc.getConfig(Direction.TO).getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
}
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
mc.toString(), ex);
@ -167,6 +175,34 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
private void insertConnectorDirection(Long connectorId, Direction direction, Connection conn)
throws SQLException {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS);
stmt.setLong(1, connectorId);
stmt.setLong(2, getDirection(direction, conn));
if (stmt.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0049);
}
} finally {
closeStatements(stmt);
}
}
private void insertConnectorDirections(Long connectorId, SupportedDirections directions, Connection conn)
throws SQLException {
if (directions.isDirectionSupported(Direction.FROM)) {
insertConnectorDirection(connectorId, Direction.FROM, conn);
}
if (directions.isDirectionSupported(Direction.TO)) {
insertConnectorDirection(connectorId, Direction.TO, conn);
}
}
private long getConnectorId(MConnector mc, Connection conn) {
PreparedStatement baseConnectorStmt = null;
try {
@ -187,6 +223,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
if (!rsetConnectorId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
insertConnectorDirections(rsetConnectorId.getLong(1),
mc.getSupportedDirections(), conn);
return rsetConnectorId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0014,
@ -399,6 +439,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
if(version <= 3) {
// Schema modifications
runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK, conn);
@ -412,6 +453,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
updteJobInternals(conn, registerHdfsConnector(conn));
}
// Change direction from VARCHAR to BIGINT + foreign key.
updateDirections(conn, insertDirections(conn));
// Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
// Data updates depend on knowledge of the type of job.
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
@ -442,6 +486,110 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
/**
* Insert directions: FROM and TO.
* @param conn
* @return Map<Direction, Long> direction ID => Direction
*/
protected Map<Direction, Long> insertDirections(Connection conn) {
// Add directions
Map<Direction, Long> directionMap = new TreeMap<Direction, Long>();
PreparedStatement insertDirectionStmt = null;
try {
// Insert directions and get IDs.
for (Direction direction : Direction.values()) {
insertDirectionStmt = conn.prepareStatement(STMT_INSERT_DIRECTION, Statement.RETURN_GENERATED_KEYS);
insertDirectionStmt.setString(1, direction.toString());
if (insertDirectionStmt.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0046, "Could not add directions FROM and TO.");
}
ResultSet directionId = insertDirectionStmt.getGeneratedKeys();
if (directionId.next()) {
if (LOG.isInfoEnabled()) {
LOG.info("Loaded direction: " + directionId.getLong(1));
}
directionMap.put(direction, directionId.getLong(1));
} else {
throw new SqoopException(DerbyRepoError.DERBYREPO_0047, "Could not get ID of direction " + direction);
}
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(insertDirectionStmt);
}
return directionMap;
}
/**
* Add normalized M2M for SQ_CONNECTOR and SQ_CONFIG for Direction.
* 1. Remember all ID => direction for configs.
* 2. Drop SQF_DIRECTION (varhchar).
* 3. Add new M2M tables for SQ_CONNECTOR and SQ_CONFIG.
* 4. Add directions via updating SQ_CONFIG with proper Direction IDs.
* 5. Make sure all connectors have all supported directions.
* @param conn
*/
protected void updateDirections(Connection conn, Map<Direction, Long> directionMap) {
// Remember directions
Statement fetchFormsStmt = null,
fetchConnectorsStmt = null;
List<Long> connectorIds = new LinkedList<Long>();
List<Long> configIds = new LinkedList<Long>();
List<String> directions = new LinkedList<String>();
try {
fetchFormsStmt = conn.createStatement();
ResultSet rs = fetchFormsStmt.executeQuery(STMT_FETCH_CONFIG_DIRECTIONS);
while (rs.next()) {
configIds.add(rs.getLong(1));
directions.add(rs.getString(2));
}
rs.close();
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(fetchFormsStmt);
}
// Change Schema
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS, conn);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS, conn);
// Add directions back
while (!configIds.isEmpty() && !directions.isEmpty()) {
Long configId = configIds.remove(0);
String directionString = directions.remove(0);
if (directionString != null && !directionString.isEmpty()) {
Direction direction = Direction.valueOf(directionString);
runQuery(STMT_INSERT_SQ_CONFIG_DIRECTIONS, conn, configId, directionMap.get(direction));
}
}
// Add connector directions
try {
fetchConnectorsStmt = conn.createStatement();
ResultSet rs = fetchConnectorsStmt.executeQuery(STMT_SELECT_CONNECTOR_ALL);
while (rs.next()) {
connectorIds.add(rs.getLong(1));
}
rs.close();
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0000, e);
} finally {
closeStatements(fetchConnectorsStmt);
}
for (Long connectorId : connectorIds) {
for (Long directionId : directionMap.values()) {
runQuery(STMT_INSERT_SQ_CONNECTOR_DIRECTIONS, conn, connectorId, directionId);
}
}
}
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
@ -509,13 +657,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_DRIVER_INDEX, conn,
new Long(0), "throttling");
MLink hdfsLink = createHdfsLink(conn);
Long linkId = createHdfsLink(conn, connectorId);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK_COPY_SQB_FROM_LINK, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_LINK, conn,
new Long(hdfsLink.getPersistenceId()), "EXPORT");
new Long(linkId), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_LINK, conn,
new Long(hdfsLink.getPersistenceId()), "IMPORT");
new Long(linkId), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_UPDATE_SQ_CFG_NAME, conn,
"fromJobConfig", "table", Direction.FROM.toString());
@ -556,7 +704,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_CONNECTOR_BASE,
STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
@ -588,22 +736,46 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
*
* NOTE: Upgrade path only!
*/
private MLink createHdfsLink(Connection conn) {
private Long createHdfsLink(Connection conn, Long connectorId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating HDFS link.");
}
MConnector hdfsConnector = this.findConnector(CONNECTOR_HDFS, conn);
MLink hdfsLink = new MLink(
hdfsConnector.getPersistenceId(),
hdfsConnector.getLinkConfig());
this.createLink(hdfsLink, conn);
PreparedStatement stmt = null;
int result;
try {
stmt = conn.prepareStatement(STMT_INSERT_LINK,
Statement.RETURN_GENERATED_KEYS);
stmt.setString(1, LINK_HDFS);
stmt.setLong(2, connectorId);
stmt.setBoolean(3, true);
stmt.setNull(4, Types.VARCHAR);
stmt.setTimestamp(5, new Timestamp(System.currentTimeMillis()));
stmt.setNull(6, Types.VARCHAR);
stmt.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
if (LOG.isTraceEnabled()) {
LOG.trace("Created HDFS link.");
result = stmt.executeUpdate();
if (result != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0012,
Integer.toString(result));
}
ResultSet rsetConnectionId = stmt.getGeneratedKeys();
if (!rsetConnectionId.next()) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Created HDFS link.");
}
return rsetConnectionId.getLong(1);
} catch (SQLException ex) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0019, ex);
} finally {
closeStatements(stmt);
}
return hdfsLink;
}
/**
@ -695,7 +867,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
// Register a driver config as a job type with no owner/connector and direction
registerConfigs(null/* owner*/, null /*direction*/, mDriver.getDriverConfig().getConfigs(),
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt);
MConfigType.JOB.name(), baseConfigStmt, baseInputStmt, conn);
// We're using hardcoded value for driver config as they are
// represented as NULL in the database.
@ -1744,13 +1916,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
Counters counters = new Counters();
while(rs.next()) {
while (rs.next()) {
String groupName = rs.getString(1);
String counterName = rs.getString(2);
long value = rs.getLong(3);
CounterGroup group = counters.getCounterGroup(groupName);
if(group == null) {
if (group == null) {
group = new CounterGroup(groupName);
counters.addCounterGroup(group);
}
@ -1758,7 +1930,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
group.addCounter(new Counter(counterName, value));
}
if(counters.isEmpty()) {
if (counters.isEmpty()) {
return null;
} else {
return counters;
@ -1769,7 +1941,83 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
private List<MConnector> loadConnectors(PreparedStatement stmt,Connection conn) throws SQLException {
private Long getDirection(Direction direction, Connection conn) throws SQLException {
PreparedStatement directionStmt = null;
ResultSet rs = null;
try {
directionStmt = conn.prepareStatement(STMT_SELECT_SQD_ID_BY_SQD_NAME);
directionStmt.setString(1, direction.toString());
rs = directionStmt.executeQuery();
rs.next();
return rs.getLong(1);
} finally {
if (rs != null) {
closeResultSets(rs);
}
if (directionStmt != null) {
closeStatements(directionStmt);
}
}
}
private Direction getDirection(long directionId, Connection conn) throws SQLException {
PreparedStatement directionStmt = null;
ResultSet rs = null;
try {
directionStmt = conn.prepareStatement(STMT_SELECT_SQD_NAME_BY_SQD_ID);
directionStmt.setLong(1, directionId);
rs = directionStmt.executeQuery();
rs.next();
return Direction.valueOf(rs.getString(1));
} finally {
if (rs != null) {
closeResultSets(rs);
}
if (directionStmt != null) {
closeStatements(directionStmt);
}
}
}
private SupportedDirections findConnectorSupportedDirections(long connectorId, Connection conn) throws SQLException {
PreparedStatement connectorDirectionsStmt = null;
ResultSet rs = null;
boolean from = false, to = false;
try {
connectorDirectionsStmt = conn.prepareStatement(STMT_SELECT_SQ_CONNECTOR_DIRECTIONS);
connectorDirectionsStmt.setLong(1, connectorId);
rs = connectorDirectionsStmt.executeQuery();
while(rs.next()) {
switch(getDirection(rs.getLong(2), conn)) {
case FROM:
from = true;
break;
case TO:
to = true;
break;
}
}
} finally {
if (rs != null) {
closeResultSets(rs);
}
if (connectorDirectionsStmt != null) {
closeStatements(connectorDirectionsStmt);
}
}
return new SupportedDirections(from, to);
}
private List<MConnector> loadConnectors(PreparedStatement stmt, Connection conn) throws SQLException {
List<MConnector> connectors = new ArrayList<MConnector>();
ResultSet rsConnectors = null;
PreparedStatement connectorConfigFetchStmt = null;
@ -1792,13 +2040,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MConfig> fromConfig = new ArrayList<MConfig>();
List<MConfig> toConfig = new ArrayList<MConfig>();
loadConfigTypes(linkConfig, fromConfig, toConfig,
connectorConfigFetchStmt, connectorConfigInputFetchStmt, 1);
loadConfigTypes(linkConfig, fromConfig, toConfig, connectorConfigFetchStmt,
connectorConfigInputFetchStmt, 1, conn);
SupportedDirections supportedDirections
= findConnectorSupportedDirections(connectorId, conn);
MFromConfig fromJobConfig = null;
MToConfig toJobConfig = null;
if (supportedDirections.isDirectionSupported(Direction.FROM)) {
fromJobConfig = new MFromConfig(fromConfig);
}
if (supportedDirections.isDirectionSupported(Direction.TO)) {
toJobConfig = new MToConfig(toConfig);
}
MConnector mc = new MConnector(connectorName, connectorClassName, connectorVersion,
new MLinkConfig(linkConfig),
new MFromConfig(fromConfig),
new MToConfig(toConfig));
new MLinkConfig(linkConfig), fromJobConfig, toJobConfig);
mc.setPersistenceId(connectorId);
connectors.add(mc);
@ -1845,7 +2101,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MConfig> toConfig = new ArrayList<MConfig>();
loadConfigTypes(connectorLinkConfig, fromConfig, toConfig, connectorConfigFetchStatement,
connectorConfigInputStatement, 2);
connectorConfigInputStatement, 2, conn);
MLink link = new MLink(connectorId, new MLinkConfig(connectorLinkConfig));
link.setPersistenceId(id);
@ -1911,7 +2167,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MConfig> fromConnectorToJobConfig = new ArrayList<MConfig>();
loadConfigTypes(fromConnectorLinkConfig, fromConnectorFromJobConfig, fromConnectorToJobConfig,
fromConfigFetchStmt, jobInputFetchStmt, 2);
fromConfigFetchStmt, jobInputFetchStmt, 2, conn);
// TO entity configs
List<MConfig> toConnectorLinkConfig = new ArrayList<MConfig>();
@ -1922,7 +2178,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MConfig> driverConfig = new ArrayList<MConfig>();
loadConfigTypes(toConnectorLinkConfig, toConnectorFromJobConfig, toConnectorToJobConfig,
toConfigFetchStmt, jobInputFetchStmt, 2);
toConfigFetchStmt, jobInputFetchStmt, 2, conn);
loadDriverConfigs(driverConfig, driverConfigfetchStmt, jobInputFetchStmt, 2);
@ -1951,6 +2207,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
return jobs;
}
private void registerConfigDirection(Long configId, Direction direction, Connection conn)
throws SQLException {
PreparedStatement stmt = null;
try {
stmt = conn.prepareStatement(STMT_INSERT_SQ_CONFIG_DIRECTIONS);
stmt.setLong(1, configId);
stmt.setLong(2, getDirection(direction, conn));
if (stmt.executeUpdate() != 1) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0048);
}
} finally {
closeStatements(stmt);
}
}
/**
* Register configs in derby database. This method will insert the ids
* generated by the repository into the configs passed in itself.
@ -1962,12 +2233,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @param type
* @param baseConfigStmt
* @param baseInputStmt
* @param conn
* @return short number of configs registered.
* @throws SQLException
*/
private short registerConfigs(Long connectorId, Direction direction,
List<MConfig> configs, String type, PreparedStatement baseConfigStmt,
PreparedStatement baseInputStmt)
PreparedStatement baseInputStmt, Connection conn)
throws SQLException {
short configIndex = 0;
@ -1977,14 +2249,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
} else {
baseConfigStmt.setLong(1, connectorId);
}
if(direction == null) {
baseConfigStmt.setNull(2, Types.VARCHAR);
} else {
baseConfigStmt.setString(2, direction.name());
}
baseConfigStmt.setString(3, config.getName());
baseConfigStmt.setString(4, type);
baseConfigStmt.setShort(5, configIndex++);
baseConfigStmt.setString(2, config.getName());
baseConfigStmt.setString(3, type);
baseConfigStmt.setShort(4, configIndex++);
int baseConfigCount = baseConfigStmt.executeUpdate();
if (baseConfigCount != 1) {
@ -1999,6 +2267,10 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
long configId = rsetConfigId.getLong(1);
config.setPersistenceId(configId);
if (direction != null) {
registerConfigDirection(configId, direction, conn);
}
// Insert all the inputs
List<MInput<?>> inputs = config.getInputs();
registerConfigInputs(configId, inputs, baseInputStmt);
@ -2071,7 +2343,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
} else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long) args[i]);
} else {
stmt.setObject(i, args[i]);
stmt.setObject(i + 1, args[i]);
}
}
@ -2115,9 +2387,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
while (rsetConfig.next()) {
long configId = rsetConfig.getLong(1);
Long fromConnectorId = rsetConfig.getLong(2);
String configName = rsetConfig.getString(4);
String configTYpe = rsetConfig.getString(5);
int configIndex = rsetConfig.getInt(6);
String configName = rsetConfig.getString(3);
String configTYpe = rsetConfig.getString(4);
int configIndex = rsetConfig.getInt(5);
List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
MConfig mDriverConfig = new MConfig(configName, configInputs);
@ -2211,6 +2483,26 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
private Direction findConfigDirection(long configId, Connection conn) throws SQLException {
PreparedStatement stmt = null;
ResultSet rs = null;
try {
stmt = conn.prepareStatement(STMT_SELECT_SQ_CONFIG_DIRECTIONS);
stmt.setLong(1, configId);
rs = stmt.executeQuery();
rs.next();
return getDirection(rs.getLong(2), conn);
} finally {
if (rs != null) {
closeResultSets(rs);
}
if (stmt != null) {
closeStatements(stmt);
}
}
}
/**
* Load configs and corresponding inputs from Derby database.
*
@ -2222,21 +2514,21 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @param toConfig TO job configs that will be filled up
* @param configFetchStmt Prepared statement for fetching configs
* @param inputFetchStmt Prepare statement for fetching inputs
* @param conn Connection object that is used to find config direction.
* @throws SQLException In case of any failure on Derby side
*/
public void loadConfigTypes(List<MConfig> linkConfig, List<MConfig> fromConfig,
List<MConfig> toConfig, PreparedStatement configFetchStmt, PreparedStatement inputFetchStmt,
int configPosition) throws SQLException {
int configPosition, Connection conn) throws SQLException {
// Get list of structures from database
ResultSet rsetConfig = configFetchStmt.executeQuery();
while (rsetConfig.next()) {
long configId = rsetConfig.getLong(1);
Long configConnectorId = rsetConfig.getLong(2);
String operation = rsetConfig.getString(3);
String configName = rsetConfig.getString(4);
String configType = rsetConfig.getString(5);
int configIndex = rsetConfig.getInt(6);
String configName = rsetConfig.getString(3);
String configType = rsetConfig.getString(4);
int configIndex = rsetConfig.getInt(5);
List<MInput<?>> configInputs = new ArrayList<MInput<?>>();
MConfig config = new MConfig(configName, configInputs);
@ -2324,7 +2616,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
linkConfig.add(config);
break;
case JOB:
Direction type = Direction.valueOf(operation);
Direction type = findConfigDirection(configId, conn);
List<MConfig> jobConfigs;
switch(type) {
case FROM:

Просмотреть файл

@ -41,6 +41,17 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQM_VALUE = "SQM_VALUE";
// SQ_DIRECTION
public static final String TABLE_SQ_DIRECTION_NAME = "SQ_DIRECTION";
public static final String TABLE_SQ_DIRECTION = SCHEMA_PREFIX
+ TABLE_SQ_DIRECTION_NAME;
public static final String COLUMN_SQD_ID = "SQD_ID";
public static final String COLUMN_SQD_NAME = "SQD_NAME";
// SQ_CONNECTOR
public static final String TABLE_SQ_CONNECTOR_NAME = "SQ_CONNECTOR";
@ -56,6 +67,27 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQC_VERSION = "SQC_VERSION";
// SQ_CONNECTOR_DIRECTIONS
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS_NAME = "SQ_CONNECTOR_DIRECTIONS";
public static final String TABLE_SQ_CONNECTOR_DIRECTIONS = SCHEMA_PREFIX
+ TABLE_SQ_CONNECTOR_DIRECTIONS_NAME;
public static final String COLUMN_SQCD_ID = "SQCD_ID";
public static final String COLUMN_SQCD_CONNECTOR = "SQCD_CONNECTOR";
public static final String COLUMN_SQCD_DIRECTION = "SQCD_DIRECTION";
public static final String CONSTRAINT_SQCD_SQC_NAME = CONSTRAINT_PREFIX + "SQCD_SQC";
public static final String CONSTRAINT_SQCD_SQC = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQC_NAME;
public static final String CONSTRAINT_SQCD_SQD_NAME = CONSTRAINT_PREFIX + "SQCD_SQD";
public static final String CONSTRAINT_SQCD_SQD = SCHEMA_PREFIX + CONSTRAINT_SQCD_SQD_NAME;
// SQ_CONFIG
public static final String TABLE_SQ_CONFIG_NAME = "SQ_CONFIG";
@ -81,6 +113,27 @@ public final class DerbySchemaConstants {
public static final String CONSTRAINT_SQ_CFG_SQC = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_SQC_NAME;
// SQ_CONFIG_DIRECTIONS
public static final String TABLE_SQ_CONFIG_DIRECTIONS_NAME = "SQ_CONFIG_DIRECTIONS";
public static final String TABLE_SQ_CONFIG_DIRECTIONS = SCHEMA_PREFIX
+ TABLE_SQ_CONFIG_DIRECTIONS_NAME;
public static final String COLUMN_SQ_CFG_DIR_ID = "SQ_CFG_DIR_ID";
public static final String COLUMN_SQ_CFG_DIR_CONFIG = "SQ_CFG_DIR_CONFIG";
public static final String COLUMN_SQ_CFG_DIR_DIRECTION = "SQ_CFG_DIR_DIRECTION";
public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_CONFIG";
public static final String CONSTRAINT_SQ_CFG_DIR_CONFIG = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_CONFIG_NAME;
public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME = CONSTRAINT_PREFIX + "SQ_CFG_DIR_DIRECTION";
public static final String CONSTRAINT_SQ_CFG_DIR_DIRECTION = SCHEMA_PREFIX + CONSTRAINT_SQ_CFG_DIR_DIRECTION_NAME;
// SQ_INPUT
public static final String TABLE_SQ_INPUT_NAME = "SQ_INPUT";

Просмотреть файл

@ -35,31 +35,65 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
* </pre>
* </p>
* <p>
* <strong>SQ_DIRECTION</strong>: Directions.
* <pre>
* +---------------------------------------+
* | SQ_DIRECTION |
* +---------------------------------------+
* | SQD_ID: BIGINT PK AUTO-GEN |
* | SQD_NAME: VARCHAR(64) | "FROM"|"TO"
* +---------------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONNECTOR</strong>: Connector registration.
* <pre>
* +----------------------------+
* | SQ_CONNECTOR |
* +----------------------------+
* | SQC_ID: BIGINT PK AUTO-GEN |
* | SQC_NAME: VARCHAR(64) |
* | SQC_CLASS: VARCHAR(255) |
* | SQC_VERSION: VARCHAR(64) |
* +----------------------------+
* +-----------------------------+
* | SQ_CONNECTOR |
* +-----------------------------+
* | SQC_ID: BIGINT PK AUTO-GEN |
* | SQC_NAME: VARCHAR(64) |
* | SQC_CLASS: VARCHAR(255) |
* | SQC_VERSION: VARCHAR(64) |
* +-----------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONNECTOR_DIRECTIONS</strong>: Connector directions.
* <pre>
* +------------------------------+
* | SQ_CONNECTOR_DIRECTIONS |
* +------------------------------+
* | SQCD_ID: BIGINT PK AUTO-GEN |
* | SQCD_CONNECTOR: BIGINT | FK SQCD_CONNECTOR(SQC_ID)
* | SQCD_DIRECTION: BIGINT | FK SQCD_DIRECTION(SQD_ID)
* +------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONFIG</strong>: Config details.
* <pre>
* +----------------------------------+
* | SQ_CONFIG |
* +----------------------------------+
* +-------------------------------------+
* | SQ_CONFIG |
* +-------------------------------------+
* | SQ_CFG_ID: BIGINT PK AUTO-GEN |
* | SQ_CFG_OWNER: BIGINT | FK SQ_CFG_OWNER(SQC_ID),NULL for driver
* | SQ_CFG_DIRECTION: VARCHAR(32) | "FROM"|"TO"|NULL
* | SQ_CFG_NAME: VARCHAR(64) |
* | SQ_CFG_TYPE: VARCHAR(32) | "LINK"|"JOB"
* | SQ_CFG_INDEX: SMALLINT |
* +----------------------------------+
* +-------------------------------------+
* </pre>
* </p>
* <p>
* <strong>SQ_CONFIG_DIRECTIONS</strong>: Connector directions.
* <pre>
* +------------------------------+
* | SQ_CONNECTOR_DIRECTIONS |
* +------------------------------+
* | SQCD_ID: BIGINT PK AUTO-GEN |
* | SQCD_CONFIG: BIGINT | FK SQCD_CONFIG(SQ_CFG_ID)
* | SQCD_DIRECTION: BIGINT | FK SQCD_DIRECTION(SQD_ID)
* +------------------------------+
* </pre>
* </p>
* <p>
@ -118,11 +152,11 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
* <strong>SQ_LINK_INPUT</strong>: N:M relationship link and input
* <pre>
* +----------------------------+
* | SQ_LINK_INPUT |
* | SQ_LINK_INPUT |
* +----------------------------+
* | SQ_LNKI_LINK: BIGINT PK | FK SQ_LINK(SQ_LNK_ID)
* | SQ_LNKI_INPUT: BIGINT PK | FK SQ_INPUT(SQI_ID)
* | SQ_LNKI_VALUE: LONG VARCHAR |
* | SQ_LNKI_LINK: BIGINT PK | FK SQ_LINK(SQ_LNK_ID)
* | SQ_LNKI_INPUT: BIGINT PK | FK SQ_INPUT(SQI_ID)
* | SQ_LNKI_VALUE: LONG VARCHAR|
* +----------------------------+
* </pre>
* </p>
@ -212,6 +246,13 @@ public final class DerbySchemaQuery {
+ COLUMN_SQM_VALUE + " VARCHAR(64) "
+ ")";
// DDL: Create table SQ_DIRECTION
public static final String QUERY_CREATE_TABLE_SQ_DIRECTION =
"CREATE TABLE " + TABLE_SQ_DIRECTION + " ("
+ COLUMN_SQD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQD_NAME + " VARCHAR(64)"
+ ")";
// DDL: Create table SQ_CONNECTOR
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR =
"CREATE TABLE " + TABLE_SQ_CONNECTOR + " ("
@ -221,6 +262,20 @@ public final class DerbySchemaQuery {
+ COLUMN_SQC_VERSION + " VARCHAR(64) "
+ ")";
// DDL: Create table SQ_CONNECTOR_DIRECTIONS
public static final String QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS =
"CREATE TABLE " + TABLE_SQ_CONNECTOR_DIRECTIONS + " ("
+ COLUMN_SQCD_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQCD_CONNECTOR + " BIGINT, "
+ COLUMN_SQCD_DIRECTION + " BIGINT, "
+ "CONSTRAINT " + CONSTRAINT_SQCD_SQC + " "
+ "FOREIGN KEY (" + COLUMN_SQCD_CONNECTOR + ") "
+ "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + "), "
+ "CONSTRAINT " + CONSTRAINT_SQCD_SQD + " "
+ "FOREIGN KEY (" + COLUMN_SQCD_DIRECTION + ") "
+ "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+ ")";
// DDL: Create table SQ_CONFIG ( It stores the configs defined by every connector), if connector is null then it is driver config
public static final String QUERY_CREATE_TABLE_SQ_CONFIG =
"CREATE TABLE " + TABLE_SQ_CONFIG + " ("
@ -235,6 +290,20 @@ public final class DerbySchemaQuery {
+ "REFERENCES " + TABLE_SQ_CONNECTOR + " (" + COLUMN_SQC_ID + ")"
+ ")";
// DDL: Create table SQ_CONFIG_DIRECTIONS
public static final String QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS =
"CREATE TABLE " + TABLE_SQ_CONFIG_DIRECTIONS + " ("
+ COLUMN_SQ_CFG_DIR_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQ_CFG_DIR_CONFIG + " BIGINT, "
+ COLUMN_SQ_CFG_DIR_DIRECTION + " BIGINT, "
+ "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_CONFIG + " "
+ "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_CONFIG + ") "
+ "REFERENCES " + TABLE_SQ_CONFIG + " (" + COLUMN_SQ_CFG_ID + "), "
+ "CONSTRAINT " + CONSTRAINT_SQ_CFG_DIR_DIRECTION + " "
+ "FOREIGN KEY (" + COLUMN_SQ_CFG_DIR_DIRECTION + ") "
+ "REFERENCES " + TABLE_SQ_DIRECTION + " (" + COLUMN_SQD_ID + ")"
+ ")";
// DDL: Create table SQ_INPUT
public static final String QUERY_CREATE_TABLE_SQ_INPUT =
"CREATE TABLE " + TABLE_SQ_INPUT + " ("
@ -435,6 +504,14 @@ public final class DerbySchemaQuery {
+ COLUMN_SQM_VALUE + ") "
+ "VALUES(?, ?)";
public static final String STMT_SELECT_SQD_ID_BY_SQD_NAME =
"SELECT " + COLUMN_SQD_ID + " FROM " + TABLE_SQ_DIRECTION
+ " WHERE " + COLUMN_SQD_NAME + "=?";
public static final String STMT_SELECT_SQD_NAME_BY_SQD_ID =
"SELECT " + COLUMN_SQD_NAME + " FROM " + TABLE_SQ_DIRECTION
+ " WHERE " + COLUMN_SQD_ID + "=?";
// DML: Fetch connector Given Name
public static final String STMT_FETCH_BASE_CONNECTOR =
"SELECT "
@ -459,7 +536,6 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_OWNER + ", "
+ COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
@ -472,13 +548,12 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_OWNER + ", "
+ COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ " FROM " + TABLE_SQ_CONFIG
+ " WHERE " + COLUMN_SQ_CFG_OWNER + " IS NULL "
+ " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_DIRECTION + ", " + COLUMN_SQ_CFG_INDEX;
+ " ORDER BY " + COLUMN_SQ_CFG_TYPE + ", " + COLUMN_SQ_CFG_INDEX;
// DML: Fetch inputs for a given config
public static final String STMT_FETCH_INPUT =
@ -544,15 +619,21 @@ public final class DerbySchemaQuery {
+ COLUMN_SQC_VERSION
+ ") VALUES (?, ?, ?)";
public static final String STMT_INSERT_CONNECTOR_WITHOUT_SUPPORTED_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR + " ("
+ COLUMN_SQC_NAME + ", "
+ COLUMN_SQC_CLASS + ", "
+ COLUMN_SQC_VERSION
+ ") VALUES (?, ?, ?)";
// DML: Insert config base
public static final String STMT_INSERT_CONFIG_BASE =
"INSERT INTO " + TABLE_SQ_CONFIG + " ("
+ COLUMN_SQ_CFG_OWNER + ", "
+ COLUMN_SQ_CFG_DIRECTION + ", "
+ COLUMN_SQ_CFG_NAME + ", "
+ COLUMN_SQ_CFG_TYPE + ", "
+ COLUMN_SQ_CFG_INDEX
+ ") VALUES ( ?, ?, ?, ?, ?)";
+ ") VALUES ( ?, ?, ?, ?)";
// DML: Insert config input
public static final String STMT_INSERT_INPUT_BASE =
@ -1058,6 +1139,45 @@ public final class DerbySchemaQuery {
"ALTER TABLE " + TABLE_SQ_LINK + " ADD CONSTRAINT "
+ CONSTRAINT_SQ_LNK_NAME_UNIQUE + " UNIQUE (" + COLUMN_SQ_LNK_NAME + ")";
public static final String STMT_INSERT_DIRECTION = "INSERT INTO " + TABLE_SQ_DIRECTION + " "
+ "(" + COLUMN_SQD_NAME + ") VALUES (?)";
// DML: Fetch all configs
public static final String STMT_FETCH_CONFIG_DIRECTIONS =
"SELECT "
+ COLUMN_SQ_CFG_ID + ", "
+ COLUMN_SQ_CFG_DIRECTION
+ " FROM " + TABLE_SQ_CONFIG;
public static final String QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR =
"ALTER TABLE " + TABLE_SQ_CONFIG + " DROP COLUMN " + COLUMN_SQ_CFG_DIRECTION;
public static final String STMT_INSERT_SQ_CONNECTOR_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONNECTOR_DIRECTIONS + " "
+ "(" + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION + ")"
+ " VALUES (?, ?)";
public static final String STMT_INSERT_SQ_CONFIG_DIRECTIONS =
"INSERT INTO " + TABLE_SQ_CONFIG_DIRECTIONS + " "
+ "(" + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION + ")"
+ " VALUES (?, ?)";
public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL =
"SELECT " + COLUMN_SQCD_CONNECTOR + ", " + COLUMN_SQCD_DIRECTION
+ " FROM " + TABLE_SQ_CONNECTOR_DIRECTIONS;
public static final String STMT_SELECT_SQ_CONNECTOR_DIRECTIONS =
STMT_SELECT_SQ_CONNECTOR_DIRECTIONS_ALL + " WHERE "
+ COLUMN_SQCD_CONNECTOR + " = ?";
public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL =
"SELECT " + COLUMN_SQ_CFG_DIR_CONFIG + ", " + COLUMN_SQ_CFG_DIR_DIRECTION
+ " FROM " + TABLE_SQ_CONFIG_DIRECTIONS;
public static final String STMT_SELECT_SQ_CONFIG_DIRECTIONS =
STMT_SELECT_SQ_CONFIG_DIRECTIONS_ALL + " WHERE "
+ COLUMN_SQ_CFG_DIR_CONFIG + " = ?";
private DerbySchemaQuery() {
// Disable explicit object creation
}

Просмотреть файл

@ -19,10 +19,13 @@ package org.apache.sqoop.repository.derby;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_SCHEMA_SQOOP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_GROUP;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_COUNTER_SUBMISSION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_DIRECTION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_INPUT;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_JOB_INPUT;
@ -31,6 +34,7 @@ import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TA
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SUBMISSION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_CREATE_TABLE_SQ_SYSTEM;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_ENABLED;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK;
@ -47,6 +51,8 @@ import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_T
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_CREATION_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.QUERY_UPGRADE_TABLE_SQ_SUBMISSION_ADD_COLUMN_UPDATE_USER;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.STMT_INSERT_DIRECTION;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@ -147,6 +153,7 @@ abstract public class DerbyTestCase {
}
if (version > 3) {
runQuery(QUERY_CREATE_TABLE_SQ_DIRECTION);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_RENAME_COLUMN_SQ_CFG_OPERATION_TO_SQ_CFG_DIRECTION);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_LINK_TO_SQB_FROM_LINK);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_LINK);
@ -156,6 +163,13 @@ abstract public class DerbyTestCase {
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_LINK_ADD_UNIQUE_CONSTRAINT_NAME);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONFIG_DROP_COLUMN_SQ_CFG_DIRECTION_VARCHAR);
runQuery(QUERY_CREATE_TABLE_SQ_CONNECTOR_DIRECTIONS);
runQuery(QUERY_CREATE_TABLE_SQ_CONFIG_DIRECTIONS);
for (Direction direction : Direction.values()) {
runQuery(STMT_INSERT_DIRECTION, direction.toString());
}
}
runQuery("INSERT INTO SQOOP.SQ_SYSTEM(SQM_KEY, SQM_VALUE) VALUES('version', '" + version + "')");
@ -196,6 +210,42 @@ abstract public class DerbyTestCase {
}
}
/**
* Run single, arbitrary insert query on derby memory repository.
*
* @param query Query to execute
* @return Long id of newly inserted row (-1 if none).
* @throws Exception
*/
protected Long runInsertQuery(String query, Object... args) throws Exception {
PreparedStatement stmt = null;
try {
stmt = getDerbyDatabaseConnection().prepareStatement(query, PreparedStatement.RETURN_GENERATED_KEYS);
for (int i = 0; i < args.length; ++i) {
if (args[i] instanceof String) {
stmt.setString(i + 1, (String)args[i]);
} else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long)args[i]);
} else {
stmt.setString(i + 1, args[i].toString());
}
}
if (!stmt.execute()) {
ResultSet rs = stmt.getGeneratedKeys();
rs.next();
return rs.getLong(1);
}
} finally {
if (stmt != null) {
stmt.close();
}
}
return -1L;
}
protected Connection getDerbyDatabaseConnection() {
return connection;
}
@ -291,54 +341,59 @@ abstract public class DerbyTestCase {
}
protected void loadConnectorAndDriverConfigVersion4() throws Exception {
Long configId;
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('A', 'org.apache.sqoop.test.A', '1.0-test')");
for (String connector : new String[]{"1"}) {
// Directions
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS(SQCD_CONNECTOR, SQCD_DIRECTION)"
+ "VALUES(" + connector + ", 2)");
// connector configs
for (String direction : new String[]{"null", "'FROM'", "'TO'"}) {
for (String direction : new String[]{null, "1", "2"}) {
String type;
if (direction.equals("null")) {
if (direction == null) {
type = "LINK";
} else {
type = "JOB";
}
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES("
+ connector + ", "
+ direction
+ ", 'C1', '"
+ type
+ "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES("
+ connector + ", "
+ direction
+ ", 'C2', '"
+ type
+ "', 1)");
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C1', '" + type + "', 0)");
if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ "VALUES(" + configId + ", " + direction + ")");
}
configId = runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(" + connector + ", 'C2', '" + type + "', 1)");
if (direction != null) {
runInsertQuery("INSERT INTO SQOOP.SQ_CONFIG_DIRECTIONS"
+ "(SQ_CFG_DIR_CONFIG, SQ_CFG_DIR_DIRECTION) "
+ "VALUES(" + configId + ", " + direction + ")");
}
}
}
// driver config
for (String type : new String[]{"JOB"}) {
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL, NULL"
+ ", 'C1', '"
+ type
+ "', 0)");
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C1', '" + type + "', 0)");
runQuery("INSERT INTO SQOOP.SQ_CONFIG"
+ "(SQ_CFG_OWNER, SQ_CFG_DIRECTION, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL, NULL"
+ ", 'C2', '"
+ type
+ "', 1)");
+ "(SQ_CFG_OWNER, SQ_CFG_NAME, SQ_CFG_TYPE, SQ_CFG_INDEX) "
+ "VALUES(NULL" + ", 'C2', '" + type + "', 1)");
}
// Input entries
@ -512,8 +567,10 @@ abstract public class DerbyTestCase {
*/
public void addConnector() throws Exception {
// Connector entry
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
Long connectorId = runInsertQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('B', 'org.apache.sqoop.test.B', '1.0-test')");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 1)");
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR_DIRECTIONS (SQCD_CONNECTOR, SQCD_DIRECTION) VALUES (" + connectorId + ", 2)");
}
/**
@ -560,8 +617,20 @@ abstract public class DerbyTestCase {
}
protected MConnector getConnector() {
return getConnector(true, true);
}
protected MConnector getConnector(boolean from, boolean to) {
MFromConfig fromJobForms = null;
MToConfig toJobForms = null;
if (from) {
fromJobForms = getFromConfig();
}
if (to) {
toJobForms = getToConfig();
}
return new MConnector("A", "org.apache.sqoop.test.A", "1.0-test",
getLinkConfig(), getFromConfig(), getToConfig());
getLinkConfig(), fromJobForms, toJobForms);
}
protected MDriver getDriver() {

Просмотреть файл

@ -78,8 +78,6 @@ public class TestConnectorHandling extends DerbyTestCase {
assertEquals(connectors.size(),2);
assertEquals(connectors.get(0).getUniqueName(),"A");
assertEquals(connectors.get(1).getUniqueName(),"B");
}
@Test
@ -101,4 +99,64 @@ public class TestConnectorHandling extends DerbyTestCase {
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testFromDirection() throws Exception {
MConnector connector = getConnector(true, false);
handler.registerConnector(connector, getDerbyDatabaseConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 4);
assertCountForTable("SQOOP.SQ_INPUT", 8);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testToDirection() throws Exception {
MConnector connector = getConnector(false, true);
handler.registerConnector(connector, getDerbyDatabaseConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 4);
assertCountForTable("SQOOP.SQ_INPUT", 8);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
@Test
public void testNeitherDirection() throws Exception {
MConnector connector = getConnector(false, false);
handler.registerConnector(connector, getDerbyDatabaseConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_CONFIG", 2);
assertCountForTable("SQOOP.SQ_INPUT", 4);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", getDerbyDatabaseConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
}

Просмотреть файл

@ -31,10 +31,6 @@ import org.apache.sqoop.model.MStringInput;
import org.junit.Before;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.*;
/**

Просмотреть файл

@ -35,7 +35,6 @@ import static org.apache.sqoop.shell.utils.ConfigDisplayer.*;
@SuppressWarnings("serial")
public class ShowConnectorFunction extends SqoopFunction {
private static final char SUPPORTED_DIRECTIONS_SEPARATOR = '/';
@SuppressWarnings("static-access")
public ShowConnectorFunction() {
@ -83,7 +82,7 @@ public class ShowConnectorFunction extends SqoopFunction {
uniqueNames.add(connector.getUniqueName());
versions.add(connector.getVersion());
classes.add(connector.getClassName());
supportedDirections.add(getSupportedDirections(connector));
supportedDirections.add(connector.getSupportedDirections().toString());
}
TableDisplayer.display(header, ids, uniqueNames, versions, classes, supportedDirections);
@ -113,33 +112,8 @@ public class ShowConnectorFunction extends SqoopFunction {
connector.getUniqueName(),
connector.getClassName(),
connector.getVersion(),
getSupportedDirections(connector)
connector.getSupportedDirections().toString()
);
displayConnectorConfigDetails(connector, client.getConnectorConfigBundle(connector.getPersistenceId()));
}
/**
* Creates a nicely formatted string for which directions are supported.
* Example: FROM/TO.
* @param connector
* @return String
*/
private String getSupportedDirections(MConnector connector) {
StringBuffer supportedDirectionsBuffer = new StringBuffer();
SupportedDirections supportedDirections
= connector.getSupportedDirections();
if (supportedDirections.isDirectionSupported(Direction.FROM)) {
supportedDirectionsBuffer.append(Direction.FROM);
if (supportedDirections.isDirectionSupported(Direction.TO)) {
supportedDirectionsBuffer.append(SUPPORTED_DIRECTIONS_SEPARATOR);
supportedDirectionsBuffer.append(Direction.TO);
}
} else if (supportedDirections.isDirectionSupported(Direction.TO)) {
supportedDirectionsBuffer.append(Direction.TO);
}
return supportedDirectionsBuffer.toString();
}
}

Просмотреть файл

@ -73,7 +73,7 @@ public abstract class SqoopConnector {
public abstract From getFrom();
/**
* @return an <tt>To</tt> that provides classes for performing export.
* @return an <tt>To</tt> that provides classes for performing export.n
*/
public abstract To getTo();