![]() |
|
|
|||||||
| Регистрация | << Правила форума >> | FAQ | Пользователи | Календарь | Поиск | Сообщения за сегодня | Все разделы прочитаны |
![]() |
|
|
Опции темы | Поиск в этой теме | Опции просмотра |
|
|
|
#1
|
|||
|
|||
|
Доброго Всем.
Собственно ситуация программа для статистики. Делается выборка из блока данных по определенным правилам. есть тип запись обьедененная в массив обьявляю так: Код:
type TDataSection = record Data: TBytes; MD5: RecMD5uint64; PRuleN: uint32; NumBlock: byte; BtLen: word; //byte; SectUse: byte; end; type TForm1 = class(TForm) .... public FDDataSect: TArray<TArray<TDataSection>>; OutFDDataSect: TArray<TArray<TDataSection>>; есть поток в нем обьявляю так Код:
type
TFindTr = class(TThread)
private
{ Private declarations }
public
DataArr: TArray<TArray<TDataSection>>;
RuleArr: TArray<TArray<TDataSection>>;
// исполняемая часть
procedure TFindTr.Execute;
while CloseTR = 0 do begin // закрыть поток по внешнему требованию
StatWork := 1;
StopFind := 0;
for NumRule := FStart to FEnd do begin
Move((Pointer(DataArr[NumBlock,NumData].Data))^, n1arrB[0], SectLen);
Move((Pointer(RuleArr[NumBlock,NumRule].Data))^, n2arrB[0], SectLen);
NOutarr[0] := N1arr[0] xor N2arr[0];
NOutarr[1] := N1arr[1] xor N2arr[1];
NOutarr[2] := N1arr[2] xor N2arr[2];
Move(nOutarrB, (Pointer(OutxorArr))^, SectLen);
PacLenSect := 0;
BsectSet := [];
for nBt := 0 to SectLen - 1 do begin
if OutxorArr[nBt] in BSectSet = false then begin
include(BSectSet, OutxorArr[nBt]);
inc(PacLenSect, 1);
end;
end;
// првоверка на условия
if (PacLenSect <= MinLenUse) then begin
FindResult := 1;
break;
end;
if StopFind = 1 then break;
end;
suspend; // ждем обновления данных из формы.
end;
обработка данных в форме Код:
// create potok
x := 0;
FindTR1 := TFindTr.Create(true);
FindTR1.Priority := tpNormal;
FindTR1.DataArr := FDDataSect;
FindTR1.RuleArr := OutFDDataSect;
for n := 0 to Ndata -1 do begin
FindTR1.NumData := n; // задаем блок для обработки из иассива FDDataSect [x,n]
FindTR1.Resume; // запускаем поток
tmp1 := 0;
while FindTR1.Suspended = false do begin // ждем окончания работы потока
if (tmp1 mod 300) = 0 then begin
tmp1 := 0;
application.ProcessMessages;
end;
inc(tmp1, 1);
end;
if (FindTR1.FindResult = 1) then begin // если блок найдем в масиве OutFDDataSect
inc(NMinLen, 1);
FindTR1.StatWork := 0;
end;
// если блок ненайдем в масиве OutFDDataSect добовляем блок FDDataSect[x,n]
if (FindTR1.FindResult = 0) and (n > 0) then begin
OutFDLen := length(OutFDDataSect[x]);
Setlength(OutFDDataSect[x], OutFDlen + 1);
OutFDDataSect[x, OutFDLen] := FDDataSect[x, n];
inc(allFindNMinLen, 1);
Nrule := length(OutFDDataSect[x]);
FindTR1.FEnd := Nrule - 1;
end;
Теперь собственно грабли. Все Вычесления проходят в форме - результат правильный. повторяемость 100% - но ОЧЕНЬ медленно. вычисления проходят в потоке БЕЗ ИЗМЕНЕНИЯ массива OutFDDataSect - результат правильный. повторяемость 100% вычисления проходят в потоке С ИЗМЕНЕНИЯ массива OutFDDataSect - и тут начинаются грабли. правильного результата нет. часто дожодит до зависания потока намертво. при дебаге выяснилось что изменения длинны OutFDDataSect не доходят до потока. Вобщем чтото сделал не правильно, при попытке выяснить с помощь гугля и глубокого курения RTFM где именно результата не дало. Причем такоеже обьявление прекрасно и стабильно работает в паралелльной сортировке массива OutFDDataSect методом инжекции. Вобще буду рад любым подсказкам, желательно с кодом. Ужк начал посматривать в сторону стандартной библиотеки System.Threading (Parallel Programming Library (PPL)) в принципе для моей задачи подходит.... |
|
#2
|
|||
|
|||
|
Забыл добавить -
специально удалил все попытки синхронизации. все только ухудшалось. Родитель и хозяин OutFDDataSect - TForm, все изменения В OutFDDataSect делает он. Поток при RESUME только читает из него данные и выставляет флаг FindResult. после чего засыпает и ждет другого запуска на поиск. При изменении OutFDDataSect (TForm) нужно обновить RuleArr в потоке. Вобщем нужен алгоритм параллельного поиска в массиве. |
|
#3
|
||||
|
||||
|
Код:
Move((Pointer(DataArr[NumBlock,NumData].Data))^, n1arrB[0], SectLen); Move((Pointer(RuleArr[NumBlock,NumRule].Data))^, n2arrB[0], SectLen); Цитата:
Код:
type
TSomeData = record
Data:TBytes;
end;
TForm1 = class(TForm)
Button1: TButton;
procedure Button1Click(Sender: TObject);
private
{ Private declarations }
FParam:array of array of TSomeData;
public
{ Public declarations }
end;
var
Form1: TForm1;
implementation
{$R *.dfm}
procedure TForm1.Button1Click(Sender: TObject);
var
i: Integer;
j: Integer;
somedata:RawByteString;
begin
somedata:='0123456789';
SetLength(FParam,10);
for i := Low(FParam) to High(FParam) do
begin
SetLength(FParam[i],10);
for j := Low(FParam[i]) to High(FParam[i]) do
begin
SetLength(FParam[i,j].Data,10);
Move(somedata[1],FParam[i,j].Data[0],10); // <=== смотреть сюда
end;
end;
somedata:=' '; //чтоб не париться с длинной.
Move(FParam[9,9].Data[0],somedata[1],10);
ShowMessage(somedata);
end;
end.![]() Код:
FindTR1.Resume; // запускаем поток tmp1 := 0; while FindTR1.Suspended = false do begin // ждем окончания работы потока Не понятного больше чем понятного. Ну или ты передаёшь не всю картину происходящего. |
|
#4
|
|||
|
|||
|
Код:
var
N1arr: array[0..2] of uint64; // calc xor sect
N2arr: array[0..2] of uint64;
NOutarr: array[0..2] of uint64;
n1arrB: array[0..23] of byte absolute n1arr;
n2arrB: array[0..23] of byte absolute n2arr;
nOutarrB: array[0..23] of byte absolute nOutarr;
.....
Move((Pointer(DataArr[NumBlock,NumData].Data))^, n1arrB[0], SectLen);
Move((Pointer(RuleArr[NumBlock,NumRule].Data))^, n2arrB[0], SectLen);
NOutarr[0] := N1arr[0] xor N2arr[0];
NOutarr[1] := N1arr[1] xor N2arr[1];
NOutarr[2] := N1arr[2] xor N2arr[2];
Move(nOutarrB, (Pointer(OutxorArr))^, SectLen);Aristarh Dark из 24 байт делается 3 int64 и xor а после выводятся в 24 байта. работает намного быстрее чем прямой перебор. толи в регистр 24 раза загружать, толи 3 .... Но за код спасибо. более красиво получается. один поток сделан чтобы убедиться что все работает правильно. А уж потом заботливо раскладывать самому себе грабли с делением на потоки. Для меня задача - как обновить динамический массив в потоке. при условии что поток в suspend. получить правильный и стабильный результат - а потом уже исполнять танец на граблях в в виде распараллеливания и деспечера потоков. |
|
#5
|
|||
|
|||
|
мдя. мыши плакали, кололись - но упорно продолжали грысть кактус или век живи - век учись. сказад ежик слезая с кактуса....
итак костьль найден. пошговый дамп массива из формы и потока показал что данные доходят правильно. результат в потоке тоже верный. Проблема оказалась в необходимой задержке для остаканивания системы и потока после Resume. после введения sleep(10) все заработало. результат повторяемый и совпадает с первым алгоритмом на 100%. Костыль: Код:
FindTR1.Resume; sleep(10); // собственно сам костыль. 10 минмал время принятия по стаканчику для выхода из suspend. while FindTR1.Suspended = false do begin |
|
#6
|
||||
|
||||
|
Цитата:
|
|
#7
|
|||
|
|||
|
мдя Или "песнь о Великом человеке"
В детстве Били не любили. Что-бы Билли не побили Просто небыло и дня... итак ответ на собственный вопрос пошговый дамп массива из формы и потока показал что данные доходят правильно. результат в потоке тоже верный. Проблема оказалась в необходимой задержке для остаканивания системы и потока после Resume. после введения sleep(10) все заработало. результат повторяемый правда сильно не стабильный. любой чих приводит к зависанию потока.Так-что пердача массива только для чтения через глобальную переменную - вполне правильный подход. В прцессе поиска среди плагиата созданного из одной статьи обратил внимание на TTask & TParallel.For результат TParallel.For - ну вот наконец-то перенесли аналог из FORTRAN-a - а нет. не туто было.... Нормально запустить не удалось. точнее заработало но так медленно что линейный алгоритм сильно обгонял. TTask - заработало. код ниже. паралельность - только в таком виде работает. Сильно зависит от загрузки системы... ускорение от 2 до 2.8 раз. Вобщем если нет желания заморачиватся с диспечером - можно использовать. для проверки основного алгоритма сойдет. для нормальной работы софта - в принципе тоже. Резюме по PPL Lib. Ни о какой паралельности не и речи. Распределенные вычесления - да.(каждому потоку своя копия данных). Работа с общим блоком - только последовательно.... Вся прелесть паралельности теряется на копировании данных в поток... Вобщем что этой библиотекой хотели сказать индусы так и осталось тайной, покрытой матом. Отдельное Спасибо MBo за пример с потоками. По факту - раскуриваю книжку и буду разбиратся с примером. хотелосьбы получить стабильное ускорение процесса от 2.9 и выше. Собственно рабочий код с TTask Код:
// WT, WTS и start/stoptime для подсчета времени - заменить как кому нравится......
uses System.Threading, System.SyncObjs, System.IOUtils;
type RecMD5uint64 = record
MD5Hi: uint64;
MD5Lo: uint64;
end;
type
TDataSection = record
Data: TBytes;
MD5: RecMD5uint64;
PRuleN: uint32;
NumBlock: byte;
BtLen: word; //byte;
SectUse: byte;
end;
..........
// TForm1 //
public
FDDataSect: TArray<TArray<TDataSection>>;
OutFDDataSect: TArray<TArray<TDataSection>>;
Ts2OutFDDataSect: TArray<TArray<TDataSection>>;
FindResult: integer;
TasksEnd: TBytes;
...........
procedure CreateTasksV2(DataArr, RuleArr:TArray<TArray<TDataSection>>; min, max: uint32; Pn: uint32; PNTask: byte; PtasksEnd: TBytes; var Wtasks: TArray<ITask>);
begin
Wtasks[PNTask] := TTask.Create(procedure()
var
PNumRule: uint32;
PnBt: uint32;
N1arr: array[0..2] of uint64; // calc xor sect
N2arr: array[0..2] of uint64;
NOutarr: array[0..2] of uint64;
n1arrB: array[0..23] of byte absolute n1arr;
n2arrB: array[0..23] of byte absolute n2arr;
nOutarrB: array[0..23] of byte absolute nOutarr;
BsectSet: set of byte;
OutxorArr: TBytes;
PacLenSect: uint32;
Px: uint32;
SectLen: uint32;
MinLenUse: uint32;
begin
Px := 0;
SectLen := 24;
MinLenUse := 12;
for PNumRule := min to max do begin
setlength(OutxorArr, 24);
Move((Pointer(DataArr[Px,Pn].Data))^, n1arrB[0], 24);
Move((Pointer(RuleArr[Px,PNumRule].Data))^, n2arrB[0], 24);
NOutarr[0] := N1arr[0] xor N2arr[0];
NOutarr[1] := N1arr[1] xor N2arr[1];
NOutarr[2] := N1arr[2] xor N2arr[2];
Move(nOutarrB, (Pointer(OutxorArr))^, 24);
// tmp1 := CalcSectByteLen3T4(tmp);
PacLenSect := 0;
BsectSet := [];
for PnBt := 0 to SectLen - 1 do begin
if OutxorArr[PnBt] in BSectSet = false then begin
include(BSectSet, OutxorArr[PnBt]);
inc(PacLenSect, 1);
end;
end;
if (PacLenSect <= MinLenUse) then begin
PtasksEnd[PNTask] := 1;
if PNTask = 1 then begin
PacLenSect := 1;
end;
end;
end;
end);
end;
procedure TForm1.Button7Click(Sender: TObject);
var
Tasks: TArray<ITask>;
task: ITask;
FStart, FEnd: uint32;
FSectCount, FSectLen: uint32;
FTrNum: uint32;
FIn: TMemoryStream;
NLoadRule, x, n: uint32;
wt, wts, oldwts: RecTime;
Ndata, Nrule: uint32;
nresfind, nresnofind: uint32;
s: string;
NumRule: uint32;
SectLen: uint32;
OutFDLen: uint32;
MinLenUse: uint32;
NMinLen, allFindNMinLen: uint32;
PacLen: uint32;
FS: tstringlist;
TaskRes: uint32;
TasksecLen, TasksecLenEnd: uint32;
BTasksEnd: uint32;
Zt: uint32;
begin
starttime(wt);
x := 0;
SectLen := 24;
MinLenUse := 12;
NMinLen := 0;
allFindNMinLen := 0;
setlength(FDDataSect, 0); // clear
setlength(OutFDDataSect, 0); // clear
FIn := TMemoryStream.Create;
FIn.LoadFromFile(Memo1.Lines[0]);
FIn.Seek(0, soBeginning);
NLoadRule := FIn.Size div 24;
setlength(FDDataSect, x + 1);
setlength(FDDataSect[x], NLoadRule);
for n := 0 to NLoadRule - 1 do begin
setlength(FDDataSect[x,n].Data, 24);
FIn.ReadData(FDDataSect[x,n].Data, 24);
end;
FIn.Free;
setlength(OutFDDataSect, x + 1);
OutFDLen := length(OutFDDataSect[x]);
if OutFDLen = 0 then begin
Setlength(OutFDDataSect[x], OutFDlen + 1);
OutFDDataSect[x, OutFDLen] := FDDataSect[x, 0];
end;
Ndata := length(FDDataSect[x]);
PBar1.Max := Ndata;
TaskRes := 0;
starttime(wts);
for n := 0 to Ndata - 1 do begin // WORK
NLoadRule := length(OutFDDataSect[x]);
setlength(Tasks, 0);
setlength(TasksEnd, 0);
// создаем потоки
if NLoadRule <= 1000 then begin // 1 potok
setlength(Tasks, 1);
setlength(TasksEnd, 1);
for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
// procedure CreateTasksV2(DataArr, RuleArr:TArray<TArray<TDataSection>>; min, max: uint32; Pn: uint32; PNTask: byte;
// var PtasksEnd: TBytes; var Wtasks: TArray<ITask>);
CreateTasksV2(FDDataSect, OutFDDataSect, 0, NLoadRule -1, n, 0, TasksEnd, tasks);
end;
if NLoadRule > 1000 then begin
setlength(Tasks, 4);
setlength(TasksEnd, 4);
for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
TasksecLen := NLoadRule div 4;
TasksecLenEnd := NLoadRule - TasksecLen;
CreateTasksV2(FDDataSect, OutFDDataSect, 0, TasksecLen - 1, n, 0, TasksEnd, Tasks);
CreateTasksV2(FDDataSect, OutFDDataSect, TasksecLen, (TasksecLen * 2) - 1, n, 1, TasksEnd, Tasks);
CreateTasksV2(FDDataSect, OutFDDataSect, (TasksecLen * 2), (TasksecLen * 3) - 1 , n, 2, TasksEnd, Tasks);
CreateTasksV2(FDDataSect, OutFDDataSect, (TasksecLen * 3), NLoadRule - 1 , n, 3, TasksEnd, Tasks);
end;
FindResult := 0;
for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
for task in tasks do
task.Start;
FindResult := 0;
// for BTasksEnd := 0 to Length(tasksEnd) - 1 do tasksEnd[BTasksEnd] := 0;
//Ждём выполнение всех задач.
TTask.WaitForAll(tasks);
FindResult := 0;
for BTasksEnd := 0 to Length(tasksEnd) - 1 do begin
if tasksEnd[BTasksEnd] = 1 then FindResult := 1;;
end;
if (FindResult > 0) then begin
inc(NMinLen, 1);
end;
if (FindResult = 0) and (n > 0) then begin
OutFDLen := length(OutFDDataSect[x]);
Setlength(OutFDDataSect[x], OutFDlen + 1);
OutFDDataSect[x, OutFDLen] := FDDataSect[x, n];
inc(allFindNMinLen, 1);
end;
if (n mod 1000) = 0 then begin
PBar1.Position := n;
stoptime(wts);
oldwts.start := wts.stop - wts.start;
s := DecodeRecTime(oldwts);
oldwts.stop := oldwts.start;
memo1.Lines.Add(n.ToString +' '+ s +' '+ DecodeRecTime(wts));
application.ProcessMessages;
starttime(wts);
end;
end;
stoptime(wt);
memo1.Lines.Add(Zt.ToString + ' Dup '+ NMinLen.ToString +' Add '+ allFindNMinLen.ToString +' '+ DecodeRecTime(wt));
Memo1.Lines.SaveToFile('LogTime');
end; |