2 строки
22 KiB
Plaintext
2 строки
22 KiB
Plaintext
{"cells":[{"cell_type":"code","execution_count":null,"id":"0d14568e-e1df-4643-97eb-7c08cab2b863","metadata":{"microsoft":{"language":"scala"}},"outputs":[],"source":["%%spark\n","//Imports [Collapse me]\n"," import org.apache.spark.sql.delta.DeltaLog\n"," import org.apache.hadoop.conf.Configuration\n"," import org.apache.hadoop.fs.Path\n"," import org.apache.parquet.hadoop.ParquetFileReader\n"," import org.apache.parquet.hadoop.metadata.ParquetMetadata\n"," import org.apache.parquet.hadoop.metadata.FileMetaData\n"," import org.apache.parquet.hadoop.metadata.BlockMetaData\n"," import org.apache.spark.sql.Row\n"," import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, FloatType,TimestampType}\n"," import scala.collection.mutable.ListBuffer\n"," import org.apache.spark.sql.DataFrame\n"," import java.time.LocalDateTime\n"," import org.apache.spark.sql.functions.lit\n"," import io.delta.tables._\n","\n","/*****************************************************************************************************\n"," LAST UPDATED: 2024-02-14\n","\n"," Instructions:\n"," - Connect Notebook to Lakehouse (does not have to be in same Workspace)\n"," - Update deltaTable parameter immedately below instructions with name of table to analyze\n"," - Review other parameters (append vs overwrite) \n"," - Run\n"," - Review four output tables that have \"zz_DeltaAnalyzerOutput\"\n","\n"," zz_1_DeltaAnalyzerOutput_parquetFiles\n"," This table has one row per Parquet file\n"," Ideally, there should not be thousands of these\n"," This table only uses parquet file metadata and should be quick to populate\n","\n"," zz_2_DeltaAnalyzerOutput_rowRowgroups\n"," This table has one row per rowgroup and shows rowgroups for every parquet file\n"," Look for the number of rows per rowgroup. Ideally this should be 1M to 16M rows (higher the better)\n"," This table only uses parquet file metadata and should be quick to populate\n","\n"," zz_3_DeltaAnalyzerOutput_columnChunks\n"," One row per column/chunk within rowgroups\n"," Large number of output and has much more detail about dictionaries and compression\n"," This table only uses parquet file metadata and should be quick to populate\n"," \n"," zz_4_DeltaAnalyzerOutput_columns\n"," One row per column of the table\n"," Look to see how many unique values per column. If using floating point, consider modifying parquet file to use DECIMAL(17,4)\n"," This table runs a compute query against the Detla table so may take time depending on size of Delta table\n","\n"," Run \n"," %%sql\n"," OPTIMIZE tablename vorder \n","\n","\n"," Footnote:\n"," Useful doc\n"," https://www.javadoc.io/doc/org.apache.parquet/parquet-hadoop/latest/index.html\n","\n","*****************************************************************************************************/\n","\n","// Main Parameter\n","var deltaTableSchema: String = \"\" // Empty string for no schema, otherwise schema for table eg. \"dbo\"\n","val deltaTable: String = \"YOUR_DELTA_TABLENAME_HERE\" \n","\n","// Secondary Parameters\n","val timeStamp = LocalDateTime.now().toString\n","val overwriteOrAppend: String = \"Overwrite\" // \"Append\" or \"Overwrite\"\n","val printToScreen: Boolean = true // true or false\n","val skipColumnCardinality: Boolean = true // true or false\n","val skipWriteToTable: Boolean = true // true or false\n","val locale = new java.util.Locale(\"en\", \"US\")\n","\n","//Code [Collapse me]\n"," // Variables to help handle schema Y or N\n"," val HasSchema: Boolean = (deltaTableSchema.length>0)\n"," val deltaTableWithSchema = if (HasSchema) deltaTableSchema + \".\" + deltaTable else deltaTable\n"," val deltaTableSchemaPath = if (HasSchema) deltaTableSchema + \"/\" else \"\"\n"," val deltaTableSchemaFile = if (HasSchema) \"zz/\" else \"zz_\" // prefix for output files\n"," val deltaTableSchemaSQL = if (HasSchema) deltaTableSchema + \".\" else \"\"\n","\n"," val LakehouseID: String = sc.hadoopConfiguration.get(\"trident.lakehouse.id\")\n"," val defaultFS: String = sc.hadoopConfiguration.get(\"fs.defaultFS\")\n"," val deltaTablePath: String = s\"${defaultFS}/${LakehouseID}/Tables/${deltaTable}/\"\n","\n"," val deltaLog: DeltaLog = DeltaLog.forTable(spark, deltaTablePath)\n"," val formatter = java.text.NumberFormat.getInstance\n","\n"," deltaLog.update()\n"," \n"," // Figure out zOrder from history\n"," val fullHistoryDF = DeltaTable.forPath(spark, deltaTablePath).history(1)\n"," val operationParametersMapping : Map[String,Any] = fullHistoryDF.select(\"operationParameters\").head()(0).asInstanceOf[Map[String,Any]]\n"," val zOrderBy = \"\"//operationParametersMapping.get(\"zOrderBy\")\n","\n","\n","\n"," println(s\"Files: ${deltaLog.snapshot.allFiles.count}\")\n","\n"," var parquetFiles: ListBuffer[(String,String,String,String,String,Long,Long,String)] = ListBuffer()\n"," var rowGroups: ListBuffer[(String,String,String,Int,Int,Long,Long,Long,Float)] = ListBuffer()\n"," var columnChunks: ListBuffer[(String,String,String,Int,String,String,String,String,Long,Long,Long,String,Long,String)] = ListBuffer()\n","\n"," // For every Parquet file that belongs to the Delta Table\n"," val totalNumberRowGroups: Int = deltaLog.snapshot.allFiles.collect().map{ file => \n","\n"," val path: String = file.path \n"," val configuration = new Configuration()\n","\n"," val parquetFileReader: ParquetFileReader = ParquetFileReader.open(configuration, new Path(deltaTablePath + path))\n","\n"," val rowGroupMeta = parquetFileReader.getRowGroups() //java.util.ArrayList\n"," val parquetFileMetaData: FileMetaData = parquetFileReader.getFileMetaData()\n"," val parquetFileMetaDataKeyValues = parquetFileMetaData.getKeyValueMetaData() // Array\n","\n"," val parquetRecordCount: Long = parquetFileReader.getRecordCount()\n","\n"," val rowGroupSize: Int = rowGroupMeta.size\n","\n"," parquetFiles.append(\n"," (\n"," deltaTable ,\n"," timeStamp,\n"," path , \n"," parquetFileMetaDataKeyValues.get(\"com.microsoft.parquet.vorder.enabled\"),\n"," parquetFileMetaDataKeyValues.get(\"com.microsoft.parquet.vorder.level\"),\n"," parquetRecordCount,\n"," rowGroupSize,\n"," parquetFileMetaData.getCreatedBy\n"," \n"," )\n"," )\n","\n"," for(rowGroupNumber <- 0 to rowGroupSize -1)\n"," {\n"," print(\".\")\n"," val rowGroup: BlockMetaData = rowGroupMeta.get(rowGroupNumber)\n","\n"," //println(rowGroup.getColumns.size)\n"," //val rowGroupColumns: List[org.apache.parquet.hadoop.metadata.ColumnChunkMetaData] = rowGroup.getColumns\n"," val rowGroupColumns = rowGroup.getColumns\n","\n"," for(columnChunkID <- 0 to rowGroupColumns.size-1)\n"," {\n"," val columnStat = rowGroupColumns.get(columnChunkID)\n","\n"," columnChunks.append(\n"," (\n"," deltaTable,\n"," timeStamp,\n"," path,\n"," rowGroupNumber,\n"," columnStat.getPath().toString,\n"," columnStat.getCodec().toString,\n"," columnStat.getPrimitiveType().toString,\n"," columnStat.getStatistics().toString().replace(\"\\\"\",\"'\"), // need to convert double quotes to single quotes for Power BI\n"," columnStat.getTotalSize(),\n"," columnStat.getTotalUncompressedSize(),\n"," columnStat.getValueCount(),\n"," columnStat.hasDictionaryPage().toString,\n"," columnStat.getDictionaryPageOffset(),\n"," columnStat.getEncodings().toString \n"," )\n"," )\n"," }\n","\n"," val compressedPercent: Double = rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize * 100\n"," rowGroups.append(\n"," (\n"," deltaTable,\n"," timeStamp,\n"," path, \n"," rowGroupNumber,\n"," rowGroupSize ,\n"," rowGroup.getRowCount,\n"," rowGroup.getCompressedSize,\n"," rowGroup.getTotalByteSize,\n"," rowGroup.getCompressedSize.toFloat / rowGroup.getTotalByteSize \n"," )\n"," )\n"," }\n"," rowGroupSize\n"," }.sum\n","\n"," println(\"\")\n"," //println(\"Total number of row groups: \" + totalNumberRowGroups)\n","\n","\n","\n"," // Column Names cannot have spaces!!\n"," var parquetFilesDF: DataFrame = parquetFiles.toDF(\"TableName\",\"Timestamp\",\"Filename\",\"vorder_enabled\",\"vorder_level\",\"Rowcount\",\"RowGroups\",\"CreatedBy\")\n"," var rowGroupsDF: DataFrame = rowGroups.toDF(\"TableName\",\"Timestamp\",\"Filename\",\"RowGroupID\",\"TotalFileRowGroups\",\"Rowcount\",\"CompressedSize\",\"UncompressedSize\",\"CompressionRatio\")\n"," var columnChunksDF: DataFrame = columnChunks.toDF(\"TableName\",\"Timestamp\",\"Filename\",\"ColumnChunkID\",\"Path\",\"Codec\",\"PrimativeType\",\"Statistics\",\"TotalSize\",\"TotalUncompressedSize\",\"ValueCount\",\"HasDict\",\"DictOffset\",\"Encodings\" )\n","\n"," /********************************************************************************************************************\n"," Create DF for one row per column\n"," ********************************************************************************************************************/\n","\n"," var columns: ListBuffer[(String,String,String,Long,String,Long,Long,Long,Long,Double,Double)] = ListBuffer()\n"," val columnList: Array[String] = spark.table(deltaTableWithSchema).columns\n","\n"," val totalRows = rowGroupsDF.agg(sum(\"Rowcount\")).first().getLong(0)\n"," val tableSize = rowGroupsDF.agg(sum(\"CompressedSize\")).first().getLong(0)\n"," val totalRowGroups = parquetFilesDF.agg(sum(\"RowGroups\")).first().getLong(0)\n","\n"," if(totalRows<=1000000)\n"," {\n","\n"," /* Using single query for all columns */\n","\n"," var sql: String = \"select 1 as dummy\"\n"," for(column <- columnList) \n"," {\n"," sql+=(s\", count(distinct ${column}) as ${column}\")\n"," }\n"," sql +=s\" from ${deltaTableSchemaSQL}${deltaTable}\"\n"," val distinctDFblock: DataFrame = spark.sql(sql)\n"," \n"," for(column <- columnList) \n"," {\n"," val filtercondition: String = s\"Path = '[${column.toString}]'\"\n"," val distinctCount = distinctDFblock.select(col(column).cast(\"long\")).first().getLong(0)\n"," val primativeType= columnChunksDF.filter(filtercondition).select(col(\"PrimativeType\")).first().getString(0)\n"," val columnSize = columnChunksDF.filter(filtercondition).agg(sum(\"TotalSize\")).first().getLong(0)\n"," val columnSizeUncompressed = columnChunksDF.filter(filtercondition).agg(sum(\"TotalUncompressedSize\")).first().getLong(0)\n","\n"," columns.append(\n"," (\n"," deltaTable,\n"," timeStamp,\n"," column ,\n"," distinctCount,\n"," primativeType,\n"," columnSize,\n"," columnSizeUncompressed,\n"," totalRows ,\n"," tableSize ,\n"," distinctCount.toDouble/totalRows*100.0,\n"," columnSize.toDouble/tableSize*100.0\n"," )\n"," )\n"," }\n","\n"," } else {\n","\n"," var NotTheseColumns = List(\"requestPathUri\",\"clientRequestId\",\"fileSystemID\")\n"," for(column <- columnList) \n"," {\n"," \n"," val filtercondition: String = s\"Path = '[${column.toString}]'\"\n","\n"," var primativeType = \"None detected\" \n"," var columnSize : Long = 0\n"," var columnSizeUncompressed : Long = 0\n"," val filteredRows = columnChunksDF.filter(filtercondition).count()\n","\n"," if(filteredRows>0)\n"," {\n"," // may error here because partitioned columns do not show up in columnChunkDF\n"," primativeType= columnChunksDF.filter(filtercondition).select(col(\"PrimativeType\")).first().getString(0)\n"," columnSize = columnChunksDF.filter(filtercondition).agg(sum(\"TotalSize\")).first().getLong(0)\n"," columnSizeUncompressed = columnChunksDF.filter(filtercondition).agg(sum(\"TotalUncompressedSize\")).first().getLong(0)\n"," }\n","\n"," var distinctCount: Long = 0;\n"," if(!NotTheseColumns.contains(column))\n"," {\n"," var sql:String = \"\"\n","\n"," if(totalRows<10000000)\n"," {\n","\n"," if(skipColumnCardinality)\n"," {\n"," distinctCount=0\n"," println(s\"Skipping column cardinality checks [${column}]\")\n"," } else {\n"," println(\"Running Precise DCOUNT on \" + column) \n"," sql = s\"select count(distinct ${column}) as ${column} from ${deltaTableSchemaSQL}${deltaTable}\"\n"," val distinctDF: DataFrame = spark.sql(sql)\n"," distinctCount = distinctDF.select(col(column).cast(\"long\")).first().getLong(0)\n"," }\n"," \n"," } else {\n","\n"," if(skipColumnCardinality)\n"," {\n"," distinctCount=0\n"," println(s\"Skipping column cardinality checks [${column}]\")\n"," } else {\n"," println(\"Running Approx DCOUNT on \" + column) \n"," sql = s\"select approx_count_distinct(${column}) as ${column} from ${deltaTableSchemaSQL}${deltaTable}\"\n"," val distinctDF: DataFrame = spark.sql(sql)\n"," distinctCount = distinctDF.select(col(column).cast(\"long\")).first().getLong(0)\n"," } \n"," }\n","\n","\n"," } else {\n"," println(\"Skipping \" + column) \n"," }\n","\n"," columns.append(\n"," (\n"," deltaTable,\n"," timeStamp,\n"," column ,\n"," distinctCount,\n"," primativeType,\n"," columnSize,\n"," columnSizeUncompressed,\n"," totalRows ,\n"," tableSize ,\n"," distinctCount.toDouble/totalRows*100.0,\n"," columnSize.toDouble/tableSize*100.0\n"," )\n"," )\n"," }\n"," }\n"," \n"," var columnsDF: DataFrame = columns.toDF(\"TableName\",\"Timestamp\",\"ColumnName\",\"DistinctCount\",\"PrimitiveType\",\"ColumnSize\",\"ColumnSizeUncompressed\",\"TotalRows\",\"TableSize\",\"CardinalityOfTotalRows\",\"SizePercentOfTable\")\n","\n"," parquetFilesDF = parquetFilesDF\n"," .withColumn(\"TotalTableRows\",lit(totalRows))\n"," .withColumn(\"TotalTableRowGroups\",lit(totalRowGroups))\n"," rowGroupsDF = rowGroupsDF\n"," .withColumn(\"TotalTableRows\",lit(totalRows))\n"," .withColumn(\"RatioOfTotalTableRows\", col(\"Rowcount\")*100.0 / lit(totalRows))\n"," .withColumn(\"TotalTableRowGroups\",lit(totalRowGroups))\n","\n","\n"," // Fix data types (rowGroupsDF.printSchema())\n"," parquetFilesDF = parquetFilesDF.withColumn(\"Timestamp\",col(\"Timestamp\").cast(TimestampType))\n"," rowGroupsDF = rowGroupsDF.withColumn(\"Timestamp\",col(\"Timestamp\").cast(TimestampType))\n"," columnChunksDF = columnChunksDF.withColumn(\"Timestamp\",col(\"Timestamp\").cast(TimestampType))\n"," columnsDF = columnsDF.withColumn(\"Timestamp\",col(\"Timestamp\").cast(TimestampType))\n","\n"," // Display Super Summary\n"," val ParquetFiles = parquetFilesDF.count()\n"," val MaxRowgroupRowCount = rowGroupsDF.agg(max(\"Rowcount\")).first().getLong(0)\n"," val MinRowgroupRowCount = rowGroupsDF.agg(min(\"Rowcount\")).first().getLong(0)\n"," val HasVOrder = parquetFilesDF.agg(min(\"vorder_enabled\")).first()\n"," val formatter = java.text.NumberFormat.getIntegerInstance(locale)\n"," //val TableSizeFromColumnChunks = columnChunksDF.agg(sum(\"TotalSize\")).first().getLong(0)\n"," val tableSizeFromRowGroups = rowGroupsDF.agg(sum(\"CompressedSize\")).first().getLong(0)\n"," \n"," displayHTML(s\"\"\"\n"," <table>\n"," <tr>\n"," <td colspan=\"2\"><h1>Super Summary</h1></td>\n"," </tr>\n"," <tr>\n"," <td><b>Total Rows</b></td>\n"," <td>${formatter.format(totalRows)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Has V-ORDER</b></td>\n"," <td>$HasVOrder</td>\n"," </tr>\n"," <tr>\n"," <td><b>zOrderBy</b></td>\n"," <td>$zOrderBy</td>\n"," </tr>\n"," <tr>\n"," <td><b>Total Size in Bytes (RowGroups)</b></td>\n"," <td>${formatter.format(tableSizeFromRowGroups)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Total Parquet Files</b></td>\n"," <td>${formatter.format(ParquetFiles)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Total RowGroups</b></td>\n"," <td>${formatter.format(totalRowGroups)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Max rows per RowGroups</b></td>\n"," <td>${formatter.format(MaxRowgroupRowCount)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Min rows per RowGroups</b></td>\n"," <td>${formatter.format(MinRowgroupRowCount)}</td>\n"," </tr>\n"," <tr>\n"," <td><b>Avg rows per RowGroups</b></td>\n"," <td>${formatter.format(totalRows/totalRowGroups)}</td>\n"," </tr>\n"," </table>\n"," \"\"\")\n","\n","\n"," // Display Dataframes to screen\n"," if(printToScreen)\n"," {\n"," displayHTML(\"<h1>Parquet Files</h1>\")\n"," display(parquetFilesDF)\n"," displayHTML(\"<h1>Rowgroups</h1>Focus on rowcount per rowgroup. One rowgroup = one segment in Fabric semantic model\")\n"," display(rowGroupsDF)\n"," displayHTML(\"<h1>Column Chunks</h1> Blocks of columns inside each Rowgroup across all Parquet files\")\n"," display(columnChunksDF)\n"," displayHTML(\"<h1>Delta table Columns</h1>\")\n"," display(columnsDF)\n"," }\n","\n"," // Save Dataframe to Lakehouse files\n"," if(!skipWriteToTable)\n"," {\n"," parquetFilesDF\n"," .write\n"," .mode(overwriteOrAppend)\n"," .option(\"overwriteSchema\", \"true\")\n"," .format(\"delta\")\n"," .save(s\"./Tables/${deltaTableSchemaFile}1_DeltaAnalyzerOutput_parquetFiles\")\n","\n"," rowGroupsDF\n"," .write\n"," .mode(overwriteOrAppend)\n"," .option(\"overwriteSchema\", \"true\")\n"," .format(\"delta\")\n"," .save(s\"./Tables/${deltaTableSchemaFile}2_DeltaAnalyzerOutput_rowGroups\")\n","\n"," columnChunksDF\n"," .write\n"," .mode(overwriteOrAppend)\n"," .option(\"overwriteSchema\", \"true\")\n"," .format(\"delta\")\n"," .save(s\"./Tables/${deltaTableSchemaFile}3_DeltaAnalyzerOutput_columnChunks\")\n","\n"," columnsDF\n"," .write\n"," .mode(overwriteOrAppend)\n"," .option(\"overwriteSchema\", \"true\")\n"," .format(\"delta\")\n"," .save(s\"./Tables/${deltaTableSchemaFile}4_DeltaAnalyzerOutput_columns\")\n"," }"]}],"metadata":{"kernelspec":{"display_name":"python","name":"synapse_pyspark"},"language_info":{"name":"python"},"microsoft":{"host":{"trident":{"lakehouse":{"default_lakehouse":"eb69df7f-ffd3-4b6f-842c-f62eaa33f107","known_lakehouses":"[{\"id\":\"eb69df7f-ffd3-4b6f-842c-f62eaa33f107\"}]"}}},"language":"python","ms_spell_check":{"ms_spell_check_language":"en"}},"notebook_environment":{},"nteract":{"version":"nteract-front-end@1.0.0"},"save_output":true,"spark_compute":{"compute_id":"/trident/default","session_options":{"conf":{},"enableDebugMode":false}},"synapse_widget":{"state":{},"version":"0.1"},"trident":{"lakehouse":{"default_lakehouse":"eb69df7f-ffd3-4b6f-842c-f62eaa33f107","default_lakehouse_name":"Big_Demo_DB","default_lakehouse_workspace_id":"ec3384e9-4d95-435c-b19b-816b0ebd0fb1","known_lakehouses":[{"id":"eb69df7f-ffd3-4b6f-842c-f62eaa33f107"}]}},"widgets":{}},"nbformat":4,"nbformat_minor":5}
|