Updated the FileCopy UDOs
- Allow parallel scanning - guard against reading into the overscan area
This commit is contained in:
Родитель
6b4ddc57fe
Коммит
639ca90bec
|
@ -0,0 +1,34 @@
|
|||
|
||||
Microsoft Visual Studio Solution File, Format Version 12.00
|
||||
# Visual Studio 2013
|
||||
VisualStudioVersion = 12.0.31101.0
|
||||
MinimumVisualStudioVersion = 10.0.40219.1
|
||||
Project("{182E2583-ECAD-465B-BB50-91101D7C24CE}") = "FileCopyUDOs", "FileCopyUDOs\FileCopyUDOs.usqlproj", "{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}"
|
||||
EndProject
|
||||
Global
|
||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution
|
||||
Debug|Any CPU = Debug|Any CPU
|
||||
Debug|x64 = Debug|x64
|
||||
Debug|x86 = Debug|x86
|
||||
Release|Any CPU = Release|Any CPU
|
||||
Release|x64 = Release|x64
|
||||
Release|x86 = Release|x86
|
||||
EndGlobalSection
|
||||
GlobalSection(ProjectConfigurationPlatforms) = postSolution
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|Any CPU.Build.0 = Debug|Any CPU
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|x64.ActiveCfg = Debug|x64
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|x64.Build.0 = Debug|x64
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|x86.ActiveCfg = Debug|x86
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Debug|x86.Build.0 = Debug|x86
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|Any CPU.ActiveCfg = Release|Any CPU
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|Any CPU.Build.0 = Release|Any CPU
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|x64.ActiveCfg = Release|x64
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|x64.Build.0 = Release|x64
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|x86.ActiveCfg = Release|x86
|
||||
{12AA77E5-BD0A-491D-B1A5-6C95F54011CA}.Release|x86.Build.0 = Release|x86
|
||||
EndGlobalSection
|
||||
GlobalSection(SolutionProperties) = preSolution
|
||||
HideSolutionNode = FALSE
|
||||
EndGlobalSection
|
||||
EndGlobal
|
|
@ -0,0 +1,41 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
|
||||
<PropertyGroup>
|
||||
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
|
||||
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
|
||||
<SchemaVersion>2.0</SchemaVersion>
|
||||
<ProjectGuid>12aa77e5-bd0a-491d-b1a5-6c95f54011ca</ProjectGuid>
|
||||
<OutputType>File</OutputType>
|
||||
<AssemblyName>Algebra.xml</AssemblyName>
|
||||
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
|
||||
<Name>FileCopyUDOs</Name>
|
||||
<RootNamespace>FileCopyUDOs</RootNamespace>
|
||||
<RuntimeVersion>default</RuntimeVersion>
|
||||
<OutputStreamPath>C:\LocalRunDataRoot</OutputStreamPath>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
|
||||
<DebugSymbols>true</DebugSymbols>
|
||||
<OutputPath>bin\Debug\</OutputPath>
|
||||
</PropertyGroup>
|
||||
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
|
||||
<DebugSymbols>false</DebugSymbols>
|
||||
<OutputPath>bin\Release\</OutputPath>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<Script Include="Script.usql">
|
||||
<UseLocalComputeAccount>false</UseLocalComputeAccount>
|
||||
<LocalComputeAccount>localcomputeaccount</LocalComputeAccount>
|
||||
<LocalDatabase>master</LocalDatabase>
|
||||
<LocalSchema>dbo</LocalSchema>
|
||||
<ClusterComputeAccount>mryskona</ClusterComputeAccount>
|
||||
<ClusterDatabase>master</ClusterDatabase>
|
||||
<ClusterSchema>dbo</ClusterSchema>
|
||||
</Script>
|
||||
<ScriptCode Include="Script.usql.cs">
|
||||
<DependentUpon>Script.usql</DependentUpon>
|
||||
</ScriptCode>
|
||||
</ItemGroup>
|
||||
<Import Project="$(AppData)\Microsoft\DataLake\MsBuild\1.0\Usql.targets" />
|
||||
<PropertyGroup>
|
||||
</PropertyGroup>
|
||||
</Project>
|
|
@ -0,0 +1,12 @@
|
|||
@x =
|
||||
EXTRACT block_no long,
|
||||
block_size long,
|
||||
block byte[]
|
||||
FROM "wasb://private-preview@mryskona2/PrivatePreview/GOPR0056.MP4"
|
||||
// FROM "/Movies/GOPR0056.MP4"
|
||||
USING new FileCopy.ReadFile();
|
||||
|
||||
OUTPUT @x
|
||||
TO "/output/turtlecave.MP4"
|
||||
ORDER BY block_no
|
||||
USING new FileCopy.WriteFile();
|
|
@ -0,0 +1,75 @@
|
|||
using Microsoft.Analytics.Interfaces;
|
||||
using Microsoft.Analytics.Types.Sql;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.IO;
|
||||
|
||||
namespace FileCopy
|
||||
{
|
||||
[SqlUserDefinedExtractor]
|
||||
public class ReadFile : IExtractor
|
||||
{
|
||||
const int _max_blocksz = 4194304;
|
||||
|
||||
private int _blocksz;
|
||||
public ReadFile(int block_size = 2097152)
|
||||
{
|
||||
if (block_size > _max_blocksz)
|
||||
{
|
||||
throw new Exception(string.Format("Specified block size of {0} bytes exceeds the maximal limit of {1} bytes. Please specify a lower number", block_size, _max_blocksz));
|
||||
}
|
||||
this._blocksz = block_size;
|
||||
}
|
||||
public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow outputrow)
|
||||
{
|
||||
long length = input.Length;
|
||||
long start = input.Start;
|
||||
Stream baseStream = input.BaseStream;
|
||||
byte[] readBuffer = new byte[this._blocksz];
|
||||
|
||||
while (length > 0)
|
||||
{
|
||||
var globalPosition = start + baseStream.Position;
|
||||
|
||||
// We need to make sure that we read block size or only the last remainder and not into the 4MB overscan area in the next extent block that is provided to handle row-oriented processing
|
||||
var readsize = (int)Math.Min(this._blocksz, length); // Cast from (long) to (int) is safe since Min will never give a value larger than (int) _blocksz.
|
||||
|
||||
Array.Resize<byte>(ref readBuffer, readsize); // Make sure buffer is large enough. Assumes that Resize only resizes if needed.
|
||||
|
||||
var bytesRead = baseStream.Read(readBuffer, 0, readsize);
|
||||
if (bytesRead <= 0 || bytesRead > readsize)
|
||||
{
|
||||
throw new Exception(string.Format("Unexpected amount of {2} bytes was read starting at global stream position {1}. Expected to read {0} bytes.",
|
||||
readsize, globalPosition, bytesRead));
|
||||
}
|
||||
|
||||
Array.Resize<byte>(ref readBuffer, bytesRead);
|
||||
length -= bytesRead;
|
||||
|
||||
outputrow.Set<long>(0, globalPosition); // global position of the block
|
||||
outputrow.Set<long>(1, bytesRead); // block size
|
||||
outputrow.Set<byte[]>(2, readBuffer); // block data
|
||||
yield return outputrow.AsReadOnly();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
[SqlUserDefinedOutputter]
|
||||
public class WriteFile : IOutputter
|
||||
{
|
||||
public override void Output(IRow row, IUnstructuredWriter output)
|
||||
{
|
||||
ISchema schema = row.Schema;
|
||||
for (int i = 0; i < schema.Count; i++)
|
||||
{
|
||||
object obj = row.Get<object>(i);
|
||||
if (obj is byte[])
|
||||
{
|
||||
output.BaseStream.Write((byte[])obj, 0, ((byte[])obj).Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче