task.py 28.3 KB
Newer Older
1
# coding: utf8
2
import json
3
import os
4
import defs
5
import re
6
import os.path
7
8
9
import time
import sys
import datetime
10
11
import random
import xmlrpclib
12
import tools_utils
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

def assert_scheduler_task_does_not_exist(args):
    """Check that no unfinished scheduler task is registered with ``args``.

    A task is considered unfinished when its status is none of FAILED,
    EXPIRED, TIMEOUT or COMPLETED. ``args`` is compared as-is against
    ``db.scheduler_task.args`` (callers in this file pass ``str(list)``).

    Returns ``{"message": "task already registered"}`` when at least one such
    task exists, ``None`` otherwise.
    """
    finished_statuses = ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED")

    # Same args, and a status different from every finished status.
    unfinished = (db.scheduler_task.args == args)
    for status in finished_statuses:
        unfinished = unfinished & (db.scheduler_task.status != status)

    matches = db(unfinished).select()
    if len(matches) == 0:
        return None

    return {"message": "task already registered"}

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Count clones of each sample that are much more abundant in run-mates.

    The three arguments are parallel sequences: entry ``i`` of each describes
    one sample (its sequence file id, its results file id, its config id).

    For every sample, the latest results file (same config) of every other
    sequence file sharing a sample set of type "run" is loaded. A clone of the
    sample is counted when the other file reports more than 10 times the
    sample's reads for the same clone id.

    Returns a list with one dict per sample:
    ``{"total_clones": int, "total_reads": int,
       "sample": {results_file_id: {"clones": int, "reads": int}}}``.
    A clone found in several other samples is counted once per sample in the
    totals. Other result files that are not valid JSON are reported on stdout
    and contribute zero.
    """
    result = []
    for i, current_sequence_file_id in enumerate(sequence_file_id):
        entry = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(entry)

        # Reads per clone id for the sample under scrutiny.
        sample_path = defs.DIR_RESULTS + db.results_file[results_file_id[i]].data_file
        with open(sample_path, "r") as json_data:
            d = json.load(json_data)
        list1 = dict((clone["id"], clone["reads"][0]) for clone in d["clones"])

        # Find the "run" sample set this sequence file belongs to, if any.
        sample_set_run = db( (db.sample_set_membership.sequence_file_id == current_sequence_file_id)
                           & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                           & (db.sample_set.sample_type == "run") ).select().first()

        if sample_set_run is None:
            continue

        sample_set_id = sample_set_run.sample_set.id
        query = db(  ( db.sample_set.id == sample_set_id )
                   & ( db.sample_set.id == db.sample_set_membership.sample_set_id )
                   & ( db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                   & ( db.sequence_file.id != current_sequence_file_id)
                   & ( db.results_file.sequence_file_id == db.sequence_file.id )
                   & ( db.results_file.config_id == config_id[i]  )
                   ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                            orderby=db.sequence_file.id|~db.results_file.run_date)

        # Rows are ordered by sequence file, newest result first: keep only
        # the first row of each sequence file.
        latest_rows = []
        previous_sequence_file_id = 0
        for row in query:
            if row.sequence_file.id != previous_sequence_file_id:
                latest_rows.append(row)
                previous_sequence_file_id = row.sequence_file.id

        for row in latest_rows:
            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            sample_stats = {"clones": 0, "reads": 0}
            entry["sample"][row.results_file.id] = sample_stats
            with open(other_path, "r") as json_data2:
                try:
                    other = json.load(json_data2)
                    for clone in other["clones"]:
                        if clone["id"] not in list1:
                            continue
                        own_reads = list1[clone["id"]]
                        if clone["reads"][0] > 10 * own_reads:
                            entry["total_clones"] += 1
                            entry["total_reads"] += own_reads
                            sample_stats["clones"] += 1
                            sample_stats["reads"] += own_reads
                except ValueError:
                    print('invalid_json')

    return result
    
92

93
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file entry and queue the analysis task for it.

    id_sequence -- id of the sequence_file to process
    id_config   -- id of the config; its ``program`` field is the task name
    grep_reads  -- optional value forwarded unchanged to the task arguments

    Returns a dict with a "message" key (and "redirect": "reload" on
    success). When a matching task is already registered, the error dict of
    assert_scheduler_task_does_not_exist() is logged and returned instead.
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config )

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args contains the id that was just inserted, so this
    # check presumably never matches a previously queued task -- confirm.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program

    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]

    # Hold the task while the pre-process of the sequence file is not done;
    # run_pre_process() puts STOPPED tasks back to QUEUED when it finishes.
    if sequence_file.pre_process_flag in ("WAIT", "RUN"):
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    filename = sequence_file.filename

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, filename)}

    log.info(res)
    return res

Ryan Herbert's avatar
Ryan Herbert committed
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def schedule_fuse(id_sample_set, id_config):
    """Queue a 'fuse' task for a sample set and a config.

    The task arguments are built from the first sequence file found in the
    sample set and the first results file found for that sequence file and
    ``id_config``; the trailing ``False`` is the fifth positional argument of
    the 'fuse' task.

    Raises AttributeError when the sample set, a sequence file or a results
    file cannot be found. Returns nothing.
    """
    sample_set = db.sample_set[id_sample_set]

    membership_query = ((db.sample_set_membership.sample_set_id == sample_set.id)
                        & (db.sample_set_membership.sequence_file_id == db.sequence_file.id))
    first_sequence_file = db(membership_query).select(db.sequence_file.id).first()
    sequence_file_id = first_sequence_file.id

    results_query = ((db.results_file.sequence_file_id == sequence_file_id)
                     & (db.results_file.config_id == id_config))
    first_results_file = db(results_query).select(db.results_file.id).first()
    results_file_id = first_results_file.id

    task_args = [sequence_file_id, id_config, results_file_id, id_sample_set, False]
    scheduler.queue_task('fuse', task_args,
                         repeats = 1, timeout = defs.TASK_TIMEOUT)
145

146
def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Scheduler task: run Vidjil on a sequence file and store the result.

    id_file      -- id of the sequence_file to analyse
    id_config    -- id of the config holding the Vidjil command-line options
    id_data      -- id of the results_file entry that receives the output
    grep_reads   -- when set, passed to Vidjil with ``-FaW``; the stored
                    result is then ``seq/clone.fa-1`` and no fuse is run
    clean_before -- not used: the output folder is always wiped first
    clean_after  -- remove the output folder once the result is stored

    While the pre-process of the sequence file is pending (WAIT/RUN) the task
    re-queues itself 1200 seconds later and returns "SUCCESS".

    Raises ValueError when the pre-process has failed, and re-raises any
    error hit while running Vidjil or opening its result file.
    Returns "SUCCESS".
    """
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed
    try:
        from shlex import quote as shell_quote
    except ImportError:
        # Python 2
        from pipes import quote as shell_quote

    ## re-schedule if pre-process is still pending
    pre_process_flag = db.sequence_file[id_file].pre_process_flag
    if pre_process_flag in ("WAIT", "RUN"):
        print("Pre-process is still pending, re-schedule")

        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()

        return "SUCCESS"

    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## paths to the germlines, the uploaded sequences and the output
    germline_folder = defs.DIR_GERMLINE
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## Vidjil options coming from the config
    vidjil_cmd = db.config[id_config].command
    vidjil_cmd = vidjil_cmd.replace( ' germline' ,germline_folder)

    if grep_reads:
        # grep_reads ends up in a shell command line: quote it so that it is
        # always passed as one literal argument.
        vidjil_cmd += ' -FaW %s ' % shell_quote("%s" % grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    vidjil_log_file = open(out_log, 'w')

    try:
        ## full command line
        cmd = defs.DIR_VIDJIL + '/vidjil ' + ' -o  ' + out_folder + " -b " + output_filename
        cmd += ' ' + vidjil_cmd + ' '+ seq_file

        ## run Vidjil
        print("=== Launching Vidjil ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()

        print("Output log in " + out_log)
        sys.stdout.flush()
        db.commit()

        ## Get result file
        if grep_reads:
            out_results = out_folder + '/seq/clone.fa-1'
        else:
            out_results = out_folder + '/' + output_filename + '.vidjil'

        print("===> %s" % out_results)
        results_filepath = os.path.abspath(out_results)

        stream = open(results_filepath, 'rb')
    except:
        # Log, then let the scheduler see the original error.
        print("!!! Vidjil failed, no result file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        vidjil_log_file.close()

    ## Parse some info in .log
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(out_log) as log_lines:
        for l in log_lines:
            m = segmented.search(l)
            if m:
                sys.stdout.write(l)
                segs = int(m.group(1))
                ratio = m.group(2)
                info = "%d segmented (%s%%)" % (segs, ratio)
                continue
            m = windows.search(l)
            if m:
                sys.stdout.write(l)
                wins = int(m.group(1))
                reads = int(m.group(2))
                info = "%d reads, " % reads + info + ", %d windows" % wins
                break

    ## store the result in the database
    ts = time.time()
    try:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## the return value is what the scheduler stores as the task result
    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        # Refresh the fused file of every sample set containing this file.
        for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
            sample_set_id = membership.sample_set_id
            print(membership.sample_set_id)
            run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
292

293
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Scheduler task: run MiXCR on a sequence file and store the result.

    id_file   -- id of the sequence_file to analyse
    id_config -- id of the config; its command must read
                 ``args_align | args_assemble | args_exportClones``
    id_data   -- id of the results_file entry that receives the output

    ``clean_before`` and ``clean_after`` are accepted but not used: the
    output folder is always wiped first and never removed afterwards.

    The exported result is completed with the MiXCR reports, the original
    file name, the command line and the total number of reads before being
    stored. Every sample set containing the file is then fused again.

    Re-raises any error hit while running MiXCR or opening its result file.
    Returns "SUCCESS".
    """
    from subprocess import Popen, PIPE, STDOUT

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## MiXCR options coming from the config
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # NOTE(review): unlike the other tasks, no '/' is inserted between
    # out_folder and the file names here -- confirm out_folder ends with one.
    out_log = out_folder + output_filename+'.mixcr.log'
    report = out_folder + output_filename + '.mixcr.report'
    log_file = open(out_log, 'w')

    out_alignments = out_folder + output_filename + '.align.vdjca'
    out_clones =  out_folder + output_filename + '.clones.clns'
    out_results_file = output_filename + '.mixcr'
    out_results = out_folder + out_results_file

    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    try:
        # A malformed config only triggers a warning: MiXCR is then run
        # without any extra option.
        args_1, args_2, args_3 = '', '', ''
        try:
            args_1, args_2, args_3 = arg_cmd.split('|')
        except ValueError:
            print(arg_cmd)
            print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

        ## full command line
        mixcr = defs.DIR_MIXCR + 'mixcr'
        cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
        cmd += ' && rm ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

        ## run MiXCR
        print("=== Launching MiXCR ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## Get result file
        print("===> %s" % out_results)
        results_filepath = os.path.abspath(out_results)
        # Opened only to fail early when MiXCR produced no result.
        stream = open(results_filepath, 'rb')
        stream.close()
    except:
        # Log, then let the scheduler see the original error.
        print("!!! MiXCR failed, no result file")
        res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    align_report = get_file_content(align_report)
    assembly_report = get_file_content(assembly_report)
    reports = align_report + assembly_report
    original_name = row[0].data_file
    totalReads = extract_total_reads(assembly_report)

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)

    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, totalReads, "total", "reads")

    # TODO fix this dirty hack to get around bad file descriptor error
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()

    stream = open(results_filepath, 'rb')
    try:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
        db.commit()
    finally:
        stream.close()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    # Refresh the fused file of every sample set containing this file.
    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
407

408
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Scheduler task: store the uploaded file itself as the result.

    id_file      -- id of the sequence_file whose data file is copied
    id_config    -- id of the config (used for log messages and fuse)
    id_data      -- id of the results_file entry that receives the copy
    clean_before -- not used: the output folder is always wiped first
    clean_after  -- remove the output folder once the result is stored

    An empty ``.vidjil.log`` file is created in the output folder. Every
    sample set containing the file is fused again after the copy.

    Raises IOError when the uploaded file cannot be opened.
    Returns "SUCCESS".
    """
    from subprocess import Popen, PIPE, STDOUT

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    sequence_rows = db(db.sequence_file.id==id_file).select()
    sequence_file = sequence_rows[0]
    filename = sequence_file.data_file

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    # Nothing is written to the log: create it empty and release the handle.
    open(out_log, 'w').close()

    print("Output log in "+out_log)
    sys.stdout.flush()
    db.commit()

    ## locate the uploaded file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES+filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        # Re-raise the original error so that its message is not lost.
        raise

    ## store the result in the database
    ts = time.time()
    try:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = db.results_file.data_file.store(stream, sequence_file.filename)
                                    )
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    ## the return value is what the scheduler stores as the task result
    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    # Refresh the fused file of every sample set containing this file.
    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before = False)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
470
471
472



473
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Scheduler task: fuse the latest results of a sample set with fuse.py.

    id_file       -- not used by this function
    id_config     -- id of the config (selects results and the fuse options)
    id_data       -- results_file id, used to name the output folder/files
    sample_set_id -- id of the sample set whose results are fused
    clean_before  -- wipe and re-create the output folder first; when False
                     the folder must already exist
    clean_after   -- remove the output folder once the result is stored

    For each sequence file of the sample set, the most recent results file
    of the config is passed to fuse.py. The fused file is stored in the
    fused_file entry of (config, sample set), created if needed.

    Re-raises any error hit while running fuse.py or opening its output.
    Returns "SUCCESS".
    """
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        cmd = "rm -rf "+out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    fuse_log = out_folder+'/'+output_filename+'.fuse.log'
    output_file = out_folder+'/'+output_filename+'.fused'

    ## results to fuse: newest result first within each sequence file
    all_results = db( ( db.results_file.sequence_file_id == db.sequence_file.id )
                   & ( db.sample_set_membership.sequence_file_id == db.sequence_file.id)
                   & ( db.sample_set_membership.sample_set_id == sample_set_id)
                   & ( db.results_file.config_id == id_config )
                   ).select( orderby=db.sequence_file.id|~db.results_file.run_date)

    # Keep only the first (most recent) row of each sequence file.
    latest_results = []
    previous_sequence_file_id = 0
    for row in all_results :
        if row.sequence_file.id != previous_sequence_file_id :
            latest_results.append(row)
            previous_sequence_file_id = row.sequence_file.id

    usable = [row for row in latest_results if row.results_file.data_file is not None]
    # Both strings keep a trailing separator after every item.
    files = "".join(defs.DIR_RESULTS + row.results_file.data_file + " " for row in usable)
    sequence_file_list = "".join(str(row.results_file.sequence_file_id) + "_" for row in usable)

    fuse_log_file = open(fuse_log, 'w')
    try:
        fuse_cmd = db.config[id_config].fuse_command
        cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+ output_file + " " + fuse_cmd + " " + files

        print("=== fuse.py ===")
        print(cmd)
        print("===============")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
        print("Output log in "+fuse_log)

        fuse_filepath = os.path.abspath(output_file)

        stream = open(fuse_filepath, 'rb')
    except:
        # Log, then let the scheduler see the original error.
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        raise
    finally:
        fuse_log_file.close()

    ts = time.time()

    try:
        fused_files = db( ( db.fused_file.config_id == id_config ) &
                         ( db.fused_file.sample_set_id == sample_set_id )
                     ).select()
        existing_fused_file = None
        if len(fused_files) > 0:
            fused_file = fused_files[0]
            id_fuse = fused_file.id
            existing_fused_file = fused_file.fused_file
        else:
            id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                           config_id = id_config)

        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     fused_file = stream,
                                     sequence_file_list = sequence_file_list)
        db.commit()
    finally:
        stream.close()

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

        # remove previous fused_file if it exists
        # NOTE(review): this only runs with clean_after and looks inside
        # out_folder, which has just been removed -- confirm the intent.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    # Remove temporary fused file (already gone when clean_after removed
    # the whole output folder).
    if os.path.exists(output_file):
        os.remove(output_file)

    return "SUCCESS"
568

569
def custom_fuse(file_list):
    """Fuse an arbitrary list of results files through the fuse server.

    file_list -- ids of the results_file entries to fuse; entries without a
                 data file are skipped

    The fuse.py command is sent to the XML-RPC server at
    ``defs.FUSE_SERVER:defs.PORT_FUSE_SERVER``; the file it returns is parsed
    as JSON. The temporary output folder is removed on success.

    Raises IOError when ``defs.PORT_FUSE_SERVER`` is None, and re-raises any
    error hit while calling the server or reading its output.
    Returns the parsed content of the fused file.
    """
    from subprocess import Popen, PIPE, STDOUT

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')

    # Random id used to name a private temporary output folder.
    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    # Ids are formatted explicitly so that integer ids do not break join().
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join("%s" % results_id for results_id in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for results_id in file_list :
        data_file = db.results_file[results_id].data_file
        if data_file is not None :
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        with open(fuse_filepath, 'rb') as f:
            data = gluon.contrib.simplejson.loads(f.read())
    except:
        # Log, then let the caller see the original error.
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
613

HERBERT Ryan's avatar
HERBERT Ryan committed
614
#TODO move this ?
615
616
617
618
619
620
def get_file_content(filename):
    """Return the whole content of ``filename``, read in binary mode."""
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
621
622
623
624
625
626
627
628
629
630
631
632
633
def fill_field(dest, value, field, parent="", append=False):
    """Set or extend the first element of a list-valued field, in place.

    dest   -- dict to update
    value  -- value to store
    field  -- key of the list to update
    parent -- when neither None nor "", the update is done in ``dest[parent]``
    append -- add ``value`` to the existing first element (``+=``) instead of
              replacing it

    When ``field`` is missing it is created as ``[value]``.
    """
    if parent is None or parent == "":
        target = dest
    else:
        target = dest[parent]

    if field not in target:
        target[field] = [value]
        return

    if append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
634
635
636
637
def extract_total_reads(report):
    """Return, as a string, the number following "Total Reads analysed: ".

    Only the first occurrence in ``report`` is used. Raises AttributeError
    when the report does not contain such a line.
    """
    found = re.search("Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
638
639
640
641
642
643
644
645
646
647
648
649
650
651

def schedule_pre_process(sequence_file_id, pre_process_id):
    """Queue the "pre_process" task for a sequence file.

    The id of the queued task is saved in the
    ``pre_process_scheduler_task_id`` field of the sequence file.

    Returns a dict with "redirect" and "message" keys. When a matching task
    is already registered, the error dict of
    assert_scheduler_task_does_not_exist() is logged and returned instead.
    """
    task_args = [pre_process_id, sequence_file_id]

    already_registered = assert_scheduler_task_does_not_exist(str(task_args))
    if already_registered:
        log.error(already_registered)
        return already_registered

    task = scheduler.queue_task("pre_process", task_args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id = task.id)

    message = "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)
    res = {"redirect": "reload", "message": message}

    log.info(res)
    return res


661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
def get_preprocessed_filename(filename1, filename2):
    '''
    Build the name of a pre-process output from the names of its inputs.

    The common extension of both names is kept, prefixed by the most common
    substring of the names without that extension. When they share no
    substring, both stripped names are joined with '_'.
    When filename2 is empty or None, filename1 is returned unchanged.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Slice with an explicit end index: x[0:-len(extension)] would be empty
    # when the names share no extension (x[0:-0] == '').
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common+extension


678
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file+2),
    put the output back in sequence_file.data_file.

    pre_process_id   -- id of the pre_process entry holding the command
    sequence_file_id -- id of the sequence_file to process
    clean_before     -- wipe and re-create the output folder first; when
                        False the folder must already exist
    clean_after      -- remove the output folder at the end

    In the command, "&file1&", "&file2&", "&result&" and "&pear&" are
    replaced by the input files, the output file and defs.DIR_PEAR. The
    command is run from defs.DIR_FUSE (the working directory of the process
    is changed).

    pre_process_flag goes to "RUN", then to "DONE" or "FAILED". On success,
    STOPPED tasks attached to results of this sequence file are set back to
    QUEUED and the log of the command is printed.

    Re-raises any error hit while running the command or opening its output.
    Returns "SUCCESS".
    '''
    from subprocess import Popen, PIPE, STDOUT

    sequence_file = db.sequence_file[sequence_file_id]
    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        cmd = "rm -rf "+out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    output_file = out_folder+'/'+output_filename

    pre_process = db.pre_process[pre_process_id]

    log_file = None
    try:
        cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace( "&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        out_log = out_folder+'/'+output_filename+'.pre.log'
        log_file = open(out_log, 'w')

        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        p.communicate()
        print("Output log in " + out_log)

        filepath = os.path.abspath(output_file)

        stream = open(filepath, 'rb')
    except:
        # Record the failure, then let the scheduler see the original error.
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)
        db.sequence_file[sequence_file_id] = dict(pre_process_flag = "FAILED")
        db.commit()
        raise
    finally:
        if log_file is not None:
            log_file.close()

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file = stream,
                                                  data_file2 = None,
                                                  pre_process_flag = "DONE")
        db.commit()
    finally:
        stream.close()

    # resume STOPPED tasks for this sequence file
    stopped_task = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_task :
        db.scheduler_task[row.scheduler_task.id] = dict(status = "QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output
    with open(out_log) as log_lines:
        for l in log_lines:
            sys.stdout.write(l)

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
774
    
775
from gluon.scheduler import Scheduler
776
# Scheduler instance shared by this file: maps each task name to the function
# that runs it.
scheduler = Scheduler(
    db,
    {
        'vidjil': run_vidjil,
        'compute_contamination': compute_contamination,
        'mixcr': run_mixcr,
        'none': run_copy,
        'pre_process': run_pre_process,
        'fuse': run_fuse,
    },
    heartbeat=defs.SCHEDULER_HEARTBEAT)