This commit is contained in:
Evan Cui 2016-06-03 04:20:20 -07:00
Родитель 988e8d8d5d
Коммит 31b1eab09d
53 изменённых файлов: 2497 добавлений и 1758 удалений

Просмотреть файл

@ -45,7 +45,7 @@
<Add option="-fexceptions" />
<Add option="-Wno-unused-local-typedefs" />
<Add option="-std=c++1y" />
<Add directory="../../hpccasa/Release/include" />
<Add directory="../../evancasa/cpprestsdk/Release/include" />
<Add directory="../../spdlog/include" />
</Compiler>
<Unit filename="Readme.txt" />
@ -111,6 +111,8 @@
<Unit filename="data/TaskInfo.h" />
<Unit filename="data/Umid.cpp" />
<Unit filename="data/Umid.h" />
<Unit filename="filters/ExecutionFilter.cpp" />
<Unit filename="filters/ExecutionFilter.h" />
<Unit filename="main.cpp" />
<Unit filename="makefile" />
<Unit filename="scripts/CleanupAllTasks.sh" />
@ -122,6 +124,8 @@
<Unit filename="scripts/Statistics.sh" />
<Unit filename="scripts/WaitForTrust.sh" />
<Unit filename="scripts/common.sh" />
<Unit filename="test/ExecutionFilterTest.cpp" />
<Unit filename="test/ExecutionFilterTest.h" />
<Unit filename="test/ProcessTest.cpp" />
<Unit filename="test/ProcessTest.h" />
<Unit filename="test/TestRunner.cpp" />

Просмотреть файл

@ -251,25 +251,26 @@
<sstream>
"String.h"
1427353132 /home/evanc/spdlog/include/spdlog/spdlog.h
1440745033 /home/evanc/spdlog/include/spdlog/spdlog.h
"tweakme.h"
"common.h"
"logger.h"
"details/spdlog_impl.h"
1427353132 /home/evanc/spdlog/include/spdlog/common.h
1440745033 /home/evanc/spdlog/include/spdlog/common.h
<string>
<initializer_list>
<chrono>
<memory>
1427353132 /home/evanc/spdlog/include/spdlog/logger.h
1440745033 /home/evanc/spdlog/include/spdlog/logger.h
<vector>
<memory>
"sinks/base_sink.h"
"common.h"
"./details/logger_impl.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/base_sink.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/base_sink.h
<string>
<mutex>
<atomic>
@ -278,14 +279,15 @@
"../common.h"
"../details/log_msg.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/sink.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/sink.h
"../details/log_msg.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/log_msg.h
1440745033 /home/evanc/spdlog/include/spdlog/details/log_msg.h
<thread>
"../common.h"
"./format.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/format.h
1440745033 /home/evanc/spdlog/include/spdlog/details/format.h
<stdint.h>
<cassert>
<cmath>
@ -296,12 +298,13 @@
<stdexcept>
<string>
<sstream>
<map>
<iterator>
<intrin.h>
<utility>
"format.cc"
1427353132 /home/evanc/spdlog/include/spdlog/details/format.cc
1440745033 /home/evanc/spdlog/include/spdlog/details/format.cc
"format.h"
<string.h>
<cctype>
@ -311,12 +314,13 @@
<cstdarg>
<cstring>
<windows.h>
<windows.h>
1427353132 /home/evanc/spdlog/include/spdlog/formatter.h
"details/log_msg.h"
"details/pattern_formatter_impl.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/pattern_formatter_impl.h
1440745033 /home/evanc/spdlog/include/spdlog/details/pattern_formatter_impl.h
<string>
<chrono>
<memory>
@ -326,37 +330,42 @@
"./log_msg.h"
"./os.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/os.h
1440745033 /home/evanc/spdlog/include/spdlog/details/os.h
<string>
<cstdio>
<ctime>
<Windows.h>
<share.h>
<sys/syscall.h>
<unistd.h>
<thread>
"../common.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/logger_impl.h
1440745033 /home/evanc/spdlog/include/spdlog/details/logger_impl.h
"./line_logger.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/line_logger.h
1440745033 /home/evanc/spdlog/include/spdlog/details/line_logger.h
<type_traits>
"../common.h"
"../logger.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/spdlog_impl.h
1440745033 /home/evanc/spdlog/include/spdlog/details/spdlog_impl.h
"registry.h"
"../sinks/file_sinks.h"
"../sinks/stdout_sinks.h"
"../sinks/syslog_sink.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/registry.h
1440745033 /home/evanc/spdlog/include/spdlog/details/registry.h
<string>
<mutex>
<unordered_map>
<functional>
"./null_mutex.h"
"../logger.h"
"../async_logger.h"
"../common.h"
1427353132 /home/evanc/spdlog/include/spdlog/async_logger.h
1440745033 /home/evanc/spdlog/include/spdlog/async_logger.h
<chrono>
<functional>
"common.h"
@ -364,10 +373,10 @@
"spdlog.h"
"./details/async_logger_impl.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/async_logger_impl.h
1440745033 /home/evanc/spdlog/include/spdlog/details/async_logger_impl.h
"./async_log_helper.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/async_log_helper.h
1440745033 /home/evanc/spdlog/include/spdlog/details/async_log_helper.h
<chrono>
<thread>
<atomic>
@ -377,12 +386,13 @@
"./mpmc_bounded_q.h"
"./log_msg.h"
"./format.h"
"os.h"
1427353132 /home/evanc/spdlog/include/spdlog/details/mpmc_bounded_q.h
<atomic>
"../common.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/file_sinks.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/file_sinks.h
<mutex>
"base_sink.h"
"../details/null_mutex.h"
@ -391,26 +401,26 @@
1427353132 /home/evanc/spdlog/include/spdlog/details/null_mutex.h
1427353132 /home/evanc/spdlog/include/spdlog/details/file_helper.h
1440745033 /home/evanc/spdlog/include/spdlog/details/file_helper.h
<string>
<thread>
<chrono>
"os.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/stdout_sinks.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/stdout_sinks.h
<iostream>
<mutex>
"./ostream_sink.h"
"../details/null_mutex.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/ostream_sink.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/ostream_sink.h
<ostream>
<mutex>
<memory>
"../details/null_mutex.h"
"./base_sink.h"
1427353132 /home/evanc/spdlog/include/spdlog/sinks/syslog_sink.h
1440745033 /home/evanc/spdlog/include/spdlog/sinks/syslog_sink.h
<array>
<string>
<syslog.h>
@ -427,7 +437,7 @@
"core/RemoteExecutor.h"
"Version.h"
1436264986 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h
1447837233 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h
<syslog.h>
<stdio.h>
<stdarg.h>
@ -435,24 +445,31 @@
<spdlog/spdlog.h>
"String.h"
1427722564 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h
1464874241 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h
<cpprest/http_listener.h>
<cpprest/json.h>
"../utils/Logger.h"
"../filters/ExecutionFilter.h"
"IRemoteExecutor.h"
1427722694 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h
1437990898 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h
"../arguments/StartJobAndTaskArgs.h"
"../arguments/StartTaskArgs.h"
"../arguments/EndJobArgs.h"
"../arguments/EndTaskArgs.h"
"../arguments/MetricCountersConfig.h"
1428574742 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h
1464733277 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h
<set>
<map>
"IRemoteExecutor.h"
"JobTaskTable.h"
"Monitor.h"
"Process.h"
"Reporter.h"
"HostsManager.h"
"../arguments/MetricCountersConfig.h"
"../data/ProcessStatistics.h"
1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp
"EndJobArgs.h"
@ -461,7 +478,7 @@
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h
<cpprest/json.h>
1437473138 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h
1464941459 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h
<vector>
<map>
<cpprest/json.h>
@ -470,14 +487,14 @@
"EndTaskArgs.h"
"../utils/JsonHelper.h"
1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h
1433133379 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h
<cpprest/json.h>
1429783386 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp
1464897409 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp
"ProcessStartInfo.h"
"../utils/JsonHelper.h"
1429783378 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h
1464897165 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h
<string>
<vector>
<map>
@ -487,7 +504,7 @@
"StartJobAndTaskArgs.h"
"../utils/JsonHelper.h"
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h
1429010700 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h
<cpprest/json.h>
"ProcessStartInfo.h"
@ -509,7 +526,7 @@
"../utils/String.h"
"../common/ErrorCodes.h"
1428574604 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h
1464871942 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h
<string>
<sstream>
<vector>
@ -524,6 +541,7 @@
"../utils/Logger.h"
"../utils/System.h"
"../common/ErrorCodes.h"
"../data/ProcessStatistics.h"
1429015468 /home/evanc/whpc-linux-communicator/nodemanager/utils/String.h
<string>
@ -538,7 +556,7 @@
"../utils/System.h"
"../arguments/StartJobAndTaskArgs.h"
1428574577 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h
1464869302 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h
<string>
"String.h"
"Logger.h"
@ -584,7 +602,7 @@
"../utils/WriterLock.h"
"../utils/ReaderLock.h"
1438324701 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h
1438327680 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h
<map>
<cpprest/json.h>
"../data/TaskInfo.h"
@ -592,7 +610,7 @@
"../data/NodeInfo.h"
"../utils/ReaderLock.h"
1436256679 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h
1438325484 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h
<cpprest/json.h>
<vector>
<string>
@ -662,10 +680,15 @@
1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.cpp
"WriterLock.h"
1427383873 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h
1438161680 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h
<cpprest/json.h>
<map>
<boost/uuid/uuid.hpp>
"../utils/System.h"
"../data/MonitoringPacket.h"
"../arguments/MetricCounter.h"
"../arguments/MetricCountersConfig.h"
"MetricCollectorBase.h"
1428571893 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp
<pthread.h>
@ -675,7 +698,7 @@
"../utils/Logger.h"
"../utils/System.h"
1430202795 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h
1446103482 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h
<cpprest/json.h>
<functional>
"../utils/Logger.h"
@ -685,7 +708,7 @@
"Reporter.h"
"../utils/Logger.h"
1437474755 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h
1464870624 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h
1429084854 source:/home/evanc/whpc-linux-communicator/nodemanager/Version.cpp
"Version.h"
@ -720,7 +743,7 @@
1437547414 source:/home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.cpp
"NodeManagerConfig.h"
1440058578 /home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.h
1464734870 /home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.h
"../utils/Configuration.h"
1437732272 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/MonitoringConfig.cpp
@ -745,7 +768,7 @@
1437645025 source:/home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.cpp
"HttpHelper.h"
1440058706 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.h
1460020276 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.h
<cpprest/http_client.h>
<boost/asio/ssl.hpp>
"NodeManagerConfig.h"
@ -762,7 +785,7 @@
1438238815 /home/evanc/whpc-linux-communicator/nodemanager/data/Umid.h
<inttypes.h>
1433133379 /home/evanc/whpc-linux-communicator/nodemanager/data/ProcessStatistics.h
1447837294 /home/evanc/whpc-linux-communicator/nodemanager/data/ProcessStatistics.h
<vector>
<inttypes.h>
@ -961,3 +984,259 @@
"cpprest/http_msg.h"
"cpprest/details/http_constants.dat"
1464885862 source:/home/evanc/whpc-linux-communicator/nodemanager/test/ExecutionFilterTest.cpp
"ExecutionFilterTest.h"
<iostream>
<string>
<cpprest/http_listener.h>
<cpprest/json.h>
"../utils/JsonHelper.h"
"../core/Process.h"
"../utils/Logger.h"
"../core/RemoteCommunicator.h"
"../core/RemoteExecutor.h"
"../core/NodeManagerConfig.h"
"../common/ErrorCodes.h"
"../core/HttpHelper.h"
1464883806 /home/evanc/whpc-linux-communicator/nodemanager/test/ExecutionFilterTest.h
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_listener.h
<limits>
<functional>
"cpprest/http_msg.h"
<boost/asio/ssl.hpp>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_msg.h
<map>
<memory>
<string>
<vector>
<system_error>
"pplx/pplxtasks.h"
"cpprest/json.h"
"cpprest/uri.h"
"cpprest/http_headers.h"
"cpprest/details/cpprest_compat.h"
"cpprest/asyncrt_utils.h"
"cpprest/streams.h"
"cpprest/containerstream.h"
"cpprest/details/http_constants.dat"
"cpprest/details/http_constants.dat"
"cpprest/details/http_constants.dat"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxtasks.h
<ppltasks.h>
<concrt.h>
"pplx/pplx.h"
<jni.h>
<functional>
<vector>
<utility>
<exception>
<algorithm>
<windows.h>
<ctxtcall.h>
<agile.h>
<winapifamily.h>
<winapifamily.h>
<uithreadctxt.h>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplx.h
"cpprest/details/cpprest_compat.h"
"pplx/pplxwin.h"
"pplx/pplxlinux.h"
"pplx/pplxlinux.h"
"pplx/pplxcancellation_token.h"
<functional>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/cpprest_compat.h
<sal.h>
<assert.h>
"cpprest/details/nosal.h"
<cstdio>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/nosal.h
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxwin.h
"cpprest/details/cpprest_compat.h"
"pplx/pplxinterface.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxinterface.h
<memory>
<atomic>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxlinux.h
<signal.h>
"pthread.h"
"cpprest/details/cpprest_compat.h"
<dispatch/dispatch.h>
<boost/thread/mutex.hpp>
<boost/thread/condition_variable.hpp>
<mutex>
<condition_variable>
"pplx/pplxinterface.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxcancellation_token.h
<cstdlib>
<string>
"pplx/pplxinterface.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/json.h
<memory>
<string>
<sstream>
<vector>
<unordered_map>
<cstdint>
"cpprest/details/basic_types.h"
"cpprest/asyncrt_utils.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/basic_types.h
<string>
<fstream>
<iostream>
<sstream>
"cpprest/details/cpprest_compat.h"
<stdint.h>
<cstdint>
"cpprest/details/SafeInt3.hpp"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/SafeInt3.hpp
<cstddef>
<cstdlib>
<intrin.h>
<stdint.h>
<assert.h>
<stdlib.h>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/asyncrt_utils.h
<string>
<vector>
<cstdint>
<system_error>
<random>
<locale.h>
"pplx/pplxtasks.h"
"cpprest/details/basic_types.h"
<chrono>
<boost/algorithm/string.hpp>
<xlocale.h>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/uri.h
"cpprest/base_uri.h"
"cpprest/uri_builder.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/base_uri.h
<map>
<memory>
<string>
<vector>
<functional>
"cpprest/asyncrt_utils.h"
"cpprest/details/basic_types.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/uri_builder.h
<sstream>
<string>
<vector>
"cpprest/base_uri.h"
"cpprest/details/uri_parser.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/uri_parser.h
<string>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_headers.h
<map>
<memory>
<string>
<vector>
<system_error>
"cpprest/asyncrt_utils.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/streams.h
"cpprest/astreambuf.h"
<iosfwd>
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/astreambuf.h
<ios>
<memory>
<cstring>
<math.h>
"pplx/pplxtasks.h"
"cpprest/details/basic_types.h"
"cpprest/asyncrt_utils.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/containerstream.h
<vector>
<queue>
<algorithm>
<iterator>
"pplx/pplxtasks.h"
"cpprest/astreambuf.h"
"cpprest/streams.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/http_constants.dat
1440745033 /home/evanc/spdlog/include/spdlog/tweakme.h
1464874650 /home/evanc/whpc-linux-communicator/nodemanager/filters/ExecutionFilter.h
<string>
<cpprest/json.h>
"../core/NodeManagerConfig.h"
1437983925 /home/evanc/whpc-linux-communicator/nodemanager/arguments/MetricCountersConfig.h
"MetricCounter.h"
"../utils/JsonHelper.h"
1430401912 /home/evanc/whpc-linux-communicator/nodemanager/data/MonitoringPacket.h
<boost/uuid/uuid.hpp>
"../utils/Logger.h"
"Umid.h"
1453226140 /home/evanc/whpc-linux-communicator/nodemanager/core/HostsManager.h
<string>
<vector>
"HttpFetcher.h"
"../data/HostEntry.h"
1447837236 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpFetcher.h
<cpprest/json.h>
<cpprest/http_client.h>
<functional>
"Reporter.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_client.h
<wrl.h>
<msxml6.h>
<memory>
<limits>
"pplx/pplxtasks.h"
"cpprest/http_msg.h"
"cpprest/json.h"
"cpprest/uri.h"
"cpprest/details/web_utilities.h"
"cpprest/details/basic_types.h"
"cpprest/asyncrt_utils.h"
"cpprest/oauth1.h"
"cpprest/oauth2.h"
"boost/asio/ssl.hpp"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/web_utilities.h
"cpprest/asyncrt_utils.h"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/oauth1.h
"cpprest/http_msg.h"
"cpprest/details/web_utilities.h"
"cpprest/details/http_constants.dat"
"cpprest/details/http_constants.dat"
1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/oauth2.h
"cpprest/http_msg.h"
"cpprest/details/web_utilities.h"
"cpprest/details/http_constants.dat"
1447837236 /home/evanc/whpc-linux-communicator/nodemanager/data/HostEntry.h
<string>
<cpprest/json.h>

Просмотреть файл

@ -1,18 +1,18 @@
== Conding Convention ==
Namespaces should be rooted from "hpc", and have at most 2 layers, which means, you can only
define one more layer under "hpc".
Files which contain contents under a sub namespace, should be put into a sub folder with the
same name as the sub namespace.
One file should contain only one class.
Private fields, variables should be named using camel convention, while class/struct/methods
should be named using Pascal convention.
File names should be in Pascal convention exception main.cpp, while folder names should be
in lower case.
== Conding Convention ==
Namespaces should be rooted from "hpc", and have at most 2 layers, which means, you can only
define one more layer under "hpc".
Files which contain contents under a sub namespace, should be put into a sub folder with the
same name as the sub namespace.
One file should contain only one class.
Private fields, variables should be named using camel convention, while class/struct/methods
should be named using Pascal convention.
File names should be in Pascal convention exception main.cpp, while folder names should be
in lower case.
Namespace description:
arguments: All data structures passed from head directly.

Просмотреть файл

@ -456,6 +456,14 @@ namespace hpc
"Fix a memory leak issue in the cpprestsdk",
}
},
{ "1.7.1.0",
{
"Added the execution filter support",
"Added unit test for execution filter",
"Improved the unit test framework",
"Fixed some problems in json format processing",
}
},
};
return versionHistory;

Просмотреть файл

@ -1,17 +1,17 @@
#include "EndJobArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
EndJobArgs::EndJobArgs(int jobId) : JobId(jobId)
{
//ctor
}
EndJobArgs EndJobArgs::FromJson(const json::value& j)
{
EndJobArgs args(JsonHelper<int>::Read("JobId", j));
return std::move(args);
}
#include "EndJobArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
EndJobArgs::EndJobArgs(int jobId) : JobId(jobId)
{
//ctor
}
EndJobArgs EndJobArgs::FromJson(const json::value& j)
{
EndJobArgs args(JsonHelper<int>::Read("JobId", j));
return std::move(args);
}

Просмотреть файл

@ -1,25 +1,25 @@
#ifndef ENDJOBARGS_H
#define ENDJOBARGS_H
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct EndJobArgs
{
public:
EndJobArgs(int jobId);
int JobId;
static EndJobArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // ENDJOBARGS_H
#ifndef ENDJOBARGS_H
#define ENDJOBARGS_H
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct EndJobArgs
{
public:
EndJobArgs(int jobId);
int JobId;
static EndJobArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // ENDJOBARGS_H

Просмотреть файл

@ -1,21 +1,21 @@
#include "EndTaskArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
#include "EndTaskArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
EndTaskArgs::EndTaskArgs(int jobId, int taskId, int gracePeriodSeconds)
: JobId(jobId), TaskId(taskId), TaskCancelGracePeriodSeconds(gracePeriodSeconds)
{
//ctor
}
EndTaskArgs EndTaskArgs::FromJson(const json::value& j)
{
EndTaskArgs args(
JsonHelper<int>::Read("JobId", j),
JsonHelper<int>::Read("TaskId", j),
: JobId(jobId), TaskId(taskId), TaskCancelGracePeriodSeconds(gracePeriodSeconds)
{
//ctor
}
EndTaskArgs EndTaskArgs::FromJson(const json::value& j)
{
EndTaskArgs args(
JsonHelper<int>::Read("JobId", j),
JsonHelper<int>::Read("TaskId", j),
JsonHelper<int>::Read("TaskCancelGracePeriod", j));
return std::move(args);
}
return std::move(args);
}

Просмотреть файл

@ -1,27 +1,27 @@
#ifndef ENDTASKARGS_H
#define ENDTASKARGS_H
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct EndTaskArgs
{
public:
#ifndef ENDTASKARGS_H
#define ENDTASKARGS_H
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct EndTaskArgs
{
public:
EndTaskArgs(int jobId, int taskId, int gracePeriodSeconds);
int JobId;
int JobId;
int TaskId;
int TaskCancelGracePeriodSeconds;
static EndTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // ENDTASKARGS_H
int TaskCancelGracePeriodSeconds;
static EndTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // ENDTASKARGS_H

Просмотреть файл

@ -25,7 +25,7 @@ namespace hpc
return MetricCountersConfig(
JsonHelper<std::vector<MetricCounter>>::Read("MetricCounters", jsonValue));
}
protected:
private:
};

Просмотреть файл

@ -1,17 +1,17 @@
#include "ProcessStartInfo.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
ProcessStartInfo::ProcessStartInfo(
std::string&& cmdLine,
std::string&& stdIn,
std::string&& stdOut,
std::string&& stdErr,
#include "ProcessStartInfo.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
ProcessStartInfo::ProcessStartInfo(
std::string&& cmdLine,
std::string&& stdIn,
std::string&& stdOut,
std::string&& stdErr,
std::string&& workDir,
int taskRequeueCount,
std::vector<uint64_t>&& affinity,
int taskRequeueCount,
std::vector<uint64_t>&& affinity,
std::map<std::string, std::string>&& enviVars) :
CommandLine(std::move(cmdLine)),
StdInFile(std::move(stdIn)),
@ -20,23 +20,37 @@ ProcessStartInfo::ProcessStartInfo(
WorkDirectory(std::move(workDir)),
TaskRequeueCount(taskRequeueCount),
Affinity(std::move(affinity)),
EnvironmentVariables(std::move(enviVars))
{
}
/// TODO: Consider using attribute feature when compiler ready.
ProcessStartInfo ProcessStartInfo::FromJson(const web::json::value& jsonValue)
{
ProcessStartInfo startInfo(
JsonHelper<std::string>::Read("commandLine", jsonValue),
JsonHelper<std::string>::Read("stdin", jsonValue),
JsonHelper<std::string>::Read("stdout", jsonValue),
JsonHelper<std::string>::Read("stderr", jsonValue),
JsonHelper<std::string>::Read("workingDirectory", jsonValue),
EnvironmentVariables(std::move(enviVars))
{
}
json::value ProcessStartInfo::ToJson() const
{
json::value v;
v["commandLine"] = json::value::string(this->CommandLine);
v["stdin"] = json::value::string(this->StdInFile);
v["stdout"] = json::value::string(this->StdOutFile);
v["stderr"] = json::value::string(this->StdErrFile);
v["workingDirectory"] = json::value::string(this->WorkDirectory);
v["taskRequeueCount"] = this->TaskRequeueCount;
v["affinity"] = JsonHelper<std::vector<uint64_t>>::ToJson(this->Affinity);
v["environmentVariables"] = JsonHelper<std::map<std::string, std::string>>::ToJson(this->EnvironmentVariables);
return v;
}
/// TODO: Consider using attribute feature when compiler ready.
ProcessStartInfo ProcessStartInfo::FromJson(const web::json::value& jsonValue)
{
ProcessStartInfo startInfo(
JsonHelper<std::string>::Read("commandLine", jsonValue),
JsonHelper<std::string>::Read("stdin", jsonValue),
JsonHelper<std::string>::Read("stdout", jsonValue),
JsonHelper<std::string>::Read("stderr", jsonValue),
JsonHelper<std::string>::Read("workingDirectory", jsonValue),
JsonHelper<int>::Read("taskRequeueCount", jsonValue),
JsonHelper<std::vector<uint64_t>>::Read("affinity", jsonValue),
JsonHelper<std::map<std::string, std::string>>::Read("environmentVariables", jsonValue));
return std::move(startInfo);
}
JsonHelper<std::vector<uint64_t>>::Read("affinity", jsonValue),
JsonHelper<std::map<std::string, std::string>>::Read("environmentVariables", jsonValue));
return std::move(startInfo);
}

Просмотреть файл

@ -1,44 +1,45 @@
#ifndef PROCESSSTARTINFO_H
#define PROCESSSTARTINFO_H
#include <string>
#include <vector>
#include <map>
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct ProcessStartInfo
{
public:
ProcessStartInfo(
std::string&& cmdLine,
std::string&& stdIn,
std::string&& stdOut,
std::string&& stdErr,
#ifndef PROCESSSTARTINFO_H
#define PROCESSSTARTINFO_H
#include <string>
#include <vector>
#include <map>
#include <cpprest/json.h>
namespace hpc
{
namespace arguments
{
struct ProcessStartInfo
{
public:
ProcessStartInfo(
std::string&& cmdLine,
std::string&& stdIn,
std::string&& stdOut,
std::string&& stdErr,
std::string&& workDir,
int taskRequeueCount,
std::vector<uint64_t>&& affinity,
std::map<std::string, std::string>&& enviVars);
ProcessStartInfo(ProcessStartInfo&& startInfo) = default;
static ProcessStartInfo FromJson(const web::json::value& jsonValue);
std::vector<uint64_t>&& affinity,
std::map<std::string, std::string>&& enviVars);
std::string CommandLine;
std::string StdInFile;
std::string StdOutFile;
std::string StdErrFile;
ProcessStartInfo(ProcessStartInfo&& startInfo) = default;
static ProcessStartInfo FromJson(const web::json::value& jsonValue);
web::json::value ToJson() const;
std::string CommandLine;
std::string StdInFile;
std::string StdOutFile;
std::string StdErrFile;
std::string WorkDirectory;
int TaskRequeueCount;
std::vector<uint64_t> Affinity;
std::map<std::string, std::string> EnvironmentVariables;
protected:
private:
};
}
}
#endif // PROCESSSTARTINFO_H
int TaskRequeueCount;
std::vector<uint64_t> Affinity;
std::map<std::string, std::string> EnvironmentVariables;
protected:
private:
};
}
}
#endif // PROCESSSTARTINFO_H

Просмотреть файл

@ -4,6 +4,7 @@
using namespace hpc::arguments;
using namespace hpc::utils;
using namespace web;
StartJobAndTaskArgs::StartJobAndTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo,
std::string&& userName, std::string&& password) :
@ -13,6 +14,25 @@ StartJobAndTaskArgs::StartJobAndTaskArgs(int jobId, int taskId, ProcessStartInfo
//ctor
}
json::value StartJobAndTaskArgs::ToJson() const
{
json::value v;
json::value jobTaskId;
jobTaskId["JobId"] = this->JobId;
jobTaskId["TaskId"] = this->TaskId;
v["m_Item1"] = jobTaskId;
v["m_Item2"] = this->StartInfo.ToJson();
v["m_Item3"] = json::value::string(this->UserName);
v["m_Item4"] = json::value::string(this->Password);
v["m_Item5"] = json::value::string(this->PrivateKey);
v["m_Item6"] = json::value::string(this->PublicKey);
return v;
}
StartJobAndTaskArgs StartJobAndTaskArgs::FromJson(const json::value& j)
{
StartJobAndTaskArgs args(

Просмотреть файл

@ -1,41 +1,42 @@
#ifndef STARTJOBANDTASKARGS_H
#define STARTJOBANDTASKARGS_H
#include <cpprest/json.h>
#include "ProcessStartInfo.h"
namespace hpc
{
namespace arguments
{
struct StartJobAndTaskArgs
{
public:
#ifndef STARTJOBANDTASKARGS_H
#define STARTJOBANDTASKARGS_H
#include <cpprest/json.h>
#include "ProcessStartInfo.h"
namespace hpc
{
namespace arguments
{
struct StartJobAndTaskArgs
{
public:
StartJobAndTaskArgs(
int jobId,
int taskId,
ProcessStartInfo&& startInfo,
std::string&& userName,
std::string&& password);
StartJobAndTaskArgs(StartJobAndTaskArgs&& args) = default;
int JobId;
int TaskId;
std::string&& password);
StartJobAndTaskArgs(StartJobAndTaskArgs&& args) = default;
web::json::value ToJson() const;
int JobId;
int TaskId;
ProcessStartInfo StartInfo;
std::string UserName;
std::string Password;
std::vector<unsigned char> Certificate;
std::string PrivateKey;
std::string PublicKey;
static StartJobAndTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // STARTJOBANDTASKARGS_H
std::string PublicKey;
static StartJobAndTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // STARTJOBANDTASKARGS_H

Просмотреть файл

@ -1,22 +1,22 @@
#include "StartTaskArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
StartTaskArgs::StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo) :
JobId(jobId), TaskId(taskId), StartInfo(std::move(startInfo))
{
//ctor
}
StartTaskArgs StartTaskArgs::FromJson(const json::value& j)
{
StartTaskArgs args(
JsonHelper<int>::Read("JobId", j.at("m_Item1")),
JsonHelper<int>::Read("TaskId", j.at("m_Item1")),
ProcessStartInfo::FromJson(j.at("m_Item2")));
return std::move(args);
}
#include "StartTaskArgs.h"
#include "../utils/JsonHelper.h"
using namespace hpc::arguments;
using namespace hpc::utils;
StartTaskArgs::StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo) :
JobId(jobId), TaskId(taskId), StartInfo(std::move(startInfo))
{
//ctor
}
StartTaskArgs StartTaskArgs::FromJson(const json::value& j)
{
StartTaskArgs args(
JsonHelper<int>::Read("JobId", j.at("m_Item1")),
JsonHelper<int>::Read("TaskId", j.at("m_Item1")),
ProcessStartInfo::FromJson(j.at("m_Item2")));
return std::move(args);
}

Просмотреть файл

@ -1,29 +1,29 @@
#ifndef STARTTASKARGS_H
#define STARTTASKARGS_H
#include <cpprest/json.h>
#include "ProcessStartInfo.h"
namespace hpc
{
namespace arguments
{
struct StartTaskArgs
{
public:
StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo);
int JobId;
int TaskId;
ProcessStartInfo StartInfo;
static StartTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // STARTTASKARGS_H
#ifndef STARTTASKARGS_H
#define STARTTASKARGS_H
#include <cpprest/json.h>
#include "ProcessStartInfo.h"
namespace hpc
{
namespace arguments
{
struct StartTaskArgs
{
public:
StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo);
int JobId;
int TaskId;
ProcessStartInfo StartInfo;
static StartTaskArgs FromJson(const web::json::value& jsonValue);
protected:
private:
};
}
}
#endif // STARTTASKARGS_H

Просмотреть файл

@ -17,6 +17,9 @@ namespace hpc
TestRunFailed = 176,
FailedToOpenPort = 177,
ConfigurationFileError = 178,
WriteFileError = 179,
ReadFileError = 180,
UnknownFilter = 181,
};
}
}

Просмотреть файл

@ -2,11 +2,14 @@
"TrustedCAFile":"/opt/hpcnodemanager/certs/test.pem",
"TrustedCAPath":"/opt/hpcnodemanager/certs/",
"UseDefaultCA":false,
"ClusterAuthenticationKey":"abc",
"ClusterAuthenticationKey":"",
"HeartbeatUri":"https://evancv1:40001/api/evanc.com/computenodereported",
"MetricUri":"udp://EVANCV1.fareast.corp.microsoft.com:9894/api/4aed4836-c0c8-4e92-853d-cac4698582d1/metricreported",
"RegisterUri":"https://evancv1:40001/api/evanc.com/registerrequested",
"CertificateChainFile":"/opt/hpcnodemanager/certs/evanc.com.crt",
"PrivateKeyFile":"/opt/hpcnodemanager/certs/evanc.com.key",
"ListeningUri":"https://0.0.0.0:40002"
"ListeningUri":"http://0.0.0.0:40000",
"JobStartFilter":"sed -s 's/123/456/g'",
"JobEndFilter":"sed -s 's/123/456/g'",
"TaskStartFilter":"sed -s 's/123/456/g'"
}

Просмотреть файл

@ -13,15 +13,15 @@ void HttpReporter::Report()
{
const std::string& uri = this->reportUri;
auto jsonBody = this->valueFetcher();
if (jsonBody.is_null())
{
Logger::Error("Skipped reporting to {0} because json is null", uri);
return;
}
Logger::Debug("---------> Report to {0} with {1}", uri, jsonBody);
auto jsonBody = this->valueFetcher();
if (jsonBody.is_null())
{
Logger::Error("Skipped reporting to {0} because json is null", uri);
return;
}
Logger::Debug("---------> Report to {0} with {1}", uri, jsonBody);
http_client client = HttpHelper::GetHttpClient(uri);
http_request request = HttpHelper::GetHttpRequest(methods::POST, jsonBody);
@ -38,18 +38,18 @@ void HttpReporter::Report()
this->intervalSeconds = milliseconds / 1000;
}
Logger::Debug("---------> Reported to {0} response code {1}, value {2}, interval {3}", uri, response.status_code(), milliseconds, this->intervalSeconds);
Logger::Debug("---------> Reported to {0} response code {1}, value {2}, interval {3}", uri, response.status_code(), milliseconds, this->intervalSeconds);
}
catch (const http_exception& httpEx)
{
Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what());
}
catch (const std::exception& ex)
{
Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what());
}
catch (...)
{
Logger::Error("Unknown error occurred when report to {0}", this->reportUri);
catch (const http_exception& httpEx)
{
Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what());
}
catch (const std::exception& ex)
{
Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what());
}
catch (...)
{
Logger::Error("Unknown error occurred when report to {0}", this->reportUri);
}
}

Просмотреть файл

@ -1,28 +1,28 @@
#ifndef IREMOTEEXECUTOR_H
#define IREMOTEEXECUTOR_H
#include "../arguments/StartJobAndTaskArgs.h"
#include "../arguments/StartTaskArgs.h"
#include "../arguments/EndJobArgs.h"
#ifndef IREMOTEEXECUTOR_H
#define IREMOTEEXECUTOR_H
#include "../arguments/StartJobAndTaskArgs.h"
#include "../arguments/StartTaskArgs.h"
#include "../arguments/EndJobArgs.h"
#include "../arguments/EndTaskArgs.h"
#include "../arguments/MetricCountersConfig.h"
namespace hpc
{
namespace core
{
class IRemoteExecutor
{
public:
virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args) = 0;
virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value Ping(const std::string& callbackUri) = 0;
virtual web::json::value Metric(const std::string& callbackUri) = 0;
virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri) = 0;
};
}
}
#endif // IREMOTEEXECUTOR_H
namespace hpc
{
namespace core
{
class IRemoteExecutor
{
public:
virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args) = 0;
virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri) = 0;
virtual web::json::value Ping(const std::string& callbackUri) = 0;
virtual web::json::value Metric(const std::string& callbackUri) = 0;
virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri) = 0;
};
}
}
#endif // IREMOTEEXECUTOR_H

Просмотреть файл

@ -1,22 +1,22 @@
#include "JobTaskTable.h"
#include "../utils/WriterLock.h"
#include "JobTaskTable.h"
#include "../utils/WriterLock.h"
#include "../utils/ReaderLock.h"
#include "../utils/System.h"
using namespace hpc::core;
using namespace web;
#include "../utils/System.h"
using namespace hpc::core;
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
using namespace hpc::utils;
JobTaskTable* JobTaskTable::instance = nullptr;
json::value JobTaskTable::ToJson()
{
ReaderLock readerLock(&this->lock);
auto j = this->nodeInfo.ToJson();
return std::move(j);
json::value JobTaskTable::ToJson()
{
ReaderLock readerLock(&this->lock);
auto j = this->nodeInfo.ToJson();
return std::move(j);
}
int JobTaskTable::GetTaskCount()
{
ReaderLock readerLock(&this->lock);
@ -27,7 +27,7 @@ int JobTaskTable::GetTaskCount()
return taskCount;
}
int JobTaskTable::GetCoresInUse()
{
ReaderLock readerLock(&this->lock);
@ -64,88 +64,88 @@ int JobTaskTable::GetCoresInUse()
return used;
}
std::shared_ptr<TaskInfo> JobTaskTable::AddJobAndTask(int jobId, int taskId, bool& isNewEntry)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j == this->nodeInfo.Jobs.end())
{
job = std::shared_ptr<JobInfo>(new JobInfo(jobId));
std::shared_ptr<TaskInfo> JobTaskTable::AddJobAndTask(int jobId, int taskId, bool& isNewEntry)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j == this->nodeInfo.Jobs.end())
{
job = std::shared_ptr<JobInfo>(new JobInfo(jobId));
this->nodeInfo.Jobs[jobId] = job;
}
else
{
}
else
{
job = j->second;
}
std::shared_ptr<TaskInfo> task;
auto t = job->Tasks.find(taskId);
if (t == job->Tasks.end())
{
task = std::shared_ptr<TaskInfo>(new TaskInfo(jobId, taskId, nodeInfo.Name));
}
std::shared_ptr<TaskInfo> task;
auto t = job->Tasks.find(taskId);
if (t == job->Tasks.end())
{
task = std::shared_ptr<TaskInfo>(new TaskInfo(jobId, taskId, nodeInfo.Name));
job->Tasks[taskId] = task;
isNewEntry = true;
}
else
{
isNewEntry = true;
}
else
{
task = t->second;
isNewEntry = false;
}
return task;
isNewEntry = false;
}
return task;
}
std::shared_ptr<TaskInfo> JobTaskTable::GetTask(int jobId, int taskId)
{
ReaderLock readerLock(&this->lock);
ReaderLock readerLock(&this->lock);
std::shared_ptr<TaskInfo> task;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
std::shared_ptr<TaskInfo> task;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
auto t = j->second->Tasks.find(taskId);
if (t != j->second->Tasks.end())
{
task = t->second;
}
}
}
return task;
}
std::shared_ptr<JobInfo> JobTaskTable::RemoveJob(int jobId)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
job = j->second;
this->nodeInfo.Jobs.erase(j);
}
return job;
}
void JobTaskTable::RemoveTask(int jobId, int taskId, uint64_t attemptId)
{
WriterLock writerLock(&this->lock);
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
return task;
}
std::shared_ptr<JobInfo> JobTaskTable::RemoveJob(int jobId)
{
WriterLock writerLock(&this->lock);
std::shared_ptr<JobInfo> job;
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
job = j->second;
this->nodeInfo.Jobs.erase(j);
}
return job;
}
void JobTaskTable::RemoveTask(int jobId, int taskId, uint64_t attemptId)
{
WriterLock writerLock(&this->lock);
auto j = this->nodeInfo.Jobs.find(jobId);
if (j != this->nodeInfo.Jobs.end())
{
auto t = j->second->Tasks.find(taskId);
// only erase when attempt ID matches.
if (t != j->second->Tasks.end() &&
t->second->GetAttemptId() == attemptId)
{
{
j->second->Tasks.erase(t);
}
}
}
}
}

Просмотреть файл

@ -1,37 +1,37 @@
#ifndef JOBTASKTABLE_H
#define JOBTASKTABLE_H
#include <map>
#include <cpprest/json.h>
#include "../data/TaskInfo.h"
#include "../data/JobInfo.h"
#ifndef JOBTASKTABLE_H
#define JOBTASKTABLE_H
#include <map>
#include <cpprest/json.h>
#include "../data/TaskInfo.h"
#include "../data/JobInfo.h"
#include "../data/NodeInfo.h"
#include "../utils/ReaderLock.h"
namespace hpc
{
namespace core
{
class JobTaskTable
{
public:
#include "../utils/ReaderLock.h"
namespace hpc
{
namespace core
{
class JobTaskTable
{
public:
JobTaskTable() : lock(PTHREAD_RWLOCK_INITIALIZER)
{
JobTaskTable::instance = this;
}
}
~JobTaskTable()
{
pthread_rwlock_destroy(&this->lock);
JobTaskTable::instance = nullptr;
}
web::json::value ToJson();
// web::json::value GetTaskJson(int jobId, int taskId) const;
std::shared_ptr<hpc::data::TaskInfo> AddJobAndTask(int jobId, int taskId, bool& isNewEntry);
std::shared_ptr<hpc::data::JobInfo> RemoveJob(int jobId);
}
web::json::value ToJson();
// web::json::value GetTaskJson(int jobId, int taskId) const;
std::shared_ptr<hpc::data::TaskInfo> AddJobAndTask(int jobId, int taskId, bool& isNewEntry);
std::shared_ptr<hpc::data::JobInfo> RemoveJob(int jobId);
void RemoveTask(int jobId, int taskId, uint64_t attemptId);
std::shared_ptr<hpc::data::TaskInfo> GetTask(int jobId, int taskId);
int GetJobCount()
@ -49,17 +49,17 @@ namespace hpc
// Set this flag so the next report will trigger the resync with scheduler.
this->nodeInfo.JustStarted = true;
}
static JobTaskTable* GetInstance() { return JobTaskTable::instance; }
protected:
private:
pthread_rwlock_t lock;
static JobTaskTable* GetInstance() { return JobTaskTable::instance; }
protected:
private:
pthread_rwlock_t lock;
hpc::data::NodeInfo nodeInfo;
static JobTaskTable* instance;
};
}
}
#endif // JOBTASKTABLE_H
static JobTaskTable* instance;
};
}
}
#endif // JOBTASKTABLE_H

Просмотреть файл

@ -36,6 +36,9 @@ namespace hpc
AddConfigurationItem(std::string, CertificateChainFile);
AddConfigurationItem(std::string, PrivateKeyFile);
AddConfigurationItem(std::string, ListeningUri);
AddConfigurationItem(std::string, JobStartFilter);
AddConfigurationItem(std::string, JobEndFilter);
AddConfigurationItem(std::string, TaskStartFilter);
AddConfigurationItem(bool, UseDefaultCA);
AddConfigurationItem(bool, Debug);
AddConfigurationItem(int, HostsFetchInterval);

Просмотреть файл

@ -57,13 +57,13 @@ void Process::Cleanup()
Logger::Info("Cleanup zombie result: {0}", output);
}
pplx::task<pid_t> Process::Start()
pplx::task<std::pair<pid_t, pthread_t>> Process::Start()
{
pthread_create(&this->threadId, nullptr, ForkThread, this);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Created thread {0}", this->threadId);
return pplx::task<pid_t>(this->started);
return pplx::task<std::pair<pid_t, pthread_t>>(this->started);
}
void Process::Kill(int forcedExitCode, bool forced)
@ -183,6 +183,7 @@ Start:
Logger::Error(p->jobId, p->taskId, p->requeueCount, "Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage);
p->SetExitCode(errno);
p->started.set(std::pair<pid_t, pthread_t>(p->processId, p->threadId));
goto Final;
}
@ -193,8 +194,7 @@ Start:
else
{
assert(p->processId > 0);
p->started.set(p->processId);
assert(p->processId > 0);
p->started.set(std::pair<pid_t, pthread_t>(p->processId, p->threadId));
p->Monitor();
}
@ -483,26 +483,16 @@ int Process::CreateTaskFolder()
sprintf(folder, "/tmp/nodemanager_task_%d_%d.XXXXXX", this->taskId, this->requeueCount);
char* p = mkdtemp(folder);
int ret = System::CreateTempFolder(folder, this->userName);
if (p)
if (ret != 0)
{
this->taskFolder = p;
int ret;
ret = this->ExecuteCommand("chown -R", this->userName, this->taskFolder);
if (ret == 0)
{
ret = this->ExecuteCommand("chmod -R u+rwX", this->taskFolder);
}
return ret;
}
else
{
return errno;
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "CreateTaskFolder failed exitCode {0}.", ret);
}
this->taskFolder = folder;
return ret;
}
std::string Process::BuildScript()
@ -519,6 +509,8 @@ std::string Process::BuildScript()
fs << "#!/bin/bash" << std::endl << std::endl;
fs << "cd ";
Logger::Debug("{0}, {1}", this->taskFolder, this->workDirectory);
if (this->workDirectory.empty())
{
fs << this->taskFolder;

Просмотреть файл

@ -1,158 +1,158 @@
#ifndef PROCESS_H
#define PROCESS_H
#include <string>
#include <sstream>
#include <vector>
#include <map>
#include <memory>
#include <unistd.h>
#include <sys/signal.h>
#include <pplx/pplxtasks.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include "../utils/String.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#ifndef PROCESS_H
#define PROCESS_H
#include <string>
#include <sstream>
#include <vector>
#include <map>
#include <memory>
#include <unistd.h>
#include <sys/signal.h>
#include <pplx/pplxtasks.h>
#include <sys/wait.h>
#include <sys/resource.h>
#include "../utils/String.h"
#include "../utils/Logger.h"
#include "../utils/System.h"
#include "../common/ErrorCodes.h"
#include "../data/ProcessStatistics.h"
using namespace hpc::utils;
namespace hpc
{
namespace core
{
class Process
{
public:
using namespace hpc::utils;
namespace hpc
{
namespace core
{
class Process
{
public:
typedef void Callback(
int,
std::string&&,
const hpc::data::ProcessStatistics& stat);
const hpc::data::ProcessStatistics& stat);
Process(
int jobId,
int taskId,
int requeueCount,
const std::string& cmdLine,
const std::string& standardOut,
const std::string& standardErr,
const std::string& standardIn,
int requeueCount,
const std::string& cmdLine,
const std::string& standardOut,
const std::string& standardErr,
const std::string& standardIn,
const std::string& workDir,
const std::string& user,
std::vector<uint64_t>&& cpuAffinity,
std::map<std::string, std::string>&& envi,
const std::function<Callback> completed);
Process(Process&&) = default;
virtual ~Process();
pplx::task<pid_t> Start();
void Kill(int forcedExitCode = 0x0FFFFFFF, bool forced = true);
const hpc::data::ProcessStatistics& GetStatisticsFromCGroup();
std::vector<uint64_t>&& cpuAffinity,
std::map<std::string, std::string>&& envi,
const std::function<Callback> completed);
Process(Process&&) = default;
virtual ~Process();
pplx::task<std::pair<pid_t, pthread_t>> Start();
void Kill(int forcedExitCode = 0x0FFFFFFF, bool forced = true);
const hpc::data::ProcessStatistics& GetStatisticsFromCGroup();
static void Cleanup();
protected:
protected:
private:
void SetExitCode(int exitCode)
{
this->exitCode = exitCode;
this->exitCodeSet = true;
}
void OnCompleted();
int CreateTaskFolder();
template <typename ... Args>
int ExecuteCommand(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
if (ret != 0)
int CreateTaskFolder();
template <typename ... Args>
int ExecuteCommand(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
if (ret != 0)
{
this->SetExitCode(ret);
this->message
<< "Task " << this->taskId << ": '" << cmdLine
<< "', exitCode " << ret << ". output "
<< output << std::endl;
<< output << std::endl;
}
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
template <typename ... Args>
int ExecuteCommandNoCapture(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
template <typename ... Args>
int ExecuteCommandNoCapture(const std::string& cmd, const Args& ... args)
{
std::string output;
std::string cmdLine = String::Join(" ", cmd, args...);
int ret = System::ExecuteCommandOut(output, cmd, args...);
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output);
return ret;
}
static void* ForkThread(void*);
std::string GetAffinity();
static inline void OutputAffinity(std::ostringstream& oss, int start, int end)
{
if (start == end) { oss << "," << start; }
else if (start < end) { oss << "," << start << "-" << end; }
}
void Run(const std::string& path);
static void* ReadPipeThread(void* p);
void SendbackOutput(const std::string& uri, const std::string& output, int order) const;
void Monitor();
std::string BuildScript();
std::unique_ptr<const char* []> PrepareEnvironment();
std::ostringstream stdOut;
std::ostringstream stdErr;
std::ostringstream message;
void SendbackOutput(const std::string& uri, const std::string& output, int order) const;
void Monitor();
std::string BuildScript();
std::unique_ptr<const char* []> PrepareEnvironment();
std::ostringstream stdOut;
std::ostringstream stdErr;
std::ostringstream message;
int exitCode = (int)hpc::common::ErrorCodes::DefaultExitCode;
bool exitCodeSet = false;
bool exitCodeSet = false;
hpc::data::ProcessStatistics statistics;
std::string taskFolder;
std::string taskFolder;
const int jobId;
const int taskId;
const int requeueCount;
const std::string taskExecutionId;
const std::string commandLine;
std::string stdOutFile;
std::string stdErrFile;
const std::string stdInFile;
const int requeueCount;
const std::string taskExecutionId;
const std::string commandLine;
std::string stdOutFile;
std::string stdErrFile;
const std::string stdInFile;
const std::string workDirectory;
const std::string userName;
const std::vector<uint64_t> affinity;
const std::map<std::string, std::string> environments;
const std::vector<uint64_t> affinity;
const std::map<std::string, std::string> environments;
std::vector<std::string> environmentsBuffer;
bool streamOutput = false;
int stdoutPipe[2];
const std::function<Callback> callback;
pthread_t threadId = 0;
int stdoutPipe[2];
const std::function<Callback> callback;
pthread_t threadId = 0;
pthread_t outputThreadId = 0;
pid_t processId;
bool ended = false;
pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
pplx::task_completion_event<pid_t> started;
};
}
}
#endif // PROCESS_H
pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;
pplx::task_completion_event<std::pair<pid_t, pthread_t>> started;
};
}
}
#endif // PROCESS_H

Просмотреть файл

@ -1,131 +1,132 @@
#include <sstream>
#include <stdlib.h>
#include "RemoteCommunicator.h"
#include "../utils/String.h"
#include "../utils/System.h"
#include "../arguments/StartJobAndTaskArgs.h"
#include "RemoteCommunicator.h"
#include "../utils/String.h"
#include "../utils/System.h"
#include "../arguments/StartJobAndTaskArgs.h"
#include "../common/ErrorCodes.h"
#include "NodeManagerConfig.h"
#include "HttpHelper.h"
#include "../arguments/MetricCountersConfig.h"
using namespace web::http;
using namespace web;
using namespace hpc::utils;
using namespace hpc::arguments;
using namespace web;
using namespace hpc::utils;
using namespace hpc::arguments;
using namespace hpc::core;
using namespace hpc::common;
using namespace web::http::experimental::listener;
using namespace hpc::filters;
RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listener_config& config) :
RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listener_config& config) :
listeningUri(NodeManagerConfig::GetListeningUri()), isListening(false), executor(exec),
listener(listeningUri, config)
{
this->listener.support(
methods::POST,
[this](auto request) { this->HandlePost(request); });
listener(listeningUri, config)
{
this->listener.support(
methods::POST,
[this](auto request) { this->HandlePost(request); });
if (NodeManagerConfig::GetDebug())
{
this->listener.support(
methods::GET,
[this](auto request) { this->HandleGet(request); });
{
this->listener.support(
methods::GET,
[this](auto request) { this->HandleGet(request); });
}
this->processors["startjobandtask"] = [this] (const auto& j, const auto& c) { return this->StartJobAndTask(j, c); };
this->processors["starttask"] = [this] (const auto& j, const auto& c) { return this->StartTask(j, c); };
this->processors["endjob"] = [this] (const auto& j, const auto& c) { return this->EndJob(j, c); };
this->processors["endtask"] = [this] (const auto& j, const auto& c) { return this->EndTask(j, c); };
this->processors["ping"] = [this] (const auto& j, const auto& c) { return this->Ping(j, c); };
this->processors["metric"] = [this] (const auto& j, const auto& c) { return this->Metric(j, c); };
this->processors["metricconfig"] = [this] (const auto& j, const auto& c) { return this->MetricConfig(j, c); };
}
RemoteCommunicator::~RemoteCommunicator()
{
this->Close();
}
void RemoteCommunicator::Open()
{
if (!this->isListening)
{
this->listener.open().then([this](auto t)
{
this->isListening = !IsError(t);
Logger::Info(
"Opening at {0}, result {1}",
this->listener.uri().to_string(),
this->processors["starttask"] = [this] (const auto& j, const auto& c) { return this->StartTask(j, c); };
this->processors["endjob"] = [this] (const auto& j, const auto& c) { return this->EndJob(j, c); };
this->processors["endtask"] = [this] (const auto& j, const auto& c) { return this->EndTask(j, c); };
this->processors["ping"] = [this] (const auto& j, const auto& c) { return this->Ping(j, c); };
this->processors["metric"] = [this] (const auto& j, const auto& c) { return this->Metric(j, c); };
this->processors["metricconfig"] = [this] (const auto& j, const auto& c) { return this->MetricConfig(j, c); };
}
RemoteCommunicator::~RemoteCommunicator()
{
this->Close();
}
void RemoteCommunicator::Open()
{
if (!this->isListening)
{
this->listener.open().then([this](auto t)
{
this->isListening = !IsError(t);
Logger::Info(
"Opening at {0}, result {1}",
this->listeningUri,
this->isListening ? "opened." : "failed.");
if (!this->isListening)
{
exit((int)ErrorCodes::FailedToOpenPort);
}
});
}
}
void RemoteCommunicator::Close()
{
if (this->isListening)
{
try
{
this->listener.close().wait();
this->isListening = false;
}
catch (const std::exception& ex)
{
Logger::Error("Exception happened while close the listener {0}, {1}", this->listener.uri().to_string().c_str(), ex.what());
}
}
}
}
});
}
}
void RemoteCommunicator::Close()
{
if (this->isListening)
{
try
{
this->listener.close().wait();
this->isListening = false;
}
catch (const std::exception& ex)
{
Logger::Error("Exception happened while close the listener {0}, {1}", this->listener.uri().to_string().c_str(), ex.what());
}
}
}
void RemoteCommunicator::HandleGet(http_request request)
{
auto uri = request.relative_uri().to_string();
Logger::Info("Request (GET): Uri {0}", uri);
auto uri = request.relative_uri().to_string();
Logger::Info("Request (GET): Uri {0}", uri);
json::value body;
body["status"] = json::value::string("node manager working");
request.reply(status_codes::OK, body).then([this](auto t) { this->IsError(t); });
}
void RemoteCommunicator::HandlePost(http_request request)
request.reply(status_codes::OK, body).then([this](auto t) { this->IsError(t); });
}
void RemoteCommunicator::HandlePost(http_request request)
{
auto uri = request.relative_uri().to_string();
Logger::Info("Request: Uri {0}", uri);
std::vector<std::string> tokens = String::Split(uri, '/');
auto uri = request.relative_uri().to_string();
Logger::Info("Request: Uri {0}", uri);
std::vector<std::string> tokens = String::Split(uri, '/');
if (tokens.size() < 4)
{
Logger::Warn("Not supported uri {0}", uri);
return;
}
// skip the first '/'.
int p = 1;
auto apiSpace = tokens[p++];
auto nodeName = tokens[p++];
auto methodName = tokens[p++];
Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName);
if (apiSpace != ApiSpace)
{
Logger::Error("Not allowed ApiSpace {0}", apiSpace);
request.reply(status_codes::NotFound, U("Not found"))
.then([this](auto t) { this->IsError(t); });
return;
}
// skip the first '/'.
int p = 1;
auto apiSpace = tokens[p++];
auto nodeName = tokens[p++];
auto methodName = tokens[p++];
Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName);
if (apiSpace != ApiSpace)
{
Logger::Error("Not allowed ApiSpace {0}", apiSpace);
request.reply(status_codes::NotFound, U("Not found"))
.then([this](auto t) { this->IsError(t); });
return;
}
std::string authenticationKey;
if (HttpHelper::FindHeader(request, HttpHelper::AuthenticationHeaderKey, authenticationKey))
{
Logger::Debug("AuthenticationKey found");
Logger::Debug("AuthenticationKey found");
}
if (NodeManagerConfig::GetClusterAuthenticationKey() != authenticationKey)
@ -135,84 +136,120 @@ void RemoteCommunicator::HandlePost(http_request request)
return;
}
std::string callbackUri;
std::string callbackUri;
if (HttpHelper::FindHeader(request, CallbackUriKey, callbackUri))
{
Logger::Debug("CallbackUri found {0}", callbackUri.c_str());
}
auto processor = this->processors.find(methodName);
if (processor != this->processors.end())
{
{
Logger::Debug("CallbackUri found {0}", callbackUri.c_str());
}
auto processor = this->processors.find(methodName);
if (processor != this->processors.end())
{
request.extract_json().then([processor, callback = std::move(callbackUri)](pplx::task<json::value> t)
{
auto j = t.get();
// Logger::Debug("Json: {0}", j.serialize());
return processor->second(j, callback);
})
.then([request, this](pplx::task<json::value> t)
return processor->second(j, callback);
})
.then([request, this](pplx::task<json::value> t)
{
std::string errorMessage;
if (!this->IsError(t, errorMessage))
std::string errorMessage;
if (!this->IsError(t, errorMessage))
{
auto jsonBody = t.get();
request.reply(status_codes::OK, jsonBody).then([this](auto t) { this->IsError(t); });
}
else
{
request.reply(status_codes::InternalError, errorMessage).then([this](auto t) { this->IsError(t); });
}
});
}
else
{
Logger::Warn("Unable to find the method {0}", methodName.c_str());
request.reply(status_codes::NotFound, "").then([this](auto t) { this->IsError(t); });
}
}
json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri)
auto jsonBody = t.get();
request.reply(status_codes::OK, jsonBody).then([this](auto t) { this->IsError(t); });
}
else
{
request.reply(status_codes::InternalError, errorMessage).then([this](auto t) { this->IsError(t); });
}
});
}
else
{
Logger::Warn("Unable to find the method {0}", methodName.c_str());
request.reply(status_codes::NotFound, "").then([this](auto t) { this->IsError(t); });
}
}
json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri)
{
auto args = StartJobAndTaskArgs::FromJson(val);
return this->executor.StartJobAndTask(std::move(args), callbackUri);
}
json::value RemoteCommunicator::StartTask(const json::value& val, const std::string& callbackUri)
{
auto args = StartTaskArgs::FromJson(val);
return this->executor.StartTask(std::move(args), callbackUri);
}
json::value RemoteCommunicator::EndJob(const json::value& val, const std::string& callbackUri)
{
auto args = EndJobArgs::FromJson(val);
return this->executor.EndJob(std::move(args));
}
json::value RemoteCommunicator::EndTask(const json::value& val, const std::string& callbackUri)
{
auto args = EndTaskArgs::FromJson(val);
return this->executor.EndTask(std::move(args), callbackUri);
}
json::value RemoteCommunicator::Ping(const json::value& val, const std::string& callbackUri)
{
return this->executor.Ping(callbackUri);
}
json::value RemoteCommunicator::Metric(const json::value& val, const std::string& callbackUri)
{
return this->executor.Metric(callbackUri);
}
json::value RemoteCommunicator::MetricConfig(const json::value& val, const std::string& callbackUri)
auto args = StartJobAndTaskArgs::FromJson(val);
json::value v;
std::string executionMessage;
int ret = this->filter.OnJobStart(args.JobId, val, v, executionMessage);
if (ret != 0)
{
throw new std::runtime_error(String::Join(" ", "StartJob filter fails with error code", ret, "execution message", executionMessage));
return v;
}
else
{
return this->executor.StartJobAndTask(StartJobAndTaskArgs::FromJson(v), callbackUri);
}
}
json::value RemoteCommunicator::StartTask(const json::value& val, const std::string& callbackUri)
{
auto args = MetricCountersConfig::FromJson(val);
return this->executor.MetricConfig(std::move(args), callbackUri);
}
const std::string RemoteCommunicator::ApiSpace = "api";
const std::string RemoteCommunicator::CallbackUriKey = "CallbackURI";
auto args = StartTaskArgs::FromJson(val);
json::value v;
std::string executionMessage;
int ret = this->filter.OnTaskStart(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, val, v, executionMessage);
if (ret != 0)
{
throw new std::runtime_error(String::Join(" ", "StartTask filter fails with error code", ret, "execution message", executionMessage));
return v;
}
else
{
return this->executor.StartTask(StartTaskArgs::FromJson(v), callbackUri);
}
}
json::value RemoteCommunicator::EndJob(const json::value& val, const std::string& callbackUri)
{
auto args = EndJobArgs::FromJson(val);
json::value v;
std::string executionMessage;
int ret = this->filter.OnJobEnd(args.JobId, val, v, executionMessage);
if (ret != 0)
{
throw new std::runtime_error(String::Join(" ", "EndJob filter fails with error code", ret, "execution message", executionMessage));
return v;
}
else
{
return this->executor.EndJob(EndJobArgs::FromJson(v));
}
}
json::value RemoteCommunicator::EndTask(const json::value& val, const std::string& callbackUri)
{
auto args = EndTaskArgs::FromJson(val);
return this->executor.EndTask(std::move(args), callbackUri);
}
json::value RemoteCommunicator::Ping(const json::value& val, const std::string& callbackUri)
{
return this->executor.Ping(callbackUri);
}
json::value RemoteCommunicator::Metric(const json::value& val, const std::string& callbackUri)
{
return this->executor.Metric(callbackUri);
}
json::value RemoteCommunicator::MetricConfig(const json::value& val, const std::string& callbackUri)
{
auto args = MetricCountersConfig::FromJson(val);
return this->executor.MetricConfig(std::move(args), callbackUri);
}
const std::string RemoteCommunicator::ApiSpace = "api";
const std::string RemoteCommunicator::CallbackUriKey = "CallbackURI";

Просмотреть файл

@ -1,79 +1,82 @@
#ifndef REMOTECOMMUNICATOR_H
#define REMOTECOMMUNICATOR_H
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "../utils/Logger.h"
#include "IRemoteExecutor.h"
namespace hpc
{
using namespace utils;
using namespace web;
namespace core
{
class RemoteCommunicator
{
public:
RemoteCommunicator(IRemoteExecutor& executor, const web::http::experimental::listener::http_listener_config& config);
~RemoteCommunicator();
void Open();
void Close();
protected:
private:
void HandlePost(web::http::http_request message);
void HandleGet(web::http::http_request message);
template <typename T>
static bool IsError(pplx::task<T>& t, std::string& errorMessage)
{
try { t.wait(); return false; }
catch (const web::http::http_exception& httpEx)
{
#ifndef REMOTECOMMUNICATOR_H
#define REMOTECOMMUNICATOR_H
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "../utils/Logger.h"
#include "../filters/ExecutionFilter.h"
#include "IRemoteExecutor.h"
namespace hpc
{
using namespace utils;
using namespace web;
using namespace filters;
namespace core
{
class RemoteCommunicator
{
public:
RemoteCommunicator(IRemoteExecutor& executor, const web::http::experimental::listener::http_listener_config& config);
~RemoteCommunicator();
void Open();
void Close();
protected:
private:
void HandlePost(web::http::http_request message);
void HandleGet(web::http::http_request message);
template <typename T>
static bool IsError(pplx::task<T>& t, std::string& errorMessage)
{
try { t.wait(); return false; }
catch (const web::http::http_exception& httpEx)
{
Logger::Error("Http exception occurred: {0}", httpEx.what());
errorMessage = httpEx.what();
}
catch (const std::exception& ex)
{
errorMessage = httpEx.what();
}
catch (const std::exception& ex)
{
Logger::Error("Exception occurred: {0}", ex.what());
errorMessage = ex.what();
}
return true;
}
template <typename T>
static bool IsError(pplx::task<T>& t)
{
errorMessage = ex.what();
}
return true;
}
template <typename T>
static bool IsError(pplx::task<T>& t)
{
std::string errorMessage;
return IsError(t, errorMessage);
}
json::value StartJobAndTask(const json::value& val, const std::string&);
json::value StartTask(const json::value& val, const std::string&);
json::value EndJob(const json::value& val, const std::string&);
json::value EndTask(const json::value& val, const std::string&);
json::value Ping(const json::value& val, const std::string&);
json::value Metric(const json::value& val, const std::string&);
json::value MetricConfig(const json::value& val, const std::string&);
static const std::string ApiSpace;
static const std::string CallbackUriKey;
const std::string listeningUri;
bool isListening;
std::map<std::string, std::function<json::value(const json::value&, const std::string&)>> processors;
IRemoteExecutor& executor;
web::http::experimental::listener::http_listener listener;
};
}
}
#endif // REMOTECOMMUNICATOR_H
return IsError(t, errorMessage);
}
json::value StartJobAndTask(const json::value& val, const std::string&);
json::value StartTask(const json::value& val, const std::string&);
json::value EndJob(const json::value& val, const std::string&);
json::value EndTask(const json::value& val, const std::string&);
json::value Ping(const json::value& val, const std::string&);
json::value Metric(const json::value& val, const std::string&);
json::value MetricConfig(const json::value& val, const std::string&);
static const std::string ApiSpace;
static const std::string CallbackUriKey;
const std::string listeningUri;
bool isListening;
std::map<std::string, std::function<json::value(const json::value&, const std::string&)>> processors;
IRemoteExecutor& executor;
web::http::experimental::listener::http_listener listener;
ExecutionFilter filter;
};
}
}
#endif // REMOTECOMMUNICATOR_H

Просмотреть файл

@ -234,12 +234,12 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c
args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(),
"StartTask for ProcessKey {0}, process count {1}", taskInfo->ProcessKey, this->processes.size());
process->Start().then([this, taskInfo] (pid_t pid)
process->Start().then([this, taskInfo] (std::pair<pid_t, pthread_t> ids)
{
if (pid > 0)
if (ids.first > 0)
{
Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(),
"Process started {0}", pid);
"Process started pid {0}, tid {1}", ids.first, ids.second);
}
});
}

Просмотреть файл

@ -1,70 +1,70 @@
#ifndef REMOTEEXECUTOR_H
#define REMOTEEXECUTOR_H
#ifndef REMOTEEXECUTOR_H
#define REMOTEEXECUTOR_H
#include <set>
#include <map>
#include "IRemoteExecutor.h"
#include "JobTaskTable.h"
#include "Monitor.h"
#include "Process.h"
#include "IRemoteExecutor.h"
#include "JobTaskTable.h"
#include "Monitor.h"
#include "Process.h"
#include "Reporter.h"
#include "HostsManager.h"
#include "../arguments/MetricCountersConfig.h"
#include "../data/ProcessStatistics.h"
namespace hpc
{
namespace core
{
class RemoteExecutor : public IRemoteExecutor
{
public:
RemoteExecutor(const std::string& networkName);
~RemoteExecutor() { pthread_rwlock_destroy(&this->lock); }
virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args);
virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value Ping(const std::string& callbackUri);
virtual web::json::value Metric(const std::string& callbackUri);
virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri);
protected:
#include "HostsManager.h"
#include "../arguments/MetricCountersConfig.h"
#include "../data/ProcessStatistics.h"
namespace hpc
{
namespace core
{
class RemoteExecutor : public IRemoteExecutor
{
public:
RemoteExecutor(const std::string& networkName);
~RemoteExecutor() { pthread_rwlock_destroy(&this->lock); }
virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args);
virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri);
virtual web::json::value Ping(const std::string& callbackUri);
virtual web::json::value Metric(const std::string& callbackUri);
virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri);
protected:
private:
static void* GracePeriodElapsed(void* data);
void StartHeartbeat(const std::string& callbackUri);
void StartMetric(const std::string& callbackUri);
void StartHostsManager();
void StartHostsManager();
const hpc::data::ProcessStatistics* TerminateTask(
int jobId, int taskId, int requeueCount,
uint64_t processKey, int exitCode, bool forced);
void ReportTaskCompletion(int jobId, int taskId, int taskRequeueCount, json::value jsonBody, const std::string& callbackUri);
const int UnknowId = 999;
const int NodeInfoReportInterval = 30;
const int UnknowId = 999;
const int NodeInfoReportInterval = 30;
const int MetricReportInterval = 1;
const int RegisterInterval = 300;
const int DefaultHostsFetchInterval = 300;
const int MinHostsFetchInterval = 30;
const int RegisterInterval = 300;
const int DefaultHostsFetchInterval = 300;
const int MinHostsFetchInterval = 30;
JobTaskTable jobTaskTable;
Monitor monitor;
std::unique_ptr<Reporter<json::value>> nodeInfoReporter;
std::unique_ptr<Reporter<json::value>> registerReporter;
std::unique_ptr<Reporter<std::vector<unsigned char>>> metricReporter;
std::unique_ptr<HostsManager> hostsManager;
JobTaskTable jobTaskTable;
Monitor monitor;
std::unique_ptr<Reporter<json::value>> nodeInfoReporter;
std::unique_ptr<Reporter<json::value>> registerReporter;
std::unique_ptr<Reporter<std::vector<unsigned char>>> metricReporter;
std::unique_ptr<HostsManager> hostsManager;
std::map<uint64_t, std::shared_ptr<Process>> processes;
std::map<int, std::tuple<std::string, bool, bool, bool, bool, std::string>> jobUsers;
std::map<std::string, std::set<int>> userJobs;
pthread_rwlock_t lock;
};
}
}
#endif // REMOTEEXECUTOR_H
pthread_rwlock_t lock;
};
}
}
#endif // REMOTEEXECUTOR_H

Просмотреть файл

@ -1,10 +1,10 @@
#include <cpprest/http_client.h>
#include "Reporter.h"
#include "../utils/Logger.h"
using namespace hpc::core;
using namespace hpc::utils;
using namespace web::http::client;
using namespace web::http;
#include <cpprest/http_client.h>
#include "Reporter.h"
#include "../utils/Logger.h"
using namespace hpc::core;
using namespace hpc::utils;
using namespace web::http::client;
using namespace web::http;

Просмотреть файл

@ -1,89 +1,89 @@
#ifndef REPORTER_H
#define REPORTER_H
#include <cpprest/json.h>
#ifndef REPORTER_H
#define REPORTER_H
#include <cpprest/json.h>
#include <functional>
#include "../utils/Logger.h"
using namespace hpc::utils;
namespace hpc
{
namespace core
#include "../utils/Logger.h"
using namespace hpc::utils;
namespace hpc
{
namespace core
{
template<typename ReportType>
class Reporter
{
public:
template<typename ReportType>
class Reporter
{
public:
Reporter(const std::string& uri, int hold, int interval, std::function<ReportType()> fetcher)
: reportUri(uri), valueFetcher(fetcher), intervalSeconds(interval), holdSeconds(hold)
{
: reportUri(uri), valueFetcher(fetcher), intervalSeconds(interval), holdSeconds(hold)
{
}
void Start()
{
if (!this->reportUri.empty())
{
pthread_create(&this->threadId, nullptr, ReportingThread, this);
}
if (!this->reportUri.empty())
{
pthread_create(&this->threadId, nullptr, ReportingThread, this);
}
}
void Stop()
{
this->isRunning = false;
if (this->threadId != 0)
this->isRunning = false;
if (this->threadId != 0)
{
while (this->inRequest) usleep(1);
pthread_cancel(this->threadId);
pthread_join(this->threadId, nullptr);
Logger::Debug("Destructed Reporter {0}", this->reportUri);
while (this->inRequest) usleep(1);
pthread_cancel(this->threadId);
pthread_join(this->threadId, nullptr);
Logger::Debug("Destructed Reporter {0}", this->reportUri);
}
}
}
virtual ~Reporter()
{
Logger::Debug("Destruct Reporter {0}", this->reportUri);
}
Logger::Debug("Destruct Reporter {0}", this->reportUri);
}
virtual void Report() = 0;
protected:
const std::string reportUri;
std::function<ReportType()> valueFetcher;
protected:
const std::string reportUri;
std::function<ReportType()> valueFetcher;
int intervalSeconds;
private:
private:
static void* ReportingThread(void* arg)
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
{
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr);
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr);
Reporter* r = static_cast<Reporter*>(arg);
Reporter* r = static_cast<Reporter*>(arg);
sleep(r->holdSeconds);
while (r->isRunning)
while (r->isRunning)
{
if (!r->reportUri.empty())
if (!r->reportUri.empty())
{
r->inRequest = true;
r->Report();
r->inRequest = false;
}
sleep(r->intervalSeconds);
}
}
sleep(r->intervalSeconds);
}
pthread_exit(nullptr);
}
int holdSeconds;
pthread_exit(nullptr);
}
int holdSeconds;
pthread_t threadId = 0;
bool isRunning = true;
bool inRequest = false;
};
}
}
#endif // REPORTER_H
bool inRequest = false;
};
}
}
#endif // REPORTER_H

Просмотреть файл

@ -1,19 +1,19 @@
#include "JobInfo.h"
using namespace web;
using namespace hpc::data;
json::value JobInfo::ToJson() const
{
json::value j;
j["JobId"] = this->JobId;
std::vector<json::value> tasks;
std::transform(this->Tasks.cbegin(), this->Tasks.cend(), std::back_inserter(tasks), [](auto i) { return i.second->ToJson(); });
j["Tasks"] = json::value::array(tasks);
return std::move(j);
}
#include "JobInfo.h"
using namespace web;
using namespace hpc::data;
json::value JobInfo::ToJson() const
{
json::value j;
j["JobId"] = this->JobId;
std::vector<json::value> tasks;
std::transform(this->Tasks.cbegin(), this->Tasks.cend(), std::back_inserter(tasks), [](auto i) { return i.second->ToJson(); });
j["Tasks"] = json::value::array(tasks);
return std::move(j);
}

Просмотреть файл

@ -1,29 +1,29 @@
#include "NodeInfo.h"
#include "../utils/System.h"
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
NodeInfo::NodeInfo()
{
this->Name = System::GetNodeName();
}
json::value NodeInfo::ToJson()
{
json::value j;
j["Availability"] = json::value::number((int)this->Availability);
j["JustStarted"] = this->JustStarted;
j["MacAddress"] = json::value::string(this->MacAddress);
j["Name"] = json::value::string(this->Name);
std::vector<json::value> jobs;
std::transform(this->Jobs.cbegin(), this->Jobs.cend(), std::back_inserter(jobs), [](auto i) { return i.second->ToJson(); });
j["Jobs"] = json::value::array(jobs);
this->JustStarted = false;
return std::move(j);
}
#include "NodeInfo.h"
#include "../utils/System.h"
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
NodeInfo::NodeInfo()
{
this->Name = System::GetNodeName();
}
json::value NodeInfo::ToJson()
{
json::value j;
j["Availability"] = json::value::number((int)this->Availability);
j["JustStarted"] = this->JustStarted;
j["MacAddress"] = json::value::string(this->MacAddress);
j["Name"] = json::value::string(this->Name);
std::vector<json::value> jobs;
std::transform(this->Jobs.cbegin(), this->Jobs.cend(), std::back_inserter(jobs), [](auto i) { return i.second->ToJson(); });
j["Jobs"] = json::value::array(jobs);
this->JustStarted = false;
return std::move(j);
}

Просмотреть файл

@ -1,40 +1,40 @@
#ifndef NODEINFO_H
#define NODEINFO_H
#include <cpprest/json.h>
#include <string>
#include <map>
#include "JobInfo.h"
namespace hpc
{
namespace data
{
enum class NodeAvailability
{
AlwaysOn = 0,
Available = 1,
Occupied = 2
};
struct NodeInfo
{
public:
NodeInfo();
web::json::value ToJson();
NodeAvailability Availability;
bool JustStarted = true;
std::string MacAddress;
std::string Name;
std::map<int, std::shared_ptr<JobInfo>> Jobs;
protected:
private:
};
}
}
#endif // NODEINFO_H
#ifndef NODEINFO_H
#define NODEINFO_H
#include <cpprest/json.h>
#include <string>
#include <map>
#include "JobInfo.h"
namespace hpc
{
namespace data
{
enum class NodeAvailability
{
AlwaysOn = 0,
Available = 1,
Occupied = 2
};
struct NodeInfo
{
public:
NodeInfo();
web::json::value ToJson();
NodeAvailability Availability;
bool JustStarted = true;
std::string MacAddress;
std::string Name;
std::map<int, std::shared_ptr<JobInfo>> Jobs;
protected:
private:
};
}
}
#endif // NODEINFO_H

Просмотреть файл

@ -1,12 +1,12 @@
#include "OutputData.h"
json::value OutputData::ToJson() const
{
json::value j;
j["NodeName"] = json::value::string(this->NodeName);
json::value OutputData::ToJson() const
{
json::value j;
j["NodeName"] = json::value::string(this->NodeName);
j["Order"] = this->Order;
j["Content"] = json::value::string(this->Content);
j["Eof"] = this->Eof;
j["Eof"] = this->Eof;
return std::move(j);
}
return std::move(j);
}

Просмотреть файл

@ -1,7 +1,7 @@
#ifndef OUTPUTDATA_H
#define OUTPUTDATA_H
#include <cpprest/json.h>
#include <cpprest/json.h>
#include <string>
using namespace web;
@ -14,7 +14,7 @@ class OutputData
{
}
json::value ToJson() const;
json::value ToJson() const;
std::string NodeName;
int Order;

Просмотреть файл

@ -1,40 +1,40 @@
#include "TaskInfo.h"
#include "../utils/JsonHelper.h"
#include "../utils/String.h"
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
json::value TaskInfo::ToJson() const
{
json::value j;
j["TaskId"] = this->TaskId;
j["TaskRequeueCount"] = this->taskRequeueCount;
j["ExitCode"] = this->ExitCode;
j["Exited"] = this->Exited;
j["KernelProcessorTime"] = this->KernelProcessorTimeMs;
j["UserProcessorTime"] = this->UserProcessorTimeMs;
j["WorkingSet"] = this->WorkingSetKb;
j["NumberOfProcesses"] = this->GetProcessCount();
j["PrimaryTask"] = this->IsPrimaryTask;
j["Message"] = JsonHelper<std::string>::ToJson(this->Message);
#include "TaskInfo.h"
#include "../utils/JsonHelper.h"
#include "../utils/String.h"
using namespace web;
using namespace hpc::data;
using namespace hpc::utils;
json::value TaskInfo::ToJson() const
{
json::value j;
j["TaskId"] = this->TaskId;
j["TaskRequeueCount"] = this->taskRequeueCount;
j["ExitCode"] = this->ExitCode;
j["Exited"] = this->Exited;
j["KernelProcessorTime"] = this->KernelProcessorTimeMs;
j["UserProcessorTime"] = this->UserProcessorTimeMs;
j["WorkingSet"] = this->WorkingSetKb;
j["NumberOfProcesses"] = this->GetProcessCount();
j["PrimaryTask"] = this->IsPrimaryTask;
j["Message"] = JsonHelper<std::string>::ToJson(this->Message);
j["ProcessIds"] = JsonHelper<std::string>::ToJson(String::Join<','>(this->ProcessIds));
return j;
}
json::value TaskInfo::ToCompletionEventArgJson() const
{
json::value j = this->ToJson();
json::value jobIdArg;
jobIdArg["JobId"] = this->JobId;
jobIdArg["TaskInfo"] = j;
jobIdArg["NodeName"] = json::value::string(this->NodeName);
return jobIdArg;
return j;
}
json::value TaskInfo::ToCompletionEventArgJson() const
{
json::value j = this->ToJson();
json::value jobIdArg;
jobIdArg["JobId"] = this->JobId;
jobIdArg["TaskInfo"] = j;
jobIdArg["NodeName"] = json::value::string(this->NodeName);
return jobIdArg;
}
void TaskInfo::AssignFromStat(const ProcessStatistics& stat)
@ -43,4 +43,4 @@ void TaskInfo::AssignFromStat(const ProcessStatistics& stat)
this->UserProcessorTimeMs = stat.UserTimeMs;
this->ProcessIds = stat.ProcessIds;
this->WorkingSetKb = stat.WorkingSetKb;
}
}

Просмотреть файл

@ -32,8 +32,8 @@ namespace hpc
{
if (this->GracefulThreadId)
{
pthread_cancel(this->GracefulThreadId);
pthread_join(this->GracefulThreadId, nullptr);
pthread_cancel(this->GracefulThreadId);
pthread_join(this->GracefulThreadId, nullptr);
Logger::Debug("Destructed TaskInfo GracefulThread {0}", this->GracefulThreadId);
this->GracefulThreadId = 0;
}

Просмотреть файл

@ -0,0 +1,122 @@
#include "ExecutionFilter.h"
#include "../utils/Logger.h"
#include "../common/ErrorCodes.h"
#include "../utils/System.h"
#include "../core/Process.h"
#include "../data/ProcessStatistics.h"
using namespace hpc::filters;
using namespace hpc::utils;
using namespace hpc::common;
using namespace hpc::data;
int ExecutionFilter::OnJobStart(int jobId, const json::value& input, json::value& output, std::string& executionMessage)
{
return this->ExecuteFilter(JobStartFilter, jobId, 0, 0, input, output, executionMessage);
}
int ExecutionFilter::OnJobEnd(int jobId, const json::value& input, json::value& output, std::string& executionMessage)
{
return this->ExecuteFilter(JobEndFilter, jobId, 0, 0, input, output, executionMessage);
}
int ExecutionFilter::OnTaskStart(int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage)
{
return this->ExecuteFilter(TaskStartFilter, jobId, taskId, requeueCount, input, output, executionMessage);
}
int ExecutionFilter::ExecuteFilter(const std::string& filterType, int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage)
{
auto filterIt = this->filterFiles.find(filterType);
if (filterIt == this->filterFiles.end())
{
Logger::Error(jobId, taskId, requeueCount, "Unknown filter type {0}", filterType);
return (int)ErrorCodes::UnknownFilter;
}
std::string filterFile = filterIt->second;
if (filterFile.empty())
{
Logger::Error(jobId, taskId, requeueCount, "{0} not detected, skip", filterType);
output = input;
return 0;
}
char folder[256];
sprintf(folder, "/tmp/nodemanager_executionfilter_%d.XXXXXX", jobId);
int ret = System::CreateTempFolder(folder, "root");
if (ret != 0)
{
Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create folder {2}, exit code {3}", filterType, filterFile, folder, ret);
return ret;
}
std::string folderString = folder;
std::string stdinFile = folderString + "/stdin.txt";
ret = System::WriteStringToFile(stdinFile, input.serialize());
if (ret != 0)
{
Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create stdin file {2}, exit code {3}", filterType, filterFile, stdinFile, ret);
return ret;
}
std::string stdoutFile = folderString + "/stdout.txt";
std::string stderrFile = stdoutFile;
Process p(
jobId, taskId, requeueCount, filterFile, stdoutFile, stderrFile, stdinFile, folderString, "root",
std::vector<uint64_t>(), std::map<std::string, std::string>(),
[&ret, &executionMessage, jobId, taskId, requeueCount, &filterType, &filterFile]
(int exitCode, std::string&& message, const ProcessStatistics& stat)
{
ret = exitCode;
if (ret != 0)
{
Logger::Error(jobId, taskId, requeueCount, "{0} {1}: returned {2}, message {3}", filterType, filterFile, ret, message);
}
else
{
Logger::Info(jobId, taskId, requeueCount, "{0} {1}: success with message {2}", filterType, filterFile, message);
}
executionMessage = message;
});
pthread_t processTid;
p.Start().then([&ret, &processTid, jobId, taskId, requeueCount, &filterType, &filterFile] (std::pair<pid_t, pthread_t> ids)
{
Logger::Info(jobId, taskId, requeueCount, "{0} {1}: pid {2} tid {3}", filterType, filterFile, ids.first, ids.second);
processTid = ids.second;
}).wait();
int joinRet = pthread_join(processTid, nullptr);
if (joinRet != 0)
{
Logger::Error(jobId, taskId, requeueCount, "{0} {1}: join thread {2} ret {3}", filterType, filterFile, processTid, joinRet);
if (0 == ret) { ret = joinRet; }
}
if (0 == ret)
{
std::ifstream fsStdout(stdoutFile, std::ios::in);
if (fsStdout)
{
std::string content((std::istreambuf_iterator<char>(fsStdout)), std::istreambuf_iterator<char>());
Logger::Info(jobId, taskId, requeueCount, "{0} {1}: plugin output {2}", filterType, filterFile, content);
output = json::value::parse(content);
fsStdout.close();
}
else
{
return (int)ErrorCodes::ReadFileError;
}
}
return ret;
}

Просмотреть файл

@ -0,0 +1,38 @@
#ifndef EXECUTIONFILTER_H
#define EXECUTIONFILTER_H
#include <string>
#include <cpprest/json.h>
#include "../core/NodeManagerConfig.h"
namespace hpc
{
namespace filters
{
using namespace hpc::core;
class ExecutionFilter
{
public:
ExecutionFilter()
{
filterFiles[JobStartFilter] = NodeManagerConfig::GetJobStartFilter();
filterFiles[JobEndFilter] = NodeManagerConfig::GetJobEndFilter();
filterFiles[TaskStartFilter] = NodeManagerConfig::GetTaskStartFilter();
}
int OnJobStart(int jobId, const json::value& input, json::value& output, std::string& executionMessage);
int OnJobEnd(int jobId, const json::value& input, json::value& output, std::string& executionMessage);
int OnTaskStart(int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage);
int ExecuteFilter(const std::string& filterType, int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage);
private:
std::map<std::string, std::string> filterFiles;
const std::string JobStartFilter = "JobStartFilter";
const std::string JobEndFilter = "JobEndFilter";
const std::string TaskStartFilter = "TaskStartFilter";
};
}
}
#endif

Просмотреть файл

@ -1,13 +1,13 @@
#include <iostream>
#include <string>
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "utils/Logger.h"
#include "core/RemoteCommunicator.h"
#include <string>
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "utils/Logger.h"
#include "core/RemoteCommunicator.h"
#include "core/RemoteExecutor.h"
#include "Version.h"
#include "core/NodeManagerConfig.h"
#include "core/NodeManagerConfig.h"
#include "common/ErrorCodes.h"
#include "core/HttpHelper.h"
@ -15,12 +15,12 @@
#include "test/TestRunner.h"
using namespace hpc::tests;
#endif // DEBUG
using namespace std;
using namespace hpc::core;
#endif // DEBUG
using namespace std;
using namespace hpc::core;
using namespace hpc::utils;
using namespace hpc;
using namespace hpc;
using namespace hpc::common;
using namespace web::http::experimental::listener;
@ -28,9 +28,9 @@ void Cleanup()
{
Logger::Info("Cleaning up zombie processes");
Process::Cleanup();
}
}
int main(int argc, char* argv[])
int main(int argc, char* argv[])
{
if (argc > 1)
{
@ -40,8 +40,8 @@ int main(int argc, char* argv[])
return 0;
}
}
std::cout << "Node manager started." << std::endl;
std::cout << "Node manager started." << std::endl;
Logger::Info("Log system works.");
Logger::Info("Version: {0}", Version::GetVersion());
@ -60,7 +60,7 @@ int main(int argc, char* argv[])
ret);
return ret;
}
}
#ifdef DEBUG
@ -79,27 +79,27 @@ int main(int argc, char* argv[])
Cleanup();
Logger::Debug(
"Trusted CA File: {0}",
"Trusted CA File: {0}",
NodeManagerConfig::GetTrustedCAFile());
const std::string networkName = "";
RemoteExecutor executor(networkName);
const std::string networkName = "";
RemoteExecutor executor(networkName);
http_listener_config config;
config.set_ssl_context_callback([] (auto& ctx)
{
HttpHelper::ConfigListenerSslContext(ctx);
});
RemoteCommunicator rc(executor, config);
rc.Open();
while (true)
{
sleep(100);
}
// rc.Close();
return 0;
}
RemoteCommunicator rc(executor, config);
rc.Open();
while (true)
{
sleep(100);
}
// rc.Close();
return 0;
}

Просмотреть файл

@ -1,211 +1,211 @@
# node manager makefile.
# casablanca
CASABLANCA = ../../evancasa/cpprestsdk
CASA_RELEASE = $(CASABLANCA)/Release
CASA_INC = $(CASA_RELEASE)/include
CASA_LIB_DEBUG = $(CASA_RELEASE)/build.debug/Binaries
CASA_LIB_RELEASE = $(CASA_RELEASE)/build.release/Binaries
# spdlog
SPDLOG = ../../spdlog
SPDLOG_INC = $(SPDLOG)/include
# tools
CC = gcc
CXX = g++
AR = ar
LD = g++
WINDRES = windres
# common
LIBOUTDIR = lib
INSTALLDIR = /opt/hpcnodemanager
INC = -I$(CASA_INC) -I$(SPDLOG_INC)
CFLAGS = -Wall -std=c++14 -Wno-unused-local-typedefs
LIB = -lcpprest -lpthread -lboost_system -lssl -lcrypto
LDFLAGS = -Wl,-rpath,\$$ORIGIN/$(LIBOUTDIR),-I$(INSTALLDIR)/$(LIBOUTDIR)/ld-linux-x86-64.so.2
BINARY = nodemanager
DEBUG = debug
RELEASE = release
OBJDIR = obj
OUTDIR = bin
CPPSWITHDIR = $(wildcard */*.cpp *.cpp)
CPPS = $(notdir $(CPPSWITHDIR))
OBJS = $(CPPS:.cpp=.o)
# debug
MACRO_DEBUG = -D DEBUG
INC_DEBUG = $(INC)
CFLAGS_DEBUG = $(CFLAGS) -g
LIBDIR_DEBUG = -L$(CASA_LIB_DEBUG)
LIB_DEBUG = $(LIB)
LDFLAGS_DEBUG = $(LDFLAGS)
OBJDIR_DEBUG = $(OBJDIR)/$(DEBUG)
OBJDIRSED_DEBUG = $(OBJDIR)\/$(DEBUG)
OUTDIR_DEBUG = $(OUTDIR)/$(DEBUG)
OUT_DEBUG = $(OUTDIR_DEBUG)/$(BINARY)
OBJS_DEBUG = $(addprefix $(OBJDIR_DEBUG)/, $(OBJS))
# release
MACRO_RELEASE =
INC_RELEASE = $(INC)
CFLAGS_RELEASE = $(CFLAGS) -O2
LIBDIR_RELEASE = -L$(CASA_LIB_RELEASE)
LIB_RELEASE = $(LIB)
LDFLAGS_RELEASE = $(LDFLAGS) -s
OBJDIR_RELEASE = $(OBJDIR)/$(RELEASE)
OBJDIRSED_RELEASE = $(OBJDIR)\/$(RELEASE)
OUTDIR_RELEASE = $(OUTDIR)/$(RELEASE)
OUT_RELEASE = $(OUTDIR_RELEASE)/$(BINARY)
OBJS_RELEASE = $(addprefix $(OBJDIR_RELEASE)/, $(OBJS))
# pseudo targets
all: debug release
rebuild: clean all
clean: clean_debug clean_release
release: before_release out_release after_release
debug: before_debug out_debug after_debug
redebug: clean_debug debug
rerelease: clean_release release
# color output
define compile
$(indent);
@tput bold;
@echo -n "Compiling [ ";
@tput setaf 6;
@echo -n $@;
@tput setaf 7;
@echo -n " ] using [ ";
@tput setaf 5;
@echo -n $<;
@tput setaf 7;
@echo " ]";
@tput sgr0
@$(CXX) $(1) $(2) $(3) -c $< -o $@
endef
define building
$(indent);
@tput bold;
@echo -n "Building [ ";
@tput setaf 6;
@echo -n $(1);
@tput setaf 7;
@echo " ]";
@tput sgr0
endef
indent=@printf '\t'
define prepare
@echo ""
@tput bold;
@echo -n "Preparing to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
$(indent);
[ -d $(2) ] || mkdir -p $(2)
$(indent);
[ -d $(3) ] || mkdir -p $(3)
@echo ""
@tput bold;
@echo -n "Start to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
endef
define finish
$(indent);
cp config/* $(2)/
$(indent);
cp scripts/* $(2)/
$(indent);
[ -d $(2)/logs ] || mkdir $(2)/logs
$(indent);
[ -d $(2)/$(LIBOUTDIR) ] || mkdir $(2)/$(LIBOUTDIR)
$(indent);
cp $(shell ldd $(3) | cut -f3 -d' ') $(2)/$(LIBOUTDIR) 2> /dev/null || :
@tput bold;
@echo -n "Finished to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
endef
# debug targets
before_debug:
$(call prepare,$(DEBUG),$(OUTDIR_DEBUG),$(OBJDIR_DEBUG))
after_debug:
$(call finish,$(DEBUG),$(OUTDIR_DEBUG),$(OUT_DEBUG))
out_debug: before_debug $(OUT_DEBUG)
$(OUT_DEBUG): $(OBJS_DEBUG)
$(call building,$@)
$(indent);
$(LD) $(LIBDIR_DEBUG) -o $@ $(OBJDIR_DEBUG)/*.o $(LDFLAGS_DEBUG) $(LIB_DEBUG)
clean_debug:
rm -rf $(OUTDIR_DEBUG)
rm -rf $(OBJDIR_DEBUG)
$(OBJDIR_DEBUG)/%.o: */%.cpp
$(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG))
$(OBJDIR_DEBUG)/%.o: %.cpp
$(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG))
# release targets
before_release:
$(call prepare,$(RELEASE),$(OUTDIR_RELEASE),$(OBJDIR_RELEASE))
after_release:
$(call finish,$(RELEASE),$(OUTDIR_RELEASE),$(OUT_RELEASE))
out_release: before_release $(OUT_RELEASE)
$(OUT_RELEASE): $(OBJS_RELEASE)
$(call building,$@)
$(indent);
$(LD) $(LIBDIR_RELEASE) -o $@ $(OBJDIR_RELEASE)/*.o $(LDFLAGS_RELEASE) $(LIB_RELEASE)
clean_release:
rm -rf $(OUTDIR_RELEASE)
rm -rf $(OBJDIR_RELEASE)
$(OBJDIR_RELEASE)/%.o: */%.cpp
$(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE))
$(OBJDIR_RELEASE)/%.o: %.cpp
$(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE))
# phony targets
.PHONY: before_debug after_debug clean_debug before_release after_release clean_release
# dependency
include deps
deps: $(CPPSWITHDIR)
@tput bold
@echo "Rebuilding dependencies ... "
@tput sgr0
$(indent)
$(CXX) $(CFLAGS) -MM *.cpp */*.cpp > rawdeps
$(indent)
cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_DEBUG)\/\1\.o/" > deps
$(indent)
cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_RELEASE)\/\1\.o/" >> deps
@tput setaf 2
@tput bold
@echo "Done."
@tput sgr0;
# node manager makefile.
# casablanca
CASABLANCA = ../../evancasa/cpprestsdk
CASA_RELEASE = $(CASABLANCA)/Release
CASA_INC = $(CASA_RELEASE)/include
CASA_LIB_DEBUG = $(CASA_RELEASE)/build.debug/Binaries
CASA_LIB_RELEASE = $(CASA_RELEASE)/build.release/Binaries
# spdlog
SPDLOG = ../../spdlog
SPDLOG_INC = $(SPDLOG)/include
# tools
CC = gcc
CXX = g++
AR = ar
LD = g++
WINDRES = windres
# common
LIBOUTDIR = lib
INSTALLDIR = /opt/hpcnodemanager
INC = -I$(CASA_INC) -I$(SPDLOG_INC)
CFLAGS = -Wall -std=c++14 -Wno-unused-local-typedefs
LIB = -lcpprest -lpthread -lboost_system -lssl -lcrypto
LDFLAGS = -Wl,-rpath,\$$ORIGIN/$(LIBOUTDIR),-I$(INSTALLDIR)/$(LIBOUTDIR)/ld-linux-x86-64.so.2
BINARY = nodemanager
DEBUG = debug
RELEASE = release
OBJDIR = obj
OUTDIR = bin
CPPSWITHDIR = $(wildcard */*.cpp *.cpp)
CPPS = $(notdir $(CPPSWITHDIR))
OBJS = $(CPPS:.cpp=.o)
# debug
MACRO_DEBUG = -D DEBUG
INC_DEBUG = $(INC)
CFLAGS_DEBUG = $(CFLAGS) -g
LIBDIR_DEBUG = -L$(CASA_LIB_DEBUG)
LIB_DEBUG = $(LIB)
LDFLAGS_DEBUG = $(LDFLAGS)
OBJDIR_DEBUG = $(OBJDIR)/$(DEBUG)
OBJDIRSED_DEBUG = $(OBJDIR)\/$(DEBUG)
OUTDIR_DEBUG = $(OUTDIR)/$(DEBUG)
OUT_DEBUG = $(OUTDIR_DEBUG)/$(BINARY)
OBJS_DEBUG = $(addprefix $(OBJDIR_DEBUG)/, $(OBJS))
# release
MACRO_RELEASE =
INC_RELEASE = $(INC)
CFLAGS_RELEASE = $(CFLAGS) -O2
LIBDIR_RELEASE = -L$(CASA_LIB_RELEASE)
LIB_RELEASE = $(LIB)
LDFLAGS_RELEASE = $(LDFLAGS) -s
OBJDIR_RELEASE = $(OBJDIR)/$(RELEASE)
OBJDIRSED_RELEASE = $(OBJDIR)\/$(RELEASE)
OUTDIR_RELEASE = $(OUTDIR)/$(RELEASE)
OUT_RELEASE = $(OUTDIR_RELEASE)/$(BINARY)
OBJS_RELEASE = $(addprefix $(OBJDIR_RELEASE)/, $(OBJS))
# pseudo targets
all: debug release
rebuild: clean all
clean: clean_debug clean_release
release: before_release out_release after_release
debug: before_debug out_debug after_debug
redebug: clean_debug debug
rerelease: clean_release release
# color output
define compile
$(indent);
@tput bold;
@echo -n "Compiling [ ";
@tput setaf 6;
@echo -n $@;
@tput setaf 7;
@echo -n " ] using [ ";
@tput setaf 5;
@echo -n $<;
@tput setaf 7;
@echo " ]";
@tput sgr0
@$(CXX) $(1) $(2) $(3) -c $< -o $@
endef
define building
$(indent);
@tput bold;
@echo -n "Building [ ";
@tput setaf 6;
@echo -n $(1);
@tput setaf 7;
@echo " ]";
@tput sgr0
endef
indent=@printf '\t'
define prepare
@echo ""
@tput bold;
@echo -n "Preparing to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
$(indent);
[ -d $(2) ] || mkdir -p $(2)
$(indent);
[ -d $(3) ] || mkdir -p $(3)
@echo ""
@tput bold;
@echo -n "Start to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
endef
define finish
$(indent);
cp config/* $(2)/
$(indent);
cp scripts/* $(2)/
$(indent);
[ -d $(2)/logs ] || mkdir $(2)/logs
$(indent);
[ -d $(2)/$(LIBOUTDIR) ] || mkdir $(2)/$(LIBOUTDIR)
$(indent);
cp $(shell ldd $(3) | cut -f3 -d' ') $(2)/$(LIBOUTDIR) 2> /dev/null || :
@tput bold;
@echo -n "Finished to build [ ";
@tput setaf 2;
@echo -n $(1);
@tput setaf 7;
@echo " ] target";
@tput sgr0;
endef
# debug targets
before_debug:
$(call prepare,$(DEBUG),$(OUTDIR_DEBUG),$(OBJDIR_DEBUG))
after_debug:
$(call finish,$(DEBUG),$(OUTDIR_DEBUG),$(OUT_DEBUG))
out_debug: before_debug $(OUT_DEBUG)
$(OUT_DEBUG): $(OBJS_DEBUG)
$(call building,$@)
$(indent);
$(LD) $(LIBDIR_DEBUG) -o $@ $(OBJDIR_DEBUG)/*.o $(LDFLAGS_DEBUG) $(LIB_DEBUG)
clean_debug:
rm -rf $(OUTDIR_DEBUG)
rm -rf $(OBJDIR_DEBUG)
$(OBJDIR_DEBUG)/%.o: */%.cpp
$(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG))
$(OBJDIR_DEBUG)/%.o: %.cpp
$(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG))
# release targets
before_release:
$(call prepare,$(RELEASE),$(OUTDIR_RELEASE),$(OBJDIR_RELEASE))
after_release:
$(call finish,$(RELEASE),$(OUTDIR_RELEASE),$(OUT_RELEASE))
out_release: before_release $(OUT_RELEASE)
$(OUT_RELEASE): $(OBJS_RELEASE)
$(call building,$@)
$(indent);
$(LD) $(LIBDIR_RELEASE) -o $@ $(OBJDIR_RELEASE)/*.o $(LDFLAGS_RELEASE) $(LIB_RELEASE)
clean_release:
rm -rf $(OUTDIR_RELEASE)
rm -rf $(OBJDIR_RELEASE)
$(OBJDIR_RELEASE)/%.o: */%.cpp
$(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE))
$(OBJDIR_RELEASE)/%.o: %.cpp
$(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE))
# phony targets
.PHONY: before_debug after_debug clean_debug before_release after_release clean_release
# dependency
include deps
deps: $(CPPSWITHDIR)
@tput bold
@echo "Rebuilding dependencies ... "
@tput sgr0
$(indent)
$(CXX) $(CFLAGS) -MM *.cpp */*.cpp > rawdeps
$(indent)
cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_DEBUG)\/\1\.o/" > deps
$(indent)
cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_RELEASE)\/\1\.o/" >> deps
@tput setaf 2
@tput bold
@echo "Done."
@tput sgr0;

Просмотреть файл

@ -0,0 +1,105 @@
#include "ExecutionFilterTest.h"
#ifdef DEBUG
#include <iostream>
#include <fstream>
#include <string>
#include <cpprest/http_listener.h>
#include <cpprest/json.h>
#include "../utils/JsonHelper.h"
#include "../core/Process.h"
#include "../utils/Logger.h"
#include "../core/RemoteCommunicator.h"
#include "../core/RemoteExecutor.h"
#include "../core/NodeManagerConfig.h"
#include "../common/ErrorCodes.h"
#include "../core/HttpHelper.h"
#include "../utils/System.h"
using namespace hpc::tests;
using namespace hpc::core;
using namespace hpc::data;
using namespace hpc::utils;
using namespace web::http;
using namespace web;
using namespace web::http::experimental::listener;
using namespace web::http::client;
bool ExecutionFilterTest::JobStart()
{
bool result = true;
const std::string networkName = "";
RemoteExecutor executor(networkName);
http_listener_config config;
config.set_ssl_context_callback([] (auto& ctx)
{
HttpHelper::ConfigListenerSslContext(ctx);
});
RemoteCommunicator rc(executor, config);
rc.Open();
http_client client(U("http://localhost:40000/"));
uri_builder builder(U("/api/node/startjobandtask"));
std::string stdoutFile = "/tmp/JobStartFilterTest";
std::string output;
System::ExecuteCommandOut(output, "rm", stdoutFile);
ProcessStartInfo psi(
"echo 123",
"",
std::string(stdoutFile),
"",
"",
0,
std::vector<uint64_t>(),
{ { "CCP_ISADMIN", "1" } });
StartJobAndTaskArgs arg(
88,
88,
std::move(psi),
"",
"");
json::value v1 = arg.ToJson();
Logger::Debug("Debug JSON: {0}", v1.serialize());
StartJobAndTaskArgs arg1 = StartJobAndTaskArgs::FromJson(v1);
client.request(methods::POST, builder.to_string(), arg.ToJson()).then([&] (http_response response)
{
if (status_codes::OK != response.status_code())
{
Logger::Debug("The response code {0}", response.status_code());
result = false;
}
}).wait();
sleep(2);
// verify the job start filter is called.
// The command line is changed to echo 456.
std::ifstream outFile(stdoutFile, std::ios::in);
if (outFile)
{
int num;
outFile >> num;
Logger::Debug("stdout value {0}, expected value {1}", num, 456);
result &= num == 456;
}
else
{
Logger::Debug("Stdout file not found {0}", stdoutFile);
result = false;
}
return result;
}
#endif // DEBUG

Просмотреть файл

@ -0,0 +1,25 @@
#ifndef EXECUTIONFILTERTEST_H
#define EXECUTIONFILTERTEST_H
#ifdef DEBUG
namespace hpc
{
namespace tests
{
class ExecutionFilterTest
{
public:
ExecutionFilterTest() { }
static bool JobStart();
protected:
private:
};
}
}
#endif // DEBUG
#endif // EXECUTIONFILTERTEST_H

Просмотреть файл

@ -9,7 +9,7 @@
using namespace hpc::tests;
using namespace hpc::core;
using namespace hpc::data;
using namespace web::http;
using namespace web;
@ -27,8 +27,8 @@ bool ProcessTest::ClusRun()
int callbackCount = 0;
bool callbacked = false;
listener.support(
methods::POST,
listener.support(
methods::POST,
[&result, &callbackCount](auto request)
{
auto j = request.extract_json().get();
@ -46,7 +46,7 @@ bool ProcessTest::ClusRun()
listener.open().wait();
Logger::Info("Listener opened");
Logger::Info("Listener opened");
Process p(
25, 28, 1, "echo 30; sleep 1; echo 31", listeningUri, "", "", "", "root",
@ -65,16 +65,17 @@ bool ProcessTest::ClusRun()
callbacked = true;
});
p.Start().then([&result, &started] (pid_t pid)
pthread_t threadId;
p.Start().then([&result, &started, &threadId] (std::pair<pid_t, pthread_t> ids)
{
if (pid <= 0) result = false;
Logger::Info("pid {0}, result {1}", pid, result);
if (ids.first <= 0) result = false;
Logger::Info("pid {0}, result {1}", ids.first, result);
threadId = ids.second;
started = true;
});
}).wait();
sleep(2);
pthread_join(threadId, nullptr);
if (!(callbacked && started)) result = false;
@ -123,6 +124,7 @@ eesGSKS5l22ZMXJNShgzPKmv3HpH22CSVpO0sNZ6R+iG8a3oq4QkU61MT1CfGoMI\
a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
-----END RSA PRIVATE KEY-----";
System::DeleteUser(userName);
std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n";
int ret = System::CreateUser(userName, password);
@ -151,16 +153,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
callbacked = true;
});
p.Start().then([&result, &started] (pid_t pid)
pthread_t threadId;
p.Start().then([&result, &started, &threadId] (std::pair<pid_t, pthread_t> ids)
{
if (pid <= 0) result = false;
Logger::Info("pid {0}, result {1}", pid, result);
if (ids.first <= 0) result = false;
Logger::Info("pid {0}, result {1}", ids.first, result);
threadId = ids.second;
started = true;
});
}).wait();
sleep(1);
pthread_join(threadId, nullptr);
if (!(callbacked && started)) result = false;
@ -213,6 +216,7 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
-----END RSA PRIVATE KEY-----";
std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n";
System::DeleteUser(userName);
int ret = System::CreateUser(userName, password);
if (ret != 0) return false;
@ -240,16 +244,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
callbacked = true;
});
p.Start().then([&result, &started] (pid_t pid)
pthread_t threadId;
p.Start().then([&result, &started, &threadId] (std::pair<pid_t, pthread_t> ids)
{
if (pid <= 0) result = false;
Logger::Info("pid {0}, result {1}", pid, result);
if (ids.first <= 0) result = false;
Logger::Info("pid {0}, result {1}", ids.first, result);
threadId = ids.second;
started = true;
});
}).wait();
sleep(1);
pthread_join(threadId, nullptr);
if (!(callbacked && started)) result = false;
@ -302,6 +307,7 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
-----END RSA PRIVATE KEY-----";
std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n";
System::DeleteUser(userName);
int ret = System::CreateUser(userName, password);
if (ret != 0) return false;
@ -364,16 +370,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\
callbacked = true;
});
p.Start().then([&result, &started] (pid_t pid)
pthread_t threadId;
p.Start().then([&result, &started, &threadId] (std::pair<pid_t, pthread_t> ids)
{
if (pid <= 0) result = false;
Logger::Info("pid {0}, result {1}", pid, result);
if (ids.first <= 0) result = false;
Logger::Info("pid {0}, result {1}", ids.first, result);
threadId = ids.second;
started = true;
});
}).wait();
sleep(1);
pthread_join(threadId, nullptr);
if (!(callbacked && started)) result = false;

Просмотреть файл

@ -4,6 +4,7 @@
#include "../utils/Logger.h"
#include "ProcessTest.h"
#include "ExecutionFilterTest.h"
using namespace hpc::tests;
using namespace hpc::utils;
@ -14,6 +15,7 @@ TestRunner::TestRunner()
this->tests["Affinity"] = []() { return ProcessTest::Affinity(); };
this->tests["RemainingProcess"] = []() { return ProcessTest::RemainingProcess(); };
this->tests["ClusRun"] = []() { return ProcessTest::ClusRun(); };
this->tests["FilterJobStart"] = []() { return ExecutionFilterTest::JobStart(); };
}
bool TestRunner::Run()
@ -22,6 +24,9 @@ bool TestRunner::Run()
Logger::Info("========================================================");
Logger::Info("Start Testing, {0} cases in total.", this->tests.size());
Logger::Info("========================================================");
std::map<std::string, bool> results;
for (auto& t : this->tests)
{
Logger::Info("");
@ -34,9 +39,29 @@ bool TestRunner::Run()
Logger::Info("");
Logger::Info("");
results[t.first] = result;
if (!result) finalResult = false;
}
Logger::Info("");
Logger::Info("");
Logger::Info("========================================================");
Logger::Info("Testing result summary");
Logger::Info("========================================================");
int passed = 0;
for (auto& t : results)
{
Logger::Info("Testing {0} : {1}", t.first, t.second ? "Passed" : "Failed");
if (t.second)
{
passed++;
}
}
Logger::Info("========================================================");
Logger::Info("All passed {0}, Pass rate {1} / {2}.", finalResult, passed, this->tests.size());
return finalResult;
}

Просмотреть файл

@ -16,6 +16,7 @@ namespace hpc
TestRunner();
bool Run();
protected:
private:
std::map<std::string, std::function<bool(void)>> tests;

Просмотреть файл

@ -37,6 +37,7 @@ namespace hpc
TOJSON(int, number)
TOJSON(double, number)
TOJSON(uint64_t, number)
TOJSON(bool, boolean)
TOJSON(std::string, string)
}

Просмотреть файл

@ -1,142 +1,152 @@
#ifndef JSONHELPER_H
#define JSONHELPER_H
#include <vector>
#include <map>
#include <cpprest/json.h>
using namespace web;
namespace hpc
{
namespace utils
{
template <typename T>
class JsonHelper
{
public:
static T Read(const std::string& name, const json::value& j)
{
json::value obj;
#ifndef JSONHELPER_H
#define JSONHELPER_H
if (j.has_field(name))
{
obj = j.at(name);
}
#include <vector>
#include <map>
return JsonHelper<T>::FromJson(obj);
}
static void Write(const std::string& name, json::value& v, const T& t)
{
v[name] = ToJson(t);
}
static T FromJson(const json::value& j);
static json::value ToJson(const T& t);
protected:
private:
};
// Partial specialize for vector.
template <typename T>
class JsonHelper<std::vector<T>>
{
public:
static std::vector<T> Read(const std::string& name, const json::value& j)
{
json::value obj;
#include <cpprest/json.h>
if (j.has_field(name))
{
obj = j.at(name);
}
using namespace web;
return JsonHelper<std::vector<T>>::FromJson(obj);
}
static std::vector<T> FromJson(const json::value& j)
{
std::vector<T> values;
if (!j.is_null())
{
auto arr = JsonHelper<const json::array&>::FromJson(j);
std::transform(
arr.cbegin(),
arr.cend(),
std::back_inserter(values),
[](const auto& i) { return JsonHelper<T>::FromJson(i); });
}
return std::move(values);
}
static json::array ToJson(const std::vector<T>& vec)
{
std::vector<json::value> values;
std::transform(
vec.cbegin(),
vec.cend(),
std::back_inserter(values),
[](const T& v) { return JsonHelper<T>::ToJson(v); });
return std::move(json::value::array(values));
}
protected:
private:
};
// Partial specialize for map.
template <typename T>
class JsonHelper<std::map<std::string, T>>
{
public:
static std::map<std::string, T> Read(const std::string& name, const json::value& j)
namespace hpc
{
namespace utils
{
template <typename T>
class JsonHelper
{
public:
static T Read(const std::string& name, const json::value& j)
{
json::value obj;
if (j.has_field(name))
{
{
obj = j.at(name);
}
return std::move(JsonHelper<std::map<std::string, T>>::FromJson(obj));
}
static std::map<std::string, T> FromJson(const json::value& j)
{
std::map<std::string, T> values;
if (!j.is_null())
{
const auto& arr = JsonHelper<const json::object&>::FromJson(j);
for (auto const i : arr)
{
values[i.first] = JsonHelper<T>::FromJson(i.second);
}
}
return std::move(values);
}
//
// static json::object ToJson(const std::map<std::string, T>& m)
// {
// return json::value::object(m);
// }
// static void Write(const std::string& name, json::value& v, const std::vector<T>& t);
protected:
private:
};
}
}
#endif // JSONHELPER_H
return JsonHelper<T>::FromJson(obj);
}
static void Write(const std::string& name, json::value& v, const T& t)
{
v[name] = ToJson(t);
}
static T FromJson(const json::value& j);
static json::value ToJson(const T& t);
protected:
private:
};
// Partial specialize for vector.
template <typename T>
class JsonHelper<std::vector<T>>
{
public:
static std::vector<T> Read(const std::string& name, const json::value& j)
{
json::value obj;
if (j.has_field(name))
{
obj = j.at(name);
}
return JsonHelper<std::vector<T>>::FromJson(obj);
}
static std::vector<T> FromJson(const json::value& j)
{
std::vector<T> values;
if (!j.is_null())
{
auto arr = JsonHelper<const json::array&>::FromJson(j);
std::transform(
arr.cbegin(),
arr.cend(),
std::back_inserter(values),
[](const auto& i) { return JsonHelper<T>::FromJson(i); });
}
return std::move(values);
}
static json::value ToJson(const std::vector<T>& vec)
{
std::vector<json::value> values;
std::transform(
vec.cbegin(),
vec.cend(),
std::back_inserter(values),
[](const T& v) { return JsonHelper<T>::ToJson(v); });
return std::move(json::value::array(values));
}
protected:
private:
};
// Partial specialize for map.
template <typename T>
class JsonHelper<std::map<std::string, T>>
{
public:
static std::map<std::string, T> Read(const std::string& name, const json::value& j)
{
json::value obj;
if (j.has_field(name))
{
obj = j.at(name);
}
return std::move(JsonHelper<std::map<std::string, T>>::FromJson(obj));
}
static std::map<std::string, T> FromJson(const json::value& j)
{
std::map<std::string, T> values;
if (!j.is_null())
{
const auto& arr = JsonHelper<const json::object&>::FromJson(j);
for (auto const i : arr)
{
values[i.first] = JsonHelper<T>::FromJson(i.second);
}
}
return std::move(values);
}
static json::value ToJson(const std::map<std::string, T>& m)
{
json::value v;
std::for_each(
m.cbegin(),
m.cend(),
[&v](const std::pair<std::string, T>& p)
{
v[p.first] = JsonHelper<T>::ToJson(p.second);
});
return std::move(v);
}
// static void Write(const std::string& name, json::value& v, const std::vector<T>& t);
protected:
private:
};
}
}
#endif // JSONHELPER_H

Просмотреть файл

@ -1,15 +1,15 @@
#include "Logger.h"
using namespace hpc::utils;
Logger::Logger()
{
loggers.push_back(spdlog::stdout_logger_mt("console"));
loggers.push_back(spdlog::rotating_logger_mt("nodemanager", "logs/nodemanager", 1048576 * 5, 100));
spdlog::set_level(spdlog::level::debug);
spdlog::set_pattern("[%m/%d %T.%e] %t %l: %v");
}
#include "Logger.h"
using namespace hpc::utils;
Logger::Logger()
{
loggers.push_back(spdlog::stdout_logger_mt("console"));
loggers.push_back(spdlog::rotating_logger_mt("nodemanager", "logs/nodemanager", 1048576 * 5, 100));
spdlog::set_level(spdlog::level::debug);
spdlog::set_pattern("[%m/%d %T.%e] %t %l: %v");
}
Logger Logger::instance;

Просмотреть файл

@ -1,134 +1,134 @@
#ifndef LOGGER_H
#define LOGGER_H
#include <syslog.h>
#include <stdio.h>
#include <stdarg.h>
#include <iostream>
#ifndef LOGGER_H
#define LOGGER_H
#include <syslog.h>
#include <stdio.h>
#include <stdarg.h>
#include <iostream>
#include <spdlog/spdlog.h>
#include "String.h"
namespace hpc
{
namespace utils
{
enum class LogLevel
{
Emergency = 0, /* system is unusable */
Alert = 1, /* action must be taken immediately */
Critical = 2, /* critical conditions */
Error = 3, /* error conditions */
Warning = 4, /* warning conditions */
Notice = 5, /* normal but significant condition */
Info = 6, /* informational */
Debug = 7 /* debug-level messages */
};
class Logger
{
public:
template <typename ...Args>
static void Info(const char* fmt, Args ...args)
{
Log(LogLevel::Info, fmt, args...);
}
template <typename ...Args>
static void Error(const char* fmt, Args ...args)
{
Log(LogLevel::Error, fmt, args...);
}
template <typename ...Args>
static void Warn(const char* fmt, Args ...args)
{
Log(LogLevel::Warning, fmt, args...);
}
template <typename ...Args>
static void Debug(const char* fmt, Args ...args)
{
Log(LogLevel::Debug, fmt, args...);
}
template <typename ...Args>
static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
#include "String.h"
namespace hpc
{
namespace utils
{
enum class LogLevel
{
Emergency = 0, /* system is unusable */
Alert = 1, /* action must be taken immediately */
Critical = 2, /* critical conditions */
Error = 3, /* error conditions */
Warning = 4, /* warning conditions */
Notice = 5, /* normal but significant condition */
Info = 6, /* informational */
Debug = 7 /* debug-level messages */
};
class Logger
{
public:
template <typename ...Args>
static void Info(const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Info, f.c_str(), args...);
}
template <typename ...Args>
static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Error, f.c_str(), args...);
}
template <typename ...Args>
static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Warning, f.c_str(), args...);
}
template <typename ...Args>
static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Debug, f.c_str(), args...);
}
template <typename ...Args>
static void Log(LogLevel level, const char* fmt, Args ...args)
{
for (auto logger : instance.loggers)
{
switch (level)
{
case LogLevel::Emergency:
logger->emerg(fmt, args...);
break;
case LogLevel::Alert:
logger->alert(fmt, args...);
break;
case LogLevel::Critical:
logger->critical(fmt, args...);
break;
case LogLevel::Error:
logger->error(fmt, args...);
break;
case LogLevel::Warning:
logger->warn(fmt, args...);
break;
case LogLevel::Notice:
logger->notice(fmt, args...);
break;
case LogLevel::Info:
logger->info(fmt, args...);
break;
case LogLevel::Debug:
logger->debug(fmt, args...);
break;
default:
logger->trace(fmt, args...);
Log(LogLevel::Info, fmt, args...);
}
template <typename ...Args>
static void Error(const char* fmt, Args ...args)
{
Log(LogLevel::Error, fmt, args...);
}
template <typename ...Args>
static void Warn(const char* fmt, Args ...args)
{
Log(LogLevel::Warning, fmt, args...);
}
template <typename ...Args>
static void Debug(const char* fmt, Args ...args)
{
Log(LogLevel::Debug, fmt, args...);
}
template <typename ...Args>
static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Info, f.c_str(), args...);
}
template <typename ...Args>
static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Error, f.c_str(), args...);
}
template <typename ...Args>
static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Warning, f.c_str(), args...);
}
template <typename ...Args>
static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args)
{
auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt);
Log(LogLevel::Debug, f.c_str(), args...);
}
template <typename ...Args>
static void Log(LogLevel level, const char* fmt, Args ...args)
{
for (auto logger : instance.loggers)
{
switch (level)
{
case LogLevel::Emergency:
logger->emerg(fmt, args...);
break;
case LogLevel::Alert:
logger->alert(fmt, args...);
break;
case LogLevel::Critical:
logger->critical(fmt, args...);
break;
case LogLevel::Error:
logger->error(fmt, args...);
break;
case LogLevel::Warning:
logger->warn(fmt, args...);
break;
case LogLevel::Notice:
logger->notice(fmt, args...);
break;
case LogLevel::Info:
logger->info(fmt, args...);
break;
case LogLevel::Debug:
logger->debug(fmt, args...);
break;
default:
logger->trace(fmt, args...);
}
if (level <= LogLevel::Warning)
{
logger->flush();
}
}
}
private:
Logger();
static Logger instance;
std::vector<std::shared_ptr<spdlog::logger>> loggers;
};
}
}
#endif // LOGGER_H
}
}
}
private:
Logger();
static Logger instance;
std::vector<std::shared_ptr<spdlog::logger>> loggers;
};
}
}
#endif // LOGGER_H

Просмотреть файл

@ -1,37 +1,37 @@
#ifndef STRING_H
#define STRING_H
#include <string>
#include <vector>
#ifndef STRING_H
#define STRING_H
#include <string>
#include <vector>
#include <sstream>
#include <algorithm>
namespace hpc
{
namespace utils
{
class String
{
public:
static std::vector<std::string> Split(const std::string& str, char delim);
template <typename T, typename ... Args>
static std::string Join(const T& delim, const Args& ...args)
{
std::ostringstream oss;
bool first = true;
#include <algorithm>
namespace hpc
{
namespace utils
{
class String
{
public:
static std::vector<std::string> Split(const std::string& str, char delim);
template <typename T, typename ... Args>
static std::string Join(const T& delim, const Args& ...args)
{
std::ostringstream oss;
bool first = true;
auto tmp = { ((first ? oss : oss << delim) << args, first = false)... };
[&tmp]() { };
return std::move(oss.str());
[&tmp]() { };
return std::move(oss.str());
}
template <char delim, typename T>
static std::string Join(const std::vector<T>& values)
{
std::ostringstream oss;
bool first = true;
template <char delim, typename T>
static std::string Join(const std::vector<T>& values)
{
std::ostringstream oss;
bool first = true;
for (const auto& v : values)
{
@ -39,7 +39,7 @@ namespace hpc
first = false;
}
return std::move(oss.str());
return std::move(oss.str());
}
static inline std::string Trim(const std::string& s)
@ -64,11 +64,11 @@ namespace hpc
return std::move(userName);
}
protected:
protected:
private:
};
}
}
#endif // STRING_H
};
}
}
#endif // STRING_H

Просмотреть файл

@ -542,3 +542,38 @@ int System::DeleteUser(const std::string& userName)
return ret;
}
int System::CreateTempFolder(char* folderTemplate, const std::string& userName)
{
char* p = mkdtemp(folderTemplate);
if (p)
{
std::string output;
int ret = System::ExecuteCommandOut(output, "chown -R", userName, p);
if (ret == 0)
{
ret = System::ExecuteCommandOut(output, "chmod -R u+rwX", p);
}
return ret;
}
else
{
return errno;
}
}
int System::WriteStringToFile(const std::string& fileName, const std::string& contents)
{
std::ofstream os(fileName, std::ios::trunc);
if (!os)
{
return (int)ErrorCodes::WriteFileError;
}
os << contents;
os.close();
return 0;
}

Просмотреть файл

@ -1,40 +1,40 @@
#ifndef SYSTEM_H
#define SYSTEM_H
#include <string>
#include "String.h"
#ifndef SYSTEM_H
#define SYSTEM_H
#include <string>
#include "String.h"
#include "Logger.h"
#include "../common/ErrorCodes.h"
namespace hpc
{
namespace utils
{
enum class IpAddressVersion
{
V4 = 4,
V6 = 6
};
struct System
{
public:
typedef std::tuple<std::string, std::string, std::string, std::string, bool> NetInfo;
static std::vector<NetInfo> GetNetworkInfo();
static std::string GetIpAddress(IpAddressVersion version, const std::string& name);
static void CPUUsage(uint64_t &total, uint64_t &idle);
static void Memory(uint64_t &available, uint64_t &total);
static void CPU(int &cores, int &sockets);
static int NetworkUsage(uint64_t &network, const std::string& netName);
#include "../common/ErrorCodes.h"
namespace hpc
{
namespace utils
{
enum class IpAddressVersion
{
V4 = 4,
V6 = 6
};
struct System
{
public:
typedef std::tuple<std::string, std::string, std::string, std::string, bool> NetInfo;
static std::vector<NetInfo> GetNetworkInfo();
static std::string GetIpAddress(IpAddressVersion version, const std::string& name);
static void CPUUsage(uint64_t &total, uint64_t &idle);
static void Memory(uint64_t &available, uint64_t &total);
static void CPU(int &cores, int &sockets);
static int NetworkUsage(uint64_t &network, const std::string& netName);
static int Vmstat(float &pagesPerSec, float &contextSwitchesPerSec);
static int Iostat(float &bytesPerSec);
static int IostatX(float &queueLength);
static int FreeSpace(float &freeSpaceKB);
static const std::string& GetNodeName();
static bool IsCGroupInstalled();
static const std::string& GetNodeName();
static bool IsCGroupInstalled();
static const std::string& GetDistroInfo();
static int CreateUser(
@ -58,76 +58,78 @@ namespace hpc
const std::string& userName,
const std::string& key);
static int DeleteUser(const std::string& userName);
template <typename ... Args>
static int ExecuteCommandIn(const std::string& input, const std::string& cmd, const Args& ... args)
static int DeleteUser(const std::string& userName);
static int CreateTempFolder(char* folderTemplate, const std::string& userName);
static int WriteStringToFile(const std::string& fileName, const std::string& contents);
template <typename ... Args>
static int ExecuteCommandIn(const std::string& input, const std::string& cmd, const Args& ... args)
{
std::string command = String::Join(" ", cmd, args...);
//Logger::Debug("Executing cmd: {0}", command);
FILE* stream = popen(command.c_str(), "w");
int exitCode = (int)hpc::common::ErrorCodes::PopenError;
if (stream)
std::string command = String::Join(" ", cmd, args...);
//Logger::Debug("Executing cmd: {0}", command);
FILE* stream = popen(command.c_str(), "w");
int exitCode = (int)hpc::common::ErrorCodes::PopenError;
if (stream)
{
if (!input.empty())
{
fputs(input.c_str(), stream);
}
int ret = pclose(stream);
exitCode = WEXITSTATUS(ret);
}
else
int ret = pclose(stream);
exitCode = WEXITSTATUS(ret);
}
else
{
Logger::Error("Error when popen {0}", command);
}
Logger::Error("Error when popen {0}", command);
}
return exitCode;
}
template <typename ... Args>
static int ExecuteCommandOut(std::string& output, const std::string& cmd, const Args& ... args)
{
std::string command = String::Join(" ", cmd, args...);
//Logger::Debug("Executing cmd: {0}", command);
int exitCode = (int)hpc::common::ErrorCodes::PopenError;
std::ostringstream result;
FILE* stream = popen(command.c_str(), "r");
if (stream)
{
char buffer[512];
while (fgets(buffer, sizeof(buffer), stream) != nullptr)
{
result << buffer;
}
int ret = pclose(stream);
template <typename ... Args>
static int ExecuteCommandOut(std::string& output, const std::string& cmd, const Args& ... args)
{
std::string command = String::Join(" ", cmd, args...);
//Logger::Debug("Executing cmd: {0}", command);
int exitCode = (int)hpc::common::ErrorCodes::PopenError;
std::ostringstream result;
FILE* stream = popen(command.c_str(), "r");
if (stream)
{
char buffer[512];
while (fgets(buffer, sizeof(buffer), stream) != nullptr)
{
result << buffer;
}
int ret = pclose(stream);
exitCode = WEXITSTATUS(ret);
}
else
}
else
{
int err = errno;
Logger::Error("Error when popen {0}, errno {1}", command, err);
result << "error when popen " << command << ", errno " << err << std::endl;
}
output = result.str();
Logger::Error("Error when popen {0}, errno {1}", command, err);
result << "error when popen " << command << ", errno " << err << std::endl;
}
output = result.str();
if (exitCode != 0)
{
Logger::Warn("Executing {0}, error code {1}", command, exitCode);
}
return exitCode;
}
protected:
private:
};
}
}
#endif // SYSTEM_H
return exitCode;
}
protected:
private:
};
}
}
#endif // SYSTEM_H