//OpenSCADA module Archive.DBArch file: val.cpp
/***************************************************************************
 *   Copyright (C) 2007-2023 by Roman Savochenko, <roman@oscada.org>       *
 *                                                                         *
 *   This program is free software; you can redistribute it and/or modify  *
 *   it under the terms of the GNU General Public License as published by  *
 *   the Free Software Foundation; either version 2 of the License, or     *
 *   (at your option) any later version.                                   *
 *                                                                         *
 *   This program is distributed in the hope that it will be useful,       *
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of        *
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         *
 *   GNU General Public License for more details.                          *
 *                                                                         *
 *   You should have received a copy of the GNU General Public License     *
 *   along with this program; if not, write to the                         *
 *   Free Software Foundation, Inc.,                                       *
 *   59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.             *
 ***************************************************************************/

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <math.h>

#include <tsys.h>
#include <tmess.h>
#include "arch.h"
#include "val.h"

using namespace DBArch;

//*************************************************
//* DBArch::ModVArch - Value archiver             *
//*************************************************
ModVArch::ModVArch( const string &iid, const string &idb, TElem *cf_el ) :
    TVArchivator(iid,idb,cf_el), needMeta(true), needRePushGrps(false), reqRes(true), mMaxSize(0), mTmAsStr(false), mGroupPrms(100)
{
    setSelPrior(1);
    setAddr(DB_GEN);
}

ModVArch::~ModVArch( )
{
    try { stop(); } catch(...) { }
}

TCntrNode &ModVArch::operator=( const TCntrNode &node )
{
    const TVArchivator *src_n = dynamic_cast<const TVArchivator*>(&node);
    if(!src_n) return *this;

    //Configuration copy
    exclCopy(*src_n, "ID;START;");
    setDB(src_n->DB());

    //TVArchivator::operator=(node);
    load_();

    return *this;
}

void ModVArch::postDisable( int flag )
{
    TVArchivator::postDisable(flag);

    if(flag&NodeRemove) {
	//Removing the grouping mode tables and records
	TConfig cfg(&mod->archEl());
	for(int aCnt = 0; TBDS::dataSeek(addr()+"."+mod->mainTbl(),"",aCnt++,cfg); ) {
	    string vTbl = cfg.cfg("TBL").getS(), aNm;
	    if(vTbl.find(archTbl()+"_") == string::npos) continue;

	    //Removing the groups table of the grouping mode
	    TBDS::dataDelTbl(addr()+"."+vTbl);

	    if(!TBDS::dataDel(addr()+"."+mod->mainTbl(),"",cfg,TBDS::UseAllKeys|TBDS::NoException)) break;
	    aCnt--;
	}
    }
}

void ModVArch::load_( )
{
    //TVArchivator::load_();

    if(addr().empty()) setAddr(DB_GEN);

    try {
	XMLNode prmNd;
	string  vl;
	prmNd.load(cfg("A_PRMS").getS());
	if(!(vl=prmNd.attr("Size")).empty())		setMaxSize(s2r(vl));
	if(!(vl=prmNd.attr("TmAsStr")).empty())		setTmAsStr(s2i(vl));
	if(!(vl=prmNd.attr("GroupPrms")).empty())	setGroupPrms(s2i(vl));
    } catch(...) { }
}

void ModVArch::save_( )
{
    XMLNode prmNd("prms");
    prmNd.setAttr("Size", r2s(maxSize()));
    prmNd.setAttr("TmAsStr", i2s(tmAsStr()));
    prmNd.setAttr("GroupPrms", i2s(groupPrms()));
    cfg("A_PRMS").setS(prmNd.save(XMLNode::BrAllPast));

    TVArchivator::save_();
}

void ModVArch::start( )
{
    //Connection to DB and enable status check
    string wdb = TBDS::realDBName(addr());
    AutoHD<TBD> db = SYS->db().at().nodeAt(wdb,0,'.');
    try { if(!db.at().enableStat()) db.at().enable(); }
    catch(TError &err) { mess_sys(TMess::Warning, _("Error enabling the target DB: %s"), err.mess.c_str()); }

    //Start getting data cycle
    TVArchivator::start();

    //First scan meta-DB. Load and connect archives
    checkArchivator();
}

void ModVArch::stop( bool full_del )
{
    //Stop getting data cicle an detach archives
    TVArchivator::stop(full_del);

    MtxAlloc res(reqRes, true);
    if(groupPrms()) accm.clear();
    needMeta = true;
}

TVArchEl *ModVArch::getArchEl( TVArchive &arch )	{ return new ModVArchEl(arch, *this); }

string ModVArch::archTbl( int iG )	{ return "DBAVl_"+id()+((iG<0)?"":("_<GRP>"+(iG?i2s(iG):""))); }

void ModVArch::checkArchivator( unsigned int cnt )
{
    if(needRePushGrps) pushAccumVals();

    if((groupPrms() && !needMeta) || (!groupPrms() && (cnt%60))) return;	//One time update at <needMeta> for groupPrms()

    MtxAlloc res(reqRes, true);	//Moved to the up to prevent for unlocked cleaning the accumulator on the groupPrms() mode

    //Clear the accummulator's groups
    if(groupPrms()) accm.clear();

    //Loading and processing the main table with the meta-information.
    TConfig cfg(&mod->archEl());
    for(int aCnt = 0; TBDS::dataSeek(addr()+"."+mod->mainTbl(),"",aCnt++,cfg,TBDS::UseCache); ) {
	string vTbl = cfg.cfg("TBL").getS(), aNm;
	if(vTbl.find(archTbl()+"_") == string::npos) continue;
	//Table per parameter mode
	if(vTbl.find(archTbl(0)) == string::npos) {
	    if(groupPrms()) continue;
	    aNm = vTbl.substr(archTbl().size()+1);

	    // Check to the archive present
	    AutoHD<TVArchive> varch;
	    if(owner().owner().valPresent(aNm)) varch = owner().owner().valAt(aNm);
	    else {
		// Add no present archive
		owner().owner().valAdd(aNm, "");
		varch = owner().owner().valAt(aNm);
		varch.at().setToStart(true);
		varch.at().setValType((TFld::Type)cfg.cfg("PRM2").getI());
		//varch.at().start();
	    }
	    //  Check for archive's start state and its starts early for propper redundancy sync
	    if(!varch.at().startStat() && varch.at().toStart())
		try { varch.at().start(); } catch(TError&) { continue; }	//!!!! Pass wrong archives
	    // Check for attached
	    if(!varch.at().archivatorPresent(workId()))	varch.at().archivatorAttach(workId());
	}
	//Single table per a parameters group mode
	else {
	    if(!groupPrms()) continue;
	    int gId = s2i(vTbl.substr(archTbl(0).size()));
	    SGrp *gO = NULL;

	    //MtxAlloc res(reqRes, true);

	    // The parameters
	    for(int off = 0, aTp; (aNm=TSYS::strLine(cfg.cfg("PRM2").getS(),0,&off)).size(); ) {
		aTp = s2i(TSYS::strParse(aNm,0,":"));
		aNm = TSYS::strParse(aNm,1,":");
		bool gO_init = !gO;
		accmGetReg(aNm, &gO, (TFld::Type)aTp, gId);
		if(!gO) continue;
		if(gO_init) {
		    gO->beg = s2ll(cfg.cfg("BEGIN").getS());
		    gO->end = s2ll(cfg.cfg("END").getS());
		    gO->per = s2ll(cfg.cfg("PRM1").getS());
		    //  Checking for deleting the archives group table
		    if(maxSize() && gO->end <= (TSYS::curTime()-(int64_t)(maxSize()*86400e6))) {
			TBDS::dataDelTbl(addr()+"."+vTbl);
			gO->beg = gO->end = gO->per = 0;
		    }
		}

		// Check to the archive present
		AutoHD<TVArchive> varch;
		if(owner().owner().valPresent(aNm)) varch = owner().owner().valAt(aNm);
		else {
		    // Add no present archive
		    owner().owner().valAdd(aNm, "");
		    varch = owner().owner().valAt(aNm);
		    varch.at().setToStart(true);
		    varch.at().setValType((TFld::Type)aTp);
		    //varch.at().start();
		}
		// Check for archive's start state and it starts early for propper redundancy sync
		if(!varch.at().startStat() && varch.at().toStart())
		    try { varch.at().start(); } catch(TError&) { continue; }	//!!!! Pass wrong archives

		// Check for attached
		if(!varch.at().archivatorPresent(workId())) varch.at().archivatorAttach(workId());

		// Set the main parameters
		ResAlloc res1(archRes, false);
		map<string,TVArchEl*>::iterator iel = archEl.find(aNm);
		if(iel != archEl.end()) {
		    ModVArchEl *ael = (ModVArchEl*)iel->second;
		    ael->mBeg = gO->beg; ael->mEnd = gO->end; ael->mPer = gO->per;

		    //  Read previous one
		    int64_t curTm = (TSYS::curTime()/vmax(1,ael->period()))*ael->period();
		    if(curTm >= ael->begin() && curTm <= ael->end() && ael->period() > 10000000 && ael->prevVal == EVAL_REAL) {
			ael->prevTm = curTm;
			switch(varch.at().valType()) {
			    case TFld::Integer: case TFld::Real: ael->prevVal = ael->getVal(&curTm, false).getR();	break;
			    default: break;
			}
		    }
		}
	    }
	}
    }

    string wDB = TBDS::realDBName(addr());
    needMeta = (TSYS::strParse(wDB,0,".") != DB_CFG &&
		!SYS->db().at().at(TSYS::strParse(wDB,0,".")).at().at(TSYS::strParse(wDB,1,".")).at().enableStat());
}

TValBuf &ModVArch::accmGetReg( const string &aNm, SGrp **grp, TFld::Type tp, int prefGrpPos )
{
    MtxAlloc res(reqRes, true);

    //Find up the parameter in group
    for(unsigned iG = 0; iG < accm.size(); iG++) {
	map<string,TValBuf>::iterator iP = accm[iG].els.find(aNm);
	if(iP != accm[iG].els.end()) {
	    if(grp) *grp = &accm[iG];
	    return iP->second;
	}
    }

    //Find or create a propper group or pointed by <prefGrpPos>
    for(unsigned iG = 0; prefGrpPos < 0 && iG < accm.size(); iG++)
	if((int)accm[iG].els.size() < groupPrms()) prefGrpPos = iG;
    if(prefGrpPos < 0) prefGrpPos = accm.size();
    while((int)accm.size() <= prefGrpPos) {
	accm.push_back(SGrp(accm.size()));
	accm.back().tblEl.fldAdd(new TFld("MARK",trS("Mark, time/(10*per)"),TFld::Integer,TCfg::Key,"20"));
	accm.back().tblEl.fldAdd(new TFld("TM",trS("Time, seconds"),TFld::Integer,TCfg::Key|(tmAsStr()?TFld::DateTimeDec:0),"20"));
	//accm.back().tblEl.fldAdd(new TFld("TMU",trS("Time, microseconds"),TFld::Integer,TCfg::Key,"10"));
    }

    //Place the parameter to the selected group
    SGrp &gO = accm[prefGrpPos];

    switch(tp&TFld::GenMask) {
	case TFld::Boolean: gO.tblEl.fldAdd(new TFld(aNm.c_str(),aNm.c_str(),TFld::Integer,TFld::NoFlag,"1",i2s(EVAL_BOOL).c_str()));	break;
	case TFld::Integer: gO.tblEl.fldAdd(new TFld(aNm.c_str(),aNm.c_str(),TFld::Integer,TFld::NoFlag,"",ll2s(EVAL_INT).c_str()));	break;
	case TFld::Real:    gO.tblEl.fldAdd(new TFld(aNm.c_str(),aNm.c_str(),TFld::Real,TFld::NoFlag,"",r2s(EVAL_REAL).c_str()));	break;
	case TFld::String:  gO.tblEl.fldAdd(new TFld(aNm.c_str(),aNm.c_str(),TFld::String,TFld::NoFlag,"1000",EVAL_STR));		break;
	default: break;
    }

    if(grp) *grp = &gO;
    gO.els[aNm] = TValBuf(tp, 100, 0);	//Mostly to init the value type for direct archiving only
    return gO.els[aNm];
}

void ModVArch::accmUnreg( const string &aNm )
{
    MtxAlloc res(reqRes, true);

    //Find up the parameter in group
    for(unsigned iG = 0; iG < accm.size(); iG++) {
	SGrp &oG = accm[iG];
	map<string,TValBuf>::iterator iP = oG.els.find(aNm);
	if(iP == oG.els.end())	continue;

	oG.els.erase(iP);

	oG.tblEl.fldDel(oG.tblEl.fldId(aNm,true));

	string	pLs;
	for(iP = oG.els.begin(); iP != oG.els.end(); ++iP)
	    pLs += i2s(iP->second.valType(true)) + ":" + iP->first + "\n";

	//Update the group meta info
	try{ grpMetaUpd(oG, &pLs); } catch(TError&) { }
	break;
    }
}

bool ModVArch::grpLimits( SGrp &oG, int64_t *ibeg, int64_t *iend )
{
    int64_t wEnd = iend ? vmax(oG.end, *iend) : oG.end;
    int64_t wBeg = ibeg ? (vmin(oG.beg,*ibeg) ? vmin(oG.beg,*ibeg) : vmax(oG.beg,*ibeg)) : oG.beg;

    if(ibeg && iend && wEnd <= oG.end && wBeg >= oG.beg) return false;

    try {
	AutoHD<TTable> tbl = TBDS::tblOpen(addr()+"."+archTbl(oG.pos), true);

	MtxAlloc res(reqRes, true);

	//Remove limited records
	TConfig	cfg(&oG.tblEl);
	if(maxSize() && (wEnd-wBeg) > (int64_t)(maxSize()*86400e6)) {
	    cfg.cfg("TM").setKeyUse(false); //cfg.cfg("TMU").setKeyUse(false);
	    int64_t nEnd = ((wEnd-(int64_t)(maxSize()*86400e6))/oG.per)*oG.per;
	    for(int tC = vmax(wBeg,nEnd-3600ll*oG.per)/(10*oG.per); tC < nEnd/(10*oG.per); ++tC) {
		cfg.cfg("MARK").setI(tC, true);
		tbl.at().fieldDel(cfg);
	    }
	    wBeg = nEnd;
	}
	oG.beg = wBeg;
	if(ibeg) *ibeg = wBeg;

	oG.dbOK = true;
    } catch(TError&) { oG.dbOK = false; throw; }

    return true;
}

void ModVArch::grpMetaUpd( SGrp &oG, const string *aLs )
{
    //Update the group meta info
    TConfig cfg(&mod->archEl());
    cfg.cfgViewAll(false);
    cfg.cfg("TBL").setS(archTbl(oG.pos), true);
    cfg.cfg("BEGIN").setS(ll2s(oG.beg), true);
    cfg.cfg("END").setS(ll2s(oG.end), true);
    cfg.cfg("PRM1").setS(ll2s(oG.per), true);
    if(aLs) cfg.cfg("PRM2").setS(*aLs, true);
    try {
	TBDS::dataSet(addr()+"."+mod->mainTbl(), "", cfg);
	oG.dbOK = true;
    } catch(TError&) { oG.dbOK = false; throw; }
}

void ModVArch::pushAccumVals( )
{
    MtxAlloc res(reqRes, true);

    needRePushGrps = false;

    for(unsigned iG = 0; iG < accm.size(); iG++) {
	SGrp &oG = accm[iG];
	if(!oG.accmBeg || !oG.accmEnd) continue;

	try {
	    AutoHD<TTable> tbl = TBDS::tblOpen(addr()+"."+archTbl(iG), true);
	    if(tbl.freeStat()) continue;

	    TConfig	cfg(&oG.tblEl);

	    //Write data to the table
	    for(int64_t beg = oG.accmBeg, ctm, per = (oG.per?oG.per:(valPeriod()*1e6)); beg <= oG.accmEnd; beg += per) {
		for(map<string,TValBuf>::iterator iP = oG.els.begin(); iP != oG.els.end(); ++iP) {
		    TValBuf &oP = iP->second;
		    ctm = beg;
		    bool noData = (ctm < oP.begin() || ctm > oP.end());
		    switch(oP.valType()) {
			case TFld::Boolean:	cfg.cfg(iP->first).setI(noData?EVAL_BOOL:oP.getB(&ctm,true));	break;
			case TFld::Integer:	cfg.cfg(iP->first).setI(noData?EVAL_INT:oP.getI(&ctm,true));	break;
			case TFld::Real:	cfg.cfg(iP->first).setR(noData?EVAL_REAL:oP.getR(&ctm,true));	break;
			case TFld::String:	cfg.cfg(iP->first).setS(noData?EVAL_STR:oP.getS(&ctm,true));	break;
			default: break;
		    }
		}

		cfg.cfg("MARK").setI(beg/(10*per));
		cfg.cfg("TM").setI(beg/1000000);
		//cfg.cfg("TMU").setI(beg%1000000);
		tbl.at().fieldSet(cfg);
	    }

	    if(!oG.per) oG.per = (valPeriod()*1e6);
	    if(!oG.beg) oG.beg = oG.accmBeg;
	    oG.end = oG.accmEnd;

	    //Archive size limit process
	    grpLimits(oG);

	    //Actual parameters list prepare
	    string	pLs;
	    for(map<string,TValBuf>::iterator iP = oG.els.begin(); iP != oG.els.end(); ++iP)
		pLs += i2s(iP->second.valType(true)) + ":" + iP->first + "\n";

	    //Update the group meta info
	    grpMetaUpd(oG, &pLs);

	    //Clear the accumulators and some parameters update
	    for(map<string,TValBuf>::iterator iP = oG.els.begin(); iP != oG.els.end(); ++iP) {
		// Update the main archive parameters
		ResAlloc res1(archRes, false);
		map<string,TVArchEl*>::iterator iel = archEl.find(iP->first);
		if(iel != archEl.end()) {
		    ModVArchEl *ael = (ModVArchEl*)iel->second;
		    ael->mBeg = oG.beg;
		    ael->mEnd = /*oG.end;*/ vmin(oG.end, iP->second.end());
		    ael->mPer = oG.per;
		}

		iP->second.clear();
	    }

	    oG.accmBeg = oG.accmEnd = 0;

	    oG.dbOK = true;
	}
	catch(TError &err) {
	    mess_err(err.cat.c_str(), "%s", err.mess.c_str());
	    oG.dbOK = false; needRePushGrps = true;
	    continue;
	}
    }
}

void ModVArch::cntrCmdProc( XMLNode *opt )
{
    //Get page info
    if(opt->name() == "info") {
	TVArchivator::cntrCmdProc(opt);
	ctrRemoveNode(opt,"/prm/cfg/A_PRMS");
	ctrMkNode("fld",opt,-1,"/prm/cfg/ADDR",EVAL_STR,startStat()?R_R_R_:RWRWR_,"root",SARH_ID,3,
	    "dest","select","select","/db/list:onlydb","help",TMess::labStor().c_str());
	if(ctrMkNode("area",opt,-1,"/prm/add",_("Additional options"),R_R_R_,"root",SARH_ID)) {
	    ctrMkNode("fld",opt,-1,"/prm/add/sz",_("Archive size, days"),RWRWR_,"root",SARH_ID,2,
		"tp","real", "help",_("Set to 0 to disable this limit and to rise some the performance."));
	    ctrMkNode("fld",opt,-1,"/prm/add/tmAsStr",_("To form time as a string"),startStat()?R_R_R_:RWRWR_,"root",SARH_ID,2,
		"tp","bool", "help",_("Only for databases that support such by means of specific data types like \"datetime\" in MySQL."));
	    ctrMkNode("fld",opt,-1,"/prm/add/groupPrms",_("Grouping limit of the parameters"),startStat()?R_R_R_:RWRWR_,"root",SARH_ID,4,
		"tp","dec", "min","0", "max","10000",
		"help",_("Enables for grouping arhivator's parameters in single table. Set to '0' for one table per parameter."));
	}
	return;
    }

    //Process command to page
    string a_path = opt->attr("path");
    if(a_path == "/prm/add/sz") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(r2s(maxSize()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setMaxSize(s2r(opt->text()));
    }
    else if(a_path == "/prm/add/tmAsStr") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(i2s(tmAsStr()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setTmAsStr(s2i(opt->text()));
    }
    else if(a_path == "/prm/add/groupPrms") {
	if(ctrChkNode(opt,"get",RWRWR_,"root",SARH_ID,SEC_RD))	opt->setText(i2s(groupPrms()));
	if(ctrChkNode(opt,"set",RWRWR_,"root",SARH_ID,SEC_WR))	setGroupPrms(s2i(opt->text()));
    }
    else TVArchivator::cntrCmdProc(opt);
}

bool ModVArch::cfgChange( TCfg &co, const TVariant &pc )
{
    if(co.name() == "V_PER" && co.getR() && co.getR() != pc.getR())
	co.setR(floor(vmax(1,co.getR())));	//Up to seconds now for DB

    return TVArchivator::cfgChange(co, pc);
}

//*************************************************
//* DBArch::ModVArchEl - Value archive element    *
//*************************************************
ModVArchEl::ModVArchEl( TVArchive &iachive, TVArchivator &iarchivator ) :
    TVArchEl(iachive, iarchivator), mBeg(0), mEnd(0), mPer(0), needMeta(false)
{
    if(!archivator().groupPrms()) {
	reqEl.fldAdd(new TFld("MARK",trS("Mark, time/(10*per)"),TFld::Integer,TCfg::Key,"20"));
	reqEl.fldAdd(new TFld("TM",trS("Time, seconds"),TFld::Integer,TCfg::Key|(archivator().tmAsStr()?TFld::DateTimeDec:0),"20"));
	//reqEl.fldAdd(new TFld("TMU",trS("Time, microseconds"),TFld::Integer,TCfg::Key,"10"));
	switch(archive().valType()) {
	    case TFld::Boolean: reqEl.fldAdd(new TFld("VAL",trS("Value"),TFld::Integer,TFld::NoFlag,"1",i2s(EVAL_BOOL).c_str()));break;
	    case TFld::Integer: reqEl.fldAdd(new TFld("VAL",trS("Value"),TFld::Integer,TFld::NoFlag,"",ll2s(EVAL_INT).c_str()));break;
	    case TFld::Real:    reqEl.fldAdd(new TFld("VAL",trS("Value"),TFld::Real,TFld::NoFlag,"",r2s(EVAL_REAL).c_str()));	break;
	    case TFld::String:  reqEl.fldAdd(new TFld("VAL",trS("Value"),TFld::String,TFld::NoFlag,"1000",EVAL_STR));		break;
	    default: break;
	}
    }

    needMeta = !readMeta();
}

ModVArchEl::~ModVArchEl( )	{ }

string ModVArchEl::archTbl( )	{ return "DBAVl_"+archivator().id()+"_"+archive().id(); }

void ModVArchEl::fullErase( )
{
    if(archivator().groupPrms()) archivator().accmUnreg(archive().id());
    else {
	//Remove info record
	TConfig cfg(&mod->archEl());
	cfg.cfg("TBL").setS(archTbl(), true);
	TBDS::dataDel(archivator().addr()+"."+mod->mainTbl(), "", cfg);

	//Removing the archive DB table
	TBDS::dataDelTbl(archivator().addr()+"."+archTbl());
    }
}

void ModVArchEl::getValsProc( TValBuf &ibuf, int64_t ibegIn, int64_t iendIn )
{
    if(needMeta && (needMeta=!readMeta()))	return;

    TValBuf buf(ibuf.valType(), ibuf.size(), ibuf.period(), ibuf.hardGrid(), ibuf.highResTm());

    //Request by single values for most big buffer's period
    if(buf.period()/10 > period()) {
	ibegIn = (ibegIn/buf.period())*buf.period();
	for(int64_t ctm; ibegIn <= iendIn; ibegIn += buf.period()) {
	    ctm = ibegIn;
	    TVariant vl = getValProc(&ctm, false);
	    buf.set(vl, ibegIn);
	}
	return;
    }

    //Going border to period time
    ibegIn = (ibegIn/period())*period();
    iendIn = (iendIn/period())*period();

    //Prepare border
    int64_t ibeg = vmax(ibegIn, begin());
    int64_t iend = vmin(iendIn, end());

    if(iend < ibeg)	return;

    //Get values
    // Fill by EVAL the data full range
    for(int64_t cTm = ibegIn; cTm <= iendIn; cTm += period()) buf.setR(EVAL_REAL, cTm);

    // Fill by EVAL previous range part without a real data
    //for(int64_t cTm = ibegIn; cTm < ibeg; cTm += period()) buf.setR(EVAL_REAL, cTm);

    // Same data values get
    TConfig cfg(&reqEl);
    string tblAddr = archivator().addr() + "." + archTbl(),
	   vlFld = "VAL";
    MtxAlloc res(archivator().reqRes, false);
    if(archivator().groupPrms()) {
	vlFld = archive().id();
	res.lock();
	ModVArch::SGrp *gO = NULL;
	archivator().accmGetReg(vlFld, &gO, archive().valType(true));
	//if(!gO->dbOK && pDt.realSize()) return 0;
	tblAddr = archivator().addr() + "." + archivator().archTbl(gO->pos);

	cfg.setElem(&gO->tblEl);
	cfg.cfgViewAll(false);
	cfg.cfg(vlFld).setView(true);
    }
    cfg.cfg("TM").setKeyUse(false); //cfg.cfg("TMU").setKeyUse(false);
    cfg.cfg("TM").setView(true);
    for(int64_t tC = ibeg, cTm; tC/(10*period()) <= iend/(10*period()); ) {
	tC = (tC/(10*period()))*(10*period());
	cfg.cfg("MARK").setI(tC/(10*period()), true);
	int eC = 0;
	for( ; TBDS::dataSeek(tblAddr,"",eC++,cfg,TBDS::UseCache); ) {
	    cTm = 1000000ll * cfg.cfg("TM").getI();
	    if(cTm < ibeg || cTm > iend) continue;
	    switch(archive().valType()) {
		case TFld::Boolean:	buf.setB(cfg.cfg(vlFld).getB(), cTm);	break;
		case TFld::Integer:	buf.setI(cfg.cfg(vlFld).getI(), cTm);	break;
		case TFld::Real:	buf.setR(cfg.cfg(vlFld).getR(), cTm);	break;
		case TFld::String:	buf.setS(cfg.cfg(vlFld).getS(), cTm);	break;
		default:		buf.setR(EVAL_REAL, cTm);		break;
	    }
	}
	tC += 10*period();
    }
    cfg.setElem(NULL);
    res.unlock();

    // Fill by EVAL following range part without a real data
    //for(int64_t cTm = iend+period(); cTm <= iendIn; cTm += period()) buf.setR(EVAL_REAL, cTm);

    //Check for target DB enabled (disabled by the connection loss)
    string wDB = TBDS::realDBName(archivator().addr());
    if(TSYS::strParse(wDB,0,".") == DB_CFG ||
	    SYS->db().at().at(TSYS::strParse(wDB,0,".")).at().at(TSYS::strParse(wDB,1,".")).at().enableStat())
	buf.getVals(ibuf, buf.begin(), buf.end());
}

TVariant ModVArchEl::getValProc( int64_t *tm, bool up_ord )
{
    if(needMeta && (needMeta=!readMeta()))	return EVAL_REAL;

    int64_t itm = tm ? *tm : SYS->curTime();
    itm = (itm/period())*period()+((up_ord && itm%period())?period():0);

    TConfig cf(&reqEl);
    string tblAddr = archivator().addr() + "." + archTbl(),
	   vlFld = "VAL";
    MtxAlloc res(archivator().reqRes, false);
    if(archivator().groupPrms()) {
	vlFld = archive().id();
	res.lock();
	ModVArch::SGrp *gO = NULL;
	archivator().accmGetReg(vlFld, &gO, archive().valType(true));
	//if(!gO->dbOK && pDt.realSize()) return 0;
	tblAddr = archivator().addr() + "." + archivator().archTbl(gO->pos);

	cf.setElem(&gO->tblEl);
	cf.cfgViewAll(false);
	cf.cfg(vlFld).setView(true);
    }
    cf.cfg("MARK").setI(itm/(10*period()));
    cf.cfg("TM").setI(itm/1000000);
    //cf.cfg("TMU").setI(itm%1000000);
    if(TBDS::dataGet(tblAddr,"",cf,TBDS::NoException)) {
	if(tm) *tm = itm;
	switch(archive().valType()) {
	    case TFld::Boolean:	return cf.cfg(vlFld).getB();
	    case TFld::Integer:	return cf.cfg(vlFld).getI();
	    case TFld::Real:	return cf.cfg(vlFld).getR();
	    case TFld::String:	return cf.cfg(vlFld).getS();
	    default: break;
	}
    }
    cf.setElem(NULL);
    res.unlock();

    if(tm) *tm = 0;
    return EVAL_REAL;
}

int64_t ModVArchEl::setValsProc( TValBuf &buf, int64_t ibeg, int64_t iend, bool toAccum )
{
    if(needMeta && (needMeta=!readMeta())) return 0;

    //Check border
    if(!buf.vOK(ibeg,iend)) return 0;
    ibeg = vmax(ibeg, buf.begin());
    iend = vmin(iend, buf.end());
    ibeg = (ibeg/period())*period();
    iend = (iend/period())*period();

    TConfig cfg(&reqEl);
    if(!archivator().groupPrms()) {
	//Table struct init
	AutoHD<TTable> tbl = TBDS::tblOpen(archivator().addr()+"."+archTbl(), true);
	if(tbl.freeStat()) return 0;

	//Write data to the table
	for(int64_t ctm; ibeg <= iend; ibeg++) {
	    switch(archive().valType()) {
		case TFld::Boolean:	cfg.cfg("VAL").setI(buf.getB(&ibeg,true));	break;
		case TFld::Integer:	cfg.cfg("VAL").setI(buf.getI(&ibeg,true));	break;
		case TFld::Real:	cfg.cfg("VAL").setR(buf.getR(&ibeg,true));	break;
		case TFld::String:	cfg.cfg("VAL").setS(buf.getS(&ibeg,true));	break;
		default: break;
	    }
	    ctm = (ibeg/period())*period();
	    cfg.cfg("MARK").setI(ctm/(10*period()));
	    cfg.cfg("TM").setI(ctm/1000000);
	    //cfg.cfg("TMU").setI(ctm%1000000);
	    tbl.at().fieldSet(cfg);
	    //Archive time border update
	    mBeg = mBeg ? vmin(mBeg,ctm) : ctm;
	    mEnd = mEnd ? vmax(mEnd,ctm) : ctm;
	}

	//Archive size limit process
	if(archivator().maxSize() && (mEnd-mBeg) > (int64_t)(archivator().maxSize()*86400e6)) {
	    int64_t nEnd = ((mEnd-(int64_t)(archivator().maxSize()*86400e6))/period())*period();
	    cfg.cfg("TM").setKeyUse(false); //cfg.cfg("TMU").setKeyUse(false);
	    for(int tC = vmax(mBeg,nEnd-3600ll*period())/(10*period()); tC < nEnd/(10*period()); ++tC) {
		cfg.cfg("MARK").setI(tC, true);
		tbl.at().fieldDel(cfg);
	    }
	    mBeg = nEnd;
	}
	tbl.free();
	//SYS->db().at().close(archivator().addr()+"."+archTbl());		//!!! No close the table manually

	//Updating the archive info
	cfg.setElem(&mod->archEl());
	cfg.cfgViewAll(false);
	cfg.cfg("TBL").setS(archTbl(), true);
	cfg.cfg("BEGIN").setS(ll2s(mBeg), true);
	cfg.cfg("END").setS(ll2s(mEnd), true);
	cfg.cfg("PRM1").setS(ll2s(mPer), true);
	cfg.cfg("PRM2").setS(i2s(archive().valType(true)), true);

	return TBDS::dataSet(archivator().addr()+"."+mod->mainTbl(),"",cfg,TBDS::NoException) ? iend : 0;
    }

    //The group table processing
    MtxAlloc res(archivator().reqRes, true);
    string vlFld = archive().id();
    ModVArch::SGrp *gO = NULL;
    TValBuf &pDt = archivator().accmGetReg(vlFld, &gO, archive().valType(true));
    if(toAccum) {
	if(!gO->dbOK && pDt.realSize()) return 0;	//Prevent the accumulated data loss at DB errors
	pDt = buf;
	//Checking for freezing some first sources, at stopping the controller objects
	if((!gO->accmBeg || !gO->accmEnd) && (TSYS::curTime()-iend) < (2*period()))	//?!?! Get 2 from Vision, WebVision trends
	{ gO->accmBeg = ibeg; gO->accmEnd = iend; }
	return vmin(gO->accmEnd, iend);
    }

    //Direct writing to the group table
    // Table struct init
    AutoHD<TTable> tbl = TBDS::tblOpen(archivator().addr()+"."+archivator().archTbl(gO->pos), true);
    if(tbl.freeStat()) return 0;

    cfg.setElem(&gO->tblEl);
    cfg.cfgViewAll(false);
    cfg.cfg(vlFld).setView(true);

    //Write data to the table
    bool updMeta = false;
    int64_t lstWrTm = 0;
    for(int64_t ctm; ibeg <= iend; ibeg++) {
	switch(archive().valType()) {
	    case TFld::Boolean:	cfg.cfg(vlFld).setI(buf.getB(&ibeg,true));	break;
	    case TFld::Integer:	cfg.cfg(vlFld).setI(buf.getI(&ibeg,true));	break;
	    case TFld::Real:	cfg.cfg(vlFld).setR(buf.getR(&ibeg,true));	break;
	    case TFld::String:	cfg.cfg(vlFld).setS(buf.getS(&ibeg,true));	break;
	    default: break;
	}
	ctm = (ibeg/period())*period();
	cfg.cfg("MARK").setI(ctm/(10*period()));
	cfg.cfg("TM").setI(ctm/1000000);
	//cfg.cfg("TMU").setI(ctm%1000000);
	tbl.at().fieldSet(cfg);
	lstWrTm = ctm;

	//Archive's range update
	mBeg = mBeg ? vmin(mBeg,ctm) : ctm;
	mEnd = mEnd ? vmax(mEnd,ctm) : ctm;

	if(!gO->per || !gO->beg || mBeg < gO->beg || mEnd > gO->end) {
	    gO->per = (archivator().valPeriod()*1e6);
	    gO->beg = gO->beg ? vmin(gO->beg, mBeg) : mBeg;
	    gO->end = vmax(gO->end, mEnd);
	    updMeta = true;
	}
    }

    //Archive size limit process
    if(archivator().grpLimits(*gO,&mBeg,&mEnd) || updMeta) {
	string	pLs;
	for(map<string,TValBuf>::iterator iP = gO->els.begin(); updMeta && iP != gO->els.end(); ++iP)
	    pLs += i2s(iP->second.valType(true)) + ":" + iP->first + "\n";

	archivator().grpMetaUpd(*gO, pLs.size()?&pLs:NULL);
    }

    cfg.setElem(NULL);

    return lstWrTm;
}

bool ModVArchEl::readMeta( )
{
    if(archivator().groupPrms()) {
	if(!mPer) mPer = (int64_t)(archivator().valPeriod()*1e6);
	return !archivator().needMeta;
    }

    bool rez = true;

    //Load message archive parameters
    TConfig cfg(&mod->archEl());
    cfg.cfg("TBL").setS(archTbl());
    if(TBDS::dataGet(archivator().addr()+"."+mod->mainTbl(),"",cfg,TBDS::NoException)) {
	mBeg = s2ll(cfg.cfg("BEGIN").getS());
	mEnd = s2ll(cfg.cfg("END").getS());
	mPer = s2ll(cfg.cfg("PRM1").getS());
	// Checking for deleting the archiver table
	if(archivator().maxSize() && mEnd <= (TSYS::curTime()-(int64_t)(archivator().maxSize()*86400e6))) {
	    TBDS::dataDelTbl(archivator().addr()+"."+archTbl());
	    mBeg = mEnd = mPer = 0;
	}
    } else rez = false;

    if(!mPer) mPer = (int64_t)(archivator().valPeriod()*1e6);

    //Check for the target DB enabled (disabled by the connection loss)
    if(!rez) {
	string wDB = TBDS::realDBName(archivator().addr());
	rez = (TSYS::strParse(wDB,0,".") == DB_CFG ||
		SYS->db().at().at(TSYS::strParse(wDB,0,".")).at().at(TSYS::strParse(wDB,1,".")).at().enableStat());
    }

    //Read previous
    if(rez) {
	// Load previous val check
	int64_t cur_tm = (TSYS::curTime()/vmax(1,period()))*period();
	if(cur_tm >= begin() && cur_tm <= end() && period() > 10000000 && prevVal == EVAL_REAL) {
	    prevTm = cur_tm;
	    switch(archive().valType()) {
		case TFld::Integer: case TFld::Real: prevVal = getVal(&cur_tm, false).getR();	break;
		default: break;
	    }
	}
    }

    return rez;
}