@@ -201,6 +201,8 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
201
201
" file_cache_hit_ratio_1h" , 0.0 );
202
202
_disk_limit_mode_metrics = std::make_shared<bvar::Status<size_t >>(
203
203
_cache_base_path.c_str (), " file_cache_disk_limit_mode" , 0 );
204
+ _need_evict_cache_in_advance_metrics = std::make_shared<bvar::Status<size_t >>(
205
+ _cache_base_path.c_str (), " file_cache_need_evict_cache_in_advance" , 0 );
204
206
205
207
_cache_lock_wait_time_us = std::make_shared<bvar::LatencyRecorder>(
206
208
_cache_base_path.c_str (), " file_cache_cache_lock_wait_time_us" );
@@ -212,6 +214,11 @@ BlockFileCache::BlockFileCache(const std::string& cache_base_path,
212
214
_cache_base_path.c_str (), " file_cache_storage_retry_sync_remove_latency_us" );
213
215
_storage_async_remove_latency_us = std::make_shared<bvar::LatencyRecorder>(
214
216
_cache_base_path.c_str (), " file_cache_storage_async_remove_latency_us" );
217
+ _evict_in_advance_latency_us = std::make_shared<bvar::LatencyRecorder>(
218
+ _cache_base_path.c_str (), " file_cache_evict_in_advance_latency_us" );
219
+
220
+ _recycle_keys_length_recorder = std::make_shared<bvar::LatencyRecorder>(
221
+ _cache_base_path.c_str (), " file_cache_recycle_keys_length" );
215
222
216
223
_disposable_queue = LRUQueue (cache_settings.disposable_queue_size ,
217
224
cache_settings.disposable_queue_elements , 60 * 60 );
@@ -339,6 +346,8 @@ Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>& cache_lo
339
346
_cache_background_monitor_thread = std::thread (&BlockFileCache::run_background_monitor, this );
340
347
_cache_background_ttl_gc_thread = std::thread (&BlockFileCache::run_background_ttl_gc, this );
341
348
_cache_background_gc_thread = std::thread (&BlockFileCache::run_background_gc, this );
349
+ _cache_background_evict_in_advance_thread =
350
+ std::thread (&BlockFileCache::run_background_evict_in_advance, this );
342
351
343
352
return Status::OK ();
344
353
}
@@ -1021,6 +1030,16 @@ bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const CacheContext&
1021
1030
return true ;
1022
1031
}
1023
1032
1033
+ void BlockFileCache::try_evict_in_advance (size_t size, std::lock_guard<std::mutex>& cache_lock) {
1034
+ UInt128Wrapper hash = UInt128Wrapper ();
1035
+ size_t offset = 0 ;
1036
+ CacheContext context;
1037
+ context.cache_type = FileCacheType::NORMAL;
1038
+ try_reserve_for_lru (hash, nullptr , context, offset, size, cache_lock, false );
1039
+ context.cache_type = FileCacheType::TTL;
1040
+ try_reserve_for_lru (hash, nullptr , context, offset, size, cache_lock, false );
1041
+ }
1042
+
1024
1043
bool BlockFileCache::remove_if_ttl_file_blocks (const UInt128Wrapper& file_key, bool remove_directly,
1025
1044
std::lock_guard<std::mutex>& cache_lock, bool sync) {
1026
1045
auto & ttl_queue = get_queue (FileCacheType::TTL);
@@ -1178,7 +1197,7 @@ void BlockFileCache::reset_range(const UInt128Wrapper& hash, size_t offset, size
1178
1197
1179
1198
bool BlockFileCache::try_reserve_from_other_queue_by_time_interval (
1180
1199
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1181
- int64_t cur_time, std::lock_guard<std::mutex>& cache_lock) {
1200
+ int64_t cur_time, std::lock_guard<std::mutex>& cache_lock, bool sync_removal ) {
1182
1201
size_t removed_size = 0 ;
1183
1202
size_t cur_cache_size = _cur_cache_size;
1184
1203
std::vector<FileBlockCell*> to_evict;
@@ -1211,7 +1230,7 @@ bool BlockFileCache::try_reserve_from_other_queue_by_time_interval(
1211
1230
}
1212
1231
*(_evict_by_time_metrics_matrix[cache_type][cur_type]) << remove_size_per_type;
1213
1232
}
1214
- remove_file_blocks (to_evict, cache_lock, true );
1233
+ remove_file_blocks (to_evict, cache_lock, sync_removal );
1215
1234
1216
1235
return !is_overflow (removed_size, size, cur_cache_size);
1217
1236
}
@@ -1229,7 +1248,7 @@ bool BlockFileCache::is_overflow(size_t removed_size, size_t need_size,
1229
1248
1230
1249
bool BlockFileCache::try_reserve_from_other_queue_by_size (
1231
1250
FileCacheType cur_type, std::vector<FileCacheType> other_cache_types, size_t size,
1232
- std::lock_guard<std::mutex>& cache_lock) {
1251
+ std::lock_guard<std::mutex>& cache_lock, bool sync_removal ) {
1233
1252
size_t removed_size = 0 ;
1234
1253
size_t cur_cache_size = _cur_cache_size;
1235
1254
std::vector<FileBlockCell*> to_evict;
@@ -1249,17 +1268,18 @@ bool BlockFileCache::try_reserve_from_other_queue_by_size(
1249
1268
cur_removed_size);
1250
1269
*(_evict_by_size_metrics_matrix[cache_type][cur_type]) << cur_removed_size;
1251
1270
}
1252
- remove_file_blocks (to_evict, cache_lock, true );
1271
+ remove_file_blocks (to_evict, cache_lock, sync_removal );
1253
1272
return !is_overflow (removed_size, size, cur_cache_size);
1254
1273
}
1255
1274
1256
1275
bool BlockFileCache::try_reserve_from_other_queue (FileCacheType cur_cache_type, size_t size,
1257
1276
int64_t cur_time,
1258
- std::lock_guard<std::mutex>& cache_lock) {
1277
+ std::lock_guard<std::mutex>& cache_lock,
1278
+ bool sync_removal) {
1259
1279
// currently, TTL cache is not considered as a candidate
1260
1280
auto other_cache_types = get_other_cache_type_without_ttl (cur_cache_type);
1261
1281
bool reserve_success = try_reserve_from_other_queue_by_time_interval (
1262
- cur_cache_type, other_cache_types, size, cur_time, cache_lock);
1282
+ cur_cache_type, other_cache_types, size, cur_time, cache_lock, sync_removal );
1263
1283
if (reserve_success || !config::file_cache_enable_evict_from_other_queue_by_size) {
1264
1284
return reserve_success;
1265
1285
}
@@ -1272,14 +1292,15 @@ bool BlockFileCache::try_reserve_from_other_queue(FileCacheType cur_cache_type,
1272
1292
if (_cur_cache_size + size > _capacity && cur_queue_size + size > cur_queue_max_size) {
1273
1293
return false ;
1274
1294
}
1275
- return try_reserve_from_other_queue_by_size (cur_cache_type, other_cache_types, size,
1276
- cache_lock );
1295
+ return try_reserve_from_other_queue_by_size (cur_cache_type, other_cache_types, size, cache_lock,
1296
+ sync_removal );
1277
1297
}
1278
1298
1279
1299
bool BlockFileCache::try_reserve_for_lru (const UInt128Wrapper& hash,
1280
1300
QueryFileCacheContextPtr query_context,
1281
1301
const CacheContext& context, size_t offset, size_t size,
1282
- std::lock_guard<std::mutex>& cache_lock) {
1302
+ std::lock_guard<std::mutex>& cache_lock,
1303
+ bool sync_removal) {
1283
1304
int64_t cur_time = std::chrono::duration_cast<std::chrono::seconds>(
1284
1305
std::chrono::steady_clock::now ().time_since_epoch ())
1285
1306
.count ();
@@ -1292,7 +1313,7 @@ bool BlockFileCache::try_reserve_for_lru(const UInt128Wrapper& hash,
1292
1313
size_t cur_removed_size = 0 ;
1293
1314
find_evict_candidates (queue, size, cur_cache_size, removed_size, to_evict, cache_lock,
1294
1315
cur_removed_size);
1295
- remove_file_blocks (to_evict, cache_lock, true );
1316
+ remove_file_blocks (to_evict, cache_lock, sync_removal );
1296
1317
*(_evict_by_self_lru_metrics_matrix[context.cache_type ]) << cur_removed_size;
1297
1318
1298
1319
if (is_overflow (removed_size, size, cur_cache_size)) {
@@ -1345,7 +1366,9 @@ void BlockFileCache::remove(FileBlockSPtr file_block, T& cache_lock, U& block_lo
1345
1366
// so there will be a window that the file is not in the cache but still in the storage
1346
1367
// but it's ok, because the rowset is stale already
1347
1368
bool ret = _recycle_keys.enqueue (key);
1348
- if (!ret) {
1369
+ if (ret) [[likely]] {
1370
+ *_recycle_keys_length_recorder << _recycle_keys.size_approx ();
1371
+ } else {
1349
1372
LOG_WARNING (" Failed to push recycle key to queue, do it synchronously" );
1350
1373
int64_t duration_ns = 0 ;
1351
1374
Status st;
@@ -1551,6 +1574,10 @@ int disk_used_percentage(const std::string& path, std::pair<int, int>* percent)
1551
1574
int inode_percentage = int (inode_free * 1.0 / inode_total * 100 );
1552
1575
percent->first = capacity_percentage;
1553
1576
percent->second = 100 - inode_percentage;
1577
+
1578
+ // Add sync point for testing
1579
+ TEST_SYNC_POINT_CALLBACK (" BlockFileCache::disk_used_percentage:1" , percent);
1580
+
1554
1581
return 0 ;
1555
1582
}
1556
1583
@@ -1643,7 +1670,7 @@ void BlockFileCache::check_disk_resource_limit() {
1643
1670
LOG_WARNING (" config error, set to default value" )
1644
1671
.tag (" enter" , config::file_cache_enter_disk_resource_limit_mode_percent)
1645
1672
.tag (" exit" , config::file_cache_exit_disk_resource_limit_mode_percent);
1646
- config::file_cache_enter_disk_resource_limit_mode_percent = 90 ;
1673
+ config::file_cache_enter_disk_resource_limit_mode_percent = 88 ;
1647
1674
config::file_cache_exit_disk_resource_limit_mode_percent = 80 ;
1648
1675
}
1649
1676
if (is_insufficient (space_percentage) || is_insufficient (inode_percentage)) {
@@ -1664,11 +1691,69 @@ void BlockFileCache::check_disk_resource_limit() {
1664
1691
}
1665
1692
}
1666
1693
1694
+ void BlockFileCache::check_need_evict_cache_in_advance () {
1695
+ if (_storage->get_type () != FileCacheStorageType::DISK) {
1696
+ return ;
1697
+ }
1698
+
1699
+ std::pair<int , int > percent;
1700
+ int ret = disk_used_percentage (_cache_base_path, &percent);
1701
+ if (ret != 0 ) {
1702
+ LOG_ERROR (" " ).tag (" file cache path" , _cache_base_path).tag (" error" , strerror (errno));
1703
+ return ;
1704
+ }
1705
+ auto [space_percentage, inode_percentage] = percent;
1706
+ size_t size_percentage = static_cast <size_t >(
1707
+ (static_cast <double >(_cur_cache_size) / static_cast <double >(_capacity)) * 100 );
1708
+ auto is_insufficient = [](const int & percentage) {
1709
+ return percentage >= config::file_cache_enter_need_evict_cache_in_advance_percent;
1710
+ };
1711
+ DCHECK_GE (space_percentage, 0 );
1712
+ DCHECK_LE (space_percentage, 100 );
1713
+ DCHECK_GE (inode_percentage, 0 );
1714
+ DCHECK_LE (inode_percentage, 100 );
1715
+ // ATTN: due to that can be changed dynamically, set it to default value if it's invalid
1716
+ // FIXME: reject with config validator
1717
+ if (config::file_cache_enter_need_evict_cache_in_advance_percent <=
1718
+ config::file_cache_exit_need_evict_cache_in_advance_percent) {
1719
+ LOG_WARNING (" config error, set to default value" )
1720
+ .tag (" enter" , config::file_cache_enter_need_evict_cache_in_advance_percent)
1721
+ .tag (" exit" , config::file_cache_exit_need_evict_cache_in_advance_percent);
1722
+ config::file_cache_enter_need_evict_cache_in_advance_percent = 78 ;
1723
+ config::file_cache_exit_need_evict_cache_in_advance_percent = 75 ;
1724
+ }
1725
+ if (is_insufficient (space_percentage) || is_insufficient (inode_percentage) ||
1726
+ is_insufficient (size_percentage)) {
1727
+ _need_evict_cache_in_advance = true ;
1728
+ _need_evict_cache_in_advance_metrics->set_value (1 );
1729
+ } else if (_need_evict_cache_in_advance &&
1730
+ (space_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1731
+ (inode_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent) &&
1732
+ (size_percentage < config::file_cache_exit_need_evict_cache_in_advance_percent)) {
1733
+ _need_evict_cache_in_advance = false ;
1734
+ _need_evict_cache_in_advance_metrics->set_value (0 );
1735
+ }
1736
+ if (_need_evict_cache_in_advance) {
1737
+ LOG (WARNING) << " file_cache=" << get_base_path () << " space_percent=" << space_percentage
1738
+ << " inode_percent=" << inode_percentage << " size_percent=" << size_percentage
1739
+ << " is_space_insufficient=" << is_insufficient (space_percentage)
1740
+ << " is_inode_insufficient=" << is_insufficient (inode_percentage)
1741
+ << " is_size_insufficient=" << is_insufficient (size_percentage)
1742
+ << " need evict cache in advance" ;
1743
+ }
1744
+ }
1745
+
1667
1746
void BlockFileCache::run_background_monitor () {
1668
1747
int64_t interval_time_seconds = 20 ;
1669
1748
while (!_close) {
1670
1749
TEST_SYNC_POINT_CALLBACK (" BlockFileCache::set_sleep_time" , &interval_time_seconds);
1671
1750
check_disk_resource_limit ();
1751
+ if (config::enable_evict_file_cache_in_advance) {
1752
+ check_need_evict_cache_in_advance ();
1753
+ } else {
1754
+ _need_evict_cache_in_advance = false ;
1755
+ }
1756
+
1672
1757
{
1673
1758
std::unique_lock close_lock (_close_mtx);
1674
1759
_close_cv.wait_for (close_lock, std::chrono::seconds (interval_time_seconds));
@@ -1753,11 +1838,8 @@ void BlockFileCache::run_background_gc() {
1753
1838
break ;
1754
1839
}
1755
1840
}
1756
- while (_recycle_keys.try_dequeue (key)) {
1757
- if (batch_count >= batch_limit) {
1758
- break ;
1759
- }
1760
1841
1842
+ while (batch_count < batch_limit && _recycle_keys.try_dequeue (key)) {
1761
1843
int64_t duration_ns = 0 ;
1762
1844
Status st;
1763
1845
{
@@ -1771,10 +1853,42 @@ void BlockFileCache::run_background_gc() {
1771
1853
}
1772
1854
batch_count++;
1773
1855
}
1856
+ *_recycle_keys_length_recorder << _recycle_keys.size_approx ();
1774
1857
batch_count = 0 ;
1775
1858
}
1776
1859
}
1777
1860
1861
+ void BlockFileCache::run_background_evict_in_advance () {
1862
+ LOG (INFO) << " Starting background evict in advance thread" ;
1863
+ int64_t batch = 0 ;
1864
+ while (!_close) {
1865
+ {
1866
+ std::unique_lock close_lock (_close_mtx);
1867
+ _close_cv.wait_for (
1868
+ close_lock,
1869
+ std::chrono::milliseconds (config::file_cache_evict_in_advance_interval_ms));
1870
+ if (_close) {
1871
+ LOG (INFO) << " Background evict in advance thread exiting due to cache closing" ;
1872
+ break ;
1873
+ }
1874
+ }
1875
+ batch = config::file_cache_evict_in_advance_batch_bytes;
1876
+
1877
+ // Skip if eviction not needed or too many pending recycles
1878
+ if (!_need_evict_cache_in_advance || _recycle_keys.size_approx () >= (batch * 10 )) {
1879
+ continue ;
1880
+ }
1881
+
1882
+ int64_t duration_ns = 0 ;
1883
+ {
1884
+ SCOPED_CACHE_LOCK (_mutex, this );
1885
+ SCOPED_RAW_TIMER (&duration_ns);
1886
+ try_evict_in_advance (batch, cache_lock);
1887
+ }
1888
+ *_evict_in_advance_latency_us << (duration_ns / 1000 );
1889
+ }
1890
+ }
1891
+
1778
1892
void BlockFileCache::modify_expiration_time (const UInt128Wrapper& hash,
1779
1893
uint64_t new_expiration_time) {
1780
1894
SCOPED_CACHE_LOCK (_mutex, this );
0 commit comments