SQOOP-1374: From/To: Metadata upgrade

(Abraham Elmahrek via Jarek Jarcec Cecho)
This commit is contained in:
Jarek Jarcec Cecho 2014-09-01 15:15:53 +02:00 коммит произвёл Abraham Elmahrek
Родитель cd882a9f3c
Коммит 51a07bc352
24 изменённых файлов: 2388 добавлений и 1670 удалений

Просмотреть файл

@ -66,6 +66,7 @@ public class MConnection extends MAccountableEntity implements MClonable {
this.connectorId = other.connectorId;
this.connectorPart = connectorPart;
this.frameworkPart = frameworkPart;
this.setPersistenceId(other.getPersistenceId());
}
@Override

Просмотреть файл

@ -106,6 +106,7 @@ public class MJob extends MAccountableEntity implements MClonable {
this.fromConnectorPart = fromPart;
this.toConnectorPart = toPart;
this.frameworkPart = frameworkPart;
this.setPersistenceId(other.getPersistenceId());
}
@Override

Просмотреть файл

@ -49,7 +49,6 @@ public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
@Override
public void upgrade(MJobForms original, MJobForms upgradeTarget) {
doUpgrade(original.getForms(), upgradeTarget.getForms());
}
@SuppressWarnings("unchecked")
@ -65,12 +64,17 @@ public class GenericJdbcConnectorMetadataUpgrader extends MetadataUpgrader {
for (MForm form : target) {
List<MInput<?>> inputs = form.getInputs();
MForm originalForm = formMap.get(form.getName());
if (originalForm == null) {
LOG.warn("Form: '" + form.getName() + "' not present in old " +
"connector. So it and its inputs will not be transferred by the upgrader.");
continue;
}
for (MInput input : inputs) {
try {
MInput originalInput = originalForm.getInput(input.getName());
input.setValue(originalInput.getValue());
} catch (SqoopException ex) {
LOG.warn("Input: " + input.getName() + " not present in old " +
LOG.warn("Input: '" + input.getName() + "' not present in old " +
"connector. So it will not be transferred by the upgrader.");
}
}

Просмотреть файл

@ -79,7 +79,7 @@ public class GenericJdbcValidator extends Validator {
}
private Validation validateToJobConfiguration(ToJobConfiguration configuration) {
Validation validation = new Validation(ToJobConfiguration.class);
Validation validation = new Validation(FromJobConfiguration.class);
if(configuration.toTable.tableName == null && configuration.toTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "toTable", "Either table name or SQL must be specified");
@ -103,7 +103,7 @@ public class GenericJdbcValidator extends Validator {
}
private Validation validateFromJobConfiguration(FromJobConfiguration configuration) {
Validation validation = new Validation(ToJobConfiguration.class);
Validation validation = new Validation(FromJobConfiguration.class);
if(configuration.fromTable.tableName == null && configuration.fromTable.sql == null) {
validation.addMessage(Status.UNACCEPTABLE, "fromTable", "Either table name or SQL must be specified");

Просмотреть файл

@ -127,6 +127,6 @@ public class HdfsConnector extends SqoopConnector {
*/
@Override
public MetadataUpgrader getMetadataUpgrader() {
return null;
return new HdfsMetadataUpgrader();
}
}

Просмотреть файл

@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.sqoop.connector.hdfs;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.spi.MetadataUpgrader;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MInput;
import org.apache.sqoop.model.MJobForms;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class HdfsMetadataUpgrader extends MetadataUpgrader {
private static final Logger LOG =
Logger.getLogger(HdfsMetadataUpgrader.class);
/*
* For now, there is no real upgrade. So copy all data over,
* set the validation messages and error messages to be the same as for the
* inputs in the original one.
*/
@Override
public void upgrade(MConnectionForms original,
MConnectionForms upgradeTarget) {
doUpgrade(original.getForms(), upgradeTarget.getForms());
}
@Override
public void upgrade(MJobForms original, MJobForms upgradeTarget) {
doUpgrade(original.getForms(), upgradeTarget.getForms());
}
@SuppressWarnings("unchecked")
private void doUpgrade(List<MForm> original, List<MForm> target) {
// Easier to find the form in the original forms list if we use a map.
// Since the constructor of MJobForms takes a list,
// index is not guaranteed to be the same, so we need to look for
// equivalence
Map<String, MForm> formMap = new HashMap<String, MForm>();
for (MForm form : original) {
formMap.put(form.getName(), form);
}
for (MForm form : target) {
List<MInput<?>> inputs = form.getInputs();
MForm originalForm = formMap.get(form.getName());
if (originalForm == null) {
LOG.warn("Form: '" + form.getName() + "' not present in old " +
"connector. So it and its inputs will not be transferred by the upgrader.");
continue;
}
for (MInput input : inputs) {
try {
MInput originalInput = originalForm.getInput(input.getName());
input.setValue(originalInput.getValue());
} catch (SqoopException ex) {
LOG.warn("Input: '" + input.getName() + "' not present in old " +
"connector. So it will not be transferred by the upgrader.");
}
}
}
}
}

Просмотреть файл

@ -17,10 +17,7 @@
*/
package org.apache.sqoop.connector;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@ -150,48 +147,22 @@ public class ConnectorManager implements Reconfigurable {
LOG.trace("Begin connector manager initialization");
}
List<URL> connectorConfigs = new ArrayList<URL>();
List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs();
try {
Enumeration<URL> appPathConfigs =
ConnectorManager.class.getClassLoader().getResources(
ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
LOG.info("Connector config urls: " + connectorConfigs);
while (appPathConfigs.hasMoreElements()) {
connectorConfigs.add(appPathConfigs.nextElement());
if (connectorConfigs.size() == 0) {
throw new SqoopException(ConnectorError.CONN_0002);
}
for (URL url : connectorConfigs) {
ConnectorHandler handler = new ConnectorHandler(url);
ConnectorHandler handlerOld =
handlerMap.put(handler.getUniqueName(), handler);
if (handlerOld != null) {
throw new SqoopException(ConnectorError.CONN_0006,
handler + ", " + handlerOld);
}
ClassLoader ctxLoader = Thread.currentThread().getContextClassLoader();
if (ctxLoader != null) {
Enumeration<URL> ctxPathConfigs = ctxLoader.getResources(
ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
while (ctxPathConfigs.hasMoreElements()) {
URL configUrl = ctxPathConfigs.nextElement();
if (!connectorConfigs.contains(configUrl)) {
connectorConfigs.add(configUrl);
}
}
}
LOG.info("Connector config urls: " + connectorConfigs);
if (connectorConfigs.size() == 0) {
throw new SqoopException(ConnectorError.CONN_0002);
}
for (URL url : connectorConfigs) {
ConnectorHandler handler = new ConnectorHandler(url);
ConnectorHandler handlerOld =
handlerMap.put(handler.getUniqueName(), handler);
if (handlerOld != null) {
throw new SqoopException(ConnectorError.CONN_0006,
handler + ", " + handlerOld);
}
}
} catch (IOException ex) {
throw new SqoopException(ConnectorError.CONN_0001, ex);
}
registerConnectors(autoUpgrade);

Просмотреть файл

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.sqoop.connector;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.core.ConfigurationConstants;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
/**
* Utilities for ConnectorManager.
*/
public class ConnectorManagerUtils {
/**
* Get a list of URLs of connectors that are installed.
* Check
* @return List of URLs.
*/
public static List<URL> getConnectorConfigs() {
List<URL> connectorConfigs = new ArrayList<URL>();
try {
// Check ConnectorManager classloader.
Enumeration<URL> appPathConfigs =
ConnectorManager.class.getClassLoader().getResources(
ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
while (appPathConfigs.hasMoreElements()) {
connectorConfigs.add(appPathConfigs.nextElement());
}
// Check thread context classloader.
ClassLoader ctxLoader = Thread.currentThread().getContextClassLoader();
if (ctxLoader != null) {
Enumeration<URL> ctxPathConfigs = ctxLoader.getResources(
ConfigurationConstants.FILENAME_CONNECTOR_PROPERTIES);
while (ctxPathConfigs.hasMoreElements()) {
URL configUrl = ctxPathConfigs.nextElement();
if (!connectorConfigs.contains(configUrl)) {
connectorConfigs.add(configUrl);
}
}
}
} catch (IOException ex) {
throw new SqoopException(ConnectorError.CONN_0001, ex);
}
return connectorConfigs;
}
}

Просмотреть файл

@ -207,12 +207,12 @@ public class JdbcRepository extends Repository {
*/
@Override
public List<MConnector> findConnectors() {
return (List<MConnector>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
return handler.findConnectors(conn);
}
});
return (List<MConnector>) doWithConnection(new DoWithConnection() {
@Override
public Object doIt(Connection conn) {
return handler.findConnectors(conn);
}
});
}
/**

Просмотреть файл

@ -445,24 +445,48 @@ public abstract class Repository {
for (MJob job : jobs) {
// Make a new copy of the forms from the connector,
// else the values will get set in the forms in the connector for
// each connection.
List<MForm> forms = newConnector.getJobForms(Direction.FROM).clone(false).getForms();
MJobForms newJobForms = new MJobForms(forms);
upgrader.upgrade(job.getConnectorPart(Direction.FROM), newJobForms);
// @TODO(Abe): Check From and To
MJob newJob = new MJob(job, newJobForms, newJobForms, job.getFrameworkPart());
// each job.
List<MForm> fromForms = newConnector.getJobForms(Direction.FROM).clone(false).getForms();
List<MForm> toForms = newConnector.getJobForms(Direction.TO).clone(false).getForms();
// Transform form structures to objects for validations
// @TODO(Abe): Check From and To
Object newConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.FROM));
FormUtils.fromForms(newJob.getConnectorPart(Direction.FROM).getForms(), newConfigurationObject);
Validation validation = validator.validateJob(newConfigurationObject);
if (validation.getStatus().canProceed()) {
// New FROM direction forms, old TO direction forms.
if (job.getConnectorId(Direction.FROM) == newConnector.getPersistenceId()) {
MJobForms newFromJobForms = new MJobForms(fromForms);
MJobForms oldToJobForms = job.getConnectorPart(Direction.TO);
upgrader.upgrade(job.getConnectorPart(Direction.FROM), newFromJobForms);
MJob newJob = new MJob(job, newFromJobForms, oldToJobForms, job.getFrameworkPart());
updateJob(newJob, tx);
} else {
logInvalidModelObject("job", newJob, validation);
upgradeSuccessful = false;
// Transform form structures to objects for validations
// Object newFromConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.FROM));
// FormUtils.fromForms(newJob.getConnectorPart(Direction.FROM).getForms(), newFromConfigurationObject);
// Validation fromValidation = validator.validateJob(newFromConfigurationObject);
// if (fromValidation.getStatus().canProceed()) {
// updateJob(newJob, tx);
// } else {
// logInvalidModelObject("job", newJob, fromValidation);
// upgradeSuccessful = false;
// }
}
// Old FROM direction forms, new TO direction forms.
if (job.getConnectorId(Direction.TO) == newConnector.getPersistenceId()) {
MJobForms oldFromJobForms = job.getConnectorPart(Direction.FROM);
MJobForms newToJobForms = new MJobForms(toForms);
upgrader.upgrade(job.getConnectorPart(Direction.TO), newToJobForms);
MJob newJob = new MJob(job, oldFromJobForms, newToJobForms, job.getFrameworkPart());
updateJob(newJob, tx);
// Transform form structures to objects for validations
// Object newToConfigurationObject = ClassUtils.instantiate(connector.getJobConfigurationClass(Direction.TO));
// FormUtils.fromForms(newJob.getConnectorPart(Direction.TO).getForms(), newToConfigurationObject);
// Validation toValidation = validator.validateJob(newToConfigurationObject);
// if (toValidation.getStatus().canProceed()) {
// updateJob(newJob, tx);
// } else {
// logInvalidModelObject("job", newJob, toValidation);
// upgradeSuccessful = false;
// }
}
}

Просмотреть файл

@ -32,139 +32,139 @@ import static org.junit.Assert.assertNull;
*/
public class TestFrameworkMetadataUpgrader {
// FrameworkMetadataUpgrader upgrader;
//
// @Before
// public void initializeUpgrader() {
// upgrader = new FrameworkMetadataUpgrader();
// }
//
// /**
// * We take the same forms on input and output and we
// * expect that all values will be correctly transferred.
// */
// @Test
// public void testConnectionUpgrade() {
// MConnectionForms original = connection1();
// MConnectionForms target = connection1();
//
// original.getStringInput("f1.s1").setValue("A");
// original.getStringInput("f1.s2").setValue("B");
// original.getIntegerInput("f1.i").setValue(3);
//
// upgrader.upgrade(original, target);
//
// assertEquals("A", target.getStringInput("f1.s1").getValue());
// assertEquals("B", target.getStringInput("f1.s2").getValue());
// assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
// }
//
// /**
// * We take the same forms on input and output and we
// * expect that all values will be correctly transferred.
// */
// @Test
// public void testJobUpgrade() {
// MJobForms original = job1(MJob.Type.IMPORT);
// MJobForms target = job1(MJob.Type.IMPORT);
//
// original.getStringInput("f1.s1").setValue("A");
// original.getStringInput("f1.s2").setValue("B");
// original.getIntegerInput("f1.i").setValue(3);
//
// upgrader.upgrade(original, target);
//
// assertEquals("A", target.getStringInput("f1.s1").getValue());
// assertEquals("B", target.getStringInput("f1.s2").getValue());
// assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
// }
//
// /**
// * Upgrade scenario when new input has been added to the target forms.
// */
// @Test
// public void testNonExistingInput() {
// MConnectionForms original = connection1();
// MConnectionForms target = connection2();
//
// original.getStringInput("f1.s1").setValue("A");
// original.getStringInput("f1.s2").setValue("B");
// original.getIntegerInput("f1.i").setValue(3);
//
// upgrader.upgrade(original, target);
//
// assertEquals("A", target.getStringInput("f1.s1").getValue());
// assertNull(target.getStringInput("f1.s2_").getValue());
// assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
// }
//
// /**
// * Upgrade scenario when entire has been added in the target and
// * therefore is missing in the original.
// */
// @Test
// public void testNonExistingForm() {
// MConnectionForms original = connection1();
// MConnectionForms target = connection3();
//
// original.getStringInput("f1.s1").setValue("A");
// original.getStringInput("f1.s2").setValue("B");
// original.getIntegerInput("f1.i").setValue(3);
//
// upgrader.upgrade(original, target);
//
// assertNull(target.getStringInput("f2.s1").getValue());
// assertNull(target.getStringInput("f2.s2").getValue());
// assertNull(target.getIntegerInput("f2.i").getValue());
// }
//
// MJobForms job1(MJob.Type type) {
// return new MJobForms(type, forms1());
// }
//
// MConnectionForms connection1() {
// return new MConnectionForms(forms1());
// }
//
// MConnectionForms connection2() {
// return new MConnectionForms(forms2());
// }
//
// MConnectionForms connection3() {
// return new MConnectionForms(forms3());
// }
//
// List<MForm> forms1() {
// List<MForm> list = new LinkedList<MForm>();
// list.add(new MForm("f1", inputs1("f1")));
// return list;
// }
//
// List<MInput<?>> inputs1(String formName) {
// List<MInput<?>> list = new LinkedList<MInput<?>>();
// list.add(new MStringInput(formName + ".s1", false, (short)30));
// list.add(new MStringInput(formName + ".s2", false, (short)30));
// list.add(new MIntegerInput(formName + ".i", false));
// return list;
// }
//
// List<MForm> forms2() {
// List<MForm> list = new LinkedList<MForm>();
// list.add(new MForm("f1", inputs2("f1")));
// return list;
// }
//
// List<MInput<?>> inputs2(String formName) {
// List<MInput<?>> list = new LinkedList<MInput<?>>();
// list.add(new MStringInput(formName + ".s1", false, (short)30));
// list.add(new MStringInput(formName + ".s2_", false, (short)30));
// list.add(new MIntegerInput(formName + ".i", false));
// return list;
// }
//
// List<MForm> forms3() {
// List<MForm> list = new LinkedList<MForm>();
// list.add(new MForm("f2", inputs1("f2")));
// return list;
// }
FrameworkMetadataUpgrader upgrader;
@Before
public void initializeUpgrader() {
upgrader = new FrameworkMetadataUpgrader();
}
/**
* We take the same forms on input and output and we
* expect that all values will be correctly transferred.
*/
@Test
public void testConnectionUpgrade() {
MConnectionForms original = connection1();
MConnectionForms target = connection1();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue());
assertEquals("B", target.getStringInput("f1.s2").getValue());
assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
}
/**
* We take the same forms on input and output and we
* expect that all values will be correctly transferred.
*/
@Test
public void testJobUpgrade() {
MJobForms original = job1();
MJobForms target = job1();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue());
assertEquals("B", target.getStringInput("f1.s2").getValue());
assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
}
/**
* Upgrade scenario when new input has been added to the target forms.
*/
@Test
public void testNonExistingInput() {
MConnectionForms original = connection1();
MConnectionForms target = connection2();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target);
assertEquals("A", target.getStringInput("f1.s1").getValue());
assertNull(target.getStringInput("f1.s2_").getValue());
assertEquals(3, (long)target.getIntegerInput("f1.i").getValue());
}
/**
* Upgrade scenario when entire has been added in the target and
* therefore is missing in the original.
*/
@Test
public void testNonExistingForm() {
MConnectionForms original = connection1();
MConnectionForms target = connection3();
original.getStringInput("f1.s1").setValue("A");
original.getStringInput("f1.s2").setValue("B");
original.getIntegerInput("f1.i").setValue(3);
upgrader.upgrade(original, target);
assertNull(target.getStringInput("f2.s1").getValue());
assertNull(target.getStringInput("f2.s2").getValue());
assertNull(target.getIntegerInput("f2.i").getValue());
}
MJobForms job1() {
return new MJobForms(forms1());
}
MConnectionForms connection1() {
return new MConnectionForms(forms1());
}
MConnectionForms connection2() {
return new MConnectionForms(forms2());
}
MConnectionForms connection3() {
return new MConnectionForms(forms3());
}
List<MForm> forms1() {
List<MForm> list = new LinkedList<MForm>();
list.add(new MForm("f1", inputs1("f1")));
return list;
}
List<MInput<?>> inputs1(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>();
list.add(new MStringInput(formName + ".s1", false, (short)30));
list.add(new MStringInput(formName + ".s2", false, (short)30));
list.add(new MIntegerInput(formName + ".i", false));
return list;
}
List<MForm> forms2() {
List<MForm> list = new LinkedList<MForm>();
list.add(new MForm("f1", inputs2("f1")));
return list;
}
List<MInput<?>> inputs2(String formName) {
List<MInput<?>> list = new LinkedList<MInput<?>>();
list.add(new MStringInput(formName + ".s1", false, (short)30));
list.add(new MStringInput(formName + ".s2_", false, (short)30));
list.add(new MIntegerInput(formName + ".i", false));
return list;
}
List<MForm> forms3() {
List<MForm> list = new LinkedList<MForm>();
list.add(new MForm("f2", inputs1("f2")));
return list;
}
}

Просмотреть файл

@ -40,8 +40,10 @@ public final class DerbyRepoConstants {
* 3 - Version 1.99.4
* SQ_SUBMISSION modified SQS_EXTERNAL_ID varchar(50)
* Increased size of SQ_CONNECTOR.SQC_VERSION to 64
* 4 - Version 1.99.4
* Changed to FROM/TO design.
*/
public static final int VERSION = 3;
public static final int VERSION = 4;
private DerbyRepoConstants() {
// Disable explicit object creation

Просмотреть файл

@ -19,6 +19,7 @@ package org.apache.sqoop.repository.derby;
import static org.apache.sqoop.repository.derby.DerbySchemaQuery.*;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
@ -36,6 +37,8 @@ import org.apache.commons.lang.StringUtils;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.DirectionError;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorHandler;
import org.apache.sqoop.connector.ConnectorManagerUtils;
import org.apache.sqoop.model.MBooleanInput;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
@ -73,6 +76,14 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
private static final String EMBEDDED_DERBY_DRIVER_CLASSNAME =
"org.apache.derby.jdbc.EmbeddedDriver";
/**
* Unique name of HDFS Connector.
* HDFS Connector was originally part of the Sqoop framework, but now is its
* own connector. This constant is used to pre-register the HDFS Connector
* so that jobs that are being upgraded can reference the HDFS Connector.
*/
private static final String CONNECTOR_HDFS = "hdfs-connector";
private JdbcRepositoryContext repoContext;
private DataSource dataSource;
private JdbcRepositoryTransactionFactory txFactory;
@ -391,6 +402,25 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
runQuery(QUERY_UPGRADE_TABLE_SQ_SUBMISSION_MODIFY_COLUMN_SQS_EXTERNAL_ID_VARCHAR_50, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_CONNECTOR_MODIFY_COLUMN_SQC_VERSION_VARCHAR_64, conn);
}
if(version <= 3) {
// Schema modifications
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM, conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO, conn);
// Data modifications only for non-fresh install.
if (version > 0) {
// Register HDFS connector
updateJobData(conn, registerHdfsConnector(conn));
}
// Wait to remove SQB_TYPE (IMPORT/EXPORT) until we update data.
// Data updates depend on knowledge of the type of job.
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE, conn);
}
ResultSet rs = null;
PreparedStatement stmt = null;
@ -413,6 +443,172 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
}
}
/**
* Upgrade job data from IMPORT/EXPORT to FROM/TO.
* Since the framework is no longer responsible for HDFS,
* the HDFS connector/connection must be added.
* Also, the framework forms are moved around such that
* they belong to the added HDFS connector. Any extra forms
* are removed.
* NOTE: Connector forms should have a direction (FROM/TO),
* but framework forms should not.
*
* Here's a brief list describing the data migration process.
* 1. Change SQ_FORM.SQF_DIRECTION from IMPORT to FROM.
* 2. Change SQ_FORM.SQF_DIRECTION from EXPORT to TO.
* 3. Change EXPORT to TO in newly existing SQF_DIRECTION.
* This should affect connectors only since Connector forms
* should have had a value for SQF_OPERATION.
* 4. Change IMPORT to FROM in newly existing SQF_DIRECTION.
* This should affect connectors only since Connector forms
* should have had a value for SQF_OPERATION.
* 5. Add HDFS connector for jobs to reference.
* 6. Set 'input' and 'output' forms connector.
* to HDFS connector.
* 7. Throttling form was originally the second form in
* the framework. It should now be the first form.
* 8. Remove the EXPORT throttling form and ensure all of
* its dependencies point to the IMPORT throttling form.
* Then make sure the throttling form does not have a direction.
* Framework forms should not have a direction.
* 9. Create an HDFS connection to reference and update
* jobs to reference that connection. IMPORT jobs
* should have TO HDFS connector, EXPORT jobs should have
* FROM HDFS connector.
* 10. Update 'table' form names to 'fromTable' and 'toTable'.
* Also update the relevant inputs as well.
* @param conn
*/
private void updateJobData(Connection conn, long connectorId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Updating existing data for generic connectors.");
}
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
Direction.FROM.toString(), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION, conn,
Direction.TO.toString(), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
Direction.FROM.toString(),
"input");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION, conn,
Direction.TO.toString(),
"output");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR, conn,
new Long(connectorId), "input", "output");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS, conn,
"IMPORT", "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS, conn,
"throttling", "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FRAMEWORK_FORM, conn,
"throttling", "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL, conn,
"throttling");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_FRAMEWORK_INDEX, conn,
new Long(0), "throttling");
MConnection hdfsConnection = createHdfsConnection(conn);
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION, conn,
"EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION, conn,
new Long(hdfsConnection.getPersistenceId()), "EXPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION, conn,
new Long(hdfsConnection.getPersistenceId()), "IMPORT");
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"fromTable", "table", Direction.FROM.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
Direction.FROM.toString().toLowerCase(), "fromTable", Direction.FROM.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME, conn,
"toTable", "table", Direction.TO.toString());
runQuery(QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES, conn,
Direction.TO.toString().toLowerCase(), "toTable", Direction.TO.toString());
if (LOG.isTraceEnabled()) {
LOG.trace("Updated existing data for generic connectors.");
}
}
/**
* Pre-register HDFS Connector so that metadata upgrade will work.
*/
protected long registerHdfsConnector(Connection conn) {
if (LOG.isTraceEnabled()) {
LOG.trace("Begin HDFS Connector pre-loading.");
}
List<URL> connectorConfigs = ConnectorManagerUtils.getConnectorConfigs();
if (LOG.isInfoEnabled()) {
LOG.info("Connector config urls: " + connectorConfigs);
}
ConnectorHandler handler = null;
for (URL url : connectorConfigs) {
handler = new ConnectorHandler(url);
if (handler.getMetadata().getPersistenceId() != -1) {
return handler.getMetadata().getPersistenceId();
}
if (handler.getUniqueName().equals(CONNECTOR_HDFS)) {
try {
PreparedStatement baseConnectorStmt = conn.prepareStatement(
STMT_INSERT_CONNECTOR_BASE,
Statement.RETURN_GENERATED_KEYS);
baseConnectorStmt.setString(1, handler.getMetadata().getUniqueName());
baseConnectorStmt.setString(2, handler.getMetadata().getClassName());
baseConnectorStmt.setString(3, "0");
if (baseConnectorStmt.executeUpdate() == 1) {
ResultSet rsetConnectorId = baseConnectorStmt.getGeneratedKeys();
if (rsetConnectorId.next()) {
if (LOG.isInfoEnabled()) {
LOG.info("HDFS Connector pre-loaded: " + rsetConnectorId.getLong(1));
}
return rsetConnectorId.getLong(1);
}
}
} catch (SQLException e) {
throw new SqoopException(DerbyRepoError.DERBYREPO_0013);
}
break;
}
}
return -1L;
}
/**
* Create an HDFS connection.
* Intended to be used when moving HDFS connector out of framework
* to its own connector.
*
* NOTE: Upgrade path only!
*/
private MConnection createHdfsConnection(Connection conn) {
if (LOG.isTraceEnabled()) {
LOG.trace("Creating HDFS connection.");
}
MConnector hdfsConnector = this.findConnector(CONNECTOR_HDFS, conn);
MFramework framework = findFramework(conn);
MConnection hdfsConnection = new MConnection(
hdfsConnector.getPersistenceId(),
hdfsConnector.getConnectionForms(),
framework.getConnectionForms());
this.createConnection(hdfsConnection, conn);
if (LOG.isTraceEnabled()) {
LOG.trace("Created HDFS connection.");
}
return hdfsConnection;
}
/**
* {@inheritDoc}
*/
@ -536,7 +732,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MForm> connectionForms = new ArrayList<MForm>();
List<MForm> jobForms = new ArrayList<MForm>();
loadForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
loadFrameworkForms(connectionForms, jobForms, formFetchStmt, inputFetchStmt, 1);
// Return nothing If there aren't any framework metadata
if(connectionForms.isEmpty() && jobForms.isEmpty()) {
@ -948,11 +1144,11 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
jobId,
job.getFrameworkPart().getForms(),
job.getConnectorPart(Direction.TO).getForms(),
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
jobId,
job.getConnectorPart(Direction.TO).getForms(),
job.getFrameworkPart().getForms(),
conn);
job.setPersistenceId(jobId);
@ -993,9 +1189,13 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
job.getConnectorPart(Direction.FROM).getForms(),
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
job.getPersistenceId(),
job.getFrameworkPart().getForms(),
conn);
job.getPersistenceId(),
job.getConnectorPart(Direction.TO).getForms(),
conn);
createInputValues(STMT_INSERT_JOB_INPUT,
job.getPersistenceId(),
job.getFrameworkPart().getForms(),
conn);
} catch (SQLException ex) {
logException(ex, job);
@ -1157,6 +1357,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
try {
stmt = conn.prepareStatement(STMT_SELECT_ALL_JOBS_FOR_CONNECTOR);
stmt.setLong(1, connectorId);
stmt.setLong(2, connectorId);
return loadJobs(stmt, conn);
} catch (SQLException ex) {
@ -1664,7 +1865,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
formConnectorFetchStmt.setLong(1, connectorId);
inputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(2, XXX); // Will be filled by loadForms
//inputFetchStmt.setLong(2, XXX); // Will be filled by loadFrameworkForms
inputFetchStmt.setLong(3, id);
List<MForm> connectorConnForms = new ArrayList<MForm>();
@ -1674,9 +1875,9 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
List<MForm> toJobForms = new ArrayList<MForm>();
loadConnectorForms(connectorConnForms, fromJobForms, toJobForms,
formConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
formConnectorFetchStmt, inputFetchStmt, 2);
loadFrameworkForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
MConnection connection = new MConnection(connectorId,
new MConnectionForms(connectorConnForms),
@ -1736,7 +1937,7 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
toFormConnectorFetchStmt.setLong(1,toConnectorId);
inputFetchStmt.setLong(1, id);
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadForms
//inputFetchStmt.setLong(1, XXX); // Will be filled by loadFrameworkForms
inputFetchStmt.setLong(3, id);
List<MForm> toConnectorConnForms = new ArrayList<MForm>();
@ -1765,8 +1966,8 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
toConnectorToJobForms,
toFormConnectorFetchStmt, inputFetchStmt, 2);
loadForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
loadFrameworkForms(frameworkConnForms, frameworkJobForms,
formFrameworkFetchStmt, inputFetchStmt, 2);
MJob job = new MJob(
fromConnectorId, toConnectorId,
@ -1902,11 +2103,22 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
*
* @param query Query that should be executed
*/
private void runQuery(String query, Connection conn) {
Statement stmt = null;
private void runQuery(String query, Connection conn, Object... args) {
PreparedStatement stmt = null;
try {
stmt = conn.createStatement();
if (stmt.execute(query)) {
stmt = conn.prepareStatement(query);
for (int i = 0; i < args.length; ++i) {
if (args[i] instanceof String) {
stmt.setString(i + 1, (String)args[i]);
} else if (args[i] instanceof Long) {
stmt.setLong(i + 1, (Long) args[i]);
} else {
stmt.setObject(i, args[i]);
}
}
if (stmt.execute()) {
ResultSet rset = stmt.getResultSet();
int count = 0;
while (rset.next()) {
@ -1936,18 +2148,18 @@ public class DerbyRepositoryHandler extends JdbcRepositoryHandler {
* @param inputFetchStmt Prepare statement for fetching inputs
* @throws SQLException In case of any failure on Derby side
*/
public void loadForms(List<MForm> connectionForms,
List<MForm> jobForms,
PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt,
int formPosition) throws SQLException {
public void loadFrameworkForms(List<MForm> connectionForms,
List<MForm> jobForms,
PreparedStatement formFetchStmt,
PreparedStatement inputFetchStmt,
int formPosition) throws SQLException {
// Get list of structures from database
ResultSet rsetForm = formFetchStmt.executeQuery();
while (rsetForm.next()) {
long formId = rsetForm.getLong(1);
Long formConnectorId = rsetForm.getLong(2);
String operation = rsetForm.getString(3);
String direction = rsetForm.getString(3);
String formName = rsetForm.getString(4);
String formType = rsetForm.getString(5);
int formIndex = rsetForm.getInt(6);

Просмотреть файл

@ -69,6 +69,8 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQF_OPERATION = "SQF_OPERATION";
public static final String COLUMN_SQF_DIRECTION = "SQF_DIRECTION";
public static final String COLUMN_SQF_NAME = "SQF_NAME";
public static final String COLUMN_SQF_TYPE = "SQF_TYPE";
@ -144,6 +146,10 @@ public final class DerbySchemaConstants {
public static final String COLUMN_SQB_NAME = "SQB_NAME";
public static final String COLUMN_SQB_CONNECTION = "SQB_CONNECTION";
public static final String COLUMN_SQB_TYPE = "SQB_TYPE";
public static final String COLUMN_SQB_FROM_CONNECTION = "SQB_FROM_CONNECTION";
public static final String COLUMN_SQB_TO_CONNECTION = "SQB_TO_CONNECTION";
@ -162,6 +168,14 @@ public final class DerbySchemaConstants {
public static final String CONSTRAINT_SQB_SQN = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_NAME;
public static final String CONSTRAINT_SQB_SQN_FROM_NAME = CONSTRAINT_PREFIX + "SQB_SQN_FROM";
public static final String CONSTRAINT_SQB_SQN_FROM = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_FROM_NAME;
public static final String CONSTRAINT_SQB_SQN_TO_NAME = CONSTRAINT_PREFIX + "SQB_SQN_TO";
public static final String CONSTRAINT_SQB_SQN_TO = SCHEMA_PREFIX + CONSTRAINT_SQB_SQN_TO_NAME;
// SQ_CONNECTION_INPUT
public static final String TABLE_SQ_CONNECTION_INPUT_NAME =

Просмотреть файл

@ -50,16 +50,16 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
* <p>
* <strong>SQ_FORM</strong>: Form details.
* <pre>
* +-----------------------------+
* | SQ_FORM |
* +-----------------------------+
* | SQF_ID: BIGINT PK AUTO-GEN |
* | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID),NULL for framework
* | SQF_OPERATION: VARCHAR(32) | "IMPORT"|"EXPORT"|NULL
* | SQF_NAME: VARCHAR(64) |
* | SQF_TYPE: VARCHAR(32) | "CONNECTION"|"JOB"
* | SQF_INDEX: SMALLINT |
* +-----------------------------+
* +----------------------------------+
* | SQ_FORM |
* +----------------------------------+
* | SQF_ID: BIGINT PK AUTO-GEN |
* | SQF_CONNECTOR: BIGINT | FK SQ_CONNECTOR(SQC_ID),NULL for framework
* | SQF_DIRECTION: VARCHAR(32) | "FROM"|"TO"|NULL
* | SQF_NAME: VARCHAR(64) |
* | SQF_TYPE: VARCHAR(32) | "CONNECTION"|"JOB"
* | SQF_INDEX: SMALLINT |
* +----------------------------------+
* </pre>
* </p>
* <p>
@ -104,8 +104,8 @@ import static org.apache.sqoop.repository.derby.DerbySchemaConstants.*;
* +--------------------------------+
* | SQB_ID: BIGINT PK AUTO-GEN |
* | SQB_NAME: VARCHAR(64) |
* | SQB_TYPE: VARCHAR(64) |
* | SQB_CONNECTION: BIGINT | FK SQ_CONNECTION(SQN_ID)
* | SQB_FROM_CONNECTION: BIGINT | FK SQ_CONNECTION(SQN_ID)
* | SQB_TO_CONNECTION: BIGINT | FK SQ_CONNECTION(SQN_ID)
* | SQB_CREATION_USER: VARCHAR(32) |
* | SQB_CREATION_DATE: TIMESTAMP |
* | SQB_UPDATE_USER: VARCHAR(32) |
@ -286,13 +286,13 @@ public final class DerbySchemaQuery {
public static final String QUERY_CREATE_TABLE_SQ_JOB =
"CREATE TABLE " + TABLE_SQ_JOB + " ("
+ COLUMN_SQB_ID + " BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) PRIMARY KEY, "
+ COLUMN_SQB_FROM_CONNECTION + " BIGINT, "
+ COLUMN_SQB_TO_CONNECTION + " BIGINT, "
+ COLUMN_SQB_CONNECTION + " BIGINT, "
+ COLUMN_SQB_NAME + " VARCHAR(64), "
+ COLUMN_SQB_TYPE + " VARCHAR(64),"
+ COLUMN_SQB_CREATION_DATE + " TIMESTAMP,"
+ COLUMN_SQB_UPDATE_DATE + " TIMESTAMP,"
+ "CONSTRAINT " + CONSTRAINT_SQB_SQN + " "
+ "FOREIGN KEY(" + COLUMN_SQB_FROM_CONNECTION + ") "
+ "FOREIGN KEY(" + COLUMN_SQB_CONNECTION + ") "
+ "REFERENCES " + TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")"
+ ")";
@ -459,7 +459,7 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQF_ID + ", "
+ COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_OPERATION + ", "
+ COLUMN_SQF_DIRECTION + ", "
+ COLUMN_SQF_NAME + ", "
+ COLUMN_SQF_TYPE + ", "
+ COLUMN_SQF_INDEX
@ -472,13 +472,13 @@ public final class DerbySchemaQuery {
"SELECT "
+ COLUMN_SQF_ID + ", "
+ COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_OPERATION + ", "
+ COLUMN_SQF_DIRECTION + ", "
+ COLUMN_SQF_NAME + ", "
+ COLUMN_SQF_TYPE + ", "
+ COLUMN_SQF_INDEX
+ " FROM " + TABLE_SQ_FORM
+ " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL "
+ " ORDER BY " + COLUMN_SQF_INDEX;
+ " ORDER BY " + COLUMN_SQF_TYPE + ", " + COLUMN_SQF_DIRECTION + ", " + COLUMN_SQF_INDEX;
// DML: Fetch inputs for a given form
public static final String STMT_FETCH_INPUT =
@ -530,10 +530,10 @@ public final class DerbySchemaQuery {
+ COLUMN_SQBI_VALUE
+ " FROM " + TABLE_SQ_INPUT
+ " LEFT OUTER JOIN " + TABLE_SQ_JOB_INPUT
+ " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
+ " AND " + COLUMN_SQBI_JOB + " = ?"
+ " WHERE " + COLUMN_SQI_FORM + " = ?" +
" AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
+ " ON " + COLUMN_SQBI_INPUT + " = " + COLUMN_SQI_ID
+ " AND " + COLUMN_SQBI_JOB + " = ?"
+ " WHERE " + COLUMN_SQI_FORM + " = ?"
+ " AND (" + COLUMN_SQBI_JOB + " = ? OR " + COLUMN_SQBI_JOB + " IS NULL)"
+ " ORDER BY " + COLUMN_SQI_INDEX;
// DML: Insert connector base
@ -548,7 +548,7 @@ public final class DerbySchemaQuery {
public static final String STMT_INSERT_FORM_BASE =
"INSERT INTO " + TABLE_SQ_FORM + " ("
+ COLUMN_SQF_CONNECTOR + ", "
+ COLUMN_SQF_OPERATION + ", "
+ COLUMN_SQF_DIRECTION + ", "
+ COLUMN_SQF_NAME + ", "
+ COLUMN_SQF_TYPE + ", "
+ COLUMN_SQF_INDEX
@ -770,50 +770,36 @@ public final class DerbySchemaQuery {
+ "job." + COLUMN_SQB_CREATION_DATE + ", "
+ "job." + COLUMN_SQB_UPDATE_USER + ", "
+ "job." + COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB + " AS job"
+ " FROM " + TABLE_SQ_JOB + " job"
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+ " FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
+ " TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID
+ " WHERE " + COLUMN_SQB_ID + " = ?";
// DML: Select all jobs
public static final String STMT_SELECT_JOB_ALL =
"SELECT "
+ "FROM_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ "TO_CONNECTOR." + COLUMN_SQN_CONNECTOR + ", "
+ "job." + COLUMN_SQB_ID + ", "
+ "job." + COLUMN_SQB_NAME + ", "
+ "job." + COLUMN_SQB_FROM_CONNECTION + ", "
+ "job." + COLUMN_SQB_TO_CONNECTION + ", "
+ "job." + COLUMN_SQB_ENABLED + ", "
+ "job." + COLUMN_SQB_CREATION_USER + ", "
+ "job." + COLUMN_SQB_CREATION_DATE + ", "
+ "job." + COLUMN_SQB_UPDATE_USER + ", "
+ "job." + COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB + " AS job"
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " as FROM_CONNECTOR ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTOR." + COLUMN_SQN_ID
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " as TO_CONNECTOR ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTOR." + COLUMN_SQN_ID;
+ "FROM_CONNECTION." + COLUMN_SQN_CONNECTOR + ", "
+ "TO_CONNECTION." + COLUMN_SQN_CONNECTOR + ", "
+ "JOB." + COLUMN_SQB_ID + ", "
+ "JOB." + COLUMN_SQB_NAME + ", "
+ "JOB." + COLUMN_SQB_FROM_CONNECTION + ", "
+ "JOB." + COLUMN_SQB_TO_CONNECTION + ", "
+ "JOB." + COLUMN_SQB_ENABLED + ", "
+ "JOB." + COLUMN_SQB_CREATION_USER + ", "
+ "JOB." + COLUMN_SQB_CREATION_DATE + ", "
+ "JOB." + COLUMN_SQB_UPDATE_USER + ", "
+ "JOB." + COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB + " JOB"
+ " LEFT JOIN " + TABLE_SQ_CONNECTION + " FROM_CONNECTION"
+ " ON " + COLUMN_SQB_FROM_CONNECTION + " = FROM_CONNECTION." + COLUMN_SQN_ID
+ " LEFT JOIN " + TABLE_SQ_CONNECTION + " TO_CONNECTION"
+ " ON " + COLUMN_SQB_TO_CONNECTION + " = TO_CONNECTION." + COLUMN_SQN_ID;
// DML: Select all jobs for a Connector
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR =
"SELECT "
+ COLUMN_SQN_CONNECTOR + ", "
+ COLUMN_SQB_ID + ", "
+ COLUMN_SQB_NAME + ", "
+ COLUMN_SQB_FROM_CONNECTION + ", "
+ COLUMN_SQB_TO_CONNECTION + ", "
+ COLUMN_SQB_ENABLED + ", "
+ COLUMN_SQB_CREATION_USER + ", "
+ COLUMN_SQB_CREATION_DATE + ", "
+ COLUMN_SQB_UPDATE_USER + ", "
+ COLUMN_SQB_UPDATE_DATE
+ " FROM " + TABLE_SQ_JOB
+ " LEFT JOIN " + TABLE_SQ_CONNECTION
+ " ON " + COLUMN_SQB_FROM_CONNECTION + " = " + COLUMN_SQN_ID
+ " AND " + COLUMN_SQN_CONNECTOR + " = ? ";
public static final String STMT_SELECT_ALL_JOBS_FOR_CONNECTOR = STMT_SELECT_JOB_ALL
+ " WHERE FROM_CONNECTION." + COLUMN_SQN_CONNECTOR + " = ? OR TO_CONNECTION." + COLUMN_SQN_CONNECTOR + " = ?";
// DML: Insert new submission
public static final String STMT_INSERT_SUBMISSION =
@ -964,6 +950,122 @@ public final class DerbySchemaQuery {
"ALTER TABLE " + TABLE_SQ_CONNECTOR + " ALTER COLUMN "
+ COLUMN_SQC_VERSION + " SET DATA TYPE VARCHAR(64)";
// Version 4 Upgrade
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_RENAME_COLUMN_SQB_CONNECTION_TO_SQB_FROM_CONNECTION =
"RENAME COLUMN " + TABLE_SQ_JOB + "." + COLUMN_SQB_CONNECTION
+ " TO " + COLUMN_SQB_FROM_CONNECTION;
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_COLUMN_SQB_TO_CONNECTION =
"ALTER TABLE " + TABLE_SQ_JOB + " ADD COLUMN " + COLUMN_SQB_TO_CONNECTION
+ " BIGINT";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_CONSTRAINT_SQB_SQN =
"ALTER TABLE " + TABLE_SQ_JOB + " DROP CONSTRAINT " + CONSTRAINT_SQB_SQN;
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_FROM =
"ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_FROM
+ " FOREIGN KEY (" + COLUMN_SQB_FROM_CONNECTION + ") REFERENCES "
+ TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_ADD_CONSTRAINT_SQB_SQN_TO =
"ALTER TABLE " + TABLE_SQ_JOB + " ADD CONSTRAINT " + CONSTRAINT_SQB_SQN_TO
+ " FOREIGN KEY (" + COLUMN_SQB_TO_CONNECTION + ") REFERENCES "
+ TABLE_SQ_CONNECTION + " (" + COLUMN_SQN_ID + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_RENAME_COLUMN_SQF_OPERATION_TO_SQF_DIRECTION =
"RENAME COLUMN " + TABLE_SQ_FORM + "." + COLUMN_SQF_OPERATION
+ " TO " + COLUMN_SQF_DIRECTION;
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_OPERATION_TO_SQF_DIRECTION =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION
+ "=? WHERE " + COLUMN_SQF_DIRECTION + "=?"
+ " AND " + COLUMN_SQF_CONNECTOR + " IS NOT NULL";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_CONNECTOR + "= ?"
+ " WHERE " + COLUMN_SQF_CONNECTOR + " IS NULL AND "
+ COLUMN_SQF_NAME + " IN (?, ?)";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_CONNECTOR_HDFS_FORM_DIRECTION =
"UPDATE " + TABLE_SQ_FORM + " SET " + COLUMN_SQF_DIRECTION + "= ?"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION_COPY_SQB_FROM_CONNECTION =
"UPDATE " + TABLE_SQ_JOB + " SET "
+ COLUMN_SQB_TO_CONNECTION + "=" + COLUMN_SQB_FROM_CONNECTION
+ " WHERE " + COLUMN_SQB_TYPE + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_FROM_CONNECTION =
"UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_FROM_CONNECTION + "=?"
+ " WHERE " + COLUMN_SQB_TYPE + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_UPDATE_SQB_TO_CONNECTION =
"UPDATE " + TABLE_SQ_JOB + " SET " + COLUMN_SQB_TO_CONNECTION + "=?"
+ " WHERE " + COLUMN_SQB_TYPE + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_SQF_NAME =
"UPDATE " + TABLE_SQ_FORM + " SET "
+ COLUMN_SQF_NAME + "= ?"
+ " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?";
/**
* Intended to rename forms based on direction.
* e.g. If SQ_FORM.SQF_NAME = 'table' and parameter 1 = 'from'
* then SQ_FORM.SQF_NAME = 'fromTable'.
*/
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_TABLE_INPUT_NAMES =
"UPDATE " + TABLE_SQ_INPUT + " SET "
+ COLUMN_SQI_NAME + "=("
+ "? || UPPER(SUBSTR(" + COLUMN_SQI_NAME + ",1,1))"
+ " || SUBSTR(" + COLUMN_SQI_NAME + ",2) )"
+ " WHERE " + COLUMN_SQI_FORM + " IN ("
+ " SELECT " + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?)";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_DIRECTION_TO_NULL =
"UPDATE " + TABLE_SQ_FORM + " SET "
+ COLUMN_SQF_DIRECTION + "= NULL"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
public static final String QUERY_SELECT_THROTTLING_FORM_INPUT_IDS =
"SELECT SQI." + COLUMN_SQI_ID + " FROM " + TABLE_SQ_INPUT + " SQI"
+ " INNER JOIN " + TABLE_SQ_FORM + " SQF ON SQI." + COLUMN_SQI_FORM + "=SQF." + COLUMN_SQF_ID
+ " WHERE SQF." + COLUMN_SQF_NAME + "='throttling' AND SQF." + COLUMN_SQF_DIRECTION + "=?";
/**
* Intended to change SQ_JOB_INPUT.SQBI_INPUT from EXPORT
* throttling form, to IMPORT throttling form.
*/
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_INPUT_UPDATE_THROTTLING_FORM_INPUTS =
"UPDATE " + TABLE_SQ_JOB_INPUT + " SQBI SET"
+ " SQBI." + COLUMN_SQBI_INPUT + "=(" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS
+ " AND SQI." + COLUMN_SQI_NAME + "=("
+ "SELECT SQI2." + COLUMN_SQI_NAME + " FROM " + TABLE_SQ_INPUT + " SQI2"
+ " WHERE SQI2." + COLUMN_SQI_ID + "=SQBI." + COLUMN_SQBI_INPUT + " FETCH FIRST 1 ROWS ONLY"
+ "))"
+ "WHERE SQBI." + COLUMN_SQBI_INPUT + " IN (" + QUERY_SELECT_THROTTLING_FORM_INPUT_IDS + ")";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FORM_INPUTS =
"DELETE FROM " + TABLE_SQ_INPUT + " SQI"
+ " WHERE SQI." + COLUMN_SQI_FORM + " IN ("
+ "SELECT SQF." + COLUMN_SQF_ID + " FROM " + TABLE_SQ_FORM + " SQF "
+ " WHERE SQF." + COLUMN_SQF_NAME + "= ?"
+ " AND SQF." + COLUMN_SQF_DIRECTION + "= ?)";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_REMOVE_EXTRA_FRAMEWORK_FORM =
"DELETE FROM " + TABLE_SQ_FORM
+ " WHERE " + COLUMN_SQF_NAME + "= ?"
+ " AND " + COLUMN_SQF_DIRECTION + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_FORM_UPDATE_FRAMEWORK_INDEX =
"UPDATE " + TABLE_SQ_FORM + " SET "
+ COLUMN_SQF_INDEX + "= ?"
+ " WHERE " + COLUMN_SQF_NAME + "= ?";
public static final String QUERY_UPGRADE_TABLE_SQ_JOB_REMOVE_COLUMN_SQB_TYPE =
"ALTER TABLE " + TABLE_SQ_JOB + " DROP COLUMN " + COLUMN_SQB_TYPE;
private DerbySchemaQuery() {
// Disable explicit object creation
}

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -33,213 +33,213 @@ import java.util.Map;
*/
public class TestConnectionHandling extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
//
// // We always needs connector and framework structures in place
// loadConnectorAndFramework();
// }
//
// public void testFindConnection() throws Exception {
// // Let's try to find non existing connection
// try {
// handler.findConnection(1, getDerbyConnection());
// fail();
// } catch(SqoopException ex) {
// assertEquals(DerbyRepoError.DERBYREPO_0024, ex.getErrorCode());
// }
//
// // Load prepared connections into database
// loadConnections();
//
// MConnection connA = handler.findConnection(1, getDerbyConnection());
// assertNotNull(connA);
// assertEquals(1, connA.getPersistenceId());
// assertEquals("CA", connA.getName());
//
// List<MForm> forms;
//
// // Check connector part
// forms = connA.getConnectorPart().getForms();
// assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value3", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// // Check framework part
// forms = connA.getFrameworkPart().getForms();
// assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
// }
//
// public void testFindConnections() throws Exception {
// List<MConnection> list;
//
// // Load empty list on empty repository
// list = handler.findConnections(getDerbyConnection());
// assertEquals(0, list.size());
//
// loadConnections();
//
// // Load all two connections on loaded repository
// list = handler.findConnections(getDerbyConnection());
// assertEquals(2, list.size());
//
// assertEquals("CA", list.get(0).getName());
// assertEquals("CB", list.get(1).getName());
// }
//
// public void testExistsConnection() throws Exception {
// // There shouldn't be anything on empty repository
// assertFalse(handler.existsConnection(1, getDerbyConnection()));
// assertFalse(handler.existsConnection(2, getDerbyConnection()));
// assertFalse(handler.existsConnection(3, getDerbyConnection()));
//
// loadConnections();
//
// assertTrue(handler.existsConnection(1, getDerbyConnection()));
// assertTrue(handler.existsConnection(2, getDerbyConnection()));
// assertFalse(handler.existsConnection(3, getDerbyConnection()));
// }
//
// public void testCreateConnection() throws Exception {
// MConnection connection = getConnection();
//
// // Load some data
// fillConnection(connection);
//
// handler.createConnection(connection, getDerbyConnection());
//
// assertEquals(1, connection.getPersistenceId());
// assertCountForTable("SQOOP.SQ_CONNECTION", 1);
// assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
//
// MConnection retrieved = handler.findConnection(1, getDerbyConnection());
// assertEquals(1, retrieved.getPersistenceId());
//
// List<MForm> forms;
// forms = connection.getConnectorPart().getForms();
// assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value2", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// forms = connection.getFrameworkPart().getForms();
// assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// // Let's create second connection
// connection = getConnection();
// fillConnection(connection);
//
// handler.createConnection(connection, getDerbyConnection());
//
// assertEquals(2, connection.getPersistenceId());
// assertCountForTable("SQOOP.SQ_CONNECTION", 2);
// assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 8);
// }
//
// public void testInUseConnection() throws Exception {
// loadConnections();
//
// assertFalse(handler.inUseConnection(1, getDerbyConnection()));
//
// loadJobs();
//
// assertTrue(handler.inUseConnection(1, getDerbyConnection()));
// }
//
// public void testUpdateConnection() throws Exception {
// loadConnections();
//
// MConnection connection = handler.findConnection(1, getDerbyConnection());
//
// List<MForm> forms;
//
// forms = connection.getConnectorPart().getForms();
// ((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(0).getInputs().get(1)).setValue(null);
// ((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(1).getInputs().get(1)).setValue(null);
//
// forms = connection.getFrameworkPart().getForms();
// ((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(0).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
// ((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(1).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
//
// connection.setName("name");
//
// handler.updateConnection(connection, getDerbyConnection());
//
// assertEquals(1, connection.getPersistenceId());
// assertCountForTable("SQOOP.SQ_CONNECTION", 2);
// assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 10);
//
// MConnection retrieved = handler.findConnection(1, getDerbyConnection());
// assertEquals("name", connection.getName());
//
// forms = retrieved.getConnectorPart().getForms();
// assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// forms = retrieved.getFrameworkPart().getForms();
// assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
// assertNotNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals(((Map)forms.get(0).getInputs().get(1).getValue()).size(), 0);
// assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
// assertNotNull(forms.get(1).getInputs().get(1).getValue());
// assertEquals(((Map)forms.get(1).getInputs().get(1).getValue()).size(), 0);
// }
//
// public void testEnableAndDisableConnection() throws Exception {
// loadConnections();
//
// // disable connection 1
// handler.enableConnection(1, false, getDerbyConnection());
//
// MConnection retrieved = handler.findConnection(1, getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(false, retrieved.getEnabled());
//
// // enable connection 1
// handler.enableConnection(1, true, getDerbyConnection());
//
// retrieved = handler.findConnection(1, getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(true, retrieved.getEnabled());
// }
//
// public void testDeleteConnection() throws Exception {
// loadConnections();
//
// handler.deleteConnection(1, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_CONNECTION", 1);
// assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
//
// handler.deleteConnection(2, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_CONNECTION", 0);
// assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 0);
// }
//
// public MConnection getConnection() {
// return new MConnection(1,
// handler.findConnector("A", getDerbyConnection()).getConnectionForms(),
// handler.findFramework(getDerbyConnection()).getConnectionForms()
// );
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
// We always needs connector and framework structures in place
loadConnectorAndFramework();
}
public void testFindConnection() throws Exception {
// Let's try to find non existing connection
try {
handler.findConnection(1, getDerbyConnection());
fail();
} catch(SqoopException ex) {
assertEquals(DerbyRepoError.DERBYREPO_0024, ex.getErrorCode());
}
// Load prepared connections into database
loadConnections();
MConnection connA = handler.findConnection(1, getDerbyConnection());
assertNotNull(connA);
assertEquals(1, connA.getPersistenceId());
assertEquals("CA", connA.getName());
List<MForm> forms;
// Check connector part
forms = connA.getConnectorPart().getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value3", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Check framework part
forms = connA.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
}
public void testFindConnections() throws Exception {
List<MConnection> list;
// Load empty list on empty repository
list = handler.findConnections(getDerbyConnection());
assertEquals(0, list.size());
loadConnections();
// Load all two connections on loaded repository
list = handler.findConnections(getDerbyConnection());
assertEquals(2, list.size());
assertEquals("CA", list.get(0).getName());
assertEquals("CB", list.get(1).getName());
}
public void testExistsConnection() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsConnection(1, getDerbyConnection()));
assertFalse(handler.existsConnection(2, getDerbyConnection()));
assertFalse(handler.existsConnection(3, getDerbyConnection()));
loadConnections();
assertTrue(handler.existsConnection(1, getDerbyConnection()));
assertTrue(handler.existsConnection(2, getDerbyConnection()));
assertFalse(handler.existsConnection(3, getDerbyConnection()));
}
public void testCreateConnection() throws Exception {
MConnection connection = getConnection();
// Load some data
fillConnection(connection);
handler.createConnection(connection, getDerbyConnection());
assertEquals(1, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 1);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
MConnection retrieved = handler.findConnection(1, getDerbyConnection());
assertEquals(1, retrieved.getPersistenceId());
List<MForm> forms;
forms = connection.getConnectorPart().getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value2", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
forms = connection.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Let's create second connection
connection = getConnection();
fillConnection(connection);
handler.createConnection(connection, getDerbyConnection());
assertEquals(2, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 2);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 8);
}
public void testInUseConnection() throws Exception {
loadConnections();
assertFalse(handler.inUseConnection(1, getDerbyConnection()));
loadJobs();
assertTrue(handler.inUseConnection(1, getDerbyConnection()));
}
public void testUpdateConnection() throws Exception {
loadConnections();
MConnection connection = handler.findConnection(1, getDerbyConnection());
List<MForm> forms;
forms = connection.getConnectorPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(0).getInputs().get(1)).setValue(null);
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(1).getInputs().get(1)).setValue(null);
forms = connection.getFrameworkPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(0).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(1).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
connection.setName("name");
handler.updateConnection(connection, getDerbyConnection());
assertEquals(1, connection.getPersistenceId());
assertCountForTable("SQOOP.SQ_CONNECTION", 2);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 10);
MConnection retrieved = handler.findConnection(1, getDerbyConnection());
assertEquals("name", connection.getName());
forms = retrieved.getConnectorPart().getForms();
assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
forms = retrieved.getFrameworkPart().getForms();
assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
assertNotNull(forms.get(0).getInputs().get(1).getValue());
assertEquals(((Map)forms.get(0).getInputs().get(1).getValue()).size(), 0);
assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
assertNotNull(forms.get(1).getInputs().get(1).getValue());
assertEquals(((Map)forms.get(1).getInputs().get(1).getValue()).size(), 0);
}
public void testEnableAndDisableConnection() throws Exception {
loadConnections();
// disable connection 1
handler.enableConnection(1, false, getDerbyConnection());
MConnection retrieved = handler.findConnection(1, getDerbyConnection());
assertNotNull(retrieved);
assertEquals(false, retrieved.getEnabled());
// enable connection 1
handler.enableConnection(1, true, getDerbyConnection());
retrieved = handler.findConnection(1, getDerbyConnection());
assertNotNull(retrieved);
assertEquals(true, retrieved.getEnabled());
}
public void testDeleteConnection() throws Exception {
loadConnections();
handler.deleteConnection(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_CONNECTION", 1);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 4);
handler.deleteConnection(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_CONNECTION", 0);
assertCountForTable("SQOOP.SQ_CONNECTION_INPUT", 0);
}
public MConnection getConnection() {
return new MConnection(1,
handler.findConnector("A", getDerbyConnection()).getConnectionForms(),
handler.findFramework(getDerbyConnection()).getConnectionForms()
);
}
}

Просмотреть файл

@ -26,70 +26,70 @@ import java.util.List;
*/
public class TestConnectorHandling extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
// }
//
// public void testFindConnector() throws Exception {
// // On empty repository, no connectors should be there
// assertNull(handler.findConnector("A", getDerbyConnection()));
// assertNull(handler.findConnector("B", getDerbyConnection()));
//
// // Load connector into repository
// loadConnectorAndFramework();
//
// // Retrieve it
// MConnector connector = handler.findConnector("A", getDerbyConnection());
// assertNotNull(connector);
//
// // Get original structure
// MConnector original = getConnector();
//
// // And compare them
// assertEquals(original, connector);
// }
//
// public void testFindAllConnectors() throws Exception {
// // No connectors in an empty repository, we expect an empty list
// assertEquals(handler.findConnectors(getDerbyConnection()).size(),0);
//
// loadConnectorAndFramework();
// addConnector();
//
// // Retrieve connectors
// List<MConnector> connectors = handler.findConnectors(getDerbyConnection());
// assertNotNull(connectors);
// assertEquals(connectors.size(),2);
// assertEquals(connectors.get(0).getUniqueName(),"A");
// assertEquals(connectors.get(1).getUniqueName(),"B");
//
//
// }
//
// public void testRegisterConnector() throws Exception {
// MConnector connector = getConnector();
//
// handler.registerConnector(connector, getDerbyConnection());
//
// // Connector should get persistence ID
// assertEquals(1, connector.getPersistenceId());
//
// // Now check content in corresponding tables
// assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
// assertCountForTable("SQOOP.SQ_FORM", 6);
// assertCountForTable("SQOOP.SQ_INPUT", 12);
//
// // Registered connector should be easily recovered back
// MConnector retrieved = handler.findConnector("A", getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(connector, retrieved);
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
}
public void testFindConnector() throws Exception {
// On empty repository, no connectors should be there
assertNull(handler.findConnector("A", getDerbyConnection()));
assertNull(handler.findConnector("B", getDerbyConnection()));
// Load connector into repository
loadConnectorAndFramework();
// Retrieve it
MConnector connector = handler.findConnector("A", getDerbyConnection());
assertNotNull(connector);
// Get original structure
MConnector original = getConnector();
// And compare them
assertEquals(original, connector);
}
public void testFindAllConnectors() throws Exception {
// No connectors in an empty repository, we expect an empty list
assertEquals(handler.findConnectors(getDerbyConnection()).size(),0);
loadConnectorAndFramework();
addConnector();
// Retrieve connectors
List<MConnector> connectors = handler.findConnectors(getDerbyConnection());
assertNotNull(connectors);
assertEquals(connectors.size(),2);
assertEquals(connectors.get(0).getUniqueName(),"A");
assertEquals(connectors.get(1).getUniqueName(),"B");
}
public void testRegisterConnector() throws Exception {
MConnector connector = getConnector();
handler.registerConnector(connector, getDerbyConnection());
// Connector should get persistence ID
assertEquals(1, connector.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 1);
assertCountForTable("SQOOP.SQ_FORM", 6);
assertCountForTable("SQOOP.SQ_INPUT", 12);
// Registered connector should be easily recovered back
MConnector retrieved = handler.findConnector("A", getDerbyConnection());
assertNotNull(retrieved);
assertEquals(connector, retrieved);
}
}

Просмотреть файл

@ -29,102 +29,101 @@ import java.sql.SQLException;
*/
public class TestFrameworkHandling extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
// }
//
// public void testFindFramework() throws Exception {
// // On empty repository, no framework should be there
// assertNull(handler.findFramework(getDerbyConnection()));
//
// // Load framework into repository
// loadConnectorAndFramework();
//
// // Retrieve it
// MFramework framework = handler.findFramework(getDerbyConnection());
// assertNotNull(framework);
//
// // Get original structure
// MFramework original = getFramework();
//
// // And compare them
// assertEquals(original, framework);
// }
//
// public void testRegisterConnector() throws Exception {
// MFramework framework = getFramework();
//
// handler.registerFramework(framework, getDerbyConnection());
//
// // Connector should get persistence ID
// assertEquals(1, framework.getPersistenceId());
//
// // Now check content in corresponding tables
// assertCountForTable("SQOOP.SQ_CONNECTOR", 0);
// assertCountForTable("SQOOP.SQ_FORM", 6);
// assertCountForTable("SQOOP.SQ_INPUT", 12);
//
// // Registered framework should be easily recovered back
// MFramework retrieved = handler.findFramework(getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(framework, retrieved);
// assertEquals(framework.getVersion(), retrieved.getVersion());
// }
//
// private String getFrameworkVersion() throws Exception {
// final String frameworkVersionQuery =
// "SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?";
// String retVal = null;
// PreparedStatement preparedStmt = null;
// ResultSet resultSet = null;
// try {
// preparedStmt =
// getDerbyConnection().prepareStatement(frameworkVersionQuery);
// preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
// resultSet = preparedStmt.executeQuery();
// if(resultSet.next())
// retVal = resultSet.getString(1);
// return retVal;
// } finally {
// if(preparedStmt !=null) {
// try {
// preparedStmt.close();
// } catch(SQLException e) {
// }
// }
// if(resultSet != null) {
// try {
// resultSet.close();
// } catch(SQLException e) {
// }
// }
// }
// }
//
// public void testFrameworkVersion() throws Exception {
// handler.registerFramework(getFramework(), getDerbyConnection());
//
// final String lowerVersion = Integer.toString(
// Integer.parseInt(FrameworkManager.CURRENT_FRAMEWORK_VERSION) - 1);
// assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
// runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion +
// "' WHERE SQM_KEY = '" + DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION + "'");
// assertEquals(lowerVersion, getFrameworkVersion());
//
// MFramework framework = getFramework();
// handler.updateFramework(framework, getDerbyConnection());
//
// assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, framework.getVersion());
//
// assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
}
public void testFindFramework() throws Exception {
// On empty repository, no framework should be there
assertNull(handler.findFramework(getDerbyConnection()));
// Load framework into repository
loadConnectorAndFramework();
// Retrieve it
MFramework framework = handler.findFramework(getDerbyConnection());
assertNotNull(framework);
// Get original structure
MFramework original = getFramework();
// And compare them
assertEquals(original, framework);
}
public void testRegisterConnector() throws Exception {
MFramework framework = getFramework();
handler.registerFramework(framework, getDerbyConnection());
// Connector should get persistence ID
assertEquals(1, framework.getPersistenceId());
// Now check content in corresponding tables
assertCountForTable("SQOOP.SQ_CONNECTOR", 0);
assertCountForTable("SQOOP.SQ_FORM", 4);
assertCountForTable("SQOOP.SQ_INPUT", 8);
// Registered framework should be easily recovered back
MFramework retrieved = handler.findFramework(getDerbyConnection());
assertNotNull(retrieved);
assertEquals(framework, retrieved);
assertEquals(framework.getVersion(), retrieved.getVersion());
}
private String getFrameworkVersion() throws Exception {
final String frameworkVersionQuery =
"SELECT SQM_VALUE FROM SQOOP.SQ_SYSTEM WHERE SQM_KEY=?";
String retVal = null;
PreparedStatement preparedStmt = null;
ResultSet resultSet = null;
try {
preparedStmt =
getDerbyConnection().prepareStatement(frameworkVersionQuery);
preparedStmt.setString(1, DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION);
resultSet = preparedStmt.executeQuery();
if(resultSet.next())
retVal = resultSet.getString(1);
return retVal;
} finally {
if(preparedStmt !=null) {
try {
preparedStmt.close();
} catch(SQLException e) {
}
}
if(resultSet != null) {
try {
resultSet.close();
} catch(SQLException e) {
}
}
}
}
public void testFrameworkVersion() throws Exception {
handler.registerFramework(getFramework(), getDerbyConnection());
final String lowerVersion = Integer.toString(
Integer.parseInt(FrameworkManager.CURRENT_FRAMEWORK_VERSION) - 1);
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
runQuery("UPDATE SQOOP.SQ_SYSTEM SET SQM_VALUE='" + lowerVersion +
"' WHERE SQM_KEY = '" + DerbyRepoConstants.SYSKEY_FRAMEWORK_VERSION + "'");
assertEquals(lowerVersion, getFrameworkVersion());
MFramework framework = getFramework();
handler.updateFramework(framework, getDerbyConnection());
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, framework.getVersion());
assertEquals(FrameworkManager.CURRENT_FRAMEWORK_VERSION, getFrameworkVersion());
}
}

Просмотреть файл

@ -40,107 +40,107 @@ import java.util.Map;
*/
public class TestInputTypes extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
// }
//
// /**
// * Ensure that metadata with all various data types can be successfully
// * serialized into repository and retrieved back.
// */
// public void testMetadataSerialization() throws Exception {
// MConnector connector = getConnector();
//
// // Serialize the connector with all data types into repository
// handler.registerConnector(connector, getDerbyConnection());
//
// // Successful serialization should update the ID
// assertNotSame(connector.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT);
//
// // Retrieve registered connector
// MConnector retrieved = handler.findConnector(connector.getUniqueName(), getDerbyConnection());
// assertNotNull(retrieved);
//
// // Original and retrieved connectors should be the same
// assertEquals(connector, retrieved);
// }
//
// /**
// * Test that serializing actual data is not an issue.
// */
// public void testDataSerialization() throws Exception {
// MConnector connector = getConnector();
// MFramework framework = getFramework();
//
// // Register metadata for everything and our new connector
// handler.registerConnector(connector, getDerbyConnection());
// handler.registerFramework(framework, getDerbyConnection());
//
// // Inserted values
// Map<String, String> map = new HashMap<String, String>();
// map.put("A", "B");
//
// // Connection object with all various values
// MConnection connection = new MConnection(connector.getPersistenceId(), connector.getConnectionForms(), framework.getConnectionForms());
// MConnectionForms forms = connection.getConnectorPart();
// forms.getStringInput("f.String").setValue("A");
// forms.getMapInput("f.Map").setValue(map);
// forms.getIntegerInput("f.Integer").setValue(1);
// forms.getBooleanInput("f.Boolean").setValue(true);
// forms.getEnumInput("f.Enum").setValue("YES");
//
// // Create the connection in repository
// handler.createConnection(connection, getDerbyConnection());
// assertNotSame(connection.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT);
//
// // Retrieve created connection
// MConnection retrieved = handler.findConnection(connection.getPersistenceId(), getDerbyConnection());
// forms = retrieved.getConnectorPart();
// assertEquals("A", forms.getStringInput("f.String").getValue());
// assertEquals(map, forms.getMapInput("f.Map").getValue());
// assertEquals(1, (int)forms.getIntegerInput("f.Integer").getValue());
// assertEquals(true, (boolean)forms.getBooleanInput("f.Boolean").getValue());
// assertEquals("YES", forms.getEnumInput("f.Enum").getValue());
// }
//
// /**
// * Overriding parent method to get forms with all supported data types.
// *
// * @return Forms with all data types
// */
// @Override
// protected List<MForm> getForms() {
// List<MForm> forms = new LinkedList<MForm>();
//
// List<MInput<?>> inputs;
// MInput input;
//
// inputs = new LinkedList<MInput<?>>();
//
// input = new MStringInput("f.String", false, (short)30);
// inputs.add(input);
//
// input = new MMapInput("f.Map", false);
// inputs.add(input);
//
// input = new MIntegerInput("f.Integer", false);
// inputs.add(input);
//
// input = new MBooleanInput("f.Boolean", false);
// inputs.add(input);
//
// input = new MEnumInput("f.Enum", false, new String[] {"YES", "NO"});
// inputs.add(input);
//
// forms.add(new MForm("f", inputs));
// return forms;
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
}
/**
* Ensure that metadata with all various data types can be successfully
* serialized into repository and retrieved back.
*/
public void testMetadataSerialization() throws Exception {
MConnector connector = getConnector();
// Serialize the connector with all data types into repository
handler.registerConnector(connector, getDerbyConnection());
// Successful serialization should update the ID
assertNotSame(connector.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT);
// Retrieve registered connector
MConnector retrieved = handler.findConnector(connector.getUniqueName(), getDerbyConnection());
assertNotNull(retrieved);
// Original and retrieved connectors should be the same
assertEquals(connector, retrieved);
}
/**
* Test that serializing actual data is not an issue.
*/
public void testDataSerialization() throws Exception {
MConnector connector = getConnector();
MFramework framework = getFramework();
// Register metadata for everything and our new connector
handler.registerConnector(connector, getDerbyConnection());
handler.registerFramework(framework, getDerbyConnection());
// Inserted values
Map<String, String> map = new HashMap<String, String>();
map.put("A", "B");
// Connection object with all various values
MConnection connection = new MConnection(connector.getPersistenceId(), connector.getConnectionForms(), framework.getConnectionForms());
MConnectionForms forms = connection.getConnectorPart();
forms.getStringInput("f.String").setValue("A");
forms.getMapInput("f.Map").setValue(map);
forms.getIntegerInput("f.Integer").setValue(1);
forms.getBooleanInput("f.Boolean").setValue(true);
forms.getEnumInput("f.Enum").setValue("YES");
// Create the connection in repository
handler.createConnection(connection, getDerbyConnection());
assertNotSame(connection.getPersistenceId(), MPersistableEntity.PERSISTANCE_ID_DEFAULT);
// Retrieve created connection
MConnection retrieved = handler.findConnection(connection.getPersistenceId(), getDerbyConnection());
forms = retrieved.getConnectorPart();
assertEquals("A", forms.getStringInput("f.String").getValue());
assertEquals(map, forms.getMapInput("f.Map").getValue());
assertEquals(1, (int)forms.getIntegerInput("f.Integer").getValue());
assertEquals(true, (boolean)forms.getBooleanInput("f.Boolean").getValue());
assertEquals("YES", forms.getEnumInput("f.Enum").getValue());
}
/**
* Overriding parent method to get forms with all supported data types.
*
* @return Forms with all data types
*/
@Override
protected List<MForm> getForms() {
List<MForm> forms = new LinkedList<MForm>();
List<MInput<?>> inputs;
MInput input;
inputs = new LinkedList<MInput<?>>();
input = new MStringInput("f.String", false, (short)30);
inputs.add(input);
input = new MMapInput("f.Map", false);
inputs.add(input);
input = new MIntegerInput("f.Integer", false);
inputs.add(input);
input = new MBooleanInput("f.Boolean", false);
inputs.add(input);
input = new MEnumInput("f.Enum", false, new String[] {"YES", "NO"});
inputs.add(input);
forms.add(new MForm("f", inputs));
return forms;
}
}

Просмотреть файл

@ -17,31 +17,53 @@
*/
package org.apache.sqoop.repository.derby;
import java.sql.Connection;
/**
*
*/
public class TestInternals extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
// }
//
// public void testSuitableInternals() throws Exception {
// assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
// createSchema(); // Test code is building the structures
// assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
// }
//
// public void testCreateorUpdateInternals() throws Exception {
// assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
// handler.createOrUpdateInternals(getDerbyConnection());
// assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new TestDerbyRepositoryHandler();
}
public void testSuitableInternals() throws Exception {
assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
createSchema(); // Test code is building the structures
assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
}
public void testCreateorUpdateInternals() throws Exception {
assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
handler.createOrUpdateInternals(getDerbyConnection());
assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
}
public void testUpgradeVersion2ToVersion4() throws Exception {
createSchema(2);
assertFalse(handler.haveSuitableInternals(getDerbyConnection()));
loadConnectorAndFramework(2);
loadConnections(2);
loadJobs(2);
handler.createOrUpdateInternals(getDerbyConnection());
assertTrue(handler.haveSuitableInternals(getDerbyConnection()));
}
private class TestDerbyRepositoryHandler extends DerbyRepositoryHandler {
protected long registerHdfsConnector(Connection conn) {
try {
runQuery("INSERT INTO SQOOP.SQ_CONNECTOR(SQC_NAME, SQC_CLASS, SQC_VERSION)"
+ "VALUES('hdfs-connector', 'org.apache.sqoop.test.B', '1.0-test')");
return 2L;
} catch(Exception e) {
return -1L;
}
}
}
}

Просмотреть файл

@ -17,6 +17,7 @@
*/
package org.apache.sqoop.repository.derby;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.model.MForm;
import org.apache.sqoop.model.MJob;
@ -32,242 +33,249 @@ import java.util.Map;
*/
public class TestJobHandling extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
//
// // We always needs connector and framework structures in place
// loadConnectorAndFramework();
//
// // We always needs connection metadata in place
// loadConnections();
// }
//
// public void testFindJob() throws Exception {
// // Let's try to find non existing job
// try {
// handler.findJob(1, getDerbyConnection());
// fail();
// } catch(SqoopException ex) {
// assertEquals(DerbyRepoError.DERBYREPO_0030, ex.getErrorCode());
// }
//
// // Load prepared connections into database
// loadJobs();
//
// MJob jobImport = handler.findJob(1, getDerbyConnection());
// assertNotNull(jobImport);
// assertEquals(1, jobImport.getPersistenceId());
// assertEquals("JA", jobImport.getName());
// assertEquals(MJob.Type.IMPORT, jobImport.getType());
//
// List<MForm> forms;
//
// // Check connector part
// forms = jobImport.getFromPart().getForms();
// assertEquals("Value5", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value7", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// // Check framework part
// forms = jobImport.getFrameworkPart().getForms();
// assertEquals("Value17", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value19", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
// }
//
// public void testFindJobs() throws Exception {
// List<MJob> list;
//
// // Load empty list on empty repository
// list = handler.findJobs(getDerbyConnection());
// assertEquals(0, list.size());
//
// loadJobs();
//
// // Load all two connections on loaded repository
// list = handler.findJobs(getDerbyConnection());
// assertEquals(4, list.size());
//
// assertEquals("JA", list.get(0).getName());
// assertEquals(MJob.Type.IMPORT, list.get(0).getType());
//
// assertEquals("JB", list.get(1).getName());
// assertEquals(MJob.Type.IMPORT, list.get(1).getType());
//
// assertEquals("JA", list.get(2).getName());
// assertEquals(MJob.Type.EXPORT, list.get(2).getType());
//
// assertEquals("JB", list.get(3).getName());
// assertEquals(MJob.Type.EXPORT, list.get(3).getType());
// }
//
// public void testExistsJob() throws Exception {
// // There shouldn't be anything on empty repository
// assertFalse(handler.existsJob(1, getDerbyConnection()));
// assertFalse(handler.existsJob(2, getDerbyConnection()));
// assertFalse(handler.existsJob(3, getDerbyConnection()));
// assertFalse(handler.existsJob(4, getDerbyConnection()));
// assertFalse(handler.existsJob(5, getDerbyConnection()));
//
// loadJobs();
//
// assertTrue(handler.existsJob(1, getDerbyConnection()));
// assertTrue(handler.existsJob(2, getDerbyConnection()));
// assertTrue(handler.existsJob(3, getDerbyConnection()));
// assertTrue(handler.existsJob(4, getDerbyConnection()));
// assertFalse(handler.existsJob(5, getDerbyConnection()));
// }
//
// public void testInUseJob() throws Exception {
// loadJobs();
// loadSubmissions();
//
// assertTrue(handler.inUseJob(1, getDerbyConnection()));
// assertFalse(handler.inUseJob(2, getDerbyConnection()));
// assertFalse(handler.inUseJob(3, getDerbyConnection()));
// assertFalse(handler.inUseJob(4, getDerbyConnection()));
// }
//
// public void testCreateJob() throws Exception {
// MJob job = getJob();
//
// // Load some data
// fillJob(job);
//
// handler.createJob(job, getDerbyConnection());
//
// assertEquals(1, job.getPersistenceId());
// assertCountForTable("SQOOP.SQ_JOB", 1);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 4);
//
// MJob retrieved = handler.findJob(1, getDerbyConnection());
// assertEquals(1, retrieved.getPersistenceId());
//
// List<MForm> forms;
// forms = job.getFromPart().getForms();
// assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value2", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// forms = job.getFrameworkPart().getForms();
// assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// // Let's create second job
// job = getJob();
// fillJob(job);
//
// handler.createJob(job, getDerbyConnection());
//
// assertEquals(2, job.getPersistenceId());
// assertCountForTable("SQOOP.SQ_JOB", 2);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 8);
// }
//
// public void testUpdateJob() throws Exception {
// loadJobs();
//
// MJob job = handler.findJob(1, getDerbyConnection());
//
// List<MForm> forms;
//
// forms = job.getFromPart().getForms();
// ((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(0).getInputs().get(1)).setValue(null);
// ((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(1).getInputs().get(1)).setValue(null);
//
// forms = job.getFrameworkPart().getForms();
// ((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(0).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
// ((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
// ((MMapInput)forms.get(1).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
//
// job.setName("name");
//
// handler.updateJob(job, getDerbyConnection());
//
// assertEquals(1, job.getPersistenceId());
// assertCountForTable("SQOOP.SQ_JOB", 4);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 18);
//
// MJob retrieved = handler.findJob(1, getDerbyConnection());
// assertEquals("name", retrieved.getName());
//
// forms = retrieved.getFromPart().getForms();
// assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
// assertNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
// assertNull(forms.get(1).getInputs().get(1).getValue());
//
// forms = retrieved.getFrameworkPart().getForms();
// assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
// assertNotNull(forms.get(0).getInputs().get(1).getValue());
// assertEquals(((Map)forms.get(0).getInputs().get(1).getValue()).size(), 0);
// assertEquals("Updated", forms.get(1).getInputs().get(0).getValue());
// assertNotNull(forms.get(1).getInputs().get(1).getValue());
// assertEquals(((Map)forms.get(1).getInputs().get(1).getValue()).size(), 0);
// }
//
// public void testEnableAndDisableJob() throws Exception {
// loadJobs();
//
// // disable job 1
// handler.enableJob(1, false, getDerbyConnection());
//
// MJob retrieved = handler.findJob(1, getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(false, retrieved.getEnabled());
//
// // enable job 1
// handler.enableJob(1, true, getDerbyConnection());
//
// retrieved = handler.findJob(1, getDerbyConnection());
// assertNotNull(retrieved);
// assertEquals(true, retrieved.getEnabled());
// }
//
// public void testDeleteJob() throws Exception {
// loadJobs();
//
// handler.deleteJob(1, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_JOB", 3);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 12);
//
// handler.deleteJob(2, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_JOB", 2);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 8);
//
// handler.deleteJob(3, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_JOB", 1);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 4);
//
// handler.deleteJob(4, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_JOB", 0);
// assertCountForTable("SQOOP.SQ_JOB_INPUT", 0);
// }
//
// public MJob getJob() {
// return new MJob(1, 1, MJob.Type.IMPORT,
// handler.findConnector("A",
// getDerbyConnection()).getJobForms(MJob.Type.IMPORT
// ),
// handler.findFramework(
// getDerbyConnection()).getJobForms(MJob.Type.IMPORT
// )
// );
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
// We always needs connector and framework structures in place
loadConnectorAndFramework();
// We always needs connection metadata in place
loadConnections();
}
public void testFindJob() throws Exception {
// Let's try to find non existing job
try {
handler.findJob(1, getDerbyConnection());
fail();
} catch(SqoopException ex) {
assertEquals(DerbyRepoError.DERBYREPO_0030, ex.getErrorCode());
}
// Load prepared connections into database
loadJobs();
MJob jobImport = handler.findJob(1, getDerbyConnection());
assertNotNull(jobImport);
assertEquals(1, jobImport.getPersistenceId());
assertEquals("JA", jobImport.getName());
List<MForm> forms;
// Check connector parts
forms = jobImport.getConnectorPart(Direction.FROM).getForms();
assertEquals(2, forms.size());
assertEquals("Value5", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value5", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
forms = jobImport.getConnectorPart(Direction.TO).getForms();
assertEquals(2, forms.size());
assertEquals("Value9", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value9", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Check framework part
forms = jobImport.getFrameworkPart().getForms();
assertEquals(2, forms.size());
assertEquals("Value17", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value19", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
}
public void testFindJobs() throws Exception {
List<MJob> list;
// Load empty list on empty repository
list = handler.findJobs(getDerbyConnection());
assertEquals(0, list.size());
loadJobs();
// Load all two connections on loaded repository
list = handler.findJobs(getDerbyConnection());
assertEquals(4, list.size());
assertEquals("JA", list.get(0).getName());
assertEquals("JB", list.get(1).getName());
assertEquals("JC", list.get(2).getName());
assertEquals("JD", list.get(3).getName());
}
public void testExistsJob() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsJob(1, getDerbyConnection()));
assertFalse(handler.existsJob(2, getDerbyConnection()));
assertFalse(handler.existsJob(3, getDerbyConnection()));
assertFalse(handler.existsJob(4, getDerbyConnection()));
assertFalse(handler.existsJob(5, getDerbyConnection()));
loadJobs();
assertTrue(handler.existsJob(1, getDerbyConnection()));
assertTrue(handler.existsJob(2, getDerbyConnection()));
assertTrue(handler.existsJob(3, getDerbyConnection()));
assertTrue(handler.existsJob(4, getDerbyConnection()));
assertFalse(handler.existsJob(5, getDerbyConnection()));
}
public void testInUseJob() throws Exception {
loadJobs();
loadSubmissions();
assertTrue(handler.inUseJob(1, getDerbyConnection()));
assertFalse(handler.inUseJob(2, getDerbyConnection()));
assertFalse(handler.inUseJob(3, getDerbyConnection()));
assertFalse(handler.inUseJob(4, getDerbyConnection()));
}
public void testCreateJob() throws Exception {
MJob job = getJob();
// Load some data
fillJob(job);
handler.createJob(job, getDerbyConnection());
assertEquals(1, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 1);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 6);
MJob retrieved = handler.findJob(1, getDerbyConnection());
assertEquals(1, retrieved.getPersistenceId());
List<MForm> forms;
forms = job.getConnectorPart(Direction.FROM).getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
forms = job.getConnectorPart(Direction.TO).getForms();
assertEquals("Value1", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
forms = job.getFrameworkPart().getForms();
assertEquals("Value13", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
assertEquals("Value15", forms.get(1).getInputs().get(0).getValue());
assertNull(forms.get(1).getInputs().get(1).getValue());
// Let's create second job
job = getJob();
fillJob(job);
handler.createJob(job, getDerbyConnection());
assertEquals(2, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 2);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 12);
}
public void testUpdateJob() throws Exception {
loadJobs();
assertCountForTable("SQOOP.SQ_JOB", 4);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 24);
MJob job = handler.findJob(1, getDerbyConnection());
List<MForm> forms;
forms = job.getConnectorPart(Direction.FROM).getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(0).getInputs().get(1)).setValue(null);
forms = job.getConnectorPart(Direction.TO).getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(0).getInputs().get(1)).setValue(null);
forms = job.getFrameworkPart().getForms();
((MStringInput)forms.get(0).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(0).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
((MStringInput)forms.get(1).getInputs().get(0)).setValue("Updated");
((MMapInput)forms.get(1).getInputs().get(1)).setValue(new HashMap<String, String>()); // inject new map value
job.setName("name");
handler.updateJob(job, getDerbyConnection());
assertEquals(1, job.getPersistenceId());
assertCountForTable("SQOOP.SQ_JOB", 4);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 26);
MJob retrieved = handler.findJob(1, getDerbyConnection());
assertEquals("name", retrieved.getName());
forms = job.getConnectorPart(Direction.FROM).getForms();
assertEquals(2, forms.size());
assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
forms = job.getConnectorPart(Direction.TO).getForms();
assertEquals(2, forms.size());
assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
assertNull(forms.get(0).getInputs().get(1).getValue());
forms = retrieved.getFrameworkPart().getForms();
assertEquals(2, forms.size());
assertEquals("Updated", forms.get(0).getInputs().get(0).getValue());
assertNotNull(forms.get(0).getInputs().get(1).getValue());
assertEquals(((Map)forms.get(0).getInputs().get(1).getValue()).size(), 0);
}
public void testEnableAndDisableJob() throws Exception {
loadJobs();
// disable job 1
handler.enableJob(1, false, getDerbyConnection());
MJob retrieved = handler.findJob(1, getDerbyConnection());
assertNotNull(retrieved);
assertEquals(false, retrieved.getEnabled());
// enable job 1
handler.enableJob(1, true, getDerbyConnection());
retrieved = handler.findJob(1, getDerbyConnection());
assertNotNull(retrieved);
assertEquals(true, retrieved.getEnabled());
}
public void testDeleteJob() throws Exception {
loadJobs();
handler.deleteJob(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 3);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 18);
handler.deleteJob(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 2);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 12);
handler.deleteJob(3, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 1);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 6);
handler.deleteJob(4, getDerbyConnection());
assertCountForTable("SQOOP.SQ_JOB", 0);
assertCountForTable("SQOOP.SQ_JOB_INPUT", 0);
}
public MJob getJob() {
return new MJob(1, 1, 1, 1,
handler.findConnector("A", getDerbyConnection()).getJobForms(Direction.FROM),
handler.findConnector("A", getDerbyConnection()).getJobForms(Direction.TO),
handler.findFramework(getDerbyConnection()).getJobForms()
);
}
}

Просмотреть файл

@ -32,214 +32,214 @@ import java.util.List;
*/
public class TestSubmissionHandling extends DerbyTestCase {
// DerbyRepositoryHandler handler;
//
// @Override
// public void setUp() throws Exception {
// super.setUp();
//
// handler = new DerbyRepositoryHandler();
//
// // We always needs schema for this test case
// createSchema();
//
// // We always needs connector and framework structures in place
// loadConnectorAndFramework();
//
// // We also always needs connection metadata in place
// loadConnections();
//
// // And finally we always needs job metadata in place
// loadJobs();
// }
//
// public void testFindSubmissionsUnfinished() throws Exception {
// List<MSubmission> submissions;
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(0, submissions.size());
//
// loadSubmissions();
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(2, submissions.size());
// }
//
// public void testExistsSubmission() throws Exception {
// // There shouldn't be anything on empty repository
// assertFalse(handler.existsSubmission(1, getDerbyConnection()));
// assertFalse(handler.existsSubmission(2, getDerbyConnection()));
// assertFalse(handler.existsSubmission(3, getDerbyConnection()));
// assertFalse(handler.existsSubmission(4, getDerbyConnection()));
// assertFalse(handler.existsSubmission(5, getDerbyConnection()));
// assertFalse(handler.existsSubmission(6, getDerbyConnection()));
//
// loadSubmissions();
//
// assertTrue(handler.existsSubmission(1, getDerbyConnection()));
// assertTrue(handler.existsSubmission(2, getDerbyConnection()));
// assertTrue(handler.existsSubmission(3, getDerbyConnection()));
// assertTrue(handler.existsSubmission(4, getDerbyConnection()));
// assertTrue(handler.existsSubmission(5, getDerbyConnection()));
// assertFalse(handler.existsSubmission(6, getDerbyConnection()));
// }
//
// public void testCreateSubmission() throws Exception {
// Date creationDate = new Date();
// Date updateDate = new Date();
//
// CounterGroup firstGroup = new CounterGroup("ga");
// CounterGroup secondGroup = new CounterGroup("gb");
// firstGroup.addCounter(new Counter("ca", 100));
// firstGroup.addCounter(new Counter("cb", 200));
// secondGroup.addCounter(new Counter("ca", 300));
// secondGroup.addCounter(new Counter("cd", 400));
// Counters counters = new Counters();
// counters.addCounterGroup(firstGroup);
// counters.addCounterGroup(secondGroup);
//
// MSubmission submission = new MSubmission();
// submission.setJobId(1);
// submission.setStatus(SubmissionStatus.RUNNING);
// submission.setCreationDate(creationDate);
// submission.setLastUpdateDate(updateDate);
// submission.setExternalId("job-x");
// submission.setExternalLink("http://somewhere");
// submission.setExceptionInfo("RuntimeException");
// submission.setExceptionStackTrace("Yeah it happens");
// submission.setCounters(counters);
//
// handler.createSubmission(submission, getDerbyConnection());
//
// assertEquals(1, submission.getPersistenceId());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
//
// List<MSubmission> submissions =
// handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(1, submissions.size());
//
// submission = submissions.get(0);
//
// assertEquals(1, submission.getJobId());
// assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
// assertEquals(creationDate, submission.getCreationDate());
// assertEquals(updateDate, submission.getLastUpdateDate());
// assertEquals("job-x", submission.getExternalId());
// assertEquals("http://somewhere", submission.getExternalLink());
// assertEquals("RuntimeException", submission.getExceptionInfo());
// assertEquals("Yeah it happens", submission.getExceptionStackTrace());
//
// CounterGroup group;
// Counter counter;
// Counters retrievedCounters = submission.getCounters();
// assertNotNull(retrievedCounters);
//
// group = counters.getCounterGroup("ga");
// assertNotNull(group);
//
// counter = group.getCounter("ca");
// assertNotNull(counter);
// assertEquals(100, counter.getValue());
//
// counter = group.getCounter("cb");
// assertNotNull(counter);
// assertEquals(200, counter.getValue());
//
// group = counters.getCounterGroup("gb");
// assertNotNull(group);
//
// counter = group.getCounter("ca");
// assertNotNull(counter);
// assertEquals(300, counter.getValue());
//
// counter = group.getCounter("cd");
// assertNotNull(counter);
// assertEquals(400, counter.getValue());
//
// // Let's create second (simpler) connection
// submission =
// new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
// handler.createSubmission(submission, getDerbyConnection());
//
// assertEquals(2, submission.getPersistenceId());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
// }
//
// public void testUpdateConnection() throws Exception {
// loadSubmissions();
//
// List<MSubmission> submissions =
// handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(2, submissions.size());
//
// MSubmission submission = submissions.get(0);
// submission.setStatus(SubmissionStatus.SUCCEEDED);
//
// handler.updateSubmission(submission, getDerbyConnection());
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(1, submissions.size());
// }
//
// public void testPurgeSubmissions() throws Exception {
// loadSubmissions();
// List<MSubmission> submissions;
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(2, submissions.size());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
//
// Calendar calendar = Calendar.getInstance();
// // 2012-01-03 05:05:05
// calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
// handler.purgeSubmissions(calendar.getTime(), getDerbyConnection());
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(1, submissions.size());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
//
// handler.purgeSubmissions(new Date(), getDerbyConnection());
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(0, submissions.size());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
//
// handler.purgeSubmissions(new Date(), getDerbyConnection());
//
// submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
// assertNotNull(submissions);
// assertEquals(0, submissions.size());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
// }
//
// /**
// * Test that by directly removing jobs we will also remove associated
// * submissions and counters.
// *
// * @throws Exception
// */
// public void testDeleteJobs() throws Exception {
// loadSubmissions();
// assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
//
// handler.deleteJob(1, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 3);
//
// handler.deleteJob(2, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
//
// handler.deleteJob(3, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
//
// handler.deleteJob(4, getDerbyConnection());
// assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
// }
DerbyRepositoryHandler handler;
@Override
public void setUp() throws Exception {
super.setUp();
handler = new DerbyRepositoryHandler();
// We always needs schema for this test case
createSchema();
// We always needs connector and framework structures in place
loadConnectorAndFramework();
// We also always needs connection metadata in place
loadConnections();
// And finally we always needs job metadata in place
loadJobs();
}
public void testFindSubmissionsUnfinished() throws Exception {
List<MSubmission> submissions;
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
loadSubmissions();
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(2, submissions.size());
}
public void testExistsSubmission() throws Exception {
// There shouldn't be anything on empty repository
assertFalse(handler.existsSubmission(1, getDerbyConnection()));
assertFalse(handler.existsSubmission(2, getDerbyConnection()));
assertFalse(handler.existsSubmission(3, getDerbyConnection()));
assertFalse(handler.existsSubmission(4, getDerbyConnection()));
assertFalse(handler.existsSubmission(5, getDerbyConnection()));
assertFalse(handler.existsSubmission(6, getDerbyConnection()));
loadSubmissions();
assertTrue(handler.existsSubmission(1, getDerbyConnection()));
assertTrue(handler.existsSubmission(2, getDerbyConnection()));
assertTrue(handler.existsSubmission(3, getDerbyConnection()));
assertTrue(handler.existsSubmission(4, getDerbyConnection()));
assertTrue(handler.existsSubmission(5, getDerbyConnection()));
assertFalse(handler.existsSubmission(6, getDerbyConnection()));
}
public void testCreateSubmission() throws Exception {
Date creationDate = new Date();
Date updateDate = new Date();
CounterGroup firstGroup = new CounterGroup("ga");
CounterGroup secondGroup = new CounterGroup("gb");
firstGroup.addCounter(new Counter("ca", 100));
firstGroup.addCounter(new Counter("cb", 200));
secondGroup.addCounter(new Counter("ca", 300));
secondGroup.addCounter(new Counter("cd", 400));
Counters counters = new Counters();
counters.addCounterGroup(firstGroup);
counters.addCounterGroup(secondGroup);
MSubmission submission = new MSubmission();
submission.setJobId(1);
submission.setStatus(SubmissionStatus.RUNNING);
submission.setCreationDate(creationDate);
submission.setLastUpdateDate(updateDate);
submission.setExternalId("job-x");
submission.setExternalLink("http://somewhere");
submission.setExceptionInfo("RuntimeException");
submission.setExceptionStackTrace("Yeah it happens");
submission.setCounters(counters);
handler.createSubmission(submission, getDerbyConnection());
assertEquals(1, submission.getPersistenceId());
assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
List<MSubmission> submissions =
handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
submission = submissions.get(0);
assertEquals(1, submission.getJobId());
assertEquals(SubmissionStatus.RUNNING, submission.getStatus());
assertEquals(creationDate, submission.getCreationDate());
assertEquals(updateDate, submission.getLastUpdateDate());
assertEquals("job-x", submission.getExternalId());
assertEquals("http://somewhere", submission.getExternalLink());
assertEquals("RuntimeException", submission.getExceptionInfo());
assertEquals("Yeah it happens", submission.getExceptionStackTrace());
CounterGroup group;
Counter counter;
Counters retrievedCounters = submission.getCounters();
assertNotNull(retrievedCounters);
group = counters.getCounterGroup("ga");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(100, counter.getValue());
counter = group.getCounter("cb");
assertNotNull(counter);
assertEquals(200, counter.getValue());
group = counters.getCounterGroup("gb");
assertNotNull(group);
counter = group.getCounter("ca");
assertNotNull(counter);
assertEquals(300, counter.getValue());
counter = group.getCounter("cd");
assertNotNull(counter);
assertEquals(400, counter.getValue());
// Let's create second (simpler) connection
submission =
new MSubmission(1, new Date(), SubmissionStatus.SUCCEEDED, "job-x");
handler.createSubmission(submission, getDerbyConnection());
assertEquals(2, submission.getPersistenceId());
assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
}
public void testUpdateConnection() throws Exception {
loadSubmissions();
List<MSubmission> submissions =
handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(2, submissions.size());
MSubmission submission = submissions.get(0);
submission.setStatus(SubmissionStatus.SUCCEEDED);
handler.updateSubmission(submission, getDerbyConnection());
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
}
public void testPurgeSubmissions() throws Exception {
loadSubmissions();
List<MSubmission> submissions;
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(2, submissions.size());
assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
Calendar calendar = Calendar.getInstance();
// 2012-01-03 05:05:05
calendar.set(2012, Calendar.JANUARY, 3, 5, 5, 5);
handler.purgeSubmissions(calendar.getTime(), getDerbyConnection());
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(1, submissions.size());
assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
handler.purgeSubmissions(new Date(), getDerbyConnection());
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
handler.purgeSubmissions(new Date(), getDerbyConnection());
submissions = handler.findSubmissionsUnfinished(getDerbyConnection());
assertNotNull(submissions);
assertEquals(0, submissions.size());
assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
}
/**
* Test that by directly removing jobs we will also remove associated
* submissions and counters.
*
* @throws Exception
*/
public void testDeleteJobs() throws Exception {
loadSubmissions();
assertCountForTable("SQOOP.SQ_SUBMISSION", 5);
handler.deleteJob(1, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 3);
handler.deleteJob(2, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 2);
handler.deleteJob(3, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 1);
handler.deleteJob(4, getDerbyConnection());
assertCountForTable("SQOOP.SQ_SUBMISSION", 0);
}
}

Просмотреть файл

@ -18,9 +18,7 @@
*/
package org.apache.sqoop.connector.spi;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MConnectionForms;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MJobForms;
public abstract class MetadataUpgrader {
@ -45,3 +43,4 @@ public abstract class MetadataUpgrader {
*/
public abstract void upgrade(MJobForms original, MJobForms upgradeTarget);
}