SQOOP-1496: Sqoop2: Revisit/Refactor the SubmissionEngine/ExecutionEngine APIs

This commit is contained in:
Abraham Elmahrek 2014-09-19 16:24:59 -07:00
Parent af25bccc2f
Commit 3d539dd4d7
30 changed files with 1043 additions and 862 deletions

View file

@ -52,8 +52,8 @@ public class SubmissionBean implements JsonBean {
private static final String EXCEPTION_TRACE = "exception-trace";
private static final String PROGRESS = "progress";
private static final String COUNTERS = "counters";
private static final String CONNECTOR_SCHEMA = "schema-connector";
private static final String HIO_SCHEMA = "schema-hio";
private static final String FROM_SCHEMA = "schema-from";
private static final String TO_SCHEMA = "schema-to";
private List<MSubmission> submissions;
@ -116,11 +116,11 @@ public class SubmissionBean implements JsonBean {
if(submission.getCounters() != null) {
object.put(COUNTERS, extractCounters(submission.getCounters()));
}
if(submission.getConnectorSchema() != null) {
object.put(CONNECTOR_SCHEMA, extractSchema(submission.getConnectorSchema()));
if(submission.getFromSchema() != null) {
object.put(FROM_SCHEMA, extractSchema(submission.getFromSchema()));
}
if(submission.getHioSchema() != null) {
object.put(HIO_SCHEMA, extractSchema(submission.getHioSchema()));
if(submission.getToSchema() != null) {
object.put(TO_SCHEMA, extractSchema(submission.getToSchema()));
}
array.add(object);
@ -188,11 +188,11 @@ public class SubmissionBean implements JsonBean {
if(object.containsKey(COUNTERS)) {
submission.setCounters(restoreCounters((JSONObject) object.get(COUNTERS)));
}
if(object.containsKey(CONNECTOR_SCHEMA)) {
submission.setConnectorSchema(restoreSchemna((JSONObject) object.get(CONNECTOR_SCHEMA)));
if(object.containsKey(FROM_SCHEMA)) {
submission.setFromSchema(restoreSchemna((JSONObject) object.get(FROM_SCHEMA)));
}
if(object.containsKey(HIO_SCHEMA)) {
submission.setHioSchema(restoreSchemna((JSONObject) object.get(HIO_SCHEMA)));
if(object.containsKey(TO_SCHEMA)) {
submission.setToSchema(restoreSchemna((JSONObject) object.get(TO_SCHEMA)));
}
this.submissions.add(submission);
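For illustration, a minimal round trip through the bean under the renamed keys; the single-submission constructor and the no-argument extract() entry point are assumptions based on this diff, not verified against the full tree:

```java
import org.apache.sqoop.json.SubmissionBean;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.schema.Schema;
import org.json.simple.JSONObject;

// Hypothetical sketch: schemas now travel under "schema-from"/"schema-to"
// instead of the old "schema-connector"/"schema-hio" keys.
MSubmission submission = new MSubmission(1L);
submission.setFromSchema(new Schema("employees"));       // -> "schema-from"
submission.setToSchema(new Schema("employees_archive")); // -> "schema-to"

JSONObject json = new SubmissionBean(submission).extract(); // assumed signature
```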

View file

@ -100,20 +100,21 @@ public class MSubmission extends MAccountableEntity {
String exceptionStackTrace;
/**
* Schema that was reported by the connector.
* Schema for the FROM part of the job submission
*
* This property is required.
*/
Schema connectorSchema;
Schema fromSchema;
/**
* Schema for the TO part of the job submission
* Optional schema that reported by the underlying I/O implementation. Please
* note that this property might be empty and in such case the connector
* schema will use also on Hadoop I/O side.
* note that this property might be empty and in such case use the FROM schema
* on the TO side.
*
* This property is optional.
*/
Schema hioSchema;
Schema toSchema;
public MSubmission() {
status = SubmissionStatus.UNKNOWN;
@ -219,20 +220,20 @@ public class MSubmission extends MAccountableEntity {
this.setExceptionStackTrace(writer.toString());
}
public Schema getConnectorSchema() {
return connectorSchema;
public Schema getFromSchema() {
return fromSchema;
}
public void setConnectorSchema(Schema connectorSchema) {
this.connectorSchema = connectorSchema;
public void setFromSchema(Schema connectorSchema) {
this.fromSchema = connectorSchema;
}
public Schema getHioSchema() {
return hioSchema;
public Schema getToSchema() {
return toSchema;
}
public void setHioSchema(Schema hioSchema) {
this.hioSchema = hioSchema;
public void setToSchema(Schema hioSchema) {
this.toSchema = hioSchema;
}
@Override
@ -248,8 +249,8 @@ public class MSubmission extends MAccountableEntity {
", externalLink='" + externalLink + '\'' +
", exceptionInfo='" + exceptionInfo + '\'' +
", exceptionStackTrace='" + exceptionStackTrace + '\'' +
", connectorSchema='" + connectorSchema + '\'' +
", hioSchema='" + hioSchema + '\'' +
", fromSchema='" + fromSchema + '\'' +
", toSchema='" + toSchema + '\'' +
'}';
}
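The fallback rule spelled out in the toSchema javadoc above can be applied by a consumer roughly as follows; the helper is illustrative, not part of this patch:

```java
// Resolve the effective TO-side schema, falling back to the FROM schema
// when the connector did not report one (per the javadoc above).
static Schema effectiveToSchema(MSubmission submission) {
  Schema toSchema = submission.getToSchema();
  return (toSchema != null) ? toSchema : submission.getFromSchema();
}
```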

View file

@ -405,20 +405,20 @@ public class TestSubmissionBean extends TestCase {
assertEquals(222222, counter.getValue());
}
public void testTransferConnectorSchema() {
public void testTransferFromSchema() {
MSubmission source = new MSubmission();
source.setConnectorSchema(getSchema());
source.setFromSchema(getSchema());
Schema target = transfer(source).getConnectorSchema();
Schema target = transfer(source).getFromSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}
public void testTransferHioSchema() {
public void testTransferToSchema() {
MSubmission source = new MSubmission();
source.setHioSchema(getSchema());
source.setToSchema(getSchema());
Schema target = transfer(source).getHioSchema();
Schema target = transfer(source).getToSchema();
assertNotNull(target);
assertEquals(getSchema(), target);
}

View file

@ -26,7 +26,6 @@ import org.apache.sqoop.connector.jdbc.configuration.ToJobConfiguration;
import org.apache.sqoop.job.etl.Initializer;
import org.apache.sqoop.job.etl.InitializerContext;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.Validation;
import org.apache.sqoop.validation.ValidationResult;
import org.apache.sqoop.validation.ValidationRunner;

View file

@ -21,7 +21,6 @@ package org.apache.sqoop.connector.idf;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
@ -38,7 +37,6 @@ import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
@ -46,8 +44,6 @@ public class CSVIntermediateDataFormat extends IntermediateDataFormat<String> {
public static final char ESCAPE_CHARACTER = '\\';
public static final char QUOTE_CHARACTER = '\'';
private static final Logger LOG = Logger.getLogger
(CSVIntermediateDataFormat.class);
private static final char[] originals = {
0x5C,0x00,0x0A,0x0D,0x1A,0x22,0x27
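For reference, the escape set above decodes as: 0x5C backslash, 0x00 NUL, 0x0A line feed, 0x0D carriage return, 0x1A SUB (Ctrl-Z), 0x22 double quote, 0x27 single quote.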

View file

@ -19,14 +19,10 @@
package org.apache.sqoop.connector.idf;
import org.apache.sqoop.schema.Schema;
import org.apache.sqoop.schema.type.Column;
import org.apache.sqoop.schema.type.Type;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Abstract class representing a pluggable intermediate data format the Sqoop

View file

@ -33,7 +33,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
public class CSVIntermediateDataFormatTest {
public class TestCSVIntermediateDataFormat {
private final String BYTE_FIELD_ENCODING = "ISO-8859-1";

View file

@ -114,10 +114,9 @@ public class ConnectorManager implements Reconfigurable {
return bundles;
}
public ResourceBundle getResourceBundle(long connectorId,
Locale locale) {
public ResourceBundle getResourceBundle(long connectorId, Locale locale) {
ConnectorHandler handler = handlerMap.get(nameMap.get(connectorId));
return handler.getConnector().getBundle(locale);
return handler.getConnector().getBundle(locale);
}
public MConnector getConnectorMetadata(long connectorId) {

View file

@ -18,12 +18,11 @@
package org.apache.sqoop.framework;
import org.apache.sqoop.common.ImmutableContext;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.model.MSubmission;
/**
* Execution engine drive execution of sqoop submission (job). It's responsible
* Execution engine drives execution of sqoop job. It's responsible
* for executing all defined steps in the import/export workflow.
* A successful job execution will be recorded in the job submission entity
*/
public abstract class ExecutionEngine {
@ -31,6 +30,7 @@ public abstract class ExecutionEngine {
* Initialize execution engine
*
* @param context Configuration context
* @param prefix Execution engine prefix
*/
public void initialize(ImmutableContext context, String prefix) {
}
@ -42,19 +42,19 @@ public abstract class ExecutionEngine {
}
/**
* Return new SubmissionRequest class or any subclass if it's needed by
* Return new JobRequest class or any subclass if it's needed by
* execution and submission engine combination.
*
* @return New Submission request object
* @return new JobRequest object
*/
public SubmissionRequest createSubmissionRequest() {
return new SubmissionRequest();
public JobRequest createJobRequest() {
return new JobRequest();
}
/**
* Prepare given submission request.
* Prepare given job request.
*
* @param request Submission request
* @param request JobRequest
*/
public abstract void prepareSubmission(SubmissionRequest request);
public abstract void prepareJob(JobRequest request);
}
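Under the refactored contract, an engine supplies its own JobRequest subtype from createJobRequest() and fills it in prepareJob(); the in-tree example is MapreduceExecutionEngine further down. A minimal no-op engine sketched from the signatures above (the class itself is hypothetical):

```java
public class NoopExecutionEngine extends ExecutionEngine {

  // Engines may return a specialized request (MapreduceExecutionEngine
  // returns MRJobRequest); the base JobRequest suffices for this sketch.
  @Override
  public JobRequest createJobRequest() {
    return new JobRequest();
  }

  @Override
  public void prepareJob(JobRequest request) {
    // Translate the generic request (From/To entities, configs) into
    // engine-specific settings and register any jars the engine needs.
    request.addJarForClass(getClass());
  }
}
```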

View file

@ -1,3 +1,5 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@ -266,264 +268,228 @@ public class JobManager implements Reconfigurable {
}
public MSubmission submit(long jobId, HttpEventContext ctx) {
String username = ctx.getUsername();
Repository repository = RepositoryManager.getInstance().getRepository();
MJob job = repository.findJob(jobId);
if (job == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0004,
"Unknown job id " + jobId);
}
if (!job.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0009,
"Job id: " + job.getPersistenceId());
}
MConnection fromConnection = repository.findConnection(job.getConnectionId(Direction.FROM));
MConnection toConnection = repository.findConnection(job.getConnectionId(Direction.TO));
if (!fromConnection.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0010,
"Connection id: " + fromConnection.getPersistenceId());
}
if (!toConnection.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0010,
"Connection id: " + toConnection.getPersistenceId());
}
SqoopConnector fromConnector =
ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.FROM));
SqoopConnector toConnector =
ConnectorManager.getInstance().getConnector(job.getConnectorId(Direction.TO));
// Make sure that connectors support the directions they will be used from.
if (!fromConnector.getSupportedDirections().contains(Direction.FROM)) {
throw new SqoopException(FrameworkError.FRAMEWORK_0011,
"Connector: " + fromConnector.getClass().getCanonicalName());
}
if (!toConnector.getSupportedDirections().contains(Direction.TO)) {
throw new SqoopException(FrameworkError.FRAMEWORK_0011,
"Connector: " + toConnector.getClass().getCanonicalName());
}
// Transform forms to fromConnector specific classes
Object fromConnectorConnection = ClassUtils.instantiate(
fromConnector.getConnectionConfigurationClass());
FormUtils.fromForms(fromConnection.getConnectorPart().getForms(),
fromConnectorConnection);
Object fromJob = ClassUtils.instantiate(
fromConnector.getJobConfigurationClass(Direction.FROM));
FormUtils.fromForms(
job.getConnectorPart(Direction.FROM).getForms(), fromJob);
// Transform forms to toConnector specific classes
Object toConnectorConnection = ClassUtils.instantiate(
toConnector.getConnectionConfigurationClass());
FormUtils.fromForms(toConnection.getConnectorPart().getForms(),
toConnectorConnection);
Object toJob = ClassUtils.instantiate(
toConnector.getJobConfigurationClass(Direction.TO));
FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
// Transform framework specific forms
Object fromFrameworkConnection = ClassUtils.instantiate(
FrameworkManager.getInstance().getConnectionConfigurationClass());
Object toFrameworkConnection = ClassUtils.instantiate(
FrameworkManager.getInstance().getConnectionConfigurationClass());
FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(),
fromFrameworkConnection);
FormUtils.fromForms(toConnection.getFrameworkPart().getForms(),
toFrameworkConnection);
Object frameworkJob = ClassUtils.instantiate(
FrameworkManager.getInstance().getJobConfigurationClass());
FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
// Create request object
MSubmission summary = new MSubmission(jobId);
SubmissionRequest request = executionEngine.createSubmissionRequest();
summary.setCreationUser(username);
summary.setLastUpdateUser(username);
// Save important variables to the submission request
request.setSummary(summary);
request.setConnector(Direction.FROM, fromConnector);
request.setConnector(Direction.TO, toConnector);
request.setConnectorConnectionConfig(Direction.FROM, fromConnectorConnection);
request.setConnectorConnectionConfig(Direction.TO, toConnectorConnection);
request.setConnectorJobConfig(Direction.FROM, fromJob);
request.setConnectorJobConfig(Direction.TO, toJob);
// @TODO(Abe): Should we actually have 2 different Framework Connection config objects?
request.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
request.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
request.setConfigFrameworkJob(frameworkJob);
request.setJobName(job.getName());
request.setJobId(job.getPersistenceId());
request.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass =
fromConnector.getIntermediateDataFormat();
request.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
// Create request object
// Let's register all important jars
// sqoop-common
request.addJarForClass(MapContext.class);
// sqoop-core
request.addJarForClass(FrameworkManager.class);
// sqoop-spi
request.addJarForClass(SqoopConnector.class);
// Execution engine jar
request.addJarForClass(executionEngine.getClass());
// Connectors in use
request.addJarForClass(fromConnector.getClass());
request.addJarForClass(toConnector.getClass());
// Extra libraries that Sqoop code requires
request.addJarForClass(JSONValue.class);
// The IDF is used in the ETL process.
request.addJarForClass(dataFormatClass);
// Get callbacks
request.setFromCallback(fromConnector.getFrom());
request.setToCallback(toConnector.getTo());
LOG.debug("Using callbacks: " + request.getFromCallback() + ", " + request.getToCallback());
// Initialize submission from fromConnector perspective
CallbackBase[] baseCallbacks = {
request.getFromCallback(),
request.getToCallback()
};
CallbackBase baseCallback;
Class<? extends Initializer> initializerClass;
Initializer initializer;
InitializerContext initializerContext;
// Initialize From Connector callback.
baseCallback = request.getFromCallback();
initializerClass = baseCallback
.getInitializer();
initializer = (Initializer) ClassUtils
.instantiate(initializerClass);
if (initializer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
"Can't create initializer instance: " + initializerClass.getName());
}
// Initializer context
initializerContext = new InitializerContext(request.getConnectorContext(Direction.FROM));
// Initialize submission from fromConnector perspective
initializer.initialize(initializerContext,
request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM));
// Add job specific jars to
request.addJars(initializer.getJars(initializerContext,
request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM)));
// @TODO(Abe): Alter behavior of Schema here. Need from Schema.
Schema fromSchema = initializer.getSchema(initializerContext,
request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM));
// request.getSummary().setConnectorSchema(initializer.getSchema(
// initializerContext,
// request.getConnectorConnectionConfig(ConnectorType.FROM),
// request.getConnectorJobConfig(ConnectorType.FROM)
// ));
// Initialize To Connector callback.
baseCallback = request.getToCallback();
initializerClass = baseCallback
.getInitializer();
initializer = (Initializer) ClassUtils
.instantiate(initializerClass);
if (initializer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
"Can't create initializer instance: " + initializerClass.getName());
}
// Initializer context
initializerContext = new InitializerContext(request.getConnectorContext(Direction.TO));
// Initialize submission from fromConnector perspective
initializer.initialize(initializerContext,
request.getConnectorConnectionConfig(Direction.TO),
request.getConnectorJobConfig(Direction.TO));
// Add job specific jars to
request.addJars(initializer.getJars(initializerContext,
request.getConnectorConnectionConfig(Direction.TO),
request.getConnectorJobConfig(Direction.TO)));
// @TODO(Abe): Alter behavior of Schema here. Need To Schema.
Schema toSchema = initializer.getSchema(initializerContext,
request.getConnectorConnectionConfig(Direction.TO),
request.getConnectorJobConfig(Direction.TO));
// Retrieve and persist the schema
// request.getSummary().setConnectorSchema(initializer.getSchema(
// initializerContext,
// request.getConnectorConnectionConfig(ConnectorType.TO),
// request.getConnectorJobConfig(ConnectorType.TO)
// ));
//TODO: Need better logic here
if (fromSchema != null)
request.getSummary().setConnectorSchema(fromSchema);
else
request.getSummary().setConnectorSchema(toSchema);
// Bootstrap job from framework perspective
prepareSubmission(request);
MSubmission mSubmission = createJobSubmission(ctx, jobId);
JobRequest jobRequest = createJobRequest(jobId, mSubmission);
// Bootstrap job to execute
prepareJob(jobRequest);
// Make sure that this job id is not currently running and submit the job
// only if it's not.
synchronized (getClass()) {
MSubmission lastSubmission = repository.findSubmissionLastForJob(jobId);
MSubmission lastSubmission = RepositoryManager.getInstance().getRepository()
.findSubmissionLastForJob(jobId);
if (lastSubmission != null && lastSubmission.getStatus().isRunning()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0002,
"Job with id " + jobId);
throw new SqoopException(FrameworkError.FRAMEWORK_0002, "Job with id " + jobId);
}
// @TODO(Abe): Call multiple destroyers.
// TODO(Abe): Call multiple destroyers.
// TODO(jarcec): We might need to catch all exceptions here to ensure
// that Destroyer will be executed in all cases.
boolean submitted = submissionEngine.submit(request);
if (!submitted) {
destroySubmission(request);
summary.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
// NOTE: the following is a blocking call
boolean success = submissionEngine.submit(jobRequest);
if (!success) {
destroySubmission(jobRequest);
mSubmission.setStatus(SubmissionStatus.FAILURE_ON_SUBMIT);
}
repository.createSubmission(summary);
RepositoryManager.getInstance().getRepository().createSubmission(mSubmission);
}
return mSubmission;
}
// Return the most recent job status
private JobRequest createJobRequest(long jobId, MSubmission submission) {
// get job
MJob job = getJob(jobId);
// get from/to connections for the job
MConnection fromConnection = getConnection(job.getConnectionId(Direction.FROM));
MConnection toConnection = getConnection(job.getConnectionId(Direction.TO));
// get from/to connectors for the connection
SqoopConnector fromConnector = getConnector(fromConnection.getConnectorId());
validateSupportedDirection(fromConnector, Direction.FROM);
SqoopConnector toConnector = getConnector(toConnection.getConnectorId());
validateSupportedDirection(toConnector, Direction.TO);
// Transform config to fromConnector specific classes
Object fromConnectionConfig = ClassUtils.instantiate(fromConnector
.getConnectionConfigurationClass());
FormUtils.fromForms(fromConnection.getConnectorPart().getForms(), fromConnectionConfig);
// Transform config to toConnector specific classes
Object toConnectorConfig = ClassUtils
.instantiate(toConnector.getConnectionConfigurationClass());
FormUtils.fromForms(toConnection.getConnectorPart().getForms(), toConnectorConfig);
Object fromJob = ClassUtils.instantiate(fromConnector.getJobConfigurationClass(Direction.FROM));
FormUtils.fromForms(job.getConnectorPart(Direction.FROM).getForms(), fromJob);
Object toJob = ClassUtils.instantiate(toConnector.getJobConfigurationClass(Direction.TO));
FormUtils.fromForms(job.getConnectorPart(Direction.TO).getForms(), toJob);
// Transform framework specific configs
// Q(VB) : Aren't the following 2 exactly the same?
Object fromFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
.getConnectionConfigurationClass());
FormUtils.fromForms(fromConnection.getFrameworkPart().getForms(), fromFrameworkConnection);
Object toFrameworkConnection = ClassUtils.instantiate(FrameworkManager.getInstance()
.getConnectionConfigurationClass());
FormUtils.fromForms(toConnection.getFrameworkPart().getForms(), toFrameworkConnection);
Object frameworkJob = ClassUtils.instantiate(FrameworkManager.getInstance()
.getJobConfigurationClass());
FormUtils.fromForms(job.getFrameworkPart().getForms(), frameworkJob);
// Create a job request for submit/execution
JobRequest jobRequest = executionEngine.createJobRequest();
// Save important variables to the job request
jobRequest.setSummary(submission);
jobRequest.setConnector(Direction.FROM, fromConnector);
jobRequest.setConnector(Direction.TO, toConnector);
jobRequest.setConnectorConnectionConfig(Direction.FROM, fromConnectionConfig);
jobRequest.setConnectorConnectionConfig(Direction.TO, toConnectorConfig);
jobRequest.setConnectorJobConfig(Direction.FROM, fromJob);
jobRequest.setConnectorJobConfig(Direction.TO, toJob);
// TODO(Abe): Should we actually have 2 different Framework Connection config objects?
jobRequest.setFrameworkConnectionConfig(Direction.FROM, fromFrameworkConnection);
jobRequest.setFrameworkConnectionConfig(Direction.TO, toFrameworkConnection);
jobRequest.setConfigFrameworkJob(frameworkJob);
jobRequest.setJobName(job.getName());
jobRequest.setJobId(job.getPersistenceId());
jobRequest.setNotificationUrl(notificationBaseUrl + jobId);
Class<? extends IntermediateDataFormat<?>> dataFormatClass =
fromConnector.getIntermediateDataFormat();
jobRequest.setIntermediateDataFormat(fromConnector.getIntermediateDataFormat());
jobRequest.setFrom(fromConnector.getFrom());
jobRequest.setTo(toConnector.getTo());
addStandardJars(jobRequest);
addConnectorJars(jobRequest, fromConnector, toConnector, dataFormatClass);
addConnectorInitializerJars(jobRequest, Direction.FROM);
addConnectorInitializerJars(jobRequest, Direction.TO);
Schema fromSchema = getSchemaFromConnector(jobRequest, Direction.FROM);
Schema toSchema = getSchemaFromConnector(jobRequest, Direction.TO);
// TODO(Gwen): Need better logic here once the Schema refactor (SQOOP-1378) lands
if (fromSchema != null) {
jobRequest.getSummary().setFromSchema(fromSchema);
}
else {
jobRequest.getSummary().setFromSchema(toSchema);
}
LOG.debug("Using entities: " + jobRequest.getFrom() + ", " + jobRequest.getTo());
return jobRequest;
}
private void addConnectorJars(JobRequest jobRequest, SqoopConnector fromConnector,
SqoopConnector toConnector, Class<? extends IntermediateDataFormat<?>> dataFormatClass) {
jobRequest.addJarForClass(fromConnector.getClass());
jobRequest.addJarForClass(toConnector.getClass());
jobRequest.addJarForClass(dataFormatClass);
}
private void addStandardJars(JobRequest jobRequest) {
// Let's register all important jars
// sqoop-common
jobRequest.addJarForClass(MapContext.class);
// sqoop-core
jobRequest.addJarForClass(FrameworkManager.class);
// sqoop-spi
jobRequest.addJarForClass(SqoopConnector.class);
// Execution engine jar
jobRequest.addJarForClass(executionEngine.getClass());
// Extra libraries that Sqoop code requires
jobRequest.addJarForClass(JSONValue.class);
}
MSubmission createJobSubmission(HttpEventContext ctx, long jobId) {
MSubmission summary = new MSubmission(jobId);
summary.setCreationUser(ctx.getUsername());
summary.setLastUpdateUser(ctx.getUsername());
return summary;
}
private void prepareSubmission(SubmissionRequest request) {
JobConfiguration jobConfiguration = (JobConfiguration) request
.getConfigFrameworkJob();
SqoopConnector getConnector(long connnectorId) {
return ConnectorManager.getInstance().getConnector(connnectorId);
}
void validateSupportedDirection(SqoopConnector connector, Direction direction) {
// Make sure that connector supports the given direction
if (!connector.getSupportedDirections().contains(direction)) {
throw new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
+ connector.getClass().getCanonicalName());
}
}
MConnection getConnection(long connectionId) {
MConnection connection = RepositoryManager.getInstance().getRepository()
.findConnection(connectionId);
if (!connection.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
+ connection.getPersistenceId());
}
return connection;
}
MJob getJob(long jobId) {
MJob job = RepositoryManager.getInstance().getRepository().findJob(jobId);
if (job == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: " + jobId);
}
if (!job.getEnabled()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: " + job.getPersistenceId());
}
return job;
}
private Schema getSchemaFromConnector(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
// Initializer context
InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
// Initialize submission from the connector perspective
initializer.initialize(initializerContext, jobRequest.getConnectorConnectionConfig(direction),
jobRequest.getConnectorJobConfig(direction));
// TODO(Abe): Alter behavior of Schema here.
return initializer.getSchema(initializerContext,
jobRequest.getConnectorConnectionConfig(direction),
jobRequest.getConnectorJobConfig(direction));
}
private void addConnectorInitializerJars(JobRequest jobRequest, Direction direction) {
Initializer initializer = getConnectorInitializer(jobRequest, direction);
InitializerContext initializerContext = getInitializerContext(jobRequest, direction);
// Add job specific jars to the request
jobRequest.addJars(initializer.getJars(initializerContext,
jobRequest.getConnectorConnectionConfig(direction),
jobRequest.getConnectorJobConfig(direction)));
}
private Initializer getConnectorInitializer(JobRequest jobRequest, Direction direction) {
Transferable transferable = direction.equals(Direction.FROM) ? jobRequest.getFrom() : jobRequest.getTo();
Class<? extends Initializer> initializerClass = transferable.getInitializer();
Initializer initializer = (Initializer) ClassUtils.instantiate(initializerClass);
if (initializer == null) {
throw new SqoopException(FrameworkError.FRAMEWORK_0006,
"Can't create connector initializer instance: " + initializerClass.getName());
}
return initializer;
}
private InitializerContext getInitializerContext(JobRequest jobRequest, Direction direction) {
return new InitializerContext(jobRequest.getConnectorContext(direction));
}
void prepareJob(JobRequest request) {
JobConfiguration jobConfiguration = (JobConfiguration) request.getConfigFrameworkJob();
// We're directly moving configured number of extractors and loaders to
// underlying request object. In the future we might need to throttle this
// count based on other running jobs to meet our SLAs.
@ -531,19 +497,19 @@ public class JobManager implements Reconfigurable {
request.setLoaders(jobConfiguration.throttling.loaders);
// Delegate rest of the job to execution engine
executionEngine.prepareSubmission(request);
executionEngine.prepareJob(request);
}
/**
* Callback that will be called only if we failed to submit the job to the
* remote cluster.
*/
private void destroySubmission(SubmissionRequest request) {
CallbackBase fromCallback = request.getFromCallback();
CallbackBase toCallback = request.getToCallback();
void destroySubmission(JobRequest request) {
Transferable from = request.getFrom();
Transferable to = request.getTo();
Class<? extends Destroyer> fromDestroyerClass = fromCallback.getDestroyer();
Class<? extends Destroyer> toDestroyerClass = toCallback.getDestroyer();
Class<? extends Destroyer> fromDestroyerClass = from.getDestroyer();
Class<? extends Destroyer> toDestroyerClass = to.getDestroyer();
Destroyer fromDestroyer = (Destroyer) ClassUtils.instantiate(fromDestroyerClass);
Destroyer toDestroyer = (Destroyer) ClassUtils.instantiate(toDestroyerClass);
@ -557,15 +523,15 @@ public class JobManager implements Reconfigurable {
"Can't create toDestroyer instance: " + toDestroyerClass.getName());
}
// @TODO(Abe): Update context to manage multiple connectors. As well as summary.
// TODO(Abe): Update context to manage multiple connectors. As well as summary.
DestroyerContext fromDestroyerContext = new DestroyerContext(
request.getConnectorContext(Direction.FROM), false, request.getSummary()
.getConnectorSchema());
.getFromSchema());
DestroyerContext toDestroyerContext = new DestroyerContext(
request.getConnectorContext(Direction.TO), false, request.getSummary()
.getConnectorSchema());
.getToSchema());
// Initialize submission from connector perspective
// Destroy submission from connector perspective
fromDestroyer.destroy(fromDestroyerContext, request.getConnectorConnectionConfig(Direction.FROM),
request.getConnectorJobConfig(Direction.FROM));
toDestroyer.destroy(toDestroyerContext, request.getConnectorConnectionConfig(Direction.TO),
@ -573,42 +539,39 @@ public class JobManager implements Reconfigurable {
}
public MSubmission stop(long jobId, HttpEventContext ctx) {
String username = ctx.getUsername();
Repository repository = RepositoryManager.getInstance().getRepository();
MSubmission submission = repository.findSubmissionLastForJob(jobId);
MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
if (submission == null || !submission.getStatus().isRunning()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0003,
"Job with id " + jobId + " is not running");
if (mSubmission == null || !mSubmission.getStatus().isRunning()) {
throw new SqoopException(FrameworkError.FRAMEWORK_0003, "Job with id " + jobId
+ " is not running");
}
submissionEngine.stop(mSubmission.getExternalId());
String externalId = submission.getExternalId();
submissionEngine.stop(externalId);
submission.setLastUpdateUser(username);
mSubmission.setLastUpdateUser(ctx.getUsername());
// Fetch new information to verify that the stop command has actually worked
update(submission);
update(mSubmission);
// Return updated structure
return submission;
return mSubmission;
}
public MSubmission status(long jobId) {
Repository repository = RepositoryManager.getInstance().getRepository();
MSubmission submission = repository.findSubmissionLastForJob(jobId);
MSubmission mSubmission = repository.findSubmissionLastForJob(jobId);
if (submission == null) {
if (mSubmission == null) {
return new MSubmission(jobId, new Date(), SubmissionStatus.NEVER_EXECUTED);
}
// If the submission is in running state, let's update it
if (submission.getStatus().isRunning()) {
update(submission);
if (mSubmission.getStatus().isRunning()) {
update(mSubmission);
}
return submission;
return mSubmission;
}
private void update(MSubmission submission) {
@ -744,4 +707,4 @@ public class JobManager implements Reconfigurable {
LOG.info("Ending submission manager update thread");
}
}
}
}
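A hedged sketch of driving the refactored public surface; the getInstance() accessor is an assumption based on the singleton pattern the other managers in this tree use:

```java
JobManager manager = JobManager.getInstance();      // assumed singleton accessor

HttpEventContext ctx = new HttpEventContext();
ctx.setUsername("sqoop");

long jobId = 1L;                                    // illustrative id
MSubmission submitted = manager.submit(jobId, ctx); // blocking submit, see above
MSubmission current = manager.status(jobId);        // most recent submission
MSubmission stopped = manager.stop(jobId, ctx);     // hard stop + refresh
```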

View file

@ -23,7 +23,7 @@ import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.idf.IntermediateDataFormat;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.job.etl.CallbackBase;
import org.apache.sqoop.job.etl.Transferable;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.utils.ClassUtils;
@ -35,7 +35,7 @@ import java.util.List;
* all information that we need to create a new submission (including mappers,
* reducers, ...).
*/
public class SubmissionRequest {
public class JobRequest {
/**
* Submission summary
@ -64,14 +64,14 @@ public class SubmissionRequest {
List<String> jars;
/**
* From connector callback
* From entity
*/
CallbackBase fromCallback;
Transferable from;
/**
* To connector callback
* To entity
*/
CallbackBase toCallback;
Transferable to;
/**
* All configuration objects
@ -95,11 +95,6 @@ public class SubmissionRequest {
*/
MutableMapContext frameworkContext;
/**
* HDFS output directory
*/
String outputDirectory;
/**
* Optional notification URL for job progress
*/
@ -120,7 +115,7 @@ public class SubmissionRequest {
*/
Class<? extends IntermediateDataFormat> intermediateDataFormat;
public SubmissionRequest() {
public JobRequest() {
this.jars = new LinkedList<String>();
this.fromConnectorContext = new MutableMapContext();
this.toConnectorContext = new MutableMapContext();
@ -207,20 +202,20 @@ public class SubmissionRequest {
}
}
public CallbackBase getFromCallback() {
return fromCallback;
public Transferable getFrom() {
return from;
}
public void setFromCallback(CallbackBase fromCallback) {
this.fromCallback = fromCallback;
public void setFrom(Transferable from) {
this.from = from;
}
public CallbackBase getToCallback() {
return toCallback;
public Transferable getTo() {
return to;
}
public void setToCallback(CallbackBase toCallback) {
this.toCallback = toCallback;
public void setTo(Transferable to) {
this.to = to;
}
public Object getConnectorConnectionConfig(Direction type) {

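The From/To Transferable pair replaces the old callback pair end to end. A minimal sketch of assembling a request outside JobManager; the helper and its arguments are placeholders:

```java
static JobRequest newRequest(SqoopConnector fromConnector, SqoopConnector toConnector) {
  JobRequest request = new JobRequest();
  request.setFrom(fromConnector.getFrom()); // Transferable for the extract side
  request.setTo(toConnector.getTo());       // Transferable for the load side
  request.addJarForClass(fromConnector.getClass());
  request.addJarForClass(toConnector.getClass());
  return request;
}
```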
View file

@ -22,8 +22,8 @@ import org.apache.sqoop.submission.counter.Counters;
import org.apache.sqoop.submission.SubmissionStatus;
/**
* Submission engine is capable of executing and getting information about
* submissions to remote (hadoop) cluster.
* Submission engine is responsible for conveying information about
* job instances (submissions) to the remote (hadoop) cluster.
*/
public abstract class SubmissionEngine {
@ -31,6 +31,7 @@ public abstract class SubmissionEngine {
* Initialize submission engine
*
* @param context Configuration context
* @param prefix Submission engine prefix
*/
public void initialize(MapContext context, String prefix) {
}
@ -57,7 +58,7 @@ public abstract class SubmissionEngine {
*
* @return Return true if we were able to submit job to remote cluster.
*/
public abstract boolean submit(SubmissionRequest submission);
public abstract boolean submit(JobRequest submission);
/**
* Hard stop for given submission.

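A skeletal engine against the renamed API, kept abstract because SubmissionEngine declares further methods not shown in this hunk; a sketch, not the Hadoop implementation:

```java
public abstract class SkeletalSubmissionEngine extends SubmissionEngine {

  @Override
  public boolean submit(JobRequest request) {
    // Hand the prepared request to the cluster. JobManager treats a false
    // return as FAILURE_ON_SUBMIT and runs the connector destroyers.
    return true;
  }
}
```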
View file

@ -17,64 +17,140 @@
*/
package org.apache.sqoop.framework;
import org.apache.sqoop.framework.configuration.ConnectionConfiguration;
import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.validation.Status;
import org.apache.sqoop.validation.ValidationResult;
import org.apache.sqoop.validation.ValidationRunner;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
*
* NOTE(VB): This test class will soon be removed with the Validator refactoring
*/
public class TestFrameworkValidator {
FrameworkValidator validator;
@Before
public void setUp() {
validator = new FrameworkValidator();
}
@Test
public void testConnectionValidation() {
ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
ValidationRunner runner = new ValidationRunner();
ValidationResult result = runner.validate(connectionConfiguration);
assertEquals(Status.FINE, result.getStatus());
assertEquals(0, result.getMessages().size());
}
@Test
public void testJobValidation() {
ValidationRunner runner = new ValidationRunner();
ValidationResult result;
JobConfiguration configuration;
// Empty form is allowed
configuration = new JobConfiguration();
result = runner.validate(configuration);
assertEquals(Status.FINE, result.getStatus());
// Explicitly setting extractors and loaders
configuration = new JobConfiguration();
configuration.throttling.extractors = 3;
configuration.throttling.loaders = 3;
result = runner.validate(configuration);
assertEquals(Status.FINE, result.getStatus());
assertEquals(0, result.getMessages().size());
// Negative and zero values for extractors and loaders
// configuration = new JobConfiguration();
// FrameworkValidator validator;
//
// @Before
// public void setUp() {
// validator = new FrameworkValidator();
// }
//
// @Test
// public void testConnectionValidation() {
// ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration();
//
// Validation validation = validator.validateConnection(connectionConfiguration);
// assertEquals(Status.FINE, validation.getStatus());
// assertEquals(0, validation.getMessages().size());
// }
//
// @Test
// public void testExportJobValidation() {
// ExportJobConfiguration configuration;
// Validation validation;
//
// // Empty form is not allowed
// configuration = new ExportJobConfiguration();
// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("input.inputDirectory")));
//
// // Explicitly setting extractors and loaders
// configuration = new ExportJobConfiguration();
// configuration.input.inputDirectory = "/czech/republic";
// configuration.throttling.extractors = 3;
// configuration.throttling.loaders = 3;
//
// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
// assertEquals(Status.FINE, validation.getStatus());
// assertEquals(0, validation.getMessages().size());
//
// // Negative and zero values for extractors and loaders
// configuration = new ExportJobConfiguration();
// configuration.input.inputDirectory = "/czech/republic";
// configuration.throttling.extractors = 0;
// configuration.throttling.loaders = -1;
// result = runner.validate(configuration);
// assertEquals(Status.FINE, result.getStatus());
// assertTrue(result.getMessages().containsKey("throttling.extractors"));
// assertTrue(result.getMessages().containsKey("throttling.loaders"));
}
//
// validation = validator.validateJob(MJob.Type.EXPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
// }
//
//
// @Test
// public void testImportJobValidation() {
// ImportJobConfiguration configuration;
// Validation validation;
//
// // Empty form is not allowed
// configuration = new ImportJobConfiguration();
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.outputDirectory")));
//
// // Explicitly setting extractors and loaders
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 3;
// configuration.throttling.loaders = 3;
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.FINE, validation.getStatus());
// assertEquals(0, validation.getMessages().size());
//
// // Negative and zero values for extractors and loaders
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 0;
// configuration.throttling.loaders = -1;
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.extractors")));
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("throttling.loaders")));
//
// // specifying both compression as well as customCompression is
// // unacceptable
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 2;
// configuration.throttling.loaders = 2;
// configuration.output.compression = OutputCompression.BZIP2;
// configuration.output.customCompression = "some.compression.codec";
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
//
// // specifying a customCompression is fine
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 2;
// configuration.throttling.loaders = 2;
// configuration.output.compression = OutputCompression.CUSTOM;
// configuration.output.customCompression = "some.compression.codec";
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.FINE, validation.getStatus());
//
// // specifying a customCompression without codec name is unacceptable
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 2;
// configuration.throttling.loaders = 2;
// configuration.output.compression = OutputCompression.CUSTOM;
// configuration.output.customCompression = "";
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
//
// configuration = new ImportJobConfiguration();
// configuration.output.outputDirectory = "/czech/republic";
// configuration.throttling.extractors = 2;
// configuration.throttling.loaders = 2;
// configuration.output.compression = OutputCompression.CUSTOM;
// configuration.output.customCompression = null;
//
// validation = validator.validateJob(MJob.Type.IMPORT, configuration);
// assertEquals(Status.UNACCEPTABLE, validation.getStatus());
// assertTrue(validation.getMessages().containsKey(new Validation.FormInput("output.compression")));
//
// }
}

View file

@ -0,0 +1,173 @@
package org.apache.sqoop.framework;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.List;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.ConnectorManager;
import org.apache.sqoop.connector.spi.SqoopConnector;
import org.apache.sqoop.model.MConnection;
import org.apache.sqoop.model.MJob;
import org.apache.sqoop.model.MSubmission;
import org.apache.sqoop.repository.JdbcRepository;
import org.apache.sqoop.repository.Repository;
import org.apache.sqoop.repository.RepositoryManager;
import org.apache.sqoop.request.HttpEventContext;
import org.junit.Before;
import org.junit.Test;
public class TestJobManager {
private JobManager jobManager;
private SqoopConnector sqoopConnectorMock;
private ConnectorManager connectorMgrMock;
private RepositoryManager repositoryManagerMock;
private Repository jdbcRepoMock;
@Before
public void setUp() {
jobManager = new JobManager();
connectorMgrMock = mock(ConnectorManager.class);
sqoopConnectorMock = mock(SqoopConnector.class);
ConnectorManager.setInstance(connectorMgrMock);
repositoryManagerMock = mock(RepositoryManager.class);
RepositoryManager.setInstance(repositoryManagerMock);
jdbcRepoMock = mock(JdbcRepository.class);
}
@Test
public void testCreateJobSubmission() {
HttpEventContext testCtx = new HttpEventContext();
testCtx.setUsername("testUser");
MSubmission jobSubmission = jobManager.createJobSubmission(testCtx, 1234L);
assertEquals(jobSubmission.getCreationUser(), "testUser");
assertEquals(jobSubmission.getLastUpdateUser(), "testUser");
}
@Test
public void testGetConnector() {
when(connectorMgrMock.getConnector(123l)).thenReturn(sqoopConnectorMock);
when(sqoopConnectorMock.getSupportedDirections()).thenReturn(getSupportedDirections());
assertEquals(jobManager.getConnector(123l), sqoopConnectorMock);
verify(connectorMgrMock, times(1)).getConnector(123l);
}
@Test
public void testUnsupportedDirectionForConnector() {
// invalid job id/ direction
SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0011, "Connector: "
+ sqoopConnectorMock.getClass().getCanonicalName());
List<Direction> supportedDirections = getSupportedDirections();
when(sqoopConnectorMock.getSupportedDirections()).thenReturn(supportedDirections);
try {
// invalid direction
jobManager.validateSupportedDirection(sqoopConnectorMock, null);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
verify(sqoopConnectorMock, times(1)).getSupportedDirections();
return;
}
fail("Should throw out an exception with message: " + exception.getMessage());
}
@Test
public void testGetConnection() {
MConnection testConnection = new MConnection(123l, null, null);
testConnection.setEnabled(true);
MConnection mConnectionSpy = org.mockito.Mockito.spy(testConnection);
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
when(jdbcRepoMock.findConnection(123l)).thenReturn(mConnectionSpy);
assertEquals(jobManager.getConnection(123l), mConnectionSpy);
verify(repositoryManagerMock, times(1)).getRepository();
verify(jdbcRepoMock, times(1)).findConnection(123l);
}
@Test
public void testDisabledConnection() {
MConnection testConnection = new MConnection(123l, null, null);
testConnection.setPersistenceId(1234);
testConnection.setEnabled(false);
SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0010, "Connection id: "
+ testConnection.getPersistenceId());
MConnection mConnectionSpy = org.mockito.Mockito.spy(testConnection);
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
when(jdbcRepoMock.findConnection(123l)).thenReturn(mConnectionSpy);
try {
jobManager.getConnection(123l);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repositoryManagerMock, times(1)).getRepository();
verify(jdbcRepoMock, times(1)).findConnection(123l);
}
}
@Test
public void testGetJob() {
MJob testJob = job(123l, 456l);
testJob.setEnabled(true);
MJob mJobSpy = org.mockito.Mockito.spy(testJob);
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
assertEquals(jobManager.getJob(123l), mJobSpy);
verify(repositoryManagerMock, times(1)).getRepository();
verify(jdbcRepoMock, times(1)).findJob(123l);
}
@Test
public void testDisabledJob() {
MJob testJob = job(123l, 456l);
testJob.setEnabled(false);
testJob.setPersistenceId(1111);
SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0009, "Job id: "
+ testJob.getPersistenceId());
MJob mJobSpy = org.mockito.Mockito.spy(testJob);
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
when(jdbcRepoMock.findJob(123l)).thenReturn(mJobSpy);
try {
jobManager.getJob(123l);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repositoryManagerMock, times(1)).getRepository();
verify(jdbcRepoMock, times(1)).findJob(123l);
}
}
@Test
public void testUnknownJob() {
long testJobId = 555l;
SqoopException exception = new SqoopException(FrameworkError.FRAMEWORK_0004, "Unknown job id: "
+ testJobId);
when(repositoryManagerMock.getRepository()).thenReturn(jdbcRepoMock);
when(jdbcRepoMock.findJob(testJobId)).thenReturn(null);
try {
jobManager.getJob(testJobId);
} catch (SqoopException ex) {
assertEquals(ex.getMessage(), exception.getMessage());
verify(repositoryManagerMock, times(1)).getRepository();
verify(jdbcRepoMock, times(1)).findJob(testJobId);
}
}
private MJob job(long fromId, long toId) {
MJob job = new MJob(fromId, toId, 1L, 2L, null, null, null);
job.setName("Vampire");
job.setCreationUser("Buffy");
return job;
}
public List<Direction> getSupportedDirections() {
return Arrays.asList(new Direction[] { Direction.FROM, Direction.TO });
}
}

View file

@ -29,43 +29,43 @@ import static org.junit.Assert.assertTrue;
/**
*
*/
public class TestSubmissionRequest {
public class TestJobRequest {
private SubmissionRequest submissionRequest;
private JobRequest jobRequest;
@Before
public void initializeSubmissionRequest() {
submissionRequest = new SubmissionRequest();
jobRequest = new JobRequest();
}
@Test
public void testAddJar() {
submissionRequest.addJar("A");
submissionRequest.addJar("B");
submissionRequest.addJar("A");
jobRequest.addJar("A");
jobRequest.addJar("B");
jobRequest.addJar("A");
assertEquals(2, submissionRequest.getJars().size());
assertEquals("A", submissionRequest.getJars().get(0));
assertEquals("B", submissionRequest.getJars().get(1));
assertEquals(2, jobRequest.getJars().size());
assertEquals("A", jobRequest.getJars().get(0));
assertEquals("B", jobRequest.getJars().get(1));
}
@Test
public void testAddJarForClass() {
submissionRequest.addJarForClass(TestSubmissionRequest.class);
submissionRequest.addJarForClass(TestFrameworkValidator.class);
jobRequest.addJarForClass(TestJobRequest.class);
jobRequest.addJarForClass(TestFrameworkValidator.class);
assertEquals(1, submissionRequest.getJars().size());
assertTrue(submissionRequest.getJars().contains(ClassUtils.jarForClass(TestSubmissionRequest.class)));
assertEquals(1, jobRequest.getJars().size());
assertTrue(jobRequest.getJars().contains(ClassUtils.jarForClass(TestJobRequest.class)));
}
@Test
public void testAddJars() {
submissionRequest.addJars(Arrays.asList("A", "B"));
submissionRequest.addJars(Arrays.asList("B", "C"));
jobRequest.addJars(Arrays.asList("A", "B"));
jobRequest.addJars(Arrays.asList("B", "C"));
assertEquals(3, submissionRequest.getJars().size());
assertEquals("A", submissionRequest.getJars().get(0));
assertEquals("B", submissionRequest.getJars().get(1));
assertEquals("C", submissionRequest.getJars().get(2));
assertEquals(3, jobRequest.getJars().size());
assertEquals("A", jobRequest.getJars().get(0));
assertEquals("B", jobRequest.getJars().get(1));
assertEquals("C", jobRequest.getJars().get(2));
}
}

File diff suppressed because it is too large. Load diff

View file

@ -21,13 +21,13 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.JobRequest;
/**
* Map-reduce specific submission request containing all extra information
* needed for bootstrapping map-reduce job.
*/
public class MRSubmissionRequest extends SubmissionRequest {
public class MRJobRequest extends JobRequest {
/**
* Map-reduce specific options.
@ -40,7 +40,7 @@ public class MRSubmissionRequest extends SubmissionRequest {
Class<? extends Writable> outputKeyClass;
Class<? extends Writable> outputValueClass;
public MRSubmissionRequest() {
public MRJobRequest() {
super();
}

View file

@ -17,21 +17,20 @@
*/
package org.apache.sqoop.execution.mapreduce;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.io.NullWritable;
import org.apache.sqoop.common.MutableMapContext;
import org.apache.sqoop.framework.ExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.configuration.JobConfiguration;
import org.apache.sqoop.framework.JobRequest;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.etl.From;
import org.apache.sqoop.job.etl.To;
import org.apache.sqoop.job.io.Data;
import org.apache.sqoop.job.io.SqoopWritable;
import org.apache.sqoop.job.mr.SqoopInputFormat;
import org.apache.sqoop.job.mr.SqoopMapper;
import org.apache.sqoop.job.mr.SqoopNullOutputFormat;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
*
*/
@ -41,44 +40,40 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
* {@inheritDoc}
*/
@Override
public SubmissionRequest createSubmissionRequest() {
return new MRSubmissionRequest();
public JobRequest createJobRequest() {
return new MRJobRequest();
}
public void prepareSubmission(SubmissionRequest gRequest) {
MRSubmissionRequest request = (MRSubmissionRequest)gRequest;
public void prepareJob(JobRequest jobRequest) {
MRJobRequest mrJobRequest = (MRJobRequest)jobRequest;
// Add jar dependencies
addDependencies(request);
addDependencies(mrJobRequest);
// Configure map-reduce classes for import
request.setInputFormatClass(SqoopInputFormat.class);
mrJobRequest.setInputFormatClass(SqoopInputFormat.class);
request.setMapperClass(SqoopMapper.class);
request.setMapOutputKeyClass(SqoopWritable.class);
request.setMapOutputValueClass(NullWritable.class);
mrJobRequest.setMapperClass(SqoopMapper.class);
mrJobRequest.setMapOutputKeyClass(SqoopWritable.class);
mrJobRequest.setMapOutputValueClass(NullWritable.class);
request.setOutputFormatClass(SqoopNullOutputFormat.class);
request.setOutputKeyClass(SqoopWritable.class);
request.setOutputValueClass(NullWritable.class);
mrJobRequest.setOutputFormatClass(SqoopNullOutputFormat.class);
mrJobRequest.setOutputKeyClass(SqoopWritable.class);
mrJobRequest.setOutputValueClass(NullWritable.class);
// Set up framework context
From from = (From)request.getFromCallback();
To to = (To)request.getToCallback();
MutableMapContext context = request.getFrameworkContext();
From from = (From) mrJobRequest.getFrom();
To to = (To) mrJobRequest.getTo();
MutableMapContext context = mrJobRequest.getFrameworkContext();
context.setString(JobConstants.JOB_ETL_PARTITIONER, from.getPartitioner().getName());
context.setString(JobConstants.JOB_ETL_EXTRACTOR, from.getExtractor().getName());
context.setString(JobConstants.JOB_ETL_LOADER, to.getLoader().getName());
context.setString(JobConstants.JOB_ETL_DESTROYER, from.getDestroyer().getName());
context.setString(JobConstants.INTERMEDIATE_DATA_FORMAT,
request.getIntermediateDataFormat().getName());
mrJobRequest.getIntermediateDataFormat().getName());
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
}
if(request.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, request.getExtractors());
if(mrJobRequest.getExtractors() != null) {
context.setInteger(JobConstants.JOB_ETL_EXTRACTOR_NUM, mrJobRequest.getExtractors());
}
}
@ -91,7 +86,7 @@ public class MapreduceExecutionEngine extends ExecutionEngine {
*
* @param request Active request object.
*/
protected void addDependencies(MRSubmissionRequest request) {
protected void addDependencies(MRJobRequest request) {
// Guava
request.addJarForClass(ThreadFactoryBuilder.class);
}

View file

@ -90,9 +90,6 @@ public final class ConfigurationUtils {
private static final Text SCHEMA_TO_CONNECTOR_KEY = new Text(SCHEMA_TO_CONNECTOR);
private static final String SCHEMA_HIO = JobConstants.PREFIX_JOB_CONFIG + "schema.hio";
private static final Text SCHEMA_HIO_KEY = new Text(SCHEMA_HIO);
/**
* Persist Connector configuration object for connection.

View file

@ -31,9 +31,9 @@ public class ProgressRunnable implements Runnable {
/**
* Context class that we should use for reporting progress.
*/
private final TaskInputOutputContext context;
private final TaskInputOutputContext<?,?,?,?> context;
public ProgressRunnable(final TaskInputOutputContext ctxt) {
public ProgressRunnable(final TaskInputOutputContext<?,?,?,?> ctxt) {
this.context = ctxt;
}

View file

@ -32,8 +32,7 @@ import org.apache.sqoop.utils.ClassUtils;
*/
public class SqoopDestroyerExecutor {
public static final Logger LOG =
Logger.getLogger(SqoopDestroyerExecutor.class);
public static final Logger LOG = Logger.getLogger(SqoopDestroyerExecutor.class);
/**
* Execute destroyer.
@ -56,10 +55,8 @@ public class SqoopDestroyerExecutor {
Object fromConfigConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, configuration);
Object fromConfigJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, configuration);
// Propagate connector schema in every case for now
// TODO: Change to coditional choosing between Connector schemas.
// TODO(Abe/Gwen): Change to conditional choosing between schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, configuration);
DestroyerContext destroyerContext = new DestroyerContext(subContext, success, schema);
LOG.info("Executing destroyer class " + destroyer.getClass());

View file

@ -64,9 +64,7 @@ public class SqoopFileOutputFormat
conf.set(JobConstants.JOB_MR_OUTPUT_CODEC, codecname);
}
SqoopOutputFormatLoadExecutor executor =
new SqoopOutputFormatLoadExecutor(context);
return executor.getRecordWriter();
return new SqoopOutputFormatLoadExecutor(context).getRecordWriter();
}
@Override

View file

@ -54,7 +54,7 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
* Service for reporting progress to mapreduce.
*/
private final ScheduledExecutorService progressService = Executors.newSingleThreadScheduledExecutor();
private IntermediateDataFormat data = null;
private IntermediateDataFormat<String> dataFormat = null;
private SqoopWritable dataOut = null;
@Override
@@ -64,44 +64,36 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
String extractorName = conf.get(JobConstants.JOB_ETL_EXTRACTOR);
Extractor extractor = (Extractor) ClassUtils.instantiate(extractorName);
// Propagate connector schema in every case for now
// TODO: Change to coditional choosing between Connector schemas.
// TODO(Abe/Gwen): Change to conditional choosing between Connector schemas.
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
if (schema==null) {
if (schema == null) {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
}
if (schema==null) {
if (schema == null) {
LOG.info("setting an empty schema");
}
String intermediateDataFormatName = conf.get(JobConstants
.INTERMEDIATE_DATA_FORMAT);
data = (IntermediateDataFormat) ClassUtils.instantiate(intermediateDataFormatName);
data.setSchema(schema);
String intermediateDataFormatName = conf.get(JobConstants.INTERMEDIATE_DATA_FORMAT);
dataFormat = (IntermediateDataFormat<String>) ClassUtils
.instantiate(intermediateDataFormatName);
dataFormat.setSchema(schema);
dataOut = new SqoopWritable();
// Objects that should be pass to the Executor execution
PrefixContext subContext = null;
Object configConnection = null;
Object configJob = null;
// Get configs for extractor
subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
configConnection = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
configJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
// Objects that should be passed to the Executor execution
PrefixContext subContext = new PrefixContext(conf, JobConstants.PREFIX_CONNECTOR_FROM_CONTEXT);
Object fromConfig = ConfigurationUtils.getConnectorConnectionConfig(Direction.FROM, conf);
Object fromJob = ConfigurationUtils.getConnectorJobConfig(Direction.FROM, conf);
SqoopSplit split = context.getCurrentKey();
ExtractorContext extractorContext = new ExtractorContext(subContext, new MapDataWriter(context), schema);
ExtractorContext extractorContext = new ExtractorContext(subContext, new SqoopMapDataWriter(context), schema);
try {
LOG.info("Starting progress service");
progressService.scheduleAtFixedRate(new ProgressRunnable(context), 0, 2, TimeUnit.MINUTES);
LOG.info("Running extractor class " + extractorName);
extractor.extract(extractorContext, configConnection, configJob, split.getPartition());
extractor.extract(extractorContext, fromConfig, fromJob, split.getPartition());
LOG.info("Extractor has finished");
context.getCounter(SqoopCounters.ROWS_READ)
.increment(extractor.getRowsRead());
@@ -117,37 +109,37 @@ public class SqoopMapper extends Mapper<SqoopSplit, NullWritable, SqoopWritable,
}
}
private class MapDataWriter extends DataWriter {
private class SqoopMapDataWriter extends DataWriter {
private Context context;
public MapDataWriter(Context context) {
public SqoopMapDataWriter(Context context) {
this.context = context;
}
@Override
public void writeArrayRecord(Object[] array) {
data.setObjectData(array);
dataFormat.setObjectData(array);
writeContent();
}
@Override
public void writeStringRecord(String text) {
data.setTextData(text);
dataFormat.setTextData(text);
writeContent();
}
@Override
public void writeRecord(Object obj) {
data.setData(obj.toString());
dataFormat.setData(obj.toString());
writeContent();
}
private void writeContent() {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Extracted data: " + data.getTextData());
LOG.debug("Extracted data: " + dataFormat.getTextData());
}
dataOut.setString(data.getTextData());
dataOut.setString(dataFormat.getTextData());
context.write(dataOut, NullWritable.get());
} catch (Exception e) {
throw new SqoopException(MapreduceExecutionError.MAPRED_EXEC_0013, e);
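
For orientation, this is how the renamed writer is reached from connector code: an extractor only sees the DataWriter that ExtractorContext carries, and every write*Record call lands in SqoopMapDataWriter above. A hypothetical extract(...) body (row values invented for illustration):

    // Each call fills the intermediate data format and emits one
    // SqoopWritable record to the mapreduce context.
    DataWriter writer = extractorContext.getDataWriter();
    writer.writeArrayRecord(new Object[] { 1L, "first row" });   // setObjectData path
    writer.writeStringRecord("2,'second row'");                  // setTextData path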


@@ -51,9 +51,9 @@ public class SqoopOutputFormatLoadExecutor {
private volatile boolean readerFinished = false;
private volatile boolean writerFinished = false;
private volatile IntermediateDataFormat data;
private volatile IntermediateDataFormat<String> dataFormat;
private JobContext context;
private SqoopRecordWriter producer;
private SqoopRecordWriter writer;
private Future<?> consumerFuture;
private Semaphore filled = new Semaphore(0, true);
private Semaphore free = new Semaphore(1, true);
@@ -63,14 +63,14 @@ public class SqoopOutputFormatLoadExecutor {
SqoopOutputFormatLoadExecutor(boolean isTest, String loaderName){
this.isTest = isTest;
this.loaderName = loaderName;
data = new CSVIntermediateDataFormat();
producer = new SqoopRecordWriter();
dataFormat = new CSVIntermediateDataFormat();
writer = new SqoopRecordWriter();
}
public SqoopOutputFormatLoadExecutor(JobContext jobctx) {
context = jobctx;
producer = new SqoopRecordWriter();
data = (IntermediateDataFormat) ClassUtils.instantiate(context
writer = new SqoopRecordWriter();
dataFormat = (IntermediateDataFormat<String>) ClassUtils.instantiate(context
.getConfiguration().get(JobConstants.INTERMEDIATE_DATA_FORMAT));
Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, context.getConfiguration());
@@ -78,14 +78,14 @@ public class SqoopOutputFormatLoadExecutor {
schema = ConfigurationUtils.getConnectorSchema(Direction.TO, context.getConfiguration());
}
data.setSchema(schema);
dataFormat.setSchema(schema);
}
public RecordWriter<SqoopWritable, NullWritable> getRecordWriter() {
consumerFuture = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat
("OutputFormatLoader-consumer").build()).submit(
new ConsumerThread());
return producer;
return writer;
}
/*
@@ -98,7 +98,7 @@ public class SqoopOutputFormatLoadExecutor {
public void write(SqoopWritable key, NullWritable value) throws InterruptedException {
free.acquire();
checkIfConsumerThrew();
data.setTextData(key.getString());
dataFormat.setTextData(key.getString());
filled.release();
}
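
The producer/writer rename also makes the threading contract easier to see: the MR writer thread and the loader-side reader exchange one record at a time through the filled/free semaphore pair declared above. A condensed sketch of the handshake, assuming releaseSema() releases the free permit:

    // Writer thread: claim the empty slot, publish one record, mark it filled.
    free.acquire();
    dataFormat.setTextData(key.getString());
    filled.release();

    // Loader thread, via SqoopOutputFormatDataReader: wait for a record,
    // consume it, then hand the slot back to the writer.
    filled.acquire();
    String record = dataFormat.getTextData();
    free.release();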
@@ -144,7 +144,7 @@ public class SqoopOutputFormatLoadExecutor {
}
}
private class OutputFormatDataReader extends DataReader {
private class SqoopOutputFormatDataReader extends DataReader {
@Override
public Object[] readArrayRecord() throws InterruptedException {
@@ -154,7 +154,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
return data.getObjectData();
return dataFormat.getObjectData();
} finally {
releaseSema();
}
@@ -168,7 +168,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
return data.getTextData();
return dataFormat.getTextData();
} finally {
releaseSema();
}
@@ -181,7 +181,7 @@ public class SqoopOutputFormatLoadExecutor {
return null;
}
try {
return data.getData();
return dataFormat.getData();
} catch (Throwable t) {
readerFinished = true;
LOG.error("Caught exception e while getting content ", t);
@@ -215,7 +215,7 @@ public class SqoopOutputFormatLoadExecutor {
public void run() {
LOG.info("SqoopOutputFormatLoadExecutor consumer thread is starting");
try {
DataReader reader = new OutputFormatDataReader();
DataReader reader = new SqoopOutputFormatDataReader();
Configuration conf = null;
if (!isTest) {


@@ -68,14 +68,14 @@ public final class SubmissionDisplayer {
}
}
if(isVerbose() && submission.getConnectorSchema() != null) {
if(isVerbose() && submission.getFromSchema() != null) {
print(resourceString(Constants.RES_CONNECTOR_SCHEMA)+": ");
println(submission.getConnectorSchema());
println(submission.getFromSchema());
}
if(isVerbose() && submission.getHioSchema() != null) {
if(isVerbose() && submission.getToSchema() != null) {
print(resourceString(Constants.RES_HIO_SCHEMA)+": ");
println(submission.getHioSchema());
println(submission.getToSchema());
}
}


@@ -26,7 +26,7 @@ package org.apache.sqoop.job.etl;
* -> (framework-defined steps)
* -> Destroyer
*/
public class From extends CallbackBase {
public class From extends Transferable {
private Class<? extends Partitioner> partitioner;
private Class<? extends Extractor> extractor;


@@ -25,7 +25,7 @@ package org.apache.sqoop.job.etl;
* -> Loader
* -> Destroyer
*/
public class To extends CallbackBase {
public class To extends Transferable {
private Class<? extends Loader> loader;


@@ -18,14 +18,16 @@
package org.apache.sqoop.job.etl;
/**
* Set of default callbacks that must be implement by each job type.
* This entity encapsulates the workflow for data transfer via the
{@link SqoopConnector}. It acts as an adapter between the framework and
the data source being imported from or exported to.
*/
public abstract class CallbackBase {
public abstract class Transferable {
private Class<? extends Initializer> initializer;
private Class<? extends Destroyer> destroyer;
public CallbackBase(
public Transferable(
Class<? extends Initializer> initializer,
Class<? extends Destroyer> destroyer
) {
@@ -46,4 +48,4 @@ public abstract class CallbackBase {
return "initializer=" + initializer.getName() +
", destroyer=" + destroyer.getName();
}
}
}
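
After the rename, From and To read as two concrete transfer workflows over one shared base: Transferable owns the Initializer/Destroyer pair, and each direction adds only its own steps. A sketch of the resulting shape (constructor layout inferred from these hunks, not copied from the commit):

    // The FROM direction adds partitioning and extraction on top of the
    // shared initialize/destroy callbacks held by Transferable.
    public class From extends Transferable {
      private Class<? extends Partitioner> partitioner;
      private Class<? extends Extractor> extractor;

      public From(Class<? extends Initializer> initializer,
                  Class<? extends Partitioner> partitioner,
                  Class<? extends Extractor> extractor,
                  Class<? extends Destroyer> destroyer) {
        super(initializer, destroyer);
        this.partitioner = partitioner;
        this.extractor = extractor;
      }
    }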


@@ -17,7 +17,6 @@
*/
package org.apache.sqoop.validation;
import org.apache.sqoop.model.MJob;
/**
* Connection and job metadata validator.


@@ -29,9 +29,9 @@ import org.apache.log4j.Logger;
import org.apache.sqoop.common.Direction;
import org.apache.sqoop.common.MapContext;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.execution.mapreduce.MRSubmissionRequest;
import org.apache.sqoop.execution.mapreduce.MRJobRequest;
import org.apache.sqoop.execution.mapreduce.MapreduceExecutionEngine;
import org.apache.sqoop.framework.SubmissionRequest;
import org.apache.sqoop.framework.JobRequest;
import org.apache.sqoop.framework.SubmissionEngine;
import org.apache.sqoop.job.JobConstants;
import org.apache.sqoop.job.mr.ConfigurationUtils;
@@ -72,6 +72,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
*/
@Override
public void initialize(MapContext context, String prefix) {
super.initialize(context, prefix);
LOG.info("Initializing Map-reduce Submission Engine");
// Build global configuration, start with empty configuration object
@@ -125,6 +126,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
*/
@Override
public void destroy() {
super.destroy();
LOG.info("Destroying Mapreduce Submission Engine");
// Closing job client
@@ -147,9 +149,9 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
* {@inheritDoc}
*/
@Override
public boolean submit(SubmissionRequest generalRequest) {
public boolean submit(JobRequest mrJobRequest) {
// We're supporting only map reduce jobs
MRSubmissionRequest request = (MRSubmissionRequest) generalRequest;
MRJobRequest request = (MRJobRequest) mrJobRequest;
// Clone global configuration
Configuration configuration = new Configuration(globalConfiguration);
@@ -208,7 +210,7 @@ public class MapreduceSubmissionEngine extends SubmissionEngine {
ConfigurationUtils.setFrameworkConnectionConfig(Direction.TO, job, request.getFrameworkConnectionConfig(Direction.TO));
ConfigurationUtils.setFrameworkJobConfig(job, request.getConfigFrameworkJob());
// @TODO(Abe): Persist TO schema.
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getConnectorSchema());
ConfigurationUtils.setConnectorSchema(Direction.FROM, job, request.getSummary().getFromSchema());
if(request.getJobName() != null) {
job.setJobName("Sqoop: " + request.getJobName());
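
Only the FROM schema is persisted into the job configuration at this point (the TODO above tracks the TO side), which is why the task-side code in SqoopMapper and SqoopOutputFormatLoadExecutor falls back from FROM to TO when reading it. The round trip, condensed from the hunks in this commit:

    // Submission side: stash the FROM schema in the MR job configuration.
    ConfigurationUtils.setConnectorSchema(Direction.FROM, job,
        request.getSummary().getFromSchema());

    // Task side: prefer the FROM schema, fall back to TO if absent.
    Schema schema = ConfigurationUtils.getConnectorSchema(Direction.FROM, conf);
    if (schema == null) {
      schema = ConfigurationUtils.getConnectorSchema(Direction.TO, conf);
    }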