1085 lines
28 KiB
ObjectPascal
1085 lines
28 KiB
ObjectPascal
unit tcpthreadhelper;
|
|
|
|
{$mode ObjFPC}{$H+}
|
|
|
|
interface
|
|
|
|
uses
|
|
Classes, SysUtils,lNet,lnetbase, syncobjs, extTypes;
|
|
|
|
type
|
|
|
|
TMainThread=class;
|
|
TConnectionThread=class;
|
|
|
|
{ TConnectionThread }
|
|
|
|
TConnectionThread=class(TThread)
|
|
private
|
|
fSocket: TLSocket;
|
|
fCache: TRoundBuffer;
|
|
fOwner: TMainThread;
|
|
class function BufferToString(const Buffer: TBuffer; const len: integer): string;
|
|
class procedure AddToBuffer(const Value: byte; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: word; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: dword; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: QWord; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: string; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: TGUID; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: TBuffer; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: TStream; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure AddToBuffer(const Value: TParamArray; var Buffer: TBuffer; var pos: integer); overLoad;
|
|
|
|
class procedure ReadFromBuffer(var Value: byte; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: word; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: dword; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: QWord; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: string; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: TGUID; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(var Value: TBuffer; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(out Value: TStream; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(out Value: TStrings; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure ReadFromBuffer(out Value: TParamArray; const Buffer: TBuffer; var pos: integer); overLoad;
|
|
class procedure InitBuffer(buffSize: dword; out Buffer: TBuffer; out pos: integer);
|
|
public
|
|
ID: TGUID;
|
|
recNo: qword;
|
|
property Owner: TMainThread read fOwner;
|
|
property Socket:TLSocket read fSocket;
|
|
property Cache: TRoundBuffer read fCache;
|
|
procedure log(ALevel: TLogLevel; msg: string);
|
|
procedure ProcessMessage(const mode: byte;const Code:DWORD; const Param:QWord; const ACommand: string;const Values: TStrings; const intData: TParamArray; const Data: TStream); virtual; abstract;
|
|
class function Role: string; virtual; abstract;
|
|
procedure SendBuffer(const Buffer: TBuffer; Len: dword);
|
|
function ReceiveBuffer(var Buffer: TBuffer; out Len: dword): boolean;
|
|
|
|
procedure SendHeader(packetType,state: byte; Code:DWORD; QP:QWORD;SP:string);
|
|
function ReceiveHeader(out packetType,state: byte;out Sender:TGUID; out num:QWORD; out Code:DWORD;out QP:QWORD;out SP:string): boolean;
|
|
|
|
procedure SendData(part: byte; const Data: TStream); overload;
|
|
procedure SendData(part: byte; const Data: TBuffer); overload;
|
|
procedure SendData(part: byte; const Data: TStrings); overload;
|
|
procedure SendData(part: byte; const Data: TParamArray); overload;
|
|
|
|
procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord; const sParam: string); overload;
|
|
procedure SendMessage(const mode: byte; const Status: DWORD; const QParam: QWord;const AValue: string; const AKeys: TStrings); overload;
|
|
procedure SendMessage(const mode: byte; const CommandID: DWORD;const QParam: QWord; const AValue: string; const AKeys: TStrings;const IntData: TParamArray; const AData: TStream); overload;
|
|
|
|
|
|
|
|
function ReceiveMessage(out mode: byte;out Sender: TGUID; out rNum: QWord;out CommandID: DWORD; out QParam:QWord; out Value: string; out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean; virtual;
|
|
|
|
constructor Create(Aowner: TMainThread; ASocket:TLSocket);
|
|
destructor Destroy; override;
|
|
procedure Execute;override;
|
|
procedure TerminatedSet; override;
|
|
end;
|
|
TConnectionThreadClass = class of TConnectionThread;
|
|
|
|
{ TMainThread }
|
|
|
|
TMainThread=class(TThread)
|
|
private
|
|
fCon: TLTCP;
|
|
fPort: integer;
|
|
fclients: TList;
|
|
flogger: TLogger;
|
|
fThreadClass: TConnectionThreadClass;
|
|
fStarted:boolean;
|
|
fComplete: boolean;
|
|
|
|
function getThread(index: TLSocket): TConnectionThread;
|
|
|
|
protected
|
|
procedure Log(ALevel: TLogLevel; Sender:TObject; msg: string);
|
|
procedure doStart; virtual;
|
|
procedure TerminateClients;
|
|
property Complete: boolean read fComplete;
|
|
public
|
|
property Port: integer read fPort;
|
|
property Connect: TLTCP read fCon;
|
|
property Client[index: TLSocket]: TConnectionThread read getThread;
|
|
procedure RemoveClient(clt:TConnectionThread);
|
|
procedure dataReady(aSocket: TLSocket);
|
|
procedure ProcessConnect(thread: TConnectionThread); virtual;
|
|
procedure ProcessAccept(thread: TConnectionThread); virtual;
|
|
procedure Accept(aSocket: TLSocket);
|
|
procedure doDisconnect(aSocket: TLSocket);
|
|
procedure doConnect(aSocket: TLSocket);
|
|
procedure TerminatedSet;override;
|
|
procedure NetError(const msg: string; aSocket: TLSocket);
|
|
constructor Create(AThreadClass: TConnectionThreadClass; ALogger: TLogger; APort: integer);
|
|
destructor Destroy; override;
|
|
procedure SetComplete;
|
|
|
|
end;
|
|
|
|
|
|
{ TClientRequest }
|
|
|
|
|
|
implementation
|
|
uses
|
|
lCommon;
|
|
|
|
|
|
|
|
function TMainThread.getThread(index: TLSocket): TConnectionThread;
|
|
var
|
|
clt: TConnectionThread;
|
|
i: integer;
|
|
begin
|
|
for i := 0 to fclients.Count-1 do
|
|
if TConnectionThread(fclients[i]).Socket=index then
|
|
begin
|
|
result := TConnectionThread(fclients[i]);
|
|
log(mtExtra,self,format('getThread(%d) %s',[index.Handle,guidToString(result.ID)]));
|
|
exit;
|
|
end;
|
|
result := fThreadClass.Create(self,index);
|
|
log(mtExtra,self,format('new Thread(%d) %s',[index.Handle,guidToString(result.ID)]));
|
|
fclients.Add(Result);
|
|
|
|
end;
|
|
|
|
procedure TMainThread.TerminateClients;
|
|
var
|
|
i: integer;
|
|
clt: TConnectionThread;
|
|
begin
|
|
log(mtExtra,Self,'Terminate Clients');
|
|
for i := fclients.Count-1 downto 0 do
|
|
begin
|
|
sleep(0);
|
|
clt := TConnectionThread(fclients[i]);
|
|
try
|
|
log(mtExtra,self,GuidToString(clt.ID));
|
|
clt.Terminate;
|
|
clt.WaitFor;
|
|
clt.free;
|
|
except on e: Exception do
|
|
begin
|
|
log(mtError,self, '!!ERROR Destroy ' + e.Message);
|
|
end;
|
|
end;
|
|
end;
|
|
fClients.Clear;
|
|
|
|
end;
|
|
|
|
procedure TMainThread.Log(ALevel: TLogLevel; Sender: TObject; msg: string);
|
|
begin
|
|
if assigned(fLogger) then
|
|
fLogger(ALevel, Sender,Msg);
|
|
end;
|
|
|
|
procedure TMainThread.doStart;
|
|
begin
|
|
fStarted := true;
|
|
fComplete:=false;
|
|
end;
|
|
|
|
procedure TMainThread.RemoveClient(clt: TConnectionThread);
|
|
begin
|
|
fclients.Remove(clt);
|
|
end;
|
|
|
|
|
|
procedure TMainThread.dataReady(aSocket: TLSocket);
|
|
var
|
|
clt: TConnectionThread;
|
|
begin
|
|
log(mtExtra,self,'dataReady');
|
|
if Terminated then exit;
|
|
|
|
clt := Client[aSocket];
|
|
clt.Cache.WriteReady.WaitFor(INFINITE);
|
|
while clt.Cache.ReadFromSocket(aSocket)<>0 do
|
|
begin
|
|
sleep(0);
|
|
end;
|
|
end;
|
|
|
|
procedure TMainThread.ProcessConnect(thread: TConnectionThread);
|
|
begin
|
|
|
|
end;
|
|
|
|
procedure TMainThread.ProcessAccept(thread: TConnectionThread);
|
|
begin
|
|
|
|
end;
|
|
|
|
procedure TMainThread.Accept(aSocket: TLSocket);
|
|
var
|
|
clt: TConnectionThread;
|
|
begin
|
|
log(mtExtra,self,'connect');
|
|
if Terminated then exit;
|
|
clt := Client[aSocket];
|
|
log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
|
|
|
|
ProcessAccept(clt);
|
|
clt.start;
|
|
end;
|
|
|
|
procedure TMainThread.doDisconnect(aSocket: TLSocket);
|
|
var
|
|
clt: TConnectionThread;
|
|
begin
|
|
if terminated then exit;
|
|
log(mtExtra,self,'disconnect');
|
|
try
|
|
clt := Client[aSocket];
|
|
if clt.terminated then exit;
|
|
log(mtExtra,self,format('disconnected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
|
|
clt.Terminate;
|
|
fclients.remove(clt);
|
|
|
|
except on e: Exception do
|
|
begin
|
|
log(mtError,self,'!!ERROR doDisconnect '+e.Message);
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TMainThread.doConnect(aSocket: TLSocket);
|
|
var
|
|
clt: TConnectionThread;
|
|
begin
|
|
log(mtExtra,self,'doConnect');
|
|
if Terminated then exit;
|
|
clt := Client[aSocket];
|
|
log(mtExtra,self,format('connected %s on %d ',[GUIDToString(clt.ID), aSocket.Handle]));
|
|
ProcessConnect(clt);
|
|
clt.Start;
|
|
end;
|
|
|
|
procedure TMainThread.TerminatedSet;
|
|
begin
|
|
inherited TerminatedSet();
|
|
|
|
end;
|
|
|
|
|
|
|
|
|
|
procedure TMainThread.NetError(const msg: string; aSocket: TLSocket);
|
|
begin
|
|
if assigned(aSocket) then
|
|
log(mtWarning, self,'!!NETERROR on '+inttostr(aSocket.Handle)+#09+msg)
|
|
else
|
|
log(mtWarning, self,'!!NETERROR '+msg);
|
|
end;
|
|
|
|
constructor TMainThread.Create(AThreadClass: TConnectionThreadClass;
|
|
ALogger: TLogger; APort: integer);
|
|
begin
|
|
inherited Create(true);
|
|
fStarted := false;
|
|
fThreadClass:=AThreadClass;
|
|
fCon := TLTcp.Create(nil);
|
|
fclients := TList.Create;
|
|
fLogger := ALogger;
|
|
fPort := APort;
|
|
|
|
Connect.OnDisconnect:=@doDisconnect;
|
|
Connect.OnReceive:=@dataReady;
|
|
Connect.Timeout:=100;
|
|
log(mtExtra,self,'create main thread');
|
|
end;
|
|
|
|
destructor TMainThread.Destroy;
|
|
begin
|
|
fClients.Free;
|
|
fCon.Free;
|
|
Inherited Destroy;
|
|
end;
|
|
|
|
procedure TMainThread.SetComplete;
|
|
begin
|
|
fComplete:=true;
|
|
end;
|
|
|
|
|
|
|
|
{ TConnectionThread }
|
|
|
|
|
|
|
|
|
|
class function TConnectionThread.BufferToString(const Buffer: TBuffer;
|
|
const len: integer): string;
|
|
var
|
|
i: integer;
|
|
begin
|
|
result := '';
|
|
for i := 1 to len do
|
|
begin
|
|
result := result + IntToHex(Buffer[i-1],2)+' ';
|
|
if (i mod 64)=0 then
|
|
result := result +#13#10;
|
|
end;
|
|
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: byte;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
begin
|
|
Buffer[pos] := Value;
|
|
inc(pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: word;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
b: byte;
|
|
begin
|
|
for i := 0 to 1 do
|
|
begin
|
|
b := (Value shr (8*i)) and $FF;
|
|
Buffer[pos] := b;
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: dword;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
b: byte;
|
|
begin
|
|
for i := 0 to 3 do
|
|
begin
|
|
b := (Value shr (8*i)) and $FF;
|
|
Buffer[pos] := b;
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: QWord;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
b: Byte;
|
|
begin
|
|
for i := 0 to 7 do
|
|
begin
|
|
b := (Value shr (8*i)) and $FF;
|
|
Buffer[pos] := b;
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: string;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len,i: DWORD;
|
|
p: PChar;
|
|
b: byte;
|
|
begin
|
|
len := Length(Value);
|
|
AddToBuffer(Len,Buffer,pos);
|
|
p := PChar(Value);
|
|
for i := 1 to len do
|
|
begin
|
|
b :=byte(p^);
|
|
AddToBuffer(b,Buffer,pos);
|
|
inc(p);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: TGUID;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
begin
|
|
AddToBuffer(Value.D1,Buffer,pos);
|
|
AddToBuffer(Value.D2,Buffer,pos);
|
|
AddToBuffer(Value.D3,Buffer,pos);
|
|
for i := 0 to 7 do
|
|
AddToBuffer(Value.D4[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: TBuffer;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len: DWORD;
|
|
i: integer;
|
|
begin
|
|
len := length(Value);
|
|
AddToBuffer(len,Buffer,pos);
|
|
for i := 0 to len-1 do
|
|
AddToBuffer(Value[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: TStream;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len: QWORD;
|
|
i: integer;
|
|
b: byte;
|
|
begin
|
|
len := Value.Size;
|
|
AddToBuffer(len,Buffer,pos);
|
|
Value.seek(0,soFromBeginning);
|
|
for i := 0 to len-1 do
|
|
begin
|
|
Value.Read(b,1);
|
|
AddToBuffer(b,Buffer,pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.AddToBuffer(const Value: TParamArray;
|
|
var Buffer: TBuffer; var pos: integer);
|
|
var
|
|
l,i: DWORD;
|
|
begin
|
|
l := Length(Value);
|
|
AddToBuffer(l,Buffer,pos);
|
|
for i := low(Value) to high(Value) do
|
|
AddToBuffer(Value[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: byte;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
begin
|
|
Value := Buffer[pos];
|
|
inc(pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: word;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
begin
|
|
Value := 0;
|
|
for i := 0 to 1 do
|
|
begin
|
|
Value := Value or (Buffer[pos] shl (8*i));
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: dword;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
begin
|
|
Value := 0;
|
|
for i := 0 to 3 do
|
|
begin
|
|
Value := Value or (Buffer[pos] shl (8*i));
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: QWord;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
begin
|
|
Value := 0;
|
|
for i := 0 to 7 do
|
|
begin
|
|
Value := Value or (Buffer[pos] shl (8*i));
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: string;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len: DWORD;
|
|
i: integer;
|
|
p: PChar;
|
|
begin
|
|
ReadFromBuffer(Len,Buffer,pos);
|
|
Value := StringOfChar(' ',len);
|
|
for i := 1 to len do
|
|
begin
|
|
Value[i] := chr(Buffer[pos]);
|
|
inc(pos);
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: TGUID;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i: integer;
|
|
begin
|
|
ReadFromBuffer(Value.D1,Buffer,pos);
|
|
ReadFromBuffer(Value.D2,Buffer,pos);
|
|
ReadFromBuffer(Value.D3,Buffer,pos);
|
|
for i := 0 to 7 do
|
|
ReadFromBuffer(Value.D4[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(var Value: TBuffer;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len: DWORD;
|
|
i: integer;
|
|
begin
|
|
ReadFromBuffer(len,Buffer,pos);
|
|
setLength(Value,len);
|
|
for i := 0 to len-1 do
|
|
ReadFromBuffer(Value[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(out Value: TStream;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
len: QWORD;
|
|
i: integer;
|
|
b: byte;
|
|
begin
|
|
Value := TMemoryStream.Create;
|
|
ReadFromBuffer(len,Buffer,pos);
|
|
if len=0 then exit;
|
|
for i := 0 to len-1 do
|
|
begin
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
Value.Write(b,1);
|
|
end;
|
|
Value.seek(0,soFromBeginning);
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(out Value: TStrings;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
i,w,d: dword;
|
|
s: string;
|
|
begin
|
|
Value := TStringList.Create;
|
|
ReadFromBuffer(w,Buffer,pos);
|
|
if w=0 then exit;
|
|
for i := 0 to w-1 do
|
|
begin
|
|
ReadFromBuffer(s,Buffer,pos);
|
|
ReadFromBuffer(d,Buffer,pos);
|
|
Value.AddObject(s, TObject(PtrInt(d)));
|
|
end;
|
|
end;
|
|
|
|
class procedure TConnectionThread.ReadFromBuffer(out Value: TParamArray;
|
|
const Buffer: TBuffer; var pos: integer);
|
|
var
|
|
l,i: DWORD;
|
|
begin
|
|
ReadFromBuffer(l,Buffer,pos);
|
|
SetLength(Value,l);
|
|
if l=0 then exit;
|
|
for i := 0 to l-1 do
|
|
ReadFromBuffer(Value[i],Buffer,pos);
|
|
end;
|
|
|
|
class procedure TConnectionThread.InitBuffer(buffSize: dword; out
|
|
Buffer: TBuffer; out pos: integer);
|
|
begin
|
|
SetLength(Buffer,buffSize);
|
|
pos := 0;
|
|
end;
|
|
|
|
|
|
procedure TConnectionThread.log(ALevel: TLogLevel; msg: string);
|
|
begin
|
|
if assigned(fOwner) then
|
|
fOwner.log(ALevel, self,Role+#09+ GuidToString(ID)+#09+msg);
|
|
end;
|
|
|
|
|
|
|
|
procedure TConnectionThread.SendBuffer(const Buffer: TBuffer; Len: dword);
|
|
var
|
|
p,t: PByte;
|
|
l,rem,i: integer;
|
|
part_id,tmp: QWORD;
|
|
b2: array[0..7] of byte;
|
|
begin
|
|
log(mtExtra,'Send buffer '+inttostr(len));
|
|
try
|
|
rem := len+Sizeof(dword)+Sizeof(QWord);
|
|
p := GetMem(rem);
|
|
try
|
|
t := p;
|
|
CopyBytes(t,PacketStart);
|
|
CopyBytes(t,len);
|
|
CopyBytes(t,Buffer);
|
|
t := p;
|
|
repeat
|
|
l := Socket.send(t^,rem);
|
|
dec(rem,l);
|
|
inc(t,l);
|
|
if l=0 then
|
|
sleep(100);
|
|
until terminated or (rem<=0);
|
|
|
|
finally
|
|
//log(format('%p',[p]));
|
|
freeMem(p);
|
|
end;
|
|
except on e:Exception do
|
|
begin
|
|
log(mtError,'!!ERROR SendBuffer '+e.message);
|
|
raise;
|
|
end;
|
|
end;
|
|
|
|
end;
|
|
|
|
function TConnectionThread.ReceiveBuffer(var Buffer: TBuffer; out Len: dword
|
|
): boolean;
|
|
var
|
|
p: PByte;
|
|
i,l,rem: integer;
|
|
lbytes: array[0..3] of byte;
|
|
b2: array[0..7] of byte;
|
|
part_id: QWORD;
|
|
begin
|
|
result := false;
|
|
if Terminated then
|
|
begin
|
|
log(mtExtra,'ReceiveBuffer terminated');
|
|
exit;
|
|
end;
|
|
try
|
|
Cache.Read(part_id);
|
|
if Part_id<>PacketStart then
|
|
begin
|
|
log(mtError,'ReceiveBuffer PacketStart '+inttohex(part_id,16));
|
|
exit;
|
|
end;
|
|
Cache.Read(len);
|
|
setlength(Buffer,len);
|
|
if len=0 then
|
|
begin
|
|
log(mtError,'ReceiveBuffer Length=0');
|
|
exit;
|
|
end;
|
|
rem := len;
|
|
p := PByte(Buffer);
|
|
repeat
|
|
l := Cache.pop(p^,rem);
|
|
dec(rem,l);
|
|
inc(p,l);
|
|
if Terminated then exit;
|
|
until terminated or (rem<=0) ;
|
|
log(mtExtra,'Receive buffer '+inttostr(len));
|
|
result := true;
|
|
except on e:Exception do
|
|
begin
|
|
log(mtError,'!!ERROR ReceiveBuffer '+e.message);
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
|
|
procedure TConnectionThread.SendHeader(packetType, state: byte; Code: DWORD;
|
|
QP: QWORD; SP: string);
|
|
var
|
|
Buffer: TBuffer;
|
|
pos: integer;
|
|
begin
|
|
try
|
|
log(mtExtra,format('SendHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
|
|
InitBuffer(sizeof(BYTE)*(Length(SP)+2)+16+2*sizeof(QWord)+sizeof(DWORD)*2,Buffer,pos);
|
|
AddToBuffer(packetType,Buffer,pos);
|
|
AddToBuffer(state,Buffer,pos);
|
|
AddToBuffer(ID,Buffer,pos);
|
|
AddToBuffer(recNo,Buffer,pos);
|
|
AddToBuffer(Code,Buffer,pos);
|
|
AddToBuffer(QP,Buffer,pos);
|
|
AddToBuffer(SP,Buffer,pos);
|
|
SendBuffer(Buffer,pos);
|
|
|
|
except on e:Exception do
|
|
begin
|
|
log(mtError,'!!ERROR SendHeader '+e.message);
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
function TConnectionThread.ReceiveHeader(out packetType, state: byte; out
|
|
Sender: TGUID; out num: QWORD; out Code: DWORD; out QP: QWORD; out SP: string
|
|
): boolean;
|
|
var
|
|
Buffer: TBuffer;
|
|
len: dword;
|
|
pos: integer;
|
|
begin
|
|
result := false;
|
|
if Terminated then exit;
|
|
try
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(packetType,Buffer,pos);
|
|
ReadFromBuffer(State,Buffer,pos);
|
|
ReadFromBuffer(Sender,Buffer,pos);
|
|
ReadFromBuffer(num,Buffer,pos);
|
|
ReadFromBuffer(Code,Buffer,pos);
|
|
ReadFromBuffer(QP,Buffer,pos);
|
|
ReadFromBuffer(SP,Buffer,pos);
|
|
log(mtExtra,format('ReceiveHeader(%d) Code=%d, Param=%x, Command=%s',[packettype,Code,QP,SP]));
|
|
result := true;
|
|
|
|
except on e:Exception do
|
|
begin
|
|
log(mtError,'!!ERROR ReceiveHeader '+e.message);
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
procedure TConnectionThread.SendMessage(const mode: byte;
|
|
const Status: DWORD; const QParam: QWord; const sParam: string);
|
|
begin
|
|
SendHeader(0,mode,Status,QParam,sParam);
|
|
end;
|
|
|
|
procedure TConnectionThread.SendMessage(const mode: byte;
|
|
const Status: DWORD; const QParam: QWord; const AValue: string;
|
|
const AKeys: TStrings);
|
|
begin
|
|
if assigned(AKeys) then
|
|
begin
|
|
SendHeader(1,mode,Status,QParam,AValue);
|
|
SendData(1,AKeys);
|
|
end
|
|
else
|
|
SendHeader(0,mode,Status,QParam,AValue);
|
|
end;
|
|
|
|
|
|
procedure TConnectionThread.SendMessage(const mode: byte;
|
|
const CommandID: DWORD; const QParam: QWord; const AValue: string;
|
|
const AKeys: TStrings; const IntData: TParamArray; const AData: TStream);
|
|
begin
|
|
|
|
try
|
|
if assigned(AKeys) and assigned(AData) and (length(IntData)>0) then
|
|
begin
|
|
SendHeader(7,mode,CommandID,QParam,AValue);
|
|
SendData(1,AKeys);
|
|
SendData(2,IntData);
|
|
SendData(3,AData);
|
|
end
|
|
else if (length(IntData)>0) and assigned(AKeys) then
|
|
begin
|
|
SendHeader(6,mode,CommandID,QParam,AValue);
|
|
SendData(1,AKeys);
|
|
SendData(2,IntData);
|
|
end
|
|
else if (length(IntData)>0) and assigned(AData) then
|
|
begin
|
|
SendHeader(5,mode,CommandID,QParam,AValue);
|
|
SendData(1,IntData);
|
|
SendData(2,AData);
|
|
end
|
|
else if assigned(AKeys) and assigned(AData) then
|
|
begin
|
|
SendHeader(4,mode,CommandID,QParam,AValue);
|
|
SendData(1,AKeys);
|
|
SendData(2,AData);
|
|
end
|
|
else if length(IntData)>0 then
|
|
begin
|
|
SendHeader(3,mode,CommandID,QParam,AValue);
|
|
SendData(1,IntData);
|
|
end
|
|
else if assigned(AData) then
|
|
begin
|
|
SendHeader(2,mode,CommandID,QParam,AValue);
|
|
SendData(2,AData);
|
|
end
|
|
else if assigned(AKeys) then
|
|
begin
|
|
SendHeader(1,mode,CommandID,QParam,AValue);
|
|
SendData(1,AKeys);
|
|
end
|
|
else
|
|
SendHeader(0,mode,CommandID,QParam,AValue);
|
|
|
|
except on E:exception do
|
|
begin
|
|
log(mtError,format('send error(%s) %s',[e.classname,e.message]));
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
procedure TConnectionThread.SendData(part: byte; const Data: TStream);
|
|
var
|
|
Buffer: TBuffer;
|
|
pos: integer;
|
|
footer: dWORD;
|
|
begin
|
|
try
|
|
log(mtExtra,'Send Stream '+inttostr(Data.Size));
|
|
setLength(Buffer,1+Data.Size+8);
|
|
pos := 0;
|
|
AddToBuffer(part,Buffer,pos);
|
|
AddToBuffer(Data,Buffer,pos);
|
|
SendBuffer(Buffer,pos);
|
|
|
|
except on e:Exception do
|
|
begin
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TConnectionThread.SendData(part: byte; const Data: TBuffer);
|
|
var
|
|
Buffer: TBuffer;
|
|
pos: integer;
|
|
footer: dWORD;
|
|
begin
|
|
log(mtExtra,'Send Buffer '+inttostr(length(Data)));
|
|
try
|
|
setLength(Buffer,length(Data)+4+1);
|
|
pos := 0;
|
|
AddToBuffer(part,Buffer,pos);
|
|
AddToBuffer(Data,Buffer,pos);
|
|
SendBuffer(Buffer,pos);
|
|
|
|
except on e:Exception do
|
|
begin
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TConnectionThread.SendData(part: byte; const Data: TStrings);
|
|
var
|
|
Buffer: TBuffer;
|
|
pos: integer;
|
|
w,footer: dWORD;
|
|
len,i: integer;
|
|
begin
|
|
try
|
|
log(mtExtra,'Send Strings');
|
|
LogStrings(mtExtra,fOwner.flogger,self,'KEYS',Data);
|
|
len := 1+4+8*Data.Count;
|
|
for i:=0 to Data.Count-1 do
|
|
inc(len,length(Data[i]));
|
|
setLength(Buffer,len);
|
|
pos := 0;
|
|
AddToBuffer(part,Buffer,pos);
|
|
w := Data.Count;
|
|
AddToBuffer(w,Buffer,pos);
|
|
for i := 0 to Data.Count-1 do
|
|
begin
|
|
AddToBuffer(Data[i],Buffer,pos);
|
|
w := ptrInt(Pointer(Data.Objects[i])) and $FFFFFFFF;
|
|
AddToBuffer(w,Buffer,pos);
|
|
end;
|
|
SendBuffer(Buffer,pos);
|
|
|
|
except on e:Exception do
|
|
begin
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
procedure TConnectionThread.SendData(part: byte;
|
|
const Data: TParamArray);
|
|
var
|
|
i,pos: integer;
|
|
len: DWORD;
|
|
Buffer: TBuffer;
|
|
begin
|
|
len := length(Data);
|
|
log(mtExtra,'Send ParamArray '+inttostr(length(Data)));
|
|
InitBuffer(Sizeof(byte)+(len+1)*SizeOf(DWORD),Buffer,pos);
|
|
AddToBuffer(part,Buffer,pos);
|
|
AddToBuffer(len,Buffer,pos);
|
|
for i := low(Data) to High(Data) do
|
|
AddToBuffer(Data[i],Buffer,pos);
|
|
SendBuffer(Buffer,pos);
|
|
end;
|
|
|
|
|
|
function TConnectionThread.ReceiveMessage(out mode: byte; out Sender: TGUID;
|
|
out rNum: QWord; out CommandID: DWORD; out QParam: QWord; out Value: string;
|
|
out intData: TParamArray; out Keys: TStrings; out Data: TStream): boolean;
|
|
var
|
|
Buffer: TBuffer;
|
|
Len: dword;
|
|
pos: integer;
|
|
s: string;
|
|
b,b1: byte;
|
|
begin
|
|
result := false;
|
|
if Terminated then exit;
|
|
try
|
|
log(mtExtra,'ReceiveMessage');
|
|
if not ReceiveHeader(b,mode,Sender,rNum,CommandID,QParam,Value) then exit;
|
|
if Terminated then exit;
|
|
case b of
|
|
1: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Keys,Buffer,pos);
|
|
LogStrings(mtExtra, fOwner.flogger,self,'KEYS',Keys);
|
|
end;
|
|
2: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Data,Buffer,pos);
|
|
end;
|
|
3: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(intData,Buffer,pos);
|
|
end;
|
|
4: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Keys,Buffer,pos);
|
|
LogStrings(mtExtra,fOwner.flogger,self,'Values',Keys);
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>2 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Data,Buffer,pos);
|
|
end;
|
|
5: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(intData,Buffer,pos);
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>2 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Data,Buffer,pos);
|
|
end;
|
|
6: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Keys,Buffer,pos);
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>2 then raise EFormatException.Create('');
|
|
ReadFromBuffer(intData,Buffer,pos);
|
|
end;
|
|
7: begin
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>1 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Keys,Buffer,pos);
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>2 then raise EFormatException.Create('');
|
|
ReadFromBuffer(intData,Buffer,pos);
|
|
if not ReceiveBuffer(Buffer,len) then exit;
|
|
pos := 0;
|
|
ReadFromBuffer(b,Buffer,pos);
|
|
if b<>3 then raise EFormatException.Create('');
|
|
ReadFromBuffer(Data,Buffer,pos);
|
|
end;
|
|
end;
|
|
result := true;
|
|
|
|
except on e:Exception do
|
|
begin
|
|
log(mtError,'!!ERROR ReceiveMessage '+e.message);
|
|
raise;
|
|
end;
|
|
end;
|
|
end;
|
|
|
|
constructor TConnectionThread.Create(Aowner: TMainThread; ASocket: TLSocket);
|
|
var
|
|
i,d1,d2 : integer;
|
|
begin
|
|
inherited Create(true);
|
|
//FreeOnTerminate:=true;
|
|
fCache := TRoundBuffer.Create(@AOwner.log, 1024*1024);
|
|
fSocket := ASocket;
|
|
fOwner := AOwner;
|
|
CreateGuid(ID);
|
|
recNo := 0;
|
|
log(mtExtra,'Create');
|
|
end;
|
|
|
|
destructor TConnectionThread.Destroy;
|
|
begin
|
|
fCache.Free;
|
|
fOwner.removeClient(self);
|
|
inherited Destroy;
|
|
end;
|
|
|
|
procedure TConnectionThread.Execute;
|
|
var
|
|
Sender: TGUID;
|
|
num,Param: QWORD;
|
|
CommandID: DWORD;
|
|
Value: string;
|
|
intData: TParamArray;
|
|
Keys: TStrings;
|
|
Data: TStream;
|
|
mode: byte;
|
|
begin
|
|
log(mtExtra,'start thread');
|
|
while not terminated do
|
|
begin
|
|
if cache.ReadReady.WaitFor(1000)<>wrSignaled then begin sleep(10);continue;end;
|
|
if terminated then break;
|
|
if not Socket.Connected then break;
|
|
Keys := nil;
|
|
Data := nil;
|
|
try
|
|
if ReceiveMessage(mode,Sender,num,CommandID,Param,Value,intData,Keys,Data) then
|
|
ProcessMessage(mode,CommandID,Param,Value,Keys,intData,Data);
|
|
|
|
finally
|
|
if assigned(Keys) then Keys.Free;
|
|
if assigned(Data) then Data.Free;
|
|
setLength(intData,0);
|
|
end;
|
|
end;
|
|
Cache.Close;
|
|
//Socket.Disconnect();
|
|
end;
|
|
|
|
procedure TConnectionThread.TerminatedSet;
|
|
begin
|
|
log(mtExtra,'terminate required');
|
|
Cache.Close;
|
|
fOwner.removeClient(self);
|
|
end;
|
|
|
|
|
|
end.
|
|
|