task.py 25 KB
Newer Older
1
# coding: utf8
2
import json
3
import os
4
import defs
5
import re
6
import os.path
7
8
9
import time
import sys
import datetime
10
11
import random
import xmlrpclib
12

13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

def assert_scheduler_task_does_not_exist(args):
    '''
    Look for a scheduler task with the same arguments that is not finished.

    A task is considered not finished when its status is none of
    FAILED, EXPIRED, TIMEOUT or COMPLETED.

    args -- value compared with scheduler_task.args

    Returns {"message": "task already registered"} when at least one such
    task exists, None otherwise.
    '''
    finished_statuses = ("FAILED", "EXPIRED", "TIMEOUT", "COMPLETED")

    ## same arguments ...
    condition = (db.scheduler_task.args == args)
    ## ... and not in one of the finished states
    for status in finished_statuses:
        condition = condition & (db.scheduler_task.status != status)

    pending_tasks = db(condition).select()
    if len(pending_tasks) == 0:
        return None

    return {"message": "task already registered"}

29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
def compute_contamination(sequence_file_id, results_file_id, config_id):
    '''
    Count, for each sample, the clones that are much more abundant in
    another sample of the same run.

    The three arguments are parallel lists: sequence_file_id[i],
    results_file_id[i] and config_id[i] describe the i-th sample.

    A clone of the sample is counted against another sequence file of the
    same 'run' sample set (most recent results_file with the same config)
    when that other file has the same clone id with more than 10 times as
    many reads.

    Returns a list with one dict per sample:
      "total_clones" -- number of counted clones, summed over the other samples
      "total_reads"  -- reads these clones have in the sample itself
      "sample"       -- {results_file id of the other sample:
                         {"clones": ..., "reads": ...}}
    '''
    result = []
    for i, seq_id in enumerate(sequence_file_id):
        entry = {"total_clones": 0, "total_reads": 0, "sample": {}}
        result.append(entry)

        ## clones of the sample: clone id -> reads
        ## (the 'with' block closes the file, no explicit close() is needed)
        with open(defs.DIR_RESULTS+db.results_file[results_file_id[i]].data_file, "r") as json_data:
            d = json.load(json_data)
        list1 = dict((clone["id"], clone["reads"][0]) for clone in d["clones"])

        ## 'run' sample set this sequence file belongs to, if any
        sample_set_run = db( (db.sample_set_membership.sequence_file_id == seq_id)
                           & (db.sample_set_membership.sample_set_id == db.sample_set.id)
                           & (db.sample_set.sample_type == "run") ).select().first()

        if sample_set_run is None :
            continue

        ## results of the other sequence files of the run, same config
        sample_set_id = sample_set_run.sample_set.id
        query = db(  ( db.sample_set.id == sample_set_id )
                   & ( db.sample_set.id == db.sample_set_membership.sample_set_id )
                   & ( db.sequence_file.id == db.sample_set_membership.sequence_file_id)
                   & ( db.sequence_file.id != seq_id)
                   & ( db.results_file.sequence_file_id == db.sequence_file.id )
                   & ( db.results_file.config_id == config_id[i]  )
                   ).select(db.sequence_file.ALL,db.results_file.ALL, db.sample_set.id, orderby=db.sequence_file.id|~db.results_file.run_date)

        ## rows are sorted by sequence file, then by decreasing run date:
        ## keep only the first row of each sequence file
        query2 = []
        sfi = 0
        for row in query :
            if row.sequence_file.id != sfi :
                query2.append(row)
                sfi = row.sequence_file.id

        for row in query2 :
            other_path = defs.DIR_RESULTS+row.results_file.data_file
            print(other_path)
            other = {"clones": 0, "reads": 0}
            entry["sample"][row.results_file.id] = other

            with open(other_path, "r") as json_data2:
                try:
                    d2 = json.load(json_data2)
                except ValueError:
                    ## unreadable result file: keep the zero counters
                    print('invalid_json')
                    continue

            for clone in d2["clones"]:
                if clone["id"] not in list1 :
                    continue
                own_reads = list1[clone["id"]]
                if clone["reads"][0] > 10*own_reads :
                    entry["total_clones"] += 1
                    entry["total_reads"] += own_reads
                    other["clones"] += 1
                    other["reads"] += own_reads

    return result
    
92

93
def schedule_run(id_sequence, id_sample_set, id_config, grep_reads=None):
    '''
    Register a new analysis of a sequence file and queue it in the scheduler.

    id_sequence   -- id of the sequence_file to process
    id_sample_set -- id of the sample_set, used to find or create the fused_file
    id_config     -- id of the config; its 'program' field is the queued task name
    grep_reads    -- optional value passed to the task as its last argument;
                     when set, no fused_file is looked up or created

    Returns {"message": ...} when a task with the same arguments is already
    registered, otherwise {"redirect": "reload", "message": ...}.
    '''
    ## create the results_file entry that the task will fill in
    ts = time.time()
    data_id = db.results_file.insert(sequence_file_id = id_sequence,
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     config_id = id_config )

    if grep_reads:
        args = [id_sequence, id_config, data_id, None, grep_reads]
    else:
        ## check fused_file
        row2 = db( ( db.fused_file.config_id == id_config ) &
                   ( db.fused_file.sample_set_id == id_sample_set )
               ).select()

        if len(row2) > 0 : ## update
            fuse_id = row2[0].id
        else:             ## create
            fuse_id = db.fused_file.insert(sample_set_id = id_sample_set,
                                           config_id = id_config)

        args = [id_sequence, id_config, data_id, fuse_id, None]

    ## NOTE(review): args contains the data_id inserted just above, so this
    ## check looks unable to match a task queued by an earlier call -- confirm
    err = assert_scheduler_task_does_not_exist(str(args))
    if err:
        log.error(err)
        return err

    program = db.config[id_config].program
    ##add task to scheduler
    task = scheduler.queue_task(program, args,
                                repeats = 1, timeout = defs.TASK_TIMEOUT)
    db.results_file[data_id] = dict(scheduler_task_id = task.id)

    filename= db.sequence_file[id_sequence].filename

    res = {"redirect": "reload",
           "message": "[%s] (%s) c%s: process requested - %s %s" % (data_id, id_sample_set, id_config, grep_reads, filename)}

    log.info(res)
    return res

141

142
143
def run_vidjil(id_file, id_config, id_data, id_fuse, grep_reads,
               clean_before=False, clean_after=False):
    '''
    Scheduler task: run Vidjil on a sequence file and store the result in
    results_file[id_data].

    id_file      -- id of the sequence_file
    id_config    -- id of the config; its 'command' field gives the options
    id_data      -- id of the results_file row to update
    id_fuse      -- id of the fused_file row, passed on to run_fuse
    grep_reads   -- when set, it is given to Vidjil with -FaW, the result is
                    read from seq/clone.fa-1 and no fuse is run
    clean_before -- accepted but not used here
    clean_after  -- remove the output folder at the end

    While the sequence file has pre_process_flag not True, the task only
    queues itself again (start 120 seconds later) and returns.

    Returns "SUCCESS". Raises IOError when there is no result file.
    '''
    from subprocess import Popen, PIPE, STDOUT
    from datetime import timedelta as timed
    import shutil
    try:
        from shlex import quote as shell_quote      # Python 3
    except ImportError:
        from pipes import quote as shell_quote      # Python 2

    ## re-schedule if the pre-process is still pending
    if db.sequence_file[id_file].pre_process_flag != True :
        print("Pre-process is still pending, re-schedule")

        args = [id_file, id_config, id_data, id_fuse, grep_reads]
        task = scheduler.queue_task("vidjil", args,
                        repeats = 1, timeout = defs.TASK_TIMEOUT,
                               start_time=request.now + timed(seconds=120))
        db.results_file[id_data] = dict(scheduler_task_id = task.id)
        db.commit()
        print(task.id)
        sys.stdout.flush()

        return "SUCCESS"

    ## paths to vidjil and to the sequence files
    germline_folder = defs.DIR_VIDJIL + '/germline/'
    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    ## always start from an empty output folder
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## vidjil options from the config
    vidjil_cmd = db.config[id_config].command
    vidjil_cmd = vidjil_cmd.replace( ' germline' ,germline_folder)

    if grep_reads:
        ## the command goes through a shell: quote the value so that it
        ## cannot add anything else to the command line
        vidjil_cmd += ' -FaW %s ' % shell_quote('%s' % grep_reads)

    os.makedirs(out_folder)
    out_log = out_folder+'/'+output_filename+'.vidjil.log'

    ## full command
    cmd = defs.DIR_VIDJIL + '/vidjil ' + ' -o  ' + out_folder + " -b " + output_filename
    cmd += ' ' + vidjil_cmd + ' '+ seq_file

    ## run vidjil, stdout and stderr go to the log file
    print("=== Launching Vidjil ===")
    print(cmd)
    print("========================")
    sys.stdout.flush()

    with open(out_log, 'w') as vidjil_log_file:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=vidjil_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()

    print("Output log in "+out_log)
    sys.stdout.flush()
    db.commit()

    ## Get result file
    if grep_reads:
        out_results = out_folder + '/seq/clone.fa-1'
    else:
        out_results = out_folder + '/' + output_filename + '.vidjil'

    print("===> " + out_results)
    results_filepath = os.path.abspath(out_results)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! Vidjil failed, no result file")
        res = {"message": "[%s] c%s: Vidjil FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise IOError(res["message"])

    try:
        ## Parse some info in .log
        info = _vidjil_log_summary(out_log)

        ## store the result in the database
        ts = time.time()
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
    finally:
        stream.close()

    db.commit()

    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: Vidjil finished - %s - %s" % (id_data, id_config, info, out_folder)}
    log.info(res)

    if not grep_reads:
        for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
            sample_set_id = membership.sample_set_id
            print(membership.sample_set_id)
            run_fuse(id_file, id_config, id_data, id_fuse, sample_set_id, clean_before = False)

    ## clean only now: run_fuse writes its log and its output in out_folder
    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    return "SUCCESS"


def _vidjil_log_summary(out_log):
    '''
    Build a one-line summary ("N reads, M segmented (R%), W windows") from
    the '==> segmented' and '==> found' lines of a Vidjil log.
    The matching lines are echoed on stdout; reading stops at the first
    '==> found' line. Returns '' when no line matches.
    '''
    segmented = re.compile(r"==> segmented (\d+) reads \((\d*\.\d+|\d+)%\)")
    windows = re.compile(r"==> found (\d+) .*-windows in .* segments .* inside (\d+) sequences")
    info = ''
    with open(out_log) as log_lines:
        for l in log_lines:
            m = segmented.search(l)
            if m:
                sys.stdout.write(l)
                info = "%d segmented (%s%%)" % (int(m.group(1)), m.group(2))
                continue
            m = windows.search(l)
            if m:
                sys.stdout.write(l)
                info = "%d reads, " % int(m.group(2)) + info + ", %d windows" % int(m.group(1))
                break
    return info
284

HERBERT Ryan's avatar
HERBERT Ryan committed
285
def run_mixcr(id_file, id_config, id_data, id_fuse, clean_before=False, clean_after=False):
    '''
    Scheduler task: run MiXCR (align, assemble, exportClones) on a sequence
    file and store the result in results_file[id_data].

    id_file   -- id of the sequence_file
    id_config -- id of the config; its 'command' field is split on '|' into
                 the arguments of align, assemble and exportClones
    id_data   -- id of the results_file row to update
    id_fuse   -- id of the fused_file row, passed on to run_fuse
    clean_before, clean_after -- accepted but not used here

    The MiXCR reports, the original file name, the command line and the
    total number of reads are added to the result before it is stored.

    Returns "SUCCESS". Raises IOError when there is no result file.
    '''
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    upload_folder = defs.DIR_SEQUENCES
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    ## always start from an empty output folder
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    seq_file = upload_folder+filename

    ## MiXCR arguments from the config
    arg_cmd = db.config[id_config].command

    os.makedirs(out_folder)

    ## os.path.join gives the same paths whether or not out_folder ends with '/'
    out_log = os.path.join(out_folder, output_filename + '.mixcr.log')
    report = os.path.join(out_folder, output_filename + '.mixcr.report')
    out_alignments = os.path.join(out_folder, output_filename + '.align.vdjca')
    out_clones = os.path.join(out_folder, output_filename + '.clones.clns')
    out_results = os.path.join(out_folder, output_filename + '.mixcr')

    align_report = report + '.aln'
    assembly_report = report + '.asmbl'

    arg_cmds = arg_cmd.split('|')
    args_1, args_2, args_3 = '', '', ''
    try:
        args_1, args_2, args_3 = arg_cmds
    except ValueError:
        ## not exactly three parts: report it and go on with empty arguments
        print(arg_cmd)
        print("! Bad arguments, we expect args_align | args_assemble | args_exportClones")

    ## full command
    mixcr = defs.DIR_MIXCR + 'mixcr'
    cmd = mixcr + ' align --save-reads -t 1 -r ' + align_report + ' ' + args_1 + ' ' + seq_file  + ' ' + out_alignments
    cmd += ' && '
    cmd += mixcr + ' assemble -t 1 -r ' + assembly_report + ' ' + args_2 + ' ' + out_alignments + ' ' + out_clones
    cmd += ' && rm ' + out_alignments
    cmd += ' && '
    cmd += mixcr + ' exportClones --format vidjil -germline -id -name -reads -sequence -top -seg -s ' + args_3 + ' ' + out_clones + ' ' + out_results

    ## run MiXCR, stdout and stderr go to the log file
    print("=== Launching MiXCR ===")
    print(cmd)
    print("========================")
    sys.stdout.flush()

    with open(out_log, 'w') as log_file:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=STDOUT, close_fds=True)
        p.wait()

    print("Output log in " + out_log)
    sys.stdout.flush()

    ## Get result file
    print("===> " + out_results)
    results_filepath = os.path.abspath(out_results)
    try:
        ## only checks that the result file can be opened
        with open(results_filepath, 'rb'):
            pass
    except IOError:
        print("!!! MiXCR failed, no result file")
        res = {"message": "[%s] c%s: MiXCR FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise IOError(res["message"])

    align_content = get_file_content(align_report)
    assembly_content = get_file_content(assembly_report)
    reports = align_content + assembly_content
    original_name = row[0].filename
    totalReads = extract_total_reads(assembly_content)

    ## complete the result, then write it back in place
    with open(results_filepath, 'r') as json_file:
        my_json = json.load(json_file)

    fill_field(my_json, reports, "log", "samples", True)
    fill_field(my_json, original_name, "original_names", "samples")
    fill_field(my_json, cmd, "commandline", "samples")
    fill_field(my_json, totalReads, "total", "reads")

    with open(results_filepath, 'w') as new_file:
        json.dump(my_json, new_file)

    ## store the result in the database
    ts = time.time()
    with open(results_filepath, 'rb') as stream:
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )

    db.commit()

    res = {"message": "[%s] c%s: MiXCR finished - %s" % (id_data, id_config, out_folder)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, id_fuse, sample_set_id, clean_before = False)

    return "SUCCESS"
399

400
401
402
403
404
def run_copy(id_file, id_config, id_data, id_fuse, clean_before=False, clean_after=False):
    '''
    Scheduler task: store the sequence file itself as the result in
    results_file[id_data], without running any program.

    id_file      -- id of the sequence_file
    id_config    -- id of the config, used in messages and passed to run_fuse
    id_data      -- id of the results_file row to update
    id_fuse      -- id of the fused_file row, passed on to run_fuse
    clean_before -- accepted but not used here
    clean_after  -- remove the output folder at the end

    Returns "SUCCESS". Raises IOError when the sequence file cannot be opened.
    '''
    import shutil

    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data
    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data

    ## always start from an empty output folder
    shutil.rmtree(out_folder, ignore_errors=True)

    ## path of the sequence file
    row = db(db.sequence_file.id==id_file).select()
    filename = row[0].data_file

    os.makedirs(out_folder)

    ## no command is run: the log file is created and left empty
    out_log = out_folder+'/'+output_filename+'.vidjil.log'
    open(out_log, 'w').close()

    print("Output log in "+out_log)
    sys.stdout.flush()
    db.commit()

    ## get the file
    results_filepath = os.path.abspath(defs.DIR_SEQUENCES+filename)

    try:
        stream = open(results_filepath, 'rb')
    except IOError:
        print("!!! 'copy' failed, no file")
        res = {"message": "[%s] c%s: 'copy' FAILED - %s" % (id_data, id_config, out_folder)}
        log.error(res)
        raise IOError(res["message"])

    ## store the result in the database
    try:
        ts = time.time()
        db.results_file[id_data] = dict(status = "ready",
                                     run_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     data_file = stream
                                    )
    finally:
        stream.close()
    db.commit()

    ## TODO parse result success/fail

    res = {"message": "[%s] c%s: 'copy' finished - %s" % (id_data, id_config, filename)}
    log.info(res)

    for membership in db(db.sample_set_membership.sequence_file_id==id_file).select() :
        sample_set_id = membership.sample_set_id
        print(membership.sample_set_id)
        run_fuse(id_file, id_config, id_data, id_fuse, sample_set_id, clean_before = False)

    ## clean only now: run_fuse writes its log and its output in out_folder
    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    return "SUCCESS"
Mathieu Giraud's avatar
Mathieu Giraud committed
462
463
464



465
def run_fuse(id_file, id_config, id_data, id_fuse, sample_set_id, clean_before=True, clean_after=False):
    '''
    Run fuse.py on the results of a sample set and store the output in
    fused_file[id_fuse].

    For each sequence file of the sample set, the most recent results_file
    with the given config is used, when it has a data_file.

    id_file       -- not used here
    id_config     -- id of the config; its 'fuse_command' field gives the options
    id_data       -- id used to name the output folder and files
    id_fuse       -- id of the fused_file row to update
    sample_set_id -- id of the sample set to fuse
    clean_before  -- empty the output folder before running
    clean_after   -- remove the output folder at the end

    Returns "SUCCESS". Raises IOError when there is no .fused file.
    '''
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    out_folder = defs.DIR_OUT_VIDJIL_ID % id_data
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % id_data

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
    ## the folder is needed for the log and the output, also when the
    ## caller asked not to clean it and it is not there anymore
    if not os.path.isdir(out_folder):
        os.makedirs(out_folder)

    fuse_log = out_folder+'/'+output_filename+'.fuse.log'

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    sequence_file_list = ""
    query2 = db( ( db.results_file.sequence_file_id == db.sequence_file.id )
                   & ( db.sample_set_membership.sequence_file_id == db.sequence_file.id)
                   & ( db.sample_set_membership.sample_set_id == sample_set_id)
                   & ( db.results_file.config_id == id_config )
                   ).select( orderby=db.sequence_file.id|~db.results_file.run_date)

    ## rows are sorted by sequence file, then by decreasing run date:
    ## keep only the first row of each sequence file
    query = []
    sequence_file_id = 0
    for row in query2 :
        if row.sequence_file.id != sequence_file_id :
            query.append(row)
            sequence_file_id = row.sequence_file.id

    for row in query :
        if row.results_file.data_file is not None :
            files += defs.DIR_RESULTS + row.results_file.data_file + " "
            sequence_file_list += str(row.results_file.sequence_file_id) + "_"

    fuse_cmd = db.config[id_config].fuse_command
    cmd = "python "+defs.DIR_FUSE+"/fuse.py -o "+ output_file + " " + fuse_cmd + " " + files

    print("=== fuse.py ===")
    print(cmd)
    print("===============")
    sys.stdout.flush()

    ## stdout and stderr of fuse.py go to the log file
    with open(fuse_log, 'w') as fuse_log_file:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=fuse_log_file, stderr=STDOUT, close_fds=True)
        p.communicate()
    print("Output log in "+fuse_log)

    fuse_filepath = os.path.abspath(output_file)

    try:
        stream = open(fuse_filepath, 'rb')
    except IOError:
        print("!!! Fuse failed, no .fused file")
        res = {"message": "[%s] c%s: 'fuse' FAILED - %s" % (id_data, id_config, output_file)}
        log.error(res)
        raise IOError(res["message"])

    try:
        ts = time.time()
        db.fused_file[id_fuse] = dict(fuse_date = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S'),
                                     fused_file = stream,
                                     sequence_file_list = sequence_file_list)
    finally:
        stream.close()
    db.commit()

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "[%s] c%s: 'fuse' finished - %s" % (id_data, id_config, output_file)}
    log.info(res)

    return "SUCCESS"
539

540
def custom_fuse(file_list):
    '''
    Fuse an arbitrary list of results through the fuse server and return
    the parsed output.

    file_list -- ids of results_file rows; rows without data_file are skipped

    The fuse.py command is sent to the XML-RPC server listening on
    localhost, port defs.PORT_FUSE_SERVER. The value returned by the server
    is opened as the path of the fused file. The temporary output folder is
    removed once the file has been read.

    Returns the decoded JSON content of the fused file.
    Raises IOError when defs.PORT_FUSE_SERVER is None or when the fused
    file cannot be read.
    '''
    import shutil

    if defs.PORT_FUSE_SERVER is None:
        raise IOError('This server cannot fuse custom data')
    random_id = random.randint(99999999,99999999999)
    out_folder = os.path.abspath(defs.DIR_OUT_VIDJIL_ID % random_id)
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % random_id

    shutil.rmtree(out_folder, ignore_errors=True)
    os.makedirs(out_folder)

    ## str() so that the message can be built whether ids are strings or integers
    res = {"message": "'custom fuse' (%d files): %s" % (len(file_list), ','.join(str(results_id) for results_id in file_list))}
    log.info(res)

    ## fuse.py
    output_file = out_folder+'/'+output_filename+'.fused'
    files = ""
    for results_id in file_list :
        data_file = db.results_file[results_id].data_file
        if data_file is not None :
            files += os.path.abspath(defs.DIR_RESULTS + data_file) + " "

    cmd = "python "+ os.path.abspath(defs.DIR_FUSE) +"/fuse.py -o "+output_file+" -t 100 "+files
    proc_srvr = xmlrpclib.ServerProxy("http://localhost:%d" % defs.PORT_FUSE_SERVER)
    fuse_filepath = proc_srvr.fuse(cmd, out_folder, output_filename)

    try:
        with open(fuse_filepath, 'rb') as f:
            data = gluon.contrib.simplejson.loads(f.read())
    except IOError:
        res = {"message": "'custom fuse' -> IOError"}
        log.error(res)
        raise

    shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "'custom fuse' -> finished"}
    log.info(res)

    return data
584

HERBERT Ryan's avatar
HERBERT Ryan committed
585
#TODO move this ?
586
587
588
589
590
591
def get_file_content(filename):
    '''
    Return the whole content of the file, read in binary mode.
    '''
    with open(filename, 'rb') as handle:
        return handle.read()

HERBERT Ryan's avatar
HERBERT Ryan committed
592
593
594
595
596
597
598
599
600
601
602
603
604
def fill_field(dest, value, field, parent="", append=False):
    '''
    Set the first element of the list dest[field], or of dest[parent][field]
    when parent is neither None nor "".

    When the field is missing, it is created as [value].
    Otherwise the first element is replaced by value or, with append=True,
    value is added to it with +=.
    '''
    target = dest if (parent is None or parent == "") else dest[parent]

    if field not in target:
        target[field] = [value]
        return

    if append:
        target[field][0] += value
    else:
        target[field][0] = value

HERBERT Ryan's avatar
HERBERT Ryan committed
605
606
607
608
def extract_total_reads(report):
    '''
    Return, as a string, the number following the first
    "Total Reads analysed: " found in the report.
    '''
    found = re.search("Total Reads analysed: ([0-9]+)", report)
    return found.group(1)
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631

def schedule_pre_process(sequence_file_id, pre_process_id):
    '''
    Queue the 'pre_process' task for a sequence file.

    Returns {"message": ...} when a task with the same arguments is already
    registered, otherwise {"redirect": "reload", "message": ...}.
    '''
    ## the task takes the pre-process first, then the sequence file
    task_args = [pre_process_id, sequence_file_id]

    already_registered = assert_scheduler_task_does_not_exist(str(task_args))
    if already_registered:
        log.error(already_registered)
        return already_registered

    scheduler.queue_task("pre_process", task_args,
                         repeats = 1, timeout = defs.TASK_TIMEOUT)

    message = "[%s] (%s): process requested" % (sequence_file_id, pre_process_id)
    res = {"redirect": "reload", "message": message}

    log.info(res)
    return res


def run_pre_process(pre_process_id, sequence_file_id, clean_before=True, clean_after=False):
    '''
    Run a pre-process on sequence_file.data_file (and possibly sequence_file.data_file2),
    put the output back in sequence_file.data_file.

    pre_process_id   -- id of the pre_process; in its 'command' field,
                        &file1&, &file2& and &result& are replaced by the paths
                        of the input file(s) and of the output file
    sequence_file_id -- id of the sequence_file to process
    clean_before     -- empty the output folder before running
    clean_after      -- remove the output folder at the end

    The command is run from defs.DIR_FUSE (the working directory of the
    process is changed). Its log is printed at the end.

    Returns "SUCCESS". Raises IOError when there is no output file.
    '''
    from subprocess import Popen, PIPE, STDOUT
    import shutil

    out_folder = defs.DIR_PRE_VIDJIL_ID % sequence_file_id
    output_filename = defs.BASENAME_OUT_VIDJIL_ID % sequence_file_id

    if clean_before:
        shutil.rmtree(out_folder, ignore_errors=True)
    ## the folder is needed for the log and the output, also without clean_before
    if not os.path.isdir(out_folder):
        os.makedirs(out_folder)

    output_file = out_folder+'/'+output_filename+'.fastq'

    sequence_file = db.sequence_file[sequence_file_id]
    pre_process = db.pre_process[pre_process_id]

    cmd = pre_process.command.replace( "&file1&", defs.DIR_SEQUENCES + sequence_file.data_file)
    if sequence_file.data_file2:
        cmd = cmd.replace( "&file2&", defs.DIR_SEQUENCES + sequence_file.data_file2)
    cmd = cmd.replace( "&result&", output_file)

    print("=== Pre-process %s ===" % pre_process_id)
    print(cmd)
    print("===============")
    sys.stdout.flush()

    ## absolute path, resolved before the chdir below, so that the log is
    ## written and read back at the same place
    log_path = os.path.abspath(out_folder+'/'+output_filename+'.pre.log')

    os.chdir(defs.DIR_FUSE)
    with open(log_path, 'w') as log_file:
        p = Popen(cmd, shell=True, stdin=PIPE, stdout=log_file, stderr=log_file, close_fds=True)
        p.communicate()
    print("Output log in "+out_folder+'/'+output_filename+'.pre.log')

    filepath = os.path.abspath(output_file)

    try:
        stream = open(filepath, 'rb')
    except IOError:
        print("!!! Pre-process failed, no result file")
        res = {"message": "[%s] c%s: 'pre-process' FAILED - %s" % (sequence_file_id, pre_process_id, output_file)}
        log.error(res)
        raise IOError(res["message"])

    # Now we update the sequence file with the result of the pre-process
    # We forget the initial data_file (and possibly data_file2)
    try:
        db.sequence_file[sequence_file_id] = dict(data_file = stream,
                                                  data_file2 = None,
                                                  pre_process_flag = True)
    finally:
        stream.close()
    db.commit()

    # Dump log in scheduler_run.run_output
    with open(log_path) as log_dump:
        for l in log_dump:
            sys.stdout.write(l)

    if clean_after:
        shutil.rmtree(out_folder, ignore_errors=True)

    res = {"message": "[%s] c%s: 'pre-process' finished - %s" % (sequence_file_id, pre_process_id, output_file)}
    log.info(res)

    return "SUCCESS"
    
HERBERT Ryan's avatar
HERBERT Ryan committed
702
    
703
from gluon.scheduler import Scheduler

## task name given to scheduler.queue_task -> function that runs it
scheduler = Scheduler(db, {
    "vidjil": run_vidjil,
    "compute_contamination": compute_contamination,
    "mixcr": run_mixcr,
    "none": run_copy,
    "pre_process": run_pre_process,
})