Merge branch 'master' into updatepackage
Commit dd4103060a
@@ -10,16 +10,15 @@ import java.nio.channels.FileChannel
 import java.nio.file.Files
 import java.util.concurrent.{Executors, TimeUnit}
 import java.util.zip.GZIPInputStream
 
 import com.google.common.io.{Files => GFiles}
 import datax.config.SparkEnvVariables
-import datax.constants.{ProductConstant, BlobProperties}
+import datax.constants.{BlobProperties, ProductConstant}
 import datax.exception.EngineException
 import datax.securedsetting.KeyVaultClient
 import datax.telemetry.AppInsightLogger
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator}
 import org.apache.log4j.LogManager
 import org.apache.spark.broadcast
-
@@ -511,12 +510,22 @@ object HadoopClient {
    * @return a list of file paths under the folder
    */
   def listFiles(folder: String): Iterator[String] = {
+    listFileObjects(folder).map(f=>f.getPath.toString)
+  }
+
+  /**
+    * list file objects under a folder
+    *
+    * @param folder path to the specified folder
+    * @return an iterator of file status objects under the folder
+    */
+  def listFileObjects(folder: String): Iterator[FileStatus] = {
     resolveStorageAccountKeyForPath(folder)
     val path = new Path(folder)
     val fs = path.getFileSystem(getConf)
 
-    if(fs.exists(path))
-      fs.listFiles(path, true).map(f=>f.getPath.toString)
+    if (fs.exists(path))
+      fs.listFiles(path, true)
     else
       Iterator.empty
   }
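Note on the refactor above: both listing entry points now funnel through a single recursive walk that keeps the full FileStatus, so callers can filter on metadata such as the modification time without a second listing. A minimal standalone sketch of the same pattern against stock Hadoop APIs (the real code maps over RemoteIterator via an implicit conversion; here the adaptation is spelled out by hand):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

object ListingSketch {
  // Recursive listing that preserves FileStatus metadata.
  def listFileObjects(folder: String, conf: Configuration): Iterator[FileStatus] = {
    val path = new Path(folder)
    val fs = path.getFileSystem(conf)
    if (fs.exists(path)) {
      val it = fs.listFiles(path, true) // RemoteIterator[LocatedFileStatus]
      Iterator.continually(it).takeWhile(_.hasNext).map(_.next())
    } else Iterator.empty
  }

  // The old String-based contract, now a thin view over the walk above.
  def listFiles(folder: String, conf: Configuration): Iterator[String] =
    listFileObjects(folder, conf).map(_.getPath.toString)
}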
@@ -15,7 +15,9 @@ import datax.fs.HadoopClient
 import datax.input.BatchBlobInputSetting
 import datax.processor.{BatchBlobProcessor, CommonProcessorFactory}
 import datax.telemetry.AppInsightLogger
+import org.apache.hadoop.fs.FileStatus
 import org.apache.log4j.LogManager
 
+import scala.language.postfixOps
 import scala.collection.mutable.{HashSet, ListBuffer}
 import scala.concurrent.duration._
@@ -92,7 +94,10 @@ object BlobBatchingHost {
     val sc = spark.sparkContext
     val processor = processorGenerator(config)
 
-    val filesToProcess = prefixes.flatMap(prefix=>HadoopClient.listFiles(prefix._1).toSeq)
+    val filterTimeRange = config.dict.getOrElse("filterTimeRange", "").equals("true")
+    appLog.warn(s"filterTimeRange: $filterTimeRange")
+    val filesToProcess = prefixes.flatMap(prefix=>HadoopClient.listFileObjects(prefix._1).flatMap(f=>inTimeRange(f, filterTimeRange)).toSeq)
+    appLog.warn(s"filesToProcess: ${filesToProcess.length}")
     val minTimestamp = prefixes.minBy(_._2.getTime)._2
     appLog.warn(s"Start processing for $minTimestamp")
     val pathsRDD = sc.makeRDD(filesToProcess)
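Two details worth noting in the added lines: the filterTimeRange flag is on only when the config value is exactly the string "true" (the empty-string default compares false, so filtering stays off unless explicitly enabled), and inTimeRange acts as a filter inside flatMap by returning either a one-element or an empty Iterator. A self-contained sketch of that flatMap-as-filter idiom, with a hypothetical keepIfEven predicate standing in for inTimeRange:

// Emit the element to keep it, an empty Iterator to drop it;
// flatMap then flattens the per-element decision away.
def keepIfEven(n: Int): Iterator[Int] =
  if (n % 2 == 0) Iterator(n) else Iterator.empty

val kept = (1 to 6).flatMap(n => keepIfEven(n).toSeq) // Vector(2, 4, 6)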
@@ -103,9 +108,9 @@ object BlobBatchingHost {
     AppInsightLogger.trackEvent(ProductConstant.ProductRoot + "/batch/end", null, batchResult)
 
     //write tracker file
-    val (trackerFolder, dateTime) = getTrackerConfigs(config)
+    val (trackerFolder, dateTimeStart, dateTimeEnd) = getTrackerConfigs(config)
     if (trackerFolder != "") {
-      writeTracker(trackerFolder, dateTime)
+      writeTracker(trackerFolder, dateTimeStart, dateTimeEnd)
     }
     /*
     This is a temporary workaround for https://github.com/Microsoft/ApplicationInsights-Java/issues/891
@@ -124,17 +129,23 @@ object BlobBatchingHost {
 
   // get tracker configs from input arguments and sys env
   // if the trackerFolder is configured, a tracker file should be written in the folder
-  def getTrackerConfigs(config: UnifiedConfig) : (String, Timestamp) = {
+  def getTrackerConfigs(config: UnifiedConfig) : (String, Timestamp, Timestamp) = {
     val inputRoot = config.dict.getOrElse("trackerFolder", "")
     val timeStart = sys.env.getOrElse("process_start_datetime", "")
-    (inputRoot, Timestamp.from(Instant.parse(timeStart)))
+    val timeEnd = sys.env.getOrElse("process_end_datetime", "")
+    (inputRoot, Timestamp.from(Instant.parse(timeStart)), Timestamp.from(Instant.parse(timeEnd)))
   }
 
   // write a tracker file in specific folder with format _SUCCESS_yyyy_MM_dd_HH
-  def writeTracker(trackerFolder: String, dt: Timestamp): Unit = {
+  def writeTracker(trackerFolder: String, dt: Timestamp, dtEnd: Timestamp): Unit = {
     // first create folder if not exists
     HadoopClient.createFolder(trackerFolder)
-    val dateFormat = new SimpleDateFormat("yyyy_MM_dd_HH")
+    var fmt = "yyyy_MM_dd_HH"
+    // if the interval is not 1h, then use the format yyyy_MM_dd_HH_mm
+    if (dtEnd.getTime / 1000 - dt.getTime / 1000 + 1 != 3600) {
+      fmt += "_mm"
+    }
+    val dateFormat = new SimpleDateFormat(fmt)
 
     val out = "_SUCCESS_" + dateFormat.format(dt)
     val outFilename = trackerFolder + out
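The interval arithmetic above treats the [dt, dtEnd] window as inclusive on both ends, so a whole-hour window spans exactly 3600 seconds and keeps the coarse hour format, while any other length appends minutes so two sub-hourly trackers in the same hour get distinct names. A small sketch of that naming rule (my assumed reading of the semantics; note that SimpleDateFormat renders in the JVM's default time zone):

import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.time.Instant

def trackerName(dt: Timestamp, dtEnd: Timestamp): String = {
  // Inclusive window length in seconds; exactly 3600 means a whole hour.
  val seconds = dtEnd.getTime / 1000 - dt.getTime / 1000 + 1
  val fmt = if (seconds == 3600) "yyyy_MM_dd_HH" else "yyyy_MM_dd_HH_mm"
  "_SUCCESS_" + new SimpleDateFormat(fmt).format(dt)
}

// A 15-minute window keeps the minute component:
val dt    = Timestamp.from(Instant.parse("2022-09-06T10:15:00Z"))
val dtEnd = Timestamp.from(Instant.parse("2022-09-06T10:29:59Z"))
println(trackerName(dt, dtEnd)) // _SUCCESS_2022_09_06_10_15 on a UTC JVM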
@@ -142,4 +153,22 @@ object BlobBatchingHost {
     appLog.warn(s"tracker file has been written: $outFilename")
 
   }
+
+  def inTimeRange(fs: FileStatus, filterTimeRange: Boolean): Iterator[String] = {
+    if (!filterTimeRange) {
+      return Iterator(fs.getPath.toString)
+    }
+    val start = Timestamp.from(Instant.parse(sys.env.getOrElse("process_start_datetime", ""))).getTime / 1000
+    val end = Timestamp.from(Instant.parse(sys.env.getOrElse("process_end_datetime", ""))).getTime / 1000
+    val fTime = fs.getModificationTime / 1000
+    appLog.warn(s"inTimeRange: start $start, end $end, fTime $fTime")
+
+    if (start % 3600 == 0) {
+      if (fTime <= end) Iterator(fs.getPath.toString) else Iterator.empty
+    } else if ((end + 1) % 3600 == 0) {
+      if (fTime >= start) Iterator(fs.getPath.toString) else Iterator.empty
+    } else {
+      if (fTime >= start && fTime <= end) Iterator(fs.getPath.toString) else Iterator.empty
+    }
+  }
 }
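The three branches of inTimeRange encode the window-boundary rules in epoch seconds: a window that starts on an hour boundary enforces only the upper bound, a window that ends on one (end + 1 divisible by 3600, since end is the last inclusive second) enforces only the lower bound, and any other window enforces both. A reduced sketch of just that predicate, separated from the FileStatus and env-var plumbing (my reading of the code above, not a drop-in replacement):

def keepFile(start: Long, end: Long, fTime: Long): Boolean =
  if (start % 3600 == 0) fTime <= end            // hour-aligned start: lower bound ignored
  else if ((end + 1) % 3600 == 0) fTime >= start // hour-aligned end: upper bound ignored
  else fTime >= start && fTime <= end            // strict window otherwise

// With start = 10:00:00 (36000 % 3600 == 0), a file modified before the
// window still passes, because only the upper bound is checked:
assert(keepFile(start = 36000, end = 39599, fTime = 35990))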
File diff suppressed because it is too large
@@ -26,7 +26,7 @@
   },
   "devDependencies": {
     "@babel/core": "7.19.0",
-    "@babel/preset-env": "7.18.10",
+    "@babel/preset-env": "7.19.0",
     "@babel/preset-react": "7.18.6",
     "babel-loader": "8.2.5",
     "css-loader": "5.2.7",
@@ -8084,30 +8084,27 @@
       }
     },
     "react-router": {
-      "version": "4.3.1",
-      "resolved": "https://registry.npmjs.org/react-router/-/react-router-4.3.1.tgz",
-      "integrity": "sha512-yrvL8AogDh2X42Dt9iknk4wF4V8bWREPirFfS9gLU1huk6qK41sg7Z/1S81jjTrGHxa3B8R3J6xIkDAA6CVarg==",
+      "version": "5.3.3",
+      "resolved": "https://registry.npmjs.org/react-router/-/react-router-5.3.3.tgz",
+      "integrity": "sha512-mzQGUvS3bM84TnbtMYR8ZjKnuPJ71IjSzR+DE6UkUqvN4czWIqEs17yLL8xkAycv4ev0AiN+IGrWu88vJs/p2w==",
       "dev": true,
       "requires": {
-        "history": "^4.7.2",
-        "hoist-non-react-statics": "^2.5.0",
-        "invariant": "^2.2.4",
+        "@babel/runtime": "^7.12.13",
+        "history": "^4.9.0",
+        "hoist-non-react-statics": "^3.1.0",
         "loose-envify": "^1.3.1",
+        "mini-create-react-context": "^0.4.0",
        "path-to-regexp": "^1.7.0",
-        "prop-types": "^15.6.1",
-        "warning": "^4.0.1"
+        "prop-types": "^15.6.2",
+        "react-is": "^16.6.0",
+        "tiny-invariant": "^1.0.2",
+        "tiny-warning": "^1.0.0"
       },
       "dependencies": {
-        "hoist-non-react-statics": {
-          "version": "2.5.5",
-          "resolved": "https://registry.npmjs.org/hoist-non-react-statics/-/hoist-non-react-statics-2.5.5.tgz",
-          "integrity": "sha512-rqcy4pJo55FTTLWt+bU8ukscqHeE/e9KWvsOW2b/a3afxQZhwkQdT1rPPCJ0rYXdj4vNcasY8zHTH+jF/qStxw==",
-          "dev": true
-        },
         "isarray": {
           "version": "0.0.1",
           "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
-          "integrity": "sha1-ihis/Kmo9Bd+Cav8YDiTmwXR7t8=",
+          "integrity": "sha512-D2S+3GLxWH+uhrNEcoh/fnmYeP8E8/zHl644d/jdA0g2uyXvy3sb0qxotE+ne0LtccHknQzWwZEzhak7oJ0COQ==",
           "dev": true
         },
         "path-to-regexp": {
@@ -9945,15 +9942,6 @@
       "integrity": "sha512-2ham8XPWTONajOR0ohOKOHXkm3+gaBmGut3SRuu75xLd/RRaY6vqgh8NBYYk7+RW3u5AtzPQZG8F10LHkl0lAQ==",
       "dev": true
     },
-    "warning": {
-      "version": "4.0.3",
-      "resolved": "https://registry.npmjs.org/warning/-/warning-4.0.3.tgz",
-      "integrity": "sha512-rpJyN222KWIvHJ/F53XSZv0Zl/accqHR8et1kpaMTD/fLCRxtV8iX8czMzY7sVZupTI3zcUTg8eycS2kNF9l6w==",
-      "dev": true,
-      "requires": {
-        "loose-envify": "^1.0.0"
-      }
-    },
     "watchpack": {
       "version": "1.7.5",
       "resolved": "https://registry.npmjs.org/watchpack/-/watchpack-1.7.5.tgz",
@@ -71,7 +71,7 @@
     "prop-types": "15.8.1",
     "radium": "0.25.0",
     "react-redux": "6.0.1",
-    "react-router": "4.3.1",
+    "react-router": "5.3.3",
     "react-router-dom": "5.3.3",
     "redis-console": "1.0.0",
     "redux": "4.2.0",