LMS-2_ReportAPI/tcpthreadhelper.pas
2025-07-02 12:34:35 +03:00

1118 lines
29 KiB
ObjectPascal

unit tcpthreadhelper;
{$mode ObjFPC}{$H+}
interface
uses
Classes, SysUtils,lNet,lnetbase, syncobjs, extTypes;
type
TMainThread=class;
TConnectionThread=class;
{ TConnectionThread }
TConnectionThread=class(TThread)
private
fSocket: TLSocket;
fCache: TRoundBuffer;
fOwner: TMainThread;
class function BufferToString(const Buffer: TBuffer; const len: integer): string;
class procedure AddToBuffer(const Value: byte; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: word; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: dword; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: QWord; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: string; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: TGUID; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: TBuffer; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: TStream; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure AddToBuffer(const Value: TParamArray; var Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: byte; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: word; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: dword; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: QWord; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: string; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: TGUID; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(var Value: TBuffer; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(out Value: TStream; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(out Value: TStrings; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure ReadFromBuffer(out Value: TParamArray; const Buffer: TBuffer; var pos: integer); overLoad;
class procedure InitBuffer(buffSize: dword; out Buffer: TBuffer; out pos: integer);
public
ID: TGUID;
recNo: qword;
property Owner: TMainThread read fOwner;
property Socket:TLSocket read fSocket;
property Cache: TRoundBuffer read fCache;
procedure log(ALevel: TLogLevel; msg: string);
procedure ProcessMessage(const mode: byte;const Code:DWORD; const Param:QWord; const ACommand: string;const Values: TStrings; const intData: TParamArray; const Data: TStream); virtual; abstract;
class function Role: string; virtual; abstract;
procedure SendBuffer(const Buffer: TBuffer; Len: dword);
function ReceiveBuffer(var Buffer: TBuffer; out Len: dword): boolean;
procedure SendHeader(packetType,state: byte; Code:DWORD; QP:QWORD;SP:string);
function ReceiveHeader(out packetType,state: byte;out Sender:TGUID; out num:QWORD; out Code:DWORD;out QP:QWORD;out SP:string): boolean;
procedure SendData(part: byte; const Data: TStream); overload;
procedure SendData(part: byte; const Data: TBuffer); overload;
procedure SendData(part: byte; const Data: TStrings); overload;
procedure SendData(part: byte; const Data: TParamArray); overload;
procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord; const sParam: string); overload;
procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord;const AValue: string; const AKeys: TStrings); overload;
procedure SendMessage(const mode: byte; const CommandID: DWORD;const QParam: QWord; const AValue: string; const AKeys: TStrings;const IntData: TParamArray; const AData: TStream); overload;
function ReceiveMessage(out mode: byte;out Sender: TGUID; out rNum: QWord;out CommandID: DWORD; out QParam:QWord; out Value: string; out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean; virtual;
constructor Create(Aowner: TMainThread; ASocket:TLSocket);
destructor Destroy; override;
procedure Execute;override;
procedure TerminatedSet; override;
end;
TConnectionThreadClass = class of TConnectionThread;
{ TMainThread }
TMainThread=class(TThread)
private
fCon: TLTCP;
fPort: integer;
fclients: TList;
flogger: TLogger;
fThreadClass: TConnectionThreadClass;
fStarted:boolean;
fComplete: boolean;
function getThread(index: TLSocket): TConnectionThread;
protected
procedure Log(ALevel: TLogLevel; Sender:TObject; msg: string);
procedure doStart; virtual;
procedure TerminateClients;
property Complete: boolean read fComplete;
public
property Port: integer read fPort;
property Connect: TLTCP read fCon;
property Client[index: TLSocket]: TConnectionThread read getThread;
procedure RemoveClient(clt:TConnectionThread);
procedure dataReady(aSocket: TLSocket);
procedure ProcessConnect(thread: TConnectionThread); virtual;
procedure ProcessAccept(thread: TConnectionThread); virtual;
procedure Accept(aSocket: TLSocket);
procedure doDisconnect(aSocket: TLSocket);
procedure doConnect(aSocket: TLSocket);
procedure TerminatedSet;override;
procedure NetError(const msg: string; aSocket: TLSocket);
constructor Create(AThreadClass: TConnectionThreadClass; ALogger: TLogger; APort: integer);
destructor Destroy; override;
procedure SetComplete;
end;
{ TClientRequest }
implementation
uses
lCommon;
function TMainThread.getThread(index: TLSocket): TConnectionThread;
var
clt: TConnectionThread;
i: integer;
begin
result := nil;
for i := 0 to fclients.Count-1 do
if TConnectionThread(fclients[i]).Socket=index then
begin
result := TConnectionThread(fclients[i]);
log(mtExtra,self,format('getThread(%d) %s',[index.Handle,guidToString(result.ID)]));
exit;
end;
result := fThreadClass.Create(self,index);
log(mtDebug,self,format('new Thread(%d) %s',[index.Handle,guidToString(result.ID)]));
fclients.Add(Result);
end;
procedure TMainThread.TerminateClients;
var
i: integer;
clt: TConnectionThread;
begin
log(mtDebug,Self,'Terminate Clients '+inttostr(fclients.Count));
for i := fclients.Count-1 downto 0 do
begin
sleep(0);
clt := TConnectionThread(fclients[i]);
try
log(mtDebug,self,GuidToString(clt.ID));
clt.Terminate;
clt.WaitFor;
clt.free;
except on e: Exception do
begin
log(mtError,self, '!!ERROR Destroy ' + e.Message);
raise;
end;
end;
end;
fClients.Clear;
end;
procedure TMainThread.Log(ALevel: TLogLevel; Sender: TObject; msg: string);
begin
if assigned(fLogger) then
fLogger(ALevel, Sender,Msg);
end;
procedure TMainThread.doStart;
begin
fStarted := true;
fComplete:=false;
end;
procedure TMainThread.RemoveClient(clt: TConnectionThread);
begin
clt.Terminate;
clt.WaitFor;
log(mtDebug,self,'RemoveClient '+GuidToString(clt.ID));
fclients.Remove(clt);
clt.free;
end;
procedure TMainThread.dataReady(aSocket: TLSocket);
var
clt: TConnectionThread;
begin
log(mtExtra,self,format('dataReady(%s:%d<-%s:%d) ',[aSocket.LocalAddress,aSocket.LocalPort,aSocket.PeerAddress,aSocket.PeerPort]));
if Terminated then exit;
clt := Client[aSocket];
clt.Cache.WriteReady.WaitFor(INFINITE);
while clt.Cache.ReadFromSocket(aSocket)<>0 do
begin
sleep(0);
end;
end;
procedure TMainThread.ProcessConnect(thread: TConnectionThread);
begin
log(mtExtra,self,'ProcessConnect'+GUIDToString(thread.ID));
end;
procedure TMainThread.ProcessAccept(thread: TConnectionThread);
begin
log(mtExtra,self,'ProcessAccept'+GUIDToString(thread.ID));
end;
procedure TMainThread.Accept(aSocket: TLSocket);
var
clt: TConnectionThread;
begin
log(mtExtra,self,format('accept(%s:%d<-%s:%d) ',[aSocket.LocalAddress,aSocket.LocalPort,aSocket.PeerAddress,aSocket.PeerPort]));
if Terminated then exit;
clt := Client[aSocket];
log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
ProcessAccept(clt);
clt.start;
end;
procedure TMainThread.doDisconnect(aSocket: TLSocket);
var
clt: TConnectionThread;
begin
if terminated then exit;
log(mtDebug,self,format('disconnect(%s:%d<-%s:%d) ',[aSocket.LocalAddress,aSocket.LocalPort,aSocket.PeerAddress,aSocket.PeerPort]));
try
clt := Client[aSocket];
if assigned(clt) then
begin
log(mtDebug,self,format('disconnected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
clt.Terminate;
fclients.remove(clt);
clt.WaitFor;
clt.free;
end;
except on e: Exception do
begin
log(mtError,self,'!!ERROR doDisconnect '+e.Message);
raise;
end;
end;
end;
procedure TMainThread.doConnect(aSocket: TLSocket);
var
clt: TConnectionThread;
begin
log(mtExtra,self,format('doConnect(%s:%d<-%s:%d) ',[aSocket.LocalAddress,aSocket.LocalPort,aSocket.PeerAddress,aSocket.PeerPort]));
if Terminated then exit;
clt := Client[aSocket];
log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
ProcessConnect(clt);
clt.Start;
end;
procedure TMainThread.TerminatedSet;
begin
log(mtExtra,self,'terminated required');
inherited TerminatedSet();
end;
procedure TMainThread.NetError(const msg: string; aSocket: TLSocket);
begin
if assigned(aSocket) then
log(mtWarning, self,'!!NETERROR on '+inttostr(aSocket.Handle)+#09+msg)
else
log(mtWarning, self,'!!NETERROR '+msg);
end;
constructor TMainThread.Create(AThreadClass: TConnectionThreadClass;
ALogger: TLogger; APort: integer);
begin
inherited Create(true);
fStarted := false;
fThreadClass:=AThreadClass;
fCon := TLTcp.Create(nil);
fclients := TList.Create;
fLogger := ALogger;
fPort := APort;
Connect.OnDisconnect:=@doDisconnect;
Connect.OnReceive:=@dataReady;
Connect.Timeout:=100;
log(mtExtra,self,'create main thread');
end;
destructor TMainThread.Destroy;
begin
log(mtExtra,self,'Destroy');
TerminateClients;
fClients.Free;
fCon.Free;
Inherited Destroy;
end;
procedure TMainThread.SetComplete;
begin
log(mtExtra,self,'setcomplete');
fComplete:=true;
end;
{ TConnectionThread }
class function TConnectionThread.BufferToString(const Buffer: TBuffer;
const len: integer): string;
var
i: integer;
begin
result := '';
for i := 1 to len do
begin
result := result + IntToHex(Buffer[i-1],2)+' ';
if (i mod 64)=0 then
result := result +#13#10;
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: byte;
var Buffer: TBuffer; var pos: integer);
begin
Buffer[pos] := Value;
inc(pos);
end;
class procedure TConnectionThread.AddToBuffer(const Value: word;
var Buffer: TBuffer; var pos: integer);
var
i: integer;
b: byte;
begin
for i := 0 to 1 do
begin
b := (Value shr (8*i)) and $FF;
Buffer[pos] := b;
inc(pos);
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: dword;
var Buffer: TBuffer; var pos: integer);
var
i: integer;
b: byte;
begin
for i := 0 to 3 do
begin
b := (Value shr (8*i)) and $FF;
Buffer[pos] := b;
inc(pos);
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: QWord;
var Buffer: TBuffer; var pos: integer);
var
i: integer;
b: Byte;
begin
for i := 0 to 7 do
begin
b := (Value shr (8*i)) and $FF;
Buffer[pos] := b;
inc(pos);
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: string;
var Buffer: TBuffer; var pos: integer);
var
len,i: DWORD;
p: PChar;
b: byte;
begin
len := Length(Value);
AddToBuffer(Len,Buffer,pos);
p := PChar(Value);
for i := 1 to len do
begin
b :=byte(p^);
AddToBuffer(b,Buffer,pos);
inc(p);
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: TGUID;
var Buffer: TBuffer; var pos: integer);
var
i: integer;
begin
AddToBuffer(Value.D1,Buffer,pos);
AddToBuffer(Value.D2,Buffer,pos);
AddToBuffer(Value.D3,Buffer,pos);
for i := 0 to 7 do
AddToBuffer(Value.D4[i],Buffer,pos);
end;
class procedure TConnectionThread.AddToBuffer(const Value: TBuffer;
var Buffer: TBuffer; var pos: integer);
var
len: DWORD;
i: integer;
begin
len := length(Value);
AddToBuffer(len,Buffer,pos);
for i := 0 to len-1 do
AddToBuffer(Value[i],Buffer,pos);
end;
class procedure TConnectionThread.AddToBuffer(const Value: TStream;
var Buffer: TBuffer; var pos: integer);
var
len: QWORD;
i: integer;
b: byte;
begin
len := Value.Size;
AddToBuffer(len,Buffer,pos);
Value.seek(0,soFromBeginning);
for i := 0 to len-1 do
begin
Value.Read(b,1);
AddToBuffer(b,Buffer,pos);
end;
end;
class procedure TConnectionThread.AddToBuffer(const Value: TParamArray;
var Buffer: TBuffer; var pos: integer);
var
l,i: DWORD;
begin
l := Length(Value);
AddToBuffer(l,Buffer,pos);
for i := low(Value) to high(Value) do
AddToBuffer(Value[i],Buffer,pos);
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: byte;
const Buffer: TBuffer; var pos: integer);
begin
Value := Buffer[pos];
inc(pos);
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: word;
const Buffer: TBuffer; var pos: integer);
var
i: integer;
begin
Value := 0;
for i := 0 to 1 do
begin
Value := Value or (Buffer[pos] shl (8*i));
inc(pos);
end;
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: dword;
const Buffer: TBuffer; var pos: integer);
var
i: integer;
begin
Value := 0;
for i := 0 to 3 do
begin
Value := Value or (Buffer[pos] shl (8*i));
inc(pos);
end;
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: QWord;
const Buffer: TBuffer; var pos: integer);
var
i: integer;
begin
Value := 0;
for i := 0 to 7 do
begin
Value := Value or (Buffer[pos] shl (8*i));
inc(pos);
end;
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: string;
const Buffer: TBuffer; var pos: integer);
var
len: DWORD;
i: integer;
p: PChar;
begin
ReadFromBuffer(Len,Buffer,pos);
Value := StringOfChar(' ',len);
for i := 1 to len do
begin
Value[i] := chr(Buffer[pos]);
inc(pos);
end;
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: TGUID;
const Buffer: TBuffer; var pos: integer);
var
i: integer;
begin
ReadFromBuffer(Value.D1,Buffer,pos);
ReadFromBuffer(Value.D2,Buffer,pos);
ReadFromBuffer(Value.D3,Buffer,pos);
for i := 0 to 7 do
ReadFromBuffer(Value.D4[i],Buffer,pos);
end;
class procedure TConnectionThread.ReadFromBuffer(var Value: TBuffer;
const Buffer: TBuffer; var pos: integer);
var
len: DWORD;
i: integer;
begin
ReadFromBuffer(len,Buffer,pos);
setLength(Value,len);
for i := 0 to len-1 do
ReadFromBuffer(Value[i],Buffer,pos);
end;
class procedure TConnectionThread.ReadFromBuffer(out Value: TStream;
const Buffer: TBuffer; var pos: integer);
var
len: QWORD;
i: integer;
b: byte;
begin
Value := TMemoryStream.Create;
ReadFromBuffer(len,Buffer,pos);
if len=0 then exit;
for i := 0 to len-1 do
begin
ReadFromBuffer(b,Buffer,pos);
Value.Write(b,1);
end;
Value.seek(0,soFromBeginning);
end;
class procedure TConnectionThread.ReadFromBuffer(out Value: TStrings;
const Buffer: TBuffer; var pos: integer);
var
i,w,d: dword;
s: string;
begin
Value := TStringList.Create;
ReadFromBuffer(w,Buffer,pos);
if w=0 then exit;
for i := 0 to w-1 do
begin
ReadFromBuffer(s,Buffer,pos);
ReadFromBuffer(d,Buffer,pos);
Value.AddObject(s, TObject(PtrInt(d)));
end;
end;
class procedure TConnectionThread.ReadFromBuffer(out Value: TParamArray;
const Buffer: TBuffer; var pos: integer);
var
l,i: DWORD;
begin
ReadFromBuffer(l,Buffer,pos);
SetLength(Value,l);
if l=0 then exit;
for i := 0 to l-1 do
ReadFromBuffer(Value[i],Buffer,pos);
end;
class procedure TConnectionThread.InitBuffer(buffSize: dword; out
Buffer: TBuffer; out pos: integer);
begin
SetLength(Buffer,buffSize);
pos := 0;
end;
procedure TConnectionThread.log(ALevel: TLogLevel; msg: string);
begin
if assigned(fOwner) then
fOwner.log(ALevel, self,Role+#09+ GuidToString(ID)+#09+msg);
end;
procedure TConnectionThread.SendBuffer(const Buffer: TBuffer; Len: dword);
var
p,t: PByte;
l,rem,i: integer;
part_id,tmp: QWORD;
b2: array[0..7] of byte;
begin
log(mtExtra,'Send buffer '+inttostr(len));
try
rem := len+Sizeof(dword)+Sizeof(QWord);
p := GetMem(rem);
try
t := p;
CopyBytes(t,PacketStart);
CopyBytes(t,len);
CopyBytes(t,Buffer);
t := p;
repeat
l := Socket.send(t^,rem);
dec(rem,l);
inc(t,l);
if l=0 then
sleep(100);
until terminated or (rem<=0);
finally
//log(format('%p',[p]));
freeMem(p);
end;
except on e:Exception do
begin
log(mtError,'!!ERROR SendBuffer '+e.message);
raise;
end;
end;
end;
function TConnectionThread.ReceiveBuffer(var Buffer: TBuffer; out Len: dword
): boolean;
var
p: PByte;
i,l,rem: integer;
lbytes: array[0..3] of byte;
b2: array[0..7] of byte;
part_id: QWORD;
begin
result := false;
if Terminated then
begin
log(mtExtra,'ReceiveBuffer terminated');
exit;
end;
try
Cache.Read(part_id);
if Part_id<>PacketStart then
begin
log(mtError,'ReceiveBuffer PacketStart '+inttohex(part_id,16));
exit;
end;
Cache.Read(len);
setlength(Buffer,len);
if len=0 then
begin
log(mtError,'ReceiveBuffer Length=0');
exit;
end;
rem := len;
p := PByte(Buffer);
repeat
l := Cache.pop(p^,rem);
dec(rem,l);
inc(p,l);
if Terminated then exit;
until terminated or (rem<=0) ;
log(mtExtra,'Receive buffer '+inttostr(len));
result := true;
except on e:Exception do
begin
log(mtError,'!!ERROR ReceiveBuffer '+e.message);
raise;
end;
end;
end;
procedure TConnectionThread.SendHeader(packetType, state: byte; Code: DWORD;
QP: QWORD; SP: string);
var
Buffer: TBuffer;
pos: integer;
begin
try
log(mtExtra,format('SendHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
InitBuffer(sizeof(BYTE)*(Length(SP)+2)+16+2*sizeof(QWord)+sizeof(DWORD)*2,Buffer,pos);
AddToBuffer(packetType,Buffer,pos);
AddToBuffer(state,Buffer,pos);
AddToBuffer(ID,Buffer,pos);
AddToBuffer(recNo,Buffer,pos);
AddToBuffer(Code,Buffer,pos);
AddToBuffer(QP,Buffer,pos);
AddToBuffer(SP,Buffer,pos);
SendBuffer(Buffer,pos);
except on e:Exception do
begin
log(mtError,'!!ERROR SendHeader '+e.message);
raise;
end;
end;
end;
function TConnectionThread.ReceiveHeader(out packetType, state: byte; out
Sender: TGUID; out num: QWORD; out Code: DWORD; out QP: QWORD; out SP: string
): boolean;
var
Buffer: TBuffer;
len: dword;
pos: integer;
begin
result := false;
if Terminated then exit;
try
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(packetType,Buffer,pos);
ReadFromBuffer(State,Buffer,pos);
ReadFromBuffer(Sender,Buffer,pos);
ReadFromBuffer(num,Buffer,pos);
ReadFromBuffer(Code,Buffer,pos);
ReadFromBuffer(QP,Buffer,pos);
ReadFromBuffer(SP,Buffer,pos);
log(mtExtra,format('ReceiveHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
result := true;
except on e:Exception do
begin
log(mtError,'!!ERROR ReceiveHeader '+e.message);
raise;
end;
end;
end;
procedure TConnectionThread.SendMessage(const mode: byte;
const Status: DWORD; const QParam: QWord; const sParam: string);
begin
SendHeader(0,mode,Status,QParam,sParam);
end;
procedure TConnectionThread.SendMessage(const mode: byte;
const Status: DWORD; const QParam: QWord; const AValue: string;
const AKeys: TStrings);
begin
if assigned(AKeys) then
begin
SendHeader(1,mode,Status,QParam,AValue);
SendData(1,AKeys);
end
else
SendHeader(0,mode,Status,QParam,AValue);
end;
procedure TConnectionThread.SendMessage(const mode: byte;
const CommandID: DWORD; const QParam: QWord; const AValue: string;
const AKeys: TStrings; const IntData: TParamArray; const AData: TStream);
begin
try
if assigned(AKeys) and assigned(AData) and (length(IntData)>0) then
begin
SendHeader(7,mode,CommandID,QParam,AValue);
SendData(1,AKeys);
SendData(2,IntData);
SendData(3,AData);
end
else if (length(IntData)>0) and assigned(AKeys) then
begin
SendHeader(6,mode,CommandID,QParam,AValue);
SendData(1,AKeys);
SendData(2,IntData);
end
else if (length(IntData)>0) and assigned(AData) then
begin
SendHeader(5,mode,CommandID,QParam,AValue);
SendData(1,IntData);
SendData(2,AData);
end
else if assigned(AKeys) and assigned(AData) then
begin
SendHeader(4,mode,CommandID,QParam,AValue);
SendData(1,AKeys);
SendData(2,AData);
end
else if length(IntData)>0 then
begin
SendHeader(3,mode,CommandID,QParam,AValue);
SendData(1,IntData);
end
else if assigned(AData) then
begin
SendHeader(2,mode,CommandID,QParam,AValue);
SendData(2,AData);
end
else if assigned(AKeys) then
begin
SendHeader(1,mode,CommandID,QParam,AValue);
SendData(1,AKeys);
end
else
SendHeader(0,mode,CommandID,QParam,AValue);
except on E:exception do
begin
log(mtError,format('send error(%s) %s',[e.classname,e.message]));
raise;
end;
end;
end;
procedure TConnectionThread.SendData(part: byte; const Data: TStream);
var
Buffer: TBuffer;
pos: integer;
footer: dWORD;
begin
try
log(mtExtra,'Send Stream '+inttostr(Data.Size));
setLength(Buffer,1+Data.Size+8);
pos := 0;
AddToBuffer(part,Buffer,pos);
AddToBuffer(Data,Buffer,pos);
SendBuffer(Buffer,pos);
except on e:Exception do
begin
log(mtError,'TConnectionThread.SendData '+e.message);
raise;
end;
end;
end;
procedure TConnectionThread.SendData(part: byte; const Data: TBuffer);
var
Buffer: TBuffer;
pos: integer;
footer: dWORD;
begin
log(mtExtra,'Send Buffer '+inttostr(length(Data)));
try
setLength(Buffer,length(Data)+4+1);
pos := 0;
AddToBuffer(part,Buffer,pos);
AddToBuffer(Data,Buffer,pos);
SendBuffer(Buffer,pos);
except on e:Exception do
begin
log(mtError,'TConnectionThread.SendData '+e.message);
raise;
end;
end;
end;
procedure TConnectionThread.SendData(part: byte; const Data: TStrings);
var
Buffer: TBuffer;
pos: integer;
w,footer: dWORD;
len,i: integer;
begin
try
log(mtExtra,'Send Strings');
LogStrings(mtExtra,fOwner.flogger,self,'KEYS',Data);
len := 1+4+8*Data.Count;
for i:=0 to Data.Count-1 do
inc(len,length(Data[i]));
setLength(Buffer,len);
pos := 0;
AddToBuffer(part,Buffer,pos);
w := Data.Count;
AddToBuffer(w,Buffer,pos);
for i := 0 to Data.Count-1 do
begin
AddToBuffer(Data[i],Buffer,pos);
w := ptrInt(Pointer(Data.Objects[i])) and $FFFFFFFF;
AddToBuffer(w,Buffer,pos);
end;
SendBuffer(Buffer,pos);
except on e:Exception do
begin
log(mtError,'TConnectionThread.SendData '+e.message);
raise;
end;
end;
end;
procedure TConnectionThread.SendData(part: byte;
const Data: TParamArray);
var
i,pos: integer;
len: DWORD;
Buffer: TBuffer;
begin
len := length(Data);
log(mtExtra,'Send ParamArray '+inttostr(length(Data)));
InitBuffer(Sizeof(byte)+(len+1)*SizeOf(DWORD),Buffer,pos);
AddToBuffer(part,Buffer,pos);
AddToBuffer(len,Buffer,pos);
for i := low(Data) to High(Data) do
AddToBuffer(Data[i],Buffer,pos);
SendBuffer(Buffer,pos);
end;
function TConnectionThread.ReceiveMessage(out mode: byte; out Sender: TGUID;
out rNum: QWord; out CommandID: DWORD; out QParam: QWord; out Value: string;
out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean;
var
Buffer: TBuffer;
Len: dword;
pos: integer;
s: string;
b,b1: byte;
begin
result := false;
setlength(intData,0);
Keys := nil;
Data := nil;
if Terminated then exit;
try
log(mtExtra,'ReceiveMessage');
if not ReceiveHeader(b,mode,Sender,rNum,CommandID,QParam,Value) then exit;
if Terminated then exit;
case b of
1: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(Keys,Buffer,pos);
LogStrings(mtExtra, fOwner.flogger,self,'KEYS',Keys);
end;
2: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(Data,Buffer,pos);
end;
3: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(intData,Buffer,pos);
end;
4: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(Keys,Buffer,pos);
LogStrings(mtExtra,fOwner.flogger,self,'Values',Keys);
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>2 then raise EFormatException.Create('');
ReadFromBuffer(Data,Buffer,pos);
end;
5: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(intData,Buffer,pos);
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>2 then raise EFormatException.Create('');
ReadFromBuffer(Data,Buffer,pos);
end;
6: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(Keys,Buffer,pos);
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>2 then raise EFormatException.Create('');
ReadFromBuffer(intData,Buffer,pos);
end;
7: begin
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>1 then raise EFormatException.Create('');
ReadFromBuffer(Keys,Buffer,pos);
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>2 then raise EFormatException.Create('');
ReadFromBuffer(intData,Buffer,pos);
if not ReceiveBuffer(Buffer,len) then exit;
pos := 0;
ReadFromBuffer(b,Buffer,pos);
if b<>3 then raise EFormatException.Create('');
ReadFromBuffer(Data,Buffer,pos);
end;
end;
result := true;
except on e:Exception do
begin
log(mtError,'!!ERROR ReceiveMessage '+e.message);
raise;
end;
end;
end;
constructor TConnectionThread.Create(Aowner: TMainThread; ASocket: TLSocket);
var
i,d1,d2 : integer;
begin
inherited Create(true);
//FreeOnTerminate:=true;
fCache := TRoundBuffer.Create(@AOwner.log, 1024*1024);
fSocket := ASocket;
fOwner := AOwner;
CreateGuid(ID);
recNo := 0;
log(mtExtra,'Create client thread ');
end;
destructor TConnectionThread.Destroy;
begin
log(mtExtra,'destroy');
fCache.Free;
inherited Destroy;
end;
procedure TConnectionThread.Execute;
var
Sender: TGUID;
num,Param: QWORD;
CommandID: DWORD;
Value: string;
intData: TParamArray;
Keys: TStrings;
Data: TStream;
mode: byte;
begin
log(mtExtra,'start thread');
while not terminated do
begin
if cache.ReadReady.WaitFor(10000)<>wrSignaled then
begin
log(mtDebug,'TConnectionThread.Wait');
sleep(100);
continue;
end;
if terminated then break;
if not Socket.Connected then break;
Keys := nil;
Data := nil;
try
if ReceiveMessage(mode,Sender,num,CommandID,Param,Value,intData,Keys,Data) then
try
ProcessMessage(mode,CommandID,Param,Value,Keys,intData,Data);
except on e: Exception do
begin
log(mtError,'TConnectionThread.Execute'+e.message);
terminate;
end;
end;
finally
if assigned(Keys) then Keys.Free;
if assigned(Data) then Data.Free;
setLength(intData,0);
end;
end;
Cache.Close;
log(mtExtra,'terminated');
//Socket.Disconnect();
end;
procedure TConnectionThread.TerminatedSet;
begin
log(mtExtra,'terminate required');
Cache.Close;
end;
end.