XRootD
Loading...
Searching...
No Matches
XrdPfcFile.cc
Go to the documentation of this file.
1//----------------------------------------------------------------------------------
2// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
3// Author: Alja Mrak-Tadel, Matevz Tadel
4//----------------------------------------------------------------------------------
5// XRootD is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Lesser General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// XRootD is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14//
15// You should have received a copy of the GNU Lesser General Public License
16// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17//----------------------------------------------------------------------------------
18
19
20#include "XrdPfcFile.hh"
21#include "XrdPfcIO.hh"
22#include "XrdPfcTrace.hh"
23#include <cstdio>
24#include <sstream>
25#include <fcntl.h>
26#include <assert.h>
27#include "XrdCl/XrdClLog.hh"
29#include "XrdCl/XrdClFile.hh"
31#include "XrdSys/XrdSysTimer.hh"
32#include "XrdOss/XrdOss.hh"
33#include "XrdOuc/XrdOucEnv.hh"
35#include "XrdPfc.hh"
36
37
38using namespace XrdPfc;
39
40namespace
41{
42
43const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
44
45Cache* cache() { return &Cache::GetInstance(); }
46
47}
48
49const char *File::m_traceID = "File";
50
51//------------------------------------------------------------------------------
52
53File::File(const std::string& path, long long iOffset, long long iFileSize) :
54 m_ref_cnt(0),
55 m_data_file(0),
56 m_info_file(0),
57 m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
58 m_filename(path),
59 m_offset(iOffset),
60 m_file_size(iFileSize),
61 m_current_io(m_io_set.end()),
62 m_ios_in_detach(0),
63 m_non_flushed_cnt(0),
64 m_in_sync(false),
65 m_detach_time_logged(false),
66 m_in_shutdown(false),
67 m_state_cond(0),
68 m_block_size(0),
69 m_num_blocks(0),
70 m_prefetch_state(kOff),
71 m_prefetch_read_cnt(0),
72 m_prefetch_hit_cnt(0),
73 m_prefetch_score(0)
74{}
75
76File::~File()
77{
78 if (m_info_file)
79 {
80 TRACEF(Debug, "~File() close info ");
81 m_info_file->Close();
82 delete m_info_file;
83 m_info_file = NULL;
84 }
85
86 if (m_data_file)
87 {
88 TRACEF(Debug, "~File() close output ");
89 m_data_file->Close();
90 delete m_data_file;
91 m_data_file = NULL;
92 }
93
94 TRACEF(Debug, "~File() ended, prefetch score = " << m_prefetch_score);
95}
96
97//------------------------------------------------------------------------------
98
99File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
100{
101 File *file = new File(path, offset, fileSize);
102 if ( ! file->Open())
103 {
104 delete file;
105 file = 0;
106 }
107 return file;
108}
109
110//------------------------------------------------------------------------------
111
113{
114 // Called from Cache::Unlink() when the file is currently open.
115 // Cache::Unlink is also called on FSync error and when wrong number of bytes
116 // is received from a remote read.
117 //
118 // From this point onward the file will not be written to, cinfo file will
119 // not be updated, and all new read requests will return -ENOENT.
120 //
121 // File's entry in the Cache's active map is set to nullptr and will be
122 // removed from there shortly, in any case, well before this File object
123 // shuts down. So we do not communicate to Cache about our destruction when
124 // it happens.
125
126 {
127 XrdSysCondVarHelper _lck(m_state_cond);
128
129 m_in_shutdown = true;
130
131 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
132 {
133 m_prefetch_state = kStopped;
134 cache()->DeRegisterPrefetchFile(this);
135 }
136 }
137
138}
139
140//------------------------------------------------------------------------------
141
143{
144 // Not locked, only used from Cache / Purge thread.
145
146 Stats delta = m_last_stats;
147
148 m_last_stats = m_stats.Clone();
149
150 delta.DeltaToReference(m_last_stats);
151
152 return delta;
153}
154
155//------------------------------------------------------------------------------
156
158{
159 TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);
160
161 XrdSysCondVarHelper _lck(m_state_cond);
162 dec_ref_count(b);
163}
164
165void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
166{
167 TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
168
169 XrdSysCondVarHelper _lck(m_state_cond);
170
171 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
172 {
173 dec_ref_count(*i);
174 }
175}
176
177//------------------------------------------------------------------------------
178
180{
181 std::string loc(io->GetLocation());
182 XrdSysCondVarHelper _lck(m_state_cond);
183 insert_remote_location(loc);
184}
185
186//------------------------------------------------------------------------------
187
189{
190 // Returns true if delay is needed.
191
192 TRACEF(Debug, "ioActive start for io " << io);
193
194 std::string loc(io->GetLocation());
195
196 {
197 XrdSysCondVarHelper _lck(m_state_cond);
198
199 IoSet_i mi = m_io_set.find(io);
200
201 if (mi != m_io_set.end())
202 {
203 unsigned int n_active_reads = io->m_active_read_reqs;
204
205 TRACE(Info, "ioActive for io " << io <<
206 ", active_reads " << n_active_reads <<
207 ", active_prefetches " << io->m_active_prefetches <<
208 ", allow_prefetching " << io->m_allow_prefetching <<
209 ", ios_in_detach " << m_ios_in_detach);
210 TRACEF(Info,
211 "\tio_map.size() " << m_io_set.size() <<
212 ", block_map.size() " << m_block_map.size() << ", file");
213
214 insert_remote_location(loc);
215
216 io->m_allow_prefetching = false;
217 io->m_in_detach = true;
218
219 // Check if any IO is still available for prfetching. If not, stop it.
220 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
221 {
222 if ( ! select_current_io_or_disable_prefetching(false) )
223 {
224 TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
225 }
226 }
227
228 // On last IO, consider write queue blocks. Note, this also contains
229 // blocks being prefetched.
230
231 bool io_active_result;
232
233 if (n_active_reads > 0)
234 {
235 io_active_result = true;
236 }
237 else if (m_io_set.size() - m_ios_in_detach == 1)
238 {
239 io_active_result = ! m_block_map.empty();
240 }
241 else
242 {
243 io_active_result = io->m_active_prefetches > 0;
244 }
245
246 if ( ! io_active_result)
247 {
248 ++m_ios_in_detach;
249 }
250
251 TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");
252
253 return io_active_result;
254 }
255 else
256 {
257 TRACEF(Error, "ioActive io " << io <<" not found in IoSet. This should not happen.");
258 return false;
259 }
260 }
261}
262
263//------------------------------------------------------------------------------
264
266{
267 XrdSysCondVarHelper _lck(m_state_cond);
268 m_detach_time_logged = false;
269}
270
272{
273 // Returns true if sync is required.
274 // This method is called after corresponding IO is detached from PosixCache.
275
276 XrdSysCondVarHelper _lck(m_state_cond);
277 if ( ! m_in_shutdown)
278 {
279 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
280 {
281 Stats loc_stats = m_stats.Clone();
282 m_cfi.WriteIOStatDetach(loc_stats);
283 m_detach_time_logged = true;
284 m_in_sync = true;
285 TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
286 return true;
287 }
288 }
289 TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
290 return false;
291}
292
293//------------------------------------------------------------------------------
294
296{
297 // Called from Cache::GetFile() when a new IO asks for the file.
298
299 TRACEF(Debug, "AddIO() io = " << (void*)io);
300
301 time_t now = time(0);
302 std::string loc(io->GetLocation());
303
304 m_state_cond.Lock();
305
306 IoSet_i mi = m_io_set.find(io);
307
308 if (mi == m_io_set.end())
309 {
310 m_io_set.insert(io);
311 io->m_attach_time = now;
312 m_stats.IoAttach();
313
314 insert_remote_location(loc);
315
316 if (m_prefetch_state == kStopped)
317 {
318 m_prefetch_state = kOn;
319 cache()->RegisterPrefetchFile(this);
320 }
321 }
322 else
323 {
324 TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
325 }
326
327 m_state_cond.UnLock();
328}
329
330//------------------------------------------------------------------------------
331
333{
334 // Called from Cache::ReleaseFile.
335
336 TRACEF(Debug, "RemoveIO() io = " << (void*)io);
337
338 time_t now = time(0);
339
340 m_state_cond.Lock();
341
342 IoSet_i mi = m_io_set.find(io);
343
344 if (mi != m_io_set.end())
345 {
346 if (mi == m_current_io)
347 {
348 ++m_current_io;
349 }
350
351 m_stats.IoDetach(now - io->m_attach_time);
352 m_io_set.erase(mi);
353 --m_ios_in_detach;
354
355 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
356 {
357 TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
358 m_prefetch_state = kStopped;
359 cache()->DeRegisterPrefetchFile(this);
360 }
361 }
362 else
363 {
364 TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
365 }
366
367 m_state_cond.UnLock();
368}
369
370//------------------------------------------------------------------------------
371
372bool File::Open()
373{
374 // Sets errno accordingly.
375
376 static const char *tpfx = "Open() ";
377
378 TRACEF(Dump, tpfx << "open file for disk cache");
379
381
382 XrdOss &myOss = * Cache::GetInstance().GetOss();
383 const char *myUser = conf.m_username.c_str();
384 XrdOucEnv myEnv;
385 struct stat data_stat, info_stat;
386
387 std::string ifn = m_filename + Info::s_infoExtension;
388
389 bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
390 bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);
391
392 // Create the data file itself.
393 char size_str[32]; sprintf(size_str, "%lld", m_file_size);
394 myEnv.Put("oss.asize", size_str);
395 myEnv.Put("oss.cgroup", conf.m_data_space.c_str());
396
397 int res;
398
399 if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
400 {
401 TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
402 errno = -res;
403 return false;
404 }
405
406 m_data_file = myOss.newFile(myUser);
407 if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
408 {
409 TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
410 errno = -res;
411 delete m_data_file; m_data_file = 0;
412 return false;
413 }
414
415 myEnv.Put("oss.asize", "64k"); // TODO: Calculate? Get it from configuration? Do not know length of access lists ...
416 myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
417 if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
418 {
419 TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
420 errno = -res;
421 m_data_file->Close(); delete m_data_file; m_data_file = 0;
422 return false;
423 }
424
425 m_info_file = myOss.newFile(myUser);
426 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
427 {
428 TRACEF(Error, tpfx << "Failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
429 errno = -res;
430 delete m_info_file; m_info_file = 0;
431 m_data_file->Close(); delete m_data_file; m_data_file = 0;
432 return false;
433 }
434
435 bool initialize_info_file = true;
436
437 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
438 {
439 TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
440 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
441 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");
442
443 // Check if data file exists and is of reasonable size.
444 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
445 {
446 initialize_info_file = false;
447 } else {
448 TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
449 m_cfi.ResetAllAccessStats();
450 m_data_file->Ftruncate(0);
451 }
452 }
453
454 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
455 {
457 conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
458 {
459 TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
460 initialize_info_file = true;
461 m_cfi.ResetAllAccessStats();
462 m_data_file->Ftruncate(0);
463 } else {
464 // TODO: If the file is complete, we don't need to reset net cksums.
465 m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
466 }
467 }
468
469 if (initialize_info_file)
470 {
471 m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
472 m_cfi.SetCkSumState(conf.get_cs_Chk());
473 m_cfi.ResetNoCkSumTime();
474 m_cfi.Write(m_info_file, ifn.c_str());
475 m_info_file->Fsync();
476 TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
477 }
478
479 m_cfi.WriteIOStatAttach();
480 m_state_cond.Lock();
481 m_block_size = m_cfi.GetBufferSize();
482 m_num_blocks = m_cfi.GetNBlocks();
483 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().
484 m_state_cond.UnLock();
485
486 return true;
487}
488
489
490//==============================================================================
491// Read and helpers
492//==============================================================================
493
494bool File::overlap(int blk, // block to query
495 long long blk_size, //
496 long long req_off, // offset of user request
497 int req_size, // size of user request
498 // output:
499 long long &off, // offset in user buffer
500 long long &blk_off, // offset in block
501 int &size) // size to copy
502{
503 const long long beg = blk * blk_size;
504 const long long end = beg + blk_size;
505 const long long req_end = req_off + req_size;
506
507 if (req_off < end && req_end > beg)
508 {
509 const long long ovlp_beg = std::max(beg, req_off);
510 const long long ovlp_end = std::min(end, req_end);
511
512 off = ovlp_beg - req_off;
513 blk_off = ovlp_beg - beg;
514 size = (int) (ovlp_end - ovlp_beg);
515
516 assert(size <= blk_size);
517 return true;
518 }
519 else
520 {
521 return false;
522 }
523}
524
525//------------------------------------------------------------------------------
526
527Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
528{
529 // Must be called w/ state_cond locked.
530 // Checks on size etc should be done before.
531 //
532 // Reference count is 0 so increase it in calling function if you want to
533 // catch the block while still in memory.
534
535 const long long off = i * m_block_size;
536 const int last_block = m_num_blocks - 1;
537 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
538
539 int blk_size, req_size;
540 if (i == last_block) {
541 blk_size = req_size = m_file_size - off;
542 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
543 } else {
544 blk_size = req_size = m_block_size;
545 }
546
547 Block *b = 0;
548 char *buf = cache()->RequestRAM(req_size);
549
550 if (buf)
551 {
552 b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
553
554 if (b)
555 {
556 m_block_map[i] = b;
557
558 // Actual Read request is issued in ProcessBlockRequests().
559
560 if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
561 {
562 m_prefetch_state = kHold;
563 cache()->DeRegisterPrefetchFile(this);
564 }
565 }
566 else
567 {
568 TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
569 }
570 }
571
572 return b;
573}
574
575void File::ProcessBlockRequest(Block *b)
576{
577 // This *must not* be called with block_map locked.
578
580
581 if (XRD_TRACE What >= TRACE_Dump) {
582 char buf[256];
583 snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
584 b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
585 TRACEF(Dump, "ProcessBlockRequest() " << buf);
586 }
587
588 if (b->req_cksum_net())
589 {
590 b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
591 b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
592 } else {
593 b->get_io()->GetInput()-> Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
594 }
595}
596
597void File::ProcessBlockRequests(BlockList_t& blks)
598{
599 // This *must not* be called with block_map locked.
600
601 for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
602 {
603 ProcessBlockRequest(*bi);
604 }
605}
606
607//------------------------------------------------------------------------------
608
609void File::RequestBlocksDirect(IO *io, DirectResponseHandler *handler, std::vector<XrdOucIOVec>& ioVec, int expected_size)
610{
611 TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
612
613 io->GetInput()->ReadV( *handler, ioVec.data(), (int) ioVec.size());
614}
615
616//------------------------------------------------------------------------------
617
618int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
619{
620 TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);
621
622 long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());
623
624 if (rs < 0)
625 {
626 TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
627 return rs;
628 }
629
630 if (rs != expected_size)
631 {
632 TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
633 return -EIO;
634 }
635
636 return (int) rs;
637}
638
639//------------------------------------------------------------------------------
640
641int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
642{
643 // rrc_func is ONLY called from async processing.
644 // If this function returns anything other than -EWOULDBLOCK, rrc_func needs to be called by the caller.
645 // This streamlines implementation of synchronous IO::Read().
646
647 TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);
648
649 m_state_cond.Lock();
650
651 if (m_in_shutdown || io->m_in_detach)
652 {
653 m_state_cond.UnLock();
654 return m_in_shutdown ? -ENOENT : -EBADF;
655 }
656
657 // Shortcut -- file is fully downloaded.
658
659 if (m_cfi.IsComplete())
660 {
661 m_state_cond.UnLock();
662 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
663 if (ret > 0) m_stats.AddBytesHit(ret);
664 return ret;
665 }
666
667 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
668
669 return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
670}
671
672//------------------------------------------------------------------------------
673
674int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
675{
676 TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");
677
678 m_state_cond.Lock();
679
680 if (m_in_shutdown || io->m_in_detach)
681 {
682 m_state_cond.UnLock();
683 return m_in_shutdown ? -ENOENT : -EBADF;
684 }
685
686 // Shortcut -- file is fully downloaded.
687
688 if (m_cfi.IsComplete())
689 {
690 m_state_cond.UnLock();
691 int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
692 if (ret > 0) m_stats.AddBytesHit(ret);
693 return ret;
694 }
695
696 return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
697}
698
699//------------------------------------------------------------------------------
700
701int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
702 ReadReqRH *rh, const char *tpfx)
703{
704 // Non-trivial processing for Read and ReadV.
705 // Entered under lock.
706 //
707 // loop over reqired blocks:
708 // - if on disk, ok;
709 // - if in ram or incoming, inc ref-count
710 // - otherwise request and inc ref count (unless RAM full => request direct)
711 // unlock
712
713 int prefetch_cnt = 0;
714
715 ReadRequest *read_req = nullptr;
716 BlockList_t blks_to_request; // blocks we are issuing a new remote request for
717
718 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
719
720 std::vector<XrdOucIOVec> iovec_disk;
721 std::vector<XrdOucIOVec> iovec_direct;
722 int iovec_disk_total = 0;
723 int iovec_direct_total = 0;
724
725 for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
726 {
727 const XrdOucIOVec &iov = readV[iov_idx];
728 long long iUserOff = iov.offset;
729 int iUserSize = iov.size;
730 char *iUserBuff = iov.data;
731
732 const int idx_first = iUserOff / m_block_size;
733 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
734
735 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);
736
737 enum LastBlock_e { LB_other, LB_disk, LB_direct };
738
739 LastBlock_e lbe = LB_other;
740
741 for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
742 {
743 TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
744 BlockMap_i bi = m_block_map.find(block_idx);
745
746 // overlap and read
747 long long off; // offset in user buffer
748 long long blk_off; // offset in block
749 int size; // size to copy
750
751 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
752
753 // In RAM or incoming?
754 if (bi != m_block_map.end())
755 {
756 inc_ref_count(bi->second);
757 TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);
758
759 if (bi->second->is_finished())
760 {
761 // note, blocks with error should not be here !!!
762 // they should be either removed or reissued in ProcessBlockResponse()
763 assert(bi->second->is_ok());
764
765 blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );
766
767 if (bi->second->m_prefetch)
768 ++prefetch_cnt;
769 }
770 else
771 {
772 if ( ! read_req)
773 read_req = new ReadRequest(io, rh);
774
775 // We have a lock on state_cond --> as we register the request before releasing the lock,
776 // we are sure to get a call-in via the ChunkRequest handling when this block arrives.
777
778 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
779 ++read_req->m_n_chunk_reqs;
780 }
781
782 lbe = LB_other;
783 }
784 // On disk?
785 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
786 {
787 TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);
788
789 if (lbe == LB_disk)
790 iovec_disk.back().size += size;
791 else
792 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
793 iovec_disk_total += size;
794
795 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
796 ++prefetch_cnt;
797
798 lbe = LB_disk;
799 }
800 // Neither ... then we have to go get it ...
801 else
802 {
803 if ( ! read_req)
804 read_req = new ReadRequest(io, rh);
805
806 // Is there room for one more RAM Block?
807 Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
808 if (b)
809 {
810 TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
811 inc_ref_count(b);
812 blks_to_request.push_back(b);
813
814 b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
815 ++read_req->m_n_chunk_reqs;
816
817 lbe = LB_other;
818 }
819 else // Nope ... read this directly without caching.
820 {
821 TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);
822
823 if (lbe == LB_direct)
824 iovec_direct.back().size += size;
825 else
826 iovec_direct.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
827 iovec_direct_total += size;
828 read_req->m_direct_done = false;
829
830 lbe = LB_direct;
831 }
832 }
833 } // end for over blocks in an IOVec
834 } // end for over readV IOVec
835
836 inc_prefetch_hit_cnt(prefetch_cnt);
837
838 m_state_cond.UnLock();
839
840 // First, send out remote requests for new blocks.
841 if ( ! blks_to_request.empty())
842 {
843 ProcessBlockRequests(blks_to_request);
844 blks_to_request.clear();
845 }
846
847 // Second, send out remote direct read requests.
848 if ( ! iovec_direct.empty())
849 {
850 DirectResponseHandler *direct_handler = new DirectResponseHandler(this, read_req, 1);
851 RequestBlocksDirect(io, direct_handler, iovec_direct, iovec_direct_total);
852
853 TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
854 }
855
856 // Begin synchronous part where we process data that is already in RAM or on disk.
857
858 long long bytes_read = 0;
859 int error_cond = 0; // to be set to -errno
860
861 // Third, process blocks that are available in RAM.
862 if ( ! blks_ready.empty())
863 {
864 for (auto &bvi : blks_ready)
865 {
866 for (auto &cr : bvi.second)
867 {
868 TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
869 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
870 bytes_read += cr.m_size;
871 }
872 }
873 }
874
875 // Fourth, read blocks from disk.
876 if ( ! iovec_disk.empty())
877 {
878 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
879 TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
880 if (rc >= 0)
881 {
882 bytes_read += rc;
883 }
884 else
885 {
886 error_cond = rc;
887 TRACEF(Error, tpfx << "failed read from disk");
888 }
889 }
890
891 // End synchronous part -- update with sync stats and determine actual state of this read.
892 // Note: remote reads might have already finished during disk-read!
893
894 m_state_cond.Lock();
895
896 for (auto &bvi : blks_ready)
897 dec_ref_count(bvi.first, (int) bvi.second.size());
898
899 if (read_req)
900 {
901 read_req->m_bytes_read += bytes_read;
902 read_req->update_error_cond(error_cond);
903 read_req->m_stats.m_BytesHit += bytes_read;
904 read_req->m_sync_done = true;
905
906 if (read_req->is_complete())
907 {
908 // Almost like FinalizeReadRequest(read_req) -- but no callout!
909 m_state_cond.UnLock();
910
911 m_stats.AddReadStats(read_req->m_stats);
912
913 int ret = read_req->return_value();
914 delete read_req;
915 return ret;
916 }
917 else
918 {
919 m_state_cond.UnLock();
920 return -EWOULDBLOCK;
921 }
922 }
923 else
924 {
925 m_stats.m_BytesHit += bytes_read;
926 m_state_cond.UnLock();
927
928 // !!! No callout.
929
930 return error_cond ? error_cond : bytes_read;
931 }
932}
933
934
935//==============================================================================
936// WriteBlock and Sync
937//==============================================================================
938
940{
941 // write block buffer into disk file
942 long long offset = b->m_offset - m_offset;
943 long long size = b->get_size();
944 ssize_t retval;
945
946 if (m_cfi.IsCkSumCache())
947 if (b->has_cksums())
948 retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
949 else
950 retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
951 else
952 retval = m_data_file->Write(b->get_buff(), offset, size);
953
954 if (retval < size)
955 {
956 if (retval < 0)
957 {
958 GetLog()->Emsg("WriteToDisk()", -retval, "write block to disk", GetLocalPath().c_str());
959 }
960 else
961 {
962 TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
963 }
964
965 XrdSysCondVarHelper _lck(m_state_cond);
966
967 dec_ref_count(b);
968
969 return;
970 }
971
972 const int blk_idx = (b->m_offset - m_offset) / m_block_size;
973
974 // Set written bit.
975 TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);
976
977 bool schedule_sync = false;
978 {
979 XrdSysCondVarHelper _lck(m_state_cond);
980
981 m_cfi.SetBitWritten(blk_idx);
982
983 if (b->m_prefetch)
984 {
985 m_cfi.SetBitPrefetch(blk_idx);
986 }
987 if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
988 {
989 m_cfi.ResetCkSumNet();
990 }
991
992 dec_ref_count(b);
993
994 // Set synced bit or stash block index if in actual sync.
995 // Synced state is only written out to cinfo file when data file is synced.
996 if (m_in_sync)
997 {
998 m_writes_during_sync.push_back(blk_idx);
999 }
1000 else
1001 {
1002 m_cfi.SetBitSynced(blk_idx);
1003 ++m_non_flushed_cnt;
1004 if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1005 ! m_in_shutdown)
1006 {
1007 schedule_sync = true;
1008 m_in_sync = true;
1009 m_non_flushed_cnt = 0;
1010 }
1011 }
1012 }
1013
1014 if (schedule_sync)
1015 {
1016 cache()->ScheduleFileSync(this);
1017 }
1018}
1019
1020//------------------------------------------------------------------------------
1021
1023{
1024 TRACEF(Dump, "Sync()");
1025
1026 int ret = m_data_file->Fsync();
1027 bool errorp = false;
1028 if (ret == XrdOssOK)
1029 {
1030 Stats loc_stats = m_stats.Clone();
1031 m_cfi.WriteIOStat(loc_stats);
1032 m_cfi.Write(m_info_file, m_filename.c_str());
1033 int cret = m_info_file->Fsync();
1034 if (cret != XrdOssOK)
1035 {
1036 TRACEF(Error, "Sync cinfo file sync error " << cret);
1037 errorp = true;
1038 }
1039 }
1040 else
1041 {
1042 TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
1043 errorp = true;
1044 }
1045
1046 if (errorp)
1047 {
1048 TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");
1049
1050 // Unlink will also call this->initiate_emergency_shutdown()
1051 Cache::GetInstance().UnlinkFile(m_filename, false);
1052
1053 XrdSysCondVarHelper _lck(&m_state_cond);
1054
1055 m_writes_during_sync.clear();
1056 m_in_sync = false;
1057
1058 return;
1059 }
1060
1061 int written_while_in_sync;
1062 bool resync = false;
1063 {
1064 XrdSysCondVarHelper _lck(&m_state_cond);
1065 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1066 {
1067 m_cfi.SetBitSynced(*i);
1068 }
1069 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1070 m_writes_during_sync.clear();
1071
1072 // If there were writes during sync and the file is now complete,
1073 // let us call Sync again without resetting the m_in_sync flag.
1074 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1075 resync = true;
1076 else
1077 m_in_sync = false;
1078 }
1079 TRACEF(Dump, "Sync "<< written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));
1080
1081 if (resync)
1082 Sync();
1083}
1084
1085
1086//==============================================================================
1087// Block processing
1088//==============================================================================
1089
1090void File::free_block(Block* b)
1091{
1092 // Method always called under lock.
1093 int i = b->m_offset / m_block_size;
1094 TRACEF(Dump, "free_block block " << b << " idx = " << i);
1095 size_t ret = m_block_map.erase(i);
1096 if (ret != 1)
1097 {
1098 // assert might be a better option than a warning
1099 TRACEF(Error, "free_block did not erase " << i << " from map");
1100 }
1101 else
1102 {
1103 cache()->ReleaseRAM(b->m_buff, b->m_req_size);
1104 delete b;
1105 }
1106
1107 if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
1108 {
1109 m_prefetch_state = kOn;
1110 cache()->RegisterPrefetchFile(this);
1111 }
1112}
1113
1114//------------------------------------------------------------------------------
1115
1116bool File::select_current_io_or_disable_prefetching(bool skip_current)
1117{
1118 // Method always called under lock. It also expects prefetch to be active.
1119
1120 int io_size = (int) m_io_set.size();
1121 bool io_ok = false;
1122
1123 if (io_size == 1)
1124 {
1125 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1126 if (io_ok)
1127 {
1128 m_current_io = m_io_set.begin();
1129 }
1130 }
1131 else if (io_size > 1)
1132 {
1133 IoSet_i mi = m_current_io;
1134 if (skip_current && mi != m_io_set.end()) ++mi;
1135
1136 for (int i = 0; i < io_size; ++i)
1137 {
1138 if (mi == m_io_set.end()) mi = m_io_set.begin();
1139
1140 if ((*mi)->m_allow_prefetching)
1141 {
1142 m_current_io = mi;
1143 io_ok = true;
1144 break;
1145 }
1146 ++mi;
1147 }
1148 }
1149
1150 if ( ! io_ok)
1151 {
1152 m_current_io = m_io_set.end();
1153 m_prefetch_state = kStopped;
1154 cache()->DeRegisterPrefetchFile(this);
1155 }
1156
1157 return io_ok;
1158}
1159
1160//------------------------------------------------------------------------------
1161
1162void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
1163{
1164 // Called from DirectResponseHandler.
1165 // NOT under lock.
1166
1167 if (error_cond)
1168 TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));
1169
1170 m_state_cond.Lock();
1171
1172 if (error_cond)
1173 rreq->update_error_cond(error_cond);
1174 else {
1175 rreq->m_stats.m_BytesBypassed += bytes_read;
1176 rreq->m_bytes_read += bytes_read;
1177 }
1178
1179 rreq->m_direct_done = true;
1180
1181 bool rreq_complete = rreq->is_complete();
1182
1183 m_state_cond.UnLock();
1184
1185 if (rreq_complete)
1186 FinalizeReadRequest(rreq);
1187}
1188
1189void File::ProcessBlockError(Block *b, ReadRequest *rreq)
1190{
1191 // Called from ProcessBlockResponse().
1192 // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
1193 // Does not manage m_read_req.
1194 // Will not complete the request.
1195
1196 TRACEF(Error, "ProcessBlockError() io " << b->m_io << ", block "<< b->m_offset/m_block_size <<
1197 " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));
1198
1199 rreq->update_error_cond(b->get_error());
1200 --rreq->m_n_chunk_reqs;
1201
1202 dec_ref_count(b);
1203}
1204
1205void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
1206{
1207 // Called from ProcessBlockResponse().
1208 // NOT under lock as it does memcopy ofor exisf block data.
1209 // Acquires lock for block, m_read_req and rreq state update.
1210
1211 ReadRequest *rreq = creq.m_read_req;
1212
1213 TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
1214 memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);
1215
1216 m_state_cond.Lock();
1217
1218 rreq->m_bytes_read += creq.m_size;
1219
1220 if (b->get_req_id() == (void*) rreq)
1221 rreq->m_stats.m_BytesMissed += creq.m_size;
1222 else
1223 rreq->m_stats.m_BytesHit += creq.m_size;
1224
1225 --rreq->m_n_chunk_reqs;
1226
1227 if (b->m_prefetch)
1228 inc_prefetch_hit_cnt(1);
1229
1230 dec_ref_count(b);
1231
1232 bool rreq_complete = rreq->is_complete();
1233
1234 m_state_cond.UnLock();
1235
1236 if (rreq_complete)
1237 FinalizeReadRequest(rreq);
1238}
1239
1240void File::FinalizeReadRequest(ReadRequest *rreq)
1241{
1242 // called from ProcessBlockResponse()
1243 // NOT under lock -- does callout
1244
1245 m_stats.AddReadStats(rreq->m_stats);
1246
1247 rreq->m_rh->Done(rreq->return_value());
1248 delete rreq;
1249}
1250
1251void File::ProcessBlockResponse(Block *b, int res)
1252{
1253 static const char* tpfx = "ProcessBlockResponse ";
1254
1255 TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);
1256
1257 if (res >= 0 && res != b->get_size())
1258 {
1259 // Incorrect number of bytes received, apparently size of the file on the remote
1260 // is different than what the cache expects it to be.
1261 TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1262 Cache::GetInstance().UnlinkFile(m_filename, false);
1263 }
1264
1265 m_state_cond.Lock();
1266
1267 // Deregister block from IO's prefetch count, if needed.
1268 if (b->m_prefetch)
1269 {
1270 IO *io = b->get_io();
1271 IoSet_i mi = m_io_set.find(io);
1272 if (mi != m_io_set.end())
1273 {
1274 --io->m_active_prefetches;
1275
1276 // If failed and IO is still prefetching -- disable prefetching on this IO.
1277 if (res < 0 && io->m_allow_prefetching)
1278 {
1279 TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
1280 io->m_allow_prefetching = false;
1281
1282 // Check if any IO is still available for prfetching. If not, stop it.
1283 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1284 {
1285 if ( ! select_current_io_or_disable_prefetching(false) )
1286 {
1287 TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
1288 }
1289 }
1290 }
1291
1292 // If failed with no subscribers -- delete the block and exit.
1293 if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
1294 {
1295 free_block(b);
1296 m_state_cond.UnLock();
1297 return;
1298 }
1299 }
1300 else
1301 {
1302 TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
1303 }
1304 }
1305
1306 if (res == b->get_size())
1307 {
1308 b->set_downloaded();
1309 TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
1310 if ( ! m_in_shutdown)
1311 {
1312 // Increase ref-count for the writer.
1313 inc_ref_count(b);
1314 m_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
1315 cache()->AddWriteTask(b, true);
1316 }
1317
1318 // Swap chunk-reqs vector out of Block, it will be processed outside of lock.
1319 vChunkRequest_t creqs_to_notify;
1320 creqs_to_notify.swap( b->m_chunk_reqs );
1321
1322 m_state_cond.UnLock();
1323
1324 for (auto &creq : creqs_to_notify)
1325 {
1326 ProcessBlockSuccess(b, creq);
1327 }
1328 }
1329 else
1330 {
1331 if (res < 0) {
1332 TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " error=" << res);
1333 } else {
1334 TRACEF(Error, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << " incomplete, got " << res << " expected " << b->get_size());
1335#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1336 res = -EIO;
1337#else
1338 res = -EREMOTEIO;
1339#endif
1340 }
1341 b->set_error(res);
1342
1343 // Loop over Block's chunk-reqs vector, error out ones with the same IO.
1344 // Collect others with a different IO, the first of them will be used to reissue the request.
1345 // This is then done outside of lock.
1346 std::list<ReadRequest*> rreqs_to_complete;
1347 vChunkRequest_t creqs_to_keep;
1348
1349 for(ChunkRequest &creq : b->m_chunk_reqs)
1350 {
1351 ReadRequest *rreq = creq.m_read_req;
1352
1353 if (rreq->m_io == b->get_io())
1354 {
1355 ProcessBlockError(b, rreq);
1356 if (rreq->is_complete())
1357 {
1358 rreqs_to_complete.push_back(rreq);
1359 }
1360 }
1361 else
1362 {
1363 creqs_to_keep.push_back(creq);
1364 }
1365 }
1366
1367 bool reissue = false;
1368 if ( ! creqs_to_keep.empty())
1369 {
1370 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1371
1372 TRACEF(Info, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
1373 b->get_io() << " - reissuing request with my io " << rreq->m_io);
1374
1375 b->reset_error_and_set_io(rreq->m_io, rreq);
1376 b->m_chunk_reqs.swap( creqs_to_keep );
1377 reissue = true;
1378 }
1379
1380 m_state_cond.UnLock();
1381
1382 for (auto rreq : rreqs_to_complete)
1383 FinalizeReadRequest(rreq);
1384
1385 if (reissue)
1386 ProcessBlockRequest(b);
1387 }
1388}
1389
1390//------------------------------------------------------------------------------
1391
1392const char* File::lPath() const
1393{
1394 return m_filename.c_str();
1395}
1396
1397//------------------------------------------------------------------------------
1398
1399int File::offsetIdx(int iIdx) const
1400{
1401 return iIdx - m_offset/m_block_size;
1402}
1403
1404
1405//------------------------------------------------------------------------------
1406
1408{
1409 // Check that block is not on disk and not in RAM.
1410 // TODO: Could prefetch several blocks at once!
1411 // blks_max could be an argument
1412
1413 BlockList_t blks;
1414
1415 TRACEF(DumpXL, "Prefetch() entering.");
1416 {
1417 XrdSysCondVarHelper _lck(m_state_cond);
1418
1419 if (m_prefetch_state != kOn)
1420 {
1421 return;
1422 }
1423
1424 if ( ! select_current_io_or_disable_prefetching(true) )
1425 {
1426 TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1427 return;
1428 }
1429
1430 // Select block(s) to fetch.
1431 for (int f = 0; f < m_num_blocks; ++f)
1432 {
1433 if ( ! m_cfi.TestBitWritten(f))
1434 {
1435 int f_act = f + m_offset / m_block_size;
1436
1437 BlockMap_i bi = m_block_map.find(f_act);
1438 if (bi == m_block_map.end())
1439 {
1440 Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
1441 if (b)
1442 {
1443 TRACEF(Dump, "Prefetch take block " << f_act);
1444 blks.push_back(b);
1445 // Note: block ref_cnt not increased, it will be when placed into write queue.
1446
1447 inc_prefetch_read_cnt(1);
1448 }
1449 else
1450 {
1451 // This shouldn't happen as prefetching stops when RAM is 70% full.
1452 TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
1453 }
1454 break;
1455 }
1456 }
1457 }
1458
1459 if (blks.empty())
1460 {
1461 TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
1462 m_prefetch_state = kComplete;
1463 cache()->DeRegisterPrefetchFile(this);
1464 }
1465 else
1466 {
1467 (*m_current_io)->m_active_prefetches += (int) blks.size();
1468 }
1469 }
1470
1471 if ( ! blks.empty())
1472 {
1473 ProcessBlockRequests(blks);
1474 }
1475}
1476
1477
1478//------------------------------------------------------------------------------
1479
1481{
1482 return m_prefetch_score;
1483}
1484
1486{
1487 return Cache::GetInstance().GetLog();
1488}
1489
1494
1495void File::insert_remote_location(const std::string &loc)
1496{
1497 if ( ! loc.empty())
1498 {
1499 size_t p = loc.find_first_of('@');
1500 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1501 }
1502}
1503
1504std::string File::GetRemoteLocations() const
1505{
1506 std::string s;
1507 if ( ! m_remote_locations.empty())
1508 {
1509 size_t sl = 0;
1510 int nl = 0;
1511 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1512 {
1513 sl += i->size();
1514 }
1515 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1516 s = '[';
1517 int j = 1;
1518 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1519 {
1520 s += '"'; s += *i; s += '"';
1521 if (j < nl) s += ',';
1522 }
1523 s += ']';
1524 }
1525 else
1526 {
1527 s = "[]";
1528 }
1529 return s;
1530}
1531
1532//==============================================================================
1533//======================= RESPONSE HANDLERS ==============================
1534//==============================================================================
1535
1537{
1538 m_block->m_file->ProcessBlockResponse(m_block, res);
1539 delete this;
1540}
1541
1542//------------------------------------------------------------------------------
1543
1545{
1546 m_mutex.Lock();
1547
1548 int n_left = --m_to_wait;
1549
1550 if (res < 0) {
1551 if (m_errno == 0) m_errno = res; // store first reported error
1552 } else {
1553 m_bytes_read += res;
1554 }
1555
1556 m_mutex.UnLock();
1557
1558 if (n_left == 0)
1559 {
1560 m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
1561 delete this;
1562 }
1563}
#define XrdOssOK
Definition XrdOss.hh:50
#define XRDOSS_mkpath
Definition XrdOss.hh:466
#define TRACE_Dump
#define TRACEF(act, x)
#define ERRNO_AND_ERRSTR(err_code)
#define stat(a, b)
Definition XrdPosix.hh:96
#define XRD_TRACE
bool Debug
const char * XrdSysE2T(int errcode)
Definition XrdSysE2T.cc:99
#define TRACE(act, x)
Definition XrdTrace.hh:63
virtual int Fsync()
Definition XrdOss.hh:144
virtual int Ftruncate(unsigned long long flen)
Definition XrdOss.hh:164
virtual int Close(long long *retsz=0)=0
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
Definition XrdOss.hh:200
virtual ssize_t Read(off_t offset, size_t size)
Definition XrdOss.hh:281
virtual ssize_t pgWrite(void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
Definition XrdOss.cc:198
virtual ssize_t ReadV(XrdOucIOVec *readV, int rdvcnt)
Definition XrdOss.cc:236
virtual ssize_t Write(const void *buffer, off_t offset, size_t size)
Definition XrdOss.hh:345
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
Definition XrdOucEnv.hh:85
void Done(int result) override
int get_size() const
int get_error() const
int get_n_cksum_errors()
int * ptr_n_cksum_errors()
IO * get_io() const
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void set_error(int err)
void * get_req_id() const
void set_downloaded()
bool req_cksum_net() const
char * get_buff() const
bool has_cksums() const
long long m_offset
void reset_error_and_set_io(IO *io, void *rid)
int get_req_size() const
Attaches/creates and detaches/deletes cache-io objects for disk based cache.
Definition XrdPfc.hh:267
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
Definition XrdPfc.hh:315
XrdSysTrace * GetTrace()
Definition XrdPfc.hh:398
static Cache & GetInstance()
Singleton access.
Definition XrdPfc.cc:159
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
Definition XrdPfc.cc:1132
XrdOss * GetOss() const
Definition XrdPfc.hh:385
XrdSysError * GetLog()
Definition XrdPfc.hh:397
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
XrdSysTrace * GetTrace()
void WriteBlockToDisk(Block *b)
std::string & GetLocalPath()
static File * FileOpen(const std::string &path, long long offset, long long fileSize)
Static constructor that also does Open. Returns null ptr if Open fails.
Definition XrdPfcFile.cc:99
float GetPrefetchScore() const
friend class BlockResponseHandler
XrdSysError * GetLog()
std::string GetRemoteLocations() const
void AddIO(IO *io)
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void initiate_emergency_shutdown()
void Sync()
Sync file cache inf o and output data with disk.
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
void RemoveIO(IO *io)
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
Stats DeltaStatsFromLastCall()
bool ioActive(IO *io)
Initiate close. Return true if still IO active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
Definition XrdPfcIO.hh:18
XrdOucCacheIO * GetInput()
Definition XrdPfcIO.cc:30
RAtomic_int m_active_read_reqs
number of active read requests
Definition XrdPfcIO.hh:72
const char * GetLocation()
Definition XrdPfcIO.hh:46
Status of cached file. Can be read from and written into a binary file.
Definition XrdPfcInfo.hh:45
void SetBitPrefetch(int i)
Mark block as obtained through prefetch.
static const char * s_infoExtension
void SetBitSynced(int i)
Mark block as synced to disk.
time_t GetNoCkSumTimeForUVKeep() const
CkSumCheck_e GetCkSumState() const
void WriteIOStatAttach()
Write open time in the last entry of access statistics.
void ResetCkSumNet()
bool Write(XrdOssDF *fp, const char *dname, const char *fname=0)
void DowngradeCkSumState(CkSumCheck_e css_ref)
bool IsCkSumNet() const
void ResetAllAccessStats()
Reset IO Stats.
bool TestBitPrefetch(int i) const
Test if block at the given index has been prefetched.
bool IsComplete() const
Get complete status.
bool IsCkSumCache() const
void SetBitWritten(int i)
Mark block as written to disk.
long long GetBufferSize() const
Get prefetch buffer size.
void WriteIOStat(Stats &s)
Write bytes missed, hits, and disk.
long long GetExpectedDataFileSize() const
Get expected data file size.
bool TestBitWritten(int i) const
Test if block at the given index is written to disk.
bool Read(XrdOssDF *fp, const char *dname, const char *fname=0)
Read content of cinfo file into this object.
void SetCkSumState(CkSumCheck_e css)
void ResetNoCkSumTime()
void SetBufferSizeFileSizeAndCreationTime(long long bs, long long fs)
void WriteIOStatDetach(Stats &s)
Write close time together with bytes missed, hits, and disk.
int GetNBlocks() const
Get number of blocks represented in download-state bit-vector.
Statistics of cache utilisation by a File object.
void AddReadStats(const Stats &s)
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddWriteStats(long long bytes_written, int n_cks_errs)
void AddBytesHit(long long bh)
long long m_BytesHit
number of bytes served from disk
void IoDetach(int duration)
void DeltaToReference(const Stats &ref)
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * > BlockList_t
XrdSysTrace * GetTrace()
std::list< Block * >::iterator BlockList_i
long long offset
ReadRequest * m_read_req
Contains parameters configurable from the xrootd config file.
Definition XrdPfc.hh:56
long long m_flushCnt
nuber of unsynced blcoks on disk before flush is called
Definition XrdPfc.hh:109
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
Definition XrdPfc.hh:74
CkSumCheck_e get_cs_Chk() const
Definition XrdPfc.hh:67
int m_prefetch_max_blocks
maximum number of blocks to prefetch per file
Definition XrdPfc.hh:106
bool should_uvkeep_purge(time_t delta) const
Definition XrdPfc.hh:76
std::string m_data_space
oss space for data files
Definition XrdPfc.hh:82
long long m_bufferSize
prefetch buffer size, default 1MB
Definition XrdPfc.hh:101
std::string m_meta_space
oss space for metadata files (cinfo)
Definition XrdPfc.hh:83
std::string m_username
username passed to oss plugin
Definition XrdPfc.hh:81
unsigned short m_seq_id
Definition XrdPfcFile.hh:64
void update_error_cond(int ec)
Definition XrdPfcFile.hh:91
bool is_complete() const
Definition XrdPfcFile.hh:93
int return_value() const
Definition XrdPfcFile.hh:94
long long m_bytes_read
Definition XrdPfcFile.hh:79