diff --git a/deployment/pypi/setup.py b/deployment/pypi/setup.py
index bd97927c3..c0447b4f7 100644
--- a/deployment/pypi/setup.py
+++ b/deployment/pypi/setup.py
@@ -63,7 +63,7 @@ setuptools.setup(
         'psutil',
         'requests',
         'astor',
-        'pyhdfs',
+        'PythonWebHDFS',
         'hyperopt',
         'json_tricks',
         'numpy',
diff --git a/setup.py b/setup.py
index d9329bd25..a543a0925 100644
--- a/setup.py
+++ b/setup.py
@@ -63,7 +63,7 @@ setup(
         'requests',
         'scipy',
         'schema',
-        'pyhdfs'
+        'PythonWebHDFS'
     ],
 
     cmdclass={
diff --git a/src/nni_manager/training_service/pai/paiData.ts b/src/nni_manager/training_service/pai/paiData.ts
index 0f1923009..ed9fb8d23 100644
--- a/src/nni_manager/training_service/pai/paiData.ts
+++ b/src/nni_manager/training_service/pai/paiData.ts
@@ -64,7 +64,7 @@ export const PAI_TRIAL_COMMAND_FORMAT: string =
 `export NNI_PLATFORM=pai NNI_SYS_DIR={0} NNI_OUTPUT_DIR={1} NNI_TRIAL_JOB_ID={2} NNI_EXP_ID={3} NNI_TRIAL_SEQ_ID={4}
 && cd $NNI_SYS_DIR && sh install_nni.sh
 && python3 -m nni_trial_tool.trial_keeper --trial_command '{5}' --nnimanager_ip '{6}' --nnimanager_port '{7}'
---pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}'`;
+--pai_hdfs_output_dir '{8}' --pai_hdfs_host '{9}' --pai_user_name {10} --nni_hdfs_exp_dir '{11}' --webhdfs_path '/webhdfs/api/v1'`;
 
 export const PAI_OUTPUT_DIR_FORMAT: string =
 `hdfs://{0}:9000/`;
diff --git a/tools/nni_trial_tool/trial_keeper.py b/tools/nni_trial_tool/trial_keeper.py
index 1675f8bf4..2d9837a30 100644
--- a/tools/nni_trial_tool/trial_keeper.py
+++ b/tools/nni_trial_tool/trial_keeper.py
@@ -48,10 +48,25 @@ def main_loop(args):
     # redirect trial keeper's stdout and stderr to syslog
     trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout)
     sys.stdout = sys.stderr = trial_keeper_syslogger
+    # backward compatibility
+    hdfs_host = None
+    hdfs_output_dir = None
+    if args.hdfs_host:
+        hdfs_host = args.hdfs_host
+    elif args.pai_hdfs_host:
+        hdfs_host = args.pai_hdfs_host
+    if args.hdfs_output_dir:
+        hdfs_output_dir = args.hdfs_output_dir
+    elif args.pai_hdfs_output_dir:
+        hdfs_output_dir = args.pai_hdfs_output_dir
 
-    if args.pai_hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
+    if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
         try:
-            hdfs_client = HdfsClient(hosts='{0}:{1}'.format(args.pai_hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
+            if args.webhdfs_path:
+                hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, webhdfs_path=args.webhdfs_path, timeout=5)
+            else:
+                # backward compatibility
+                hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, timeout=5)
         except Exception as e:
             nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
             raise e
@@ -67,14 +82,14 @@ def main_loop(args):
         # child worker process exits and all stdout data is read
         if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
             nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
-            if args.pai_hdfs_output_dir is not None:
+            if hdfs_output_dir is not None:
                 # Copy local directory to hdfs for OpenPAI
                 nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                 try:
-                    if copyDirectoryToHdfs(nni_local_output_dir, args.pai_hdfs_output_dir, hdfs_client):
-                        nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, args.pai_hdfs_output_dir))
+                    if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
+                        nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
                     else:
-                        nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, args.pai_hdfs_output_dir))
+                        nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
                 except Exception as e:
                     nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
                     raise e
@@ -95,10 +110,13 @@ if __name__ == '__main__':
     PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
     PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
     PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
-    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of hdfs')
-    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of hdfs')
+    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility
+    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
+    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility
+    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
     PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
     PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
+    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
     args, unknown = PARSER.parse_known_args()
     if args.trial_command is None:
         exit(1)
diff --git a/tools/setup.py b/tools/setup.py
index 9e475a59a..f015876e6 100644
--- a/tools/setup.py
+++ b/tools/setup.py
@@ -12,7 +12,7 @@ setuptools.setup(
         'psutil',
         'astor',
         'schema',
-        'pyhdfs'
+        'PythonWebHDFS'
     ],
 
     author = 'Microsoft NNI Team',
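
Usage sketch (illustrative only, not part of the patch): with the updated PAI_TRIAL_COMMAND_FORMAT above, the trial keeper on an OpenPAI node is launched roughly as below; the host, port, user, and directory values are hypothetical placeholders.

    python3 -m nni_trial_tool.trial_keeper \
        --trial_command 'python3 mnist.py' \
        --nnimanager_ip '10.0.0.2' --nnimanager_port '8081' \
        --pai_hdfs_output_dir '/nni/experiments/EXP_ID/trials/TRIAL_ID/output' \
        --pai_hdfs_host '10.0.0.1' --pai_user_name pai_user \
        --nni_hdfs_exp_dir '/nni/experiments/EXP_ID' \
        --webhdfs_path '/webhdfs/api/v1'

When --webhdfs_path is given, the keeper builds its HdfsClient against <hdfs_host>:80 with that WebHDFS prefix; when it is omitted (older NNI managers), it falls back to the NameNode's default WebHDFS port 50070, matching the backward-compatibility branch added in trial_keeper.py.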