lib_hadoop_hdfs module

Functions for starting and stopping Hadoop, and for sending files to HDFS (or copying files to Lustre).

lib_hadoop_hdfs.cluster_start(wait_time, hadoop_conf_dir, hadoop_dir, file_slaves, temp_dir, username, temp_log, single_node=0, use_lustre_plugin=0, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Start Hadoop. Returns the number of active nodes after initialization.

wait_time : int
Time [s] to wait after each command sent to Hadoop.
hadoop_conf_dir : str
Hadoop configuration folder path [hadoop_home/etc/hadoop].
hadoop_dir : str
Hadoop home folder path.
file_slaves : str
Path to Hadoop slaves file.
temp_dir : str
Path to temporary folder.
username : str
User name (used to log in to slave nodes through ssh).
temp_log : str
Path to temporary log file.
single_node : int
If 1, temporary folders are deleted only on the current machine (master); if 0, on all machines (master and slaves).
use_lustre_plugin : int
1 for Lustre filesystem, 0 for HDFS.
v : int
1 for verbose.
file_log : str
Path to log file.
int_t_nodes : int
(t_nodes) number of nodes up and running in the Hadoop cluster.

Summary:

1. Delete temporary files (otherwise there may be problems with previous process IDs...).
2. Format HDFS (in the future the hadoop service may be running continuously to avoid initialization for every correlation...).
3. Start HDFS.
4. Start YARN.
5. Check running processes.
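
The five steps above can be sketched as a command sequence. This is only an illustration: it assumes the standard Hadoop 2.x layout (bin/hdfs, sbin/start-dfs.sh, sbin/start-yarn.sh) and the JDK jps tool, the helper name build_start_commands and the example paths are hypothetical, and the commands actually issued by cluster_start may differ.

```python
import os

def build_start_commands(hadoop_dir, temp_dir):
    """Hypothetical helper: list the shell commands for a cold Hadoop start.

    Nothing is executed here; the function only builds the argument lists.
    """
    hdfs = os.path.join(hadoop_dir, "bin", "hdfs")
    sbin = os.path.join(hadoop_dir, "sbin")
    return [
        ["rm", "-rf", temp_dir],                 # 1. delete temporary files
        [hdfs, "namenode", "-format"],           # 2. format HDFS
        [os.path.join(sbin, "start-dfs.sh")],    # 3. start HDFS
        [os.path.join(sbin, "start-yarn.sh")],   # 4. start YARN
        ["jps"],                                 # 5. check running processes
    ]

# Example paths are placeholders, not values taken from the module.
commands = build_start_commands("/opt/hadoop", "/tmp/hadoop-tmp")
for cmd in commands:
    print(" ".join(cmd))
```

Building the list separately from running it makes it easy to insert the wait_time pause between commands and to log each command before it is sent.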
lib_hadoop_hdfs.cluster_stop(wait_time, hadoop_dir, hadoop_conf_dir, temp_log, timeout=-1, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Stop Hadoop.

wait_time : int
Time [s] to wait after each command sent to Hadoop.
hadoop_dir : str
Hadoop home folder path.
hadoop_conf_dir : str
Hadoop configuration folder path [hadoop_home/etc/hadoop].
temp_log : str
Path to temporary log file.
timeout : int
If >0, the Hadoop stop command is terminated after “timeout” seconds.
v : int
1 for verbose.
file_log : str
Path to log file.
N/A

Summary:

-Stop YARN.
-Stop DFS.
-Wait “wait_time” seconds for termination.
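
The “timeout” behaviour can be illustrated with the standard subprocess module. This is a minimal sketch, not the module's implementation: run_with_timeout is a hypothetical helper, and a sleeping Python child stands in for a slow Hadoop stop script.

```python
import subprocess
import sys

def run_with_timeout(cmd, timeout=-1):
    """Hypothetical helper: run cmd, killing it after `timeout` seconds if > 0.

    Returns the exit code, or None if the command had to be terminated.
    """
    limit = timeout if timeout > 0 else None   # -1 (or 0) means wait forever
    try:
        return subprocess.run(cmd, timeout=limit).returncode
    except subprocess.TimeoutExpired:
        return None

# Stand-ins for the stop scripts: one child sleeps for 5 s, the other exits at once.
slow = [sys.executable, "-c", "import time; time.sleep(5)"]
quick = [sys.executable, "-c", "pass"]
timed_out = run_with_timeout(slow, timeout=1)
finished = run_with_timeout(quick)
```

subprocess.run kills the child process itself when the timeout expires, so no separate cleanup is needed in this sketch.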
lib_hadoop_hdfs.copy_files_to_hdfs(replication, input_files, data_dir, data_dir_tmp, hadoop_dir, hadoop_conf_dir, hdfs_data_dir, packets_per_hdfs_block, temp_log, copy_delay=0, checksum_size=100, text_mode=1, use_lustre_plugin=0, lustre_prefix='/nobackup1/ajva/hadoop', bm_avoid_copy=0, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Copy files from local directories to HDFS. It returns the elapsed time for moving the files (including the applied delay).

replication : int
Number of copies of the same file block in the HDFS system.
input_files : list of str
Names of the files to be moved into HDFS/LustreFS.
data_dir : str
Path (in local filesystem) to the folder hosting the files to be moved into HDFS/LustreFS.
data_dir_tmp : str
Path (in local filesystem) to the folder where split input data is copied before being moved into HDFS/LustreFS.
hadoop_dir : str
Hadoop home folder path (local filesystem).
hadoop_conf_dir : str
Hadoop configuration folder path (etc/hadoop).
hdfs_data_dir : str
Path (in HDFS/LustreFS), thus relative to “lustre_prefix”, to host input files.
packets_per_hdfs_block : int
Number of VDIF frames per file split.
temp_log : str
Path to temporary log file.
copy_delay : int
Time [s] to wait after each command sent to Hadoop.
checksum_size : int
Number of bytes for checksum (for each split) [this overrides automatic calculation].
text_mode : int
[default 1] If 1, use checksum_size to override the automatically computed value.
use_lustre_plugin : int
If 1, use the Lustre filesystem.
lustre_prefix : str
Path in Lustre to precede “hdfs_data_dir” if using Lustre.
bm_avoid_copy : int
[default 0] If 1, input files are not split if “lustre_prefix”+“hdfs_data_dir” already has the data
for the file in “input_files” from a previous execution. See notes below.
v : int
1 for verbose.
file_log : str
Path to log file.
put_t_s : float
timestamp with start time for loop copying files.
put_t_e : float
timestamp with stop time for loop copying files.
put_d : float
total execution time for loop copying files.

Summary:

-Delete existing files in HDFS (to avoid errors on existing files)
TODO: consider overwriting files.
-Wait for delay if applicable.
-Move files to HDFS (with specified block size)
2015.12.1. packet_size is read from the first frame of the file.


Notes:

-Regarding filesystems:
HDFS: Path inside Hadoop distributed filesystem (accessible from Hadoop).
LustreFS: Path relative to Hadoop Lustre home folder (accessible from Hadoop).
Local filesystem: Path accessible from this command; it can be local, NFS, Lustre, etc.

-Regarding “bm_avoid_copy”:
Always 0 by default.
After each execution, for each processed file there will be a folder in “lustre_prefix”+”hdfs_data_dir”+”file_name”+... with
the splits for that file. Setting this to 1 avoids re-splitting the file if it was already used previously. Use only
for repeated benchmarking.
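
To show how “packets_per_hdfs_block” relates to the size of each split, here is a minimal sketch. It assumes fixed-length frames whose size in bytes (frame_size) is already known; the helper split_frames is hypothetical, and it neither parses the VDIF header nor handles checksums.

```python
def split_frames(data, frame_size, packets_per_hdfs_block):
    """Hypothetical helper: cut a byte string into splits of whole frames."""
    if frame_size <= 0 or packets_per_hdfs_block <= 0:
        raise ValueError("frame_size and packets_per_hdfs_block must be positive")
    split_size = frame_size * packets_per_hdfs_block   # bytes per split
    return [data[i:i + split_size] for i in range(0, len(data), split_size)]

# Ten 8-byte dummy frames, four frames per split: sizes 32, 32 and 16 bytes.
frames = b"".join(bytes([n]) * 8 for n in range(10))
splits = split_frames(frames, frame_size=8, packets_per_hdfs_block=4)
print([len(s) for s in splits])  # [32, 32, 16]
```

The last split is shorter whenever the number of frames is not a multiple of “packets_per_hdfs_block”.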
lib_hadoop_hdfs.distribute_files(simply_copy_local, file_group, files, source_dir, conf_dir, destination_dir, nodes, temp_log, v=0, exec_permission=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>, username='hduser', force_node='')[source]

Copy configuration and application files to nodes.

simply_copy_local : int
If 1 it will only copy app and config files to the specified folder for the current machine.
If 0, it will distribute the app and config files to the rest of the nodes via ssh.
file_group : str
identifier for this batch of files, only for reporting.
source_dir
path to folder with the files that will be distributed.
files
list of files (relative to the “source_dir” folder).
destination_dir
path to destination folder in the remote machines (those in “nodes”).
conf_dir
path to folder that includes the file “nodes”.
nodes
filename (relative to “conf_dir”) with one machine per line.
temp_log
handler for intermediate file (buffer) for system calls.
v
verbose if 1.
exec_permission
if 1 it will give execution permissions to the distributed files.
file_log
handler for log file.
username : str
user name (to be used to login into slave nodes through ssh).
force_node : str
if not “”, it will only send the files to this node (“force_node”).
0

TO DO:

(!) It will delete ipython-notebook lines. Need to add this as an option, or add another configuration option for third-party sources.
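
The fan-out over nodes, including the “force_node” override, can be sketched as follows. This is an assumption-laden illustration: build_copy_commands is a hypothetical helper, it takes the node names as a list already read from the “nodes” file, and the use of scp is assumed (the text only states that files are distributed via ssh).

```python
import os

def build_copy_commands(files, source_dir, destination_dir, nodes,
                        username="hduser", force_node=""):
    """Hypothetical helper: one scp command per (node, file) pair."""
    # A non-empty force_node restricts the copy to that single machine.
    targets = [force_node] if force_node else nodes
    commands = []
    for node in targets:
        for name in files:
            src = os.path.join(source_dir, name)
            dst = "{}@{}:{}".format(username, node, destination_dir)
            commands.append(["scp", src, dst])
    return commands

# Node names and paths below are placeholders.
nodes = ["node01", "node02"]
cmds = build_copy_commands(["mapper.py"], "/home/hduser/app", "/tmp/app", nodes)
only_one = build_copy_commands(["mapper.py"], "/home/hduser/app", "/tmp/app",
                               nodes, force_node="node02")
```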
lib_hadoop_hdfs.fix_file_header(filename='x.py')[source]
Remove the first line which the iPython notebook adds to the .py files, since
files that do not begin with #!/usr/bin/env python may raise errors in hadoop.
filename
path to python script.
N/A
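
A conservative variant of this idea is sketched below: instead of always removing the first line, it drops whatever precedes the shebang and leaves the file untouched when no shebang is found. The helper drop_lines_before_shebang is hypothetical and is not the module's implementation.

```python
import os
import tempfile

SHEBANG = "#!/usr/bin/env python"

def drop_lines_before_shebang(filename):
    """Hypothetical helper: rewrite the file so that it starts at the shebang.

    Returns the number of lines removed.
    """
    with open(filename) as f:
        lines = f.readlines()
    for i, line in enumerate(lines):
        if line.startswith(SHEBANG):
            if i > 0:
                with open(filename, "w") as f:
                    f.writelines(lines[i:])
            return i
    return 0   # no shebang found: leave the file as it is

# Demonstration on a temporary file with one extra leading line.
with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "x.py")
    with open(path, "w") as f:
        f.write("# coding: utf-8\n#!/usr/bin/env python\nprint('ok')\n")
    removed = drop_lines_before_shebang(path)
    with open(path) as f:
        result = f.read()
```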
lib_hadoop_hdfs.nodes_to_slaves_masters(conf_dir, hadoop_conf_dir, file_nodes, file_slaves, file_masters, max_slaves=0, master_is_slave=0, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Convert a list of nodes (obtained from Slurm or from a local script) into a Hadoop slaves file.

conf_dir
path to folder where master and slaves files are copied to.
hadoop_conf_dir
path to folder where masters and slaves files are created.
file_nodes
filename for file with all nodes in the allocation (master+slaves), one node per line.
file_slaves
filename for Hadoop slave nodes file.
file_masters
filename for Hadoop master node file.
max_slaves
maximum number of slave nodes.
master_is_slave
if 1 include master node in list of slave nodes.
v
verbose if 1.
file_log
handler for log file.
N/A

TO DO:

Remove hadoop_conf_dir (check correct folder).
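
How “max_slaves” and “master_is_slave” could shape the two lists is sketched below. The sketch assumes that the first node of the allocation acts as master and that max_slaves=0 means no limit; neither is confirmed by the text above, and split_nodes is a hypothetical helper.

```python
def split_nodes(nodes, max_slaves=0, master_is_slave=0):
    """Hypothetical helper: return (masters, slaves) from an allocation list."""
    if not nodes:
        return [], []
    master, others = nodes[0], list(nodes[1:])   # assumption: first node is master
    slaves = ([master] if master_is_slave else []) + others
    if max_slaves > 0:                            # assumption: 0 means unlimited
        slaves = slaves[:max_slaves]
    return [master], slaves

allocation = ["node01", "node02", "node03", "node04"]
masters, slaves = split_nodes(allocation, max_slaves=2)
masters_b, slaves_b = split_nodes(allocation, master_is_slave=1)
print(masters, slaves)      # ['node01'] ['node02', 'node03']
print(masters_b, slaves_b)  # ['node01'] ['node01', 'node02', 'node03', 'node04']
```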
lib_hadoop_hdfs.process_hadoop_config_files(list_configurations, pairs_config, templates_dir, conf_dir, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Process a set of Hadoop configuration files (core-site.xml, etc).

list_configurations
list of headers in configuration file associated to “pairs_config” below.
pairs_config
list of lists of pairs [[(param0,value0),(param1,value1),...]] to update xml files.
templates_dir
path to folder with Hadoop .xml file templates.
v
verbose if 1.
file_log
handler for log file.
configuration_files
list of str with filenames of processed configuration files.
lib_hadoop_hdfs.process_hcfile(pairs, conf_dir, templates_dir, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Process a Hadoop configuration file.

pairs
list (by file) of lists (param,value) for overriding Hadoop configuration files.
conf_dir
path to folder for placing modified Hadoop configuration files.
templates_dir
path to folder with templates for Hadoop configuration files.
v
verbose if 1.
file_log
handler for log file.
processed_filename
filename of the processed file.

TO DO

Currently assuming that the filename is the first value in the list (i.e. [0][1]),
need to use C_CONF_H_ALL_CONFIG_FILE instead
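
The layout described in this note, with the filename stored as the value of the first pair, can be illustrated as follows. The helper split_pairs and the key "file" are made up for the example; fs.defaultFS and hadoop.tmp.dir are ordinary core-site.xml properties used here only as sample overrides.

```python
def split_pairs(pairs):
    """Hypothetical helper: separate the target filename from the overrides."""
    if not pairs:
        raise ValueError("pairs must contain at least the filename entry")
    filename = pairs[0][1]    # value of the first pair, i.e. [0][1]
    overrides = pairs[1:]     # remaining (param, value) pairs
    return filename, overrides

# "file" is a made-up key; only its position in the list matters here.
pairs = [("file", "core-site.xml"),
         ("fs.defaultFS", "hdfs://node01:9000"),
         ("hadoop.tmp.dir", "/tmp/hadoop-tmp")]
filename, overrides = split_pairs(pairs)
```

Relying on position is fragile, which is presumably why the note suggests looking the entry up by a named constant instead.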
lib_hadoop_hdfs.update_hcparam(source_file, destination_file, pairs, v=0, file_log=<_io.TextIOWrapper name='<stdout>' mode='w' encoding='UTF-8'>)[source]

Update parameters in a Hadoop configuration file.

source_file
Template for Hadoop xml configuration file.
destination_file
Final Hadoop xml configuration file (template with mods applied).
pairs
list of pairs [parameter, value]. See notes below.
v
verbose if 1.
file_log
handler for log file.
N/A
If the parameter already exists in the file, its value is overridden.
If the parameter does not exist, it is added following the required format (Hadoop xml).
That is, given a list of pairs [PARAMETER,VALUE]

<configuration>
<property>
<name>PARAMETER</name>
<value>VALUE</value>
</property>
...
</configuration>
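
The override-or-append rule can be sketched with the standard xml.etree.ElementTree module. This is an illustration under assumptions, not the module's implementation: set_properties is a hypothetical helper that works on an XML string rather than on source and destination files, and the property names are only sample values.

```python
import xml.etree.ElementTree as ET

def set_properties(xml_text, pairs):
    """Hypothetical helper: override or append <property> entries."""
    root = ET.fromstring(xml_text)             # the <configuration> element
    for name, value in pairs:
        for prop in root.findall("property"):
            if prop.findtext("name") == name:  # parameter exists: override it
                value_el = prop.find("value")
                if value_el is None:
                    value_el = ET.SubElement(prop, "value")
                value_el.text = value
                break
        else:                                  # parameter not present: add it
            prop = ET.SubElement(root, "property")
            ET.SubElement(prop, "name").text = name
            ET.SubElement(prop, "value").text = value
    return ET.tostring(root, encoding="unicode")

template = ("<configuration><property><name>dfs.replication</name>"
            "<value>3</value></property></configuration>")
updated = set_properties(template, [("dfs.replication", "1"),
                                    ("dfs.blocksize", "134217728")])
print(updated)
```

Here dfs.replication is overridden in place and dfs.blocksize is appended as a new property element.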