Tutorial¶
Note
This tutorial is intended for users of the tlpipe package who will use the tasks already provided in the package to do data analysis. Developers of this package, and those who want to extend or continue its development, may want to refer to the Developer’s guide for a deeper introduction.
Prepare for the input pipe file¶
An input pipe file is actually a python script file, so it follows plain python syntax, but to emphasize that it is used as an input pipe file for a data analysis pipeline, it is usually named with the suffix “.pipe” instead of “.py”.
The only required argument to run a data analysis pipeline is the input pipe file, in which one specifies all tasks to be imported and executed, all parameter settings for each task, and the execution order (or flow control) of the pipeline.
Here we take the waterfall plot as an example to show how to write an input pipe file.
Non-iterative pipeline¶
Create and open a file named plot_wf.pipe (the name can be chosen arbitrarily);
Specify a variable pipe_tasks to hold the analysis tasks that will be imported and executed, and (optionally) a variable pipe_outdir to set the output directory (the default value is './output/'). You can set other pipeline-related parameters according to your needs, or just use the default values. All parameters and their default values can be checked by the method show_params(); note that all these parameters should be prepended with the prefix “pipe_”:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'
Import tasks and set task parameters:
Import Dispatch to select data to plot:

import glob

data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'
Import Detect to find and mask the noise source signal:

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'
Import Plot to plot the waterfall:

from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
The final input pipe file looks like:

# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf.pipe
# mpiexec -n N tlpipe dir/to/plot_wf.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob

data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # all data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
Note
To show all pipeline related parameters and their default values, you can do:
>>> from tlpipe.pipeline import pipeline
>>> pipeline.Manager.prefix
'pipe_'
>>> pipeline.Manager.show_params()
Parameters of Manager:
  copy:  True
  tasks:  []
  logging:  info
  flush:  False
  timing:  False
  overwrite:  False
  outdir:  output/
Each imported task should be appended to the list pipe_tasks in order to be executed by the pipeline;
Each task’s parameters should be prepended with its own prefix. See the source file of each task for the prefix and all parameters that can be set. You can also get the prefix and the parameters (and their default values) by the following method (take Dispatch for example):

>>> from tlpipe.timestream import dispatch
>>> dispatch.Dispatch.prefix
'dp_'
>>> dispatch.Dispatch.show_params()
Parameters of task Dispatch:
  out:  None
  requires:  None
  in:  None
  iter_start:  0
  iter_step:  1
  input_files:  None
  iter_num:  None
  copy:  False
  iterable:  False
  output_files:  None
  time_select:  (0, None)
  stop:  None
  libver:  latest
  corr:  all
  exclude:  []
  check_status:  True
  dist_axis:  0
  freq_select:  (0, None)
  feed_select:  (0, None)
  tag_output_iter:  True
  tag_input_iter:  True
  start:  0
  mode:  r
  pol_select:  (0, None)
  extra_inttime:  150
  days:  1.0
  drop_days:  0.0
  exclude_bad:  True
Usually the input of a task is either read from the data files, for example:

dp_input_files = files # data files as list
or is the output of a previously executed task (forming a task chain), for example:

dt_in = dp_out

pwf_in = dt_out
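Conceptually, these *_out / *_in names act as keys into a shared store: each task writes its product under its out key, and a later task looks its input up by that key. The toy sketch below illustrates only this chaining idea in plain Python; it is not tlpipe’s actual pipeline code, and the three stand-in tasks are hypothetical:

```python
# Toy illustration of out/in chaining (NOT tlpipe's actual implementation):
# each task reads its input from a shared dict by key and writes its
# output back under its own key, so "dt_in = dp_out" style chaining
# amounts to sharing a dictionary key between two tasks.
def run_chain(tasks, store=None):
    store = {} if store is None else store
    for func, in_key, out_key in tasks:
        data = store.get(in_key)     # None for the first task in the chain
        store[out_key] = func(data)  # publish output under this task's key
    return store

# three stand-in "tasks", mimicking Dispatch -> Detect -> Plot
tasks = [
    (lambda _: [1, 2, 3], None, 'dp'),           # produce some data
    (lambda d: [x * 2 for x in d], 'dp', 'dt'),  # transform it
    (lambda d: sum(d), 'dt', 'pwf'),             # consume it
]
print(run_chain(tasks)['pwf'])  # -> 12
```

The key point is that only the names connect the tasks; reordering the assignments in the pipe file does not matter as long as each *_in refers to an earlier task's *_out.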
Iterative pipeline¶
To make the pipeline run iteratively over several days’ data, or over more than one group of data (a list of files treated as a separate group), set the parameter iterable of each task you want to iterate to True, and optionally specify an iteration number. If no iteration number is specified, the pipeline will run iteratively until all input data have been processed. Taking the above waterfall plot again as an example, suppose you want to iteratively plot the waterfall of 2 days’ data, or of two separate groups of data; the input pipe file plot_wf_iter.pipe looks like:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_iter.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_iter.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob

data_dir1 = 'dir1/to/data' # your data directory
data_dir2 = 'dir2/to/data' # your data directory
### one way
files = sorted(glob.glob(data_dir1+'/*.hdf5')) # more than 1 day's data files as a list
### or another way
group1 = sorted(glob.glob(data_dir1+'/*.hdf5'))
group2 = sorted(glob.glob(data_dir2+'/*.hdf5'))
files = [ group1, group2 ] # or two groups of data, each as a list of data files

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'
Note
The number of iterations needs to be set only once, in the first task: after the first task has executed the specified number of iterations, it no longer produces output for the subsequent tasks, and those tasks stop iterating when there is no input for them.
Non-trivial control flow¶
You can run several tasks iteratively, and then run some other tasks non-iteratively once all the iterative tasks have finished. For example, if you want the waterfall plot of two days’ averaged data, you can run several tasks iteratively, each iteration processing one day’s data, then combine (accumulate and average) the two days’ data and plot its waterfall, as shown in plot_wf_nontrivial.pipe:
# -*- mode: python; -*-

# input file for the analysis pipeline
# execute this pipeline by either command of the following two:
# tlpipe dir/to/plot_wf_nontrivial.pipe
# mpiexec -n N tlpipe dir/to/plot_wf_nontrivial.pipe

pipe_tasks = []
pipe_outdir = './output/'
pipe_logging = 'notset'
# pipe_logging = 'info'

import glob

data_dir = 'dir/to/data' # your data directory
files = sorted(glob.glob(data_dir+'/*.hdf5')) # at least 2 days data files as a list

# data selection
from tlpipe.timestream import dispatch
pipe_tasks.append(dispatch.Dispatch)
### parameters for Dispatch
dp_input_files = files # data files as list
dp_freq_select = (500, 510) # frequency indices, from 500 to 510
dp_feed_select = [1, 2, 32, 33] # feed no. as a list
dp_iterable = True
dp_iter_num = 2 # set the number of iterations
dp_tag_input_iter = False
dp_out = 'dp'

# find and mask noise source signal
from tlpipe.timestream import detect_ns
pipe_tasks.append(detect_ns.Detect)
### parameters for Detect
dt_in = dp_out
# dt_feed = 1
dt_iterable = True
dt_out = 'dt'

# plot waterfall of selected data
from tlpipe.plot import plot_waterfall
pipe_tasks.append(plot_waterfall.Plot)
### parameters for Plot
pwf_in = dt_out
pwf_iterable = True
pwf_flag_ns = True # mask noise source signal
pwf_fig_name = 'waterfall/wf' # figure name to save
pwf_out = 'pwf'

# convert raw timestream to timestream
from tlpipe.timestream import rt2ts
pipe_tasks.append(rt2ts.Rt2ts)
### parameters for Rt2ts
r2t_in = dt_out # can also be pwf_out as it is the same
r2t_iterable = True
r2t_out = 'r2t'

# re-order the data to have RA from 0 to 2pi
from tlpipe.timestream import re_order
pipe_tasks.append(re_order.ReOrder)
### parameters for ReOrder
ro_in = r2t_out
ro_iterable = True
ro_out = 'ro'

# accumulate the re-ordered data from different days
from tlpipe.timestream import accumulate
pipe_tasks.append(accumulate.Accum)
### parameters for Accum
ac_in = ro_out
ac_iterable = True
ac_out = 'ac'

# barrier above iterative tasks before executing the following tasks
from tlpipe.timestream import barrier
pipe_tasks.append(barrier.Barrier)
### parameters for Barrier

# average the accumulated data
from tlpipe.timestream import average
pipe_tasks.append(average.Average)
### parameters for Average
av_in = ac_out
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
av_out = 'av'

# waterfall plot of the averaged data
from tlpipe.plot import plot_waterfall
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
### parameters for Plot
pwf1_in = av_out
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
pwf1_flag_ns = True
pwf1_fig_name = 'vis_av/vis'
pwf1_out = 'pwf1'
Note
Notice the use of the task Barrier to block the control flow before its subsequent tasks are executed. As the task Barrier does not get its input from any other task, the pipeline will restart at the beginning every time it gets to execute Barrier. Once everything before Barrier has been executed, it will unblock its subsequent tasks and allow them to proceed normally.
Note
In real data analysis, the data should be RFI flagged, calibrated, and possibly have other processing done before accumulating and averaging; for simplicity and ease of understanding, we have omitted all those processes here. One can refer to the real data analysis pipeline input files in the package’s input directory.
Execute the same task several times¶
Special care needs to be taken when executing the same task several times. Since the input pipe file is just a plain python script, it is executed before the parameter-parsing process, and during this execution a later assignment to a variable overrides any earlier assignment to the same name. So, to execute the same task several times, a different prefix should be set for each instance of the task (i.e., except for the first occurrence, which can just use the task’s default prefix, all others need a different prefix). To do this, append a 2-tuple to the list pipe_tasks, with its first element being the imported task and its second element being the new prefix to use. See for example the line
pipe_tasks.append((plot_waterfall.Plot, 'pwf1_')) # here use a new prefix pwf1_ instead of the default pwf_ to discriminate from the previous plot_waterfall
in plot_wf_nontrivial.pipe in the above example.
Save intermediate data¶
To save data that has been processed by a task (useful, for example, for break-point recovery), just set the output_files parameter of that task to a list of file names (data can only be saved as hdf5 files); the data will then be split into almost equal chunks along the time axis, with each chunk saved to one of the files. For example, see the line
av_output_files = [ 'average/file_%d.hdf5' %i for i in range(1, 7) ] # here save intermediate results
in plot_wf_nontrivial.pipe in the above example.
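The “almost equal chunks along the time axis” behaviour can be pictured with a small stand-alone sketch (plain Python; illustrative only, not tlpipe’s actual splitting code):

```python
def split_chunks(n_times, n_files):
    """Split n_times time samples into n_files nearly equal chunks.

    Returns a list of (start, stop) index pairs, one chunk per output
    file: the first few chunks get one extra sample when the division
    is not exact. (A sketch of the idea, not tlpipe's implementation.)
    """
    base, extra = divmod(n_times, n_files)
    bounds = []
    start = 0
    for i in range(n_files):
        stop = start + base + (1 if i < extra else 0)
        bounds.append((start, stop))
        start = stop
    return bounds

# e.g. 100 time samples over the 6 files of av_output_files above
print(split_chunks(100, 6))
# -> [(0, 17), (17, 34), (34, 51), (51, 68), (68, 84), (84, 100)]
```

Each (start, stop) pair would correspond to the slice of the time axis written to one hdf5 file.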
Recovery from intermediate data¶
You can recover the pipeline from a break point (where you have saved the intermediate data) by reading data from the files you have saved. To do this, instead of setting the in parameter, set the input_files parameter to a list of the saved data files. For example, see the line
pwf1_input_files = av_output_files # here you can read data from the saved intermediate data files if you do not set pwf1_in
in plot_wf_nontrivial.pipe in the above example.
Note
If both the in parameter and the input_files parameter are set, the task will get its input from the in parameter instead of reading data from input_files, as reading the data from files is much slower. So, in order to recover from the break point, you should not set the in parameter, or should set in to None, which is its default value.
Run the pipeline¶
Single process run¶
If you do not have an MPI environment installed, or you just want a single process run, just do (in case plot_wf.pipe is in your working directory)
$ tlpipe plot_wf.pipe
or (in case plot_wf.pipe isn’t in your working directory)
$ tlpipe dir/to/plot_wf.pipe
If you want to submit and run the pipeline in the background, do something like
$ nohup tlpipe dir/to/plot_wf.pipe &> output.txt &
Multiple processes run¶
To run the pipeline in a parallel and distributed manner on a cluster using multiple processes, you can do something like (in case plot_wf.pipe is in your working directory)
$ mpiexec -n N tlpipe plot_wf.pipe
or (in case plot_wf.pipe isn’t in your working directory)
$ mpiexec -n N tlpipe dir/to/plot_wf.pipe
If you want to submit and run the pipeline in the background on several nodes, for example node2, node3, and node4, do something like
$ nohup mpiexec -n N -host node2,node3,node4 --map-by node tlpipe dir/to/plot_wf.pipe &> output.txt &
Note
In the above commands, N is the number of processes you want to run.
Pipeline products and intermediate results¶
Pipeline products and intermediate results will be in the directory set by pipe_outdir.
Other executable commands¶
h5info: Check what’s in an HDF5 data file (or a list of them). To use it, do something like
$ h5info data.hdf5
or
$ h5info data1.hdf5, data2.hdf5, data3.hdf5
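If you prefer working from Python, a similar listing can be obtained directly with the h5py library (a sketch, assuming h5py is installed; it does not reproduce h5info’s exact output, and 'data.hdf5' below is a placeholder file name):

```python
# Hedged sketch: list the contents of an HDF5 file with h5py,
# as an in-Python alternative to the h5info command.
import h5py

def list_contents(path):
    """Print every group and dataset in an HDF5 file, mini-h5info style."""
    with h5py.File(path, 'r') as f:
        def show(name, obj):
            if isinstance(obj, h5py.Dataset):
                print('%s  shape=%s  dtype=%s' % (name, obj.shape, obj.dtype))
            else:
                print('%s/' % name)  # a group
        f.visititems(show)

# list_contents('data.hdf5')  # pass the path to one of your data files
```

This walks the file recursively via visititems, printing group names and the shape/dtype of each dataset.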