task.py 34.1 KB
Newer Older
1
# coding: utf8
Mathieu Giraud's avatar
Mathieu Giraud committed
2
3
from __future__ import print_function

4
import json
5
import os
6
import defs
7
import re
8
import os.path
9
10
11
import time
import sys
import datetime
12
13
import random
import xmlrpclib
14
import tools_utils
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

def assert_scheduler_task_does_not_exist(args):
    """Check that no unfinished scheduler task already has these arguments.

    :param args: the task arguments, in the string form stored in
        ``scheduler_task.args``
    :return: ``None`` when no such task exists, otherwise an error dict
        ``{"message": "task already registered"}``
    """
    # A task in any of these states is over and does not block a new one
    query = (db.scheduler_task.args == args)
    for finished_status in ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED"):
        query &= (db.scheduler_task.status != finished_status)

    live_tasks = db(query).select()
    if not live_tasks:
        return None
    return {"message": "task already registered"}
def compute_contamination(sequence_file_id, results_file_id, config_id):
    """Look for clones that may come from another sample of the same run.

    The three arguments are parallel lists. For each index ``i``, the clones of
    results file ``results_file_id[i]`` (of sequence file ``sequence_file_id[i]``)
    are compared with the newest results file, for config ``config_id[i]``, of
    every other sequence file of the same "run" sample set. A clone is counted
    when its first read count in the other sample is more than 10 times its
    first read count in the examined sample.

    Returns one dict per index::

        {"total_clones": int, "total_reads": int,
         "sample": {other_results_file_id: {"clones": int, "reads": int}}}

    The read counts summed are those of the examined sample. Results files
    without a data file are skipped. Other samples whose file is not valid
    JSON are reported as 'invalid_json' and keep zero counts.

    :raises ValueError: when the examined results file is not valid JSON
    """
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        stats = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(stats)

        own_data_file = db.results_file[results_file_id[i]].data_file
        if own_data_file is None:
            # No result file yet for this sample: nothing to compare
            continue

        # Reads of each clone of the examined sample, keyed by clone id
        with open(defs.DIR_RESULTS + own_data_file, "r") as json_data:
            own = json.load(json_data)
        own_reads = {clone["id"]: clone["reads"][0] for clone in own["clones"]}

        # "run" sample set containing this sequence file, if any
        sample_set_run = db((db.sample_set_membership.sequence_file_id == seq_id)
                            & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                            & (db.sample_set.sample_type == "run")).select().first()
        if sample_set_run is None:
            continue

        rows = db((db.sample_set.id == sample_set_run.sample_set.id)
                  & (db.sample_set.id == db.sample_set_membership.sample_set_id)
                  & (db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                  & (db.sequence_file.id != seq_id)
                  & (db.results_file.sequence_file_id == db.sequence_file.id)
                  & (db.results_file.config_id == config_id[i])
                  ).select(db.sequence_file.ALL, db.results_file.ALL, db.sample_set.id,
                           orderby=db.sequence_file.id | ~db.results_file.run_date)

        # Rows come grouped by sequence file, newest results first. Keep, for
        # each other sequence file, its newest results file that already has a
        # data file (a freshly scheduled run has none yet).
        seen = set()
        for row in rows:
            other_data_file = row.results_file.data_file
            if other_data_file is None or row.sequence_file.id in seen:
                continue
            seen.add(row.sequence_file.id)

            other_path = defs.DIR_RESULTS + other_data_file
            print(other_path)
            sample_stats = {"clones": 0, "reads": 0}
            stats["sample"][row.results_file.id] = sample_stats

            with open(other_path, "r") as json_data:
                try:
                    other = json.load(json_data)
                except ValueError:
                    print('invalid_json')
                    continue

            for clone in other["clones"]:
                if clone["id"] not in own_reads:
                    continue
                reads = own_reads[clone["id"]]
                if clone["reads"][0] > 10 * reads:
                    stats["total_clones"] += 1
                    stats["total_reads"] += reads
                    sample_stats["clones"] += 1
                    sample_stats["reads"] += reads

    return result
def compute_extra(id_file, id_config, min_threshold):
    """Add the per-locus clone distribution to the newest results file.

    Loads the newest results file (by run_date) of sequence file ``id_file``
    for config ``id_config``. For each locus listed in ``reads.germline``, it
    counts the clones of that locus whose first read count is at least
    ``min_threshold`` percent of the locus' first read total. The counts are
    stored as ``reads.distribution = {locus: [count]}`` and the file is
    rewritten in place. Clones whose germline has no total in
    ``reads.germline`` are ignored.

    :return: "SUCCESS"; "FAIL" when no results file has a data file yet, or
        when that file is not valid JSON
    """
    results_file = db((db.results_file.sequence_file_id == id_file) &
                      (db.results_file.config_id == id_config)
                      ).select(orderby=~db.results_file.run_date).first()
    if results_file is None or results_file.data_file is None:
        # e.g. the newest run was scheduled but has not produced a file yet
        print('no results file')
        return "FAIL"

    filename = defs.DIR_RESULTS + results_file.data_file
    with open(filename, "rb") as rf:
        try:
            d = json.load(rf)
        except ValueError:
            print('invalid_json')
            return "FAIL"

    reads = d.setdefault('reads', {})

    # Minimal number of reads, per locus, for a clone to be counted
    loci_min = {}
    result = {}
    for locus, totals in reads.get('germline', {}).items():
        result[locus] = [0]
        loci_min[locus] = totals[0] * (min_threshold / 100.0)

    for clone in d.get('clones') or []:
        germline = clone.get('germline')
        if germline in loci_min and clone['reads'][0] >= loci_min[germline]:
            result[germline][0] += 1

    reads['distribution'] = result
    with open(filename, 'w') as extra:
        json.dump(d, extra)
    return "SUCCESS"
def schedule_run(id_sequence, id_config, grep_reads=None):
    """Create a results_file entry and queue the analysis task that fills it.

    The task runs the config's ``program`` with arguments
    ``[id_sequence, id_config, data_id, grep_reads]``. When the sequence file
    has a pre-process that is neither "COMPLETED" nor "DONE", the task is
    created in the "STOPPED" state.

    :return: ``{"redirect": "reload", "message": ...}`` on success, or the
        error dict of assert_scheduler_task_does_not_exist
    """
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config )

    args = [id_sequence, id_config, data_id, grep_reads]

    # NOTE(review): args contains the data_id inserted just above, so this
    # check can hardly ever match an existing task.
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ## add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)

    sequence_file = db.sequence_file[id_sequence]
    pre_process_flag = sequence_file.pre_process_flag
    # Pre-process still pending: keep the task stopped for now
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        db.scheduler_task[task.id] = dict(status ="STOPPED")

    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "[%s] c%s: process requested - %s %s" % (data_id, id_config, grep_reads, sequence_file.filename)}

    log.info(res)
    return res
def schedule_fuse(sample_set_ids, config_ids):
    """Queue one 'refuse' task covering every (sample set, config) pair.

    For each pair, the first matching results file of a sequence file of the
    sample set is looked up. Pairs without any results file are skipped, and
    no task is queued when no pair matches.
    """
    fuse_args = []
    for set_id in sample_set_ids:
        for conf_id in config_ids:
            match = db((db.sample_set_membership.sample_set_id == set_id)
                       & (db.sample_set_membership.sequence_file_id == db.results_file.sequence_file_id)
                       & (db.results_file.config_id == conf_id)
                       ).select(db.sample_set_membership.sample_set_id,
                                db.sample_set_membership.sequence_file_id,
                                db.results_file.id,
                                db.results_file.config_id).first()
            if not match:
                continue
            membership = match.sample_set_membership
            # [id_file, id_config, id_data, sample_set_id, clean_before]
            fuse_args.append([membership.sequence_file_id,
                              match.results_file.config_id,
                              match.results_file.id,
                              membership.sample_set_id,
                              False])

    if not fuse_args:
        return
    scheduler.queue_task('refuse', [fuse_args],
                         repeats = 1, timeout = defs.TASK_TIMEOUT)
def schedule_compute_extra(id_file, id_config, min_threshold):
    """Queue a single 'compute_extra' task for a sequence file and config."""
    scheduler.queue_task('compute_extra',
                         [id_file, id_config, min_threshold],
                         repeats=1,
                         timeout=defs.TASK_TIMEOUT)
def _parse_vidjil_log(log_path):
    """Summarize a vidjil-algo log file in one line.

    Echoes the "==> segmented ..." and "==> found ... -windows ..." lines to
    stdout. Returns e.g. "1000 reads, 900 segmented (90.0%), 12 windows".
    Reading stops at the windows line, and the summary is partial (or empty)
    when those lines are missing.
    """
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(log_path) as log_lines:
        for line in log_lines:
            m = segmented.search(line)
            if m:
                print(line, end=' ')
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(line)
            if m:
                print(line, end=' ')
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info


def run_vidjil(id_file, id_config, id_data, grep_reads,
               clean_before=False, clean_after=False):
    """Run vidjil-algo on sequence file ``id_file`` with config ``id_config``.

    - If the pre-process of the sequence file has failed, raises ValueError.
    - If the pre-process is still pending, re-queues this 'vidjil' task 1200
      seconds later and returns.
    - Otherwise:
      1. runs vidjil-algo in a fresh output folder;
      2. stores its result into results_file ``id_data``: the .vidjil file, or
         'seq/clone.fa-1' when ``grep_reads`` is given;
      3. for a full analysis (no ``grep_reads``), computes the extra
         statistics and fuses every sample set containing the sequence file.

    :param grep_reads: optional value passed to vidjil-algo's --grep-reads
    :param clean_before: not used (the output folder is always cleaned first)
    :param clean_after: remove the output folder once the result is stored
    :return: "SUCCESS"
    :raises ValueError: when the pre-process has failed
    Errors while running vidjil-algo or opening its result are logged and
    re-raised.
    """
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed
    import shutil
    try:
        from shlex import quote as shell_quote  # Python 3
    except ImportError:
        from pipes import quote as shell_quote  # Python 2

    sequence_file = db.sequence_file[id_file]
    pre_process_flag = sequence_file.pre_process_flag

    if pre_process_flag == "FAILED":
        print("Pre-process has failed")
        raise ValueError('pre-process has failed')

    ## re-schedule if the pre-process is still pending
    if pre_process_flag and pre_process_flag not in ["COMPLETED", "DONE"]:
        print("Pre-process is still pending, re-schedule")
        args = [id_file, id_config, id_data, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                                    repeats = 1, timeout = defs.TASK_TIMEOUT,
                                    start_time=request.now + timed(seconds=1200))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()
        return "SUCCESS"

    ## paths to vidjil-algo and to the sequence files
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = defs.DIR_SEQUENCES + sequence_file.data_file

    ## vidjil-algo options from the config
    vidjil_cmd = db.config[id_config].command
    if 'next' in vidjil_cmd:
        vidjil_cmd = vidjil_cmd.replace('next', '')
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE_NEXT)
        cmd = defs.DIR_VIDJIL_NEXT + '/vidjil-algo '
    else:
        vidjil_cmd = vidjil_cmd.replace(' germline' , defs.DIR_GERMLINE)
        cmd = defs.DIR_VIDJIL + '/vidjil-algo '

    if grep_reads:
        # grep_reads comes from the caller: quote it so the shell cannot
        # interpret it (the command below runs with shell=True)
        vidjil_cmd += ' --grep-reads %s ' % shell_quote(grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder + '/' + output_filename + '.vidjil.log'

    with open(out_log, 'w') as vidjil_log_file:
        try:
            ## full command
            cmd += ' -o  ' + out_folder + " -b " + output_filename
            cmd += ' ' + vidjil_cmd + ' ' + seq_file

            ## run vidjil-algo
            print("=== Launching Vidjil ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()

            print("Output log in " + out_log)
            sys.stdout.flush()
            db.commit()

            ## Get result file
            if grep_reads:
                out_results = out_folder + '/seq/clone.fa-1'
            else:
                out_results = out_folder + '/' + output_filename + '.vidjil'

            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            stream = open(results_filepath, 'rb')
        except Exception:
            print("!!! Vidjil failed, no result file")
            res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    ## Parse some info in the log
    info = _parse_vidjil_log(out_log)

    ## store the result in the database
    ts = time.time()
    with stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    ## the Vidjil output is stored as the scheduler result
    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        memberships = db(db.sample_set_membership.sequence_file_id == id_file).select()
        if memberships:
            # The extra statistics depend only on the results file, not on the
            # sample set: compute them once, before any fuse
            compute_extra(id_file, id_config, 5)
        for row in memberships:
            print(row.sample_set_id)
            run_fuse(id_file, id_config, id_data, row.sample_set_id, clean_before = False)

    return "SUCCESS"
def run_igrec(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run IgReC on a sequence file, store its .vidjil result and fuse it.

    Steps:
    1. Runs ``igrec.py`` with the config command as extra arguments, in a
       fresh output folder.
    2. Adds the original file name and the command line to the produced
       'out/igrec.vidjil' file.
    3. Stores that file into results_file ``id_data``.
    4. Fuses every sample set containing the sequence file.

    :param clean_before: not used
    :param clean_after: not used
    :return: "SUCCESS"
    Errors while running IgReC or opening its result are logged and re-raised.
    """
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    row = db(db.sequence_file.id == id_file).select()
    original_name = row[0].data_file
    seq_file = upload_folder + original_name

    ## IgReC arguments from the config
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    out_log = out_folder + '/vidjil-igrec.log'
    out_results = out_folder + "/out/igrec.vidjil"

    with open(out_log, 'w') as log_file:
        try:
            igrec = defs.DIR_IGREC + '/igrec.py'
            if not os.path.isfile(igrec):
                print("!!! IgReC binary file not found")
            ## full command
            cmd = "%s -s %s -o %s/out %s" % (igrec, seq_file, out_folder, arg_cmd)

            ## run IgReC
            print("=== Launching IgReC ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            # Only checks that the result file exists and is readable
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! IgReC failed, no result file")
            res = {"message": "[%s] c%s: IgReC FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    # Add sample information to the result; the file is read completely
    # before being rewritten
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
        fill_field(my_json, original_name, "original_names", "samples")
        fill_field(my_json, cmd, "commandline", "samples")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: IgReC - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before = False)

    return "SUCCESS"
def run_mixcr(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Run MiXCR (align, assemble, exportClones) on a sequence file.

    The config command holds three '|'-separated groups of arguments. They are
    passed to 'mixcr align', 'mixcr assemble' and 'mixcr exportClones'
    respectively. When the split does not give three groups, a warning is
    printed and empty arguments are used.

    The exported file gets the alignment/assembly reports, the original file
    name, the command line and the total read count. It is then stored into
    results_file ``id_data``, and every sample set containing the sequence
    file is fused.

    :param clean_before: not used
    :param clean_after: not used
    :return: "SUCCESS"
    Errors while running MiXCR or opening its result are logged and re-raised.
    """
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file path
    row = db(db.sequence_file.id == id_file).select()
    original_name = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder + original_name

    ## MiXCR arguments from the config
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)
    # All MiXCR files live inside out_folder. Joining (rather than plain
    # concatenation) keeps them there even if DIR_OUT_VIDJIL_ID has no
    # trailing separator.
    base = os.path.join(out_folder, output_filename)
    out_log = base + '.mixcr.log'
    report = base + '.mixcr.report'
    out_alignments = base + '.align.vdjca'
    out_clones = base + '.clones.clns'
    out_results = base + '.mixcr'
    align_report_path = report + '.aln'
    assembly_report_path = report + '.asmbl'

    args_1, args_2, args_3 = '', '', ''
    try:
        args_1, args_2, args_3 = arg_cmd.split('|')
    except ValueError:
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    with open(out_log, 'w') as log_file:
        try:
            ## full command
            mixcr = defs.DIR_MIXCR + 'mixcr'
            cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report_path + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' assemble -t 1 -r ' + assembly_report_path + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
            cmd += ' && rm ' + out_alignments
            cmd += ' && '
            cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

            ## run MiXCR
            print("=== Launching MiXCR ===")
            print(cmd)
            print("========================")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
            p.wait()

            print("Output log in " + out_log)
            sys.stdout.flush()

            ## Get result file
            print("===>", out_results)
            results_filepath = os.path.abspath(out_results)
            # Only checks that the result file exists and is readable
            open(results_filepath, 'rb').close()
        except Exception:
            print("!!! MiXCR failed, no result file")
            res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
            log.error(res)
            raise

    align_report = get_file_content(align_report_path)
    assembly_report = get_file_content(assembly_report_path)
    reports = align_report + assembly_report
    totalReads = extract_total_reads(assembly_report)

    # Add sample information to the result; the file is read completely
    # before being rewritten
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)
        fill_field(my_json, reports, "log", "samples", True)
        fill_field(my_json, original_name, "original_names", "samples")
        fill_field(my_json, cmd, "commandline", "samples")
        fill_field(my_json, totalReads, "total", "reads")
    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = stream
                                        )
    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before = False)

    return "SUCCESS"
def run_copy(id_file, id_config, id_data, clean_before=False, clean_after=False):
    """Store the uploaded sequence file itself as the result ('copy' program).

    The sequence file of ``id_file`` is stored into results_file ``id_data``
    under the sequence file's ``filename``. Every sample set containing the
    sequence file is then fused.

    :param clean_before: not used (the working folder is always cleaned first)
    :param clean_after: remove the working folder once the result is stored
    :return: "SUCCESS"
    :raises IOError: when the sequence file cannot be opened (logged first)
    """
    import shutil

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    shutil.rmtree(out_folder, ignore_errors=True)

    ## sequence file
    sequence_file = db(db.sequence_file.id == id_file).select()[0]
    filename = sequence_file.data_file

    os.makedirs(out_folder)
    # Nothing is written to this log, but it is created as for the other programs
    out_log = out_folder + '/' + output_filename + '.vidjil.log'
    open(out_log, 'w').close()

    print("Output log in " + out_log)
    sys.stdout.flush()
    db.commit()

    ## fetch the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES + filename)
    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s - %s" % (id_data, id_config, filename, out_folder)}
        log.error(res)
        raise

    ## store the result in the database
    ts = time.time()
    with stream:
        db.results_file[id_data] = dict(status = "ready",
                                        run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                        data_file = db.results_file.data_file.store(stream, sequence_file.filename)
                                        )
    db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    ## TODO parse result success/fail
    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id == id_file).select():
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, membership.sample_set_id, clean_before = False)

    return "SUCCESS"
def run_refuse(args):
    """Call run_fuse once for each argument list in ``args``.

    Each item holds, in order, id_file, id_config, id_data, sample_set_id and
    clean_before (as built by schedule_fuse). Extra entries are ignored.

    :return: "SUCCESS"
    """
    for fuse_args in args:
        id_file, id_config = fuse_args[0], fuse_args[1]
        id_data, sample_set_id = fuse_args[2], fuse_args[3]
        clean_before = fuse_args[4]
        run_fuse(id_file, id_config, id_data, sample_set_id, clean_before)
    return "SUCCESS"
def run_fuse(id_file, id_config, id_data, sample_set_id, clean_before=True, clean_after=False):
    """Fuse the newest results of every sequence file of a sample set.

    For sample set ``sample_set_id`` and config ``id_config``:
    1. takes the newest results file (by run_date) of each sequence file;
    2. skips those without a data file;
    3. runs fuse.py on the rest, with the config's ``fuse_command``;
    4. stores the output in the fused_file row of this (sample set, config)
       pair, creating the row if needed.

    :param id_file: not used
    :param id_data: results file id, used to name the working folder and files
    :param clean_before: start from an empty working folder
    :param clean_after: remove the working folder at the end
    :return: "SUCCESS", or "FAILED" when there is no file to fuse
    Errors while running fuse.py or opening its output are logged and re-raised.
    """
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data + '-%s' % sample_set_id

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
    if not os.path.isdir(out_folder):
        # Needed even without clean_before: the working folder may never have
        # been created, or may have been removed by a previous clean_after
        os.makedirs(out_folder)

    fuse_log_path = out_folder + '/' + output_filename + '.fuse.log'
    output_file = out_folder + '/' + output_filename + '.fused'

    rows = db((db.results_file.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sequence_file_id == db.sequence_file.id)
              & (db.sample_set_membership.sample_set_id == sample_set_id)
              & (db.results_file.config_id == id_config)
              ).select(orderby=db.sequence_file.id | ~db.results_file.run_date)

    # Rows come grouped by sequence file, newest results first: keep the first
    # row of each group
    latest = []
    last_sequence_file_id = 0
    for row in rows:
        if row.sequence_file.id != last_sequence_file_id:
            latest.append(row)
            last_sequence_file_id = row.sequence_file.id

    files = ""
    sequence_file_list = ""
    for row in latest:
        if row.results_file.data_file is not None:
            files += defs.DIR_RESULTS + row.results_file.data_file + " "
            sequence_file_list += str(row.results_file.sequence_file_id) + "_"

    with open(fuse_log_path, 'w') as fuse_log_file:
        if files == "":
            print("!!! Fuse failed: no files to fuse")
            res = {"message": "[%s] c%s: 'fuse' FAILED - %s no files to fuse" % (id_data, id_config, output_file)}
            log.error(res)
            return "FAILED"

        try:
            ## fuse.py
            fuse_cmd = db.config[id_config].fuse_command
            cmd = "python " + defs.DIR_FUSE + "/fuse.py -o " + output_file + " " + fuse_cmd + " " + files

            print("=== fuse.py ===")
            print(cmd)
            print("===============")
            sys.stdout.flush()

            p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
            p.communicate()
            print("Output log in " + fuse_log_path)

            fuse_filepath = os.path.abspath(output_file)
            stream = open(fuse_filepath, 'rb')
        except Exception:
            print("!!! Fuse failed, no .fused file")
            res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
            log.error(res)
            raise

    with stream:
        ts = time.time()

        fused_files = db((db.fused_file.config_id == id_config) &
                         (db.fused_file.sample_set_id == sample_set_id)
                         ).select()
        existing_fused_file = None
        if len(fused_files) > 0:
            fused_file = fused_files[0]
            id_fuse = fused_file.id
            existing_fused_file = fused_file.fused_file
        else:
            id_fuse = db.fused_file.insert(sample_set_id = sample_set_id,
                                           config_id = id_config)

        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                      fused_file = stream,
                                      sequence_file_list = sequence_file_list)
        db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)
        # remove previous fused_file if it exists
        # NOTE(review): this path is inside out_folder, removed just above, so
        # this is effectively a no-op. The stored fused file presumably lives
        # in the upload folder; confirm before changing.
        if existing_fused_file is not None:
            clean_cmd = "rm -rf %s/%s" % (out_folder, existing_fused_file)
            p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
            p.wait()

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, db.fused_file[id_fuse].fused_file)}
    log.info(res)

    # Remove the temporary fused file (already gone if clean_after removed out_folder)
    if os.path.exists(output_file):
        os.remove(output_file)

    return "SUCCESS"
def custom_fuse(file_list):
    '''
    Fuse several results files through the remote fuse server and return the fused data.

    file_list -- ids of db.results_file rows; rows whose data_file is None are skipped.

    Returns the content of the fused file, parsed with gluon.contrib.simplejson.
    Raises IOError when no fuse server port is configured (defs.PORT_FUSE_SERVER is None).
    Any error while calling the fuse server or reading/parsing its output is
    logged and re-raised.
    '''
    from subprocess import Popen, PIPE, STDOUT, os

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')

    # Random id used to name the working folder and output file
    # (presumably to keep separate custom fuses apart).
    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    # Start from an empty working folder
    cmd = "rm -rf "+out_folder
    p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()
    os.makedirs(out_folder)

    # Ids may not be strings (e.g. ints); format each one so join() cannot raise TypeError
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join('%s' % (file_id,) for file_id in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for file_id in file_list:
        # One DB lookup per id instead of two
        data_file = db.results_file[file_id].data_file
        if data_file is not None:
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    try:
        cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
        proc_srvr = xmlrpclib.ServerProxy("http://%s:%d" % (defs.FUSE_SERVER, defs.PORT_FUSE_SERVER))
        fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

        # Close the fused file once parsed (the handle used to be leaked)
        with open(fuse_filepath, 'rb') as f:
            data = gluon.contrib.simplejson.loads(f.read())
    except:
        # Log-and-reraise boundary: the caller still sees the original exception
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    clean_cmd = "rm -rf " + out_folder
    p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
    p.wait()

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
759

HERBERT Ryan's avatar
HERBERT Ryan committed
760
#TODO move this ?
761
762
763
764
765
766
def get_file_content(filename):
    '''
    Return the raw content of the file at `filename`, read in binary mode.
    '''
    with open(filename, 'rb') as source:
        return source.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
767
768
769
770
771
772
773
774
775
776
777
778
779
def fill_field(dest, value, field, parent="", append=False):
    '''
    Store `value` in `dest[field]`, or in `dest[parent][field]` when a non-empty
    parent is given. The value is always kept as the first element of a list.

    A missing field is created as `[value]`. An existing field has its first
    element replaced by `value`, or, when `append` is true, combined with it
    using `+=` (concatenation for strings, addition for numbers, ...).
    '''
    target = dest if (parent is None or parent == "") else dest[parent]

    if field not in target:
        target[field] = [value]
    elif append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
780
781
782
783
def extract_total_reads(report):
    '''
    Return, as a string, the number that follows the first
    "Total Reads analysed: " in `report`.

    When `report` has no such line the search yields None, and the
    `.group()` call raises AttributeError.
    '''
    match = re.search("Total Reads analysed: [0-9]+", report)
    # The count is the last space-separated token of the matched text
    return match.group().rsplit(' ', 1)[-1]
784
785
786
787
788
789
790
791
792
793
794
795
796
797

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue a "pre_process" scheduler task for a sequence file, unless a task
    with the same arguments is already pending.

    The task arguments are [pre_process_id, sequence_file_id], matching the
    signature of run_pre_process. The queued task id is stored in
    sequence_file.pre_process_scheduler_task_id.

    Returns the error dict from assert_scheduler_task_does_not_exist when a
    task is already registered; otherwise a dict with "redirect" and "message".
    '''
    args = [pre_process_id, sequence_file_id]

    # Duplicate detection compares against the stringified task arguments
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    task = scheduler.queue_task("pre_process", args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)
    db.sequence_file[sequence_file_id] = dict(pre_process_scheduler_task_id = task.id)

    res = {"redirect": "reload",
           "message": "{%s} (%s): process requested" % (sequence_file_id, pre_process_id)}
    log.info(res)
    return res


807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
def get_preprocessed_filename(filename1, filename2):
    '''
    Get the same extension and then get the most common among them

    Build the name of a pre-processed file from its one or two input names.
    The part shared at the end of both names (the "extension", computed by
    tools_utils.get_common_suffpref) is kept. It is prefixed by the common
    substring of the remaining stems, or by both stems joined with '_' when
    they share nothing. When filename2 is empty or None, filename1 is
    returned unchanged.

    >>> get_preprocessed_filename('lsdkj_toto-tata_mlkmfsdlkf.fastq.gz', 'rete_toto-tata_eekjdf.fastq.gz')
    '_toto-tata_.fastq.gz'
    '''
    if not filename2:
        return filename1
    extension = tools_utils.get_common_suffpref([filename1, filename2], min(len(filename1), len(filename2)), -1)
    # Use an explicit stop index: x[0:-len(extension)] would give '' instead of x
    # when the extension is empty (since -0 == 0), collapsing the result to '_'.
    without_extension = [x[:len(x) - len(extension)] for x in [filename1, filename2]]
    common = tools_utils.common_substring(without_extension)
    if not common:
        common = '_'.join(without_extension)
    return common + extension


824
def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file2),
    put the output back in sequence_file.data_file.

    pre_process_id   -- id of the db.pre_process row whose `command` template is run
    sequence_file_id -- id of the db.sequence_file row to process
    clean_before     -- wipe and recreate the working folder before running
    clean_after      -- remove the working folder at the end

    In the command template, &file1&, &file2&, &result& and &pear& are replaced by:
    the input file(s), the output file and defs.DIR_PEAR.

    sequence_file.pre_process_flag is set to "RUN", then to "COMPLETED" or "FAILED".
    On success:
      - data_file2 is cleared;
      - STOPPED scheduler tasks of results files for this sequence file are set back to QUEUED;
      - "SUCCESS" is returned.
    If the result file cannot be opened, the error is logged and the exception re-raised.
    '''

    from subprocess import Popen, PIPE, STDOUT, os

    sequence_file = db.sequence_file[sequence_file_id]

    db.sequence_file[sequence_file_id] = dict(pre_process_flag = "RUN")
    db.commit()

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = get_preprocessed_filename(get_original_filename(sequence_file.data_file),
                                                get_original_filename(sequence_file.data_file2))

    if clean_before:
        cmd = "rm -rf "+out_folder
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()
        os.makedirs(out_folder)

    output_file = out_folder+'/'+output_filename

    pre_process = db.pre_process[pre_process_id]

    log_file = None
    try:
        cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
        if sequence_file.data_file2:
            cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
        cmd = cmd.replace( "&result&", output_file)
        cmd = cmd.replace("&pear&", defs.DIR_PEAR)

        # Example of template to add some preprocess shortcut
        # cmd = cmd.replace("&preprocess_template&", defs.DIR_preprocess_template)
        # Where &preprocess_template& is the shortcut to change and
        # defs.DIR_preprocess_template the variable to set into the file defs.py.
        # The value should be the path to access to the preprocess software.

        print("=== Pre-process %s ===" % pre_process_id)
        print(cmd)
        print("===============")
        sys.stdout.flush()

        out_log = out_folder+'/'+output_filename+'.pre.log'
        log_file = open(out_log, 'w')

        os.chdir(defs.DIR_FUSE)
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        # Both output streams go to log_file, so communicate() only waits for the process
        p.communicate()
        log_file.close()
        print("Output log in " + out_log)

        # Failure is detected by the missing result file, not by the exit status
        filepath = os.path.abspath(output_file)
        stream = open(filepath, 'rb')
    except:
        print("!!! Pre-process failed, no result file")
        res = {"message": "{%s} p%s: 'pre_process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)

        db.sequence_file[sequence_file_id] = dict(pre_process_flag = "FAILED")
        db.commit()
        # Do not leak the log handle on the failure path (closing twice is harmless)
        if log_file is not None:
            log_file.close()
        raise

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file = stream,
                                                  data_file2 = None,
                                                  pre_process_flag = "COMPLETED")
        db.commit()
    finally:
        # Release the result file handle (assumes the DAL has consumed the
        # upload stream during the update/commit above)
        stream.close()

    #resume STOPPED task for this sequence file
    stopped_task = db(
        (db.results_file.sequence_file_id == sequence_file_id)
        & (db.results_file.scheduler_task_id == db.scheduler_task.id)
        & (db.scheduler_task.status == "STOPPED")
    ).select()

    for row in stopped_task :
        db.scheduler_task[row.scheduler_task.id] = dict(status = "QUEUED")

    db.commit()

    # Dump log in scheduler_run.run_output
    with open(out_log) as log_lines:
        for l in log_lines:
            print(l, end=' ')

    # Remove data file from disk to save space (it is now saved elsewhere)
    os.remove(filepath)

    if clean_after:
        clean_cmd = "rm -rf " + out_folder
        p = Popen(clean_cmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True)
        p.wait()

    res = {"message": "{%s} p%s: 'pre_process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
925
    
926
from gluon.scheduler import Scheduler

# gluon (web2py) Scheduler for this application.
# Each key is a task name as passed to scheduler.queue_task()
# (e.g. "pre_process" in schedule_pre_process).
# Each value is the function run for that task.
scheduler = Scheduler(db,
                      {
                          'vidjil': run_vidjil,
                          'compute_contamination': compute_contamination,
                          'compute_extra': compute_extra,
                          'mixcr': run_mixcr,
                          'igrec': run_igrec,
                          'none': run_copy,
                          'pre_process': run_pre_process,
                          'refuse': run_refuse,
                      },
                      heartbeat=defs.SCHEDULER_HEARTBEAT)