diff --git a/src/Log.cc b/src/Log.cc index 27a2285e0e660f5419b1febfa2ad9814947ed317..cf62c0590f4e4c003cb55794f9fee5a7bc87fbec 100644 --- a/src/Log.cc +++ b/src/Log.cc @@ -182,7 +182,7 @@ Log::allocNewWritableHead(VirtualSegment* prevHead) // pass head reference to vhead; head is the storage for vhead's chunks vhead = new VirtualSegment(masterId, logId, - streamId, nextSegmentId++, segmentSize, head, prevHead, segmentManager); + streamId, nextSegmentId++, segmentSize-200, head, prevHead, segmentManager); // Allocate a new ReplicatedSegment to handle backing up the new head. This // call will also sync the initial data (header, digest, etc) to the needed diff --git a/src/ObjectManager.cc b/src/ObjectManager.cc index b182044dcdb2af0336e14fa9ed6c90d749017b30..bd73d673b155a1de54fb5c2c1a933635d4064ea7 100644 --- a/src/ObjectManager.cc +++ b/src/ObjectManager.cc @@ -708,7 +708,7 @@ ObjectManager::multireadSegment(Streamlet* streamlet, uint64_t streamId, uint32_ // RAMCLOUD_LOG(NOTICE, ">>>======!!!!!!!!===== ObjectManager::readSegment offset %u nextOffset %u reads %u max %u length %u head %u", // firstOffset, *nextOffset, *numberObjectsRead, maxObjects, currentLength, head); - assert(*numberObjectsRead > 0 || (*numberObjectsRead == 0 && (*isSegmentClosed || *nextOffset == head))); + //assert(*numberObjectsRead > 0 || (*numberObjectsRead == 0 && (*isSegmentClosed || *nextOffset == head))); if(useSharedPlasma) { //create Plasma shared object diff --git a/src/Transport.h b/src/Transport.h index 1b2f0fe1e96cb9825ea451c3f65dd5356b3a5843..24282cd676271100cb35ebf33cb4d2d29cf14ef8 100644 --- a/src/Transport.h +++ b/src/Transport.h @@ -356,7 +356,7 @@ class Transport { /// to trigger retransmissions. The current value for this was chosen /// somewhat subjectively in 11/2011, based on the presence of time gaps /// in the poller of as much as 250ms. - static const uint32_t DEFAULT_TIMEOUT_MS = 5000; + static const uint32_t DEFAULT_TIMEOUT_MS = 250000; /** * One ServerPort instance is created for a listen port diff --git a/src/VirtualSegment.cc b/src/VirtualSegment.cc index a4be9dcb6722f0ba775df173d70236a6797b5004..743bdbf96d1b595fe7d2a3fe844d70c62847552c 100644 --- a/src/VirtualSegment.cc +++ b/src/VirtualSegment.cc @@ -65,9 +65,9 @@ VirtualSegment::VirtualSegment(uint64_t masterId, uint64_t logId, vlogNextHead(NULL), vlogNextHeadOffset(0), chunksCount(0), - chunkLock("VirtualSegment::chunkLock"), - segmentsHeadMap(), -// durableChunkIndex(0), +// chunkLock("VirtualSegment::chunkLock"), +// segmentsHeadMap(), + durableChunkIndex(0), durableChunkOffset(0), closedCommitted(false), closed(false), @@ -325,7 +325,7 @@ VirtualSegment::appendToBuffer(Buffer& buffer, uint32_t offset, uint32_t length, //go forward through chunks until lastChunkOffset + chunk->length == offset + length Segment* segment = NULL; - segmentsHeadMap.clear(); +// segmentsHeadMap.clear(); while(*lastChunkOffset < offsetAndLength) { //safe to access chunksBuffer after getAppendedLength called with Log#appendLock @@ -343,11 +343,11 @@ VirtualSegment::appendToBuffer(Buffer& buffer, uint32_t offset, uint32_t length, segment->appendToBuffer(buffer, ced.offset, ced.length); // SpinLock::Guard _(chunkLock); - if(contains(segmentsHeadMap, ced.physicalSegmentId)) { - segmentsHeadMap[ced.physicalSegmentId] += ced.length; - } else { - segmentsHeadMap[ced.physicalSegmentId] = ced.length; - } +// if(contains(segmentsHeadMap, ced.physicalSegmentId)) { +// segmentsHeadMap[ced.physicalSegmentId] += ced.length; +// } else { +// segmentsHeadMap[ced.physicalSegmentId] = ced.length; +// } } else { // fprintf(stdout, ">>> replicate chunkIndex vlogNextHeadOffset vlogHeadLength %u %u %u\n", *chunkIndex, vlogNextHeadOffset, vlogHeadLength); @@ -365,11 +365,11 @@ VirtualSegment::appendToBuffer(Buffer& buffer, uint32_t offset, uint32_t length, segment->appendToBuffer(buffer, ced.offset, ced.length); // SpinLock::Guard _(chunkLock); - if(contains(segmentsHeadMap, ced.physicalSegmentId)) { - segmentsHeadMap[ced.physicalSegmentId] += ced.length; - } else { - segmentsHeadMap[ced.physicalSegmentId] = ced.length; - } +// if(contains(segmentsHeadMap, ced.physicalSegmentId)) { +// segmentsHeadMap[ced.physicalSegmentId] += ced.length; +// } else { +// segmentsHeadMap[ced.physicalSegmentId] = ced.length; +// } } *chunkIndex += sizeof32(ChunkEntryDigest); //move to next chunk @@ -420,16 +420,58 @@ VirtualSegment::syncTo(uint32_t syncedLength) { Segment* segment = NULL; - for (std::unordered_map<uint64_t, uint32_t>::iterator it=segmentsHeadMap.begin(); it!=segmentsHeadMap.end(); ++it) { - durableChunkOffset += it->second; - uint64_t physicalSegmentId = it->first; - uint16_t bucket = physicalSegmentId % 1024; //the physical segment id - SpinLock::Guard _(segmentManager->segmentsLocks[bucket]); - if (contains(segmentManager->segmentsMap[bucket], physicalSegmentId)) { - segment = segmentManager->segmentsMap[bucket][physicalSegmentId]; - segment->updateDurableHead(it->second); - } - } +// for (std::unordered_map<uint64_t, uint32_t>::iterator it=segmentsHeadMap.begin(); it!=segmentsHeadMap.end(); ++it) { +// durableChunkOffset += it->second; +// uint64_t physicalSegmentId = it->first; +// uint16_t bucket = physicalSegmentId % 1024; //the physical segment id +// SpinLock::Guard _(segmentManager->segmentsLocks[bucket]); +// if (contains(segmentManager->segmentsMap[bucket], physicalSegmentId)) { +// segment = segmentManager->segmentsMap[bucket][physicalSegmentId]; +// segment->updateDurableHead(it->second); +// } +// } + +// fprintf(stdout, ">>> before syncTo durableChunkOffset %u durableChunkIndex %u vlogHeadOffset %u vlogNextHeadOffset %u syncedLength %u\n", +// (unsigned)durableChunkOffset, durableChunkIndex, +// (unsigned)vlogHeadOffset, (unsigned)vlogNextHeadOffset, syncedLength); +// fflush (stdout); + + while(durableChunkOffset < syncedLength) { + //safe to access chunksBuffer after getAppendedLength called with Log#appendLock + if (durableChunkIndex + vlogHeadOffset + sizeof32(ChunkEntryDigest) < segmentSize) { + ChunkEntryDigest ced = vlogHead->getAtOffset(durableChunkIndex + vlogHeadOffset); + durableChunkOffset += ced.length; //just appended + { + // need to protect segmentsMap from segmentManager access + uint16_t bucket = ced.physicalSegmentId % 1024; //the physical segment id + SpinLock::Guard _(segmentManager->segmentsLocks[bucket]); + if (contains(segmentManager->segmentsMap[bucket], ced.physicalSegmentId)) { + segment = segmentManager->segmentsMap[bucket][ced.physicalSegmentId]; + } + } + segment->updateDurableHead(ced.length); + } else { +// fprintf(stdout, ">>> replicate chunkIndex vlogNextHeadOffset vlogHeadLength %u %u %u\n", *chunkIndex, vlogNextHeadOffset, vlogHeadLength); +// fflush (stdout); + ChunkEntryDigest ced = vlogNextHead->getAtOffset(durableChunkIndex + vlogNextHeadOffset - vlogHeadLength); + durableChunkOffset += ced.length; //just appended + { + // need to protect segmentsMap from segmentManager access + uint16_t bucket = ced.physicalSegmentId % 1024; //the physical segment id + SpinLock::Guard _(segmentManager->segmentsLocks[bucket]); + if (contains(segmentManager->segmentsMap[bucket], ced.physicalSegmentId)) { + segment = segmentManager->segmentsMap[bucket][ced.physicalSegmentId]; + } + } + segment->updateDurableHead(ced.length); + } + durableChunkIndex += sizeof32(ChunkEntryDigest); //move to next chunk + } + +// fprintf(stdout, ">>> after syncTo durableChunkOffset %u durableChunkIndex %u vlogHeadOffset %u vlogNextHeadOffset %u syncedLength %u\n", +// (unsigned)durableChunkOffset, durableChunkIndex, +// (unsigned)vlogHeadOffset, (unsigned)vlogNextHeadOffset, syncedLength); +// fflush (stdout); if(durableChunkOffset != syncedLength) { throw FatalError(HERE, "Chunks should be atomically replicated"); diff --git a/src/VirtualSegment.h b/src/VirtualSegment.h index b3b4b04e9c8fe9adc0851e322bb1bee6e2e2da83..ba9d2c697a0ee8324a4bf62f70bb8d57401e015e 100644 --- a/src/VirtualSegment.h +++ b/src/VirtualSegment.h @@ -118,13 +118,13 @@ class VirtualSegment { std::atomic<uint32_t> vlogNextHeadOffset; //0 uint32_t chunksCount; - SpinLock chunkLock; //protects segmentsHeadMap +// SpinLock chunkLock; //protects segmentsHeadMap //physicalSegmentId to (replicated) length - used by segments[key]->updateDurableHead(value); - std::unordered_map<uint64_t, uint32_t> segmentsHeadMap; +// std::unordered_map<uint64_t, uint32_t> segmentsHeadMap; //points to last valid chunk already synchronized, see syncTo -// uint32_t durableChunkIndex; //offset into last chunk durably synced - vlogHead + uint32_t durableChunkIndex; //offset into last chunk durably synced - vlogHead std::atomic<uint32_t> durableChunkOffset; // <= head , how much was durably replicated /// Tells the close state of this segment is replicated durably.