Add a new behavior of the vacuum index (#479)

This commit is contained in:
paryoja 2021-11-04 13:14:11 +09:00 committed by GitHub
Parent 661df177a5
Commit 2f8d32b422
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 537 additions and 40 deletions
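In short: previously, vacuumIndex only hard-deleted an index already marked as DELETED. With this change, calling it on an ACTIVE index instead hard-deletes outdated index data (old versions and unused files) while the index stays ACTIVE and usable. A minimal usage sketch of the two behaviors (index names are hypothetical):

import org.apache.spark.sql.SparkSession
import com.microsoft.hyperspace.Hyperspace

val spark = SparkSession.builder.getOrCreate()
val hyperspace = new Hyperspace(spark)

// Index in DELETED state: vacuum hard-deletes the entire index (existing behavior).
hyperspace.deleteIndex("oldIdx")
hyperspace.vacuumIndex("oldIdx")

// Index in ACTIVE state: vacuum cleans up outdated versions; the index remains ACTIVE (new behavior).
hyperspace.vacuumIndex("liveIdx")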

View file

@@ -38,44 +38,50 @@ class HyperspaceIndexManagementTests(HyperspaceTestCase):
idx_config = IndexConfig('idx2', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx2" and state = "ACTIVE" """).count(), 1)
"""name = "idx2" and state = "ACTIVE" """).count(), 1)
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx2" and state = "DELETED" """).count(), 0)
"""name = "idx2" and state = "DELETED" """).count(), 0)
self.hyperspace.deleteIndex("idx2")
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx2" and state = "DELETED" """).count(), 1)
"""name = "idx2" and state = "DELETED" """).count(), 1)
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx2" and state = "ACTIVE" """).count(), 0)
"""name = "idx2" and state = "ACTIVE" """).count(), 0)
def test_index_restore(self):
idx_config = IndexConfig('idx3', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
self.hyperspace.deleteIndex("idx3")
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx3" and state = "DELETED" """).count(), 1)
"""name = "idx3" and state = "DELETED" """).count(), 1)
self.hyperspace.restoreIndex("idx3")
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx3" and state = "ACTIVE" """).count(), 1)
"""name = "idx3" and state = "ACTIVE" """).count(), 1)
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx3" and state = "DELETED" """).count(), 0)
"""name = "idx3" and state = "DELETED" """).count(), 0)
def test_index_vacuum(self):
idx_config = IndexConfig('idx4', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
self.hyperspace.deleteIndex("idx4")
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx4" and state = "DELETED" """).count(), 1)
"""name = "idx4" and state = "DELETED" """).count(), 1)
self.hyperspace.vacuumIndex("idx4")
self.assertEqual(self.hyperspace.indexes().filter("""name = "idx4" """).count(), 0)
# vacuuming an active index leaves the index as active
idx_config = IndexConfig('idx5', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
self.assertEqual(self.hyperspace.indexes().filter(
"""name = "idx5" and state = "ACTIVE" """).count(), 1)
def test_index_refresh_incremental(self):
idx_config = IndexConfig('idx1', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
# Test the inter-op works fine for refreshIndex.
self.hyperspace.refreshIndex('idx1')
self.hyperspace.refreshIndex('idx1', 'incremental')
def test_index_refresh_full(self):
idx_config = IndexConfig('idx1', ['name'], ['age'])
self.hyperspace.createIndex(self.df, idx_config)
# Test the inter-op works fine for optimizeIndex.
@@ -89,6 +95,7 @@ class HyperspaceIndexManagementTests(HyperspaceTestCase):
df = self.hyperspace.index('idx1')
df.show()
hyperspace_test = unittest.TestLoader().loadTestsFromTestCase(HyperspaceIndexManagementTests)
result = unittest.TextTestRunner(verbosity=3).run(hyperspace_test)
sys.exit(not result.wasSuccessful())

View file

@@ -65,9 +65,10 @@ class Hyperspace(spark: SparkSession) {
}
/**
* Does a hard delete of the entire index if it is marked as `DELETED`.
* Cleans up the index (hard delete of outdated index files) if it is 'ACTIVE'.
*
* @param indexName Name of the index to vacuum.
*/
def vacuumIndex(indexName: String): Unit = {
indexManager.vacuum(indexName)

View file

@@ -24,6 +24,7 @@ object Constants {
val DELETED = "DELETED"
val REFRESHING = "REFRESHING"
val VACUUMING = "VACUUMING"
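// Transient state while vacuuming outdated data of an ACTIVE index (see VacuumOutdatedAction).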
val VACUUMINGOUTDATED = "VACUUMINGOUTDATED"
val RESTORING = "RESTORING"
val OPTIMIZING = "OPTIMIZING"
val DOESNOTEXIST = "DOESNOTEXIST"

View file

@@ -0,0 +1,144 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import com.microsoft.hyperspace.{Hyperspace, HyperspaceException}
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, VACUUMINGOUTDATED}
import com.microsoft.hyperspace.index.{IndexConstants, IndexDataManager, IndexLogEntry, IndexLogManager, LogEntry}
import com.microsoft.hyperspace.index.sources.delta.DeltaLakeRelationMetadata
import com.microsoft.hyperspace.telemetry.{AppInfo, HyperspaceEvent, VacuumOutdatedActionEvent}
import com.microsoft.hyperspace.util.FileUtils
/**
* Vacuum outdated data of indexes.
*
* Algorithm:
* - Delete every index data version except the versions currently in use.
*/
class VacuumOutdatedAction(
final override protected val logManager: IndexLogManager,
dataManager: IndexDataManager)
extends Action {
private lazy val previousIndexLogEntry = {
logManager.getLog(baseId) match {
case Some(e: IndexLogEntry) => e
case _ =>
throw HyperspaceException("LogEntry must exist for vacuum outdated operation.")
}
}
final override lazy val logEntry: LogEntry = {
previousIndexLogEntry.relations match {
case null => previousIndexLogEntry
case relations if relations.nonEmpty =>
val relationMetadata = Hyperspace
.getContext(spark)
.sourceProviderManager
.getRelationMetadata(relations.head)
val updatedDerivedDataset = relationMetadata match {
case deltaLakeRelationMetadata: DeltaLakeRelationMetadata =>
// Reset Delta Lake version mapping.
val resetProperty = deltaLakeRelationMetadata.resetDeltaVersionHistory(
previousIndexLogEntry.derivedDataset.properties)
val newProperty = deltaLakeRelationMetadata.enrichIndexProperties(
resetProperty + (IndexConstants.INDEX_LOG_VERSION -> endId.toString))
previousIndexLogEntry.derivedDataset.withNewProperties(newProperty)
case _ => previousIndexLogEntry.derivedDataset
}
previousIndexLogEntry.copy(derivedDataset = updatedDerivedDataset)
case _ => previousIndexLogEntry
}
}
override def transientState: String = VACUUMINGOUTDATED
override def finalState: String = ACTIVE
override def validate(): Unit = {
if (!previousIndexLogEntry.state.equalsIgnoreCase(ACTIVE)) {
throw HyperspaceException(
s"VacuumOutdated is only supported in $ACTIVE state. " +
s"Current state is ${previousIndexLogEntry.state}.")
}
}
final override def op(): Unit = {
// Delete unused directory first, then delete unused files in used directories.
val indexVersionsInUse: Set[Int] = logEntry match {
case indexLogEntry: IndexLogEntry =>
dataVersionInfos(indexLogEntry)
case other =>
throw HyperspaceException(
s"VacuumOutdated is not supported for log entry class ${other.getClass.getName}")
}
// Delete version directories not used.
dataManager.getAllVersionIds().foreach { id =>
if (!indexVersionsInUse.contains(id)) {
dataManager.delete(id)
}
}
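// Collect the names of files referenced by the latest log entry; anything else is outdated.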
val filesInUse = logEntry match {
case indexLogEntry: IndexLogEntry =>
indexLogEntry.content.fileInfos.map { info =>
info.name
}
}
// Delete unused files.
dataManager.getAllFilePaths().foreach { path =>
// Ignore files such as "_SUCCESS" and "._SUCCESS.crc".
if (!path.getName.startsWith("_") &&
!path.getName.startsWith("._") &&
!filesInUse.contains(path.toString)) {
FileUtils.delete(path)
}
}
}
/**
* Extracts the index data versions referenced by an index log entry.
*
* @return Set of version ids in use by the entry.
*/
private[actions] def dataVersionInfos(entry: IndexLogEntry): Set[Int] = {
// Get used versions using the filenames of contents.
// length + 1 due to '=' between prefix and version number.
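// e.g. with prefix "v__", "v__=12".drop(prefixLength) yields "12".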
val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1
entry
.indexDataDirectoryPaths()
.map(dirname => new Path(dirname).getName)
.collect {
case name if name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
name.drop(prefixLength).toInt
}
.toSet
}
override protected def event(appInfo: AppInfo, message: String): HyperspaceEvent = {
VacuumOutdatedActionEvent(appInfo, logEntry.asInstanceOf[IndexLogEntry], message)
}
}

View file

@@ -22,7 +22,7 @@ import org.apache.spark.sql.internal.SQLConf
import com.microsoft.hyperspace.HyperspaceException
import com.microsoft.hyperspace.actions._
import com.microsoft.hyperspace.actions.Constants.States.{ACTIVE, DOESNOTEXIST}
import com.microsoft.hyperspace.index.IndexConstants.{REFRESH_MODE_FULL, REFRESH_MODE_INCREMENTAL, REFRESH_MODE_QUICK}
class IndexCollectionManager(
@@ -60,13 +60,23 @@ class IndexCollectionManager(
}
override def vacuum(indexName: String): Unit = {
// Note that the behavior of vacuum index is different when the state is ACTIVE.
// The event that action creates is also different.
withLogManager(indexName) { logManager =>
val hadoopConf = spark.sessionState.newHadoopConf()
val indexPath = PathResolver(spark.sessionState.conf, hadoopConf)
.getIndexPath(indexName)
val dataManager =
indexDataManagerFactory.create(indexPath, hadoopConf)
logManager.getLatestLog() match {
case Some(index) if index.state == ACTIVE =>
// clean up only if state is ACTIVE.
new VacuumOutdatedAction(logManager, dataManager).run()
case _ =>
new VacuumAction(logManager, dataManager).run()
}
}
}

View file

@@ -36,8 +36,12 @@ import com.microsoft.hyperspace.util.FileUtils
* f1.parquet
*/
trait IndexDataManager {
def getAllFilePaths(): Seq[Path]
def getLatestVersionId(): Option[Int]
def getAllVersionIds(): Seq[Int]
def getPath(id: Int): Path
def delete(id: Int): Unit
@@ -49,23 +53,53 @@ class IndexDataManagerImpl(indexPath: Path, configuration: Configuration)
private lazy val fs: FileSystem = indexPath.getFileSystem(configuration)
/**
* Get latest version id of the index data directory.
*/
override def getLatestVersionId(): Option[Int] = {
if (!fs.exists(indexPath)) {
return None
}
val ids = getAllVersionIds()
if (ids.isEmpty) None else Some(ids.max)
}
/**
* This method relies on the naming convention that directory name will be similar to hive
* partitioning scheme, i.e. {{{"root/v__=value/f1.parquet"}}} etc. Here the value represents the
* version id of the data.
*/
override def getAllVersionIds(): Seq[Int] = {
if (!fs.exists(indexPath)) {
return Nil
}
val prefixLength = IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX.length + 1
fs.listStatus(indexPath)
.collect {
case status
if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
status.getPath.getName.drop(prefixLength).toInt
}
}
/**
* Get all file paths in the index directory.
*/
override def getAllFilePaths(): Seq[Path] = {
if (!fs.exists(indexPath)) {
return Nil
}
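// Collect version directories (v__=N) under the index root, then list every file inside each.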
val directories = fs.listStatus(indexPath).collect {
case status
if status.getPath.getName.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX) =>
status.getPath
}
directories.flatMap { dir =>
fs.listStatus(dir).collect {
case status => status.getPath
}
}
}
/**
* Get directory path of the given id.
*/
override def getPath(id: Int): Path = {
new Path(indexPath, s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=${id.toString}")
}

View file

@@ -513,6 +513,24 @@ case class IndexLogEntry(
(name, derivedDataset, signature, content).hashCode
}
/**
* Extracts paths to the top-level directories which
* contain the latest version index files.
*
* @return List of directory paths containing index files for latest index version.
*/
def indexDataDirectoryPaths(): Seq[String] = {
var prefix = content.root.name
var directory = content.root
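// Walk down while there is a single non-version child directory, accumulating the path prefix.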
while (directory.subDirs.size == 1 &&
!directory.subDirs.head.name.startsWith(IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX)) {
prefix += s"${directory.subDirs.head.name}/"
directory = directory.subDirs.head
}
directory.subDirs.map(d => s"$prefix${d.name}")
}
/**
* A mutable map for holding auxiliary information of this index log entry while applying rules.
*/

View file

@@ -54,7 +54,9 @@ trait IndexManager {
def restore(indexName: String): Unit
/**
* If the index is marked as `DELETED`, does a hard delete of the index.
* If it is 'ACTIVE', cleans up the index (hard delete of unused index files).
* Once vacuumed, hard-deleted index files can't be 'restore'd.
*
* @param indexName Name of the index to vacuum.
*/

View file

@@ -139,15 +139,7 @@ private[hyperspace] object IndexStatistics {
* @return List of directory paths containing index files for latest index version.
*/
private def getIndexContentDirectoryPaths(entry: IndexLogEntry): Seq[String] = {
entry.indexDataDirectoryPaths()
}
/**

View file

@@ -57,5 +57,15 @@ class DeltaLakeRelationMetadata(metadata: Relation) extends FileBasedRelationMet
properties ++ deltaVerHistory
}
/**
* Remove DELTA_VERSION_HISTORY_PROPERTY from properties.
*
* @param properties Index properties to reset.
* @return Updated index properties for vacuum outdated data.
*/
def resetDeltaVersionHistory(properties: Map[String, String]): Map[String, String] = {
properties - DeltaLakeConstants.DELTA_VERSION_HISTORY_PROPERTY
}
override def canSupportUserSpecifiedSchema: Boolean = false
}

View file

@@ -84,6 +84,16 @@ case class RestoreActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: S
case class VacuumActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent
/**
* Deletion of old index files event. Emitted when vacuum is called on an ACTIVE index.
*
* @param appInfo AppInfo for spark application.
* @param index Related index.
* @param message Message about event.
*/
case class VacuumOutdatedActionEvent(appInfo: AppInfo, index: IndexLogEntry, message: String)
extends HyperspaceIndexCRUDEvent
/**
* Index Refresh Event. Emitted when refresh is called on an index.
*

View file

@@ -0,0 +1,156 @@
/*
* Copyright (2020) The Hyperspace Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.microsoft.hyperspace.actions
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite
import org.mockito.ArgumentMatchers.anyInt
import org.mockito.Mockito.{mock, verify, when}
import org.mockito.internal.verification.Times
import com.microsoft.hyperspace.{HyperspaceException, SparkInvolvedSuite}
import com.microsoft.hyperspace.actions.Constants.States._
import com.microsoft.hyperspace.index.{Content, Directory, FileInfo, IndexConstants, IndexDataManager, IndexLogEntry, IndexLogManager}
import com.microsoft.hyperspace.index.IndexConstants.UNKNOWN_FILE_ID
import com.microsoft.hyperspace.index.covering.CoveringIndex
class VacuumOutdatedActionTest extends SparkFunSuite with SparkInvolvedSuite {
private val mockLogManager: IndexLogManager = mock(classOf[IndexLogManager])
private val mockDataManager: IndexDataManager = mock(classOf[IndexDataManager])
private val mockIndexLogEntry: IndexLogEntry = mock(classOf[IndexLogEntry])
private val mockContent: Content = mock(classOf[Content])
override def beforeAll(): Unit = {
super.beforeAll()
when(mockLogManager.getLatestId()).thenReturn(None)
}
def versionDirectories(versions: Seq[Int]): Seq[String] = {
versions.map(version =>
s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version")
}
test("validate() passes if old index logs are found with ACTIVE state.") {
when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry))
when(mockIndexLogEntry.state).thenReturn(ACTIVE)
val action = new VacuumOutdatedAction(mockLogManager, mockDataManager)
// No exception thrown is considered a pass
action.validate()
}
test("validate() fails if old index logs found with non-ACTIVE state") {
when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry))
when(mockIndexLogEntry.state).thenReturn(CREATING)
val action = new VacuumOutdatedAction(mockLogManager, mockDataManager)
val ex = intercept[HyperspaceException](action.validate())
assert(
ex.getMessage.contains(
"VacuumOutdated is only supported in ACTIVE state. Current state is CREATING."))
}
test("op() calls which deletes nothing since every data is up-to-date") {
val pathPrefix: String = s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=2"
val sampleFileName = s"$pathPrefix/part-00053-.c000.snappy.parquet"
val sampleFilePath = new Path(sampleFileName)
when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry))
when(mockDataManager.getAllVersionIds()).thenReturn(Seq(0, 1, 2))
when(mockDataManager.getAllFilePaths()).thenReturn(Seq(sampleFilePath))
when(mockIndexLogEntry.indexDataDirectoryPaths())
.thenReturn(versionDirectories(Seq(0, 1, 2)))
when(mockIndexLogEntry.content).thenReturn(mockContent)
when(mockContent.fileInfos).thenReturn(Set(FileInfo(sampleFileName, 0, 0, 0)))
val action = new VacuumOutdatedAction(mockLogManager, mockDataManager)
action.op()
verify(mockDataManager, new Times(0)).delete(-1)
verify(mockDataManager, new Times(0)).delete(0)
verify(mockDataManager, new Times(0)).delete(1)
verify(mockDataManager, new Times(0)).delete(2)
verify(mockDataManager, new Times(0)).delete(3)
}
test("op() calls delete for all outdated data") {
val pathPrefix: String = s"file:/a/b/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=2"
val sampleFileName1 = s"$pathPrefix/part-00053-.c000.snappy.parquet"
val sampleFilePath1 = new Path(sampleFileName1)
val sampleFileName2 = s"$pathPrefix/part-00027-.c000.snappy.parquet"
val sampleFilePath2 = new Path(sampleFileName2)
when(mockLogManager.getLog(anyInt)).thenReturn(Some(mockIndexLogEntry))
when(mockDataManager.getAllVersionIds()).thenReturn(Seq(0, 1, 2, 3))
when(mockDataManager.getAllFilePaths()).thenReturn(Seq(sampleFilePath1, sampleFilePath2))
when(mockIndexLogEntry.indexDataDirectoryPaths()).thenReturn(versionDirectories(Seq(2, 3)))
when(mockIndexLogEntry.content).thenReturn(mockContent)
when(mockContent.fileInfos).thenReturn(
Set(FileInfo(sampleFileName1, 0, 0, 0), FileInfo(sampleFileName2, 0, 0, 1)))
val action = new VacuumOutdatedAction(mockLogManager, mockDataManager)
action.op()
verify(mockDataManager).delete(0)
verify(mockDataManager).delete(1)
verify(mockDataManager, new Times(0)).delete(2)
verify(mockDataManager, new Times(0)).delete(3)
verify(mockDataManager, new Times(0)).delete(-1)
}
test("versionInfos gets correct version info.") {
val versions = Seq(4, 5)
val action = new VacuumOutdatedAction(mockLogManager, mockDataManager)
val versionDirectory =
versions.map(
version =>
Directory(
s"${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=$version",
files = Seq(FileInfo(s"index_$version", 0, 0, UNKNOWN_FILE_ID))))
val content = Content(
Directory(
"file:/",
subDirs = Seq(Directory(
"a",
files =
Seq(FileInfo("f1", 0, 0, UNKNOWN_FILE_ID), FileInfo("f2", 0, 0, UNKNOWN_FILE_ID)),
subDirs = Seq(
Directory(
"b",
files =
Seq(FileInfo("f3", 0, 0, UNKNOWN_FILE_ID), FileInfo("f4", 0, 0, UNKNOWN_FILE_ID)),
subDirs = versionDirectory))))))
val entry = IndexLogEntry.create(
"indexName",
CoveringIndex(Seq("col1"), Seq("col2", "col3"), null, 200, Map()),
content,
null,
Map())
val expected = versions.toSet
val actual = action.dataVersionInfos(entry)
assert(actual.equals(expected))
}
}

View file

@@ -21,6 +21,7 @@ import java.sql.Timestamp
import scala.collection.mutable
import io.delta.tables.DeltaTable
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -28,11 +29,12 @@ import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.execution.datasources._
import com.microsoft.hyperspace.{Hyperspace, Implicits, SampleData, TestConfig}
import com.microsoft.hyperspace.TestUtils.{getFileIdTracker, latestIndexLogEntry}
import com.microsoft.hyperspace.index.IndexConstants.REFRESH_MODE_QUICK
import com.microsoft.hyperspace.index.plans.logical.IndexHadoopFsRelation
import com.microsoft.hyperspace.index.sources.delta.DeltaLakeRelation
import com.microsoft.hyperspace.util.PathUtils
import com.microsoft.hyperspace.util.PathUtils.DataPathFilter
class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
override val indexLocationDirName = "deltaLakeIntegrationTest"
@@ -457,6 +459,100 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
}
}
test("Verify time travel query works well with VacuumIndex.") {
withTempPathAsString { path =>
import spark.implicits._
val df = sampleData.toDF("Date", "RGUID", "Query", "imprs", "clicks")
df.write.format("delta").save(path)
val tsMap = mutable.Map[Long, String]()
tsMap.put(0, getSparkFormattedTimestamps(System.currentTimeMillis).head)
val indexName = "deltaIndex3"
val deltaDf = spark.read.format("delta").load(path)
appendAndRefresh(df, path, indexName, None, tsMap) // delta version 1
appendAndRefresh(df, path, indexName, None, tsMap) // delta version 2
val indexConfig = IndexConfig(indexName, Seq("clicks"), Seq("Query"))
withSQLConf(IndexConstants.INDEX_LINEAGE_ENABLED -> "true") {
hyperspace.createIndex(deltaDf, indexConfig)
}
withIndex(indexName) {
checkExpectedIndexUsed(indexName, path, None, 1)
withSQLConf(TestConfig.HybridScanEnabled: _*) {
appendAndRefresh(df, path, indexName, None, tsMap)
// delta version 3, index log version 1.
appendAndRefresh(df, path, indexName, Some("incremental"), tsMap)
// delta version 4, index log version 3 (refresh).
// Without delta table version, the latest log version should be applied.
checkExpectedIndexUsed(indexName, path, None, 3)
// For delta table version 0, candidate log version is 1.
checkExpectedIndexUsed(indexName, path, Some(0, tsMap(1)), 1)
appendAndRefresh(df, path, indexName, None, tsMap)
// delta version 5, index log version 3.
appendAndRefresh(df, path, indexName, None, tsMap)
// delta version 6, index log version 3.
appendAndRefresh(df, path, indexName, Some("incremental"), tsMap)
// delta version 7, index log version 5 (refresh).
hyperspace.optimizeIndex(indexName)
// delta version 7, index log version 7 (optimize).
appendAndRefresh(df, path, indexName, Some("incremental"), tsMap)
// delta version 8, index log version 9 (refresh).
hyperspace.optimizeIndex(indexName)
// delta version 8, index log version 11 (optimize).
appendAndRefresh(df, path, indexName, None, tsMap)
// delta version 9, index log version 11.
val beforeDataFiles = listFiles(path, getFileIdTracker(systemPath, indexConfig))
val beforeIndexDataFiles =
listFiles(getIndexDataPath(indexName, 0), getFileIdTracker(systemPath, indexConfig))
// Calling vacuumIndex on active index deletes outdated data and history.
hyperspace.vacuumIndex(indexName)
val afterDataFiles = listFiles(path, getFileIdTracker(systemPath, indexConfig))
val afterIndexDataFiles =
listFiles(getIndexDataPath(indexName, 0), getFileIdTracker(systemPath, indexConfig))
// Data files should not be affected by vacuumIndex
assert(beforeDataFiles === afterDataFiles)
// Two files should be deleted in the version 0 index data directory
assert((beforeIndexDataFiles.toSet -- afterIndexDataFiles.toSet).size == 2)
// These paths should not be deleted
val fs = new Path("/").getFileSystem(new Configuration)
val shouldExistPath = Seq(0, 5)
shouldExistPath.foreach { idx =>
val indexDataPath = getIndexDataPath(indexName, idx)
assert(fs.exists(new Path(indexDataPath)))
}
// These paths should be deleted
val shouldDeletedPath = Seq(1, 2, 3, 4)
shouldDeletedPath.foreach { idx =>
val indexDataPath = getIndexDataPath(indexName, idx)
assert(!fs.exists(new Path(indexDataPath)))
}
// Whenever the index is used, since the entire version history is also vacuumed,
// the expected index log version is always 13 (the last version after vacuuming outdated data).
checkExpectedIndexUsed(indexName, path, None, 13)
checkExpectedIndexUsed(indexName, path, Some(1, tsMap(1)), 13)
checkExpectedIndexUsed(indexName, path, Some(2, tsMap(2)), 13)
checkExpectedIndexUsed(indexName, path, Some(5, tsMap(5)), 13)
checkExpectedIndexUsed(indexName, path, Some(6, tsMap(6)), 13)
checkExpectedIndexUsed(indexName, path, Some(7, tsMap(7)), 13)
checkExpectedIndexUsed(indexName, path, Some(8, tsMap(8)), 13)
checkExpectedIndexUsed(indexName, path, Some(9, tsMap(9)), 13)
}
}
}
}
test("DeltaLakeRelation.closestIndex should handle indexes without delta versions.") {
withTempPathAsString { path =>
import spark.implicits._
@@ -596,4 +692,20 @@ class DeltaLakeIntegrationTest extends QueryTest with HyperspaceSuite {
expectedPathsSubStr.exists(p.toString.contains(_))) && expectedPathsSubStr.forall(p =>
rootPaths.exists(_.toString.contains(p)))
}
private def listFiles(path: String, fileIdTracker: FileIdTracker): Seq[FileInfo] = {
val absolutePath = PathUtils.makeAbsolute(path)
val fs = absolutePath.getFileSystem(new Configuration)
fs.listStatus(absolutePath)
.toSeq
.filter(f => DataPathFilter.accept(f.getPath))
.map(f => FileInfo(f, fileIdTracker.addFile(f), asFullPath = true))
}
private def getIndexDataPath(indexName: String, idx: Int): String = {
val indexBasePath = spark.conf.get("spark.hyperspace.system.path")
s"${indexBasePath}/${indexName}/${IndexConstants.INDEX_VERSION_DIRECTORY_PREFIX}=${idx}"
}
}

View file

@@ -16,7 +16,7 @@
package com.microsoft.hyperspace.index
import java.io.File
import java.nio.file.{Files, Paths}
@@ -401,7 +401,7 @@ class IndexLogEntryTest extends HyperspaceSuite with SQLHelper {
}
test(
"Directory.fromDirectory and fromLeafFileswhere files are at same level but different" +
"Directory.fromDirectory and fromLeafFiles where files are at same level but different" +
"dirs.") {
// File Structure
// testDir/temp/a/f1

View file

@@ -112,7 +112,7 @@ class IndexLogManagerImplTest extends HyperspaceSuite {
// find position to insert \0
val jsonContent = JsonUtils.toJson(sampleIndexLogEntry)
val sourceIndex = jsonContent.indexOf("\"source\"")
val damagedJsonContent = jsonContent.substring(0, sourceIndex + 8) + "\u0000" + jsonContent
.substring(sourceIndex + 8);
FileUtils.createFile(