diff --git a/src/Log.cc b/src/Log.cc index 1093263ecc14596c3151e656ef60fbd0c18c557c..75b62f683a068cf7435be2773991415da7f12cc9 100644 --- a/src/Log.cc +++ b/src/Log.cc @@ -168,7 +168,7 @@ Log::sync() CycleCounter<uint64_t> __(&PerfStats::threadStats.logSyncCycles); Tub<SpinLock::Guard> lock; - lock.construct(appendLock); +//lock.construct(appendLock); metrics.totalSyncCalls++; // The only time 'head' should be NULL is after construction and before the @@ -198,9 +198,9 @@ Log::sync() // lock and grab the sync lock. This allows other writers to append to the // log while we wait. Once we grab the sync lock, take the append lock again // to ensure our new view of the head is consistent. - lock.destroy(); - SpinLock::Guard _(syncLock); - lock.construct(appendLock); +//lock.destroy(); +//SpinLock::Guard _(syncLock); +//lock.construct(appendLock); // See if we still have work to do. It's possible that another thread // already did the syncing we needed for us. @@ -212,7 +212,7 @@ Log::sync() // Drop the append lock. We don't want to block other appending threads // while we sync. - lock.destroy(); +//lock.destroy(); originalHead->replicatedSegment->sync(appendedLength, &certificate); originalHead->syncedLength = appendedLength; diff --git a/src/MasterService.cc b/src/MasterService.cc index c3c538e14505a0a281e164f949c905524e403fee..49513e5e0c0297902544af1e1fa8189943d0e923 100644 --- a/src/MasterService.cc +++ b/src/MasterService.cc @@ -1607,20 +1607,26 @@ MasterService::multiReadGroup(const WireFormat::MultiOp::Request* reqHdr, currentResp->offset = 0; bool succeeded = false; - succeeded = objectManager.getMultiLog(currentReq->streamId, currentReq->streamletId) - ->readSegment( - currentReq->segmentId, currentReq->groupId, currentReq->offset, - currentReq->maxObjects, - ¤tResp->numberObjectsRead, maxResponseSize, - rpc->replyPayload, ¤tResp->offset, - ¤tResp->numberOfObjectEntries, - ¤tResp->isSegmentClosed); + MultiLog* streamlet = objectManager.getMultiLog(currentReq->streamId, currentReq->streamletId); + if(streamlet) + succeeded = streamlet->readSegment( currentReq->segmentId, + currentReq->groupId, currentReq->offset, currentReq->maxObjects, + ¤tResp->numberObjectsRead, maxResponseSize, + rpc->replyPayload, ¤tResp->offset, + ¤tResp->numberOfObjectEntries, + ¤tResp->isSegmentClosed); - if (!succeeded) { //some error in log.multireadSegment - currentResp->status = STATUS_RETRY; - } else { - currentResp->status = STATUS_OK; - } + if (!succeeded) { //some error in log.multireadSegment + if(!streamlet) { + currentResp->status = STATUS_REQUEST_FORMAT_ERROR; + RAMCLOUD_LOG(WARNING, "Consumer requesting non-existing streamId: <%lu> - streamletID: <%u>", currentReq->streamId, currentReq->streamletId); + } + else + currentResp->status = STATUS_RETRY; + + } else { + currentResp->status = STATUS_OK; + } // RAMCLOUD_LOG(NOTICE, ">>> request service input: groupId=%lu segmentId=%lu " // " offset=%u maxObjects=%u numberObjectsRead=%u status=%d", currentReq->groupId, currentReq->segmentId, diff --git a/src/MultiLog.cc b/src/MultiLog.cc index e3f46264aa90d0b02cf0ab0a2516e43c96cbbf6a..83514426b948646519ae7e25ee46c55a3ee55b15 100644 --- a/src/MultiLog.cc +++ b/src/MultiLog.cc @@ -149,7 +149,8 @@ bool MultiLog::appendMultipleObjects(uint64_t tableId, Buffer& objectsBuffer, throw FatalError(HERE, "Guaranteed append managed to fail"); *numberObjectsAppended += recordsCount; - + + log->sync(); return true; @@ -264,7 +265,6 @@ LogSegment* MultiLog::allocHead(uint64_t* groupId, Log* log) { //Check the safety TODO log->head = groupHead; - return groupHead; }; diff --git a/src/MultiReadWriteGroupBatchPerfTest.cc b/src/MultiReadWriteGroupBatchPerfTest.cc index a88d780a7b42a72c5f478b14cce6e79ecc81b15f..22ae25011432f5030f1165acee3da9241291209c 100644 --- a/src/MultiReadWriteGroupBatchPerfTest.cc +++ b/src/MultiReadWriteGroupBatchPerfTest.cc @@ -642,7 +642,7 @@ TEST_F(MultiReadWriteGroupBatchPerfTest, multiwritegroupbatch_multireadbatch_pro // memset(data, 0, dataLength); // objreq.get()->allocFirstChunkInternal(dataLength); // objreq.get()->allocFirstChunk(dataLength, data); - + RAMCLOUD_LOG(NOTICE, "Requesting object with stream: %lu - streamlet %u", tableId1, streamletId); multiReadGroupObject.construct(tableId1, streamletId, "object1-1", keysize, &respvalues[0], groupList); MultiReadGroupObject* requests[] = {multiReadGroupObject.get()};