XRootD
Loading...
Searching...
No Matches
XrdCephOssBufferedFile Class Reference

#include <XrdCephOssBufferedFile.hh>

+ Inheritance diagram for XrdCephOssBufferedFile:
+ Collaboration diagram for XrdCephOssBufferedFile:

Public Member Functions

 XrdCephOssBufferedFile (XrdCephOss *cephoss, XrdCephOssFile *cephossDF, size_t buffersize, const std::string &bufferIOmode, size_t maxNumberSimulBuffers)
 
virtual ~XrdCephOssBufferedFile ()
 
virtual int Close (long long *retsz=0)
 
virtual int Fstat (struct stat *buff)
 
virtual int Fsync (void)
 
virtual int Ftruncate (unsigned long long)
 
virtual int Open (const char *path, int flags, mode_t mode, XrdOucEnv &env)
 
virtual ssize_t Read (off_t offset, size_t blen)
 
virtual ssize_t Read (void *buff, off_t offset, size_t blen)
 
virtual int Read (XrdSfsAio *aoip)
 
virtual ssize_t ReadRaw (void *, off_t, size_t)
 
virtual ssize_t ReadV (XrdOucIOVec *readV, int rdvcnt)
 
virtual ssize_t Write (const void *buff, off_t offset, size_t blen)
 
virtual int Write (XrdSfsAio *aiop)
 
- Public Member Functions inherited from XrdCephOssFile
 XrdCephOssFile (XrdCephOss *cephoss)
 
virtual ~XrdCephOssFile ()
 
virtual int getFileDescriptor () const
 
- Public Member Functions inherited from XrdOssDF
 XrdOssDF (const char *tid="", uint16_t dftype=0, int fdnum=-1)
 
virtual ~XrdOssDF ()
 
uint16_t DFType ()
 
virtual int Fchmod (mode_t mode)
 
virtual int Fctl (int cmd, int alen, const char *args, char **resp=0)
 
virtual void Flush ()
 Flush filesystem cached pages for this file (used for checksums).
 
virtual int Fsync (XrdSfsAio *aiop)
 
virtual int getFD ()
 
virtual off_t getMmap (void **addr)
 
virtual const char * getTID ()
 
virtual int isCompressed (char *cxidp=0)
 
virtual int Opendir (const char *path, XrdOucEnv &env)
 
virtual ssize_t pgRead (void *buffer, off_t offset, size_t rdlen, uint32_t *csvec, uint64_t opts)
 
virtual int pgRead (XrdSfsAio *aioparm, uint64_t opts)
 
virtual ssize_t pgWrite (void *buffer, off_t offset, size_t wrlen, uint32_t *csvec, uint64_t opts)
 
virtual int pgWrite (XrdSfsAio *aioparm, uint64_t opts)
 
virtual int Readdir (char *buff, int blen)
 
virtual int StatRet (struct stat *buff)
 
virtual ssize_t WriteV (XrdOucIOVec *writeV, int wrvcnt)
 

Protected Member Functions

std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer ()
 

Protected Attributes

std::mutex m_buf_mutex
 
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
 
std::string m_bufferIOmode
 
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
 
size_t m_bufsize = 16*1024*1024L
 
std::atomic< size_t > m_bytesRead = {0}
 
std::atomic< size_t > m_bytesReadAIO = {0}
 number of bytes read or written
 
std::atomic< size_t > m_bytesReadV = {0}
 number of bytes read or written
 
std::atomic< size_t > m_bytesWrite = {0}
 number of bytes read or written
 
std::atomic< size_t > m_bytesWriteAIO = {0}
 number of bytes read or written
 
XrdCephOss * m_cephoss = nullptr
 create a new instance of the buffer
 
int m_flags = 0
 number of ms to sleep if a retry is requested
 
int m_maxBufferRetries {5}
 set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
 
int m_maxBufferRetrySleepTime_ms
 How many times to retry a read from a buffer with EBUSY errors.
 
size_t m_maxCountReadBuffers {10}
 any data access method on the buffer will use this
 
std::string m_path
 
std::chrono::time_point< std::chrono::system_clock > m_timestart
 
XrdCephOssFile * m_xrdOssDF = nullptr
 
- Protected Attributes inherited from XrdCephOssFile
XrdCephOss * m_cephOss
 
int m_fd
 
- Protected Attributes inherited from XrdOssDF
uint16_t dfType
 
int fd
 
off_t pgwEOF
 
short rsvd
 
const char * tident
 

Additional Inherited Members

- Static Public Attributes inherited from XrdOssDF
static const uint16_t DF_isDir = 0x0001
 Object is for a directory.
 
static const uint16_t DF_isFile = 0x0002
 Object is for a file.
 
static const uint16_t DF_isProxy = 0x0010
 Object is a proxy object.
 
static const uint64_t doCalc = 0x4000000000000000ULL
 pgw: Calculate checksums
 
static const int Fctl_ckpObj = 0
 
static const int Fctl_utimes = 1
 
static const uint64_t Verify = 0x8000000000000000ULL
 all: Verify checksums
 

Detailed Description

Decorator class XrdCephOssBufferedFile, designed to wrap XrdCephOssFile. It provides functionality for buffered access to/from data in Ceph, to avoid inefficient small reads/writes from the client side.

Definition at line 48 of file XrdCephOssBufferedFile.hh.

Constructor & Destructor Documentation

◆ XrdCephOssBufferedFile()

XrdCephOssBufferedFile::XrdCephOssBufferedFile ( XrdCephOss * cephoss,
XrdCephOssFile * cephossDF,
size_t buffersize,
const std::string & bufferIOmode,
size_t maxNumberSimulBuffers )

Definition at line 58 of file XrdCephOssBufferedFile.cc.

60 :
61 XrdCephOssFile(cephoss), m_cephoss(cephoss), m_xrdOssDF(cephossDF),
62 m_maxCountReadBuffers(maxNumberSimulBuffers),
64 m_bufsize(buffersize),
65 m_bufferIOmode(bufferIOmode)
66{
67
68}
int m_maxBufferRetrySleepTime_ms
How many times to retry a read from a buffer with EBUSY errors.
XrdCephOss * m_cephoss
create a new instance of the buffer
size_t m_maxCountReadBuffers
any data access method on the buffer will use this
XrdCephOssFile(XrdCephOss *cephoss)

References XrdCephOssFile::XrdCephOssFile(), m_bufferIOmode, m_bufsize, m_cephoss, m_maxBufferRetrySleepTime_ms, m_maxCountReadBuffers, and m_xrdOssDF.

+ Here is the call graph for this function:

◆ ~XrdCephOssBufferedFile()

XrdCephOssBufferedFile::~XrdCephOssBufferedFile ( )
virtual

Definition at line 70 of file XrdCephOssBufferedFile.cc.

70 {
71 // XrdCephEroute.Say("XrdCephOssBufferedFile::Destructor");
72
73 // remember to delete the inner XrdCephOssFile object
74 if (m_xrdOssDF) {
75 delete m_xrdOssDF;
76 m_xrdOssDF = nullptr;
77 }
78
79}

References m_xrdOssDF.

Member Function Documentation

◆ Close()

int XrdCephOssBufferedFile::Close ( long long * retsz = 0)
virtual

Close a directory or file.

Parameters
retsz- If not nil, where the size of the file is to be returned.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 101 of file XrdCephOssBufferedFile.cc.

101 {
102 // if data is still in the buffer and we are writing, make sure to write it
103 if (m_bufferAlg && (m_flags & (O_WRONLY|O_RDWR)) != 0) {
104 ssize_t rc = m_bufferAlg->flushWriteCache();
105 if (rc < 0) {
106 LOGCEPH( "XrdCephOssBufferedFile::Close: flush Error fd: " << m_fd << " rc:" << rc );
107 // still try to close the file
108 ssize_t rc2 = m_xrdOssDF->Close(retsz);
109 if (rc2 < 0) {
110 LOGCEPH( "XrdCephOssBufferedFile::Close: Close error after flush Error fd: " << m_fd << " rc:" << rc2 );
111 }
112 return rc; // return the original flush error
113 } else {
114 LOGCEPH( "XrdCephOssBufferedFile::Close: Flushed data on close fd: " << m_fd << " rc:" << rc );
115 }
116 } // check for write
117 const std::chrono::time_point<std::chrono::system_clock> now =
118 std::chrono::system_clock::now();
119 const std::time_t t_s = std::chrono::system_clock::to_time_t(m_timestart);
120 const std::time_t t_c = std::chrono::system_clock::to_time_t(now);
121
122 auto t_dur = std::chrono::duration_cast<std::chrono::milliseconds>(now - m_timestart).count();
123
124 LOGCEPH("XrdCephOssBufferedFile::Summary: {\"fd\":" << m_fd << ", \"Elapsed_time_ms\":" << t_dur
125 << ", \"path\":\"" << m_path
126 << "\", read_B:" << m_bytesRead.load()
127 << ", readV_B:" << m_bytesReadV.load()
128 << ", readAIO_B:" << m_bytesReadAIO.load()
129 << ", writeB:" << m_bytesWrite.load()
130 << ", writeAIO_B:" << m_bytesWriteAIO.load()
131 << ", startTime:\"" << std::put_time(std::localtime(&t_s), "%F %T") << "\", endTime:\""
132 << std::put_time(std::localtime(&t_c), "%F %T") << "\""
133 << ", nBuffersRead:" << m_bufferReadAlgs.size()
134 << "}");
135
136 return m_xrdOssDF->Close(retsz);
137}
#define LOGCEPH(x)
std::map< size_t, std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > > m_bufferReadAlgs
std::atomic< size_t > m_bytesRead
std::chrono::time_point< std::chrono::system_clock > m_timestart
std::atomic< size_t > m_bytesReadV
number of bytes read or written
std::atomic< size_t > m_bytesWrite
number of bytes read or written
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > m_bufferAlg
std::atomic< size_t > m_bytesReadAIO
number of bytes read or written
std::atomic< size_t > m_bytesWriteAIO
number of bytes read or written
int m_flags
number of ms to sleep if a retry is requested

References LOGCEPH, m_bufferAlg, m_bufferReadAlgs, m_bytesRead, m_bytesReadAIO, m_bytesReadV, m_bytesWrite, m_bytesWriteAIO, XrdCephOssFile::m_fd, m_flags, m_path, m_timestart, and m_xrdOssDF.

◆ createBuffer()

std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > XrdCephOssBufferedFile::createBuffer ( )
protected

Definition at line 313 of file XrdCephOssBufferedFile.cc.

313 {
314 std::unique_ptr<IXrdCephBufferAlg> bufferAlg;
315
316 size_t bufferSize {m_bufsize}; // create buffer of default size
318 BUFLOG("XrdCephOssBufferedFile: buffer reached max number of simul-buffers for this file: creating only 1MiB buffer" );
319 bufferSize = 1048576;
320 } else {
321 BUFLOG("XrdCephOssBufferedFile: buffer: got " << m_bufferReadAlgs.size() << " buffers already");
322 }
323
324 try {
325 std::unique_ptr<IXrdCephBufferData> cephbuffer = std::unique_ptr<IXrdCephBufferData>(new XrdCephBufferDataSimple(bufferSize));
326 std::unique_ptr<ICephIOAdapter> cephio;
327 if (m_bufferIOmode == "aio") {
328 cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterAIORaw(cephbuffer.get(),m_fd));
329 } else if (m_bufferIOmode == "io") {
330 cephio = std::unique_ptr<ICephIOAdapter>(new CephIOAdapterRaw(cephbuffer.get(),m_fd,
331 !m_cephoss->m_useDefaultPreadAlg));
332 } else {
333 BUFLOG("XrdCephOssBufferedFile: buffer mode needs to be one of aio|io " );
334 m_xrdOssDF->Close();
335 return bufferAlg; // invalid instance;
336 }
337
338 LOGCEPH( "XrdCephOssBufferedFile::Open: fd: " << m_fd << " Buffer created: " << cephbuffer->capacity() );
339 bufferAlg = std::unique_ptr<IXrdCephBufferAlg>(new XrdCephBufferAlgSimple(std::move(cephbuffer),std::move(cephio),m_fd) );
340 } catch (const std::bad_alloc &e) {
341 BUFLOG("XrdCephOssBufferedFile: Bad memory allocation in buffer: " << e.what() );
342 }
343
344 return bufferAlg;
345 }
#define BUFLOG(x)

References BUFLOG, LOGCEPH, m_bufferIOmode, m_bufferReadAlgs, m_bufsize, m_cephoss, XrdCephOssFile::m_fd, m_maxCountReadBuffers, and m_xrdOssDF.

Referenced by Read(), Read(), Write(), and Write().

+ Here is the caller graph for this function:

◆ Fstat()

int XrdCephOssBufferedFile::Fstat ( struct stat * buf)
virtual

Return state information for this file.

Parameters
buf- Pointer to the structure where info is to be returned.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 240 of file XrdCephOssBufferedFile.cc.

240 {
241 return m_xrdOssDF->Fstat(buff);
242}

References m_xrdOssDF, and stat.

◆ Fsync()

int XrdCephOssBufferedFile::Fsync ( void )
virtual

Synchronize associated file with media (synchronous).

Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 304 of file XrdCephOssBufferedFile.cc.

304 {
305 return m_xrdOssDF->Fsync();
306}

References m_xrdOssDF.

◆ Ftruncate()

int XrdCephOssBufferedFile::Ftruncate ( unsigned long long flen)
virtual

Set the size of the associated file.

Parameters
flen- The new size of the file.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 308 of file XrdCephOssBufferedFile.cc.

308 {
309 return m_xrdOssDF->Ftruncate(len);
310}

References m_xrdOssDF.

◆ Open()

int XrdCephOssBufferedFile::Open ( const char * path,
int Oflag,
mode_t Mode,
XrdOucEnv & env )
virtual

Open a file.

Parameters
path- Pointer to the path of the file to be opened.
Oflag- Standard open flags.
Mode- File open mode (ignored unless creating a file).
env- Reference to environmental information.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 82 of file XrdCephOssBufferedFile.cc.

82 {
83
84 int rc = m_xrdOssDF->Open(path, flags, mode, env);
85 if (rc < 0) {
86 return rc;
87 }
88 m_fd = m_xrdOssDF->getFileDescriptor();
89 BUFLOG("XrdCephOssBufferedFile::Open got fd: " << m_fd << " " << path);
90 m_flags = flags; // e.g. for write/read knowledge
91 m_path = path; // good to keep the path for final stats presentation
92
93
94 // start the timer
95 //m_timestart = std::chrono::steady_clock::now();
96 m_timestart = std::chrono::system_clock::now();
97 // return the file descriptor
98 return rc;
99}

References BUFLOG, XrdCephOssFile::m_fd, m_flags, m_path, m_timestart, and m_xrdOssDF.

◆ Read() [1/3]

ssize_t XrdCephOssBufferedFile::Read ( off_t offset,
size_t size )
virtual

Preread file blocks into the file system cache.

Parameters
offset- The offset where the read is to start.
size- The number of bytes to pre-read.
Returns
0 upon success or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 147 of file XrdCephOssBufferedFile.cc.

147 {
148 return m_xrdOssDF->Read(offset, blen);
149}

References m_xrdOssDF.

◆ Read() [2/3]

ssize_t XrdCephOssBufferedFile::Read ( void * buffer,
off_t offset,
size_t size )
virtual

Read file bytes into a buffer.

Parameters
buffer- pointer to buffer where the bytes are to be placed.
offset- The offset where the read is to start.
size- The number of bytes to read.
Returns
>= 0 The number of bytes that were placed in buffer.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 151 of file XrdCephOssBufferedFile.cc.

151 {
152 size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
153
154 IXrdCephBufferAlg * buffer{nullptr};
155 // check for, and create if needed, a buffer
156 {
157 // lock in case need to create a new algorithm instance
158 const std::lock_guard<std::mutex> lock(m_buf_mutex);
159 auto buffer_itr = m_bufferReadAlgs.find(thread_id);
160 if (buffer_itr == m_bufferReadAlgs.end()) {
161 // only create a buffer, if we haven't hit the max buffers yet
162 auto buffer_ptr = createBuffer();
163 if (buffer_ptr) {
164 buffer = buffer_ptr.get();
165 m_bufferReadAlgs[thread_id] = std::move(buffer_ptr);
166 } else {
167 // if we can't create a buffer, we just have to pass through the read ...
168 ssize_t rc = m_xrdOssDF->Read(buff, offset, blen);
169 if (rc >= 0) {
170 LOGCEPH( "XrdCephOssBufferedFile::Read buffers and read failed with rc: " << rc );
171 }
172 return rc;
173 }
174 } else {
175 buffer = buffer_itr->second.get();
176 }
177 } // scope of lock
178
179 int retry_counter{m_maxBufferRetries};
180 ssize_t rc {0};
181 while (retry_counter > 0) {
182 rc = buffer->read(buff, offset, blen);
183 if (rc != -EBUSY) break; // either worked, or is a real non busy error
184 LOGCEPH( "XrdCephOssBufferedFile::Read Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
185 << " rc:" << rc << " off:" << offset << " len:" << blen);
186 std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
187 --retry_counter;
188 }
189 if (retry_counter == 0) {
190 // reach maximum attempts for ebusy retry; fail the job
191 LOGCEPH( "XrdCephOssBufferedFile::Read Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
192 << " rc:" << rc << " off:" << offset << " len:" << blen );
193 // set a permanent error code:
194 rc = -EIO;
195 }
196 if (rc >=0) {
197 m_bytesRead.fetch_add(rc);
198 } else {
199 LOGCEPH( "XrdCephOssBufferedFile::Read: Read error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
200 }
201 // LOGCEPH( "XrdCephOssBufferedFile::Read: Read good fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
202 return rc;
203}
int m_maxBufferRetries
set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)
std::unique_ptr< XrdCephBuffer::IXrdCephBufferAlg > createBuffer()

References createBuffer(), LOGCEPH, m_buf_mutex, m_bufferReadAlgs, m_bytesRead, XrdCephOssFile::m_fd, m_maxBufferRetries, m_maxBufferRetrySleepTime_ms, and m_xrdOssDF.

+ Here is the call graph for this function:

◆ Read() [3/3]

int XrdCephOssBufferedFile::Read ( XrdSfsAio * aiop)
virtual

Read file bytes using asynchronous I/O.

Parameters
aiop- Pointer to async I/O object controlling the I/O.
Returns
0 if the request was started successfully or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 205 of file XrdCephOssBufferedFile.cc.

205 {
206 size_t thread_id = std::hash<std::thread::id>{}(std::this_thread::get_id());
207 IXrdCephBufferAlg * buffer{nullptr};
208 // check for, and create if needed, a buffer
209 {
210 // lock in case need to create a new algorithm instance
211 const std::lock_guard<std::mutex> lock(m_buf_mutex);
212 auto buffer_itr = m_bufferReadAlgs.find(thread_id);
213 if (buffer_itr == m_bufferReadAlgs.end()) {
214 m_bufferReadAlgs[thread_id] = createBuffer();
215 buffer = m_bufferReadAlgs.find(thread_id)->second.get();
216 } else {
217 buffer = buffer_itr->second.get();
218 }
219 }
220
221 // LOGCEPH("XrdCephOssBufferedFile::AIOREAD: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
222 // << aiop->sfsAio.aio_offset << " "
223 // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
224 // << aiop->sfsAio.aio_fildes );
225 ssize_t rc = buffer->read_aio(aiop);
226 if (rc > 0) {
227 m_bytesReadAIO.fetch_add(rc);
228 } else {
229 LOGCEPH( "XrdCephOssBufferedFile::Read: ReadAIO error fd: " << m_fd << " rc:" << rc
230 << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
231 }
232 return rc;
233}
off_t aio_offset
Definition XrdSfsAio.hh:49
size_t aio_nbytes
Definition XrdSfsAio.hh:48
struct aiocb sfsAio
Definition XrdSfsAio.hh:62

References aiocb::aio_nbytes, aiocb::aio_offset, createBuffer(), LOGCEPH, m_buf_mutex, m_bufferReadAlgs, m_bytesReadAIO, XrdCephOssFile::m_fd, XrdCephBuffer::IXrdCephBufferAlg::read_aio(), and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

◆ ReadRaw()

ssize_t XrdCephOssBufferedFile::ReadRaw ( void * buffer,
off_t offset,
size_t size )
virtual

Read uncompressed file bytes into a buffer.

Parameters
buffer- pointer to buffer where the bytes are to be placed.
offset- The offset where the read is to start.
size- The number of bytes to read.
Returns
>= 0 The number of bytes that were placed in buffer.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 235 of file XrdCephOssBufferedFile.cc.

235 {
236 // #TODO; ReadRaw should bypass the buffer ?
237 return m_xrdOssDF->ReadRaw(buff, offset, blen);
238}

References m_xrdOssDF.

◆ ReadV()

ssize_t XrdCephOssBufferedFile::ReadV ( XrdOucIOVec * readV,
int rdvcnt )
virtual

Read file bytes as directed by the read vector.

Parameters
readV- pointer to the array of read requests.
rdvcnt- the number of elements in readV.
Returns
>=0 The number of bytes read.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 140 of file XrdCephOssBufferedFile.cc.

140 {
141 // don't touch readV in the buffering method
142 ssize_t rc = m_xrdOssDF->ReadV(readV,rnum);
143 if (rc > 0) m_bytesReadV.fetch_add(rc);
144 return rc;
145}

References m_bytesReadV, and m_xrdOssDF.

◆ Write() [1/2]

ssize_t XrdCephOssBufferedFile::Write ( const void * buffer,
off_t offset,
size_t size )
virtual

Write file bytes from a buffer.

Parameters
buffer- pointer to buffer where the bytes reside.
offset- The offset where the write is to start.
size- The number of bytes to write.
Returns
>= 0 The number of bytes that were written.
< 0 -errno or -osserr upon failure (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 244 of file XrdCephOssBufferedFile.cc.

244 {
245
246 if (!m_bufferAlg) {
248 if (!m_bufferAlg) {
249 LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
250 return -EINVAL;
251 }
252 }
253
254
255 int retry_counter{m_maxBufferRetries};
256 ssize_t rc {0};
257 while (retry_counter > 0) {
258 rc = m_bufferAlg->write(buff, offset, blen);
259 if (rc != -EBUSY) break; // either worked, or is a real non busy error
260 LOGCEPH( "XrdCephOssBufferedFile::Write Recieved EBUSY for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Sleeping .. "
261 << " rc:" << rc << " off:" << offset << " len:" << blen);
262 std::this_thread::sleep_for(m_maxBufferRetrySleepTime_ms * 1ms);
263 --retry_counter;
264 }
265 if (retry_counter == 0) {
266 // reach maximum attempts for ebusy retry; fail the job
267 LOGCEPH( "XrdCephOssBufferedFile::Write Max attempts for fd: " << m_fd << " on try: " << (m_maxBufferRetries-retry_counter) << ". Terminating with -EIO: "
268 << " rc:" << rc << " off:" << offset << " len:" << blen );
269 // set a permanent error code:
270 rc = -EIO;
271 }
272 if (rc >=0) {
273 m_bytesWrite.fetch_add(rc);
274 } else {
275 LOGCEPH( "XrdCephOssBufferedFile::Write: Write error fd: " << m_fd << " rc:" << rc << " off:" << offset << " len:" << blen);
276 }
277 return rc;
278}

References createBuffer(), LOGCEPH, m_bufferAlg, m_bytesWrite, XrdCephOssFile::m_fd, m_maxBufferRetries, and m_maxBufferRetrySleepTime_ms.

+ Here is the call graph for this function:

◆ Write() [2/2]

int XrdCephOssBufferedFile::Write ( XrdSfsAio * aiop)
virtual

Write file bytes using asynchronous I/O.

Parameters
aiop- Pointer to async I/O object controlling the I/O.
Returns
0 if the request was started successfully or -errno or -osserr (see XrdOssError.hh).

Reimplemented from XrdCephOssFile.

Definition at line 280 of file XrdCephOssBufferedFile.cc.

280 {
281 if (!m_bufferAlg) {
283 if (!m_bufferAlg) {
284 LOGCEPH( "XrdCephOssBufferedFile: Error in creating buffered object");
285 return -EINVAL;
286 }
287 }
288
289 // LOGCEPH("XrdCephOssBufferedFile::AIOWRITE: fd: " << m_xrdOssDF->getFileDescriptor() << " " << time(nullptr) << " : "
290 // << aiop->sfsAio.aio_offset << " "
291 // << aiop->sfsAio.aio_nbytes << " " << aiop->sfsAio.aio_reqprio << " "
292 // << aiop->sfsAio.aio_fildes << " " );
293 ssize_t rc = m_bufferAlg->write_aio(aiop);
294 if (rc > 0) {
295 m_bytesWriteAIO.fetch_add(rc);
296 } else {
297 LOGCEPH( "XrdCephOssBufferedFile::Write: WriteAIO error fd: " << m_fd << " rc:" << rc
298 << " off:" << aiop->sfsAio.aio_offset << " len:" << aiop->sfsAio.aio_nbytes );
299 }
300 return rc;
301
302}

References aiocb::aio_nbytes, aiocb::aio_offset, createBuffer(), LOGCEPH, m_bufferAlg, m_bytesWriteAIO, XrdCephOssFile::m_fd, and XrdSfsAio::sfsAio.

+ Here is the call graph for this function:

Member Data Documentation

◆ m_buf_mutex

std::mutex XrdCephOssBufferedFile::m_buf_mutex
protected

Definition at line 76 of file XrdCephOssBufferedFile.hh.

Referenced by Read(), and Read().

◆ m_bufferAlg

std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> XrdCephOssBufferedFile::m_bufferAlg
protected

Definition at line 74 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), Write(), and Write().

◆ m_bufferIOmode

std::string XrdCephOssBufferedFile::m_bufferIOmode
protected

Definition at line 85 of file XrdCephOssBufferedFile.hh.

Referenced by XrdCephOssBufferedFile(), and createBuffer().

◆ m_bufferReadAlgs

std::map<size_t, std::unique_ptr<XrdCephBuffer::IXrdCephBufferAlg> > XrdCephOssBufferedFile::m_bufferReadAlgs
protected

Definition at line 75 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), createBuffer(), Read(), and Read().

◆ m_bufsize

size_t XrdCephOssBufferedFile::m_bufsize = 16*1024*1024L
protected

Definition at line 84 of file XrdCephOssBufferedFile.hh.

Referenced by XrdCephOssBufferedFile(), and createBuffer().

◆ m_bytesRead

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesRead = {0}
protected

Definition at line 88 of file XrdCephOssBufferedFile.hh.

88{0};

Referenced by Close(), and Read().

◆ m_bytesReadAIO

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesReadAIO = {0}
protected

number of bytes read or written

Definition at line 90 of file XrdCephOssBufferedFile.hh.

90{0};

Referenced by Close(), and Read().

◆ m_bytesReadV

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesReadV = {0}
protected

number of bytes read or written

Definition at line 89 of file XrdCephOssBufferedFile.hh.

89{0};

Referenced by Close(), and ReadV().

◆ m_bytesWrite

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesWrite = {0}
protected

number of bytes read or written

Definition at line 91 of file XrdCephOssBufferedFile.hh.

91{0};

Referenced by Close(), and Write().

◆ m_bytesWriteAIO

std::atomic<size_t> XrdCephOssBufferedFile::m_bytesWriteAIO = {0}
protected

number of bytes read or written

Definition at line 92 of file XrdCephOssBufferedFile.hh.

92{0};

Referenced by Close(), and Write().

◆ m_cephoss

XrdCephOss* XrdCephOssBufferedFile::m_cephoss = nullptr
protected

create a new instance of the buffer

Definition at line 72 of file XrdCephOssBufferedFile.hh.

Referenced by XrdCephOssBufferedFile(), and createBuffer().

◆ m_flags

int XrdCephOssBufferedFile::m_flags = 0
protected

number of ms to sleep if a retry is requested

Definition at line 83 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_maxBufferRetries

int XrdCephOssBufferedFile::m_maxBufferRetries {5}
protected

set the maximum of buffers to open on a single instance (e.g. for simultaneous file reads)

Definition at line 80 of file XrdCephOssBufferedFile.hh.

80{5};

Referenced by Read(), and Write().

◆ m_maxBufferRetrySleepTime_ms

int XrdCephOssBufferedFile::m_maxBufferRetrySleepTime_ms
protected

How many times to retry a read from a buffer with EBUSY errors.

Definition at line 81 of file XrdCephOssBufferedFile.hh.

Referenced by XrdCephOssBufferedFile(), Read(), and Write().

◆ m_maxCountReadBuffers

size_t XrdCephOssBufferedFile::m_maxCountReadBuffers {10}
protected

any data access method on the buffer will use this

Definition at line 77 of file XrdCephOssBufferedFile.hh.

77{10};

Referenced by XrdCephOssBufferedFile(), and createBuffer().

◆ m_path

std::string XrdCephOssBufferedFile::m_path
protected

Definition at line 86 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_timestart

std::chrono::time_point<std::chrono::system_clock> XrdCephOssBufferedFile::m_timestart
protected

Definition at line 87 of file XrdCephOssBufferedFile.hh.

Referenced by Close(), and Open().

◆ m_xrdOssDF

XrdCephOssFile* XrdCephOssBufferedFile::m_xrdOssDF = nullptr
protected

The documentation for this class was generated from the following files: