VirtualBox

source: vbox/trunk/src/VBox/ValidationKit/testdriver/txsclient.py

Last change on this file was 106061, checked in by vboxsync, 3 months ago

Copyright year updates by scm.

  • Property svn:eol-style set to native
  • Property svn:keywords set to Author Date Id Revision
File size: 90.9 KB
Line 
1# -*- coding: utf-8 -*-
2# $Id: txsclient.py 106061 2024-09-16 14:03:52Z vboxsync $
3# pylint: disable=too-many-lines
4
5"""
6Test eXecution Service Client.
7"""
8__copyright__ = \
9"""
10Copyright (C) 2010-2024 Oracle and/or its affiliates.
11
12This file is part of VirtualBox base platform packages, as
13available from https://www.virtualbox.org.
14
15This program is free software; you can redistribute it and/or
16modify it under the terms of the GNU General Public License
17as published by the Free Software Foundation, in version 3 of the
18License.
19
20This program is distributed in the hope that it will be useful, but
21WITHOUT ANY WARRANTY; without even the implied warranty of
22MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
23General Public License for more details.
24
25You should have received a copy of the GNU General Public License
26along with this program; if not, see <https://www.gnu.org/licenses>.
27
28The contents of this file may alternatively be used under the terms
29of the Common Development and Distribution License Version 1.0
30(CDDL), a copy of it is provided in the "COPYING.CDDL" file included
31in the VirtualBox distribution, in which case the provisions of the
32CDDL are applicable instead of those of the GPL.
33
34You may elect to license modified versions of this file under the
35terms and conditions of either the GPL or the CDDL or both.
36
37SPDX-License-Identifier: GPL-3.0-only OR CDDL-1.0
38"""
39__version__ = "$Revision: 106061 $"
40
41# Standard Python imports.
42import array;
43import errno;
44import os;
45import select;
46import socket;
47import sys;
48import threading;
49import time;
50import zlib;
51import uuid;
52
53# Validation Kit imports.
54from common import utils;
55from testdriver import base;
56from testdriver import reporter;
57from testdriver.base import TdTaskBase;
58
59# Python 3 hacks:
60if sys.version_info[0] >= 3:
61 long = int; # pylint: disable=redefined-builtin,invalid-name
62
63#
64# Helpers for decoding data received from the TXS.
65# These are used by both the Session and Transport classes.
66#
67
def getU32(abData, off):
    """
    Decodes the little endian U32 field found at offset off in abData.
    Returns the field value as an integer.
    """
    u32 = 0;
    # Fold the four bytes together, most significant byte (off + 3) first.
    for iByte in (3, 2, 1, 0):
        u32 = u32 * 256 + abData[off + iByte];
    return u32;
74
def getSZ(abData, off, sDefault = None):
    """
    Decodes the zero-terminated UTF-8 string field starting at offset off.

    Returns the decoded string on success.
    Returns sDefault if the field isn't properly terminated (see getSZLen) or
    if extracting/decoding it fails (the exception is reported as an error).
    """
    cbStr = getSZLen(abData, off);
    if cbStr < 0:
        return sDefault;

    abField = abData[off:(off + cbStr)];
    try:
        if sys.version_info >= (3, 9, 0):
            sRaw = abField.tobytes();
        else:
            # array.tostring() was removed in Python 3.9.
            sRaw = abField.tostring(); # pylint: disable=no-member
        return sRaw.decode('utf_8');
    except:
        reporter.errorXcpt('getSZ(,%u)' % (off));
    return sDefault;
93
def getSZLen(abData, off):
    """
    Determines the length in bytes of the zero-terminated string field that
    starts at offset off, the terminator not included.

    Returns -1 if off is at or beyond the end of the data, or if no zero
    terminator is found before the data ends.
    """
    cbTotal = len(abData);
    if off >= cbTotal:
        return -1;

    # Scan forward for the terminator.
    for offEnd in range(off, cbTotal):
        if abData[offEnd] == 0:
            return offEnd - off;
    return -1;
110
def isValidOpcodeEncoding(sOpcode):
    """
    Checks if the specified opcode is valid or not.

    A valid opcode is exactly 8 characters long.  The first two characters
    must be upper case letters (A-Z), the remaining six may additionally be
    digits, '-', '_' or space (the padding character).

    Returns True on success.
    Returns False if it is invalid, details in the log.
    """
    sSet1 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    sSet2 = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_ ";
    if len(sOpcode) != 8:
        reporter.error("invalid opcode length: %s" % (len(sOpcode)));
        return False;
    # Check every position; the previous ranges skipped characters #1 and #7.
    for i, ch in enumerate(sOpcode):
        sValidChars = sSet1 if i < 2 else sSet2;
        if ch not in sValidChars:
            reporter.error("invalid opcode char #%u: %s" % (i, sOpcode));
            return False;
    return True;
131
132#
133# Helper for encoding data sent to the TXS.
134#
135
def u32ToByteArray(u32):
    """
    Encodes the u32 value as a four element little endian byte (B) array.
    Only the low 32 bits of the value are encoded.
    """
    return array.array('B', [(u32 >> cShift) & 0xff for cShift in (0, 8, 16, 24)]);
143
def escapeString(sString):
    """
    Doubles every '$' in the string so that TXS doesn't try to do variable
    expansion on it.  Returns the escaped string.
    """
    return '$$'.join(sString.split('$'));
149
150
151
class TransportBase(object):
    """
    Base class for the transport layer.

    Subclasses supply the raw byte transport (connect, disconnect, sendBytes,
    recvBytes, isConnectionOk, isRecvPending and optionally cancelConnect),
    while this class implements the TXS packet framing on top of that
    (sendMsg, sendMsgInt and recvMsg).

    Packet layout as produced and parsed here: 32-bit little endian length
    (header included, padding excluded), 32-bit little endian CRC-32 covering
    everything after the CRC field (padding excluded), an 8 character space
    padded opcode, and then the payload.  Packets are zero padded up to a
    multiple of 16 bytes.
    """

    def __init__(self, sCaller):
        self.sDbgCreated = '%s: %s' % (utils.getTimePrefix(), sCaller);
        self.fDummy = 0;
        # Header of a packet whose payload could not be received; it is reused
        # by the next recvMsg call.
        self.abReadAheadHdr = array.array('B');

    def toString(self):
        """
        Stringify the instance for logging and debugging.
        """
        return '<%s: abReadAheadHdr=%s, sDbgCreated=%s>' % (type(self).__name__, self.abReadAheadHdr, self.sDbgCreated);

    def __str__(self):
        return self.toString();

    def cancelConnect(self):
        """
        Cancels any pending connect() call.
        Returns None;
        """
        return None;

    def connect(self, cMsTimeout):
        """
        Quietly attempts to connect to the TXS.

        Returns True on success.
        Returns False on retryable errors (no logging).
        Returns None on fatal errors with details in the log.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def disconnect(self, fQuiet = False):
        """
        Disconnect from the TXS.

        Returns True.

        Override this method, don't call super.
        """
        _ = fQuiet;
        return True;

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends the bytes in the buffer abBuf to the TXS.

        Returns True on success.
        Returns False on failure and error details in the log.

        Override this method, don't call super.

        Remarks: len(abBuf) is always a multiple of 16.
        """
        _ = abBuf; _ = cMsTimeout;
        return False;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receive cb number of bytes from the TXS.

        Returns the bytes (array('B')) on success.
        Returns None on failure and error details in the log.

        Override this method, don't call super.

        Remarks: cb is always a multiple of 16.
        """
        _ = cb; _ = cMsTimeout; _ = fNoDataOk;
        return None;

    def isConnectionOk(self):
        """
        Checks if the connection is OK.

        Returns True if it is.
        Returns False if it isn't (caller should call disconnect).

        Override this method, don't call super.
        """
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks if there is incoming bytes, optionally waiting cMsTimeout
        milliseconds for something to arrive.

        Returns True if there is, False if there isn't.

        Override this method, don't call super.
        """
        _ = cMsTimeout;
        return False;

    def sendMsgInt(self, sOpcode, cMsTimeout, abPayload = None):
        """
        Sends a message (opcode + encoded payload).

        sOpcode:    The opcode, 2 to 8 characters; it is space padded to 8.
        cMsTimeout: Timeout in milliseconds, passed on to sendBytes.
        abPayload:  The encoded payload (array('B')), None means no payload.

        Returns True on success.
        Returns False on failure and error details in the log.
        """
        if abPayload is None:
            abPayload = array.array('B');

        # Fix + check the opcode.
        if len(sOpcode) < 2:
            reporter.fatal('sendMsgInt: invalid opcode length: %d (\"%s\")' % (len(sOpcode), sOpcode));
            return False;
        sOpcode = sOpcode.ljust(8);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('sendMsgInt: invalid opcode encoding: \"%s\"' % (sOpcode));
            return False;

        # Start construct the message.
        cbMsg = 16 + len(abPayload);
        abMsg = array.array('B');
        abMsg.extend(u32ToByteArray(cbMsg));
        abMsg.extend((0, 0, 0, 0));     # uCrc32, filled in below.
        try:
            abMsg.extend(array.array('B', [ord(ch) for ch in sOpcode]));
            if abPayload:
                abMsg.extend(abPayload);
        except:
            reporter.fatalXcpt('sendMsgInt: packing problem...');
            return False;

        # Checksum it (opcode + payload, before padding), pad it and send it off.
        uCrc32 = zlib.crc32(abMsg[8:]);
        abMsg[4:8] = u32ToByteArray(uCrc32);

        while len(abMsg) % 16:
            abMsg.append(0);

        reporter.log2('sendMsgInt: op=%s len=%d timeout=%d' % (sOpcode, len(abMsg), cMsTimeout));
        return self.sendBytes(abMsg, cMsTimeout);

    def recvMsg(self, cMsTimeout, fNoDataOk = False):
        """
        Receives a message from the TXS.

        cMsTimeout: Timeout in milliseconds, passed on to recvBytes.
        fNoDataOk:  Passed on to recvBytes for the header read; also
                    suppresses the log entry when the payload read fails.

        Returns the message three-tuple: length, opcode, payload.
        Returns (None, None, None) on failure and error details in the log.
        """

        # Read the header, reusing the one stashed by a previous failed call.
        if self.abReadAheadHdr:
            assert(len(self.abReadAheadHdr) == 16);
            abHdr = self.abReadAheadHdr;
            self.abReadAheadHdr = array.array('B');
        else:
            abHdr = self.recvBytes(16, cMsTimeout, fNoDataOk); # (virtual method) # pylint: disable=assignment-from-none
            if abHdr is None:
                return (None, None, None);
        if len(abHdr) != 16:
            reporter.fatal('recvBytes(16) returns %d bytes!' % (len(abHdr)));
            return (None, None, None);

        # Unpack and validate the header.
        cbMsg  = getU32(abHdr, 0);
        uCrc32 = getU32(abHdr, 4);

        try:
            if sys.version_info < (3, 9, 0):
                # Removed since Python 3.9.
                sOpcode = abHdr[8:16].tostring(); # pylint: disable=no-member
            else:
                sOpcode = abHdr[8:16].tobytes();
            sOpcode = sOpcode.decode('ascii');
        except UnicodeDecodeError:
            # A garbled header must be reported like any other bad packet
            # rather than escaping as an exception.
            reporter.fatal('recvMsg: opcode is not ASCII: %s' % (abHdr[8:16],));
            return (None, None, None);

        if cbMsg < 16:
            reporter.fatal('recvMsg: message length is out of range: %s (min 16 bytes)' % (cbMsg));
            return (None, None, None);
        if cbMsg > 1024*1024:
            reporter.fatal('recvMsg: message length is out of range: %s (max 1MB)' % (cbMsg));
            return (None, None, None);
        if not isValidOpcodeEncoding(sOpcode):
            reporter.fatal('recvMsg: invalid opcode \"%s\"' % (sOpcode));
            return (None, None, None);

        # Get the payload (if any), dropping the padding.
        abPayload = array.array('B');
        if cbMsg > 16:
            if cbMsg % 16:
                cbPadding = 16 - (cbMsg % 16);
            else:
                cbPadding = 0;
            abPayload = self.recvBytes(cbMsg - 16 + cbPadding, cMsTimeout, False); # pylint: disable=assignment-from-none
            if abPayload is None:
                # Keep the header so the next call can retry the payload.
                self.abReadAheadHdr = abHdr;
                if not fNoDataOk:
                    reporter.log('recvMsg: failed to recv payload bytes!');
                return (None, None, None);

            while cbPadding > 0:
                abPayload.pop();
                cbPadding = cbPadding - 1;

        # Check the CRC-32 (a zero CRC field means there is nothing to check).
        if uCrc32 != 0:
            uActualCrc32 = zlib.crc32(abHdr[8:]);
            if cbMsg > 16:
                uActualCrc32 = zlib.crc32(abPayload, uActualCrc32);
            uActualCrc32 = uActualCrc32 & 0xffffffff;
            if uCrc32 != uActualCrc32:
                reporter.fatal('recvMsg: crc error: expected %s, got %s' % (hex(uCrc32), hex(uActualCrc32)));
                return (None, None, None);

        reporter.log2('recvMsg: op=%s len=%d' % (sOpcode, len(abPayload)));
        return (cbMsg, sOpcode, abPayload);

    def sendMsg(self, sOpcode, cMsTimeout, aoPayload = ()):
        """
        Sends a message (opcode + payload tuple).

        Payload items are encoded as follows: strings as zero-terminated
        UTF-8, integers (0..0xffffffff) as little endian 32-bit values, and
        arrays are appended as-is.

        Returns True on success.
        Returns False on failure and error details in the log.
        Returns None if you pass the incorrectly typed parameters.
        """
        # Encode the payload.
        abPayload = array.array('B');
        for o in aoPayload:
            try:
                if utils.isString(o):
                    # bytearray iterates as integers on both Python 2 and 3.
                    abPayload.extend(bytearray(o.encode('utf_8')));
                    abPayload.append(0);
                elif isinstance(o, (long, int)):
                    if o < 0 or o > 0xffffffff:
                        reporter.fatal('sendMsg: uint32_t payload is out of range: %s' % (hex(o)));
                        return None;
                    abPayload.extend(u32ToByteArray(o));
                elif isinstance(o, array.array):
                    abPayload.extend(o);
                else:
                    reporter.fatal('sendMsg: unexpected payload type: %s (%s) (aoPayload=%s)' % (type(o), o, aoPayload));
                    return None;
            except:
                reporter.fatalXcpt('sendMsg: screwed up the encoding code...');
                return None;
        return self.sendMsgInt(sOpcode, cMsTimeout, abPayload);
408
409
410class Session(TdTaskBase):
411 """
412 A Test eXecution Service (TXS) client session.
413 """
414
def __init__(self, oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = False, fnProcessEvents = None):
    """
    Construct a TXS session.

    The constructor kicks off the "connecting" task (taskConnect), so the
    session enters the signalled state once connected or when the timeout
    has been reached.

    Raises base.GenError if the connect task cannot be started.
    """
    TdTaskBase.__init__(self, utils.getCallerName(), fnProcessEvents);

    # Connection settings.
    self.oTransport         = oTransport;
    self.fTryConnect        = fTryConnect;

    # Current task state.
    self.sStatus            = "";
    self.oThread            = None;
    self.fnTask             = self.taskDummy;
    self.aTaskArgs          = None;
    self.cMsTimeout         = 0;
    self.msStart            = 0;
    self.fErr               = True; # Whether to report errors as error.

    # Task results.
    self.oTaskRc            = None;
    self.t3oReply           = (None, None, None);
    self.fScrewedUpMsgState = False;

    fStarted = self.startTask(cMsTimeout, False, "connecting", self.taskConnect, (cMsIdleFudge,));
    if not fStarted:
        raise base.GenError("startTask failed");
438
def __del__(self):
    """Cancels any task still pending when the session object is deleted."""
    self.cancelTask();
442
def toString(self):
    """Stringifies the session state for logging and debugging."""
    sFormat = '<%s fnTask=%s, aTaskArgs=%s, sStatus=%s, oTaskRc=%s, cMsTimeout=%s,' \
              ' msStart=%s, fTryConnect=%s, fErr=%s, fScrewedUpMsgState=%s, t3oReply=%s oTransport=%s, oThread=%s>';
    aoValues = (
        TdTaskBase.toString(self),
        self.fnTask,
        self.aTaskArgs,
        self.sStatus,
        self.oTaskRc,
        self.cMsTimeout,
        self.msStart,
        self.fTryConnect,
        self.fErr,
        self.fScrewedUpMsgState,
        self.t3oReply,
        self.oTransport,
        self.oThread,
    );
    return sFormat % aoValues;
448
def taskDummy(self):
    """
    Default task method; always raises so that running a task without a
    real task method being set gets noticed.
    """
    raise Exception();
452
def startTask(self, cMsTimeout, fIgnoreErrors, sStatus, fnTask, aArgs = ()):
    """
    Kicks off a new task.

    cMsTimeout:         The task timeout in milliseconds. Values less than
                        500 ms will be adjusted to 500 ms. This means it is
                        OK to use negative value.
    fIgnoreErrors:      When set, failures here are not reported as errors
                        and the task runs with error reporting off (fErr).
    sStatus:            The task status.
    fnTask:             The method that'll execute the task.
    aArgs:              Arguments to pass to fnTask.

    Returns True on success, False + error in log on failure.
    """
    if not self.cancelTask():
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: failed to cancel previous task.');
        return False;

    # Change status and make sure we're the only one setting up a task.
    self.lockTask();
    if self.sStatus != "":
        self.unlockTask();
        reporter.maybeErr(not fIgnoreErrors, 'txsclient.Session.startTask: race.');
        return False;
    self.sStatus = "setup";
    self.oTaskRc = None;
    self.t3oReply = (None, None, None);
    self.resetTaskLocked();
    self.unlockTask();

    self.cMsTimeout = max(cMsTimeout, 500);
    self.fErr = not fIgnoreErrors;
    self.fnTask = fnTask;
    self.aTaskArgs = aArgs;
    self.oThread = threading.Thread(target=self.taskThread, args=(), name='TXS-%s' % (sStatus));
    # Thread.setDaemon() is deprecated; the daemon attribute is the
    # equivalent that works on both old and new Python versions.
    self.oThread.daemon = True;
    self.msStart = base.timestampMilli();

    # Publish the real status before the thread starts running.
    self.lockTask();
    self.sStatus = sStatus;
    self.unlockTask();
    self.oThread.start();

    return True;
496
def cancelTask(self, fSync = True):
    """
    Attempts to cancel any pending tasks.

    fSync:  Whether to wait (up to 61 seconds) for the task thread to
            terminate after flagging the task as cancelled.

    Returns True if no task is pending, or if the task thread terminated
    while we waited for it.
    Returns False if a task is currently being set up, if a cancellation is
    already in progress, if fSync is False, or if the task thread is still
    alive after the wait.
    """
    self.lockTask();

    if self.sStatus == "":
        self.unlockTask();
        return True;
    if self.sStatus in ("setup", "cancelled"):
        self.unlockTask();
        return False;

    reporter.log('txsclient: cancelling "%s"...' % (self.sStatus));
    if self.sStatus == 'connecting':
        self.oTransport.cancelConnect();

    self.sStatus = "cancelled";
    oThread = self.oThread;
    self.unlockTask();

    if not fSync:
        return False;

    oThread.join(61.0);

    # Success means the thread is gone.  (The old code returned the alive
    # state itself, i.e. the inverse of the documented success indicator.)
    return not oThread.is_alive();
531
def taskThread(self):
    """
    The task thread function.

    Runs the task method (self.fnTask with self.aTaskArgs) unless the task
    has already been cancelled, then stores the result in self.oTaskRc,
    resets the status and signals task completion.  An exception in the
    task method is reported as fatal and results in a None task result.
    """
    # Must be bound on every path: the already-cancelled path below never
    # calls the task method but still logs and stores the result.
    oTaskRc = None;
    if not self.isCancelled():
        try:
            fnTask = self.fnTask;
            oTaskRc = fnTask(*self.aTaskArgs);
        except:
            reporter.fatalXcpt('taskThread', 15);
            oTaskRc = None;
    else:
        reporter.log('taskThread: cancelled already');

    self.lockTask();

    reporter.log('taskThread: signalling task with status "%s", oTaskRc=%s' % (self.sStatus, oTaskRc));
    self.oTaskRc = oTaskRc;
    self.oThread = None;
    self.sStatus = '';
    self.signalTaskLocked();

    self.unlockTask();
    return None;
557
def isCancelled(self):
    """
    Internal method for checking if the task has been cancelled.
    Returns True if the status is "cancelled", otherwise False.
    """
    self.lockTask();
    fCancelled = self.sStatus == "cancelled";
    self.unlockTask();
    return fCancelled;
566
def hasTimedOut(self):
    """
    Internal method for checking if the task has timed out or not.
    Returns True when getMsLeft() reports no time left, otherwise False.
    """
    return self.getMsLeft() <= 0;
573
def getMsLeft(self, cMsMin = 0, cMsMax = -1):
    """
    Gets the time left until the task times out, in milliseconds.

    cMsMin: Lower bound for the return value; also returned if the clock
            reads earlier than the task start time.
    cMsMax: Upper bound for the return value; only applied when positive.
    """
    msNow = base.timestampMilli();
    cMsRemaining = self.cMsTimeout - (msNow - self.msStart);
    if msNow < self.msStart or cMsRemaining <= cMsMin:
        return cMsMin;
    if 0 < cMsMax < cMsRemaining:
        return cMsMax;
    return cMsRemaining;
585
def recvReply(self, cMsTimeout = None, fNoDataOk = False):
    """
    Wrapper for TransportBase.recvMsg that also stores the response in
    self.t3oReply so the client can inspect it later on.

    cMsTimeout: Timeout in milliseconds; None means the time left for the
                current task, though at least 500 ms.

    Returns the (cbMsg, sOpcode, abPayload) tuple from recvMsg.
    """
    cMsWait = self.getMsLeft(500) if cMsTimeout is None else cMsTimeout;
    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(cMsWait, fNoDataOk);
    t3oReply = (cbMsg, sOpcode, abPayload);

    self.lockTask();
    self.t3oReply = t3oReply;
    self.unlockTask();
    return t3oReply;
598
def recvAck(self, fNoDataOk = False):
    """
    Receives an ACK or error response from the TXS.

    Returns True on success (ACK).
    Returns False on timeout or transport error.
    Returns (sOpcode, sDetails) tuple on failure.  The opcode is stripped
    and there are always details of some sort or another (the opcode is
    used when the payload holds no valid string).
    """
    cbMsg, sOpcode, abPayload = self.recvReply(None, fNoDataOk);
    if cbMsg is None:
        return False;

    sStripped = sOpcode.strip();
    if sStripped != "ACK":
        return (sStripped, getSZ(abPayload, 0, sStripped));
    return True;
615
def recvAckLogged(self, sCommand, fNoDataOk = False):
    """
    Wrapper for recvAck that logs failures.

    Returns True on success (ACK).
    Returns False on timeout, transport error and errors signalled by TXS.
    When fNoDataOk is set nothing is logged and the recvAck result is
    returned unmodified.
    """
    oRc = self.recvAck(fNoDataOk);
    if oRc is True or fNoDataOk:
        return oRc;

    if oRc is False:
        sMsg = 'recvAckLogged: %s transport error' % (sCommand);
    else:
        sMsg = 'recvAckLogged: %s response was %s: %s' % (sCommand, oRc[0], oRc[1]);
    reporter.maybeErr(self.fErr, sMsg);
    return False;
630
def recvTrueFalse(self, sCommand):
    """
    Receives a TRUE/FALSE response from the TXS.

    sCommand:   The command awaiting the reply, only used for logging.

    Returns True on TRUE, False on FALSE and None on error/other (logged).
    """
    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'recvTrueFalse: %s transport error' % (sCommand));
        return None;

    sOpcode = sOpcode.strip()
    if sOpcode == "TRUE":
        return True;
    if sOpcode == "FALSE":
        return False;
    # Anything else is an error reply; log the opcode and its detail string
    # (falling back on the opcode when there is no valid string payload).
    reporter.maybeErr(self.fErr, 'recvTrueFalse: %s response was %s: %s' % (sCommand, sOpcode, getSZ(abPayload, 0, sOpcode)));
    return None;
648
def sendMsg(self, sOpcode, aoPayload = (), cMsTimeout = None):
    """
    Wrapper for TransportBase.sendMsg that inserts the correct timeout.

    cMsTimeout: Timeout in milliseconds; None means the time left for the
                current task, though at least 500 ms.

    Returns whatever the transport's sendMsg returns.
    """
    cMsEffective = cMsTimeout if cMsTimeout is not None else self.getMsLeft(500);
    return self.oTransport.sendMsg(sOpcode, cMsEffective, aoPayload);
656
def asyncToSync(self, fnAsync, *aArgs):
    """
    Wraps an asynchronous task into a synchronous operation: starts the
    task via fnAsync(*aArgs), waits for it to complete (task timeout plus
    five seconds) and fetches its result.

    Returns False on failure, task return status on success.
    """
    oRc = fnAsync(*aArgs);
    if oRc is False:
        reporter.log2('asyncToSync(%s): returns False (#1)' % (fnAsync));
        return oRc;

    if self.waitForTask(self.cMsTimeout + 5000) is False:
        reporter.maybeErr(self.fErr, 'asyncToSync: waitForTask (timeout %d) failed...' % (self.cMsTimeout,));
        self.cancelTask();
        return False;

    return self.getResult();
678
679 #
680 # Connection tasks.
681 #
682
def taskConnect(self, cMsIdleFudge):
    """
    Tries to connect to the TXS, retrying until it works, fails fatally,
    times out or gets cancelled.

    Returns the taskGreet result once connected.
    Returns None if the transport reports a fatal connect error.
    Returns False on timeout or cancellation (only reported when
    fTryConnect is clear).
    """
    while True:
        if self.isCancelled():
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: cancelled');
            return False;

        reporter.log2('taskConnect: connecting ...');
        oRc = self.oTransport.connect(self.getMsLeft(500));
        if oRc is True:
            reporter.log('taskConnect: succeeded');
            return self.taskGreet(cMsIdleFudge);
        if oRc is None:
            reporter.log2('taskConnect: unable to connect');
            return None;

        # Retryable failure: give up when out of time, otherwise nap for
        # at most a second before the next attempt.
        if self.hasTimedOut():
            reporter.log2('taskConnect: timed out');
            if not self.fTryConnect:
                reporter.maybeErr(self.fErr, 'taskConnect: timed out');
            return False;
        time.sleep(self.getMsLeft(1, 1000) / 1000.0);
703
def taskGreet(self, cMsIdleFudge):
    """
    Greets the TXS (HOWDY).

    On success this sleeps one second per started 1000 ms of cMsIdleFudge
    before returning True.  On failure the transport is disconnected and
    the failing status is returned.
    """
    oRc = self.sendMsg("HOWDY", ());
    if oRc is True:
        oRc = self.recvAckLogged("HOWDY", self.fTryConnect);

    if oRc is not True:
        self.oTransport.disconnect(self.fTryConnect);
        return oRc;

    cMsRemaining = cMsIdleFudge;
    while cMsRemaining > 0:
        time.sleep(1);
        cMsRemaining -= 1000;
    return oRc;
716
def taskBye(self):
    """
    Says goodbye to the TXS (BYE) and disconnects the transport, whether
    or not the command succeeded.

    Returns the sendMsg status if sending failed, otherwise the
    recvAckLogged status.
    """
    oRc = self.sendMsg("BYE");
    oRc = self.recvAckLogged("BYE") if oRc is True else oRc;
    self.oTransport.disconnect();
    return oRc;
724
def taskVer(self):
    """
    Requests version information from TXS.

    Returns the version string on success.
    Returns False if no usable reply was received (including the UNKNOWN
    reply, which is only logged).
    Returns the sendMsg status if the request could not be sent.
    """
    rc = self.sendMsg("VER");
    if rc is True:
        rc = False;
        cbMsg, sOpcode, abPayload = self.recvReply();
        if cbMsg is not None:
            sOpcode = sOpcode.strip();
            if sOpcode == "ACK VER":
                sVer = getSZ(abPayload, 0);
                if sVer is not None:
                    rc = sVer;
            elif sOpcode == "UNKNOWN":
                # Informational only, so plain log() with just the message
                # (the error flag belongs to maybeErr(), not log()).
                reporter.log('taskVer got a UNKNOWN, txs server is probably too old, just ignore')
            else:
                reporter.maybeErr(self.fErr, 'taskVer got a bad reply: %s' % (sOpcode,));
        else:
            reporter.maybeErr(self.fErr, 'taskVer got 3xNone from recvReply.');
    return rc;
744
def taskUuid(self):
    """
    Gets the TXS UUID.

    Returns the UUID string wrapped in curly brackets on success.
    Returns False if the reply is missing, unexpected or doesn't hold a
    valid UUID string.
    Returns the sendMsg status if the request could not be sent.
    """
    oRc = self.sendMsg("UUID");
    if oRc is not True:
        return oRc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskUuid got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "ACK UUID":
        reporter.maybeErr(self.fErr, 'taskUuid got a bad reply: %s' % (sOpcode,));
        return False;

    sUuid = getSZ(abPayload, 0);
    if sUuid is None:
        reporter.maybeErr(self.fErr, 'taskUuid did not get a UUID string.');
        return False;

    # Validate the string by letting the uuid module parse it.
    sUuid = '{%s}' % (sUuid,);
    try:
        _ = uuid.UUID(sUuid);
    except:
        reporter.errorXcpt('taskUuid got an invalid UUID string %s' % (sUuid,));
        return False;
    return sUuid;
769
770 #
771 # Process task
772 # pylint: disable=missing-docstring
773 #
774
def taskExecEx(self, sExecName, fFlags, asArgs, asAddEnv, oStdIn, oStdOut, oStdErr, oTestPipe, sAsUser): # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,line-too-long
    """
    Executes a program via the TXS EXEC command, feeding it standard input
    and collecting its output until a process status reply arrives.

    sExecName:  The program to execute.
    fFlags:     Flags, sent as the first payload item.
    asArgs:     The program arguments.
    asAddEnv:   Environment changes, sent as strings.
    oStdIn, oStdOut, oStdErr, oTestPipe:
                Each is either a string (sent to the TXS as-is), None (an
                empty string is sent), or a stream object ('|' is sent).
                Stream objects get a temporary uTxsClientCrc32 attribute
                holding the running CRC-32 and are closed before returning.
                oStdIn objects are read(), the others are write()n to.
    sAsUser:    The user to run as, sent as a string.

    Returns True if the final reply is 'PROC OK'.
    Returns False on any other 'PROC ' reply.
    Returns None in all other cases; when the EXEC was acknowledged the
    reason is stored in self.t3oReply as (0, 'EXECFAIL', sFailure).
    """
    # Construct the payload.
    aoPayload = [long(fFlags), '%s' % (sExecName), long(len(asArgs))];
    for sArg in asArgs:
        aoPayload.append('%s' % (sArg));
    aoPayload.append(long(len(asAddEnv)));
    for sPutEnv in asAddEnv:
        aoPayload.append('%s' % (sPutEnv));
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if utils.isString(o):
            aoPayload.append(o);
        elif o is not None:
            # Stream object: '|' is sent in place of a string and a running CRC is started.
            aoPayload.append('|');
            o.uTxsClientCrc32 = zlib.crc32(b'');
        else:
            aoPayload.append('');
    aoPayload.append('%s' % (sAsUser));
    aoPayload.append(long(self.cMsTimeout));

    # Kick of the EXEC command.
    rc = self.sendMsg('EXEC', aoPayload)
    if rc is True:
        rc = self.recvAckLogged('EXEC');
    if rc is True:
        # Loop till the process completes, feed input to the TXS and
        # receive output from it.
        sFailure            = "";
        msPendingInputReply = None;     # Timestamp of the last unanswered STDIN/STDINEOS, None if none pending.
        cbMsg, sOpcode, abPayload = (None, None, None);
        while True:
            # Pending input?
            if     msPendingInputReply is None \
               and oStdIn is not None \
               and not utils.isString(oStdIn):
                try:
                    sInput = oStdIn.read(65536);
                except:
                    reporter.errorXcpt('read standard in');
                    sFailure = 'exception reading stdin';
                    rc = None;
                    break;
                if sInput:
                    # Convert to a byte array before handing it of to sendMsg or the string
                    # will get some zero termination added breaking the CRC (and injecting
                    # unwanted bytes).
                    abInput = array.array('B', sInput.encode('utf-8'));
                    oStdIn.uTxsClientCrc32 = zlib.crc32(abInput, oStdIn.uTxsClientCrc32);
                    rc = self.sendMsg('STDIN', (long(oStdIn.uTxsClientCrc32 & 0xffffffff), abInput));
                    if rc is not True:
                        sFailure = 'sendMsg failure';
                        break;
                    msPendingInputReply = base.timestampMilli();
                    continue;

                # Nothing more to read: signal end of stream.
                # NOTE(review): oStdIn is dropped here, so the cleanup loop at the end
                # neither closes the stream object nor removes its CRC attribute - confirm
                # that this is intended.
                rc = self.sendMsg('STDINEOS');
                oStdIn = None;
                if rc is not True:
                    sFailure = 'sendMsg failure';
                    break;
                msPendingInputReply = base.timestampMilli();

            # Wait for input (500 ms timeout).
            if cbMsg is None:
                cbMsg, sOpcode, abPayload = self.recvReply(cMsTimeout=500, fNoDataOk=True);
                if cbMsg is None:
                    # Check for time out before restarting the loop.
                    # Note! Only doing timeout checking here does mean that
                    #       the TXS may prevent us from timing out by
                    #       flooding us with data.  This is unlikely though.
                    if     self.hasTimedOut() \
                       and (   msPendingInputReply is None \
                            or base.timestampMilli() - msPendingInputReply > 30000):
                        reporter.maybeErr(self.fErr, 'taskExecEx: timed out');
                        sFailure = 'timeout';
                        rc = None;
                        break;
                    # Check that the connection is OK.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        sFailure = 'disconnected';
                        rc = False;
                        break;
                    continue;

            # Handle the response.
            sOpcode = sOpcode.rstrip();
            if sOpcode == 'STDOUT':
                oOut = oStdOut;
            elif sOpcode == 'STDERR':
                oOut = oStdErr;
            elif sOpcode == 'TESTPIPE':
                oOut = oTestPipe;
            else:
                oOut = None;
            if oOut is not None:
                # Output from the process.  The payload starts with the running
                # stream CRC-32, followed by the output bytes.
                if len(abPayload) < 4:
                    sFailure = 'malformed output packet (%s, %u bytes)' % (sOpcode, cbMsg);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                uStreamCrc32 = getU32(abPayload, 0);
                oOut.uTxsClientCrc32 = zlib.crc32(abPayload[4:], oOut.uTxsClientCrc32);
                if uStreamCrc32 != (oOut.uTxsClientCrc32 & 0xffffffff):
                    sFailure = 'crc error - mine=%#x their=%#x (%s, %u bytes: %s)' \
                        % (oOut.uTxsClientCrc32 & 0xffffffff, uStreamCrc32, sOpcode, cbMsg,
                           ' '.join(['%02x' % (b,) for b in abPayload]),);
                    reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
                try:
                    oOut.write(abPayload[4:]);
                except:
                    sFailure = 'exception writing %s' % (sOpcode);
                    reporter.errorXcpt('taskExecEx: %s' % (sFailure));
                    rc = None;
                    break;
            elif sOpcode == 'STDINIGN' and msPendingInputReply is not None:
                # Standard input is ignored.  Ignore this condition for now.
                # NOTE(review): if this is the reply to STDINEOS then oStdIn is already
                # None and the 'del' below would raise AttributeError - verify.
                msPendingInputReply = None;
                reporter.log('taskExecEx: Standard input is ignored... why?');
                del oStdIn.uTxsClientCrc32;
                oStdIn = '/dev/null';
            elif  sOpcode in ('STDINMEM', 'STDINBAD', 'STDINCRC',) and msPendingInputReply is not None:
                # TXS STDIN error, abort.
                # TODO: STDINMEM - consider undoing the previous stdin read and try resubmitt it.
                # NOTE(review): the out-of-memory text is also used for STDINBAD and STDINCRC.
                msPendingInputReply = None;
                sFailure = 'TXS is out of memory for std input buffering';
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            elif sOpcode == 'ACK' and msPendingInputReply is not None:
                msPendingInputReply = None;
            elif sOpcode.startswith('PROC '):
                # Process status message, handle it outside the loop.
                rc = True;
                break;
            else:
                sFailure = 'Unexpected opcode %s' % (sOpcode);
                reporter.maybeErr(self.fErr, 'taskExecEx: %s' % (sFailure));
                rc = None;
                break;
            # Clear the message.
            cbMsg, sOpcode, abPayload = (None, None, None);

        # If we sent an STDIN packet and didn't get a reply yet, we'll give
        # TXS some 5 seconds to reply to this.  If we don't wait here we'll
        # get screwed later on if we mix it up with the reply to some other
        # command.  Hackish.
        if msPendingInputReply is not None:
            cbMsg2, sOpcode2, abPayload2 = self.oTransport.recvMsg(5000);
            if cbMsg2 is not None:
                reporter.log('taskExecEx: Out of order STDIN, got reply: %s, %s, %s [ignored]'
                             % (cbMsg2, sOpcode2, abPayload2));
                msPendingInputReply = None;
            else:
                reporter.maybeErr(self.fErr, 'taskExecEx: Pending STDIN, no reply after 5 secs!');
                self.fScrewedUpMsgState = True;

        # Parse the exit status (True), abort (None) or do nothing (False).
        if rc is True:
            if sOpcode == 'PROC OK':
                pass;
            else:
                rc = False;
                # Do proper parsing some other day if needed:
                #   PROC TOK, PROC TOA, PROC DWN, PROC DOO,
                #   PROC NOK + rc, PROC SIG + sig, PROC ABD, FAILED.
                if sOpcode == 'PROC DOO':
                    reporter.log('taskExecEx: PROC DOO[FUS]: %s' % (abPayload,));
                elif sOpcode.startswith('PROC NOK'):
                    reporter.log('taskExecEx: PROC NOK: rcExit=%s' % (abPayload,));
                elif abPayload and sOpcode.startswith('PROC '):
                    reporter.log('taskExecEx: %s payload=%s' % (sOpcode, abPayload,));

        else:
            if rc is None:
                # Abort it.
                reporter.log('taskExecEx: sending ABORT...');
                rc = self.sendMsg('ABORT');
                while rc is True:
                    cbMsg, sOpcode, abPayload = self.oTransport.recvMsg(30000);
                    if cbMsg is None:
                        reporter.maybeErr(self.fErr, 'taskExecEx: Pending ABORT, no reply after 30 secs!')
                        self.fScrewedUpMsgState = True;
                        break;
                    if sOpcode.startswith('PROC '):
                        reporter.log('taskExecEx: ABORT reply: %s, %s, %s [ignored]' % (cbMsg, sOpcode, abPayload));
                        break;
                    reporter.log('taskExecEx: ABORT in process, ignoring reply: %s, %s, %s' % (cbMsg, sOpcode, abPayload));
                    # Check that the connection is OK before looping.
                    if not self.oTransport.isConnectionOk():
                        self.oTransport.disconnect();
                        break;

            # Fake response with the reason why we quit.
            # NOTE(review): sFailure starts out as "" and is never set to None, so this
            # condition is always true here.
            if sFailure is not None:
                self.t3oReply = (0, 'EXECFAIL', sFailure);
            rc = None;
    else:
        rc = None;

    # Cleanup.
    for o in (oStdIn, oStdOut, oStdErr, oTestPipe):
        if o is not None and not utils.isString(o):
            del o.uTxsClientCrc32;      # pylint: disable=maybe-no-member
            # Make sure all files are closed
            o.close();                  # pylint: disable=maybe-no-member
    reporter.log('taskExecEx: returns %s' % (rc));
    return rc;
985
986 #
987 # Admin tasks
988 #
989
def hlpRebootShutdownWaitForAck(self, sCmd):
    """
    Wait for reboot/shutdown ACK.

    sCmd:   The command that was sent, used for logging by recvAckLogged.

    After a successful ACK this polls for up to five seconds for the server
    to drop the connection (or for incoming data) and then disconnects the
    transport.

    Returns the recvAckLogged status.
    """
    rc = self.recvAckLogged(sCmd);
    if rc is True:
        # Poll a little while for server to disconnect.  The elapsed time
        # must be *below* the limit for the loop to run; the previous '>='
        # test meant the loop body was never entered.
        uMsStart = base.timestampMilli();
        while     self.oTransport.isConnectionOk() \
              and base.timestampMilli() - uMsStart < 5000:
            if self.oTransport.isRecvPending(min(500, self.getMsLeft())):
                break;
        self.oTransport.disconnect();
    return rc;
1002
def taskReboot(self):
    """Sends REBOOT; once sent, returns the hlpRebootShutdownWaitForAck result."""
    fSent = self.sendMsg('REBOOT');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('REBOOT');
1008
def taskShutdown(self):
    """Sends SHUTDOWN; once sent, returns the hlpRebootShutdownWaitForAck result."""
    fSent = self.sendMsg('SHUTDOWN');
    if fSent is not True:
        return fSent;
    return self.hlpRebootShutdownWaitForAck('SHUTDOWN');
1014
1015 #
1016 # CD/DVD control tasks.
1017 #
1018
1019 ## TODO
1020
1021 #
1022 # File system tasks
1023 #
1024
def taskMkDir(self, sRemoteDir, fMode):
    """Sends MKDIR (mode, directory); once sent, returns the logged ACK result."""
    fSent = self.sendMsg('MKDIR', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDIR');
1030
def taskMkDirPath(self, sRemoteDir, fMode):
    """Sends MKDRPATH (mode, directory); once sent, returns the logged ACK result."""
    fSent = self.sendMsg('MKDRPATH', (fMode, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKDRPATH');
1036
def taskMkSymlink(self, sLinkTarget, sLink):
    """Sends MKSYMLNK (target, link); once sent, returns the logged ACK result."""
    fSent = self.sendMsg('MKSYMLNK', (sLinkTarget, sLink));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('MKSYMLNK');
1042
def taskRmDir(self, sRemoteDir):
    """Sends RMDIR for sRemoteDir; once sent, returns the logged ACK result."""
    fSent = self.sendMsg('RMDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMDIR');
1048
def taskRmFile(self, sRemoteFile):
    """Sends RMFILE for sRemoteFile; once sent, returns the logged ACK result."""
    fSent = self.sendMsg('RMFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMFILE');
1054
def taskRmSymlink(self, sRemoteSymlink):
    """Sends RMSYMLNK for sRemoteSymlink; once sent, returns the logged ACK result."""
    fSent = self.sendMsg('RMSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMSYMLNK');
1060
def taskRmTree(self, sRemoteTree):
    """Sends RMTREE for sRemoteTree; once sent, returns the logged ACK result."""
    fSent = self.sendMsg('RMTREE', (sRemoteTree,));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('RMTREE');
1066
def taskChMod(self, sRemotePath, fMode):
    """Sends CHMOD (int(fMode), path); once sent, returns the logged ACK result."""
    tPayload = (int(fMode), sRemotePath,);
    fSent = self.sendMsg('CHMOD', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHMOD');
1072
def taskChOwn(self, sRemotePath, idUser, idGroup):
    """Sends CHOWN (int user id, int group id, path); once sent, returns the logged ACK result."""
    tPayload = (int(idUser), int(idGroup), sRemotePath,);
    fSent = self.sendMsg('CHOWN', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CHOWN');
1078
def taskIsDir(self, sRemoteDir):
    """Sends ISDIR for sRemoteDir; once sent, returns the recvTrueFalse result."""
    fSent = self.sendMsg('ISDIR', (sRemoteDir,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISDIR');
1084
def taskIsFile(self, sRemoteFile):
    """Sends ISFILE for sRemoteFile; once sent, returns the recvTrueFalse result."""
    fSent = self.sendMsg('ISFILE', (sRemoteFile,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISFILE');
1090
def taskIsSymlink(self, sRemoteSymlink):
    """Sends ISSYMLNK for sRemoteSymlink; once sent, returns the recvTrueFalse result."""
    fSent = self.sendMsg('ISSYMLNK', (sRemoteSymlink,));
    if fSent is not True:
        return fSent;
    return self.recvTrueFalse('ISSYMLNK');
1096
1097 #def "STAT "
1098 #def "LSTAT "
1099 #def "LIST "
1100
def taskCopyFile(self, sSrcFile, sDstFile, fMode, fFallbackOkay):
    """
    Copies a file on the remote side from sSrcFile to sDstFile (CPFILE).

    fFallbackOkay is accepted but currently unused.  Returns the sendMsg
    result if sending did not yield True, else the logged ACK result.
    """
    _ = fFallbackOkay; # Not used yet.
    # Note: With fMode set to 0, the target OS' implementation decides which
    #       file mode the destination file is created with (i.e. via umask).
    tPayload = (int(fMode), sSrcFile, sDstFile,);
    fSent = self.sendMsg('CPFILE', tPayload);
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('CPFILE');
1110
def taskUploadFile(self, sLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads the local file sLocalFile to sRemoteFile via taskUploadCommon.

    Returns False (after logging) if the local file cannot be opened,
    otherwise whatever taskUploadCommon returns.
    """
    #
    # Open the local file (make sure it exist before bothering TXS) and
    # tell TXS that we want to upload a file.
    #
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'rb');
    except:
        reporter.errorXcpt('taskUpload: failed to open "%s"' % (sLocalFile));
        return False;

    # Common cause with taskUploadStr.  The close sits in a finally clause
    # so the handle is released even if the worker raises.
    try:
        return self.taskUploadCommon(oLocalFile, sRemoteFile, fMode, fFallbackOkay);
    finally:
        oLocalFile.close();
1128
def taskUploadString(self, sContent, sRemoteFile, fMode, fFallbackOkay):
    """
    Uploads sContent to sRemoteFile by presenting it to taskUploadCommon
    through a minimal object offering a file-like read(cbMax) method.
    """
    class InStringFile(object): # pylint: disable=too-few-public-methods
        """Sequential reader over an in-memory string."""
        def __init__(self, sContent):
            self.sContent = sContent;
            self.off      = 0;

        def read(self, cbMax):
            """Returns the next chunk of at most cbMax items, "" when exhausted."""
            cbLeft = len(self.sContent) - self.off;
            if cbLeft == 0:
                return "";
            offEnd = self.off + min(cbLeft, cbMax);
            sRet = self.sContent[self.off:offEnd];
            self.off += len(sRet);
            return sRet;

    return self.taskUploadCommon(InStringFile(sContent), sRemoteFile, fMode, fFallbackOkay);
1149
def taskUploadCommon(self, oLocalFile, sRemoteFile, fMode, fFallbackOkay):
    """
    Common worker used by taskUploadFile and taskUploadString.

    oLocalFile only needs a read(cbMax) method; chunks are read until an
    empty one is returned.  Each chunk is sent together with the running
    CRC-32 of all data so far and must be acknowledged before the next one.

    Returns True once the final 'DATA EOF' packet has been acknowledged.
    Otherwise returns the non-True result of the step that failed (the error
    paths handled inside the transfer loop all end up as False).
    """
    #
    # Command + ACK.
    #
    # Only used the new PUT2FILE command if we've got a non-zero mode mask.
    # Fall back on the old command if the new one is not known by the TXS.
    #
    if fMode == 0:
        rc = self.sendMsg('PUT FILE', (sRemoteFile,));
        if rc is True:
            rc = self.recvAckLogged('PUT FILE');
    else:
        rc = self.sendMsg('PUT2FILE', (fMode, sRemoteFile));
        if rc is True:
            # recvAck() is handled as tri-state here: True (ACK), False
            # (transport error) or an indexable reply whose first two items
            # are used as opcode and details below.
            rc = self.recvAck();
            if rc is False:
                reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE transport error');
            elif rc is not True:
                if rc[0] == 'UNKNOWN' and fFallbackOkay:
                    # Fallback:
                    rc = self.sendMsg('PUT FILE', (sRemoteFile,));
                    if rc is True:
                        rc = self.recvAckLogged('PUT FILE');
                else:
                    reporter.maybeErr(self.fErr, 'recvAckLogged: PUT2FILE response was %s: %s' % (rc[0], rc[1],));
                    rc = False;
    if rc is True:
        #
        # Push data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while True:
            # Read up to 64 KB of data.
            try:
                sRaw = oLocalFile.read(65536);
            except:
                # rc = None marks a local read failure; it triggers the
                # ABORT handling after the loop.
                rc = None;
                break;

            # Convert to array - this is silly!
            # String input is converted one character at a time via ord();
            # anything else is handed to array.extend() as-is.
            abBuf = array.array('B');
            if utils.isString(sRaw):
                for ch in sRaw:
                    abBuf.append(ord(ch));
            else:
                abBuf.extend(sRaw);
            sRaw = None;

            # Update the file stream CRC and send it off.
            # An empty buffer means end of input and is sent as 'DATA EOF'
            # carrying only the final CRC.
            uMyCrc32 = zlib.crc32(abBuf, uMyCrc32);
            if not abBuf:
                rc = self.sendMsg('DATA EOF', (long(uMyCrc32 & 0xffffffff), ));
            else:
                rc = self.sendMsg('DATA ', (long(uMyCrc32 & 0xffffffff), abBuf));
            if rc is False:
                break;

            # Wait for the reply.
            rc = self.recvAck();
            if rc is not True:
                if rc is False:
                    reporter.maybeErr(self.fErr, 'taskUpload: transport error waiting for ACK');
                else:
                    reporter.maybeErr(self.fErr, 'taskUpload: DATA response was %s: %s' % (rc[0], rc[1]));
                    rc = False;
                break;

            # EOF?
            if not abBuf:
                break;

        # Send ABORT on ACK and I/O errors.
        # NOTE(review): within the loop above only the local read failure
        # sets rc to None; the ACK error paths already set rc to False and
        # therefore do not get here.
        if rc is None:
            rc = self.sendMsg('ABORT');
            if rc is True:
                self.recvAckLogged('ABORT');
            rc = False;
    return rc;
1229
def taskDownloadFile(self, sRemoteFile, sLocalFile):
    """
    Downloads sRemoteFile into the local file sLocalFile.

    Returns False (after logging) if the local file cannot be opened for
    writing, otherwise whatever taskDownloadCommon returns.  When that
    result is False, the local file is removed again; a failure to remove
    it is logged and otherwise ignored.
    """
    try:
        oLocalFile = utils.openNoInherit(sLocalFile, 'wb');
    except:
        reporter.errorXcpt('taskDownload: failed to open "%s"' % (sLocalFile));
        return False;

    # Close in a finally clause so the handle is released even if the
    # download worker raises.
    try:
        rc = self.taskDownloadCommon(sRemoteFile, oLocalFile);
    finally:
        oLocalFile.close();

    if rc is False:
        try:
            os.remove(sLocalFile);
        except:
            reporter.errorXcpt();
    return rc;
1246
def taskDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True):
    """
    Downloads sRemoteFile into memory and returns it as one string.

    Chunks that offer a decode() method are decoded with sEncoding (errors
    ignored or strict depending on fIgnoreEncodingErrors); other chunks are
    taken as-is.  If taskDownloadCommon does not return True, its result is
    returned unchanged.
    """
    class OutStringFile(object): # pylint: disable=too-few-public-methods
        """Collects everything written to it in the asContent list."""
        def __init__(self):
            self.asContent = [];

        def write(self, sBuf):
            self.asContent.append(sBuf);
            return None;

    oSink = OutStringFile();
    rc = self.taskDownloadCommon(sRemoteFile, oSink);
    if rc is not True:
        return rc;

    sErrors = 'ignore' if fIgnoreEncodingErrors else 'strict';
    asParts = [];
    for sBuf in oSink.asContent:
        if hasattr(sBuf, 'decode'):
            asParts.append(sBuf.decode(sEncoding, sErrors));
        else:
            asParts.append(sBuf);
    return ''.join(asParts);
1267
def taskDownloadCommon(self, sRemoteFile, oLocalFile):
    """
    Common worker for taskDownloadFile and taskDownloadString.

    Sends 'GET FILE' and then processes DATA / DATA EOF replies.  The first
    four payload bytes of each reply are read as a CRC-32 and compared with
    the locally maintained running CRC; the rest of a DATA payload is passed
    to oLocalFile.write().  Each accepted packet is answered with 'ACK'.

    Returns the result of sending the final 'ACK' for DATA EOF on success,
    False on the error paths handled here, or the non-True result of a
    failing sendMsg call.
    """
    rc = self.sendMsg('GET FILE', (sRemoteFile,))
    if rc is True:
        #
        # Process data packets until eof.
        #
        uMyCrc32 = zlib.crc32(b'');
        while rc is True:
            cbMsg, sOpcode, abPayload = self.recvReply();
            if cbMsg is None:
                reporter.maybeErr(self.fErr, 'taskDownload got 3xNone from recvReply.');
                rc = None;
                break;

            # Validate.
            # rc = None marks failures that are answered with NACK after the
            # loop; an unexpected opcode goes straight to False (no NACK).
            sOpcode = sOpcode.rstrip();
            if sOpcode not in ('DATA', 'DATA EOF',):
                reporter.maybeErr(self.fErr, 'taskDownload got a error reply: opcode="%s" details="%s"'
                                  % (sOpcode, getSZ(abPayload, 0, "None")));
                rc = False;
                break;
            if sOpcode == 'DATA' and len(abPayload) < 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad DATA packet: len=%u' % (len(abPayload)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF' and len(abPayload) != 4:
                reporter.maybeErr(self.fErr, 'taskDownload got a bad EOF packet: len=%u' % (len(abPayload)));
                rc = None;
                break;

            # Check the CRC (common for both packets).
            # Only DATA packets advance the running CRC; DATA EOF is checked
            # against the value accumulated so far.
            uCrc32 = getU32(abPayload, 0);
            if sOpcode == 'DATA':
                uMyCrc32 = zlib.crc32(abPayload[4:], uMyCrc32);
            if uCrc32 != (uMyCrc32 & 0xffffffff):
                reporter.maybeErr(self.fErr, 'taskDownload got a bad CRC: mycrc=%s remotecrc=%s'
                                  % (hex(uMyCrc32), hex(uCrc32)));
                rc = None;
                break;
            if sOpcode == 'DATA EOF':
                rc = self.sendMsg('ACK');
                break;

            # Finally, push the data to the file.
            try:
                if sys.version_info < (3, 9, 0):
                    # Removed since Python 3.9.
                    abData = abPayload[4:].tostring();
                else:
                    abData = abPayload[4:].tobytes();
                oLocalFile.write(abData);
            except:
                # NOTE(review): the message names the remote file although
                # the failing operation is the local write.
                reporter.errorXcpt('I/O error writing to "%s"' % (sRemoteFile));
                rc = None;
                break;
            rc = self.sendMsg('ACK');

        # Send NACK on validation and I/O errors.
        # The result of sending the NACK is discarded; the task fails anyway.
        if rc is None:
            rc = self.sendMsg('NACK');
            rc = False;
    return rc;
1331
def taskPackFile(self, sRemoteFile, sRemoteSource):
    """Sends PKFILE (file, source); once sent, returns the logged ACK result."""
    fSent = self.sendMsg('PKFILE', (sRemoteFile, sRemoteSource));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('PKFILE');
1337
def taskUnpackFile(self, sRemoteFile, sRemoteDir):
    """Sends UNPKFILE (file, directory); once sent, returns the logged ACK result."""
    fSent = self.sendMsg('UNPKFILE', (sRemoteFile, sRemoteDir));
    if fSent is not True:
        return fSent;
    return self.recvAckLogged('UNPKFILE');
1343
def taskExpandString(self, sString):
    """
    Asks the TXS to expand sString ('EXP STR ') and returns the result.

    Returns the expanded string taken from a STRING reply.  Returns False
    if no reply arrives, the reply has another opcode, or no string can be
    extracted from it.  If sending does not yield True, the sendMsg result
    is returned.
    """
    rc = self.sendMsg('EXP STR ', (sString,));
    if rc is not True:
        return rc;

    cbMsg, sOpcode, abPayload = self.recvReply();
    if cbMsg is None:
        reporter.maybeErr(self.fErr, 'taskExpandString got 3xNone from recvReply.');
        return False;

    sOpcode = sOpcode.strip();
    if sOpcode != "STRING":
        # Also handles SHORTSTR reply (not enough space to store result).
        reporter.maybeErr(self.fErr, 'taskExpandString got a bad reply: %s' % (sOpcode,));
        return False;

    sStringExp = getSZ(abPayload, 0);
    if sStringExp is None:
        return False;
    return sStringExp;
1360
1361 # pylint: enable=missing-docstring
1362
1363
1364 #
1365 # Public methods - generic task queries
1366 #
1367
def isSuccess(self):
    """
    Returns True if no task is pending (empty status) and the stored task
    result is neither False nor None; otherwise returns False.
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    oCurRc     = self.oTaskRc;
    self.unlockTask();

    fIdle = not (sCurStatus != "");
    return fIdle and oCurRc is not False and oCurRc is not None;
1379
def getResult(self):
    """
    Returns the stored result of the last task.

    Returns None while the status string is non-empty; otherwise the stored
    task result (which the original documentation says is None when there
    was no previous task).
    """
    self.lockTask();
    sCurStatus = self.sStatus;
    oCurRc     = self.oTaskRc;
    self.unlockTask();
    return None if sCurStatus != "" else oCurRc;
1392
def getLastReply(self):
    """
    Returns the stored last-reply three-tuple (cbMsg, sOpcode, abPayload),
    read while holding the task lock.  Per the original documentation this
    is a (None, None, None) tuple when there was no last reply.
    """
    self.lockTask();
    t3oLast = self.t3oReply;
    self.unlockTask();
    return t3oLast;
1402
1403 #
1404 # Public methods - connection.
1405 #
1406
def asyncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "bye" task that runs taskBye to disconnect.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure.
    """
    fnTask = self.taskBye;
    return self.startTask(cMsTimeout, fIgnoreErrors, "bye", fnTask);
1416
def syncDisconnect(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncDisconnect and its arguments to asyncToSync."""
    aArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDisconnect, *aArgs);
1420
def asyncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "ver" task that runs taskVer to query the TXS version.

    Returns True if the task was started, False on failure (logged).

    The task itself yields the version string on success and False on
    failure.
    """
    fnTask = self.taskVer;
    return self.startTask(cMsTimeout, fIgnoreErrors, "ver", fnTask);
1430
def syncVer(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncVer and its arguments to asyncToSync."""
    aArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncVer, *aArgs);
1434
def asyncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "uuid" task that runs taskUuid to query the TXS UUID.

    Returns True if the task was started, False on failure (logged).

    The task itself yields the UUID string (in {}) on success and False on
    failure.
    """
    fnTask = self.taskUuid;
    return self.startTask(cMsTimeout, fIgnoreErrors, "uuid", fnTask);
1444
def syncUuid(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncUuid and its arguments to asyncToSync."""
    aArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUuid, *aArgs);
1448
1449 #
1450 # Public methods - execution.
1451 #
1452
def asyncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
                oStdIn = None, oStdOut = None, oStdErr = None, oTestPipe = None,
                sAsUser = "", cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts an "exec" task that runs taskExecEx for sExecName.

    Returns True if the task was started, False on failure (logged).

    Task result: True if the process exited normally with status code 0,
    None on failure prior to executing the process, and False if the process
    exited with a different status or in an abnormal manner.  None and False
    are logged, and more details are available through getLastReply().

    oStdIn, oStdOut, oStdErr and oTestPipe select the stream handling:
        - None: no special action; the stream goes wherever the TXS sends
          its own output (ditto for input).
        - '/dev/null': bit bucket.
        - A remote filename: redirect to/from that file.
        - '>>' followed by a remote filename: append to that file.
        - A file like object: pipe the stream to/from the TXS.  StdIn needs
          a non-blocking read() method, the others a write() method.  Watch
          out for deadlocks between StdIn and StdOut/StdErr/TestPipe piping.
    """
    aTaskArgs = (sExecName, long(0), asArgs, asAddEnv, oStdIn,
                 oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aTaskArgs);
1481
def syncExecEx(self, sExecName, asArgs = (), asAddEnv = (), # pylint: disable=too-many-arguments
               oStdIn = '/dev/null', oStdOut = '/dev/null',
               oStdErr = '/dev/null', oTestPipe = '/dev/null',
               sAsUser = '', cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncExecEx and its arguments to asyncToSync."""
    aArgs = (sExecName, asArgs, asAddEnv, oStdIn, oStdOut,
             oStdErr, oTestPipe, sAsUser, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExecEx, *aArgs);
1489
def asyncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = "", fWithTestPipe = True, sPrefix = '',
              cMsTimeout = 3600000, fIgnoreErrors = False):
    """
    Starts an "exec" task (taskExecEx) with the standard test stream setup.

    Returns True if the task was started, False on failure (logged).

    Task result: True if the process exited normally with status code 0,
    None on failure prior to executing the process, and False if the process
    exited with a different status or in an abnormal manner.  None and False
    are logged, and more details are available through getLastReply().

    Standard input comes from /dev/null.  Standard output and standard error
    are handed to reporter.FileWrapper objects named '<sPrefix>stdout' and
    '<sPrefix>stderr'.  The test pipe goes to a
    reporter.FileWrapperTestPipe object, or to /dev/null when fWithTestPipe
    is false.
    """
    sStdIn  = '/dev/null';
    oStdOut = reporter.FileWrapper('%sstdout' % sPrefix);
    oStdErr = reporter.FileWrapper('%sstderr' % sPrefix);
    oTestPipe = reporter.FileWrapperTestPipe() if fWithTestPipe else '/dev/null';

    aTaskArgs = (sExecName, long(0), asArgs, asAddEnv, sStdIn, oStdOut, oStdErr, oTestPipe, sAsUser);
    return self.startTask(cMsTimeout, fIgnoreErrors, "exec", self.taskExecEx, aTaskArgs);
1516
def syncExec(self, sExecName, asArgs = (), asAddEnv = (), sAsUser = '', fWithTestPipe = True, sPrefix = '',
             cMsTimeout = 3600000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncExec and its arguments to asyncToSync."""
    aArgs = (sExecName, asArgs, asAddEnv, sAsUser, fWithTestPipe, sPrefix,
             cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncExec, *aArgs);
1522
1523 #
1524 # Public methods - system
1525 #
1526
def asyncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "reboot" task that runs taskReboot.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    The session will be disconnected on successful task completion.
    """
    aTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "reboot", self.taskReboot, aTaskArgs);
1537
def syncReboot(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncReboot and its arguments to asyncToSync."""
    aArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncReboot, *aArgs);
1541
def asyncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "shutdown" task that runs taskShutdown.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = ();
    return self.startTask(cMsTimeout, fIgnoreErrors, "shutdown", self.taskShutdown, aTaskArgs);
1551
def syncShutdown(self, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncShutdown and its arguments to asyncToSync."""
    aArgs = (cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncShutdown, *aArgs);
1555
1556
1557 #
1558 # Public methods - file system
1559 #
1560
def asyncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "mkDir" task that runs taskMkDir for sRemoteDir with fMode.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDir", self.taskMkDir, aTaskArgs);
1570
def syncMkDir(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncMkDir and its arguments to asyncToSync."""
    aArgs = (sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkDir, *aArgs);
1574
def asyncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "mkDirPath" (mkdir -p) task that runs taskMkDirPath.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteDir, long(fMode));
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkDirPath", self.taskMkDirPath, aTaskArgs);
1584
def syncMkDirPath(self, sRemoteDir, fMode = 0o700, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncMkDirPath and its arguments to asyncToSync."""
    aArgs = (sRemoteDir, long(fMode), cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkDirPath, *aArgs);
1588
def asyncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "mkSymlink" task that runs taskMkSymlink (target, link).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sLinkTarget, sLink);
    return self.startTask(cMsTimeout, fIgnoreErrors, "mkSymlink", self.taskMkSymlink, aTaskArgs);
1598
def syncMkSymlink(self, sLinkTarget, sLink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncMkSymlink and its arguments to asyncToSync."""
    aArgs = (sLinkTarget, sLink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncMkSymlink, *aArgs);
1602
def asyncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "rmDir" task that runs taskRmDir for sRemoteDir.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmDir", self.taskRmDir, aTaskArgs);
1612
def syncRmDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncRmDir and its arguments to asyncToSync."""
    aArgs = (sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmDir, *aArgs);
1616
def asyncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "rmFile" task that runs taskRmFile for sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmFile", self.taskRmFile, aTaskArgs);
1626
def syncRmFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncRmFile and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmFile, *aArgs);
1630
def asyncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "rmSymlink" task that runs taskRmSymlink for sRemoteSymlink.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmSymlink", self.taskRmSymlink, aTaskArgs);
1640
def syncRmSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncRmSymlink and its arguments to asyncToSync."""
    aArgs = (sRemoteSymlink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmSymlink, *aArgs);
1644
def asyncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "rmTree" task that runs taskRmTree for sRemoteTree.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteTree,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "rmTree", self.taskRmTree, aTaskArgs);
1654
def syncRmTree(self, sRemoteTree, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncRmTree and its arguments to asyncToSync."""
    aArgs = (sRemoteTree, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncRmTree, *aArgs);
1658
def asyncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "chMod" task that runs taskChMod for sRemotePath with fMode.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemotePath, fMode);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chMod", self.taskChMod, aTaskArgs);
1668
def syncChMod(self, sRemotePath, fMode, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncChMod and its arguments to asyncToSync."""
    aArgs = (sRemotePath, fMode, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncChMod, *aArgs);
1672
def asyncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "chOwn" task that runs taskChOwn (path, user id, group id).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemotePath, idUser, idGroup);
    return self.startTask(cMsTimeout, fIgnoreErrors, "chOwn", self.taskChOwn, aTaskArgs);
1682
def syncChOwn(self, sRemotePath, idUser, idGroup, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncChOwn.

    Hands asyncChOwn and its arguments to asyncToSync and returns that
    result.
    """
    # This used to pass self.asyncChMod, i.e. the chmod starter received the
    # chown argument list (presumably forwarded positionally by asyncToSync,
    # as all sibling sync wrappers rely on).
    return self.asyncToSync(self.asyncChOwn, sRemotePath, idUser, idGroup, cMsTimeout, fIgnoreErrors);
1686
def asyncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an "isDir" query task that runs taskIsDir for sRemoteDir.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True if it's a directory, False if it isn't, and
    None on error (logged).
    """
    aTaskArgs = (sRemoteDir,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isDir", self.taskIsDir, aTaskArgs);
1697
def syncIsDir(self, sRemoteDir, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncIsDir and its arguments to asyncToSync."""
    aArgs = (sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsDir, *aArgs);
1701
def asyncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an "isFile" query task that runs taskIsFile for sRemoteFile.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True if it's a file, False if it isn't, and None
    on error (logged).
    """
    aTaskArgs = (sRemoteFile,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isFile", self.taskIsFile, aTaskArgs);
1712
def syncIsFile(self, sRemoteFile, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncIsFile and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsFile, *aArgs);
1716
def asyncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an "isSymlink" query task that runs taskIsSymlink.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True if it's a symbolic link, False if it isn't,
    and None on error (logged).
    """
    aTaskArgs = (sRemoteSymlink,);
    return self.startTask(cMsTimeout, fIgnoreErrors, "isSymlink", self.taskIsSymlink, aTaskArgs);
1727
def syncIsSymlink(self, sRemoteSymlink, cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncIsSymlink and its arguments to asyncToSync."""
    aArgs = (sRemoteSymlink, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncIsSymlink, *aArgs);
1731
1732 #def "STAT "
1733 #def "LSTAT "
1734 #def "LIST "
1735
@staticmethod
def calcFileXferTimeout(cbFile):
    """
    Calculates a reasonable timeout for an upload/download given the file size.

    A fixed 30 seconds plus one millisecond per 32 bytes of file.

    Returns timeout in milliseconds (an integer for integer cbFile).
    """
    # Floor division keeps the result integral; plain '/' is true division
    # under Python 3 and would turn the timeout into a float.
    return 30000 + cbFile // 32; # 32 KiB/s (picked out of thin air)
1744
@staticmethod
def calcUploadTimeout(sLocalFile):
    """
    Calculates a reasonable timeout for uploading sLocalFile.

    The file size is taken from os.path.getsize(); if that fails for any
    reason, a size of 1 MiB is assumed.

    Returns timeout in milliseconds (Session.calcFileXferTimeout result).
    """
    try:
        cbFile = os.path.getsize(sLocalFile);
    except:
        cbFile = 1024*1024;
    cMsTimeout = Session.calcFileXferTimeout(cbFile);
    return cMsTimeout;
1755
def asyncCopyFile(self, sSrcFile, sDstFile,
                  fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "cpfile" task that runs taskCopyFile to copy a file on the remote.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sSrcFile, sDstFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "cpfile", self.taskCopyFile, aTaskArgs);
1767
def syncCopyFile(self, sSrcFile, sDstFile, fMode = 0, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Synchronous version of asyncCopyFile.

    Hands asyncCopyFile and its arguments to asyncToSync and returns that
    result.  fFallbackOkay is always passed as True (asyncCopyFile's
    default) since this method has no such parameter.
    """
    # asyncCopyFile takes fFallbackOkay between fMode and cMsTimeout.  It was
    # missing from this argument list, which (with the positional forwarding
    # all sibling sync wrappers rely on) shifted cMsTimeout into
    # fFallbackOkay and fIgnoreErrors into cMsTimeout.
    return self.asyncToSync(self.asyncCopyFile, sSrcFile, sDstFile, fMode, True, cMsTimeout, fIgnoreErrors);
1771
def asyncUploadFile(self, sLocalFile, sRemoteFile,
                    fMode = 0, fFallbackOkay = True, cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts an "upload" task that runs taskUploadFile (local -> remote).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsTimeout, fIgnoreErrors, "upload", self.taskUploadFile, aTaskArgs);
1783
def syncUploadFile(self, sLocalFile, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadFile.

    A cMsTimeout of zero or less is replaced by calcUploadTimeout(sLocalFile).
    """
    cMsEffective = self.calcUploadTimeout(sLocalFile) if cMsTimeout <= 0 else cMsTimeout;
    aArgs = (sLocalFile, sRemoteFile, fMode, fFallbackOkay, cMsEffective, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadFile, *aArgs);
1789
def asyncUploadString(self, sContent, sRemoteFile,
                      fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Starts an "uploadString" task that runs taskUploadString.

    A cMsTimeout of zero or less is replaced by
    calcFileXferTimeout(len(sContent)).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    cMsEffective = self.calcFileXferTimeout(len(sContent)) if cMsTimeout <= 0 else cMsTimeout;
    aTaskArgs = (sContent, sRemoteFile, fMode, fFallbackOkay);
    return self.startTask(cMsEffective, fIgnoreErrors, "uploadString", self.taskUploadString, aTaskArgs);
1803
def syncUploadString(self, sContent, sRemoteFile, fMode = 0, fFallbackOkay = True, cMsTimeout = 0, fIgnoreErrors = False):
    """
    Synchronous version of asyncUploadString.

    A cMsTimeout of zero or less is replaced by
    calcFileXferTimeout(len(sContent)).
    """
    cMsEffective = self.calcFileXferTimeout(len(sContent)) if cMsTimeout <= 0 else cMsTimeout;
    aArgs = (sContent, sRemoteFile, fMode, fFallbackOkay, cMsEffective, fIgnoreErrors);
    return self.asyncToSync(self.asyncUploadString, *aArgs);
1809
def asyncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Starts a "downloadFile" task that runs taskDownloadFile (remote -> local).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteFile, sLocalFile);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadFile", self.taskDownloadFile, aTaskArgs);
1819
def syncDownloadFile(self, sRemoteFile, sLocalFile, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncDownloadFile and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, sLocalFile, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadFile, *aArgs);
1823
def asyncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                        cMsTimeout = 30000, fIgnoreErrors = False):
    """
    Starts a "downloadString" task that runs taskDownloadString.

    Returns True if the task was started, False on failure (logged).

    The task itself yields the downloaded content as a string on success
    and False on failure (logged).
    """
    aTaskArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors);
    return self.startTask(cMsTimeout, fIgnoreErrors, "downloadString", self.taskDownloadString, aTaskArgs);
1835
def syncDownloadString(self, sRemoteFile, sEncoding = 'utf-8', fIgnoreEncodingErrors = True,
                       cMsTimeout = 30000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncDownloadString and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, sEncoding, fIgnoreEncodingErrors, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncDownloadString, *aArgs);
1841
def asyncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Starts a "packFile" task that runs taskPackFile to pack a file/directory.

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteFile, sRemoteSource);
    return self.startTask(cMsTimeout, fIgnoreErrors, "packFile", self.taskPackFile, aTaskArgs);
1852
def syncPackFile(self, sRemoteFile, sRemoteSource, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncPackFile and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, sRemoteSource, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncPackFile, *aArgs);
1856
def asyncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Starts an "unpackFile" task that runs taskUnpackFile (file, directory).

    Returns True if the task was started, False on failure (logged).

    The task itself yields True on success and False on failure (logged).
    """
    aTaskArgs = (sRemoteFile, sRemoteDir);
    return self.startTask(cMsTimeout, fIgnoreErrors, "unpackFile", self.taskUnpackFile, aTaskArgs);
1867
def syncUnpackFile(self, sRemoteFile, sRemoteDir, cMsTimeout = 120000, fIgnoreErrors = False):
    """Synchronous wrapper: hands asyncUnpackFile and its arguments to asyncToSync."""
    aArgs = (sRemoteFile, sRemoteDir, cMsTimeout, fIgnoreErrors);
    return self.asyncToSync(self.asyncUnpackFile, *aArgs);
1871
def asyncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Starts an "expandString" task.

    sString is handed to taskExpandString as the sole task argument.

    Returns the startTask result, same as the other async* starters in this
    class (documented there as True on success, False on failure, logged).

    NOTE(review): the task result is presumably the expanded string on
    success and False on failure - confirm against taskExpandString.
    """
    tTaskArgs = (sString,);
    fnTask    = self.taskExpandString;
    return self.startTask(cMsTimeout, fIgnoreErrors, "expandString", fnTask, tTaskArgs);
1882
def syncExpandString(self, sString, cMsTimeout = 120000, fIgnoreErrors = False):
    """
    Synchronous variant of asyncExpandString.

    Forwards all arguments, in order, to asyncExpandString by way of
    asyncToSync and returns whatever asyncToSync returns.
    """
    fnAsync = self.asyncExpandString;
    return self.asyncToSync(fnAsync, sString, cMsTimeout, fIgnoreErrors);
1886
1887
class TransportTcp(TransportBase):
    """
    TCP transport layer for the TXS client session class.

    Two setups are supported: the normal one, where connect() connects to the
    TXS as a TCP client, and the reversed one (fReversedSetup), where
    connect() binds, listens and accepts a connection from the TXS.

    The connecting socket is created non-blocking and select() is used to
    implement the timeouts.  The oCv condition object serves as the lock
    protecting oSocket, oWakeupR, oWakeupW, fConnectCanceled and fIsConnecting
    against concurrent cancelConnect() calls.
    """

    def __init__(self, sHostname, uPort, fReversedSetup):
        """
        Save the parameters. The session will call us back to make the
        connection later on its worker thread.

        When uPort is None, 5042 is used for the normal setup and 5048 for the
        reversed setup.
        """
        TransportBase.__init__(self, utils.getCallerName());
        self.sHostname        = sHostname;
        self.fReversedSetup   = fReversedSetup;
        if uPort is not None:
            self.uPort        = uPort;
        elif fReversedSetup is False:
            self.uPort        = 5042;
        else:
            self.uPort        = 5048;
        self.oSocket          = None;
        self.oWakeupW         = None;
        self.oWakeupR         = None;
        self.fConnectCanceled = False;
        self.fIsConnecting    = False;
        self.oCv              = threading.Condition();
        self.abReadAhead      = array.array('B');

    def toString(self):
        """ Returns a string describing the transport state (for logging). """
        return '<%s sHostname=%s, fReversedSetup=%s, uPort=%s, oSocket=%s,'\
               ' fConnectCanceled=%s, fIsConnecting=%s, oCv=%s, abReadAhead=%s>' \
             % (TransportBase.toString(self), self.sHostname, self.fReversedSetup, self.uPort, self.oSocket,
                self.fConnectCanceled, self.fIsConnecting, self.oCv, self.abReadAhead);

    @staticmethod
    def __hasErrno(oXcpt, asErrnoNames):
        """
        Checks whether oXcpt is a socket.error whose errno equals one of the
        named errno module constants.

        The constants are looked up by name because not every platform defines
        all of them; a missing constant simply never matches.
        """
        if not isinstance(oXcpt, socket.error):
            return False;
        iErrno = getattr(oXcpt, 'errno', None);
        if iErrno is None:
            return False;
        for sName in asErrnoNames:
            iCode = getattr(errno, sName, None);
            if iCode is not None and iErrno == iCode:
                return True;
        return False;

    def __isInProgressXcpt(self, oXcpt):
        """ In progress exception? (EWOULDBLOCK is what Windows reports.) """
        return self.__hasErrno(oXcpt, ('EINPROGRESS', 'EWOULDBLOCK'));

    def __isWouldBlockXcpt(self, oXcpt):
        """ Would block exception? """
        return self.__hasErrno(oXcpt, ('EWOULDBLOCK', 'EAGAIN'));

    def __isConnectionReset(self, oXcpt):
        """ Connection reset by Peer or others. """
        return self.__hasErrno(oXcpt, ('ECONNRESET', 'ENETRESET'));

    def _closeWakeupSockets(self):
        """ Closes the wakeup sockets.  Caller should own the CV. """
        oWakeupR = self.oWakeupR;
        self.oWakeupR = None;
        if oWakeupR is not None:
            oWakeupR.close();

        oWakeupW = self.oWakeupW;
        self.oWakeupW = None;
        if oWakeupW is not None:
            oWakeupW.close();

        return None;

    def cancelConnect(self):
        """
        Flags the connect as canceled and, if a connect() is in progress,
        closes the socket and pokes the wakeup socket so select() returns.
        """
        # This is bad stuff.
        # The 'with' makes sure the lock is dropped even if a close() throws.
        with self.oCv:
            reporter.log2('TransportTcp::cancelConnect: fIsConnecting=%s oSocket=%s' % (self.fIsConnecting, self.oSocket));
            self.fConnectCanceled = True;
            if self.fIsConnecting:
                oSocket = self.oSocket;
                self.oSocket = None;
                if oSocket is not None:
                    reporter.log2('TransportTcp::cancelConnect: closing the socket');
                    oSocket.close();

                oWakeupW = self.oWakeupW;
                self.oWakeupW = None;
                if oWakeupW is not None:
                    reporter.log2('TransportTcp::cancelConnect: wakeup call');
                    try:    oWakeupW.send(b'cancelled!\n');
                    except: reporter.logXcpt();
                    try:    oWakeupW.shutdown(socket.SHUT_WR);
                    except: reporter.logXcpt();
                    oWakeupW.close();

    def _connectAsServer(self, oSocket, oWakeupR, cMsTimeout):
        """
        Connects to the TXS server as server, i.e. the reversed setup.

        Binds oSocket to (sHostname, uPort), listens and accepts one
        connection, waiting up to cMsTimeout milliseconds in select().

        Returns True on success, False on timeout or if canceled after the
        accept, None on failure or cancellation during the wait.  Unexpected
        socket.error exceptions from accept()/select() are re-raised.
        """
        assert(self.fReversedSetup);

        reporter.log2('TransportTcp::_connectAsServer: oSocket=%s, cMsTimeout=%u' % (oSocket, cMsTimeout));

        # Workaround for bind() failure...
        try:
            oSocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1);
        except:
            reporter.errorXcpt('socket.setsockopt(SO_REUSEADDR) failed');
            return None;

        # Bind the socket and make it listen.
        try:
            oSocket.bind((self.sHostname, self.uPort));
        except:
            reporter.errorXcpt('socket.bind((%s,%s)) failed' % (self.sHostname, self.uPort));
            return None;
        try:
            oSocket.listen(1);
        except:
            reporter.errorXcpt('socket.listen(1) failed');
            return None;

        # Accept connections.
        oClientSocket = None;
        tClientAddr   = None;
        try:
            (oClientSocket, tClientAddr) = oSocket.accept();
        except socket.error as oXcptFirst:
            if not self.__isInProgressXcpt(oXcptFirst):
                raise;

            # Do the actual waiting.
            reporter.log2('TransportTcp::accept: operation in progress (%s)...' % (oXcptFirst,));
            try:
                select.select([oSocket, oWakeupR], [], [oSocket, oWakeupR], cMsTimeout / 1000.0);
            except socket.error as oXcptSelect:
                if oXcptSelect.errno != errno.EBADF or not self.fConnectCanceled:
                    raise;
                reporter.log('socket.select() on accept was canceled');
                return None;
            except:
                reporter.logXcpt('socket.select() on accept');

            # Try accept again.
            try:
                (oClientSocket, tClientAddr) = oSocket.accept();
            except socket.error as oXcptRetry:
                # Classify the exception from *this* accept() call, not the
                # first one (which is always an in-progress error here).
                if not self.__isInProgressXcpt(oXcptRetry):
                    if oXcptRetry.errno != errno.EBADF or not self.fConnectCanceled:
                        raise;
                    reporter.log('socket.accept() was canceled');
                    return None;
                reporter.log('socket.accept() timed out');
                return False;
            except:
                reporter.errorXcpt('socket.accept() failed');
                return None;
        except:
            reporter.errorXcpt('socket.accept() failed');
            return None;

        # Store the connected socket and throw away the server socket.
        with self.oCv:
            if self.fConnectCanceled:
                # Nobody will ever own the accepted socket now, so close it
                # here rather than leaking it.  connect() reports a canceled
                # connect as False.
                try:    oClientSocket.close();
                except: reporter.logXcpt();
                return False;
            self.oSocket.close();
            self.oSocket   = oClientSocket;
            self.sHostname = "%s:%s" % (tClientAddr[0], tClientAddr[1]);
        return True;

    def _connectAsClient(self, oSocket, oWakeupR, cMsTimeout):
        """
        Connects to the TXS server as client.

        Returns True on success, False when the attempt should be retried
        (refused, unreachable, interrupted, network down, timed out), and
        None when connect() itself failed with any other error.
        """
        assert(not self.fReversedSetup);

        # Connect w/ timeouts.
        rc = None;
        try:
            oSocket.connect((self.sHostname, self.uPort));
            rc = True;
        except socket.error as oXcpt:
            iRc = oXcpt.errno;
            if self.__isInProgressXcpt(oXcpt):
                # Do the actual waiting.
                reporter.log2('TransportTcp::connect: operation in progress (%s)...' % (oXcpt,));
                try:
                    ttRc = select.select([oWakeupR], [oSocket], [oSocket, oWakeupR], cMsTimeout / 1000.0);
                    if len(ttRc[1]) + len(ttRc[2]) == 0:
                        raise socket.error(errno.ETIMEDOUT, 'select timed out');
                    # SO_ERROR holds the outcome of the asynchronous connect.
                    iRc = oSocket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR);
                    rc = iRc == 0;
                except socket.error as oXcpt2:
                    iRc = oXcpt2.errno;
                except:
                    iRc = -42;
                    reporter.fatalXcpt('socket.select() on connect failed');

            if rc is True:
                pass;
            elif iRc in (errno.ECONNREFUSED, errno.EHOSTUNREACH, errno.EINTR, errno.ENETDOWN, errno.ENETUNREACH, errno.ETIMEDOUT):
                rc = False; # try again.
            else:
                # NOTE(review): rc may already be False here (SO_ERROR was
                # non-zero), in which case the fatal error is still returned
                # as False (retry) rather than None - confirm this is intended.
                if iRc != errno.EBADF or not self.fConnectCanceled:
                    reporter.fatalXcpt('socket.connect((%s,%s)) failed; iRc=%s' % (self.sHostname, self.uPort, iRc));
            reporter.log2('TransportTcp::connect: rc=%s iRc=%s' % (rc, iRc));
        except:
            reporter.fatalXcpt('socket.connect((%s,%s)) failed' % (self.sHostname, self.uPort));
        return rc;


    def connect(self, cMsTimeout):
        """
        Creates the socket and connects to the TXS, either as client or as
        server depending on fReversedSetup.

        Returns True on success, False if the attempt timed out, should be
        retried or was canceled, and None on failure.
        """
        # Create a non-blocking socket.
        reporter.log2('TransportTcp::connect: cMsTimeout=%s sHostname=%s uPort=%s' % (cMsTimeout, self.sHostname, self.uPort));
        try:
            oSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0);
        except:
            reporter.fatalXcpt('socket.socket() failed');
            return None;
        try:
            oSocket.setblocking(0);
        except:
            reporter.fatalXcpt('socket.setblocking(0) failed');
            oSocket.close();
            return None;

        # Create wakeup socket pair for unix (select doesn't wake up on socket close on Linux).
        oWakeupR = None;
        oWakeupW = None;
        if hasattr(socket, 'socketpair'):
            try:    (oWakeupR, oWakeupW) = socket.socketpair(); # pylint: disable=no-member
            except: reporter.logXcpt('socket.socketpair() failed');

        # Update the state.
        rc = None;
        with self.oCv:
            fPublished = not self.fConnectCanceled;
            if fPublished:
                self.oSocket       = oSocket;
                self.oWakeupW      = oWakeupW;
                self.oWakeupR      = oWakeupR;
                self.fIsConnecting = True;

        try:
            if fPublished:
                # Try connect.  Without a wakeup pair, pass the socket itself
                # in its place to avoid select failure.
                oSelectWakeup = oWakeupR if oWakeupR is not None else oSocket;
                if self.fReversedSetup:
                    rc = self._connectAsServer(oSocket, oSelectWakeup, cMsTimeout);
                else:
                    rc = self._connectAsClient(oSocket, oSelectWakeup, cMsTimeout);
            else:
                # Canceled before we got going: these sockets were never
                # stored in self, so nobody else will close them.
                for oUnused in (oSocket, oWakeupR, oWakeupW):
                    if oUnused is not None:
                        try:    oUnused.close();
                        except: reporter.logXcpt();
        finally:
            # Update the state and cleanup on failure/cancel.  Done in a
            # finally clause so an exception from the connect helpers doesn't
            # leave fIsConnecting set and the sockets dangling.
            with self.oCv:
                if rc is True and self.fConnectCanceled:
                    rc = False;
                self.fIsConnecting = False;

                if rc is not True:
                    if self.oSocket is not None:
                        self.oSocket.close();
                        self.oSocket = None;
                    self._closeWakeupSockets();

        reporter.log2('TransportTcp::connect: returning %s' % (rc,));
        return rc;

    def disconnect(self, fQuiet = False):
        """
        Shuts down and closes the connection (if any) and the wakeup sockets.

        fQuiet suppresses the error logged when shutdown(SHUT_WR) fails.
        """
        # Work on a snapshot so a concurrent cancelConnect() clearing
        # self.oSocket cannot make us call methods on None.
        oSocket = self.oSocket;
        if oSocket is not None:
            self.abReadAhead = array.array('B');

            # Try a shutting down the socket gracefully (draining it).
            try:
                oSocket.shutdown(socket.SHUT_WR);
            except:
                if not fQuiet:
                    reporter.error('shutdown(SHUT_WR)');
            try:
                oSocket.setblocking(0); # just in case it's not set.
                sData = "1";
                while sData:
                    sData = oSocket.recv(16384);
            except:
                pass;

        # Close it.  The with/finally pair guarantees that the lock is released
        # and the wakeup sockets are closed even if close() throws.
        with self.oCv:
            try:
                if oSocket is not None:
                    try:    oSocket.setblocking(1);
                    except: pass;
                    self.oSocket = None;
                    oSocket.close();
            finally:
                self._closeWakeupSockets();

    def sendBytes(self, abBuf, cMsTimeout):
        """
        Sends all of abBuf, waiting up to cMsTimeout milliseconds.

        Returns True if everything was sent, False on error or timeout (logged).
        """
        if self.oSocket is None:
            reporter.error('TransportTcp.sendBytes: No connection.');
            return False;

        # Try send it all.
        try:
            cbSent = self.oSocket.send(abBuf);
            if cbSent == len(abBuf):
                return True;
        except Exception as oXcpt:
            if not self.__isWouldBlockXcpt(oXcpt):
                reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                return False;
            cbSent = 0;

        # Do a timed send.
        msStart = base.timestampMilli();
        while True:
            cMsElapsed = base.timestampMilli() - msStart;
            if cMsElapsed > cMsTimeout:
                reporter.error('TranportTcp.sendBytes: %s bytes timed out (1)' % (len(abBuf)));
                break;

            # wait.
            try:
                ttRc = select.select([], [self.oSocket], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
                if ttRc[2] and not ttRc[1]:
                    reporter.error('TranportTcp.sendBytes: select returned with exception');
                    break;
                if not ttRc[1]:
                    reporter.error('TranportTcp.sendBytes: %s bytes timed out (2)' % (len(abBuf)));
                    break;
            except:
                reporter.errorXcpt('TranportTcp.sendBytes: select failed');
                break;

            # Try send more.  A would-block error just takes us around the
            # loop again.
            try:
                cbSent += self.oSocket.send(abBuf[cbSent:]);
                if cbSent == len(abBuf):
                    return True;
            except Exception as oXcpt:
                if not self.__isWouldBlockXcpt(oXcpt):
                    reporter.errorXcpt('TranportTcp.sendBytes: %s bytes' % (len(abBuf)));
                    break;

        return False;

    def __returnReadAheadBytes(self, cb):
        """ Internal worker for recvBytes: removes and returns the first cb read-ahead bytes. """
        assert(len(self.abReadAhead) >= cb);
        abRet = self.abReadAhead[:cb];
        self.abReadAhead = self.abReadAhead[cb:];
        return abRet;

    def recvBytes(self, cb, cMsTimeout, fNoDataOk):
        """
        Receives exactly cb bytes, waiting up to cMsTimeout milliseconds.

        Partially received data is kept in abReadAhead for the next call.
        When fNoDataOk is set, a timeout or connection reset with no data
        buffered is not logged as an error.

        Returns an array of cb bytes on success, None otherwise.
        """
        if self.oSocket is None:
            reporter.error('TransportTcp.recvBytes(%s,%s): No connection.' % (cb, cMsTimeout));
            return None;

        # Try read in some more data without bothering with timeout handling first.
        if len(self.abReadAhead) < cb:
            try:
                abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
                if abBuf:
                    self.abReadAhead.extend(array.array('B', abBuf));
            except Exception as oXcpt:
                if not self.__isWouldBlockXcpt(oXcpt):
                    reporter.errorXcpt('TranportTcp.recvBytes: 0/%s bytes' % (cb,));
                    return None;

        if len(self.abReadAhead) >= cb:
            return self.__returnReadAheadBytes(cb);

        # Timeout loop.
        msStart = base.timestampMilli();
        while True:
            cMsElapsed = base.timestampMilli() - msStart;
            if cMsElapsed > cMsTimeout:
                if not fNoDataOk or self.abReadAhead:
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (1)' % (len(self.abReadAhead), cb));
                break;

            # Wait.
            try:
                ttRc = select.select([self.oSocket], [], [self.oSocket], (cMsTimeout - cMsElapsed) / 1000.0);
                if ttRc[2] and not ttRc[0]:
                    reporter.error('TranportTcp.recvBytes: select returned with exception');
                    break;
                if not ttRc[0]:
                    if not fNoDataOk or self.abReadAhead:
                        reporter.error('TranportTcp.recvBytes: %s/%s bytes timed out (2) fNoDataOk=%s'
                                       % (len(self.abReadAhead), cb, fNoDataOk));
                    break;
            except:
                reporter.errorXcpt('TranportTcp.recvBytes: select failed');
                break;

            # Try read more.
            try:
                abBuf = self.oSocket.recv(cb - len(self.abReadAhead));
                if not abBuf:
                    # recv() returning nothing after select() flagged the
                    # socket readable is treated as the peer having shut down.
                    reporter.error('TranportTcp.recvBytes: %s/%s bytes (%s) - connection has been shut down'
                                   % (len(self.abReadAhead), cb, fNoDataOk));
                    self.disconnect();
                    return None;

                self.abReadAhead.extend(array.array('B', abBuf));

            except Exception as oXcpt:
                reporter.log('recv => exception %s' % (oXcpt,));
                if not self.__isWouldBlockXcpt(oXcpt):
                    if not fNoDataOk or not self.__isConnectionReset(oXcpt) or self.abReadAhead:
                        reporter.errorXcpt('TranportTcp.recvBytes: %s/%s bytes (%s)' % (len(self.abReadAhead), cb, fNoDataOk));
                    break;

            # Done?
            if len(self.abReadAhead) >= cb:
                return self.__returnReadAheadBytes(cb);

        #reporter.log('recv => None len(self.abReadAhead) -> %d' % (len(self.abReadAhead), ));
        return None;

    def isConnectionOk(self):
        """
        Checks the connection by polling select() for an exceptional condition
        and doing a zero byte send.  Returns True if okay, False if not.
        """
        if self.oSocket is None:
            return False;
        try:
            ttRc = select.select([], [], [self.oSocket], 0.0);
            if ttRc[2]:
                return False;

            self.oSocket.send(array.array('B')); # send zero bytes.
        except:
            return False;
        return True;

    def isRecvPending(self, cMsTimeout = 0):
        """
        Checks whether the socket is readable, waiting up to cMsTimeout
        milliseconds.

        Returns False only when select() reports nothing to read; a select()
        failure (including having no socket) yields True.
        """
        try:
            ttRc = select.select([self.oSocket], [], [], cMsTimeout / 1000.0);
            if not ttRc[0]:
                return False;
        except:
            pass;
        return True;
2345
2346
def openTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Opens a connection to a Test Execution Service via TCP, given its name.

    A TransportTcp instance is created from sHostname, uPort and
    fReversedSetup and wrapped in a Session.

    The optional fnProcessEvents callback should be set to
    vbox.processPendingEvents or similar.

    Returns the session object, or None if creating the transport or the
    session raised an exception (logged).
    """
    reporter.log2('openTcpSession(%s, %s, %s, %s, %s)' %
                  (cMsTimeout, sHostname, uPort, fReversedSetup, cMsIdleFudge));
    oSession = None;
    try:
        oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
        oSession   = Session(oTransport, cMsTimeout, cMsIdleFudge, fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
2363
2364
def tryOpenTcpSession(cMsTimeout, sHostname, uPort = None, fReversedSetup = False, cMsIdleFudge = 0, fnProcessEvents = None):
    """
    Tries to open a connection to a Test Execution Service via TCP, given its name.

    Same as openTcpSession, except that the Session is created with
    fTryConnect = True so that a connection failure won't be logged as an
    error.

    Returns the session object, or None if creating the transport or the
    session raised an exception (logged).
    """
    oSession = None;
    try:
        oTransport = TransportTcp(sHostname, uPort, fReversedSetup);
        oSession   = Session(oTransport, cMsTimeout, cMsIdleFudge, fTryConnect = True,
                             fnProcessEvents = fnProcessEvents);
    except:
        reporter.errorXcpt(None, 15);
        oSession = None;
    return oSession;
Note: See TracBrowser for help on using the repository browser.

© 2024 Oracle Support Privacy / Do Not Sell My Info Terms of Use Trademark Policy Automated Access Etiquette