diff --git a/engine.c b/engine.c index afaae1f..76af990 100644 --- a/engine.c +++ b/engine.c @@ -258,7 +258,7 @@ ptrackMapReadFromFile(const char *ptrack_path) * postmaster is the only user right now. */ elog(DEBUG1, "ptrack read map: crc %u, file_crc %u, init_lsn %X/%X", - crc, *file_crc, (uint32) (ptrack_map->init_lsn.value >> 32), (uint32) ptrack_map->init_lsn.value); + crc, *file_crc, (uint16) (ptrack_map->init_lsn.value >> 16), (uint16) ptrack_map->init_lsn.value); if (!EQ_CRC32C(*file_crc, crc)) { @@ -330,7 +330,7 @@ ptrackMapInit(void) * Fill entries with InvalidXLogRecPtr * (InvalidXLogRecPtr is actually 0) */ - memset(ptrack_map->entries, 0, PtrackContentNblocks * sizeof(pg_atomic_uint64)); + memset(ptrack_map->entries, 0, PtrackContentNblocks * sizeof(pg_atomic_uint32)); /* * Last part of memory representation of ptrack_map (crc) is actually unused * so leave it as it is @@ -348,11 +348,15 @@ ptrackCheckpoint(void) pg_crc32c crc; char ptrack_path[MAXPGPATH]; char ptrack_path_tmp[MAXPGPATH]; - XLogRecPtr init_lsn; - pg_atomic_uint64 buf[PTRACK_BUF_SIZE]; + uint32 init_lsn; + pg_atomic_uint32 buf[PTRACK_BUF_SIZE]; struct stat stat_buf; uint64 i = 0; uint64 j = 0; + XLogRecPtr new_init_lsn; + uint32 new_init_lsn32; + uint32 latest_lsn; + bool lsn_was_advanced = false; elog(DEBUG1, "ptrack checkpoint"); @@ -408,20 +412,27 @@ ptrackCheckpoint(void) ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) ptrack_map, offsetof(PtrackMapHdr, init_lsn)); - init_lsn = pg_atomic_read_u64(&ptrack_map->init_lsn); + latest_lsn = pg_atomic_read_u32(&ptrack_map->latest_lsn); + init_lsn = pg_atomic_read_u32(&ptrack_map->init_lsn); /* Set init_lsn during checkpoint if it is not set yet */ if (init_lsn == InvalidXLogRecPtr) { - XLogRecPtr new_init_lsn; - if (RecoveryInProgress()) new_init_lsn = GetXLogReplayRecPtr(NULL); else new_init_lsn = GetXLogInsertRecPtr(); - pg_atomic_write_u64(&ptrack_map->init_lsn, new_init_lsn); - init_lsn = new_init_lsn; + new_init_lsn32 = 
(uint32)(new_init_lsn >> 16); + pg_atomic_write_u32(&ptrack_map->init_lsn, new_init_lsn32); + init_lsn = new_init_lsn32; + } + else if (lsn_diff(lsn_advance(init_lsn, PtrackLSNGap), latest_lsn) < 0) + { + new_init_lsn32 = lsn_advance(init_lsn, PtrackLSNGap); + lsn_was_advanced = true; + pg_atomic_write_u32(&ptrack_map->init_lsn, new_init_lsn32); + init_lsn = new_init_lsn32; } /* Put init_lsn in the same buffer */ @@ -435,18 +446,22 @@ ptrackCheckpoint(void) */ while (i < PtrackContentNblocks) { - XLogRecPtr lsn; + uint32 lsn; /* - * We store LSN values as pg_atomic_uint64 in the ptrack map, but - * pg_atomic_read_u64() returns uint64. That way, we have to put this - * lsn into the buffer array of pg_atomic_uint64's. We are the only + * We store LSN values as pg_atomic_uint32 in the ptrack map, but + * pg_atomic_read_u32() returns uint32. That way, we have to put this + * lsn into the buffer array of pg_atomic_uint32's. We are the only * one who write into this buffer, so we do it without locks. * * TODO: is it safe and can we do any better? 
*/ - lsn = pg_atomic_read_u64(&ptrack_map->entries[i]); - buf[j].value = lsn; + lsn = pg_atomic_read_u32(&ptrack_map->entries[i]); + + if (lsn_was_advanced && lsn_diff(lsn, init_lsn) < 0) + buf[j].value = InvalidXLogRecPtr; + else + buf[j].value = lsn; i++; j++; @@ -464,7 +479,7 @@ ptrackCheckpoint(void) ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) buf, writesz); elog(DEBUG5, "ptrack checkpoint: i " UINT64_FORMAT ", j " UINT64_FORMAT ", writesz %zu PtrackContentNblocks " UINT64_FORMAT, i, j, writesz, (uint64) PtrackContentNblocks); j = 0; } } @@ -472,7 +486,7 @@ ptrackCheckpoint(void) /* Write if anything left */ if ((i + 1) % PTRACK_BUF_SIZE != 0) { - size_t writesz = sizeof(pg_atomic_uint64) * j; + size_t writesz = sizeof(pg_atomic_uint32) * j; ptrack_write_chunk(ptrack_tmp_fd, &crc, (char *) buf, writesz); elog(DEBUG5, "ptrack checkpoint: final i " UINT64_FORMAT ", j " UINT64_FORMAT ", writesz %zu PtrackContentNblocks " UINT64_FORMAT, @@ -537,7 +551,7 @@ assign_ptrack_map_size(int newval, void *extra) !InitializingParallelWorker) { /* Cast to uint64 in order to avoid int32 overflow */ - ptrack_map_size = (uint64) 1024 * 1024 * newval; + ptrack_map_size = ((uint64) 1024) * 1024 * newval; elog(DEBUG1, "assign_ptrack_map_size: ptrack_map_size set to " UINT64_FORMAT, ptrack_map_size); @@ -673,8 +687,44 @@ ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid) } /* - * Mark modified block in ptrack_map. + * Get a second position within ptrack map so that it fits + * within the same memory page. */ +size_t +get_slot2(size_t slot1, uint32 hash) { + size_t slot2; + slot2 = TYPEALIGN_DOWN(ENTRIES_PER_PAGE, slot1) + ((hash << 16) | (hash >> 16)) % ENTRIES_PER_PAGE; + slot2 = slot1 == slot2 ?
slot2+1 : slot2; + return slot2; +} + +static void +ptrack_mark_map_pair(size_t slot1, size_t slot2, uint32 new_lsn32) +{ + /* + * We use pg_atomic_uint64 here only for alignment purposes, because + * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. + */ + pg_atomic_uint32 old_lsn; + + /* Assign latest_lsn first */ + old_lsn.value = pg_atomic_read_u32(&ptrack_map->latest_lsn); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->latest_lsn, (uint32 *) &old_lsn.value, new_lsn32)); + + /* Then, atomically assign new LSN value to the first slot */ + old_lsn.value = pg_atomic_read_u32(&ptrack_map->entries[slot1]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=%u <- %u", slot1, old_lsn.value, new_lsn32); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->entries[slot1], (uint32 *) &old_lsn.value, new_lsn32)); + + /* And to the second */ + old_lsn.value = pg_atomic_read_u32(&ptrack_map->entries[slot2]); + elog(DEBUG3, "ptrack_mark_block: map[%zu]=%u <- %u", slot2, old_lsn.value, new_lsn32); + while (old_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->entries[slot2], (uint32 *) &old_lsn.value, new_lsn32)); +} + void ptrack_mark_block(RelFileNodeBackend smgr_rnode, ForkNumber forknum, BlockNumber blocknum) @@ -683,13 +733,15 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, uint64 hash; size_t slot1; size_t slot2; + size_t max_lsn_slot1; + size_t max_lsn_slot2; XLogRecPtr new_lsn; + uint32 new_lsn32; /* * We use pg_atomic_uint64 here only for alignment purposes, because * pg_atomic_uint64 is forcedly aligned on 8 bytes during the MSVC build. 
*/ - pg_atomic_uint64 old_lsn; - pg_atomic_uint64 old_init_lsn; + pg_atomic_uint32 old_init_lsn; if (ptrack_map_size == 0 || ptrack_map == NULL @@ -703,32 +755,56 @@ ptrack_mark_block(RelFileNodeBackend smgr_rnode, hash = BID_HASH_FUNC(bid); slot1 = (size_t)(hash % PtrackContentNblocks); - slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); + slot2 = get_slot2(slot1, hash); + + bid.blocknum = InvalidBlockNumber; + hash = BID_HASH_FUNC(bid); + max_lsn_slot1 = (size_t)(hash % PtrackContentNblocks); + max_lsn_slot2 = (max_lsn_slot1 + 1) % PtrackContentNblocks; if (RecoveryInProgress()) new_lsn = GetXLogReplayRecPtr(NULL); else new_lsn = GetXLogInsertRecPtr(); + new_lsn32 = (uint32)(new_lsn >> 16); + /* Atomically assign new init LSN value */ - old_init_lsn.value = pg_atomic_read_u64(&ptrack_map->init_lsn); + old_init_lsn.value = pg_atomic_read_u32(&ptrack_map->init_lsn); if (old_init_lsn.value == InvalidXLogRecPtr) { - elog(DEBUG1, "ptrack_mark_block: init_lsn " UINT64_FORMAT " <- " UINT64_FORMAT, old_init_lsn.value, new_lsn); + elog(DEBUG1, "ptrack_mark_block: init_lsn %u <- %u", old_init_lsn.value, new_lsn32); - while (old_init_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->init_lsn, (uint64 *) &old_init_lsn.value, new_lsn)); + while (old_init_lsn.value < new_lsn32 && + !pg_atomic_compare_exchange_u32(&ptrack_map->init_lsn, (uint32 *) &old_init_lsn.value, new_lsn32)); } - /* Atomically assign new LSN value to the first slot */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot1]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot1, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot1], (uint64 *) &old_lsn.value, new_lsn)); + // mark the page + ptrack_mark_map_pair(slot1, slot2, new_lsn32); + // mark the file (new LSN is always valid maximum LSN) + ptrack_mark_map_pair(max_lsn_slot1, max_lsn_slot2, 
new_lsn32); +} - /* And to the second */ - old_lsn.value = pg_atomic_read_u64(&ptrack_map->entries[slot2]); - elog(DEBUG3, "ptrack_mark_block: map[%zu]=" UINT64_FORMAT " <- " UINT64_FORMAT, slot2, old_lsn.value, new_lsn); - while (old_lsn.value < new_lsn && - !pg_atomic_compare_exchange_u64(&ptrack_map->entries[slot2], (uint64 *) &old_lsn.value, new_lsn)); +XLogRecPtr ptrack_read_file_maxlsn(RelFileNode rnode, ForkNumber forknum) +{ + PtBlockId bid; + uint64 hash; + size_t slot1; + size_t slot2; + XLogRecPtr update_lsn1; + XLogRecPtr update_lsn2; + + bid.relnode = rnode; + bid.forknum = forknum; + bid.blocknum = InvalidBlockNumber; + + hash = BID_HASH_FUNC(bid); + + slot1 = (size_t)(hash % PtrackContentNblocks); + slot2 = (slot1 + 1) % PtrackContentNblocks; + + update_lsn1 = pg_atomic_read_u32(&ptrack_map->entries[slot1]); + update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); + + return update_lsn1 == update_lsn2 ? update_lsn1 : InvalidXLogRecPtr; } diff --git a/engine.h b/engine.h index 56777fc..98f2ceb 100644 --- a/engine.h +++ b/engine.h @@ -40,6 +40,15 @@ */ #define PTRACK_BUF_SIZE ((uint64) 8000) +/* + * A reasonable assumption for most systems. Postgres core + * leverages the same value for this purpose. + */ +#define MEMORY_PAGE_SIZE 4096 +#define MEMORY_PAGE_ALIGN(LEN) TYPEALIGN(MEMORY_PAGE_SIZE, (LEN)) +#define MEMORY_PAGE_ALIGN_DOWN(LEN) TYPEALIGN_DOWN(MEMORY_PAGE_SIZE, (LEN)) +#define ENTRIES_PER_PAGE (MEMORY_PAGE_SIZE/sizeof(uint32)) + /* Ptrack magic bytes */ #define PTRACK_MAGIC "ptk" #define PTRACK_MAGIC_SIZE 4 @@ -65,11 +74,15 @@ typedef struct PtrackMapHdr */ uint32 version_num; + /* Padding needed to align entries[] by the page boundary */ + char padding[4096 - PTRACK_MAGIC_SIZE - sizeof(uint32) - 2*sizeof(pg_atomic_uint32)]; + /* LSN of current writing position */ + pg_atomic_uint32 latest_lsn; /* LSN of the moment, when map was last enabled. 
*/ - pg_atomic_uint64 init_lsn; + pg_atomic_uint32 init_lsn; /* Followed by the actual map of LSNs */ - pg_atomic_uint64 entries[FLEXIBLE_ARRAY_MEMBER]; + pg_atomic_uint32 entries[FLEXIBLE_ARRAY_MEMBER]; /* * At the end of the map CRC of type pg_crc32c is stored. @@ -80,11 +93,11 @@ typedef PtrackMapHdr * PtrackMap; /* Number of elements in ptrack map (LSN array) */ #define PtrackContentNblocks \ - ((ptrack_map_size - offsetof(PtrackMapHdr, entries) - sizeof(pg_crc32c)) / sizeof(pg_atomic_uint64)) + ((ptrack_map_size - offsetof(PtrackMapHdr, entries) - sizeof(pg_crc32c)) / sizeof(pg_atomic_uint32)) /* Actual size of the ptrack map, that we are able to fit into ptrack_map_size */ #define PtrackActualSize \ - (offsetof(PtrackMapHdr, entries) + PtrackContentNblocks * sizeof(pg_atomic_uint64) + sizeof(pg_crc32c)) + (offsetof(PtrackMapHdr, entries) + PtrackContentNblocks * sizeof(pg_atomic_uint32) + sizeof(pg_crc32c)) /* CRC32 value offset in order to directly access it in the shared memory chunk */ #define PtrackCrcOffset (PtrackActualSize - sizeof(pg_crc32c)) @@ -94,6 +107,7 @@ typedef PtrackMapHdr * PtrackMap; #define BID_HASH_FUNC(bid) \ (DatumGetUInt64(hash_any_extended((unsigned char *)&bid, sizeof(bid), 0))) +#define PtrackLSNGap ((uint32) 1000000000) /* * Per process pointer to shared ptrack_map */ @@ -115,8 +129,11 @@ extern void assign_ptrack_map_size(int newval, void *extra); extern void ptrack_walkdir(const char *path, Oid tablespaceOid, Oid dbOid); extern void ptrack_mark_block(RelFileNodeBackend smgr_rnode, ForkNumber forkno, BlockNumber blkno); +extern XLogRecPtr ptrack_read_file_maxlsn(RelFileNode smgr_rnode, + ForkNumber forknum); extern bool is_cfm_file_path(const char *path); +extern size_t get_slot2(size_t slot1, uint32 hash); #ifdef PGPRO_EE extern off_t get_cfs_relation_file_decompressed_size(RelFileNodeBackend rnode, const char *fullpath, ForkNumber forknum); diff --git a/ptrack.c b/ptrack.c index 22a2acf..d4b38e3 100644 --- a/ptrack.c +++ b/ptrack.c @@ -410,6
+410,7 @@ ptrack_filelist_getnext(PtScanCtx * ctx) char *fullpath; struct stat fst; off_t rel_st_size = 0; + XLogRecPtr maxlsn; #if CFS_SUPPORT RelFileNodeBackend rnodebackend; #endif @@ -461,6 +462,18 @@ ptrack_filelist_getnext(PtScanCtx * ctx) goto get_next; } + maxlsn = ptrack_read_file_maxlsn(pfl->relnode, pfl->forknum); + + if (maxlsn != InvalidXLogRecPtr && maxlsn < ctx->lsn) + { + elog(DEBUG3, "ptrack: skip file %s: maxlsn is %X/%X, expected %X/%X", + fullpath, (uint16) (maxlsn >> 16), (uint16) maxlsn, + (uint16) (ctx->lsn >> 16), (uint16) ctx->lsn); + + /* Try the next one */ + goto get_next; + } + #if CFS_SUPPORT nodeOf(rnodebackend) = ctx->bid.relnode; rnodebackend.backend = InvalidBackendId; @@ -508,7 +521,7 @@ ptrack_init_lsn(PG_FUNCTION_ARGS) { if (ptrack_map != NULL) { - XLogRecPtr init_lsn = pg_atomic_read_u64(&ptrack_map->init_lsn); + XLogRecPtr init_lsn = ((XLogRecPtr) pg_atomic_read_u32(&ptrack_map->init_lsn)) << 16; PG_RETURN_LSN(init_lsn); } @@ -533,6 +546,8 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) datapagemap_t pagemap; int64 pagecount = 0; char gather_path[MAXPGPATH]; + uint32 init_lsn = InvalidXLogRecPtr; + bool within_ptrack_map = true; /* Exit immediately if there is no map */ if (ptrack_map == NULL) @@ -541,13 +556,14 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) if (SRF_IS_FIRSTCALL()) { TupleDesc tupdesc; + XLogRecPtr lsn = PG_GETARG_LSN(0); funcctx = SRF_FIRSTCALL_INIT(); oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); ctx = (PtScanCtx *) palloc0(sizeof(PtScanCtx)); - ctx->lsn = PG_GETARG_LSN(0); + ctx->lsn = (uint32)(lsn >> 16); ctx->filelist = NIL; /* Make tuple descriptor */ @@ -597,8 +613,8 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) uint64 hash; size_t slot1; size_t slot2; - XLogRecPtr update_lsn1; - XLogRecPtr update_lsn2; + uint32 update_lsn1; + uint32 update_lsn2; /* Stop traversal if there are no more segments */ if (ctx->bid.blocknum >= ctx->relsize) @@ -638,29 +654,36 @@ ptrack_get_pagemapset(PG_FUNCTION_ARGS) SRF_RETURN_DONE(funcctx); }
+ init_lsn = pg_atomic_read_u32(&ptrack_map->init_lsn); hash = BID_HASH_FUNC(ctx->bid); slot1 = (size_t)(hash % PtrackContentNblocks); - update_lsn1 = pg_atomic_read_u64(&ptrack_map->entries[slot1]); + update_lsn1 = pg_atomic_read_u32(&ptrack_map->entries[slot1]); if (update_lsn1 != InvalidXLogRecPtr) elog(DEBUG3, "ptrack: update_lsn1 %X/%X of blckno %u of file %s", - (uint32) (update_lsn1 >> 32), (uint32) update_lsn1, + (uint16) (update_lsn1 >> 16), (uint16) update_lsn1, ctx->bid.blocknum, ctx->relpath); + if (init_lsn != InvalidXLogRecPtr) + within_ptrack_map = lsn_diff(init_lsn, update_lsn1) <= 0; + /* Only probe the second slot if the first one is marked */ - if (update_lsn1 >= ctx->lsn) + if (within_ptrack_map && lsn_diff(ctx->lsn, update_lsn1) <= 0) { - slot2 = (size_t)(((hash << 32) | (hash >> 32)) % PtrackContentNblocks); - update_lsn2 = pg_atomic_read_u64(&ptrack_map->entries[slot2]); + slot2 = get_slot2(slot1, hash); + update_lsn2 = pg_atomic_read_u32(&ptrack_map->entries[slot2]); if (update_lsn2 != InvalidXLogRecPtr) elog(DEBUG3, "ptrack: update_lsn2 %X/%X of blckno %u of file %s", - (uint32) (update_lsn1 >> 32), (uint32) update_lsn2, + (uint16) (update_lsn2 >> 16), (uint16) update_lsn2, ctx->bid.blocknum, ctx->relpath); + if (init_lsn != InvalidXLogRecPtr) + within_ptrack_map = lsn_diff(init_lsn, update_lsn2) <= 0; + /* Block has been changed since specified LSN. Mark it in the bitmap */ - if (update_lsn2 >= ctx->lsn) + if (within_ptrack_map && lsn_diff(ctx->lsn, update_lsn2) <= 0) { pagecount += 1; datapagemap_add(&pagemap, ctx->bid.blocknum % ((BlockNumber) RELSEG_SIZE)); diff --git a/ptrack.h b/ptrack.h index e56f60b..45a8c39 100644 --- a/ptrack.h +++ b/ptrack.h @@ -47,6 +47,8 @@ #define nodeOf(ndbck) (ndbck).node #endif +#define lsn_diff(lsn1, lsn2) ((int32)((lsn1) - (lsn2))) +#define lsn_advance(lsn, incr) ((uint32)((lsn) + (incr))) /* * Structure identifying block on the disk.
*/ @@ -62,7 +64,7 @@ typedef struct PtBlockId */ typedef struct PtScanCtx { - XLogRecPtr lsn; + uint32 lsn; PtBlockId bid; uint32 relsize; char *relpath;