Merge pull request #234 from Unity-Technologies/siyaoH/1.17/stream

Siyao h/1.17/stream
This commit is contained in:
Xingwei Zhu 2021-08-23 15:28:10 +08:00 коммит произвёл GitHub
Родитель ef4d020fa2 3e0fe54343
Коммит 381f03b39d
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
39 изменённых файлов: 6130 добавлений и 2 удалений

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: dbf70f14bb884570b4acb979d5750f06
timeCreated: 1629426723

Просмотреть файл

@ -0,0 +1,380 @@
using System;
using System.Collections.Generic;
using Unity.UIWidgets.async;
using Unity.UIWidgets.Editor;
using Unity.UIWidgets.foundation;
using Unity.UIWidgets.widgets;
using UnityEditor;
using UnityEngine;
namespace Editor.Tests.Stream
{
public class TestMain : UIWidgetsEditorPanel
{
[MenuItem("UIWidgets/Test/Stream")]
public static void StartTest()
{
CreateWindow<TestMain>();
}
protected override void main()
{
ui_.runApp(new TestApp());
}
public class TestApp : StatelessWidget
{
/**
* Test Stream.periodic
*/
private void test1()
{
var myStream = Stream<int>.periodic(new TimeSpan(0,0,0,1), t =>
{
Debug.Log("lalalala");
return t;
});
myStream.listen(val =>
{
Debug.Log("value = " + val);
});
}
/**
* Test ErrorHandler
*/
private void test2()
{
IEnumerable<int> count()
{
for (int i = 1; i < 5; i++)
{
if (i == 4)
{
throw new Exception("Intentional exception");
}
else
{
yield return i;
}
}
}
void sumStream(Stream<int> stream, Action<int> onDone)
{
var sum = 0;
stream.listen(val =>
{
sum += val;
Debug.Log("sum stream = " + sum);
},
onDone: () =>
{
onDone.Invoke(sum);
},
onError: (e, stack) =>
{
Debug.Log("error at " + stack);
});
}
var myStream = Stream<int>.fromIterable(count());
sumStream(myStream, val =>
{
Debug.Log("sum = " + (int)val);
});
}
/**
* Test OnDone/OnData/Stream.fromIterable
*/
private void test3()
{
IEnumerable<int> count()
{
for (int i = 1; i < 5; i++)
{
yield return i;
}
}
void sumStream(Stream<int> stream, Action<int> onDone)
{
var sum = 0;
stream.listen(val =>
{
sum += val;
Debug.Log("sum stream = " + sum);
},
onDone: () =>
{
onDone.Invoke(sum);
},
onError: (e, stack) =>
{
Debug.Log("error at " + stack);
});
}
var myStream = Stream<int>.fromIterable(count());
sumStream(myStream, val =>
{
Debug.Log("sum = " + (int)val);
});
}
/**
* Test streamTransform Where
*/
private void test4()
{
Stream<int> numbers = Stream<int>.fromIterable(new List<int> {0, 1, 2, 3}).where(n => n % 2 == 0);
numbers.listen(n =>
{
Debug.Log("num = " + n);
});
}
/**
* Test Stream.take
*/
private void test5()
{
Stream<int> numbers = Stream<int>.periodic(new TimeSpan(0, 0, 0, 1), t => t).take(3);
numbers.listen(n =>
{
Debug.Log("num = " + n);
}, onDone: () =>
{
Debug.Log("periodic finished");
});
}
/**
* Test Stream.asBroadcastStream
*/
private void test6()
{
Stream<int> numbers = Stream<int>.periodic(new TimeSpan(0, 0, 0, 1), t => t).asBroadcastStream().take(10);
var subscription1 = numbers.listen((data) =>
{
Debug.Log("Sub1: " + data);
});
var subscription2 = numbers.listen((data) =>
{
Debug.Log("Sub2: " + data);
if (data == 3)
{
subscription1.cancel();
}
});
}
/**
* Test listen( ..., cancelOnError = true)
*/
private void test7()
{
Stream<int> numbers = Stream<int>.periodic(new TimeSpan(0, 0, 0, 1), t =>
{
if (t == 2)
{
throw new Exception("LaLaLa");
}
return t;
}).take(5);
void sumStream(Stream<int> stream, Action<int> onDone)
{
var sum = 0;
stream.listen(val =>
{
sum += val;
Debug.Log("sum stream = " + sum);
},
onDone: () =>
{
onDone.Invoke(sum);
},
onError: (e, stack) =>
{
Debug.Log("error at " + stack);
},
cancelOnError: true);
}
sumStream(numbers, val =>
{
Debug.Log("sum = " + (int)val);
});
}
/**
* Test subscription.pause/resume/cancel
*/
private void test8()
{
Stream<int> numbers = Stream<int>.periodic(new TimeSpan(0, 0, 0, 1), t => t).take(3);
var subscription = numbers.listen(n =>
{
Debug.Log("num = " + n);
}, onDone: () =>
{
Debug.Log("periodic finished");
});
Future.delayed(new TimeSpan(0, 0, 0, 0, 1200), () =>
{
Debug.Log("pause >>>>");
subscription.pause();
return FutureOr.nil;
}).then(v =>
{
Future.delayed(new TimeSpan(0, 0, 0, 5), () =>
{
Debug.Log("resume >>>>");
subscription.resume();
return FutureOr.nil;
}).then(v2 =>
{
Future.delayed(new TimeSpan(0, 0, 0, 1), () =>
{
Debug.Log("cancel >>>>");
subscription.cancel();
return FutureOr.nil;
});
});
});
}
/**
* Test Stream.map, distinct
*/
private void test9()
{
string convert(int number)
{
return "string " + number;
}
bool stringEqual(string s1, string s2)
{
return s1 == s2;
}
Stream<string> numbers = Stream<int>.fromIterable(new List<int> {0, 1, 2, 2, 3, 4, 5, 5}).map(convert).distinct(stringEqual);
numbers.listen(val =>
{
Debug.Log("val = " + val);
});
}
private void test10()
{
Stream<int> numbers = Stream<int>.fromIterable(new List<int> {0, 1, 2, 2, 3, 4, 5, 5});
var transformer = StreamTransformer<int, string>.fromHandlers(handleData: (val, sink) =>
{
sink.add("My number is " + val);
});
numbers.transform(transformer).listen(val =>
{
Debug.Log("val = " + val);
});
}
/**
* Test StreamController
*/
private void test11()
{
StreamController<float> controller = StreamController<float>.create();
Stream<float> stream = controller.stream;
var value = 1f;
var timer = Timer.periodic(new TimeSpan(0, 0, 0, 1), (v) =>
{
value = value * 1.2f;
controller.add(value);
return null;
});
stream.listen((val) =>
{
if (val >= 2)
{
timer.cancel();
}
Debug.Log("value = " + val);
});
}
/**
* Test Stream.fromFuture
*/
private void test12()
{
Future<string> getData()
{
return Future<string>.delayed(new TimeSpan(0, 0, 0, 1), () =>
{
return "My String from Future";
}).to<string>();
}
var stream = Stream<string>.fromFuture(getData());
stream.listen(val =>
{
Debug.Log("val = " + val);
});
}
/**
* Test Stream.multi
*/
private void test13()
{
var log = new List<string>();
var index = 1;
var multi = Stream<List<int>>.multi(c =>
{
var id = index++;
log.Add($"{id}");
for (var i = 0; i < id + 1; i++)
{
c.add(new List<int>{id, i});
}
c.close();
});
void logList(List<int> l)
{
log.Add($"{l.first()}-{l.last()}");
}
Future.wait<object>(new List<Future> {multi.forEach(logList), multi.forEach(logList)}).whenComplete(
() =>
{
foreach (var str in log)
{
Debug.Log(str);
}
}
);
}
public override Widget build(BuildContext context)
{
test13();
return new Container();
}
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 7261f9b43495443a8fa71bc128684342
timeCreated: 1629426745

Просмотреть файл

@ -0,0 +1,17 @@
{
"name": "UIWidgetsTestStream",
"references": [
"Unity.UIWidgets",
"Unity.UIWidgets.Editor"
],
"optionalUnityReferences": [],
"includePlatforms": [
"Editor"
],
"excludePlatforms": [],
"allowUnsafeCode": false,
"overrideReferences": false,
"precompiledReferences": [],
"autoReferenced": true,
"defineConstraints": []
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: cc0319ac3dfa498d91ac8292f62ace4f
timeCreated: 1629426842

Просмотреть файл

@ -0,0 +1,8 @@
fileFormatVersion: 2
guid: efe638edf4624274957047c61091ef15
folderAsset: yes
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

Просмотреть файл

@ -0,0 +1,581 @@
%YAML 1.1
%TAG !u! tag:unity3d.com,2011:
--- !u!29 &1
OcclusionCullingSettings:
m_ObjectHideFlags: 0
serializedVersion: 2
m_OcclusionBakeSettings:
smallestOccluder: 5
smallestHole: 0.25
backfaceThreshold: 100
m_SceneGUID: 00000000000000000000000000000000
m_OcclusionCullingData: {fileID: 0}
--- !u!104 &2
RenderSettings:
m_ObjectHideFlags: 0
serializedVersion: 9
m_Fog: 0
m_FogColor: {r: 0.5, g: 0.5, b: 0.5, a: 1}
m_FogMode: 3
m_FogDensity: 0.01
m_LinearFogStart: 0
m_LinearFogEnd: 300
m_AmbientSkyColor: {r: 0.212, g: 0.227, b: 0.259, a: 1}
m_AmbientEquatorColor: {r: 0.114, g: 0.125, b: 0.133, a: 1}
m_AmbientGroundColor: {r: 0.047, g: 0.043, b: 0.035, a: 1}
m_AmbientIntensity: 1
m_AmbientMode: 0
m_SubtractiveShadowColor: {r: 0.42, g: 0.478, b: 0.627, a: 1}
m_SkyboxMaterial: {fileID: 10304, guid: 0000000000000000f000000000000000, type: 0}
m_HaloStrength: 0.5
m_FlareStrength: 1
m_FlareFadeSpeed: 3
m_HaloTexture: {fileID: 0}
m_SpotCookie: {fileID: 10001, guid: 0000000000000000e000000000000000, type: 0}
m_DefaultReflectionMode: 0
m_DefaultReflectionResolution: 128
m_ReflectionBounces: 1
m_ReflectionIntensity: 1
m_CustomReflection: {fileID: 0}
m_Sun: {fileID: 0}
m_IndirectSpecularColor: {r: 0.44657874, g: 0.49641275, b: 0.5748172, a: 1}
m_UseRadianceAmbientProbe: 0
--- !u!157 &3
LightmapSettings:
m_ObjectHideFlags: 0
serializedVersion: 11
m_GIWorkflowMode: 1
m_GISettings:
serializedVersion: 2
m_BounceScale: 1
m_IndirectOutputScale: 1
m_AlbedoBoost: 1
m_EnvironmentLightingMode: 0
m_EnableBakedLightmaps: 1
m_EnableRealtimeLightmaps: 0
m_LightmapEditorSettings:
serializedVersion: 12
m_Resolution: 2
m_BakeResolution: 40
m_AtlasSize: 1024
m_AO: 0
m_AOMaxDistance: 1
m_CompAOExponent: 1
m_CompAOExponentDirect: 0
m_ExtractAmbientOcclusion: 0
m_Padding: 2
m_LightmapParameters: {fileID: 0}
m_LightmapsBakeMode: 1
m_TextureCompression: 1
m_FinalGather: 0
m_FinalGatherFiltering: 1
m_FinalGatherRayCount: 256
m_ReflectionCompression: 2
m_MixedBakeMode: 2
m_BakeBackend: 1
m_PVRSampling: 1
m_PVRDirectSampleCount: 32
m_PVRSampleCount: 512
m_PVRBounces: 2
m_PVREnvironmentSampleCount: 256
m_PVREnvironmentReferencePointCount: 2048
m_PVRFilteringMode: 1
m_PVRDenoiserTypeDirect: 1
m_PVRDenoiserTypeIndirect: 1
m_PVRDenoiserTypeAO: 1
m_PVRFilterTypeDirect: 0
m_PVRFilterTypeIndirect: 0
m_PVRFilterTypeAO: 0
m_PVREnvironmentMIS: 1
m_PVRCulling: 1
m_PVRFilteringGaussRadiusDirect: 1
m_PVRFilteringGaussRadiusIndirect: 5
m_PVRFilteringGaussRadiusAO: 2
m_PVRFilteringAtrousPositionSigmaDirect: 0.5
m_PVRFilteringAtrousPositionSigmaIndirect: 2
m_PVRFilteringAtrousPositionSigmaAO: 1
m_ExportTrainingData: 0
m_TrainingDataDestination: TrainingData
m_LightProbeSampleCountMultiplier: 4
m_LightingDataAsset: {fileID: 0}
m_UseShadowmask: 1
--- !u!196 &4
NavMeshSettings:
serializedVersion: 2
m_ObjectHideFlags: 0
m_BuildSettings:
serializedVersion: 2
agentTypeID: 0
agentRadius: 0.5
agentHeight: 2
agentSlope: 45
agentClimb: 0.4
ledgeDropHeight: 0
maxJumpAcrossDistance: 0
minRegionArea: 2
manualCellSize: 0
cellSize: 0.16666667
manualTileSize: 0
tileSize: 256
accuratePlacement: 0
debug:
m_Flags: 0
m_NavMeshData: {fileID: 0}
--- !u!28 &601203222
Texture2D:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_Name:
m_ImageContentsHash:
serializedVersion: 2
Hash: 00000000000000000000000000000000
m_ForcedFallbackFormat: 4
m_DownscaleFallback: 0
serializedVersion: 3
m_Width: 0
m_Height: 0
m_CompleteImageSize: 0
m_TextureFormat: 0
m_MipCount: 1
m_IsReadable: 1
m_IgnoreMasterTextureLimit: 0
m_IsPreProcessed: 0
m_StreamingMipmaps: 0
m_StreamingMipmapsPriority: 0
m_AlphaIsTransparency: 0
m_ImageCount: 0
m_TextureDimension: 2
m_TextureSettings:
serializedVersion: 2
m_FilterMode: 1
m_Aniso: 1
m_MipBias: 0
m_WrapU: 0
m_WrapV: 0
m_WrapW: 0
m_LightmapFormat: 0
m_ColorSpace: 0
image data: 0
_typelessdata:
m_StreamData:
offset: 0
size: 0
path:
m_OriginalWidth: 0
m_OriginalHeight: 0
m_OriginalAssetGuid: 00000000000000000000000000000000
--- !u!1 &764046566
GameObject:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
serializedVersion: 6
m_Component:
- component: {fileID: 764046568}
- component: {fileID: 764046567}
m_Layer: 0
m_Name: Directional Light
m_TagString: Untagged
m_Icon: {fileID: 0}
m_NavMeshLayer: 0
m_StaticEditorFlags: 0
m_IsActive: 1
--- !u!108 &764046567
Light:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 764046566}
m_Enabled: 1
serializedVersion: 10
m_Type: 1
m_Shape: 0
m_Color: {r: 1, g: 1, b: 1, a: 1}
m_Intensity: 1
m_Range: 10
m_SpotAngle: 30
m_InnerSpotAngle: 21.80208
m_CookieSize: 10
m_Shadows:
m_Type: 2
m_Resolution: -1
m_CustomResolution: -1
m_Strength: 1
m_Bias: 0.05
m_NormalBias: 0.4
m_NearPlane: 0.2
m_CullingMatrixOverride:
e00: 1
e01: 0
e02: 0
e03: 0
e10: 0
e11: 1
e12: 0
e13: 0
e20: 0
e21: 0
e22: 1
e23: 0
e30: 0
e31: 0
e32: 0
e33: 1
m_UseCullingMatrixOverride: 0
m_Cookie: {fileID: 0}
m_DrawHalo: 0
m_Flare: {fileID: 0}
m_RenderMode: 0
m_CullingMask:
serializedVersion: 2
m_Bits: 4294967295
m_RenderingLayerMask: 1
m_Lightmapping: 4
m_LightShadowCasterMode: 0
m_AreaSize: {x: 1, y: 1}
m_BounceIntensity: 1
m_ColorTemperature: 6570
m_UseColorTemperature: 0
m_BoundingSphereOverride: {x: 0, y: 0, z: 0, w: 0}
m_UseBoundingSphereOverride: 0
m_ShadowRadius: 0
m_ShadowAngle: 0
--- !u!4 &764046568
Transform:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 764046566}
m_LocalRotation: {x: 0.40821788, y: -0.23456968, z: 0.10938163, w: 0.8754261}
m_LocalPosition: {x: 0, y: 3, z: 0}
m_LocalScale: {x: 1, y: 1, z: 1}
m_Children: []
m_Father: {fileID: 0}
m_RootOrder: 1
m_LocalEulerAnglesHint: {x: 50, y: -30, z: 0}
--- !u!1 &847097468
GameObject:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
serializedVersion: 6
m_Component:
- component: {fileID: 847097469}
- component: {fileID: 847097471}
- component: {fileID: 847097470}
m_Layer: 5
m_Name: RawImage
m_TagString: Untagged
m_Icon: {fileID: 0}
m_NavMeshLayer: 0
m_StaticEditorFlags: 0
m_IsActive: 1
--- !u!224 &847097469
RectTransform:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 847097468}
m_LocalRotation: {x: 0, y: 0, z: 0, w: 1}
m_LocalPosition: {x: 0, y: 0, z: 0}
m_LocalScale: {x: 1, y: 1, z: 1}
m_Children: []
m_Father: {fileID: 2122288190}
m_RootOrder: 0
m_LocalEulerAnglesHint: {x: 0, y: 0, z: 0}
m_AnchorMin: {x: 0, y: 0}
m_AnchorMax: {x: 1, y: 1}
m_AnchoredPosition: {x: 0, y: 0}
m_SizeDelta: {x: 0, y: 0}
m_Pivot: {x: 0.5, y: 0.5}
--- !u!114 &847097470
MonoBehaviour:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 847097468}
m_Enabled: 1
m_EditorHideFlags: 0
m_Script: {fileID: 11500000, guid: a20abb9224fb842018ca8155e9fec694, type: 3}
m_Name:
m_EditorClassIdentifier:
m_Material: {fileID: 0}
m_Color: {r: 1, g: 1, b: 1, a: 1}
m_RaycastTarget: 1
m_Maskable: 1
m_OnCullStateChanged:
m_PersistentCalls:
m_Calls: []
m_Texture: {fileID: 601203222}
m_UVRect:
serializedVersion: 2
x: 0
y: 0
width: 1
height: 1
hardwareAntiAliasing: 0
fonts: []
--- !u!222 &847097471
CanvasRenderer:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 847097468}
m_CullTransparentMesh: 0
--- !u!1 &1548023132
GameObject:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
serializedVersion: 6
m_Component:
- component: {fileID: 1548023135}
- component: {fileID: 1548023134}
- component: {fileID: 1548023133}
m_Layer: 0
m_Name: Main Camera
m_TagString: MainCamera
m_Icon: {fileID: 0}
m_NavMeshLayer: 0
m_StaticEditorFlags: 0
m_IsActive: 1
--- !u!81 &1548023133
AudioListener:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1548023132}
m_Enabled: 1
--- !u!20 &1548023134
Camera:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1548023132}
m_Enabled: 1
serializedVersion: 2
m_ClearFlags: 1
m_BackGroundColor: {r: 0.19215687, g: 0.3019608, b: 0.4745098, a: 0}
m_projectionMatrixMode: 1
m_GateFitMode: 2
m_FOVAxisMode: 0
m_SensorSize: {x: 36, y: 24}
m_LensShift: {x: 0, y: 0}
m_FocalLength: 50
m_NormalizedViewPortRect:
serializedVersion: 2
x: 0
y: 0
width: 1
height: 1
near clip plane: 0.3
far clip plane: 1000
field of view: 60
orthographic: 1
orthographic size: 5.6
m_Depth: -1
m_CullingMask:
serializedVersion: 2
m_Bits: 4294967295
m_RenderingPath: -1
m_TargetTexture: {fileID: 0}
m_TargetDisplay: 0
m_TargetEye: 3
m_HDR: 1
m_AllowMSAA: 1
m_AllowDynamicResolution: 0
m_ForceIntoRT: 0
m_OcclusionCulling: 1
m_StereoConvergence: 10
m_StereoSeparation: 0.022
--- !u!4 &1548023135
Transform:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1548023132}
m_LocalRotation: {x: 0, y: 0.7071068, z: -0.7071068, w: 0}
m_LocalPosition: {x: 0, y: 1, z: 0.32}
m_LocalScale: {x: 1, y: 1, z: 1}
m_Children: []
m_Father: {fileID: 0}
m_RootOrder: 0
m_LocalEulerAnglesHint: {x: 90, y: 180, z: 0}
--- !u!1 &1900497009
GameObject:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
serializedVersion: 6
m_Component:
- component: {fileID: 1900497012}
- component: {fileID: 1900497011}
- component: {fileID: 1900497010}
m_Layer: 0
m_Name: EventSystem
m_TagString: Untagged
m_Icon: {fileID: 0}
m_NavMeshLayer: 0
m_StaticEditorFlags: 0
m_IsActive: 1
--- !u!114 &1900497010
MonoBehaviour:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1900497009}
m_Enabled: 1
m_EditorHideFlags: 0
m_Script: {fileID: 11500000, guid: 4f231c4fb786f3946a6b90b886c48677, type: 3}
m_Name:
m_EditorClassIdentifier:
m_HorizontalAxis: Horizontal
m_VerticalAxis: Vertical
m_SubmitButton: Submit
m_CancelButton: Cancel
m_InputActionsPerSecond: 10
m_RepeatDelay: 0.5
m_ForceModuleActive: 0
--- !u!114 &1900497011
MonoBehaviour:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1900497009}
m_Enabled: 1
m_EditorHideFlags: 0
m_Script: {fileID: 11500000, guid: 76c392e42b5098c458856cdf6ecaaaa1, type: 3}
m_Name:
m_EditorClassIdentifier:
m_FirstSelected: {fileID: 0}
m_sendNavigationEvents: 1
m_DragThreshold: 10
--- !u!4 &1900497012
Transform:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 1900497009}
m_LocalRotation: {x: 0, y: 0, z: 0, w: 1}
m_LocalPosition: {x: 0, y: 0, z: 0}
m_LocalScale: {x: 1, y: 1, z: 1}
m_Children: []
m_Father: {fileID: 0}
m_RootOrder: 3
m_LocalEulerAnglesHint: {x: 0, y: 0, z: 0}
--- !u!1 &2122288186
GameObject:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
serializedVersion: 6
m_Component:
- component: {fileID: 2122288190}
- component: {fileID: 2122288189}
- component: {fileID: 2122288188}
- component: {fileID: 2122288187}
m_Layer: 5
m_Name: Canvas
m_TagString: Untagged
m_Icon: {fileID: 0}
m_NavMeshLayer: 0
m_StaticEditorFlags: 0
m_IsActive: 1
--- !u!114 &2122288187
MonoBehaviour:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 2122288186}
m_Enabled: 1
m_EditorHideFlags: 0
m_Script: {fileID: 11500000, guid: dc42784cf147c0c48a680349fa168899, type: 3}
m_Name:
m_EditorClassIdentifier:
m_IgnoreReversedGraphics: 1
m_BlockingObjects: 0
m_BlockingMask:
serializedVersion: 2
m_Bits: 4294967295
--- !u!114 &2122288188
MonoBehaviour:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 2122288186}
m_Enabled: 1
m_EditorHideFlags: 0
m_Script: {fileID: 11500000, guid: 0cd44c1031e13a943bb63640046fad76, type: 3}
m_Name:
m_EditorClassIdentifier:
m_UiScaleMode: 0
m_ReferencePixelsPerUnit: 100
m_ScaleFactor: 1
m_ReferenceResolution: {x: 800, y: 600}
m_ScreenMatchMode: 0
m_MatchWidthOrHeight: 0
m_PhysicalUnit: 3
m_FallbackScreenDPI: 96
m_DefaultSpriteDPI: 96
m_DynamicPixelsPerUnit: 1
--- !u!223 &2122288189
Canvas:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 2122288186}
m_Enabled: 1
serializedVersion: 3
m_RenderMode: 0
m_Camera: {fileID: 0}
m_PlaneDistance: 100
m_PixelPerfect: 0
m_ReceivesEvents: 1
m_OverrideSorting: 0
m_OverridePixelPerfect: 0
m_SortingBucketNormalizedSize: 0
m_AdditionalShaderChannelsFlag: 0
m_SortingLayerID: 0
m_SortingOrder: 0
m_TargetDisplay: 0
--- !u!224 &2122288190
RectTransform:
m_ObjectHideFlags: 0
m_CorrespondingSourceObject: {fileID: 0}
m_PrefabInstance: {fileID: 0}
m_PrefabAsset: {fileID: 0}
m_GameObject: {fileID: 2122288186}
m_LocalRotation: {x: 0, y: 0, z: 0, w: 1}
m_LocalPosition: {x: 0, y: 0, z: 0}
m_LocalScale: {x: 0, y: 0, z: 0}
m_Children:
- {fileID: 847097469}
m_Father: {fileID: 0}
m_RootOrder: 2
m_LocalEulerAnglesHint: {x: 0, y: 0, z: 0}
m_AnchorMin: {x: 0, y: 0}
m_AnchorMax: {x: 0, y: 0}
m_AnchoredPosition: {x: 0, y: 0}
m_SizeDelta: {x: 0, y: 0}
m_Pivot: {x: 0, y: 0}

Просмотреть файл

@ -0,0 +1,7 @@
fileFormatVersion: 2
guid: edfd9854d0f804c30891682316f753ac
DefaultImporter:
externalObjects: {}
userData:
assetBundleName:
assetBundleVariant:

Просмотреть файл

@ -0,0 +1,157 @@
using System;
using System.Collections;
using System.Collections.Generic;
using uiwidgets;
using Unity.UIWidgets.engine;
using Unity.UIWidgets.foundation;
using Unity.UIWidgets.widgets;
using ui_ = Unity.UIWidgets.widgets.ui_;
using Unity.UIWidgets.async;
using Unity.UIWidgets.painting;
using Unity.UIWidgets.ui;
using UnityEngine.Networking;
using Color = Unity.UIWidgets.ui.Color;
using Image = Unity.UIWidgets.widgets.Image;
using Timer = Unity.UIWidgets.async.Timer;
namespace UIWidgetsSample
{
public class NumberCreator
{
public NumberCreator()
{
Timer.periodic(TimeSpan.FromSeconds(1), t =>
{
_controller.sink.add(_count);
_count++;
return default;
});
}
private int _count = 1;
private readonly StreamController<int> _controller = StreamController<int>.create();
public Stream<int> stream
{
get => _controller.stream;
}
}
public class StreamTest : UIWidgetsPanel
{
protected void OnEnable()
{
base.OnEnable();
}
protected override void main()
{
ui_.runApp(new MyApp());
}
class MyApp : StatelessWidget
{
public override Widget build(BuildContext context)
{
return new WidgetsApp(
home: new ExampleApp(),
color: Color.white,
pageRouteBuilder: (settings, builder) =>
new PageRouteBuilder(
settings: settings,
pageBuilder: (Buildcontext, animation, secondaryAnimation) => builder(context)
)
);
}
}
class ExampleApp : StatefulWidget
{
public ExampleApp(Key key = null) : base(key)
{
}
public override State createState()
{
return new ExampleState();
}
}
class ExampleState : State<ExampleApp>
{
readonly Stream<int> myStream = new NumberCreator().stream;
IEnumerator _loadCoroutine(string key, Completer completer, Isolate isolate) {
var url = new Uri(key);
using (var www = UnityWebRequest.Get(url)) {
yield return www.SendWebRequest();
using (Isolate.getScope(isolate)) {
if (www.isNetworkError || www.isHttpError) {
completer.completeError(new Exception($"Failed to load from url \"{url}\": {www.error}"));
yield break;
}
var data = www.downloadHandler.data;
completer.complete(data);
}
}
}
public override Widget build(BuildContext context)
{
Future<byte[]> f = null;
var completer = Completer.create();
var isolate = Isolate.current;
var panel = UIWidgetsPanelWrapper.current.window;
if (panel.isActive()) {
panel.startCoroutine(_loadCoroutine("https://buljan.rcsdk8.org/sites/main/files/main-images/camera_lense_0.jpeg", completer, isolate));
f = completer.future.to<byte[]>().then_<byte[]>(data => {
if (data != null && data.Length > 0) {
return data;
}
throw new Exception("not loaded");
});
}
var futureBuilder = new FutureBuilder<byte[]>(
future: f,
builder: (ctx, snapshot) =>
{
int width = 200;
int height = 200;
Color color = Colors.blue;
if (snapshot.connectionState == ConnectionState.done)
{
return new Container(alignment: Alignment.center, width: width, height:height, color: color, child: Image.memory(snapshot.data) );
} else if (snapshot.connectionState == ConnectionState.waiting)
{
return new Container(alignment: Alignment.center, width: width, height:height, color: color, child: new Text("waiting") );
}
else
{
return new Container(alignment: Alignment.center, width: width, height:height, color: color, child: new Text("else") );
}
}
);
var streamBuilder = new StreamBuilder<int>(
stream: myStream,
initialData: 1,
builder: (ctx, snapshot) =>
{
var data = snapshot.data;
return new Container(child: new Text($"stream data: {data}"));
}
);
return new Container(
color: Colors.blueGrey,
child: new Column(
children: new List<Widget>
{
streamBuilder,
futureBuilder
}
)
);
}
}
}
}

Просмотреть файл

@ -0,0 +1,58 @@
using System;
using Unity.UIWidgets.async;
using Unity.UIWidgets.engine;
using Unity.UIWidgets.widgets;
namespace UIWidgetsSample
{
public class StreamBuilderSample : UIWidgetsPanel
{
protected override void main()
{
ui_.runApp(
new MyStreamBuilderWidget()
);
}
}
class MyStreamBuilderWidget : StatelessWidget
{
private Stream<int> counter()
{
return Stream<int>.periodic(new TimeSpan(0, 0, 0, 1), i =>
{
return i * 3;
}).take(5);
}
public override Widget build(BuildContext context)
{
return new WidgetsApp(
title: "Text Fields",
home: new StreamBuilder<int>(
stream: counter(),
builder: (BuildContext sub_context, AsyncSnapshot<int> snapshot) =>
{
if (snapshot.hasError)
return new Text($"Error: {snapshot.error}");
switch (snapshot.connectionState) {
case ConnectionState.none:
return new Text("没有Stream");
case ConnectionState.waiting:
return new Text("等待数据...");
case ConnectionState.active:
return new Text($"active: {snapshot.data}");
case ConnectionState.done:
return new Text("Stream已关闭");
}
return null; // unreachable
}
),
pageRouteBuilder: (settings, builder) =>
new PageRouteBuilder(
settings: settings,
pageBuilder: (Buildcontext, animation, secondaryAnimation) => builder(context)
)
);
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 97f298138594499f8355a91dab4721fc
timeCreated: 1629451124

Просмотреть файл

@ -0,0 +1,7 @@
using System;
namespace Unity.UIWidgets.async {
public static partial class _async {
public static object _nonNullError(object error) => error ?? new NullReferenceException();
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 2682c09c8f8c447cbd5a400f4db9852a
timeCreated: 1629171016

Просмотреть файл

@ -0,0 +1,126 @@
using System;
using Unity.UIWidgets.async;
namespace Unity.UIWidgets.async {
public class CastStream<S, T> : Stream<T> {
readonly Stream<S> _source;
public CastStream(Stream<S> _source) {
this._source = _source;
}
public override bool isBroadcast {
get { return _source.isBroadcast; }
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
var result = new CastStreamSubscription<S, T>(
_source.listen(null, onDone: onDone, cancelOnError: cancelOnError));
result.onData(onData);
result.onError(onError);
return result;
}
Stream<R> cast<R>() => new CastStream<S, R>(_source);
}
class CastStreamSubscription<S, T> : StreamSubscription<T> {
readonly StreamSubscription<S> _source;
/// Zone where listen was called.
readonly Zone _zone = Zone.current;
/// User's data handler. May be null.
ZoneUnaryCallback _handleData;
/// Copy of _source's handleError so we can report errors in onData.
/// May be null.
ZoneBinaryCallback _handleError;
public CastStreamSubscription(StreamSubscription<S> _source) {
this._source = _source;
_source.onData(_onData);
}
public override Future cancel() => _source.cancel();
public override void onData(Action<T> handleData) {
_handleData = handleData == null
? null
: _zone.registerUnaryCallback(data => {
handleData((T) data);
return null;
});
}
public override void onError(Action<object, string> handleError) {
_source.onError(handleError);
if (handleError == null) {
_handleError = null;
}
else {
_handleError = _zone
.registerBinaryCallback((a, b) => {
handleError(a, (string) b);
return null;
});
}
}
public override void onDone(Action handleDone) {
_source.onDone(handleDone);
}
void _onData(S data) {
if (_handleData == null) return;
T targetData;
try {
// siyao: this might go wrong
targetData = (T) (object) data;
}
catch (Exception error) {
if (_handleError == null) {
_zone.handleUncaughtError(error);
}
else {
_zone.runBinaryGuarded(_handleError, error, error.StackTrace);
}
return;
}
_zone.runUnaryGuarded(_handleData, targetData);
}
public override void pause(Future resumeSignal = null) {
_source.pause(resumeSignal);
}
public override void resume() {
_source.resume();
}
public override bool isPaused {
get { return _source.isPaused; }
}
public override Future<E> asFuture<E>(E futureValue) => _source.asFuture<E>(futureValue);
}
class CastStreamTransformer<SS, ST, TS, TT>
: StreamTransformerBase<TS, TT> {
public readonly StreamTransformer<SS, ST> _source;
public CastStreamTransformer(StreamTransformer<SS, ST> _source) {
this._source = _source;
}
public override StreamTransformer<RS, RT> cast<RS, RT>() =>
new CastStreamTransformer<SS, ST, RS, RT>(_source);
public override Stream<TT> bind(Stream<TS> stream) =>
_source.bind(stream.cast<SS>()).cast<TT>();
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 42f5ce67e0cb4ef18d4e4f51a08fb08c
timeCreated: 1628682203

Просмотреть файл

@ -0,0 +1,12 @@
namespace Unity.UIWidgets.async {
public partial class _async {
internal static object _invokeErrorHandler(
ZoneBinaryCallback errorHandler, object error, string stackTrace) {
// Dynamic invocation because we don't know the actual type of the
// first argument or the error object, but we should successfully call
// the handler if they match up.
// TODO(lrn): Should we? Why not the same below for the unary case?
return errorHandler(error, stackTrace);
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 0e0283a717c7486596da66a72eb9231e
timeCreated: 1629344015

Просмотреть файл

@ -0,0 +1,576 @@
using System;
using Unity.UIWidgets.foundation;
namespace Unity.UIWidgets.async {
class _BroadcastStream<T> : _ControllerStream<T> {
internal _BroadcastStream(_StreamControllerLifecycle<T> controller)
: base(controller) {
}
public override bool isBroadcast {
get { return true; }
}
}
class _BroadcastSubscription<T> : _ControllerSubscription<T> {
const int _STATE_EVENT_ID = 1;
internal const int _STATE_FIRING = 2;
const int _STATE_REMOVE_AFTER_FIRING = 4;
// TODO(lrn): Use the _state field on _ControllerSubscription to
// also store this state. Requires that the subscription implementation
// does not assume that it's use of the state integer is the only use.
internal int _eventState = 0; // Initialized to help dart2js type inference.
internal _BroadcastSubscription<T> _next;
internal _BroadcastSubscription<T> _previous;
internal _BroadcastSubscription(_StreamControllerLifecycle<T> controller,
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError
)
: base(controller, onData, onError, onDone, cancelOnError) {
_next = _previous = this;
}
internal bool _expectsEvent(int eventId) => (_eventState & _STATE_EVENT_ID) == eventId;
internal void _toggleEventId() {
_eventState ^= _STATE_EVENT_ID;
}
internal bool _isFiring {
get { return (_eventState & _STATE_FIRING) != 0; }
}
internal void _setRemoveAfterFiring() {
D.assert(_isFiring);
_eventState |= _STATE_REMOVE_AFTER_FIRING;
}
internal bool _removeAfterFiring {
get { return (_eventState & _STATE_REMOVE_AFTER_FIRING) != 0; }
}
// The controller._recordPause doesn't do anything for a broadcast controller,
// so we don't bother calling it.
protected override void _onPause() {
}
// The controller._recordResume doesn't do anything for a broadcast
// controller, so we don't bother calling it.
protected override void _onResume() {
}
// _onCancel is inherited.
}
abstract class _BroadcastStreamController<T>
: _StreamControllerBase<T> {
const int _STATE_INITIAL = 0;
const int _STATE_EVENT_ID = 1;
internal const int _STATE_FIRING = 2;
protected const int _STATE_CLOSED = 4;
const int _STATE_ADDSTREAM = 8;
public override _stream.ControllerCallback onListen { get; set; }
public override _stream.ControllerCancelCallback onCancel { get; set; }
// State of the controller.
internal int _state;
// Double-linked list of active listeners.
internal _BroadcastSubscription<T> _firstSubscription;
_BroadcastSubscription<T> _lastSubscription;
// Extra state used during an [addStream] call.
_AddStreamState<T> _addStreamState;
internal _Future _doneFuture;
internal _BroadcastStreamController(_stream.ControllerCallback onListen,
_stream.ControllerCancelCallback onCancel) {
this.onListen = onListen;
this.onCancel = onCancel;
_state = _STATE_INITIAL;
}
public override _stream.ControllerCallback onPause {
get {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
set {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
}
public override _stream.ControllerCallback onResume {
get {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
set {
throw new Exception(
"Broadcast stream controllers do not support pause callbacks");
}
}
// StreamController interface.
public override Stream<T> stream {
get { return new _BroadcastStream<T>(this); }
}
public override StreamSink<T> sink {
get { return new _StreamSinkWrapper<T>(this); }
}
public override bool isClosed {
get { return (_state & _STATE_CLOSED) != 0; }
}
/**
* A broadcast controller is never paused.
*
* Each receiving stream may be paused individually, and they handle their
* own buffering.
*/
public override bool isPaused {
get => false;
}
/** Whether there are currently one or more subscribers. */
public override bool hasListener {
get => !_isEmpty;
}
/**
* Test whether the stream has exactly one listener.
*
* Assumes that the stream has a listener (not [_isEmpty]).
*/
internal bool _hasOneListener {
get {
D.assert(!_isEmpty);
return Equals(_firstSubscription, _lastSubscription);
}
}
/** Whether an event is being fired (sent to some, but not all, listeners). */
internal virtual bool _isFiring {
get => (_state & _STATE_FIRING) != 0;
}
internal bool _isAddingStream {
get => (_state & _STATE_ADDSTREAM) != 0;
}
internal virtual bool _mayAddEvent {
get => (_state < _STATE_CLOSED);
}
_Future _ensureDoneFuture() {
if (_doneFuture != null) return _doneFuture;
return _doneFuture = new _Future();
}
// Linked list helpers
internal virtual bool _isEmpty {
get { return _firstSubscription == null; }
}
/** Adds subscription to linked list of active listeners. */
void _addListener(_BroadcastSubscription<T> subscription) {
D.assert(Equals(subscription._next, subscription));
subscription._eventState = (_state & _STATE_EVENT_ID);
// Insert in linked list as last subscription.
_BroadcastSubscription<T> oldLast = _lastSubscription;
_lastSubscription = subscription;
subscription._next = null;
subscription._previous = oldLast;
if (oldLast == null) {
_firstSubscription = subscription;
}
else {
oldLast._next = subscription;
}
}
void _removeListener(_BroadcastSubscription<T> subscription) {
D.assert(Equals(subscription._controller, this));
D.assert(!Equals(subscription._next, subscription));
_BroadcastSubscription<T> previous = subscription._previous;
_BroadcastSubscription<T> next = subscription._next;
if (previous == null) {
// This was the first subscription.
_firstSubscription = next;
}
else {
previous._next = next;
}
if (next == null) {
// This was the last subscription.
_lastSubscription = previous;
}
else {
next._previous = previous;
}
subscription._next = subscription._previous = subscription;
}
// _StreamControllerLifecycle interface.
public override StreamSubscription<T> _subscribe(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
if (isClosed) {
onDone = onDone ?? _stream._nullDoneHandler;
return new _DoneStreamSubscription<T>(() => onDone());
}
StreamSubscription<T> subscription = new _BroadcastSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_addListener((_BroadcastSubscription<T>) subscription);
if (Equals(_firstSubscription, _lastSubscription)) {
// Only one listener, so it must be the first listener.
_stream._runGuarded(() => onListen());
}
return subscription;
}
public override Future _recordCancel(StreamSubscription<T> sub) {
_BroadcastSubscription<T> subscription = (_BroadcastSubscription<T>) sub;
// If already removed by the stream, don't remove it again.
if (Equals(subscription._next, subscription)) return null;
if (subscription._isFiring) {
subscription._setRemoveAfterFiring();
}
else {
_removeListener(subscription);
// If we are currently firing an event, the empty-check is performed at
// the end of the listener loop instead of here.
if (!_isFiring && _isEmpty) {
_callOnCancel();
}
}
return null;
}
public override void _recordPause(StreamSubscription<T> subscription) {
}
public override void _recordResume(StreamSubscription<T> subscription) {
}
// EventSink interface.
internal virtual Exception _addEventError() {
if (isClosed) {
return new Exception("Cannot add new events after calling close");
}
D.assert(_isAddingStream);
return new Exception("Cannot add new events while doing an addStream");
}
public override void add(T data) {
if (!_mayAddEvent) throw _addEventError();
_sendData(data);
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (!_mayAddEvent) throw _addEventError();
AsyncError replacement = Zone.current.errorCallback((Exception) error);
if (replacement != null) {
error = _async._nonNullError(replacement);
stackTrace = replacement.StackTrace;
}
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
_sendError(error, stackTrace);
}
public override Future close() {
if (isClosed) {
D.assert(_doneFuture != null);
return _doneFuture;
}
if (!_mayAddEvent) throw _addEventError();
_state |= _STATE_CLOSED;
Future doneFuture = _ensureDoneFuture();
_sendDone();
return doneFuture;
}
public override Future done {
get { return _ensureDoneFuture(); }
}
public override Future addStream(Stream<T> stream, bool? cancelOnError = null) {
if (!_mayAddEvent) throw _addEventError();
_state |= _STATE_ADDSTREAM;
_addStreamState = new _AddStreamState<T>(this, stream, cancelOnError ?? false);
return _addStreamState.addStreamFuture;
}
// _EventSink interface, called from AddStreamState.
public override void _add(T data) {
_sendData(data);
}
public override void _addError(object error, string stackTrace) {
_sendError(error, stackTrace);
}
public override void _close() {
D.assert(_isAddingStream);
_AddStreamState<T> addState = _addStreamState;
_addStreamState = null;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
}
// Event handling.
internal void _forEachListener(Action<_BufferingStreamSubscription<T>> action) {
if (_isFiring) {
throw new Exception(
"Cannot fire new event. Controller is already firing an event");
}
if (_isEmpty) return;
// Get event id of this event.
int id = (_state & _STATE_EVENT_ID);
// Start firing (set the _STATE_FIRING bit). We don't do [onCancel]
// callbacks while firing, and we prevent reentrancy of this function.
//
// Set [_state]'s event id to the next event's id.
// Any listeners added while firing this event will expect the next event,
// not this one, and won't get notified.
_state ^= _STATE_EVENT_ID | _STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
while (subscription != null) {
if (subscription._expectsEvent(id)) {
subscription._eventState |= _BroadcastSubscription<T>._STATE_FIRING;
action(subscription);
subscription._toggleEventId();
_BroadcastSubscription<T> next = subscription._next;
if (subscription._removeAfterFiring) {
_removeListener(subscription);
}
subscription._eventState &= ~_BroadcastSubscription<T>._STATE_FIRING;
subscription = next;
}
else {
subscription = subscription._next;
}
}
_state &= ~_STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
}
internal virtual void _callOnCancel() {
D.assert(_isEmpty);
if (isClosed && _doneFuture._mayComplete) {
// When closed, _doneFuture is not null.
_doneFuture._asyncComplete(FutureOr.nil);
}
_stream._runGuarded(() => onCancel());
}
}
class _SyncBroadcastStreamController<T> : _BroadcastStreamController<T>
, SynchronousStreamController<T> {
internal _SyncBroadcastStreamController(
_stream.ControllerCallback onListen, Action onCancel)
: base(onListen, () => {
onCancel();
return Future._nullFuture;
}) {
}
// EventDispatch interface.
internal override bool _mayAddEvent {
get { return base._mayAddEvent && !_isFiring; }
}
internal override Exception _addEventError() {
if (_isFiring) {
return new Exception(
"Cannot fire new event. Controller is already firing an event");
}
return base._addEventError();
}
public override void _sendData(T data) {
if (_isEmpty) return;
if (_hasOneListener) {
_state |= _BroadcastStreamController<T>._STATE_FIRING;
_BroadcastSubscription<T> subscription = _firstSubscription;
subscription._add(data);
_state &= ~_BroadcastStreamController<T>._STATE_FIRING;
if (_isEmpty) {
_callOnCancel();
}
return;
}
_forEachListener((_BufferingStreamSubscription<T> subscription) => { subscription._add(data); });
}
public override void _sendError(object error, string stackTrace) {
if (_isEmpty) return;
_forEachListener((_BufferingStreamSubscription<T> subscription) => {
subscription._addError(error, stackTrace);
});
}
public override void _sendDone() {
if (!_isEmpty) {
_forEachListener((_BufferingStreamSubscription<T> subscription) => { subscription._close(); });
}
else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
//
class _AsyncBroadcastStreamController<T> : _BroadcastStreamController<T> {
internal _AsyncBroadcastStreamController(_stream.ControllerCallback onListen,
_stream.ControllerCancelCallback onCancel)
: base(onListen, onCancel) {
}
// EventDispatch interface.
public override void _sendData(T data) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedData<T>(data));
}
}
public override void _sendError(object error, string stackTrace) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
}
public override void _sendDone() {
if (!_isEmpty) {
for (_BroadcastSubscription<T> subscription = _firstSubscription;
subscription != null;
subscription = subscription._next) {
subscription._addPending(new _DelayedDone<T>());
}
}
else {
D.assert(_doneFuture != null);
D.assert(_doneFuture._mayComplete);
_doneFuture._asyncComplete(FutureOr.nil);
}
}
}
//
// /**
// * Stream controller that is used by [Stream.asBroadcastStream].
// *
// * This stream controller allows incoming events while it is firing
// * other events. This is handled by delaying the events until the
// * current event is done firing, and then fire the pending events.
// *
// * This class extends [_SyncBroadcastStreamController]. Events of
// * an "asBroadcastStream" stream are always initiated by events
// * on another stream, and it is fine to forward them synchronously.
// */
class _AsBroadcastStreamController<T> : _SyncBroadcastStreamController<T>
, _EventDispatch<T> {
_StreamImplEvents<T> _pending;
internal _AsBroadcastStreamController(Action onListen, Action onCancel)
: base(() => onListen(), onCancel) {
}
bool _hasPending {
get { return _pending != null && !_pending.isEmpty; }
}
void _addPendingEvent(_DelayedEvent<T> evt) {
_pending = _pending ?? new _StreamImplEvents<T>();
_pending.add(evt);
}
public override void add(T data) {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedData<T>(data));
return;
}
base.add(data);
while (_hasPending) {
_pending.handleNext(this);
}
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedError<T>((Exception) error, stackTrace));
return;
}
if (!_mayAddEvent) throw _addEventError();
_sendError(error, stackTrace);
while (_hasPending) {
_pending.handleNext(this);
}
}
public override Future close() {
if (!isClosed && _isFiring) {
_addPendingEvent(new _DelayedDone<T>());
_state |= _BroadcastStreamController<T>._STATE_CLOSED;
return base.done;
}
Future result = base.close();
D.assert(!_hasPending);
return result;
}
internal override void _callOnCancel() {
if (_hasPending) {
_pending.clear();
_pending = null;
}
base._callOnCancel();
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 88bb4d17b79047948e7e36354ad968d4
timeCreated: 1629189231

Просмотреть файл

@ -40,9 +40,9 @@ namespace Unity.UIWidgets.async {
}
public abstract class Future {
static readonly _Future _nullFuture = _Future.zoneValue(null, Zone.root);
internal static readonly _Future _nullFuture = _Future.zoneValue(null, Zone.root);
static readonly _Future _falseFuture = _Future.zoneValue(false, Zone.root);
internal static readonly _Future _falseFuture = _Future.zoneValue(false, Zone.root);
public static Future create(Func<FutureOr> computation) {
_Future result = new _Future();

Просмотреть файл

@ -0,0 +1,9 @@
using Unity.UIWidgets.async;
namespace Unity.UIWidgets.core {
public abstract class Sink<T> {
public abstract void add(T data);
public abstract Future close();
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 8b152784a6234bc493708702199a316d
timeCreated: 1628676429

Просмотреть файл

@ -0,0 +1,65 @@
using System;
namespace Unity.UIWidgets.core {
public class Stopwatch {
static int _frequency;
// The _start and _stop fields capture the time when [start] and [stop]
// are called respectively.
// If _stop is null, the stopwatch is running.
int? _start = 0;
int? _stop = 0;
public Stopwatch() {
if (_frequency == null) _initTicker();
}
public int frequency {
get { return _frequency; }
}
public void start() {
if (_stop != null) {
// (Re)start this stopwatch.
// Don't count the time while the stopwatch has been stopped.
_start += _now() - _stop;
_stop = null;
}
}
public void stop() {
_stop = _stop ?? _now();
}
public void reset() {
_start = _stop ?? _now();
}
public int? elapsedTicks {
get { return (_stop ?? _now()) - _start; }
}
public TimeSpan elapsed {
get { return TimeSpan.FromMilliseconds(elapsedMicroseconds); }
}
// This is external, we might need to reimplement it
int elapsedMicroseconds { get; }
// This is external, we might need to reimplement it
int elapsedMilliseconds { get; }
bool isRunning {
get { return _stop == null; }
}
// This is external, we might need to reimplement it
static void _initTicker() {
}
// This is external, we might need to reimplement it
static int _now() {
return DateTime.Now.Millisecond;
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 63c86df370684414b7b148702eacd440
timeCreated: 1629184886

Просмотреть файл

@ -0,0 +1,990 @@
using System;
using System.Collections.Generic;
using System.Text;
using Unity.UIWidgets.core;
using Unity.UIWidgets.foundation;
using Stopwatch = Unity.UIWidgets.core.Stopwatch;
namespace Unity.UIWidgets.async {
public static partial class _stream {
public delegate void _TimerCallback();
}
public abstract class Stream<T> {
public Stream() {
}
// const Stream._internal();
public static Stream<T> empty() => new _EmptyStream<T>();
// @Since("2.5")
public static Stream<T> value(T value) {
var result = new _AsyncStreamController<T>(null, null, null, null);
result._add(value);
result._closeUnchecked();
return result.stream;
}
// @Since("2.5")
public static Stream<T> error(object error, string stackTrace = null) {
// ArgumentError.checkNotNull(error, "error");
var result = new _AsyncStreamController<T>(null, null, null, null);
result._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
result._closeUnchecked();
return result.stream;
}
public static Stream<T> fromFuture(Future<T> future) {
// Use the controller's buffering to fill in the value even before
// the stream has a listener. For a single value, it's not worth it
// to wait for a listener before doing the `then` on the future.
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
future.then((value) => {
controller._add((T) value);
controller._closeUnchecked();
}, onError: (error) => {
controller._addError(error, null);
controller._closeUnchecked();
return FutureOr.nil;
});
return controller.stream;
}
public static Stream<T> fromFutures(IEnumerable<Future<T>> futures) {
_StreamController<T> controller =
new _SyncStreamController<T>(null, null, null, null);
int count = 0;
// Declare these as variables holding closures instead of as
// function declarations.
// This avoids creating a new closure from the functions for each future.
var onValue = new Action<object>((object value) => {
if (!controller.isClosed) {
controller._add((T) value);
if (--count == 0) controller._closeUnchecked();
}
});
var onError = new Func<Exception, FutureOr>((error) => {
if (!controller.isClosed) {
controller._addError(error, null);
if (--count == 0) controller._closeUnchecked();
}
return FutureOr.nil;
});
// The futures are already running, so start listening to them immediately
// (instead of waiting for the stream to be listened on).
// If we wait, we might not catch errors in the futures in time.
foreach (var future in futures) {
count++;
future.then(onValue, onError: onError);
}
// Use schedule microtask since controller is sync.
if (count == 0) async_.scheduleMicrotask(controller.close);
return controller.stream;
}
public static Stream<T> fromIterable(IEnumerable<T> elements) {
return new _GeneratedStreamImpl<T>(
() => (_PendingEvents<T>) new _IterablePendingEvents<T>(elements));
}
public static Stream<T> periodic(TimeSpan period,
Func<int, T> computation = null) {
Timer timer = default;
int computationCount = 0;
StreamController<T> controller = null;
// Counts the time that the Stream was running (and not paused).
Stopwatch watch = new Stopwatch();
Action sendEvent = () => {
watch.reset();
T data = default;
if (computation != null) {
try {
data = computation(computationCount++);
}
catch (Exception e) {
controller.addError(e, e.StackTrace);
return;
}
}
controller.add(data);
};
Action startPeriodicTimer = () => {
D.assert(timer == null);
timer = Timer.periodic(period, (object timer1) => {
sendEvent();
return null;
});
};
// the original code new an abstract class
controller = StreamController<T>.create(
sync: true,
onListen: () => {
watch.start();
startPeriodicTimer();
},
onPause: () => {
timer.cancel();
timer = null;
watch.stop();
},
onResume: () => {
D.assert(timer == null);
TimeSpan elapsed = watch.elapsed;
watch.start();
timer = Timer.create(period - elapsed, () => {
timer = null;
startPeriodicTimer();
sendEvent();
});
},
onCancel: () => {
if (timer != null) timer.cancel();
timer = null;
return Future._nullFuture;
});
return controller.stream;
}
public static Stream<T> eventTransformed(
Stream<T> source, _async._SinkMapper<T, T> mapSink) {
return new _BoundSinkStream<T, T>(source, mapSink);
}
static Stream<T> castFrom<S, T>(Stream<S> source) =>
new CastStream<S, T>(source);
public virtual bool isBroadcast {
get { return false; }
}
public virtual Stream<T> asBroadcastStream(
Action<StreamSubscription<T>> onListen = null,
Action<StreamSubscription<T>> onCancel = null) {
return new _AsBroadcastStream<T>(this, onListen, onCancel);
}
public abstract StreamSubscription<T> listen(
Action<T> onData, Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false);
public Stream<T> where(Func<T, bool> test) {
return new _WhereStream<T>(this, test);
}
public Stream<S> map<S>(Func<T, S> convert) {
return new _MapStream<T, S>(this, convert);
}
public Stream<E> asyncMap<E>(Func<T, FutureOr> convert) {
_StreamControllerBase<E> controller = null;
StreamSubscription<T> subscription = null;
void onListen() {
var add = new Action<E>(controller.add);
D.assert(controller is _StreamController<E> ||
controller is _BroadcastStreamController<E>);
var addError = new Action<object, string>(controller._addError);
subscription = listen((T evt) => {
FutureOr newValue;
try {
newValue = convert(evt);
}
catch (Exception e) {
controller.addError(e, e.StackTrace);
return;
}
if (newValue.f is Future<E> newFuture) {
// siyao: this if different from dart
subscription.pause();
newFuture
.then(d => add((E) d), onError: (e) => {
addError(e, e.StackTrace);
return FutureOr.nil;
})
.whenComplete(subscription.resume);
}
else {
// Siyao: This works as if this is csharpt
controller.add((E) newValue.v);
}
}, onError: addError, onDone: () => controller.close());
}
if (isBroadcast) {
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast(
onListen: () => onListen(),
onCancel: () => { subscription.cancel(); },
sync: true);
}
else {
controller = (_StreamControllerBase<E>) StreamController<E>.create(
onListen: onListen,
onPause: () => { subscription.pause(); },
onResume: () => { subscription.resume(); },
onCancel: () => subscription.cancel(),
sync: true);
}
return controller.stream;
}
Stream<E> asyncExpand<E>(Func<T, Stream<E>> convert) {
_StreamControllerBase<E> controller = null;
StreamSubscription<T> subscription = null;
void onListen() {
D.assert(controller is _StreamController<E> ||
controller is _BroadcastStreamController<E>);
subscription = listen((T evt) => {
Stream<E> newStream;
try {
newStream = convert(evt);
}
catch (Exception e) {
controller.addError(e, e.StackTrace);
return;
}
if (newStream != null) {
subscription.pause();
controller.addStream(newStream).whenComplete(subscription.resume);
}
},
onError: controller._addError, // Avoid Zone error replacement.
onDone: () => controller.close());
}
if (isBroadcast) {
controller = (_StreamControllerBase<E>) StreamController<E>.broadcast(
onListen: () => onListen(),
onCancel: () => { subscription.cancel(); },
sync: true);
}
else {
controller = (_StreamControllerBase<E>) StreamController<E>.create(
onListen: () => onListen(),
onPause: () => { subscription.pause(); },
onResume: () => { subscription.resume(); },
onCancel: () => subscription.cancel(),
sync: true);
}
return controller.stream;
}
Stream<T> handleError(ZoneBinaryCallback onError, _stream._ErrorTest test = null) {
return new _HandleErrorStream<T>(this, onError, test);
}
Stream<S> expand<S>(_stream._Transformation<T, IEnumerable<S>> convert) {
return new _ExpandStream<T, S>(this, convert);
}
Future pipe(StreamConsumer<T> streamConsumer) {
return streamConsumer.addStream(this).then((_) => streamConsumer.close(), (_) => FutureOr.nil);
}
public Stream<S> transform<S>(StreamTransformer<T, S> streamTransformer) {
return streamTransformer.bind(this);
}
Future<T> reduce(Func<T, T, T> combine) {
_Future result = new _Future();
bool seenFirst = false;
T value = default;
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
if (seenFirst) {
_stream._runUserCode(() => combine(value, element), (T newValue) => { value = newValue; },
onError: (e) => _stream._cancelAndErrorClosure(subscription, result)(e));
}
else {
value = element;
seenFirst = true;
}
},
onError: (e, s) => result._completeError((Exception) e),
onDone: () => {
if (!seenFirst) {
try {
// Throw and recatch, instead of just doing
// _completeWithErrorCallback, e, theError, StackTrace.current),
// to ensure that the stackTrace is set on the error.
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(result, e);
}
}
else {
// TODO: need check
result._complete(FutureOr.value(value));
}
},
cancelOnError: true);
return result.to<T>();
}
Future<S> fold<S>(S initialValue, Func<S, T, S> combine) {
_Future result = new _Future();
S value = initialValue;
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => combine(value, element), (S newValue) => { value = newValue; },
e => _stream._cancelAndErrorClosure(subscription, result)(e));
},
onError: (e, s) => result._completeError((Exception) e),
onDone: () => { result._complete(FutureOr.value(value)); },
cancelOnError: true);
return result.to<S>();
}
Future<string> join(string separator = "") {
_Future result = new _Future();
StringBuilder buffer = new StringBuilder();
StreamSubscription<T> subscription = null;
bool first = true;
subscription = listen(
(T element) => {
if (!first) {
buffer.Append(separator);
}
first = false;
try {
buffer.Append(element);
}
catch (Exception e) {
_stream._cancelAndErrorWithReplacement(subscription, result, e);
}
},
onError: (e, _) => result._completeError((Exception) e),
onDone: () => { result._complete(buffer.ToString()); },
cancelOnError: true);
return result.to<string>();
}
Future<bool> contains(object needle) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => (Equals(element, needle)), (bool isMatch) => {
if (isMatch) {
_stream._cancelAndValue(subscription, future, true);
}
}, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(false); },
cancelOnError: true);
return future.to<bool>();
}
public Future forEach(Action<T> action) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
// TODO(floitsch): the type should be 'void' and inferred.
_stream._runUserCode<object>(() => {
action(element);
return default;
}, (_) => { },
(e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(FutureOr.nil); },
cancelOnError: true);
return future;
}
Future<bool> every(Func<T, bool> test) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => test(element), (bool isMatch) => {
if (!isMatch) {
_stream._cancelAndValue(subscription, future, false);
}
}, ex => _stream._cancelAndErrorClosure(subscription, future)(ex));
},
onError: (ex, s) => future._completeError((Exception) ex),
onDone: () => { future._complete(true); },
cancelOnError: true);
return future.to<bool>();
}
Future<bool> any(Func<T, bool> test) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T element) => {
_stream._runUserCode(() => test(element), (bool isMatch) => {
if (isMatch) {
_stream._cancelAndValue(subscription, future, true);
}
}, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(false); },
cancelOnError: true);
return future.to<bool>();
}
Future<int> length {
get {
_Future future = new _Future();
int count = 0;
listen(
(_) => { count++; },
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(count); },
cancelOnError: true);
return future.to<int>();
}
}
Future<bool> isEmpty {
get {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(_) => { _stream._cancelAndValue(subscription, future, false); },
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(true); },
cancelOnError: true);
return future.to<bool>();
}
}
public Stream<R> cast<R>() => Stream<T>.castFrom<T, R>(this);
public Future<List<T>> toList() {
List<T> result = new List<T>();
_Future future = new _Future();
listen(
(T data) => { result.Add(data); },
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(FutureOr.value(result)); },
cancelOnError: true);
return future.to<List<T>>();
}
public Future<HashSet<T>> toSet() {
HashSet<T> result = new HashSet<T>();
_Future future = new _Future();
listen(
(T data) => { result.Add(data); },
onError: (e, _) => future._completeError((Exception) e),
onDone: () => { future._complete(FutureOr.value(result)); },
cancelOnError: true);
return future.to<HashSet<T>>();
}
Future<E> drain<E>(E futureValue) =>
listen(null, cancelOnError: true).asFuture<E>(futureValue);
public Stream<T> take(int count) {
return new _TakeStream<T>(this, count);
}
Stream<T> takeWhile(Func<T, bool> test) {
return new _TakeWhileStream<T>(this, d => test(d));
}
Stream<T> skip(int count) {
return new _SkipStream<T>(this, count);
}
Stream<T> skipWhile(Func<T, bool> test) {
return new _SkipWhileStream<T>(this, d => test(d));
}
public Stream<T> distinct(Func<T, T, bool> equals) {
return new _DistinctStream<T>(this, (d1, d2) => equals(d1, d2));
}
Future<T> first {
get {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) => { _stream._cancelAndValue(subscription, future, value); },
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
try {
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
}
Future<T> last {
get {
_Future future = new _Future();
T result = default;
bool foundResult = false;
listen(
(T value) => {
foundResult = true;
result = value;
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
if (foundResult) {
future._complete(FutureOr.value(result));
return;
}
try {
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
}
Future<T> single {
get {
_Future future = new _Future();
T result = default;
bool foundResult = false;
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) => {
if (foundResult) {
// This is the second element we get.
try {
throw new Exception("IterableElementError.tooMany()");
}
catch (Exception e) {
_stream._cancelAndErrorWithReplacement(subscription, future, e);
}
return;
}
foundResult = true;
result = value;
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
if (foundResult) {
future._complete(FutureOr.value(result));
return;
}
try {
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
}
Future<T> firstWhere(Func<T, bool> test, Func<T> orElse = null) {
_Future future = new _Future();
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) => {
_stream._runUserCode(() => test(value), (bool isMatch) => {
if (isMatch) {
_stream._cancelAndValue(subscription, future, value);
}
}, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
if (orElse != null) {
_stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)), future._completeError);
return;
}
try {
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
Future<T> lastWhere(Func<T, bool> test, Func<T> orElse = null) {
_Future future = new _Future();
T result = default;
bool foundResult = false;
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) => {
_stream._runUserCode(() => true == test(value), (bool isMatch) => {
if (isMatch) {
foundResult = true;
result = value;
}
}, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
if (foundResult) {
future._complete(FutureOr.value(result));
return;
}
if (orElse != null) {
_stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)), future._completeError);
return;
}
try {
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
Future<T> singleWhere(Func<T, bool> test, Func<T> orElse = null) {
_Future future = new _Future();
T result = default;
bool foundResult = false;
StreamSubscription<T> subscription = null;
subscription = listen(
(T value) => {
_stream._runUserCode(() => true == test(value), (bool isMatch) => {
if (isMatch) {
if (foundResult) {
try {
throw new Exception("IterableElementError.tooMany()");
}
catch (Exception e) {
_stream._cancelAndErrorWithReplacement(subscription, future, e);
}
return;
}
foundResult = true;
result = value;
}
}, (e) => _stream._cancelAndErrorClosure(subscription, future)(e));
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
if (foundResult) {
future._complete(FutureOr.value(result));
return;
}
try {
if (orElse != null) {
_stream._runUserCode(orElse, v => future._complete(FutureOr.value(v)),
future._completeError);
return;
}
throw new Exception("IterableElementError.noElement()");
}
catch (Exception e) {
async_._completeWithErrorCallback(future, e);
}
},
cancelOnError: true);
return future.to<T>();
}
Future<T> elementAt(int index) {
// ArgumentError.checkNotNull(index, "index");
// RangeError.checkNotNegative(index, "index");
_Future future = new _Future();
StreamSubscription<T> subscription = null;
int elementIndex = 0;
subscription = listen(
(T value) => {
if (index == elementIndex) {
_stream._cancelAndValue(subscription, future, value);
return;
}
elementIndex += 1;
},
onError: (e, _) => future._completeError((Exception) e),
onDone: () => {
future._completeError(
new Exception($"exception {index} null, {elementIndex}")
// new RangeError.index(index, this, "index", null, elementIndex)
);
},
cancelOnError: true);
return future.to<T>();
}
public Stream<T> timeout(TimeSpan timeLimit, Action<EventSink<T>> onTimeout) {
_StreamControllerBase<T> controller = null;
// The following variables are set on listen.
StreamSubscription<T> subscription = null;
Timer timer = null;
Zone zone = null;
_stream._TimerCallback timeout = null;
Action<T> onData = (T evt) => {
timer.cancel();
timer = zone.createTimer(timeLimit, () => {
timeout();
return default;
});
// It might close the stream and cancel timer, so create recuring Timer
// before calling into add();
// issue: https://github.com/dart-lang/sdk/issues/37565
controller.add(evt);
};
Action<object, string> onError = (object error, string stack) => {
timer.cancel();
D.assert(controller is _StreamController<T> ||
controller is _BroadcastStreamController<T>);
Exception e = error as Exception;
controller._addError(e, e.StackTrace); // Avoid Zone error replacement.
timer = zone.createTimer(timeLimit, () => {
timeout();
return default;
});
};
Action onDone = () => {
timer.cancel();
controller.close();
};
Action onListen = () => {
// This is the onListen callback for of controller.
// It runs in the same zone that the subscription was created in.
// Use that zone for creating timers and running the onTimeout
// callback.
zone = Zone.current;
if (onTimeout == null) {
timeout = () => {
controller.addError(
new TimeoutException("No stream event", timeLimit), null);
};
}
else {
// TODO(floitsch): the return type should be 'void', and the type
// should be inferred.
var registeredOnTimeout =
zone.registerUnaryCallback((o) => {
onTimeout((EventSink<T>) o);
return default;
});
var wrapper = new _ControllerEventSinkWrapper<T>(null);
timeout = () => {
wrapper._sink = controller; // Only valid during call.
zone.runUnaryGuarded(registeredOnTimeout, wrapper);
wrapper._sink = null;
};
}
subscription = listen(onData, onError: onError, onDone: onDone);
timer = zone.createTimer(timeLimit, () => {
timeout();
return default;
});
};
Future onCancel() {
timer.cancel();
Future result = subscription.cancel();
subscription = null;
return result;
}
controller = isBroadcast
? (_StreamControllerBase<T>) new _SyncBroadcastStreamController<T>(() => onListen(), () => onCancel())
: new _SyncStreamController<T>(() => onListen(), () => {
// Don't null the timer, onCancel may call cancel again.
timer.cancel();
subscription.pause();
}, () => {
subscription.resume();
timer = zone.createTimer(timeLimit, () => {
timeout();
return default;
});
}, onCancel);
return controller.stream;
}
}
public abstract class StreamSubscription<T> {
public abstract Future cancel();
public abstract void onData(Action<T> handleData);
public abstract void onError(Action<object, string> action);
public abstract void onDone(Action handleDone);
public abstract void pause(Future resumeSignal = null);
public abstract void resume();
public virtual bool isPaused { get; }
public abstract Future<E> asFuture<E>(E futureValue);
}
public abstract class EventSink<T> : Sink<T> {
// public abstract void add(T evt);
public abstract void addError(object error, string stackTrace);
// void close();
}
// /** [Stream] wrapper that only exposes the [Stream] interface. */
public class StreamView<T> : Stream<T> {
readonly Stream<T> _stream;
public StreamView(Stream<T> stream) : base() {
_stream = stream;
}
public override bool isBroadcast {
get { return _stream.isBroadcast; }
}
public override Stream<T> asBroadcastStream(Action<StreamSubscription<T>> onListen = null,
Action<StreamSubscription<T>> onCancel = null)
=>
_stream.asBroadcastStream(onListen: onListen, onCancel: onCancel);
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
return _stream.listen(onData,
onError: onError, onDone: onDone, cancelOnError: cancelOnError);
}
}
public interface StreamConsumer<S> {
Future addStream(Stream<S> stream);
Future close();
}
public abstract class StreamSink<S> : EventSink<S>, StreamConsumer<S> {
// Future close();
public virtual Future done { get; }
public virtual Future addStream(Stream<S> stream) {
throw new System.NotImplementedException();
}
// public Future closeConsumer() {
// throw new System.NotImplementedException();
// }
}
public abstract class StreamTransformer<S, T> {
// c# does not support change constructor
public static StreamTransformer<S, T> create<S, T>(_async._SubscriptionTransformer<S, T> onListen)
{
return new _StreamSubscriptionTransformer<S, T>(onListen);
}
public static StreamTransformer<S, T> fromHandlers(
_stream._TransformDataHandler<S, T> handleData = null,
_stream._TransformErrorHandler<T> handleError = null,
_stream._TransformDoneHandler<T> handleDone = null) {
return new _StreamHandlerTransformer<S, T>(handleData, handleError, handleDone);
}
// @Since("2.1")
public static StreamTransformer<S, T> fromBind(Func<Stream<S>, Stream<T>> bind) {
return new _StreamBindTransformer<S, T>(bind);
}
public static StreamTransformer<TS, TT> castFrom<SS, ST, TS, TT>(
StreamTransformer<SS, ST> source) {
return new CastStreamTransformer<SS, ST, TS, TT>(source);
}
public abstract Stream<T> bind(Stream<S> stream);
public abstract StreamTransformer<RS, RT> cast<RS, RT>();
}
public abstract class StreamTransformerBase<S, T> : StreamTransformer<S, T> {
public StreamTransformerBase() {
}
public override StreamTransformer<RS, RT> cast<RS, RT>() =>
StreamTransformer<RS, RT>.castFrom<S, T, RS, RT>(this);
}
public abstract class StreamIterator<T> {
/** Create a [StreamIterator] on [stream]. */
public static StreamIterator<T> Create(Stream<T> stream)
// TODO(lrn): use redirecting factory constructor when type
// arguments are supported.
=>
new _StreamIterator<T>(stream);
public abstract Future<bool> moveNext();
T current { get; }
public abstract Future cancel();
}
internal class _ControllerEventSinkWrapper<T> : EventSink<T> {
internal EventSink<T> _sink;
internal _ControllerEventSinkWrapper(EventSink<T> _sink) {
this._sink = _sink;
}
public override void add(T data) {
_sink.add(data);
}
public override void addError(object error, string stackTrace) {
_sink.addError(error, stackTrace);
}
public override Future close() {
_sink.close();
return Future._nullFuture;
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: b09d9a1e8bd34f36ba6ed51a870f4bef
timeCreated: 1628672859

Просмотреть файл

@ -0,0 +1,755 @@
using System;
using Unity.UIWidgets.foundation;
namespace Unity.UIWidgets.async {
static partial class _stream {
public delegate void ControllerCallback();
public delegate Future ControllerCancelCallback();
public delegate void _NotificationHandler();
public static void _runGuarded(_NotificationHandler notificationHandler) {
if (notificationHandler == null) return;
try {
notificationHandler();
}
catch (Exception e) {
Zone.current.handleUncaughtError(e);
}
}
}
public interface IStreamController<T> {
Stream<T> stream { get; }
_stream.ControllerCallback onListen { get; set; }
// void onListen(void onListenHandler());
_stream.ControllerCallback onPause { get; set; }
// void set onPause(void onPauseHandler());
_stream.ControllerCallback onResume { get; set; }
// void set onResume(void onResumeHandler());
_stream.ControllerCancelCallback onCancel { get; set; }
// void set onCancel(onCancelHandler());
StreamSink<T> sink { get; }
bool isClosed { get; }
bool isPaused { get; }
/** Whether there is a subscriber on the [Stream]. */
bool hasListener { get; }
// public abstract void add(T evt);
//
// public abstract void addError(object error, string stackTrace);
Future close();
Future addStream(Stream<T> source, bool? cancelOnError = false);
void add(T evt);
void addError(object error, string stackTrace);
Future done { get; }
}
public abstract class StreamController<T> : StreamSink<T>, IStreamController<T> {
/** The stream that this controller is controlling. */
public virtual Stream<T> stream { get; }
public static StreamController<T> create(
_stream.ControllerCallback onListen = null,
_stream.ControllerCallback onPause = null,
_stream.ControllerCallback onResume = null,
_stream.ControllerCancelCallback onCancel = null,
// Action onListen = null,
// Action onPause = null,
// Action onResume = null,
// Action onCancel = null,
bool sync = false) {
return sync
? (StreamController<T>) new _SyncStreamController<T>(onListen, onPause, onResume, onCancel)
: new _AsyncStreamController<T>(onListen, onPause, onResume, onCancel);
}
public static StreamController<T> broadcast(
Action onListen = null, Action onCancel = null, bool sync = false) {
return sync
? (StreamController<T>) new _SyncBroadcastStreamController<T>(() => onListen?.Invoke(), onCancel)
: new _AsyncBroadcastStreamController<T>(() => onListen?.Invoke(), () => {
onCancel?.Invoke();
return Future._nullFuture;
});
}
public virtual _stream.ControllerCallback onListen { get; set; }
// void onListen(void onListenHandler());
public virtual _stream.ControllerCallback onPause { get; set; }
// void set onPause(void onPauseHandler());
public virtual _stream.ControllerCallback onResume { get; set; }
// void set onResume(void onResumeHandler());
public virtual _stream.ControllerCancelCallback onCancel { get; set; }
// void set onCancel(onCancelHandler());
public virtual StreamSink<T> sink { get; }
public virtual bool isClosed { get; }
public virtual bool isPaused { get; }
/** Whether there is a subscriber on the [Stream]. */
public virtual bool hasListener { get; }
// public abstract void add(T evt);
//
// public abstract void addError(object error, string stackTrace);
public abstract override Future close();
public abstract Future addStream(Stream<T> source, bool? cancelOnError = false);
}
public interface SynchronousStreamController<T> {
//: StreamController<T> {
// public abstract void add(T data);
// public abstract void addError(object error, string stackTrace);
// public abstract Future close();
}
interface _StreamControllerLifecycle<T> {
StreamSubscription<T> _subscribe(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError);
void _recordPause(StreamSubscription<T> subscription);
void _recordResume(StreamSubscription<T> subscription);
Future _recordCancel(StreamSubscription<T> subscription);
}
//
// // Base type for implementations of stream controllers.
abstract class _StreamControllerBase<T>
:
StreamController<T>,
_StreamControllerLifecycle<T>,
_EventSink<T>,
_EventDispatch<T> {
public abstract StreamSubscription<T> _subscribe(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError);
public virtual void _recordPause(StreamSubscription<T> subscription) {
}
public virtual void _recordResume(StreamSubscription<T> subscription) {
}
public virtual Future _recordCancel(StreamSubscription<T> subscription) => null;
public abstract void _add(T data);
public abstract void _addError(object error, string stackTrace);
public abstract void _close();
public abstract void _sendData(T data);
public abstract void _sendError(object error, string stackTrace);
public abstract void _sendDone();
}
abstract class _StreamController<T> : _StreamControllerBase<T> {
/** The controller is in its initial state with no subscription. */
internal const int _STATE_INITIAL = 0;
internal const int _STATE_SUBSCRIBED = 1;
/** The subscription is canceled. */
internal const int _STATE_CANCELED = 2;
/** Mask for the subscription state. */
internal const int _STATE_SUBSCRIPTION_MASK = 3;
// The following state relate to the controller, not the subscription.
// If closed, adding more events is not allowed.
// If executing an [addStream], new events are not allowed either, but will
// be added by the stream.
internal const int _STATE_CLOSED = 4;
internal const int _STATE_ADDSTREAM = 8;
// @pragma("vm:entry-point")
object _varData;
/** Current state of the controller. */
// @pragma("vm:entry-point")
protected int _state = _STATE_INITIAL;
// TODO(lrn): Could this be stored in the varData field too, if it's not
// accessed until the call to "close"? Then we need to special case if it's
// accessed earlier, or if close is called before subscribing.
_Future _doneFuture;
public override _stream.ControllerCallback onListen { get; set; }
public override _stream.ControllerCallback onPause { get; set; }
public override _stream.ControllerCallback onResume { get; set; }
public override _stream.ControllerCancelCallback onCancel { get; set; }
internal _StreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) {
this.onListen = onListen;
this.onPause = onPause;
this.onResume = onResume;
this.onCancel = onCancel;
}
// Return a new stream every time. The streams are equal, but not identical.
public override Stream<T> stream {
get => new _ControllerStream<T>(this);
}
public override StreamSink<T> sink {
get => new _StreamSinkWrapper<T>(this);
}
bool _isCanceled {
get => (_state & _STATE_CANCELED) != 0;
}
/** Whether there is an active listener. */
public override bool hasListener {
get => (_state & _STATE_SUBSCRIBED) != 0;
}
/** Whether there has not been a listener yet. */
bool _isInitialState {
get =>
(_state & _STATE_SUBSCRIPTION_MASK) == _STATE_INITIAL;
}
public override bool isClosed {
get => (_state & _STATE_CLOSED) != 0;
}
public override bool isPaused {
get =>
hasListener ? _subscription._isInputPaused : !_isCanceled;
}
bool _isAddingStream {
get => (_state & _STATE_ADDSTREAM) != 0;
}
/** New events may not be added after close, or during addStream. */
internal bool _mayAddEvent {
get => (_state < _STATE_CLOSED);
}
// Returns the pending events.
// Pending events are events added before a subscription exists.
// They are added to the subscription when it is created.
// Pending events, if any, are kept in the _varData field until the
// stream is listened to.
// While adding a stream, pending events are moved into the
// state object to allow the state object to use the _varData field.
_PendingEvents<T> _pendingEvents {
get {
D.assert(_isInitialState);
if (!_isAddingStream) {
return (_PendingEvents<T>) _varData;
}
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
return (_PendingEvents<T>) state.varData;
}
}
// Returns the pending events, and creates the object if necessary.
_StreamImplEvents<T> _ensurePendingEvents() {
D.assert(_isInitialState);
if (!_isAddingStream) {
_varData = _varData ?? new _StreamImplEvents<T>();
return (_StreamImplEvents<T>) _varData;
}
_StreamControllerAddStreamState<T> state = (_StreamControllerAddStreamState<T>) _varData;
if (state.varData == null) state.varData = new _StreamImplEvents<T>();
return (_StreamImplEvents<T>) state.varData;
}
// Get the current subscription.
// If we are adding a stream, the subscription is moved into the state
// object to allow the state object to use the _varData field.
protected _ControllerSubscription<T> _subscription {
get {
D.assert(hasListener);
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
return (_ControllerSubscription<T>) addState.varData;
}
return (_ControllerSubscription<T>) _varData;
}
}
protected Exception _badEventState() {
if (isClosed) {
return new Exception("Cannot add event after closing");
}
D.assert(_isAddingStream);
return new Exception("Cannot add event while adding a stream");
}
// StreamSink interface.
public override Future addStream(Stream<T> source, bool? cancelOnError = false) {
if (!_mayAddEvent) throw _badEventState();
if (_isCanceled) return _Future.immediate(FutureOr.nil);
_StreamControllerAddStreamState<T> addState =
new _StreamControllerAddStreamState<T>(
this, _varData, source, cancelOnError ?? false);
_varData = addState;
_state |= _STATE_ADDSTREAM;
return addState.addStreamFuture;
}
public override Future done {
get { return _ensureDoneFuture(); }
}
Future _ensureDoneFuture() {
_doneFuture = _doneFuture ?? (_isCanceled ? Future._nullFuture : new _Future());
return _doneFuture;
}
public override void add(T value) {
if (!_mayAddEvent) throw _badEventState();
_add(value);
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (!_mayAddEvent) throw _badEventState();
error = _async._nonNullError(error);
AsyncError replacement = Zone.current.errorCallback((Exception) error);
if (replacement != null) {
error = _async._nonNullError(replacement);
// stackTrace = replacement.stackTrace;
}
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
_addError(error, stackTrace);
}
public override Future close() {
if (isClosed) {
return _ensureDoneFuture();
}
if (!_mayAddEvent) throw _badEventState();
_closeUnchecked();
return _ensureDoneFuture();
}
internal void _closeUnchecked() {
_state |= _STATE_CLOSED;
if (hasListener) {
_sendDone();
}
else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedDone<T>());
}
}
// EventSink interface. Used by the [addStream] events.
// Add data event, used both by the [addStream] events and by [add].
public override void _add(T value) {
if (hasListener) {
_sendData(value);
}
else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedData<T>(value));
}
}
public override void _addError(object error, string stackTrace) {
if (hasListener) {
_sendError(error, stackTrace);
}
else if (_isInitialState) {
_ensurePendingEvents().add(new _DelayedError<T>((Exception) error, stackTrace));
}
}
public override void _close() {
// End of addStream stream.
D.assert(_isAddingStream);
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
_varData = addState.varData;
_state &= ~_STATE_ADDSTREAM;
addState.complete();
}
// _StreamControllerLifeCycle interface
public override StreamSubscription<T> _subscribe(
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError) {
if (!_isInitialState) {
throw new Exception("Stream has already been listened to.");
}
_ControllerSubscription<T> subscription = new _ControllerSubscription<T>(
this, onData, onError, onDone, cancelOnError);
_PendingEvents<T> pendingEvents = _pendingEvents;
_state |= _STATE_SUBSCRIBED;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
addState.varData = subscription;
addState.resume();
}
else {
_varData = subscription;
}
subscription._setPendingEvents(pendingEvents);
subscription._guardCallback(() => { _stream._runGuarded(() => onListen?.Invoke()); });
return subscription;
}
public override Future _recordCancel(StreamSubscription<T> subscription) {
// When we cancel, we first cancel any stream being added,
// Then we call `onCancel`, and finally the _doneFuture is completed.
// If either of addStream's cancel or `onCancel` returns a future,
// we wait for it before continuing.
// Any error during this process ends up in the returned future.
// If more errors happen, we act as if it happens inside nested try/finallys
// or whenComplete calls, and only the last error ends up in the
// returned future.
Future result = null;
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
result = addState.cancel();
}
_varData = null;
_state =
(_state & ~(_STATE_SUBSCRIBED | _STATE_ADDSTREAM)) | _STATE_CANCELED;
if (onCancel != null) {
if (result == null) {
// Only introduce a future if one is needed.
// If _onCancel returns null, no future is needed.
try {
result = onCancel();
}
catch (Exception e) {
// Return the error in the returned future.
// Complete it asynchronously, so there is time for a listener
// to handle the error.
var f = new _Future();
f._asyncCompleteError(e);
result = f;
}
}
else {
// Simpler case when we already know that we will return a future.
result = result.whenComplete(() => onCancel());
}
}
void complete() {
if (_doneFuture != null && _doneFuture._mayComplete) {
_doneFuture._asyncComplete(FutureOr.nil);
}
}
if (result != null) {
result = result.whenComplete(complete);
}
else {
complete();
}
return result;
}
public override void _recordPause(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
addState.pause();
}
_stream._runGuarded(() => onPause?.Invoke());
}
public override void _recordResume(StreamSubscription<T> subscription) {
if (_isAddingStream) {
_StreamControllerAddStreamState<T> addState = (_StreamControllerAddStreamState<T>) _varData;
addState.resume();
}
_stream._runGuarded(() => onResume?.Invoke());
}
}
//
abstract class _SyncStreamControllerDispatch<T>
: _StreamController<T>, SynchronousStreamController<T> {
internal virtual int _state { get; set; }
public override void _sendData(T data) {
_subscription._add(data);
}
public override void _sendError(object error, string stackTrace) {
_subscription._addError(error, stackTrace);
}
public override void _sendDone() {
_subscription._close();
}
protected _SyncStreamControllerDispatch(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
onResume, onCancel) {
}
}
abstract class _AsyncStreamControllerDispatch<T>
: _StreamController<T> {
public override void _sendData(T data) {
_subscription._addPending(new _DelayedData<T>(data));
}
public override void _sendError(object error, string stackTrace) {
_subscription._addPending(new _DelayedError<T>((Exception) error, stackTrace));
}
public override void _sendDone() {
_subscription._addPending(new _DelayedDone<T>());
}
protected _AsyncStreamControllerDispatch(_stream.ControllerCallback onListen,
_stream.ControllerCallback onPause, _stream.ControllerCallback onResume,
_stream.ControllerCancelCallback onCancel) : base(onListen, onPause, onResume, onCancel) {
}
}
// TODO(lrn): Use common superclass for callback-controllers when VM supports
// constructors in mixin superclasses.
class _AsyncStreamController<T> : _AsyncStreamControllerDispatch<T> {
// public override void close() {
// throw new NotImplementedException();
// }
public _AsyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
onResume, onCancel) {
}
}
class _SyncStreamController<T> : _SyncStreamControllerDispatch<T> {
public _SyncStreamController(_stream.ControllerCallback onListen, _stream.ControllerCallback onPause,
_stream.ControllerCallback onResume, _stream.ControllerCancelCallback onCancel) : base(onListen, onPause,
onResume, onCancel) {
}
}
class _ControllerStream<T> : _StreamImpl<T>, IEquatable<_ControllerStream<T>> {
_StreamControllerLifecycle<T> _controller;
internal _ControllerStream(_StreamControllerLifecycle<T> _controller) {
this._controller = _controller;
}
internal override StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) =>
_controller._subscribe(onData, onError, onDone, cancelOnError);
// Override == and hashCode so that new streams returned by the same
// controller are considered equal. The controller returns a new stream
// each time it's queried, but doesn't have to cache the result.
// int hashCode {
// get { return _controller.GetHashCode() ^ 0x35323532; }
// }
// bool operator ==(object other) {
// if (identical(this, other)) return true;
// return other is _ControllerStream &&
// identical(other._controller, this._controller);
// }
public bool Equals(_ControllerStream<T> other) {
if (ReferenceEquals(null, other)) {
return false;
}
if (ReferenceEquals(this, other)) {
return true;
}
return Equals(_controller, other._controller);
}
public override bool Equals(object obj) {
if (ReferenceEquals(null, obj)) {
return false;
}
if (ReferenceEquals(this, obj)) {
return true;
}
if (obj.GetType() != GetType()) {
return false;
}
return Equals((_ControllerStream<T>) obj);
}
public override int GetHashCode() {
return _controller.GetHashCode() ^ 0x35323532;
}
}
class _ControllerSubscription<T> : _BufferingStreamSubscription<T> {
internal readonly _StreamControllerLifecycle<T> _controller;
internal _ControllerSubscription(
_StreamControllerLifecycle<T> _controller,
Action<T> onData,
Action<object, string> onError,
Action onDone, bool cancelOnError
)
: base(onData, onError, onDone, cancelOnError) {
this._controller = _controller;
}
protected override Future _onCancel() {
return _controller._recordCancel(this);
}
protected override void _onPause() {
_controller._recordPause(this);
}
protected override void _onResume() {
_controller._recordResume(this);
}
}
/** A class that exposes only the [StreamSink] interface of an object. */
class _StreamSinkWrapper<T> : StreamSink<T> {
readonly StreamController<T> _target;
internal _StreamSinkWrapper(StreamController<T> _target) {
this._target = _target;
}
public override void add(T data) {
_target.add(data);
}
public override void addError(object error, string stackTrace) {
_target.addError(error, stackTrace);
}
public override Future close() => _target.close();
public override Future addStream(Stream<T> source) => _target.addStream(source);
public override Future done {
get { return _target.done; }
}
}
class _AddStreamState<T> {
// [_Future] returned by call to addStream.
internal readonly _Future addStreamFuture;
// Subscription on stream argument to addStream.
internal readonly StreamSubscription<T> addSubscription;
internal _AddStreamState(
_EventSink<T> controller, Stream<T> source, bool cancelOnError) {
addStreamFuture = new _Future();
addSubscription = source.listen(controller._add,
onError: cancelOnError
? makeErrorHandler(controller)
: controller._addError,
onDone: controller._close,
cancelOnError: cancelOnError);
}
public static Action<object, string> makeErrorHandler(_EventSink<T> controller) {
return (object e, string s) => {
controller._addError(e, s);
controller._close();
};
}
public void pause() {
addSubscription.pause();
}
public void resume() {
addSubscription.resume();
}
public Future cancel() {
var cancel = addSubscription.cancel();
if (cancel == null) {
addStreamFuture._asyncComplete(FutureOr.nil);
return null;
}
return cancel.whenComplete(() => { addStreamFuture._asyncComplete(FutureOr.nil); });
}
public void complete() {
addStreamFuture._asyncComplete(FutureOr.nil);
}
}
class _StreamControllerAddStreamState<T> : _AddStreamState<T> {
// The subscription or pending data of a _StreamController.
// Stored here because we reuse the `_varData` field in the _StreamController
// to store this state object.
public object varData;
internal _StreamControllerAddStreamState(_StreamController<T> controller, object varData,
Stream<T> source, bool cancelOnError)
: base(controller, source, cancelOnError) {
if (controller.isPaused) {
addSubscription.pause();
}
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 6a5b6c88a4f44674921771bc51facbfc
timeCreated: 1628753099

Разница между файлами не показана из-за своего большого размера Загрузить разницу

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 8da42c1ef952401abf68f552618101a4
timeCreated: 1628681636

Просмотреть файл

@ -0,0 +1,84 @@
using System;
namespace Unity.UIWidgets.async {
/**
* Stream.multi is not supported by flutter 1.17.5 yet, but it might be useful for developers. To address this issue, we put all the necessary codes for this feature
* in this single file.
*
* [TODO] remove this code when we eventually upgrade UIWidgets to above 2.0
*/
public class StreamMultiUtils<T>
{
public static Stream<T> multi(Action<MultiStreamController<T>> onListen, bool isBroadcast = false) {
return new _MultiStream<T>(onListen, isBroadcast);
}
}
public interface MultiStreamController<T> : IStreamController<T> {
void addSync(T value);
void addErrorSync(object error, string trackStack);
void closeSync();
}
class _MultiStream<T> : Stream<T> {
public override bool isBroadcast {
get {
return _isBroadcast;
}
}
bool _isBroadcast;
/// The callback called for each listen.
public readonly Action<MultiStreamController<T>> _onListen;
public _MultiStream(Action<MultiStreamController<T>> _onListen, bool isBroadcast) {
_isBroadcast = isBroadcast;
this._onListen = _onListen;
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
var controller = new _MultiStreamController<T>();
controller.onListen = () => {
_onListen(controller);
};
return controller._subscribe(
onData, onError, onDone, cancelOnError);
}
}
class _MultiStreamController<T> : _AsyncStreamController<T>, MultiStreamController<T> {
public _MultiStreamController() : base(null, null, null, null)
{
}
public void addSync(T value) {
if (!_mayAddEvent) throw _badEventState();
if (hasListener) _subscription._add(value);
}
public void addErrorSync(object error, string trackStack) {
if (!_mayAddEvent) throw _badEventState();
if (hasListener) {
_subscription._addError(error, trackStack ?? "");
}
}
public void closeSync() {
if (isClosed) return;
if (!_mayAddEvent) throw _badEventState();
_state |= _StreamController<T>._STATE_CLOSED;
if (hasListener) _subscription._close();
}
public override Stream<T> stream {
get {
throw new Exception("Not available");
}
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 5bf5bf2fe1c7407db0a476f2bbcd01f4
timeCreated: 1629701152

Просмотреть файл

@ -0,0 +1,556 @@
using System;
using System.Collections.Generic;
namespace Unity.UIWidgets.async {
public static partial class _stream {
/** Runs user code and takes actions depending on success or failure. */
internal static void _runUserCode<T>(
Func<T> userCode, Action<T> onSuccess, Action<Exception> onError) {
try {
onSuccess(userCode());
}
catch (Exception e) {
AsyncError replacement = Zone.current.errorCallback(e);
if (replacement == null) {
onError(e);
}
else {
var error = async_._nonNullError(replacement);
onError(error);
}
}
}
internal static void _cancelAndErrorWithReplacement<T>(StreamSubscription<T> subscription,
_Future future, Exception error) {
AsyncError replacement = Zone.current.errorCallback(error);
if (replacement != null) {
error = (Exception) _async._nonNullError(replacement);
}
_cancelAndError(subscription, future, error);
}
internal delegate void _ErrorCallback(Exception error);
internal static _ErrorCallback _cancelAndErrorClosure<T>(
StreamSubscription<T> subscription, _Future future) {
return (error) => { _cancelAndError(subscription, future, error); };
}
internal static void _cancelAndValue<T>(StreamSubscription<T> subscription, _Future future, object value) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(FutureOr.value(value)));
}
else {
future._complete(FutureOr.value(value));
}
}
static void _cancelAndError<T>(StreamSubscription<T> subscription, _Future future, Exception error
) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._completeError(error));
}
else {
future._completeError(error);
}
}
internal static void _cancelAndValue<T>(StreamSubscription<T> subscription, _Future future, FutureOr value) {
var cancelFuture = subscription.cancel();
if (cancelFuture != null && !Equals(cancelFuture, Future._nullFuture)) {
cancelFuture.whenComplete(() => future._complete(value));
}
else {
future._complete(value);
}
}
internal delegate bool _Predicate<T>(T value);
//
internal static void _addErrorWithReplacement<T>(_EventSink<T> sink, Exception error, string stackTrace) {
AsyncError replacement = Zone.current.errorCallback(error);
if (replacement != null) {
error = async_._nonNullError(replacement);
stackTrace = replacement.StackTrace;
}
sink._addError(error, stackTrace);
}
internal delegate T _Transformation<S, T>(S value);
internal delegate bool _ErrorTest(Exception error);
internal delegate bool _Equality<T>(T a, T b);
}
abstract class _ForwardingStream<S, T> : Stream<T> {
internal readonly Stream<S> _source;
internal _ForwardingStream(Stream<S> _source) {
this._source = _source;
}
public override bool isBroadcast {
get { return _source.isBroadcast; }
}
public override StreamSubscription<T> listen(Action<T> onData, Action<object, string> onError = null,
Action onDone = null, bool cancelOnError = false) {
cancelOnError = Equals(true, cancelOnError);
return _createSubscription(onData, onError, onDone, cancelOnError);
}
internal virtual StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
return new _ForwardingStreamSubscription<S, T>(
this, onData, onError, onDone, cancelOnError);
}
// Override the following methods in subclasses to change the behavior.
internal virtual void _handleData(S data, _EventSink<T> sink) {
sink._add((T) (object) data);
}
internal virtual void _handleError(object error, _EventSink<T> sink) {
string stackTrace = error is Exception ? ((Exception) error).StackTrace : "";
sink._addError(error, stackTrace);
}
internal virtual void _handleDone(_EventSink<T> sink) {
sink._close();
}
}
//
// /**
// * Abstract superclass for subscriptions that forward to other subscriptions.
// */
class _ForwardingStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
readonly _ForwardingStream<S, T> _stream;
StreamSubscription<S> _subscription;
internal _ForwardingStreamSubscription(_ForwardingStream<S, T> _stream,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError
)
: base(onData, onError, onDone, cancelOnError) {
this._stream = _stream;
_subscription = _stream._source
.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
// _StreamSink interface.
// Transformers sending more than one event have no way to know if the stream
// is canceled or closed after the first, so we just ignore remaining events.
public override void _add(T data) {
if (_isClosed) return;
base._add(data);
}
public override void _addError(object error, string stackTrace) {
if (_isClosed) return;
base._addError(error, stackTrace);
}
// StreamSubscription callbacks.
protected override void _onPause() {
if (_subscription == null) return;
_subscription.pause();
}
protected override void _onResume() {
if (_subscription == null) return;
_subscription.resume();
}
protected override Future _onCancel() {
if (_subscription != null) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
}
return null;
}
// Methods used as listener on source subscription.
void _handleData(S data) {
_stream._handleData(data, this);
}
void _handleError(object error, string stackTrace) {
_stream._handleError((Exception) error, this);
}
void _handleDone() {
_stream._handleDone(this);
}
}
//
// // -------------------------------------------------------------------
// // Stream transformers used by the default Stream implementation.
// // -------------------------------------------------------------------
//
//
class _WhereStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _WhereStream(Stream<T> source, Func<T, bool> test) : base(source) {
_test = d => test(d);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
if (satisfies) {
sink._add(inputEvent);
}
}
}
//
//
// /**
// * A stream pipe that converts data events before passing them on.
// */
class _MapStream<S, T> : _ForwardingStream<S, T> {
readonly _stream._Transformation<S, T> _transform;
internal _MapStream(Stream<S> source, Func<S, T> transform) : base(source) {
_transform = d => transform(d);
}
internal override void _handleData(S inputEvent, _EventSink<T> sink) {
T outputEvent;
try {
outputEvent = _transform(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
sink._add(outputEvent);
}
}
//
// /**
// * A stream pipe that converts data events before passing them on.
// */
class _ExpandStream<S, T> : _ForwardingStream<S, T> {
readonly _stream._Transformation<S, IEnumerable<T>> _expand;
internal _ExpandStream(Stream<S> source, _stream._Transformation<S, IEnumerable<T>> expand) : base(source) {
_expand = expand;
}
internal override void _handleData(S inputEvent, _EventSink<T> sink) {
try {
foreach (T value in _expand(inputEvent)) {
sink._add(value);
}
}
catch (Exception e) {
// If either _expand or iterating the generated iterator throws,
// we abort the iteration.
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
}
}
}
//
//
// /**
// * A stream pipe that converts or disposes error events
// * before passing them on.
// */
class _HandleErrorStream<T> : _ForwardingStream<T, T> {
readonly ZoneBinaryCallback _transform;
readonly _stream._ErrorTest _test;
internal _HandleErrorStream(Stream<T> source, ZoneBinaryCallback onError, _stream._ErrorTest test) :
base(source) {
_transform = onError;
_test = test;
}
internal override void _handleError(object error, _EventSink<T> sink) {
bool matches = true;
if (_test != null) {
try {
matches = _test((Exception) error);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
}
string stackTrace = error is Exception ? ((Exception) error).StackTrace : "";
if (matches) {
try {
_async._invokeErrorHandler(_transform, error, stackTrace);
}
catch (Exception e) {
if (Equals(e, error)) {
sink._addError(error, stackTrace);
}
else {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
}
return;
}
}
else {
sink._addError(error, stackTrace);
}
}
}
//
class _TakeStream<T> : _ForwardingStream<T, T> {
readonly int _count;
internal _TakeStream(Stream<T> source, int count) : base(source) {
_count = count;
// This test is done early to avoid handling an async error
// in the _handleData method.
// ArgumentError.checkNotNull(count, "count");
}
internal override StreamSubscription<T> _createSubscription(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError) {
if (_count == 0) {
_source.listen(null).cancel();
return new _DoneStreamSubscription<T>(() => onDone());
}
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _count);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
int count = subscription._count;
if (count > 0) {
sink._add(inputEvent);
count -= 1;
subscription._count = count;
if (count == 0) {
// Closing also unsubscribes all subscribers, which unsubscribes
// this from source.
sink._close();
}
}
}
}
//
// /**
// * A [_ForwardingStreamSubscription] with one extra state field.
// *
// * Use by several different classes, storing an integer, bool or general.
// */
class _StateStreamSubscription<T> : _ForwardingStreamSubscription<T, T> {
// Raw state field. Typed access provided by getters and setters below.
// siyao: this is used as bool and int, if it was used at the same time, everything would be fxxked up.
object _sharedState;
internal _StateStreamSubscription(
_ForwardingStream<T, T> stream,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError, object _sharedState
)
: base(stream, onData, onError, onDone, cancelOnError) {
this._sharedState = _sharedState;
}
internal bool _flag {
get => (bool) _sharedState;
set => _sharedState = value;
}
internal int _count {
get => (int) _sharedState;
set => _sharedState = value;
}
internal object _value {
get => _sharedState;
set => _sharedState = value;
}
}
class _TakeWhileStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _TakeWhileStream(Stream<T> source, _stream._Predicate<T> test)
: base(source) {
_test = test;
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
// The test didn't say true. Didn't say false either, but we stop anyway.
sink._close();
return;
}
if (satisfies) {
sink._add(inputEvent);
}
else {
sink._close();
}
}
}
//
class _SkipStream<T> : _ForwardingStream<T, T> {
readonly int _count;
internal _SkipStream(Stream<T> source, int count)
: base(source) {
_count = count;
// This test is done early to avoid handling an async error
// in the _handleData method.
// ArgumentError.checkNotNull(count, "count");
// RangeError.checkNotNegative(count, "count");
}
internal override StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _count);
}
internal void _handleDone(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
int count = subscription._count;
if (count > 0) {
subscription._count = count - 1;
return;
}
sink._add(inputEvent);
}
}
class _SkipWhileStream<T> : _ForwardingStream<T, T> {
readonly _stream._Predicate<T> _test;
internal _SkipWhileStream(Stream<T> source, _stream._Predicate<T> test) : base(source) {
_test = test;
}
internal override StreamSubscription<T> _createSubscription(
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError
) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, false);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
bool hasFailed = subscription._flag;
if (hasFailed) {
sink._add(inputEvent);
return;
}
bool satisfies;
try {
satisfies = _test(inputEvent);
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
// A failure to return a boolean is considered "not matching".
subscription._flag = true;
return;
}
if (!satisfies) {
subscription._flag = true;
sink._add(inputEvent);
}
}
}
class _DistinctStream<T> : _ForwardingStream<T, T> {
static readonly object _SENTINEL = new object();
readonly _stream._Equality<T> _equals;
internal _DistinctStream(Stream<T> source, _stream._Equality<T> equals) : base(source) {
_equals = equals;
}
internal override StreamSubscription<T> _createSubscription(Action<T> onData, Action<object, string> onError,
Action onDone, bool cancelOnError) {
return new _StateStreamSubscription<T>(
this, onData, onError, onDone, cancelOnError, _SENTINEL);
}
internal override void _handleData(T inputEvent, _EventSink<T> sink) {
_StateStreamSubscription<T> subscription = (_StateStreamSubscription<T>) sink;
var previous = subscription._value;
if (Equals(previous, _SENTINEL)) {
// First event.
subscription._value = inputEvent;
sink._add(inputEvent);
}
else {
T previousEvent = (T) previous;
bool isEqual;
try {
if (_equals == null) {
isEqual = Equals(previousEvent, inputEvent);
}
else {
isEqual = _equals(previousEvent, inputEvent);
}
}
catch (Exception e) {
_stream._addErrorWithReplacement(sink, e, e.StackTrace);
return;
}
if (!isEqual) {
sink._add(inputEvent);
subscription._value = inputEvent;
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: ddf391c6af5c41bb8e27a4a5f0149c65
timeCreated: 1629257330

Просмотреть файл

@ -0,0 +1,304 @@
using System;
namespace Unity.UIWidgets.async {
class _EventSinkWrapper<T> : EventSink<T> {
_EventSink<T> _sink;
internal _EventSinkWrapper(_EventSink<T> _sink) {
this._sink = _sink;
}
public override void add(T data) {
_sink._add(data);
}
public override void addError(object error, string stackTrace) {
_sink._addError(error, stackTrace ?? AsyncError.defaultStackTrace(error));
}
public override Future close() {
_sink._close();
return Future._nullFuture;
}
}
class _SinkTransformerStreamSubscription<S, T>
: _BufferingStreamSubscription<T> {
/// The transformer's input sink.
EventSink<S> _transformerSink;
/// The subscription to the input stream.
StreamSubscription<S> _subscription;
internal _SinkTransformerStreamSubscription(Stream<S> source, _async._SinkMapper<S, T> mapper,
Action<T> onData, Action<object, string> onError, Action onDone, bool cancelOnError)
// We set the adapter's target only when the user is allowed to send data.
: base(onData, onError, onDone, cancelOnError) {
_EventSinkWrapper<T> eventSink = new _EventSinkWrapper<T>(this);
_transformerSink = mapper(eventSink);
_subscription =
source.listen(_handleData, onError: _handleError, onDone: _handleDone);
}
/** Whether this subscription is still subscribed to its source. */
bool _isSubscribed {
get { return _subscription != null; }
}
// _EventSink interface.
public override void _add(T data) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._add(data);
}
public override void _addError(object error, string stackTrace) {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._addError(error, stackTrace);
}
public override void _close() {
if (_isClosed) {
throw new Exception("Stream is already closed");
}
base._close();
}
// _BufferingStreamSubscription hooks.
protected override void _onPause() {
if (_isSubscribed) _subscription.pause();
}
protected override void _onResume() {
if (_isSubscribed) _subscription.resume();
}
protected override Future _onCancel() {
if (_isSubscribed) {
StreamSubscription<S> subscription = _subscription;
_subscription = null;
return subscription.cancel();
}
return null;
}
void _handleData(S data) {
try {
_transformerSink.add(data);
}
catch (Exception e) {
_addError(e, e.StackTrace);
}
}
void _handleError(object error, string stackTrace) {
try {
_transformerSink.addError(error, stackTrace);
}
catch (Exception e) {
if (Equals(e, error)) {
_addError(error, stackTrace);
}
else {
_addError(e, e.StackTrace);
}
}
}
void _handleDone() {
try {
_subscription = null;
_transformerSink.close();
}
catch (Exception e) {
_addError(e, e.StackTrace);
}
}
}
class _StreamSinkTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
public _StreamSinkTransformer(_async._SinkMapper<S, T> _sinkMapper) {
this._sinkMapper = _sinkMapper;
}
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSinkStream<S, T>(stream, _sinkMapper);
}
class _BoundSinkStream<S, T> : Stream<T> {
readonly _async._SinkMapper<S, T> _sinkMapper;
readonly Stream<S> _stream;
public override bool isBroadcast {
get { return _stream.isBroadcast; }
}
internal _BoundSinkStream(Stream<S> _stream, _async._SinkMapper<S, T> _sinkMapper) {
this._stream = _stream;
this._sinkMapper = _sinkMapper;
}
public override StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = default) {
StreamSubscription<T> subscription =
new _SinkTransformerStreamSubscription<S, T>(
_stream, _sinkMapper, onData, onError, onDone, cancelOnError);
return subscription;
}
}
static partial class _stream {
public delegate void _TransformDataHandler<S, T>(S data, EventSink<T> sink);
/// Error-handler coming from [StreamTransformer.fromHandlers].
public delegate void _TransformErrorHandler<T>(
object error, string stackTrace, EventSink<T> sink);
/// Done-handler coming from [StreamTransformer.fromHandlers].
public delegate void _TransformDoneHandler<T>(EventSink<T> sink);
}
class _HandlerEventSink<S, T> : EventSink<S> {
readonly _stream._TransformDataHandler<S, T> _handleData;
readonly _stream._TransformErrorHandler<T> _handleError;
readonly _stream._TransformDoneHandler<T> _handleDone;
/// The output sink where the handlers should send their data into.
EventSink<T> _sink;
internal _HandlerEventSink(
_stream._TransformDataHandler<S, T> _handleData, _stream._TransformErrorHandler<T> _handleError,
_stream._TransformDoneHandler<T> _handleDone, EventSink<T> _sink) {
this._handleData = _handleData;
this._handleError = _handleError;
this._handleDone = _handleDone;
this._sink = _sink;
if (_sink == null) {
throw new Exception("The provided sink must not be null.");
}
}
bool _isClosed {
get { return _sink == null; }
}
public override void add(S data) {
if (_isClosed) {
throw new Exception("Sink is closed");
}
if (_handleData != null) {
_handleData(data, _sink);
}
else {
_sink.add((T)((object)data));
}
}
public override void addError(object error, string stackTrace) {
// ArgumentError.checkNotNull(error, "error");
if (_isClosed) {
throw new Exception("Sink is closed");
}
if (_handleError != null) {
stackTrace = stackTrace ?? AsyncError.defaultStackTrace(error);
_handleError(error, stackTrace, _sink);
}
else {
_sink.addError(error, stackTrace);
}
}
public override Future close() {
if (_isClosed) return Future._nullFuture;
var sink = _sink;
_sink = null;
if (_handleDone != null) {
_handleDone(sink);
}
else {
sink.close();
}
return Future._nullFuture;
}
}
class _StreamHandlerTransformer<S, T> : _StreamSinkTransformer<S, T> {
internal _StreamHandlerTransformer(
_stream._TransformDataHandler<S, T> handleData = null,
_stream._TransformErrorHandler<T> handleError = null,
_stream._TransformDoneHandler<T> handleDone = null)
: base((EventSink<T> outputSink) => {
return new _HandlerEventSink<S, T>(
handleData, handleError, handleDone, outputSink);
}) {
}
public override Stream<T> bind(Stream<S> stream) {
return base.bind(stream);
}
}
class _StreamBindTransformer<S, T> : StreamTransformerBase<S, T> {
readonly Func<Stream<S>, Stream<T>> _bind;
internal _StreamBindTransformer(Func<Stream<S>, Stream<T>> _bind) {
this._bind = _bind;
}
public override Stream<T> bind(Stream<S> stream) => _bind(stream);
}
public partial class _async {
public delegate EventSink<S> _SinkMapper<S, T>(EventSink<T> output);
public delegate StreamSubscription<T> _SubscriptionTransformer<S, T>(Stream<S> stream, bool cancelOnError);
}
class _StreamSubscriptionTransformer<S, T> : StreamTransformerBase<S, T> {
readonly _async._SubscriptionTransformer<S, T> _onListen;
internal _StreamSubscriptionTransformer(_async._SubscriptionTransformer<S, T> _onListen) {
this._onListen = _onListen;
}
public override Stream<T> bind(Stream<S> stream) =>
new _BoundSubscriptionStream<S, T>(stream, _onListen);
}
class _BoundSubscriptionStream<S, T> : Stream<T> {
internal _BoundSubscriptionStream(Stream<S> _stream, _async._SubscriptionTransformer<S, T> _onListen) {
this._stream = _stream;
this._onListen = _onListen;
}
readonly _async._SubscriptionTransformer<S, T> _onListen;
readonly Stream<S> _stream;
public override bool isBroadcast {
get { return _stream.isBroadcast; }
}
public override StreamSubscription<T> listen(Action<T> onData,
Action<object, string> onError = null, Action onDone = null, bool cancelOnError = false) {
//cancelOnError = cancelOnError;
StreamSubscription<T> result = _onListen(_stream, cancelOnError);
result.onData(onData);
result.onError(onError);
result.onDone(onDone);
return result;
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 4be593ce960f459482dbeb617dfcb4e0
timeCreated: 1628682407

Просмотреть файл

@ -68,6 +68,14 @@ namespace Unity.UIWidgets.async {
public AsyncError(Exception innerException) : base(null, innerException) {
}
public static string defaultStackTrace(object error) {
if (error is Exception ex) {
return ex.StackTrace;
}
return "";
}
}
struct _ZoneFunction<T> where T : Delegate {

Просмотреть файл

@ -0,0 +1,334 @@
using System;
using System.Collections.Generic;
using Unity.UIWidgets.async;
using Unity.UIWidgets.foundation;
namespace Unity.UIWidgets.widgets {
public abstract class StreamBuilderBase<T, S> : StatefulWidget {
public StreamBuilderBase(Key key = null, Stream<T> stream = null) : base(key: key) {
this.stream = stream;
}
public readonly Stream<T> stream;
public abstract S initial();
public virtual S afterConnected(S current) => current;
public abstract S afterData(S current, T data);
public virtual S afterError(S current, object error) => current;
public virtual S afterDone(S current) => current;
public virtual S afterDisconnected(S current) => current;
public abstract Widget build(BuildContext context, S currentSummary);
public override State createState() => new _StreamBuilderBaseState<T, S>();
}
class _StreamBuilderBaseState<T, S> : State<StreamBuilderBase<T, S>> {
StreamSubscription<T> _subscription;
S _summary;
public override void initState() {
base.initState();
_summary = widget.initial();
_subscribe();
}
public override void didUpdateWidget(StatefulWidget statefulWidget) {
StreamBuilderBase<T, S> oldWidget = statefulWidget as StreamBuilderBase<T, S>;
if (oldWidget == null) {
return;
}
base.didUpdateWidget(statefulWidget);
if (oldWidget != null) {
if (oldWidget.stream != widget.stream) {
if (_subscription != null) {
_unsubscribe();
_summary = widget.afterDisconnected(_summary);
}
_subscribe();
}
}
}
public override Widget build(BuildContext context) => widget.build(context, _summary);
public override void dispose() {
_unsubscribe();
base.dispose();
}
void _subscribe() {
if (widget.stream != null) {
_subscription = widget.stream.listen(
(T data) => { setState(() => { _summary = widget.afterData(_summary, data); }); },
onError: (object error, string stackTrace) => {
setState(() => { _summary = widget.afterError(_summary, error); });
},
onDone: () => { setState(() => { _summary = widget.afterDone(_summary); }); });
_summary = widget.afterConnected(_summary);
}
}
void _unsubscribe() {
if (_subscription != null) {
_subscription.cancel();
_subscription = null;
}
}
}
public enum ConnectionState {
none,
waiting,
active,
done,
}
//@immutable
public class AsyncSnapshot<T> : IEquatable<AsyncSnapshot<T>> {
AsyncSnapshot(ConnectionState connectionState, object data, object error) {
D.assert(connectionState != null);
D.assert(!(data != null && error != null));
this.connectionState = connectionState;
this.data = (T) data;
this.error = error;
}
public static AsyncSnapshot<object> nothing() {
return new AsyncSnapshot<object>(ConnectionState.none, null, null);
}
public static AsyncSnapshot<T> withData(ConnectionState state, T data) {
return new AsyncSnapshot<T>(state, data, null);
}
public static AsyncSnapshot<T> withError(ConnectionState state, object error) {
return new AsyncSnapshot<T>(state, null, error);
}
public readonly ConnectionState connectionState;
public readonly T data;
public T requireData {
get {
if (hasData)
return data;
if (hasError)
//TODO: not sure if cast works
throw (Exception) error;
throw new Exception("Snapshot has neither data nor error");
}
}
public readonly object error;
public AsyncSnapshot<T> inState(ConnectionState state) {
return new AsyncSnapshot<T>(state, data, error);
}
public bool hasData {
get => data != null;
}
public bool hasError {
get => error != null;
}
public override string ToString() =>
$"{foundation_.objectRuntimeType(this, "AsyncSnapshot")}({connectionState}, {data}, {error})";
public static bool operator ==(AsyncSnapshot<T> left, AsyncSnapshot<T> right) {
return Equals(left, right);
}
public static bool operator !=(AsyncSnapshot<T> left, AsyncSnapshot<T> right) {
return !Equals(left, right);
}
public bool Equals(AsyncSnapshot<T> other) {
if (ReferenceEquals(null, other)) {
return false;
}
if (ReferenceEquals(this, other)) {
return true;
}
return connectionState == other.connectionState && EqualityComparer<T>.Default.Equals(data, other.data) &&
Equals(error, other.error);
}
public override bool Equals(object obj) {
if (ReferenceEquals(null, obj)) {
return false;
}
if (ReferenceEquals(this, obj)) {
return true;
}
if (obj.GetType() != GetType()) {
return false;
}
return Equals((AsyncSnapshot<T>) obj);
}
public override int GetHashCode() {
unchecked {
var hashCode = (int) connectionState;
hashCode = (hashCode * 397) ^ EqualityComparer<T>.Default.GetHashCode(data);
hashCode = (hashCode * 397) ^ (error != null ? error.GetHashCode() : 0);
return hashCode;
}
}
}
public static partial class _async {
public delegate Widget AsyncWidgetBuilder<T>(BuildContext context, AsyncSnapshot<T> snapshot);
}
// TODO(ianh): remove unreachable code above once https://github.com/dart-lang/linter/issues/1139 is fixed
public class StreamBuilder<T> : StreamBuilderBase<T, AsyncSnapshot<T>> {
public StreamBuilder(
_async.AsyncWidgetBuilder<T> builder,
Key key = null,
T initialData = default,
Stream<T> stream = null
) : base(key: key, stream: stream) {
D.assert(builder != null);
this.builder = builder;
this.initialData = initialData;
}
public readonly _async.AsyncWidgetBuilder<T> builder;
public readonly T initialData;
public override
AsyncSnapshot<T> initial() => AsyncSnapshot<T>.withData(ConnectionState.none, initialData);
public override
AsyncSnapshot<T> afterConnected(AsyncSnapshot<T> current) => current.inState(ConnectionState.waiting);
public override
AsyncSnapshot<T> afterData(AsyncSnapshot<T> current, T data) {
return AsyncSnapshot<T>.withData(ConnectionState.active, data);
}
public override
AsyncSnapshot<T> afterError(AsyncSnapshot<T> current, object error) {
return AsyncSnapshot<T>.withError(ConnectionState.active, error);
}
public override
AsyncSnapshot<T> afterDone(AsyncSnapshot<T> current) => current.inState(ConnectionState.done);
public override
AsyncSnapshot<T> afterDisconnected(AsyncSnapshot<T> current) => current.inState(ConnectionState.none);
public override
Widget build(BuildContext context, AsyncSnapshot<T> currentSummary) => builder(context, currentSummary);
}
// TODO(ianh): remove unreachable code above once https://github.com/dart-lang/linter/issues/1141 is fixed
public class FutureBuilder<T> : StatefulWidget {
public FutureBuilder(
_async.AsyncWidgetBuilder<T> builder,
Key key = null,
Future<T> future = null,
T initialData = default
) :
base(key: key) {
D.assert(builder != null);
this.builder = builder;
this.future = future;
this.initialData = initialData;
}
public readonly Future<T> future;
public readonly _async.AsyncWidgetBuilder<T> builder;
public readonly T initialData;
public override
State createState() => new _FutureBuilderState<T>();
}
class _FutureBuilderState<T> : State<FutureBuilder<T>> {
object _activeCallbackIdentity;
AsyncSnapshot<T> _snapshot;
public override
void initState() {
base.initState();
_snapshot = AsyncSnapshot<T>.withData(ConnectionState.none, widget.initialData);
_subscribe();
}
public override
void didUpdateWidget(StatefulWidget statefulWidget) {
var oldWidget = statefulWidget as FutureBuilder<T>;
if (oldWidget == null) {
return;
}
base.didUpdateWidget(oldWidget);
if (oldWidget.future != widget.future) {
if (_activeCallbackIdentity != null) {
_unsubscribe();
_snapshot = _snapshot.inState(ConnectionState.none);
}
_subscribe();
}
}
public override
Widget build(BuildContext context) => widget.builder(context, _snapshot);
public override
void dispose() {
_unsubscribe();
base.dispose();
}
void _subscribe() {
if (widget.future != null) {
object callbackIdentity = new object();
_activeCallbackIdentity = callbackIdentity;
widget.future.then((object dataIn) => {
var data = (T) dataIn;
if (_activeCallbackIdentity == callbackIdentity) {
setState(() => { _snapshot = AsyncSnapshot<T>.withData(ConnectionState.done, data); });
}
}, onError: (Exception error) => {
if (_activeCallbackIdentity == callbackIdentity) {
setState(() => { _snapshot = AsyncSnapshot<T>.withError(ConnectionState.done, error); });
}
return FutureOr.nil;
});
_snapshot = _snapshot.inState(ConnectionState.waiting);
}
}
void _unsubscribe() {
_activeCallbackIdentity = null;
}
}
}

Просмотреть файл

@ -0,0 +1,3 @@
fileFormatVersion: 2
guid: 459525b5a4954fd3b46b3462cc408fbd
timeCreated: 1628671862