Attention : une mise à jour du service GitLab va être effectuée le mardi 14 décembre entre 13 h 30 et 14 h 00. Cette mise à jour entraînera une interruption du service dont nous ne maîtrisons pas complètement la durée, mais qui ne devrait pas excéder quelques minutes.

task.py 8.94 KB
Newer Older
1
# coding: utf8
2
import os
3
import sys
4

# Time limit handed to scheduler.queue_task(timeout=...) for every queued run.
# web2py's scheduler counts this in seconds, so this is 10 minutes.
TASK_TIMEOUT = 600


def schedule_run(id_sequence, id_config):
    """Queue a scheduler task that processes one sequence file with one config.

    Unless a task for the same (sequence file, config) pair is still pending,
    this inserts a new ``results_file`` row and reuses the patient's
    ``fused_file`` row for this config (creating it if needed). It then queues
    the config's ``program`` on the scheduler with the arguments
    ``[id_sequence, id_config, results_file id, fused_file id]``.

    :param id_sequence: id of the ``sequence_file`` row to process
    :param id_config: id of the ``config`` row to use
    :returns: ``{"message": "run already registered"}`` when a matching task
        is still pending, otherwise ``{"redirect": "reload", "message": ...}``
    """
    import datetime
    import json

    id_patient = db.sequence_file[id_sequence].patient_id

    # Look for a pending run BEFORE inserting anything, so a refused request
    # leaves no orphan results_file row. Task arguments are stored
    # JSON-encoded (e.g. '["12", "3", 45, 6]'). Only the leading
    # (sequence, config) pair is compared, because the trailing results/fused
    # ids are freshly created for every request and would never match.
    args_prefix = json.dumps([id_sequence, id_config])[:-1] + ', '
    active_tasks = db(~db.scheduler_task.status.belongs(
        ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED"))).select(db.scheduler_task.args)
    if any((t.args or '').startswith(args_prefix) for t in active_tasks):
        return {"message": "run already registered"}

    run_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    data_id = db.results_file.insert(sequence_file_id=id_sequence,
                                     run_date=run_date,
                                     config_id=id_config)

    # Reuse the patient's fused_file row for this config if one exists,
    # otherwise create it.
    fused = db((db.fused_file.config_id == id_config)
               & (db.fused_file.patient_id == id_patient)).select().first()
    if fused is not None:
        fuse_id = fused.id
    else:
        fuse_id = db.fused_file.insert(patient_id=id_patient, config_id=id_config)

    # Queue the task; config.program names the scheduler function to run.
    config = db.config[id_config]
    task = scheduler.queue_task(config.program,
                                [id_sequence, id_config, data_id, fuse_id],
                                repeats=1, timeout=TASK_TIMEOUT)
    db.results_file[data_id] = dict(scheduler_task_id=task.id)

    filename = db.sequence_file[id_sequence].filename
    patient = db.patient[id_patient]
    patient_name = patient.first_name + " " + patient.last_name

    res = {"redirect": "reload",
           "message": "%s (%s) %s: process requested" % (patient_name, config.name, filename)}
    log.info(res)
    return res


def run_vidjil(id_file, id_config, id_data, id_fuse):
    """Scheduler task: run Vidjil on one sequence file, then re-fuse the patient's results.

    Steps:
      1. Run the ``vidjil`` binary on the ``sequence_file`` row ``id_file``
         with the options of the ``config`` row ``id_config``. Output is
         logged to ``<out>/<id_data>.vidjil.log``.
      2. Store the produced ``<id_data>.vidjil`` file in ``results_file`` row
         ``id_data`` and mark it ``"ready"``.
      3. Run ``fuse.py`` on the results files of the same patient and config.
         Store its output in ``fused_file`` row ``id_fuse``.

    Paths are derived from the directory of ``sys.argv[0]`` (the web2py
    launch script). ``fuse.py`` is invoked as ``../fuse.py``, so it depends
    on the current working directory.

    :returns: the string ``"sucess"``. The spelling is kept as-is because it
        is the task's stored result and may be compared elsewhere.
    """
    import datetime
    import os
    import shutil
    import sys
    from subprocess import Popen, PIPE, STDOUT

    # Locations of the vidjil binary, germlines, uploads and the work folder.
    web2py_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
    vidjil_path = web2py_dir + '/../..'
    germline_folder = vidjil_path + '/germline/'
    upload_folder = web2py_dir + '/applications/vidjil/uploads/'
    out_folder = web2py_dir + '/out-%06d' % id_data + '/'
    output_filename = "%06d" % id_data

    # Start from an empty output folder (replaces a shelled-out `rm -rf`).
    shutil.rmtree(out_folder, ignore_errors=True)
    os.makedirs(out_folder)

    # Input sequence file and the patient it belongs to.
    sequence = db(db.sequence_file.id == id_file).select()[0]
    seq_file = upload_folder + sequence.data_file
    id_patient = sequence.patient_id

    # Vidjil options for this configuration.
    config = db.config[id_config]
    vidjil_cmd = config.command
    vidjil_germline = config.germline

    # Full vidjil command line. It goes through the shell because
    # config.command is a free-form option string read from the database.
    cmd = vidjil_path + '/vidjil ' + ' -o  ' + out_folder + " -b " + output_filename
    if vidjil_germline != 'multi':
        cmd += ' -G ' + germline_folder + vidjil_germline
    cmd += ' ' + vidjil_cmd + ' ' + seq_file

    print("=== Launching Vidjil ===")
    print(cmd)
    print("========================")
    sys.stdout.flush()

    vidjil_log_path = out_folder + '/' + output_filename + '.vidjil.log'
    with open(vidjil_log_path, 'w') as vidjil_log_file:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file,
                  stderr=STDOUT, close_fds=True)
        p.communicate()

    print("Output file in " + vidjil_log_path)
    sys.stdout.flush()
    db.commit()

    # Store the generated .vidjil file in the results_file upload field.
    # NOTE(review): this assumes the DAL copies the stream during this
    # assignment, which is what lets it be closed right afterwards.
    results_filepath = os.path.abspath(out_folder + '/' + output_filename + ".vidjil")
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(
            status="ready",
            run_date=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            data_file=stream)
    db.commit()

    # Re-run fuse.py on the results files of this patient for this config.
    output_file = out_folder + "result"
    rows = db((db.patient.id == db.sequence_file.patient_id)
              & (db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.patient.id == id_patient)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id | db.results_file.run_date,
                       groupby=db.sequence_file.id)
    input_files = [upload_folder + r.results_file.data_file
                   for r in rows if r.results_file.data_file is not None]
    # Argument list rather than a shell string: paths containing spaces stay intact.
    fuse_args = ["python", "../fuse.py", "-o", output_file, "-t", "100"] + input_files

    print("=== fuse.py ===")
    print(" ".join(fuse_args))
    print("===============")
    sys.stdout.flush()
    db.commit()

    fuse_log_path = out_folder + '/' + output_filename + '.fuse.log'
    with open(fuse_log_path, 'w') as fuse_log_file:
        p = Popen(fuse_args, stdin=PIPE, stdout=fuse_log_file,
                  stderr=STDOUT, close_fds=True)
        p.communicate()
    print("Output file in " + fuse_log_path)

    # Store the fused output.
    with open(os.path.abspath(output_file), 'rb') as stream:
        db.fused_file[id_fuse] = dict(
            fuse_date=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            fused_file=stream)
    db.commit()

    # The output folder is left in place (its cleanup was disabled), which
    # keeps the log files printed above available for inspection.
    # TODO: parse the vidjil/fuse results to report success or failure.
    return "sucess"


def run_fuse_only(id_file, id_config, id_data, id_fuse):
    """Scheduler task: run only fuse.py on the patient's uploaded input files.

    Vidjil itself is not run. The patient's uploaded files (expected to be
    data/clntab files) are passed directly to ``fuse.py``. The task:

      - marks ``results_file`` row ``id_data`` as ``"ready"``;
      - runs ``fuse.py`` on the ``data_file`` of every sequence file of the
        patient that has results for ``id_config``, ordered by sampling date
        and without duplicates;
      - stores the output in ``fused_file`` row ``id_fuse``.

    The scratch folder is removed afterwards, even when a step fails.

    :returns: the string ``"sucess"``. The spelling is kept as-is because it
        is the task's stored result and may be compared elsewhere.
    """
    import datetime
    import os
    import shutil
    import sys
    from subprocess import Popen, PIPE, STDOUT

    web2py_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
    upload_folder = web2py_dir + '/applications/vidjil/uploads/'
    out_folder = web2py_dir + '/out_' + str(id_data) + '/'

    # Start from an empty scratch folder.
    shutil.rmtree(out_folder, ignore_errors=True)
    os.makedirs(out_folder)
    try:
        id_patient = db(db.sequence_file.id == id_file).select()[0].patient_id

        db.results_file[id_data] = dict(
            status="ready",
            run_date=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

        output_file = out_folder + "result"
        rows = db((db.patient.id == db.sequence_file.patient_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.patient.id == id_patient)
                  & (db.results_file.config_id == id_config)
                  ).select(orderby=db.sequence_file.sampling_date)

        # The join yields one row per results_file, so a sequence file that
        # was scheduled several times would otherwise be fused several times.
        seen_sequences = set()
        input_files = []
        for r in rows:
            seq = r.sequence_file
            if seq.data_file is None or seq.id in seen_sequences:
                continue
            seen_sequences.add(seq.id)
            input_files.append(upload_folder + seq.data_file)

        fuse_args = ["python", "../fuse.py", "-o", output_file, "-t", "100"] + input_files
        print(" ".join(fuse_args))

        # communicate() drains the pipe while waiting. The former
        # wait()-then-read() deadlocks once fuse.py fills the OS pipe buffer.
        p = Popen(fuse_args, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        fuse_output = p.communicate()[0]
        print(fuse_output)

        # Store the fused file.
        with open(os.path.abspath(output_file), 'rb') as stream:
            db.fused_file[id_fuse] = dict(
                fuse_date=datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                fused_file=stream)
        db.commit()
    finally:
        # Clean the scratch folder on success and on failure alike.
        shutil.rmtree(out_folder, ignore_errors=True)

    return "sucess"


from gluon.scheduler import Scheduler

# Task-name -> function table for the web2py scheduler. The name passed to
# queue_task (a config's ``program`` value) selects which function runs.
scheduler = Scheduler(db, {"vidjil": run_vidjil, "none": run_fuse_only})