task.py 12.4 KB
Newer Older
1
# coding: utf8
2
import os
3
import sys
4
import defs
5
import re
6

Vidjil Team's avatar
Vidjil Team committed
7
# Value given as `timeout` to scheduler.queue_task() by schedule_run().
# 60 * 60 = 3600 -- presumably seconds (one hour); confirm against the
# scheduler's time unit.
TASK_TIMEOUT = 60 * 60
8

9
def schedule_run(id_sequence, id_config):
    """Register a new results entry and queue the analysis of a sequence file.

    A results_file row is inserted for the (sequence file, config) pair, the
    fused_file row of the (patient, config) pair is reused or created, and
    the program named in db.config[id_config].program is queued in the
    scheduler with the arguments [id_sequence, id_config, data_id, fuse_id].

    id_sequence -- id of the db.sequence_file row to process
    id_config   -- id of the db.config row describing the program to run

    Returns a dict with a "message" key; on success it also carries
    "redirect": "reload".
    """
    import time, datetime

    id_patient = db.sequence_file[id_sequence].patient_id

    ## create the results_file entry that the queued task will fill in
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config )

    ## check fused_file
    row2 = db( ( db.fused_file.config_id == id_config ) &
               ( db.fused_file.patient_id == id_patient )
             ).select()

    if len(row2) > 0 : ## update
        fuse_id = row2[0].id
    else:             ## create
        fuse_id = db.fused_file.insert(patient_id = id_patient,
                                       config_id = id_config)

    ## check scheduled run
    ## NOTE(review): the searched args contain data_id, which was inserted
    ## just above, so this lookup does not look able to match a task queued
    ## earlier -- confirm whether the duplicate check should ignore data_id
    ## (and run before the results_file insert).
    task_args = '["' + str(id_sequence) + '", "' + str(id_config) + '", ' + str(data_id) + ', ' + str(fuse_id) + ']'
    row3 = db( ( db.scheduler_task.args == task_args )
         & ( db.scheduler_task.status != "FAILED"  )
         & ( db.scheduler_task.status != "EXPIRED"  )
         & ( db.scheduler_task.status != "TIMEOUT"  )
         & ( db.scheduler_task.status != "COMPLETED"  )
         ).select()

    if len(row3) > 0 :
        res = {"message": "run already registered"}
        log.error(res)
        return res

    program = db.config[id_config].program

    ## add task to scheduler
    task = scheduler.queue_task(program, [id_sequence, id_config, data_id, fuse_id]
                                , repeats = 1, timeout = TASK_TIMEOUT)
    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    filename = db.sequence_file[id_sequence].filename

    res = {"redirect": "reload",
           "message": "[%s] (%s) c%s: process requested - %s" % (data_id, id_patient, id_config, filename)}

    log.info(res)
    return res

64

Mathieu Giraud's avatar
Mathieu Giraud committed
65
def _vidjil_log_summary(out_log):
    """Return the read/segment/window summary found in a vidjil log ('' if none)."""
    import sys

    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''

    log_file = open(out_log)
    try:
        for l in log_file:
            m = segmented.search(l)
            if m:
                # the line keeps its own newline, so echo it unchanged
                sys.stdout.write(l)
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(l)
            if m:
                sys.stdout.write(l)
                wins = int(m.group(1))
                reads = int(m.group(2))
                info = "%d reads, " % reads + info + ", %d windows" % wins
                # nothing is read after the windows line
                break
    finally:
        log_file.close()

    return info


def run_vidjil(id_file, id_config, id_data, id_fuse, clean_before=False, clean_after=False):
    """Run vidjil on a sequence file, store its result, then run the fuse step.

    id_file   -- id of the db.sequence_file row to analyse
    id_config -- id of the db.config row holding the vidjil command line
    id_data   -- id of the db.results_file row that receives the result
    id_fuse   -- id of the db.fused_file row, forwarded to run_fuse()
    clean_before -- accepted but not consulted: the output folder is always
                    emptied, because os.makedirs() needs it to be absent
    clean_after  -- remove the output folder once the result is stored

    Returns "SUCCESS". Raises IOError when vidjil produced no .vidjil file.
    """
    import time, datetime, sys, os, shutil
    from subprocess import Popen, PIPE, STDOUT

    ## paths to vidjil and to the sequence files
    germline_folder = defs.DIR_VIDJIL + '/germline/'
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    ## start from an empty output folder (no shell involved)
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command
    vidjil_cmd = vidjil_cmd.replace( 'germline/' ,germline_folder)

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'

    ## full command line
    cmd = defs.DIR_VIDJIL + '/vidjil ' + ' -o  ' + out_folder + " -b " + output_filename
    cmd += ' ' + vidjil_cmd + ' '+ seq_file

    ## run vidjil, sending stdout and stderr to the log file
    print("=== Launching Vidjil ===")
    print(cmd)
    print("========================")
    sys.stdout.flush()

    vidjil_log_file = open(out_log, 'w')
    try:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
    finally:
        # closed even when the run fails, so the handle never leaks
        vidjil_log_file.close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the generated result file
    results_filepath = os.path.abspath(out_folder+'/'+output_filename+".vidjil")

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! Vidjil failed, no .vidjil file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        # re-raise the original IOError to keep its errno and filename
        raise

    try:
        ## parse some info in the log
        info = _vidjil_log_summary(out_log)

        ## store the result in the database
        ts = time.time()
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    ## the returned value is stored by the scheduler as the task result
    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    run_fuse(id_file, id_config, id_data, id_fuse, clean_before = False)

    return "SUCCESS"
174

175

176

177
178
179
180
181
182
def run_copy(id_file, id_config, id_data, id_fuse, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the result, then run the fuse step.

    No analysis program is launched: the file referenced by the
    db.sequence_file row is opened from defs.DIR_SEQUENCES and stored as
    the data_file of the db.results_file row.

    id_file   -- id of the db.sequence_file row
    id_config -- id of the db.config row (used in messages and by run_fuse)
    id_data   -- id of the db.results_file row that receives the file
    id_fuse   -- id of the db.fused_file row, forwarded to run_fuse()
    clean_before -- accepted but not consulted: the output folder is always
                    emptied, because os.makedirs() needs it to be absent
    clean_after  -- remove the output folder once the result is stored

    Returns "SUCCESS". Raises IOError when the sequence file cannot be opened.
    """
    import time, datetime, sys, os, shutil

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    ## start from an empty output folder (no shell involved)
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    # nothing is written to this log: create it empty and release the handle
    open(out_log, 'w').close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES + filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        # the message used to reference an undefined name `info`, which
        # raised NameError here and hid the IOError; report the file instead
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, filename, out_folder)}
        log.error(res)
        # re-raise the original IOError to keep its errno and filename
        raise

    ## store the result in the database
    try:
        ts = time.time()
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    ## the returned value is stored by the scheduler as the task result
    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    run_fuse(id_file, id_config, id_data, id_fuse, clean_before = False)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
236
237
238
239



def run_fuse(id_file, id_config, id_data, id_fuse, clean_before=True, clean_after=False):
    """Merge the latest results of a patient with fuse.py and store the fused file.

    The patient is taken from the db.sequence_file row `id_file`. For each
    of the patient's sequence files, the most recent db.results_file row
    with config `id_config` is selected; those having a data_file are
    passed to fuse.py. The output is stored in the db.fused_file row
    `id_fuse`.

    id_file   -- id of a db.sequence_file row of the patient
    id_config -- id of the db.config row the results must belong to
    id_data   -- id used to name the output folder and files
    id_fuse   -- id of the db.fused_file row to update
    clean_before -- empty the output folder before running
    clean_after  -- remove the output folder once the fused file is stored

    Returns "SUCCESS". Raises IOError when fuse.py produced no .fused file.
    """
    import time, datetime, sys, os, shutil
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
    # With clean_before=False the folder may already be gone (the callers'
    # clean_after removes it before calling here), so make sure it exists.
    if not os.path.isdir(out_folder):
        os.makedirs(out_folder)

    row = db(db.sequence_file.id==id_file).select()
    id_patient = row[0].patient_id

    fuse_log = out_folder+'/'+output_filename+'.fuse.log'

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'

    query2 = db( ( db.patient.id == db.sequence_file.patient_id )
                   & ( db.results_file.sequence_file_id == db.sequence_file.id )
                   & ( db.patient.id == id_patient )
                   & ( db.results_file.config_id == id_config )
                   ).select( orderby=db.sequence_file.id|~db.results_file.run_date)

    ## rows are sorted by sequence file, newest run first: keep only the
    ## first row of each sequence file
    latest = []
    sequence_file_id = 0
    for result_row in query2 :
        if result_row.sequence_file.id != sequence_file_id :
            latest.append(result_row)
            sequence_file_id = result_row.sequence_file.id

    with_data = [r for r in latest if r.results_file.data_file is not None]
    # both strings keep their trailing separator, as stored/used before
    files = "".join(defs.DIR_RESULTS + r.results_file.data_file + " " for r in with_data)
    sequence_file_list = "".join(str(r.results_file.sequence_file_id) + "_" for r in with_data)

    cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+output_file+" -t 100 "+files

    print("=== fuse.py ===")
    print(cmd)
    print("===============")
    sys.stdout.flush()

    fuse_log_file = open(fuse_log, 'w')
    try:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
    finally:
        fuse_log_file.close()
    print("Output log in " + fuse_log)

    fuse_filepath = os.path.abspath(output_file)

    try:
        stream = open(fuse_filepath, 'rb')
    except IOError:
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        # re-raise the original IOError to keep its errno and filename
        raise

    try:
        ts = time.time()
        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     fused_file = stream,
                                     sequence_file_list = sequence_file_list)
        db.commit()
    finally:
        stream.close()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, output_file)}
    log.info(res)

    return "SUCCESS"
316

317
def custom_fuse(file_list):
    """Fuse an arbitrary list of results through the fuse server and return the parsed data.

    file_list -- ids of db.results_file rows; rows without data_file are skipped

    The fuse.py command line is sent to the XML-RPC server listening on
    localhost:defs.PORT_FUSE_SERVER, which returns the path of the fused
    file. That file is parsed as JSON and the temporary output folder is
    removed.

    Returns the decoded JSON content of the fused file.
    Raises IOError when the fused file cannot be read.
    """
    import os, random, shutil, xmlrpclib
    # imported explicitly so that `gluon` does not have to be provided by
    # the execution environment
    import gluon.contrib.simplejson

    random_id = random.randint(99999999,99999999999)
    out_folder = defs.DIR_OUT_VIDJIL_ID % random_id
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    ## start from an empty output folder (no shell involved)
    shutil.rmtree(out_folder, ignore_errors=True)
    os.makedirs(out_folder)

    # str() so that ids given as integers can be joined too
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(str(f) for f in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    data_files = [db.results_file[results_id].data_file for results_id in file_list]
    # the trailing space after each path is kept, as in the command built before
    files = "".join(defs.DIR_RESULTS + data_file + " "
                    for data_file in data_files if data_file is not None)

    cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
    proc_srvr = xmlrpclib.ServerProxy("http://localhost:%d" % defs.PORT_FUSE_SERVER)
    fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

    try:
        f = open(fuse_filepath, 'rb')
        try:
            data = gluon.contrib.simplejson.loads(f.read())
        finally:
            f.close()
    except IOError:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        # re-raise the original IOError to keep its errno and filename
        raise

    shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
360

361
from gluon.scheduler import Scheduler
362
# Task functions exposed to the scheduler, keyed by the name under which
# schedule_run() queues them (the `program` field of db.config).
scheduler = Scheduler(db, {'vidjil': run_vidjil, 'none': run_copy})