task.py 33 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2
3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9
10
11
import time
import sys
import datetime
12
13
import random
import xmlrpclib
14
import tools_utils
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

def assert_scheduler_task_does_not_exist(args):
    """Check that no live scheduler task is already queued with these args.

    A task is considered live when its status is none of FAILED, EXPIRED,
    TIMEOUT or COMPLETED.

    :param args: task arguments, serialized the way they are compared with
        ``scheduler_task.args`` (callers in this module pass ``str(args)``)
    :returns: ``{"message": "task already registered"}`` when such a task
        exists, ``None`` otherwise
    """
    finished_statuses = ["FAILED", "EXPIRED", "TIMEOUT", "COMPLETED"]

    query = (db.scheduler_task.args == args)
    for status in finished_statuses:
        query &= (db.scheduler_task.status != status)
    pending = db(query).select()

    if len(pending) == 0:
        return None
    return {"message": "task already registered"}

31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Look for clones of each sample that may come from another sample of its run.

    The three arguments are parallel lists.  For entry i, the clones of
    results_file ``results_file_id[i]`` are compared with the latest result
    (config ``config_id[i]``) of every other sequence file belonging to the
    same "run" sample set as ``sequence_file_id[i]``.  A clone is counted
    when it appears in both samples and has more than ten times as many
    reads in the other sample.

    :returns: one dict per entry::

        {"total_clones": int,   # counted clones, all other samples together
         "total_reads": int,    # their reads in the analysed sample
         "sample": {other_results_file_id: {"clones": int, "reads": int}}}

        Entries whose sequence file is in no "run" sample set keep zero
        counts and an empty "sample" dict.  Other results whose JSON cannot
        be parsed are reported with zero counts.
    """
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        res_id = results_file_id[i]
        conf_id = config_id[i]
        entry = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(entry)

        # reads of each clone in the analysed sample
        with open(defs.DIR_RESULTS + db.results_file[res_id].data_file, "r") as json_data:
            sample_reads = {clone["id"]: clone["reads"][0]
                            for clone in json.load(json_data)["clones"]}

        # the "run" sample set this sequence file belongs to, if any
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.config_id == conf_id)
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        # Rows are grouped by sequence file, newest result first:
        # keep only the latest result of each other sequence file.
        latest = []
        previous_id = 0
        for row in rows:
            if row.sequence_file.id != previous_id:
                latest.append(row)
                previous_id = row.sequence_file.id

        for row in latest:
            other_path = defs.DIR_RESULTS + row.results_file.data_file
            print(other_path)
            counts = {"clones": 0, "reads": 0}
            entry["sample"][row.results_file.id] = counts

            with open(other_path, "r") as json_data2:
                try:
                    other = json.load(json_data2)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in other["clones"]:
                clone_id = clone["id"]
                if clone_id in sample_reads and clone["reads"][0] > 10 * sample_reads[clone_id]:
                    reads_here = sample_reads[clone_id]
                    entry["total_clones"] += 1
                    entry["total_reads"] += reads_here
                    counts["clones"] += 1
                    counts["reads"] += reads_here

    return result
    
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
def compute_num_clones(results_file_id, min_threshold):
    """Count, for each locus, the clones above a minimal share of that locus' reads.

    :param results_file_id: id of the results_file row whose JSON result is read
    :param min_threshold: minimal clone size, as a percentage of the number of
        reads of the clone's locus (``reads.germline[locus][0]``)
    :returns: dict locus -> number of clones whose first read count is at
        least ``min_threshold`` percent of the locus total
    """
    results_file = db.results_file[results_file_id]
    # Open read-only: opening for writing would truncate the result file.
    with open(defs.DIR_RESULTS + results_file.data_file, 'r') as results:
        data = json.load(results)

    loci_totals = data['reads']['germline']
    loci_min = dict((locus, totals[0] * (min_threshold / 100.0))
                    for locus, totals in loci_totals.items())
    loci_threshold = dict((locus, 0) for locus in loci_totals)

    for clone in data['clones']:
        # NOTE(review): assumes each clone carries its locus in clone['germline'];
        # clones with an unknown or missing locus are ignored.
        germline = clone.get('germline')
        if germline in loci_min and clone['reads'][0] >= loci_min[germline]:
            loci_threshold[germline] += 1

    return loci_threshold

111

112
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file row and queue the configured program for it.

    The program named by ``db.config[id_config].program`` is queued with the
    args ``[id_sequence, id_config, data_id, grep_reads]``.  When the sequence
    file has a pre-process flag that is not COMPLETED/DONE, the queued task is
    put in STOPPED status.

    :returns: a dict with "redirect" and "message" on success, or the error
        dict of assert_scheduler_task_does_not_exist
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id=id_sequence,
                                     run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id=id_config)

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): data_id was just inserted, so these args presumably can
    # never match an existing task -- confirm whether this check is meaningful.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats=1, timeout=defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        # wait for the pre-process to finish before running
        db.scheduler_task[task.id] = dict(status="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id=task.id)

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res

150
151
152
153
154
155
156
157
158
159
160
161
def schedule_fuse(sample_set_ids, config_ids):
    """Queue one 'refuse' task that re-fuses every (sample set, config) pair.

    For each pair, the first results_file found for a sequence file of the
    sample set with that config contributes the list
    ``[sequence_file_id, config_id, results_file_id, sample_set_id, False]``.
    Nothing is queued when no pair has any result.
    """
    fuse_args = []
    for set_id in sample_set_ids:
        for conf_id in config_ids:
            membership = db.sample_set_membership
            results = db.results_file
            first = db((membership.sample_set_id == set_id)
                       & (membership.sequence_file_id == results.sequence_file_id)
                       & (results.config_id == conf_id)
                       ).select(membership.sample_set_id, membership.sequence_file_id,
                                results.id, results.config_id).first()
            if not first:
                continue
            fuse_args.append([first.sample_set_membership.sequence_file_id,
                              first.results_file.config_id,
                              first.results_file.id,
                              first.sample_set_membership.sample_set_id,
                              False])

    if fuse_args:
        scheduler.queue_task('refuse', [fuse_args],
                             repeats=1, timeout=defs.TASK_TIMEOUT)
165

166
def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on a sequence file and store its result (scheduler task).

    If pre-processing of the sequence file has failed, raise.  If it is still
    pending, re-queue this task 1200 seconds later instead.  Otherwise the
    output folder of ``id_data`` is wiped, vidjil-algo is run with the command
    of ``id_config``, a summary is parsed from its log, the result file is
    stored in ``results_file[id_data]`` and, for a full (non-grep) run, every
    sample set containing the sequence file is re-fused.

    :param id_file: sequence_file id
    :param id_config: config id, whose ``command`` holds the vidjil-algo options
    :param id_data: results_file id receiving the output
    :param grep_reads: when set, passed to vidjil-algo as ``-FaW`` and the
        ``seq/clone.fa-1`` file is stored instead of the .vidjil file; no fuse
    :param clean_before: unused, the output folder is always wiped
    :param clean_after: remove the output folder once everything is done
    :returns: "SUCCESS"
    :raises ValueError: when pre-processing has failed
    """
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed

    sequence_file = db.sequence_file[id_file]
    pre_process_flag = sequence_file.pre_process_flag

    # Checked first: FAILED is also "not COMPLETED/DONE" and would otherwise
    # be re-scheduled forever by the pending branch below.
    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if pre_process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats=1, timeout=defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id=task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    ## paths to vidjil and to the sequence files
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    filename = sequence_file.data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## vidjil configuration
    vidjil_cmd = db.config[id_config].command
    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline', defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline', defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    if grep_reads:
        # TODO: security. grep_reads is interpolated into a shell command line
        # and must be validated (e.g. nucleotides only) before reaching here.
        vidjil_cmd += ' -FaW "%s" ' % grep_reads

    os.makedirs(out_folder)

    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    vidjil_log_file = open(out_log, 'w')

    try:
        ## full command line
        cmd += ' -o  ' + out_folder + " -b " + output_filename
        cmd += ' ' + vidjil_cmd + ' ' + seq_file

        ## run vidjil
        print("=== Launching Vidjil ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()

        print("Output log in " + out_log)
        sys.stdout.flush()
        db.commit()

        ## get result file
        if grep_reads:
            out_results = out_folder + '/seq/clone.fa-1'
        else:
            out_results = out_folder + '/' + output_filename + '.vidjil'

        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        stream = open(results_filepath, 'rb')
    except:
        # log any failure, then let the scheduler see it
        print("!!! Vidjil failed, no result file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        # closed here so that the log is complete before being parsed below
        vidjil_log_file.close()

    ## parse some info in the .log
    info = _parse_vidjil_log(out_log)

    ## store the result in the database
    ts = time.time()
    with stream:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=stream)
    db.commit()

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        for row in db(db.sample_set_membership.sequence_file_id == id_file).select():
            sample_set_id = row.sample_set_id
            print(row.sample_set_id)
            run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=False)

    # Clean only after fusing: run_fuse (clean_before=False) writes its log
    # into out_folder.
    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    return "SUCCESS"


def _parse_vidjil_log(out_log):
    """Summarize a vidjil-algo log as "N reads, S segmented (R%), W windows".

    Matching log lines are echoed to stdout.  Parsing stops at the windows
    line.  The summary is '' when no known line is found.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(out_log) as log_lines:
        for line in log_lines:
            m = segmented.search(line)
            if m:
                print(line, end=' ')
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(line)
            if m:
                print(line, end=' ')
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info
318

Alexander Shlemov's avatar
Alexander Shlemov committed
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on a sequence file and store its .vidjil result (scheduler task).

    The output folder of ``id_data`` is wiped and igrec.py is run with the
    command of ``id_config``.  The original file name and the command line
    are added to the result JSON, which is stored in ``results_file[id_data]``.
    Every sample set containing the sequence file is then re-fused.

    :param clean_before: unused, the output folder is always wiped
    :param clean_after: unused
    :returns: "SUCCESS"
    :raises IOError: when igrec.py is missing or produced no result file
    """
    from subprocess import Popen, PIPE, STDOUT

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    # FIXME Use shutil instead
    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file
    seq_file = upload_folder + filename

    ## IgReC arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    log_file = open(out_log, 'w')

    out_results = out_folder + "/out/igrec.vidjil"
    try:
        igrec = defs.DIR_IGREC + '/igrec.py'
        if not os.path.isfile(igrec):
            print("!!! IgReC binary file not found")
            raise IOError("IgReC not found: %s" % igrec)
        cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

        ## run IgReC
        print("=== Launching IgReC ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## get result file (opening it checks that it exists)
        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        open(results_filepath, 'rb').close()
    except:
        print("!!! IgReC failed, no result file")
        res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    ## add sample metadata to the result
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, filename, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")

    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=stream)
    db.commit()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=False)

    return "SUCCESS"

410
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR on a sequence file and store its .vidjil export (scheduler task).

    The command of ``id_config`` holds three '|'-separated option strings, used
    for the ``align``, ``assemble`` and ``exportClones`` steps.  The MiXCR
    reports, the original file name, the command line and the total number of
    reads are added to the exported JSON, which is stored in
    ``results_file[id_data]``.  Every sample set containing the sequence file
    is then re-fused.

    :param clean_before: unused, the output folder is always wiped
    :param clean_after: unused
    :returns: "SUCCESS"
    :raises IOError: when MiXCR produced no result file
    """
    from subprocess import Popen, PIPE, STDOUT

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + filename

    ## MiXCR arguments
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # os.path.join works whether or not out_folder ends with a slash
    base = os.path.join(out_folder, output_filename)
    out_log = base + '.mixcr.log'
    report = base + '.mixcr.report'
    out_alignments = base + '.align.vdjca'
    out_clones = base + '.clones.clns'
    out_results = base + '.mixcr'
    align_report_file = report + '.aln'
    assembly_report_file = report + '.asmbl'

    log_file = open(out_log, 'w')

    args_1, args_2, args_3 = '', '', ''
    try:
        args_1, args_2, args_3 = arg_cmd.split('|')
    except (AttributeError, ValueError):
        # no command, or not exactly three parts: run without extra options
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    ## full command line
    try:
        mixcr = defs.DIR_MIXCR + 'mixcr'
        cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report_file + ' ' + args_1 + ' ' + seq_file + ' ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' assemble -t 1 -r ' + assembly_report_file + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
        cmd += ' && rm ' + out_alignments
        cmd += ' && '
        cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

        ## run MiXCR
        print("=== Launching MiXCR ===")
        print(cmd)
        print("========================")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

        print("Output log in " + out_log)
        sys.stdout.flush()

        ## get result file (opening it checks that it exists)
        print("===>", out_results)
        results_filepath = os.path.abspath(out_results)
        open(results_filepath, 'rb').close()
    except:
        print("!!! MiXCR failed, no result file")
        res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise
    finally:
        log_file.close()

    ## add reports and sample metadata to the result
    align_report = get_file_content(align_report_file)
    assembly_report = get_file_content(assembly_report_file)
    reports = align_report + assembly_report
    total_reads = extract_total_reads(assembly_report)

    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, filename, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, total_reads, "total", "reads")

    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status="ready",
                                        run_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file=stream)
    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=False)

    return "SUCCESS"
524

525
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the result of ``id_data``.

    No analysis is run.  The sequence file is stored into
    ``results_file[id_data]`` under the sequence file's ``filename``, then
    every sample set containing the sequence file is re-fused.

    :param clean_before: unused, the output folder is always wiped
    :param clean_after: remove the output folder once everything is done
    :returns: "SUCCESS"
    :raises IOError: when the sequence file cannot be opened
    """
    from subprocess import Popen, PIPE, STDOUT

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    ## path of the sequence file
    row = db(db.sequence_file.id == id_file).select()
    filename = row[0].data_file

    os.makedirs(out_folder)
    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    # nothing is written to this log, it is only created (empty)
    open(out_log, 'w').close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## the "result" is the sequence file itself
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES + filename)
    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, filename, out_folder)}
        log.error(res)
        raise

    ## store the result in the database
    ts = time.time()
    run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
    with stream:
        stored_name = db.results_file.data_file.store(stream, row[0].filename)
    db.results_file[id_data] = dict(status="ready",
                                    run_date=run_date,
                                    data_file=stored_name)
    db.commit()

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=False)

    # Clean only after fusing: run_fuse (clean_before=False) writes its log
    # into out_folder.
    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
587
588


589
590
591
592
def run_refuse(args):
    """Call run_fuse once per argument list.

    :param args: list of ``[id_file, id_config, id_data, sample_set_id,
        clean_before]`` lists (presumably those queued as the 'refuse' task by
        schedule_fuse -- confirm against the scheduler task mapping)
    :returns: "SUCCESS"
    """
    for fuse_args in args:
        id_file, id_config, id_data = fuse_args[0], fuse_args[1], fuse_args[2]
        sample_set_id, clean_before = fuse_args[3], fuse_args[4]
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before)
    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
593

594
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the latest results of a sample set for one config and store them.

    For every sequence file of ``sample_set_id``, the most recent results_file
    with config ``id_config`` is passed to fuse.py, with the config's
    ``fuse_command`` options.  The .fused output is stored in the fused_file
    row of (sample_set_id, id_config), which is created if needed.

    :param id_file: unused
    :param id_data: results_file id; selects the working folder
    :param clean_before: wipe and recreate the working folder first
    :param clean_after: remove the working folder at the end
    :returns: "SUCCESS"
    """
    from subprocess import Popen, PIPE, STDOUT

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        cmd = "rm -rf " + out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    fuse_log = out_folder + '/' + output_filename + '.fuse.log'
    fuse_log_file = open(fuse_log, 'w')

    ## fuse.py
    output_file = out_folder + '/' + output_filename + '.fused'
    rows = db((db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sample_set_id == sample_set_id)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id | ~db.results_file.run_date)

    # Rows are grouped by sequence file, newest result first:
    # keep only the latest result of each sequence file.
    latest = []
    previous_id = 0
    for row in rows:
        if row.sequence_file.id != previous_id:
            latest.append(row)
            previous_id = row.sequence_file.id

    results = [row.results_file for row in latest if row.results_file.data_file is not None]
    files = "".join(defs.DIR_RESULTS + result.data_file + " " for result in results)
    sequence_file_list = "".join(str(result.sequence_file_id) + "_" for result in results)

    try:
        fuse_cmd = db.config[id_config].fuse_command
        cmd = "python " + defs.DIR_FUSE + "/fuse.py -o " + output_file + " " + fuse_cmd + " " + files

        print("=== fuse.py ===")
        print(cmd)
        print("===============")
        sys.stdout.flush()

        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
        print("Output log in " + fuse_log)

        fuse_filepath = os.path.abspath(output_file)
        stream = open(fuse_filepath, 'rb')
    except:
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        raise
    finally:
        fuse_log_file.close()

    ts = time.time()

    fused_files = db((db.fused_file.config_id == id_config)
                     & (db.fused_file.sample_set_id == sample_set_id)
                     ).select()
    existing_fused_file = None
    if len(fused_files) > 0:
        id_fuse = fused_files[0].id
        existing_fused_file = fused_files[0].fused_file
    else:
        id_fuse = db.fused_file.insert(sample_set_id=sample_set_id,
                                       config_id=id_config)

    with stream:
        db.fused_file[id_fuse] = dict(fuse_date=datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file=stream,
                                      sequence_file_list=sequence_file_list)
    db.commit()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    # Remove the temporary fused file before any removal of its folder
    # (removing it afterwards would fail).
    os.remove(output_file)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

        # remove previous fused_file if it exists
        # NOTE(review): out_folder was just removed, so this targets nothing;
        # the stored fused files presumably live elsewhere -- confirm.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    return "SUCCESS"
689

690
def custom_fuse(file_list):
    """Fuse arbitrary results files through the remote fuse server.

    :param file_list: results_file ids; rows without data_file are skipped
    :returns: the fused data, parsed from the JSON file written by the server
    :raises IOError: when no fuse server is configured
    """
    from subprocess import Popen, PIPE, STDOUT

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')
    random_id = random.randint(99999999, 99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    cmd = "rm -rf " + out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    # str() so that integer ids can be listed as well
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(str(results_id) for results_id in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder + '/' + output_filename + '.fused'
    files = ""
    for results_id in file_list:
        data_file = db.results_file[results_id].data_file
        if data_file is not None:
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python " + os.path.abspath(defs.DIR_FUSE) + "/fuse.py -o " + output_file + " -t 100 " + files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        with open(fuse_filepath, 'rb') as fused:
            data = gluon.contrib.simplejson.loads(fused.read())
    except:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
734

HERBERT Ryan's avatar
HERBERT Ryan committed
735
#TODO move this ?
def get_file_content(filename):
    '''Return the whole content of the file `filename`, read in binary mode.'''
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
742
743
744
745
746
747
748
749
750
751
752
753
754
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store `value` as the first element of the list `dest[field]`,
    or of `dest[parent][field]` when `parent` is neither None nor "".

    - field missing: it is created as the one-element list [value]
    - field present and append is True: its first element is incremented by value (+=)
    - field present otherwise: its first element is replaced by value
    '''
    target = dest[parent] if parent not in (None, "") else dest

    if field not in target:
        target[field] = [value]
    elif append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
755
756
757
758
def extract_total_reads(report):
    '''
    Return, as a string, the number N found on the first
    "Total Reads analysed: N" line of `report`.
    Raises AttributeError when no such line is present.
    '''
    found = re.search(r"Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
759
760
761
762
763
764
765
766
767
768
769
770
771
772

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for a sequence file.

    If a not-yet-finished scheduler task with the same args already exists,
    nothing is queued and the error dict from
    assert_scheduler_task_does_not_exist is logged and returned.
    Otherwise the task is queued, its id is stored in
    sequence_file.pre_process_scheduler_task_id, and a dict with
    "redirect" and "message" keys is returned.
    '''
    # Same argument order as run_pre_process(pre_process_id, sequence_file_id, ...)
    args = [pre_process_id, sequence_file_id]

    # NOTE(review): the duplicate check matches scheduler_task.args against str(args);
    # the scheduler presumably stores args as JSON, which equals str(args) only
    # for numeric ids — confirm.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    task = scheduler.queue_task("pre_process", args,
                                repeats=1, timeout=defs.TASK_TIMEOUT)

    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id=task.id)

    res = {"redirect": "reload",
           "message": "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)}

    log.info(res)
    return res


782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
def get_preprocessed_filename(filename1, filename2):
    '''
    Build the name of the output of a pre-process run on one or two files.

    When filename2 is empty (or None), filename1 is returned unchanged.
    Otherwise the extension shared by both names (as returned by
    tools_utils.get_common_suffpref) is kept, and the common substring of
    the remaining parts (tools_utils.common_substring) is put in front of it.
    If the remaining parts share nothing, they are joined with '_'.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    names = [filename1, filename2]
    extension = tools_utils.get_common_suffpref(names, min(len(filename1), len(filename2)), -1)
    # Explicit end index: x[0:-len(extension)] would be x[0:0] == '' when the
    # shared extension is empty, losing both names.
    without_extension = [x[:len(x) - len(extension)] for x in names]
    common = tools_utils.common_substring(without_extension)
    if len(common) == 0:
        common = '_'.join(without_extension)
    return common + extension


799
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file2),
    put the output back in sequence_file.data_file.

    pre_process_id: id of the db.pre_process row whose `command` template is run
    sequence_file_id: id of the db.sequence_file row to pre-process
    clean_before: empty and recreate the output folder before running the command
    clean_after: remove the output folder once everything is done

    pre_process_flag of the sequence file is set to "RUN", then to "COMPLETED"
    on success. It is set to "FAILED" when the command cannot be run or leaves
    no result file, and the exception is then re-raised. On success,
    data_file is replaced by the result, data_file2 is cleared, the STOPPED
    scheduler tasks of the results of this sequence file are set back to
    "QUEUED", and "SUCCESS" is returned.
    '''
    import shutil
    from subprocess import Popen, PIPE

    sequence_file = db.sequence_file[sequence_file_id]

    db.sequence_file[sequence_file_id] = dict(pre_process_flag="RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id

    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        # Same effect as 'rm -rf' then mkdir, without spawning a shell
        shutil.rmtree(out_folder, ignore_errors=True)
        os.makedirs(out_folder)

    output_file = out_folder + '/' + output_filename

    pre_process = db.pre_process[pre_process_id]

    try:
        # Fill in the placeholders of the command template
        cmd = pre_process.command.replace("&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace("&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace("&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        # Example of template to add some preprocess shortcut
        # cmd = cmd.replace("&preprocess_template&", defs.DIR_preprocess_template)
        # Where &preprocess_template& is the shortcut to change and
        # defs.DIR_preprocess_template the variable to set into the file defs.py.
        # The value should be the path to access to the preprocess software.

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        out_log = out_folder + '/' + output_filename + '.pre.log'
        # The log is opened before changing directory, so that out_log resolves
        # the same way as when out_folder was created
        with open(out_log, 'w') as log_file:
            os.chdir(defs.DIR_FUSE)
            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
            # communicate() (rather than wait()) also closes the stdin pipe
            p.communicate()
        print("Output log in " + out_log)

        filepath = os.path.abspath(output_file)

        # Raises when the command left no result file
        stream = open(filepath, 'rb')
    except BaseException:
        # Flag the pre-process as FAILED on any error or interruption, then propagate it
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)

        db.sequence_file[sequence_file_id] = dict(pre_process_flag="FAILED")
        db.commit()
        raise

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file=stream,
                                                  data_file2=None,
                                                  pre_process_flag="COMPLETED")
        db.commit()
    finally:
        # The stream has been handed to the DAL update: release the handle
        stream.close()

    # Resume the STOPPED tasks of the results of this sequence file
    stopped_tasks = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_tasks:
        db.scheduler_task[row.scheduler_task.id] = dict(status="QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output (each line already ends with its newline)
    with open(out_log) as log_content:
        for line in log_content:
            print(line, end='')

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
900
    
901
from gluon.scheduler import Scheduler
902
# Task functions run by the scheduler workers, keyed by the task name given to
# scheduler.queue_task (e.g. "pre_process" in schedule_pre_process).
scheduler = Scheduler(
    db,
    {
        'vidjil': run_vidjil,
        'compute_contamination': compute_contamination,
        'mixcr': run_mixcr,
        'igrec': run_igrec,
        'none': run_copy,
        'pre_process': run_pre_process,
        'refuse': run_refuse,
    },
    heartbeat=defs.SCHEDULER_HEARTBEAT,
)