diff --git a/nodemanager/NodeManager.cbp b/nodemanager/NodeManager.cbp index 4dff363..fb07434 100644 --- a/nodemanager/NodeManager.cbp +++ b/nodemanager/NodeManager.cbp @@ -45,7 +45,7 @@ - + @@ -111,6 +111,8 @@ + + @@ -122,6 +124,8 @@ + + diff --git a/nodemanager/NodeManager.depend b/nodemanager/NodeManager.depend index 72c9088..5cebec5 100644 --- a/nodemanager/NodeManager.depend +++ b/nodemanager/NodeManager.depend @@ -251,25 +251,26 @@ "String.h" -1427353132 /home/evanc/spdlog/include/spdlog/spdlog.h +1440745033 /home/evanc/spdlog/include/spdlog/spdlog.h + "tweakme.h" "common.h" "logger.h" "details/spdlog_impl.h" -1427353132 /home/evanc/spdlog/include/spdlog/common.h +1440745033 /home/evanc/spdlog/include/spdlog/common.h -1427353132 /home/evanc/spdlog/include/spdlog/logger.h +1440745033 /home/evanc/spdlog/include/spdlog/logger.h "sinks/base_sink.h" "common.h" "./details/logger_impl.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/base_sink.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/base_sink.h @@ -278,14 +279,15 @@ "../common.h" "../details/log_msg.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/sink.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/sink.h "../details/log_msg.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/log_msg.h +1440745033 /home/evanc/spdlog/include/spdlog/details/log_msg.h + "../common.h" "./format.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/format.h +1440745033 /home/evanc/spdlog/include/spdlog/details/format.h @@ -296,12 +298,13 @@ + "format.cc" -1427353132 /home/evanc/spdlog/include/spdlog/details/format.cc +1440745033 /home/evanc/spdlog/include/spdlog/details/format.cc "format.h" @@ -311,12 +314,13 @@ + 1427353132 /home/evanc/spdlog/include/spdlog/formatter.h "details/log_msg.h" "details/pattern_formatter_impl.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/pattern_formatter_impl.h +1440745033 /home/evanc/spdlog/include/spdlog/details/pattern_formatter_impl.h @@ -326,37 +330,42 @@ "./log_msg.h" "./os.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/os.h +1440745033 /home/evanc/spdlog/include/spdlog/details/os.h + + + + "../common.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/logger_impl.h +1440745033 /home/evanc/spdlog/include/spdlog/details/logger_impl.h "./line_logger.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/line_logger.h +1440745033 /home/evanc/spdlog/include/spdlog/details/line_logger.h "../common.h" "../logger.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/spdlog_impl.h +1440745033 /home/evanc/spdlog/include/spdlog/details/spdlog_impl.h "registry.h" "../sinks/file_sinks.h" "../sinks/stdout_sinks.h" "../sinks/syslog_sink.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/registry.h +1440745033 /home/evanc/spdlog/include/spdlog/details/registry.h + "./null_mutex.h" "../logger.h" "../async_logger.h" "../common.h" -1427353132 /home/evanc/spdlog/include/spdlog/async_logger.h +1440745033 /home/evanc/spdlog/include/spdlog/async_logger.h "common.h" @@ -364,10 +373,10 @@ "spdlog.h" "./details/async_logger_impl.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/async_logger_impl.h +1440745033 /home/evanc/spdlog/include/spdlog/details/async_logger_impl.h "./async_log_helper.h" -1427353132 /home/evanc/spdlog/include/spdlog/details/async_log_helper.h +1440745033 /home/evanc/spdlog/include/spdlog/details/async_log_helper.h @@ -377,12 +386,13 @@ "./mpmc_bounded_q.h" "./log_msg.h" "./format.h" + "os.h" 1427353132 /home/evanc/spdlog/include/spdlog/details/mpmc_bounded_q.h "../common.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/file_sinks.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/file_sinks.h "base_sink.h" "../details/null_mutex.h" @@ -391,26 +401,26 @@ 1427353132 /home/evanc/spdlog/include/spdlog/details/null_mutex.h -1427353132 /home/evanc/spdlog/include/spdlog/details/file_helper.h +1440745033 /home/evanc/spdlog/include/spdlog/details/file_helper.h "os.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/stdout_sinks.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/stdout_sinks.h "./ostream_sink.h" "../details/null_mutex.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/ostream_sink.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/ostream_sink.h "../details/null_mutex.h" "./base_sink.h" -1427353132 /home/evanc/spdlog/include/spdlog/sinks/syslog_sink.h +1440745033 /home/evanc/spdlog/include/spdlog/sinks/syslog_sink.h @@ -427,7 +437,7 @@ "core/RemoteExecutor.h" "Version.h" -1436264986 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h +1447837233 /home/evanc/whpc-linux-communicator/nodemanager/utils/Logger.h @@ -435,24 +445,31 @@ "String.h" -1427722564 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h +1464874241 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteCommunicator.h "../utils/Logger.h" + "../filters/ExecutionFilter.h" "IRemoteExecutor.h" -1427722694 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h +1437990898 /home/evanc/whpc-linux-communicator/nodemanager/core/IRemoteExecutor.h "../arguments/StartJobAndTaskArgs.h" "../arguments/StartTaskArgs.h" "../arguments/EndJobArgs.h" "../arguments/EndTaskArgs.h" + "../arguments/MetricCountersConfig.h" -1428574742 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h +1464733277 /home/evanc/whpc-linux-communicator/nodemanager/core/RemoteExecutor.h + + "IRemoteExecutor.h" "JobTaskTable.h" "Monitor.h" "Process.h" "Reporter.h" + "HostsManager.h" + "../arguments/MetricCountersConfig.h" + "../data/ProcessStatistics.h" 1427353166 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.cpp "EndJobArgs.h" @@ -461,7 +478,7 @@ 1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndJobArgs.h -1437473138 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h +1464941459 /home/evanc/whpc-linux-communicator/nodemanager/utils/JsonHelper.h @@ -470,14 +487,14 @@ "EndTaskArgs.h" "../utils/JsonHelper.h" -1427353166 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h +1433133379 /home/evanc/whpc-linux-communicator/nodemanager/arguments/EndTaskArgs.h -1429783386 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp +1464897409 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.cpp "ProcessStartInfo.h" "../utils/JsonHelper.h" -1429783378 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h +1464897165 /home/evanc/whpc-linux-communicator/nodemanager/arguments/ProcessStartInfo.h @@ -487,7 +504,7 @@ "StartJobAndTaskArgs.h" "../utils/JsonHelper.h" -1427383873 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h +1429010700 /home/evanc/whpc-linux-communicator/nodemanager/arguments/StartJobAndTaskArgs.h "ProcessStartInfo.h" @@ -509,7 +526,7 @@ "../utils/String.h" "../common/ErrorCodes.h" -1428574604 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h +1464871942 /home/evanc/whpc-linux-communicator/nodemanager/core/Process.h @@ -524,6 +541,7 @@ "../utils/Logger.h" "../utils/System.h" "../common/ErrorCodes.h" + "../data/ProcessStatistics.h" 1429015468 /home/evanc/whpc-linux-communicator/nodemanager/utils/String.h @@ -538,7 +556,7 @@ "../utils/System.h" "../arguments/StartJobAndTaskArgs.h" -1428574577 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h +1464869302 /home/evanc/whpc-linux-communicator/nodemanager/utils/System.h "String.h" "Logger.h" @@ -584,7 +602,7 @@ "../utils/WriterLock.h" "../utils/ReaderLock.h" -1438324701 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h +1438327680 /home/evanc/whpc-linux-communicator/nodemanager/core/JobTaskTable.h "../data/TaskInfo.h" @@ -592,7 +610,7 @@ "../data/NodeInfo.h" "../utils/ReaderLock.h" -1436256679 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h +1438325484 /home/evanc/whpc-linux-communicator/nodemanager/data/TaskInfo.h @@ -662,10 +680,15 @@ 1424929892 source:/home/evanc/whpc-linux-communicator/nodemanager/utils/WriterLock.cpp "WriterLock.h" -1427383873 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h +1438161680 /home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.h + "../utils/System.h" + "../data/MonitoringPacket.h" + "../arguments/MetricCounter.h" + "../arguments/MetricCountersConfig.h" + "MetricCollectorBase.h" 1428571893 source:/home/evanc/whpc-linux-communicator/nodemanager/core/Monitor.cpp @@ -675,7 +698,7 @@ "../utils/Logger.h" "../utils/System.h" -1430202795 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h +1446103482 /home/evanc/whpc-linux-communicator/nodemanager/core/Reporter.h "../utils/Logger.h" @@ -685,7 +708,7 @@ "Reporter.h" "../utils/Logger.h" -1437474755 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h +1464870624 /home/evanc/whpc-linux-communicator/nodemanager/common/ErrorCodes.h 1429084854 source:/home/evanc/whpc-linux-communicator/nodemanager/Version.cpp "Version.h" @@ -720,7 +743,7 @@ 1437547414 source:/home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.cpp "NodeManagerConfig.h" -1440058578 /home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.h +1464734870 /home/evanc/whpc-linux-communicator/nodemanager/core/NodeManagerConfig.h "../utils/Configuration.h" 1437732272 source:/home/evanc/whpc-linux-communicator/nodemanager/arguments/MonitoringConfig.cpp @@ -745,7 +768,7 @@ 1437645025 source:/home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.cpp "HttpHelper.h" -1440058706 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.h +1460020276 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpHelper.h "NodeManagerConfig.h" @@ -762,7 +785,7 @@ 1438238815 /home/evanc/whpc-linux-communicator/nodemanager/data/Umid.h -1433133379 /home/evanc/whpc-linux-communicator/nodemanager/data/ProcessStatistics.h +1447837294 /home/evanc/whpc-linux-communicator/nodemanager/data/ProcessStatistics.h @@ -961,3 +984,259 @@ "cpprest/http_msg.h" "cpprest/details/http_constants.dat" +1464885862 source:/home/evanc/whpc-linux-communicator/nodemanager/test/ExecutionFilterTest.cpp + "ExecutionFilterTest.h" + + + + + "../utils/JsonHelper.h" + "../core/Process.h" + "../utils/Logger.h" + "../core/RemoteCommunicator.h" + "../core/RemoteExecutor.h" + "../core/NodeManagerConfig.h" + "../common/ErrorCodes.h" + "../core/HttpHelper.h" + +1464883806 /home/evanc/whpc-linux-communicator/nodemanager/test/ExecutionFilterTest.h + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_listener.h + + + "cpprest/http_msg.h" + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_msg.h + + + + + + "pplx/pplxtasks.h" + "cpprest/json.h" + "cpprest/uri.h" + "cpprest/http_headers.h" + "cpprest/details/cpprest_compat.h" + "cpprest/asyncrt_utils.h" + "cpprest/streams.h" + "cpprest/containerstream.h" + "cpprest/details/http_constants.dat" + "cpprest/details/http_constants.dat" + "cpprest/details/http_constants.dat" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxtasks.h + + + "pplx/pplx.h" + + + + + + + + + + + + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplx.h + "cpprest/details/cpprest_compat.h" + "pplx/pplxwin.h" + "pplx/pplxlinux.h" + "pplx/pplxlinux.h" + "pplx/pplxcancellation_token.h" + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/cpprest_compat.h + + + "cpprest/details/nosal.h" + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/nosal.h + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxwin.h + "cpprest/details/cpprest_compat.h" + "pplx/pplxinterface.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxinterface.h + + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxlinux.h + + "pthread.h" + "cpprest/details/cpprest_compat.h" + + + + + + "pplx/pplxinterface.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/pplx/pplxcancellation_token.h + + + "pplx/pplxinterface.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/json.h + + + + + + + "cpprest/details/basic_types.h" + "cpprest/asyncrt_utils.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/basic_types.h + + + + + "cpprest/details/cpprest_compat.h" + + + "cpprest/details/SafeInt3.hpp" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/SafeInt3.hpp + + + + + + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/asyncrt_utils.h + + + + + + + "pplx/pplxtasks.h" + "cpprest/details/basic_types.h" + + + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/uri.h + "cpprest/base_uri.h" + "cpprest/uri_builder.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/base_uri.h + + + + + + "cpprest/asyncrt_utils.h" + "cpprest/details/basic_types.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/uri_builder.h + + + + "cpprest/base_uri.h" + "cpprest/details/uri_parser.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/uri_parser.h + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_headers.h + + + + + + "cpprest/asyncrt_utils.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/streams.h + "cpprest/astreambuf.h" + + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/astreambuf.h + + + + + "pplx/pplxtasks.h" + "cpprest/details/basic_types.h" + "cpprest/asyncrt_utils.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/containerstream.h + + + + + "pplx/pplxtasks.h" + "cpprest/astreambuf.h" + "cpprest/streams.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/http_constants.dat + +1440745033 /home/evanc/spdlog/include/spdlog/tweakme.h + +1464874650 /home/evanc/whpc-linux-communicator/nodemanager/filters/ExecutionFilter.h + + + "../core/NodeManagerConfig.h" + +1437983925 /home/evanc/whpc-linux-communicator/nodemanager/arguments/MetricCountersConfig.h + "MetricCounter.h" + "../utils/JsonHelper.h" + +1430401912 /home/evanc/whpc-linux-communicator/nodemanager/data/MonitoringPacket.h + + "../utils/Logger.h" + "Umid.h" + +1453226140 /home/evanc/whpc-linux-communicator/nodemanager/core/HostsManager.h + + + "HttpFetcher.h" + "../data/HostEntry.h" + +1447837236 /home/evanc/whpc-linux-communicator/nodemanager/core/HttpFetcher.h + + + + "Reporter.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/http_client.h + + + + + "pplx/pplxtasks.h" + "cpprest/http_msg.h" + "cpprest/json.h" + "cpprest/uri.h" + "cpprest/details/web_utilities.h" + "cpprest/details/basic_types.h" + "cpprest/asyncrt_utils.h" + "cpprest/oauth1.h" + "cpprest/oauth2.h" + "boost/asio/ssl.hpp" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/details/web_utilities.h + "cpprest/asyncrt_utils.h" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/oauth1.h + "cpprest/http_msg.h" + "cpprest/details/web_utilities.h" + "cpprest/details/http_constants.dat" + "cpprest/details/http_constants.dat" + +1460016995 /home/evanc/evancasa/cpprestsdk/Release/include/cpprest/oauth2.h + "cpprest/http_msg.h" + "cpprest/details/web_utilities.h" + "cpprest/details/http_constants.dat" + +1447837236 /home/evanc/whpc-linux-communicator/nodemanager/data/HostEntry.h + + + diff --git a/nodemanager/Readme.txt b/nodemanager/Readme.txt index 146ab3e..bd0d008 100644 --- a/nodemanager/Readme.txt +++ b/nodemanager/Readme.txt @@ -1,18 +1,18 @@ -== Conding Convention == - -Namespaces should be rooted from "hpc", and have at most 2 layers, which means, you can only -define one more layer under "hpc". - -Files which contain contents under a sub namespace, should be put into a sub folder with the -same name as the sub namespace. - -One file should contain only one class. - -Private fields, variables should be named using camel convention, while class/struct/methods -should be named using Pascal convention. - -File names should be in Pascal convention exception main.cpp, while folder names should be -in lower case. +== Conding Convention == + +Namespaces should be rooted from "hpc", and have at most 2 layers, which means, you can only +define one more layer under "hpc". + +Files which contain contents under a sub namespace, should be put into a sub folder with the +same name as the sub namespace. + +One file should contain only one class. + +Private fields, variables should be named using camel convention, while class/struct/methods +should be named using Pascal convention. + +File names should be in Pascal convention exception main.cpp, while folder names should be +in lower case. Namespace description: arguments: All data structures passed from head directly. diff --git a/nodemanager/Version.h b/nodemanager/Version.h index 7a2c08e..61bfd9f 100644 --- a/nodemanager/Version.h +++ b/nodemanager/Version.h @@ -456,6 +456,14 @@ namespace hpc "Fix a memory leak issue in the cpprestsdk", } }, + { "1.7.1.0", + { + "Added the execution filter support", + "Added unit test for execution filter", + "Improved the unit test framework", + "Fixed some problems in json format processing", + } + }, }; return versionHistory; diff --git a/nodemanager/arguments/EndJobArgs.cpp b/nodemanager/arguments/EndJobArgs.cpp index ab6c8c0..d4fdc2d 100644 --- a/nodemanager/arguments/EndJobArgs.cpp +++ b/nodemanager/arguments/EndJobArgs.cpp @@ -1,17 +1,17 @@ -#include "EndJobArgs.h" -#include "../utils/JsonHelper.h" - -using namespace hpc::arguments; -using namespace hpc::utils; - -EndJobArgs::EndJobArgs(int jobId) : JobId(jobId) -{ - //ctor -} - -EndJobArgs EndJobArgs::FromJson(const json::value& j) -{ - EndJobArgs args(JsonHelper::Read("JobId", j)); - - return std::move(args); -} +#include "EndJobArgs.h" +#include "../utils/JsonHelper.h" + +using namespace hpc::arguments; +using namespace hpc::utils; + +EndJobArgs::EndJobArgs(int jobId) : JobId(jobId) +{ + //ctor +} + +EndJobArgs EndJobArgs::FromJson(const json::value& j) +{ + EndJobArgs args(JsonHelper::Read("JobId", j)); + + return std::move(args); +} diff --git a/nodemanager/arguments/EndJobArgs.h b/nodemanager/arguments/EndJobArgs.h index 1c20752..fbf6bd2 100644 --- a/nodemanager/arguments/EndJobArgs.h +++ b/nodemanager/arguments/EndJobArgs.h @@ -1,25 +1,25 @@ -#ifndef ENDJOBARGS_H -#define ENDJOBARGS_H - -#include - -namespace hpc -{ - namespace arguments - { - struct EndJobArgs - { - public: - EndJobArgs(int jobId); - - int JobId; - - static EndJobArgs FromJson(const web::json::value& jsonValue); - - protected: - private: - }; - } -} - -#endif // ENDJOBARGS_H +#ifndef ENDJOBARGS_H +#define ENDJOBARGS_H + +#include + +namespace hpc +{ + namespace arguments + { + struct EndJobArgs + { + public: + EndJobArgs(int jobId); + + int JobId; + + static EndJobArgs FromJson(const web::json::value& jsonValue); + + protected: + private: + }; + } +} + +#endif // ENDJOBARGS_H diff --git a/nodemanager/arguments/EndTaskArgs.cpp b/nodemanager/arguments/EndTaskArgs.cpp index 1ef37d2..8c399a2 100644 --- a/nodemanager/arguments/EndTaskArgs.cpp +++ b/nodemanager/arguments/EndTaskArgs.cpp @@ -1,21 +1,21 @@ -#include "EndTaskArgs.h" -#include "../utils/JsonHelper.h" - -using namespace hpc::arguments; -using namespace hpc::utils; - +#include "EndTaskArgs.h" +#include "../utils/JsonHelper.h" + +using namespace hpc::arguments; +using namespace hpc::utils; + EndTaskArgs::EndTaskArgs(int jobId, int taskId, int gracePeriodSeconds) - : JobId(jobId), TaskId(taskId), TaskCancelGracePeriodSeconds(gracePeriodSeconds) -{ - //ctor -} - -EndTaskArgs EndTaskArgs::FromJson(const json::value& j) -{ - EndTaskArgs args( - JsonHelper::Read("JobId", j), - JsonHelper::Read("TaskId", j), + : JobId(jobId), TaskId(taskId), TaskCancelGracePeriodSeconds(gracePeriodSeconds) +{ + //ctor +} + +EndTaskArgs EndTaskArgs::FromJson(const json::value& j) +{ + EndTaskArgs args( + JsonHelper::Read("JobId", j), + JsonHelper::Read("TaskId", j), JsonHelper::Read("TaskCancelGracePeriod", j)); - - return std::move(args); -} + + return std::move(args); +} diff --git a/nodemanager/arguments/EndTaskArgs.h b/nodemanager/arguments/EndTaskArgs.h index 7492221..add7101 100644 --- a/nodemanager/arguments/EndTaskArgs.h +++ b/nodemanager/arguments/EndTaskArgs.h @@ -1,27 +1,27 @@ -#ifndef ENDTASKARGS_H -#define ENDTASKARGS_H - -#include - -namespace hpc -{ - namespace arguments - { - struct EndTaskArgs - { - public: +#ifndef ENDTASKARGS_H +#define ENDTASKARGS_H + +#include + +namespace hpc +{ + namespace arguments + { + struct EndTaskArgs + { + public: EndTaskArgs(int jobId, int taskId, int gracePeriodSeconds); - - int JobId; + + int JobId; int TaskId; - int TaskCancelGracePeriodSeconds; - - static EndTaskArgs FromJson(const web::json::value& jsonValue); - - protected: - private: - }; - } -} - -#endif // ENDTASKARGS_H + int TaskCancelGracePeriodSeconds; + + static EndTaskArgs FromJson(const web::json::value& jsonValue); + + protected: + private: + }; + } +} + +#endif // ENDTASKARGS_H diff --git a/nodemanager/arguments/MetricCountersConfig.h b/nodemanager/arguments/MetricCountersConfig.h index 6b5141c..cf067af 100644 --- a/nodemanager/arguments/MetricCountersConfig.h +++ b/nodemanager/arguments/MetricCountersConfig.h @@ -25,7 +25,7 @@ namespace hpc return MetricCountersConfig( JsonHelper>::Read("MetricCounters", jsonValue)); } - + protected: private: }; diff --git a/nodemanager/arguments/ProcessStartInfo.cpp b/nodemanager/arguments/ProcessStartInfo.cpp index 0939000..f8e0ed3 100644 --- a/nodemanager/arguments/ProcessStartInfo.cpp +++ b/nodemanager/arguments/ProcessStartInfo.cpp @@ -1,17 +1,17 @@ -#include "ProcessStartInfo.h" -#include "../utils/JsonHelper.h" - -using namespace hpc::arguments; -using namespace hpc::utils; - -ProcessStartInfo::ProcessStartInfo( - std::string&& cmdLine, - std::string&& stdIn, - std::string&& stdOut, - std::string&& stdErr, +#include "ProcessStartInfo.h" +#include "../utils/JsonHelper.h" + +using namespace hpc::arguments; +using namespace hpc::utils; + +ProcessStartInfo::ProcessStartInfo( + std::string&& cmdLine, + std::string&& stdIn, + std::string&& stdOut, + std::string&& stdErr, std::string&& workDir, - int taskRequeueCount, - std::vector&& affinity, + int taskRequeueCount, + std::vector&& affinity, std::map&& enviVars) : CommandLine(std::move(cmdLine)), StdInFile(std::move(stdIn)), @@ -20,23 +20,37 @@ ProcessStartInfo::ProcessStartInfo( WorkDirectory(std::move(workDir)), TaskRequeueCount(taskRequeueCount), Affinity(std::move(affinity)), - EnvironmentVariables(std::move(enviVars)) -{ -} - -/// TODO: Consider using attribute feature when compiler ready. -ProcessStartInfo ProcessStartInfo::FromJson(const web::json::value& jsonValue) -{ - ProcessStartInfo startInfo( - JsonHelper::Read("commandLine", jsonValue), - JsonHelper::Read("stdin", jsonValue), - JsonHelper::Read("stdout", jsonValue), - JsonHelper::Read("stderr", jsonValue), - JsonHelper::Read("workingDirectory", jsonValue), + EnvironmentVariables(std::move(enviVars)) +{ +} + +json::value ProcessStartInfo::ToJson() const +{ + json::value v; + v["commandLine"] = json::value::string(this->CommandLine); + v["stdin"] = json::value::string(this->StdInFile); + v["stdout"] = json::value::string(this->StdOutFile); + v["stderr"] = json::value::string(this->StdErrFile); + v["workingDirectory"] = json::value::string(this->WorkDirectory); + v["taskRequeueCount"] = this->TaskRequeueCount; + v["affinity"] = JsonHelper>::ToJson(this->Affinity); + v["environmentVariables"] = JsonHelper>::ToJson(this->EnvironmentVariables); + return v; +} + +/// TODO: Consider using attribute feature when compiler ready. +ProcessStartInfo ProcessStartInfo::FromJson(const web::json::value& jsonValue) +{ + ProcessStartInfo startInfo( + JsonHelper::Read("commandLine", jsonValue), + JsonHelper::Read("stdin", jsonValue), + JsonHelper::Read("stdout", jsonValue), + JsonHelper::Read("stderr", jsonValue), + JsonHelper::Read("workingDirectory", jsonValue), JsonHelper::Read("taskRequeueCount", jsonValue), - JsonHelper>::Read("affinity", jsonValue), - JsonHelper>::Read("environmentVariables", jsonValue)); - - return std::move(startInfo); -} - + JsonHelper>::Read("affinity", jsonValue), + JsonHelper>::Read("environmentVariables", jsonValue)); + + return std::move(startInfo); +} + diff --git a/nodemanager/arguments/ProcessStartInfo.h b/nodemanager/arguments/ProcessStartInfo.h index 894f392..99bbb1d 100644 --- a/nodemanager/arguments/ProcessStartInfo.h +++ b/nodemanager/arguments/ProcessStartInfo.h @@ -1,44 +1,45 @@ -#ifndef PROCESSSTARTINFO_H -#define PROCESSSTARTINFO_H - -#include -#include -#include - -#include - -namespace hpc -{ - namespace arguments - { - struct ProcessStartInfo - { - public: - ProcessStartInfo( - std::string&& cmdLine, - std::string&& stdIn, - std::string&& stdOut, - std::string&& stdErr, +#ifndef PROCESSSTARTINFO_H +#define PROCESSSTARTINFO_H + +#include +#include +#include + +#include + +namespace hpc +{ + namespace arguments + { + struct ProcessStartInfo + { + public: + ProcessStartInfo( + std::string&& cmdLine, + std::string&& stdIn, + std::string&& stdOut, + std::string&& stdErr, std::string&& workDir, int taskRequeueCount, - std::vector&& affinity, - std::map&& enviVars); - - ProcessStartInfo(ProcessStartInfo&& startInfo) = default; - - static ProcessStartInfo FromJson(const web::json::value& jsonValue); + std::vector&& affinity, + std::map&& enviVars); - std::string CommandLine; - std::string StdInFile; - std::string StdOutFile; - std::string StdErrFile; + ProcessStartInfo(ProcessStartInfo&& startInfo) = default; + + static ProcessStartInfo FromJson(const web::json::value& jsonValue); + web::json::value ToJson() const; + + std::string CommandLine; + std::string StdInFile; + std::string StdOutFile; + std::string StdErrFile; std::string WorkDirectory; - int TaskRequeueCount; - std::vector Affinity; - std::map EnvironmentVariables; - protected: - private: - }; - } -} -#endif // PROCESSSTARTINFO_H + int TaskRequeueCount; + std::vector Affinity; + std::map EnvironmentVariables; + protected: + private: + }; + } +} +#endif // PROCESSSTARTINFO_H diff --git a/nodemanager/arguments/StartJobAndTaskArgs.cpp b/nodemanager/arguments/StartJobAndTaskArgs.cpp index f9fae64..1034971 100644 --- a/nodemanager/arguments/StartJobAndTaskArgs.cpp +++ b/nodemanager/arguments/StartJobAndTaskArgs.cpp @@ -4,6 +4,7 @@ using namespace hpc::arguments; using namespace hpc::utils; +using namespace web; StartJobAndTaskArgs::StartJobAndTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo, std::string&& userName, std::string&& password) : @@ -13,6 +14,25 @@ StartJobAndTaskArgs::StartJobAndTaskArgs(int jobId, int taskId, ProcessStartInfo //ctor } +json::value StartJobAndTaskArgs::ToJson() const +{ + json::value v; + + json::value jobTaskId; + jobTaskId["JobId"] = this->JobId; + jobTaskId["TaskId"] = this->TaskId; + + v["m_Item1"] = jobTaskId; + v["m_Item2"] = this->StartInfo.ToJson(); + v["m_Item3"] = json::value::string(this->UserName); + v["m_Item4"] = json::value::string(this->Password); + v["m_Item5"] = json::value::string(this->PrivateKey); + v["m_Item6"] = json::value::string(this->PublicKey); + + return v; +} + + StartJobAndTaskArgs StartJobAndTaskArgs::FromJson(const json::value& j) { StartJobAndTaskArgs args( diff --git a/nodemanager/arguments/StartJobAndTaskArgs.h b/nodemanager/arguments/StartJobAndTaskArgs.h index 3e82bcf..43f9fb9 100644 --- a/nodemanager/arguments/StartJobAndTaskArgs.h +++ b/nodemanager/arguments/StartJobAndTaskArgs.h @@ -1,41 +1,42 @@ -#ifndef STARTJOBANDTASKARGS_H -#define STARTJOBANDTASKARGS_H - -#include - -#include "ProcessStartInfo.h" - -namespace hpc -{ - namespace arguments - { - struct StartJobAndTaskArgs - { - public: +#ifndef STARTJOBANDTASKARGS_H +#define STARTJOBANDTASKARGS_H + +#include + +#include "ProcessStartInfo.h" + +namespace hpc +{ + namespace arguments + { + struct StartJobAndTaskArgs + { + public: StartJobAndTaskArgs( int jobId, int taskId, ProcessStartInfo&& startInfo, std::string&& userName, - std::string&& password); - - StartJobAndTaskArgs(StartJobAndTaskArgs&& args) = default; - - int JobId; - int TaskId; + std::string&& password); + + StartJobAndTaskArgs(StartJobAndTaskArgs&& args) = default; + web::json::value ToJson() const; + + int JobId; + int TaskId; ProcessStartInfo StartInfo; std::string UserName; std::string Password; std::vector Certificate; std::string PrivateKey; - std::string PublicKey; - - static StartJobAndTaskArgs FromJson(const web::json::value& jsonValue); - - protected: - private: - }; - } -} - -#endif // STARTJOBANDTASKARGS_H + std::string PublicKey; + + static StartJobAndTaskArgs FromJson(const web::json::value& jsonValue); + + protected: + private: + }; + } +} + +#endif // STARTJOBANDTASKARGS_H diff --git a/nodemanager/arguments/StartTaskArgs.cpp b/nodemanager/arguments/StartTaskArgs.cpp index 004ecac..925c1cc 100644 --- a/nodemanager/arguments/StartTaskArgs.cpp +++ b/nodemanager/arguments/StartTaskArgs.cpp @@ -1,22 +1,22 @@ -#include "StartTaskArgs.h" -#include "../utils/JsonHelper.h" - -using namespace hpc::arguments; -using namespace hpc::utils; - -StartTaskArgs::StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo) : - JobId(jobId), TaskId(taskId), StartInfo(std::move(startInfo)) -{ - //ctor -} - -StartTaskArgs StartTaskArgs::FromJson(const json::value& j) -{ - StartTaskArgs args( - JsonHelper::Read("JobId", j.at("m_Item1")), - JsonHelper::Read("TaskId", j.at("m_Item1")), - ProcessStartInfo::FromJson(j.at("m_Item2"))); - - return std::move(args); -} - +#include "StartTaskArgs.h" +#include "../utils/JsonHelper.h" + +using namespace hpc::arguments; +using namespace hpc::utils; + +StartTaskArgs::StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo) : + JobId(jobId), TaskId(taskId), StartInfo(std::move(startInfo)) +{ + //ctor +} + +StartTaskArgs StartTaskArgs::FromJson(const json::value& j) +{ + StartTaskArgs args( + JsonHelper::Read("JobId", j.at("m_Item1")), + JsonHelper::Read("TaskId", j.at("m_Item1")), + ProcessStartInfo::FromJson(j.at("m_Item2"))); + + return std::move(args); +} + diff --git a/nodemanager/arguments/StartTaskArgs.h b/nodemanager/arguments/StartTaskArgs.h index ba378ba..1e251f3 100644 --- a/nodemanager/arguments/StartTaskArgs.h +++ b/nodemanager/arguments/StartTaskArgs.h @@ -1,29 +1,29 @@ -#ifndef STARTTASKARGS_H -#define STARTTASKARGS_H - -#include - -#include "ProcessStartInfo.h" - -namespace hpc -{ - namespace arguments - { - struct StartTaskArgs - { - public: - StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo); - - int JobId; - int TaskId; - ProcessStartInfo StartInfo; - - static StartTaskArgs FromJson(const web::json::value& jsonValue); - - protected: - private: - }; - } -} - -#endif // STARTTASKARGS_H +#ifndef STARTTASKARGS_H +#define STARTTASKARGS_H + +#include + +#include "ProcessStartInfo.h" + +namespace hpc +{ + namespace arguments + { + struct StartTaskArgs + { + public: + StartTaskArgs(int jobId, int taskId, ProcessStartInfo&& startInfo); + + int JobId; + int TaskId; + ProcessStartInfo StartInfo; + + static StartTaskArgs FromJson(const web::json::value& jsonValue); + + protected: + private: + }; + } +} + +#endif // STARTTASKARGS_H diff --git a/nodemanager/common/ErrorCodes.h b/nodemanager/common/ErrorCodes.h index 92697bf..d9ca726 100644 --- a/nodemanager/common/ErrorCodes.h +++ b/nodemanager/common/ErrorCodes.h @@ -17,6 +17,9 @@ namespace hpc TestRunFailed = 176, FailedToOpenPort = 177, ConfigurationFileError = 178, + WriteFileError = 179, + ReadFileError = 180, + UnknownFilter = 181, }; } } diff --git a/nodemanager/config/nodemanager.json b/nodemanager/config/nodemanager.json index 19bea23..1458c39 100644 --- a/nodemanager/config/nodemanager.json +++ b/nodemanager/config/nodemanager.json @@ -2,11 +2,14 @@ "TrustedCAFile":"/opt/hpcnodemanager/certs/test.pem", "TrustedCAPath":"/opt/hpcnodemanager/certs/", "UseDefaultCA":false, - "ClusterAuthenticationKey":"abc", + "ClusterAuthenticationKey":"", "HeartbeatUri":"https://evancv1:40001/api/evanc.com/computenodereported", "MetricUri":"udp://EVANCV1.fareast.corp.microsoft.com:9894/api/4aed4836-c0c8-4e92-853d-cac4698582d1/metricreported", "RegisterUri":"https://evancv1:40001/api/evanc.com/registerrequested", "CertificateChainFile":"/opt/hpcnodemanager/certs/evanc.com.crt", "PrivateKeyFile":"/opt/hpcnodemanager/certs/evanc.com.key", - "ListeningUri":"https://0.0.0.0:40002" + "ListeningUri":"http://0.0.0.0:40000", + "JobStartFilter":"sed -s 's/123/456/g'", + "JobEndFilter":"sed -s 's/123/456/g'", + "TaskStartFilter":"sed -s 's/123/456/g'" } diff --git a/nodemanager/core/HttpReporter.cpp b/nodemanager/core/HttpReporter.cpp index 38d1527..b1cb7f4 100644 --- a/nodemanager/core/HttpReporter.cpp +++ b/nodemanager/core/HttpReporter.cpp @@ -13,15 +13,15 @@ void HttpReporter::Report() { const std::string& uri = this->reportUri; - auto jsonBody = this->valueFetcher(); - if (jsonBody.is_null()) - { - Logger::Error("Skipped reporting to {0} because json is null", uri); - return; - } - - Logger::Debug("---------> Report to {0} with {1}", uri, jsonBody); - + auto jsonBody = this->valueFetcher(); + if (jsonBody.is_null()) + { + Logger::Error("Skipped reporting to {0} because json is null", uri); + return; + } + + Logger::Debug("---------> Report to {0} with {1}", uri, jsonBody); + http_client client = HttpHelper::GetHttpClient(uri); http_request request = HttpHelper::GetHttpRequest(methods::POST, jsonBody); @@ -38,18 +38,18 @@ void HttpReporter::Report() this->intervalSeconds = milliseconds / 1000; } - Logger::Debug("---------> Reported to {0} response code {1}, value {2}, interval {3}", uri, response.status_code(), milliseconds, this->intervalSeconds); + Logger::Debug("---------> Reported to {0} response code {1}, value {2}, interval {3}", uri, response.status_code(), milliseconds, this->intervalSeconds); } - catch (const http_exception& httpEx) - { - Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what()); - } - catch (const std::exception& ex) - { - Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what()); - } - catch (...) - { - Logger::Error("Unknown error occurred when report to {0}", this->reportUri); + catch (const http_exception& httpEx) + { + Logger::Warn("HttpException occurred when report to {0}, ex {1}", this->reportUri, httpEx.what()); + } + catch (const std::exception& ex) + { + Logger::Error("Exception occurred when report to {0}, ex {1}", this->reportUri, ex.what()); + } + catch (...) + { + Logger::Error("Unknown error occurred when report to {0}", this->reportUri); } } diff --git a/nodemanager/core/IRemoteExecutor.h b/nodemanager/core/IRemoteExecutor.h index 6c76b31..5ad55a7 100644 --- a/nodemanager/core/IRemoteExecutor.h +++ b/nodemanager/core/IRemoteExecutor.h @@ -1,28 +1,28 @@ -#ifndef IREMOTEEXECUTOR_H -#define IREMOTEEXECUTOR_H - -#include "../arguments/StartJobAndTaskArgs.h" -#include "../arguments/StartTaskArgs.h" -#include "../arguments/EndJobArgs.h" +#ifndef IREMOTEEXECUTOR_H +#define IREMOTEEXECUTOR_H + +#include "../arguments/StartJobAndTaskArgs.h" +#include "../arguments/StartTaskArgs.h" +#include "../arguments/EndJobArgs.h" #include "../arguments/EndTaskArgs.h" #include "../arguments/MetricCountersConfig.h" - -namespace hpc -{ - namespace core - { - class IRemoteExecutor - { - public: - virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri) = 0; - virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri) = 0; - virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args) = 0; - virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri) = 0; - virtual web::json::value Ping(const std::string& callbackUri) = 0; - virtual web::json::value Metric(const std::string& callbackUri) = 0; - virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri) = 0; - }; - } -} - -#endif // IREMOTEEXECUTOR_H + +namespace hpc +{ + namespace core + { + class IRemoteExecutor + { + public: + virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri) = 0; + virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri) = 0; + virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args) = 0; + virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri) = 0; + virtual web::json::value Ping(const std::string& callbackUri) = 0; + virtual web::json::value Metric(const std::string& callbackUri) = 0; + virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri) = 0; + }; + } +} + +#endif // IREMOTEEXECUTOR_H diff --git a/nodemanager/core/JobTaskTable.cpp b/nodemanager/core/JobTaskTable.cpp index cdc0902..76c0fb5 100644 --- a/nodemanager/core/JobTaskTable.cpp +++ b/nodemanager/core/JobTaskTable.cpp @@ -1,22 +1,22 @@ -#include "JobTaskTable.h" -#include "../utils/WriterLock.h" +#include "JobTaskTable.h" +#include "../utils/WriterLock.h" #include "../utils/ReaderLock.h" -#include "../utils/System.h" - -using namespace hpc::core; -using namespace web; +#include "../utils/System.h" + +using namespace hpc::core; +using namespace web; using namespace hpc::data; -using namespace hpc::utils; +using namespace hpc::utils; JobTaskTable* JobTaskTable::instance = nullptr; - -json::value JobTaskTable::ToJson() -{ - ReaderLock readerLock(&this->lock); - auto j = this->nodeInfo.ToJson(); - return std::move(j); + +json::value JobTaskTable::ToJson() +{ + ReaderLock readerLock(&this->lock); + auto j = this->nodeInfo.ToJson(); + return std::move(j); } - + int JobTaskTable::GetTaskCount() { ReaderLock readerLock(&this->lock); @@ -27,7 +27,7 @@ int JobTaskTable::GetTaskCount() return taskCount; } - + int JobTaskTable::GetCoresInUse() { ReaderLock readerLock(&this->lock); @@ -64,88 +64,88 @@ int JobTaskTable::GetCoresInUse() return used; } - -std::shared_ptr JobTaskTable::AddJobAndTask(int jobId, int taskId, bool& isNewEntry) -{ - WriterLock writerLock(&this->lock); - - std::shared_ptr job; - auto j = this->nodeInfo.Jobs.find(jobId); - if (j == this->nodeInfo.Jobs.end()) - { - job = std::shared_ptr(new JobInfo(jobId)); + +std::shared_ptr JobTaskTable::AddJobAndTask(int jobId, int taskId, bool& isNewEntry) +{ + WriterLock writerLock(&this->lock); + + std::shared_ptr job; + auto j = this->nodeInfo.Jobs.find(jobId); + if (j == this->nodeInfo.Jobs.end()) + { + job = std::shared_ptr(new JobInfo(jobId)); this->nodeInfo.Jobs[jobId] = job; - } - else - { + } + else + { job = j->second; - } - - std::shared_ptr task; - auto t = job->Tasks.find(taskId); - if (t == job->Tasks.end()) - { - task = std::shared_ptr(new TaskInfo(jobId, taskId, nodeInfo.Name)); + } + + std::shared_ptr task; + auto t = job->Tasks.find(taskId); + if (t == job->Tasks.end()) + { + task = std::shared_ptr(new TaskInfo(jobId, taskId, nodeInfo.Name)); job->Tasks[taskId] = task; - isNewEntry = true; - } - else - { + isNewEntry = true; + } + else + { task = t->second; - isNewEntry = false; - } - - return task; + isNewEntry = false; + } + + return task; } std::shared_ptr JobTaskTable::GetTask(int jobId, int taskId) { - ReaderLock readerLock(&this->lock); + ReaderLock readerLock(&this->lock); - std::shared_ptr task; - - auto j = this->nodeInfo.Jobs.find(jobId); - if (j != this->nodeInfo.Jobs.end()) - { + std::shared_ptr task; + + auto j = this->nodeInfo.Jobs.find(jobId); + if (j != this->nodeInfo.Jobs.end()) + { auto t = j->second->Tasks.find(taskId); if (t != j->second->Tasks.end()) { task = t->second; - } + } } - return task; -} - -std::shared_ptr JobTaskTable::RemoveJob(int jobId) -{ - WriterLock writerLock(&this->lock); - - std::shared_ptr job; - auto j = this->nodeInfo.Jobs.find(jobId); - if (j != this->nodeInfo.Jobs.end()) - { - job = j->second; - this->nodeInfo.Jobs.erase(j); - } - - return job; -} - -void JobTaskTable::RemoveTask(int jobId, int taskId, uint64_t attemptId) -{ - WriterLock writerLock(&this->lock); - - auto j = this->nodeInfo.Jobs.find(jobId); - if (j != this->nodeInfo.Jobs.end()) + return task; +} + +std::shared_ptr JobTaskTable::RemoveJob(int jobId) +{ + WriterLock writerLock(&this->lock); + + std::shared_ptr job; + auto j = this->nodeInfo.Jobs.find(jobId); + if (j != this->nodeInfo.Jobs.end()) + { + job = j->second; + this->nodeInfo.Jobs.erase(j); + } + + return job; +} + +void JobTaskTable::RemoveTask(int jobId, int taskId, uint64_t attemptId) +{ + WriterLock writerLock(&this->lock); + + auto j = this->nodeInfo.Jobs.find(jobId); + if (j != this->nodeInfo.Jobs.end()) { auto t = j->second->Tasks.find(taskId); // only erase when attempt ID matches. if (t != j->second->Tasks.end() && t->second->GetAttemptId() == attemptId) - { + { j->second->Tasks.erase(t); - } + } } -} +} diff --git a/nodemanager/core/JobTaskTable.h b/nodemanager/core/JobTaskTable.h index b6cbd28..a2bf71d 100644 --- a/nodemanager/core/JobTaskTable.h +++ b/nodemanager/core/JobTaskTable.h @@ -1,37 +1,37 @@ -#ifndef JOBTASKTABLE_H -#define JOBTASKTABLE_H - -#include -#include - -#include "../data/TaskInfo.h" -#include "../data/JobInfo.h" +#ifndef JOBTASKTABLE_H +#define JOBTASKTABLE_H + +#include +#include + +#include "../data/TaskInfo.h" +#include "../data/JobInfo.h" #include "../data/NodeInfo.h" -#include "../utils/ReaderLock.h" - -namespace hpc -{ - namespace core - { - class JobTaskTable - { - public: +#include "../utils/ReaderLock.h" + +namespace hpc +{ + namespace core + { + class JobTaskTable + { + public: JobTaskTable() : lock(PTHREAD_RWLOCK_INITIALIZER) { JobTaskTable::instance = this; - } + } ~JobTaskTable() { pthread_rwlock_destroy(&this->lock); JobTaskTable::instance = nullptr; - } - - web::json::value ToJson(); - // web::json::value GetTaskJson(int jobId, int taskId) const; - - std::shared_ptr AddJobAndTask(int jobId, int taskId, bool& isNewEntry); - std::shared_ptr RemoveJob(int jobId); + } + + web::json::value ToJson(); + // web::json::value GetTaskJson(int jobId, int taskId) const; + + std::shared_ptr AddJobAndTask(int jobId, int taskId, bool& isNewEntry); + std::shared_ptr RemoveJob(int jobId); void RemoveTask(int jobId, int taskId, uint64_t attemptId); std::shared_ptr GetTask(int jobId, int taskId); int GetJobCount() @@ -49,17 +49,17 @@ namespace hpc // Set this flag so the next report will trigger the resync with scheduler. this->nodeInfo.JustStarted = true; } - - static JobTaskTable* GetInstance() { return JobTaskTable::instance; } - - protected: - private: - pthread_rwlock_t lock; + + static JobTaskTable* GetInstance() { return JobTaskTable::instance; } + + protected: + private: + pthread_rwlock_t lock; hpc::data::NodeInfo nodeInfo; - static JobTaskTable* instance; - }; - } -} - -#endif // JOBTASKTABLE_H + static JobTaskTable* instance; + }; + } +} + +#endif // JOBTASKTABLE_H diff --git a/nodemanager/core/NodeManagerConfig.h b/nodemanager/core/NodeManagerConfig.h index ee592b4..34357af 100644 --- a/nodemanager/core/NodeManagerConfig.h +++ b/nodemanager/core/NodeManagerConfig.h @@ -36,6 +36,9 @@ namespace hpc AddConfigurationItem(std::string, CertificateChainFile); AddConfigurationItem(std::string, PrivateKeyFile); AddConfigurationItem(std::string, ListeningUri); + AddConfigurationItem(std::string, JobStartFilter); + AddConfigurationItem(std::string, JobEndFilter); + AddConfigurationItem(std::string, TaskStartFilter); AddConfigurationItem(bool, UseDefaultCA); AddConfigurationItem(bool, Debug); AddConfigurationItem(int, HostsFetchInterval); diff --git a/nodemanager/core/Process.cpp b/nodemanager/core/Process.cpp index d0e2a1c..7ebc9cc 100644 --- a/nodemanager/core/Process.cpp +++ b/nodemanager/core/Process.cpp @@ -57,13 +57,13 @@ void Process::Cleanup() Logger::Info("Cleanup zombie result: {0}", output); } -pplx::task Process::Start() +pplx::task> Process::Start() { pthread_create(&this->threadId, nullptr, ForkThread, this); Logger::Debug(this->jobId, this->taskId, this->requeueCount, "Created thread {0}", this->threadId); - return pplx::task(this->started); + return pplx::task>(this->started); } void Process::Kill(int forcedExitCode, bool forced) @@ -183,6 +183,7 @@ Start: Logger::Error(p->jobId, p->taskId, p->requeueCount, "Failed to fork(), pid = {0}, errno = {1}, msg = {2}", p->processId, errno, errorMessage); p->SetExitCode(errno); + p->started.set(std::pair(p->processId, p->threadId)); goto Final; } @@ -193,8 +194,7 @@ Start: else { assert(p->processId > 0); - p->started.set(p->processId); - assert(p->processId > 0); + p->started.set(std::pair(p->processId, p->threadId)); p->Monitor(); } @@ -483,26 +483,16 @@ int Process::CreateTaskFolder() sprintf(folder, "/tmp/nodemanager_task_%d_%d.XXXXXX", this->taskId, this->requeueCount); - char* p = mkdtemp(folder); + int ret = System::CreateTempFolder(folder, this->userName); - if (p) + if (ret != 0) { - this->taskFolder = p; - - int ret; - ret = this->ExecuteCommand("chown -R", this->userName, this->taskFolder); - - if (ret == 0) - { - ret = this->ExecuteCommand("chmod -R u+rwX", this->taskFolder); - } - - return ret; - } - else - { - return errno; + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "CreateTaskFolder failed exitCode {0}.", ret); } + + this->taskFolder = folder; + + return ret; } std::string Process::BuildScript() @@ -519,6 +509,8 @@ std::string Process::BuildScript() fs << "#!/bin/bash" << std::endl << std::endl; fs << "cd "; + + Logger::Debug("{0}, {1}", this->taskFolder, this->workDirectory); if (this->workDirectory.empty()) { fs << this->taskFolder; diff --git a/nodemanager/core/Process.h b/nodemanager/core/Process.h index db5d954..86b2c32 100644 --- a/nodemanager/core/Process.h +++ b/nodemanager/core/Process.h @@ -1,158 +1,158 @@ -#ifndef PROCESS_H -#define PROCESS_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "../utils/String.h" -#include "../utils/Logger.h" -#include "../utils/System.h" +#ifndef PROCESS_H +#define PROCESS_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../utils/String.h" +#include "../utils/Logger.h" +#include "../utils/System.h" #include "../common/ErrorCodes.h" #include "../data/ProcessStatistics.h" - -using namespace hpc::utils; - -namespace hpc -{ - namespace core - { - class Process - { - public: + +using namespace hpc::utils; + +namespace hpc +{ + namespace core + { + class Process + { + public: typedef void Callback( int, std::string&&, - const hpc::data::ProcessStatistics& stat); - + const hpc::data::ProcessStatistics& stat); + Process( int jobId, int taskId, - int requeueCount, - const std::string& cmdLine, - const std::string& standardOut, - const std::string& standardErr, - const std::string& standardIn, + int requeueCount, + const std::string& cmdLine, + const std::string& standardOut, + const std::string& standardErr, + const std::string& standardIn, const std::string& workDir, const std::string& user, - std::vector&& cpuAffinity, - std::map&& envi, - const std::function completed); - - Process(Process&&) = default; - - virtual ~Process(); - - pplx::task Start(); - void Kill(int forcedExitCode = 0x0FFFFFFF, bool forced = true); - const hpc::data::ProcessStatistics& GetStatisticsFromCGroup(); + std::vector&& cpuAffinity, + std::map&& envi, + const std::function completed); + + Process(Process&&) = default; + + virtual ~Process(); + + pplx::task> Start(); + void Kill(int forcedExitCode = 0x0FFFFFFF, bool forced = true); + const hpc::data::ProcessStatistics& GetStatisticsFromCGroup(); static void Cleanup(); - - protected: + + protected: private: void SetExitCode(int exitCode) { this->exitCode = exitCode; this->exitCodeSet = true; } - + void OnCompleted(); - int CreateTaskFolder(); - - template - int ExecuteCommand(const std::string& cmd, const Args& ... args) - { - std::string output; - std::string cmdLine = String::Join(" ", cmd, args...); - int ret = System::ExecuteCommandOut(output, cmd, args...); - if (ret != 0) + int CreateTaskFolder(); + + template + int ExecuteCommand(const std::string& cmd, const Args& ... args) + { + std::string output; + std::string cmdLine = String::Join(" ", cmd, args...); + int ret = System::ExecuteCommandOut(output, cmd, args...); + if (ret != 0) { this->SetExitCode(ret); this->message << "Task " << this->taskId << ": '" << cmdLine << "', exitCode " << ret << ". output " - << output << std::endl; + << output << std::endl; } - Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output); - - return ret; - } - - template - int ExecuteCommandNoCapture(const std::string& cmd, const Args& ... args) - { - std::string output; - std::string cmdLine = String::Join(" ", cmd, args...); - int ret = System::ExecuteCommandOut(output, cmd, args...); + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output); + + return ret; + } + + template + int ExecuteCommandNoCapture(const std::string& cmd, const Args& ... args) + { + std::string output; + std::string cmdLine = String::Join(" ", cmd, args...); + int ret = System::ExecuteCommandOut(output, cmd, args...); + + Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output); + + return ret; + } - Logger::Debug(this->jobId, this->taskId, this->requeueCount, "'{0}', exitCode {1}, output {2}.", cmdLine, ret, output); - - return ret; - } - static void* ForkThread(void*); - + std::string GetAffinity(); static inline void OutputAffinity(std::ostringstream& oss, int start, int end) { if (start == end) { oss << "," << start; } else if (start < end) { oss << "," << start << "-" << end; } } - + void Run(const std::string& path); static void* ReadPipeThread(void* p); - void SendbackOutput(const std::string& uri, const std::string& output, int order) const; - void Monitor(); - std::string BuildScript(); - std::unique_ptr PrepareEnvironment(); - - std::ostringstream stdOut; - std::ostringstream stdErr; - std::ostringstream message; + void SendbackOutput(const std::string& uri, const std::string& output, int order) const; + void Monitor(); + std::string BuildScript(); + std::unique_ptr PrepareEnvironment(); + + std::ostringstream stdOut; + std::ostringstream stdErr; + std::ostringstream message; int exitCode = (int)hpc::common::ErrorCodes::DefaultExitCode; - bool exitCodeSet = false; + bool exitCodeSet = false; hpc::data::ProcessStatistics statistics; - - std::string taskFolder; + + std::string taskFolder; const int jobId; const int taskId; - const int requeueCount; - const std::string taskExecutionId; - const std::string commandLine; - std::string stdOutFile; - std::string stdErrFile; - const std::string stdInFile; + const int requeueCount; + const std::string taskExecutionId; + const std::string commandLine; + std::string stdOutFile; + std::string stdErrFile; + const std::string stdInFile; const std::string workDirectory; const std::string userName; - const std::vector affinity; - const std::map environments; + const std::vector affinity; + const std::map environments; std::vector environmentsBuffer; bool streamOutput = false; - int stdoutPipe[2]; - - const std::function callback; - - pthread_t threadId = 0; + int stdoutPipe[2]; + + const std::function callback; + + pthread_t threadId = 0; pthread_t outputThreadId = 0; pid_t processId; bool ended = false; - pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER; - - pplx::task_completion_event started; - }; - } -} - -#endif // PROCESS_H + pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER; + + pplx::task_completion_event> started; + }; + } +} + +#endif // PROCESS_H diff --git a/nodemanager/core/RemoteCommunicator.cpp b/nodemanager/core/RemoteCommunicator.cpp index 2cef238..5bdbb51 100644 --- a/nodemanager/core/RemoteCommunicator.cpp +++ b/nodemanager/core/RemoteCommunicator.cpp @@ -1,131 +1,132 @@ #include #include - -#include "RemoteCommunicator.h" -#include "../utils/String.h" -#include "../utils/System.h" -#include "../arguments/StartJobAndTaskArgs.h" + +#include "RemoteCommunicator.h" +#include "../utils/String.h" +#include "../utils/System.h" +#include "../arguments/StartJobAndTaskArgs.h" #include "../common/ErrorCodes.h" #include "NodeManagerConfig.h" #include "HttpHelper.h" #include "../arguments/MetricCountersConfig.h" - + using namespace web::http; -using namespace web; -using namespace hpc::utils; -using namespace hpc::arguments; +using namespace web; +using namespace hpc::utils; +using namespace hpc::arguments; using namespace hpc::core; using namespace hpc::common; using namespace web::http::experimental::listener; +using namespace hpc::filters; -RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listener_config& config) : +RemoteCommunicator::RemoteCommunicator(IRemoteExecutor& exec, const http_listener_config& config) : listeningUri(NodeManagerConfig::GetListeningUri()), isListening(false), executor(exec), - listener(listeningUri, config) -{ - this->listener.support( - methods::POST, - [this](auto request) { this->HandlePost(request); }); + listener(listeningUri, config) +{ + this->listener.support( + methods::POST, + [this](auto request) { this->HandlePost(request); }); if (NodeManagerConfig::GetDebug()) - { - this->listener.support( - methods::GET, - [this](auto request) { this->HandleGet(request); }); + { + this->listener.support( + methods::GET, + [this](auto request) { this->HandleGet(request); }); } - + this->processors["startjobandtask"] = [this] (const auto& j, const auto& c) { return this->StartJobAndTask(j, c); }; - this->processors["starttask"] = [this] (const auto& j, const auto& c) { return this->StartTask(j, c); }; - this->processors["endjob"] = [this] (const auto& j, const auto& c) { return this->EndJob(j, c); }; - this->processors["endtask"] = [this] (const auto& j, const auto& c) { return this->EndTask(j, c); }; - this->processors["ping"] = [this] (const auto& j, const auto& c) { return this->Ping(j, c); }; - this->processors["metric"] = [this] (const auto& j, const auto& c) { return this->Metric(j, c); }; - this->processors["metricconfig"] = [this] (const auto& j, const auto& c) { return this->MetricConfig(j, c); }; -} - -RemoteCommunicator::~RemoteCommunicator() -{ - this->Close(); -} - -void RemoteCommunicator::Open() -{ - if (!this->isListening) - { - this->listener.open().then([this](auto t) - { - this->isListening = !IsError(t); - Logger::Info( - "Opening at {0}, result {1}", - this->listener.uri().to_string(), + this->processors["starttask"] = [this] (const auto& j, const auto& c) { return this->StartTask(j, c); }; + this->processors["endjob"] = [this] (const auto& j, const auto& c) { return this->EndJob(j, c); }; + this->processors["endtask"] = [this] (const auto& j, const auto& c) { return this->EndTask(j, c); }; + this->processors["ping"] = [this] (const auto& j, const auto& c) { return this->Ping(j, c); }; + this->processors["metric"] = [this] (const auto& j, const auto& c) { return this->Metric(j, c); }; + this->processors["metricconfig"] = [this] (const auto& j, const auto& c) { return this->MetricConfig(j, c); }; +} + +RemoteCommunicator::~RemoteCommunicator() +{ + this->Close(); +} + +void RemoteCommunicator::Open() +{ + if (!this->isListening) + { + this->listener.open().then([this](auto t) + { + this->isListening = !IsError(t); + Logger::Info( + "Opening at {0}, result {1}", + this->listeningUri, this->isListening ? "opened." : "failed."); if (!this->isListening) { exit((int)ErrorCodes::FailedToOpenPort); - } - }); - } -} - -void RemoteCommunicator::Close() -{ - if (this->isListening) - { - try - { - this->listener.close().wait(); - this->isListening = false; - } - catch (const std::exception& ex) - { - Logger::Error("Exception happened while close the listener {0}, {1}", this->listener.uri().to_string().c_str(), ex.what()); - } - } -} + } + }); + } +} + +void RemoteCommunicator::Close() +{ + if (this->isListening) + { + try + { + this->listener.close().wait(); + this->isListening = false; + } + catch (const std::exception& ex) + { + Logger::Error("Exception happened while close the listener {0}, {1}", this->listener.uri().to_string().c_str(), ex.what()); + } + } +} void RemoteCommunicator::HandleGet(http_request request) { - auto uri = request.relative_uri().to_string(); - Logger::Info("Request (GET): Uri {0}", uri); + auto uri = request.relative_uri().to_string(); + Logger::Info("Request (GET): Uri {0}", uri); json::value body; body["status"] = json::value::string("node manager working"); - request.reply(status_codes::OK, body).then([this](auto t) { this->IsError(t); }); -} - -void RemoteCommunicator::HandlePost(http_request request) + request.reply(status_codes::OK, body).then([this](auto t) { this->IsError(t); }); +} + +void RemoteCommunicator::HandlePost(http_request request) { - auto uri = request.relative_uri().to_string(); - Logger::Info("Request: Uri {0}", uri); - - std::vector tokens = String::Split(uri, '/'); + auto uri = request.relative_uri().to_string(); + Logger::Info("Request: Uri {0}", uri); + + std::vector tokens = String::Split(uri, '/'); if (tokens.size() < 4) { Logger::Warn("Not supported uri {0}", uri); return; } - - // skip the first '/'. - int p = 1; - auto apiSpace = tokens[p++]; - auto nodeName = tokens[p++]; - auto methodName = tokens[p++]; - - Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName); - - if (apiSpace != ApiSpace) - { - Logger::Error("Not allowed ApiSpace {0}", apiSpace); - request.reply(status_codes::NotFound, U("Not found")) - .then([this](auto t) { this->IsError(t); }); - return; - } + + // skip the first '/'. + int p = 1; + auto apiSpace = tokens[p++]; + auto nodeName = tokens[p++]; + auto methodName = tokens[p++]; + + Logger::Debug("Request: Uri {0}, Node {1}, Method {2}", uri, nodeName, methodName); + + if (apiSpace != ApiSpace) + { + Logger::Error("Not allowed ApiSpace {0}", apiSpace); + request.reply(status_codes::NotFound, U("Not found")) + .then([this](auto t) { this->IsError(t); }); + return; + } std::string authenticationKey; if (HttpHelper::FindHeader(request, HttpHelper::AuthenticationHeaderKey, authenticationKey)) { - Logger::Debug("AuthenticationKey found"); + Logger::Debug("AuthenticationKey found"); } if (NodeManagerConfig::GetClusterAuthenticationKey() != authenticationKey) @@ -135,84 +136,120 @@ void RemoteCommunicator::HandlePost(http_request request) return; } - std::string callbackUri; + std::string callbackUri; if (HttpHelper::FindHeader(request, CallbackUriKey, callbackUri)) - { - Logger::Debug("CallbackUri found {0}", callbackUri.c_str()); - } - - auto processor = this->processors.find(methodName); - - if (processor != this->processors.end()) - { + { + Logger::Debug("CallbackUri found {0}", callbackUri.c_str()); + } + + auto processor = this->processors.find(methodName); + + if (processor != this->processors.end()) + { request.extract_json().then([processor, callback = std::move(callbackUri)](pplx::task t) { auto j = t.get(); // Logger::Debug("Json: {0}", j.serialize()); - - return processor->second(j, callback); - }) - .then([request, this](pplx::task t) + + return processor->second(j, callback); + }) + .then([request, this](pplx::task t) { - std::string errorMessage; - if (!this->IsError(t, errorMessage)) + std::string errorMessage; + if (!this->IsError(t, errorMessage)) { - auto jsonBody = t.get(); - request.reply(status_codes::OK, jsonBody).then([this](auto t) { this->IsError(t); }); - } - else - { - request.reply(status_codes::InternalError, errorMessage).then([this](auto t) { this->IsError(t); }); - } - }); - } - else - { - Logger::Warn("Unable to find the method {0}", methodName.c_str()); - request.reply(status_codes::NotFound, "").then([this](auto t) { this->IsError(t); }); - } -} - -json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri) + auto jsonBody = t.get(); + request.reply(status_codes::OK, jsonBody).then([this](auto t) { this->IsError(t); }); + } + else + { + request.reply(status_codes::InternalError, errorMessage).then([this](auto t) { this->IsError(t); }); + } + }); + } + else + { + Logger::Warn("Unable to find the method {0}", methodName.c_str()); + request.reply(status_codes::NotFound, "").then([this](auto t) { this->IsError(t); }); + } +} + +json::value RemoteCommunicator::StartJobAndTask(const json::value& val, const std::string& callbackUri) { - auto args = StartJobAndTaskArgs::FromJson(val); - return this->executor.StartJobAndTask(std::move(args), callbackUri); -} - -json::value RemoteCommunicator::StartTask(const json::value& val, const std::string& callbackUri) -{ - auto args = StartTaskArgs::FromJson(val); - return this->executor.StartTask(std::move(args), callbackUri); -} - -json::value RemoteCommunicator::EndJob(const json::value& val, const std::string& callbackUri) -{ - auto args = EndJobArgs::FromJson(val); - return this->executor.EndJob(std::move(args)); -} - -json::value RemoteCommunicator::EndTask(const json::value& val, const std::string& callbackUri) -{ - auto args = EndTaskArgs::FromJson(val); - return this->executor.EndTask(std::move(args), callbackUri); -} - -json::value RemoteCommunicator::Ping(const json::value& val, const std::string& callbackUri) -{ - return this->executor.Ping(callbackUri); -} - -json::value RemoteCommunicator::Metric(const json::value& val, const std::string& callbackUri) -{ - return this->executor.Metric(callbackUri); -} - -json::value RemoteCommunicator::MetricConfig(const json::value& val, const std::string& callbackUri) + auto args = StartJobAndTaskArgs::FromJson(val); + + json::value v; + std::string executionMessage; + int ret = this->filter.OnJobStart(args.JobId, val, v, executionMessage); + if (ret != 0) + { + throw new std::runtime_error(String::Join(" ", "StartJob filter fails with error code", ret, "execution message", executionMessage)); + return v; + } + else + { + return this->executor.StartJobAndTask(StartJobAndTaskArgs::FromJson(v), callbackUri); + } +} + +json::value RemoteCommunicator::StartTask(const json::value& val, const std::string& callbackUri) { - auto args = MetricCountersConfig::FromJson(val); - return this->executor.MetricConfig(std::move(args), callbackUri); -} - -const std::string RemoteCommunicator::ApiSpace = "api"; -const std::string RemoteCommunicator::CallbackUriKey = "CallbackURI"; - + auto args = StartTaskArgs::FromJson(val); + + json::value v; + std::string executionMessage; + int ret = this->filter.OnTaskStart(args.JobId, args.TaskId, args.StartInfo.TaskRequeueCount, val, v, executionMessage); + if (ret != 0) + { + throw new std::runtime_error(String::Join(" ", "StartTask filter fails with error code", ret, "execution message", executionMessage)); + return v; + } + else + { + return this->executor.StartTask(StartTaskArgs::FromJson(v), callbackUri); + } +} + +json::value RemoteCommunicator::EndJob(const json::value& val, const std::string& callbackUri) +{ + auto args = EndJobArgs::FromJson(val); + + json::value v; + std::string executionMessage; + int ret = this->filter.OnJobEnd(args.JobId, val, v, executionMessage); + if (ret != 0) + { + throw new std::runtime_error(String::Join(" ", "EndJob filter fails with error code", ret, "execution message", executionMessage)); + return v; + } + else + { + return this->executor.EndJob(EndJobArgs::FromJson(v)); + } +} + +json::value RemoteCommunicator::EndTask(const json::value& val, const std::string& callbackUri) +{ + auto args = EndTaskArgs::FromJson(val); + return this->executor.EndTask(std::move(args), callbackUri); +} + +json::value RemoteCommunicator::Ping(const json::value& val, const std::string& callbackUri) +{ + return this->executor.Ping(callbackUri); +} + +json::value RemoteCommunicator::Metric(const json::value& val, const std::string& callbackUri) +{ + return this->executor.Metric(callbackUri); +} + +json::value RemoteCommunicator::MetricConfig(const json::value& val, const std::string& callbackUri) +{ + auto args = MetricCountersConfig::FromJson(val); + return this->executor.MetricConfig(std::move(args), callbackUri); +} + +const std::string RemoteCommunicator::ApiSpace = "api"; +const std::string RemoteCommunicator::CallbackUriKey = "CallbackURI"; + diff --git a/nodemanager/core/RemoteCommunicator.h b/nodemanager/core/RemoteCommunicator.h index 7528af9..00f5b76 100644 --- a/nodemanager/core/RemoteCommunicator.h +++ b/nodemanager/core/RemoteCommunicator.h @@ -1,79 +1,82 @@ -#ifndef REMOTECOMMUNICATOR_H -#define REMOTECOMMUNICATOR_H - -#include -#include - -#include "../utils/Logger.h" -#include "IRemoteExecutor.h" - -namespace hpc -{ - using namespace utils; - using namespace web; - - namespace core - { - class RemoteCommunicator - { - public: - RemoteCommunicator(IRemoteExecutor& executor, const web::http::experimental::listener::http_listener_config& config); - ~RemoteCommunicator(); - - void Open(); - void Close(); - - protected: - private: - void HandlePost(web::http::http_request message); - void HandleGet(web::http::http_request message); - - template - static bool IsError(pplx::task& t, std::string& errorMessage) - { - try { t.wait(); return false; } - catch (const web::http::http_exception& httpEx) - { +#ifndef REMOTECOMMUNICATOR_H +#define REMOTECOMMUNICATOR_H + +#include +#include + +#include "../utils/Logger.h" +#include "../filters/ExecutionFilter.h" +#include "IRemoteExecutor.h" + +namespace hpc +{ + using namespace utils; + using namespace web; + using namespace filters; + + namespace core + { + class RemoteCommunicator + { + public: + RemoteCommunicator(IRemoteExecutor& executor, const web::http::experimental::listener::http_listener_config& config); + ~RemoteCommunicator(); + + void Open(); + void Close(); + + protected: + private: + void HandlePost(web::http::http_request message); + void HandleGet(web::http::http_request message); + + template + static bool IsError(pplx::task& t, std::string& errorMessage) + { + try { t.wait(); return false; } + catch (const web::http::http_exception& httpEx) + { Logger::Error("Http exception occurred: {0}", httpEx.what()); - errorMessage = httpEx.what(); - } - catch (const std::exception& ex) - { + errorMessage = httpEx.what(); + } + catch (const std::exception& ex) + { Logger::Error("Exception occurred: {0}", ex.what()); - errorMessage = ex.what(); - } - - return true; - } - - template - static bool IsError(pplx::task& t) - { + errorMessage = ex.what(); + } + + return true; + } + + template + static bool IsError(pplx::task& t) + { std::string errorMessage; - return IsError(t, errorMessage); - } - - json::value StartJobAndTask(const json::value& val, const std::string&); - json::value StartTask(const json::value& val, const std::string&); - json::value EndJob(const json::value& val, const std::string&); - json::value EndTask(const json::value& val, const std::string&); - json::value Ping(const json::value& val, const std::string&); - json::value Metric(const json::value& val, const std::string&); - json::value MetricConfig(const json::value& val, const std::string&); - - static const std::string ApiSpace; - static const std::string CallbackUriKey; - const std::string listeningUri; - - bool isListening; - - std::map> processors; - - IRemoteExecutor& executor; - - web::http::experimental::listener::http_listener listener; - }; - } -} - -#endif // REMOTECOMMUNICATOR_H + return IsError(t, errorMessage); + } + + json::value StartJobAndTask(const json::value& val, const std::string&); + json::value StartTask(const json::value& val, const std::string&); + json::value EndJob(const json::value& val, const std::string&); + json::value EndTask(const json::value& val, const std::string&); + json::value Ping(const json::value& val, const std::string&); + json::value Metric(const json::value& val, const std::string&); + json::value MetricConfig(const json::value& val, const std::string&); + + static const std::string ApiSpace; + static const std::string CallbackUriKey; + const std::string listeningUri; + + bool isListening; + + std::map> processors; + + IRemoteExecutor& executor; + + web::http::experimental::listener::http_listener listener; + ExecutionFilter filter; + }; + } +} + +#endif // REMOTECOMMUNICATOR_H diff --git a/nodemanager/core/RemoteExecutor.cpp b/nodemanager/core/RemoteExecutor.cpp index 9cc499a..f014bd0 100644 --- a/nodemanager/core/RemoteExecutor.cpp +++ b/nodemanager/core/RemoteExecutor.cpp @@ -234,12 +234,12 @@ json::value RemoteExecutor::StartTask(StartTaskArgs&& args, const std::string& c args.JobId, args.TaskId, taskInfo->GetTaskRequeueCount(), "StartTask for ProcessKey {0}, process count {1}", taskInfo->ProcessKey, this->processes.size()); - process->Start().then([this, taskInfo] (pid_t pid) + process->Start().then([this, taskInfo] (std::pair ids) { - if (pid > 0) + if (ids.first > 0) { Logger::Debug(taskInfo->JobId, taskInfo->TaskId, taskInfo->GetTaskRequeueCount(), - "Process started {0}", pid); + "Process started pid {0}, tid {1}", ids.first, ids.second); } }); } diff --git a/nodemanager/core/RemoteExecutor.h b/nodemanager/core/RemoteExecutor.h index 6c2c6ae..1d67445 100644 --- a/nodemanager/core/RemoteExecutor.h +++ b/nodemanager/core/RemoteExecutor.h @@ -1,70 +1,70 @@ -#ifndef REMOTEEXECUTOR_H -#define REMOTEEXECUTOR_H +#ifndef REMOTEEXECUTOR_H +#define REMOTEEXECUTOR_H #include #include - -#include "IRemoteExecutor.h" -#include "JobTaskTable.h" -#include "Monitor.h" -#include "Process.h" + +#include "IRemoteExecutor.h" +#include "JobTaskTable.h" +#include "Monitor.h" +#include "Process.h" #include "Reporter.h" -#include "HostsManager.h" -#include "../arguments/MetricCountersConfig.h" -#include "../data/ProcessStatistics.h" - -namespace hpc -{ - namespace core - { - class RemoteExecutor : public IRemoteExecutor - { - public: - RemoteExecutor(const std::string& networkName); - ~RemoteExecutor() { pthread_rwlock_destroy(&this->lock); } - - virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri); - virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri); - virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args); - virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri); - virtual web::json::value Ping(const std::string& callbackUri); - virtual web::json::value Metric(const std::string& callbackUri); - virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri); - protected: +#include "HostsManager.h" +#include "../arguments/MetricCountersConfig.h" +#include "../data/ProcessStatistics.h" + +namespace hpc +{ + namespace core + { + class RemoteExecutor : public IRemoteExecutor + { + public: + RemoteExecutor(const std::string& networkName); + ~RemoteExecutor() { pthread_rwlock_destroy(&this->lock); } + + virtual web::json::value StartJobAndTask(hpc::arguments::StartJobAndTaskArgs&& args, const std::string& callbackUri); + virtual web::json::value StartTask(hpc::arguments::StartTaskArgs&& args, const std::string& callbackUri); + virtual web::json::value EndJob(hpc::arguments::EndJobArgs&& args); + virtual web::json::value EndTask(hpc::arguments::EndTaskArgs&& args, const std::string& callbackUri); + virtual web::json::value Ping(const std::string& callbackUri); + virtual web::json::value Metric(const std::string& callbackUri); + virtual web::json::value MetricConfig(hpc::arguments::MetricCountersConfig&& config, const std::string& callbackUri); + protected: private: static void* GracePeriodElapsed(void* data); void StartHeartbeat(const std::string& callbackUri); void StartMetric(const std::string& callbackUri); - void StartHostsManager(); - + void StartHostsManager(); + const hpc::data::ProcessStatistics* TerminateTask( int jobId, int taskId, int requeueCount, uint64_t processKey, int exitCode, bool forced); - + void ReportTaskCompletion(int jobId, int taskId, int taskRequeueCount, json::value jsonBody, const std::string& callbackUri); - const int UnknowId = 999; - const int NodeInfoReportInterval = 30; + const int UnknowId = 999; + const int NodeInfoReportInterval = 30; const int MetricReportInterval = 1; - const int RegisterInterval = 300; - const int DefaultHostsFetchInterval = 300; - const int MinHostsFetchInterval = 30; + const int RegisterInterval = 300; + const int DefaultHostsFetchInterval = 300; + const int MinHostsFetchInterval = 30; + + JobTaskTable jobTaskTable; + Monitor monitor; + + std::unique_ptr> nodeInfoReporter; + std::unique_ptr> registerReporter; + std::unique_ptr>> metricReporter; + std::unique_ptr hostsManager; - JobTaskTable jobTaskTable; - Monitor monitor; - - std::unique_ptr> nodeInfoReporter; - std::unique_ptr> registerReporter; - std::unique_ptr>> metricReporter; - std::unique_ptr hostsManager; - std::map> processes; std::map> jobUsers; std::map> userJobs; - pthread_rwlock_t lock; - }; - } -} - -#endif // REMOTEEXECUTOR_H + pthread_rwlock_t lock; + }; + } +} + +#endif // REMOTEEXECUTOR_H diff --git a/nodemanager/core/Reporter.cpp b/nodemanager/core/Reporter.cpp index 66fbfe4..d36c143 100644 --- a/nodemanager/core/Reporter.cpp +++ b/nodemanager/core/Reporter.cpp @@ -1,10 +1,10 @@ -#include - -#include "Reporter.h" -#include "../utils/Logger.h" - -using namespace hpc::core; -using namespace hpc::utils; -using namespace web::http::client; -using namespace web::http; +#include + +#include "Reporter.h" +#include "../utils/Logger.h" + +using namespace hpc::core; +using namespace hpc::utils; +using namespace web::http::client; +using namespace web::http; diff --git a/nodemanager/core/Reporter.h b/nodemanager/core/Reporter.h index 0abb267..a7ea605 100644 --- a/nodemanager/core/Reporter.h +++ b/nodemanager/core/Reporter.h @@ -1,89 +1,89 @@ -#ifndef REPORTER_H -#define REPORTER_H - -#include +#ifndef REPORTER_H +#define REPORTER_H + +#include #include - -#include "../utils/Logger.h" - -using namespace hpc::utils; - -namespace hpc -{ - namespace core + +#include "../utils/Logger.h" + +using namespace hpc::utils; + +namespace hpc +{ + namespace core { - template - class Reporter - { - public: + template + class Reporter + { + public: Reporter(const std::string& uri, int hold, int interval, std::function fetcher) - : reportUri(uri), valueFetcher(fetcher), intervalSeconds(interval), holdSeconds(hold) - { + : reportUri(uri), valueFetcher(fetcher), intervalSeconds(interval), holdSeconds(hold) + { } void Start() { - if (!this->reportUri.empty()) - { - pthread_create(&this->threadId, nullptr, ReportingThread, this); - } + if (!this->reportUri.empty()) + { + pthread_create(&this->threadId, nullptr, ReportingThread, this); + } } void Stop() { - this->isRunning = false; - if (this->threadId != 0) + this->isRunning = false; + if (this->threadId != 0) { - while (this->inRequest) usleep(1); - pthread_cancel(this->threadId); - pthread_join(this->threadId, nullptr); - Logger::Debug("Destructed Reporter {0}", this->reportUri); + while (this->inRequest) usleep(1); + pthread_cancel(this->threadId); + pthread_join(this->threadId, nullptr); + Logger::Debug("Destructed Reporter {0}", this->reportUri); } - } - + } + virtual ~Reporter() { - Logger::Debug("Destruct Reporter {0}", this->reportUri); - } - + Logger::Debug("Destruct Reporter {0}", this->reportUri); + } + virtual void Report() = 0; - - protected: - const std::string reportUri; - std::function valueFetcher; + + protected: + const std::string reportUri; + std::function valueFetcher; int intervalSeconds; - private: + private: static void* ReportingThread(void* arg) - { - pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); - pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); + { + pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, nullptr); + pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, nullptr); - Reporter* r = static_cast(arg); + Reporter* r = static_cast(arg); sleep(r->holdSeconds); - - while (r->isRunning) + + while (r->isRunning) { - if (!r->reportUri.empty()) + if (!r->reportUri.empty()) { r->inRequest = true; r->Report(); r->inRequest = false; - } - - sleep(r->intervalSeconds); - } + } + + sleep(r->intervalSeconds); + } + + pthread_exit(nullptr); + } + + int holdSeconds; - pthread_exit(nullptr); - } - - int holdSeconds; - pthread_t threadId = 0; bool isRunning = true; - bool inRequest = false; - }; - } -} - -#endif // REPORTER_H + bool inRequest = false; + }; + } +} + +#endif // REPORTER_H diff --git a/nodemanager/data/JobInfo.cpp b/nodemanager/data/JobInfo.cpp index 95e28ed..f7998ba 100644 --- a/nodemanager/data/JobInfo.cpp +++ b/nodemanager/data/JobInfo.cpp @@ -1,19 +1,19 @@ -#include "JobInfo.h" - -using namespace web; -using namespace hpc::data; - -json::value JobInfo::ToJson() const -{ - json::value j; - - j["JobId"] = this->JobId; - - std::vector tasks; - - std::transform(this->Tasks.cbegin(), this->Tasks.cend(), std::back_inserter(tasks), [](auto i) { return i.second->ToJson(); }); - - j["Tasks"] = json::value::array(tasks); - - return std::move(j); -} +#include "JobInfo.h" + +using namespace web; +using namespace hpc::data; + +json::value JobInfo::ToJson() const +{ + json::value j; + + j["JobId"] = this->JobId; + + std::vector tasks; + + std::transform(this->Tasks.cbegin(), this->Tasks.cend(), std::back_inserter(tasks), [](auto i) { return i.second->ToJson(); }); + + j["Tasks"] = json::value::array(tasks); + + return std::move(j); +} diff --git a/nodemanager/data/NodeInfo.cpp b/nodemanager/data/NodeInfo.cpp index 9c0cf65..a5b8916 100644 --- a/nodemanager/data/NodeInfo.cpp +++ b/nodemanager/data/NodeInfo.cpp @@ -1,29 +1,29 @@ -#include "NodeInfo.h" -#include "../utils/System.h" - -using namespace web; -using namespace hpc::data; -using namespace hpc::utils; - -NodeInfo::NodeInfo() -{ - this->Name = System::GetNodeName(); -} - -json::value NodeInfo::ToJson() -{ - json::value j; - j["Availability"] = json::value::number((int)this->Availability); - j["JustStarted"] = this->JustStarted; - j["MacAddress"] = json::value::string(this->MacAddress); - j["Name"] = json::value::string(this->Name); - - std::vector jobs; - - std::transform(this->Jobs.cbegin(), this->Jobs.cend(), std::back_inserter(jobs), [](auto i) { return i.second->ToJson(); }); - - j["Jobs"] = json::value::array(jobs); - - this->JustStarted = false; - return std::move(j); -} +#include "NodeInfo.h" +#include "../utils/System.h" + +using namespace web; +using namespace hpc::data; +using namespace hpc::utils; + +NodeInfo::NodeInfo() +{ + this->Name = System::GetNodeName(); +} + +json::value NodeInfo::ToJson() +{ + json::value j; + j["Availability"] = json::value::number((int)this->Availability); + j["JustStarted"] = this->JustStarted; + j["MacAddress"] = json::value::string(this->MacAddress); + j["Name"] = json::value::string(this->Name); + + std::vector jobs; + + std::transform(this->Jobs.cbegin(), this->Jobs.cend(), std::back_inserter(jobs), [](auto i) { return i.second->ToJson(); }); + + j["Jobs"] = json::value::array(jobs); + + this->JustStarted = false; + return std::move(j); +} diff --git a/nodemanager/data/NodeInfo.h b/nodemanager/data/NodeInfo.h index 04915cf..8889491 100644 --- a/nodemanager/data/NodeInfo.h +++ b/nodemanager/data/NodeInfo.h @@ -1,40 +1,40 @@ -#ifndef NODEINFO_H -#define NODEINFO_H - -#include -#include -#include - -#include "JobInfo.h" - -namespace hpc -{ - namespace data - { - enum class NodeAvailability - { - AlwaysOn = 0, - Available = 1, - Occupied = 2 - }; - - struct NodeInfo - { - public: - NodeInfo(); - - web::json::value ToJson(); - - NodeAvailability Availability; - bool JustStarted = true; - std::string MacAddress; - std::string Name; - - std::map> Jobs; - protected: - private: - }; - } -} - -#endif // NODEINFO_H +#ifndef NODEINFO_H +#define NODEINFO_H + +#include +#include +#include + +#include "JobInfo.h" + +namespace hpc +{ + namespace data + { + enum class NodeAvailability + { + AlwaysOn = 0, + Available = 1, + Occupied = 2 + }; + + struct NodeInfo + { + public: + NodeInfo(); + + web::json::value ToJson(); + + NodeAvailability Availability; + bool JustStarted = true; + std::string MacAddress; + std::string Name; + + std::map> Jobs; + protected: + private: + }; + } +} + +#endif // NODEINFO_H diff --git a/nodemanager/data/OutputData.cpp b/nodemanager/data/OutputData.cpp index 17fbf68..a9dc546 100644 --- a/nodemanager/data/OutputData.cpp +++ b/nodemanager/data/OutputData.cpp @@ -1,12 +1,12 @@ #include "OutputData.h" -json::value OutputData::ToJson() const -{ - json::value j; - j["NodeName"] = json::value::string(this->NodeName); +json::value OutputData::ToJson() const +{ + json::value j; + j["NodeName"] = json::value::string(this->NodeName); j["Order"] = this->Order; j["Content"] = json::value::string(this->Content); - j["Eof"] = this->Eof; + j["Eof"] = this->Eof; - return std::move(j); -} + return std::move(j); +} diff --git a/nodemanager/data/OutputData.h b/nodemanager/data/OutputData.h index db27ee3..97f2f32 100644 --- a/nodemanager/data/OutputData.h +++ b/nodemanager/data/OutputData.h @@ -1,7 +1,7 @@ #ifndef OUTPUTDATA_H #define OUTPUTDATA_H -#include +#include #include using namespace web; @@ -14,7 +14,7 @@ class OutputData { } - json::value ToJson() const; + json::value ToJson() const; std::string NodeName; int Order; diff --git a/nodemanager/data/TaskInfo.cpp b/nodemanager/data/TaskInfo.cpp index d63eb5c..0a3d174 100644 --- a/nodemanager/data/TaskInfo.cpp +++ b/nodemanager/data/TaskInfo.cpp @@ -1,40 +1,40 @@ -#include "TaskInfo.h" -#include "../utils/JsonHelper.h" -#include "../utils/String.h" - -using namespace web; -using namespace hpc::data; -using namespace hpc::utils; - -json::value TaskInfo::ToJson() const -{ - json::value j; - - j["TaskId"] = this->TaskId; - j["TaskRequeueCount"] = this->taskRequeueCount; - j["ExitCode"] = this->ExitCode; - j["Exited"] = this->Exited; - j["KernelProcessorTime"] = this->KernelProcessorTimeMs; - j["UserProcessorTime"] = this->UserProcessorTimeMs; - j["WorkingSet"] = this->WorkingSetKb; - j["NumberOfProcesses"] = this->GetProcessCount(); - j["PrimaryTask"] = this->IsPrimaryTask; - j["Message"] = JsonHelper::ToJson(this->Message); +#include "TaskInfo.h" +#include "../utils/JsonHelper.h" +#include "../utils/String.h" + +using namespace web; +using namespace hpc::data; +using namespace hpc::utils; + +json::value TaskInfo::ToJson() const +{ + json::value j; + + j["TaskId"] = this->TaskId; + j["TaskRequeueCount"] = this->taskRequeueCount; + j["ExitCode"] = this->ExitCode; + j["Exited"] = this->Exited; + j["KernelProcessorTime"] = this->KernelProcessorTimeMs; + j["UserProcessorTime"] = this->UserProcessorTimeMs; + j["WorkingSet"] = this->WorkingSetKb; + j["NumberOfProcesses"] = this->GetProcessCount(); + j["PrimaryTask"] = this->IsPrimaryTask; + j["Message"] = JsonHelper::ToJson(this->Message); j["ProcessIds"] = JsonHelper::ToJson(String::Join<','>(this->ProcessIds)); - return j; -} - -json::value TaskInfo::ToCompletionEventArgJson() const -{ - json::value j = this->ToJson(); - - json::value jobIdArg; - jobIdArg["JobId"] = this->JobId; - jobIdArg["TaskInfo"] = j; - jobIdArg["NodeName"] = json::value::string(this->NodeName); - - return jobIdArg; + return j; +} + +json::value TaskInfo::ToCompletionEventArgJson() const +{ + json::value j = this->ToJson(); + + json::value jobIdArg; + jobIdArg["JobId"] = this->JobId; + jobIdArg["TaskInfo"] = j; + jobIdArg["NodeName"] = json::value::string(this->NodeName); + + return jobIdArg; } void TaskInfo::AssignFromStat(const ProcessStatistics& stat) @@ -43,4 +43,4 @@ void TaskInfo::AssignFromStat(const ProcessStatistics& stat) this->UserProcessorTimeMs = stat.UserTimeMs; this->ProcessIds = stat.ProcessIds; this->WorkingSetKb = stat.WorkingSetKb; -} +} diff --git a/nodemanager/data/TaskInfo.h b/nodemanager/data/TaskInfo.h index 7ee53d5..b9e3285 100644 --- a/nodemanager/data/TaskInfo.h +++ b/nodemanager/data/TaskInfo.h @@ -32,8 +32,8 @@ namespace hpc { if (this->GracefulThreadId) { - pthread_cancel(this->GracefulThreadId); - pthread_join(this->GracefulThreadId, nullptr); + pthread_cancel(this->GracefulThreadId); + pthread_join(this->GracefulThreadId, nullptr); Logger::Debug("Destructed TaskInfo GracefulThread {0}", this->GracefulThreadId); this->GracefulThreadId = 0; } diff --git a/nodemanager/filters/ExecutionFilter.cpp b/nodemanager/filters/ExecutionFilter.cpp new file mode 100644 index 0000000..ecdb523 --- /dev/null +++ b/nodemanager/filters/ExecutionFilter.cpp @@ -0,0 +1,122 @@ +#include "ExecutionFilter.h" +#include "../utils/Logger.h" +#include "../common/ErrorCodes.h" +#include "../utils/System.h" +#include "../core/Process.h" +#include "../data/ProcessStatistics.h" + +using namespace hpc::filters; +using namespace hpc::utils; +using namespace hpc::common; +using namespace hpc::data; + +int ExecutionFilter::OnJobStart(int jobId, const json::value& input, json::value& output, std::string& executionMessage) +{ + return this->ExecuteFilter(JobStartFilter, jobId, 0, 0, input, output, executionMessage); +} + +int ExecutionFilter::OnJobEnd(int jobId, const json::value& input, json::value& output, std::string& executionMessage) +{ + return this->ExecuteFilter(JobEndFilter, jobId, 0, 0, input, output, executionMessage); +} + +int ExecutionFilter::OnTaskStart(int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage) +{ + return this->ExecuteFilter(TaskStartFilter, jobId, taskId, requeueCount, input, output, executionMessage); +} + +int ExecutionFilter::ExecuteFilter(const std::string& filterType, int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage) +{ + auto filterIt = this->filterFiles.find(filterType); + if (filterIt == this->filterFiles.end()) + { + Logger::Error(jobId, taskId, requeueCount, "Unknown filter type {0}", filterType); + return (int)ErrorCodes::UnknownFilter; + } + + std::string filterFile = filterIt->second; + + if (filterFile.empty()) + { + Logger::Error(jobId, taskId, requeueCount, "{0} not detected, skip", filterType); + output = input; + return 0; + } + + char folder[256]; + sprintf(folder, "/tmp/nodemanager_executionfilter_%d.XXXXXX", jobId); + int ret = System::CreateTempFolder(folder, "root"); + + if (ret != 0) + { + Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create folder {2}, exit code {3}", filterType, filterFile, folder, ret); + return ret; + } + + std::string folderString = folder; + + std::string stdinFile = folderString + "/stdin.txt"; + ret = System::WriteStringToFile(stdinFile, input.serialize()); + + if (ret != 0) + { + Logger::Error(jobId, taskId, requeueCount, "{0} {1}: Failed to create stdin file {2}, exit code {3}", filterType, filterFile, stdinFile, ret); + return ret; + } + + std::string stdoutFile = folderString + "/stdout.txt"; + std::string stderrFile = stdoutFile; + + Process p( + jobId, taskId, requeueCount, filterFile, stdoutFile, stderrFile, stdinFile, folderString, "root", + std::vector(), std::map(), + [&ret, &executionMessage, jobId, taskId, requeueCount, &filterType, &filterFile] + (int exitCode, std::string&& message, const ProcessStatistics& stat) + { + ret = exitCode; + if (ret != 0) + { + Logger::Error(jobId, taskId, requeueCount, "{0} {1}: returned {2}, message {3}", filterType, filterFile, ret, message); + } + else + { + Logger::Info(jobId, taskId, requeueCount, "{0} {1}: success with message {2}", filterType, filterFile, message); + } + + executionMessage = message; + }); + + pthread_t processTid; + p.Start().then([&ret, &processTid, jobId, taskId, requeueCount, &filterType, &filterFile] (std::pair ids) + { + Logger::Info(jobId, taskId, requeueCount, "{0} {1}: pid {2} tid {3}", filterType, filterFile, ids.first, ids.second); + processTid = ids.second; + }).wait(); + + int joinRet = pthread_join(processTid, nullptr); + + if (joinRet != 0) + { + Logger::Error(jobId, taskId, requeueCount, "{0} {1}: join thread {2} ret {3}", filterType, filterFile, processTid, joinRet); + if (0 == ret) { ret = joinRet; } + } + + if (0 == ret) + { + std::ifstream fsStdout(stdoutFile, std::ios::in); + + if (fsStdout) + { + std::string content((std::istreambuf_iterator(fsStdout)), std::istreambuf_iterator()); + Logger::Info(jobId, taskId, requeueCount, "{0} {1}: plugin output {2}", filterType, filterFile, content); + output = json::value::parse(content); + fsStdout.close(); + } + else + { + return (int)ErrorCodes::ReadFileError; + } + } + + return ret; +} diff --git a/nodemanager/filters/ExecutionFilter.h b/nodemanager/filters/ExecutionFilter.h new file mode 100644 index 0000000..18ad645 --- /dev/null +++ b/nodemanager/filters/ExecutionFilter.h @@ -0,0 +1,38 @@ +#ifndef EXECUTIONFILTER_H +#define EXECUTIONFILTER_H + +#include +#include +#include "../core/NodeManagerConfig.h" + +namespace hpc +{ + namespace filters + { + using namespace hpc::core; + + class ExecutionFilter + { + public: + ExecutionFilter() + { + filterFiles[JobStartFilter] = NodeManagerConfig::GetJobStartFilter(); + filterFiles[JobEndFilter] = NodeManagerConfig::GetJobEndFilter(); + filterFiles[TaskStartFilter] = NodeManagerConfig::GetTaskStartFilter(); + } + + int OnJobStart(int jobId, const json::value& input, json::value& output, std::string& executionMessage); + int OnJobEnd(int jobId, const json::value& input, json::value& output, std::string& executionMessage); + int OnTaskStart(int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage); + int ExecuteFilter(const std::string& filterType, int jobId, int taskId, int requeueCount, const json::value& input, json::value& output, std::string& executionMessage); + + private: + std::map filterFiles; + const std::string JobStartFilter = "JobStartFilter"; + const std::string JobEndFilter = "JobEndFilter"; + const std::string TaskStartFilter = "TaskStartFilter"; + }; + } +} + +#endif diff --git a/nodemanager/main.cpp b/nodemanager/main.cpp index 296880f..fcd9983 100644 --- a/nodemanager/main.cpp +++ b/nodemanager/main.cpp @@ -1,13 +1,13 @@ #include -#include -#include -#include - -#include "utils/Logger.h" -#include "core/RemoteCommunicator.h" +#include +#include +#include + +#include "utils/Logger.h" +#include "core/RemoteCommunicator.h" #include "core/RemoteExecutor.h" #include "Version.h" -#include "core/NodeManagerConfig.h" +#include "core/NodeManagerConfig.h" #include "common/ErrorCodes.h" #include "core/HttpHelper.h" @@ -15,12 +15,12 @@ #include "test/TestRunner.h" using namespace hpc::tests; -#endif // DEBUG - -using namespace std; -using namespace hpc::core; +#endif // DEBUG + +using namespace std; +using namespace hpc::core; using namespace hpc::utils; -using namespace hpc; +using namespace hpc; using namespace hpc::common; using namespace web::http::experimental::listener; @@ -28,9 +28,9 @@ void Cleanup() { Logger::Info("Cleaning up zombie processes"); Process::Cleanup(); -} +} -int main(int argc, char* argv[]) +int main(int argc, char* argv[]) { if (argc > 1) { @@ -40,8 +40,8 @@ int main(int argc, char* argv[]) return 0; } } - - std::cout << "Node manager started." << std::endl; + + std::cout << "Node manager started." << std::endl; Logger::Info("Log system works."); Logger::Info("Version: {0}", Version::GetVersion()); @@ -60,7 +60,7 @@ int main(int argc, char* argv[]) ret); return ret; - } + } #ifdef DEBUG @@ -79,27 +79,27 @@ int main(int argc, char* argv[]) Cleanup(); Logger::Debug( - "Trusted CA File: {0}", + "Trusted CA File: {0}", NodeManagerConfig::GetTrustedCAFile()); - const std::string networkName = ""; - RemoteExecutor executor(networkName); + const std::string networkName = ""; + RemoteExecutor executor(networkName); http_listener_config config; config.set_ssl_context_callback([] (auto& ctx) { HttpHelper::ConfigListenerSslContext(ctx); }); - - RemoteCommunicator rc(executor, config); - rc.Open(); - - while (true) - { - sleep(100); - } - - // rc.Close(); - - return 0; -} + + RemoteCommunicator rc(executor, config); + rc.Open(); + + while (true) + { + sleep(100); + } + + // rc.Close(); + + return 0; +} diff --git a/nodemanager/makefile b/nodemanager/makefile index a4dda5d..7772809 100644 --- a/nodemanager/makefile +++ b/nodemanager/makefile @@ -1,211 +1,211 @@ -# node manager makefile. - -# casablanca -CASABLANCA = ../../evancasa/cpprestsdk -CASA_RELEASE = $(CASABLANCA)/Release -CASA_INC = $(CASA_RELEASE)/include -CASA_LIB_DEBUG = $(CASA_RELEASE)/build.debug/Binaries -CASA_LIB_RELEASE = $(CASA_RELEASE)/build.release/Binaries - -# spdlog -SPDLOG = ../../spdlog -SPDLOG_INC = $(SPDLOG)/include - -# tools -CC = gcc -CXX = g++ -AR = ar -LD = g++ -WINDRES = windres - -# common -LIBOUTDIR = lib -INSTALLDIR = /opt/hpcnodemanager -INC = -I$(CASA_INC) -I$(SPDLOG_INC) -CFLAGS = -Wall -std=c++14 -Wno-unused-local-typedefs -LIB = -lcpprest -lpthread -lboost_system -lssl -lcrypto -LDFLAGS = -Wl,-rpath,\$$ORIGIN/$(LIBOUTDIR),-I$(INSTALLDIR)/$(LIBOUTDIR)/ld-linux-x86-64.so.2 -BINARY = nodemanager -DEBUG = debug -RELEASE = release -OBJDIR = obj -OUTDIR = bin -CPPSWITHDIR = $(wildcard */*.cpp *.cpp) -CPPS = $(notdir $(CPPSWITHDIR)) -OBJS = $(CPPS:.cpp=.o) - -# debug -MACRO_DEBUG = -D DEBUG -INC_DEBUG = $(INC) -CFLAGS_DEBUG = $(CFLAGS) -g -LIBDIR_DEBUG = -L$(CASA_LIB_DEBUG) -LIB_DEBUG = $(LIB) -LDFLAGS_DEBUG = $(LDFLAGS) -OBJDIR_DEBUG = $(OBJDIR)/$(DEBUG) -OBJDIRSED_DEBUG = $(OBJDIR)\/$(DEBUG) -OUTDIR_DEBUG = $(OUTDIR)/$(DEBUG) -OUT_DEBUG = $(OUTDIR_DEBUG)/$(BINARY) -OBJS_DEBUG = $(addprefix $(OBJDIR_DEBUG)/, $(OBJS)) - -# release -MACRO_RELEASE = -INC_RELEASE = $(INC) -CFLAGS_RELEASE = $(CFLAGS) -O2 -LIBDIR_RELEASE = -L$(CASA_LIB_RELEASE) -LIB_RELEASE = $(LIB) -LDFLAGS_RELEASE = $(LDFLAGS) -s -OBJDIR_RELEASE = $(OBJDIR)/$(RELEASE) -OBJDIRSED_RELEASE = $(OBJDIR)\/$(RELEASE) -OUTDIR_RELEASE = $(OUTDIR)/$(RELEASE) -OUT_RELEASE = $(OUTDIR_RELEASE)/$(BINARY) -OBJS_RELEASE = $(addprefix $(OBJDIR_RELEASE)/, $(OBJS)) - -# pseudo targets -all: debug release -rebuild: clean all -clean: clean_debug clean_release -release: before_release out_release after_release -debug: before_debug out_debug after_debug -redebug: clean_debug debug -rerelease: clean_release release - -# color output -define compile - $(indent); - @tput bold; - @echo -n "Compiling [ "; - @tput setaf 6; - @echo -n $@; - @tput setaf 7; - @echo -n " ] using [ "; - @tput setaf 5; - @echo -n $<; - @tput setaf 7; - @echo " ]"; - @tput sgr0 - @$(CXX) $(1) $(2) $(3) -c $< -o $@ -endef - -define building - $(indent); - @tput bold; - @echo -n "Building [ "; - @tput setaf 6; - @echo -n $(1); - @tput setaf 7; - @echo " ]"; - @tput sgr0 -endef - -indent=@printf '\t' - -define prepare - @echo "" - @tput bold; - @echo -n "Preparing to build [ "; - @tput setaf 2; - @echo -n $(1); - @tput setaf 7; - @echo " ] target"; - @tput sgr0; - $(indent); - [ -d $(2) ] || mkdir -p $(2) - $(indent); - [ -d $(3) ] || mkdir -p $(3) - @echo "" - @tput bold; - @echo -n "Start to build [ "; - @tput setaf 2; - @echo -n $(1); - @tput setaf 7; - @echo " ] target"; - @tput sgr0; -endef - -define finish - $(indent); - cp config/* $(2)/ - $(indent); - cp scripts/* $(2)/ - $(indent); - [ -d $(2)/logs ] || mkdir $(2)/logs - $(indent); - [ -d $(2)/$(LIBOUTDIR) ] || mkdir $(2)/$(LIBOUTDIR) - $(indent); - cp $(shell ldd $(3) | cut -f3 -d' ') $(2)/$(LIBOUTDIR) 2> /dev/null || : - @tput bold; - @echo -n "Finished to build [ "; - @tput setaf 2; - @echo -n $(1); - @tput setaf 7; - @echo " ] target"; - @tput sgr0; -endef - -# debug targets -before_debug: - $(call prepare,$(DEBUG),$(OUTDIR_DEBUG),$(OBJDIR_DEBUG)) - -after_debug: - $(call finish,$(DEBUG),$(OUTDIR_DEBUG),$(OUT_DEBUG)) - -out_debug: before_debug $(OUT_DEBUG) - -$(OUT_DEBUG): $(OBJS_DEBUG) - $(call building,$@) - $(indent); - $(LD) $(LIBDIR_DEBUG) -o $@ $(OBJDIR_DEBUG)/*.o $(LDFLAGS_DEBUG) $(LIB_DEBUG) - -clean_debug: - rm -rf $(OUTDIR_DEBUG) - rm -rf $(OBJDIR_DEBUG) - -$(OBJDIR_DEBUG)/%.o: */%.cpp - $(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG)) - -$(OBJDIR_DEBUG)/%.o: %.cpp - $(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG)) - -# release targets -before_release: - $(call prepare,$(RELEASE),$(OUTDIR_RELEASE),$(OBJDIR_RELEASE)) - -after_release: - $(call finish,$(RELEASE),$(OUTDIR_RELEASE),$(OUT_RELEASE)) - -out_release: before_release $(OUT_RELEASE) - -$(OUT_RELEASE): $(OBJS_RELEASE) - $(call building,$@) - $(indent); - $(LD) $(LIBDIR_RELEASE) -o $@ $(OBJDIR_RELEASE)/*.o $(LDFLAGS_RELEASE) $(LIB_RELEASE) - -clean_release: - rm -rf $(OUTDIR_RELEASE) - rm -rf $(OBJDIR_RELEASE) - -$(OBJDIR_RELEASE)/%.o: */%.cpp - $(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE)) - -$(OBJDIR_RELEASE)/%.o: %.cpp - $(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE)) - -# phony targets -.PHONY: before_debug after_debug clean_debug before_release after_release clean_release - -# dependency -include deps -deps: $(CPPSWITHDIR) - @tput bold - @echo "Rebuilding dependencies ... " - @tput sgr0 - $(indent) - $(CXX) $(CFLAGS) -MM *.cpp */*.cpp > rawdeps - $(indent) - cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_DEBUG)\/\1\.o/" > deps - $(indent) - cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_RELEASE)\/\1\.o/" >> deps - @tput setaf 2 - @tput bold - @echo "Done." - @tput sgr0; +# node manager makefile. + +# casablanca +CASABLANCA = ../../evancasa/cpprestsdk +CASA_RELEASE = $(CASABLANCA)/Release +CASA_INC = $(CASA_RELEASE)/include +CASA_LIB_DEBUG = $(CASA_RELEASE)/build.debug/Binaries +CASA_LIB_RELEASE = $(CASA_RELEASE)/build.release/Binaries + +# spdlog +SPDLOG = ../../spdlog +SPDLOG_INC = $(SPDLOG)/include + +# tools +CC = gcc +CXX = g++ +AR = ar +LD = g++ +WINDRES = windres + +# common +LIBOUTDIR = lib +INSTALLDIR = /opt/hpcnodemanager +INC = -I$(CASA_INC) -I$(SPDLOG_INC) +CFLAGS = -Wall -std=c++14 -Wno-unused-local-typedefs +LIB = -lcpprest -lpthread -lboost_system -lssl -lcrypto +LDFLAGS = -Wl,-rpath,\$$ORIGIN/$(LIBOUTDIR),-I$(INSTALLDIR)/$(LIBOUTDIR)/ld-linux-x86-64.so.2 +BINARY = nodemanager +DEBUG = debug +RELEASE = release +OBJDIR = obj +OUTDIR = bin +CPPSWITHDIR = $(wildcard */*.cpp *.cpp) +CPPS = $(notdir $(CPPSWITHDIR)) +OBJS = $(CPPS:.cpp=.o) + +# debug +MACRO_DEBUG = -D DEBUG +INC_DEBUG = $(INC) +CFLAGS_DEBUG = $(CFLAGS) -g +LIBDIR_DEBUG = -L$(CASA_LIB_DEBUG) +LIB_DEBUG = $(LIB) +LDFLAGS_DEBUG = $(LDFLAGS) +OBJDIR_DEBUG = $(OBJDIR)/$(DEBUG) +OBJDIRSED_DEBUG = $(OBJDIR)\/$(DEBUG) +OUTDIR_DEBUG = $(OUTDIR)/$(DEBUG) +OUT_DEBUG = $(OUTDIR_DEBUG)/$(BINARY) +OBJS_DEBUG = $(addprefix $(OBJDIR_DEBUG)/, $(OBJS)) + +# release +MACRO_RELEASE = +INC_RELEASE = $(INC) +CFLAGS_RELEASE = $(CFLAGS) -O2 +LIBDIR_RELEASE = -L$(CASA_LIB_RELEASE) +LIB_RELEASE = $(LIB) +LDFLAGS_RELEASE = $(LDFLAGS) -s +OBJDIR_RELEASE = $(OBJDIR)/$(RELEASE) +OBJDIRSED_RELEASE = $(OBJDIR)\/$(RELEASE) +OUTDIR_RELEASE = $(OUTDIR)/$(RELEASE) +OUT_RELEASE = $(OUTDIR_RELEASE)/$(BINARY) +OBJS_RELEASE = $(addprefix $(OBJDIR_RELEASE)/, $(OBJS)) + +# pseudo targets +all: debug release +rebuild: clean all +clean: clean_debug clean_release +release: before_release out_release after_release +debug: before_debug out_debug after_debug +redebug: clean_debug debug +rerelease: clean_release release + +# color output +define compile + $(indent); + @tput bold; + @echo -n "Compiling [ "; + @tput setaf 6; + @echo -n $@; + @tput setaf 7; + @echo -n " ] using [ "; + @tput setaf 5; + @echo -n $<; + @tput setaf 7; + @echo " ]"; + @tput sgr0 + @$(CXX) $(1) $(2) $(3) -c $< -o $@ +endef + +define building + $(indent); + @tput bold; + @echo -n "Building [ "; + @tput setaf 6; + @echo -n $(1); + @tput setaf 7; + @echo " ]"; + @tput sgr0 +endef + +indent=@printf '\t' + +define prepare + @echo "" + @tput bold; + @echo -n "Preparing to build [ "; + @tput setaf 2; + @echo -n $(1); + @tput setaf 7; + @echo " ] target"; + @tput sgr0; + $(indent); + [ -d $(2) ] || mkdir -p $(2) + $(indent); + [ -d $(3) ] || mkdir -p $(3) + @echo "" + @tput bold; + @echo -n "Start to build [ "; + @tput setaf 2; + @echo -n $(1); + @tput setaf 7; + @echo " ] target"; + @tput sgr0; +endef + +define finish + $(indent); + cp config/* $(2)/ + $(indent); + cp scripts/* $(2)/ + $(indent); + [ -d $(2)/logs ] || mkdir $(2)/logs + $(indent); + [ -d $(2)/$(LIBOUTDIR) ] || mkdir $(2)/$(LIBOUTDIR) + $(indent); + cp $(shell ldd $(3) | cut -f3 -d' ') $(2)/$(LIBOUTDIR) 2> /dev/null || : + @tput bold; + @echo -n "Finished to build [ "; + @tput setaf 2; + @echo -n $(1); + @tput setaf 7; + @echo " ] target"; + @tput sgr0; +endef + +# debug targets +before_debug: + $(call prepare,$(DEBUG),$(OUTDIR_DEBUG),$(OBJDIR_DEBUG)) + +after_debug: + $(call finish,$(DEBUG),$(OUTDIR_DEBUG),$(OUT_DEBUG)) + +out_debug: before_debug $(OUT_DEBUG) + +$(OUT_DEBUG): $(OBJS_DEBUG) + $(call building,$@) + $(indent); + $(LD) $(LIBDIR_DEBUG) -o $@ $(OBJDIR_DEBUG)/*.o $(LDFLAGS_DEBUG) $(LIB_DEBUG) + +clean_debug: + rm -rf $(OUTDIR_DEBUG) + rm -rf $(OBJDIR_DEBUG) + +$(OBJDIR_DEBUG)/%.o: */%.cpp + $(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG)) + +$(OBJDIR_DEBUG)/%.o: %.cpp + $(call compile,$(CFLAGS_DEBUG),$(INC_DEBUG),$(MACRO_DEBUG)) + +# release targets +before_release: + $(call prepare,$(RELEASE),$(OUTDIR_RELEASE),$(OBJDIR_RELEASE)) + +after_release: + $(call finish,$(RELEASE),$(OUTDIR_RELEASE),$(OUT_RELEASE)) + +out_release: before_release $(OUT_RELEASE) + +$(OUT_RELEASE): $(OBJS_RELEASE) + $(call building,$@) + $(indent); + $(LD) $(LIBDIR_RELEASE) -o $@ $(OBJDIR_RELEASE)/*.o $(LDFLAGS_RELEASE) $(LIB_RELEASE) + +clean_release: + rm -rf $(OUTDIR_RELEASE) + rm -rf $(OBJDIR_RELEASE) + +$(OBJDIR_RELEASE)/%.o: */%.cpp + $(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE)) + +$(OBJDIR_RELEASE)/%.o: %.cpp + $(call compile,$(CFLAGS_RELEASE),$(INC_RELEASE),$(MACRO_RELEASE)) + +# phony targets +.PHONY: before_debug after_debug clean_debug before_release after_release clean_release + +# dependency +include deps +deps: $(CPPSWITHDIR) + @tput bold + @echo "Rebuilding dependencies ... " + @tput sgr0 + $(indent) + $(CXX) $(CFLAGS) -MM *.cpp */*.cpp > rawdeps + $(indent) + cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_DEBUG)\/\1\.o/" > deps + $(indent) + cat rawdeps | sed -e "s/\(.*\)\.o/$(OBJDIRSED_RELEASE)\/\1\.o/" >> deps + @tput setaf 2 + @tput bold + @echo "Done." + @tput sgr0; diff --git a/nodemanager/test/ExecutionFilterTest.cpp b/nodemanager/test/ExecutionFilterTest.cpp new file mode 100644 index 0000000..c3b044b --- /dev/null +++ b/nodemanager/test/ExecutionFilterTest.cpp @@ -0,0 +1,105 @@ +#include "ExecutionFilterTest.h" + +#ifdef DEBUG + +#include +#include +#include +#include +#include + +#include "../utils/JsonHelper.h" +#include "../core/Process.h" +#include "../utils/Logger.h" +#include "../core/RemoteCommunicator.h" +#include "../core/RemoteExecutor.h" +#include "../core/NodeManagerConfig.h" +#include "../common/ErrorCodes.h" +#include "../core/HttpHelper.h" +#include "../utils/System.h" + +using namespace hpc::tests; +using namespace hpc::core; +using namespace hpc::data; +using namespace hpc::utils; + +using namespace web::http; +using namespace web; +using namespace web::http::experimental::listener; +using namespace web::http::client; + +bool ExecutionFilterTest::JobStart() +{ + bool result = true; + const std::string networkName = ""; + RemoteExecutor executor(networkName); + + http_listener_config config; + config.set_ssl_context_callback([] (auto& ctx) + { + HttpHelper::ConfigListenerSslContext(ctx); + }); + + RemoteCommunicator rc(executor, config); + rc.Open(); + + http_client client(U("http://localhost:40000/")); + uri_builder builder(U("/api/node/startjobandtask")); + + std::string stdoutFile = "/tmp/JobStartFilterTest"; + std::string output; + System::ExecuteCommandOut(output, "rm", stdoutFile); + + ProcessStartInfo psi( + "echo 123", + "", + std::string(stdoutFile), + "", + "", + 0, + std::vector(), + { { "CCP_ISADMIN", "1" } }); + + StartJobAndTaskArgs arg( + 88, + 88, + std::move(psi), + "", + ""); + + json::value v1 = arg.ToJson(); + + Logger::Debug("Debug JSON: {0}", v1.serialize()); + StartJobAndTaskArgs arg1 = StartJobAndTaskArgs::FromJson(v1); + + client.request(methods::POST, builder.to_string(), arg.ToJson()).then([&] (http_response response) + { + if (status_codes::OK != response.status_code()) + { + Logger::Debug("The response code {0}", response.status_code()); + result = false; + } + }).wait(); + + sleep(2); + + // verify the job start filter is called. + // The command line is changed to echo 456. + std::ifstream outFile(stdoutFile, std::ios::in); + if (outFile) + { + int num; + outFile >> num; + Logger::Debug("stdout value {0}, expected value {1}", num, 456); + result &= num == 456; + } + else + { + Logger::Debug("Stdout file not found {0}", stdoutFile); + result = false; + } + + return result; +} + +#endif // DEBUG diff --git a/nodemanager/test/ExecutionFilterTest.h b/nodemanager/test/ExecutionFilterTest.h new file mode 100644 index 0000000..92b8c4b --- /dev/null +++ b/nodemanager/test/ExecutionFilterTest.h @@ -0,0 +1,25 @@ +#ifndef EXECUTIONFILTERTEST_H +#define EXECUTIONFILTERTEST_H + +#ifdef DEBUG + +namespace hpc +{ + namespace tests + { + class ExecutionFilterTest + { + public: + ExecutionFilterTest() { } + + static bool JobStart(); + + protected: + private: + }; + } +} + +#endif // DEBUG + +#endif // EXECUTIONFILTERTEST_H diff --git a/nodemanager/test/ProcessTest.cpp b/nodemanager/test/ProcessTest.cpp index 82d53ef..3d59940 100644 --- a/nodemanager/test/ProcessTest.cpp +++ b/nodemanager/test/ProcessTest.cpp @@ -9,7 +9,7 @@ using namespace hpc::tests; using namespace hpc::core; using namespace hpc::data; - + using namespace web::http; using namespace web; @@ -27,8 +27,8 @@ bool ProcessTest::ClusRun() int callbackCount = 0; bool callbacked = false; - listener.support( - methods::POST, + listener.support( + methods::POST, [&result, &callbackCount](auto request) { auto j = request.extract_json().get(); @@ -46,7 +46,7 @@ bool ProcessTest::ClusRun() listener.open().wait(); - Logger::Info("Listener opened"); + Logger::Info("Listener opened"); Process p( 25, 28, 1, "echo 30; sleep 1; echo 31", listeningUri, "", "", "", "root", @@ -65,16 +65,17 @@ bool ProcessTest::ClusRun() callbacked = true; }); - p.Start().then([&result, &started] (pid_t pid) + pthread_t threadId; + + p.Start().then([&result, &started, &threadId] (std::pair ids) { - if (pid <= 0) result = false; - - Logger::Info("pid {0}, result {1}", pid, result); - + if (ids.first <= 0) result = false; + Logger::Info("pid {0}, result {1}", ids.first, result); + threadId = ids.second; started = true; - }); + }).wait(); - sleep(2); + pthread_join(threadId, nullptr); if (!(callbacked && started)) result = false; @@ -123,6 +124,7 @@ eesGSKS5l22ZMXJNShgzPKmv3HpH22CSVpO0sNZ6R+iG8a3oq4QkU61MT1CfGoMI\ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ -----END RSA PRIVATE KEY-----"; + System::DeleteUser(userName); std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n"; int ret = System::CreateUser(userName, password); @@ -151,16 +153,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ callbacked = true; }); - p.Start().then([&result, &started] (pid_t pid) + pthread_t threadId; + + p.Start().then([&result, &started, &threadId] (std::pair ids) { - if (pid <= 0) result = false; - - Logger::Info("pid {0}, result {1}", pid, result); - + if (ids.first <= 0) result = false; + Logger::Info("pid {0}, result {1}", ids.first, result); + threadId = ids.second; started = true; - }); + }).wait(); - sleep(1); + pthread_join(threadId, nullptr); if (!(callbacked && started)) result = false; @@ -213,6 +216,7 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ -----END RSA PRIVATE KEY-----"; std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n"; + System::DeleteUser(userName); int ret = System::CreateUser(userName, password); if (ret != 0) return false; @@ -240,16 +244,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ callbacked = true; }); - p.Start().then([&result, &started] (pid_t pid) + pthread_t threadId; + + p.Start().then([&result, &started, &threadId] (std::pair ids) { - if (pid <= 0) result = false; - - Logger::Info("pid {0}, result {1}", pid, result); - + if (ids.first <= 0) result = false; + Logger::Info("pid {0}, result {1}", ids.first, result); + threadId = ids.second; started = true; - }); + }).wait(); - sleep(1); + pthread_join(threadId, nullptr); if (!(callbacked && started)) result = false; @@ -302,6 +307,7 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ -----END RSA PRIVATE KEY-----"; std::string publicKey = "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDEkoEAGGc6wT16d4Ye+yN2hcqigdTGlMcjUlW6cAmRWYXLwkKoW3WlX3xAK0oQdMLqRDu2PVRPY3qfHURj0EEellpydeaSekp1fg27Rw2VKmEumu6Wxwo9HddXORPAQXTQ4yI0lWSerypckXVPeVjHetbkSci2foLedCbeBA9c/RyRgIUl227/pJKDNX2Rpqly0sY82nVWN/0p4NAyslexA0fGdBx+IgKnbU2JQKJeiwOomtEB/N492XRfCw2eCi7Ly3R8+U1KeBm+zH6Q8aH8ApqQohhLRw71bcWZ1g1bxd6HORxXOu0mFTzHbWFcZ9ILtXRl4Pt0x5Mve1AJXEKb hpclabsa@longhaulLN5-033\n"; + System::DeleteUser(userName); int ret = System::CreateUser(userName, password); if (ret != 0) return false; @@ -364,16 +370,17 @@ a8lxTKnZCsRXU1HexqZs+DSc+30tz50bNqLdido/l5B4EJnQP03ciO0=\ callbacked = true; }); - p.Start().then([&result, &started] (pid_t pid) + pthread_t threadId; + + p.Start().then([&result, &started, &threadId] (std::pair ids) { - if (pid <= 0) result = false; - - Logger::Info("pid {0}, result {1}", pid, result); - + if (ids.first <= 0) result = false; + Logger::Info("pid {0}, result {1}", ids.first, result); + threadId = ids.second; started = true; - }); + }).wait(); - sleep(1); + pthread_join(threadId, nullptr); if (!(callbacked && started)) result = false; diff --git a/nodemanager/test/TestRunner.cpp b/nodemanager/test/TestRunner.cpp index 30092f1..79edb0c 100644 --- a/nodemanager/test/TestRunner.cpp +++ b/nodemanager/test/TestRunner.cpp @@ -4,6 +4,7 @@ #include "../utils/Logger.h" #include "ProcessTest.h" +#include "ExecutionFilterTest.h" using namespace hpc::tests; using namespace hpc::utils; @@ -14,6 +15,7 @@ TestRunner::TestRunner() this->tests["Affinity"] = []() { return ProcessTest::Affinity(); }; this->tests["RemainingProcess"] = []() { return ProcessTest::RemainingProcess(); }; this->tests["ClusRun"] = []() { return ProcessTest::ClusRun(); }; + this->tests["FilterJobStart"] = []() { return ExecutionFilterTest::JobStart(); }; } bool TestRunner::Run() @@ -22,6 +24,9 @@ bool TestRunner::Run() Logger::Info("========================================================"); Logger::Info("Start Testing, {0} cases in total.", this->tests.size()); Logger::Info("========================================================"); + + std::map results; + for (auto& t : this->tests) { Logger::Info(""); @@ -34,9 +39,29 @@ bool TestRunner::Run() Logger::Info(""); Logger::Info(""); + results[t.first] = result; + if (!result) finalResult = false; } + Logger::Info(""); + Logger::Info(""); + Logger::Info("========================================================"); + Logger::Info("Testing result summary"); + Logger::Info("========================================================"); + int passed = 0; + for (auto& t : results) + { + Logger::Info("Testing {0} : {1}", t.first, t.second ? "Passed" : "Failed"); + if (t.second) + { + passed++; + } + } + + Logger::Info("========================================================"); + Logger::Info("All passed {0}, Pass rate {1} / {2}.", finalResult, passed, this->tests.size()); + return finalResult; } diff --git a/nodemanager/test/TestRunner.h b/nodemanager/test/TestRunner.h index d6889d3..0f44de5 100644 --- a/nodemanager/test/TestRunner.h +++ b/nodemanager/test/TestRunner.h @@ -16,6 +16,7 @@ namespace hpc TestRunner(); bool Run(); + protected: private: std::map> tests; diff --git a/nodemanager/utils/JsonHelper.cpp b/nodemanager/utils/JsonHelper.cpp index 798a02c..3c1e03d 100644 --- a/nodemanager/utils/JsonHelper.cpp +++ b/nodemanager/utils/JsonHelper.cpp @@ -37,6 +37,7 @@ namespace hpc TOJSON(int, number) TOJSON(double, number) + TOJSON(uint64_t, number) TOJSON(bool, boolean) TOJSON(std::string, string) } diff --git a/nodemanager/utils/JsonHelper.h b/nodemanager/utils/JsonHelper.h index 4854fd7..7906a2f 100644 --- a/nodemanager/utils/JsonHelper.h +++ b/nodemanager/utils/JsonHelper.h @@ -1,142 +1,152 @@ -#ifndef JSONHELPER_H -#define JSONHELPER_H - -#include -#include - -#include - -using namespace web; - -namespace hpc -{ - namespace utils - { - template - class JsonHelper - { - public: - static T Read(const std::string& name, const json::value& j) - { - json::value obj; +#ifndef JSONHELPER_H +#define JSONHELPER_H - if (j.has_field(name)) - { - obj = j.at(name); - } +#include +#include - return JsonHelper::FromJson(obj); - } - - static void Write(const std::string& name, json::value& v, const T& t) - { - v[name] = ToJson(t); - } - - static T FromJson(const json::value& j); - - static json::value ToJson(const T& t); - - protected: - private: - }; - - // Partial specialize for vector. - template - class JsonHelper> - { - public: - static std::vector Read(const std::string& name, const json::value& j) - { - json::value obj; +#include - if (j.has_field(name)) - { - obj = j.at(name); - } +using namespace web; - return JsonHelper>::FromJson(obj); - } - - static std::vector FromJson(const json::value& j) - { - std::vector values; - - if (!j.is_null()) - { - auto arr = JsonHelper::FromJson(j); - - std::transform( - arr.cbegin(), - arr.cend(), - std::back_inserter(values), - [](const auto& i) { return JsonHelper::FromJson(i); }); - } - - return std::move(values); - } - - static json::array ToJson(const std::vector& vec) - { - std::vector values; - - std::transform( - vec.cbegin(), - vec.cend(), - std::back_inserter(values), - [](const T& v) { return JsonHelper::ToJson(v); }); - - return std::move(json::value::array(values)); - } - - protected: - private: - }; - - // Partial specialize for map. - template - class JsonHelper> - { - public: - static std::map Read(const std::string& name, const json::value& j) +namespace hpc +{ + namespace utils + { + template + class JsonHelper + { + public: + static T Read(const std::string& name, const json::value& j) { json::value obj; if (j.has_field(name)) - { + { obj = j.at(name); } - - return std::move(JsonHelper>::FromJson(obj)); - } - - static std::map FromJson(const json::value& j) - { - std::map values; - - if (!j.is_null()) - { - const auto& arr = JsonHelper::FromJson(j); - - for (auto const i : arr) - { - values[i.first] = JsonHelper::FromJson(i.second); - } - } - - return std::move(values); - } -// -// static json::object ToJson(const std::map& m) -// { -// return json::value::object(m); -// } - // static void Write(const std::string& name, json::value& v, const std::vector& t); - - protected: - private: - }; - } -} - -#endif // JSONHELPER_H + + return JsonHelper::FromJson(obj); + } + + static void Write(const std::string& name, json::value& v, const T& t) + { + v[name] = ToJson(t); + } + + static T FromJson(const json::value& j); + + static json::value ToJson(const T& t); + + protected: + private: + }; + + // Partial specialize for vector. + template + class JsonHelper> + { + public: + static std::vector Read(const std::string& name, const json::value& j) + { + json::value obj; + + if (j.has_field(name)) + { + obj = j.at(name); + } + + return JsonHelper>::FromJson(obj); + } + + static std::vector FromJson(const json::value& j) + { + std::vector values; + + if (!j.is_null()) + { + auto arr = JsonHelper::FromJson(j); + + std::transform( + arr.cbegin(), + arr.cend(), + std::back_inserter(values), + [](const auto& i) { return JsonHelper::FromJson(i); }); + } + + return std::move(values); + } + + static json::value ToJson(const std::vector& vec) + { + std::vector values; + + std::transform( + vec.cbegin(), + vec.cend(), + std::back_inserter(values), + [](const T& v) { return JsonHelper::ToJson(v); }); + + return std::move(json::value::array(values)); + } + + protected: + private: + }; + + // Partial specialize for map. + template + class JsonHelper> + { + public: + static std::map Read(const std::string& name, const json::value& j) + { + json::value obj; + + if (j.has_field(name)) + { + obj = j.at(name); + } + + return std::move(JsonHelper>::FromJson(obj)); + } + + static std::map FromJson(const json::value& j) + { + std::map values; + + if (!j.is_null()) + { + const auto& arr = JsonHelper::FromJson(j); + + for (auto const i : arr) + { + values[i.first] = JsonHelper::FromJson(i.second); + } + } + + return std::move(values); + } + + static json::value ToJson(const std::map& m) + { + json::value v; + + std::for_each( + m.cbegin(), + m.cend(), + [&v](const std::pair& p) + { + v[p.first] = JsonHelper::ToJson(p.second); + }); + + return std::move(v); + } + // static void Write(const std::string& name, json::value& v, const std::vector& t); + + protected: + private: + }; + } +} + +#endif // JSONHELPER_H diff --git a/nodemanager/utils/Logger.cpp b/nodemanager/utils/Logger.cpp index 9e63f83..f3d55c2 100644 --- a/nodemanager/utils/Logger.cpp +++ b/nodemanager/utils/Logger.cpp @@ -1,15 +1,15 @@ -#include "Logger.h" - -using namespace hpc::utils; - -Logger::Logger() -{ - loggers.push_back(spdlog::stdout_logger_mt("console")); - loggers.push_back(spdlog::rotating_logger_mt("nodemanager", "logs/nodemanager", 1048576 * 5, 100)); - - spdlog::set_level(spdlog::level::debug); - spdlog::set_pattern("[%m/%d %T.%e] %t %l: %v"); -} - +#include "Logger.h" + +using namespace hpc::utils; + +Logger::Logger() +{ + loggers.push_back(spdlog::stdout_logger_mt("console")); + loggers.push_back(spdlog::rotating_logger_mt("nodemanager", "logs/nodemanager", 1048576 * 5, 100)); + + spdlog::set_level(spdlog::level::debug); + spdlog::set_pattern("[%m/%d %T.%e] %t %l: %v"); +} + Logger Logger::instance; diff --git a/nodemanager/utils/Logger.h b/nodemanager/utils/Logger.h index cceb980..23b7ac8 100644 --- a/nodemanager/utils/Logger.h +++ b/nodemanager/utils/Logger.h @@ -1,134 +1,134 @@ -#ifndef LOGGER_H -#define LOGGER_H - -#include -#include -#include -#include +#ifndef LOGGER_H +#define LOGGER_H + +#include +#include +#include +#include #include -#include "String.h" - -namespace hpc -{ - namespace utils - { - enum class LogLevel - { - Emergency = 0, /* system is unusable */ - Alert = 1, /* action must be taken immediately */ - Critical = 2, /* critical conditions */ - Error = 3, /* error conditions */ - Warning = 4, /* warning conditions */ - Notice = 5, /* normal but significant condition */ - Info = 6, /* informational */ - Debug = 7 /* debug-level messages */ - }; - - class Logger - { - public: - template - static void Info(const char* fmt, Args ...args) - { - Log(LogLevel::Info, fmt, args...); - } - - template - static void Error(const char* fmt, Args ...args) - { - Log(LogLevel::Error, fmt, args...); - } - - template - static void Warn(const char* fmt, Args ...args) - { - Log(LogLevel::Warning, fmt, args...); - } - - template - static void Debug(const char* fmt, Args ...args) - { - Log(LogLevel::Debug, fmt, args...); - } - - template - static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args) +#include "String.h" + +namespace hpc +{ + namespace utils + { + enum class LogLevel + { + Emergency = 0, /* system is unusable */ + Alert = 1, /* action must be taken immediately */ + Critical = 2, /* critical conditions */ + Error = 3, /* error conditions */ + Warning = 4, /* warning conditions */ + Notice = 5, /* normal but significant condition */ + Info = 6, /* informational */ + Debug = 7 /* debug-level messages */ + }; + + class Logger + { + public: + template + static void Info(const char* fmt, Args ...args) { - auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); - Log(LogLevel::Info, f.c_str(), args...); - } - - template - static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args) - { - auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); - Log(LogLevel::Error, f.c_str(), args...); - } - - template - static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args) - { - auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); - Log(LogLevel::Warning, f.c_str(), args...); - } - - template - static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args) - { - auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); - Log(LogLevel::Debug, f.c_str(), args...); - } - - template - static void Log(LogLevel level, const char* fmt, Args ...args) - { - for (auto logger : instance.loggers) - { - switch (level) - { - case LogLevel::Emergency: - logger->emerg(fmt, args...); - break; - case LogLevel::Alert: - logger->alert(fmt, args...); - break; - case LogLevel::Critical: - logger->critical(fmt, args...); - break; - case LogLevel::Error: - logger->error(fmt, args...); - break; - case LogLevel::Warning: - logger->warn(fmt, args...); - break; - case LogLevel::Notice: - logger->notice(fmt, args...); - break; - case LogLevel::Info: - logger->info(fmt, args...); - break; - case LogLevel::Debug: - logger->debug(fmt, args...); - break; - default: - logger->trace(fmt, args...); + Log(LogLevel::Info, fmt, args...); + } + + template + static void Error(const char* fmt, Args ...args) + { + Log(LogLevel::Error, fmt, args...); + } + + template + static void Warn(const char* fmt, Args ...args) + { + Log(LogLevel::Warning, fmt, args...); + } + + template + static void Debug(const char* fmt, Args ...args) + { + Log(LogLevel::Debug, fmt, args...); + } + + template + static void Info(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Info, f.c_str(), args...); + } + + template + static void Error(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Error, f.c_str(), args...); + } + + template + static void Warn(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Warning, f.c_str(), args...); + } + + template + static void Debug(int jobId, int taskId, int requeue, const char* fmt, Args ...args) + { + auto f = String::Join("", "Job ", jobId, ", Task ", taskId, ".", requeue, ": ", fmt); + Log(LogLevel::Debug, f.c_str(), args...); + } + + template + static void Log(LogLevel level, const char* fmt, Args ...args) + { + for (auto logger : instance.loggers) + { + switch (level) + { + case LogLevel::Emergency: + logger->emerg(fmt, args...); + break; + case LogLevel::Alert: + logger->alert(fmt, args...); + break; + case LogLevel::Critical: + logger->critical(fmt, args...); + break; + case LogLevel::Error: + logger->error(fmt, args...); + break; + case LogLevel::Warning: + logger->warn(fmt, args...); + break; + case LogLevel::Notice: + logger->notice(fmt, args...); + break; + case LogLevel::Info: + logger->info(fmt, args...); + break; + case LogLevel::Debug: + logger->debug(fmt, args...); + break; + default: + logger->trace(fmt, args...); } if (level <= LogLevel::Warning) { logger->flush(); - } - } - } - - private: - Logger(); - - static Logger instance; - std::vector> loggers; - }; - } -} - -#endif // LOGGER_H + } + } + } + + private: + Logger(); + + static Logger instance; + std::vector> loggers; + }; + } +} + +#endif // LOGGER_H diff --git a/nodemanager/utils/String.h b/nodemanager/utils/String.h index 470ab53..04e5c56 100644 --- a/nodemanager/utils/String.h +++ b/nodemanager/utils/String.h @@ -1,37 +1,37 @@ -#ifndef STRING_H -#define STRING_H - -#include -#include +#ifndef STRING_H +#define STRING_H + +#include +#include #include -#include - -namespace hpc -{ - namespace utils - { - class String - { - public: - static std::vector Split(const std::string& str, char delim); - - template - static std::string Join(const T& delim, const Args& ...args) - { - std::ostringstream oss; - bool first = true; +#include + +namespace hpc +{ + namespace utils + { + class String + { + public: + static std::vector Split(const std::string& str, char delim); + + template + static std::string Join(const T& delim, const Args& ...args) + { + std::ostringstream oss; + bool first = true; auto tmp = { ((first ? oss : oss << delim) << args, first = false)... }; - [&tmp]() { }; - - return std::move(oss.str()); + [&tmp]() { }; + + return std::move(oss.str()); } - - template - static std::string Join(const std::vector& values) - { - std::ostringstream oss; - bool first = true; + + template + static std::string Join(const std::vector& values) + { + std::ostringstream oss; + bool first = true; for (const auto& v : values) { @@ -39,7 +39,7 @@ namespace hpc first = false; } - return std::move(oss.str()); + return std::move(oss.str()); } static inline std::string Trim(const std::string& s) @@ -64,11 +64,11 @@ namespace hpc return std::move(userName); } - - protected: + + protected: private: - }; - } -} - -#endif // STRING_H + }; + } +} + +#endif // STRING_H diff --git a/nodemanager/utils/System.cpp b/nodemanager/utils/System.cpp index 0e00fc5..79339c0 100644 --- a/nodemanager/utils/System.cpp +++ b/nodemanager/utils/System.cpp @@ -542,3 +542,38 @@ int System::DeleteUser(const std::string& userName) return ret; } + +int System::CreateTempFolder(char* folderTemplate, const std::string& userName) +{ + char* p = mkdtemp(folderTemplate); + + if (p) + { + std::string output; + int ret = System::ExecuteCommandOut(output, "chown -R", userName, p); + if (ret == 0) + { + ret = System::ExecuteCommandOut(output, "chmod -R u+rwX", p); + } + + return ret; + } + else + { + return errno; + } +} + +int System::WriteStringToFile(const std::string& fileName, const std::string& contents) +{ + std::ofstream os(fileName, std::ios::trunc); + if (!os) + { + return (int)ErrorCodes::WriteFileError; + } + + os << contents; + os.close(); + + return 0; +} diff --git a/nodemanager/utils/System.h b/nodemanager/utils/System.h index 6370671..45131d8 100644 --- a/nodemanager/utils/System.h +++ b/nodemanager/utils/System.h @@ -1,40 +1,40 @@ -#ifndef SYSTEM_H -#define SYSTEM_H - -#include - -#include "String.h" +#ifndef SYSTEM_H +#define SYSTEM_H + +#include + +#include "String.h" #include "Logger.h" -#include "../common/ErrorCodes.h" - -namespace hpc -{ - namespace utils - { - enum class IpAddressVersion - { - V4 = 4, - V6 = 6 - }; - - struct System - { - public: - typedef std::tuple NetInfo; - - static std::vector GetNetworkInfo(); - static std::string GetIpAddress(IpAddressVersion version, const std::string& name); - static void CPUUsage(uint64_t &total, uint64_t &idle); - static void Memory(uint64_t &available, uint64_t &total); - static void CPU(int &cores, int &sockets); - static int NetworkUsage(uint64_t &network, const std::string& netName); +#include "../common/ErrorCodes.h" + +namespace hpc +{ + namespace utils + { + enum class IpAddressVersion + { + V4 = 4, + V6 = 6 + }; + + struct System + { + public: + typedef std::tuple NetInfo; + + static std::vector GetNetworkInfo(); + static std::string GetIpAddress(IpAddressVersion version, const std::string& name); + static void CPUUsage(uint64_t &total, uint64_t &idle); + static void Memory(uint64_t &available, uint64_t &total); + static void CPU(int &cores, int &sockets); + static int NetworkUsage(uint64_t &network, const std::string& netName); static int Vmstat(float &pagesPerSec, float &contextSwitchesPerSec); static int Iostat(float &bytesPerSec); static int IostatX(float &queueLength); static int FreeSpace(float &freeSpaceKB); - static const std::string& GetNodeName(); - static bool IsCGroupInstalled(); - + static const std::string& GetNodeName(); + static bool IsCGroupInstalled(); + static const std::string& GetDistroInfo(); static int CreateUser( @@ -58,76 +58,78 @@ namespace hpc const std::string& userName, const std::string& key); - static int DeleteUser(const std::string& userName); - - template - static int ExecuteCommandIn(const std::string& input, const std::string& cmd, const Args& ... args) + static int DeleteUser(const std::string& userName); + static int CreateTempFolder(char* folderTemplate, const std::string& userName); + static int WriteStringToFile(const std::string& fileName, const std::string& contents); + + template + static int ExecuteCommandIn(const std::string& input, const std::string& cmd, const Args& ... args) { - std::string command = String::Join(" ", cmd, args...); - //Logger::Debug("Executing cmd: {0}", command); - FILE* stream = popen(command.c_str(), "w"); - int exitCode = (int)hpc::common::ErrorCodes::PopenError; - - if (stream) + std::string command = String::Join(" ", cmd, args...); + //Logger::Debug("Executing cmd: {0}", command); + FILE* stream = popen(command.c_str(), "w"); + int exitCode = (int)hpc::common::ErrorCodes::PopenError; + + if (stream) { if (!input.empty()) { fputs(input.c_str(), stream); } - - int ret = pclose(stream); - exitCode = WEXITSTATUS(ret); - } - else + + int ret = pclose(stream); + exitCode = WEXITSTATUS(ret); + } + else { - Logger::Error("Error when popen {0}", command); - } - + Logger::Error("Error when popen {0}", command); + } + return exitCode; } - - template - static int ExecuteCommandOut(std::string& output, const std::string& cmd, const Args& ... args) - { - std::string command = String::Join(" ", cmd, args...); - //Logger::Debug("Executing cmd: {0}", command); - int exitCode = (int)hpc::common::ErrorCodes::PopenError; - - std::ostringstream result; - FILE* stream = popen(command.c_str(), "r"); - if (stream) - { - char buffer[512]; - while (fgets(buffer, sizeof(buffer), stream) != nullptr) - { - result << buffer; - } - - int ret = pclose(stream); + template + static int ExecuteCommandOut(std::string& output, const std::string& cmd, const Args& ... args) + { + std::string command = String::Join(" ", cmd, args...); + //Logger::Debug("Executing cmd: {0}", command); + int exitCode = (int)hpc::common::ErrorCodes::PopenError; + + std::ostringstream result; + FILE* stream = popen(command.c_str(), "r"); + + if (stream) + { + char buffer[512]; + while (fgets(buffer, sizeof(buffer), stream) != nullptr) + { + result << buffer; + } + + int ret = pclose(stream); exitCode = WEXITSTATUS(ret); - } - else + } + else { int err = errno; - Logger::Error("Error when popen {0}, errno {1}", command, err); - result << "error when popen " << command << ", errno " << err << std::endl; - } - - output = result.str(); + Logger::Error("Error when popen {0}, errno {1}", command, err); + result << "error when popen " << command << ", errno " << err << std::endl; + } + + output = result.str(); if (exitCode != 0) { Logger::Warn("Executing {0}, error code {1}", command, exitCode); } - - return exitCode; - } - - protected: - private: - }; - } -} - -#endif // SYSTEM_H + + return exitCode; + } + + protected: + private: + }; + } +} + +#endif // SYSTEM_H