* Replace find_next_block with separate find and next functions
First "working" read-ahead

* Fix in-memory clusters being re-read

* Hugely improve read-ahead performance

* Code cleanup

* Read-ahead more clusters to fix latency issues with empty clusters

* Fix git merge changes

* PR comments

* Fix function signature

* Fix clang build

* Whitespace
This commit is contained in:
Jacob Wirth 2019-03-28 17:28:42 -07:00 коммит произвёл GitHub
Родитель 9be96b6c9e
Коммит 71ad161ee9
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
12 изменённых файлов: 996 добавлений и 544 удалений

Просмотреть файл

@ -12,9 +12,7 @@
#include <list>
#include <fstream>
#include <memory>
#include <azure_c_shared_utility/threadapi.h>
#include <azure_c_shared_utility/condition.h>
#include <azure_c_shared_utility/lock.h>
#include <thread>
#if defined(__clang__)
@ -83,6 +81,10 @@ constexpr uint64_t operator"" _s(unsigned long long x)
#define CLUSTER_WRITE_DELAY_NS 2_s
#endif
#ifndef CLUSTER_READ_AHEAD_COUNT
#define CLUSTER_READ_AHEAD_COUNT 2
#endif
static_assert(MAX_CLUSTER_LENGTH_NS < INT16_MAX * MATROSKA_TIMESCALE_NS, "Cluster length must fit in a 16 bit int");
static_assert(CLUSTER_WRITE_DELAY_NS >= MAX_CLUSTER_LENGTH_NS * 2, "Cluster write delay is shorter than 2 clusters");
@ -136,9 +138,11 @@ public:
size_t write(const void *buffer, size_t size) override;
uint64 getFilePointer() override;
void close() override;
void setOwnerThread();
private:
std::fstream m_stream;
std::thread::id m_owner;
};
// Struct matches https://docs.microsoft.com/en-us/windows/desktop/wmdm/-bitmapinfoheader

Просмотреть файл

@ -10,6 +10,8 @@
#include <k4ainternal/matroska_common.h>
#include <functional>
#include <mutex>
#include <future>
namespace k4arecord
{
@ -33,17 +35,31 @@ typedef struct _cluster_info_t
// Once it is known that no gap is present between indexed clusters, next_known is set to true.
typedef std::unique_ptr<cluster_info_t, std::function<void(cluster_info_t *)>> cluster_cache_t;
typedef struct _read_block_t
{
struct _track_reader_t *reader;
cluster_info_t *cluster_info;
std::shared_ptr<libmatroska::KaxCluster> cluster;
libmatroska::KaxInternalBlock *block;
// A pointer to a cluster that is still being loaded from disk.
typedef std::shared_future<std::shared_ptr<libmatroska::KaxCluster>> future_cluster_t;
uint64_t timestamp_ns;
uint64_t sync_timestamp_ns;
int index;
} read_block_t;
typedef struct _loaded_cluster_t
{
cluster_info_t *cluster_info = NULL;
std::shared_ptr<libmatroska::KaxCluster> cluster;
#if CLUSTER_READ_AHEAD_COUNT
// Pointers to previous and next clusters to keep them preloaded in memory.
future_cluster_t previous_clusters[CLUSTER_READ_AHEAD_COUNT];
future_cluster_t next_clusters[CLUSTER_READ_AHEAD_COUNT];
#endif
} loaded_cluster_t;
typedef struct _block_info_t
{
struct _track_reader_t *reader = NULL;
std::shared_ptr<loaded_cluster_t> cluster;
libmatroska::KaxInternalBlock *block = NULL;
uint64_t timestamp_ns = 0; // The timestamp of the block as written in the file.
uint64_t sync_timestamp_ns = 0; // The timestamp of the block, including sychronization offsets.
int index = -1; // Index of the block element within the cluster.
} block_info_t;
typedef struct _track_reader_t
{
@ -52,13 +68,17 @@ typedef struct _track_reader_t
k4a_image_format_t format;
uint64_t sync_delay_ns;
BITMAPINFOHEADER *bitmap_header;
std::shared_ptr<read_block_t> current_block;
std::shared_ptr<block_info_t> current_block;
} track_reader_t;
typedef struct _k4a_playback_context_t
{
const char *file_path;
std::unique_ptr<IOCallback> ebml_file;
std::mutex io_lock; // Locks access to ebml_file
bool file_closing;
logger_t logger_handle;
uint64_t timecode_scale;
@ -78,9 +98,10 @@ typedef struct _k4a_playback_context_t
uint64_t sync_period_ns;
uint64_t seek_timestamp_ns;
cluster_info_t *seek_cluster;
std::shared_ptr<loaded_cluster_t> seek_cluster;
cluster_cache_t cluster_cache;
std::recursive_mutex cache_lock; // Locks modification of cluster_cache
track_reader_t color_track;
track_reader_t depth_track;
@ -128,11 +149,17 @@ void populate_cluster_info(k4a_playback_context_t *context,
cluster_info_t *cluster_info);
cluster_info_t *find_cluster(k4a_playback_context_t *context, uint64_t timestamp_ns);
cluster_info_t *next_cluster(k4a_playback_context_t *context, cluster_info_t *current, bool next);
std::shared_ptr<libmatroska::KaxCluster> load_cluster(k4a_playback_context_t *context, cluster_info_t *cluster_info);
std::shared_ptr<read_block_t> find_next_block(k4a_playback_context_t *context, track_reader_t *reader, bool next);
k4a_result_t new_capture(k4a_playback_context_t *context,
std::shared_ptr<read_block_t> &block,
k4a_capture_t *capture_handle);
std::shared_ptr<loaded_cluster_t> load_cluster(k4a_playback_context_t *context, cluster_info_t *cluster_info);
std::shared_ptr<loaded_cluster_t> load_next_cluster(k4a_playback_context_t *context,
loaded_cluster_t *current_cluster,
bool next);
std::shared_ptr<block_info_t> find_block(k4a_playback_context_t *context,
track_reader_t *reader,
uint64_t timestamp_ns);
std::shared_ptr<block_info_t> next_block(k4a_playback_context_t *context, block_info_t *current, bool next);
k4a_result_t new_capture(k4a_playback_context_t *context, block_info_t *block, k4a_capture_t *capture_handle);
k4a_stream_result_t get_capture(k4a_playback_context_t *context, k4a_capture_t *capture_handle, bool next);
k4a_stream_result_t get_imu_sample(k4a_playback_context_t *context, k4a_imu_sample_t *imu_sample, bool next);
@ -148,7 +175,7 @@ template<typename T> T *read_element(k4a_playback_context_t *context, EbmlElemen
typed_element->Read(*context->stream, T::ClassInfos.Context, upper_level, dummy, true);
return typed_element;
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to read element %s in recording '%s': %s",
T::ClassInfos.GetName(),
@ -215,7 +242,7 @@ template<typename T> std::unique_ptr<T> find_next(k4a_playback_context_t *contex
return std::unique_ptr<T>(static_cast<T *>(element));
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to find %s in recording '%s': %s", T::ClassInfos.GetName(), context->file_path, e.what());
return nullptr;

Просмотреть файл

@ -10,6 +10,8 @@
#include <k4ainternal/matroska_common.h>
#include <set>
#include <condition_variable>
#include <mutex>
namespace k4arecord
{
@ -72,12 +74,13 @@ typedef struct _k4a_record_context_t
// std::list can't be memset to 0, so we need to use a pointer.
std::unique_ptr<std::list<cluster_t *>> pending_clusters;
LOCK_HANDLE pending_cluster_lock; // Locks last_written_timestamp, most_recent_timestamp, and pending_clusters
std::mutex pending_cluster_lock; // Locks last_written_timestamp, most_recent_timestamp, and pending_clusters
bool writer_stopping;
THREAD_HANDLE writer_thread;
COND_HANDLE writer_notify;
LOCK_HANDLE writer_lock;
std::thread writer_thread;
// std::condition_variable constructor may throw, so wrap this in a pointer.
std::unique_ptr<std::condition_variable> writer_notify;
std::mutex writer_lock;
bool header_written, first_cluster_written;
} k4a_record_context_t;
@ -117,7 +120,7 @@ k4a_result_t write_cluster(k4a_record_context_t *context, cluster_t *cluster, ui
k4a_result_t start_matroska_writer_thread(k4a_record_context_t *context);
k4a_result_t stop_matroska_writer_thread(k4a_record_context_t *context);
void stop_matroska_writer_thread(k4a_record_context_t *context);
libmatroska::KaxTag *add_tag(k4a_record_context_t *context,
const char *name,

Просмотреть файл

@ -8,7 +8,7 @@ using namespace k4arecord;
static_assert(sizeof(std::streamoff) == sizeof(int64), "64-bit seeking is not supported on this architecture");
static_assert(sizeof(std::streamsize) == sizeof(int64), "64-bit seeking is not supported on this architecture");
LargeFileIOCallback::LargeFileIOCallback(const char *path, const open_mode mode)
LargeFileIOCallback::LargeFileIOCallback(const char *path, const open_mode mode) : m_owner(std::this_thread::get_id())
{
assert(path);
std::ios::openmode om = std::ios::binary;
@ -44,6 +44,7 @@ LargeFileIOCallback::~LargeFileIOCallback()
uint32 LargeFileIOCallback::read(void *buffer, size_t size)
{
assert(size <= UINT32_MAX); // can't properly return > uint32
assert(m_owner == std::this_thread::get_id());
m_stream.read((char *)buffer, (std::streamsize)size);
return (uint32)m_stream.gcount();
@ -52,6 +53,7 @@ uint32 LargeFileIOCallback::read(void *buffer, size_t size)
void LargeFileIOCallback::setFilePointer(int64 offset, libebml::seek_mode mode)
{
assert(mode == SEEK_SET || mode == SEEK_CUR || mode == SEEK_END);
assert(m_owner == std::this_thread::get_id());
switch (mode)
{
@ -72,6 +74,7 @@ void LargeFileIOCallback::setFilePointer(int64 offset, libebml::seek_mode mode)
size_t LargeFileIOCallback::write(const void *buffer, size_t size)
{
assert(size <= INT64_MAX); // m_stream.write() takes a signed long input
assert(m_owner == std::this_thread::get_id());
m_stream.write((const char *)buffer, (std::streamsize)size);
return size;
@ -79,6 +82,7 @@ size_t LargeFileIOCallback::write(const void *buffer, size_t size)
uint64 LargeFileIOCallback::getFilePointer()
{
assert(m_owner == std::this_thread::get_id());
std::streampos pos = m_stream.tellg();
assert(pos >= 0); // tellg() should have thrown an exception if this happens
return (uint64)pos;
@ -96,3 +100,8 @@ void LargeFileIOCallback::close()
m_stream.close();
}
}
void LargeFileIOCallback::setOwnerThread()
{
m_owner = std::this_thread::get_id();
}

Просмотреть файл

@ -26,7 +26,7 @@ std::unique_ptr<EbmlElement> next_child(k4a_playback_context_t *context, EbmlEle
return std::unique_ptr<EbmlElement>(element);
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to get next child (parent id %x) in recording '%s': %s",
EbmlId(*parent).GetValue(),
@ -46,7 +46,7 @@ k4a_result_t skip_element(k4a_playback_context_t *context, EbmlElement *element)
return K4A_RESULT_SUCCEEDED;
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed seek past element (id %x) in recording '%s': %s",
EbmlId(*element).GetValue(),
@ -220,17 +220,17 @@ k4a_result_t parse_mkv(k4a_playback_context_t *context)
KaxSimpleBlock *simple_block = NULL;
KaxBlockGroup *block_group = NULL;
std::shared_ptr<KaxCluster> last_cluster = load_cluster(context, cluster_info);
if (last_cluster == nullptr)
std::shared_ptr<loaded_cluster_t> last_cluster = load_cluster(context, cluster_info);
if (last_cluster == nullptr || last_cluster->cluster == nullptr)
{
LOG_ERROR("Failed to load end of recording.", 0);
return K4A_RESULT_FAILED;
}
for (EbmlElement *e : last_cluster->GetElementList())
for (EbmlElement *e : last_cluster->cluster->GetElementList())
{
if (check_element_type(e, &simple_block))
{
simple_block->SetParent(*last_cluster);
simple_block->SetParent(*last_cluster->cluster);
uint64_t block_timestamp_ns = simple_block->GlobalTimecode();
if (block_timestamp_ns > context->last_timestamp_ns)
{
@ -239,7 +239,7 @@ k4a_result_t parse_mkv(k4a_playback_context_t *context)
}
else if (check_element_type(e, &block_group))
{
block_group->SetParent(*last_cluster);
block_group->SetParent(*last_cluster->cluster);
uint64_t block_timestamp_ns = block_group->GlobalTimecode();
if (block_timestamp_ns > context->last_timestamp_ns)
{
@ -280,58 +280,67 @@ k4a_result_t populate_cluster_cache(k4a_playback_context_t *context)
return K4A_RESULT_FAILED;
}
context->cluster_cache = cluster_cache_t(new cluster_info_t, cluster_cache_deleter);
context->seek_cluster = context->cluster_cache.get();
populate_cluster_info(context, first_cluster, context->cluster_cache.get());
// Populate the reset of the cache with the Cue data stored in the file.
cluster_info_t *cluster_cache_end = context->cluster_cache.get();
if (context->cues)
try
{
uint64_t last_offset = context->first_cluster_offset;
uint64_t last_timestamp = context->seek_cluster->timestamp_ns;
KaxCuePoint *cue = NULL;
for (EbmlElement *e : context->cues->GetElementList())
std::lock_guard<std::recursive_mutex> lock(context->cache_lock);
context->cluster_cache = cluster_cache_t(new cluster_info_t, cluster_cache_deleter);
populate_cluster_info(context, first_cluster, context->cluster_cache.get());
// Populate the rest of the cache with the Cue data stored in the file.
cluster_info_t *cluster_cache_end = context->cluster_cache.get();
if (context->cues)
{
if (check_element_type(e, &cue))
uint64_t last_offset = context->first_cluster_offset;
uint64_t last_timestamp_ns = context->cluster_cache->timestamp_ns;
KaxCuePoint *cue = NULL;
for (EbmlElement *e : context->cues->GetElementList())
{
const KaxCueTrackPositions *positions = cue->GetSeekPosition();
if (positions)
if (check_element_type(e, &cue))
{
uint64_t timestamp = GetChild<KaxCueTime>(*cue).GetValue() * context->timecode_scale;
uint64_t file_offset = positions->ClusterPosition();
if (file_offset == last_offset)
const KaxCueTrackPositions *positions = cue->GetSeekPosition();
if (positions)
{
// This cluster is already in the cache, skip it.
continue;
}
else if (file_offset > last_offset && timestamp >= last_timestamp)
{
cluster_info_t *cluster_info = new cluster_info_t;
// This timestamp might not actually be the start of the cluster.
// The start timestamp is not known until populate_cluster_info is called.
cluster_info->timestamp_ns = timestamp;
cluster_info->file_offset = file_offset;
cluster_info->previous = cluster_cache_end;
uint64_t timestamp_ns = GetChild<KaxCueTime>(*cue).GetValue() * context->timecode_scale;
uint64_t file_offset = positions->ClusterPosition();
cluster_cache_end->next = cluster_info;
cluster_cache_end = cluster_info;
if (file_offset == last_offset)
{
// This cluster is already in the cache, skip it.
continue;
}
else if (file_offset > last_offset && timestamp_ns >= last_timestamp_ns)
{
cluster_info_t *cluster_info = new cluster_info_t;
// This timestamp might not actually be the start of the cluster.
// The start timestamp is not known until populate_cluster_info is called.
cluster_info->timestamp_ns = timestamp_ns;
cluster_info->file_offset = file_offset;
cluster_info->previous = cluster_cache_end;
last_offset = file_offset;
last_timestamp = timestamp;
}
else
{
LOG_WARNING("Cluster or Cue entry is out of order.", 0);
cluster_cache_end->next = cluster_info;
cluster_cache_end = cluster_info;
last_offset = file_offset;
last_timestamp_ns = timestamp_ns;
}
else
{
LOG_WARNING("Cluster or Cue entry is out of order.", 0);
}
}
}
}
}
else
{
LOG_WARNING("Recording is missing Cue entries, playback performance may be impacted.", 0);
}
}
else
catch (std::system_error &e)
{
LOG_WARNING("Recording is missing Cue entries, playback performance may be impacted.", 0);
LOG_ERROR("Failed to populate cluster cache: %s", e.what());
return K4A_RESULT_FAILED;
}
return K4A_RESULT_SUCCEEDED;
@ -894,7 +903,7 @@ k4a_result_t seek_offset(k4a_playback_context_t *context, uint64_t offset)
context->ebml_file->setFilePointer((int64_t)file_offset);
return K4A_RESULT_SUCCEEDED;
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to seek file to %llu (relative %llu) '%s': %s",
file_offset,
@ -907,6 +916,7 @@ k4a_result_t seek_offset(k4a_playback_context_t *context, uint64_t offset)
// Read the cluster metadata from a Matroska element and add it to a cache entry.
// File read pointer should already be at the start of the cluster.
// The caller should currently own the lock for the cluster cache.
void populate_cluster_info(k4a_playback_context_t *context,
std::shared_ptr<KaxCluster> &cluster,
cluster_info_t *cluster_info)
@ -966,29 +976,39 @@ cluster_info_t *find_cluster(k4a_playback_context_t *context, uint64_t timestamp
RETURN_VALUE_IF_ARG(NULL, context == NULL);
RETURN_VALUE_IF_ARG(NULL, context->cluster_cache == nullptr);
// Find the closest cluster in the cache
cluster_info_t *cluster_info = context->cluster_cache.get();
while (cluster_info->next)
try
{
if (cluster_info->next->timestamp_ns > timestamp_ns)
{
break;
}
cluster_info = cluster_info->next;
}
std::lock_guard<std::recursive_mutex> lock(context->cache_lock);
// Make sure there are no gaps in the cache and ensure this really is the closest cluster.
cluster_info_t *next_cluster_info = next_cluster(context, cluster_info, true);
while (next_cluster_info)
{
if (next_cluster_info->timestamp_ns > timestamp_ns)
// Find the closest cluster in the cache
cluster_info_t *cluster_info = context->cluster_cache.get();
while (cluster_info->next)
{
break;
if (cluster_info->next->timestamp_ns > timestamp_ns)
{
break;
}
cluster_info = cluster_info->next;
}
cluster_info = next_cluster_info;
next_cluster_info = next_cluster(context, cluster_info, true);
// Make sure there are no gaps in the cache and ensure this really is the closest cluster.
cluster_info_t *next_cluster_info = next_cluster(context, cluster_info, true);
while (next_cluster_info)
{
if (next_cluster_info->timestamp_ns > timestamp_ns)
{
break;
}
cluster_info = next_cluster_info;
next_cluster_info = next_cluster(context, cluster_info, true);
}
return cluster_info;
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to find cluster for timestamp %llu: %s", timestamp_ns, e.what());
return NULL;
}
return cluster_info;
}
// Finds the next or previous cluster given a current cluster. This function checks the cluster_cache first to see if
@ -1000,197 +1020,427 @@ cluster_info_t *next_cluster(k4a_playback_context_t *context, cluster_info_t *cu
RETURN_VALUE_IF_ARG(NULL, context->cluster_cache == nullptr);
RETURN_VALUE_IF_ARG(NULL, current_cluster == NULL);
if (next)
try
{
if (current_cluster->next_known)
std::lock_guard<std::recursive_mutex> lock(context->cache_lock);
if (next)
{
// If end of file, next will be NULL
return current_cluster->next;
}
else
{
// Read forward in file to find next cluster and fill in cache
if (K4A_FAILED(seek_offset(context, current_cluster->file_offset)))
{
LOG_ERROR("Failed to seek to current cluster element.", 0);
return NULL;
}
std::shared_ptr<KaxCluster> current_element = find_next<KaxCluster>(context);
if (current_element == nullptr)
{
LOG_ERROR("Failed to find current cluster element.", 0);
return NULL;
}
populate_cluster_info(context, current_element, current_cluster);
if (current_cluster->next_known)
{
// If populate_cluster_info() just connected the next entry, we can exit early.
// If end of file, next will be NULL
return current_cluster->next;
}
// Seek to the end of the current cluster so that find_next returns the next cluster in the file.
if (K4A_FAILED(skip_element(context, current_element.get())))
else
{
LOG_ERROR("Failed to seek to next cluster element.", 0);
return NULL;
}
std::shared_ptr<KaxCluster> next_cluster = find_next<KaxCluster>(context, true);
if (next_cluster)
{
if (current_cluster->next &&
current_cluster->next->file_offset == context->segment->GetRelativePosition(*next_cluster.get()))
std::lock_guard<std::mutex> io_lock(context->io_lock);
if (context->file_closing)
{
// If there is a non-cluster element between these entries, they may not get connected otherwise.
current_cluster->next_known = true;
current_cluster = current_cluster->next;
// User called k4a_playback_close(), return immediately.
return NULL;
}
LargeFileIOCallback *file_io = dynamic_cast<LargeFileIOCallback *>(context->ebml_file.get());
if (file_io != NULL)
{
file_io->setOwnerThread();
}
// Read forward in file to find next cluster and fill in cache
if (K4A_FAILED(seek_offset(context, current_cluster->file_offset)))
{
LOG_ERROR("Failed to seek to current cluster element.", 0);
return NULL;
}
std::shared_ptr<KaxCluster> current_element = find_next<KaxCluster>(context);
if (current_element == nullptr)
{
LOG_ERROR("Failed to find current cluster element.", 0);
return NULL;
}
populate_cluster_info(context, current_element, current_cluster);
if (current_cluster->next_known)
{
// If populate_cluster_info() just connected the next entry, we can exit early.
return current_cluster->next;
}
// Seek to the end of the current cluster so that find_next returns the next cluster in the file.
if (K4A_FAILED(skip_element(context, current_element.get())))
{
LOG_ERROR("Failed to seek to next cluster element.", 0);
return NULL;
}
std::shared_ptr<KaxCluster> next_cluster = find_next<KaxCluster>(context, true);
if (next_cluster)
{
if (current_cluster->next && current_cluster->next->file_offset ==
context->segment->GetRelativePosition(*next_cluster.get()))
{
// If there is a non-cluster element between these entries, they may not get connected
// otherwise.
current_cluster->next_known = true;
current_cluster = current_cluster->next;
}
else
{
// Add a new entry to the cache for the cluster we just found.
cluster_info_t *next_cluster_info = new cluster_info_t;
next_cluster_info->previous = current_cluster;
next_cluster_info->next = current_cluster->next;
current_cluster->next = next_cluster_info;
current_cluster->next_known = true;
if (next_cluster_info->next)
{
next_cluster_info->next->previous = next_cluster_info;
}
current_cluster = next_cluster_info;
}
populate_cluster_info(context, next_cluster, current_cluster);
return current_cluster;
}
else
{
// Add a new entry to the cache for the cluster we just found.
cluster_info_t *next_cluster_info = new cluster_info_t;
next_cluster_info->previous = current_cluster;
next_cluster_info->next = current_cluster->next;
current_cluster->next = next_cluster_info;
// End of file reached
current_cluster->next_known = true;
if (next_cluster_info->next)
{
next_cluster_info->next->previous = next_cluster_info;
}
current_cluster = next_cluster_info;
return NULL;
}
populate_cluster_info(context, next_cluster, current_cluster);
return current_cluster;
}
else
{
// End of file reached
current_cluster->next_known = true;
return NULL;
}
}
}
else
{
if (current_cluster->previous)
{
if (current_cluster->previous->next_known)
{
return current_cluster->previous;
}
else
{
// Read forward from previous cached cluster to fill in gap
cluster_info_t *next_cluster_info = next_cluster(context, current_cluster->previous, true);
while (next_cluster_info && next_cluster_info != current_cluster)
{
next_cluster_info = next_cluster(context, next_cluster_info, true);
}
return current_cluster->previous;
}
}
else
{
// Beginning of file reached
return NULL;
if (current_cluster->previous)
{
if (current_cluster->previous->next_known)
{
return current_cluster->previous;
}
else
{
// Read forward from previous cached cluster to fill in gap
cluster_info_t *next_cluster_info = next_cluster(context, current_cluster->previous, true);
while (next_cluster_info && next_cluster_info != current_cluster)
{
next_cluster_info = next_cluster(context, next_cluster_info, true);
}
return current_cluster->previous;
}
}
else
{
// Beginning of file reached
return NULL;
}
}
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to find next cluster: %s", e.what());
return NULL;
}
}
// Load the actual block data for a cluster off the disk.
// If the cluster is already in memory, a shared_ptr to it will be returned.
// Load a cluster from the cluster cache / disk without any neighbor preloading.
// This should never fail unless there is a file IO error.
std::shared_ptr<KaxCluster> load_cluster(k4a_playback_context_t *context, cluster_info_t *cluster_info)
static std::shared_ptr<KaxCluster> load_cluster_internal(k4a_playback_context_t *context, cluster_info_t *cluster_info)
{
RETURN_VALUE_IF_ARG(nullptr, context == NULL);
RETURN_VALUE_IF_ARG(nullptr, context->ebml_file == nullptr);
try
{
// Check if the cluster already exists in memory, and if so, return it.
std::shared_ptr<KaxCluster> cluster = cluster_info->cluster.lock();
if (cluster)
{
context->cache_hits++;
}
else
{
std::lock_guard<std::mutex> lock(context->io_lock);
if (context->file_closing)
{
// User called k4a_playback_close(), return immediately.
return nullptr;
}
// The cluster may have been loaded while we were acquiring the io lock, check again before actually loading
// from disk.
cluster = cluster_info->cluster.lock();
if (cluster)
{
context->cache_hits++;
}
else
{
context->load_count++;
// Start reading the actual cluster data from disk.
LargeFileIOCallback *file_io = dynamic_cast<LargeFileIOCallback *>(context->ebml_file.get());
if (file_io != NULL)
{
file_io->setOwnerThread();
}
if (K4A_FAILED(seek_offset(context, cluster_info->file_offset)))
{
LOG_ERROR("Failed to seek to cluster cluster at: %llu", cluster_info->file_offset);
return nullptr;
}
cluster = find_next<KaxCluster>(context, true);
if (cluster)
{
if (read_element<KaxCluster>(context, cluster.get()) == NULL)
{
LOG_ERROR("Failed to load cluster at: %llu", cluster_info->file_offset);
return nullptr;
}
uint64_t timecode = GetChild<KaxClusterTimecode>(*cluster).GetValue();
assert(context->timecode_scale <= INT64_MAX);
cluster->InitTimecode(timecode, (int64_t)context->timecode_scale);
cluster_info->cluster = cluster;
}
}
}
return cluster;
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to load cluster from disk: %s", e.what());
return nullptr;
}
}
// Load the actual block data for a cluster off the disk, and start preloading the neighboring clusters.
// This should never fail unless there is a file IO error.
std::shared_ptr<loaded_cluster_t> load_cluster(k4a_playback_context_t *context, cluster_info_t *cluster_info)
{
RETURN_VALUE_IF_ARG(nullptr, context == NULL);
RETURN_VALUE_IF_ARG(nullptr, context->cluster_cache == nullptr);
RETURN_VALUE_IF_ARG(nullptr, cluster_info == NULL);
std::shared_ptr<KaxCluster> cluster = cluster_info->cluster.lock();
if (cluster)
std::shared_ptr<KaxCluster> cluster = load_cluster_internal(context, cluster_info);
if (cluster == nullptr)
{
context->cache_hits++;
return cluster;
}
context->load_count++;
if (K4A_FAILED(seek_offset(context, cluster_info->file_offset)))
{
LOG_ERROR("Failed to seek to cluster at: %llu", cluster_info->file_offset);
return nullptr;
}
cluster = find_next<KaxCluster>(context);
if (cluster)
std::shared_ptr<loaded_cluster_t> result = std::shared_ptr<loaded_cluster_t>(new loaded_cluster_t());
result->cluster_info = cluster_info;
result->cluster = cluster;
#if CLUSTER_READ_AHEAD_COUNT
try
{
if (read_element<KaxCluster>(context, cluster.get()) == NULL)
// Preload the neighboring clusters immediately
cluster_info_t *previous_cluster_info = cluster_info;
cluster_info_t *next_cluster_info = cluster_info;
for (size_t i = 0; i < CLUSTER_READ_AHEAD_COUNT; i++)
{
LOG_ERROR("Failed to read cluster data at: %llu", cluster_info->file_offset);
return nullptr;
if (previous_cluster_info != NULL)
{
previous_cluster_info = next_cluster(context, previous_cluster_info, false);
}
if (next_cluster_info != NULL)
{
next_cluster_info = next_cluster(context, next_cluster_info, true);
}
result->previous_clusters[i] = std::async(std::launch::deferred, [context, previous_cluster_info] {
return previous_cluster_info ? load_cluster_internal(context, previous_cluster_info) : nullptr;
});
result->next_clusters[i] = std::async(std::launch::deferred, [context, next_cluster_info] {
return next_cluster_info ? load_cluster_internal(context, next_cluster_info) : nullptr;
});
result->previous_clusters[i].wait();
result->next_clusters[i].wait();
}
uint64_t timecode = GetChild<KaxClusterTimecode>(*cluster).GetValue();
assert(context->timecode_scale <= INT64_MAX);
cluster->InitTimecode(timecode, (int64_t)context->timecode_scale);
cluster_info->cluster = cluster;
return cluster;
}
else
catch (std::system_error &e)
{
LOG_ERROR("Failed to find cluster element at: %llu", cluster_info->file_offset);
LOG_ERROR("Failed to load read-ahead clusters: %s", e.what());
return nullptr;
}
#endif
return result;
}
// Search operates in 2 modes:
// - If there is already a current_block, find the next/previous block
// - If there is no current_block, find the first block before or after the seek_timestamp
std::shared_ptr<read_block_t> find_next_block(k4a_playback_context_t *context, track_reader_t *reader, bool next)
// Load the next or previous cluster off the disk using the existing preloaded neighbors.
// The next neighbor in sequence will start being preloaded asynchronously.
std::shared_ptr<loaded_cluster_t> load_next_cluster(k4a_playback_context_t *context,
loaded_cluster_t *current_cluster,
bool next)
{
RETURN_VALUE_IF_ARG(nullptr, context == NULL);
RETURN_VALUE_IF_ARG(nullptr, context->cluster_cache == nullptr);
RETURN_VALUE_IF_ARG(nullptr, current_cluster == NULL);
cluster_info_t *cluster_info = next_cluster(context, current_cluster->cluster_info, next);
if (cluster_info == NULL)
{
// End of file reached.
return nullptr;
}
std::shared_ptr<loaded_cluster_t> result = std::shared_ptr<loaded_cluster_t>(new loaded_cluster_t());
result->cluster_info = cluster_info;
#if CLUSTER_READ_AHEAD_COUNT
try
{
// Use the current cluster as one of the neightbors, and then wait for the target cluster to be available.
std::shared_ptr<KaxCluster> old_cluster = current_cluster->cluster;
if (next)
{
result->previous_clusters[0] = std::async(std::launch::deferred, [old_cluster] { return old_cluster; });
for (size_t i = 1; i < CLUSTER_READ_AHEAD_COUNT; i++)
{
result->previous_clusters[i] = current_cluster->previous_clusters[i - 1];
}
current_cluster->next_clusters[0].wait();
result->cluster = current_cluster->next_clusters[0].get();
}
else
{
result->next_clusters[0] = std::async(std::launch::deferred, [old_cluster] { return old_cluster; });
for (size_t i = 1; i < CLUSTER_READ_AHEAD_COUNT; i++)
{
result->next_clusters[i] = current_cluster->next_clusters[i - 1];
}
current_cluster->previous_clusters[0].wait();
result->cluster = current_cluster->previous_clusters[0].get();
}
// Spawn a new async task to preload the next cluster in sequence.
if (next)
{
for (size_t i = 0; i < CLUSTER_READ_AHEAD_COUNT - 1; i++)
{
result->next_clusters[i] = current_cluster->next_clusters[i + 1];
}
result->next_clusters[CLUSTER_READ_AHEAD_COUNT - 1] = std::async([context, cluster_info] {
cluster_info_t *new_cluster = cluster_info;
for (size_t i = 0; i < CLUSTER_READ_AHEAD_COUNT && new_cluster != NULL; i++)
{
new_cluster = next_cluster(context, new_cluster, true);
}
return new_cluster ? load_cluster_internal(context, new_cluster) : nullptr;
});
}
else
{
for (size_t i = 0; i < CLUSTER_READ_AHEAD_COUNT - 1; i++)
{
result->previous_clusters[i] = current_cluster->previous_clusters[i + 1];
}
result->previous_clusters[CLUSTER_READ_AHEAD_COUNT - 1] = std::async([context, cluster_info] {
cluster_info_t *new_cluster = cluster_info;
for (size_t i = 0; i < CLUSTER_READ_AHEAD_COUNT && new_cluster != NULL; i++)
{
new_cluster = next_cluster(context, new_cluster, false);
}
return new_cluster ? load_cluster_internal(context, new_cluster) : nullptr;
});
}
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to load next cluster: %s", e.what());
return nullptr;
}
#else
result->cluster = load_cluster_internal(context, cluster_info);
#endif
return result;
}
// Find the first block with a timestamp >= the specified timestamp. If no blocks are found, a pointer to EOF will be
// returned, or nullptr if an error occurs.
std::shared_ptr<block_info_t> find_block(k4a_playback_context_t *context, track_reader_t *reader, uint64_t timestamp_ns)
{
RETURN_VALUE_IF_ARG(nullptr, context == NULL);
RETURN_VALUE_IF_ARG(nullptr, context->seek_cluster == NULL);
RETURN_VALUE_IF_ARG(nullptr, reader == NULL);
RETURN_VALUE_IF_ARG(nullptr, reader->track == NULL);
bool timestamp_search = reader->current_block == nullptr;
uint64_t track_number = reader->track->TrackNumber().GetValue();
// Create a new block pointing to the start of the cluster containing timestamp_ns.
std::shared_ptr<block_info_t> block = std::make_shared<block_info_t>();
block->reader = reader;
block->index = -1;
cluster_info_t *cluster_info = find_cluster(context, timestamp_ns);
if (cluster_info == NULL)
{
LOG_ERROR("Failed to find data cluster for timestamp: %llu", timestamp_ns);
return nullptr;
}
block->cluster = load_cluster(context, cluster_info);
if (block->cluster == nullptr || block->cluster->cluster == nullptr)
{
LOG_ERROR("Failed to load initial data cluster from disk.", 0);
return nullptr;
}
// Start searching through the blocks for the timestamp we want.
while (block)
{
block = next_block(context, block.get(), true);
if (block)
{
// Return this block if EOF was reached, or the timestamp is >= the search timestamp.
if (block->block == NULL || block->sync_timestamp_ns >= timestamp_ns)
{
return block;
}
}
}
LOG_ERROR("Failed to read next block from disk.", 0);
return nullptr;
}
// Find the next / previous block given a current block. If there is no next block, a block pointing to EOF will be
// returned, or nullptr if an error occurs.
std::shared_ptr<block_info_t> next_block(k4a_playback_context_t *context, block_info_t *current, bool next)
{
RETURN_VALUE_IF_ARG(nullptr, context == NULL);
RETURN_VALUE_IF_ARG(nullptr, current == NULL);
RETURN_VALUE_IF_ARG(nullptr, current->reader == NULL);
RETURN_VALUE_IF_ARG(nullptr, current->cluster == nullptr);
RETURN_VALUE_IF_ARG(nullptr, current->cluster->cluster == nullptr);
RETURN_VALUE_IF_ARG(nullptr, current->cluster->cluster_info == nullptr);
// Get the track number of the current block.
uint64_t track_number = current->reader->track->TrackNumber().GetValue();
assert(track_number <= UINT16_MAX);
uint16_t search_number = static_cast<uint16_t>(track_number);
std::shared_ptr<read_block_t> next_block = std::make_shared<read_block_t>();
next_block->reader = reader;
if (timestamp_search)
// Copy the current block and start the search at the next index.
std::shared_ptr<block_info_t> next_block = std::shared_ptr<block_info_t>(new block_info_t(*current));
next_block->index += next ? 1 : -1;
std::shared_ptr<loaded_cluster_t> search_cluster = next_block->cluster;
while (search_cluster != nullptr && search_cluster->cluster != nullptr)
{
// Search the whole cluster for the correct timestamp
next_block->cluster_info = context->seek_cluster;
next_block->cluster = load_cluster(context, context->seek_cluster);
if (next_block->cluster == nullptr)
{
LOG_ERROR("Failed to load data cluster from disk.", 0);
return nullptr;
}
next_block->index = next ? 0 : ((int)next_block->cluster->ListSize() - 1);
}
else
{
// Increment/Decrement the block index and start searching from there.
next_block->index = reader->current_block->index + (next ? 1 : -1);
next_block->cluster_info = reader->current_block->cluster_info;
next_block->cluster = reader->current_block->cluster;
}
while (next_block->cluster != nullptr)
{
std::vector<EbmlElement *> elements = next_block->cluster->GetElementList();
// Search through the current cluster for the next valid block.
std::vector<EbmlElement *> elements = next_block->cluster->cluster->GetElementList();
KaxSimpleBlock *simple_block = NULL;
KaxBlockGroup *block_group = NULL;
while (next_block->index < (int)elements.size() && next_block->index >= 0)
{
// We need to support both SimpleBlocks and BlockGroups, check to see if the current element is either of
// these types.
next_block->block = NULL;
if (check_element_type(elements[(size_t)next_block->index], &simple_block))
{
if (simple_block->TrackNum() == search_number)
{
simple_block->SetParent(*next_block->cluster);
simple_block->SetParent(*next_block->cluster->cluster);
next_block->block = simple_block;
}
}
@ -1198,51 +1448,35 @@ std::shared_ptr<read_block_t> find_next_block(k4a_playback_context_t *context, t
{
if (block_group->TrackNumber() == search_number)
{
block_group->SetParent(*next_block->cluster);
block_group->SetParentTrack(*reader->track);
block_group->SetParent(*next_block->cluster->cluster);
block_group->SetParentTrack(*current->reader->track);
next_block->block = &GetChild<KaxBlock>(*block_group);
}
}
if (next_block->block != NULL)
{
// We found a valid block for this track, update the timestamp and return it.
next_block->timestamp_ns = next_block->block->GlobalTimecode();
next_block->sync_timestamp_ns = next_block->timestamp_ns + reader->sync_delay_ns;
if (timestamp_search)
{
if ((next && next_block->timestamp_ns >= context->seek_timestamp_ns) ||
(!next && next_block->timestamp_ns < context->seek_timestamp_ns))
{
return next_block;
}
}
else
{
return next_block;
}
next_block->sync_timestamp_ns = next_block->timestamp_ns + current->reader->sync_delay_ns;
return next_block;
}
next_block->index += next ? 1 : -1;
}
// Block wasn't found in this cluster, go to the next one
cluster_info_t *found_cluster_info = next_cluster(context, next_block->cluster_info, next);
if (found_cluster_info != NULL)
// The next block wasn't found in this cluster, go to the next cluster.
search_cluster = load_next_cluster(context, next_block->cluster.get(), next);
if (search_cluster != nullptr && search_cluster->cluster != nullptr)
{
next_block->cluster_info = found_cluster_info;
next_block->cluster = load_cluster(context, found_cluster_info);
if (next_block->cluster == nullptr)
{
LOG_ERROR("Failed to load next data cluster from disk.", 0);
return nullptr;
}
next_block->index = next ? 0 : ((int)next_block->cluster->ListSize() - 1);
}
else
{
break;
next_block->cluster = search_cluster;
next_block->index = next ? 0 : ((int)search_cluster->cluster->ListSize() - 1);
}
}
// End of file reached
// There are no more clusters, end of file was reached.
// The cluster and index are kept so that reading in the opposite direction returns a valid block.
next_block->timestamp_ns = 0;
next_block->sync_timestamp_ns = 0;
next_block->block = NULL;
return next_block;
}
@ -1255,9 +1489,7 @@ static void free_vector_buffer(void *buffer, void *context)
delete vector;
}
k4a_result_t new_capture(k4a_playback_context_t *context,
std::shared_ptr<read_block_t> &block,
k4a_capture_t *capture_handle)
k4a_result_t new_capture(k4a_playback_context_t *context, block_info_t *block, k4a_capture_t *capture_handle)
{
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context == NULL);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, capture_handle == NULL);
@ -1357,7 +1589,9 @@ k4a_stream_result_t get_capture(k4a_playback_context_t *context, k4a_capture_t *
RETURN_VALUE_IF_ARG(K4A_STREAM_RESULT_FAILED, capture_handle == NULL);
track_reader_t *blocks[] = { &context->color_track, &context->depth_track, &context->ir_track };
std::shared_ptr<read_block_t> next_blocks[] = { nullptr, nullptr, nullptr };
std::shared_ptr<block_info_t> next_blocks[] = { context->color_track.current_block,
context->depth_track.current_block,
context->ir_track.current_block };
static_assert(arraysize(blocks) == arraysize(next_blocks), "Track / block mapping does not match");
uint64_t timestamp_start_ns = UINT64_MAX;
@ -1374,7 +1608,15 @@ k4a_stream_result_t get_capture(k4a_playback_context_t *context, k4a_capture_t *
// Only read from disk if we haven't aready found the next block for this track
if (next_blocks[i] == nullptr)
{
next_blocks[i] = find_next_block(context, blocks[i], next);
next_blocks[i] = find_block(context, blocks[i], context->seek_timestamp_ns);
if (!next)
{
next_blocks[i] = next_block(context, next_blocks[i].get(), false);
}
}
else
{
next_blocks[i] = next_block(context, next_blocks[i].get(), next);
}
if (next_blocks[i] && next_blocks[i]->block)
{
@ -1444,9 +1686,15 @@ k4a_stream_result_t get_capture(k4a_playback_context_t *context, k4a_capture_t *
bool filled = false;
for (size_t i = 0; i < arraysize(blocks); i++)
{
if (!next_blocks[i] && !blocks[i]->current_block)
if (next_blocks[i] == nullptr && blocks[i]->current_block == nullptr)
{
std::shared_ptr<read_block_t> test_block = find_next_block(context, blocks[i], !next);
std::shared_ptr<block_info_t> test_block = find_block(context,
blocks[i],
context->seek_timestamp_ns);
if (next)
{
test_block = next_block(context, test_block.get(), false);
}
if (test_block && test_block->block)
{
if (next && (timestamp_end_ns - test_block->sync_timestamp_ns < context->sync_period_ns / 2))
@ -1494,7 +1742,7 @@ k4a_stream_result_t get_capture(k4a_playback_context_t *context, k4a_capture_t *
if (next_blocks[i] && next_blocks[i]->block)
{
blocks[i]->current_block = next_blocks[i];
k4a_result_t result = TRACE_CALL(new_capture(context, blocks[i]->current_block, capture_handle));
k4a_result_t result = TRACE_CALL(new_capture(context, blocks[i]->current_block.get(), capture_handle));
if (K4A_FAILED(result))
{
if (*capture_handle != NULL)

Просмотреть файл

@ -157,33 +157,34 @@ write_track_data(k4a_record_context_t *context, KaxTrackEntry *track, uint64_t t
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, track == NULL);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, buffer == NULL);
if (Lock(context->pending_cluster_lock) != LOCK_OK)
try
{
LOG_ERROR("Failed to lock pending clusters", 0);
std::lock_guard<std::mutex> lock(context->pending_cluster_lock);
if (context->most_recent_timestamp < timestamp_ns)
{
context->most_recent_timestamp = timestamp_ns;
}
cluster_t *cluster = get_cluster_for_timestamp(context, timestamp_ns);
if (cluster == NULL)
{
// The timestamp is too old, the block of data has already been written.
return K4A_RESULT_FAILED;
}
track_data_t data = { track, buffer };
cluster->data.push_back(std::make_pair(timestamp_ns, data));
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to write track data to queue: %s", e.what());
return K4A_RESULT_FAILED;
}
if (context->most_recent_timestamp < timestamp_ns)
if (context->writer_notify)
{
context->most_recent_timestamp = timestamp_ns;
}
cluster_t *cluster = get_cluster_for_timestamp(context, timestamp_ns);
if (cluster == NULL)
{
// The timestamp is too old, the block of data has already been written.
Unlock(context->pending_cluster_lock);
return K4A_RESULT_FAILED;
}
track_data_t data = { track, buffer };
cluster->data.push_back(std::make_pair(timestamp_ns, data));
Unlock(context->pending_cluster_lock);
if (!context->writer_stopping && Condition_Post(context->writer_notify) != COND_OK)
{
LOG_ERROR("Failed to notify writer thread", 0);
// Data was still written in this case, so don't return failure
context->writer_notify->notify_one();
}
return K4A_RESULT_SUCCEEDED;
@ -356,7 +357,7 @@ k4a_result_t write_cluster(k4a_record_context_t *context, cluster_t *cluster, ui
{
new_cluster->Render(*context->ebml_file, cues);
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to write recording data '%s': %s", context->file_path, e.what());
result = K4A_RESULT_FAILED;
@ -379,24 +380,24 @@ k4a_result_t write_cluster(k4a_record_context_t *context, cluster_t *cluster, ui
return result;
}
static int matroska_writer_thread(void *context_ptr)
static void matroska_writer_thread(k4a_record_context_t *context)
{
k4a_record_context_t *context = (k4a_record_context_t *)context_ptr;
assert(context->writer_notify);
assert(context->writer_lock);
k4a_result_t result = K4A_RESULT_SUCCEEDED;
if (Lock(context->writer_lock) != LOCK_OK)
try
{
LOG_ERROR("Writer thread failed Lock", 0);
result = K4A_RESULT_FAILED;
}
std::unique_lock<std::mutex> lock(context->writer_lock);
while (!context->writer_stopping && result == K4A_RESULT_SUCCEEDED)
{
if (Lock(context->pending_cluster_lock) == LOCK_OK)
LargeFileIOCallback *file_io = dynamic_cast<LargeFileIOCallback *>(context->ebml_file.get());
if (file_io != NULL)
{
file_io->setOwnerThread();
}
while (!context->writer_stopping)
{
context->pending_cluster_lock.lock();
// Check the oldest pending cluster to see if we should write to disk.
cluster_t *oldest_cluster = NULL;
if (!context->pending_clusters->empty())
@ -413,88 +414,72 @@ static int matroska_writer_thread(void *context_ptr)
oldest_cluster = NULL;
}
}
Unlock(context->pending_cluster_lock);
context->pending_cluster_lock.unlock();
if (oldest_cluster)
{
result = TRACE_CALL(write_cluster(context, oldest_cluster));
k4a_result_t result = TRACE_CALL(write_cluster(context, oldest_cluster));
if (K4A_FAILED(result))
{
// write_cluster failures are not recoverable (file IO errors only, the file is likely corrupt)
LOG_ERROR("Cluster write failed, dropping cluster.", 0);
LOG_ERROR("Cluster write failed, writer thread exiting.", 0);
break;
}
}
// Wait until more clusters arrive up to 100ms, or 1ms if the queue is not empty.
COND_RESULT cond = Condition_Wait(context->writer_notify, context->writer_lock, oldest_cluster ? 1 : 100);
if (cond != COND_OK && cond != COND_TIMEOUT)
context->writer_notify->wait_for(lock, std::chrono::milliseconds(oldest_cluster ? 1 : 100));
if (file_io != NULL)
{
LOG_ERROR("Writer thread failed Condition_Wait: %d", cond);
result = K4A_RESULT_FAILED;
break;
file_io->setOwnerThread();
}
}
}
if (Unlock(context->writer_lock) != LOCK_OK)
catch (std::system_error &e)
{
LOG_ERROR("Writer thread failed Unlock", 0);
result = K4A_RESULT_FAILED;
LOG_ERROR("Writer thread threw exception: %s", e.what());
}
ThreadAPI_Exit((int)result);
return 0;
}
k4a_result_t start_matroska_writer_thread(k4a_record_context_t *context)
{
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context == NULL);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context->writer_thread);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context->writer_thread.joinable());
context->writer_notify = Condition_Init();
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, !context->writer_notify);
context->writer_lock = Lock_Init();
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, !context->writer_lock);
context->writer_stopping = false;
if (ThreadAPI_Create(&context->writer_thread, matroska_writer_thread, context) != THREADAPI_OK)
try
{
context->writer_thread = 0;
LOG_ERROR("Failed to start recording writer thread.", 0);
context->writer_notify.reset(new std::condition_variable());
context->writer_stopping = false;
context->writer_thread = std::thread(matroska_writer_thread, context);
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to start recording writer thread: %s", e.what());
return K4A_RESULT_FAILED;
}
return K4A_RESULT_SUCCEEDED;
}
// May return failure if thread encountered an error while running
k4a_result_t stop_matroska_writer_thread(k4a_record_context_t *context)
void stop_matroska_writer_thread(k4a_record_context_t *context)
{
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context == NULL);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, !context->writer_thread);
RETURN_VALUE_IF_ARG(VOID_VALUE, context == NULL);
RETURN_VALUE_IF_ARG(VOID_VALUE, context->writer_notify == nullptr);
RETURN_VALUE_IF_ARG(VOID_VALUE, !context->writer_thread.joinable());
context->writer_stopping = true;
if (Condition_Post(context->writer_notify) != COND_OK)
try
{
// If this fails, the thread will still eventually stop via the writer_stopping flag.
LOG_WARNING("Failed to notify writer thread to stop.", 0);
context->writer_stopping = true;
context->writer_notify->notify_one();
context->writer_thread.join();
}
k4a_result_t result = K4A_RESULT_SUCCEEDED;
if (ThreadAPI_Join(context->writer_thread, (int *)&result) != THREADAPI_OK)
catch (std::system_error &e)
{
LOG_ERROR("Failed to stop recording writer thread.", 0);
result = K4A_RESULT_FAILED;
LOG_ERROR("Failed to stop recording writer thread: %s", e.what());
}
Condition_Deinit(context->writer_notify);
Lock_Deinit(context->writer_lock);
context->writer_thread = 0;
return result;
}
KaxTag *
@ -570,4 +555,5 @@ k4a_result_t get_matroska_segment(k4a_record_context_t *context,
*iocallback = context->ebml_file.get();
return K4A_RESULT_SUCCEEDED;
}
} // namespace k4arecord

Просмотреть файл

@ -37,13 +37,14 @@ k4a_result_t k4a_playback_open(const char *path, k4a_playback_t *playback_handle
{
context->logger_handle = logger_handle;
context->file_path = path;
context->file_closing = false;
try
{
context->ebml_file = make_unique<LargeFileIOCallback>(path, MODE_READ);
context->stream = make_unique<libebml::EbmlStream>(*context->ebml_file);
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Unable to open file '%s': %s", path, e.what());
result = K4A_RESULT_FAILED;
@ -58,12 +59,21 @@ k4a_result_t k4a_playback_open(const char *path, k4a_playback_t *playback_handle
if (K4A_SUCCEEDED(result))
{
// Seek to the first cluster
context->seek_cluster = find_cluster(context, 0);
if (context->seek_cluster == nullptr)
cluster_info_t *seek_cluster_info = find_cluster(context, 0);
if (seek_cluster_info == NULL)
{
LOG_ERROR("Failed to parse recording, recording is empty.", 0);
result = K4A_RESULT_FAILED;
}
else
{
context->seek_cluster = load_cluster(context, seek_cluster_info);
if (context->seek_cluster == nullptr)
{
LOG_ERROR("Failed to load first data cluster of recording.", 0);
result = K4A_RESULT_FAILED;
}
}
}
if (K4A_SUCCEEDED(result))
@ -78,7 +88,7 @@ k4a_result_t k4a_playback_open(const char *path, k4a_playback_t *playback_handle
{
context->ebml_file->close();
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &)
{
// The file was opened as read-only, ignore any close failures.
}
@ -295,18 +305,24 @@ k4a_result_t k4a_playback_seek_timestamp(k4a_playback_t playback_handle,
target_time_ns = (uint64_t)offset_usec * 1000;
}
k4a_result_t result = K4A_RESULT_SUCCEEDED;
cluster_info_t *seek_cluster = find_cluster(context, target_time_ns);
result = K4A_RESULT_FROM_BOOL(seek_cluster != nullptr);
if (K4A_SUCCEEDED(result))
cluster_info_t *seek_cluster_info = find_cluster(context, target_time_ns);
if (seek_cluster_info == NULL)
{
context->seek_cluster = seek_cluster;
reset_seek_pointers(context, target_time_ns);
LOG_ERROR("Failed to find cluster for timestamp: %llu ns", target_time_ns);
return K4A_RESULT_FAILED;
}
return result;
std::shared_ptr<loaded_cluster_t> seek_cluster = load_cluster(context, seek_cluster_info);
if (seek_cluster == nullptr || seek_cluster->cluster == nullptr)
{
LOG_ERROR("Failed to load data cluster at timestamp: %llu ns", target_time_ns);
return K4A_RESULT_FAILED;
}
context->seek_cluster = seek_cluster;
reset_seek_pointers(context, target_time_ns);
return K4A_RESULT_SUCCEEDED;
}
uint64_t k4a_playback_get_last_timestamp_usec(k4a_playback_t playback_handle)
@ -329,15 +345,28 @@ void k4a_playback_close(const k4a_playback_t playback_handle)
LOG_TRACE(" Seek count: %llu", context->seek_count);
LOG_TRACE(" Cluster load count: %llu", context->load_count);
LOG_TRACE(" Cluster cache hits: %llu", context->cache_hits);
context->file_closing = true;
try
{
try
{
context->io_lock.lock();
}
catch (std::system_error &)
{
// Lock is in a bad state, close the file anyway.
}
context->ebml_file->close();
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &)
{
// The file was opened as read-only, ignore any close failures.
}
context->io_lock.unlock();
// After this destroy, logging will no longer happen.
if (context->logger_handle)
{

Просмотреть файл

@ -45,7 +45,7 @@ k4a_result_t k4a_record_create(const char *path,
{
context->ebml_file = make_unique<LargeFileIOCallback>(path, MODE_CREATE);
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Unable to open file '%s': %s", path, e.what());
result = K4A_RESULT_FAILED;
@ -57,7 +57,6 @@ k4a_result_t k4a_record_create(const char *path,
context->device = device;
context->device_config = device_config;
context->pending_clusters = make_unique<std::list<cluster_t *>>();
context->pending_cluster_lock = Lock_Init();
context->timecode_scale = MATROSKA_TIMESCALE_NS;
context->camera_fps = k4a_convert_fps_to_uint(device_config.camera_fps);
@ -321,7 +320,7 @@ k4a_result_t k4a_record_create(const char *path,
{
context->ebml_file->close();
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &)
{
// The file is empty at this point, ignore any close failures.
}
@ -445,7 +444,7 @@ k4a_result_t k4a_record_write_header(const k4a_record_t recording_handle)
context->tags_void->Render(*context->ebml_file);
}
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to write recording header '%s': %s", context->file_path, e.what());
return K4A_RESULT_FAILED;
@ -579,113 +578,120 @@ k4a_result_t k4a_record_flush(const k4a_record_t recording_handle)
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, context == NULL);
RETURN_VALUE_IF_ARG(K4A_RESULT_FAILED, !context->header_written);
// Lock the writer thread first so we don't have conflicts
if (Lock(context->writer_lock) == LOCK_OK)
try
{
if (Lock(context->pending_cluster_lock) == LOCK_OK)
// Lock the writer thread first so we don't have conflicts
std::lock_guard<std::mutex> writer_lock(context->writer_lock);
LargeFileIOCallback *file_io = dynamic_cast<LargeFileIOCallback *>(context->ebml_file.get());
if (file_io != NULL)
{
if (!context->pending_clusters->empty())
{
for (cluster_t *cluster : *context->pending_clusters)
{
k4a_result_t write_result = TRACE_CALL(
write_cluster(context, cluster, &context->last_written_timestamp));
if (K4A_FAILED(write_result))
{
// Try to flush as much of the recording as possible to disk before returning any errors.
result = write_result;
}
}
context->pending_clusters->clear();
}
try
{
auto &segment_info = GetChild<KaxInfo>(*context->file_segment);
uint64_t current_position = context->ebml_file->getFilePointer();
// Update segment info
GetChild<KaxDuration>(segment_info)
.SetValue((double)((context->most_recent_timestamp - context->start_timestamp_offset) /
context->timecode_scale));
context->segment_info_void->ReplaceWith(segment_info, *context->ebml_file);
// Render cues
auto &cues = GetChild<KaxCues>(*context->file_segment);
cues.Render(*context->ebml_file);
// Update tags
auto &tags = GetChild<KaxTags>(*context->file_segment);
if (tags.GetElementPosition() > 0)
{
context->ebml_file->setFilePointer((int64_t)tags.GetElementPosition());
tags.Render(*context->ebml_file);
if (tags.GetEndPosition() != context->tags_void->GetElementPosition())
{
// Rewrite the void block after tags
EbmlVoid tags_void;
tags_void.SetSize(context->tags_void->GetSize() -
(tags.GetEndPosition() - context->tags_void->GetElementPosition()));
tags_void.Render(*context->ebml_file);
}
}
{ // Update seek info
auto &seek_head = GetChild<KaxSeekHead>(*context->file_segment);
seek_head.RemoveAll(); // Remove any seek entries from previous flushes
seek_head.IndexThis(segment_info, *context->file_segment);
auto &tracks = GetChild<KaxTracks>(*context->file_segment);
if (tracks.GetElementPosition() > 0)
{
seek_head.IndexThis(tracks, *context->file_segment);
}
auto &attachments = GetChild<KaxAttachments>(*context->file_segment);
if (attachments.GetElementPosition() > 0)
{
seek_head.IndexThis(attachments, *context->file_segment);
}
if (tags.GetElementPosition() > 0)
{
seek_head.IndexThis(tags, *context->file_segment);
}
if (cues.GetElementPosition() > 0)
{
seek_head.IndexThis(cues, *context->file_segment);
}
context->seek_void->ReplaceWith(seek_head, *context->ebml_file);
}
// Update the file segment head to write the current size
context->ebml_file->setFilePointer(0, seek_end);
uint64 segment_size = context->ebml_file->getFilePointer() -
context->file_segment->GetElementPosition() - context->file_segment->HeadSize();
// Segment size can only be set once normally, so force the flag.
context->file_segment->SetSizeInfinite(true);
if (!context->file_segment->ForceSize(segment_size))
{
LOG_ERROR("Failed set file segment size.", 0);
}
context->file_segment->OverwriteHead(*context->ebml_file);
// Set the write pointer back in case we're not done recording yet.
assert(current_position <= INT64_MAX);
context->ebml_file->setFilePointer((int64_t)current_position);
}
catch (std::ios_base::failure e)
{
LOG_ERROR("Failed to write recording '%s': %s", context->file_path, e.what());
result = K4A_RESULT_FAILED;
}
Unlock(context->pending_cluster_lock);
file_io->setOwnerThread();
}
Unlock(context->writer_lock);
std::lock_guard<std::mutex> cluster_lock(context->pending_cluster_lock);
if (!context->pending_clusters->empty())
{
for (cluster_t *cluster : *context->pending_clusters)
{
k4a_result_t write_result = TRACE_CALL(
write_cluster(context, cluster, &context->last_written_timestamp));
if (K4A_FAILED(write_result))
{
// Try to flush as much of the recording as possible to disk before returning any errors.
result = write_result;
}
}
context->pending_clusters->clear();
}
auto &segment_info = GetChild<KaxInfo>(*context->file_segment);
uint64_t current_position = context->ebml_file->getFilePointer();
// Update segment info
GetChild<KaxDuration>(segment_info)
.SetValue(
(double)((context->most_recent_timestamp - context->start_timestamp_offset) / context->timecode_scale));
context->segment_info_void->ReplaceWith(segment_info, *context->ebml_file);
// Render cues
auto &cues = GetChild<KaxCues>(*context->file_segment);
cues.Render(*context->ebml_file);
// Update tags
auto &tags = GetChild<KaxTags>(*context->file_segment);
if (tags.GetElementPosition() > 0)
{
context->ebml_file->setFilePointer((int64_t)tags.GetElementPosition());
tags.Render(*context->ebml_file);
if (tags.GetEndPosition() != context->tags_void->GetElementPosition())
{
// Rewrite the void block after tags
EbmlVoid tags_void;
tags_void.SetSize(context->tags_void->GetSize() -
(tags.GetEndPosition() - context->tags_void->GetElementPosition()));
tags_void.Render(*context->ebml_file);
}
}
{ // Update seek info
auto &seek_head = GetChild<KaxSeekHead>(*context->file_segment);
seek_head.RemoveAll(); // Remove any seek entries from previous flushes
seek_head.IndexThis(segment_info, *context->file_segment);
auto &tracks = GetChild<KaxTracks>(*context->file_segment);
if (tracks.GetElementPosition() > 0)
{
seek_head.IndexThis(tracks, *context->file_segment);
}
auto &attachments = GetChild<KaxAttachments>(*context->file_segment);
if (attachments.GetElementPosition() > 0)
{
seek_head.IndexThis(attachments, *context->file_segment);
}
if (tags.GetElementPosition() > 0)
{
seek_head.IndexThis(tags, *context->file_segment);
}
if (cues.GetElementPosition() > 0)
{
seek_head.IndexThis(cues, *context->file_segment);
}
context->seek_void->ReplaceWith(seek_head, *context->ebml_file);
}
// Update the file segment head to write the current size
context->ebml_file->setFilePointer(0, seek_end);
uint64 segment_size = context->ebml_file->getFilePointer() - context->file_segment->GetElementPosition() -
context->file_segment->HeadSize();
// Segment size can only be set once normally, so force the flag.
context->file_segment->SetSizeInfinite(true);
if (!context->file_segment->ForceSize(segment_size))
{
LOG_ERROR("Failed set file segment size.", 0);
}
context->file_segment->OverwriteHead(*context->ebml_file);
// Set the write pointer back in case we're not done recording yet.
assert(current_position <= INT64_MAX);
context->ebml_file->setFilePointer((int64_t)current_position);
}
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to write recording '%s': %s", context->file_path, e.what());
return K4A_RESULT_FAILED;
}
catch (std::system_error &e)
{
LOG_ERROR("Failed to flush recording '%s': %s", context->file_path, e.what());
return K4A_RESULT_FAILED;
}
return result;
}
@ -702,16 +708,14 @@ void k4a_record_close(const k4a_record_t recording_handle)
{
// If these fail, there's nothing we can do but log.
(void)TRACE_CALL(k4a_record_flush(recording_handle));
(void)TRACE_CALL(stop_matroska_writer_thread(context));
stop_matroska_writer_thread(context);
}
Lock_Deinit(context->pending_cluster_lock);
try
{
context->ebml_file->close();
}
catch (std::ios_base::failure e)
catch (std::ios_base::failure &e)
{
LOG_ERROR("Failed to close recording '%s': %s", context->file_path, e.what());
}

Просмотреть файл

@ -3,6 +3,7 @@
add_executable(record_ut record_ut.cpp)
add_executable(playback_ut playback_ut.cpp test_helpers.cpp sample_recordings.cpp)
add_executable(playback_perf playback_perf.cpp test_helpers.cpp)
target_link_libraries(record_ut PRIVATE
k4ainternal::utcommon
@ -15,9 +16,16 @@ target_link_libraries(playback_ut PRIVATE
k4a::k4arecord
)
target_link_libraries(playback_perf PRIVATE
k4ainternal::utcommon
k4ainternal::playback
k4a::k4arecord
)
# Include the PUBLIC and INTERFACE directories specified by k4ainternal::record
target_include_directories(record_ut PRIVATE $<TARGET_PROPERTY:k4ainternal::record,INTERFACE_INCLUDE_DIRECTORIES>)
target_include_directories(playback_ut PRIVATE $<TARGET_PROPERTY:k4ainternal::playback,INTERFACE_INCLUDE_DIRECTORIES>)
target_include_directories(playback_perf PRIVATE $<TARGET_PROPERTY:k4ainternal::playback,INTERFACE_INCLUDE_DIRECTORIES>)
k4a_add_tests(TARGET record_ut TEST_TYPE UNIT)
k4a_add_tests(TARGET playback_ut TEST_TYPE UNIT)

Просмотреть файл

@ -0,0 +1,180 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
#include <utcommon.h>
#include <k4a/k4a.h>
#include <k4ainternal/common.h>
#include <k4ainternal/matroska_common.h>
#include "test_helpers.h"
#include <fstream>
// Module being tested
#include <k4arecord/playback.h>
using namespace testing;
static std::string g_test_file_name;
class playback_perf : public ::testing::Test
{
protected:
void SetUp() override {}
void TearDown() override {}
};
TEST_F(playback_perf, test_open)
{
k4a_playback_t handle = NULL;
k4a_result_t result = K4A_RESULT_FAILED;
{
Timer t("File open: " + g_test_file_name);
result = k4a_playback_open(g_test_file_name.c_str(), &handle);
}
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
k4a_record_configuration_t config;
result = k4a_playback_get_record_configuration(handle, &config);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
std::cout << "Config:" << std::endl;
std::cout << " Tracks enabled:";
static const std::pair<bool *, std::string> tracks[] = { { &config.color_track_enabled, "Color" },
{ &config.depth_track_enabled, "Depth" },
{ &config.ir_track_enabled, "IR" },
{ &config.imu_track_enabled, "IMU" } };
for (int i = 0; i < 4; i++)
{
if (*tracks[i].first)
{
std::cout << " " << tracks[i].second;
}
}
std::cout << std::endl;
std::cout << " Color format: " << format_names[config.color_format] << std::endl;
std::cout << " Color resolution: " << resolution_names[config.color_resolution] << std::endl;
std::cout << " Depth mode: " << depth_names[config.depth_mode] << std::endl;
std::cout << " Frame rate: " << fps_names[config.camera_fps] << std::endl;
std::cout << " Depth delay: " << config.depth_delay_off_color_usec << " usec" << std::endl;
std::cout << " Start offset: " << config.start_timestamp_offset_usec << " usec" << std::endl;
k4a_playback_close(handle);
}
TEST_F(playback_perf, test_1000_reads_forward)
{
k4a_playback_t handle = NULL;
k4a_result_t result = K4A_RESULT_FAILED;
{
Timer t("File open: " + g_test_file_name);
result = k4a_playback_open(g_test_file_name.c_str(), &handle);
}
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
{
k4a_capture_t capture = NULL;
k4a_stream_result_t playback_result = K4A_STREAM_RESULT_FAILED;
Timer t("Next capture x1000");
for (int i = 0; i < 1000; i++)
{
playback_result = k4a_playback_get_next_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_NE(capture, nullptr);
k4a_capture_release(capture);
}
}
k4a_playback_close(handle);
}
TEST_F(playback_perf, test_1000_reads_backward)
{
k4a_playback_t handle = NULL;
k4a_result_t result = K4A_RESULT_FAILED;
{
Timer t("File open: " + g_test_file_name);
result = k4a_playback_open(g_test_file_name.c_str(), &handle);
}
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
{
Timer t("Seek to end");
result = k4a_playback_seek_timestamp(handle, 0, K4A_PLAYBACK_SEEK_END);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
}
{
k4a_capture_t capture = NULL;
k4a_stream_result_t playback_result = K4A_STREAM_RESULT_FAILED;
Timer t("Previous capture x1000");
for (int i = 0; i < 1000; i++)
{
playback_result = k4a_playback_get_previous_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_NE(capture, nullptr);
k4a_capture_release(capture);
}
}
k4a_playback_close(handle);
}
TEST_F(playback_perf, test_read_latency_30fps)
{
k4a_playback_t handle = NULL;
k4a_result_t result = K4A_RESULT_FAILED;
{
Timer t("File open: " + g_test_file_name);
result = k4a_playback_open(g_test_file_name.c_str(), &handle);
}
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
std::vector<int64_t> deltas;
{
k4a_capture_t capture = NULL;
k4a_stream_result_t playback_result = K4A_STREAM_RESULT_FAILED;
Timer t("Next capture x1000");
for (int i = 0; i < 1000; i++)
{
auto start = std::chrono::high_resolution_clock::now();
playback_result = k4a_playback_get_next_capture(handle, &capture);
auto delta = std::chrono::high_resolution_clock::now() - start;
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_NE(capture, nullptr);
k4a_capture_release(capture);
deltas.push_back(delta.count());
std::this_thread::sleep_until(start + std::chrono::milliseconds(33));
}
}
std::sort(deltas.begin(), deltas.end(), std::less<int64_t>());
int64_t total_ns = 0;
for (auto d : deltas)
{
total_ns += d;
}
std::cout << "Avg latency: " << (total_ns / (int64_t)deltas.size() / 1000) << " usec" << std::endl;
std::cout << "P95 latency: " << (deltas[(size_t)((double)deltas.size() * 0.95) - 1] / 1000) << " usec" << std::endl;
std::cout << "P99 latency: " << (deltas[(size_t)((double)deltas.size() * 0.99) - 1] / 1000) << " usec" << std::endl;
k4a_playback_close(handle);
}
int main(int argc, char **argv)
{
k4a_unittest_init();
::testing::InitGoogleTest(&argc, argv);
if (argc < 2)
{
std::cout << "Usage: playback_perf <options> <testfile.mkv>" << std::endl;
return 1;
}
g_test_file_name = std::string(argv[1]);
return RUN_ALL_TESTS();
}

Просмотреть файл

@ -4,9 +4,12 @@
#include <utcommon.h>
#include <k4a/k4a.h>
#include <k4ainternal/common.h>
#include <k4ainternal/matroska_common.h>
#include "test_helpers.h"
#include <fstream>
#include <thread>
#include <chrono>
// Module being tested
#include <k4arecord/playback.h>
@ -404,7 +407,6 @@ TEST_F(playback_ut, playback_seek_test)
result = k4a_playback_seek_timestamp(handle, seek.first, seek.second);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
// __debugbreak();
stream_result = k4a_playback_get_next_capture(handle, &capture);
ASSERT_EQ(stream_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_TRUE(validate_test_capture(capture,
@ -638,76 +640,6 @@ TEST_F(playback_ut, open_skipped_frames_file)
k4a_playback_close(handle);
}
TEST_F(playback_ut, DISABLED_open_test_file)
{
k4a_playback_t handle;
k4a_result_t result = k4a_playback_open("test.mkv", &handle);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
uint8_t buffer[8096];
size_t buffer_size = 8096;
k4a_buffer_result_t buffer_result = k4a_playback_get_raw_calibration(handle, &buffer[0], &buffer_size);
ASSERT_EQ(buffer_result, K4A_BUFFER_RESULT_SUCCEEDED);
k4a_calibration_t calibration;
result = k4a_playback_get_calibration(handle, &calibration);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
result = k4a_playback_get_calibration(handle, &calibration);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
std::cout << "Previous capture" << std::endl;
k4a_capture_t capture = NULL;
k4a_stream_result_t playback_result = k4a_playback_get_previous_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_EOF);
ASSERT_EQ(capture, nullptr);
std::cout << "Next capture x10" << std::endl;
for (int i = 0; i < 10; i++)
{
playback_result = k4a_playback_get_next_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_NE(capture, nullptr);
k4a_capture_release(capture);
}
std::cout << "Previous capture x10" << std::endl;
for (int i = 0; i < 9; i++)
{
playback_result = k4a_playback_get_previous_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_SUCCEEDED);
ASSERT_NE(capture, nullptr);
k4a_capture_release(capture);
}
playback_result = k4a_playback_get_previous_capture(handle, &capture);
ASSERT_EQ(playback_result, K4A_STREAM_RESULT_EOF);
ASSERT_EQ(capture, nullptr);
k4a_record_configuration_t config;
result = k4a_playback_get_record_configuration(handle, &config);
ASSERT_EQ(result, K4A_RESULT_SUCCEEDED);
std::cout << "Config:" << std::endl;
std::cout << " Tracks enabled:";
static const std::pair<bool *, std::string> tracks[] = { { &config.color_track_enabled, "Color" },
{ &config.depth_track_enabled, "Depth" },
{ &config.ir_track_enabled, "IR" },
{ &config.imu_track_enabled, "IMU" } };
for (int i = 0; i < 4; i++)
{
if (*tracks[i].first)
{
std::cout << " " << tracks[i].second;
}
}
std::cout << std::endl;
std::cout << " Color format: " << format_names[config.color_format] << std::endl;
std::cout << " Color resolution: " << resolution_names[config.color_resolution] << std::endl;
std::cout << " Depth mode: " << depth_names[config.depth_mode] << std::endl;
std::cout << " Frame rate: " << fps_names[config.camera_fps] << std::endl;
std::cout << " Depth delay: " << config.depth_delay_off_color_usec << " usec" << std::endl;
std::cout << " Start offset: " << config.start_timestamp_offset_usec << " usec" << std::endl;
k4a_playback_close(handle);
}
int main(int argc, char **argv)
{
k4a_unittest_init();

Просмотреть файл

@ -6,6 +6,8 @@
#include <utcommon.h>
#include <k4a/k4a.h>
#include <chrono>
#include <iostream>
static const char *const format_names[] = { "K4A_IMAGE_FORMAT_COLOR_MJPG", "K4A_IMAGE_FORMAT_COLOR_NV12",
"K4A_IMAGE_FORMAT_COLOR_YUY2", "K4A_IMAGE_FORMAT_COLOR_BGRA32",
@ -52,4 +54,24 @@ protected:
void TearDown() override;
};
class Timer
{
public:
Timer(std::string _name) : name(_name)
{
std::cout << "Start Timer(" << name << ")" << std::endl;
start = std::chrono::high_resolution_clock::now();
}
~Timer()
{
auto delta = std::chrono::high_resolution_clock::now() - start;
std::cout << "End Timer(" << name << "): " << ((float)delta.count() / 1000000.0f) << " ms" << std::endl;
}
private:
std::string name;
std::chrono::time_point<std::chrono::high_resolution_clock> start;
};
#endif /* RECORD_TEST_HELPERS_H */