# -*- coding: utf-8 -*-
# <nbformat>3.0</nbformat>
# <codecell>
#!/usr/bin/env python
#
#The MIT CorrelX Correlator
#
#https://github.com/MITHaystack/CorrelX
#Contact: correlX@haystack.mit.edu
#Project leads: Victor Pankratius, Pedro Elosegui
#Project developer: A.J. Vazquez Alvarez
#
#Copyright 2017 MIT Haystack Observatory
#
#Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
#
#The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
#
#THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
#
#------------------------------
#------------------------------
#Project: CorrelX.
#File: lib_hadoop_hdfs.py.
#Author: A.J. Vazquez Alvarez (ajvazquez@haystack.mit.edu)
#Description:
"""
Functions for starting and stopping hadoop, and for sending files to HDFS (or copying files to Lustre).
"""
#History:
#initial version: 2015.12 ajva
#MIT Haystack Observatory
from __future__ import print_function
import os
import time
import sys
import imp
from lib_vdif import get_vdif_stats
import numpy as np
import const_hadoop
imp.reload(const_hadoop)
from const_hadoop import *
##################################################################
#
# Hadoop cluster start/stop
#
##################################################################
def cluster_start(wait_time,hadoop_conf_dir,hadoop_dir,file_slaves,temp_dir,username,temp_log,single_node=0,use_lustre_plugin=0,v=0,\
file_log=sys.stdout):
"""
Start hadoop. It returns the number of active nodes after initialization.
Parameters
----------
wait_time : int
time [s] to wait after each command sent to Hadoop.
hadoop_conf_dir : str
Hadoop configuration folder path [hadoop_home/etc/hadoop].
hadoop_dir : str
Hadoop home folder path.
file_slaves : str
Path to Hadoop slaves file.
temp_dir : str
        Path to temporary folder.
username : str
user name (to be used to login into slave nodes through ssh).
temp_log : str
Path to temporary log file.
single_node : int
        If 1, only delete temporary folders on the current machine (master); if 0, delete them on all nodes (master and slaves).
use_lustre_plugin : int
1 for Lustre filesystem, 0 for HDFS.
v : int
1 for verbose.
    file_log : file handler
        Handler for the log file (default: sys.stdout).
Returns
-------
int_t_nodes : int
(t_nodes) number of nodes up and running in the Hadoop cluster.
Notes
-----
|
| **Summary:**
|
| 1. Delete temporary files (otherwise there may be problems with previous process IDs...).
| 2. Format HDFS (in the future the hadoop service may be running continuously to avoid initialization for every correlation...).
| 3. Start HDFS.
| 4. Start YARN.
| 5. Check running processes.
"""
#forced_temp_folder = "/tmp"
#temp_files = forced_temp_folder + "/*" + username + "*"
forced_temp_folder = temp_dir
temp_files = forced_temp_folder + "/*"
command_delete_basic = "rm -Rf --verbose " + temp_files + " > " + temp_log
#command_delete_owned = "find /tmp/ -user " + username + " -exec rm -fr {} \;"
if v==1:
print("\nStarting Hadoop...",file=file_log)
print("\n (!) Deleting temporary files (forced dir: " + forced_temp_folder + " for user " + username + ") ",file=file_log)
print(" Single node: ".ljust(10),end="",file=file_log)
if single_node==1:
print("yes",file=file_log)
else:
print("no",file=file_log)
# Delete temporary files and reformat HDFS
if v==1:
print(" Deleting temporary files (on all nodes) and reformatting HDFS...",file=file_log)
if single_node:
os.system(command_delete_basic)
#os.system(command_delete_owned)
else:
os.system("pdsh -d -R ssh -l " + username + " -w `cat " + \
hadoop_conf_dir + file_slaves + "|tr '\n' ','|rev|cut -c2-|rev` " +
command_delete_basic)
#os.system("pdsh -d -R ssh -l " + username + " -w ^" + hadoop_conf_dir + file_slaves + " " + command_delete_owned)
# Count number of removed files.
count_nodes=0
filtered_list=[]
list_nodes=[]
with open(hadoop_conf_dir + file_slaves, 'r') as f_nodes:
for line in f_nodes:
list_nodes.append(line.strip())
count_nodes+=1
with open(temp_log, 'r') as f_tmp:
for line in f_tmp:
[ip_s,file_s]=line.split("removed")
filtered_list+=[ip_s]
reduced_list = set(filtered_list)
if v==1:
for i in reduced_list:
print(" " + i.ljust(18) + " removed files: ".ljust(32) + str(filtered_list.count(i)),file=file_log)
# Format HDFS
if v==1:
print(" Reformatting HDFS...",file=file_log)
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" namenode -format -force")
#os.system(hadoop_dir + "bin/hdfs dfsadmin -safemode leave")
# Wait for initialization
if v==1:
print(" Waiting for Hadoop initialization (1) ("+str(wait_time)+" s)...",file=file_log)
file_log.flush()
if wait_time>0:
time.sleep(wait_time)
# Start HDFS
if v==1:
print(" Starting HDFS...",file=file_log)
if use_lustre_plugin==0:
os.system(hadoop_dir + "sbin/start-dfs.sh --config "+hadoop_conf_dir) # + new_inter)
# Wait for initialization
if v==1:
print(" Waiting for Hadoop initialization (2) ("+str(wait_time)+" s)...",file=file_log)
file_log.flush()
if wait_time>0:
time.sleep(wait_time)
# Start YARN
if v==1:
print(" Starting YARN...",file=file_log)
os.system(hadoop_dir + "sbin/start-yarn.sh --config "+hadoop_conf_dir) # + new_inter)
# Check open ports
#os.system("netstat -nlp|grep java" + plus_out)
#show_file(file_out)
# Wait for initialization
if v==1:
print(" Waiting for Hadoop initialization (3) ("+str(wait_time)+" s)...",file=file_log)
file_log.flush()
if wait_time>0:
time.sleep(wait_time)
# Start History server
if v==1:
print(" Starting History server...",file=file_log)
os.system("mkdir -p " + temp_dir + "/"+HADOOP_DIR_DONE_INTER)
os.system("mkdir -p " + temp_dir + "/"+HADOOP_DIR_DONE)
if use_lustre_plugin==0:
os.system(hadoop_dir + "sbin/mr-jobhistory-daemon.sh --config "+hadoop_conf_dir+" start historyserver") # + new_inter)
else:
os.system(hadoop_dir + "sbin/mr-jobhistory-daemon.sh --config "+hadoop_conf_dir +\
" start historyserver -D "+C_H_INLINE_LUSTRE_FS_ABS_PARAM+"="+C_H_INLINE_LUSTRE_FS_ABS_VAL) # + new_inter)
if v==1:
print(" Waiting for Hadoop initialization (and 4) ("+str(wait_time)+" s)...",file=file_log)
file_log.flush()
if wait_time>0:
time.sleep(wait_time)
# Check running processes
if v==1:
os.system("jps" + " > " + temp_log)
print(" Launched processes:",file=file_log)
with open(temp_log, 'r') as f_log:
for line in f_log:
print(" "+line.strip(),file=file_log)
if v==1:
print(" List of nodes:",file=file_log)
os.system(hadoop_dir + "bin/yarn --config "+ hadoop_conf_dir +" node -list > " + temp_log)#temp_dir + ) # + plus_out)
t_nodes=0
for iter_refresh in range(1):
with open(temp_log, 'r') as f_tmp:
for line in f_tmp:
if "RUNNING" in line:
                    for i in list(list_nodes):  # iterate over a copy, since matching nodes are removed below
if i in line:
list_nodes.remove(i)
if v==1:
print(" "+line.strip(),end="\n",file=file_log)
if "Nodes:" in line:
[stotal,t_nodes]=line.split(":")
if (iter_refresh==0)and(int(t_nodes)>count_nodes):
if v==1:
print(" Too many nodes initiated!",file=file_log)
if v==1:
print("Nodes off: "+str(list_nodes),file=file_log)
#os.system(hadoop_dir + "bin/hadoop dfsadmin -refreshNodes")
return(int(t_nodes)) #number of nodes active
def cluster_stop(wait_time,hadoop_dir,hadoop_conf_dir,temp_log,timeout=-1,v=0,file_log=sys.stdout):
"""
Stop hadoop.
Parameters
----------
wait_time : int
Time [s] to wait after each command sent to Hadoop.
hadoop_dir : str
Hadoop home folder path.
hadoop_conf_dir : str
Hadoop configuration folder path [hadoop_home/etc/hadoop].
temp_log : str
Path to temporary log file.
timeout : int
If >0 will terminate Hadoop stop command after "timeout" seconds.
v : int
1 for verbose.
    file_log : file handler
        Handler for the log file (default: sys.stdout).
Returns
-------
N/A
Notes
-----
|
| **Summary:**
|
| -Stop YARN.
| -Stop DFS.
| -Wait "wait_time" seconds for termination.
"""
use_lustre_plugin = 0
if v==1:
print("\nStopping Hadoop...",file=file_log)
# Stop YARN
if v==1:
print(" Stopping YARN...",file=file_log)
if timeout<0:
os.system(hadoop_dir + "sbin/stop-yarn.sh --config "+hadoop_conf_dir) # + new_inter)
else:
os.system("timeout "+str(timeout)+" "+hadoop_dir + "sbin/stop-yarn.sh --config "+hadoop_conf_dir) # + new_inter)
# Stop DFS
if v==1:
print(" Stopping DFS...",file=file_log)
#print(" Stopping ALL...",file=file_log)
if use_lustre_plugin==0:
os.system(hadoop_dir + "sbin/stop-dfs.sh --config "+hadoop_conf_dir) # + new_inter)
#os.system(hadoop_dir + "sbin/stop-all.sh")
# Stop history server
os.system(hadoop_dir + "sbin/mr-jobhistory-daemon.sh --config "+hadoop_conf_dir+" stop historyserver")
# Wait for termination
if v==1:
print(" Waiting for Hadoop termination ("+str(wait_time)+" s)...",file=file_log)
#file_log.flush()
if wait_time>0:
time.sleep(wait_time)
##################################################################
#
# Move media files to distributed filesystem
#
##################################################################
def copy_files_to_hdfs(replication,input_files,data_dir,data_dir_tmp,hadoop_dir,hadoop_conf_dir,hdfs_data_dir,\
packets_per_hdfs_block,temp_log,copy_delay=0,checksum_size=100,text_mode=1,\
use_lustre_plugin=0,lustre_prefix="/nobackup1/ajva/hadoop",bm_avoid_copy=0,v=0,file_log=sys.stdout):
"""
    Copy files from local directories to HDFS. It returns the start time, end time, and elapsed time for moving the files (including the applied delay).
Parameters
----------
replication : int
Number of copies of the same file block in the HDFS system.
input_files : list of str
names of the files to be moved into HDFS/LustreFS.
data_dir : str
Path (in local filesystem) to the folder hosting the files to be moved into HDFS/LustreFS.
data_dir_tmp : str
        Path (in local filesystem) to the folder where split input data are copied before being moved into HDFS/LustreFS.
hadoop_dir : str
Hadoop home folder path (local filesystem).
hadoop_conf_dir : str
Hadoop configuration folder path (etc/hadoop).
hdfs_data_dir : str
        Path (in HDFS/LustreFS, thus relative to "lustre_prefix" when using Lustre) that will host the input files.
packets_per_hdfs_block : int
Number of VDIF frames per file split.
temp_log : str
Path to temporary log file.
copy_delay : int
Time [s] to wait after each command sent to Hadoop.
checksum_size : int
Number of bytes for checksum (for each split) [this overrides automatic calculation].
text_mode : int
[default 1] If 1 use checksum_size to override value computed automatically.
use_lustre_plugin : int
        If 1, use the Lustre filesystem; if 0, HDFS.
lustre_prefix : str
        Path in Lustre to precede "hdfs_data_dir" when using Lustre.
bm_avoid_copy : int
        [default 0] If 1, input files are not split again when "lustre_prefix"+"hdfs_data_dir" already contains
        the data for the files in "input_files" from a previous execution. See notes below.
v : int
1 for verbose.
    file_log : file handler
        Handler for the log file (default: sys.stdout).
Returns
-------
put_t_s : float
timestamp with start time for loop copying files.
put_t_e : float
timestamp with stop time for loop copying files.
put_d : float
total execution time for loop copying files.
Notes
-----
|
| **Summary:**
|
| -Delete existing files in HDFS (to avoid errors on existing files)
    |    TODO: consider overwriting files.
| -Wait for delay if applicable.
| -Move files to HDFS (with specified block size)
| 2015.12.1. packet_size is read from the first frame of the file.
|
|
| **Notes:**
|
| -Regarding filesystems:
| HDFS: Path inside Hadoop distributed filesystem (accessible from Hadoop).
| LustreFS: Path relative to Hadoop Lustre home folder (accessible from Hadoop).
    |    Local filesystem: Path accessible from this command; it can be local, NFS, Lustre, etc.
|
| -Regarding "bm_avoid_copy":
| Always 0 by default.
| After each execution, for each processed file there will be a folder in "lustre_prefix"+"hdfs_data_dir"+"file_name"+... with
    |   the splits for that file. Setting this to 1 avoids re-splitting a file that was already used previously. Use only
| for repeated benchmarking.
"""
# Split file manually into the size of the hdfs blocks
# Use 1 if processing input as text
#split_file_sequentially=0
split_file_sequentially=text_mode
if v==1:
print("\nCopying data files to HDFS...",file=file_log)
print(data_dir)
print(input_files)
safe_status = "OFF"
if use_lustre_plugin==0:
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfsadmin -report|grep Safe > " + temp_log)
with open(temp_log, 'r') as f_tmp:
for line in f_tmp:
safe_status = line.strip()
if safe_status == "OFF":
if v==1:
#print(" Safe status: " + safe_status,file=file_log)
print(" Checking HDFS: OK",file=file_log)
else:
#os.system(hadoop_dir+"bin/hdfs dfsadmin -safemode leave")
if v==1:
#print(" Forcing out of safe mode...",file=file_log)
print(" ERROR!: Namenode is in safe mode!...",file=file_log)
# TODO: Propagate the error...
dest_dir = hdfs_data_dir
# Delete all existing files in HDFS and create directory.
if use_lustre_plugin==0:
os.system(hadoop_dir+"bin/hdfs --config "+hadoop_conf_dir+" dfs -rm -r -f "+ dest_dir)
os.system(hadoop_dir+"bin/hdfs --config "+hadoop_conf_dir+" dfs -mkdir " + dest_dir)
else:
if bm_avoid_copy==0:
os.system("rm -r -f "+lustre_prefix+ dest_dir)
os.system("mkdir " + lustre_prefix + dest_dir)
#if v==1:
# print(" Forcing out of safe mode...",file=file_log)
#os.system(hadoop_dir+"bin/hdfs dfsadmin -safemode leave")
if copy_delay>0:
if v==1:
print(" Waiting for HDFS interaction (" + str(copy_delay) + " s)...",file=file_log)
time.sleep(copy_delay)
if v==1:
print(" Will wait for " + str(len(input_files)) + " HDFS interaction(s) (" + str(copy_delay) + " s)...",file=file_log)
if split_file_sequentially==0:
put_t_s = time.time()
for filename in input_files:
vdif_stats=get_vdif_stats(data_dir+filename,packet_limit=1,offset_bytes=0,only_offset_once=0,v=0)
packet_size=vdif_stats[4]
#blocksize = min(packet_size*packets_per_hdfs_block,os.path.getsize(data_dir+filename))
blocksize = packet_size*packets_per_hdfs_block
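            # The HDFS block size is an integer number of VDIF frames (the frame size is read from the
            # first frame of the file above), so every block boundary falls on a frame boundary.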
command_hdfs=hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfs"+\
" -D "+C_H_HDFS_CHECKSUM+ "="+str(checksum_size)+\
" -D "+C_H_HDFS_BLOCKSIZE+ "="+str(blocksize)+\
" -D "+C_H_HDFS_REPLICATION+ "="+str(replication)+\
" -put -f " + data_dir + filename + " " + dest_dir
os.system(command_hdfs)
if copy_delay>0:
time.sleep(copy_delay)
put_t_e = time.time()
else:
put_t_s = time.time()
for filename in input_files:
vdif_stats=get_vdif_stats(data_dir+filename,packet_limit=1,offset_bytes=0,only_offset_once=0,v=0)
packet_size=vdif_stats[4]
checksum_size=packets_per_hdfs_block
blocksize = packet_size*packets_per_hdfs_block
suffix_size=int(1+np.ceil(np.log10(1+os.path.getsize(data_dir+filename)//blocksize)))
command_hdfs="No command executed"
send_tmp_folder = data_dir_tmp+filename+"_tmp_folder"
test_file_exists = lustre_prefix + dest_dir+"/0"+"/"+filename
if bm_avoid_copy:
if os.path.isfile(test_file_exists):
if v==1:
print("(!!) Avoiding copy of file "+filename+", already found in "+lustre_prefix + dest_dir,file=file_log)
continue
os.system("rm -r "+send_tmp_folder)
os.system("mkdir -p "+send_tmp_folder)
os.system("split --bytes="+str(blocksize)+" -d -a " + str(suffix_size) + " " +data_dir+filename+" "+send_tmp_folder+"/tmp_")
files_to_process = os.listdir(send_tmp_folder)
num_iters=len(files_to_process)
if v==1:
print(" Sending "+str(num_iters)+" block(s):",file=file_log)
print(" "+','.join(files_to_process),file=file_log)
iteri=-1
check_value=0
for fi in files_to_process:
iteri+=1
if v==1:
                    if (float(iteri)/num_iters)>(float(check_value)/100):
print(str(check_value)+"%",file=file_log)
check_value+=10
if use_lustre_plugin==0:
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfs -mkdir -p " + dest_dir+"/"+str(iteri)+"/")
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfs -ls " + dest_dir + " >> " + temp_log)
else:
os.system("mkdir "+lustre_prefix+dest_dir+"/"+str(iteri)+"/"+" &> /dev/null")
if use_lustre_plugin==0:
command_hdfs=hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfs"+\
" -D "+C_H_HDFS_CHECKSUM+ "="+str(checksum_size)+\
" -D "+C_H_HDFS_BLOCKSIZE+ "="+str(blocksize)+\
" -D "+C_H_HDFS_REPLICATION+ "="+str(replication)+\
" -put -f " + send_tmp_folder + "/" + fi + " " + dest_dir+str(iteri)+"/"+filename
os.system(command_hdfs)
else:
command_hdfs="mv "+ send_tmp_folder + "/" + fi + " "+ lustre_prefix + dest_dir+str(iteri)+"/"+filename
os.system(command_hdfs)
if copy_delay>0:
time.sleep(copy_delay)
os.system("rm -r "+send_tmp_folder)
if v==1:
print("100%, deleting temp files",file=file_log)
put_t_e = time.time()
put_d = put_t_e - put_t_s
if use_lustre_plugin==0:
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" dfs -ls " + dest_dir + " >> " + temp_log)
if copy_delay>0:
if v==1:
print(" Last command: "+command_hdfs)
print(" Waiting for HDFS interaction (" + str(copy_delay) + " s)...",file=file_log)
time.sleep(copy_delay)
os.system("cd " + hadoop_dir)
if v==1:
if use_lustre_plugin==0:
os.system(hadoop_dir + "bin/hdfs --config "+hadoop_conf_dir+" fsck " + dest_dir + " -files -blocks >> " + temp_log)
if copy_delay>0:
if v==1:
print(" Waiting for HDFS interaction (" + str(copy_delay) + " s)...",file=file_log)
time.sleep(copy_delay)
with open(temp_log, 'r') as f_tmp:
for line in f_tmp:
print(" "+line.strip(),file=file_log)
return([put_t_s,put_t_e,put_d])
##################################################################
#
# Hadoop master and slaves files
#
##################################################################
def nodes_to_slaves_masters(conf_dir,hadoop_conf_dir,file_nodes,file_slaves,file_masters,max_slaves=0,master_is_slave=0,v=0,file_log=sys.stdout):
"""
    Convert a list of nodes (obtained from SLURM or from a local script) into Hadoop masters and slaves files.
Parameters
----------
conf_dir
path to folder where master and slaves files are copied to.
hadoop_conf_dir
path to folder where masters and slaves files are created.
file_nodes
filename for file with all nodes in the allocation (master+slaves), one node per line.
file_slaves
filename for Hadoop slave nodes file.
file_masters
filename for Hadoop master node file.
max_slaves
maximum number of slave nodes.
master_is_slave
if 1 include master node in list of slave nodes.
v
verbose if 1.
file_log
handler for log file.
Returns
-------
N/A
Notes
-----
|
| **TO DO:**
|
| Remove hadoop_conf_dir (check correct folder).
"""
if max_slaves<1:
max_slaves=1
if v==1:
print("\nCreating slaves files...",file=file_log)
print(" Slaves:",file=file_log)
i=0
with open(conf_dir + file_nodes, 'r') as f_nodes:
with open(hadoop_conf_dir + file_slaves, 'w') as f_slaves:
with open(hadoop_conf_dir + file_masters, 'w') as f_masters:
if 0==1: #max_slaves==1:
print("localhost",file=f_slaves)
print("localhost",file=f_slaves)
if v==1:
print(" Forcing slaves files to show only localhost",file=file_log)
else:
for line in f_nodes:
i+=1
if i==1:
print(line.strip(),file=f_masters)
if master_is_slave==1:
print(line.strip(),file=f_slaves)
elif i<=max_slaves:
print(line.strip(),file=f_slaves)
if v==1:
print(" "+line.strip().ljust(20),file=file_log)
os.system("cp " + hadoop_conf_dir + file_slaves + " " + conf_dir + file_slaves)
os.system("cp " + hadoop_conf_dir + file_masters + " " + conf_dir + file_masters)
##################################################################
#
# Hadoop configuration files (Update configuration)
#
##################################################################
def update_hcparam(source_file,destination_file,pairs,v=0,file_log=sys.stdout):
"""
Update parameter for hadoop configuration file.
Parameters
----------
source_file
Template for Hadoop xml configuration file.
destination_file
Final Hadoop xml configuration file (template with mods applied).
pairs
list of pairs [parameter, value]. See notes below.
v
verbose if 1.
file_log
handler for log file.
Returns
-------
N/A
Notes
------
    | If the parameter already exists in the file, its value is overridden.
| If the parameter does not exist, it is added following the required format (Hadoop xml).
| That is, given a list of pairs [PARAMETER,VALUE]
|
| <configuration>
| <property>
| <name>PARAMETER</name>
| <value>VALUE</value>
| </property>
| ...
| </configuration>
"""
isamec = ""
copy_again=0
input_destination_file=destination_file
if source_file==destination_file:
copy_again=1
destination_file+=".tmp"
isamec = " mv " + destination_file + " " + source_file
command = ""
iteration=0
updated_params=[]
for pair in pairs:
iteration+=1
if iteration==1:
command+="sed -e '/" + pair[0] + "/I!b;n;c\\\\t\\t<value>" + pair[1] + "</value>' "+source_file
if iteration>0:
command+="|sed -e '/" + pair[0] + "/I!b;n;c\\\\t\\t<value>" + pair[1] + "</value>' "
command+= "> " + destination_file
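    # The sed expression built above works on the line pair of each parameter in the template:
    #   /PARAM/I : find the line containing the parameter name (case-insensitive),
    #   !b       : copy every non-matching line through unchanged,
    #   n;c\...  : on a match, keep that line, read the next one and replace it with the new
    #              "<value>...</value>" line.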
os.system(command)
if copy_again==1:
os.system(isamec)
if v==1:
for pair in pairs: #[1:]:
print(" " + pair[0].ljust(49) + " " + pair[1],file=file_log)
# Now check if any parameter was not found, and thus was not updated
# Checks lowercase!
add_params=[]
with open(source_file, 'r') as f:
lines = f.read()
for pair in pairs:
count=1
if pair[0].lower() in lines.lower():
updated_params+=[pair[0]]
count=0
if count==1:
if v==1:
print(" (!) Parameter not found: " + pair[0].ljust(49),file=file_log)
if pair[0] not in updated_params:
add_params+=[[pair[0],pair[1]]]
if add_params!=[] and ".xml" in source_file:
with open(input_destination_file, 'r') as f:
lines_content = f.readlines()
with open(input_destination_file, 'w') as f:
for line in lines_content[:-1]:
#if "</configuration>" not in line:
print(line,end="",file=f)
#else:
for pair in add_params:
print(" <property>",file=f)
print(" <name>"+pair[0]+"</name>",file=f)
print(" <value>"+pair[1]+"</value>",file=f)
print(" </property>",file=f)
if v==1:
print(" (!) Added parameter: " + pair[0].ljust(49)+ " = "+pair[1],file=file_log)
print("</configuration>",file=f)
def process_hcfile(pairs,conf_dir,templates_dir,v=0,file_log=sys.stdout):
"""
Process hadoop configuration file.
Parameters
----------
pairs
list (by file) of lists (param,value) for overriding Hadoop configuration files.
conf_dir
path to folder for placing modified Hadoop configuration files.
templates_dir
path to folder with templates for Hadoop configuration files.
v
verbose if 1.
file_log
handler for log file.
Returns
-------
processed_filename
filename of the processed file.
Notes
-----
|
| **TO DO**
|
| Currently assuming that the filename is the first value in the list (i.e. [0][1]),
| need to use C_CONF_H_ALL_CONFIG_FILE instead
"""
if v==1:
print(pairs[0][1])
print(" Processing " + pairs[0][1] + "...",file=file_log)
source_file = templates_dir+pairs[0][1]
destination_file = conf_dir+pairs[0][1]
update_hcparam(source_file=source_file,destination_file=destination_file,pairs=pairs[1:],v=v,file_log=file_log)
processed_filename = pairs[0][1]
return(processed_filename)
def process_hadoop_config_files(list_configurations,pairs_config,templates_dir,conf_dir,v=0,file_log=sys.stdout):
"""
Process a set of Hadoop configuration files (core-site.xml, etc).
Parameters
----------
list_configurations
list of headers in configuration file associated to "pairs_config" below.
pairs_config
list of lists of pairs [[(param0,value0),(param1,value1),...]] to update xml files.
    templates_dir
        path to folder with Hadoop .xml file templates.
    conf_dir
        path to folder for placing the modified Hadoop configuration files.
v
verbose if 1.
file_log
handler for log file.
Returns
-------
configuration_files
list of str with filenames of processed configuration files.
"""
if v==1:
print("\nProcessing Hadoop configuration files...",file=file_log)
print(" Reading from: " + templates_dir,file=file_log)
print(" Writing to: " + conf_dir,file=file_log)
configuration_files=[]
for (configuration,pair_config) in zip(list_configurations,pairs_config):
processed_file = process_hcfile(v=v,file_log=file_log,pairs=pair_config,conf_dir=conf_dir,templates_dir=templates_dir)
configuration_files+=[processed_file]
return(configuration_files)
##################################################################
#
# Hadoop application and configuration files distribution
#
##################################################################
def distribute_files(simply_copy_local,file_group,files,source_dir,conf_dir,destination_dir,\
nodes,temp_log,v=0,exec_permission=0,file_log=sys.stdout,username="hduser",force_node=""):
"""
Copy configuration and application files to nodes.
Parameters
----------
simply_copy_local : int
| If 1 it will only copy app and config files to the specified folder for the current machine.
| If 0, it will distribute the app and config files to the rest of the nodes via ssh.
file_group : str
identifier for this batch of files, only for reporting.
    files
        list of files (relative to the "source_dir" folder).
    source_dir
        path to folder with the files that will be distributed.
    conf_dir
        path to folder that includes the file "nodes".
    destination_dir
        path to destination folder on the remote machines (those listed in "nodes").
    nodes
        filename (relative to "conf_dir") with one machine per line.
temp_log
handler for intermediate file (buffer) for system calls.
v
verbose if 1.
exec_permission
if 1 it will give execution permissions to the distributed files.
file_log
handler for log file.
username : str
user name (to be used to login into slave nodes through ssh).
force_node : str
if not "", it will only send the files to this node ("force_node").
Returns
-------
0
Notes
-----
|
| **TO DO:**
|
| (!) It will delete ipython-notebook lines. Need to add this as an option, or add another configuration option for third-party sources.
"""
#show_errors=""
show_errors=" 2>> " + temp_log
count_nodes=0
if v==1:
print("\nCopying " + file_group + " files to nodes...",file=file_log)
print(" Nodes:",end="",file=file_log)
with open(conf_dir + nodes, 'r') as f_nodes:
for line in f_nodes:
count_nodes+=1
print("\t\t\t"+line.strip(),file=file_log)
print(" Username: \t\t" + username,file=file_log)
print(" " + file_group + " source dir: \t"+source_dir,file=file_log)
print(" " + file_group + " files: \t\t"+" ".join(files),file=file_log)
print(" " + file_group + " destination dir: \t"+destination_dir,file=file_log)
fixed_headers = 0
for f in files:
#Remove ipython-notebook lines from source files (.py)
filename=(source_dir + f)
        #Only for third-party dependencies, need to add this into a new line in the config file.
#TO DO: Quick fix for third party library (hardcoded). Add option for third party dependencies.
#if (filename[-3:]==".py")and("six" not in filename):
if filename[-3:]==".py":
fixed_headers+=1
fix_file_header(filename)
if simply_copy_local:
os.system("mkdir -p " + destination_dir)
os.system("cp " + source_dir + f + " " + destination_dir + show_errors)
else:
#os.system("pdsh -d -R ssh -l " + username + " -w ^" + conf_dir + nodes + " mkdir -p " + destination_dir + " 2>> " + temp_log)
#os.system("pdcp -d -R ssh -l " + username + " -w ^" + conf_dir + nodes + " " + source_dir + f + " " + destination_dir + " 2> " + temp_log)
if force_node=="":
os.system("pdsh -d -R ssh -l " + username + " -w `cat" + conf_dir + nodes + \
"|tr '\n' ','|rev|cut -c2-|rev` mkdir -p " + destination_dir + show_errors)
os.system("pdcp -d -R ssh -l " + username + " -w `cat" + conf_dir + nodes + \
"|tr '\n' ','|rev|cut -c2-|rev` " + source_dir + f + " " + destination_dir + show_errors)
else:
os.system("pdsh -d -R ssh -l " + username + " -w " + force_node + " mkdir -p " + destination_dir + show_errors)
os.system("pdcp -d -R ssh -l " + username + " -w " + force_node + " " + source_dir + f + " " + destination_dir + show_errors)
if exec_permission==1:
if simply_copy_local:
os.system("chmod a+x " + destination_dir + f + show_errors)
else:
os.system("pdsh -d -R ssh -l " + username + " -w ^" + conf_dir + nodes + " chmod a+x " + destination_dir + f + show_errors)
if v==1:
with open(temp_log, 'r') as f_tmp:
            lines = set(f_tmp.readlines())  # deduplicate output lines before scanning for "Failures"
for line in lines:
if "Failures" in line:
print(" "+f.ljust(24)+ " -> " ,end="",file=file_log)
print(" "+line.strip(),end="",file=file_log)
print(" for "+str(count_nodes)+" nodes.",end="\n",file=file_log)
if v==1:
print(" Fixed .py files: \t" + str(fixed_headers),file=file_log)
print(" Execution permission:\t", end="",file=file_log)
if exec_permission:
print("+x",file=file_log)
else:
print("N/A",file=file_log)
return(0)
# <codecell>